http.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package subscriber
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/base64"
  6. "encoding/json"
  7. "encoding/xml"
  8. "fmt"
  9. "git.nspix.com/golang/micro/broker"
  10. "net/http"
  11. "strconv"
  12. "time"
  13. )
  14. var (
  15. MaxHttpTimeout = time.Second * 10 //最大的HTTP处理超时时间
  16. )
  17. var (
  18. httpClient = &http.Client{
  19. Timeout: MaxHttpTimeout,
  20. }
  21. )
  22. type Http struct {
  23. Id string `json:"id"`
  24. Url string `json:"url"`
  25. Topics []string `json:"topics"`
  26. Format string `json:"format"`
  27. }
  28. func (sub *Http) ID() string {
  29. return sub.Id
  30. }
  31. func (sub *Http) HasTopic(topic string) bool {
  32. for _, s := range sub.Topics {
  33. if topic == s {
  34. return true
  35. }
  36. }
  37. return false
  38. }
  39. func (sub *Http) OnAttach() (err error) {
  40. return
  41. }
  42. func (sub *Http) Process(ctx context.Context, e *broker.Event) (err error) {
  43. var (
  44. req *http.Request
  45. resp *http.Response
  46. buf []byte
  47. )
  48. if sub.Format == "xml" {
  49. if buf, err = json.Marshal(e); err != nil {
  50. return
  51. }
  52. } else {
  53. if buf, err = xml.Marshal(e); err != nil {
  54. return
  55. }
  56. }
  57. if req, err = http.NewRequest(http.MethodPost, sub.Url, bytes.NewReader(buf)); err != nil {
  58. return
  59. }
  60. if sub.Format == "xml" {
  61. req.Header.Set("Content-Type", "application/xml")
  62. } else {
  63. req.Header.Set("Content-Type", "application/json")
  64. }
  65. req.Header.Set("X-Event-Name", e.Name)
  66. req.Header.Set("X-Event-Timestamp", strconv.FormatInt(e.Timestamp, 10))
  67. req.Header.Set("User-Agent", "micro")
  68. if resp, err = httpClient.Do(req.WithContext(ctx)); err != nil {
  69. return
  70. }
  71. if resp.StatusCode != http.StatusOK {
  72. err = fmt.Errorf("request \"%s\" got status code %d", sub.Url, resp.StatusCode)
  73. }
  74. _ = resp.Body.Close()
  75. return
  76. }
  77. func (sub *Http) OnDetach() (err error) {
  78. return
  79. }
  80. func NewHttpSubscriber(topics []string, url string) *Http {
  81. return &Http{
  82. Id: base64.StdEncoding.EncodeToString([]byte(url)),
  83. Url: url,
  84. Topics: topics,
  85. Format: "json",
  86. }
  87. }