-
Notifications
You must be signed in to change notification settings - Fork 0
/
s3_upload_state.go
146 lines (116 loc) · 3.56 KB
/
s3_upload_state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package main
import (
"cmp"
"fmt"
"slices"
"sync"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
)
// S3UploadState tracks the state of an attempt to create an object or a
// multi-part object
//
// The fields hold the request inputs together with the outputs and errors
// recorded for each step. Only the per-part maps are accessed under mu by the
// methods visible in this file.
type S3UploadState struct {
// hr supplies the per-part checksum (SumPart) and the checksum algorithm
// (ChecksumAlgorithm) that completeParts uses when building completed parts.
hr *S3Hasher
// obj, objOutput and objError carry the put-object input, output and error.
// objError is reported by Errors as the "put-object error".
obj *s3.PutObjectInput
objOutput *s3.PutObjectOutput
objError error
// create provides the Bucket and Key, and createOutput the UploadId, that
// completeParts copies into the complete-multipart-upload input.
create *s3.CreateMultipartUploadInput
createOutput *s3.CreateMultipartUploadOutput
// uploadPartOutputs and uploadPartErrors are keyed by part ID and written by
// setPartResults while holding mu. A part may have a nil output and/or a nil
// error depending on what setPartResults was given.
uploadPartOutputs map[int32]*s3.UploadPartOutput
uploadPartErrors map[int32]error
// completedOutput/completedError and abortedOutput/abortedError hold the
// results of completing or aborting the multi-part upload; both errors are
// reported by Errors.
completedOutput *s3.CompleteMultipartUploadOutput
completedError error
abortedOutput *s3.AbortMultipartUploadOutput
abortedError error
// objectAttributesOutput and objectAttributesError hold the result of a
// get-object-attributes call. NOTE(review): objectAttributesError is not
// included in the slice returned by Errors - confirm that is intentional.
objectAttributesOutput *s3.GetObjectAttributesOutput
objectAttributesError error
// mu is locked by setPartResults and completeParts around access to the
// per-part maps. It is a pointer, so it must be non-nil before those methods
// are called.
mu *sync.Mutex
}
// Errors returns the errors recorded on the upload state, each wrapped with a
// prefix naming the step that failed. The order is: the put-object error, the
// per-part upload errors in ascending part-number order, the
// complete-multi-part-upload error, and the abort-multi-part-upload error.
// Steps with no recorded error are skipped, and nil is returned when nothing
// failed.
//
// objectAttributesError is not included in the result.
//
// NOTE(review): uploadPartErrors is read here without holding p.mu, unlike
// setPartResults and completeParts; presumably callers only invoke Errors once
// all part uploads have finished - confirm against callers.
func (p *S3UploadState) Errors() []error {
	var errs []error
	if p.objError != nil {
		errs = append(errs, fmt.Errorf(
			"put-object error: %w", p.objError))
	}

	// Map iteration order is random, so collect the failed part IDs and sort
	// them to report part errors in a stable, ascending order.
	failedParts := make([]int32, 0, len(p.uploadPartErrors))
	for partID, partErr := range p.uploadPartErrors {
		if partErr != nil {
			failedParts = append(failedParts, partID)
		}
	}
	slices.Sort(failedParts)
	for _, partID := range failedParts {
		errs = append(errs, fmt.Errorf(
			"upload part %d error: %w", partID, p.uploadPartErrors[partID]))
	}

	if p.completedError != nil {
		errs = append(errs, fmt.Errorf(
			"complete multi-part upload error: %w", p.completedError))
	}
	if p.abortedError != nil {
		errs = append(errs, fmt.Errorf(
			"abort multi-part upload error: %w", p.abortedError))
	}
	return errs
}
// setPartResults stores the outcome of one S3UploadParts.UploadPart request
// under the part's ID. Callers use it both to record what the s3.Client
// returned for the part and to record an error hit before the part could be
// handed to the s3.Client (e.g., if the context was canceled).
//
// The output and the error are stored as given, including nil values, and
// replace anything previously recorded for the same part ID. The update is
// performed while holding p.mu.
func (p *S3UploadState) setPartResults(partID *int32, out *s3.UploadPartOutput, err error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	id := *partID
	p.uploadPartOutputs[id], p.uploadPartErrors[id] = out, err
}
// completeParts returns a *s3.CompleteMultipartUploadInput for the parts
// recorded to this point.
//
// Every recorded part contributes a types.CompletedPart holding its ETag, its
// part number and the base64 form of the part checksum obtained from p.hr,
// stored in the field that matches p.hr.ChecksumAlgorithm() (no checksum field
// is set for an algorithm other than CRC32, CRC32C, SHA1 or SHA256). The parts
// are sorted by part number before being returned.
//
// An error is returned, and no input is built, when:
//   - a part has no recorded upload output (for example it failed before it
//     reached the s3.Client); the recorded part error, if any, is wrapped;
//   - the sorted part numbers are not exactly 1, 2, ..., n, i.e. the sequence
//     does not start at 1 or contains a gap.
//
// The whole operation runs while holding p.mu.
func (p *S3UploadState) completeParts() (*s3.CompleteMultipartUploadInput, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	var completedParts []types.CompletedPart
	for partID, out := range p.uploadPartOutputs {
		// setPartResults may record a nil output alongside an error; such a
		// part cannot be completed, so report it instead of dereferencing nil.
		if out == nil {
			if partErr := p.uploadPartErrors[partID]; partErr != nil {
				return nil, fmt.Errorf(
					"part %d was not uploaded: %w", partID, partErr)
			}
			return nil, fmt.Errorf("part %d has no upload output", partID)
		}

		completedPart := types.CompletedPart{
			ETag: out.ETag,
			// Allocate a fresh pointer per part rather than taking the address
			// of the range variable, which is shared across iterations before
			// Go 1.22.
			PartNumber: aws.Int32(partID),
		}
		checksumBase64 := aws.String(
			HashSum(p.hr.SumPart(partID)).Base64())
		switch p.hr.ChecksumAlgorithm() {
		case ChecksumAlgorithmCRC32:
			completedPart.ChecksumCRC32 = checksumBase64
		case ChecksumAlgorithmCRC32C:
			completedPart.ChecksumCRC32C = checksumBase64
		case ChecksumAlgorithmSHA1:
			completedPart.ChecksumSHA1 = checksumBase64
		case ChecksumAlgorithmSHA256:
			completedPart.ChecksumSHA256 = checksumBase64
		}
		completedParts = append(completedParts, completedPart)
	}

	slices.SortFunc(completedParts, func(a, b types.CompletedPart) int {
		return cmp.Compare(*a.PartNumber, *b.PartNumber)
	})

	// After sorting, the part at index i must be part number i+1. Start at
	// index 0 so that a sequence not beginning at part 1 is rejected too.
	for i := range completedParts {
		partID := *completedParts[i].PartNumber
		if partID == int32(i+1) {
			continue
		}
		if i == 0 {
			return nil, fmt.Errorf(
				"out-of-order partID: started at %d (expected %d)",
				partID, (i + 1))
		}
		// All earlier parts were validated, so the previous part number is i.
		return nil, fmt.Errorf(
			"out-of-order partID: %d -> %d (expected %d)",
			i, partID, (i + 1))
	}

	return &s3.CompleteMultipartUploadInput{
		Bucket:   p.create.Bucket,
		Key:      p.create.Key,
		UploadId: p.createOutput.UploadId,
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: completedParts,
		},
	}, nil
}