Skip to content

Commit

Permalink
indexer: reindex blocks and transactions after migrations
Browse files Browse the repository at this point in the history
indexer: new method ReindexBlocks

indexerdb: make CreateBlock and CreateTransaction an UPSERT
  • Loading branch information
altergui committed Sep 2, 2024
1 parent 6fd12ba commit 0fcf9c8
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 4 deletions.
14 changes: 14 additions & 0 deletions vochain/indexer/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,17 @@ func (idx *Indexer) BlockList(limit, offset int, chainID, hash, proposerAddress
}
return list, uint64(results[0].TotalCount), nil
}

// CountBlocks returns how many blocks are indexed.
func (idx *Indexer) CountBlocks() (uint64, error) {
results, err := idx.readOnlyQuery.SearchBlocks(context.TODO(), indexerdb.SearchBlocksParams{
Limit: 1,
})
if err != nil {
return 0, err
}
if len(results) == 0 {
return 0, nil
}
return uint64(results[0].TotalCount), nil
}
6 changes: 6 additions & 0 deletions vochain/indexer/db/blocks.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions vochain/indexer/db/transactions.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

92 changes: 92 additions & 0 deletions vochain/indexer/indexer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package indexer

import (
"bytes"
"context"
"database/sql"
"embed"
Expand Down Expand Up @@ -172,6 +173,12 @@ func (idx *Indexer) startDB() error {
}
goose.SetLogger(log.GooseLogger())
goose.SetBaseFS(embedMigrations)

if gooseMigrationsPending(idx.readWriteDB, "migrations") {
log.Info("indexer db needs migration, scheduling a reindex after sync")
go idx.ReindexBlocks(false)
}

if err := goose.Up(idx.readWriteDB, "migrations"); err != nil {
return fmt.Errorf("goose up: %w", err)
}
Expand Down Expand Up @@ -249,6 +256,27 @@ func (idx *Indexer) RestoreBackup(path string) error {
return nil
}

func gooseMigrationsPending(db *sql.DB, dir string) bool {
// Get the latest applied migration version
currentVersion, err := goose.GetDBVersion(db)
if err != nil {
log.Errorf("failed to get current database version: %v", err)
return false
}

// Collect migrations after the current version
migrations, err := goose.CollectMigrations(dir, currentVersion, goose.MaxVersion)
if err != nil {
if errors.Is(err, goose.ErrNoMigrationFiles) {
return false
}
log.Errorf("failed to collect migrations: %v", err)
return false
}

return len(migrations) > 0
}

// SaveBackup backs up the database to a file on disk.
// Note that writes to the database may be blocked until the backup finishes,
// and an error may occur if a file at path already exists.
Expand Down Expand Up @@ -402,6 +430,70 @@ func (idx *Indexer) AfterSyncBootstrap(inTest bool) {
log.Infof("live results recovery computation finished, took %s", time.Since(startTime))
}

// ReindexBlocks reindexes all blocks found in blockstore
func (idx *Indexer) ReindexBlocks(inTest bool) {
if !inTest {
<-idx.App.WaitUntilSynced()
}

// Note that holding blockMu means new votes aren't added until the reindex finishes.
idx.blockMu.Lock()
defer idx.blockMu.Unlock()

idxBlockCount, err := idx.CountBlocks()
if err != nil {
log.Warnf("indexer CountBlocks returned error: %s", err)
}
log.Infow("start reindexing",
"blockStoreBase", idx.App.Node.BlockStore().Base(),
"blockStoreHeight", idx.App.Node.BlockStore().Height(),
"indexerBlockCount", idxBlockCount,
)
queries := idx.blockTxQueries()
for height := idx.App.Node.BlockStore().Base(); height <= idx.App.Node.BlockStore().Height(); height++ {
if b := idx.App.GetBlockByHeight(int64(height)); b != nil {
// Blocks
func() {
idxBlock, err := idx.readOnlyQuery.GetBlockByHeight(context.TODO(), b.Height)
if err == nil && idxBlock.Time != b.Time {
log.Errorf("while reindexing blocks, block %d timestamp in db (%s) differs from blockstore (%s), leaving untouched", height, idxBlock.Time, b.Time)
return
}
if _, err := queries.CreateBlock(context.TODO(), indexerdb.CreateBlockParams{
ChainID: b.ChainID,
Height: b.Height,
Time: b.Time,
Hash: nonNullBytes(b.Hash()),
ProposerAddress: nonNullBytes(b.ProposerAddress),
LastBlockHash: nonNullBytes(b.LastBlockID.Hash),
}); err != nil {
log.Errorw(err, "cannot index new block")
}
}()

// Transactions
func() {
for index, tx := range b.Data.Txs {
idxTx, err := idx.readOnlyQuery.GetTransactionByHeightAndIndex(context.TODO(), indexerdb.GetTransactionByHeightAndIndexParams{
BlockHeight: b.Height,
BlockIndex: int64(index),
})
if err == nil && !bytes.Equal(idxTx.Hash, tx.Hash()) {
log.Errorf("while reindexing txs, tx %d/%d hash in db (%x) differs from blockstore (%x), leaving untouched", b.Height, index, idxTx.Hash, tx.Hash())
return
}
vtx := new(vochaintx.Tx)
if err := vtx.Unmarshal(tx, b.ChainID); err != nil {
log.Errorw(err, fmt.Sprintf("cannot unmarshal tx %d/%d", b.Height, index))
continue
}
idx.indexTx(vtx, uint32(b.Height), int32(index))
}
}()
}
}
}

// Commit is called by the APP when a block is confirmed and included into the chain
func (idx *Indexer) Commit(height uint32) error {
idx.blockMu.Lock()
Expand Down
8 changes: 7 additions & 1 deletion vochain/indexer/queries/blocks.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ INSERT INTO blocks(
chain_id, height, time, hash, proposer_address, last_block_hash
) VALUES (
?, ?, ?, ?, ?, ?
);
)
ON CONFLICT(height) DO UPDATE
SET chain_id = excluded.chain_id,
time = excluded.time,
hash = excluded.hash,
proposer_address = excluded.proposer_address,
last_block_hash = excluded.last_block_hash;

-- name: GetBlockByHeight :one
SELECT * FROM blocks
Expand Down
11 changes: 10 additions & 1 deletion vochain/indexer/queries/transactions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@ INSERT INTO transactions (
hash, block_height, block_index, type, subtype, raw_tx, signature, signer
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?
);
)
ON CONFLICT(hash) DO UPDATE
SET block_height = excluded.block_height,
block_index = excluded.block_index,
type = excluded.type,
subtype = excluded.subtype,
raw_tx = excluded.raw_tx,
signature = excluded.signature,
signer = excluded.signer;


-- name: GetTransactionByHash :one
SELECT * FROM transactions
Expand Down
8 changes: 6 additions & 2 deletions vochain/indexer/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,13 @@ func (idx *Indexer) OnNewTx(tx *vochaintx.Tx, blockHeight uint32, txIndex int32)
idx.blockMu.Lock()
defer idx.blockMu.Unlock()

idx.indexTx(tx, blockHeight, txIndex)
}

func (idx *Indexer) indexTx(tx *vochaintx.Tx, blockHeight uint32, txIndex int32) {
rawtx, err := proto.Marshal(tx.Tx)
if err != nil {
log.Errorw(err, "indexer cannot marshal new transaction")
log.Errorw(err, "indexer cannot marshal transaction")
return
}

Expand All @@ -119,6 +123,6 @@ func (idx *Indexer) OnNewTx(tx *vochaintx.Tx, blockHeight uint32, txIndex int32)
Signature: nonNullBytes(tx.Signature),
Signer: nonNullBytes(signer),
}); err != nil {
log.Errorw(err, "cannot index new transaction")
log.Errorw(err, "cannot index transaction")
}
}

0 comments on commit 0fcf9c8

Please sign in to comment.