123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- package notifications
- import (
- "bytes"
- "encoding/json"
- "fmt"
- "net/http"
- "sync"
- "time"
- )
- // httpSink implements a single-flight, http notification endpoint. This is
- // very lightweight in that it only makes an attempt at an http request.
- // Reliability should be provided by the caller.
- type httpSink struct {
- url string
- mu sync.Mutex
- closed bool
- client *http.Client
- listeners []httpStatusListener
- // TODO(stevvooe): Allow one to configure the media type accepted by this
- // sink and choose the serialization based on that.
- }
- // newHTTPSink returns an unreliable, single-flight http sink. Wrap in other
- // sinks for increased reliability.
- func newHTTPSink(u string, timeout time.Duration, headers http.Header, listeners ...httpStatusListener) *httpSink {
- return &httpSink{
- url: u,
- listeners: listeners,
- client: &http.Client{
- Transport: &headerRoundTripper{
- Transport: http.DefaultTransport.(*http.Transport),
- headers: headers,
- },
- Timeout: timeout,
- },
- }
- }
- // httpStatusListener is called on various outcomes of sending notifications.
- type httpStatusListener interface {
- success(status int, events ...Event)
- failure(status int, events ...Event)
- err(err error, events ...Event)
- }
- // Accept makes an attempt to notify the endpoint, returning an error if it
- // fails. It is the caller's responsibility to retry on error. The events are
- // accepted or rejected as a group.
- func (hs *httpSink) Write(events ...Event) error {
- hs.mu.Lock()
- defer hs.mu.Unlock()
- defer hs.client.Transport.(*headerRoundTripper).CloseIdleConnections()
- if hs.closed {
- return ErrSinkClosed
- }
- envelope := Envelope{
- Events: events,
- }
- // TODO(stevvooe): It is not ideal to keep re-encoding the request body on
- // retry but we are going to do it to keep the code simple. It is likely
- // we could change the event struct to manage its own buffer.
- p, err := json.MarshalIndent(envelope, "", " ")
- if err != nil {
- for _, listener := range hs.listeners {
- listener.err(err, events...)
- }
- return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err)
- }
- body := bytes.NewReader(p)
- resp, err := hs.client.Post(hs.url, EventsMediaType, body)
- if err != nil {
- for _, listener := range hs.listeners {
- listener.err(err, events...)
- }
- return fmt.Errorf("%v: error posting: %v", hs, err)
- }
- defer resp.Body.Close()
- // The notifier will treat any 2xx or 3xx response as accepted by the
- // endpoint.
- switch {
- case resp.StatusCode >= 200 && resp.StatusCode < 400:
- for _, listener := range hs.listeners {
- listener.success(resp.StatusCode, events...)
- }
- // TODO(stevvooe): This is a little accepting: we may want to support
- // unsupported media type responses with retries using the correct
- // media type. There may also be cases that will never work.
- return nil
- default:
- for _, listener := range hs.listeners {
- listener.failure(resp.StatusCode, events...)
- }
- return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status)
- }
- }
- // Close the endpoint
- func (hs *httpSink) Close() error {
- hs.mu.Lock()
- defer hs.mu.Unlock()
- if hs.closed {
- return fmt.Errorf("httpsink: already closed")
- }
- hs.closed = true
- return nil
- }
- func (hs *httpSink) String() string {
- return fmt.Sprintf("httpSink{%s}", hs.url)
- }
- type headerRoundTripper struct {
- *http.Transport // must be transport to support CancelRequest
- headers http.Header
- }
- func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
- var nreq http.Request
- nreq = *req
- nreq.Header = make(http.Header)
- merge := func(headers http.Header) {
- for k, v := range headers {
- nreq.Header[k] = append(nreq.Header[k], v...)
- }
- }
- merge(req.Header)
- merge(hrt.headers)
- return hrt.Transport.RoundTrip(&nreq)
- }
|