iterator_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538
  1. // Copyright 2015 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package bigquery
  15. import (
  16. "errors"
  17. "fmt"
  18. "reflect"
  19. "testing"
  20. "golang.org/x/net/context"
  21. )
// fetchResponse is one canned outcome of a fetch call: the result and the
// error that pageFetcherStub.fetch hands back for a given page token.
type fetchResponse struct {
	result *readDataResult // The result to return.
	err    error           // The error to return.
}
// pageFetcherStub services fetch requests by returning canned responses from
// an in-memory map.
type pageFetcherStub struct {
	// fetchResponses maps the page token passed to fetch to the response
	// that fetch returns for it.
	fetchResponses map[string]fetchResponse
	// err is set by fetch when it is asked for a token that has no entry in
	// fetchResponses.
	err error
}
  31. func (pf *pageFetcherStub) fetch(ctx context.Context, c *Client, token string) (*readDataResult, error) {
  32. call, ok := pf.fetchResponses[token]
  33. if !ok {
  34. pf.err = fmt.Errorf("Unexpected page token: %q", token)
  35. }
  36. return call.result, call.err
  37. }
  38. func TestIterator(t *testing.T) {
  39. fetchFailure := errors.New("fetch failure")
  40. testCases := []struct {
  41. desc string
  42. alreadyConsumed int64 // amount to advance offset before commencing reading.
  43. fetchResponses map[string]fetchResponse
  44. want []ValueList
  45. wantErr error
  46. wantSchema Schema
  47. }{
  48. {
  49. desc: "Iteration over single empty page",
  50. fetchResponses: map[string]fetchResponse{
  51. "": {
  52. result: &readDataResult{
  53. pageToken: "",
  54. rows: [][]Value{},
  55. schema: Schema{},
  56. },
  57. },
  58. },
  59. want: []ValueList{},
  60. wantSchema: Schema{},
  61. },
  62. {
  63. desc: "Iteration over single page",
  64. fetchResponses: map[string]fetchResponse{
  65. "": {
  66. result: &readDataResult{
  67. pageToken: "",
  68. rows: [][]Value{{1, 2}, {11, 12}},
  69. schema: Schema{
  70. {Type: IntegerFieldType},
  71. {Type: IntegerFieldType},
  72. },
  73. },
  74. },
  75. },
  76. want: []ValueList{{1, 2}, {11, 12}},
  77. wantSchema: Schema{
  78. {Type: IntegerFieldType},
  79. {Type: IntegerFieldType},
  80. },
  81. },
  82. {
  83. desc: "Iteration over single page with different schema",
  84. fetchResponses: map[string]fetchResponse{
  85. "": {
  86. result: &readDataResult{
  87. pageToken: "",
  88. rows: [][]Value{{"1", 2}, {"11", 12}},
  89. schema: Schema{
  90. {Type: StringFieldType},
  91. {Type: IntegerFieldType},
  92. },
  93. },
  94. },
  95. },
  96. want: []ValueList{{"1", 2}, {"11", 12}},
  97. wantSchema: Schema{
  98. {Type: StringFieldType},
  99. {Type: IntegerFieldType},
  100. },
  101. },
  102. {
  103. desc: "Iteration over two pages",
  104. fetchResponses: map[string]fetchResponse{
  105. "": {
  106. result: &readDataResult{
  107. pageToken: "a",
  108. rows: [][]Value{{1, 2}, {11, 12}},
  109. schema: Schema{
  110. {Type: IntegerFieldType},
  111. {Type: IntegerFieldType},
  112. },
  113. },
  114. },
  115. "a": {
  116. result: &readDataResult{
  117. pageToken: "",
  118. rows: [][]Value{{101, 102}, {111, 112}},
  119. schema: Schema{
  120. {Type: IntegerFieldType},
  121. {Type: IntegerFieldType},
  122. },
  123. },
  124. },
  125. },
  126. want: []ValueList{{1, 2}, {11, 12}, {101, 102}, {111, 112}},
  127. wantSchema: Schema{
  128. {Type: IntegerFieldType},
  129. {Type: IntegerFieldType},
  130. },
  131. },
  132. {
  133. desc: "Server response includes empty page",
  134. fetchResponses: map[string]fetchResponse{
  135. "": {
  136. result: &readDataResult{
  137. pageToken: "a",
  138. rows: [][]Value{{1, 2}, {11, 12}},
  139. schema: Schema{
  140. {Type: IntegerFieldType},
  141. {Type: IntegerFieldType},
  142. },
  143. },
  144. },
  145. "a": {
  146. result: &readDataResult{
  147. pageToken: "b",
  148. rows: [][]Value{},
  149. schema: Schema{
  150. {Type: IntegerFieldType},
  151. {Type: IntegerFieldType},
  152. },
  153. },
  154. },
  155. "b": {
  156. result: &readDataResult{
  157. pageToken: "",
  158. rows: [][]Value{{101, 102}, {111, 112}},
  159. schema: Schema{
  160. {Type: IntegerFieldType},
  161. {Type: IntegerFieldType},
  162. },
  163. },
  164. },
  165. },
  166. want: []ValueList{{1, 2}, {11, 12}, {101, 102}, {111, 112}},
  167. wantSchema: Schema{
  168. {Type: IntegerFieldType},
  169. {Type: IntegerFieldType},
  170. },
  171. },
  172. {
  173. desc: "Fetch error",
  174. fetchResponses: map[string]fetchResponse{
  175. "": {
  176. result: &readDataResult{
  177. pageToken: "a",
  178. rows: [][]Value{{1, 2}, {11, 12}},
  179. schema: Schema{
  180. {Type: IntegerFieldType},
  181. {Type: IntegerFieldType},
  182. },
  183. },
  184. },
  185. "a": {
  186. // We returns some data from this fetch, but also an error.
  187. // So the end result should include only data from the previous fetch.
  188. err: fetchFailure,
  189. result: &readDataResult{
  190. pageToken: "b",
  191. rows: [][]Value{{101, 102}, {111, 112}},
  192. schema: Schema{
  193. {Type: IntegerFieldType},
  194. {Type: IntegerFieldType},
  195. },
  196. },
  197. },
  198. },
  199. want: []ValueList{{1, 2}, {11, 12}},
  200. wantErr: fetchFailure,
  201. wantSchema: Schema{
  202. {Type: IntegerFieldType},
  203. {Type: IntegerFieldType},
  204. },
  205. },
  206. {
  207. desc: "Skip over a single element",
  208. alreadyConsumed: 1,
  209. fetchResponses: map[string]fetchResponse{
  210. "": {
  211. result: &readDataResult{
  212. pageToken: "a",
  213. rows: [][]Value{{1, 2}, {11, 12}},
  214. schema: Schema{
  215. {Type: IntegerFieldType},
  216. {Type: IntegerFieldType},
  217. },
  218. },
  219. },
  220. "a": {
  221. result: &readDataResult{
  222. pageToken: "",
  223. rows: [][]Value{{101, 102}, {111, 112}},
  224. schema: Schema{
  225. {Type: IntegerFieldType},
  226. {Type: IntegerFieldType},
  227. },
  228. },
  229. },
  230. },
  231. want: []ValueList{{11, 12}, {101, 102}, {111, 112}},
  232. wantSchema: Schema{
  233. {Type: IntegerFieldType},
  234. {Type: IntegerFieldType},
  235. },
  236. },
  237. {
  238. desc: "Skip over an entire page",
  239. alreadyConsumed: 2,
  240. fetchResponses: map[string]fetchResponse{
  241. "": {
  242. result: &readDataResult{
  243. pageToken: "a",
  244. rows: [][]Value{{1, 2}, {11, 12}},
  245. schema: Schema{
  246. {Type: IntegerFieldType},
  247. {Type: IntegerFieldType},
  248. },
  249. },
  250. },
  251. "a": {
  252. result: &readDataResult{
  253. pageToken: "",
  254. rows: [][]Value{{101, 102}, {111, 112}},
  255. schema: Schema{
  256. {Type: IntegerFieldType},
  257. {Type: IntegerFieldType},
  258. },
  259. },
  260. },
  261. },
  262. want: []ValueList{{101, 102}, {111, 112}},
  263. wantSchema: Schema{
  264. {Type: IntegerFieldType},
  265. {Type: IntegerFieldType},
  266. },
  267. },
  268. {
  269. desc: "Skip beyond start of second page",
  270. alreadyConsumed: 3,
  271. fetchResponses: map[string]fetchResponse{
  272. "": {
  273. result: &readDataResult{
  274. pageToken: "a",
  275. rows: [][]Value{{1, 2}, {11, 12}},
  276. schema: Schema{
  277. {Type: IntegerFieldType},
  278. {Type: IntegerFieldType},
  279. },
  280. },
  281. },
  282. "a": {
  283. result: &readDataResult{
  284. pageToken: "",
  285. rows: [][]Value{{101, 102}, {111, 112}},
  286. schema: Schema{
  287. {Type: IntegerFieldType},
  288. {Type: IntegerFieldType},
  289. },
  290. },
  291. },
  292. },
  293. want: []ValueList{{111, 112}},
  294. wantSchema: Schema{
  295. {Type: IntegerFieldType},
  296. {Type: IntegerFieldType},
  297. },
  298. },
  299. {
  300. desc: "Skip beyond all data",
  301. alreadyConsumed: 4,
  302. fetchResponses: map[string]fetchResponse{
  303. "": {
  304. result: &readDataResult{
  305. pageToken: "a",
  306. rows: [][]Value{{1, 2}, {11, 12}},
  307. schema: Schema{
  308. {Type: IntegerFieldType},
  309. {Type: IntegerFieldType},
  310. },
  311. },
  312. },
  313. "a": {
  314. result: &readDataResult{
  315. pageToken: "",
  316. rows: [][]Value{{101, 102}, {111, 112}},
  317. schema: Schema{
  318. {Type: IntegerFieldType},
  319. {Type: IntegerFieldType},
  320. },
  321. },
  322. },
  323. },
  324. // In this test case, Next will return false on its first call,
  325. // so we won't even attempt to call Get.
  326. want: []ValueList{},
  327. wantSchema: Schema{},
  328. },
  329. }
  330. for _, tc := range testCases {
  331. pf := &pageFetcherStub{
  332. fetchResponses: tc.fetchResponses,
  333. }
  334. it := newIterator(nil, pf)
  335. it.offset += tc.alreadyConsumed
  336. values, schema, err := consumeIterator(it)
  337. if err != nil {
  338. t.Fatalf("%s: %v", tc.desc, err)
  339. }
  340. if (len(values) != 0 || len(tc.want) != 0) && !reflect.DeepEqual(values, tc.want) {
  341. t.Errorf("%s: values:\ngot: %v\nwant:%v", tc.desc, values, tc.want)
  342. }
  343. if it.Err() != tc.wantErr {
  344. t.Errorf("%s: iterator.Err:\ngot: %v\nwant: %v", tc.desc, it.Err(), tc.wantErr)
  345. }
  346. if (len(schema) != 0 || len(tc.wantSchema) != 0) && !reflect.DeepEqual(schema, tc.wantSchema) {
  347. t.Errorf("%s: iterator.Schema:\ngot: %v\nwant: %v", tc.desc, schema, tc.wantSchema)
  348. }
  349. }
  350. }
  351. // consumeIterator reads the schema and all values from an iterator and returns them.
  352. func consumeIterator(it *Iterator) ([]ValueList, Schema, error) {
  353. var got []ValueList
  354. var schema Schema
  355. for it.Next(context.Background()) {
  356. var vals ValueList
  357. var err error
  358. if err = it.Get(&vals); err != nil {
  359. return nil, Schema{}, fmt.Errorf("err calling Get: %v", err)
  360. }
  361. got = append(got, vals)
  362. if schema, err = it.Schema(); err != nil {
  363. return nil, Schema{}, fmt.Errorf("err calling Schema: %v", err)
  364. }
  365. }
  366. return got, schema, nil
  367. }
  368. func TestGetBeforeNext(t *testing.T) {
  369. // TODO: once mashalling/unmarshalling of iterators is implemented, do a similar test for unmarshalled iterators.
  370. pf := &pageFetcherStub{
  371. fetchResponses: map[string]fetchResponse{
  372. "": {
  373. result: &readDataResult{
  374. pageToken: "",
  375. rows: [][]Value{{1, 2}, {11, 12}},
  376. },
  377. },
  378. },
  379. }
  380. it := newIterator(nil, pf)
  381. var vals ValueList
  382. if err := it.Get(&vals); err == nil {
  383. t.Errorf("Expected error calling Get before Next")
  384. }
  385. }
