12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- package subscriber
- import (
- "bytes"
- "context"
- "encoding/base64"
- "encoding/json"
- "encoding/xml"
- "fmt"
- "git.nspix.com/golang/micro/broker"
- "net/http"
- "strconv"
- "time"
- )
- var (
- MaxHttpTimeout = time.Second * 10 //最大的HTTP处理超时时间
- )
- var (
- httpClient = &http.Client{
- Timeout: MaxHttpTimeout,
- }
- )
- type Http struct {
- Id string
- Url string
- Topics []string
- Format string
- }
- func (sub *Http) ID() string {
- return sub.Id
- }
- func (sub *Http) HasTopic(topic string) bool {
- for _, s := range sub.Topics {
- if topic == s {
- return true
- }
- }
- return false
- }
- func (sub *Http) OnAttach() (err error) {
- return
- }
- func (sub *Http) Process(ctx context.Context, e *broker.Event) (err error) {
- var (
- req *http.Request
- resp *http.Response
- buf []byte
- )
- if sub.Format == "xml" {
- if buf, err = json.Marshal(e); err != nil {
- return
- }
- } else {
- if buf, err = xml.Marshal(e); err != nil {
- return
- }
- }
- if req, err = http.NewRequest(http.MethodPost, sub.Url, bytes.NewReader(buf)); err != nil {
- return
- }
- if sub.Format == "xml" {
- req.Header.Set("Content-Type", "application/xml")
- } else {
- req.Header.Set("Content-Type", "application/json")
- }
- req.Header.Set("X-Event-Name", e.Name)
- req.Header.Set("X-Event-Timestamp", strconv.FormatInt(e.Timestamp, 10))
- req.Header.Set("User-Agent", "micro")
- if resp, err = httpClient.Do(req.WithContext(ctx)); err != nil {
- return
- }
- if resp.StatusCode != http.StatusOK {
- err = fmt.Errorf("request \"%s\" got status code %d", sub.Url, resp.StatusCode)
- }
- _ = resp.Body.Close()
- return
- }
- func (sub *Http) OnDetach() (err error) {
- return
- }
- func NewHttpSubscriber(topics []string, url string) *Http {
- return &Http{
- Id: base64.StdEncoding.EncodeToString([]byte(url)),
- Url: url,
- Topics: topics,
- Format: "json",
- }
- }
|