resumable.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. // Copyright 2016 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gensupport
  5. import (
  6. "context"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "net/http"
  11. "sync"
  12. "time"
  13. gax "github.com/googleapis/gax-go/v2"
  14. )
// Backoff is an interface around gax.Backoff's Pause method, allowing tests to provide their
// own implementation.
type Backoff interface {
	// Pause returns how long to wait before the next retry attempt.
	Pause() time.Duration
}
// These are declared as global variables so that tests can overwrite them.
var (
	// retryDeadline bounds how long Upload keeps retrying a single chunk
	// before it gives up and returns the most recent result.
	retryDeadline = 32 * time.Second
	// backoff returns a fresh Backoff. Upload calls it once per chunk, so
	// every chunk starts with its own retry-delay sequence. The default is
	// a gax.Backoff whose initial pause is 100ms.
	backoff = func() Backoff {
		return &gax.Backoff{Initial: 100 * time.Millisecond}
	}
)
const (
	// statusTooManyRequests is returned by the storage API if the
	// per-project limits have been temporarily exceeded. The request
	// should be retried; Upload treats this status as retryable.
	// https://cloud.google.com/storage/docs/json_api/v1/status-codes#standardcodes
	statusTooManyRequests = 429
)
// ResumableUpload is used by the generated APIs to provide resumable uploads.
// It is not used by developers directly.
type ResumableUpload struct {
	// Client is the HTTP client used to send each upload request.
	Client *http.Client
	// URI is the resumable resource destination provided by the server after specifying "&uploadType=resumable".
	URI       string
	UserAgent string // User-Agent for header of the request
	// Media is the object being uploaded.
	Media *MediaBuffer
	// MediaType defines the media type, e.g. "image/jpeg".
	// It is sent as the Content-Type header of every upload request.
	MediaType string

	mu       sync.Mutex // guards progress
	progress int64      // number of bytes uploaded so far

	// Callback is an optional function that will be periodically called with the cumulative number of bytes uploaded.
	// It is invoked after progress has been updated and without mu held.
	Callback func(int64)
}
  50. // Progress returns the number of bytes uploaded at this point.
  51. func (rx *ResumableUpload) Progress() int64 {
  52. rx.mu.Lock()
  53. defer rx.mu.Unlock()
  54. return rx.progress
  55. }
  56. // doUploadRequest performs a single HTTP request to upload data.
  57. // off specifies the offset in rx.Media from which data is drawn.
  58. // size is the number of bytes in data.
  59. // final specifies whether data is the final chunk to be uploaded.
  60. func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader, off, size int64, final bool) (*http.Response, error) {
  61. req, err := http.NewRequest("POST", rx.URI, data)
  62. if err != nil {
  63. return nil, err
  64. }
  65. req.ContentLength = size
  66. var contentRange string
  67. if final {
  68. if size == 0 {
  69. contentRange = fmt.Sprintf("bytes */%v", off)
  70. } else {
  71. contentRange = fmt.Sprintf("bytes %v-%v/%v", off, off+size-1, off+size)
  72. }
  73. } else {
  74. contentRange = fmt.Sprintf("bytes %v-%v/*", off, off+size-1)
  75. }
  76. req.Header.Set("Content-Range", contentRange)
  77. req.Header.Set("Content-Type", rx.MediaType)
  78. req.Header.Set("User-Agent", rx.UserAgent)
  79. // Google's upload endpoint uses status code 308 for a
  80. // different purpose than the "308 Permanent Redirect"
  81. // since-standardized in RFC 7238. Because of the conflict in
  82. // semantics, Google added this new request header which
  83. // causes it to not use "308" and instead reply with 200 OK
  84. // and sets the upload-specific "X-HTTP-Status-Code-Override:
  85. // 308" response header.
  86. req.Header.Set("X-GUploader-No-308", "yes")
  87. return SendRequest(ctx, rx.Client, req)
  88. }
  89. func statusResumeIncomplete(resp *http.Response) bool {
  90. // This is how the server signals "status resume incomplete"
  91. // when X-GUploader-No-308 is set to "yes":
  92. return resp != nil && resp.Header.Get("X-Http-Status-Code-Override") == "308"
  93. }
  94. // reportProgress calls a user-supplied callback to report upload progress.
  95. // If old==updated, the callback is not called.
  96. func (rx *ResumableUpload) reportProgress(old, updated int64) {
  97. if updated-old == 0 {
  98. return
  99. }
  100. rx.mu.Lock()
  101. rx.progress = updated
  102. rx.mu.Unlock()
  103. if rx.Callback != nil {
  104. rx.Callback(updated)
  105. }
  106. }
  107. // transferChunk performs a single HTTP request to upload a single chunk from rx.Media.
  108. func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, error) {
  109. chunk, off, size, err := rx.Media.Chunk()
  110. done := err == io.EOF
  111. if !done && err != nil {
  112. return nil, err
  113. }
  114. res, err := rx.doUploadRequest(ctx, chunk, off, int64(size), done)
  115. if err != nil {
  116. return res, err
  117. }
  118. // We sent "X-GUploader-No-308: yes" (see comment elsewhere in
  119. // this file), so we don't expect to get a 308.
  120. if res.StatusCode == 308 {
  121. return nil, errors.New("unexpected 308 response status code")
  122. }
  123. if res.StatusCode == http.StatusOK {
  124. rx.reportProgress(off, off+int64(size))
  125. }
  126. if statusResumeIncomplete(res) {
  127. rx.Media.Next()
  128. }
  129. return res, nil
  130. }
  131. // Upload starts the process of a resumable upload with a cancellable context.
  132. // It retries using the provided back off strategy until cancelled or the
  133. // strategy indicates to stop retrying.
  134. // It is called from the auto-generated API code and is not visible to the user.
  135. // Before sending an HTTP request, Upload calls any registered hook functions,
  136. // and calls the returned functions after the request returns (see send.go).
  137. // rx is private to the auto-generated API code.
  138. // Exactly one of resp or err will be nil. If resp is non-nil, the caller must call resp.Body.Close.
  139. func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err error) {
  140. var shouldRetry = func(status int, err error) bool {
  141. if 500 <= status && status <= 599 {
  142. return true
  143. }
  144. if status == statusTooManyRequests {
  145. return true
  146. }
  147. if err == io.ErrUnexpectedEOF {
  148. return true
  149. }
  150. if err, ok := err.(interface{ Temporary() bool }); ok {
  151. return err.Temporary()
  152. }
  153. return false
  154. }
  155. // There are a couple of cases where it's possible for err and resp to both
  156. // be non-nil. However, we expose a simpler contract to our callers: exactly
  157. // one of resp and err will be non-nil. This means that any response body
  158. // must be closed here before returning a non-nil error.
  159. var prepareReturn = func(resp *http.Response, err error) (*http.Response, error) {
  160. if err != nil {
  161. if resp != nil && resp.Body != nil {
  162. resp.Body.Close()
  163. }
  164. return nil, err
  165. }
  166. return resp, nil
  167. }
  168. // Send all chunks.
  169. for {
  170. var pause time.Duration
  171. // Each chunk gets its own initialized-at-zero retry.
  172. bo := backoff()
  173. quitAfter := time.After(retryDeadline)
  174. // Retry loop for a single chunk.
  175. for {
  176. select {
  177. case <-ctx.Done():
  178. if err == nil {
  179. err = ctx.Err()
  180. }
  181. return prepareReturn(resp, err)
  182. case <-time.After(pause):
  183. case <-quitAfter:
  184. return prepareReturn(resp, err)
  185. }
  186. resp, err = rx.transferChunk(ctx)
  187. var status int
  188. if resp != nil {
  189. status = resp.StatusCode
  190. }
  191. // Check if we should retry the request.
  192. if !shouldRetry(status, err) {
  193. break
  194. }
  195. pause = bo.Pause()
  196. if resp != nil && resp.Body != nil {
  197. resp.Body.Close()
  198. }
  199. }
  200. // If the chunk was uploaded successfully, but there's still
  201. // more to go, upload the next chunk without any delay.
  202. if statusResumeIncomplete(resp) {
  203. resp.Body.Close()
  204. continue
  205. }
  206. return prepareReturn(resp, err)
  207. }
  208. }