// upload.go
  1. package s3manager
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "sort"
  7. "sync"
  8. "time"
  9. "github.com/aws/aws-sdk-go/aws/awserr"
  10. "github.com/aws/aws-sdk-go/aws/awsutil"
  11. "github.com/aws/aws-sdk-go/service/s3"
  12. )
  13. // The maximum allowed number of parts in a multi-part upload on Amazon S3.
  14. var MaxUploadParts = 10000
  15. // The minimum allowed part size when uploading a part to Amazon S3.
  16. var MinUploadPartSize int64 = 1024 * 1024 * 5
  17. // The default part size to buffer chunks of a payload into.
  18. var DefaultUploadPartSize = MinUploadPartSize
  19. // The default number of goroutines to spin up when using Upload().
  20. var DefaultUploadConcurrency = 5
  21. // The default set of options used when opts is nil in Upload().
  22. var DefaultUploadOptions = &UploadOptions{
  23. PartSize: DefaultUploadPartSize,
  24. Concurrency: DefaultUploadConcurrency,
  25. LeavePartsOnError: false,
  26. S3: nil,
  27. }
// A MultiUploadFailure wraps a failed S3 multipart upload. An error returned
// will satisfy this interface when a multi part upload failed to upload all
// chunks to S3. In the case of a failure the UploadID is needed to operate on
// the chunks, if any, which were uploaded.
//
// Example:
//
//	u := s3manager.NewUploader(opts)
//	output, err := u.upload(input)
//	if err != nil {
//		if multierr, ok := err.(MultiUploadFailure); ok {
//			// Process error and its associated uploadID
//			fmt.Println("Error:", multierr.Code(), multierr.Message(), multierr.UploadID())
//		} else {
//			// Process error generically
//			fmt.Println("Error:", err.Error())
//		}
//	}
type MultiUploadFailure interface {
	awserr.Error

	// UploadID returns the upload id for the S3 multipart upload that failed.
	UploadID() string
}
// awsError aliases awserr.Error so that the Error interface type can be
// included as an anonymous field in the multiUploadError struct and not
// conflict with the error.Error() method.
type awsError awserr.Error

// A multiUploadError wraps the upload ID of a failed s3 multipart upload.
// Composed of BaseError for code, message, and original error.
//
// Should be used for an error that occurred failing a S3 multipart upload,
// and a upload ID is available. If an uploadID is not available a more
// relevant error type should be used instead. (NOTE(review): the original
// comment was truncated mid-sentence here; wording reconstructed.)
type multiUploadError struct {
	awsError

	// uploadID is the ID for the multipart upload which failed.
	uploadID string
}
  65. // Error returns the string representation of the error.
  66. //
  67. // See apierr.BaseError ErrorWithExtra for output format
  68. //
  69. // Satisfies the error interface.
  70. func (m multiUploadError) Error() string {
  71. extra := fmt.Sprintf("upload id: %s", m.uploadID)
  72. return awserr.SprintError(m.Code(), m.Message(), extra, m.OrigErr())
  73. }
  74. // String returns the string representation of the error.
  75. // Alias for Error to satisfy the stringer interface.
  76. func (m multiUploadError) String() string {
  77. return m.Error()
  78. }
  79. // UploadID returns the id of the S3 upload which failed.
  80. func (m multiUploadError) UploadID() string {
  81. return m.uploadID
  82. }
