transfermanager

package
v1.43.0
Warning

This package is not in the latest version of its module.

Published: Jul 3, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package transfermanager provides an easy way to parallelize downloads in Google Cloud Storage.

More information about Google Cloud Storage is available at https://cloud.google.com/storage/docs.

See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts, connection pooling and similar aspects of this package.

NOTE: This package is in preview. It is not stable, and is likely to change.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DownloadBuffer added in v1.43.0

type DownloadBuffer struct {
	// contains filtered or unexported fields
}

DownloadBuffer satisfies the io.WriterAt interface, so it can be used as the Destination of a DownloadObjectInput to download an object into memory with Downloader. DownloadBuffer is thread-safe as long as the ranges being written to do not overlap.
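The non-overlapping-writes contract can be illustrated with a small, self-contained sketch. The fixed-size `sliceWriterAt` type and the `writeParts` helper are hypothetical (they are not `DownloadBuffer` or part of this package); `writeParts` writes each part of some data from its own goroutine, roughly the way a parallel download fills its destination. Because no two goroutines touch the same bytes, no lock is needed.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

// sliceWriterAt is a hypothetical fixed-size io.WriterAt used only to
// illustrate the contract; it is not transfermanager.DownloadBuffer.
type sliceWriterAt struct{ buf []byte }

var errOutOfRange = errors.New("write out of range")

// WriteAt copies p into buf at off. Concurrent calls are safe here only
// because callers never write overlapping ranges.
func (s *sliceWriterAt) WriteAt(p []byte, off int64) (int, error) {
	if off < 0 || off+int64(len(p)) > int64(len(s.buf)) {
		return 0, errOutOfRange
	}
	return copy(s.buf[off:], p), nil
}

// writeParts (hypothetical) splits data into partSize chunks and writes each
// chunk to w from its own goroutine. It returns the first error reported.
func writeParts(w io.WriterAt, data []byte, partSize int) error {
	var wg sync.WaitGroup
	errs := make(chan error, (len(data)+partSize-1)/partSize)
	for off := 0; off < len(data); off += partSize {
		end := off + partSize
		if end > len(data) {
			end = len(data)
		}
		wg.Add(1)
		go func(off, end int) {
			defer wg.Done()
			if _, err := w.WriteAt(data[off:end], int64(off)); err != nil {
				errs <- err
			}
		}(off, end)
	}
	wg.Wait()
	close(errs)
	return <-errs // nil if no goroutine reported an error
}

func main() {
	data := []byte("hello, parallel world")
	dst := &sliceWriterAt{buf: make([]byte, len(data))}
	if err := writeParts(dst, data, 4); err != nil {
		panic(err)
	}
	fmt.Println(string(dst.buf)) // hello, parallel world
}
```

Overlapping writes would make the final contents depend on goroutine scheduling, which is why the guarantee is limited to disjoint ranges.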

func NewDownloadBuffer added in v1.43.0

func NewDownloadBuffer(buf []byte) *DownloadBuffer

NewDownloadBuffer initializes a DownloadBuffer using buf as the underlying buffer. This is the preferred way to create a DownloadBuffer: if len(buf) is at least the length of the object (or range) being downloaded, the buffer never needs to grow.
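To see why pre-sizing matters, here is a hypothetical `growBuffer` that grows only when a write extends past its current length and counts how many reallocations it performs. It models the documented behavior under that assumption and is not the library's implementation; a single mutex keeps it simple and race-free.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// growBuffer is a hypothetical io.WriterAt that grows on demand.
type growBuffer struct {
	mu    sync.Mutex
	buf   []byte
	grows int // number of reallocations performed
}

func newGrowBuffer(buf []byte) *growBuffer { return &growBuffer{buf: buf} }

// WriteAt writes p at off, reallocating first if off+len(p) exceeds len(buf).
func (g *growBuffer) WriteAt(p []byte, off int64) (int, error) {
	if off < 0 {
		return 0, errors.New("negative offset")
	}
	g.mu.Lock()
	defer g.mu.Unlock()
	if end := off + int64(len(p)); end > int64(len(g.buf)) {
		grown := make([]byte, end)
		copy(grown, g.buf) // keep bytes already written
		g.buf = grown
		g.grows++
	}
	return copy(g.buf[off:], p), nil
}

func main() {
	object := []byte("0123456789") // stands in for a 10-byte object

	presized := newGrowBuffer(make([]byte, len(object)))
	presized.WriteAt(object, 0)

	empty := newGrowBuffer(nil)
	empty.WriteAt(object[5:], 5) // parts may arrive out of order
	empty.WriteAt(object[:5], 0)

	fmt.Println(presized.grows, empty.grows) // 0 1
}
```

With a real object and many parts, every growth copies all bytes written so far, so a buffer sized up front avoids that work entirely.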

func (*DownloadBuffer) Bytes added in v1.43.0

func (db *DownloadBuffer) Bytes() []byte

Bytes returns the slice of bytes written to DownloadBuffer. The slice aliases the buffer content at least until the next buffer modification, so immediate changes to the slice will affect the result of future reads.

func (*DownloadBuffer) WriteAt added in v1.43.0

func (db *DownloadBuffer) WriteAt(p []byte, off int64) (n int, err error)

WriteAt writes len(p) bytes from p to the underlying buffer at offset off, growing the buffer if needed. It returns the number of bytes written from p and any error encountered that caused the write to stop early. WriteAt is thread-safe as long as the ranges being written to do not overlap. The supplied slice p is not retained.

type DownloadDirectoryInput added in v1.43.0

type DownloadDirectoryInput struct {
	// Bucket is the bucket in GCS to download from. Required.
	Bucket string

	// LocalDirectory specifies the directory to download the matched objects
	// to. Relative paths are allowed. The directory structure and contents
	// must not be modified while the download is in progress.
	// The directory will be created if it does not already exist. Required.
	LocalDirectory string

	// Prefix restricts the download to objects whose names begin with this
	// string. Optional.
	Prefix string

	// StartOffset is used to filter results to objects whose names are
	// lexicographically equal to or after StartOffset. If EndOffset is also
	// set, the objects listed will have names between StartOffset (inclusive)
	// and EndOffset (exclusive). Optional.
	StartOffset string

	// EndOffset is used to filter results to objects whose names are
	// lexicographically before EndOffset. If StartOffset is also set, the
	// objects listed will have names between StartOffset (inclusive) and
	// EndOffset (exclusive). Optional.
	EndOffset string

	// MatchGlob is a glob pattern used to filter results (for example, foo*bar). See
	// https://cloud.google.com/storage/docs/json_api/v1/objects/list#list-object-glob
	// for syntax details. Optional.
	MatchGlob string

	// Callback will run after all the objects in the directory as selected by
	// the provided filters are finished downloading.
	// It must be set if and only if the [WithCallbacks] option is set.
	// WaitAndClose will wait for all callbacks to finish.
	Callback func([]DownloadOutput)

	// OnObjectDownload will run after every finished object download. Optional.
	OnObjectDownload func(*DownloadOutput)
}

DownloadDirectoryInput is the input for a directory to download.
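The hypothetical `matches` helper below models how `Prefix`, `StartOffset` (inclusive) and `EndOffset` (exclusive) select object names, assuming that every filter that is set must match, that names compare in plain byte-wise order, and that an empty field means "not set". These filters are presumably applied server-side when objects are listed, so this only illustrates their semantics. `MatchGlob` is left out because Go's `path.Match` does not implement the Cloud Storage glob syntax.

```go
package main

import (
	"fmt"
	"strings"
)

// matches is a hypothetical helper: it reports whether name passes the
// Prefix, StartOffset (inclusive) and EndOffset (exclusive) filters.
// Empty strings mean the filter is not set.
func matches(name, prefix, startOffset, endOffset string) bool {
	if !strings.HasPrefix(name, prefix) {
		return false
	}
	if startOffset != "" && name < startOffset {
		return false
	}
	if endOffset != "" && name >= endOffset {
		return false
	}
	return true
}

func main() {
	names := []string{"logs/2024-01.txt", "logs/2024-02.txt", "logs/2024-03.txt", "img/cat.png"}
	for _, n := range names {
		if matches(n, "logs/", "logs/2024-02", "logs/2024-03") {
			fmt.Println(n) // prints only logs/2024-02.txt
		}
	}
}
```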

type DownloadObjectInput

type DownloadObjectInput struct {
	// Required fields
	Bucket      string
	Object      string
	Destination io.WriterAt

	// Optional fields
	Generation    *int64
	Conditions    *storage.Conditions
	EncryptionKey []byte
	Range         *DownloadRange // if specified, reads only a range

	// Callback will be run once the object is finished downloading. It must be
	// set if and only if the [WithCallbacks] option is set; otherwise, it must
	// not be set.
	// A worker will be used to execute the callback; therefore, it should not
	// be a long-running function. WaitAndClose will wait for all callbacks to
	// finish.
	Callback func(*DownloadOutput)
	// contains filtered or unexported fields
}

DownloadObjectInput is the input for a single object to download.

type DownloadOutput

type DownloadOutput struct {
	Bucket string
	Object string
	Range  *DownloadRange             // requested range, if it was specified
	Err    error                      // error occurring during download
	Attrs  *storage.ReaderObjectAttrs // attributes of downloaded object, if successful
}

DownloadOutput provides output for a single object download, including all errors received while downloading object parts. If the download was successful, Attrs will be populated.

type DownloadRange

type DownloadRange struct {
	// Offset is the starting offset (inclusive) from which the object is read.
	// If Offset is negative, the object is not sharded: a single worker reads
	// the last abs(Offset) bytes, and Length must also be negative to indicate
	// that all remaining bytes will be read.
	Offset int64
	// Length is the number of bytes to read.
	// If length is negative or larger than the object size, the object is read
	// until the end.
	Length int64
}

DownloadRange specifies the range of the object to read. If the object's "Content-Encoding" metadata is set to "gzip", or the object otherwise qualifies for decompressive transcoding (see https://cloud.google.com/storage/docs/transcoding), Google Cloud Storage serves the whole object regardless of the requested range.

type Downloader

type Downloader struct {
	// contains filtered or unexported fields
}

Downloader manages a set of parallelized downloads.

Example (Asynchronous)
package main

import (
	"context"
	"log"
	"os"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/transfermanager"
)

func main() {
	ctx := context.Background()
	// Pass in any client opts or set retry policy here.
	client, err := storage.NewClient(ctx) // can also use NewGRPCClient
	if err != nil {
		// handle error
	}
	// Deferred first, so it runs last: after WaitAndClose below has returned.
	defer client.Close()

	// Create Downloader with callbacks plus any desired options, including
	// number of workers, part size, per operation timeout, etc.
	d, err := transfermanager.NewDownloader(client, transfermanager.WithCallbacks())
	if err != nil {
		// handle error
	}
	defer func() {
		if _, err := d.WaitAndClose(); err != nil {
			// one or more of the downloads failed
		}
	}()

	// Create local file writer for output.
	f, err := os.Create("/path/to/localfile")
	if err != nil {
		// handle error
	}

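	// Create callback function. It runs once the object has finished
	// downloading, so it is a safe place to close the destination file.
	callback := func(out *transfermanager.DownloadOutput) {
		if err := f.Close(); err != nil {
			log.Printf("closing file for %v failed: %v", out.Object, err)
		}
		if out.Err != nil {
			log.Printf("download of %v failed with error %v", out.Object, out.Err)
		} else {
			log.Printf("download of %v succeeded", out.Object)
		}
	}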

	// Create download input
	in := &transfermanager.DownloadObjectInput{
		Bucket:      "mybucket",
		Object:      "myblob",
		Destination: f,
		// Optionally specify params to apply to download. A customer-supplied
		// encryption key must be a 32-byte AES-256 key; "mykey" is a placeholder.
		EncryptionKey: []byte("mykey"),
		// Specify the callback
		Callback: callback,
	}

	// Add to Downloader.
	if err := d.DownloadObject(ctx, in); err != nil {
		// handle error
	}

	// Repeat if desired.
}

Example (Synchronous)
package main

import (
	"context"
	"log"
	"os"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/transfermanager"
)

func main() {
	ctx := context.Background()
	// Pass in any client opts or set retry policy here.
	client, err := storage.NewClient(ctx) // can also use NewGRPCClient
	if err != nil {
		// handle error
	}
	defer client.Close()

	// Create Downloader with desired options, including number of workers,
	// part size, per operation timeout, etc.
	d, err := transfermanager.NewDownloader(client, transfermanager.WithWorkers(16))
	if err != nil {
		// handle error
	}

	// Create local file writer for output.
	f, err := os.Create("/path/to/localfile")
	if err != nil {
		// handle error
	}
	// Closed when main returns, which is after WaitAndClose has finished all writes.
	defer f.Close()

	// Create download input
	in := &transfermanager.DownloadObjectInput{
		Bucket:      "mybucket",
		Object:      "myblob",
		Destination: f,
		// Optionally specify params to apply to download. A customer-supplied
		// encryption key must be a 32-byte AES-256 key; "mykey" is a placeholder.
		EncryptionKey: []byte("mykey"),
	}

	// Add to Downloader.
	if err := d.DownloadObject(ctx, in); err != nil {
		// handle error
	}

	// Repeat if desired.

	// Wait for all downloads to complete.
	results, err := d.WaitAndClose()
	if err != nil {
		// handle error
	}

	// Iterate through completed downloads and process results.
	for _, out := range results {
		if out.Err != nil {
			log.Printf("download of %v failed with error %v", out.Object, out.Err)
		} else {
			log.Printf("download of %v succeeded", out.Object)
		}
	}
}

func NewDownloader

func NewDownloader(c *storage.Client, opts ...Option) (*Downloader, error)

NewDownloader creates a new Downloader to add operations to. The transport (HTTP or gRPC) and similar settings are configured on the client that is passed in. The returned Downloader can be shared across goroutines to initiate downloads.

func (*Downloader) DownloadDirectory added in v1.43.0

func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirectoryInput) error

DownloadDirectory queues the download of a set of objects to a local path. It initiates the downloads but does not block; call Downloader.WaitAndClose or use the callback to process the results. DownloadDirectory is thread-safe and can be called simultaneously from different goroutines. It resolves any filters on the input and creates the needed local directory structure as the downloads progress. Note: DownloadDirectory overwrites existing files in the directory.

func (*Downloader) DownloadObject

func (d *Downloader) DownloadObject(ctx context.Context, input *DownloadObjectInput) error

DownloadObject queues the download of a single object. It initiates the download but does not block; call Downloader.WaitAndClose or use the callback to process the result. DownloadObject is thread-safe and can be called simultaneously from different goroutines. The download may not start immediately if all workers are busy, so a deadline set on ctx may expire before the download even starts. To set a timeout that starts with the download, use the WithPerOpTimeout option.

func (*Downloader) WaitAndClose

func (d *Downloader) WaitAndClose() ([]DownloadOutput, error)

WaitAndClose waits for all outstanding downloads to complete and closes the Downloader. Adding new downloads after this has been called will cause an error.

WaitAndClose returns all the results of the downloads and an error wrapping all errors that were encountered by the Downloader when downloading objects. These errors are also returned in the respective DownloadOutput for the failing download. The results are not guaranteed to be in any order. Results will be empty if using the WithCallbacks option. WaitAndClose will wait for all callbacks to finish.

type Option

type Option interface {
	// contains filtered or unexported methods
}

An Option is an option for a transfermanager Downloader or Uploader.

func WithCallbacks

func WithCallbacks() Option

WithCallbacks returns an Option that enables callbacks for processing the results. If this option is set, results are not returned by Downloader.WaitAndClose and must be processed through the callback.

func WithPartSize added in v1.43.0

func WithPartSize(partSize int64) Option

WithPartSize returns an Option that specifies the size of the shards to transfer: if an object is larger than this size, it is uploaded or downloaded in concurrent pieces. The default is 32 MiB for downloads. Objects that support decompressive transcoding are downloaded in a single piece regardless of the partSize set here.
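As a rough illustration of the sharding arithmetic, a hypothetical `partRanges` helper splits an object into contiguous ranges of at most partSize bytes. With the 32 MiB download default, a 100 MiB object would become four parts: three of 32 MiB and one of 4 MiB. This assumes fixed-size, contiguous shards; the package's exact splitting strategy is not documented here.

```go
package main

import "fmt"

const defaultPartSize = 32 << 20 // 32 MiB, the documented download default

type byteRange struct{ Offset, Length int64 }

// partRanges is a hypothetical helper that splits [0, size) into
// contiguous ranges of at most partSize bytes.
func partRanges(size, partSize int64) []byteRange {
	if partSize <= 0 || size <= partSize {
		return []byteRange{{0, size}} // small object: a single request
	}
	var parts []byteRange
	for off := int64(0); off < size; off += partSize {
		n := partSize
		if size-off < n {
			n = size - off // the last part may be shorter
		}
		parts = append(parts, byteRange{off, n})
	}
	return parts
}

func main() {
	for _, p := range partRanges(100<<20, defaultPartSize) {
		fmt.Printf("offset=%d MiB length=%d MiB\n", p.Offset>>20, p.Length>>20)
	}
	// offset=0 MiB length=32 MiB
	// offset=32 MiB length=32 MiB
	// offset=64 MiB length=32 MiB
	// offset=96 MiB length=4 MiB
}
```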

func WithPerOpTimeout

func WithPerOpTimeout(timeout time.Duration) Option

WithPerOpTimeout returns an Option that sets a timeout on each operation performed to download or upload an object. The timeout starts when the operation begins processing, not when it is added. By default, no timeout is set other than any overall deadline on the provided context.

func WithWorkers

func WithWorkers(numWorkers int) Option

WithWorkers returns an Option that specifies the maximum number of concurrent goroutines used to download or upload objects. Defaults to runtime.NumCPU()/2.
