Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVM Relayer DB migrations PoC #13617

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
7274c52
hacking
krehermann Jun 13, 2024
c0e1d00
make resolved, optional migrations succeeded. TODO rollback interleaving
krehermann Jun 14, 2024
2bbfbbd
reorg
krehermann Jun 14, 2024
b5566ef
subsytem migration wip
krehermann Jun 15, 2024
8931f16
plugin migration and commit-ordered manifest
krehermann Jun 15, 2024
c48e999
rename template dir
krehermann Jun 15, 2024
eb3ee09
manifest wip
krehermann Jun 15, 2024
a5a3fd3
rollback; optional plugin args to cli
krehermann Jun 17, 2024
c24b885
clean up internal and isolate manifest
krehermann Jun 17, 2024
85ea68c
real evm
krehermann Jun 17, 2024
3d9b1f1
realistic test for parameterized evm schema.
krehermann Jun 17, 2024
5fe7e39
more testing
krehermann Jun 17, 2024
4000d64
realistic evm migration working in unit test
krehermann Jun 17, 2024
2cd82c9
cli plumbing
krehermann Jun 18, 2024
dad8f5c
hack txdb to run evm migrations on the fly
krehermann Jun 18, 2024
0bc3f9d
initialize evm test db per cfg; refactor txdb to use with it
krehermann Jun 18, 2024
c9d923e
cleanup and comment
krehermann Jun 18, 2024
472f44c
rm manifest
krehermann Jun 18, 2024
051dba7
update forwarder ORM to be schema-specific
krehermann Jun 18, 2024
dbd5cf7
hack db into NewRelayer for PoC
krehermann Jun 18, 2024
1715ce5
clean up the cli
krehermann Jun 18, 2024
a09bea7
linter, mocks
krehermann Jun 18, 2024
ae0556b
1.update goose 2.use that implement goose provider for evm migration …
krehermann Jun 19, 2024
9ee4543
fix ci failures to find evm_XX schema
krehermann Jun 19, 2024
76fc0b8
wrestling with too many connections
krehermann Jun 20, 2024
3d2f239
refactor for multiple go migrations and add test fixtures
krehermann Jun 26, 2024
c2b2a28
add log_poller_blocks migration and tests
krehermann Jun 26, 2024
8726b51
add log poller filters migrations and tests
krehermann Jun 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/chains/evm/forwarders/forwarder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewFwdMgr(ds sqlutil.DataSource, client evmclient.Client, logpoller evmlogp
logger: lggr,
cfg: cfg,
evmClient: client,
ORM: NewORM(ds),
ORM: NewScopedORM(ds, (*big.Big)(client.ConfiguredChainID())),
logpoller: logpoller,
sendersCache: make(map[common.Address][]common.Address),
}
Expand Down
46 changes: 29 additions & 17 deletions core/chains/evm/forwarders/forwarder_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/testhelpers"
"github.com/smartcontractkit/chainlink/v2/core/store/migrate/plugins/relayer/evm"
evmtestdb "github.com/smartcontractkit/chainlink/v2/core/store/migrate/plugins/relayer/evm/testutils"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/forwarders"
Expand All @@ -32,7 +34,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

var GetAuthorisedSendersABI = evmtypes.MustGetABI(authorized_receiver.AuthorizedReceiverABI).Methods["getAuthorizedSenders"]
Expand All @@ -41,7 +42,8 @@ var SimpleOracleCallABI = evmtypes.MustGetABI(operator_wrapper.OperatorABI).Meth

func TestFwdMgr_MaybeForwardTransaction(t *testing.T) {
lggr := logger.Test(t)
db := pgtest.NewSqlxDB(t)
testChainId := testutils.FixtureChainID
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW you can use github.com/smartcontractkit/chainlink/v2/core/chains/evm/testutils to get FixtureChainID which is not coupled with Core.

emvPlugindb := evmtestdb.NewDB(t, evm.Cfg{Schema: "evm_" + testChainId.String(), ChainID: ubig.New(testChainId)})
cfg := configtest.NewTestGeneralConfig(t)
evmcfg := evmtest.NewChainScopedConfig(t, cfg)
owner := testutils.MustNewSimTransactor(t)
Expand All @@ -66,7 +68,7 @@ func TestFwdMgr_MaybeForwardTransaction(t *testing.T) {
require.NoError(t, err)
t.Log(authorized)

evmClient := client.NewSimulatedBackendClient(t, ec, testutils.FixtureChainID)
evmClient := client.NewSimulatedBackendClient(t, ec, testChainId)

lpOpts := logpoller.Opts{
PollPeriod: 100 * time.Millisecond,
Expand All @@ -75,13 +77,14 @@ func TestFwdMgr_MaybeForwardTransaction(t *testing.T) {
RpcBatchSize: 2,
KeepFinalizedBlocksDepth: 1000,
}
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr), evmClient, lggr, lpOpts)
fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM())
fwdMgr.ORM = forwarders.NewORM(db)
lp := logpoller.NewLogPoller(logpoller.NewORM(testChainId, emvPlugindb, lggr), evmClient, lggr, lpOpts)
fwdMgr := forwarders.NewFwdMgr(emvPlugindb, evmClient, lp, lggr, evmcfg.EVM())
cid := ubig.Big(*testChainId)
fwdMgr.ORM = forwarders.NewScopedORM(emvPlugindb, &cid)

