package subscriber import ( "bytes" "context" "encoding/base64" "encoding/json" "encoding/xml" "fmt" "git.nspix.com/golang/micro/broker" "net/http" "strconv" "time" ) var ( MaxHttpTimeout = time.Second * 10 //最大的HTTP处理超时时间 ) var ( httpClient = &http.Client{ Timeout: MaxHttpTimeout, } ) type Http struct { Id string `json:"id"` Url string `json:"url"` Topics []string `json:"topics"` Format string `json:"format"` } func (sub *Http) ID() string { return sub.Id } func (sub *Http) HasTopic(topic string) bool { for _, s := range sub.Topics { if topic == s { return true } } return false } func (sub *Http) OnAttach() (err error) { return } func (sub *Http) Process(ctx context.Context, e *broker.Event) (err error) { var ( req *http.Request resp *http.Response buf []byte ) if sub.Format == "xml" { if buf, err = json.Marshal(e); err != nil { return } } else { if buf, err = xml.Marshal(e); err != nil { return } } if req, err = http.NewRequest(http.MethodPost, sub.Url, bytes.NewReader(buf)); err != nil { return } if sub.Format == "xml" { req.Header.Set("Content-Type", "application/xml") } else { req.Header.Set("Content-Type", "application/json") } req.Header.Set("X-Event-Name", e.Name) req.Header.Set("X-Event-Timestamp", strconv.FormatInt(e.Timestamp, 10)) req.Header.Set("User-Agent", "micro") if resp, err = httpClient.Do(req.WithContext(ctx)); err != nil { return } if resp.StatusCode != http.StatusOK { err = fmt.Errorf("request \"%s\" got status code %d", sub.Url, resp.StatusCode) } _ = resp.Body.Close() return } func (sub *Http) OnDetach() (err error) { return } func NewHttpSubscriber(topics []string, url string) *Http { return &Http{ Id: base64.StdEncoding.EncodeToString([]byte(url)), Url: url, Topics: topics, Format: "json", } }