Diffstat (limited to 'vendor/github.com/mitchellh/packer/vendor/github.com/aws/aws-sdk-go/service/s3/s3manager/download.go')
-rw-r--r-- vendor/github.com/mitchellh/packer/vendor/github.com/aws/aws-sdk-go/service/s3/s3manager/download.go | 204
1 file changed, 178 insertions(+), 26 deletions(-)
diff --git a/vendor/github.com/mitchellh/packer/vendor/github.com/aws/aws-sdk-go/service/s3/s3manager/download.go b/vendor/github.com/mitchellh/packer/vendor/github.com/aws/aws-sdk-go/service/s3/s3manager/download.go
index 0841077a..d30f2b6b 100644
--- a/vendor/github.com/mitchellh/packer/vendor/github.com/aws/aws-sdk-go/service/s3/s3manager/download.go
+++ b/vendor/github.com/mitchellh/packer/vendor/github.com/aws/aws-sdk-go/service/s3/s3manager/download.go
@@ -31,15 +31,30 @@ const DefaultDownloadConcurrency = 5
type Downloader struct {
// The buffer size (in bytes) to use when buffering data into chunks and
// sending them as parts to S3. The minimum allowed part size is 5MB, and
- // if this value is set to zero, the DefaultPartSize value will be used.
+ // if this value is set to zero, the DefaultDownloadPartSize value will be used.
+ //
+ // PartSize is ignored if the Range input parameter is provided.
PartSize int64
// The number of goroutines to spin up in parallel when sending parts.
// If this is set to zero, the DefaultDownloadConcurrency value will be used.
+ //
+ // Concurrency is ignored if the Range input parameter is provided.
Concurrency int
// An S3 client to use when performing downloads.
S3 s3iface.S3API
+
+ // List of request options that will be passed down to individual API
+ // operation requests made by the downloader.
+ RequestOptions []request.Option
+}
+
+// WithDownloaderRequestOptions appends to the Downloader's API request options.
+func WithDownloaderRequestOptions(opts ...request.Option) func(*Downloader) {
+ return func(d *Downloader) {
+ d.RequestOptions = append(d.RequestOptions, opts...)
+ }
}
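// A hypothetical usage sketch (not part of this diff) of the new
// WithDownloaderRequestOptions helper, passing per-call request options
// through Download. The sess and buf identifiers, and the elided imports
// (aws, request, s3, s3manager), are assumptions for illustration only.
//
//	downloader := s3manager.NewDownloader(sess)
//	buf := aws.NewWriteAtBuffer([]byte{})
//	n, err := downloader.Download(buf,
//		&s3.GetObjectInput{
//			Bucket: aws.String("bucket"),
//			Key:    aws.String("key"),
//		},
//		s3manager.WithDownloaderRequestOptions(
//			request.WithLogLevel(aws.LogDebugWithHTTPBody),
//		),
//	)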
// NewDownloader creates a new Downloader instance to download objects from
@@ -119,32 +134,125 @@ type maxRetrier interface {
//
// The w io.WriterAt can be satisfied by an os.File to do multipart concurrent
// downloads, or in memory []byte wrapper using aws.WriteAtBuffer.
+//
+// If the GetObjectInput's Range value is provided, the downloader will perform
+// a single GetObject request for that object's range. This causes the part
+// size and concurrency configurations to be ignored.
func (d Downloader) Download(w io.WriterAt, input *s3.GetObjectInput, options ...func(*Downloader)) (n int64, err error) {
- impl := downloader{w: w, in: input, ctx: d}
+ return d.DownloadWithContext(aws.BackgroundContext(), w, input, options...)
+}
+
+// DownloadWithContext downloads an object in S3 and writes the payload into w
+// using concurrent GET requests.
+//
+// DownloadWithContext is the same as Download with the additional support for
+// Context input parameters. The Context must not be nil. A nil Context will
+// cause a panic. Use the Context to add deadlines, timeouts, etc. The
+// DownloadWithContext may create sub-contexts for individual underlying
+// requests.
+//
+// Additional functional options can be provided to configure the individual
+// download. These options operate on a copy of the Downloader instance Download
+// is called from, so modifying them will not impact the original Downloader
+// instance. Use the WithDownloaderRequestOptions helper function to pass in request
+// options that will be applied to all API operations made with this downloader.
+//
+// The w io.WriterAt can be satisfied by an os.File to do multipart concurrent
+// downloads, or in memory []byte wrapper using aws.WriteAtBuffer.
+//
+// It is safe to call this method concurrently across goroutines.
+//
+// If the GetObjectInput's Range value is provided, the downloader will perform
+// a single GetObject request for that object's range. This causes the part
+// size and concurrency configurations to be ignored.
+func (d Downloader) DownloadWithContext(ctx aws.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*Downloader)) (n int64, err error) {
+ impl := downloader{w: w, in: input, cfg: d, ctx: ctx}
for _, option := range options {
- option(&impl.ctx)
+ option(&impl.cfg)
}
+ impl.cfg.RequestOptions = append(impl.cfg.RequestOptions, request.WithAppendUserAgent("S3Manager"))
if s, ok := d.S3.(maxRetrier); ok {
impl.partBodyMaxRetries = s.MaxRetries()
}
impl.totalBytes = -1
- if impl.ctx.Concurrency == 0 {
- impl.ctx.Concurrency = DefaultDownloadConcurrency
+ if impl.cfg.Concurrency == 0 {
+ impl.cfg.Concurrency = DefaultDownloadConcurrency
}
- if impl.ctx.PartSize == 0 {
- impl.ctx.PartSize = DefaultDownloadPartSize
+ if impl.cfg.PartSize == 0 {
+ impl.cfg.PartSize = DefaultDownloadPartSize
}
return impl.download()
}
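// A hypothetical usage sketch (not part of this diff) of DownloadWithContext
// with a ranged get. Because Range is set on the input, the downloader makes a
// single GetObject request and the PartSize and Concurrency settings are
// ignored. The sess and w identifiers are assumptions for illustration only.
//
//	downloader := s3manager.NewDownloader(sess)
//	n, err := downloader.DownloadWithContext(aws.BackgroundContext(), w,
//		&s3.GetObjectInput{
//			Bucket: aws.String("bucket"),
//			Key:    aws.String("key"),
//			Range:  aws.String("bytes=0-1048575"), // first 1 MiB only
//		})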
+// DownloadWithIterator will download a batch of objects from S3 and write them
+// to the io.WriterAt specified in the iterator.
+//
+// Example:
+// svc := s3manager.NewDownloader(session)
+//
+// fooFile, err := os.Create("/tmp/foo.file")
+// if err != nil {
+// return err
+// }
+//
+// barFile, err := os.Create("/tmp/bar.file")
+// if err != nil {
+// return err
+// }
+//
+// objects := []s3manager.BatchDownloadObject {
+// {
+// Input: &s3.GetObjectInput {
+// Bucket: aws.String("bucket"),
+// Key: aws.String("foo"),
+// },
+// Writer: fooFile,
+// },
+// {
+// Input: &s3.GetObjectInput {
+// Bucket: aws.String("bucket"),
+// Key: aws.String("bar"),
+// },
+// Writer: barFile,
+// },
+// }
+//
+// iter := &s3manager.DownloadObjectsIterator{Objects: objects}
+// if err := svc.DownloadWithIterator(aws.BackgroundContext(), iter); err != nil {
+// return err
+// }
+func (d Downloader) DownloadWithIterator(ctx aws.Context, iter BatchDownloadIterator, opts ...func(*Downloader)) error {
+ var errs []Error
+ for iter.Next() {
+ object := iter.DownloadObject()
+ if _, err := d.DownloadWithContext(ctx, object.Writer, object.Object, opts...); err != nil {
+ errs = append(errs, newError(err, object.Object.Bucket, object.Object.Key))
+ }
+
+ if object.After == nil {
+ continue
+ }
+
+ if err := object.After(); err != nil {
+ errs = append(errs, newError(err, object.Object.Bucket, object.Object.Key))
+ }
+ }
+
+ if len(errs) > 0 {
+ return NewBatchError("BatchedDownloadIncomplete", "some objects have failed to download.", errs)
+ }
+ return nil
+}
+
// downloader is the implementation structure used internally by Downloader.
type downloader struct {
- ctx Downloader
+ ctx aws.Context
+ cfg Downloader
in *s3.GetObjectInput
w io.WriterAt
@@ -163,14 +271,22 @@ type downloader struct {
// download performs the implementation of the object download across ranged
// GETs.
func (d *downloader) download() (n int64, err error) {
+ // If a range is specified, fall back to a single download of that range.
+ // This enables ranged gets with the downloader, but at the cost of
+ // losing multipart downloads.
+ if rng := aws.StringValue(d.in.Range); len(rng) > 0 {
+ d.downloadRange(rng)
+ return d.written, d.err
+ }
+
// Spin off first worker to check additional header information
d.getChunk()
if total := d.getTotalBytes(); total >= 0 {
// Spin up workers
- ch := make(chan dlchunk, d.ctx.Concurrency)
+ ch := make(chan dlchunk, d.cfg.Concurrency)
- for i := 0; i < d.ctx.Concurrency; i++ {
+ for i := 0; i < d.cfg.Concurrency; i++ {
d.wg.Add(1)
go d.downloadPart(ch)
}
@@ -182,8 +298,8 @@ func (d *downloader) download() (n int64, err error) {
}
// Queue the next range of bytes to read.
- ch <- dlchunk{w: d.w, start: d.pos, size: d.ctx.PartSize}
- d.pos += d.ctx.PartSize
+ ch <- dlchunk{w: d.w, start: d.pos, size: d.cfg.PartSize}
+ d.pos += d.cfg.PartSize
}
// Wait for completion
@@ -219,13 +335,17 @@ func (d *downloader) downloadPart(ch chan dlchunk) {
defer d.wg.Done()
for {
chunk, ok := <-ch
- if !ok || d.getErr() != nil {
+ if !ok {
break
}
+ if d.getErr() != nil {
+ // Drain the channel if there is an error, to prevent deadlocking
+ // the download producer.
+ continue
+ }
if err := d.downloadChunk(chunk); err != nil {
d.setErr(err)
- break
}
}
}
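// A minimal sketch (an assumption, not code from this diff) of the drain
// pattern used above: once an error has been recorded, workers keep receiving
// and discarding chunks so the producer's send into the channel in download()
// never blocks on a full buffer.
//
//	for chunk := range ch {
//		if sharedErr() != nil { // hypothetical error accessor
//			continue // drain without processing
//		}
//		process(chunk) // hypothetical per-chunk work
//	}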
@@ -237,30 +357,46 @@ func (d *downloader) getChunk() {
return
}
- chunk := dlchunk{w: d.w, start: d.pos, size: d.ctx.PartSize}
- d.pos += d.ctx.PartSize
+ chunk := dlchunk{w: d.w, start: d.pos, size: d.cfg.PartSize}
+ d.pos += d.cfg.PartSize
if err := d.downloadChunk(chunk); err != nil {
d.setErr(err)
}
}
-// downloadChunk downloads the chunk froom s3
+// downloadRange downloads an Object given the passed in Byte-Range value.
+// The chunk used to download the range will be configured for that range.
+func (d *downloader) downloadRange(rng string) {
+ if d.getErr() != nil {
+ return
+ }
+
+ chunk := dlchunk{w: d.w, start: d.pos}
+ // Ranges specified will short circuit the multipart download
+ chunk.withRange = rng
+
+ if err := d.downloadChunk(chunk); err != nil {
+ d.setErr(err)
+ }
+
+ // Update the position based on the amount of data received.
+ d.pos = d.written
+}
+
+// downloadChunk downloads the chunk from s3
func (d *downloader) downloadChunk(chunk dlchunk) error {
in := &s3.GetObjectInput{}
awsutil.Copy(in, d.in)
// Get the next byte range of data
- rng := fmt.Sprintf("bytes=%d-%d", chunk.start, chunk.start+chunk.size-1)
- in.Range = &rng
+ in.Range = aws.String(chunk.ByteRange())
var n int64
var err error
for retry := 0; retry <= d.partBodyMaxRetries; retry++ {
- req, resp := d.ctx.S3.GetObjectRequest(in)
- req.Handlers.Build.PushBack(request.MakeAddToUserAgentFreeFormHandler("S3Manager"))
-
- err = req.Send()
+ var resp *s3.GetObjectOutput
+ resp, err = d.cfg.S3.GetObjectWithContext(d.ctx, in, d.cfg.RequestOptions...)
if err != nil {
return err
}
@@ -273,7 +409,7 @@ func (d *downloader) downloadChunk(chunk dlchunk) error {
}
chunk.cur = 0
- logMessage(d.ctx.S3, aws.LogDebugWithRequestRetries,
+ logMessage(d.cfg.S3, aws.LogDebugWithRequestRetries,
fmt.Sprintf("DEBUG: object part body download interrupted %s, err, %v, retrying attempt %d",
aws.StringValue(in.Key), err, retry))
}
@@ -320,7 +456,7 @@ func (d *downloader) setTotalBytes(resp *s3.GetObjectOutput) {
}
if resp.ContentRange == nil {
- // ContentRange is nil when the full file contents is provied, and
+ // ContentRange is nil when the full file contents is provided, and
// is not chunked. Use ContentLength instead.
if resp.ContentLength != nil {
d.totalBytes = *resp.ContentLength
@@ -379,12 +515,18 @@ type dlchunk struct {
start int64
size int64
cur int64
+
+ // specifies the byte range the chunk should be downloaded with.
+ withRange string
}
// Write wraps io.WriterAt for the dlchunk, writing from the dlchunk's start
// position to its end (or EOF).
+//
+// If a range is specified on the dlchunk, the size will be ignored when writing,
+// as the total size may not be known ahead of time.
func (c *dlchunk) Write(p []byte) (n int, err error) {
- if c.cur >= c.size {
+ if c.cur >= c.size && len(c.withRange) == 0 {
return 0, io.EOF
}
@@ -393,3 +535,13 @@ func (c *dlchunk) Write(p []byte) (n int, err error) {
return
}
+
+// ByteRange returns an HTTP Byte-Range header value that should be used by the
+// client to request the chunk's range.
+func (c *dlchunk) ByteRange() string {
+ if len(c.withRange) != 0 {
+ return c.withRange
+ }
+
+ return fmt.Sprintf("bytes=%d-%d", c.start, c.start+c.size-1)
+}
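// A hypothetical illustration (not part of this diff) of ByteRange's two
// behaviors: a chunk built by the multipart path derives its header from start
// and size, while a chunk built by downloadRange echoes the caller's Range.
//
//	c := dlchunk{start: 0, size: 5 * 1024 * 1024}
//	c.ByteRange() // "bytes=0-5242879"
//
//	r := dlchunk{withRange: "bytes=100-200"}
//	r.ByteRange() // "bytes=100-200"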