// UploadInput contains all input for upload requests to Amazon S3.
type UploadInput struct {
	// The canned ACL to apply to the object.
	ACL *string `location:"header" locationName:"x-amz-acl" type:"string"`

	// The bucket to upload the object into.
	Bucket *string `location:"uri" locationName:"Bucket" type:"string" required:"true"`

	// Specifies caching behavior along the request/reply chain.
	CacheControl *string `location:"header" locationName:"Cache-Control" type:"string"`

	// Specifies presentational information for the object.
	ContentDisposition *string `location:"header" locationName:"Content-Disposition" type:"string"`

	// Specifies what content encodings have been applied to the object and thus
	// what decoding mechanisms must be applied to obtain the media-type referenced
	// by the Content-Type header field.
	ContentEncoding *string `location:"header" locationName:"Content-Encoding" type:"string"`

	// The language the content is in.
	ContentLanguage *string `location:"header" locationName:"Content-Language" type:"string"`

	// A standard MIME type describing the format of the object data.
	ContentType *string `location:"header" locationName:"Content-Type" type:"string"`

	// The date and time at which the object is no longer cacheable.
	Expires *time.Time `location:"header" locationName:"Expires" type:"timestamp" timestampFormat:"rfc822"`

	// Gives the grantee READ, READ_ACP, and WRITE_ACP permissions on the object.
	GrantFullControl *string `location:"header" locationName:"x-amz-grant-full-control" type:"string"`

	// Allows grantee to read the object data and its metadata.
	GrantRead *string `location:"header" locationName:"x-amz-grant-read" type:"string"`

	// Allows grantee to read the object ACL.
	GrantReadACP *string `location:"header" locationName:"x-amz-grant-read-acp" type:"string"`

	// Allows grantee to write the ACL for the applicable object.
	GrantWriteACP *string `location:"header" locationName:"x-amz-grant-write-acp" type:"string"`

	// The key (name) under which the object will be stored.
	Key *string `location:"uri" locationName:"Key" type:"string" required:"true"`

	// A map of metadata to store with the object in S3.
	Metadata map[string]*string `location:"headers" locationName:"x-amz-meta-" type:"map"`

	// Confirms that the requester knows that she or he will be charged for the
	// request. Bucket owners need not specify this parameter in their requests.
	// Documentation on downloading objects from requester pays buckets can be found
	// at http://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectsinRequesterPaysBuckets.html
	RequestPayer *string `location:"header" locationName:"x-amz-request-payer" type:"string"`

	// Specifies the algorithm to use to when encrypting the object (e.g., AES256,
	// aws:kms).
	SSECustomerAlgorithm *string `location:"header" locationName:"x-amz-server-side-encryption-customer-algorithm" type:"string"`

	// Specifies the customer-provided encryption key for Amazon S3 to use in encrypting
	// data. This value is used to store the object and then it is discarded; Amazon
	// does not store the encryption key. The key must be appropriate for use with
	// the algorithm specified in the x-amz-server-side-encryption-customer-algorithm
	// header.
	SSECustomerKey *string `location:"header" locationName:"x-amz-server-side-encryption-customer-key" type:"string"`

	// Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321.
	// Amazon S3 uses this header for a message integrity check to ensure the encryption
	// key was transmitted without error.
	SSECustomerKeyMD5 *string `location:"header" locationName:"x-amz-server-side-encryption-customer-key-MD5" type:"string"`

	// Specifies the AWS KMS key ID to use for object encryption. All GET and PUT
	// requests for an object protected by AWS KMS will fail if not made via SSL
	// or using SigV4. Documentation on configuring any of the officially supported
	// AWS SDKs and CLI can be found at http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingAWSSDK.html#specify-signature-version
	SSEKMSKeyID *string `location:"header" locationName:"x-amz-server-side-encryption-aws-kms-key-id" type:"string"`

	// The Server-side encryption algorithm used when storing this object in S3
	// (e.g., AES256, aws:kms).
	ServerSideEncryption *string `location:"header" locationName:"x-amz-server-side-encryption" type:"string"`

	// The type of storage to use for the object. Defaults to 'STANDARD'.
	StorageClass *string `location:"header" locationName:"x-amz-storage-class" type:"string"`

	// If the bucket is configured as a website, redirects requests for this object
	// to another object in the same bucket or to an external URL. Amazon S3 stores
	// the value of this header in the object metadata.
	WebsiteRedirectLocation *string `location:"header" locationName:"x-amz-website-redirect-location" type:"string"`

	// The readable body payload to send to S3.
	Body io.Reader
}
// UploadOutput represents a response from the Upload() call.
type UploadOutput struct {
	// Location is the URL where the object was uploaded to.
	Location string

	// UploadID is the ID for a multipart upload to S3. In the case of an
	// error the error can be cast to the MultiUploadFailure interface to
	// extract the upload ID.
	UploadID string
}

