123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536 |
- /*
- Copyright 2015 Google Inc. All Rights Reserved.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package bigtable // import "google.golang.org/cloud/bigtable"
- import (
- "fmt"
- "io"
- "strconv"
- "time"
- "github.com/golang/protobuf/proto"
- "golang.org/x/net/context"
- "google.golang.org/cloud"
- btdpb "google.golang.org/cloud/bigtable/internal/data_proto"
- btspb "google.golang.org/cloud/bigtable/internal/service_proto"
- "google.golang.org/cloud/internal/transport"
- "google.golang.org/grpc"
- )
- const prodAddr = "bigtable.googleapis.com:443"
- // Client is a client for reading and writing data to tables in a cluster.
- //
- // A Client is safe to use concurrently, except for its Close method.
- type Client struct {
- conn *grpc.ClientConn
- client btspb.BigtableServiceClient
- project, zone, cluster string
- }
- // NewClient creates a new Client for a given project, zone and cluster.
- func NewClient(ctx context.Context, project, zone, cluster string, opts ...cloud.ClientOption) (*Client, error) {
- o := []cloud.ClientOption{
- cloud.WithEndpoint(prodAddr),
- cloud.WithScopes(Scope),
- cloud.WithUserAgent(clientUserAgent),
- }
- o = append(o, opts...)
- conn, err := transport.DialGRPC(ctx, o...)
- if err != nil {
- return nil, fmt.Errorf("dialing: %v", err)
- }
- return &Client{
- conn: conn,
- client: btspb.NewBigtableServiceClient(conn),
- project: project,
- zone: zone,
- cluster: cluster,
- }, nil
- }
- // Close closes the Client.
- func (c *Client) Close() {
- c.conn.Close()
- }
- func (c *Client) fullTableName(table string) string {
- return fmt.Sprintf("projects/%s/zones/%s/clusters/%s/tables/%s", c.project, c.zone, c.cluster, table)
- }
- // A Table refers to a table.
- //
- // A Table is safe to use concurrently.
- type Table struct {
- c *Client
- table string
- }
- // Open opens a table.
- func (c *Client) Open(table string) *Table {
- return &Table{
- c: c,
- table: table,
- }
- }
- // TODO(dsymonds): Read method that returns a sequence of ReadItems.
- // ReadRows reads rows from a table. f is called for each row.
- // If f returns false, the stream is shut down and ReadRows returns.
- // f owns its argument, and f is called serially.
- //
- // By default, the yielded rows will contain all values in all cells.
- // Use RowFilter to limit the cells returned.
- func (t *Table) ReadRows(ctx context.Context, arg RowRange, f func(Row) bool, opts ...ReadOption) error {
- req := &btspb.ReadRowsRequest{
- TableName: t.c.fullTableName(t.table),
- Target: &btspb.ReadRowsRequest_RowRange{arg.proto()},
- }
- for _, opt := range opts {
- opt.set(req)
- }
- ctx, cancel := context.WithCancel(ctx) // for aborting the stream
- defer cancel()
- stream, err := t.c.client.ReadRows(ctx, req)
- if err != nil {
- return err
- }
- cr := new(chunkReader)
- for {
- res, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- return err
- }
- if row := cr.process(res); row != nil {
- if !f(row) {
- // Cancel and drain stream.
- cancel()
- for {
- if _, err := stream.Recv(); err != nil {
- // The stream has ended. We don't return an error
- // because the caller has intentionally interrupted the scan.
- return nil
- }
- }
- }
- }
- }
- return nil
- }
- // ReadRow is a convenience implementation of a single-row reader.
- // A missing row will return a zero-length map and a nil error.
- func (t *Table) ReadRow(ctx context.Context, row string, opts ...ReadOption) (Row, error) {
- var r Row
- err := t.ReadRows(ctx, SingleRow(row), func(rr Row) bool {
- r = rr
- return true
- }, opts...)
- return r, err
- }
- type chunkReader struct {
- partial map[string]Row // incomplete rows
- }
- // process handles a single btspb.ReadRowsResponse.
- // If it completes a row, that row is returned.
- func (cr *chunkReader) process(rrr *btspb.ReadRowsResponse) Row {
- if cr.partial == nil {
- cr.partial = make(map[string]Row)
- }
- row := string(rrr.RowKey)
- r := cr.partial[row]
- if r == nil {
- r = make(Row)
- cr.partial[row] = r
- }
- for _, chunk := range rrr.Chunks {
- switch c := chunk.Chunk.(type) {
- case *btspb.ReadRowsResponse_Chunk_ResetRow:
- r = make(Row)
- cr.partial[row] = r
- continue
- case *btspb.ReadRowsResponse_Chunk_CommitRow:
- delete(cr.partial, row)
- if len(r) == 0 {
- // Treat zero-content commits as absent.
- continue
- }
- return r // assume that this is the last chunk
- case *btspb.ReadRowsResponse_Chunk_RowContents:
- decodeFamilyProto(r, row, c.RowContents)
- }
- }
- return nil
- }
- // decodeFamilyProto adds the cell data from f to the given row.
- func decodeFamilyProto(r Row, row string, f *btdpb.Family) {
- fam := f.Name // does not have colon
- for _, col := range f.Columns {
- for _, cell := range col.Cells {
- ri := ReadItem{
- Row: row,
- Column: fmt.Sprintf("%s:%s", fam, col.Qualifier),
- Timestamp: Timestamp(cell.TimestampMicros),
- Value: cell.Value,
- }
- r[fam] = append(r[fam], ri)
- }
- }
- }
// A RowRange describes a set of rows to read.
// It is the half-open interval [Start, Limit): every row whose key is at
// least Start and strictly less than Limit.
// (Bigtable string comparison is the same as Go's.)
// An empty limit makes the range unbounded, covering all keys at least as
// large as Start.
type RowRange struct {
	start string // inclusive lower bound
	limit string // exclusive upper bound; "" means no upper bound
}

// NewRange returns the new RowRange [begin, end).
func NewRange(begin, end string) RowRange {
	var r RowRange
	r.start = begin
	r.limit = end
	return r
}

// Unbounded reports whether the RowRange has no upper limit.
func (r RowRange) Unbounded() bool {
	return len(r.limit) == 0
}

// Contains reports whether row lies within the RowRange.
func (r RowRange) Contains(row string) bool {
	if row < r.start {
		return false
	}
	return r.Unbounded() || row < r.limit
}

// String renders the RowRange as a quoted interval, using ∞ as the upper
// bound of an unbounded range.
func (r RowRange) String() string {
	upper := "∞"
	if !r.Unbounded() {
		upper = strconv.Quote(r.limit)
	}
	return fmt.Sprintf("[%s,%s)", strconv.Quote(r.start), upper)
}
- func (r RowRange) proto() *btdpb.RowRange {
- if r.Unbounded() {
- return &btdpb.RowRange{StartKey: []byte(r.start)}
- }
- return &btdpb.RowRange{
- StartKey: []byte(r.start),
- EndKey: []byte(r.limit),
- }
- }
- // SingleRow returns a RowRange for reading a single row.
- func SingleRow(row string) RowRange {
- return RowRange{
- start: row,
- limit: row + "\x00",
- }
- }
- // PrefixRange returns a RowRange consisting of all keys starting with the prefix.
- func PrefixRange(prefix string) RowRange {
- return RowRange{
- start: prefix,
- limit: prefixSuccessor(prefix),
- }
- }
- // InfiniteRange returns the RowRange consisting of all keys at least as
- // large as start.
- func InfiniteRange(start string) RowRange {
- return RowRange{
- start: start,
- limit: "",
- }
- }
// prefixSuccessor returns the lexically smallest string that is greater
// than every string starting with prefix, or "" when no such string exists
// (prefix is empty or consists solely of 0xff bytes). Either way the result
// is the value to use as the Limit of a RowRange.
func prefixSuccessor(prefix string) string {
	// Scan from the end for the last byte that can still be incremented;
	// trailing 0xff bytes are dropped from the result.
	for i := len(prefix) - 1; i >= 0; i-- {
		if prefix[i] == '\xff' {
			continue
		}
		succ := []byte(prefix[:i+1])
		succ[i]++
		return string(succ)
	}
	// Nothing could be incremented: the range is infinite.
	return ""
}
- // A ReadOption is an optional argument to ReadRows.
- type ReadOption interface {
- set(req *btspb.ReadRowsRequest)
- }
- // RowFilter returns a ReadOption that applies f to the contents of read rows.
- func RowFilter(f Filter) ReadOption { return rowFilter{f} }
- type rowFilter struct{ f Filter }
- func (rf rowFilter) set(req *btspb.ReadRowsRequest) { req.Filter = rf.f.proto() }
- // LimitRows returns a ReadOption that will limit the number of rows to be read.
- func LimitRows(limit int64) ReadOption { return limitRows{limit} }
- type limitRows struct{ limit int64 }
- func (lr limitRows) set(req *btspb.ReadRowsRequest) { req.NumRowsLimit = lr.limit }
- // A Row is returned by ReadRow. The map is keyed by column family (the prefix
- // of the column name before the colon). The values are the returned ReadItems
- // for that column family in the order returned by Read.
- type Row map[string][]ReadItem
- // Key returns the row's key, or "" if the row is empty.
- func (r Row) Key() string {
- for _, items := range r {
- if len(items) > 0 {
- return items[0].Row
- }
- }
- return ""
- }
- // A ReadItem is returned by Read. A ReadItem contains data from a specific row and column.
- type ReadItem struct {
- Row, Column string
- Timestamp Timestamp
- Value []byte
- }
- // Apply applies a Mutation to a specific row.
- func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) error {
- after := func(res proto.Message) {
- for _, o := range opts {
- o.after(res)
- }
- }
- if m.cond == nil {
- req := &btspb.MutateRowRequest{
- TableName: t.c.fullTableName(t.table),
- RowKey: []byte(row),
- Mutations: m.ops,
- }
- res, err := t.c.client.MutateRow(ctx, req)
- if err == nil {
- after(res)
- }
- return err
- }
- req := &btspb.CheckAndMutateRowRequest{
- TableName: t.c.fullTableName(t.table),
- RowKey: []byte(row),
- PredicateFilter: m.cond.proto(),
- }
- if m.mtrue != nil {
- req.TrueMutations = m.mtrue.ops
- }
- if m.mfalse != nil {
- req.FalseMutations = m.mfalse.ops
- }
- res, err := t.c.client.CheckAndMutateRow(ctx, req)
- if err == nil {
- after(res)
- }
- return err
- }
- // An ApplyOption is an optional argument to Apply.
- type ApplyOption interface {
- after(res proto.Message)
- }
- type applyAfterFunc func(res proto.Message)
- func (a applyAfterFunc) after(res proto.Message) { a(res) }
- // GetCondMutationResult returns an ApplyOption that reports whether the conditional
- // mutation's condition matched.
- func GetCondMutationResult(matched *bool) ApplyOption {
- return applyAfterFunc(func(res proto.Message) {
- if res, ok := res.(*btspb.CheckAndMutateRowResponse); ok {
- *matched = res.PredicateMatched
- }
- })
- }
- // Mutation represents a set of changes for a single row of a table.
- type Mutation struct {
- ops []*btdpb.Mutation
- // for conditional mutations
- cond Filter
- mtrue, mfalse *Mutation
- }
- // NewMutation returns a new mutation.
- func NewMutation() *Mutation {
- return new(Mutation)
- }
- // NewCondMutation returns a conditional mutation.
- // The given row filter determines which mutation is applied:
- // If the filter matches any cell in the row, mtrue is applied;
- // otherwise, mfalse is applied.
- // Either given mutation may be nil.
- func NewCondMutation(cond Filter, mtrue, mfalse *Mutation) *Mutation {
- return &Mutation{cond: cond, mtrue: mtrue, mfalse: mfalse}
- }
- // Set sets a value in a specified column, with the given timestamp.
- // The timestamp will be truncated to millisecond resolution.
- // A timestamp of ServerTime means to use the server timestamp.
- func (m *Mutation) Set(family, column string, ts Timestamp, value []byte) {
- if ts != ServerTime {
- // Truncate to millisecond resolution, since that's the default table config.
- // TODO(dsymonds): Provide a way to override this behaviour.
- ts -= ts % 1000
- }
- m.ops = append(m.ops, &btdpb.Mutation{Mutation: &btdpb.Mutation_SetCell_{&btdpb.Mutation_SetCell{
- FamilyName: family,
- ColumnQualifier: []byte(column),
- TimestampMicros: int64(ts),
- Value: value,
- }}})
- }
- // DeleteCellsInColumn will delete all the cells whose columns are family:column.
- func (m *Mutation) DeleteCellsInColumn(family, column string) {
- m.ops = append(m.ops, &btdpb.Mutation{Mutation: &btdpb.Mutation_DeleteFromColumn_{&btdpb.Mutation_DeleteFromColumn{
- FamilyName: family,
- ColumnQualifier: []byte(column),
- }}})
- }
- // DeleteTimestampRange deletes all cells whose columns are family:column
- // and whose timestamps are in the half-open interval [start, end).
- // If end is zero, it will be interpreted as infinity.
- func (m *Mutation) DeleteTimestampRange(family, column string, start, end Timestamp) {
- m.ops = append(m.ops, &btdpb.Mutation{Mutation: &btdpb.Mutation_DeleteFromColumn_{&btdpb.Mutation_DeleteFromColumn{
- FamilyName: family,
- ColumnQualifier: []byte(column),
- TimeRange: &btdpb.TimestampRange{
- StartTimestampMicros: int64(start),
- EndTimestampMicros: int64(end),
- },
- }}})
- }
- // DeleteCellsInFamily will delete all the cells whose columns are family:*.
- func (m *Mutation) DeleteCellsInFamily(family string) {
- m.ops = append(m.ops, &btdpb.Mutation{Mutation: &btdpb.Mutation_DeleteFromFamily_{&btdpb.Mutation_DeleteFromFamily{
- FamilyName: family,
- }}})
- }
- // DeleteRow deletes the entire row.
- func (m *Mutation) DeleteRow() {
- m.ops = append(m.ops, &btdpb.Mutation{Mutation: &btdpb.Mutation_DeleteFromRow_{&btdpb.Mutation_DeleteFromRow{}}})
- }
// Timestamp is in units of microseconds since 1 January 1970.
type Timestamp int64

// ServerTime is a specific Timestamp that may be passed to (*Mutation).Set.
// It indicates that the server's timestamp should be used.
const ServerTime Timestamp = -1

// Time converts a time.Time into a Timestamp, dropping any precision finer
// than a microsecond.
// It goes through t.UnixNano, so it is only meaningful for times whose
// nanosecond Unix time fits in an int64.
func Time(t time.Time) Timestamp { return Timestamp(t.UnixNano() / 1e3) }

// Now returns the Timestamp representation of the current time on the client.
func Now() Timestamp { return Time(time.Now()) }

// Time converts a Timestamp into a time.Time.
//
// Whole seconds and the sub-second remainder are handed to time.Unix
// separately. Scaling the full microsecond count to nanoseconds first would
// overflow int64 for large timestamps and silently produce a wrong time.
func (ts Timestamp) Time() time.Time {
	const microsPerSecond = 1000000
	micros := int64(ts)
	// The remainder carries the sign of micros; time.Unix accepts
	// nanosecond values outside [0, 999999999] and normalizes them.
	return time.Unix(micros/microsPerSecond, (micros%microsPerSecond)*1000)
}
- // ApplyReadModifyWrite applies a ReadModifyWrite to a specific row.
- // It returns the newly written cells.
- func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) {
- req := &btspb.ReadModifyWriteRowRequest{
- TableName: t.c.fullTableName(t.table),
- RowKey: []byte(row),
- Rules: m.ops,
- }
- res, err := t.c.client.ReadModifyWriteRow(ctx, req)
- if err != nil {
- return nil, err
- }
- r := make(Row)
- for _, fam := range res.Families { // res is *btdpb.Row, fam is *btdpb.Family
- decodeFamilyProto(r, row, fam)
- }
- return r, nil
- }
- // ReadModifyWrite represents a set of operations on a single row of a table.
- // It is like Mutation but for non-idempotent changes.
- // When applied, these operations operate on the latest values of the row's cells,
- // and result in a new value being written to the relevant cell with a timestamp
- // that is max(existing timestamp, current server time).
- //
- // The application of a ReadModifyWrite is atomic; concurrent ReadModifyWrites will
- // be executed serially by the server.
- type ReadModifyWrite struct {
- ops []*btdpb.ReadModifyWriteRule
- }
- // NewReadModifyWrite returns a new ReadModifyWrite.
- func NewReadModifyWrite() *ReadModifyWrite { return new(ReadModifyWrite) }
- // AppendValue appends a value to a specific cell's value.
- // If the cell is unset, it will be treated as an empty value.
- func (m *ReadModifyWrite) AppendValue(family, column string, v []byte) {
- m.ops = append(m.ops, &btdpb.ReadModifyWriteRule{
- FamilyName: family,
- ColumnQualifier: []byte(column),
- Rule: &btdpb.ReadModifyWriteRule_AppendValue{v},
- })
- }
- // Increment interprets the value in a specific cell as a 64-bit big-endian signed integer,
- // and adds a value to it. If the cell is unset, it will be treated as zero.
- // If the cell is set and is not an 8-byte value, the entire ApplyReadModifyWrite
- // operation will fail.
- func (m *ReadModifyWrite) Increment(family, column string, delta int64) {
- m.ops = append(m.ops, &btdpb.ReadModifyWriteRule{
- FamilyName: family,
- ColumnQualifier: []byte(column),
- Rule: &btdpb.ReadModifyWriteRule_IncrementAmount{delta},
- })
- }
|