read.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. // Copyright 2012 Google Inc. All rights reserved.
  2. // Use of this source code is governed by the Apache 2.0
  3. // license that can be found in the LICENSE file.
  4. package blobstore
  5. import (
  6. "errors"
  7. "fmt"
  8. "io"
  9. "os"
  10. "sync"
  11. "github.com/golang/protobuf/proto"
  12. "golang.org/x/net/context"
  13. "google.golang.org/appengine"
  14. "google.golang.org/appengine/internal"
  15. blobpb "google.golang.org/appengine/internal/blobstore"
  16. )
  17. // openBlob returns a reader for a blob. It always succeeds; if the blob does
  18. // not exist then an error will be reported upon first read.
  19. func openBlob(c context.Context, blobKey appengine.BlobKey) Reader {
  20. return &reader{
  21. c: c,
  22. blobKey: blobKey,
  23. }
  24. }
  25. const readBufferSize = 256 * 1024
  26. // reader is a blob reader. It implements the Reader interface.
  27. type reader struct {
  28. c context.Context
  29. // Either blobKey or filename is set:
  30. blobKey appengine.BlobKey
  31. filename string
  32. closeFunc func() // is nil if unavailable or already closed.
  33. // buf is the read buffer. r is how much of buf has been read.
  34. // off is the offset of buf[0] relative to the start of the blob.
  35. // An invariant is 0 <= r && r <= len(buf).
  36. // Reads that don't require an RPC call will increment r but not off.
  37. // Seeks may modify r without discarding the buffer, but only if the
  38. // invariant can be maintained.
  39. mu sync.Mutex
  40. buf []byte
  41. r int
  42. off int64
  43. }
  44. func (r *reader) Close() error {
  45. if f := r.closeFunc; f != nil {
  46. f()
  47. }
  48. r.closeFunc = nil
  49. return nil
  50. }
  51. func (r *reader) Read(p []byte) (int, error) {
  52. if len(p) == 0 {
  53. return 0, nil
  54. }
  55. r.mu.Lock()
  56. defer r.mu.Unlock()
  57. if r.r == len(r.buf) {
  58. if err := r.fetch(r.off + int64(r.r)); err != nil {
  59. return 0, err
  60. }
  61. }
  62. n := copy(p, r.buf[r.r:])
  63. r.r += n
  64. return n, nil
  65. }
  66. func (r *reader) ReadAt(p []byte, off int64) (int, error) {
  67. if len(p) == 0 {
  68. return 0, nil
  69. }
  70. r.mu.Lock()
  71. defer r.mu.Unlock()
  72. // Convert relative offsets to absolute offsets.
  73. ab0 := r.off + int64(r.r)
  74. ab1 := r.off + int64(len(r.buf))
  75. ap0 := off
  76. ap1 := off + int64(len(p))
  77. // Check if we can satisfy the read entirely out of the existing buffer.
  78. if r.off <= ap0 && ap1 <= ab1 {
  79. // Convert off from an absolute offset to a relative offset.
  80. rp0 := int(ap0 - r.off)
  81. return copy(p, r.buf[rp0:]), nil
  82. }
  83. // Restore the original Read/Seek offset after ReadAt completes.
  84. defer r.seek(ab0)
  85. // Repeatedly fetch and copy until we have filled p.
  86. n := 0
  87. for len(p) > 0 {
  88. if err := r.fetch(off + int64(n)); err != nil {
  89. return n, err
  90. }
  91. r.r = copy(p, r.buf)
  92. n += r.r
  93. p = p[r.r:]
  94. }
  95. return n, nil
  96. }
  97. func (r *reader) Seek(offset int64, whence int) (ret int64, err error) {
  98. r.mu.Lock()
  99. defer r.mu.Unlock()
  100. switch whence {
  101. case os.SEEK_SET:
  102. ret = offset
  103. case os.SEEK_CUR:
  104. ret = r.off + int64(r.r) + offset
  105. case os.SEEK_END:
  106. return 0, errors.New("seeking relative to the end of a blob isn't supported")
  107. default:
  108. return 0, fmt.Errorf("invalid Seek whence value: %d", whence)
  109. }
  110. if ret < 0 {
  111. return 0, errors.New("negative Seek offset")
  112. }
  113. return r.seek(ret)
  114. }
  115. // fetch fetches readBufferSize bytes starting at the given offset. On success,
  116. // the data is saved as r.buf.
  117. func (r *reader) fetch(off int64) error {
  118. req := &blobpb.FetchDataRequest{
  119. BlobKey: proto.String(string(r.blobKey)),
  120. StartIndex: proto.Int64(off),
  121. EndIndex: proto.Int64(off + readBufferSize - 1), // EndIndex is inclusive.
  122. }
  123. res := &blobpb.FetchDataResponse{}
  124. if err := internal.Call(r.c, "blobstore", "FetchData", req, res); err != nil {
  125. return err
  126. }
  127. if len(res.Data) == 0 {
  128. return io.EOF
  129. }
  130. r.buf, r.r, r.off = res.Data, 0, off
  131. return nil
  132. }
  133. // seek seeks to the given offset with an effective whence equal to SEEK_SET.
  134. // It discards the read buffer if the invariant cannot be maintained.
  135. func (r *reader) seek(off int64) (int64, error) {
  136. delta := off - r.off
  137. if delta >= 0 && delta < int64(len(r.buf)) {
  138. r.r = int(delta)
  139. return off, nil
  140. }
  141. r.buf, r.r, r.off = nil, 0, off
  142. return off, nil
  143. }