// UploadOptions keeps tracks of extra options to pass to an Upload() call.
type UploadOptions struct {
	// PartSize is the buffer size (in bytes) to use when buffering data
	// into chunks and sending them as parts to S3. The minimum allowed
	// part size is 5MB, and if this value is set to zero, the
	// DefaultPartSize value will be used.
	PartSize int64

	// Concurrency is the number of goroutines to spin up in parallel when
	// sending parts. If this is set to zero, the DefaultConcurrency value
	// will be used.
	Concurrency int

	// LeavePartsOnError set to true will cause the SDK to avoid calling
	// AbortMultipartUpload on a failure, leaving all successfully uploaded
	// parts on S3 for manual recovery.
	//
	// Note that storing parts of an incomplete multipart upload counts towards
	// space usage on S3 and will add additional costs if not cleaned up.
	LeavePartsOnError bool

	// S3 is the client to use when uploading to S3. Leave this as nil to
	// use the default S3 client.
	S3 *s3.S3
}
  176. // NewUploader creates a new Uploader object to upload data to S3. Pass in
  177. // an optional opts structure to customize the uploader behavior.
  178. func NewUploader(opts *UploadOptions) *Uploader {
  179. if opts == nil {
  180. opts = DefaultUploadOptions
  181. }
  182. return &Uploader{opts: opts}
  183. }
// The Uploader structure that calls Upload(). It is safe to call Upload()
// on this structure for multiple objects and across concurrent goroutines.
type Uploader struct {
	// opts holds the configuration applied to every Upload() call.
	opts *UploadOptions
}
  189. // Upload uploads an object to S3, intelligently buffering large files into
  190. // smaller chunks and sending them in parallel across multiple goroutines. You
  191. // can configure the buffer size and concurrency through the opts parameter.
  192. //
  193. // If opts is set to nil, DefaultUploadOptions will be used.
  194. //
  195. // It is safe to call this method for multiple objects and across concurrent
  196. // goroutines.
  197. func (u *Uploader) Upload(input *UploadInput) (*UploadOutput, error) {
  198. i := uploader{in: input, opts: *u.opts}
  199. return i.upload()
  200. }
