// resumable_test.go

  1. // Copyright 2016 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gensupport
  5. import (
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "net/http"
  10. "reflect"
  11. "strings"
  12. "testing"
  13. "golang.org/x/net/context"
  14. )
  // unexpectedReader is a reader whose Read always fails with an
  // "unexpected read in test" error.
  type unexpectedReader struct{}

  // Read ignores the supplied buffer and reports zero bytes together with a
  // non-nil error.
  func (unexpectedReader) Read(_ []byte) (int, error) {
  	err := fmt.Errorf("unexpected read in test")
  	return 0, err
  }
  // event describes one scripted exchange: the request that is expected to
  // arrive and the status used to answer it.
  type event struct {
  	// byteRange is the byte range header value the request should carry.
  	byteRange string

  	// responseStatus is the HTTP status code sent back in the response.
  	responseStatus int
  }
  26. // interruptibleTransport is configured with a canned set of requests/responses.
  27. // It records the incoming data, unless the corresponding event is configured to return
  28. // http.StatusServiceUnavailable.
  29. type interruptibleTransport struct {
  30. events []event
  31. buf []byte
  32. bodies bodyTracker
  33. }
  // bodyTracker is the set of response bodies that have been added and not
  // yet closed.
  type bodyTracker map[io.ReadCloser]struct{}

  // Add records body as open.
  func (tracker bodyTracker) Add(body io.ReadCloser) {
  	var member struct{}
  	tracker[body] = member
  }

  // Close removes body from the set. It does not call body.Close, and it does
  // nothing when body is not in the set.
  func (tracker bodyTracker) Close(body io.ReadCloser) {
  	delete(tracker, body)
  }
  42. type trackingCloser struct {
  43. io.Reader
  44. tracker bodyTracker
  45. }
  46. func (tc *trackingCloser) Close() error {
  47. tc.tracker.Close(tc)
  48. return nil
  49. }
  50. func (tc *trackingCloser) Open() {
  51. tc.tracker.Add(tc)
  52. }
  53. func (t *interruptibleTransport) RoundTrip(req *http.Request) (*http.Response, error) {
  54. ev := t.events[0]
  55. t.events = t.events[1:]
  56. if got, want := req.Header.Get("Content-Range"), ev.byteRange; got != want {
  57. return nil, fmt.Errorf("byte range: got %s; want %s", got, want)
  58. }
  59. if ev.responseStatus != http.StatusServiceUnavailable {
  60. buf, err := ioutil.ReadAll(req.Body)
  61. if err != nil {
  62. return nil, fmt.Errorf("error reading from request data: %v", err)
  63. }
  64. t.buf = append(t.buf, buf...)
  65. }
  66. tc := &trackingCloser{unexpectedReader{}, t.bodies}
  67. tc.Open()
  68. res := &http.Response{
  69. StatusCode: ev.responseStatus,
  70. Header: http.Header{},
  71. Body: tc,
  72. }
  73. return res, nil
  74. }
  // progressRecorder collects every value passed to ProgressUpdate and, when f
  // is set, invokes f after each value has been recorded.
  type progressRecorder struct {
  	updates []int64
  	f       func()
  }

  // ProgressUpdate appends current to the recorded updates and then runs the
  // optional hook f.
  func (r *progressRecorder) ProgressUpdate(current int64) {
  	r.updates = append(r.updates, current)
  	if r.f == nil {
  		return
  	}
  	r.f()
  }
  86. func TestInterruptedTransferChunks(t *testing.T) {
  87. type testCase struct {
  88. data string
  89. chunkSize int
  90. events []event
  91. wantProgress []int64
  92. }
  93. for _, tc := range []testCase{
  94. {
  95. data: strings.Repeat("a", 300),
  96. chunkSize: 90,
  97. events: []event{
  98. {"bytes 0-89/*", http.StatusServiceUnavailable},
  99. {"bytes 0-89/*", 308},
  100. {"bytes 90-179/*", 308},
  101. {"bytes 180-269/*", http.StatusServiceUnavailable},
  102. {"bytes 180-269/*", 308},
  103. {"bytes 270-299/300", 200},
  104. },
  105. wantProgress: []int64{90, 180, 270, 300},
  106. },
  107. {
  108. data: strings.Repeat("a", 20),
  109. chunkSize: 10,
  110. events: []event{
  111. {"bytes 0-9/*", http.StatusServiceUnavailable},
  112. {"bytes 0-9/*", 308},
  113. {"bytes 10-19/*", http.StatusServiceUnavailable},
  114. {"bytes 10-19/*", 308},
  115. // 0 byte final request demands a byte range with leading asterix.
  116. {"bytes */20", http.StatusServiceUnavailable},
  117. {"bytes */20", 200},
  118. },
  119. wantProgress: []int64{10, 20},
  120. },
  121. } {
  122. media := strings.NewReader(tc.data)
  123. tr := &interruptibleTransport{
  124. buf: make([]byte, 0, len(tc.data)),
  125. events: tc.events,
  126. bodies: bodyTracker{},
  127. }
  128. pr := progressRecorder{}
  129. rx := &ResumableUpload{
  130. Client: &http.Client{Transport: tr},
  131. Media: NewResumableBuffer(media, tc.chunkSize),
  132. MediaType: "text/plain",
  133. Callback: pr.ProgressUpdate,
  134. Backoff: NoPauseStrategy,
  135. }
  136. res, err := rx.Upload(context.Background())
  137. if err == nil {
  138. res.Body.Close()
  139. }
  140. if err != nil || res == nil || res.StatusCode != http.StatusOK {
  141. if res == nil {
  142. t.Errorf("Upload not successful, res=nil: %v", err)
  143. } else {
  144. t.Errorf("Upload not successful, statusCode=%v: %v", res.StatusCode, err)
  145. }
  146. }
  147. if !reflect.DeepEqual(tr.buf, []byte(tc.data)) {
  148. t.Errorf("transferred contents:\ngot %s\nwant %s", tr.buf, tc.data)
  149. }
  150. if !reflect.DeepEqual(pr.updates, tc.wantProgress) {
  151. t.Errorf("progress updates: got %v, want %v", pr.updates, tc.wantProgress)
  152. }
  153. if len(tr.events) > 0 {
  154. t.Errorf("did not observe all expected events. leftover events: %v", tr.events)
  155. }
  156. if len(tr.bodies) > 0 {
  157. t.Errorf("unclosed request bodies: %v", tr.bodies)
  158. }
  159. }
  160. }
  161. func TestCancelUploadFast(t *testing.T) {
  162. const (
  163. chunkSize = 90
  164. mediaSize = 300
  165. )
  166. media := strings.NewReader(strings.Repeat("a", mediaSize))
  167. tr := &interruptibleTransport{
  168. buf: make([]byte, 0, mediaSize),
  169. }
  170. pr := progressRecorder{}
  171. rx := &ResumableUpload{
  172. Client: &http.Client{Transport: tr},
  173. Media: NewResumableBuffer(media, chunkSize),
  174. MediaType: "text/plain",
  175. Callback: pr.ProgressUpdate,
  176. Backoff: NoPauseStrategy,
  177. }
  178. ctx, cancelFunc := context.WithCancel(context.Background())
  179. cancelFunc() // stop the upload that hasn't started yet
  180. res, err := rx.Upload(ctx)
  181. if err != context.Canceled {
  182. t.Errorf("Upload err: got: %v; want: context cancelled", err)
  183. }
  184. if res != nil {
  185. t.Errorf("Upload result: got: %v; want: nil", res)
  186. }
  187. if pr.updates != nil {
  188. t.Errorf("progress updates: got %v; want: nil", pr.updates)
  189. }
  190. }
  191. func TestCancelUpload(t *testing.T) {
  192. const (
  193. chunkSize = 90
  194. mediaSize = 300
  195. )
  196. media := strings.NewReader(strings.Repeat("a", mediaSize))
  197. tr := &interruptibleTransport{
  198. buf: make([]byte, 0, mediaSize),
  199. events: []event{
  200. {"bytes 0-89/*", http.StatusServiceUnavailable},
  201. {"bytes 0-89/*", 308},
  202. {"bytes 90-179/*", 308},
  203. {"bytes 180-269/*", 308}, // Upload should be cancelled before this event.
  204. },
  205. bodies: bodyTracker{},
  206. }
  207. ctx, cancelFunc := context.WithCancel(context.Background())
  208. numUpdates := 0
  209. pr := progressRecorder{f: func() {
  210. numUpdates++
  211. if numUpdates >= 2 {
  212. cancelFunc()
  213. }
  214. }}
  215. rx := &ResumableUpload{
  216. Client: &http.Client{Transport: tr},
  217. Media: NewResumableBuffer(media, chunkSize),
  218. MediaType: "text/plain",
  219. Callback: pr.ProgressUpdate,
  220. Backoff: NoPauseStrategy,
  221. }
  222. res, err := rx.Upload(ctx)
  223. if err != context.Canceled {
  224. t.Errorf("Upload err: got: %v; want: context cancelled", err)
  225. }
  226. if res != nil {
  227. t.Errorf("Upload result: got: %v; want: nil", res)
  228. }
  229. if got, want := tr.buf, []byte(strings.Repeat("a", chunkSize*2)); !reflect.DeepEqual(got, want) {
  230. t.Errorf("transferred contents:\ngot %s\nwant %s", got, want)
  231. }
  232. if got, want := pr.updates, []int64{chunkSize, chunkSize * 2}; !reflect.DeepEqual(got, want) {
  233. t.Errorf("progress updates: got %v; want: %v", got, want)
  234. }
  235. if len(tr.bodies) > 0 {
  236. t.Errorf("unclosed request bodies: %v", tr.bodies)
  237. }
  238. }