/*
Copyright 2015 Google Inc. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
  13. package bigtable
  14. import (
  15. "flag"
  16. "fmt"
  17. "math/rand"
  18. "reflect"
  19. "sort"
  20. "strings"
  21. "sync"
  22. "testing"
  23. "time"
  24. "github.com/golang/protobuf/proto"
  25. "golang.org/x/net/context"
  26. "google.golang.org/cloud"
  27. "google.golang.org/cloud/bigtable/bttest"
  28. btspb "google.golang.org/cloud/bigtable/internal/service_proto"
  29. "google.golang.org/grpc"
  30. )
  31. func dataChunk(fam, col string, ts int64, data string) string {
  32. return fmt.Sprintf("chunks:<row_contents:<name:%q columns:<qualifier:%q cells:<timestamp_micros:%d value:%q>>>>", fam, col, ts, data)
  33. }
  34. func commit() string { return "chunks:<commit_row:true>" }
  35. func reset() string { return "chunks:<reset_row:true>" }
// chunkTests drives TestChunkReader. Each case feeds a sequence of
// text-format ReadRowsResponse protos through a chunkReader and lists
// the fully-assembled rows that should come out the other side.
var chunkTests = []struct {
	desc   string
	chunks []string // sequence of ReadRowsResponse protos in text format
	want   map[string]Row
}{
	{
		desc: "single row single chunk",
		chunks: []string{
			`row_key: "row1" ` + dataChunk("fam", "col1", 1428382701000000, "data") + commit(),
		},
		want: map[string]Row{
			"row1": Row{
				"fam": []ReadItem{{
					Row:       "row1",
					Column:    "fam:col1",
					Timestamp: 1428382701000000,
					Value:     []byte("data"),
				}},
			},
		},
	},
	{
		desc: "single row multiple chunks",
		chunks: []string{
			`row_key: "row1" ` + dataChunk("fam", "col1", 1428382701000000, "data"),
			`row_key: "row1" ` + dataChunk("fam", "col2", 1428382702000000, "more data"),
			`row_key: "row1" ` + commit(),
		},
		want: map[string]Row{
			"row1": Row{
				"fam": []ReadItem{
					{
						Row:       "row1",
						Column:    "fam:col1",
						Timestamp: 1428382701000000,
						Value:     []byte("data"),
					},
					{
						Row:       "row1",
						Column:    "fam:col2",
						Timestamp: 1428382702000000,
						Value:     []byte("more data"),
					},
				},
			},
		},
	},
	{
		// A reset discards the earlier chunk; only the post-reset cell survives.
		desc: "chunk, reset, chunk, commit",
		chunks: []string{
			`row_key: "row1" ` + dataChunk("fam", "col1", 1428382701000000, "data"),
			`row_key: "row1" ` + reset(),
			`row_key: "row1" ` + dataChunk("fam", "col1", 1428382702000000, "data") + commit(),
		},
		want: map[string]Row{
			"row1": Row{
				"fam": []ReadItem{{
					Row:       "row1",
					Column:    "fam:col1",
					Timestamp: 1428382702000000,
					Value:     []byte("data"),
				}},
			},
		},
	},
	{
		// Reset followed by an immediate commit yields no row at all.
		desc: "chunk, reset, commit",
		chunks: []string{
			`row_key: "row1" ` + dataChunk("fam", "col1", 1428382701000000, "data"),
			`row_key: "row1" ` + reset(),
			`row_key: "row1" ` + commit(),
		},
		want: map[string]Row{},
	},
	// TODO(dsymonds): More test cases, including
	// - multiple rows
}
  113. func TestChunkReader(t *testing.T) {
  114. for _, tc := range chunkTests {
  115. cr := new(chunkReader)
  116. got := make(map[string]Row)
  117. for i, txt := range tc.chunks {
  118. rrr := new(btspb.ReadRowsResponse)
  119. if err := proto.UnmarshalText(txt, rrr); err != nil {
  120. t.Fatalf("%s: internal error: bad #%d test text: %v", tc.desc, i, err)
  121. }
  122. if row := cr.process(rrr); row != nil {
  123. got[row.Key()] = row
  124. }
  125. }
  126. // TODO(dsymonds): check for partial rows?
  127. if !reflect.DeepEqual(got, tc.want) {
  128. t.Errorf("%s: processed response mismatch.\n got %+v\nwant %+v", tc.desc, got, tc.want)
  129. }
  130. }
  131. }
  132. func TestPrefix(t *testing.T) {
  133. tests := []struct {
  134. prefix, succ string
  135. }{
  136. {"", ""},
  137. {"\xff", ""}, // when used, "" means Infinity
  138. {"x\xff", "y"},
  139. {"\xfe", "\xff"},
  140. }
  141. for _, tc := range tests {
  142. got := prefixSuccessor(tc.prefix)
  143. if got != tc.succ {
  144. t.Errorf("prefixSuccessor(%q) = %q, want %s", tc.prefix, got, tc.succ)
  145. continue
  146. }
  147. r := PrefixRange(tc.prefix)
  148. if tc.succ == "" && r.limit != "" {
  149. t.Errorf("PrefixRange(%q) got limit %q", tc.prefix, r.limit)
  150. }
  151. if tc.succ != "" && r.limit != tc.succ {
  152. t.Errorf("PrefixRange(%q) got limit %q, want %q", tc.prefix, r.limit, tc.succ)
  153. }
  154. }
  155. }
// useProd switches TestClientIntegration from the in-process bttest
// server to a real production cluster identified by the flag value.
var useProd = flag.String("use_prod", "", `if set to "proj,zone,cluster,table", run integration test against production`)
  157. func TestClientIntegration(t *testing.T) {
  158. start := time.Now()
  159. lastCheckpoint := start
  160. checkpoint := func(s string) {
  161. n := time.Now()
  162. t.Logf("[%s] %v since start, %v since last checkpoint", s, n.Sub(start), n.Sub(lastCheckpoint))
  163. lastCheckpoint = n
  164. }
  165. proj, zone, cluster, table := "proj", "zone", "cluster", "mytable"
  166. var clientOpts []cloud.ClientOption
  167. timeout := 10 * time.Second
  168. if *useProd == "" {
  169. srv, err := bttest.NewServer()
  170. if err != nil {
  171. t.Fatal(err)
  172. }
  173. defer srv.Close()
  174. t.Logf("bttest.Server running on %s", srv.Addr)
  175. conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
  176. if err != nil {
  177. t.Fatalf("grpc.Dial: %v", err)
  178. }
  179. clientOpts = []cloud.ClientOption{cloud.WithBaseGRPC(conn)}
  180. } else {
  181. t.Logf("Running test against production")
  182. a := strings.Split(*useProd, ",")
  183. proj, zone, cluster, table = a[0], a[1], a[2], a[3]
  184. timeout = 5 * time.Minute
  185. }
  186. ctx, _ := context.WithTimeout(context.Background(), timeout)
  187. client, err := NewClient(ctx, proj, zone, cluster, clientOpts...)
  188. if err != nil {
  189. t.Fatalf("NewClient: %v", err)
  190. }
  191. defer client.Close()
  192. checkpoint("dialed Client")
  193. adminClient, err := NewAdminClient(ctx, proj, zone, cluster, clientOpts...)
  194. if err != nil {
  195. t.Fatalf("NewAdminClient: %v", err)
  196. }
  197. defer adminClient.Close()
  198. checkpoint("dialed AdminClient")
  199. // Delete the table at the end of the test.
  200. // Do this even before creating the table so that if this is running
  201. // against production and CreateTable fails there's a chance of cleaning it up.
  202. defer adminClient.DeleteTable(ctx, table)
  203. if err := adminClient.CreateTable(ctx, table); err != nil {
  204. t.Fatalf("Creating table: %v", err)
  205. }
  206. checkpoint("created table")
  207. if err := adminClient.CreateColumnFamily(ctx, table, "follows"); err != nil {
  208. t.Fatalf("Creating column family: %v", err)
  209. }
  210. checkpoint(`created "follows" column family`)
  211. tbl := client.Open(table)
  212. // Insert some data.
  213. initialData := map[string][]string{
  214. "wmckinley": []string{"tjefferson"},
  215. "gwashington": []string{"jadams"},
  216. "tjefferson": []string{"gwashington", "jadams"}, // wmckinley set conditionally below
  217. "jadams": []string{"gwashington", "tjefferson"},
  218. }
  219. for row, ss := range initialData {
  220. mut := NewMutation()
  221. for _, name := range ss {
  222. mut.Set("follows", name, 0, []byte("1"))
  223. }
  224. if err := tbl.Apply(ctx, row, mut); err != nil {
  225. t.Errorf("Mutating row %q: %v", row, err)
  226. }
  227. }
  228. checkpoint("inserted initial data")
  229. // Do a conditional mutation with a complex filter.
  230. mutTrue := NewMutation()
  231. mutTrue.Set("follows", "wmckinley", 0, []byte("1"))
  232. filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter("."))
  233. mut := NewCondMutation(filter, mutTrue, nil)
  234. if err := tbl.Apply(ctx, "tjefferson", mut); err != nil {
  235. t.Errorf("Conditionally mutating row: %v", err)
  236. }
  237. // Do a second condition mutation with a filter that does not match,
  238. // and thus no changes should be made.
  239. mutTrue = NewMutation()
  240. mutTrue.DeleteRow()
  241. filter = ColumnFilter("snoop.dogg")
  242. mut = NewCondMutation(filter, mutTrue, nil)
  243. if err := tbl.Apply(ctx, "tjefferson", mut); err != nil {
  244. t.Errorf("Conditionally mutating row: %v", err)
  245. }
  246. checkpoint("did two conditional mutations")
  247. // Fetch a row.
  248. row, err := tbl.ReadRow(ctx, "jadams")
  249. if err != nil {
  250. t.Fatalf("Reading a row: %v", err)
  251. }
  252. wantRow := Row{
  253. "follows": []ReadItem{
  254. {Row: "jadams", Column: "follows:gwashington", Value: []byte("1")},
  255. {Row: "jadams", Column: "follows:tjefferson", Value: []byte("1")},
  256. },
  257. }
  258. for _, ris := range row {
  259. sort.Sort(byColumn(ris))
  260. }
  261. if !reflect.DeepEqual(row, wantRow) {
  262. t.Errorf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
  263. }
  264. checkpoint("tested ReadRow")
  265. // Do a bunch of reads with filters.
  266. readTests := []struct {
  267. desc string
  268. rr RowRange
  269. filter Filter // may be nil
  270. // We do the read, grab all the cells, turn them into "<row>-<col>-<val>",
  271. // sort that list, and join with a comma.
  272. want string
  273. }{
  274. {
  275. desc: "read all, unfiltered",
  276. rr: RowRange{},
  277. want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
  278. },
  279. {
  280. desc: "read with InfiniteRange, unfiltered",
  281. rr: InfiniteRange("tjefferson"),
  282. want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
  283. },
  284. {
  285. desc: "read with NewRange, unfiltered",
  286. rr: NewRange("gargamel", "hubbard"),
  287. want: "gwashington-jadams-1",
  288. },
  289. {
  290. desc: "read with PrefixRange, unfiltered",
  291. rr: PrefixRange("jad"),
  292. want: "jadams-gwashington-1,jadams-tjefferson-1",
  293. },
  294. {
  295. desc: "read with SingleRow, unfiltered",
  296. rr: SingleRow("wmckinley"),
  297. want: "wmckinley-tjefferson-1",
  298. },
  299. {
  300. desc: "read all, with ColumnFilter",
  301. rr: RowRange{},
  302. filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
  303. want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1",
  304. },
  305. }
  306. for _, tc := range readTests {
  307. var opts []ReadOption
  308. if tc.filter != nil {
  309. opts = append(opts, RowFilter(tc.filter))
  310. }
  311. var elt []string
  312. err := tbl.ReadRows(context.Background(), tc.rr, func(r Row) bool {
  313. for _, ris := range r {
  314. for _, ri := range ris {
  315. // Use the column qualifier only to make the test data briefer.
  316. col := ri.Column[strings.Index(ri.Column, ":")+1:]
  317. x := fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value)
  318. elt = append(elt, x)
  319. }
  320. }
  321. return true
  322. }, opts...)
  323. if err != nil {
  324. t.Errorf("%s: %v", tc.desc, err)
  325. continue
  326. }
  327. sort.Strings(elt)
  328. if got := strings.Join(elt, ","); got != tc.want {
  329. t.Errorf("%s: wrong reads.\n got %q\nwant %q", tc.desc, got, tc.want)
  330. }
  331. }
  332. checkpoint("tested ReadRows in a few ways")
  333. // Do a scan and stop part way through.
  334. // Verify that the ReadRows callback doesn't keep running.
  335. stopped := false
  336. err = tbl.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
  337. if r.Key() < "h" {
  338. return true
  339. }
  340. if !stopped {
  341. stopped = true
  342. return false
  343. }
  344. t.Errorf("ReadRows kept scanning to row %q after being told to stop", r.Key())
  345. return false
  346. })
  347. if err != nil {
  348. t.Errorf("Partial ReadRows: %v", err)
  349. }
  350. checkpoint("did partial ReadRows test")
  351. // Delete a row and check it goes away.
  352. mut = NewMutation()
  353. mut.DeleteRow()
  354. if err := tbl.Apply(ctx, "wmckinley", mut); err != nil {
  355. t.Errorf("Apply DeleteRow: %v", err)
  356. }
  357. row, err = tbl.ReadRow(ctx, "wmckinley")
  358. if err != nil {
  359. t.Fatalf("Reading a row after DeleteRow: %v", err)
  360. }
  361. if len(row) != 0 {
  362. t.Fatalf("Read non-zero row after DeleteRow: %v", row)
  363. }
  364. checkpoint("exercised DeleteRow")
  365. // Check ReadModifyWrite.
  366. if err := adminClient.CreateColumnFamily(ctx, table, "counter"); err != nil {
  367. t.Fatalf("Creating column family: %v", err)
  368. }
  369. appendRMW := func(b []byte) *ReadModifyWrite {
  370. rmw := NewReadModifyWrite()
  371. rmw.AppendValue("counter", "likes", b)
  372. return rmw
  373. }
  374. incRMW := func(n int64) *ReadModifyWrite {
  375. rmw := NewReadModifyWrite()
  376. rmw.Increment("counter", "likes", n)
  377. return rmw
  378. }
  379. rmwSeq := []struct {
  380. desc string
  381. rmw *ReadModifyWrite
  382. want []byte
  383. }{
  384. {
  385. desc: "append #1",
  386. rmw: appendRMW([]byte{0, 0, 0}),
  387. want: []byte{0, 0, 0},
  388. },
  389. {
  390. desc: "append #2",
  391. rmw: appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17
  392. want: []byte{0, 0, 0, 0, 0, 0, 0, 17},
  393. },
  394. {
  395. desc: "increment",
  396. rmw: incRMW(8),
  397. want: []byte{0, 0, 0, 0, 0, 0, 0, 25},
  398. },
  399. }
  400. for _, step := range rmwSeq {
  401. row, err := tbl.ApplyReadModifyWrite(ctx, "gwashington", step.rmw)
  402. if err != nil {
  403. t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err)
  404. }
  405. clearTimestamps(row)
  406. wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}}
  407. if !reflect.DeepEqual(row, wantRow) {
  408. t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow)
  409. }
  410. }
  411. checkpoint("tested ReadModifyWrite")
  412. // Test arbitrary timestamps more thoroughly.
  413. if err := adminClient.CreateColumnFamily(ctx, table, "ts"); err != nil {
  414. t.Fatalf("Creating column family: %v", err)
  415. }
  416. const numVersions = 4
  417. mut = NewMutation()
  418. for i := 0; i < numVersions; i++ {
  419. // Timestamps are used in thousands because the server
  420. // only permits that granularity.
  421. mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
  422. }
  423. if err := tbl.Apply(ctx, "testrow", mut); err != nil {
  424. t.Fatalf("Mutating row: %v", err)
  425. }
  426. r, err := tbl.ReadRow(ctx, "testrow")
  427. if err != nil {
  428. t.Fatalf("Reading row: %v", err)
  429. }
  430. wantRow = Row{"ts": []ReadItem{
  431. // These should be returned in descending timestamp order.
  432. {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
  433. {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
  434. {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
  435. {Row: "testrow", Column: "ts:col", Timestamp: 0, Value: []byte("val-0")},
  436. }}
  437. if !reflect.DeepEqual(r, wantRow) {
  438. t.Errorf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow)
  439. }
  440. // Do the same read, but filter to the latest two versions.
  441. r, err = tbl.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
  442. if err != nil {
  443. t.Fatalf("Reading row: %v", err)
  444. }
  445. wantRow = Row{"ts": []ReadItem{
  446. {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
  447. {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
  448. }}
  449. if !reflect.DeepEqual(r, wantRow) {
  450. t.Errorf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow)
  451. }
  452. // Delete the cell with timestamp 2000 and repeat the last read,
  453. // checking that we get ts 3000 and ts 1000.
  454. mut = NewMutation()
  455. mut.DeleteTimestampRange("ts", "col", 2000, 3000) // half-open interval
  456. if err := tbl.Apply(ctx, "testrow", mut); err != nil {
  457. t.Fatalf("Mutating row: %v", err)
  458. }
  459. r, err = tbl.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
  460. if err != nil {
  461. t.Fatalf("Reading row: %v", err)
  462. }
  463. wantRow = Row{"ts": []ReadItem{
  464. {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
  465. {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
  466. }}
  467. if !reflect.DeepEqual(r, wantRow) {
  468. t.Errorf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow)
  469. }
  470. checkpoint("tested multiple versions in a cell")
  471. // Do highly concurrent reads/writes.
  472. // TODO(dsymonds): Raise this to 1000 when https://github.com/grpc/grpc-go/issues/205 is resolved.
  473. const maxConcurrency = 100
  474. var wg sync.WaitGroup
  475. for i := 0; i < maxConcurrency; i++ {
  476. wg.Add(1)
  477. go func() {
  478. defer wg.Done()
  479. switch r := rand.Intn(100); { // r ∈ [0,100)
  480. case 0 <= r && r < 30:
  481. // Do a read.
  482. _, err := tbl.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1)))
  483. if err != nil {
  484. t.Errorf("Concurrent read: %v", err)
  485. }
  486. case 30 <= r && r < 100:
  487. // Do a write.
  488. mut := NewMutation()
  489. mut.Set("ts", "col", 0, []byte("data"))
  490. if err := tbl.Apply(ctx, "testrow", mut); err != nil {
  491. t.Errorf("Concurrent write: %v", err)
  492. }
  493. }
  494. }()
  495. }
  496. wg.Wait()
  497. checkpoint("tested high concurrency")
  498. // Large reads, writes and scans.
  499. bigBytes := make([]byte, 15<<20) // 15 MB is large
  500. nonsense := []byte("lorem ipsum dolor sit amet, ")
  501. fill(bigBytes, nonsense)
  502. mut = NewMutation()
  503. mut.Set("ts", "col", 0, bigBytes)
  504. if err := tbl.Apply(ctx, "bigrow", mut); err != nil {
  505. t.Errorf("Big write: %v", err)
  506. }
  507. r, err = tbl.ReadRow(ctx, "bigrow")
  508. if err != nil {
  509. t.Errorf("Big read: %v", err)
  510. }
  511. wantRow = Row{"ts": []ReadItem{
  512. {Row: "bigrow", Column: "ts:col", Value: bigBytes},
  513. }}
  514. if !reflect.DeepEqual(r, wantRow) {
  515. t.Errorf("Big read returned incorrect bytes: %v", r)
  516. }
  517. // Now write 1000 rows, each with 82 KB values, then scan them all.
  518. medBytes := make([]byte, 82<<10)
  519. fill(medBytes, nonsense)
  520. sem := make(chan int, 50) // do up to 50 mutations at a time.
  521. for i := 0; i < 1000; i++ {
  522. mut := NewMutation()
  523. mut.Set("ts", "big-scan", 0, medBytes)
  524. row := fmt.Sprintf("row-%d", i)
  525. wg.Add(1)
  526. go func() {
  527. defer wg.Done()
  528. defer func() { <-sem }()
  529. sem <- 1
  530. if err := tbl.Apply(ctx, row, mut); err != nil {
  531. t.Errorf("Preparing large scan: %v", err)
  532. }
  533. }()
  534. }
  535. wg.Wait()
  536. n := 0
  537. err = tbl.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
  538. for _, ris := range r {
  539. for _, ri := range ris {
  540. n += len(ri.Value)
  541. }
  542. }
  543. return true
  544. }, RowFilter(ColumnFilter("big-scan")))
  545. if err != nil {
  546. t.Errorf("Doing large scan: %v", err)
  547. }
  548. if want := 1000 * len(medBytes); n != want {
  549. t.Errorf("Large scan returned %d bytes, want %d", n, want)
  550. }
  551. checkpoint("tested big read/write/scan")
  552. }
  553. func fill(b, sub []byte) {
  554. for len(b) > len(sub) {
  555. n := copy(b, sub)
  556. b = b[n:]
  557. }
  558. }
  559. type byColumn []ReadItem
  560. func (b byColumn) Len() int { return len(b) }
  561. func (b byColumn) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
  562. func (b byColumn) Less(i, j int) bool { return b[i].Column < b[j].Column }
  563. func clearTimestamps(r Row) {
  564. for _, ris := range r {
  565. for i := range ris {
  566. ris[i].Timestamp = 0
  567. }
  568. }
  569. }