fwd, err := fwdMgr.ORM.CreateForwarder(ctx, forwarderAddr, ubig.Big(*testutils.FixtureChainID))
fwd, err := fwdMgr.ORM.CreateForwarder(ctx, forwarderAddr, cid)
require.NoError(t, err)
lst, err := fwdMgr.ORM.FindForwardersByChain(ctx, ubig.Big(*testutils.FixtureChainID))
lst, err := fwdMgr.ORM.FindForwardersByChain(ctx, cid)
require.NoError(t, err)
require.Equal(t, len(lst), 1)
require.Equal(t, lst[0].Address, forwarderAddr)
Expand All @@ -95,7 +98,7 @@ func TestFwdMgr_MaybeForwardTransaction(t *testing.T) {

cleanupCalled := false
cleanup := func(tx sqlutil.DataSource, evmChainId int64, addr common.Address) error {
require.Equal(t, testutils.FixtureChainID.Int64(), evmChainId)
require.Equal(t, testChainId.Int64(), evmChainId)
require.Equal(t, forwarderAddr, addr)
require.NotNil(t, tx)
cleanupCalled = true
Expand All @@ -109,10 +112,15 @@ func TestFwdMgr_MaybeForwardTransaction(t *testing.T) {

func TestFwdMgr_AccountUnauthorizedToForward_SkipsForwarding(t *testing.T) {
lggr := logger.Test(t)
db := pgtest.NewSqlxDB(t)
ctx := testutils.Context(t)
cfg := configtest.NewTestGeneralConfig(t)
evmcfg := evmtest.NewChainScopedConfig(t, cfg)
// db := pgtest.NewSqlxDB(t)
testChainId := ubig.New(evmcfg.EVM().ChainID())
// db := pgtest.NewSqlxDB(t)

db := evmtestdb.NewDB(t, evm.Cfg{Schema: "evm_" + testChainId.String(), ChainID: testChainId})

owner := testutils.MustNewSimTransactor(t)
ec := backends.NewSimulatedBackend(map[common.Address]core.GenesisAccount{
owner.From: {
Expand All @@ -138,11 +146,12 @@ func TestFwdMgr_AccountUnauthorizedToForward_SkipsForwarding(t *testing.T) {
}
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr), evmClient, lggr, lpOpts)
fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM())
fwdMgr.ORM = forwarders.NewORM(db)
testChainID := ubig.Big(*testutils.FixtureChainID)
fwdMgr.ORM = forwarders.NewScopedORM(db, &testChainID)

_, err = fwdMgr.ORM.CreateForwarder(ctx, forwarderAddr, ubig.Big(*testutils.FixtureChainID))
_, err = fwdMgr.ORM.CreateForwarder(ctx, forwarderAddr, testChainID)
require.NoError(t, err)
lst, err := fwdMgr.ORM.FindForwardersByChain(ctx, ubig.Big(*testutils.FixtureChainID))
lst, err := fwdMgr.ORM.FindForwardersByChain(ctx, testChainID)
require.NoError(t, err)
require.Equal(t, len(lst), 1)
require.Equal(t, lst[0].Address, forwarderAddr)
Expand All @@ -158,10 +167,12 @@ func TestFwdMgr_AccountUnauthorizedToForward_SkipsForwarding(t *testing.T) {

func TestFwdMgr_InvalidForwarderForOCR2FeedsStates(t *testing.T) {
lggr := logger.Test(t)
db := pgtest.NewSqlxDB(t)
ctx := testutils.Context(t)
cfg := configtest.NewTestGeneralConfig(t)
evmcfg := evmtest.NewChainScopedConfig(t, cfg)
testChainId := ubig.New(evmcfg.EVM().ChainID())
// db := pgtest.NewSqlxDB(t)
db := evmtestdb.NewDB(t, evm.Cfg{Schema: "evm_" + testChainId.String(), ChainID: testChainId})
owner := testutils.MustNewSimTransactor(t)
ec := backends.NewSimulatedBackend(map[common.Address]core.GenesisAccount{
owner.From: {
Expand Down Expand Up @@ -203,11 +214,12 @@ func TestFwdMgr_InvalidForwarderForOCR2FeedsStates(t *testing.T) {
}
lp := logpoller.NewLogPoller(logpoller.NewORM(testutils.FixtureChainID, db, lggr), evmClient, lggr, lpOpts)
fwdMgr := forwarders.NewFwdMgr(db, evmClient, lp, lggr, evmcfg.EVM())
fwdMgr.ORM = forwarders.NewORM(db)
testChainID := ubig.Big(*testutils.FixtureChainID)
fwdMgr.ORM = forwarders.NewScopedORM(db, &testChainID)

_, err = fwdMgr.ORM.CreateForwarder(ctx, forwarderAddr, ubig.Big(*testutils.FixtureChainID))
_, err = fwdMgr.ORM.CreateForwarder(ctx, forwarderAddr, testChainID)
require.NoError(t, err)
lst, err := fwdMgr.ORM.FindForwardersByChain(ctx, ubig.Big(*testutils.FixtureChainID))
lst, err := fwdMgr.ORM.FindForwardersByChain(ctx, testChainID)
require.NoError(t, err)
require.Equal(t, len(lst), 1)
require.Equal(t, lst[0].Address, forwarderAddr)
Expand Down
67 changes: 0 additions & 67 deletions core/chains/evm/forwarders/mocks/orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 43 additions & 20 deletions core/chains/evm/forwarders/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,64 @@ package forwarders
import (
"context"
"database/sql"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"

"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
pkgerrors "github.com/pkg/errors"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
)

//go:generate mockery --quiet --name ORM --output ./mocks/ --case=underscore

type ORM interface {
CreateForwarder(ctx context.Context, addr common.Address, evmChainId big.Big) (fwd Forwarder, err error)
FindForwarders(ctx context.Context, offset, limit int) ([]Forwarder, int, error)
//FindForwarders(ctx context.Context, offset, limit int) ([]Forwarder, int, error)
FindForwardersByChain(ctx context.Context, evmChainId big.Big) ([]Forwarder, error)
DeleteForwarder(ctx context.Context, id int64, cleanup func(tx sqlutil.DataSource, evmChainId int64, addr common.Address) error) error
FindForwardersInListByChain(ctx context.Context, evmChainId big.Big, addrs []common.Address) ([]Forwarder, error)
// FindForwardersInListByChain(ctx context.Context, evmChainId big.Big, addrs []common.Address) ([]Forwarder, error)
}

type DSORM struct {
ds sqlutil.DataSource
ds sqlutil.DataSource
cid *big.Big
}

var _ ORM = &DSORM{}

func NewORM(ds sqlutil.DataSource) *DSORM {
return &DSORM{ds: ds}
func NewScopedORM(ds sqlutil.DataSource, evmChainId *big.Big) *DSORM {
return &DSORM{ds: ds, cid: evmChainId}
}

func (o *DSORM) Transact(ctx context.Context, fn func(*DSORM) error) (err error) {
return sqlutil.Transact(ctx, o.new, o.ds, nil, fn)
}

// new returns a NewORM like o, but backed by q.
func (o *DSORM) new(q sqlutil.DataSource) *DSORM { return NewORM(q) }
func (o *DSORM) new(q sqlutil.DataSource) *DSORM { return NewScopedORM(q, o.cid) }

func (o *DSORM) schemaName() string {
if o.cid != nil {
return fmt.Sprintf("evm_%s", o.cid.String())
}
return "evm"
}

// CreateForwarder creates the Forwarder address associated with the current EVM chain id.
func (o *DSORM) CreateForwarder(ctx context.Context, addr common.Address, evmChainId big.Big) (fwd Forwarder, err error) {
sql := `INSERT INTO evm.forwarders (address, evm_chain_id, created_at, updated_at) VALUES ($1, $2, now(), now()) RETURNING *`
err = o.ds.GetContext(ctx, &fwd, sql, addr, evmChainId)
return fwd, err
if o.cid != nil && !o.cid.Equal(&evmChainId) {
// hacking
evmChainId = *o.cid
}
// sql := `INSERT INTO evm.forwarders (address, evm_chain_id, created_at, updated_at) VALUES ($1, $2, now(), now()) RETURNING *`
sql := fmt.Sprintf("INSERT INTO %s.forwarders (address, created_at, updated_at) VALUES ($1, now(), now()) RETURNING *", o.schemaName())
err = o.ds.GetContext(ctx, &fwd, sql, addr)
if err != nil {
return fwd, err
}
fwd.EVMChainID = *o.cid
return fwd, nil
}

// DeleteForwarder removes a forwarder address.
Expand All @@ -56,7 +72,8 @@ func (o *DSORM) DeleteForwarder(ctx context.Context, id int64, cleanup func(tx s
EvmChainId int64
Address common.Address
}
err := orm.ds.GetContext(ctx, &dest, `SELECT evm_chain_id, address FROM evm.forwarders WHERE id = $1`, id)
selectStmt := fmt.Sprintf("SELECT address FROM %s.forwarders WHERE id = $1", o.schemaName())
err := orm.ds.GetContext(ctx, &dest, selectStmt, id)
if err != nil {
return err
}
Expand All @@ -65,8 +82,8 @@ func (o *DSORM) DeleteForwarder(ctx context.Context, id int64, cleanup func(tx s
return err
}
}

result, err := orm.ds.ExecContext(ctx, `DELETE FROM evm.forwarders WHERE id = $1`, id)
deleteStmt := fmt.Sprintf("DELETE FROM %s.forwarders WHERE id = $1", o.schemaName())
result, err := orm.ds.ExecContext(ctx, deleteStmt, id)
// If the forwarder wasn't found, we still want to delete the filter.
// In that case, the transaction must return nil, even though DeleteForwarder
// will return sql.ErrNoRows
Expand All @@ -83,12 +100,15 @@ func (o *DSORM) DeleteForwarder(ctx context.Context, id int64, cleanup func(tx s

// FindForwarders returns all forwarder addresses from offset up until limit.
func (o *DSORM) FindForwarders(ctx context.Context, offset, limit int) (fwds []Forwarder, count int, err error) {
sql := `SELECT count(*) FROM evm.forwarders`
// sql := `SELECT count(*) FROM evm.forwarders`
sql := fmt.Sprintf("SELECT count(*) FROM %s.forwarders", o.schemaName())

if err = o.ds.GetContext(ctx, &count, sql); err != nil {
return
}

sql = `SELECT * FROM evm.forwarders ORDER BY created_at DESC, id DESC LIMIT $1 OFFSET $2`
// sql = `SELECT * FROM evm.forwarders ORDER BY created_at DESC, id DESC LIMIT $1 OFFSET $2`
sql = fmt.Sprintf("SELECT * FROM %s.forwarders ORDER BY created_at DESC, id DESC LIMIT $1 OFFSET $2", o.schemaName())
if err = o.ds.SelectContext(ctx, &fwds, sql, limit, offset); err != nil {
return
}
Expand All @@ -97,11 +117,13 @@ func (o *DSORM) FindForwarders(ctx context.Context, offset, limit int) (fwds []F

// FindForwardersByChain returns all forwarder addresses for a chain.
func (o *DSORM) FindForwardersByChain(ctx context.Context, evmChainId big.Big) (fwds []Forwarder, err error) {
sql := `SELECT * FROM evm.forwarders where evm_chain_id = $1 ORDER BY created_at DESC, id DESC`
err = o.ds.SelectContext(ctx, &fwds, sql, evmChainId)
// sql := `SELECT * FROM evm.forwarders where evm_chain_id = $1 ORDER BY created_at DESC, id DESC`
sql := fmt.Sprintf("SELECT * FROM %s.forwarders ORDER BY created_at DESC, id DESC", o.schemaName())
err = o.ds.SelectContext(ctx, &fwds, sql)
return
}

/*
func (o *DSORM) FindForwardersInListByChain(ctx context.Context, evmChainId big.Big, addrs []common.Address) ([]Forwarder, error) {
var fwdrs []Forwarder

Expand All @@ -111,7 +133,7 @@ func (o *DSORM) FindForwardersInListByChain(ctx context.Context, evmChainId big.
}

query, args, err := sqlx.Named(`
SELECT * FROM evm.forwarders
SELECT * FROM evm.forwarders
WHERE evm_chain_id = :chainid
AND address IN (:addresses)
ORDER BY created_at DESC, id DESC`,
Expand All @@ -136,3 +158,4 @@ func (o *DSORM) FindForwardersInListByChain(ctx context.Context, evmChainId big.

return fwdrs, nil
}
*/
14 changes: 10 additions & 4 deletions core/chains/evm/forwarders/orm_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package forwarders
package forwarders_test

import (
"database/sql"
Expand All @@ -11,17 +11,23 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/forwarders"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/store/migrate/plugins/relayer/evm"
evmtestdb "github.com/smartcontractkit/chainlink/v2/core/store/migrate/plugins/relayer/evm/testutils"
)

// Tests the atomicity of cleanup function passed to DeleteForwarder, during DELETE operation
func Test_DeleteForwarder(t *testing.T) {
t.Parallel()
orm := NewORM(pgtest.NewSqlxDB(t))
addr := testutils.NewAddress()
chainID := testutils.FixtureChainID
orm := forwarders.NewScopedORM(evmtestdb.NewDB(t, evm.Cfg{
Schema: "evm_" + chainID.String(),
ChainID: big.New(chainID),
}), big.New(chainID))

addr := testutils.NewAddress()
ctx := testutils.Context(t)

fwd, err := orm.CreateForwarder(ctx, addr, *big.New(chainID))
Expand Down
Loading
Loading