iterator.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. // Copyright 2016 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "io"
  17. "sync"
  18. "time"
  19. "golang.org/x/net/context"
  20. )
  21. // TODO(mcgreevy): make this more dynamic.
  22. const batchPullSize = 100
  23. type Iterator struct {
  24. // The name of the subscription that the Iterator is pulling messages from.
  25. sub string
  26. // The context to use for acking messages and extending message deadlines.
  27. ctx context.Context
  28. c *Client
  29. // Controls how often we send an ack deadline extension request.
  30. kaTicker *time.Ticker
  31. // Controls how often we acknowledge a batch of messages.
  32. ackTicker *time.Ticker
  33. ka keepAlive
  34. acker acker
  35. puller puller
  36. mu sync.Mutex
  37. closed bool
  38. }
  39. // newIterator starts a new Iterator. Close must be called on the Iterator
  40. // when it is no longer needed.
  41. // subName is the full name of the subscription to pull messages from.
  42. // ackDeadline is the default ack deadline for the subscription
  43. // maxExtension is the maximum period for which the iterator should automatically extend
  44. // the ack deadline for each message.
  45. func (c *Client) newIterator(ctx context.Context, subName string, ackDeadline, maxExtension time.Duration) *Iterator {
  46. it := &Iterator{
  47. sub: subName,
  48. ctx: ctx,
  49. c: c,
  50. }
  51. // TODO: make kaTicker frequency more configurable.
  52. // (ackDeadline - 5s) is a reasonable default for now, because the minimum ack period is 10s. This gives us 5s grace.
  53. keepAlivePeriod := ackDeadline - 5*time.Second
  54. it.kaTicker = time.NewTicker(keepAlivePeriod) // Stopped in it.Close
  55. it.ka = keepAlive{
  56. Client: it.c,
  57. Ctx: it.ctx,
  58. Sub: it.sub,
  59. ExtensionTick: it.kaTicker.C,
  60. Deadline: ackDeadline,
  61. MaxExtension: maxExtension,
  62. }
  63. // TODO: make ackTicker more configurable. Something less than
  64. // kaTicker is a reasonable default (there's no point extending
  65. // messages when they could be acked instead).
  66. it.ackTicker = time.NewTicker(keepAlivePeriod / 2) // Stopped in it.Close
  67. it.acker = acker{
  68. Client: it.c,
  69. Ctx: it.ctx,
  70. Sub: it.sub,
  71. AckTick: it.ackTicker.C,
  72. Notify: it.ka.Remove,
  73. }
  74. it.puller = puller{
  75. Client: it.c,
  76. Sub: it.sub,
  77. BatchSize: batchPullSize,
  78. Notify: it.ka.Add,
  79. }
  80. it.ka.Start()
  81. it.acker.Start()
  82. return it
  83. }
  84. // Next returns the next Message to be processed. The caller must call Done on
  85. // the returned Message once it is finished with it.
  86. // Once Close has been called, subsequent calls to Next will return io.EOF.
  87. func (it *Iterator) Next(ctx context.Context) (*Message, error) {
  88. // TODO: decide whether to use it.ctx instead of ctx.
  89. it.mu.Lock()
  90. defer it.mu.Unlock()
  91. if it.closed {
  92. return nil, io.EOF
  93. }
  94. select {
  95. case <-ctx.Done():
  96. return nil, ctx.Err()
  97. default:
  98. }
  99. // Note: this is the only place where messages are added to keepAlive,
  100. // and this code is protected by mu. This means once an iterator starts
  101. // being closed down, no more messages will be added to keepalive.
  102. m, err := it.puller.Next(ctx)
  103. if err != nil {
  104. return nil, err
  105. }
  106. m.it = it
  107. return m, nil
  108. }
  109. // Client code must call Close on an Iterator when finished with it.
  110. // Close will block until Done has been called on all Messages that have been
  111. // returned by Next.
  112. // Close need only be called once, but may be called multiple times from multiple goroutines.
  113. func (it *Iterator) Close() error {
  114. // TODO: test calling from multiple goroutines.
  115. it.mu.Lock()
  116. defer it.mu.Unlock()
  117. if it.closed {
  118. return nil
  119. }
  120. it.closed = true
  121. // Remove messages that are being kept alive, but have not been
  122. // supplied to the caller yet. Then the only messages being kept alive
  123. // will be those that have been supplied to the caller but have not yet
  124. // had their Done method called.
  125. for _, m := range it.puller.Pending() {
  126. it.ka.Remove(m.AckID)
  127. }
  128. // This will block until all messages have been removed from keepAlive.
  129. // This will happen once all outstanding messages have been either
  130. // ACKed or NACKed.
  131. it.ka.Stop()
  132. it.acker.Stop()
  133. it.kaTicker.Stop()
  134. it.ackTicker.Stop()
  135. return nil
  136. }
  137. func (it *Iterator) done(ackID string, ack bool) {
  138. // NOTE: this method does not lock mu, because it's fine for done to be
  139. // called while the iterator is in the process of being closed. In
  140. // fact, this is the only way to drain oustanding messages.
  141. if ack {
  142. it.acker.Ack(ackID)
  143. // There's no need to call it.ka.Remove here, as acker will
  144. // call it via its Notify function.
  145. } else {
  146. it.ka.Remove(ackID)
  147. }
  148. }