diff options
Diffstat (limited to 'vendor/github.com/mitchellh/packer/vendor/github.com/aws/aws-sdk-go/service/s3/s3manager/download.go')
-rw-r--r-- | vendor/github.com/mitchellh/packer/vendor/github.com/aws/aws-sdk-go/service/s3/s3manager/download.go | 204 |
1 files changed, 178 insertions, 26 deletions
diff --git a/vendor/github.com/mitchellh/packer/vendor/github.com/aws/aws-sdk-go/service/s3/s3manager/download.go b/vendor/github.com/mitchellh/packer/vendor/github.com/aws/aws-sdk-go/service/s3/s3manager/download.go index 0841077a..d30f2b6b 100644 --- a/vendor/github.com/mitchellh/packer/vendor/github.com/aws/aws-sdk-go/service/s3/s3manager/download.go +++ b/vendor/github.com/mitchellh/packer/vendor/github.com/aws/aws-sdk-go/service/s3/s3manager/download.go @@ -31,15 +31,30 @@ const DefaultDownloadConcurrency = 5 type Downloader struct { // The buffer size (in bytes) to use when buffering data into chunks and // sending them as parts to S3. The minimum allowed part size is 5MB, and - // if this value is set to zero, the DefaultPartSize value will be used. + // if this value is set to zero, the DefaultDownloadPartSize value will be used. + // + // PartSize is ignored if the Range input parameter is provided. PartSize int64 // The number of goroutines to spin up in parallel when sending parts. // If this is set to zero, the DefaultDownloadConcurrency value will be used. + // + // Concurrency is ignored if the Range input parameter is provided. Concurrency int // An S3 client to use when performing downloads. S3 s3iface.S3API + + // List of request options that will be passed down to individual API + // operation requests made by the downloader. + RequestOptions []request.Option +} + +// WithDownloaderRequestOptions appends to the Downloader's API request options. +func WithDownloaderRequestOptions(opts ...request.Option) func(*Downloader) { + return func(d *Downloader) { + d.RequestOptions = append(d.RequestOptions, opts...) + } } // NewDownloader creates a new Downloader instance to downloads objects from @@ -119,32 +134,125 @@ type maxRetrier interface { // // The w io.WriterAt can be satisfied by an os.File to do multipart concurrent // downloads, or in memory []byte wrapper using aws.WriteAtBuffer. +// +// If the GetObjectInput's Range value is provided that will cause the downloader +// to perform a single GetObjectInput request for that object's range. This will +// caused the part size, and concurrency configurations to be ignored. func (d Downloader) Download(w io.WriterAt, input *s3.GetObjectInput, options ...func(*Downloader)) (n int64, err error) { - impl := downloader{w: w, in: input, ctx: d} + return d.DownloadWithContext(aws.BackgroundContext(), w, input, options...) +} + +// DownloadWithContext downloads an object in S3 and writes the payload into w +// using concurrent GET requests. +// +// DownloadWithContext is the same as Download with the additional support for +// Context input parameters. The Context must not be nil. A nil Context will +// cause a panic. Use the Context to add deadlining, timeouts, ect. The +// DownloadWithContext may create sub-contexts for individual underlying +// requests. +// +// Additional functional options can be provided to configure the individual +// download. These options are copies of the Downloader instance Download is +// called from. Modifying the options will not impact the original Downloader +// instance. Use the WithDownloaderRequestOptions helper function to pass in request +// options that will be applied to all API operations made with this downloader. +// +// The w io.WriterAt can be satisfied by an os.File to do multipart concurrent +// downloads, or in memory []byte wrapper using aws.WriteAtBuffer. +// +// It is safe to call this method concurrently across goroutines. +// +// If the GetObjectInput's Range value is provided that will cause the downloader +// to perform a single GetObjectInput request for that object's range. This will +// caused the part size, and concurrency configurations to be ignored. +func (d Downloader) DownloadWithContext(ctx aws.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*Downloader)) (n int64, err error) { + impl := downloader{w: w, in: input, cfg: d, ctx: ctx} for _, option := range options { - option(&impl.ctx) + option(&impl.cfg) } + impl.cfg.RequestOptions = append(impl.cfg.RequestOptions, request.WithAppendUserAgent("S3Manager")) if s, ok := d.S3.(maxRetrier); ok { impl.partBodyMaxRetries = s.MaxRetries() } impl.totalBytes = -1 - if impl.ctx.Concurrency == 0 { - impl.ctx.Concurrency = DefaultDownloadConcurrency + if impl.cfg.Concurrency == 0 { + impl.cfg.Concurrency = DefaultDownloadConcurrency } - if impl.ctx.PartSize == 0 { - impl.ctx.PartSize = DefaultDownloadPartSize + if impl.cfg.PartSize == 0 { + impl.cfg.PartSize = DefaultDownloadPartSize } return impl.download() } +// DownloadWithIterator will download a batched amount of objects in S3 and writes them +// to the io.WriterAt specificed in the iterator. +// +// Example: +// svc := s3manager.NewDownloader(session) +// +// fooFile, err := os.Open("/tmp/foo.file") +// if err != nil { +// return err +// } +// +// barFile, err := os.Open("/tmp/bar.file") +// if err != nil { +// return err +// } +// +// objects := []s3manager.BatchDownloadObject { +// { +// Input: &s3.GetObjectInput { +// Bucket: aws.String("bucket"), +// Key: aws.String("foo"), +// }, +// Writer: fooFile, +// }, +// { +// Input: &s3.GetObjectInput { +// Bucket: aws.String("bucket"), +// Key: aws.String("bar"), +// }, +// Writer: barFile, +// }, +// } +// +// iter := &s3manager.DownloadObjectsIterator{Objects: objects} +// if err := svc.DownloadWithIterator(aws.BackgroundContext(), iter); err != nil { +// return err +// } +func (d Downloader) DownloadWithIterator(ctx aws.Context, iter BatchDownloadIterator, opts ...func(*Downloader)) error { + var errs []Error + for iter.Next() { + object := iter.DownloadObject() + if _, err := d.DownloadWithContext(ctx, object.Writer, object.Object, opts...); err != nil { + errs = append(errs, newError(err, object.Object.Bucket, object.Object.Key)) + } + + if object.After == nil { + continue + } + + if err := object.After(); err != nil { + errs = append(errs, newError(err, object.Object.Bucket, object.Object.Key)) + } + } + + if len(errs) > 0 { + return NewBatchError("BatchedDownloadIncomplete", "some objects have failed to download.", errs) + } + return nil +} + // downloader is the implementation structure used internally by Downloader. type downloader struct { - ctx Downloader + ctx aws.Context + cfg Downloader in *s3.GetObjectInput w io.WriterAt @@ -163,14 +271,22 @@ type downloader struct { // download performs the implementation of the object download across ranged // GETs. func (d *downloader) download() (n int64, err error) { + // If range is specified fall back to single download of that range + // this enables the functionality of ranged gets with the downloader but + // at the cost of no multipart downloads. + if rng := aws.StringValue(d.in.Range); len(rng) > 0 { + d.downloadRange(rng) + return d.written, d.err + } + // Spin off first worker to check additional header information d.getChunk() if total := d.getTotalBytes(); total >= 0 { // Spin up workers - ch := make(chan dlchunk, d.ctx.Concurrency) + ch := make(chan dlchunk, d.cfg.Concurrency) - for i := 0; i < d.ctx.Concurrency; i++ { + for i := 0; i < d.cfg.Concurrency; i++ { d.wg.Add(1) go d.downloadPart(ch) } @@ -182,8 +298,8 @@ func (d *downloader) download() (n int64, err error) { } // Queue the next range of bytes to read. - ch <- dlchunk{w: d.w, start: d.pos, size: d.ctx.PartSize} - d.pos += d.ctx.PartSize + ch <- dlchunk{w: d.w, start: d.pos, size: d.cfg.PartSize} + d.pos += d.cfg.PartSize } // Wait for completion @@ -219,13 +335,17 @@ func (d *downloader) downloadPart(ch chan dlchunk) { defer d.wg.Done() for { chunk, ok := <-ch - if !ok || d.getErr() != nil { + if !ok { break } + if d.getErr() != nil { + // Drain the channel if there is an error, to prevent deadlocking + // of download producer. + continue + } if err := d.downloadChunk(chunk); err != nil { d.setErr(err) - break } } } @@ -237,30 +357,46 @@ func (d *downloader) getChunk() { return } - chunk := dlchunk{w: d.w, start: d.pos, size: d.ctx.PartSize} - d.pos += d.ctx.PartSize + chunk := dlchunk{w: d.w, start: d.pos, size: d.cfg.PartSize} + d.pos += d.cfg.PartSize if err := d.downloadChunk(chunk); err != nil { d.setErr(err) } } -// downloadChunk downloads the chunk froom s3 +// downloadRange downloads an Object given the passed in Byte-Range value. +// The chunk used down download the range will be configured for that range. +func (d *downloader) downloadRange(rng string) { + if d.getErr() != nil { + return + } + + chunk := dlchunk{w: d.w, start: d.pos} + // Ranges specified will short circuit the multipart download + chunk.withRange = rng + + if err := d.downloadChunk(chunk); err != nil { + d.setErr(err) + } + + // Update the position based on the amount of data received. + d.pos = d.written +} + +// downloadChunk downloads the chunk from s3 func (d *downloader) downloadChunk(chunk dlchunk) error { in := &s3.GetObjectInput{} awsutil.Copy(in, d.in) // Get the next byte range of data - rng := fmt.Sprintf("bytes=%d-%d", chunk.start, chunk.start+chunk.size-1) - in.Range = &rng + in.Range = aws.String(chunk.ByteRange()) var n int64 var err error for retry := 0; retry <= d.partBodyMaxRetries; retry++ { - req, resp := d.ctx.S3.GetObjectRequest(in) - req.Handlers.Build.PushBack(request.MakeAddToUserAgentFreeFormHandler("S3Manager")) - - err = req.Send() + var resp *s3.GetObjectOutput + resp, err = d.cfg.S3.GetObjectWithContext(d.ctx, in, d.cfg.RequestOptions...) if err != nil { return err } @@ -273,7 +409,7 @@ func (d *downloader) downloadChunk(chunk dlchunk) error { } chunk.cur = 0 - logMessage(d.ctx.S3, aws.LogDebugWithRequestRetries, + logMessage(d.cfg.S3, aws.LogDebugWithRequestRetries, fmt.Sprintf("DEBUG: object part body download interrupted %s, err, %v, retrying attempt %d", aws.StringValue(in.Key), err, retry)) } @@ -320,7 +456,7 @@ func (d *downloader) setTotalBytes(resp *s3.GetObjectOutput) { } if resp.ContentRange == nil { - // ContentRange is nil when the full file contents is provied, and + // ContentRange is nil when the full file contents is provided, and // is not chunked. Use ContentLength instead. if resp.ContentLength != nil { d.totalBytes = *resp.ContentLength @@ -379,12 +515,18 @@ type dlchunk struct { start int64 size int64 cur int64 + + // specifies the byte range the chunk should be downloaded with. + withRange string } // Write wraps io.WriterAt for the dlchunk, writing from the dlchunk's start // position to its end (or EOF). +// +// If a range is specified on the dlchunk the size will be ignored when writing. +// as the total size may not of be known ahead of time. func (c *dlchunk) Write(p []byte) (n int, err error) { - if c.cur >= c.size { + if c.cur >= c.size && len(c.withRange) == 0 { return 0, io.EOF } @@ -393,3 +535,13 @@ func (c *dlchunk) Write(p []byte) (n int, err error) { return } + +// ByteRange returns a HTTP Byte-Range header value that should be used by the +// client to request the chunk's range. +func (c *dlchunk) ByteRange() string { + if len(c.withRange) != 0 { + return c.withRange + } + + return fmt.Sprintf("bytes=%d-%d", c.start, c.start+c.size-1) +} |