acker.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. // Copyright 2016 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "sync"
  17. "time"
  18. "golang.org/x/net/context"
  19. )
  20. // acker acks messages in batches.
  21. type acker struct {
  22. Client *Client
  23. Ctx context.Context // The context to use when acknowledging messages.
  24. Sub string // The full name of the subscription.
  25. AckTick <-chan time.Time // AckTick supplies the frequency with which to make ack requests.
  26. // Notify is called with an ack ID after the message with that ack ID
  27. // has been processed. An ackID is considered to have been processed
  28. // if at least one attempt has been made to acknowledge it.
  29. Notify func(string)
  30. in chan string
  31. done chan struct{}
  32. wg sync.WaitGroup
  33. }
  34. // Start intiates processing of ackIDs which are added via Add.
  35. // Notify is called with each ackID once it has been processed.
  36. func (a *acker) Start() {
  37. a.in = make(chan string)
  38. a.done = make(chan struct{})
  39. var pending []string
  40. a.wg.Add(1)
  41. go func() {
  42. defer a.wg.Done()
  43. for {
  44. select {
  45. case ackID := <-a.in:
  46. pending = append(pending, ackID)
  47. case <-a.AckTick:
  48. // Launch Ack requests in a separate goroutine so that we don't
  49. // block the in channel while waiting for the ack request to run.
  50. a.launchAckRequest(pending)
  51. pending = nil
  52. case <-a.done:
  53. a.launchAckRequest(pending)
  54. return
  55. }
  56. }
  57. }()
  58. }
  59. // Ack adds an ack id to be acked in the next batch.
  60. func (acker *acker) Ack(ackID string) {
  61. acker.in <- ackID
  62. }
  63. // launchAckRequest initiates an acknowledgement request in a separate goroutine.
  64. // After the acknowledgement request has completed (regardless of its success
  65. // or failure), ids will be passed to a.Notify.
  66. // Calls to Wait on a.wg will block until this goroutine is done.
  67. func (a *acker) launchAckRequest(ids []string) {
  68. a.wg.Add(1)
  69. go func() {
  70. defer a.wg.Done()
  71. a.ack(ids)
  72. for _, id := range ids {
  73. a.Notify(id)
  74. }
  75. }()
  76. }
  77. // Stop processes all pending messages, and releases resources before returning.
  78. func (a *acker) Stop() {
  79. close(a.done)
  80. a.wg.Wait()
  81. }
  82. // ack acknowledges the supplied ackIDs.
  83. func (a *acker) ack(ids []string) {
  84. // TODO: split into separate requests if there are too many ackIDs.
  85. if len(ids) > 0 {
  86. a.Client.s.acknowledge(a.Ctx, a.Sub, ids)
  87. }
  88. // TODO: retry on failure. NOTE: it is not incorrect to drop acks. The
  89. // messages will be redelievered, but this is a documented behaviour of
  90. // the API.
  91. }