// uploader is the internal structure used to manage an upload to S3.
type uploader struct {
	in   *UploadInput // request parameters and the body to upload
	opts UploadOptions // per-upload copy of options; defaults filled in by init()

	readerPos int64 // current reader position
	totalSize int64 // set to -1 if the size is not known
}
  208. // internal logic for deciding whether to upload a single part or use a
  209. // multipart upload.
  210. func (u *uploader) upload() (*UploadOutput, error) {
  211. u.init()
  212. if u.opts.PartSize < MinUploadPartSize {
  213. msg := fmt.Sprintf("part size must be at least %d bytes", MinUploadPartSize)
  214. return nil, awserr.New("ConfigError", msg, nil)
  215. }
  216. // Do one read to determine if we have more than one part
  217. buf, err := u.nextReader()
  218. if err == io.EOF || err == io.ErrUnexpectedEOF { // single part
  219. return u.singlePart(buf)
  220. } else if err != nil {
  221. return nil, awserr.New("ReadRequestBody", "read upload data failed", err)
  222. }
  223. mu := multiuploader{uploader: u}
  224. return mu.upload(buf)
  225. }
  226. // init will initialize all default options.
  227. func (u *uploader) init() {
  228. if u.opts.S3 == nil {
  229. u.opts.S3 = s3.New(nil)
  230. }
  231. if u.opts.Concurrency == 0 {
  232. u.opts.Concurrency = DefaultUploadConcurrency
  233. }
  234. if u.opts.PartSize == 0 {
  235. u.opts.PartSize = DefaultUploadPartSize
  236. }
  237. // Try to get the total size for some optimizations
  238. u.initSize()
  239. }
  240. // initSize tries to detect the total stream size, setting u.totalSize. If
  241. // the size is not known, totalSize is set to -1.
  242. func (u *uploader) initSize() {
  243. u.totalSize = -1
  244. switch r := u.in.Body.(type) {
  245. case io.Seeker:
  246. pos, _ := r.Seek(0, 1)
  247. defer r.Seek(pos, 0)
  248. n, err := r.Seek(0, 2)
  249. if err != nil {
  250. return
  251. }
  252. u.totalSize = n
  253. // try to adjust partSize if it is too small
  254. if u.totalSize/u.opts.PartSize >= int64(MaxUploadParts) {
  255. u.opts.PartSize = u.totalSize / int64(MaxUploadParts)
  256. }
  257. }
  258. }
  259. // nextReader returns a seekable reader representing the next packet of data.
  260. // This operation increases the shared u.readerPos counter, but note that it
  261. // does not need to be wrapped in a mutex because nextReader is only called
  262. // from the main thread.
  263. func (u *uploader) nextReader() (io.ReadSeeker, error) {
  264. switch r := u.in.Body.(type) {
  265. case io.ReaderAt:
  266. var err error
  267. n := u.opts.PartSize
  268. if u.totalSize >= 0 {
  269. bytesLeft := u.totalSize - u.readerPos
  270. if bytesLeft == 0 {
  271. err = io.EOF
  272. n = bytesLeft
  273. } else if bytesLeft <= u.opts.PartSize {
  274. err = io.ErrUnexpectedEOF
  275. n = bytesLeft
  276. }
  277. }
  278. buf := io.NewSectionReader(r, u.readerPos, n)
  279. u.readerPos += n
  280. return buf, err
  281. default:
  282. packet := make([]byte, u.opts.PartSize)
  283. n, err := io.ReadFull(u.in.Body, packet)
  284. u.readerPos += int64(n)
  285. return bytes.NewReader(packet[0:n]), err
  286. }
  287. }
  288. // singlePart contains upload logic for uploading a single chunk via
  289. // a regular PutObject request. Multipart requests require at least two
  290. // parts, or at least 5MB of data.
  291. func (u *uploader) singlePart(buf io.ReadSeeker) (*UploadOutput, error) {
  292. params := &s3.PutObjectInput{}
  293. awsutil.Copy(params, u.in)
  294. params.Body = buf
  295. req, _ := u.opts.S3.PutObjectRequest(params)
  296. if err := req.Send(); err != nil {
  297. return nil, err
  298. }
  299. url := req.HTTPRequest.URL.String()
  300. return &UploadOutput{Location: url}, nil
  301. }
// multiuploader is the internal structure used to manage a specific
// multipart upload to S3.
type multiuploader struct {
	*uploader

	wg       sync.WaitGroup // tracks the part-upload worker goroutines
	m        sync.Mutex     // guards err and parts
	err      error          // error recorded by a worker or the producer loop
	uploadID string         // ID of the in-flight multipart upload
	parts    completedParts // completed parts, appended in arrival order
}

