Skip to content

Commit

Permalink
WIP: Support transaction in the docstore
Browse files Browse the repository at this point in the history
  • Loading branch information
Sandeep Pal committed Oct 11, 2024
1 parent e9bda0a commit e452b85
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 29 deletions.
29 changes: 16 additions & 13 deletions docstore/awsdynamodb/dynamo.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,12 @@ func (c *collection) RevisionField() string { return c.opts.RevisionField }

func (c *collection) RunActions(ctx context.Context, actions []*driver.Action, opts *driver.RunActionsOptions) driver.ActionListError {
errs := make([]error, len(actions))
beforeGets, gets, writes, afterGets := driver.GroupActions(actions)
beforeGets, gets, writes, writesTx, afterGets := driver.GroupActions(actions)
c.runGets(ctx, beforeGets, errs, opts)
ch := make(chan struct{})
ch2 := make(chan struct{})
go func() { defer close(ch); c.runWrites(ctx, writes, errs, opts) }()
go func() { defer close(ch2); c.transactWrite(ctx, writesTx, errs, opts) }()
c.runGets(ctx, gets, errs, opts)
<-ch
c.runGets(ctx, afterGets, errs, opts)
Expand Down Expand Up @@ -613,25 +615,26 @@ func revisionPrecondition(doc driver.Document, revField string) (*expression.Con
return &cb, nil
}

// TODO(jba): use this if/when we support atomic writes.
func (c *collection) transactWrite(ctx context.Context, actions []*driver.Action, errs []error, opts *driver.RunActionsOptions, start, end int) {
func (c *collection) transactWrite(ctx context.Context, actions []*driver.Action, errs []error, opts *driver.RunActionsOptions) {
if len(actions) == 0 {
return
}
setErr := func(err error) {
for i := start; i <= end; i++ {
errs[actions[i].Index] = err
for _, a := range actions {
errs[a.Index] = err
}
}

tws := make([]*dyn.TransactWriteItem, 0, len(actions))
var ops []*writeOp
tws := make([]*dyn.TransactWriteItem, 0, end-start+1)
for i := start; i <= end; i++ {
a := actions[i]
op, err := c.newWriteOp(a, opts)
for _, w := range actions {
op, err := c.newWriteOp(w, opts)
if err != nil {
setErr(err)
return
errs[w.Index] = err
} else {
ops = append(ops, op)
tws = append(tws, op.writeItem)
}
ops = append(ops, op)
tws = append(tws, op.writeItem)
}

in := &dyn.TransactWriteItemsInput{
Expand Down
36 changes: 31 additions & 5 deletions docstore/docstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,11 @@ type ActionList struct {
// An Action is a read or write on a single document.
// Use the methods of ActionList to create and execute Actions.
type Action struct {
kind driver.ActionKind
doc Document
fieldpaths []FieldPath // paths to retrieve, for Get
mods Mods // modifications to make, for Update
kind driver.ActionKind
doc Document
fieldpaths []FieldPath // paths to retrieve, for Get
mods Mods // modifications to make, for Update
transaction bool // if this action is a part of transaction
}

func (l *ActionList) add(a *Action) *ActionList {
Expand All @@ -173,6 +174,10 @@ func (l *ActionList) Create(doc Document) *ActionList {
return l.add(&Action{kind: driver.Create, doc: doc})
}

func (l *ActionList) CreateTx(doc Document) *ActionList {
return l.add(&Action{kind: driver.Create, doc: doc, transaction: true})
}

// Replace adds an action that replaces a document to the given ActionList, and
// returns the ActionList. The key fields of the doc argument must be set. The
// document must already exist; an error with code NotFound is returned if it does
Expand All @@ -185,6 +190,10 @@ func (l *ActionList) Replace(doc Document) *ActionList {
return l.add(&Action{kind: driver.Replace, doc: doc})
}

func (l *ActionList) ReplaceTx(doc Document) *ActionList {
return l.add(&Action{kind: driver.Replace, doc: doc, transaction: true})
}

// Put adds an action that adds or replaces a document to the given ActionList, and returns the ActionList.
// The key fields must be set.
//
Expand All @@ -198,6 +207,10 @@ func (l *ActionList) Put(doc Document) *ActionList {
return l.add(&Action{kind: driver.Put, doc: doc})
}

func (l *ActionList) PutTx(doc Document) *ActionList {
return l.add(&Action{kind: driver.Put, doc: doc, transaction: true})
}

// Delete adds an action that deletes a document to the given ActionList, and returns
// the ActionList. Only the key and revision fields of doc are used.
// See the Revisions section of the package documentation for how revisions are
Expand All @@ -213,6 +226,10 @@ func (l *ActionList) Delete(doc Document) *ActionList {
return l.add(&Action{kind: driver.Delete, doc: doc})
}

func (l *ActionList) DeleteTx(doc Document) *ActionList {
return l.add(&Action{kind: driver.Delete, doc: doc, transaction: true})
}

// Get adds an action that retrieves a document to the given ActionList, and
// returns the ActionList.
// Only the key fields of doc are used.
Expand Down Expand Up @@ -258,6 +275,15 @@ func (l *ActionList) Update(doc Document, mods Mods) *ActionList {
})
}

func (l *ActionList) UpdateTx(doc Document, mods Mods) *ActionList {
return l.add(&Action{
kind: driver.Update,
doc: doc,
mods: mods,
transaction: true,
})
}

// Mods is a map from field paths to modifications.
// At present, a modification is one of:
// - nil, to delete the field
Expand Down Expand Up @@ -430,7 +456,7 @@ func (c *Collection) toDriverAction(a *Action) (*driver.Action, error) {
// A Put with a revision field is equivalent to a Replace.
kind = driver.Replace
}
d := &driver.Action{Kind: kind, Doc: ddoc, Key: key}
d := &driver.Action{Kind: kind, Doc: ddoc, Key: key, Transaction: a.transaction}
if a.fieldpaths != nil {
d.FieldPaths, err = parseFieldPaths(a.fieldpaths)
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions docstore/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,14 @@ const (

//go:generate stringer -type=ActionKind

// An Action describes a single operation on a single document.
type Action struct {
Kind ActionKind // the kind of action
Doc Document // the document on which to perform the action
Key interface{} // the document key returned by Collection.Key, to avoid recomputing it
FieldPaths [][]string // field paths to retrieve, for Get only
Mods []Mod // modifications to make, for Update only
Index int // the index of the action in the original action list
Kind ActionKind // the kind of action
Doc Document // the document on which to perform the action
Key interface{} // the document key returned by Collection.Key, to avoid recomputing it
FieldPaths [][]string // field paths to retrieve, for Get only
Mods []Mod // modifications to make, for Update only
Index int // the index of the action in the original action list
Transaction bool // if this action is a part of transaction
}

// A Mod is a modification to a field path in a document.
Expand Down
22 changes: 18 additions & 4 deletions docstore/driver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,13 @@ func SplitActions(actions []*Action, split func(a, b *Action) bool) [][]*Action

// GroupActions separates actions into four sets: writes, gets that must happen before the writes,
// gets that must happen after the writes, and gets that can happen concurrently with the writes.
func GroupActions(actions []*Action) (beforeGets, getList, writeList, afterGets []*Action) {
func GroupActions(actions []*Action) (beforeGets, getList, writeList, writesTxList, afterGets []*Action) {
// maps from key to action
bgets := map[interface{}]*Action{}
agets := map[interface{}]*Action{}
cgets := map[interface{}]*Action{}
writes := map[interface{}]*Action{}
writesTx := map[interface{}]*Action{}
var nilkeys []*Action
for _, a := range actions {
if a.Key == nil {
Expand All @@ -69,7 +70,7 @@ func GroupActions(actions []*Action) (beforeGets, getList, writeList, afterGets
} else if a.Kind == Get {
// If there was a prior write with this key, make sure this get
// happens after the writes.
if _, ok := writes[a.Key]; ok {
if valueExistsInAllMaps(a.Key, writes, writesTx) {
agets[a.Key] = a
} else {
cgets[a.Key] = a
Expand All @@ -81,7 +82,11 @@ func GroupActions(actions []*Action) (beforeGets, getList, writeList, afterGets
delete(cgets, a.Key)
bgets[a.Key] = g
}
writes[a.Key] = a
if a.Transaction {
writesTx[a.Key] = a
} else {
writes[a.Key] = a
}
}
}

Expand All @@ -95,7 +100,16 @@ func GroupActions(actions []*Action) (beforeGets, getList, writeList, afterGets
return as
}

return vals(bgets), vals(cgets), append(vals(writes), nilkeys...), vals(agets)
return vals(bgets), vals(cgets), append(vals(writes), nilkeys...), vals(writesTx), vals(agets)
}

func valueExistsInAllMaps(key interface{}, maps ...map[interface{}]*Action) bool {
for _, m := range maps {
if _, ok := m[key]; !ok {
return false
}
}
return true
}

// AsFunc creates and returns an "as function" that behaves as follows:
Expand Down

0 comments on commit e452b85

Please sign in to comment.