resumable.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. // Copyright 2016 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gensupport
  5. import (
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "sync"
  10. "time"
  11. "golang.org/x/net/context"
  12. "golang.org/x/net/context/ctxhttp"
  13. )
  14. const (
  15. // statusResumeIncomplete is the code returned by the Google uploader
  16. // when the transfer is not yet complete.
  17. statusResumeIncomplete = 308
  18. // statusTooManyRequests is returned by the storage API if the
  19. // per-project limits have been temporarily exceeded. The request
  20. // should be retried.
  21. // https://cloud.google.com/storage/docs/json_api/v1/status-codes#standardcodes
  22. statusTooManyRequests = 429
  23. )
  24. // ResumableUpload is used by the generated APIs to provide resumable uploads.
  25. // It is not used by developers directly.
  26. type ResumableUpload struct {
  27. Client *http.Client
  28. // URI is the resumable resource destination provided by the server after specifying "&uploadType=resumable".
  29. URI string
  30. UserAgent string // User-Agent for header of the request
  31. // Media is the object being uploaded.
  32. Media *ResumableBuffer
  33. // MediaType defines the media type, e.g. "image/jpeg".
  34. MediaType string
  35. mu sync.Mutex // guards progress
  36. progress int64 // number of bytes uploaded so far
  37. // Callback is an optional function that will be periodically called with the cumulative number of bytes uploaded.
  38. Callback func(int64)
  39. // If not specified, a default exponential backoff strategy will be used.
  40. Backoff BackoffStrategy
  41. }
  42. // Progress returns the number of bytes uploaded at this point.
  43. func (rx *ResumableUpload) Progress() int64 {
  44. rx.mu.Lock()
  45. defer rx.mu.Unlock()
  46. return rx.progress
  47. }
  48. // doUploadRequest performs a single HTTP request to upload data.
  49. // off specifies the offset in rx.Media from which data is drawn.
  50. // size is the number of bytes in data.
  51. // final specifies whether data is the final chunk to be uploaded.
  52. func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader, off, size int64, final bool) (*http.Response, error) {
  53. req, err := http.NewRequest("POST", rx.URI, data)
  54. if err != nil {
  55. return nil, err
  56. }
  57. req.ContentLength = size
  58. var contentRange string
  59. if final {
  60. if size == 0 {
  61. contentRange = fmt.Sprintf("bytes */%v", off)
  62. } else {
  63. contentRange = fmt.Sprintf("bytes %v-%v/%v", off, off+size-1, off+size)
  64. }
  65. } else {
  66. contentRange = fmt.Sprintf("bytes %v-%v/*", off, off+size-1)
  67. }
  68. req.Header.Set("Content-Range", contentRange)
  69. req.Header.Set("Content-Type", rx.MediaType)
  70. req.Header.Set("User-Agent", rx.UserAgent)
  71. return ctxhttp.Do(ctx, rx.Client, req)
  72. }
  73. // reportProgress calls a user-supplied callback to report upload progress.
  74. // If old==updated, the callback is not called.
  75. func (rx *ResumableUpload) reportProgress(old, updated int64) {
  76. if updated-old == 0 {
  77. return
  78. }
  79. rx.mu.Lock()
  80. rx.progress = updated
  81. rx.mu.Unlock()
  82. if rx.Callback != nil {
  83. rx.Callback(updated)
  84. }
  85. }
  86. // transferChunk performs a single HTTP request to upload a single chunk from rx.Media.
  87. func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, error) {
  88. chunk, off, size, err := rx.Media.Chunk()
  89. done := err == io.EOF
  90. if !done && err != nil {
  91. return nil, err
  92. }
  93. res, err := rx.doUploadRequest(ctx, chunk, off, int64(size), done)
  94. if err != nil {
  95. return res, err
  96. }
  97. if res.StatusCode == statusResumeIncomplete || res.StatusCode == http.StatusOK {
  98. rx.reportProgress(off, off+int64(size))
  99. }
  100. if res.StatusCode == statusResumeIncomplete {
  101. rx.Media.Next()
  102. }
  103. return res, nil
  104. }
  105. func contextDone(ctx context.Context) bool {
  106. select {
  107. case <-ctx.Done():
  108. return true
  109. default:
  110. return false
  111. }
  112. }
  113. // Upload starts the process of a resumable upload with a cancellable context.
  114. // It retries using the provided back off strategy until cancelled or the
  115. // strategy indicates to stop retrying.
  116. // It is called from the auto-generated API code and is not visible to the user.
  117. // rx is private to the auto-generated API code.
  118. // Exactly one of resp or err will be nil. If resp is non-nil, the caller must call resp.Body.Close.
  119. func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err error) {
  120. var pause time.Duration
  121. backoff := rx.Backoff
  122. if backoff == nil {
  123. backoff = DefaultBackoffStrategy()
  124. }
  125. for {
  126. // Ensure that we return in the case of cancelled context, even if pause is 0.
  127. if contextDone(ctx) {
  128. return nil, ctx.Err()
  129. }
  130. select {
  131. case <-ctx.Done():
  132. return nil, ctx.Err()
  133. case <-time.After(pause):
  134. }
  135. resp, err = rx.transferChunk(ctx)
  136. var status int
  137. if resp != nil {
  138. status = resp.StatusCode
  139. }
  140. // Check if we should retry the request.
  141. if shouldRetry(status, err) {
  142. var retry bool
  143. pause, retry = backoff.Pause()
  144. if retry {
  145. if resp != nil && resp.Body != nil {
  146. resp.Body.Close()
  147. }
  148. continue
  149. }
  150. }
  151. // If the chunk was uploaded successfully, but there's still
  152. // more to go, upload the next chunk without any delay.
  153. if status == statusResumeIncomplete {
  154. pause = 0
  155. backoff.Reset()
  156. resp.Body.Close()
  157. continue
  158. }
  159. // It's possible for err and resp to both be non-nil here, but we expose a simpler
  160. // contract to our callers: exactly one of resp and err will be non-nil. This means
  161. // that any response body must be closed here before returning a non-nil error.
  162. if err != nil {
  163. if resp != nil && resp.Body != nil {
  164. resp.Body.Close()
  165. }
  166. return nil, err
  167. }
  168. return resp, nil
  169. }
  170. }