// delayedPageFetcher wraps a pageFetcherStub and makes the first delayCount
// calls to fetch fail with errIncompleteJob before delegating to the stub.
type delayedPageFetcher struct {
	pageFetcherStub
	// delayCount is the number of remaining fetch calls that will fail with
	// errIncompleteJob; fetch decrements it on each such call.
	delayCount int
}
  390. func (pf *delayedPageFetcher) fetch(ctx context.Context, c *Client, token string) (*readDataResult, error) {
  391. if pf.delayCount > 0 {
  392. pf.delayCount--
  393. return nil, errIncompleteJob
  394. }
  395. return pf.pageFetcherStub.fetch(ctx, c, token)
  396. }
  397. func TestIterateIncompleteJob(t *testing.T) {
  398. want := []ValueList{{1, 2}, {11, 12}, {101, 102}, {111, 112}}
  399. pf := pageFetcherStub{
  400. fetchResponses: map[string]fetchResponse{
  401. "": {
  402. result: &readDataResult{
  403. pageToken: "a",
  404. rows: [][]Value{{1, 2}, {11, 12}},
  405. },
  406. },
  407. "a": {
  408. result: &readDataResult{
  409. pageToken: "",
  410. rows: [][]Value{{101, 102}, {111, 112}},
  411. },
  412. },
  413. },
  414. }
  415. dpf := &delayedPageFetcher{
  416. pageFetcherStub: pf,
  417. delayCount: 1,
  418. }
  419. it := newIterator(nil, dpf)
  420. values, _, err := consumeIterator(it)
  421. if err != nil {
  422. t.Fatal(err)
  423. }
  424. if (len(values) != 0 || len(want) != 0) && !reflect.DeepEqual(values, want) {
  425. t.Errorf("values: got:\n%v\nwant:\n%v", values, want)
  426. }
  427. if it.Err() != nil {
  428. t.Fatalf("iterator.Err: got:\n%v", it.Err())
  429. }
  430. if dpf.delayCount != 0 {
  431. t.Errorf("delayCount: got: %v, want: 0", dpf.delayCount)
  432. }
  433. }
  434. func TestGetDuringErrorState(t *testing.T) {
  435. pf := &pageFetcherStub{
  436. fetchResponses: map[string]fetchResponse{
  437. "": {err: errors.New("bang")},
  438. },
  439. }
  440. it := newIterator(nil, pf)
  441. var vals ValueList
  442. it.Next(context.Background())
  443. if it.Err() == nil {
  444. t.Errorf("Expected error after calling Next")
  445. }
  446. if err := it.Get(&vals); err == nil {
  447. t.Errorf("Expected error calling Get when iterator has a non-nil error.")
  448. }
  449. }
  450. func TestGetAfterFinished(t *testing.T) {
  451. testCases := []struct {
  452. alreadyConsumed int64 // amount to advance offset before commencing reading.
  453. fetchResponses map[string]fetchResponse
  454. want []ValueList
  455. }{
  456. {
  457. fetchResponses: map[string]fetchResponse{
  458. "": {
  459. result: &readDataResult{
  460. pageToken: "",
  461. rows: [][]Value{{1, 2}, {11, 12}},
  462. },
  463. },
  464. },
  465. want: []ValueList{{1, 2}, {11, 12}},
  466. },
  467. {
  468. fetchResponses: map[string]fetchResponse{
  469. "": {
  470. result: &readDataResult{
  471. pageToken: "",
  472. rows: [][]Value{},
  473. },
  474. },
  475. },
  476. want: []ValueList{},
  477. },
  478. {
  479. alreadyConsumed: 100,
  480. fetchResponses: map[string]fetchResponse{
  481. "": {
  482. result: &readDataResult{
  483. pageToken: "",
  484. rows: [][]Value{{1, 2}, {11, 12}},
  485. },
  486. },
  487. },
  488. want: []ValueList{},
  489. },
  490. }
  491. for _, tc := range testCases {
  492. pf := &pageFetcherStub{
  493. fetchResponses: tc.fetchResponses,
  494. }
  495. it := newIterator(nil, pf)
  496. it.offset += tc.alreadyConsumed
  497. values, _, err := consumeIterator(it)
  498. if err != nil {
  499. t.Fatal(err)
  500. }
  501. if (len(values) != 0 || len(tc.want) != 0) && !reflect.DeepEqual(values, tc.want) {
  502. t.Errorf("values: got:\n%v\nwant:\n%v", values, tc.want)
  503. }
  504. if it.Err() != nil {
  505. t.Fatalf("iterator.Err: got:\n%v\nwant:\n:nil", it.Err())
  506. }
  507. // Try calling Get again.
  508. var vals ValueList
  509. if err := it.Get(&vals); err == nil {
  510. t.Errorf("Expected error calling Get when there are no more values")
  511. }
  512. }
  513. }