http.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package notifications
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "net/http"
  7. "sync"
  8. "time"
  9. )
  10. // httpSink implements a single-flight, http notification endpoint. This is
  11. // very lightweight in that it only makes an attempt at an http request.
  12. // Reliability should be provided by the caller.
  13. type httpSink struct {
  14. url string
  15. mu sync.Mutex
  16. closed bool
  17. client *http.Client
  18. listeners []httpStatusListener
  19. // TODO(stevvooe): Allow one to configure the media type accepted by this
  20. // sink and choose the serialization based on that.
  21. }
  22. // newHTTPSink returns an unreliable, single-flight http sink. Wrap in other
  23. // sinks for increased reliability.
  24. func newHTTPSink(u string, timeout time.Duration, headers http.Header, listeners ...httpStatusListener) *httpSink {
  25. return &httpSink{
  26. url: u,
  27. listeners: listeners,
  28. client: &http.Client{
  29. Transport: &headerRoundTripper{
  30. Transport: http.DefaultTransport.(*http.Transport),
  31. headers: headers,
  32. },
  33. Timeout: timeout,
  34. },
  35. }
  36. }
  37. // httpStatusListener is called on various outcomes of sending notifications.
  38. type httpStatusListener interface {
  39. success(status int, events ...Event)
  40. failure(status int, events ...Event)
  41. err(err error, events ...Event)
  42. }
  43. // Accept makes an attempt to notify the endpoint, returning an error if it
  44. // fails. It is the caller's responsibility to retry on error. The events are
  45. // accepted or rejected as a group.
  46. func (hs *httpSink) Write(events ...Event) error {
  47. hs.mu.Lock()
  48. defer hs.mu.Unlock()
  49. defer hs.client.Transport.(*headerRoundTripper).CloseIdleConnections()
  50. if hs.closed {
  51. return ErrSinkClosed
  52. }
  53. envelope := Envelope{
  54. Events: events,
  55. }
  56. // TODO(stevvooe): It is not ideal to keep re-encoding the request body on
  57. // retry but we are going to do it to keep the code simple. It is likely
  58. // we could change the event struct to manage its own buffer.
  59. p, err := json.MarshalIndent(envelope, "", " ")
  60. if err != nil {
  61. for _, listener := range hs.listeners {
  62. listener.err(err, events...)
  63. }
  64. return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err)
  65. }
  66. body := bytes.NewReader(p)
  67. resp, err := hs.client.Post(hs.url, EventsMediaType, body)
  68. if err != nil {
  69. for _, listener := range hs.listeners {
  70. listener.err(err, events...)
  71. }
  72. return fmt.Errorf("%v: error posting: %v", hs, err)
  73. }
  74. defer resp.Body.Close()
  75. // The notifier will treat any 2xx or 3xx response as accepted by the
  76. // endpoint.
  77. switch {
  78. case resp.StatusCode >= 200 && resp.StatusCode < 400:
  79. for _, listener := range hs.listeners {
  80. listener.success(resp.StatusCode, events...)
  81. }
  82. // TODO(stevvooe): This is a little accepting: we may want to support
  83. // unsupported media type responses with retries using the correct
  84. // media type. There may also be cases that will never work.
  85. return nil
  86. default:
  87. for _, listener := range hs.listeners {
  88. listener.failure(resp.StatusCode, events...)
  89. }
  90. return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status)
  91. }
  92. }
  93. // Close the endpoint
  94. func (hs *httpSink) Close() error {
  95. hs.mu.Lock()
  96. defer hs.mu.Unlock()
  97. if hs.closed {
  98. return fmt.Errorf("httpsink: already closed")
  99. }
  100. hs.closed = true
  101. return nil
  102. }
  103. func (hs *httpSink) String() string {
  104. return fmt.Sprintf("httpSink{%s}", hs.url)
  105. }
  106. type headerRoundTripper struct {
  107. *http.Transport // must be transport to support CancelRequest
  108. headers http.Header
  109. }
  110. func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
  111. var nreq http.Request
  112. nreq = *req
  113. nreq.Header = make(http.Header)
  114. merge := func(headers http.Header) {
  115. for k, v := range headers {
  116. nreq.Header[k] = append(nreq.Header[k], v...)
  117. }
  118. }
  119. merge(req.Header)
  120. merge(hrt.headers)
  121. return hrt.Transport.RoundTrip(&nreq)
  122. }