inmem.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847
  1. /*
  2. Copyright 2015 Google Inc. All Rights Reserved.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. /*
  14. Package bttest contains test helpers for working with the bigtable package.
  15. To use a Server, create it, and then connect to it with no security:
  16. (The project/zone/cluster values are ignored.)
  17. srv, err := bttest.NewServer()
  18. ...
  19. conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
  20. ...
  21. client, err := bigtable.NewClient(ctx, proj, zone, cluster,
  22. cloud.WithBaseGRPC(conn))
  23. ...
  24. */
  25. package bttest // import "google.golang.org/cloud/bigtable/bttest"
  26. import (
  27. "encoding/binary"
  28. "fmt"
  29. "log"
  30. "math/rand"
  31. "net"
  32. "regexp"
  33. "sort"
  34. "strings"
  35. "sync"
  36. "time"
  37. "golang.org/x/net/context"
  38. btdpb "google.golang.org/cloud/bigtable/internal/data_proto"
  39. emptypb "google.golang.org/cloud/bigtable/internal/empty"
  40. btspb "google.golang.org/cloud/bigtable/internal/service_proto"
  41. bttdpb "google.golang.org/cloud/bigtable/internal/table_data_proto"
  42. bttspb "google.golang.org/cloud/bigtable/internal/table_service_proto"
  43. "google.golang.org/grpc"
  44. )
