keepalive.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. // Copyright 2016 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "sync"
  17. "time"
  18. "golang.org/x/net/context"
  19. )
  20. // keepAlive keeps track of which Messages need to have their deadline extended, and
  21. // periodically extends them.
  22. // Messages are tracked by Ack ID.
  23. type keepAlive struct {
  24. Client *Client
  25. Ctx context.Context // The context to use when extending deadlines.
  26. Sub string // The full name of the subscription.
  27. ExtensionTick <-chan time.Time // ExtenstionTick supplies the frequency with which to make extension requests.
  28. Deadline time.Duration // How long to extend messages for each time they are extended. Should be greater than ExtensionTick frequency.
  29. MaxExtension time.Duration // How long to keep extending each message's ack deadline before automatically removing it.
  30. // key: ackID; value: time at which ack deadline extension should cease.
  31. items map[string]time.Time
  32. done chan struct{}
  33. wg sync.WaitGroup
  34. add, remove chan string
  35. }
  36. // Start initiates the deadline extension loop. Stop must be called once keepAlive is no longer needed.
  37. func (ka *keepAlive) Start() {
  38. ka.items = make(map[string]time.Time)
  39. ka.done = make(chan struct{})
  40. ka.add = make(chan string)
  41. ka.remove = make(chan string)
  42. ka.wg.Add(1)
  43. go func() {
  44. defer ka.wg.Done()
  45. done := false
  46. for {
  47. select {
  48. case ackID := <-ka.add:
  49. ka.addItem(ackID)
  50. case ackID := <-ka.remove:
  51. ka.removeItem(ackID)
  52. case <-ka.done:
  53. done = true
  54. case <-ka.ExtensionTick:
  55. live, expired := ka.getAckIDs()
  56. ka.wg.Add(1)
  57. go func() {
  58. defer ka.wg.Done()
  59. ka.extendDeadlines(live)
  60. }()
  61. for _, id := range expired {
  62. ka.removeItem(id)
  63. }
  64. }
  65. if done && len(ka.items) == 0 {
  66. return
  67. }
  68. }
  69. }()
  70. }
  71. // Add adds an ack id to be kept alive.
  72. func (ka *keepAlive) Add(ackID string) {
  73. ka.add <- ackID
  74. }
  75. // add adds ackID to the items map.
  76. func (ka *keepAlive) addItem(ackID string) {
  77. ka.items[ackID] = time.Now().Add(ka.MaxExtension)
  78. }
  79. // Remove removes ackID from the list to be kept alive.
  80. func (ka *keepAlive) Remove(ackID string) {
  81. ka.remove <- ackID
  82. }
  83. // remove removes ackID from the items map.
  84. func (ka *keepAlive) removeItem(ackID string) {
  85. delete(ka.items, ackID)
  86. }
  87. // Stop waits until all added ackIDs have been removed, and cleans up resources.
  88. func (ka *keepAlive) Stop() {
  89. close(ka.done)
  90. ka.wg.Wait()
  91. }
  92. // getAckIDs returns the set of ackIDs that are being kept alive.
  93. // The set is divided into two lists: one with IDs that should continue to be kept alive,
  94. // and the other with IDs that should be dropped.
  95. func (ka *keepAlive) getAckIDs() (live, expired []string) {
  96. now := time.Now()
  97. for id, expiry := range ka.items {
  98. if expiry.Before(now) {
  99. expired = append(expired, id)
  100. } else {
  101. live = append(live, id)
  102. }
  103. }
  104. return live, expired
  105. }
  106. func (ka *keepAlive) extendDeadlines(ackIDs []string) {
  107. // TODO: split into separate requests if there are too many ackIDs.
  108. if len(ackIDs) > 0 {
  109. _ = ka.Client.s.modifyAckDeadline(ka.Ctx, ka.Sub, ka.Deadline, ackIDs)
  110. }
  111. // TODO: retry on error. NOTE: if we ultimately fail to extend deadlines here, the messages will be redelivered, which is OK.
  112. }