diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 68151fb..91e4dd7 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -19,9 +19,6 @@ jobs: - name: Set up uv uses: astral-sh/setup-uv@v3 - with: - enable-cache: true - python-version: "3.11" - name: Validate router configs run: | diff --git a/multipart_ops.go b/multipart_ops.go index ee56f1a..3f5ec30 100644 --- a/multipart_ops.go +++ b/multipart_ops.go @@ -49,7 +49,13 @@ func (c *router) UploadPart(ctx context.Context, in *s3.UploadPartInput, optFns ) // UploadPart typically handles large chunks (5MB-5GB), use streaming if in.ContentLength == nil || *in.ContentLength >= c.maxBufferBytes { - r1, r2, err = teeBody(ctx, in.Body) + // For large bodies, we can only support parallel actions (Mirror, BestEffort). + // Fallback requires buffering or seeking, which we can't do for generic streams. + if action == config.ActFallback { + return nil, fmt.Errorf("%s: fallback strategy is not supported for large or unknown-length streams", op) + } + tolerant := (action == config.ActBestEffort) + r1, r2, err = teeBody(ctx, in.Body, tolerant) } else { r1, r2, err = drainBody(ctx, in.Body) } diff --git a/ops.go b/ops.go index 7c2bd7c..502ad2c 100644 --- a/ops.go +++ b/ops.go @@ -56,22 +56,30 @@ func (c *router) PutObject( primB, secB := c.cfg.PhysicalBuckets(bucket) inPrimary, inSecondary := *in, *in inPrimary.Bucket, inSecondary.Bucket = aws.String(primB), aws.String(secB) - if action == config.ActMirror && in.Body != nil { + if (action == config.ActMirror || action == config.ActBestEffort || action == config.ActFallback) && in.Body != nil { var ( r1, r2 io.Reader err error ) // If ContentLength is not provided, S3 use chunked transfer encoding. if in.ContentLength == nil || *in.ContentLength >= c.maxBufferBytes { - r1, r2, err = teeBody(ctx, in.Body) + // For large bodies, we can only support parallel actions (Mirror, BestEffort). + // Fallback requires buffering or seeking, which we can't do for generic streams. + if action == config.ActFallback { + return nil, fmt.Errorf("%s: fallback strategy is not supported for large or unknown-length streams", op) + } + tolerant := (action == config.ActBestEffort) + r1, r2, err = teeBody(ctx, in.Body, tolerant) } else { r1, r2, err = drainBody(ctx, in.Body) } if err != nil { - return nil, fmt.Errorf("%s: failed to split body for mirror: %w", op, err) + return nil, fmt.Errorf("%s: failed to split body for %s: %w", op, action, err) + } + if r1 != nil && r2 != nil { + inPrimary.Body = r1 + inSecondary.Body = r2 } - inPrimary.Body = r1 - inSecondary.Body = r2 } return dispatch(ctx, action, func(ctx context.Context, st store.Store, in *s3.PutObjectInput) (*s3.PutObjectOutput, error) { diff --git a/router.go b/router.go index 3192c38..fc612ee 100644 --- a/router.go +++ b/router.go @@ -116,10 +116,10 @@ func doParallel[I any, T any]( return out, nil } // best-effort: fire-and-forget secondary - out, err := op(ctx, s1, in1) go func() { _, _ = op(ctx, s2, in2) }() + out, err := op(ctx, s1, in1) return out, err } @@ -134,24 +134,72 @@ func drainBody(ctx context.Context, r io.Reader) (io.ReadSeeker, io.ReadSeeker, return bytes.NewReader(data), bytes.NewReader(data), nil } -func teeBody(ctx context.Context, r io.Reader) (io.ReadCloser, io.ReadCloser, error) { +// tolerantWriter wraps an io.Writer and ignores errors from the underlying writer. +type tolerantWriter struct { + w io.Writer + failed bool +} + +func (t *tolerantWriter) Write(p []byte) (n int, err error) { + if t.failed { + return len(p), nil + } + n, err = t.w.Write(p) + if err != nil { + t.failed = true + // Act as if we wrote everything to keep MultiWriter happy. + return len(p), nil + } + return n, nil +} + +func teeBody(ctx context.Context, r io.Reader, tolerantSecondary bool) (io.ReadCloser, io.ReadCloser, error) { pr1, pw1 := io.Pipe() pr2, pw2 := io.Pipe() go func() { defer pw1.Close() defer pw2.Close() - select { - case <-ctx.Done(): - err := ctx.Err() - pw1.CloseWithError(err) - pw2.CloseWithError(err) - return - default: - _, err := io.Copy(io.MultiWriter(pw1, pw2), r) - if err != nil { + var w2 io.Writer = pw2 + if tolerantSecondary { + w2 = &tolerantWriter{w: pw2} + } + mw := io.MultiWriter(pw1, w2) + + buf := make([]byte, 32*1024) + for { + select { + case <-ctx.Done(): + err := ctx.Err() pw1.CloseWithError(err) pw2.CloseWithError(err) + return + default: + } + + nr, er := r.Read(buf) + if nr > 0 { + nw, ew := mw.Write(buf[0:nr]) + if ew != nil { + // Check if error is from pw1 + // If tolerantWriter wrapped pw2, it won't return error. + // So any error here means pw1 failed (or tolerantWriter itself failed unexpectedly). + pw1.CloseWithError(ew) + pw2.CloseWithError(ew) + return + } + if nr != nw { + pw1.CloseWithError(io.ErrShortWrite) + pw2.CloseWithError(io.ErrShortWrite) + return + } + } + if er != nil { + if er != io.EOF { + pw1.CloseWithError(er) + pw2.CloseWithError(er) + } + return } } }() diff --git a/router_ops_test.go b/router_ops_test.go new file mode 100644 index 0000000..9aba5cd --- /dev/null +++ b/router_ops_test.go @@ -0,0 +1,190 @@ +package s3router + +import ( + "context" + "io" + "strings" + "sync" + "testing" + + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/wilbeibi/s3router/config" +) + +type mockBodyStore struct { + name string + t *testing.T + // Optional: Simulate error for primary/secondary + fail bool + // Optional: WaitGroup to signal completion + wg *sync.WaitGroup +} + +func (m *mockBodyStore) GetObject(ctx context.Context, in *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + return nil, nil +} +func (m *mockBodyStore) PutObject(ctx context.Context, in *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) { + if m.wg != nil { + defer m.wg.Done() + } + if in.Body == nil { + m.t.Errorf("[%s] PutObject body is nil", m.name) + return nil, nil + } + // Simulate reading the body + data, err := io.ReadAll(in.Body) + if err != nil { + m.t.Errorf("[%s] failed to read body: %v", m.name, err) + return nil, err + } + + if m.fail { + return nil, io.EOF // Simulate error + } + + expected := "hello world" + if string(data) != expected { + m.t.Errorf("[%s] got body %q, want %q", m.name, string(data), expected) + } else { + // m.t.Logf("[%s] successfully read body", m.name) + } + return &s3.PutObjectOutput{}, nil +} +func (m *mockBodyStore) HeadObject(ctx context.Context, in *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + return nil, nil +} +func (m *mockBodyStore) DeleteObject(ctx context.Context, in *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { + return nil, nil +} +func (m *mockBodyStore) DeleteObjects(ctx context.Context, in *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) { + return nil, nil +} +func (m *mockBodyStore) ListObjectsV2(ctx context.Context, in *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { + return nil, nil +} +func (m *mockBodyStore) CreateMultipartUpload(ctx context.Context, in *s3.CreateMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) { + return nil, nil +} +func (m *mockBodyStore) UploadPart(ctx context.Context, in *s3.UploadPartInput, optFns ...func(*s3.Options)) (*s3.UploadPartOutput, error) { + return nil, nil +} +func (m *mockBodyStore) CompleteMultipartUpload(ctx context.Context, in *s3.CompleteMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) { + return nil, nil +} +func (m *mockBodyStore) ListParts(ctx context.Context, in *s3.ListPartsInput, optFns ...func(*s3.Options)) (*s3.ListPartsOutput, error) { + return nil, nil +} +func (m *mockBodyStore) AbortMultipartUpload(ctx context.Context, in *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) { + return nil, nil +} + +func stringPtr(s string) *string { + return &s +} + +func TestPutObject_BestEffort_Concurrent(t *testing.T) { + cfgYaml := ` +endpoints: + primary: http://p + secondary: http://s +buckets: + testbucket: + primary: p + secondary: s +rules: + - bucket: testbucket + prefix: + "*": + PutObject: best-effort + "*": primary +` + cfg, err := config.Load(strings.NewReader(cfgYaml)) + if err != nil { + t.Fatalf("failed to load config: %v", err) + } + + var wg sync.WaitGroup + wg.Add(2) + + primary := &mockBodyStore{name: "primary", t: t, wg: &wg} + secondary := &mockBodyStore{name: "secondary", t: t, wg: &wg} + + r, err := New(cfg, primary, secondary) + if err != nil { + t.Fatalf("New failed: %v", err) + } + + // Use a pipe to simulate a stream + pr, pw := io.Pipe() + go func() { + pw.Write([]byte("hello world")) + pw.Close() + }() + + ctx := context.Background() + _, err = r.PutObject(ctx, &s3.PutObjectInput{ + Bucket: stringPtr("testbucket"), + Key: stringPtr("obj"), + Body: pr, + }) + if err != nil { + t.Fatalf("PutObject failed: %v", err) + } + + // Wait for both primary (which finishes before PutObject returns) + // and secondary (which finishes async) to complete. + wg.Wait() +} + +func TestPutObject_Fallback_SmallBody(t *testing.T) { + // Fallback should work for small bodies by buffering + cfgYaml := ` +endpoints: + primary: http://p + secondary: http://s +buckets: + testbucket: + primary: p + secondary: s +rules: + - bucket: testbucket + prefix: + "*": + PutObject: fallback + "*": primary +` + cfg, err := config.Load(strings.NewReader(cfgYaml)) + if err != nil { + t.Fatalf("failed to load config: %v", err) + } + + primary := &mockBodyStore{name: "primary", t: t, fail: true} // Primary fails + secondary := &mockBodyStore{name: "secondary", t: t} + + r, err := New(cfg, primary, secondary) + if err != nil { + t.Fatalf("New failed: %v", err) + } + + bodyContent := "hello world" + contentLength := int64(len(bodyContent)) + // ContentLength is optional but drainBody uses it to decide buffering if available. + // If nil, it checks maxBufferBytes which is large by default. + // So drainBody will buffer "hello world" only if ContentLength is set and small. + + ctx := context.Background() + _, err = r.PutObject(ctx, &s3.PutObjectInput{ + Bucket: stringPtr("testbucket"), + Key: stringPtr("obj"), + Body: strings.NewReader(bodyContent), + ContentLength: &contentLength, + }) + + // Even if it is strings.NewReader (seekable), drainBody reads it all. + // Primary mock reads it all (fails). + // Secondary mock should receive a fresh reader with same content. + + if err != nil { + t.Fatalf("PutObject failed: %v", err) + } +} diff --git a/router_test.go b/router_test.go index 8958de4..5449ef2 100644 --- a/router_test.go +++ b/router_test.go @@ -124,7 +124,7 @@ func TestTeeBody(t *testing.T) { ctx := context.Background() want := []byte("stream‑content") - pr1, pr2, err := teeBody(ctx, bytes.NewReader(want)) + pr1, pr2, err := teeBody(ctx, bytes.NewReader(want), false) if err != nil { t.Fatalf("teeBody error: %v", err) }