Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ jobs:

- name: Set up uv
uses: astral-sh/setup-uv@v3
with:
enable-cache: true
python-version: "3.11"

- name: Validate router configs
run: |
Expand Down
8 changes: 7 additions & 1 deletion multipart_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ func (c *router) UploadPart(ctx context.Context, in *s3.UploadPartInput, optFns
)
// UploadPart typically handles large chunks (5MB-5GB), use streaming
if in.ContentLength == nil || *in.ContentLength >= c.maxBufferBytes {
r1, r2, err = teeBody(ctx, in.Body)
// For large bodies, we can only support parallel actions (Mirror, BestEffort).
// Fallback requires buffering or seeking, which we can't do for generic streams.
if action == config.ActFallback {
return nil, fmt.Errorf("%s: fallback strategy is not supported for large or unknown-length streams", op)
}
tolerant := (action == config.ActBestEffort)
r1, r2, err = teeBody(ctx, in.Body, tolerant)
} else {
r1, r2, err = drainBody(ctx, in.Body)
}
Expand Down
18 changes: 13 additions & 5 deletions ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,30 @@ func (c *router) PutObject(
primB, secB := c.cfg.PhysicalBuckets(bucket)
inPrimary, inSecondary := *in, *in
inPrimary.Bucket, inSecondary.Bucket = aws.String(primB), aws.String(secB)
if action == config.ActMirror && in.Body != nil {
if (action == config.ActMirror || action == config.ActBestEffort || action == config.ActFallback) && in.Body != nil {
var (
r1, r2 io.Reader
err error
)
// If ContentLength is not provided, S3 uses chunked transfer encoding.
if in.ContentLength == nil || *in.ContentLength >= c.maxBufferBytes {
r1, r2, err = teeBody(ctx, in.Body)
// For large bodies, we can only support parallel actions (Mirror, BestEffort).
// Fallback requires buffering or seeking, which we can't do for generic streams.
if action == config.ActFallback {
return nil, fmt.Errorf("%s: fallback strategy is not supported for large or unknown-length streams", op)
}
tolerant := (action == config.ActBestEffort)
r1, r2, err = teeBody(ctx, in.Body, tolerant)
} else {
r1, r2, err = drainBody(ctx, in.Body)
}
if err != nil {
return nil, fmt.Errorf("%s: failed to split body for mirror: %w", op, err)
return nil, fmt.Errorf("%s: failed to split body for %s: %w", op, action, err)
}
if r1 != nil && r2 != nil {
inPrimary.Body = r1
inSecondary.Body = r2
}
inPrimary.Body = r1
inSecondary.Body = r2
}
return dispatch(ctx, action,
func(ctx context.Context, st store.Store, in *s3.PutObjectInput) (*s3.PutObjectOutput, error) {
Expand Down
70 changes: 59 additions & 11 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ func doParallel[I any, T any](
return out, nil
}
// best-effort: fire-and-forget secondary
out, err := op(ctx, s1, in1)
go func() {
_, _ = op(ctx, s2, in2)
}()
out, err := op(ctx, s1, in1)
return out, err
}

Expand All @@ -134,24 +134,72 @@ func drainBody(ctx context.Context, r io.Reader) (io.ReadSeeker, io.ReadSeeker,
return bytes.NewReader(data), bytes.NewReader(data), nil
}

func teeBody(ctx context.Context, r io.Reader) (io.ReadCloser, io.ReadCloser, error) {
// tolerantWriter wraps an io.Writer and hides every failure of the wrapped
// writer from its caller. teeBody uses it around the secondary pipe so that a
// broken secondary cannot abort the copy feeding the primary.
//
// It performs no locking; it is meant to be driven by a single goroutine.
type tolerantWriter struct {
	w      io.Writer
	failed bool // set after the first failed or short write; later writes are dropped
}

// Write forwards p to the wrapped writer until that writer fails once, and
// always reports len(p), nil.
//
// A short write (n != len(p)) with a nil error is treated as a failure too:
// io.MultiWriter turns any short count into io.ErrShortWrite, which would
// otherwise tear down the primary stream because of a secondary problem.
func (t *tolerantWriter) Write(p []byte) (n int, err error) {
	if !t.failed {
		if written, werr := t.w.Write(p); werr != nil || written != len(p) {
			// Remember the failure so the broken writer is never called again.
			t.failed = true
		}
	}
	// Act as if we wrote everything to keep MultiWriter happy.
	return len(p), nil
}

func teeBody(ctx context.Context, r io.Reader, tolerantSecondary bool) (io.ReadCloser, io.ReadCloser, error) {
pr1, pw1 := io.Pipe()
pr2, pw2 := io.Pipe()
go func() {
defer pw1.Close()
defer pw2.Close()

select {
case <-ctx.Done():
err := ctx.Err()
pw1.CloseWithError(err)
pw2.CloseWithError(err)
return
default:
_, err := io.Copy(io.MultiWriter(pw1, pw2), r)
if err != nil {
var w2 io.Writer = pw2
if tolerantSecondary {
w2 = &tolerantWriter{w: pw2}
}
mw := io.MultiWriter(pw1, w2)

buf := make([]byte, 32*1024)
for {
select {
case <-ctx.Done():
err := ctx.Err()
pw1.CloseWithError(err)
pw2.CloseWithError(err)
return
default:
}

nr, er := r.Read(buf)
if nr > 0 {
nw, ew := mw.Write(buf[0:nr])
if ew != nil {
// Check if error is from pw1
// If tolerantWriter wrapped pw2, it won't return error.
// So any error here means pw1 failed (or tolerantWriter itself failed unexpectedly).
pw1.CloseWithError(ew)
pw2.CloseWithError(ew)
return
}
if nr != nw {
pw1.CloseWithError(io.ErrShortWrite)
pw2.CloseWithError(io.ErrShortWrite)
return
}
}
if er != nil {
if er != io.EOF {
pw1.CloseWithError(er)
pw2.CloseWithError(er)
}
return
}
}
}()
Expand Down
190 changes: 190 additions & 0 deletions router_ops_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package s3router

import (
"context"
"io"
"strings"
"sync"
"testing"

"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/wilbeibi/s3router/config"
)

// mockBodyStore is a test double whose PutObject drains the request body and
// compares it with the expected payload; its other methods are no-op stubs.
type mockBodyStore struct {
// name labels this store in the failure messages reported through t.
name string
// t receives the failures reported by PutObject.
t *testing.T
// fail, when true, makes PutObject return an error after the body has been read.
fail bool
// wg is optional: when non-nil, Done is called once as each PutObject call returns.
wg *sync.WaitGroup
}

// GetObject is a no-op stub: it ignores its arguments and returns (nil, nil).
func (m *mockBodyStore) GetObject(ctx context.Context, in *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
return nil, nil
}
// PutObject drains in.Body and reports a test failure through m.t when the
// body is nil, cannot be read, or is not the expected payload.
//
// When m.wg is set, Done is called as the method returns. When m.fail is set,
// the body is still read in full and io.EOF is returned as a simulated error.
func (m *mockBodyStore) PutObject(ctx context.Context, in *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) {
	if m.wg != nil {
		defer m.wg.Done()
	}

	body := in.Body
	if body == nil {
		m.t.Errorf("[%s] PutObject body is nil", m.name)
		return nil, nil
	}

	// Consume the whole stream before deciding the outcome.
	payload, readErr := io.ReadAll(body)
	switch {
	case readErr != nil:
		m.t.Errorf("[%s] failed to read body: %v", m.name, readErr)
		return nil, readErr
	case m.fail:
		// Simulated store failure.
		return nil, io.EOF
	}

	const want = "hello world"
	if got := string(payload); got != want {
		m.t.Errorf("[%s] got body %q, want %q", m.name, got, want)
	}
	return &s3.PutObjectOutput{}, nil
}
// HeadObject is a no-op stub: it ignores its arguments and returns (nil, nil).
func (m *mockBodyStore) HeadObject(ctx context.Context, in *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) {
return nil, nil
}
// DeleteObject is a no-op stub: it ignores its arguments and returns (nil, nil).
func (m *mockBodyStore) DeleteObject(ctx context.Context, in *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) {
return nil, nil
}
// DeleteObjects is a no-op stub: it ignores its arguments and returns (nil, nil).
func (m *mockBodyStore) DeleteObjects(ctx context.Context, in *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) {
return nil, nil
}
// ListObjectsV2 is a no-op stub: it ignores its arguments and returns (nil, nil).
func (m *mockBodyStore) ListObjectsV2(ctx context.Context, in *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) {
return nil, nil
}
// CreateMultipartUpload is a no-op stub: it ignores its arguments and returns (nil, nil).
func (m *mockBodyStore) CreateMultipartUpload(ctx context.Context, in *s3.CreateMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) {
return nil, nil
}
// UploadPart is a no-op stub: it ignores its arguments (the part body is not read)
// and returns (nil, nil).
func (m *mockBodyStore) UploadPart(ctx context.Context, in *s3.UploadPartInput, optFns ...func(*s3.Options)) (*s3.UploadPartOutput, error) {
return nil, nil
}
// CompleteMultipartUpload is a no-op stub: it ignores its arguments and returns (nil, nil).
func (m *mockBodyStore) CompleteMultipartUpload(ctx context.Context, in *s3.CompleteMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) {
return nil, nil
}
// ListParts is a no-op stub: it ignores its arguments and returns (nil, nil).
func (m *mockBodyStore) ListParts(ctx context.Context, in *s3.ListPartsInput, optFns ...func(*s3.Options)) (*s3.ListPartsOutput, error) {
return nil, nil
}
// AbortMultipartUpload is a no-op stub: it ignores its arguments and returns (nil, nil).
func (m *mockBodyStore) AbortMultipartUpload(ctx context.Context, in *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) {
return nil, nil
}

// stringPtr returns a pointer to a fresh copy of s, for populating the
// optional *string fields of request structs.
func stringPtr(s string) *string {
	p := new(string)
	*p = s
	return p
}

// TestPutObject_BestEffort_Concurrent sends a body of unknown length through a
// router whose PutObject rule is best-effort and waits until both mock stores
// have handled the request. Each mock checks the payload it receives.
func TestPutObject_BestEffort_Concurrent(t *testing.T) {
	const routerYAML = `
endpoints:
primary: http://p
secondary: http://s
buckets:
testbucket:
primary: p
secondary: s
rules:
- bucket: testbucket
prefix:
"*":
PutObject: best-effort
"*": primary
`
	cfg, err := config.Load(strings.NewReader(routerYAML))
	if err != nil {
		t.Fatalf("failed to load config: %v", err)
	}

	// One Done is expected from each store's PutObject.
	var done sync.WaitGroup
	done.Add(2)

	r, err := New(
		cfg,
		&mockBodyStore{name: "primary", t: t, wg: &done},
		&mockBodyStore{name: "secondary", t: t, wg: &done},
	)
	if err != nil {
		t.Fatalf("New failed: %v", err)
	}

	// A pipe stands in for a non-seekable stream; no ContentLength is set.
	body, feed := io.Pipe()
	go func() {
		defer feed.Close()
		feed.Write([]byte("hello world"))
	}()

	input := &s3.PutObjectInput{
		Bucket: stringPtr("testbucket"),
		Key:    stringPtr("obj"),
		Body:   body,
	}
	if _, err = r.PutObject(context.Background(), input); err != nil {
		t.Fatalf("PutObject failed: %v", err)
	}

	// The primary is done by the time PutObject returns; the secondary may
	// still be running asynchronously, so wait for both.
	done.Wait()
}

// TestPutObject_Fallback_SmallBody checks that the fallback strategy works for
// a small body: the primary mock reads the payload and fails, and the call is
// still expected to succeed.
func TestPutObject_Fallback_SmallBody(t *testing.T) {
	const routerYAML = `
endpoints:
primary: http://p
secondary: http://s
buckets:
testbucket:
primary: p
secondary: s
rules:
- bucket: testbucket
prefix:
"*":
PutObject: fallback
"*": primary
`
	cfg, err := config.Load(strings.NewReader(routerYAML))
	if err != nil {
		t.Fatalf("failed to load config: %v", err)
	}

	r, err := New(
		cfg,
		&mockBodyStore{name: "primary", t: t, fail: true}, // primary fails after reading the body
		&mockBodyStore{name: "secondary", t: t},
	)
	if err != nil {
		t.Fatalf("New failed: %v", err)
	}

	// ContentLength has to be set: PutObject only buffers the body (drainBody)
	// when the length is known and below the router's maxBufferBytes, and it
	// rejects fallback for bodies of unknown length.
	const payload = "hello world"
	size := int64(len(payload))

	input := &s3.PutObjectInput{
		Bucket:        stringPtr("testbucket"),
		Key:           stringPtr("obj"),
		Body:          strings.NewReader(payload),
		ContentLength: &size,
	}
	_, err = r.PutObject(context.Background(), input)

	// The primary consumes its reader and fails; any store called afterwards
	// needs its own reader over the same content, which the mock verifies.
	if err != nil {
		t.Fatalf("PutObject failed: %v", err)
	}
}
2 changes: 1 addition & 1 deletion router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestTeeBody(t *testing.T) {
ctx := context.Background()
want := []byte("stream‑content")

pr1, pr2, err := teeBody(ctx, bytes.NewReader(want))
pr1, pr2, err := teeBody(ctx, bytes.NewReader(want), false)
if err != nil {
t.Fatalf("teeBody error: %v", err)
}
Expand Down