Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support HTTP, HTTPS schemes as well as whatever Go CDK supports for blob storage #1280

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/cli/mcap/cmd/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ func run(filterOptions *filterOpts, args []string) {
die("please supply a file. see --help for usage details.")
}
} else {
closeFile, newReader, err := utils.GetReader(context.Background(), args[0])
newReader, _, err := utils.GetReader(context.Background(), args[0])
if err != nil {
die("failed to open source for reading: %s", err)
}
defer func() {
if closeErr := closeFile(); closeErr != nil {
if closeErr := newReader.Close(); closeErr != nil {
die("error closing read source: %s", closeErr)
}
}()
Expand Down
3 changes: 2 additions & 1 deletion go/cli/mcap/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ module github.com/foxglove/mcap/go/cli/mcap
go 1.20

require (
cloud.google.com/go/storage v1.23.0
gocloud.dev v0.40.0
github.com/jfbus/httprs v1.0.1
github.com/fatih/color v1.13.0
github.com/foxglove/go-rosbag v0.0.6
github.com/foxglove/mcap/go/mcap v0.4.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,25 @@ import (
"fmt"
"io"

"cloud.google.com/go/storage"
"gocloud.dev/blob"
)

type GCSReadSeekCloser struct {
type GoCloudReadSeekCloser struct {
size int64
object *storage.ObjectHandle
key string
ctx context.Context
offset int64
r io.ReadCloser
bucket *blob.Bucket
}

func (r *GCSReadSeekCloser) Read(p []byte) (int, error) {
func (r *GoCloudReadSeekCloser) Read(p []byte) (int, error) {
n, err := r.r.Read(p)
r.offset += int64(n)
return n, err
}

func (r *GCSReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
func (r *GoCloudReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
var seekTo int64
switch whence {
case io.SeekCurrent:
Expand All @@ -41,7 +42,7 @@ func (r *GCSReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
if err != nil {
return 0, err
}
reader, err := r.object.NewRangeReader(r.ctx, seekTo, -1)
reader, err := r.bucket.NewRangeReader(r.ctx, r.key, seekTo, -1, nil)
if err != nil {
return 0, err
}
Expand All @@ -51,19 +52,21 @@ func (r *GCSReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
return seekTo, nil
}

func (r *GCSReadSeekCloser) Close() error {
func (r *GoCloudReadSeekCloser) Close() error {
return r.r.Close()
}

func NewGCSReadSeekCloser(ctx context.Context, object *storage.ObjectHandle) (*GCSReadSeekCloser, error) {
r, err := object.NewReader(ctx)
func NewGoCloudReadSeekCloser(ctx context.Context, bucket *blob.Bucket, key string) (*GoCloudReadSeekCloser, error) {
r, err := bucket.NewReader(ctx, key, nil)
if err != nil {
return nil, err
}
return &GCSReadSeekCloser{
size: r.Attrs.Size,
object: object,

return &GoCloudReadSeekCloser{
size: r.Size(),
key: key,
r: r,
ctx: ctx,
bucket: bucket,
}, nil
}
109 changes: 50 additions & 59 deletions go/cli/mcap/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,40 @@ import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"regexp"
"time"

"cloud.google.com/go/storage"
"github.com/jfbus/httprs"
"github.com/olekukonko/tablewriter"
"github.com/schollz/progressbar/v3"
"gocloud.dev/blob"
_ "gocloud.dev/blob/azureblob" // blank import recommended by https://gocloud.dev/howto/blob/#opening
_ "gocloud.dev/blob/gcsblob" // blank import recommended by https://gocloud.dev/howto/blob/#opening
_ "gocloud.dev/blob/s3blob" // blank import recommended by https://gocloud.dev/howto/blob/#opening
)

var (
remoteFileRegex = regexp.MustCompile(`(?P<Scheme>\w+)://(?P<Bucket>[a-z0-9_.-]+)/(?P<Filename>.*)`)
schemeRegex = regexp.MustCompile(`(?P<Scheme>\w+)://(?P<Path>.*)`)
bucketRegex = regexp.MustCompile(`(?P<Bucket>[a-z0-9_.-]+)/(?P<Filename>.*)`)
)

func GetScheme(filename string) (match1 string, match2 string, match3 string) {
match := remoteFileRegex.FindStringSubmatch(filename)
func GetSchemeFromURI(uri string) (scheme string, path string) {
match := schemeRegex.FindStringSubmatch(uri)
if len(match) == 0 {
return "", "", filename
// Probably just a raw path
return "", uri
}
return match[1], match[2], match[3]
return match[1], match[2]
}

func GetBucketFromPath(path string) (bucket string, filename string) {
match := bucketRegex.FindStringSubmatch(path)
if len(match) == 0 {
return "", path
}
return match[1], match[2]
}

func ReadingStdin() (bool, error) {
Expand All @@ -42,66 +57,42 @@ func StdoutRedirected() bool {
return true
}

func GetReader(ctx context.Context, filename string) (func() error, io.ReadSeekCloser, error) {
var rs io.ReadSeekCloser
var err error
closeReader := func() error { return nil }
scheme, bucket, path := GetScheme(filename)
if scheme != "" {
switch scheme {
case "gs":
client, err := storage.NewClient(ctx)
if err != nil {
return closeReader, nil, fmt.Errorf("failed to create GCS client: %w", err)
}
closeReader = client.Close
object := client.Bucket(bucket).Object(path)
rs, err = NewGCSReadSeekCloser(ctx, object)
if err != nil {
return closeReader, nil, fmt.Errorf("failed to build read seek closer: %w", err)
}
default:
return closeReader, nil, fmt.Errorf("unsupported remote file scheme: %s", scheme)
func GetReader(ctx context.Context, uri string) (io.ReadSeekCloser, bool, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be useful to add a docstring here describing the behavior of the new bool returned

scheme, path := GetSchemeFromURI(uri)
switch scheme {
case "":
// Assume that a URI without a scheme is a local path
rs, err := os.Open(path)
return rs, false, err
case "http", "https":
resp, err := http.Get(uri)
if err != nil {
return nil, true, err
}
rs := httprs.NewHttpReadSeeker(resp)
return rs, true, nil
default:
// Assume that any other scheme can be handled by Go CDK
bucket, filename := GetBucketFromPath(path)
bucketClient, err := blob.OpenBucket(ctx, fmt.Sprintf("%v://%v", scheme, bucket))
if err != nil {
return nil, true, err
}
} else {
rs, err = os.Open(path)
rs, err := NewGoCloudReadSeekCloser(ctx, bucketClient, filename)
if err != nil {
return nil, nil, fmt.Errorf("failed to open local file")
return nil, true, err
}
return rs, true, err
}

return closeReader, rs, nil
}

func WithReader(ctx context.Context, filename string, f func(remote bool, rs io.ReadSeeker) error) error {
var err error
var rs io.ReadSeekCloser
var remote bool
scheme, bucket, path := GetScheme(filename)
if scheme != "" {
remote = true
switch scheme {
case "gs":
client, err := storage.NewClient(ctx)
if err != nil {
return fmt.Errorf("failed to create GCS client: %w", err)
}
object := client.Bucket(bucket).Object(path)
rs, err = NewGCSReadSeekCloser(ctx, object)
if err != nil {
return fmt.Errorf("failed to build read seek closer: %w", err)
}
default:
return fmt.Errorf("unsupported remote file scheme: %s", scheme)
}
} else {
rs, err = os.Open(path)
if err != nil {
return fmt.Errorf("failed to open local file")
}
func WithReader(ctx context.Context, uri string, f func(remote bool, rs io.ReadSeeker) error) error {
reader, remote, err := GetReader(ctx, uri)
if err != nil {
return err
}
defer rs.Close()
return f(remote, rs)
defer reader.Close()
return f(remote, reader)
}

func FormatTable(w io.Writer, rows [][]string) {
Expand Down
60 changes: 48 additions & 12 deletions go/cli/mcap/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,76 @@ import (
"github.com/stretchr/testify/assert"
)

func TestGetScheme(t *testing.T) {
func TestGetSchemFromURI(t *testing.T) {
cases := []struct {
assertion string
input string
expectedScheme string
expectedBucket string
expectedFilename string
assertion string
input string
expectedScheme string
expectedPath string
}{
{
"local file",
"foo/bar/baz.txt",
"",
"",
"foo/bar/baz.txt",
},
{
"remote file",
"gs://foo/bar/baz.txt",
"gs",
"foo",
"bar/baz.txt",
"foo/bar/baz.txt",
},
{
"remote file",
"gs://foo-bar.com123/bar/baz.txt",
"gs",
"foo-bar.com123",
"bar/baz.txt",
"foo-bar.com123/bar/baz.txt",
},
{
"remote file",
"s3://foo-bar.com/bar/baz.txt",
"s3",
"foo-bar.com/bar/baz.txt",
},
{
"remote file",
"http://foo-bar.com/bar/baz.txt",
"http",
"foo-bar.com/bar/baz.txt",
},
}
for _, c := range cases {
t.Run(c.assertion, func(t *testing.T) {
scheme, bucket, filename := GetScheme(c.input)
scheme, path := GetSchemeFromURI(c.input)
assert.Equal(t, c.expectedScheme, scheme)
assert.Equal(t, c.expectedPath, path)
})
}
}

func TestGetBucketFromPath(t *testing.T) {
cases := []struct {
assertion string
input string
expectedBucket string
expectedFilename string
}{
{
"Simple structure",
"foo/bar.txt",
"foo",
"bar.txt",
},
{
"Complex structure",
"foo.com/bar/baz.txt",
"foo.com",
"bar/baz.txt",
},
}
for _, c := range cases {
t.Run(c.assertion, func(t *testing.T) {
bucket, filename := GetBucketFromPath(c.input)
assert.Equal(t, c.expectedBucket, bucket)
assert.Equal(t, c.expectedFilename, filename)
})
Expand Down
Loading
Loading