@@ -37,10 +37,6 @@ const (
3737 defaultStorageVersionValue = "1.0.0"
3838 fastStorageVersionValue = "1.1.0"
3939 fastNodeCacheSize = 100000
40-
41- // This is used to avoid the case which pruning blocks the main process.
42- deleteBatchCount = 1000
43- deletePauseDuration = 100 * time .Millisecond
4440)
4541
4642var (
@@ -86,9 +82,12 @@ type nodeDB struct {
8682 storageVersion string // Storage version
8783 firstVersion int64 // First version of nodeDB.
8884 latestVersion int64 // Latest version of nodeDB.
85+ pruneVersion int64 // Version to prune up to.
8986 legacyLatestVersion int64 // Latest version of nodeDB in legacy format.
9087 nodeCache cache.Cache // Cache for nodes in the regular tree that consists of key-value pairs at any version.
9188 fastNodeCache cache.Cache // Cache for nodes in the fast index that represents only key-value pairs at the latest version.
89+ isCommitting bool // Flag to indicate that the nodeDB is committing.
90+ chCommitting chan struct {} // Channel to signal that the committing is done.
9291}
9392
9493func newNodeDB (db dbm.DB , cacheSize int , opts Options , lg Logger ) * nodeDB {
@@ -98,19 +97,27 @@ func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg Logger) *nodeDB {
9897 storeVersion = []byte (defaultStorageVersionValue )
9998 }
10099
101- return & nodeDB {
100+ ndb := & nodeDB {
102101 logger : lg ,
103102 db : db ,
104103 batch : NewBatchWithFlusher (db , opts .FlushThreshold ),
105104 opts : opts ,
106105 firstVersion : 0 ,
107106 latestVersion : 0 , // initially invalid
108107 legacyLatestVersion : 0 ,
108+ pruneVersion : 0 ,
109109 nodeCache : cache .New (cacheSize ),
110110 fastNodeCache : cache .New (fastNodeCacheSize ),
111111 versionReaders : make (map [int64 ]uint32 , 8 ),
112112 storageVersion : string (storeVersion ),
113+ chCommitting : make (chan struct {}, 1 ),
114+ }
115+
116+ if opts .AsyncPruning {
117+ go ndb .startPruning ()
113118 }
119+
120+ return ndb
114121}
115122
116123// GetNode gets a node from memory or disk. If it is an inner node, it does not
@@ -243,6 +250,33 @@ func (ndb *nodeDB) SaveFastNodeNoCache(node *fastnode.Node) error {
243250 return ndb .saveFastNodeUnlocked (node , false )
244251}
245252
253+ // SetCommitting sets the committing flag to true.
254+ // This is used to let the pruning process know that the nodeDB is committing.
255+ func (ndb * nodeDB ) SetCommitting () {
256+ for len (ndb .chCommitting ) > 0 {
257+ <- ndb .chCommitting
258+ }
259+ ndb .mtx .Lock ()
260+ defer ndb .mtx .Unlock ()
261+ ndb .isCommitting = true
262+ }
263+
264+ // UnsetCommitting sets the committing flag to false.
265+ // This is used to let the pruning process know that the nodeDB is done committing.
266+ func (ndb * nodeDB ) UnsetCommitting () {
267+ ndb .mtx .Lock ()
268+ ndb .isCommitting = false
269+ ndb .mtx .Unlock ()
270+ ndb .chCommitting <- struct {}{}
271+ }
272+
273+ // IsCommitting returns true if the nodeDB is committing, false otherwise.
274+ func (ndb * nodeDB ) IsCommitting () bool {
275+ ndb .mtx .Lock ()
276+ defer ndb .mtx .Unlock ()
277+ return ndb .isCommitting
278+ }
279+
246280// SetFastStorageVersionToBatch sets storage version to fast where the version is
247281// 1.1.0-<version of the current live state>. Returns error if storage version is incorrect or on
248282// db error, nil otherwise. Requires changes to be committed after to be persisted.
@@ -330,6 +364,37 @@ func (ndb *nodeDB) Has(nk []byte) (bool, error) {
330364 return ndb .db .Has (ndb .nodeKey (nk ))
331365}
332366
367+ // deleteFromPruning deletes the orphan nodes from the pruning process.
368+ func (ndb * nodeDB ) deleteFromPruning (key []byte ) error {
369+ if ndb .IsCommitting () {
370+ // if the nodeDB is committing, the pruning process will be done after the committing.
371+ <- ndb .chCommitting
372+ }
373+
374+ ndb .mtx .Lock ()
375+ defer ndb .mtx .Unlock ()
376+ return ndb .batch .Delete (key )
377+ }
378+
379+ // saveNodeFromPruning saves the orphan nodes to the pruning process.
380+ func (ndb * nodeDB ) saveNodeFromPruning (node * Node ) error {
381+ if ndb .IsCommitting () {
382+ // if the nodeDB is committing, the pruning process will be done after the committing.
383+ <- ndb .chCommitting
384+ }
385+
386+ ndb .mtx .Lock ()
387+ defer ndb .mtx .Unlock ()
388+
389+ // Save node bytes to db.
390+ var buf bytes.Buffer
391+ buf .Grow (node .encodedSize ())
392+ if err := node .writeBytes (& buf ); err != nil {
393+ return err
394+ }
395+ return ndb .batch .Set (ndb .nodeKey (node .GetKey ()), buf .Bytes ())
396+ }
397+
333398// deleteVersion deletes a tree version from disk.
334399// deletes orphans
335400func (ndb * nodeDB ) deleteVersion (version int64 ) error {
@@ -342,7 +407,7 @@ func (ndb *nodeDB) deleteVersion(version int64) error {
342407 if orphan .nodeKey .nonce == 0 && ! orphan .isLegacy {
343408 // if the orphan is a reformatted root, it can be a legacy root
344409 // so it should be removed from the pruning process.
345- if err := ndb .batch . Delete (ndb .legacyNodeKey (orphan .hash )); err != nil {
410+ if err := ndb .deleteFromPruning (ndb .legacyNodeKey (orphan .hash )); err != nil {
346411 return err
347412 }
348413 }
@@ -354,9 +419,9 @@ func (ndb *nodeDB) deleteVersion(version int64) error {
354419 }
355420 nk := orphan .GetKey ()
356421 if orphan .isLegacy {
357- return ndb .batch . Delete (ndb .legacyNodeKey (nk ))
422+ return ndb .deleteFromPruning (ndb .legacyNodeKey (nk ))
358423 }
359- return ndb .batch . Delete (ndb .nodeKey (nk ))
424+ return ndb .deleteFromPruning (ndb .nodeKey (nk ))
360425 }); err != nil {
361426 return err
362427 }
@@ -365,7 +430,7 @@ func (ndb *nodeDB) deleteVersion(version int64) error {
365430 if rootKey == nil || ! bytes .Equal (rootKey , literalRootKey ) {
366431 // if the root key is not matched with the literal root key, it means the given root
367432 // is a reference root to the previous version.
368- if err := ndb .batch . Delete (ndb .nodeKey (literalRootKey )); err != nil {
433+ if err := ndb .deleteFromPruning (ndb .nodeKey (literalRootKey )); err != nil {
369434 return err
370435 }
371436 }
@@ -381,12 +446,12 @@ func (ndb *nodeDB) deleteVersion(version int64) error {
381446 return err
382447 }
383448 // ensure that the given version is not included in the root search
384- if err := ndb .batch . Delete (ndb .nodeKey (literalRootKey )); err != nil {
449+ if err := ndb .deleteFromPruning (ndb .nodeKey (literalRootKey )); err != nil {
385450 return err
386451 }
387452 // instead, the root should be reformatted to (version, 0)
388453 root .nodeKey .nonce = 0
389- if err := ndb .SaveNode (root ); err != nil {
454+ if err := ndb .saveNodeFromPruning (root ); err != nil {
390455 return err
391456 }
392457 }
@@ -420,36 +485,30 @@ func (ndb *nodeDB) deleteLegacyNodes(version int64, nk []byte) error {
420485
421486// deleteLegacyVersions deletes all legacy versions from disk.
422487func (ndb * nodeDB ) deleteLegacyVersions (legacyLatestVersion int64 ) error {
423- count := 0
424-
425- checkDeletePause := func () {
426- count ++
427- if count % deleteBatchCount == 0 {
428- time .Sleep (deletePauseDuration )
429- count = 0
430- }
488+ // Delete the last version for the legacyLastVersion
489+ if err := ndb .traverseOrphans (legacyLatestVersion , legacyLatestVersion + 1 , func (orphan * Node ) error {
490+ return ndb .deleteFromPruning (ndb .legacyNodeKey (orphan .hash ))
491+ }); err != nil {
492+ return err
431493 }
432494
433495 // Delete orphans for all legacy versions
434496 if err := ndb .traversePrefix (legacyOrphanKeyFormat .Key (), func (key , value []byte ) error {
435- checkDeletePause ()
436- if err := ndb .batch .Delete (key ); err != nil {
497+ if err := ndb .deleteFromPruning (key ); err != nil {
437498 return err
438499 }
439500 var fromVersion , toVersion int64
440501 legacyOrphanKeyFormat .Scan (key , & toVersion , & fromVersion )
441502 if (fromVersion <= legacyLatestVersion && toVersion < legacyLatestVersion ) || fromVersion > legacyLatestVersion {
442- checkDeletePause ()
443- return ndb .batch .Delete (ndb .legacyNodeKey (value ))
503+ return ndb .deleteFromPruning (ndb .legacyNodeKey (value ))
444504 }
445505 return nil
446506 }); err != nil {
447507 return err
448508 }
449509 // Delete all legacy roots
450510 if err := ndb .traversePrefix (legacyRootKeyFormat .Key (), func (key , _ []byte ) error {
451- checkDeletePause ()
452- return ndb .batch .Delete (key )
511+ return ndb .deleteFromPruning (key )
453512 }); err != nil {
454513 return err
455514 }
@@ -515,8 +574,45 @@ func (ndb *nodeDB) DeleteVersionsFrom(fromVersion int64) error {
515574 return nil
516575}
517576
577+ // startPruning starts the pruning process.
578+ func (ndb * nodeDB ) startPruning () {
579+ for {
580+ ndb .mtx .Lock ()
581+ toVersion := ndb .pruneVersion
582+ ndb .mtx .Unlock ()
583+
584+ if toVersion == 0 {
585+ time .Sleep (100 * time .Millisecond )
586+ continue
587+ }
588+
589+ if err := ndb .deleteVersionsTo (toVersion ); err != nil {
590+ ndb .logger .Error ("Error while pruning" , "err" , err )
591+ time .Sleep (1 * time .Second )
592+ continue
593+ }
594+
595+ ndb .mtx .Lock ()
596+ if ndb .pruneVersion <= toVersion {
597+ ndb .pruneVersion = 0
598+ }
599+ ndb .mtx .Unlock ()
600+ }
601+ }
602+
518603// DeleteVersionsTo deletes the oldest versions up to the given version from disk.
519604func (ndb * nodeDB ) DeleteVersionsTo (toVersion int64 ) error {
605+ if ! ndb .opts .AsyncPruning {
606+ return ndb .deleteVersionsTo (toVersion )
607+ }
608+
609+ ndb .mtx .Lock ()
610+ defer ndb .mtx .Unlock ()
611+ ndb .pruneVersion = toVersion
612+ return nil
613+ }
614+
615+ func (ndb * nodeDB ) deleteVersionsTo (toVersion int64 ) error {
520616 legacyLatestVersion , err := ndb .getLegacyLatestVersion ()
521617 if err != nil {
522618 return err
@@ -553,20 +649,12 @@ func (ndb *nodeDB) DeleteVersionsTo(toVersion int64) error {
553649
554650 // Delete the legacy versions
555651 if legacyLatestVersion >= first {
556- // Delete the last version for the legacyLastVersion
557- if err := ndb .traverseOrphans (legacyLatestVersion , legacyLatestVersion + 1 , func (orphan * Node ) error {
558- return ndb .batch .Delete (ndb .legacyNodeKey (orphan .hash ))
559- }); err != nil {
560- return err
652+ if err := ndb .deleteLegacyVersions (legacyLatestVersion ); err != nil {
653+ ndb .logger .Error ("Error deleting legacy versions" , "err" , err )
561654 }
655+ first = legacyLatestVersion + 1
562656 // reset the legacy latest version forcibly to avoid multiple calls
563657 ndb .resetLegacyLatestVersion (- 1 )
564- go func () {
565- if err := ndb .deleteLegacyVersions (legacyLatestVersion ); err != nil {
566- ndb .logger .Error ("Error deleting legacy versions" , "err" , err )
567- }
568- }()
569- first = legacyLatestVersion + 1
570658 }
571659
572660 for version := first ; version <= toVersion ; version ++ {
0 commit comments