// Server is an in-memory Cloud Bigtable fake.
// It is unauthenticated, and only a rough approximation.
type Server struct {
	// Addr is the address (host:port on the loopback interface) at which the
	// fake accepts gRPC connections.
	Addr string

	l   net.Listener // listener passed to srv.Serve
	srv *grpc.Server // gRPC server with both fake services registered
	s   *server      // the fake's implementation and state
}
// server is the real implementation of the fake.
// It is a separate and unexported type so the API won't be cluttered with
// methods that are only relevant to the fake's implementation.
type server struct {
	mu     sync.Mutex        // guards tables and gcc
	tables map[string]*table // keyed by fully qualified name
	gcc    chan int          // set when gcloop starts, closed when server shuts down

	// Any unimplemented methods will cause a panic (the embedded interfaces
	// are left nil, so calling a method that server does not define
	// dereferences a nil interface).
	bttspb.BigtableTableServiceServer
	btspb.BigtableServiceServer
}
  64. // NewServer creates a new Server. The Server will be listening for gRPC connections
  65. // at the address named by the Addr field, without TLS.
  66. func NewServer() (*Server, error) {
  67. l, err := net.Listen("tcp", "127.0.0.1:0")
  68. if err != nil {
  69. return nil, err
  70. }
  71. s := &Server{
  72. Addr: l.Addr().String(),
  73. l: l,
  74. srv: grpc.NewServer(),
  75. s: &server{
  76. tables: make(map[string]*table),
  77. },
  78. }
  79. bttspb.RegisterBigtableTableServiceServer(s.srv, s.s)
  80. btspb.RegisterBigtableServiceServer(s.srv, s.s)
  81. go s.srv.Serve(s.l)
  82. return s, nil
  83. }
  84. // Close shuts down the server.
  85. func (s *Server) Close() {
  86. s.s.mu.Lock()
  87. if s.s.gcc != nil {
  88. close(s.s.gcc)
  89. }
  90. s.s.mu.Unlock()
  91. s.srv.Stop()
  92. s.l.Close()
  93. }
  94. func (s *server) CreateTable(ctx context.Context, req *bttspb.CreateTableRequest) (*bttdpb.Table, error) {
  95. tbl := req.Name + "/tables/" + req.TableId
  96. s.mu.Lock()
  97. if _, ok := s.tables[tbl]; ok {
  98. s.mu.Unlock()
  99. return nil, fmt.Errorf("table %q already exists", tbl)
  100. }
  101. s.tables[tbl] = newTable()
  102. s.mu.Unlock()
  103. return &bttdpb.Table{Name: tbl}, nil
  104. }
  105. func (s *server) ListTables(ctx context.Context, req *bttspb.ListTablesRequest) (*bttspb.ListTablesResponse, error) {
  106. res := &bttspb.ListTablesResponse{}
  107. prefix := req.Name + "/tables/"
  108. s.mu.Lock()
  109. for tbl := range s.tables {
  110. if strings.HasPrefix(tbl, prefix) {
  111. res.Tables = append(res.Tables, &bttdpb.Table{Name: tbl})
  112. }
  113. }
  114. s.mu.Unlock()
  115. return res, nil
  116. }
  117. func (s *server) GetTable(ctx context.Context, req *bttspb.GetTableRequest) (*bttdpb.Table, error) {
  118. tbl := req.Name
  119. s.mu.Lock()
  120. tblIns, ok := s.tables[tbl]
  121. s.mu.Unlock()
  122. if !ok {
  123. return nil, fmt.Errorf("table %q not found", tbl)
  124. }
  125. return &bttdpb.Table{
  126. Name: tbl,
  127. ColumnFamilies: toColumnFamilies(tblIns.families),
  128. }, nil
  129. }
  130. func (s *server) DeleteTable(ctx context.Context, req *bttspb.DeleteTableRequest) (*emptypb.Empty, error) {
  131. s.mu.Lock()
  132. defer s.mu.Unlock()
  133. if _, ok := s.tables[req.Name]; !ok {
  134. return nil, fmt.Errorf("no such table %q", req.Name)
  135. }
  136. delete(s.tables, req.Name)
  137. return &emptypb.Empty{}, nil
  138. }
  139. func (s *server) CreateColumnFamily(ctx context.Context, req *bttspb.CreateColumnFamilyRequest) (*bttdpb.ColumnFamily, error) {
  140. s.mu.Lock()
  141. tbl, ok := s.tables[req.Name]
  142. s.mu.Unlock()
  143. if !ok {
  144. return nil, fmt.Errorf("no such table %q", req.Name)
  145. }
  146. // Check it is unique and record it.
  147. fam := req.ColumnFamilyId
  148. tbl.mu.Lock()
  149. defer tbl.mu.Unlock()
  150. if _, ok := tbl.families[fam]; ok {
  151. return nil, fmt.Errorf("family %q already exists", fam)
  152. }
  153. newcf := &columnFamily{
  154. name: req.Name + "/columnFamilies/" + fam,
  155. }
  156. tbl.families[fam] = newcf
  157. return newcf.proto(), nil
  158. }
  159. func (s *server) UpdateColumnFamily(ctx context.Context, req *bttdpb.ColumnFamily) (*bttdpb.ColumnFamily, error) {
  160. index := strings.Index(req.Name, "/columnFamilies/")
  161. if index == -1 {
  162. return nil, fmt.Errorf("bad family name %q", req.Name)
  163. }
  164. tblName := req.Name[:index]
  165. fam := req.Name[index+len("/columnFamilies/"):]
  166. s.mu.Lock()
  167. tbl, ok := s.tables[tblName]
  168. s.mu.Unlock()
  169. if !ok {
  170. return nil, fmt.Errorf("no such table %q", req.Name)
  171. }
  172. tbl.mu.Lock()
  173. defer tbl.mu.Unlock()
  174. // Check it is unique and record it.
  175. if _, ok := tbl.families[fam]; !ok {
  176. return nil, fmt.Errorf("no such family %q", fam)
  177. }
  178. newcf := &columnFamily{
  179. name: req.Name,
  180. gcRule: req.GcRule,
  181. }
  182. // assume that we ALWAYS want to replace by the new setting
  183. // we may need partial update through
  184. tbl.families[fam] = newcf
  185. s.needGC()
  186. return newcf.proto(), nil
  187. }
  188. func (s *server) ReadRows(req *btspb.ReadRowsRequest, stream btspb.BigtableService_ReadRowsServer) error {
  189. s.mu.Lock()
  190. tbl, ok := s.tables[req.TableName]
  191. s.mu.Unlock()
  192. if !ok {
  193. return fmt.Errorf("no such table %q", req.TableName)
  194. }
  195. var start, end string // half-open interval
  196. switch targ := req.Target.(type) {
  197. case *btspb.ReadRowsRequest_RowRange:
  198. start, end = string(targ.RowRange.StartKey), string(targ.RowRange.EndKey)
  199. case *btspb.ReadRowsRequest_RowKey:
  200. // A single row read is simply an edge case.
  201. start = string(targ.RowKey)
  202. end = start + "\x00"
  203. default:
  204. return fmt.Errorf("unknown ReadRowsRequest.Target oneof %T", targ)
  205. }
  206. // Get rows to stream back.
  207. tbl.mu.RLock()
  208. si, ei := 0, len(tbl.rows) // half-open interval
  209. if start != "" {
  210. si = sort.Search(len(tbl.rows), func(i int) bool { return tbl.rows[i].key >= start })
  211. }
  212. if end != "" {
  213. ei = sort.Search(len(tbl.rows), func(i int) bool { return tbl.rows[i].key >= end })
  214. }
  215. if si >= ei {
  216. tbl.mu.RUnlock()
  217. return nil
  218. }
  219. rows := make([]*row, ei-si)
  220. copy(rows, tbl.rows[si:ei])
  221. tbl.mu.RUnlock()
  222. for _, r := range rows {
  223. if err := streamRow(stream, r, req.Filter); err != nil {
  224. return err
  225. }
  226. }
  227. return nil
  228. }
  229. func streamRow(stream btspb.BigtableService_ReadRowsServer, r *row, f *btdpb.RowFilter) error {
  230. r.mu.Lock()
  231. nr := r.copy()
  232. r.mu.Unlock()
  233. r = nr
  234. filterRow(f, r)
  235. rrr := &btspb.ReadRowsResponse{
  236. RowKey: []byte(r.key),
  237. }
  238. for col, cells := range r.cells {
  239. i := strings.Index(col, ":") // guaranteed to exist
  240. fam, col := col[:i], col[i+1:]
  241. if len(cells) == 0 {
  242. continue
  243. }
  244. // TODO(dsymonds): Apply transformers.
  245. colm := &btdpb.Column{
  246. Qualifier: []byte(col),
  247. // Cells is populated below.
  248. }
  249. for _, cell := range cells {
  250. colm.Cells = append(colm.Cells, &btdpb.Cell{
  251. TimestampMicros: cell.ts,
  252. Value: cell.value,
  253. })
  254. }
  255. rrr.Chunks = append(rrr.Chunks, &btspb.ReadRowsResponse_Chunk{
  256. Chunk: &btspb.ReadRowsResponse_Chunk_RowContents{&btdpb.Family{
  257. Name: fam,
  258. Columns: []*btdpb.Column{colm},
  259. }},
  260. })
  261. }
  262. rrr.Chunks = append(rrr.Chunks, &btspb.ReadRowsResponse_Chunk{Chunk: &btspb.ReadRowsResponse_Chunk_CommitRow{true}})
  263. return stream.Send(rrr)
  264. }
  265. // filterRow modifies a row with the given filter.
  266. func filterRow(f *btdpb.RowFilter, r *row) {
  267. if f == nil {
  268. return
  269. }
  270. // Handle filters that apply beyond just including/excluding cells.
  271. switch f := f.Filter.(type) {
  272. case *btdpb.RowFilter_Chain_:
  273. for _, sub := range f.Chain.Filters {
  274. filterRow(sub, r)
  275. }
  276. return
  277. case *btdpb.RowFilter_Interleave_:
  278. srs := make([]*row, 0, len(f.Interleave.Filters))
  279. for _, sub := range f.Interleave.Filters {
  280. sr := r.copy()
  281. filterRow(sub, sr)
  282. srs = append(srs, sr)
  283. }
  284. // merge
  285. // TODO(dsymonds): is this correct?
  286. r.cells = make(map[string][]cell)
  287. for _, sr := range srs {
  288. for col, cs := range sr.cells {
  289. r.cells[col] = append(r.cells[col], cs...)
  290. }
  291. }
  292. for _, cs := range r.cells {
  293. sort.Sort(byDescTS(cs))
  294. }
  295. return
  296. case *btdpb.RowFilter_CellsPerColumnLimitFilter:
  297. lim := int(f.CellsPerColumnLimitFilter)
  298. for col, cs := range r.cells {
  299. if len(cs) > lim {
  300. r.cells[col] = cs[:lim]
  301. }
  302. }
  303. return
  304. }
  305. // Any other case, operate on a per-cell basis.
  306. for key, cs := range r.cells {
  307. i := strings.Index(key, ":") // guaranteed to exist
  308. fam, col := key[:i], key[i+1:]
  309. r.cells[key] = filterCells(f, fam, col, cs)
  310. }
  311. }
  312. func filterCells(f *btdpb.RowFilter, fam, col string, cs []cell) []cell {
  313. var ret []cell
  314. for _, cell := range cs {
  315. if includeCell(f, fam, col, cell) {
  316. ret = append(ret, cell)
  317. }
  318. }
  319. return ret
  320. }
  321. func includeCell(f *btdpb.RowFilter, fam, col string, cell cell) bool {
  322. if f == nil {
  323. return true
  324. }
  325. // TODO(dsymonds): Implement many more filters.
  326. switch f := f.Filter.(type) {
  327. default:
  328. log.Printf("WARNING: don't know how to handle filter of type %T (ignoring it)", f)
  329. return true
  330. case *btdpb.RowFilter_FamilyNameRegexFilter:
  331. pat := string(f.FamilyNameRegexFilter)
  332. rx, err := regexp.Compile(pat)
  333. if err != nil {
  334. log.Printf("Bad family_name_regex_filter pattern %q: %v", pat, err)
  335. return false
  336. }
  337. return rx.MatchString(fam)
  338. case *btdpb.RowFilter_ColumnQualifierRegexFilter:
  339. pat := string(f.ColumnQualifierRegexFilter)
  340. rx, err := regexp.Compile(pat)
  341. if err != nil {
  342. log.Printf("Bad column_qualifier_regex_filter pattern %q: %v", pat, err)
  343. return false
  344. }
  345. return rx.MatchString(col)
  346. case *btdpb.RowFilter_ValueRegexFilter:
  347. pat := string(f.ValueRegexFilter)
  348. rx, err := regexp.Compile(pat)
  349. if err != nil {
  350. log.Printf("Bad value_regex_filter pattern %q: %v", pat, err)
  351. return false
  352. }
  353. return rx.Match(cell.value)
  354. }
  355. }
  356. func (s *server) MutateRow(ctx context.Context, req *btspb.MutateRowRequest) (*emptypb.Empty, error) {
  357. s.mu.Lock()
  358. tbl, ok := s.tables[req.TableName]
  359. s.mu.Unlock()
  360. if !ok {
  361. return nil, fmt.Errorf("no such table %q", req.TableName)
  362. }
  363. r := tbl.mutableRow(string(req.RowKey))
  364. r.mu.Lock()
  365. defer r.mu.Unlock()
  366. if err := applyMutations(tbl, r, req.Mutations); err != nil {
  367. return nil, err
  368. }
  369. return &emptypb.Empty{}, nil
  370. }
  371. func (s *server) CheckAndMutateRow(ctx context.Context, req *btspb.CheckAndMutateRowRequest) (*btspb.CheckAndMutateRowResponse, error) {
  372. s.mu.Lock()
  373. tbl, ok := s.tables[req.TableName]
  374. s.mu.Unlock()
  375. if !ok {
  376. return nil, fmt.Errorf("no such table %q", req.TableName)
  377. }
  378. res := &btspb.CheckAndMutateRowResponse{}
  379. r := tbl.mutableRow(string(req.RowKey))
  380. r.mu.Lock()
  381. defer r.mu.Unlock()
  382. // Figure out which mutation to apply.
  383. whichMut := false
  384. if req.PredicateFilter == nil {
  385. // Use true_mutations iff row contains any cells.
  386. whichMut = len(r.cells) > 0
  387. } else {
  388. // Use true_mutations iff any cells in the row match the filter.
  389. // TODO(dsymonds): This could be cheaper.
  390. nr := r.copy()
  391. filterRow(req.PredicateFilter, nr)
  392. for _, cs := range nr.cells {
  393. if len(cs) > 0 {
  394. whichMut = true
  395. break
  396. }
  397. }
  398. // TODO(dsymonds): Figure out if this is supposed to be set
  399. // even when there's no predicate filter.
  400. res.PredicateMatched = whichMut
  401. }
  402. muts := req.FalseMutations
  403. if whichMut {
  404. muts = req.TrueMutations
  405. }
  406. if err := applyMutations(tbl, r, muts); err != nil {
  407. return nil, err
  408. }
  409. return res, nil
  410. }
  411. // applyMutations applies a sequence of mutations to a row.
  412. // It assumes r.mu is locked.
  413. func applyMutations(tbl *table, r *row, muts []*btdpb.Mutation) error {
  414. for _, mut := range muts {
  415. switch mut := mut.Mutation.(type) {
  416. default:
  417. return fmt.Errorf("can't handle mutation type %T", mut)
  418. case *btdpb.Mutation_SetCell_:
  419. set := mut.SetCell
  420. tbl.mu.RLock()
  421. _, famOK := tbl.families[set.FamilyName]
  422. tbl.mu.RUnlock()
  423. if !famOK {
  424. return fmt.Errorf("unknown family %q", set.FamilyName)
  425. }
  426. ts := set.TimestampMicros
  427. if ts == -1 { // bigtable.ServerTime
  428. ts = time.Now().UnixNano() / 1e3
  429. ts -= ts % 1000 // round to millisecond granularity
  430. }
  431. if !tbl.validTimestamp(ts) {
  432. return fmt.Errorf("invalid timestamp %d", ts)
  433. }
  434. col := fmt.Sprintf("%s:%s", set.FamilyName, set.ColumnQualifier)
  435. cs := r.cells[col]
  436. newCell := cell{ts: ts, value: set.Value}
  437. replaced := false
  438. for i, cell := range cs {
  439. if cell.ts == newCell.ts {
  440. cs[i] = newCell
  441. replaced = true
  442. break
  443. }
  444. }
  445. if !replaced {
  446. cs = append(cs, newCell)
  447. }
  448. sort.Sort(byDescTS(cs))
  449. r.cells[col] = cs
  450. case *btdpb.Mutation_DeleteFromColumn_:
  451. del := mut.DeleteFromColumn
  452. col := fmt.Sprintf("%s:%s", del.FamilyName, del.ColumnQualifier)
  453. cs := r.cells[col]
  454. if del.TimeRange != nil {
  455. tsr := del.TimeRange
  456. if !tbl.validTimestamp(tsr.StartTimestampMicros) {
  457. return fmt.Errorf("invalid timestamp %d", tsr.StartTimestampMicros)
  458. }
  459. if !tbl.validTimestamp(tsr.EndTimestampMicros) {
  460. return fmt.Errorf("invalid timestamp %d", tsr.EndTimestampMicros)
  461. }
  462. // Find half-open interval to remove.
  463. // Cells are in descending timestamp order,
  464. // so the predicates to sort.Search are inverted.
  465. si, ei := 0, len(cs)
  466. if tsr.StartTimestampMicros > 0 {
  467. ei = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.StartTimestampMicros })
  468. }
  469. if tsr.EndTimestampMicros > 0 {
  470. si = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.EndTimestampMicros })
  471. }
  472. if si < ei {
  473. copy(cs[si:], cs[ei:])
  474. cs = cs[:len(cs)-(ei-si)]
  475. }
  476. } else {
  477. cs = nil
  478. }
  479. if len(cs) == 0 {
  480. delete(r.cells, col)
  481. } else {
  482. r.cells[col] = cs
  483. }
  484. case *btdpb.Mutation_DeleteFromRow_:
  485. r.cells = make(map[string][]cell)
  486. }
  487. }
  488. return nil
  489. }
  490. func (s *server) ReadModifyWriteRow(ctx context.Context, req *btspb.ReadModifyWriteRowRequest) (*btdpb.Row, error) {
  491. s.mu.Lock()
  492. tbl, ok := s.tables[req.TableName]
  493. s.mu.Unlock()
  494. if !ok {
  495. return nil, fmt.Errorf("no such table %q", req.TableName)
  496. }
  497. updates := make(map[string]cell) // copy of updated cells; keyed by full column name
  498. r := tbl.mutableRow(string(req.RowKey))
  499. r.mu.Lock()
  500. defer r.mu.Unlock()
  501. // Assume all mutations apply to the most recent version of the cell.
  502. // TODO(dsymonds): Verify this assumption and document it in the proto.
  503. for _, rule := range req.Rules {
  504. tbl.mu.RLock()
  505. _, famOK := tbl.families[rule.FamilyName]
  506. tbl.mu.RUnlock()
  507. if !famOK {
  508. return nil, fmt.Errorf("unknown family %q", rule.FamilyName)
  509. }
  510. key := fmt.Sprintf("%s:%s", rule.FamilyName, rule.ColumnQualifier)
  511. newCell := false
  512. if len(r.cells[key]) == 0 {
  513. r.cells[key] = []cell{{
  514. // TODO(dsymonds): should this set a timestamp?
  515. }}
  516. newCell = true
  517. }
  518. cell := &r.cells[key][0]
  519. switch rule := rule.Rule.(type) {
  520. default:
  521. return nil, fmt.Errorf("unknown RMW rule oneof %T", rule)
  522. case *btdpb.ReadModifyWriteRule_AppendValue:
  523. cell.value = append(cell.value, rule.AppendValue...)
  524. case *btdpb.ReadModifyWriteRule_IncrementAmount:
  525. var v int64
  526. if !newCell {
  527. if len(cell.value) != 8 {
  528. return nil, fmt.Errorf("increment on non-64-bit value")
  529. }
  530. v = int64(binary.BigEndian.Uint64(cell.value))
  531. }
  532. v += rule.IncrementAmount
  533. var val [8]byte
  534. binary.BigEndian.PutUint64(val[:], uint64(v))
  535. cell.value = val[:]
  536. }
  537. updates[key] = *cell
  538. }
  539. res := &btdpb.Row{
  540. Key: req.RowKey,
  541. }
  542. for col, cell := range updates {
  543. i := strings.Index(col, ":")
  544. fam, qual := col[:i], col[i+1:]
  545. var f *btdpb.Family
  546. for _, ff := range res.Families {
  547. if ff.Name == fam {
  548. f = ff
  549. break
  550. }
  551. }
  552. if f == nil {
  553. f = &btdpb.Family{Name: fam}
  554. res.Families = append(res.Families, f)
  555. }
  556. f.Columns = append(f.Columns, &btdpb.Column{
  557. Qualifier: []byte(qual),
  558. Cells: []*btdpb.Cell{{
  559. Value: cell.value,
  560. }},
  561. })
  562. }
  563. return res, nil
  564. }
  565. // needGC is invoked whenever the server needs gcloop running.
  566. func (s *server) needGC() {
  567. s.mu.Lock()
  568. if s.gcc == nil {
  569. s.gcc = make(chan int)
  570. go s.gcloop(s.gcc)
  571. }
  572. s.mu.Unlock()
  573. }
  574. func (s *server) gcloop(done <-chan int) {
  575. const (
  576. minWait = 500 // ms
  577. maxWait = 1500 // ms
  578. )
  579. for {
  580. // Wait for a random time interval.
  581. d := time.Duration(minWait+rand.Intn(maxWait-minWait)) * time.Millisecond
  582. select {
  583. case <-time.After(d):
  584. case <-done:
  585. return // server has been closed
  586. }
  587. // Do a GC pass over all tables.
  588. var tables []*table
  589. s.mu.Lock()
  590. for _, tbl := range s.tables {
  591. tables = append(tables, tbl)
  592. }
  593. s.mu.Unlock()
  594. for _, tbl := range tables {
  595. tbl.gc()
  596. }
  597. }
  598. }
