From e452b850de6c5ac43e430a560d349c583ec0ad96 Mon Sep 17 00:00:00 2001 From: Sandeep Pal Date: Fri, 11 Oct 2024 14:17:26 -0700 Subject: [PATCH] WIP: Support transaction in the docstore --- docstore/awsdynamodb/dynamo.go | 29 +++++++++++++++------------ docstore/docstore.go | 36 +++++++++++++++++++++++++++++----- docstore/driver/driver.go | 14 ++++++------- docstore/driver/util.go | 22 +++++++++++++++++---- 4 files changed, 72 insertions(+), 29 deletions(-) diff --git a/docstore/awsdynamodb/dynamo.go b/docstore/awsdynamodb/dynamo.go index 2df0aef081..a378dc40bd 100644 --- a/docstore/awsdynamodb/dynamo.go +++ b/docstore/awsdynamodb/dynamo.go @@ -162,10 +162,12 @@ func (c *collection) RevisionField() string { return c.opts.RevisionField } func (c *collection) RunActions(ctx context.Context, actions []*driver.Action, opts *driver.RunActionsOptions) driver.ActionListError { errs := make([]error, len(actions)) - beforeGets, gets, writes, afterGets := driver.GroupActions(actions) + beforeGets, gets, writes, writesTx, afterGets := driver.GroupActions(actions) c.runGets(ctx, beforeGets, errs, opts) ch := make(chan struct{}) + ch2 := make(chan struct{}) go func() { defer close(ch); c.runWrites(ctx, writes, errs, opts) }() + go func() { defer close(ch2); c.transactWrite(ctx, writesTx, errs, opts) }() c.runGets(ctx, gets, errs, opts) <-ch c.runGets(ctx, afterGets, errs, opts) @@ -613,25 +615,26 @@ func revisionPrecondition(doc driver.Document, revField string) (*expression.Con return &cb, nil } -// TODO(jba): use this if/when we support atomic writes. -func (c *collection) transactWrite(ctx context.Context, actions []*driver.Action, errs []error, opts *driver.RunActionsOptions, start, end int) { +func (c *collection) transactWrite(ctx context.Context, actions []*driver.Action, errs []error, opts *driver.RunActionsOptions) { + if len(actions) == 0 { + return + } setErr := func(err error) { - for i := start; i <= end; i++ { - errs[actions[i].Index] = err + for _, a := range actions { + errs[a.Index] = err } } + tws := make([]*dyn.TransactWriteItem, 0, len(actions)) var ops []*writeOp - tws := make([]*dyn.TransactWriteItem, 0, end-start+1) - for i := start; i <= end; i++ { - a := actions[i] - op, err := c.newWriteOp(a, opts) + for _, w := range actions { + op, err := c.newWriteOp(w, opts) if err != nil { - setErr(err) - return + errs[w.Index] = err + } else { + ops = append(ops, op) + tws = append(tws, op.writeItem) } - ops = append(ops, op) - tws = append(tws, op.writeItem) } in := &dyn.TransactWriteItemsInput{ diff --git a/docstore/docstore.go b/docstore/docstore.go index 388d46ea8a..a7f6237678 100644 --- a/docstore/docstore.go +++ b/docstore/docstore.go @@ -143,10 +143,11 @@ type ActionList struct { // An Action is a read or write on a single document. // Use the methods of ActionList to create and execute Actions. type Action struct { - kind driver.ActionKind - doc Document - fieldpaths []FieldPath // paths to retrieve, for Get - mods Mods // modifications to make, for Update + kind driver.ActionKind + doc Document + fieldpaths []FieldPath // paths to retrieve, for Get + mods Mods // modifications to make, for Update + transaction bool // if this action is a part of transaction } func (l *ActionList) add(a *Action) *ActionList { @@ -173,6 +174,10 @@ func (l *ActionList) Create(doc Document) *ActionList { return l.add(&Action{kind: driver.Create, doc: doc}) } +func (l *ActionList) CreateTx(doc Document) *ActionList { + return l.add(&Action{kind: driver.Create, doc: doc, transaction: true}) +} + // Replace adds an action that replaces a document to the given ActionList, and // returns the ActionList. The key fields of the doc argument must be set. The // document must already exist; an error with code NotFound is returned if it does @@ -185,6 +190,10 @@ func (l *ActionList) Replace(doc Document) *ActionList { return l.add(&Action{kind: driver.Replace, doc: doc}) } +func (l *ActionList) ReplaceTx(doc Document) *ActionList { + return l.add(&Action{kind: driver.Replace, doc: doc, transaction: true}) +} + // Put adds an action that adds or replaces a document to the given ActionList, and returns the ActionList. // The key fields must be set. // @@ -198,6 +207,10 @@ func (l *ActionList) Put(doc Document) *ActionList { return l.add(&Action{kind: driver.Put, doc: doc}) } +func (l *ActionList) PutTx(doc Document) *ActionList { + return l.add(&Action{kind: driver.Put, doc: doc, transaction: true}) +} + // Delete adds an action that deletes a document to the given ActionList, and returns // the ActionList. Only the key and revision fields of doc are used. // See the Revisions section of the package documentation for how revisions are @@ -213,6 +226,10 @@ func (l *ActionList) Delete(doc Document) *ActionList { return l.add(&Action{kind: driver.Delete, doc: doc}) } +func (l *ActionList) DeleteTx(doc Document) *ActionList { + return l.add(&Action{kind: driver.Delete, doc: doc, transaction: true}) +} + // Get adds an action that retrieves a document to the given ActionList, and // returns the ActionList. // Only the key fields of doc are used. @@ -258,6 +275,15 @@ func (l *ActionList) Update(doc Document, mods Mods) *ActionList { }) } +func (l *ActionList) UpdateTx(doc Document, mods Mods) *ActionList { + return l.add(&Action{ + kind: driver.Update, + doc: doc, + mods: mods, + transaction: true, + }) +} + // Mods is a map from field paths to modifications. // At present, a modification is one of: // - nil, to delete the field @@ -430,7 +456,7 @@ func (c *Collection) toDriverAction(a *Action) (*driver.Action, error) { // A Put with a revision field is equivalent to a Replace. kind = driver.Replace } - d := &driver.Action{Kind: kind, Doc: ddoc, Key: key} + d := &driver.Action{Kind: kind, Doc: ddoc, Key: key, Transaction: a.transaction} if a.fieldpaths != nil { d.FieldPaths, err = parseFieldPaths(a.fieldpaths) if err != nil { diff --git a/docstore/driver/driver.go b/docstore/driver/driver.go index bd2bbccb0e..141750f16e 100644 --- a/docstore/driver/driver.go +++ b/docstore/driver/driver.go @@ -125,14 +125,14 @@ const ( //go:generate stringer -type=ActionKind -// An Action describes a single operation on a single document. type Action struct { - Kind ActionKind // the kind of action - Doc Document // the document on which to perform the action - Key interface{} // the document key returned by Collection.Key, to avoid recomputing it - FieldPaths [][]string // field paths to retrieve, for Get only - Mods []Mod // modifications to make, for Update only - Index int // the index of the action in the original action list + Kind ActionKind // the kind of action + Doc Document // the document on which to perform the action + Key interface{} // the document key returned by Collection.Key, to avoid recomputing it + FieldPaths [][]string // field paths to retrieve, for Get only + Mods []Mod // modifications to make, for Update only + Index int // the index of the action in the original action list + Transaction bool // if this action is a part of transaction } // A Mod is a modification to a field path in a document. diff --git a/docstore/driver/util.go b/docstore/driver/util.go index 5ead3f3fa4..b9886894de 100644 --- a/docstore/driver/util.go +++ b/docstore/driver/util.go @@ -55,12 +55,13 @@ func SplitActions(actions []*Action, split func(a, b *Action) bool) [][]*Action // GroupActions separates actions into four sets: writes, gets that must happen before the writes, // gets that must happen after the writes, and gets that can happen concurrently with the writes. -func GroupActions(actions []*Action) (beforeGets, getList, writeList, afterGets []*Action) { +func GroupActions(actions []*Action) (beforeGets, getList, writeList, writesTxList, afterGets []*Action) { // maps from key to action bgets := map[interface{}]*Action{} agets := map[interface{}]*Action{} cgets := map[interface{}]*Action{} writes := map[interface{}]*Action{} + writesTx := map[interface{}]*Action{} var nilkeys []*Action for _, a := range actions { if a.Key == nil { @@ -69,7 +70,7 @@ func GroupActions(actions []*Action) (beforeGets, getList, writeList, afterGets } else if a.Kind == Get { // If there was a prior write with this key, make sure this get // happens after the writes. - if _, ok := writes[a.Key]; ok { + if valueExistsInAllMaps(a.Key, writes, writesTx) { agets[a.Key] = a } else { cgets[a.Key] = a @@ -81,7 +82,11 @@ func GroupActions(actions []*Action) (beforeGets, getList, writeList, afterGets delete(cgets, a.Key) bgets[a.Key] = g } - writes[a.Key] = a + if a.Transaction { + writesTx[a.Key] = a + } else { + writes[a.Key] = a + } } } @@ -95,7 +100,16 @@ func GroupActions(actions []*Action) (beforeGets, getList, writeList, afterGets return as } - return vals(bgets), vals(cgets), append(vals(writes), nilkeys...), vals(agets) + return vals(bgets), vals(cgets), append(vals(writes), nilkeys...), vals(writesTx), vals(agets) +} + +func valueExistsInAllMaps(key interface{}, maps ...map[interface{}]*Action) bool { + for _, m := range maps { + if _, ok := m[key]; !ok { + return false + } + } + return true } // AsFunc creates and returns an "as function" that behaves as follows: