Skip to content

Commit

Permalink
Implement addBatchInDisk for big trees
Browse files Browse the repository at this point in the history
Implement addBatchInDisk for big trees, which does not puts the tree in
memory, and works directly over the db data, parallelizing for each CPU.
  • Loading branch information
arnaucube committed Oct 25, 2021
1 parent 3f7e769 commit 6cf1e58
Show file tree
Hide file tree
Showing 10 changed files with 659 additions and 104 deletions.
177 changes: 119 additions & 58 deletions addbatch_test.go

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion circomproofs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"testing"

qt "github.com/frankban/quicktest"
"go.vocdoni.io/dvote/db"
"go.vocdoni.io/dvote/db/badgerdb"
)

func TestCircomVerifierProof(t *testing.T) {
c := qt.New(t)
database, err := badgerdb.New(badgerdb.Options{Path: c.TempDir()})
database, err := badgerdb.New(db.Options{Path: c.TempDir()})
c.Assert(err, qt.IsNil)
tree, err := NewTree(database, 4, HashFunctionPoseidon)
c.Assert(err, qt.IsNil)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ go 1.16
require (
github.com/frankban/quicktest v1.13.0
github.com/iden3/go-iden3-crypto v0.0.6-0.20210308142348-8f85683b2cef
go.vocdoni.io/dvote v1.0.4-0.20210806163627-9494efbc5382
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97
go.vocdoni.io/dvote v1.0.4-0.20211025120558-83c64f440044
golang.org/x/crypto v0.0.0-20210920023735-84f357641f63
)
293 changes: 288 additions & 5 deletions go.sum

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

qt "github.com/frankban/quicktest"
"go.vocdoni.io/dvote/db"
"go.vocdoni.io/dvote/db/badgerdb"
)