// table is the in-memory state of one table.
type table struct {
	// mu guards families, rows and rowIndex; each row also has its own lock
	// for its cells.
	mu       sync.RWMutex
	families map[string]*columnFamily // keyed by plain family name
	rows     []*row                   // sorted by row key; same rows as rowIndex
	rowIndex map[string]*row          // indexed by row key
}
  605. func newTable() *table {
  606. return &table{
  607. families: make(map[string]*columnFamily),
  608. rowIndex: make(map[string]*row),
  609. }
  610. }
  611. func (t *table) validTimestamp(ts int64) bool {
  612. // Assume millisecond granularity is required.
  613. return ts%1000 == 0
  614. }
  615. func (t *table) mutableRow(row string) *row {
  616. // Try fast path first.
  617. t.mu.RLock()
  618. r := t.rowIndex[row]
  619. t.mu.RUnlock()
  620. if r != nil {
  621. return r
  622. }
  623. // We probably need to create the row.
  624. t.mu.Lock()
  625. r = t.rowIndex[row]
  626. if r == nil {
  627. r = newRow(row)
  628. t.rowIndex[row] = r
  629. t.rows = append(t.rows, r)
  630. sort.Sort(byRowKey(t.rows)) // yay, inefficient!
  631. }
  632. t.mu.Unlock()
  633. return r
  634. }
  635. func (t *table) gc() {
  636. // This method doesn't add or remove rows, so we only need a read lock for the table.
  637. t.mu.RLock()
  638. defer t.mu.RUnlock()
  639. // Gather GC rules we'll apply.
  640. rules := make(map[string]*bttdpb.GcRule) // keyed by "fam"
  641. for fam, cf := range t.families {
  642. if cf.gcRule != nil {
  643. rules[fam] = cf.gcRule
  644. }
  645. }
  646. if len(rules) == 0 {
  647. return
  648. }
  649. for _, r := range t.rows {
  650. r.mu.Lock()
  651. r.gc(rules)
  652. r.mu.Unlock()
  653. }
  654. }
  655. type byRowKey []*row
  656. func (b byRowKey) Len() int { return len(b) }
  657. func (b byRowKey) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
  658. func (b byRowKey) Less(i, j int) bool { return b[i].key < b[j].key }
