iterator.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. // Copyright 2015 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package bigquery
  15. import (
  16. "errors"
  17. "fmt"
  18. "golang.org/x/net/context"
  19. )
  20. // A pageFetcher returns a page of rows, starting from the row specified by token.
  21. type pageFetcher interface {
  22. fetch(ctx context.Context, c *Client, token string) (*readDataResult, error)
  23. }
  24. // Iterator provides access to the result of a BigQuery lookup.
  25. // Next must be called before the first call to Get.
  26. type Iterator struct {
  27. c *Client
  28. err error // contains any error encountered during calls to Next.
  29. // Once Next has been called at least once, schema has the result schema, rs contains the current
  30. // page of data, and nextToken contains the token for fetching the next
  31. // page (empty if there is no more data to be fetched).
  32. schema Schema
  33. rs [][]Value
  34. nextToken string
  35. // The remaining fields contain enough information to fetch the current
  36. // page of data, and determine which row of data from this page is the
  37. // current row.
  38. pf pageFetcher
  39. pageToken string
  40. // The offset from the start of the current page to the current row.
  41. // For a new iterator, this is -1.
  42. offset int64
  43. }
  44. func newIterator(c *Client, pf pageFetcher) *Iterator {
  45. return &Iterator{
  46. c: c,
  47. pf: pf,
  48. offset: -1,
  49. }
  50. }
  51. // fetchPage loads the current page of data from the server.
  52. // The contents of rs and nextToken are replaced with the loaded data.
  53. // If there is an error while fetching, the error is stored in it.err and false is returned.
  54. func (it *Iterator) fetchPage(ctx context.Context) bool {
  55. var res *readDataResult
  56. var err error
  57. for {
  58. res, err = it.pf.fetch(ctx, it.c, it.pageToken)
  59. if err != errIncompleteJob {
  60. break
  61. }
  62. }
  63. if err != nil {
  64. it.err = err
  65. return false
  66. }
  67. it.schema = res.schema
  68. it.rs = res.rows
  69. it.nextToken = res.pageToken
  70. return true
  71. }
  72. // getEnoughData loads new data into rs until offset no longer points beyond the end of rs.
  73. func (it *Iterator) getEnoughData(ctx context.Context) bool {
  74. if len(it.rs) == 0 {
  75. // Either we have not yet fetched any pages, or we are iterating over an empty dataset.
  76. // In the former case, we should fetch a page of data, so that we can depend on the resultant nextToken.
  77. // In the latter case, it is harmless to fetch a page of data.
  78. if !it.fetchPage(ctx) {
  79. return false
  80. }
  81. }
  82. for it.offset >= int64(len(it.rs)) {
  83. // If offset is still outside the bounds of the loaded data,
  84. // but there are no more pages of data to fetch, then we have
  85. // failed to satisfy the offset.
  86. if it.nextToken == "" {
  87. return false
  88. }
  89. // offset cannot be satisfied with the currently loaded data,
  90. // so we fetch the next page. We no longer need the existing
  91. // cached rows, so we remove them and update the offset to be
  92. // relative to the new page that we're about to fetch.
  93. // NOTE: we can't just set offset to 0, because after
  94. // marshalling/unmarshalling, it's possible for the offset to
  95. // point arbitrarily far beyond the end of rs.
  96. // This can happen if the server returns a different size
  97. // results page before and after marshalling.
  98. it.offset -= int64(len(it.rs))
  99. it.pageToken = it.nextToken
  100. if !it.fetchPage(ctx) {
  101. return false
  102. }
  103. }
  104. return true
  105. }
  106. // Next advances the Iterator to the next row, making that row available
  107. // via the Get method.
  108. // Next must be called before the first call to Get or Schema, and blocks until data is available.
  109. // Next returns false when there are no more rows available, either because
  110. // the end of the output was reached, or because there was an error (consult
  111. // the Err method to determine which).
  112. func (it *Iterator) Next(ctx context.Context) bool {
  113. if it.err != nil {
  114. return false
  115. }
  116. // Advance offset to where we want it to be for the next call to Get.
  117. it.offset++
  118. // offset may now point beyond the end of rs, so we fetch data
  119. // until offset is within its bounds again. If there are no more
  120. // results available, offset will be left pointing beyond the bounds
  121. // of rs.
  122. // At the end of this method, rs will contain at least one element
  123. // unless the dataset we are iterating over is empty.
  124. return it.getEnoughData(ctx)
  125. }
  126. // Err returns the last error encountered by Next, or nil for no error.
  127. func (it *Iterator) Err() error {
  128. return it.err
  129. }
  130. // verifyState checks that the iterator is pointing to a valid row.
  131. func (it *Iterator) verifyState() error {
  132. if it.err != nil {
  133. return fmt.Errorf("called on iterator in error state: %v", it.err)
  134. }
  135. // If Next has been called, then offset should always index into a
  136. // valid row in rs, as long as there is still data available.
  137. if it.offset >= int64(len(it.rs)) || it.offset < 0 {
  138. return errors.New("called without preceding successful call to Next")
  139. }
  140. return nil
  141. }
  142. // Get loads the current row into dst, which must implement ValueLoader.
  143. func (it *Iterator) Get(dst interface{}) error {
  144. if err := it.verifyState(); err != nil {
  145. return fmt.Errorf("Get %v", err)
  146. }
  147. if dst, ok := dst.(ValueLoader); ok {
  148. return dst.Load(it.rs[it.offset])
  149. }
  150. return errors.New("Get called with unsupported argument type")
  151. }
  152. // Schema returns the schema of the result rows.
  153. func (it *Iterator) Schema() (Schema, error) {
  154. if err := it.verifyState(); err != nil {
  155. return nil, fmt.Errorf("Schema %v", err)
  156. }
  157. return it.schema, nil
  158. }