// chunk keeps track of a single chunk of data being sent to S3.
type chunk struct {
	buf io.ReadSeeker // the part payload
	num int64         // 1-based S3 part number
}
  316. // completedParts is a wrapper to make parts sortable by their part number,
  317. // since S3 required this list to be sent in sorted order.
  318. type completedParts []*s3.CompletedPart
  319. func (a completedParts) Len() int { return len(a) }
  320. func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  321. func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }
  322. // upload will perform a multipart upload using the firstBuf buffer containing
  323. // the first chunk of data.
  324. func (u *multiuploader) upload(firstBuf io.ReadSeeker) (*UploadOutput, error) {
  325. params := &s3.CreateMultipartUploadInput{}
  326. awsutil.Copy(params, u.in)
  327. // Create the multipart
  328. resp, err := u.opts.S3.CreateMultipartUpload(params)
  329. if err != nil {
  330. return nil, err
  331. }
  332. u.uploadID = *resp.UploadId
  333. // Create the workers
  334. ch := make(chan chunk, u.opts.Concurrency)
  335. for i := 0; i < u.opts.Concurrency; i++ {
  336. u.wg.Add(1)
  337. go u.readChunk(ch)
  338. }
  339. // Send part 1 to the workers
  340. var num int64 = 1
  341. ch <- chunk{buf: firstBuf, num: num}
  342. // Read and queue the rest of the parts
  343. for u.geterr() == nil {
  344. // This upload exceeded maximum number of supported parts, error now.
  345. if num > int64(MaxUploadParts) {
  346. msg := fmt.Sprintf("exceeded total allowed parts (%d). "+
  347. "Adjust PartSize to fit in this limit", MaxUploadParts)
  348. u.seterr(awserr.New("TotalPartsExceeded", msg, nil))
  349. break
  350. }
  351. num++
  352. buf, err := u.nextReader()
  353. if err == io.EOF {
  354. break
  355. }
  356. ch <- chunk{buf: buf, num: num}
  357. if err != nil && err != io.ErrUnexpectedEOF {
  358. u.seterr(awserr.New(
  359. "ReadRequestBody",
  360. "read multipart upload data failed",
  361. err))
  362. break
  363. }
  364. }
  365. // Close the channel, wait for workers, and complete upload
  366. close(ch)
  367. u.wg.Wait()
  368. complete := u.complete()
  369. if err := u.geterr(); err != nil {
  370. return nil, &multiUploadError{
  371. awsError: awserr.New(
  372. "MultipartUpload",
  373. "upload multipart failed",
  374. err),
  375. uploadID: u.uploadID,
  376. }
  377. }
  378. return &UploadOutput{
  379. Location: *complete.Location,
  380. UploadID: u.uploadID,
  381. }, nil
  382. }
  383. // readChunk runs in worker goroutines to pull chunks off of the ch channel
  384. // and send() them as UploadPart requests.
  385. func (u *multiuploader) readChunk(ch chan chunk) {
  386. defer u.wg.Done()
  387. for {
  388. data, ok := <-ch
  389. if !ok {
  390. break
  391. }
  392. if u.geterr() == nil {
  393. if err := u.send(data); err != nil {
  394. u.seterr(err)
  395. }
  396. }
  397. }
  398. }
  399. // send performs an UploadPart request and keeps track of the completed
  400. // part information.
  401. func (u *multiuploader) send(c chunk) error {
  402. resp, err := u.opts.S3.UploadPart(&s3.UploadPartInput{
  403. Bucket: u.in.Bucket,
  404. Key: u.in.Key,
  405. Body: c.buf,
  406. UploadId: &u.uploadID,
  407. PartNumber: &c.num,
  408. })
  409. if err != nil {
  410. return err
  411. }
  412. n := c.num
  413. completed := &s3.CompletedPart{ETag: resp.ETag, PartNumber: &n}
  414. u.m.Lock()
  415. u.parts = append(u.parts, completed)
  416. u.m.Unlock()
  417. return nil
  418. }
  419. // geterr is a thread-safe getter for the error object
  420. func (u *multiuploader) geterr() error {
  421. u.m.Lock()
  422. defer u.m.Unlock()
  423. return u.err
  424. }
  425. // seterr is a thread-safe setter for the error object
  426. func (u *multiuploader) seterr(e error) {
  427. u.m.Lock()
  428. defer u.m.Unlock()
  429. u.err = e
  430. }
  431. // fail will abort the multipart unless LeavePartsOnError is set to true.
  432. func (u *multiuploader) fail() {
  433. if u.opts.LeavePartsOnError {
  434. return
  435. }
  436. u.opts.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
  437. Bucket: u.in.Bucket,
  438. Key: u.in.Key,
  439. UploadId: &u.uploadID,
  440. })
  441. }
  442. // complete successfully completes a multipart upload and returns the response.
  443. func (u *multiuploader) complete() *s3.CompleteMultipartUploadOutput {
  444. if u.geterr() != nil {
  445. u.fail()
  446. return nil
  447. }
  448. // Parts must be sorted in PartNumber order.
  449. sort.Sort(u.parts)
  450. resp, err := u.opts.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
  451. Bucket: u.in.Bucket,
  452. Key: u.in.Key,
  453. UploadId: &u.uploadID,
  454. MultipartUpload: &s3.CompletedMultipartUpload{Parts: u.parts},
  455. })
  456. if err != nil {
  457. u.seterr(err)
  458. u.fail()
  459. }
  460. return resp
  461. }