// row is a single row of a table.
type row struct {
	key string // never reassigned after creation; read without holding mu
	// mu guards cells.
	mu    sync.Mutex
	cells map[string][]cell // keyed by full column name ("family:qualifier"); cells are in descending timestamp order
}
  664. func newRow(key string) *row {
  665. return &row{
  666. key: key,
  667. cells: make(map[string][]cell),
  668. }
  669. }
  670. // copy returns a copy of the row.
  671. // Cell values are aliased.
  672. // r.mu should be held.
  673. func (r *row) copy() *row {
  674. nr := &row{
  675. key: r.key,
  676. cells: make(map[string][]cell, len(r.cells)),
  677. }
  678. for col, cs := range r.cells {
  679. // Copy the []cell slice, but not the []byte inside each cell.
  680. nr.cells[col] = append([]cell(nil), cs...)
  681. }
  682. return nr
  683. }
  684. // gc applies the given GC rules to the row.
  685. // r.mu should be held.
  686. func (r *row) gc(rules map[string]*bttdpb.GcRule) {
  687. for col, cs := range r.cells {
  688. fam := col[:strings.Index(col, ":")]
  689. rule, ok := rules[fam]
  690. if !ok {
  691. continue
  692. }
  693. r.cells[col] = applyGC(cs, rule)
  694. }
  695. }
  696. var gcTypeWarn sync.Once
  697. // applyGC applies the given GC rule to the cells.
  698. func applyGC(cells []cell, rule *bttdpb.GcRule) []cell {
  699. switch rule := rule.Rule.(type) {
  700. default:
  701. // TODO(dsymonds): Support GcRule_Intersection_
  702. gcTypeWarn.Do(func() {
  703. log.Printf("Unsupported GC rule type %T", rule)
  704. })
  705. case *bttdpb.GcRule_Union_:
  706. for _, sub := range rule.Union.Rules {
  707. cells = applyGC(cells, sub)
  708. }
  709. return cells
  710. case *bttdpb.GcRule_MaxAge:
  711. // Timestamps are in microseconds.
  712. cutoff := time.Now().UnixNano() / 1e3
  713. cutoff -= rule.MaxAge.Seconds * 1e6
  714. cutoff -= int64(rule.MaxAge.Nanos) / 1e3
  715. // The slice of cells in in descending timestamp order.
  716. // This sort.Search will return the index of the first cell whose timestamp is chronologically before the cutoff.
  717. si := sort.Search(len(cells), func(i int) bool { return cells[i].ts < cutoff })
  718. if si < len(cells) {
  719. log.Printf("bttest: GC MaxAge(%v) deleted %d cells.", rule.MaxAge, len(cells)-si)
  720. }
  721. return cells[:si]
  722. case *bttdpb.GcRule_MaxNumVersions:
  723. n := int(rule.MaxNumVersions)
  724. if len(cells) > n {
  725. log.Printf("bttest: GC MaxNumVersions(%d) deleted %d cells.", n, len(cells)-n)
  726. cells = cells[:n]
  727. }
  728. return cells
  729. }
  730. return cells
  731. }
// cell is a single timestamped version of a column's value.
type cell struct {
	ts    int64  // timestamp in microseconds
	value []byte // may be aliased by copies of the containing row (see row.copy)
}
  736. type byDescTS []cell
  737. func (b byDescTS) Len() int { return len(b) }
  738. func (b byDescTS) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
  739. func (b byDescTS) Less(i, j int) bool { return b[i].ts > b[j].ts }
// columnFamily is the stored configuration of one column family.
type columnFamily struct {
	name   string         // fully qualified: "<table>/columnFamilies/<family>"
	gcRule *bttdpb.GcRule // nil means the family is never garbage collected
}
  744. func (c *columnFamily) proto() *bttdpb.ColumnFamily {
  745. return &bttdpb.ColumnFamily{
  746. Name: c.name,
  747. GcRule: c.gcRule,
  748. }
  749. }
  750. func toColumnFamilies(families map[string]*columnFamily) map[string]*bttdpb.ColumnFamily {
  751. f := make(map[string]*bttdpb.ColumnFamily)
  752. for k, v := range families {
  753. f[k] = v.proto()
  754. }
  755. return f
  756. }