From 9e91b405df88965be6b90142f623d90d91e9355c Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko <34754799+dhaidashenko@users.noreply.github.com> Date: Wed, 8 Jan 2025 21:38:46 +0100 Subject: [PATCH] Custom log index for sei chain (#1592) Patch with minor adjustments from the chainlink repo [PR](https://github.com/smartcontractkit/chainlink/pull/15858). Adjustments were caused by differences between repos. It's easier to do&review them than to apply all the changes made to chainlink repo that included MultiNode abstraction with 5k lines changed. --- .changeset/clever-knives-tap.md | 5 + ccip/config/evm/Sei_Testnet_Atlantic.toml | 18 +++ core/build/platform_arch_guard.go | 3 + core/chains/evm/client/errors.go | 12 +- core/chains/evm/client/errors_test.go | 6 + core/chains/evm/client/helpers_test.go | 8 +- core/chains/evm/client/rpc_client.go | 72 +++++++++- .../evm/client/rpc_client_internal_test.go | 93 ++++++++++++ core/chains/evm/client/rpc_client_test.go | 132 +++++++++++++++++- core/chains/evm/client/sub_forwarder.go | 30 ++-- core/chains/evm/client/sub_forwarder_test.go | 48 ++++--- core/chains/evm/config/chaintype/chaintype.go | 6 +- core/services/chainlink/config_test.go | 4 +- core/services/ocr/contract_tracker.go | 2 +- core/services/ocrcommon/block_translator.go | 2 +- 15 files changed, 397 insertions(+), 44 deletions(-) create mode 100644 .changeset/clever-knives-tap.md create mode 100644 ccip/config/evm/Sei_Testnet_Atlantic.toml create mode 100644 core/build/platform_arch_guard.go create mode 100644 core/chains/evm/client/rpc_client_internal_test.go diff --git a/.changeset/clever-knives-tap.md b/.changeset/clever-knives-tap.md new file mode 100644 index 0000000000..8683e89f77 --- /dev/null +++ b/.changeset/clever-knives-tap.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#added Sei config and error mapping diff --git a/ccip/config/evm/Sei_Testnet_Atlantic.toml b/ccip/config/evm/Sei_Testnet_Atlantic.toml new file mode 100644 index 0000000000..f8c23d95c5 --- /dev/null +++ b/ccip/config/evm/Sei_Testnet_Atlantic.toml @@ -0,0 +1,18 @@ +ChainID = '1328' +ChainType = 'sei' +# finality_depth: instant +FinalityDepth = 10 +# block_time: ~0.4s, adding 1 second buffer +LogPollInterval = '2s' +# finality_depth * block_time / 60 secs = ~0.8 min (finality time) +NoNewFinalizedHeadsThreshold = '5m' +# "RPC node returned multiple missing blocks on query for block numbers [31592085 31592084] even though the WS subscription already sent us these blocks. It might help to increase EVM.RPCBlockQueryDelay (currently 1)" +RPCBlockQueryDelay = 5 + +[GasEstimator] +EIP1559DynamicFees = false +Mode = 'BlockHistory' +PriceMax = '3000 gwei' # recommended by ds&a + +[GasEstimator.BlockHistory] +BlockHistorySize = 200 diff --git a/core/build/platform_arch_guard.go b/core/build/platform_arch_guard.go new file mode 100644 index 0000000000..3a22f7df53 --- /dev/null +++ b/core/build/platform_arch_guard.go @@ -0,0 +1,3 @@ +//go:build !amd64 && !arm64 +package build +"non-64-bits architectures are not supported" diff --git a/core/chains/evm/client/errors.go b/core/chains/evm/client/errors.go index f92a79e203..0bac56d142 100644 --- a/core/chains/evm/client/errors.go +++ b/core/chains/evm/client/errors.go @@ -284,6 +284,16 @@ var gnosis = ClientErrors{ TransactionAlreadyInMempool: regexp.MustCompile(`(: |^)(alreadyknown)`), } +var sei = ClientErrors{ + // https://github.com/sei-protocol/sei-tendermint/blob/e9a22c961e83579d8a68cd045c532980d82fb2a0/types/mempool.go#L12 + TransactionAlreadyInMempool: regexp.MustCompile("tx already exists in cache"), + // https://github.com/sei-protocol/sei-cosmos/blob/a4eb451c957b1ca7ca9118406682f93fe83d1f61/types/errors/errors.go#L50 + // https://github.com/sei-protocol/sei-cosmos/blob/a4eb451c957b1ca7ca9118406682f93fe83d1f61/types/errors/errors.go#L56 + // https://github.com/sei-protocol/sei-cosmos/blob/a4eb451c957b1ca7ca9118406682f93fe83d1f61/client/broadcast.go#L27 + // https://github.com/sei-protocol/sei-cosmos/blob/a4eb451c957b1ca7ca9118406682f93fe83d1f61/types/errors/errors.go#L32 + Fatal: regexp.MustCompile(`(: |^)'*out of gas|insufficient fee|Tx too large. Max size is \d+, but got \d+|: insufficient funds`), +} + const TerminallyStuckMsg = "transaction terminally stuck" // Tx.Error messages that are set internally so they are not chain or client specific @@ -291,7 +301,7 @@ var internal = ClientErrors{ TerminallyStuck: regexp.MustCompile(TerminallyStuckMsg), } -var clients = []ClientErrors{parity, geth, arbitrum, metis, substrate, avalanche, nethermind, harmony, besu, erigon, klaytn, celo, zkSync, zkEvm, treasure, mantle, aStar, hedera, gnosis, internal} +var clients = []ClientErrors{parity, geth, arbitrum, metis, substrate, avalanche, nethermind, harmony, besu, erigon, klaytn, celo, zkSync, zkEvm, treasure, mantle, aStar, hedera, gnosis, sei, internal} // ClientErrorRegexes returns a map of compiled regexes for each error type func ClientErrorRegexes(errsRegex config.ClientErrors) *ClientErrors { diff --git a/core/chains/evm/client/errors_test.go b/core/chains/evm/client/errors_test.go index c8483972c8..7fba0ae51f 100644 --- a/core/chains/evm/client/errors_test.go +++ b/core/chains/evm/client/errors_test.go @@ -144,6 +144,7 @@ func Test_Eth_Errors(t *testing.T) { {"client error transaction already in mempool", true, "tomlConfig"}, {"alreadyknown", true, "Gnosis"}, {"failed to forward tx to sequencer, please try again. Error message: 'already known'", true, "Mantle"}, + {"tx already exists in cache", true, "Sei"}, } for _, test := range tests { err = evmclient.NewSendErrorS(test.message) @@ -420,6 +421,11 @@ func Test_Eth_Errors_Fatal(t *testing.T) { {"client error fatal", true, "tomlConfig"}, {"[Request ID: d9711488-4c1e-4af2-bc1f-7969913d7b60] Error invoking RPC: transaction 0.0.4425573@1718213476.914320044 failed precheck with status INVALID_SIGNATURE", true, "hedera"}, {"invalid chain id for signer", true, "Treasure"}, + + {": out of gas", true, "Sei"}, + {"Tx too large. Max size is 2048576, but got 2097431", true, "Sei"}, + {": insufficient funds", true, "Sei"}, + {"insufficient fee", true, "Sei"}, } for _, test := range tests { diff --git a/core/chains/evm/client/helpers_test.go b/core/chains/evm/client/helpers_test.go index 1a6090e4a0..acb8f39338 100644 --- a/core/chains/evm/client/helpers_test.go +++ b/core/chains/evm/client/helpers_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math/big" "net/url" + "sync" "testing" "time" @@ -219,6 +220,7 @@ const HeadResult = `{"difficulty":"0xf3a00","extraData":"0xd88301050384676574688 type mockSubscription struct { unsubscribed bool Errors chan error + unsub sync.Once } func NewMockSubscription() *mockSubscription { @@ -228,8 +230,10 @@ func NewMockSubscription() *mockSubscription { func (mes *mockSubscription) Err() <-chan error { return mes.Errors } func (mes *mockSubscription) Unsubscribe() { - mes.unsubscribed = true - close(mes.Errors) + mes.unsub.Do(func() { + mes.unsubscribed = true + close(mes.Errors) + }) } func ParseTestNodeConfigs(nodes []NodeConfig) ([]*toml.Node, error) { diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index f55c35980d..58f88bd003 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "math/big" "net/url" "strconv" @@ -151,6 +152,7 @@ type rpcClient struct { latestChainInfo commonclient.ChainInfo } +// NewRPCCLient returns a new *rpcClient as commonclient.RPC // NewRPCCLient returns a new *rpcClient as commonclient.RPC func NewRPCClient( lggr logger.Logger, @@ -166,6 +168,22 @@ func NewRPCClient( rpcTimeout time.Duration, chainType chaintype.ChainType, ) RPCClient { + return newRPCClient(lggr, wsuri, httpuri, name, id, chainID, tier, finalizedBlockPollInterval, newHeadsPollInterval, largePayloadRpcTimeout, rpcTimeout, chainType) +} +func newRPCClient( + lggr logger.Logger, + wsuri *url.URL, + httpuri *url.URL, + name string, + id int, + chainID *big.Int, + tier commonclient.NodeTier, + finalizedBlockPollInterval time.Duration, + newHeadsPollInterval time.Duration, + largePayloadRpcTimeout time.Duration, + rpcTimeout time.Duration, + chainType chaintype.ChainType, +) *rpcClient { r := &rpcClient{ largePayloadRpcTimeout: largePayloadRpcTimeout, rpcTimeout: rpcTimeout, @@ -428,6 +446,10 @@ func (r *rpcClient) BatchCallContext(rootCtx context.Context, b []rpc.BatchElem) var requestedFinalizedBlock bool if r.chainType == chaintype.ChainAstar { for _, el := range b { + if el.Method == "eth_getLogs" { + r.rpcLog.Critical("evmclient.BatchCallContext: eth_getLogs is not supported") + return errors.New("evmclient.BatchCallContext: eth_getLogs is not supported") + } if !isRequestingFinalizedBlock(el) { continue } @@ -547,10 +569,10 @@ func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtyp r.logResult(lggr, err, duration, r.getRPCDomain(), "EthSubscribe") err = r.wrapWS(err) }() - subForwarder := newSubForwarder(channel, func(head *evmtypes.Head) *evmtypes.Head { + subForwarder := newSubForwarder(channel, func(head *evmtypes.Head) (*evmtypes.Head, error) { head.EVMChainID = ubig.New(r.chainID) r.onNewHead(ctx, chStopInFlight, head) - return head + return head, nil }, r.wrapRPCClientError) err = subForwarder.start(ws.rpc.EthSubscribe(ctx, subForwarder.srcCh, args...)) if err != nil { @@ -602,10 +624,10 @@ func (r *rpcClient) SubscribeToHeads(ctx context.Context) (ch <-chan *evmtypes.H }() channel := make(chan *evmtypes.Head) - forwarder := newSubForwarder(channel, func(head *evmtypes.Head) *evmtypes.Head { + forwarder := newSubForwarder(channel, func(head *evmtypes.Head) (*evmtypes.Head, error) { head.EVMChainID = ubig.New(r.chainID) r.onNewHead(ctx, chStopInFlight, head) - return head + return head, nil }, r.wrapRPCClientError) err = forwarder.start(ws.rpc.EthSubscribe(ctx, forwarder.srcCh, args...)) @@ -1283,8 +1305,11 @@ func (r *rpcClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (l [ l, err = ws.geth.FilterLogs(ctx, q) err = r.wrapWS(err) } - duration := time.Since(start) + if err == nil { + err = r.makeLogsValid(l) + } + duration := time.Since(start) r.logResult(lggr, err, duration, r.getRPCDomain(), "FilterLogs", "log", l, ) @@ -1312,7 +1337,7 @@ func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQu r.logResult(lggr, err, duration, r.getRPCDomain(), "SubscribeFilterLogs") err = r.wrapWS(err) }() - sub := newSubForwarder(ch, nil, r.wrapRPCClientError) + sub := newSubForwarder(ch, r.makeLogValid, r.wrapRPCClientError) err = sub.start(ws.geth.SubscribeFilterLogs(ctx, q, sub.srcCh)) if err != nil { return @@ -1540,3 +1565,38 @@ func ToBlockNumArg(number *big.Int) string { } return hexutil.EncodeBig(number) } + +func (r *rpcClient) makeLogsValid(logs []types.Log) error { + if r.chainType != chaintype.ChainSei { + return nil + } + + for i := range logs { + var err error + logs[i], err = r.makeLogValid(logs[i]) + if err != nil { + return err + } + } + + return nil +} + +func (r *rpcClient) makeLogValid(log types.Log) (types.Log, error) { + if r.chainType != chaintype.ChainSei { + return log, nil + } + + if log.TxIndex > math.MaxUint32 { + return types.Log{}, fmt.Errorf("TxIndex of tx %s exceeds max supported value of %d", log.TxHash, math.MaxUint32) + } + + if log.Index > math.MaxUint32 { + return types.Log{}, fmt.Errorf("log's index %d of tx %s exceeds max supported value of %d", log.Index, log.TxHash, math.MaxUint32) + } + + // it's safe as we have a build guard to guarantee 64-bit system + newIndex := uint64(log.TxIndex<<32) | uint64(log.Index) + log.Index = uint(newIndex) + return log, nil +} diff --git a/core/chains/evm/client/rpc_client_internal_test.go b/core/chains/evm/client/rpc_client_internal_test.go new file mode 100644 index 0000000000..ab8333477d --- /dev/null +++ b/core/chains/evm/client/rpc_client_internal_test.go @@ -0,0 +1,93 @@ +package client + +import ( + "errors" + "math" + "testing" + + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" + + commonclient "github.com/smartcontractkit/chainlink/v2/common/client" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/chaintype" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +func TestRPCClient_MakeLogsValid(t *testing.T) { + testCases := []struct { + Name string + TxIndex uint + LogIndex uint + ExpectedLogIndex uint + ExpectedError error + }{ + { + Name: "TxIndex = 0 LogIndex = 0", + TxIndex: 0, + LogIndex: 0, + ExpectedLogIndex: 0, + ExpectedError: nil, + }, + { + Name: "TxIndex = 0 LogIndex = 1", + TxIndex: 0, + LogIndex: 1, + ExpectedLogIndex: 1, + ExpectedError: nil, + }, + { + Name: "TxIndex = 0 LogIndex = MaxUint32", + TxIndex: 0, + LogIndex: math.MaxUint32, + ExpectedLogIndex: math.MaxUint32, + ExpectedError: nil, + }, + { + Name: "LogIndex = MaxUint32 + 1 => returns an error", + TxIndex: 0, + LogIndex: math.MaxUint32 + 1, + ExpectedLogIndex: 0, + ExpectedError: errors.New("log's index 4294967296 of tx 0x0000000000000000000000000000000000000000000000000000000000000000 exceeds max supported value of 4294967295"), + }, + { + Name: "TxIndex = 1 LogIndex = 0", + TxIndex: 1, + LogIndex: 0, + ExpectedLogIndex: math.MaxUint32 + 1, + ExpectedError: nil, + }, + { + Name: "TxIndex = MaxUint32 LogIndex = MaxUint32", + TxIndex: math.MaxUint32, + LogIndex: math.MaxUint32, + ExpectedLogIndex: math.MaxUint64, + ExpectedError: nil, + }, + { + Name: "TxIndex = MaxUint32 + 1 => returns an error", + TxIndex: math.MaxUint32 + 1, + LogIndex: 0, + ExpectedLogIndex: 0, + ExpectedError: errors.New("TxIndex of tx 0x0000000000000000000000000000000000000000000000000000000000000000 exceeds max supported value of 4294967295"), + }, + } + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + rpc := newRPCClient(logger.TestLogger(t), nil, nil, "eth-primary-rpc-0", 0, nil, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + log, err := rpc.makeLogValid(ethtypes.Log{TxIndex: tc.TxIndex, Index: tc.LogIndex}) + // non sei should return as is + require.NoError(t, err) + require.Equal(t, tc.TxIndex, log.TxIndex) + require.Equal(t, tc.LogIndex, log.Index) + seiRPC := newRPCClient(logger.TestLogger(t), nil, nil, "eth-primary-rpc-0", 0, nil, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainSei) + log, err = seiRPC.makeLogValid(ethtypes.Log{TxIndex: tc.TxIndex, Index: tc.LogIndex}) + if tc.ExpectedError != nil { + require.EqualError(t, err, tc.ExpectedError.Error()) + return + } + + require.Equal(t, tc.ExpectedLogIndex, log.Index) + require.Equal(t, tc.TxIndex, log.TxIndex) + }) + } +} diff --git a/core/chains/evm/client/rpc_client_test.go b/core/chains/evm/client/rpc_client_test.go index 662c757ffb..f2d8a83206 100644 --- a/core/chains/evm/client/rpc_client_test.go +++ b/core/chains/evm/client/rpc_client_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "math/big" "net/url" "sync" @@ -12,6 +13,7 @@ import ( "time" "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" "github.com/stretchr/testify/assert" @@ -32,14 +34,16 @@ import ( evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) -func makeNewHeadWSMessage(head *evmtypes.Head) string { - asJSON, err := json.Marshal(head) +func makeNewWSMessage[T any](v T) string { + asJSON, err := json.Marshal(v) if err != nil { panic(fmt.Errorf("failed to marshal head: %w", err)) } return fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_subscription","params":{"subscription":"0x00","result":%s}}`, string(asJSON)) } +var makeNewHeadWSMessage = makeNewWSMessage[*evmtypes.Head] + func TestRPCClient_SubscribeNewHead(t *testing.T) { t.Parallel() ctx, cancel := context.WithTimeout(tests.Context(t), tests.WaitTimeout(t)) @@ -390,6 +394,130 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { t.Errorf("Expected subscription to return an error, but test timeout instead") } }) + t.Run("Log's index is properly set for Sei chain type", func(t *testing.T) { + server := testutils.NewWSServer(t, chainId, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) { + if method == "eth_unsubscribe" { + resp.Result = "true" + return + } else if method == "eth_subscribe" { + if assert.True(t, params.IsArray()) && assert.Equal(t, "logs", params.Array()[0].String()) { + resp.Result = `"0x00"` + } + return + } + return + }) + wsURL := server.WSURL() + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainSei) + defer rpc.Close() + require.NoError(t, rpc.Dial(ctx)) + ch := make(chan types.Log) + sub, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, ch) + require.NoError(t, err) + testCases := []struct { + TxIndex uint + Index uint + ExpectedIndex uint + }{ + { + TxIndex: 0, + Index: 0, + ExpectedIndex: 0, + }, + { + TxIndex: 0, + Index: 1, + ExpectedIndex: 1, + }, + { + TxIndex: 1, + Index: 0, + ExpectedIndex: math.MaxUint32 + 1, + }, + } + go func() { + for _, testCase := range testCases { + server.MustWriteBinaryMessageSync(t, makeNewWSMessage(types.Log{TxIndex: testCase.TxIndex, Index: testCase.Index, Topics: []common.Hash{{}}})) + } + }() + defer sub.Unsubscribe() + for _, testCase := range testCases { + select { + case <-tests.Context(t).Done(): + require.Fail(t, "context timed out") + case err := <-sub.Err(): + require.NoError(t, err) + require.Fail(t, "Did not expect error channel to be closed or return error before all testcases were consumed") + case log := <-ch: + require.Equal(t, testCase.ExpectedIndex, log.Index, "Unexpected log index %d for test case %v", log.Index, testCase) + } + } + }) +} + +func TestRPCClientFilterLogs(t *testing.T) { + t.Parallel() + + chainID := big.NewInt(123456) + lggr := logger.Test(t) + ctx, cancel := context.WithTimeout(tests.Context(t), tests.WaitTimeout(t)) + defer cancel() + t.Run("Log's index is properly set for Sei chain type", func(t *testing.T) { + testCases := []struct { + TxIndex uint + Index uint + ExpectedIndex uint + }{ + { + TxIndex: 0, + Index: 0, + ExpectedIndex: 0, + }, + { + TxIndex: 0, + Index: 1, + ExpectedIndex: 1, + }, + { + TxIndex: 1, + Index: 0, + ExpectedIndex: math.MaxUint32 + 1, + }, + } + server := testutils.NewWSServer(t, chainID, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) { + if method != "eth_getLogs" { + return + } + var logs []types.Log + for _, testCase := range testCases { + logs = append(logs, types.Log{TxIndex: testCase.TxIndex, Index: testCase.Index, Topics: []common.Hash{{}}}) + } + raw, err := json.Marshal(logs) + require.NoError(t, err) + resp.Result = string(raw) + return + }) + wsURL := server.WSURL() + seiRPC := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainSei) + defer seiRPC.Close() + require.NoError(t, seiRPC.Dial(ctx)) + logs, err := seiRPC.FilterEvents(ctx, ethereum.FilterQuery{}) + require.NoError(t, err) + for i, testCase := range testCases { + require.Equal(t, testCase.ExpectedIndex, logs[i].Index, "Unexpected log index %d for test case %v", logs[i].Index, testCase) + } + + // non sei should return index as is + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + defer rpc.Close() + require.NoError(t, rpc.Dial(ctx)) + logs, err = rpc.FilterEvents(ctx, ethereum.FilterQuery{}) + require.NoError(t, err) + for i, testCase := range testCases { + require.Equal(t, testCase.Index, logs[i].Index, "Expected non sei log to be returned as is") + require.Equal(t, testCase.TxIndex, logs[i].TxIndex, "Expected non sei log to be returned as is") + } + }) } func TestRPCClient_LatestFinalizedBlock(t *testing.T) { diff --git a/core/chains/evm/client/sub_forwarder.go b/core/chains/evm/client/sub_forwarder.go index 93e9b106b4..a9b5a97eee 100644 --- a/core/chains/evm/client/sub_forwarder.go +++ b/core/chains/evm/client/sub_forwarder.go @@ -13,7 +13,7 @@ type subForwarder[T any] struct { srcCh chan T srcSub ethereum.Subscription - interceptResult func(T) T + interceptResult func(T) (T, error) interceptError func(error) error done chan struct{} @@ -21,14 +21,14 @@ type subForwarder[T any] struct { unSub chan struct{} } -func newSubForwarder[T any](destCh chan<- T, interceptResult func(T) T, interceptError func(error) error) *subForwarder[T] { +func newSubForwarder[T any](destCh chan<- T, interceptResult func(T) (T, error), interceptError func(error) error) *subForwarder[T] { return &subForwarder[T]{ interceptResult: interceptResult, interceptError: interceptError, destCh: destCh, srcCh: make(chan T), done: make(chan struct{}), - err: make(chan error), + err: make(chan error, 1), unSub: make(chan struct{}, 1), } } @@ -44,6 +44,14 @@ func (c *subForwarder[T]) start(sub ethereum.Subscription, err error) error { return nil } +func (c *subForwarder[T]) handleError(err error) { + if c.interceptError != nil { + err = c.interceptError(err) + } + c.err <- err // err is buffered, and we never write twice, so write is not blocking + c.srcSub.Unsubscribe() +} + // forwardLoop receives from src, adds the chainID, and then sends to dest. // It also handles Unsubscribing, which may interrupt either forwarding operation. func (c *subForwarder[T]) forwardLoop() { @@ -54,19 +62,17 @@ func (c *subForwarder[T]) forwardLoop() { for { select { case err := <-c.srcSub.Err(): - if c.interceptError != nil { - err = c.interceptError(err) - } - select { - case c.err <- err: - case <-c.unSub: - c.srcSub.Unsubscribe() - } + c.handleError(err) return case h := <-c.srcCh: if c.interceptResult != nil { - h = c.interceptResult(h) + var err error + h, err = c.interceptResult(h) + if err != nil { + c.handleError(err) + return + } } select { case c.destCh <- h: diff --git a/core/chains/evm/client/sub_forwarder_test.go b/core/chains/evm/client/sub_forwarder_test.go index 1bc0122603..267fa1b846 100644 --- a/core/chains/evm/client/sub_forwarder_test.go +++ b/core/chains/evm/client/sub_forwarder_test.go @@ -21,9 +21,9 @@ func TestChainIDSubForwarder(t *testing.T) { t.Parallel() newChainIDSubForwarder := func(chainID *big.Int, ch chan<- *evmtypes.Head) *subForwarder[*evmtypes.Head] { - return newSubForwarder(ch, func(head *evmtypes.Head) *evmtypes.Head { + return newSubForwarder(ch, func(head *evmtypes.Head) (*evmtypes.Head, error) { head.EVMChainID = ubig.New(chainID) - return head + return head, nil }, nil) } @@ -54,12 +54,14 @@ func TestChainIDSubForwarder(t *testing.T) { sub := NewMockSubscription() err := forwarder.start(sub, nil) assert.NoError(t, err) - sub.Errors <- errors.New("boo") + expectedError := errors.New("boo") + sub.Errors <- expectedError forwarder.Unsubscribe() assert.True(t, sub.unsubscribed) - _, ok := <-sub.Err() - assert.False(t, ok) + err, ok := <-forwarder.Err() + assert.True(t, ok) + require.ErrorIs(t, err, expectedError) _, ok = <-forwarder.Err() assert.False(t, ok) }) @@ -117,6 +119,31 @@ func TestChainIDSubForwarder(t *testing.T) { }) } +func TestSubscriptionForwarder(t *testing.T) { + t.Run("Error returned by interceptResult is forwarded to err channel", func(t *testing.T) { + t.Parallel() + + ch := make(chan *evmtypes.Head) + expectedErr := errors.New("something went wrong during result interception") + forwarder := newSubForwarder(ch, func(head *evmtypes.Head) (*evmtypes.Head, error) { + return nil, expectedErr + }, nil) + mockedSub := NewMockSubscription() + require.NoError(t, forwarder.start(mockedSub, nil)) + + head := &evmtypes.Head{ + ID: 1, + } + forwarder.srcCh <- head + err := <-forwarder.Err() + require.ErrorIs(t, err, expectedErr) + // ensure forwarder is closed + _, ok := <-forwarder.Err() + assert.False(t, ok) + assert.True(t, mockedSub.unsubscribed) + }) +} + func TestSubscriptionErrorWrapper(t *testing.T) { t.Parallel() newSubscriptionErrorWrapper := func(t *testing.T, sub commontypes.Subscription, errorPrefix string) ethereum.Subscription { @@ -145,17 +172,6 @@ func TestSubscriptionErrorWrapper(t *testing.T) { // subsequence unsubscribe does not causes panic wrapper.Unsubscribe() }) - t.Run("Unsubscribe interrupts error delivery", func(t *testing.T) { - t.Parallel() - sub := NewMockSubscription() - const prefix = "RPC returned error" - wrapper := newSubscriptionErrorWrapper(t, sub, prefix) - sub.Errors <- fmt.Errorf("error") - - wrapper.Unsubscribe() - _, ok := <-wrapper.Err() - assert.False(t, ok) - }) t.Run("Successfully wraps error", func(t *testing.T) { t.Parallel() sub := NewMockSubscription() diff --git a/core/chains/evm/config/chaintype/chaintype.go b/core/chains/evm/config/chaintype/chaintype.go index f6b84e4655..eca8ea6062 100644 --- a/core/chains/evm/config/chaintype/chaintype.go +++ b/core/chains/evm/config/chaintype/chaintype.go @@ -17,6 +17,7 @@ const ( ChainMantle ChainType = "mantle" ChainMetis ChainType = "metis" ChainOptimismBedrock ChainType = "optimismBedrock" + ChainSei ChainType = "sei" ChainScroll ChainType = "scroll" ChainWeMix ChainType = "wemix" ChainXLayer ChainType = "xlayer" @@ -38,7 +39,7 @@ func (c ChainType) IsL2() bool { func (c ChainType) IsValid() bool { switch c { - case "", ChainArbitrum, ChainAstar, ChainCelo, ChainGnosis, ChainHedera, ChainKroma, ChainMantle, ChainMetis, ChainOptimismBedrock, ChainScroll, ChainWeMix, ChainXLayer, ChainZkEvm, ChainZkSync: + case "", ChainArbitrum, ChainAstar, ChainCelo, ChainGnosis, ChainHedera, ChainKroma, ChainMantle, ChainMetis, ChainOptimismBedrock, ChainSei, ChainScroll, ChainWeMix, ChainXLayer, ChainZkEvm, ChainZkSync: return true } return false @@ -64,6 +65,8 @@ func FromSlug(slug string) ChainType { return ChainMetis case "optimismBedrock": return ChainOptimismBedrock + case "sei": + return ChainSei case "scroll": return ChainScroll case "wemix": @@ -135,6 +138,7 @@ var ErrInvalid = fmt.Errorf("must be one of %s or omitted", strings.Join([]strin string(ChainMantle), string(ChainMetis), string(ChainOptimismBedrock), + string(ChainSei), string(ChainScroll), string(ChainWeMix), string(ChainXLayer), diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index 9caf37ccf2..163a6de05e 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -1357,7 +1357,7 @@ func TestConfig_Validate(t *testing.T) { - 1: 10 errors: - ChainType: invalid value (Foo): must not be set with this chain id - Nodes: missing: must have at least one node - - ChainType: invalid value (Foo): must be one of arbitrum, astar, celo, gnosis, hedera, kroma, mantle, metis, optimismBedrock, scroll, wemix, xlayer, zkevm, zksync or omitted + - ChainType: invalid value (Foo): must be one of arbitrum, astar, celo, gnosis, hedera, kroma, mantle, metis, optimismBedrock, sei, scroll, wemix, xlayer, zkevm, zksync or omitted - HeadTracker.HistoryDepth: invalid value (30): must be greater than or equal to FinalizedBlockOffset - GasEstimator.BumpThreshold: invalid value (0): cannot be 0 if auto-purge feature is enabled for Foo - Transactions.AutoPurge.Threshold: missing: needs to be set if auto-purge feature is enabled for Foo @@ -1370,7 +1370,7 @@ func TestConfig_Validate(t *testing.T) { - 2: 5 errors: - ChainType: invalid value (Arbitrum): only "optimismBedrock" can be used with this chain id - Nodes: missing: must have at least one node - - ChainType: invalid value (Arbitrum): must be one of arbitrum, astar, celo, gnosis, hedera, kroma, mantle, metis, optimismBedrock, scroll, wemix, xlayer, zkevm, zksync or omitted + - ChainType: invalid value (Arbitrum): must be one of arbitrum, astar, celo, gnosis, hedera, kroma, mantle, metis, optimismBedrock, sei, scroll, wemix, xlayer, zkevm, zksync or omitted - FinalityDepth: invalid value (0): must be greater than or equal to 1 - MinIncomingConfirmations: invalid value (0): must be greater than or equal to 1 - 3: 3 errors: diff --git a/core/services/ocr/contract_tracker.go b/core/services/ocr/contract_tracker.go index 0d0dc45c0d..ac71d8855d 100644 --- a/core/services/ocr/contract_tracker.go +++ b/core/services/ocr/contract_tracker.go @@ -400,7 +400,7 @@ func (t *OCRContractTracker) LatestBlockHeight(ctx context.Context) (blockheight // care about the block height; we have no way of getting the L1 block // height anyway return 0, nil - case "", chaintype.ChainArbitrum, chaintype.ChainAstar, chaintype.ChainCelo, chaintype.ChainGnosis, chaintype.ChainHedera, chaintype.ChainKroma, chaintype.ChainOptimismBedrock, chaintype.ChainScroll, chaintype.ChainWeMix, chaintype.ChainXLayer, chaintype.ChainZkEvm, chaintype.ChainZkSync: + case "", chaintype.ChainArbitrum, chaintype.ChainAstar, chaintype.ChainCelo, chaintype.ChainGnosis, chaintype.ChainHedera, chaintype.ChainKroma, chaintype.ChainOptimismBedrock, chaintype.ChainSei, chaintype.ChainScroll, chaintype.ChainWeMix, chaintype.ChainXLayer, chaintype.ChainZkEvm, chaintype.ChainZkSync: // continue } latestBlockHeight := t.getLatestBlockHeight() diff --git a/core/services/ocrcommon/block_translator.go b/core/services/ocrcommon/block_translator.go index fa44d79c2d..34f6f5c908 100644 --- a/core/services/ocrcommon/block_translator.go +++ b/core/services/ocrcommon/block_translator.go @@ -22,7 +22,7 @@ func NewBlockTranslator(cfg Config, client evmclient.Client, lggr logger.Logger) switch cfg.ChainType() { case chaintype.ChainArbitrum: return NewArbitrumBlockTranslator(client, lggr) - case "", chaintype.ChainCelo, chaintype.ChainGnosis, chaintype.ChainKroma, chaintype.ChainMetis, chaintype.ChainOptimismBedrock, chaintype.ChainScroll, chaintype.ChainWeMix, chaintype.ChainXLayer, chaintype.ChainZkEvm, chaintype.ChainZkSync: + case "", chaintype.ChainCelo, chaintype.ChainGnosis, chaintype.ChainKroma, chaintype.ChainMetis, chaintype.ChainOptimismBedrock, chaintype.ChainSei, chaintype.ChainScroll, chaintype.ChainWeMix, chaintype.ChainXLayer, chaintype.ChainZkEvm, chaintype.ChainZkSync: fallthrough default: return &l1BlockTranslator{}