upload_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. package s3manager_test
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "net/http"
  8. "net/http/httptest"
  9. "sort"
  10. "strings"
  11. "sync"
  12. "testing"
  13. "github.com/aws/aws-sdk-go/aws"
  14. "github.com/aws/aws-sdk-go/aws/awserr"
  15. "github.com/aws/aws-sdk-go/aws/awsutil"
  16. "github.com/aws/aws-sdk-go/aws/request"
  17. "github.com/aws/aws-sdk-go/internal/test/unit"
  18. "github.com/aws/aws-sdk-go/service/s3"
  19. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  20. "github.com/stretchr/testify/assert"
  21. )
  22. var _ = unit.Imported
  23. var buf12MB = make([]byte, 1024*1024*12)
  24. var buf2MB = make([]byte, 1024*1024*2)
  25. var emptyList = []string{}
  26. func val(i interface{}, s string) interface{} {
  27. return awsutil.ValuesAtPath(i, s)[0]
  28. }
  29. func contains(src []string, s string) bool {
  30. for _, v := range src {
  31. if s == v {
  32. return true
  33. }
  34. }
  35. return false
  36. }
  37. func loggingSvc(ignoreOps []string) (*s3.S3, *[]string, *[]interface{}) {
  38. var m sync.Mutex
  39. partNum := 0
  40. names := []string{}
  41. params := []interface{}{}
  42. svc := s3.New(nil)
  43. svc.Handlers.Unmarshal.Clear()
  44. svc.Handlers.UnmarshalMeta.Clear()
  45. svc.Handlers.UnmarshalError.Clear()
  46. svc.Handlers.Send.Clear()
  47. svc.Handlers.Send.PushBack(func(r *request.Request) {
  48. m.Lock()
  49. defer m.Unlock()
  50. if !contains(ignoreOps, r.Operation.Name) {
  51. names = append(names, r.Operation.Name)
  52. params = append(params, r.Params)
  53. }
  54. r.HTTPResponse = &http.Response{
  55. StatusCode: 200,
  56. Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
  57. }
  58. switch data := r.Data.(type) {
  59. case *s3.CreateMultipartUploadOutput:
  60. data.UploadId = aws.String("UPLOAD-ID")
  61. case *s3.UploadPartOutput:
  62. partNum++
  63. data.ETag = aws.String(fmt.Sprintf("ETAG%d", partNum))
  64. case *s3.CompleteMultipartUploadOutput:
  65. data.Location = aws.String("https://location")
  66. }
  67. })
  68. return svc, &names, &params
  69. }
  70. func buflen(i interface{}) int {
  71. r := i.(io.Reader)
  72. b, _ := ioutil.ReadAll(r)
  73. return len(b)
  74. }
  75. func TestUploadOrderMulti(t *testing.T) {
  76. s, ops, args := loggingSvc(emptyList)
  77. mgr := s3manager.NewUploader(&s3manager.UploadOptions{S3: s})
  78. resp, err := mgr.Upload(&s3manager.UploadInput{
  79. Bucket: aws.String("Bucket"),
  80. Key: aws.String("Key"),
  81. Body: bytes.NewReader(buf12MB),
  82. ServerSideEncryption: aws.String("AES256"),
  83. ContentType: aws.String("content/type"),
  84. })
  85. assert.NoError(t, err)
  86. assert.Equal(t, []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *ops)
  87. assert.Equal(t, "https://location", resp.Location)
  88. assert.Equal(t, "UPLOAD-ID", resp.UploadID)
  89. // Validate input values
  90. // UploadPart
  91. assert.Equal(t, "UPLOAD-ID", val((*args)[1], "UploadId"))
  92. assert.Equal(t, "UPLOAD-ID", val((*args)[2], "UploadId"))
  93. assert.Equal(t, "UPLOAD-ID", val((*args)[3], "UploadId"))
  94. // CompleteMultipartUpload
  95. assert.Equal(t, "UPLOAD-ID", val((*args)[4], "UploadId"))
  96. assert.Equal(t, int64(1), val((*args)[4], "MultipartUpload.Parts[0].PartNumber"))
  97. assert.Equal(t, int64(2), val((*args)[4], "MultipartUpload.Parts[1].PartNumber"))
  98. assert.Equal(t, int64(3), val((*args)[4], "MultipartUpload.Parts[2].PartNumber"))
  99. assert.Regexp(t, `^ETAG\d+$`, val((*args)[4], "MultipartUpload.Parts[0].ETag"))
  100. assert.Regexp(t, `^ETAG\d+$`, val((*args)[4], "MultipartUpload.Parts[1].ETag"))
  101. assert.Regexp(t, `^ETAG\d+$`, val((*args)[4], "MultipartUpload.Parts[2].ETag"))
  102. // Custom headers
  103. assert.Equal(t, "AES256", val((*args)[0], "ServerSideEncryption"))
  104. assert.Equal(t, "content/type", val((*args)[0], "ContentType"))
  105. }
  106. func TestUploadOrderMultiDifferentPartSize(t *testing.T) {
  107. s, ops, args := loggingSvc(emptyList)
  108. mgr := s3manager.NewUploader(&s3manager.UploadOptions{
  109. S3: s,
  110. PartSize: 1024 * 1024 * 7,
  111. Concurrency: 1,
  112. })
  113. _, err := mgr.Upload(&s3manager.UploadInput{
  114. Bucket: aws.String("Bucket"),
  115. Key: aws.String("Key"),
  116. Body: bytes.NewReader(buf12MB),
  117. })
  118. assert.NoError(t, err)
  119. assert.Equal(t, []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *ops)
  120. // Part lengths
  121. assert.Equal(t, 1024*1024*7, buflen(val((*args)[1], "Body")))
  122. assert.Equal(t, 1024*1024*5, buflen(val((*args)[2], "Body")))
  123. }
  124. func TestUploadIncreasePartSize(t *testing.T) {
  125. s3manager.MaxUploadParts = 2
  126. defer func() { s3manager.MaxUploadParts = 10000 }()
  127. s, ops, args := loggingSvc(emptyList)
  128. opts := &s3manager.UploadOptions{S3: s, Concurrency: 1}
  129. mgr := s3manager.NewUploader(opts)
  130. _, err := mgr.Upload(&s3manager.UploadInput{
  131. Bucket: aws.String("Bucket"),
  132. Key: aws.String("Key"),
  133. Body: bytes.NewReader(buf12MB),
  134. })
  135. assert.NoError(t, err)
  136. assert.Equal(t, int64(0), opts.PartSize) // don't modify orig options
  137. assert.Equal(t, []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *ops)
  138. // Part lengths
  139. assert.Equal(t, 1024*1024*6, buflen(val((*args)[1], "Body")))
  140. assert.Equal(t, 1024*1024*6, buflen(val((*args)[2], "Body")))
  141. }
  142. func TestUploadFailIfPartSizeTooSmall(t *testing.T) {
  143. opts := &s3manager.UploadOptions{PartSize: 5}
  144. mgr := s3manager.NewUploader(opts)
  145. resp, err := mgr.Upload(&s3manager.UploadInput{
  146. Bucket: aws.String("Bucket"),
  147. Key: aws.String("Key"),
  148. Body: bytes.NewReader(buf12MB),
  149. })
  150. assert.Nil(t, resp)
  151. assert.NotNil(t, err)
  152. aerr := err.(awserr.Error)
  153. assert.Equal(t, "ConfigError", aerr.Code())
  154. assert.Contains(t, aerr.Message(), "part size must be at least")
  155. }
  156. func TestUploadOrderSingle(t *testing.T) {
  157. s, ops, args := loggingSvc(emptyList)
  158. mgr := s3manager.NewUploader(&s3manager.UploadOptions{S3: s})
  159. resp, err := mgr.Upload(&s3manager.UploadInput{
  160. Bucket: aws.String("Bucket"),
  161. Key: aws.String("Key"),
  162. Body: bytes.NewReader(buf2MB),
  163. ServerSideEncryption: aws.String("AES256"),
  164. ContentType: aws.String("content/type"),
  165. })
  166. assert.NoError(t, err)
  167. assert.Equal(t, []string{"PutObject"}, *ops)
  168. assert.NotEqual(t, "", resp.Location)
  169. assert.Equal(t, "", resp.UploadID)
  170. assert.Equal(t, "AES256", val((*args)[0], "ServerSideEncryption"))
  171. assert.Equal(t, "content/type", val((*args)[0], "ContentType"))
  172. }
  173. func TestUploadOrderSingleFailure(t *testing.T) {
  174. s, ops, _ := loggingSvc(emptyList)
  175. s.Handlers.Send.PushBack(func(r *request.Request) {
  176. r.HTTPResponse.StatusCode = 400
  177. })
  178. mgr := s3manager.NewUploader(&s3manager.UploadOptions{S3: s})
  179. resp, err := mgr.Upload(&s3manager.UploadInput{
  180. Bucket: aws.String("Bucket"),
  181. Key: aws.String("Key"),
  182. Body: bytes.NewReader(buf2MB),
  183. })
  184. assert.Error(t, err)
  185. assert.Equal(t, []string{"PutObject"}, *ops)
  186. assert.Nil(t, resp)
  187. }
  188. func TestUploadOrderZero(t *testing.T) {
  189. s, ops, args := loggingSvc(emptyList)
  190. mgr := s3manager.NewUploader(&s3manager.UploadOptions{S3: s})
  191. resp, err := mgr.Upload(&s3manager.UploadInput{
  192. Bucket: aws.String("Bucket"),
  193. Key: aws.String("Key"),
  194. Body: bytes.NewReader(make([]byte, 0)),
  195. })
  196. assert.NoError(t, err)
  197. assert.Equal(t, []string{"PutObject"}, *ops)
  198. assert.NotEqual(t, "", resp.Location)
  199. assert.Equal(t, "", resp.UploadID)
  200. assert.Equal(t, 0, buflen(val((*args)[0], "Body")))
  201. }
  202. func TestUploadOrderMultiFailure(t *testing.T) {
  203. s, ops, _ := loggingSvc(emptyList)
  204. s.Handlers.Send.PushBack(func(r *request.Request) {
  205. switch t := r.Data.(type) {
  206. case *s3.UploadPartOutput:
  207. if *t.ETag == "ETAG2" {
  208. r.HTTPResponse.StatusCode = 400
  209. }
  210. }
  211. })
  212. mgr := s3manager.NewUploader(&s3manager.UploadOptions{S3: s, Concurrency: 1})
  213. _, err := mgr.Upload(&s3manager.UploadInput{
  214. Bucket: aws.String("Bucket"),
  215. Key: aws.String("Key"),
  216. Body: bytes.NewReader(buf12MB),
  217. })
  218. assert.Error(t, err)
  219. assert.Equal(t, []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "AbortMultipartUpload"}, *ops)
  220. }
  221. func TestUploadOrderMultiFailureOnComplete(t *testing.T) {
  222. s, ops, _ := loggingSvc(emptyList)
  223. s.Handlers.Send.PushBack(func(r *request.Request) {
  224. switch r.Data.(type) {
  225. case *s3.CompleteMultipartUploadOutput:
  226. r.HTTPResponse.StatusCode = 400
  227. }
  228. })
  229. mgr := s3manager.NewUploader(&s3manager.UploadOptions{S3: s, Concurrency: 1})
  230. _, err := mgr.Upload(&s3manager.UploadInput{
  231. Bucket: aws.String("Bucket"),
  232. Key: aws.String("Key"),
  233. Body: bytes.NewReader(buf12MB),
  234. })
  235. assert.Error(t, err)
  236. assert.Equal(t, []string{"CreateMultipartUpload", "UploadPart", "UploadPart",
  237. "UploadPart", "CompleteMultipartUpload", "AbortMultipartUpload"}, *ops)
  238. }
  239. func TestUploadOrderMultiFailureOnCreate(t *testing.T) {
  240. s, ops, _ := loggingSvc(emptyList)
  241. s.Handlers.Send.PushBack(func(r *request.Request) {
  242. switch r.Data.(type) {
  243. case *s3.CreateMultipartUploadOutput:
  244. r.HTTPResponse.StatusCode = 400
  245. }
  246. })
  247. mgr := s3manager.NewUploader(&s3manager.UploadOptions{S3: s})
  248. _, err := mgr.Upload(&s3manager.UploadInput{
  249. Bucket: aws.String("Bucket"),
  250. Key: aws.String("Key"),
  251. Body: bytes.NewReader(make([]byte, 1024*1024*12)),
  252. })
  253. assert.Error(t, err)
  254. assert.Equal(t, []string{"CreateMultipartUpload"}, *ops)
  255. }
  256. func TestUploadOrderMultiFailureLeaveParts(t *testing.T) {
  257. s, ops, _ := loggingSvc(emptyList)
  258. s.Handlers.Send.PushBack(func(r *request.Request) {
  259. switch data := r.Data.(type) {
  260. case *s3.UploadPartOutput:
  261. if *data.ETag == "ETAG2" {
  262. r.HTTPResponse.StatusCode = 400
  263. }
  264. }
  265. })
  266. mgr := s3manager.NewUploader(&s3manager.UploadOptions{
  267. S3: s,
  268. Concurrency: 1,
  269. LeavePartsOnError: true,
  270. })
  271. _, err := mgr.Upload(&s3manager.UploadInput{
  272. Bucket: aws.String("Bucket"),
  273. Key: aws.String("Key"),
  274. Body: bytes.NewReader(make([]byte, 1024*1024*12)),
  275. })
  276. assert.Error(t, err)
  277. assert.Equal(t, []string{"CreateMultipartUpload", "UploadPart", "UploadPart"}, *ops)
  278. }
  279. type failreader struct {
  280. times int
  281. failCount int
  282. }
  283. func (f *failreader) Read(b []byte) (int, error) {
  284. f.failCount++
  285. if f.failCount >= f.times {
  286. return 0, fmt.Errorf("random failure")
  287. }
  288. return len(b), nil
  289. }
  290. func TestUploadOrderReadFail1(t *testing.T) {
  291. s, ops, _ := loggingSvc(emptyList)
  292. mgr := s3manager.NewUploader(&s3manager.UploadOptions{S3: s})
  293. _, err := mgr.Upload(&s3manager.UploadInput{
  294. Bucket: aws.String("Bucket"),
  295. Key: aws.String("Key"),
  296. Body: &failreader{times: 1},
  297. })
  298. assert.Equal(t, "ReadRequestBody", err.(awserr.Error).Code())
  299. assert.EqualError(t, err.(awserr.Error).OrigErr(), "random failure")
  300. assert.Equal(t, []string{}, *ops)
  301. }
  302. func TestUploadOrderReadFail2(t *testing.T) {
  303. s, ops, _ := loggingSvc([]string{"UploadPart"})
  304. mgr := s3manager.NewUploader(&s3manager.UploadOptions{S3: s, Concurrency: 1})
  305. _, err := mgr.Upload(&s3manager.UploadInput{
  306. Bucket: aws.String("Bucket"),
  307. Key: aws.String("Key"),
  308. Body: &failreader{times: 2},
  309. })
  310. assert.Equal(t, "ReadRequestBody", err.(awserr.Error).Code())
  311. assert.EqualError(t, err.(awserr.Error).OrigErr(), "random failure")
  312. assert.Equal(t, []string{"CreateMultipartUpload", "AbortMultipartUpload"}, *ops)
  313. }
  314. type sizedReader struct {
  315. size int
  316. cur int
  317. }
  318. func (s *sizedReader) Read(p []byte) (n int, err error) {
  319. if s.cur >= s.size {
  320. return 0, io.EOF
  321. }
  322. n = len(p)
  323. s.cur += len(p)
  324. if s.cur > s.size {
  325. n -= s.cur - s.size
  326. }
  327. return
  328. }
  329. func TestUploadOrderMultiBufferedReader(t *testing.T) {
  330. s, ops, args := loggingSvc(emptyList)
  331. mgr := s3manager.NewUploader(&s3manager.UploadOptions{S3: s})
  332. _, err := mgr.Upload(&s3manager.UploadInput{
  333. Bucket: aws.String("Bucket"),
  334. Key: aws.String("Key"),
  335. Body: &sizedReader{size: 1024 * 1024 * 12},
  336. })
  337. assert.NoError(t, err)
  338. assert.Equal(t, []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *ops)
  339. // Part lengths
  340. parts := []int{
  341. buflen(val((*args)[1], "Body")),
  342. buflen(val((*args)[2], "Body")),
  343. buflen(val((*args)[3], "Body")),
  344. }
  345. sort.Ints(parts)
  346. assert.Equal(t, []int{1024 * 1024 * 2, 1024 * 1024 * 5, 1024 * 1024 * 5}, parts)
  347. }
  348. func TestUploadOrderMultiBufferedReaderExceedTotalParts(t *testing.T) {
  349. s3manager.MaxUploadParts = 2
  350. defer func() { s3manager.MaxUploadParts = 10000 }()
  351. s, ops, _ := loggingSvc([]string{"UploadPart"})
  352. mgr := s3manager.NewUploader(&s3manager.UploadOptions{S3: s, Concurrency: 1})
  353. resp, err := mgr.Upload(&s3manager.UploadInput{
  354. Bucket: aws.String("Bucket"),
  355. Key: aws.String("Key"),
  356. Body: &sizedReader{size: 1024 * 1024 * 12},
  357. })
  358. assert.Error(t, err)
  359. assert.Nil(t, resp)
  360. assert.Equal(t, []string{"CreateMultipartUpload", "AbortMultipartUpload"}, *ops)
  361. aerr := err.(awserr.Error)
  362. assert.Equal(t, "TotalPartsExceeded", aerr.Code())
  363. assert.Contains(t, aerr.Message(), "exceeded total allowed parts (2)")
  364. }
  365. func TestUploadOrderSingleBufferedReader(t *testing.T) {
  366. s, ops, _ := loggingSvc(emptyList)
  367. mgr := s3manager.NewUploader(&s3manager.UploadOptions{S3: s})
  368. resp, err := mgr.Upload(&s3manager.UploadInput{
  369. Bucket: aws.String("Bucket"),
  370. Key: aws.String("Key"),
  371. Body: &sizedReader{size: 1024 * 1024 * 2},
  372. })
  373. assert.NoError(t, err)
  374. assert.Equal(t, []string{"PutObject"}, *ops)
  375. assert.NotEqual(t, "", resp.Location)
  376. assert.Equal(t, "", resp.UploadID)
  377. }
  378. func TestUploadZeroLenObject(t *testing.T) {
  379. requestMade := false
  380. server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  381. requestMade = true
  382. w.WriteHeader(http.StatusOK)
  383. }))
  384. svc := s3.New(&aws.Config{
  385. Endpoint: aws.String(server.URL),
  386. })
  387. mgr := s3manager.NewUploader(&s3manager.UploadOptions{S3: svc})
  388. resp, err := mgr.Upload(&s3manager.UploadInput{
  389. Bucket: aws.String("Bucket"),
  390. Key: aws.String("Key"),
  391. Body: strings.NewReader(""),
  392. })
  393. assert.NoError(t, err)
  394. assert.True(t, requestMade)
  395. assert.NotEqual(t, "", resp.Location)
  396. assert.Equal(t, "", resp.UploadID)
  397. }