transaction.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. // Copyright 2014 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package datastore
  15. import (
  16. "errors"
  17. "net/http"
  18. "github.com/golang/protobuf/proto"
  19. "golang.org/x/net/context"
  20. pb "google.golang.org/cloud/internal/datastore"
  21. "google.golang.org/cloud/internal/transport"
  22. )
  // Sentinel errors reported by transaction operations.
  var (
  	// ErrConcurrentTransaction is returned when a transaction is rolled back due
  	// to a conflict with a concurrent transaction.
  	ErrConcurrentTransaction = errors.New("datastore: concurrent transaction")

  	// errExpiredTransaction is returned by Transaction methods that are invoked
  	// after the transaction has already been committed or rolled back.
  	errExpiredTransaction = errors.New("datastore: transaction expired")
  )
  27. // A TransactionOption configures the Transaction returned by NewTransaction.
  28. type TransactionOption interface {
  29. apply(*pb.BeginTransactionRequest)
  30. }
  31. type isolation struct {
  32. level pb.BeginTransactionRequest_IsolationLevel
  33. }
  34. func (i isolation) apply(req *pb.BeginTransactionRequest) {
  35. req.IsolationLevel = i.level.Enum()
  36. }
  37. var (
  38. // Snapshot causes the transaction to enforce a snapshot isolation level.
  39. Snapshot TransactionOption = isolation{pb.BeginTransactionRequest_SNAPSHOT}
  40. // Serializable causes the transaction to enforce a serializable isolation level.
  41. Serializable TransactionOption = isolation{pb.BeginTransactionRequest_SERIALIZABLE}
  42. )
  43. // Transaction represents a set of datastore operations to be committed atomically.
  44. //
  45. // Operations are enqueued by calling the Put and Delete methods on Transaction
  46. // (or their Multi-equivalents). These operations are only committed when the
  47. // Commit method is invoked. To ensure consistency, reads must be performed by
  48. // using Transaction's Get method or by using the Transaction method when
  49. // building a query.
  50. //
  51. // A Transaction must be committed or rolled back exactly once.
  52. type Transaction struct {
  53. id []byte
  54. client *Client
  55. ctx context.Context
  56. mutation *pb.Mutation // The mutations to apply.
  57. pending []*PendingKey // Incomplete keys pending transaction completion.
  58. }
  59. // NewTransaction starts a new transaction.
  60. func (c *Client) NewTransaction(ctx context.Context, opts ...TransactionOption) (*Transaction, error) {
  61. req, resp := &pb.BeginTransactionRequest{}, &pb.BeginTransactionResponse{}
  62. for _, o := range opts {
  63. o.apply(req)
  64. }
  65. if err := c.call(ctx, "beginTransaction", req, resp); err != nil {
  66. return nil, err
  67. }
  68. return &Transaction{
  69. id: resp.Transaction,
  70. ctx: ctx,
  71. client: c,
  72. mutation: &pb.Mutation{},
  73. }, nil
  74. }
  75. // RunInTransaction runs f in a transaction. f is invoked with a Transaction
  76. // that f should use for all the transaction's datastore operations.
  77. //
  78. // f must not call Commit or Rollback on the provided Transaction.
  79. //
  80. // If f returns nil, RunInTransaction commits the transaction,
  81. // returning the Commit and a nil error if it succeeds. If the commit fails due
  82. // to a conflicting transaction, RunInTransaction retries f with a new
  83. // Transaction. It gives up and returns ErrConcurrentTransaction after three
  84. // failed attempts.
  85. //
  86. // If f returns non-nil, then the transaction will be rolled back and
  87. // RunInTransaction will return the same error. The function f is not retried.
  88. //
  89. // Note that when f returns, the transaction is not committed. Calling code
  90. // must not assume that any of f's changes have been committed until
  91. // RunInTransaction returns nil.
  92. //
  93. // Since f may be called multiple times, f should usually be idempotent.
  94. // Note that Transaction.Get is not idempotent when unmarshaling slice fields.
  95. func (c *Client) RunInTransaction(ctx context.Context, f func(tx *Transaction) error, opts ...TransactionOption) (*Commit, error) {
  96. // TODO(djd): Allow configuring of attempts.
  97. const attempts = 3
  98. for n := 0; n < attempts; n++ {
  99. tx, err := c.NewTransaction(ctx, opts...)
  100. if err != nil {
  101. return nil, err
  102. }
  103. if err := f(tx); err != nil {
  104. tx.Rollback()
  105. return nil, err
  106. }
  107. if cmt, err := tx.Commit(); err != ErrConcurrentTransaction {
  108. return cmt, err
  109. }
  110. }
  111. return nil, ErrConcurrentTransaction
  112. }
  113. // Commit applies the enqueued operations atomically.
  114. func (t *Transaction) Commit() (*Commit, error) {
  115. if t.id == nil {
  116. return nil, errExpiredTransaction
  117. }
  118. req := &pb.CommitRequest{
  119. Transaction: t.id,
  120. Mutation: t.mutation,
  121. Mode: pb.CommitRequest_TRANSACTIONAL.Enum(),
  122. }
  123. t.id = nil
  124. resp := &pb.CommitResponse{}
  125. if err := t.client.call(t.ctx, "commit", req, resp); err != nil {
  126. if e, ok := err.(*transport.ErrHTTP); ok && e.StatusCode == http.StatusConflict {
  127. // TODO(jbd): Make sure that we explicitly handle the case where response
  128. // has an HTTP 409 and the error message indicates that it's an concurrent
  129. // transaction error.
  130. return nil, ErrConcurrentTransaction
  131. }
  132. return nil, err
  133. }
  134. // Copy any newly minted keys into the returned keys.
  135. if len(t.pending) != len(resp.MutationResult.InsertAutoIdKey) {
  136. return nil, errors.New("datastore: internal error: server returned the wrong number of keys")
  137. }
  138. commit := &Commit{}
  139. for i, p := range t.pending {
  140. key, err := protoToKey(resp.MutationResult.InsertAutoIdKey[i])
  141. if err != nil {
  142. return nil, errors.New("datastore: internal error: server returned an invalid key")
  143. }
  144. p.key = key
  145. p.commit = commit
  146. }
  147. return commit, nil
  148. }
  149. // Rollback abandons a pending transaction.
  150. func (t *Transaction) Rollback() error {
  151. if t.id == nil {
  152. return errExpiredTransaction
  153. }
  154. id := t.id
  155. t.id = nil
  156. return t.client.call(t.ctx, "rollback", &pb.RollbackRequest{Transaction: id}, &pb.RollbackResponse{})
  157. }
  158. // Get is the transaction-specific version of the package function Get.
  159. // All reads performed during the transaction will come from a single consistent
  160. // snapshot. Furthermore, if the transaction is set to a serializable isolation
  161. // level, another transaction cannot concurrently modify the data that is read
  162. // or modified by this transaction.
  163. func (t *Transaction) Get(key *Key, dst interface{}) error {
  164. err := t.client.get(t.ctx, []*Key{key}, []interface{}{dst}, &pb.ReadOptions{Transaction: t.id})
  165. if me, ok := err.(MultiError); ok {
  166. return me[0]
  167. }
  168. return err
  169. }
  170. // GetMulti is a batch version of Get.
  171. func (t *Transaction) GetMulti(keys []*Key, dst interface{}) error {
  172. if t.id == nil {
  173. return errExpiredTransaction
  174. }
  175. return t.client.get(t.ctx, keys, dst, &pb.ReadOptions{Transaction: t.id})
  176. }
  177. // Put is the transaction-specific version of the package function Put.
  178. //
  179. // Put returns a PendingKey which can be resolved into a Key using the
  180. // return value from a successful Commit. If key is an incomplete key, the
  181. // returned pending key will resolve to a unique key generated by the
  182. // datastore.
  183. func (t *Transaction) Put(key *Key, src interface{}) (*PendingKey, error) {
  184. h, err := t.PutMulti([]*Key{key}, []interface{}{src})
  185. if err != nil {
  186. if me, ok := err.(MultiError); ok {
  187. return nil, me[0]
  188. }
  189. return nil, err
  190. }
  191. return h[0], nil
  192. }
  193. // PutMulti is a batch version of Put. One PendingKey is returned for each
  194. // element of src in the same order.
  195. func (t *Transaction) PutMulti(keys []*Key, src interface{}) ([]*PendingKey, error) {
  196. if t.id == nil {
  197. return nil, errExpiredTransaction
  198. }
  199. mutation, err := putMutation(keys, src)
  200. if err != nil {
  201. return nil, err
  202. }
  203. proto.Merge(t.mutation, mutation)
  204. // Prepare the returned handles, pre-populating where possible.
  205. ret := make([]*PendingKey, len(keys))
  206. for i, key := range keys {
  207. h := &PendingKey{}
  208. if key.Incomplete() {
  209. // This key will be in the final commit result.
  210. t.pending = append(t.pending, h)
  211. } else {
  212. h.key = key
  213. }
  214. ret[i] = h
  215. }
  216. return ret, nil
  217. }
  218. // Delete is the transaction-specific version of the package function Delete.
  219. // Delete enqueues the deletion of the entity for the given key, to be
  220. // committed atomically upon calling Commit.
  221. func (t *Transaction) Delete(key *Key) error {
  222. err := t.DeleteMulti([]*Key{key})
  223. if me, ok := err.(MultiError); ok {
  224. return me[0]
  225. }
  226. return err
  227. }
  228. // DeleteMulti is a batch version of Delete.
  229. func (t *Transaction) DeleteMulti(keys []*Key) error {
  230. if t.id == nil {
  231. return errExpiredTransaction
  232. }
  233. mutation, err := deleteMutation(keys)
  234. if err != nil {
  235. return err
  236. }
  237. proto.Merge(t.mutation, mutation)
  238. return nil
  239. }
  240. // Commit represents the result of a committed transaction.
  241. type Commit struct{}
  242. // Key resolves a pending key handle into a final key.
  243. func (c *Commit) Key(p *PendingKey) *Key {
  244. if c != p.commit {
  245. panic("PendingKey was not created by corresponding transaction")
  246. }
  247. return p.key
  248. }
  249. // PendingKey represents the key for newly-inserted entity. It can be
  250. // resolved into a Key by calling the Key method of Commit.
  251. type PendingKey struct {
  252. key *Key
  253. commit *Commit
  254. }