feat: Add a way to follow a SQL db instead of indexing #284

Open · wants to merge 1 commit into base: main
5 changes: 3 additions & 2 deletions conf/config.go
@@ -22,8 +22,9 @@ type Config struct {
// blocks that the node doesn't have data for, such as by skipping them in checkpoint sync.
// For sensible reasons, indexing may actually start at an even later block, such as if
// this block is already indexed or the node indicates that it doesn't have this block.
-	IndexingStart   uint64 `koanf:"indexing_start"`
-	IndexingDisable bool   `koanf:"indexing_disable"`
+	IndexingStart     uint64 `koanf:"indexing_start"`
+	IndexingDisable   bool   `koanf:"indexing_disable"`
+	IndexingSQLFollow bool   `koanf:"indexing_sql_follow"`

Log *LogConfig `koanf:"log"`
Cache *CacheConfig `koanf:"cache"`
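
The new knob rides on the existing koanf tags, so enabling follow mode should be a one-line change in the gateway's config file. Here is a minimal sketch of how such a key reaches the struct, assuming koanf v1 with a YAML config file (the trimmed Config and the config.yml path are illustrative, not the gateway's actual wiring):

package main

import (
	"fmt"

	"github.com/knadh/koanf"
	"github.com/knadh/koanf/parsers/yaml"
	"github.com/knadh/koanf/providers/file"
)

// Config is a stand-in for conf.Config, trimmed to the indexing knobs.
type Config struct {
	IndexingStart     uint64 `koanf:"indexing_start"`
	IndexingDisable   bool   `koanf:"indexing_disable"`
	IndexingSQLFollow bool   `koanf:"indexing_sql_follow"`
}

func main() {
	// config.yml would contain e.g.: indexing_sql_follow: true
	k := koanf.New(".")
	if err := k.Load(file.Provider("config.yml"), yaml.Parser()); err != nil {
		panic(err)
	}
	var cfg Config
	if err := k.Unmarshal("", &cfg); err != nil {
		panic(err)
	}
	fmt.Println("SQL follow mode:", cfg.IndexingSQLFollow)
}
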
177 changes: 173 additions & 4 deletions indexer/indexer.go
@@ -4,10 +4,15 @@ import (
"context"
"errors"
"fmt"
"math/big"
"sync/atomic"
"time"

ethCommon "github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/quantity"
"github.com/oasisprotocol/oasis-core/go/common/service"
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
"github.com/oasisprotocol/oasis-sdk/client-sdk/go/client"
@@ -16,6 +21,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/oasisprotocol/emerald-web3-gateway/conf"
"github.com/oasisprotocol/emerald-web3-gateway/db/model"
"github.com/oasisprotocol/emerald-web3-gateway/filters"
"github.com/oasisprotocol/emerald-web3-gateway/storage"
)

@@ -48,9 +55,11 @@ type Service struct {
pruningStep uint64
indexingStart uint64
indexingDisable bool
indexingFollow bool

backend Backend
client client.RuntimeClient
storage storage.Storage
core core.V1

queryBlockGasLimit bool
@@ -302,14 +311,172 @@ func (s *Service) indexingWorker() {
}
}

func (s *Service) followingWorker() {
// This is gross, but there is no other way to dispatch pubsub events,
// which is the whole point of this routine.
ib := s.backend.(*indexBackend)

var lastIndexed uint64
pollLoop:
for {
select {
case <-s.ctx.Done():
return
case <-time.After(indexerLoopDelay):
}

// Poll the db for new blocks.
latest, err := s.storage.GetLatestBlockNumber(s.ctx)
if err != nil {
s.Logger.Warn("failed to query latest block from DB",
"err", err,
)
continue
}

// Do the "right thing(TM)".
var startAt uint64
switch lastIndexed {
case latest:
// No new blocks.
continue
case 0:
// First time through the loop.
startAt = latest
default:
startAt = lastIndexed + 1
}

for round := startAt; round <= latest; round++ {
block, err := s.storage.GetBlockByNumber(s.ctx, round)
if err != nil {
s.Logger.Warn("failed to get block from DB",
"err", err,
"round", round,
)
continue pollLoop
}

logs, err := s.storage.GetLogs(s.ctx, round, round)
if err != nil {
s.Logger.Warn("failed to get logs from DB",
"err", err,
"round", round,
)
continue pollLoop
}

chainEvent := filters.ChainEvent{
Block: block,
Hash: ethCommon.HexToHash(block.Hash),
Logs: logs,
}
ib.subscribe.ChainChan() <- chainEvent

// Reconstruct the transactions and receipts that were actually
// stored for this block. The replacement of prior failure
// nonsense is surprisingly annoying to deal with.
numTxes := len(block.Transactions)
uniqueTxes := make([]*model.Transaction, 0, numTxes)
uniqueReceipts := make([]*model.Receipt, 0, numTxes)
var lastTransactionPrice quantity.Quantity
for i, tx := range block.Transactions {
// In the Emerald Paratime it can happen that an already committed
// transaction is re-proposed at a later block. The duplicate transaction fails,
// but it is still included in the block. The gateway should drop these
// transactions to remain compatible with ETH semantics.
if tx.Status == uint(ethTypes.ReceiptStatusFailed) {
continue
}

dbTx, err := s.storage.GetTransaction(s.ctx, tx.Hash)
if err != nil {
// Unlike the indexer version of this, the tx (by hash)
// MUST be in the database by the point that the block
// is in the database.
//
// This is an invariant violation so recovery is unlikely.
s.Logger.Warn("failed to get tx from DB",
"err", err,
"round", round,
"tx_hash", tx.Hash,
)
continue pollLoop
}
if dbTx.Round != tx.Round && dbTx.Status != uint(ethTypes.ReceiptStatusFailed) {
// Earlier tx exists, and it didn't fail.
//
// This is an invariant violation so recovery is unlikely.
ib.logger.Error("duplicate tx",
"round", round,
"earlier_tx", dbTx,
"tx", tx,
)
continue pollLoop
}

// If the tx passes the deduplication/replacement checks,
// presumably the receipt does as well?
receipt, err := s.storage.GetTransactionReceipt(s.ctx, tx.Hash)
if err != nil {
s.Logger.Warn("failed to get tx receipt from DB",
"err", err,
"round", round,
"tx_hash", tx.Hash,
)
continue pollLoop
}

// Done this way because quantity.Quantity.UnmarshalText
// can have a side-effect on error.
if err = func() error {
var gp big.Int
if err = gp.UnmarshalText([]byte(tx.GasPrice)); err != nil {
return err
}
return lastTransactionPrice.FromBigInt(&gp)
}(); err != nil {
s.Logger.Warn("failed to parse tx gas price",
"err", err,
"round", round,
"tx_hash", tx.Hash,
)
}

uniqueTxes = append(uniqueTxes, block.Transactions[i])
uniqueReceipts = append(uniqueReceipts, receipt)
}

blockData := &BlockData{
Block: block,
Receipts: uniqueReceipts,
UniqueTxes: uniqueTxes,
LastTransactionPrice: &lastTransactionPrice,
}

if ib.observer != nil {
ib.observer.OnBlockIndexed(blockData)
}
ib.blockNotifier.Broadcast(blockData)
}

lastIndexed = latest // Caught up!
}
}
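
The catch-up rule in the switch near the top of the loop is the subtle part: a fresh follower starts at the DB tip rather than replaying history, and an up-to-date follower does nothing. A sketch of that same rule factored into a pure helper (hypothetical; the PR inlines it in the loop):

package main

import "fmt"

// nextStart mirrors followingWorker's switch: given the last round this
// worker dispatched and the latest round present in the DB, it returns
// the first round to process and whether there is any work to do.
func nextStart(lastIndexed, latest uint64) (startAt uint64, ok bool) {
	switch lastIndexed {
	case latest:
		return 0, false // no new blocks since the last poll
	case 0:
		return latest, true // first pass: start at the DB tip, skip history
	default:
		return lastIndexed + 1, true // resume just past the last dispatched round
	}
}

func main() {
	start, ok := nextStart(10, 13)
	fmt.Println(start, ok) // 11 true: rounds 11 through 13 get dispatched
}

So with lastIndexed = 10 and latest = 13, rounds 11 through 13 are dispatched in order, while a freshly started follower only dispatches the tip and never replays old blocks as pubsub events.
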

// Start starts the service.
func (s *Service) Start() {
// TODO/NotYawning: Non-archive nodes that have the indexer disabled
// likely want to use a different notion of healthy, and probably also
// want to start a worker that monitors the database for changes.
-	if s.indexingDisable {
+	switch {
+	case s.indexingDisable:
+		// Archive mode disables the worker entirely.
 		s.updateHealth(true)
 		return
+	case s.indexingFollow:
+		// DB follow mode follows another indexer instance via the DB.
+		s.updateHealth(true)
+		go s.followingWorker()
+		return
+	default:
 	}

go s.indexingWorker()
@@ -342,13 +509,15 @@ func New(
runtimeID: runtimeID,
backend: cachingBackend,
client: client,
storage: storage,
core: core.NewV1(client),
ctx: ctx,
cancelCtx: cancelCtx,
enablePruning: cfg.EnablePruning,
pruningStep: cfg.PruningStep,
indexingStart: cfg.IndexingStart,
indexingDisable: cfg.IndexingDisable,
indexingFollow: cfg.IndexingSQLFollow,
}
s.Logger = s.Logger.With("runtime_id", s.runtimeID.String())

2 changes: 1 addition & 1 deletion main.go
@@ -195,7 +195,7 @@ func runRoot() error {
// For now, "disable" write access to the DB in a kind of kludgy way
// if the indexer is disabled. Yes this means that no migrations
// can be done. Deal with it.
-	dbReadOnly := cfg.IndexingDisable
+	dbReadOnly := cfg.IndexingDisable || cfg.IndexingSQLFollow

// Initialize db for migrations (higher timeouts).
db, err := psql.InitDB(ctx, cfg.Database, true, dbReadOnly)
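
Read-only mode matters here because in follow mode another gateway instance owns the schema and is the only writer. For belt-and-braces enforcement at the database level, Postgres can also reject writes per session; a sketch assuming a lib/pq key/value DSN (not the gateway's actual psql.InitDB implementation):

package main

import (
	"database/sql"

	_ "github.com/lib/pq" // registers the "postgres" driver
)

// openReadOnly opens a Postgres handle whose every transaction refuses
// writes, approximating the dbReadOnly flag at the server level. lib/pq
// forwards unrecognized DSN keys to the server as run-time parameters,
// so appending default_transaction_read_only works for key/value DSNs
// (not URL-style DSNs).
func openReadOnly(dsn string) (*sql.DB, error) {
	return sql.Open("postgres", dsn+" default_transaction_read_only=on")
}
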