123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- // Copyright 2015 Google Inc. All Rights Reserved.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package bigquery
- import (
- "errors"
- "fmt"
- "golang.org/x/net/context"
- )
- // A pageFetcher returns a page of rows, starting from the row specified by token.
- type pageFetcher interface {
- fetch(ctx context.Context, c *Client, token string) (*readDataResult, error)
- }
- // Iterator provides access to the result of a BigQuery lookup.
- // Next must be called before the first call to Get.
- type Iterator struct {
- c *Client
- err error // contains any error encountered during calls to Next.
- // Once Next has been called at least once, schema has the result schema, rs contains the current
- // page of data, and nextToken contains the token for fetching the next
- // page (empty if there is no more data to be fetched).
- schema Schema
- rs [][]Value
- nextToken string
- // The remaining fields contain enough information to fetch the current
- // page of data, and determine which row of data from this page is the
- // current row.
- pf pageFetcher
- pageToken string
- // The offset from the start of the current page to the current row.
- // For a new iterator, this is -1.
- offset int64
- }
- func newIterator(c *Client, pf pageFetcher) *Iterator {
- return &Iterator{
- c: c,
- pf: pf,
- offset: -1,
- }
- }
- // fetchPage loads the current page of data from the server.
- // The contents of rs and nextToken are replaced with the loaded data.
- // If there is an error while fetching, the error is stored in it.err and false is returned.
- func (it *Iterator) fetchPage(ctx context.Context) bool {
- var res *readDataResult
- var err error
- for {
- res, err = it.pf.fetch(ctx, it.c, it.pageToken)
- if err != errIncompleteJob {
- break
- }
- }
- if err != nil {
- it.err = err
- return false
- }
- it.schema = res.schema
- it.rs = res.rows
- it.nextToken = res.pageToken
- return true
- }
- // getEnoughData loads new data into rs until offset no longer points beyond the end of rs.
- func (it *Iterator) getEnoughData(ctx context.Context) bool {
- if len(it.rs) == 0 {
- // Either we have not yet fetched any pages, or we are iterating over an empty dataset.
- // In the former case, we should fetch a page of data, so that we can depend on the resultant nextToken.
- // In the latter case, it is harmless to fetch a page of data.
- if !it.fetchPage(ctx) {
- return false
- }
- }
- for it.offset >= int64(len(it.rs)) {
- // If offset is still outside the bounds of the loaded data,
- // but there are no more pages of data to fetch, then we have
- // failed to satisfy the offset.
- if it.nextToken == "" {
- return false
- }
- // offset cannot be satisfied with the currently loaded data,
- // so we fetch the next page. We no longer need the existing
- // cached rows, so we remove them and update the offset to be
- // relative to the new page that we're about to fetch.
- // NOTE: we can't just set offset to 0, because after
- // marshalling/unmarshalling, it's possible for the offset to
- // point arbitrarily far beyond the end of rs.
- // This can happen if the server returns a different size
- // results page before and after marshalling.
- it.offset -= int64(len(it.rs))
- it.pageToken = it.nextToken
- if !it.fetchPage(ctx) {
- return false
- }
- }
- return true
- }
- // Next advances the Iterator to the next row, making that row available
- // via the Get method.
- // Next must be called before the first call to Get or Schema, and blocks until data is available.
- // Next returns false when there are no more rows available, either because
- // the end of the output was reached, or because there was an error (consult
- // the Err method to determine which).
- func (it *Iterator) Next(ctx context.Context) bool {
- if it.err != nil {
- return false
- }
- // Advance offset to where we want it to be for the next call to Get.
- it.offset++
- // offset may now point beyond the end of rs, so we fetch data
- // until offset is within its bounds again. If there are no more
- // results available, offset will be left pointing beyond the bounds
- // of rs.
- // At the end of this method, rs will contain at least one element
- // unless the dataset we are iterating over is empty.
- return it.getEnoughData(ctx)
- }
- // Err returns the last error encountered by Next, or nil for no error.
- func (it *Iterator) Err() error {
- return it.err
- }
- // verifyState checks that the iterator is pointing to a valid row.
- func (it *Iterator) verifyState() error {
- if it.err != nil {
- return fmt.Errorf("called on iterator in error state: %v", it.err)
- }
- // If Next has been called, then offset should always index into a
- // valid row in rs, as long as there is still data available.
- if it.offset >= int64(len(it.rs)) || it.offset < 0 {
- return errors.New("called without preceding successful call to Next")
- }
- return nil
- }
- // Get loads the current row into dst, which must implement ValueLoader.
- func (it *Iterator) Get(dst interface{}) error {
- if err := it.verifyState(); err != nil {
- return fmt.Errorf("Get %v", err)
- }
- if dst, ok := dst.(ValueLoader); ok {
- return dst.Load(it.rs[it.offset])
- }
- return errors.New("Get called with unsupported argument type")
- }
- // Schema returns the schema of the result rows.
- func (it *Iterator) Schema() (Schema, error) {
- if err := it.verifyState(); err != nil {
- return nil, fmt.Errorf("Schema %v", err)
- }
- return it.schema, nil
- }
|