Skip to content

Commit 7705be9

Browse files
author
sandeeppal
committed
support atomic writes
1 parent 6d4b684 commit 7705be9

File tree

10 files changed

+126
-55
lines changed

10 files changed

+126
-55
lines changed

docstore/docstore.go

+16-33
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,10 @@ func (c *Collection) Actions() *ActionList {
135135
// document; a Get after the write will see the new value if the service is strongly
136136
// consistent, but may see the old value if the service is eventually consistent.
137137
type ActionList struct {
138-
coll *Collection
139-
actions []*Action
140-
beforeDo func(asFunc func(interface{}) bool) error
138+
coll *Collection
139+
actions []*Action
140+
atomicWrites bool
141+
beforeDo func(asFunc func(interface{}) bool) error
141142
}
142143

143144
// An Action is a read or write on a single document.
@@ -171,11 +172,7 @@ func (l *ActionList) add(a *Action) *ActionList {
171172
// Except for setting the revision field and possibly setting the key fields, the doc
172173
// argument is not modified.
173174
func (l *ActionList) Create(doc Document) *ActionList {
174-
return l.add(&Action{kind: driver.Create, doc: doc})
175-
}
176-
177-
func (l *ActionList) CreateTx(doc Document) *ActionList {
178-
return l.add(&Action{kind: driver.Create, doc: doc, transaction: true})
175+
return l.add(&Action{kind: driver.Create, doc: doc, transaction: l.atomicWrites})
179176
}
180177

181178
// Replace adds an action that replaces a document to the given ActionList, and
@@ -187,11 +184,7 @@ func (l *ActionList) CreateTx(doc Document) *ActionList {
187184
// See the Revisions section of the package documentation for how revisions are
188185
// handled.
189186
func (l *ActionList) Replace(doc Document) *ActionList {
190-
return l.add(&Action{kind: driver.Replace, doc: doc})
191-
}
192-
193-
func (l *ActionList) ReplaceTx(doc Document) *ActionList {
194-
return l.add(&Action{kind: driver.Replace, doc: doc, transaction: true})
187+
return l.add(&Action{kind: driver.Replace, doc: doc, transaction: l.atomicWrites})
195188
}
196189

197190
// Put adds an action that adds or replaces a document to the given ActionList, and returns the ActionList.
@@ -204,11 +197,7 @@ func (l *ActionList) ReplaceTx(doc Document) *ActionList {
204197
// See the Revisions section of the package documentation for how revisions are
205198
// handled.
206199
func (l *ActionList) Put(doc Document) *ActionList {
207-
return l.add(&Action{kind: driver.Put, doc: doc})
208-
}
209-
210-
func (l *ActionList) PutTx(doc Document) *ActionList {
211-
return l.add(&Action{kind: driver.Put, doc: doc, transaction: true})
200+
return l.add(&Action{kind: driver.Put, doc: doc, transaction: l.atomicWrites})
212201
}
213202

214203
// Delete adds an action that deletes a document to the given ActionList, and returns
@@ -223,11 +212,7 @@ func (l *ActionList) Delete(doc Document) *ActionList {
223212
// semantics of an action list are to stop at first error, then we might abort a
224213
// list of Deletes just because one of the docs was not present, and that seems
225214
// wrong, or at least something you'd want to turn off.
226-
return l.add(&Action{kind: driver.Delete, doc: doc})
227-
}
228-
229-
func (l *ActionList) DeleteTx(doc Document) *ActionList {
230-
return l.add(&Action{kind: driver.Delete, doc: doc, transaction: true})
215+
return l.add(&Action{kind: driver.Delete, doc: doc, transaction: l.atomicWrites})
231216
}
232217

233218
// Get adds an action that retrieves a document to the given ActionList, and
@@ -268,19 +253,11 @@ func (l *ActionList) Get(doc Document, fps ...FieldPath) *ActionList {
268253
// Update does not modify its doc argument, except to set the new revision. To obtain
269254
// the updated document, call Get after calling Update.
270255
func (l *ActionList) Update(doc Document, mods Mods) *ActionList {
271-
return l.add(&Action{
272-
kind: driver.Update,
273-
doc: doc,
274-
mods: mods,
275-
})
276-
}
277-
278-
func (l *ActionList) UpdateTx(doc Document, mods Mods) *ActionList {
279256
return l.add(&Action{
280257
kind: driver.Update,
281258
doc: doc,
282259
mods: mods,
283-
transaction: true,
260+
transaction: l.atomicWrites,
284261
})
285262
}
286263

@@ -456,7 +433,7 @@ func (c *Collection) toDriverAction(a *Action) (*driver.Action, error) {
456433
// A Put with a revision field is equivalent to a Replace.
457434
kind = driver.Replace
458435
}
459-
d := &driver.Action{Kind: kind, Doc: ddoc, Key: key, Transaction: a.transaction}
436+
d := &driver.Action{Kind: kind, Doc: ddoc, Key: key, InAtomicWrite: a.transaction}
460437
if a.fieldpaths != nil {
461438
d.FieldPaths, err = parseFieldPaths(a.fieldpaths)
462439
if err != nil {
@@ -560,6 +537,12 @@ func (l *ActionList) String() string {
560537
return "[" + strings.Join(as, ", ") + "]"
561538
}
562539

540+
// AtomicWrites causes all following writes in the list to execute atomically.
541+
func (l *ActionList) AtomicWrites() *ActionList {
542+
l.atomicWrites = true
543+
return l
544+
}
545+
563546
func (a *Action) String() string {
564547
buf := &strings.Builder{}
565548
fmt.Fprintf(buf, "%s(%v", a.kind, a.doc)

docstore/driver/driver.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,13 @@ const (
126126
//go:generate stringer -type=ActionKind
127127

128128
type Action struct {
129-
Kind ActionKind // the kind of action
130-
Doc Document // the document on which to perform the action
131-
Key interface{} // the document key returned by Collection.Key, to avoid recomputing it
132-
FieldPaths [][]string // field paths to retrieve, for Get only
133-
Mods []Mod // modifications to make, for Update only
134-
Index int // the index of the action in the original action list
135-
Transaction bool // if this action is a part of transaction
129+
Kind ActionKind // the kind of action
130+
Doc Document // the document on which to perform the action
131+
Key interface{} // the document key returned by Collection.Key, to avoid recomputing it
132+
FieldPaths [][]string // field paths to retrieve, for Get only
133+
Mods []Mod // modifications to make, for Update only
134+
Index int // the index of the action in the original action list
135+
InAtomicWrite bool // if this action is a part of transaction
136136
}
137137

138138
// A Mod is a modification to a field path in a document.

docstore/driver/util.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func GroupActions(actions []*Action) (beforeGets, getList, writeList, writesTxLi
7070
} else if a.Kind == Get {
7171
// If there was a prior write with this key, make sure this get
7272
// happens after the writes.
73-
if valueExistsInAllMaps(a.Key, writes, writesTx) {
73+
if valueExistsInMaps(a.Key, writes, writesTx) {
7474
agets[a.Key] = a
7575
} else {
7676
cgets[a.Key] = a
@@ -82,7 +82,7 @@ func GroupActions(actions []*Action) (beforeGets, getList, writeList, writesTxLi
8282
delete(cgets, a.Key)
8383
bgets[a.Key] = g
8484
}
85-
if a.Transaction {
85+
if a.InAtomicWrite {
8686
writesTx[a.Key] = a
8787
} else {
8888
writes[a.Key] = a
@@ -103,13 +103,13 @@ func GroupActions(actions []*Action) (beforeGets, getList, writeList, writesTxLi
103103
return vals(bgets), vals(cgets), append(vals(writes), nilkeys...), vals(writesTx), vals(agets)
104104
}
105105

106-
func valueExistsInAllMaps(key interface{}, maps ...map[interface{}]*Action) bool {
106+
func valueExistsInMaps(key interface{}, maps ...map[interface{}]*Action) bool {
107107
for _, m := range maps {
108-
if _, ok := m[key]; !ok {
109-
return false
108+
if _, ok := m[key]; ok {
109+
return true
110110
}
111111
}
112-
return true
112+
return false
113113
}
114114

115115
// AsFunc creates and returns an "as function" that behaves as follows:

docstore/driver/util_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func TestGroupActions(t *testing.T) {
7979
}{
8080
{
8181
in: []*Action{{Kind: Get, Key: 1}},
82-
want: [][]int{nil, {0}, nil, nil},
82+
want: [][]int{nil, {0}, nil, nil, nil},
8383
},
8484
{
8585
in: []*Action{
@@ -89,16 +89,16 @@ func TestGroupActions(t *testing.T) {
8989
{Kind: Replace, Key: 2},
9090
{Kind: Get, Key: 2},
9191
},
92-
want: [][]int{{0}, {1}, {2, 3}, {4}},
92+
want: [][]int{{0}, {1}, {2, 3}, nil, {4}},
9393
},
9494
{
9595
in: []*Action{{Kind: Create}, {Kind: Create}, {Kind: Create}},
96-
want: [][]int{nil, nil, {0, 1, 2}, nil},
96+
want: [][]int{nil, nil, {0, 1, 2}, nil, nil},
9797
},
9898
} {
99-
got := make([][]*Action, 4)
100-
got[0], got[1], got[2], got[3] = GroupActions(test.in)
101-
want := make([][]*Action, 4)
99+
got := make([][]*Action, 5)
100+
got[0], got[1], got[2], got[3], got[4] = GroupActions(test.in)
101+
want := make([][]*Action, 5)
102102
for i, s := range test.want {
103103
for _, x := range s {
104104
want[i] = append(want[i], test.in[x])

docstore/drivertest/drivertest.go

+76
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/google/go-cmp/cmp"
3030
"github.com/google/go-cmp/cmp/cmpopts"
3131
"github.com/google/uuid"
32+
"github.com/stretchr/testify/assert"
3233
"gocloud.dev/docstore"
3334
"gocloud.dev/docstore/driver"
3435
"gocloud.dev/gcerrors"
@@ -1900,6 +1901,81 @@ func testMultipleActions(t *testing.T, coll *docstore.Collection, revField strin
19001901
}
19011902
}
19021903

1904+
func testAtomicWrites(t *testing.T, coll *docstore.Collection, revField string) {
1905+
t.Helper()
1906+
1907+
ctx := context.Background()
1908+
1909+
must := func(err error) {
1910+
t.Helper()
1911+
if err != nil {
1912+
t.Fatal(err)
1913+
}
1914+
}
1915+
1916+
var docs []docmap
1917+
for i := 0; i < 9; i++ {
1918+
docs = append(docs, docmap{
1919+
KeyField: fmt.Sprintf("testAtomicWrites%d", i),
1920+
"s": fmt.Sprint(i),
1921+
revField: nil,
1922+
})
1923+
}
1924+
1925+
compare := func(gots, wants []docmap) {
1926+
t.Helper()
1927+
for i := 0; i < len(gots); i++ {
1928+
got := gots[i]
1929+
want := clone(wants[i])
1930+
want[revField] = got[revField]
1931+
if !cmp.Equal(got, want, cmpopts.IgnoreUnexported(tspb.Timestamp{})) {
1932+
t.Errorf("index #%d:\ngot %v\nwant %v", i, got, want)
1933+
}
1934+
}
1935+
}
1936+
1937+
// Put the first six docs.
1938+
actions := coll.Actions()
1939+
for i := 0; i < 6; i++ {
1940+
actions.Create(docs[i])
1941+
}
1942+
must(actions.Do(ctx))
1943+
1944+
// Delete the first three, get the second three, and update last three in transaction.
1945+
gdocs := []docmap{
1946+
{KeyField: docs[3][KeyField]},
1947+
{KeyField: docs[4][KeyField]},
1948+
{KeyField: docs[5][KeyField]},
1949+
}
1950+
actions = coll.Actions()
1951+
actions.Get(gdocs[0])
1952+
actions.Delete(docs[0])
1953+
actions.Delete(docs[1])
1954+
actions.Get(gdocs[1])
1955+
actions.Delete(docs[2])
1956+
actions.Get(gdocs[2])
1957+
actions.AtomicWrites()
1958+
actions.Update(docs[6], docstore.Mods{"s": "66'"})
1959+
actions.Update(docs[7], docstore.Mods{"s": "77'"})
1960+
actions.Update(docs[8], docstore.Mods{"s": "88"})
1961+
1962+
must(actions.Do(ctx))
1963+
compare(gdocs, docs[3:6])
1964+
1965+
// Get the docs updated as part of atomic writes and verify that got written.
1966+
actions = coll.Actions()
1967+
1968+
doc := docmap{KeyField: docs[6][KeyField]}
1969+
_ = coll.Get(ctx, doc)
1970+
assert.Equal(t, "66", doc["s"])
1971+
doc = docmap{KeyField: docs[7][KeyField]}
1972+
_ = coll.Get(ctx, doc)
1973+
assert.Equal(t, "77", doc["s"])
1974+
doc = docmap{KeyField: docs[8][KeyField]}
1975+
_ = coll.Get(ctx, doc)
1976+
assert.Equal(t, "88", doc["s"])
1977+
}
1978+
19031979
func testActionsOnStructNoRev(t *testing.T, _ Harness, coll *docstore.Collection) {
19041980
t.Helper()
19051981

docstore/gcpfirestore/fs.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ func (c *collection) RevisionField() string {
265265
// RunActions implements driver.RunActions.
266266
func (c *collection) RunActions(ctx context.Context, actions []*driver.Action, opts *driver.RunActionsOptions) driver.ActionListError {
267267
errs := make([]error, len(actions))
268-
beforeGets, gets, writes, afterGets := driver.GroupActions(actions)
268+
beforeGets, gets, writes, _, afterGets := driver.GroupActions(actions)
269269
calls := c.buildCommitCalls(writes, errs)
270270
// runGets does not issue concurrent RPCs, so it doesn't need a throttle.
271271
c.runGets(ctx, beforeGets, errs, opts)

docstore/memdocstore/mem.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (c *collection) RunActions(ctx context.Context, actions []*driver.Action, o
191191
}
192192
}
193193

194-
beforeGets, gets, writes, afterGets := driver.GroupActions(actions)
194+
beforeGets, gets, writes, _, afterGets := driver.GroupActions(actions)
195195
run(beforeGets)
196196
run(gets)
197197
run(writes)

docstore/mongodocstore/mongo.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ const mongoIDField = "_id"
204204

205205
func (c *collection) RunActions(ctx context.Context, actions []*driver.Action, opts *driver.RunActionsOptions) driver.ActionListError {
206206
errs := make([]error, len(actions))
207-
beforeGets, gets, writes, afterGets := driver.GroupActions(actions)
207+
beforeGets, gets, writes, _, afterGets := driver.GroupActions(actions)
208208
c.runGets(ctx, beforeGets, errs, opts)
209209
ch := make(chan []error)
210210
go func() { ch <- c.bulkWrite(ctx, writes, errs, opts) }()

go.mod

+4
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ require (
5757
github.com/google/wire v0.6.0
5858
github.com/googleapis/gax-go/v2 v2.13.0
5959
github.com/lib/pq v1.10.9
60+
github.com/stretchr/testify v1.9.0
6061
go.opencensus.io v0.24.0
6162
golang.org/x/crypto v0.26.0
6263
golang.org/x/net v0.28.0
@@ -95,6 +96,7 @@ require (
9596
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect
9697
github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 // indirect
9798
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
99+
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
98100
github.com/felixge/httpsnoop v1.0.4 // indirect
99101
github.com/go-logr/logr v1.4.2 // indirect
100102
github.com/go-logr/stdr v1.2.2 // indirect
@@ -107,6 +109,7 @@ require (
107109
github.com/jmespath/go-jmespath v0.4.0 // indirect
108110
github.com/kylelemons/godebug v1.1.0 // indirect
109111
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
112+
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
110113
github.com/prometheus/prometheus v0.54.0 // indirect
111114
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 // indirect
112115
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
@@ -120,4 +123,5 @@ require (
120123
golang.org/x/time v0.6.0 // indirect
121124
google.golang.org/genproto/googleapis/api v0.0.0-20240812133136-8ffd90a71988 // indirect
122125
google.golang.org/genproto/googleapis/rpc v0.0.0-20240812133136-8ffd90a71988 // indirect
126+
gopkg.in/yaml.v3 v3.0.1 // indirect
123127
)

go.sum

+8
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,12 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
307307
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
308308
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
309309
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
310+
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
311+
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
310312
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
311313
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
314+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
315+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
312316
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
313317
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
314318
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
@@ -322,6 +326,8 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:
322326
github.com/prometheus/prometheus v0.54.0 h1:6+VmEkohHcofl3W5LyRlhw1Lfm575w/aX6ZFyVAmzM0=
323327
github.com/prometheus/prometheus v0.54.0/go.mod h1:xlLByHhk2g3ycakQGrMaU8K7OySZx98BzeCR99991NY=
324328
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
329+
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
330+
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
325331
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
326332
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
327333
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
@@ -745,6 +751,8 @@ google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6h
745751
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
746752
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
747753
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
754+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
755+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
748756
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
749757
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
750758
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

0 commit comments

Comments
 (0)