// Copyright 2015 Google Inc. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package bigquery import ( "errors" "reflect" "testing" "golang.org/x/net/context" ) type readTabledataArgs struct { conf *readTableConf tok string } type readQueryArgs struct { conf *readQueryConf tok string } // readServiceStub services read requests by returning data from an in-memory list of values. type readServiceStub struct { // values and pageTokens are used as sources of data to return in response to calls to readTabledata or readQuery. values [][][]Value // contains pages / rows / columns. pageTokens map[string]string // maps incoming page token to returned page token. // arguments are recorded for later inspection. readTabledataCalls []readTabledataArgs readQueryCalls []readQueryArgs service } func (s *readServiceStub) readValues(tok string) *readDataResult { result := &readDataResult{ pageToken: s.pageTokens[tok], rows: s.values[0], } s.values = s.values[1:] return result } func (s *readServiceStub) readTabledata(ctx context.Context, conf *readTableConf, token string) (*readDataResult, error) { s.readTabledataCalls = append(s.readTabledataCalls, readTabledataArgs{conf, token}) return s.readValues(token), nil } func (s *readServiceStub) readQuery(ctx context.Context, conf *readQueryConf, token string) (*readDataResult, error) { s.readQueryCalls = append(s.readQueryCalls, readQueryArgs{conf, token}) return s.readValues(token), nil } func TestRead(t *testing.T) { // The data for the service stub to return is populated for each test case in the testCases for loop. service := &readServiceStub{} c := &Client{ service: service, } queryJob := &Job{ projectID: "project-id", jobID: "job-id", service: service, isQuery: true, } for _, src := range []ReadSource{defaultTable, queryJob} { testCases := []struct { data [][][]Value pageTokens map[string]string want []ValueList }{ { data: [][][]Value{{{1, 2}, {11, 12}}, {{30, 40}, {31, 41}}}, pageTokens: map[string]string{"": "a", "a": ""}, want: []ValueList{{1, 2}, {11, 12}, {30, 40}, {31, 41}}, }, { data: [][][]Value{{{1, 2}, {11, 12}}, {{30, 40}, {31, 41}}}, pageTokens: map[string]string{"": ""}, // no more pages after first one. want: []ValueList{{1, 2}, {11, 12}}, }, } for _, tc := range testCases { service.values = tc.data service.pageTokens = tc.pageTokens if got, ok := doRead(t, c, src); ok { if !reflect.DeepEqual(got, tc.want) { t.Errorf("reading: got:\n%v\nwant:\n%v", got, tc.want) } } } } } // doRead calls Read with a ReadSource. Get is repeatedly called on the Iterator returned by Read and the results are returned. func doRead(t *testing.T, c *Client, src ReadSource) ([]ValueList, bool) { it, err := c.Read(context.Background(), src) if err != nil { t.Errorf("err calling Read: %v", err) return nil, false } var got []ValueList for it.Next(context.Background()) { var vals ValueList if err := it.Get(&vals); err != nil { t.Errorf("err calling Get: %v", err) return nil, false } else { got = append(got, vals) } } return got, true } func TestNoMoreValues(t *testing.T) { c := &Client{ service: &readServiceStub{ values: [][][]Value{{{1, 2}, {11, 12}}}, }, } it, err := c.Read(context.Background(), defaultTable) if err != nil { t.Fatalf("err calling Read: %v", err) } var vals ValueList // We expect to retrieve two values and then fail on the next attempt. if !it.Next(context.Background()) { t.Fatalf("Next: got: false: want: true") } if !it.Next(context.Background()) { t.Fatalf("Next: got: false: want: true") } if err := it.Get(&vals); err != nil { t.Fatalf("Get: got: %v: want: nil", err) } if it.Next(context.Background()) { t.Fatalf("Next: got: true: want: false") } if err := it.Get(&vals); err == nil { t.Fatalf("Get: got: %v: want: non-nil", err) } } // delayedReadStub simulates reading results from a query that has not yet // completed. Its readQuery method initially reports that the query job is not // yet complete. Subsequently, it proxies the request through to another // service stub. type delayedReadStub struct { numDelays int readServiceStub } func (s *delayedReadStub) readQuery(ctx context.Context, conf *readQueryConf, token string) (*readDataResult, error) { if s.numDelays > 0 { s.numDelays-- return nil, errIncompleteJob } return s.readServiceStub.readQuery(ctx, conf, token) } // TestIncompleteJob tests that an Iterator which reads from a query job will block until the job is complete. func TestIncompleteJob(t *testing.T) { service := &delayedReadStub{ numDelays: 2, readServiceStub: readServiceStub{ values: [][][]Value{{{1, 2}}}, }, } c := &Client{service: service} queryJob := &Job{ projectID: "project-id", jobID: "job-id", service: service, isQuery: true, } it, err := c.Read(context.Background(), queryJob) if err != nil { t.Fatalf("err calling Read: %v", err) } var got ValueList want := ValueList{1, 2} if !it.Next(context.Background()) { t.Fatalf("Next: got: false: want: true") } if err := it.Get(&got); err != nil { t.Fatalf("Error calling Get: %v", err) } if service.numDelays != 0 { t.Errorf("remaining numDelays : got: %v want:0", service.numDelays) } if !reflect.DeepEqual(got, want) { t.Errorf("reading: got:\n%v\nwant:\n%v", got, want) } } type errorReadService struct { service } func (s *errorReadService) readTabledata(ctx context.Context, conf *readTableConf, token string) (*readDataResult, error) { return nil, errors.New("bang!") } func TestReadError(t *testing.T) { // test that service read errors are propagated back to the caller. c := &Client{service: &errorReadService{}} it, err := c.Read(context.Background(), defaultTable) if err != nil { // Read should not return an error; only Err should. t.Fatalf("err calling Read: %v", err) } if it.Next(context.Background()) { t.Fatalf("Next: got: true: want: false") } if err := it.Err(); err.Error() != "bang!" { t.Fatalf("Get: got: %v: want: bang!", err) } } func TestReadTabledataOptions(t *testing.T) { // test that read options are propagated. s := &readServiceStub{ values: [][][]Value{{{1, 2}}}, } c := &Client{service: s} it, err := c.Read(context.Background(), defaultTable, RecordsPerRequest(5)) if err != nil { t.Fatalf("err calling Read: %v", err) } if !it.Next(context.Background()) { t.Fatalf("Next: got: false: want: true") } want := []readTabledataArgs{{ conf: &readTableConf{ projectID: "project-id", datasetID: "dataset-id", tableID: "table-id", paging: pagingConf{ recordsPerRequest: 5, setRecordsPerRequest: true, }, }, tok: "", }} if !reflect.DeepEqual(s.readTabledataCalls, want) { t.Errorf("reading: got:\n%v\nwant:\n%v", s.readTabledataCalls, want) } } func TestReadQueryOptions(t *testing.T) { // test that read options are propagated. s := &readServiceStub{ values: [][][]Value{{{1, 2}}}, } c := &Client{service: s} queryJob := &Job{ projectID: "project-id", jobID: "job-id", service: s, isQuery: true, } it, err := c.Read(context.Background(), queryJob, RecordsPerRequest(5)) if err != nil { t.Fatalf("err calling Read: %v", err) } if !it.Next(context.Background()) { t.Fatalf("Next: got: false: want: true") } want := []readQueryArgs{{ conf: &readQueryConf{ projectID: "project-id", jobID: "job-id", paging: pagingConf{ recordsPerRequest: 5, setRecordsPerRequest: true, }, }, tok: "", }} if !reflect.DeepEqual(s.readQueryCalls, want) { t.Errorf("reading: got:\n%v\nwant:\n%v", s.readQueryCalls, want) } }