Expand Down Expand Up @@ -87,12 +88,12 @@ func TestReadTreeDBG(t *testing.T) {

c := qt.New(t)

database1, err := badgerdb.New(badgerdb.Options{Path: c.TempDir()})
database1, err := badgerdb.New(db.Options{Path: c.TempDir()})
c.Assert(err, qt.IsNil)
tree1, err := NewTree(database1, 100, HashFunctionBlake2b)
c.Assert(err, qt.IsNil)

database2, err := badgerdb.New(badgerdb.Options{Path: c.TempDir()})
database2, err := badgerdb.New(db.Options{Path: c.TempDir()})
c.Assert(err, qt.IsNil)
tree2, err := NewTree(database2, 100, HashFunctionBlake2b)
c.Assert(err, qt.IsNil)
Expand Down
3 changes: 2 additions & 1 deletion testvectors/circom/go-data-generator/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (

qt "github.com/frankban/quicktest"
"github.com/vocdoni/arbo"
"go.vocdoni.io/dvote/db"
"go.vocdoni.io/dvote/db/badgerdb"
)

func TestGenerator(t *testing.T) {
c := qt.New(t)
database, err := badgerdb.New(badgerdb.Options{Path: c.TempDir()})
database, err := badgerdb.New(db.Options{Path: c.TempDir()})
c.Assert(err, qt.IsNil)
tree, err := arbo.NewTree(database, 4, arbo.HashFunctionPoseidon)
c.Assert(err, qt.IsNil)
Expand Down
207 changes: 202 additions & 5 deletions tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"io"
"math"
"runtime"
"sync"

"go.vocdoni.io/dvote/db"
Expand Down Expand Up @@ -45,6 +46,12 @@ const (
)

var (
// thresholdNLeafs defines the threshold number of leafs in the tree
// that determines if AddBatch will work in memory or in disk. Is
// defined as a var in order to have the ability to modify it for
// testing purposes.
thresholdNLeafs = 1024 // TODO define a reasonable value

dbKeyRoot = []byte("root")
dbKeyNLeafs = []byte("nleafs")
emptyValue = []byte{0}
Expand Down Expand Up @@ -197,11 +204,6 @@ func (t *Tree) AddBatchWithTx(wTx db.WriteTx, keys, values [][]byte) ([]Invalid,
return nil, ErrSnapshotNotEditable
}

vt, err := t.loadVT(wTx)
if err != nil {
return nil, err
}

e := []byte{}
// equal the number of keys & values
if len(keys) > len(values) {
Expand All @@ -214,6 +216,201 @@ func (t *Tree) AddBatchWithTx(wTx db.WriteTx, keys, values [][]byte) ([]Invalid,
values = values[:len(keys)]
}

nLeafs, err := t.GetNLeafsWithTx(wTx)
if err != nil {
return nil, err
}
if nLeafs > thresholdNLeafs {
return t.addBatchInDisk(wTx, keys, values)
}
return t.addBatchInMemory(wTx, keys, values)
}

func (t *Tree) addBatchInDisk(wTx db.WriteTx, keys, values [][]byte) ([]Invalid, error) {
nCPU := flp2(runtime.NumCPU())
if nCPU == 1 || len(keys) < nCPU {
var invalids []Invalid
for i := 0; i < len(keys); i++ {
if err := t.AddWithTx(wTx, keys[i], values[i]); err != nil {
invalids = append(invalids, Invalid{i, err})
}
}
return invalids, nil
}

kvs, invalids, err := keysValuesToKvs(t.maxLevels, keys, values)
if err != nil {
return nil, err
}

buckets := splitInBuckets(kvs, nCPU)

root, err := t.RootWithTx(wTx)
if err != nil {
return nil, err
}

l := int(math.Log2(float64(nCPU)))
subRoots, err := t.getSubRootsAtLevel(wTx, root, l+1)
if err != nil {
return nil, err
}
if len(subRoots) != nCPU {
// Already populated Tree but Unbalanced.

// add one key at each bucket, and then continue with the flow
for i := 0; i < len(buckets); i++ {
// add one leaf of the bucket, if there is an error when
// adding the k-v, try to add the next one of the bucket
// (until one is added)
inserted := -1
for j := 0; j < len(buckets[i]); j++ {
if newRoot, err := t.add(wTx, root, 0, buckets[i][j].k, buckets[i][j].v); err == nil {
inserted = j
root = newRoot
break
}
}

// remove the inserted element from buckets[i]
if inserted != -1 {
buckets[i] = append(buckets[i][:inserted], buckets[i][inserted+1:]...)
}
}
subRoots, err = t.getSubRootsAtLevel(wTx, root, l+1)
if err != nil {
return nil, err
}
}

if len(subRoots) != nCPU {
return nil, fmt.Errorf("This error should not be reached."+
" len(subRoots) != nCPU, len(subRoots)=%d, nCPU=%d."+
" Please report it in a new issue:"+
" https://github.com/vocdoni/arbo/issues/new", len(subRoots), nCPU)
}

invalidsInBucket := make([][]Invalid, nCPU)
txs := make([]db.WriteTx, nCPU)
for i := 0; i < nCPU; i++ {
txs[i] = t.db.WriteTx()
err := txs[i].Apply(wTx)
if err != nil {
return nil, err
}
}

var wg sync.WaitGroup
wg.Add(nCPU)
for i := 0; i < nCPU; i++ {
go func(cpu int) {
// use different wTx for each cpu, after once all
// are done, iter over the cpuWTxs and copy their
// content into the main wTx
for j := 0; j < len(buckets[cpu]); j++ {
subRoots[cpu], err = t.add(txs[cpu], subRoots[cpu], l, buckets[cpu][j].k, buckets[cpu][j].v)
if err != nil {
invalidsInBucket[cpu] = append(invalidsInBucket[cpu], Invalid{buckets[cpu][j].pos, err})
}
}
wg.Done()
}(i)
}
wg.Wait()

for i := 0; i < nCPU; i++ {
if err := wTx.Apply(txs[i]); err != nil {
return nil, err
}
txs[i].Discard()
}

for i := 0; i < len(invalidsInBucket); i++ {
invalids = append(invalids, invalidsInBucket[i]...)
}

newRoot, err := t.upFromSubRoots(wTx, subRoots)
if err != nil {
return nil, err
}

// update dbKeyNLeafs
if err := t.SetRootWithTx(wTx, newRoot); err != nil {
return nil, err
}

// update nLeafs
if err := t.incNLeafs(wTx, len(keys)-len(invalids)); err != nil {
return nil, err
}

return invalids, nil
}
func (t *Tree) upFromSubRoots(wTx db.WriteTx, subRoots [][]byte) ([]byte, error) {
// is a method of Tree just to get access to t.hashFunction and
// t.emptyHash.

// go up from subRoots to up, storing nodes in the given WriteTx
// once up at the root, store it in the WriteTx using the dbKeyRoot
if len(subRoots) == 1 {
return subRoots[0], nil
}

var newSubRoots [][]byte
// TODO store the nodes key-values into the wTx
for i := 0; i < len(subRoots); i += 2 {
if bytes.Equal(subRoots[i], t.emptyHash) && bytes.Equal(subRoots[i+1], t.emptyHash) {
// TODO: || (ns[i].typ() == vtLeaf && ns[i+1].typ() == vtEmpty) {

// when both sub nodes are empty, the parent is also empty
// or
// (TODO WIP) when 1st sub node is a leaf but the 2nd is empty, the
// leaf is used as parent

newSubRoots = append(newSubRoots, subRoots[i])
continue
}
// TODO if ns[i].typ() == vtEmpty && ns[i+1].typ() == vtLeaf {
// when 2nd sub node is a leaf but the 1st is empty, the
// leaf is used as 'parent'

k, v, err := t.newIntermediate(subRoots[i], subRoots[i+1])
if err != nil {
return nil, err
}
// store k-v to db
if err = wTx.Set(k, v); err != nil {
return nil, err
}
newSubRoots = append(newSubRoots, k)
}

return t.upFromSubRoots(wTx, newSubRoots)
}
func (t *Tree) getSubRootsAtLevel(rTx db.ReadTx, root []byte, l int) ([][]byte, error) {
// go at level l and return each node key, where each node key is the
// subRoot of the subTree that starts there

var subRoots [][]byte
err := t.iterWithStop(rTx, root, 0, func(currLvl int, k, v []byte) bool {
if currLvl == l && !bytes.Equal(k, t.emptyHash) {
subRoots = append(subRoots, k)
}
if currLvl >= l {
return true // to stop the iter from going down
}
return false
})

return subRoots, err
}

func (t *Tree) addBatchInMemory(wTx db.WriteTx, keys, values [][]byte) ([]Invalid, error) {
vt, err := t.loadVT(wTx)
if err != nil {
return nil, err
}

invalids, err := vt.addBatch(keys, values)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 6cf1e58

Please sign in to comment.