Skip to content

Commit

Permalink
statesync: more improvements, and enable Snapshots and StateSync by d…
Browse files Browse the repository at this point in the history
…efault

 * SnapshotInterval=10000 by default and prune old snapshots (keepRecent=10)
 * StateSyncEnabled=true
 * snapshot: fix several log lines
 * vochain: don't warn on 'height X is not available', common error after StateSync
 * ci: show logs of gatewaySync in case of failed test_statesync
 * indexer: wait until sync is finished before ImportArchive
 * log: show comet logs during statesync progress
  • Loading branch information
altergui committed Mar 12, 2024
1 parent 58afd32 commit 63753a7
Show file tree
Hide file tree
Showing 14 changed files with 105 additions and 16 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ docker compose down

#### Connecting

Once the node has finished the blockchain fast sync process, you can connect query the API:
Once the node has finished the blockchain sync process, you can connect query the API:

`$ curl http://127.0.0.1:9090/v2/chain/info`

Expand Down
8 changes: 4 additions & 4 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ func loadConfig() *config.Config {
"do not wait for Vochain to synchronize (for testing only)")
flag.Int("vochainMempoolSize", 20000,
"vochain mempool size")
flag.Int("vochainSnapshotInterval", 0,
flag.Int("vochainSnapshotInterval", 10000,
"create state snapshot every N blocks (0 to disable)")
flag.Bool("vochainStateSyncEnabled", false,
flag.Bool("vochainStateSyncEnabled", true,
"during startup, let cometBFT ask peers for available snapshots and use them to bootstrap the state")
flag.StringSlice("vochainStateSyncRPCServers", []string{},
"list of RPC servers to bootstrap the StateSync (optional, defaults to using seeds)")
Expand Down Expand Up @@ -522,8 +522,8 @@ func main() {
log.Fatal(err)
}
}
// start the service and block until finish fast sync
// State Sync (if enabled) also happens during this step
// start the service and block until finish sync:
// StateSync (if enabled) happens first, and then fastsync in all cases
if err := srv.Start(); err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 0 additions & 2 deletions dockerfiles/testsuite/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,6 @@ services:
- VOCDONI_VOCHAIN_STATESYNCENABLED=true
- VOCDONI_VOCHAIN_STATESYNCRPCSERVERS=miner0:26657,miner0:26657
- VOCDONI_VOCHAIN_STATESYNCFETCHPARAMSFROMAPI
profiles:
- statesync

networks:
blockchain:
Expand Down
1 change: 1 addition & 0 deletions dockerfiles/testsuite/env.gateway0
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ VOCDONI_METRICS_REFRESHINTERVAL=5
VOCDONI_CHAIN=dev
VOCDONI_SIGNINGKEY=e0f1412b86d6ca9f2b318f1d243ef50be23d315a2e6c1c3035bc72d44c8b2f90 # 0x88a499cEf9D1330111b41360173967c9C1bf703f
VOCDONI_ARCHIVEURL=none
VOCDONI_VOCHAIN_STATESYNCENABLED=false
1 change: 1 addition & 0 deletions dockerfiles/testsuite/env.miner0
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ VOCDONI_VOCHAIN_MINERKEY=cda909c34901c137e12bb7d0afbcb9d1c8abc66f03862a42344b1f5
VOCDONI_METRICS_ENABLED=True
VOCDONI_METRICS_REFRESHINTERVAL=5
VOCDONI_VOCHAIN_MINERTARGETBLOCKTIMESECONDS=6
VOCDONI_VOCHAIN_STATESYNCENABLED=false
1 change: 1 addition & 0 deletions dockerfiles/testsuite/env.miner1
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ VOCDONI_METRICS_ENABLED=True
VOCDONI_METRICS_REFRESHINTERVAL=5
VOCDONI_VOCHAIN_MINERTARGETBLOCKTIMESECONDS=6
VOCDONI_VOCHAIN_LOGLEVEL=error
VOCDONI_VOCHAIN_STATESYNCENABLED=false
1 change: 1 addition & 0 deletions dockerfiles/testsuite/env.miner2
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ VOCDONI_VOCHAIN_MINERKEY=e06976e5eaf3f147e12763eb140e3d5c2ed16a6fa747d787d8b92ca
VOCDONI_METRICS_ENABLED=True
VOCDONI_METRICS_REFRESHINTERVAL=5
VOCDONI_VOCHAIN_MINERTARGETBLOCKTIMESECONDS=6
VOCDONI_VOCHAIN_STATESYNCENABLED=false
1 change: 1 addition & 0 deletions dockerfiles/testsuite/env.miner3
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ VOCDONI_VOCHAIN_MINERKEY=b8d258559adee836a43e964badf541ec106cc68a01f989d3c3c9a03
VOCDONI_METRICS_ENABLED=True
VOCDONI_METRICS_REFRESHINTERVAL=5
VOCDONI_VOCHAIN_MINERTARGETBLOCKTIMESECONDS=6
VOCDONI_VOCHAIN_STATESYNCENABLED=false
1 change: 1 addition & 0 deletions dockerfiles/testsuite/env.seed
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ VOCDONI_VOCHAIN_GENESIS=/app/misc/genesis.json
VOCDONI_VOCHAIN_NODEKEY=0x2060e20d1f0894d6b23901bce3f20f26107baf0335451ad75ef27b14e4fc56ae050a65ae3883c379b70d811d6e12db2fe1e3a5cf0cae4d03dbbbfebc68601bdd
VOCDONI_METRICS_ENABLED=True
VOCDONI_METRICS_REFRESHINTERVAL=5
VOCDONI_VOCHAIN_STATESYNCENABLED=false
6 changes: 3 additions & 3 deletions dockerfiles/testsuite/start_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ e2etest_ballotelection() {

test_statesync() {
export VOCDONI_VOCHAIN_STATESYNCFETCHPARAMSFROMAPI=$APIHOST
$COMPOSE_CMD --profile statesync up gatewaySync -d
$COMPOSE_CMD up gatewaySync -d
# watch logs for 2 minutes, until catching 'startup complete'. in case of timeout, or panic, or whatever, test will fail
timeout 120 sh -c "($COMPOSE_CMD logs gatewaySync -f | grep -m 1 'startup complete')"
}
Expand All @@ -175,7 +175,7 @@ log "### Starting test suite ###"
}
$COMPOSE_CMD up -d seed # start the seed first so the nodes can properly bootstrap
sleep 10
$COMPOSE_CMD up -d
$COMPOSE_CMD up -d miner0 miner1 miner2 miner3 gateway0

check_gw_is_up() {
height=$($COMPOSE_CMD_RUN test \
Expand Down Expand Up @@ -263,7 +263,7 @@ if $COMPOSE_CMD logs | grep -q "CONSENSUS FAILURE" ; then RET=3 ; log "### CONSE

[ $CLEAN -eq 1 ] && {
log "### Cleaning docker environment ###"
$COMPOSE_CMD --profile statesync down -v --remove-orphans
$COMPOSE_CMD down -v --remove-orphans
}

if [ -n "$failed" ]; then
Expand Down
9 changes: 9 additions & 0 deletions service/vochain.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,15 @@ func (vs *VocdoniService) Start() error {

if !vs.Config.NoWaitSync || vs.Config.StateSyncEnabled {
log.Infof("waiting for vochain to synchronize")
if vs.Config.StateSyncEnabled {
log.Infof("temporarily setting comet loglevel to info, to see statesync progress")
oldTenderLogLevel := log.CometLogLevel()
log.SetCometLogLevel("info")
defer func() {
log.Infof("setting comet loglevel back to %q", oldTenderLogLevel)
log.SetCometLogLevel(oldTenderLogLevel)
}()
}
timeSyncStarted := time.Now()
func() {
lastHeight := uint64(0)
Expand Down
75 changes: 70 additions & 5 deletions snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"fmt"
"hash"
"io"
"io/fs"
"math"
"os"
"path"
"path/filepath"
"sort"
"sync"
"time"

Expand All @@ -26,6 +28,8 @@ const (
snapshotHeaderLenSize = 32
)

const snapshotKeepNRecent = 10

const (
snapshotBlobType_Tree = iota
snapshotBlobType_NoStateDB
Expand Down Expand Up @@ -233,7 +237,11 @@ func (s *Snapshot) Finish() error {
// or handle removing the directory, in case the returned err != nil
// It also restores the IndexerDB into dataDir/indexer (hardcoded)
func (s *Snapshot) Restore(dbType, dataDir string) (string, error) {
log.Infof("installing snapshot %+v into dir %s using dbType %s", s.Header(), dataDir, dbType)
log.Infow("installing snapshot",
"dataDir", dataDir, "dbType", dbType,
"version", s.Header().Version, "blobs", len(s.Header().Blobs),
"chainID", s.Header().ChainID, "height", s.Header().Height,
"root", fmt.Sprintf("%x", s.Header().Root))

tmpDir, err := os.MkdirTemp(dataDir, "newState")
if err != nil {
Expand Down Expand Up @@ -319,6 +327,7 @@ func NewManager(dataDir string, chunkSize int64) (*SnapshotManager, error) {

// Do performs a snapshot of the last committed state for all trees and dbs.
// The snapshot is stored in disk and the file path is returned.
// If the snapshot finishes successfully, it will trigger a prune of old snapshots from dataDir
func (sm *SnapshotManager) Do(v *state.State) (string, error) {
height, err := v.LastHeight()
if err != nil {
Expand All @@ -339,11 +348,17 @@ func (sm *SnapshotManager) Do(v *state.State) (string, error) {
snap.SetHeight(height)
snap.SetChainID(v.ChainID())

logLastDumpedBlob := func() {
b := snap.header.Blobs[len(snap.header.Blobs)-1]
log.Debugw("dumped blob", "index", len(snap.header.Blobs)-1, "type", b.Type, "name", b.Name, "size", b.Size,
"parent", b.Parent, "key", fmt.Sprintf("%x", b.Key), "root", fmt.Sprintf("%x", b.Root))
}

// NoStateDB
if err := snap.DumpNoStateDB(v); err != nil {
return "", err
}
log.Debugf("dumped blob %d: %+v", len(snap.header.Blobs)-1, snap.header.Blobs[len(snap.header.Blobs)-1])
logLastDumpedBlob()

// State
list, err := v.DeepListStateTrees()
Expand All @@ -354,18 +369,29 @@ func (sm *SnapshotManager) Do(v *state.State) (string, error) {
if err := snap.DumpTree(treedesc.Name, treedesc.Parent, treedesc.Key, treedesc.Tree); err != nil {
return "", err
}
log.Debugf("dumped blob %d: %+v", len(snap.header.Blobs)-1, snap.header.Blobs[len(snap.header.Blobs)-1])
logLastDumpedBlob()
}

// Indexer
if FnExportIndexer() != nil {
if err := snap.DumpIndexer(FnExportIndexer()); err != nil {
return "", err
}
log.Debugf("dumped blob %d: %+v", len(snap.header.Blobs)-1, snap.header.Blobs[len(snap.header.Blobs)-1])
logLastDumpedBlob()
}

return snap.Path(), snap.Finish()
if err := snap.Finish(); err != nil {
return "", fmt.Errorf("couldn't finish snapshot: %w", err)
}

// Prune old snapshots
defer func() {
if err := sm.Prune(snapshotKeepNRecent); err != nil {
log.Warnf("couldn't prune snapshots: %s", err)
}
}()

return snap.Path(), nil
}

// New starts the creation of a new snapshot as a disk file.
Expand Down Expand Up @@ -418,6 +444,45 @@ func (*SnapshotManager) Open(filePath string) (*Snapshot, error) {
return s, nil
}

// Prune removes old snapshots stored on disk, keeping the N most recent ones
func (sm *SnapshotManager) Prune(keepRecent int) error {
files, err := os.ReadDir(sm.dataDir)
if err != nil {
return fmt.Errorf("cannot read dataDir: %w", err)
}

// Convert fs.DirEntry to FileInfo and filter out directories.
var fileInfos []fs.FileInfo
for _, file := range files {
if file.IsDir() {
continue // Skip directories
}
info, err := file.Info()
if err != nil {
return fmt.Errorf("cannot read file %s: %w", file.Name(), err)
}
fileInfos = append(fileInfos, info)
}

// Sort files by modification time, newest first.
sort.Slice(fileInfos, func(i, j int) bool {
return fileInfos[i].ModTime().After(fileInfos[j].ModTime())
})

// Determine which files to delete.
if len(fileInfos) > keepRecent {
// Delete the older files.
for _, file := range fileInfos[keepRecent:] {
err := os.Remove(filepath.Join(sm.dataDir, file.Name()))
if err != nil {
return fmt.Errorf("cannot delete file %s: %w", file.Name(), err)
}
log.Debugf("pruned old snapshot %s", file.Name())
}
}
return nil
}

// List returns the list of the current snapshots stored on disk, indexed by height
func (sm *SnapshotManager) List() map[uint32]DiskSnapshotInfo {
files, err := os.ReadDir(sm.dataDir)
Expand Down
6 changes: 6 additions & 0 deletions vochain/appsetup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package vochain
import (
"context"
"fmt"
"strings"

cometcli "github.com/cometbft/cometbft/rpc/client/local"
cometcoretypes "github.com/cometbft/cometbft/rpc/core/types"
Expand Down Expand Up @@ -52,6 +53,11 @@ func (app *BaseApplication) SetDefaultMethods() {
app.SetFnGetBlockByHeight(func(height int64) *comettypes.Block {
resblock, err := app.NodeClient.Block(context.Background(), &height)
if err != nil {
if strings.Contains(err.Error(), "not available, lowest height is") {
// this error is frequent (and expected) after StateSync, log as Debug instead of Warn
log.Debugf("cannot fetch block by height: %v", err)
return nil
}
log.Warnf("cannot fetch block by height: %v", err)
return nil
}
Expand Down
7 changes: 6 additions & 1 deletion vochain/indexer/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,12 @@ func (idx *Indexer) StartArchiveRetrieval(storage *downloader.Downloader, archiv
archive = append(archive, &p)
}

log.Debugw("archive processes unmarshaled", "processes", len(archive))
log.Debugw("archive processes unmarshaled, will wait until sync is finished and then import", "processes", len(archive))

<-idx.App.WaitUntilSynced()

log.Infof("running archive import after-sync")

added, err := idx.ImportArchive(archive)
if err != nil {
log.Warnw("cannot import archive", "err", err)
Expand Down

0 comments on commit 63753a7

Please sign in to comment.