From d84009f9bd24ba96ecc417b0fb8f9518adcd6d0e Mon Sep 17 00:00:00 2001 From: Morgan Fainberg Date: Thu, 21 Mar 2024 11:45:46 -0700 Subject: [PATCH] Add STDOut Consumer Add an STDOut consumer for usecases where streaming the bytes to another process is appropriate. If the consumer is STDOUT we are restricted to a single file download concurrently in multifile mode. --- cmd/multifile/manifest.go | 2 +- cmd/multifile/multifile.go | 10 +++++++++- pkg/config/config.go | 3 +++ pkg/consumer/stdout.go | 20 ++++++++++++++++++++ 4 files changed, 33 insertions(+), 2 deletions(-) create mode 100644 pkg/consumer/stdout.go diff --git a/cmd/multifile/manifest.go b/cmd/multifile/manifest.go index d4cab5a..e320125 100644 --- a/cmd/multifile/manifest.go +++ b/cmd/multifile/manifest.go @@ -90,7 +90,7 @@ func parseManifest(file io.Reader) (pget.Manifest, error) { // and make the consumer responsible for knowing if this // is allowed/not allowed/etc consumer := viper.GetString(config.OptOutputConsumer) - if consumer != config.ConsumerNull { + if consumer != config.ConsumerNull && consumer != config.ConsumerSTDOUT { err = checkSeenDestinations(seenDestinations, dest, url) if err != nil { if errors.Is(err, errDupeURLDestCombo) { diff --git a/cmd/multifile/multifile.go b/cmd/multifile/multifile.go index 4da0f8f..d7e5f33 100644 --- a/cmd/multifile/multifile.go +++ b/cmd/multifile/multifile.go @@ -97,6 +97,8 @@ func maxConcurrentFiles() int { } func multifileExecute(ctx context.Context, manifest pget.Manifest) error { + logger := logging.GetLogger() + chunkSize, err := humanize.ParseBytes(viper.GetString(config.OptChunkSize)) if err != nil { return err @@ -131,6 +133,13 @@ func multifileExecute(ctx context.Context, manifest pget.Manifest) error { return fmt.Errorf("error getting consumer: %w", err) } + // Special case, if we're writing to stdout, we only want to download one file at a time, since we can only stream + // a single bytestream to STDOUT + if viper.GetString(config.OptOutputConsumer) == config.ConsumerSTDOUT { + logger.Info().Msg("Using single file mode for STDOUT consumer") + pgetOpts.MaxConcurrentFiles = 1 + } + getter := &pget.Getter{ Downloader: download.GetBufferMode(downloadOpts), Consumer: consumer, @@ -158,7 +167,6 @@ func multifileExecute(ctx context.Context, manifest pget.Manifest) error { } throughput := float64(totalFileSize) / elapsedTime.Seconds() - logger := logging.GetLogger() logger.Info(). Int("file_count", len(manifest)). Str("total_bytes_downloaded", humanize.Bytes(uint64(totalFileSize))). diff --git a/pkg/config/config.go b/pkg/config/config.go index b4d36c8..eacb118 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -20,6 +20,7 @@ const ( ConsumerFile = "file" ConsumerTarExtractor = "tar-extractor" ConsumerNull = "null" + ConsumerSTDOUT = "stdout" ) var ( @@ -153,6 +154,8 @@ func GetConsumer() (consumer.Consumer, error) { return &consumer.TarExtractor{Overwrite: enableOverwrite}, nil case ConsumerNull: return &consumer.NullWriter{}, nil + case ConsumerSTDOUT: + return &consumer.StdoutConsumer{}, nil default: return nil, fmt.Errorf("invalid consumer specified: %s", consumerName) } diff --git a/pkg/consumer/stdout.go b/pkg/consumer/stdout.go new file mode 100644 index 0000000..653e65c --- /dev/null +++ b/pkg/consumer/stdout.go @@ -0,0 +1,20 @@ +package consumer + +import ( + "fmt" + "io" + "os" +) + +var _ Consumer = &StdoutConsumer{} + +type StdoutConsumer struct { +} + +func (s StdoutConsumer) Consume(reader io.Reader, destPath string) error { + _, err := io.Copy(os.Stdout, reader) + if err != nil { + return fmt.Errorf("error writing to stdout: %w", err) + } + return nil +}