From 6b2a6f7e4832d7fce72351b86a6172b276ef7d2c Mon Sep 17 00:00:00 2001 From: gouthamp-stellar Date: Mon, 30 Sep 2024 15:42:18 -0700 Subject: [PATCH] changes based on comments --- internal/entities/rpc.go | 10 ---------- internal/serve/serve.go | 17 ++++++++++------- .../servicesmocks/rpc_service_mocks.go | 4 ---- internal/tss/channels/rpc_caller_channel.go | 13 +++++-------- .../tss/channels/rpc_caller_channel_test.go | 4 ++-- internal/tss/services/transaction_manager.go | 2 +- .../tss/services/transaction_manager_test.go | 2 +- internal/tss/store/store.go | 19 +++++++++++-------- internal/tss/store/store_test.go | 4 ++-- 9 files changed, 32 insertions(+), 43 deletions(-) diff --git a/internal/entities/rpc.go b/internal/entities/rpc.go index cec0ecc..552ba49 100644 --- a/internal/entities/rpc.go +++ b/internal/entities/rpc.go @@ -18,22 +18,12 @@ const ( SuccessStatus RPCStatus = "SUCCESS" ) -type RPCEntry struct { - Key string `json:"key"` - XDR string `json:"xdr"` - LastModifiedLedgerSeq int64 `json:"lastModifiedLedgerSeq"` -} - type RPCResponse struct { Result json.RawMessage `json:"result"` JSONRPC string `json:"jsonrpc"` ID int64 `json:"id"` } -type RPCGetLedgerEntriesResult struct { - Entries []RPCEntry `json:"entries"` -} - type RPCGetTransactionResult struct { Status RPCStatus `json:"status"` LatestLedger int64 `json:"latestLedger"` diff --git a/internal/serve/serve.go b/internal/serve/serve.go index c960bd5..00ee74f 100644 --- a/internal/serve/serve.go +++ b/internal/serve/serve.go @@ -85,7 +85,8 @@ type handlerDeps struct { // TSS RPCCallerServiceChannel tss.Channel TSSRouter tssrouter.Router - AppTracker apptracker.AppTracker + // Error Tracker + AppTracker apptracker.AppTracker } func Serve(cfg Configs) error { @@ -172,7 +173,7 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) { DistributionAccountSignatureClient: cfg.DistributionAccountSignatureClient, ChannelAccountSignatureClient: cfg.ChannelAccountSignatureClient, HorizonClient: &horizonClient, - BaseFee: int64(cfg.BaseFee), // Reuse horizon base fee for RPC?? + BaseFee: int64(cfg.BaseFee), } tssTxService, err := tssservices.NewTransactionService(txServiceOpts) if err != nil { @@ -184,20 +185,22 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) { return handlerDeps{}, fmt.Errorf("instantiating rpc service: %w", err) } - // re-use same context as above?? - store := tssstore.NewStore(dbConnectionPool) + store, err := tssstore.NewStore(dbConnectionPool) + if err != nil { + return handlerDeps{}, fmt.Errorf("instantiating tss store: %w", err) + } txManager := tssservices.NewTransactionManager(tssservices.TransactionManagerConfigs{ TxService: tssTxService, RPCService: rpcService, Store: store, }) - tssChannelConfigs := tsschannel.RPCCallerChannelConfigs{ + + rpcCallerServiceChannel := tsschannel.NewRPCCallerChannel(tsschannel.RPCCallerChannelConfigs{ TxManager: txManager, Store: store, MaxBufferSize: cfg.RPCCallerServiceChannelBufferSize, MaxWorkers: cfg.RPCCallerServiceChannelMaxWorkers, - } - rpcCallerServiceChannel := tsschannel.NewRPCCallerChannel(tssChannelConfigs) + }) router := tssrouter.NewRouter(tssrouter.RouterConfigs{ RPCCallerChannel: rpcCallerServiceChannel, diff --git a/internal/services/servicesmocks/rpc_service_mocks.go b/internal/services/servicesmocks/rpc_service_mocks.go index f72c348..7c6053d 100644 --- a/internal/services/servicesmocks/rpc_service_mocks.go +++ b/internal/services/servicesmocks/rpc_service_mocks.go @@ -21,7 +21,3 @@ func (r *RPCServiceMock) GetTransaction(transactionHash string) (entities.RPCGet args := r.Called(transactionHash) return args.Get(0).(entities.RPCGetTransactionResult), args.Error(1) } - -type TransactionManagerMock struct { - mock.Mock -} diff --git a/internal/tss/channels/rpc_caller_channel.go b/internal/tss/channels/rpc_caller_channel.go index 9a36318..4e8b276 100644 --- a/internal/tss/channels/rpc_caller_channel.go +++ b/internal/tss/channels/rpc_caller_channel.go @@ -6,7 +6,6 @@ import ( "github.com/alitto/pond" "github.com/stellar/go/support/log" - "github.com/stellar/wallet-backend/internal/entities" "github.com/stellar/wallet-backend/internal/tss" "github.com/stellar/wallet-backend/internal/tss/router" "github.com/stellar/wallet-backend/internal/tss/services" @@ -54,21 +53,19 @@ func (p *rpcCallerPool) Receive(payload tss.Payload) { err := p.Store.UpsertTransaction(ctx, payload.WebhookURL, payload.TransactionHash, payload.TransactionXDR, tss.RPCTXStatus{OtherStatus: tss.NewStatus}) if err != nil { - log.Errorf("%s: Unable to upsert transaction into transactions table: %e", RPCCallerChannelName, err) + log.Errorf("%s: unable to upsert transaction into transactions table: %e", RPCCallerChannelName, err) return } rpcSendResp, err := p.TxManager.BuildAndSubmitTransaction(ctx, RPCCallerChannelName, payload) if err != nil { - log.Errorf("%s: Unable to sign and submit transaction: %e", RPCCallerChannelName, err) + log.Errorf("%s: unable to sign and submit transaction: %e", RPCCallerChannelName, err) return } payload.RpcSubmitTxResponse = rpcSendResp - if rpcSendResp.Status.RPCStatus == entities.TryAgainLaterStatus || rpcSendResp.Status.RPCStatus == entities.ErrorStatus { - err = p.Router.Route(payload) - if err != nil { - log.Errorf("%s: Unable to route payload: %e", RPCCallerChannelName, err) - } + err = p.Router.Route(payload) + if err != nil { + log.Errorf("%s: unable to route payload: %e", RPCCallerChannelName, err) } } diff --git a/internal/tss/channels/rpc_caller_channel_test.go b/internal/tss/channels/rpc_caller_channel_test.go index b5899c0..a46d063 100644 --- a/internal/tss/channels/rpc_caller_channel_test.go +++ b/internal/tss/channels/rpc_caller_channel_test.go @@ -21,7 +21,7 @@ func TestSend(t *testing.T) { dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) require.NoError(t, err) defer dbConnectionPool.Close() - store := store.NewStore(dbConnectionPool) + store, _ := store.NewStore(dbConnectionPool) txManagerMock := services.TransactionManagerMock{} routerMock := router.MockRouter{} cfgs := RPCCallerChannelConfigs{ @@ -64,7 +64,7 @@ func TestReceivee(t *testing.T) { dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) require.NoError(t, err) defer dbConnectionPool.Close() - store := store.NewStore(dbConnectionPool) + store, _ := store.NewStore(dbConnectionPool) txManagerMock := services.TransactionManagerMock{} routerMock := router.MockRouter{} cfgs := RPCCallerChannelConfigs{ diff --git a/internal/tss/services/transaction_manager.go b/internal/tss/services/transaction_manager.go index 7825582..5fa3c63 100644 --- a/internal/tss/services/transaction_manager.go +++ b/internal/tss/services/transaction_manager.go @@ -64,7 +64,7 @@ func (t *transactionManager) BuildAndSubmitTransaction(ctx context.Context, chan return rpcSendResp, fmt.Errorf("%s: RPC fail: %w", channelName, parseErr) } - if rpcErr != nil && rpcSendResp.Code.OtherCodes == tss.RPCFailCode || rpcSendResp.Code.OtherCodes == tss.UnmarshalBinaryCode { + if parseErr != nil && rpcSendResp.Code.OtherCodes == tss.RPCFailCode || rpcSendResp.Code.OtherCodes == tss.UnmarshalBinaryCode { return tss.RPCSendTxResponse{}, fmt.Errorf("%s: RPC fail: %w", channelName, rpcErr) } diff --git a/internal/tss/services/transaction_manager_test.go b/internal/tss/services/transaction_manager_test.go index 80731d4..70c1df1 100644 --- a/internal/tss/services/transaction_manager_test.go +++ b/internal/tss/services/transaction_manager_test.go @@ -24,7 +24,7 @@ func TestBuildAndSubmitTransaction(t *testing.T) { dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) require.NoError(t, err) defer dbConnectionPool.Close() - store := store.NewStore(dbConnectionPool) + store, _ := store.NewStore(dbConnectionPool) txServiceMock := TransactionServiceMock{} rpcServiceMock := servicesmocks.RPCServiceMock{} txManager := NewTransactionManager(TransactionManagerConfigs{ diff --git a/internal/tss/store/store.go b/internal/tss/store/store.go index b7953b0..7d5bc81 100644 --- a/internal/tss/store/store.go +++ b/internal/tss/store/store.go @@ -19,10 +19,13 @@ type store struct { DB db.ConnectionPool } -func NewStore(db db.ConnectionPool) Store { +func NewStore(db db.ConnectionPool) (Store, error) { + if db == nil { + return nil, fmt.Errorf("db cannot be nil") + } return &store{ DB: db, - } + }, nil } func (s *store) UpsertTransaction(ctx context.Context, webhookURL string, txHash string, txXDR string, status tss.RPCTXStatus) error { @@ -33,9 +36,9 @@ func (s *store) UpsertTransaction(ctx context.Context, webhookURL string, txHash ($1, $2, $3, $4) ON CONFLICT (transaction_hash) DO UPDATE SET - transaction_xdr = $2, - webhook_url = $3, - current_status = $4, + transaction_xdr = EXCLUDED.transaction_xdr, + webhook_url = EXCLUDED.webhook_url, + current_status = EXCLUDED.current_status, updated_at = NOW(); ` _, err := s.DB.ExecContext(ctx, q, txHash, txXDR, webhookURL, status.Status()) @@ -53,9 +56,9 @@ func (s *store) UpsertTry(ctx context.Context, txHash string, feeBumpTxHash stri ($1, $2, $3, $4) ON CONFLICT (try_transaction_hash) DO UPDATE SET - original_transaction_hash = $1, - try_transaction_xdr = $3, - status = $4, + original_transaction_hash = EXCLUDED.original_transaction_hash, + try_transaction_xdr = EXCLUDED.try_transaction_xdr, + status = EXCLUDED.status, updated_at = NOW(); ` _, err := s.DB.ExecContext(ctx, q, txHash, feeBumpTxHash, feeBumpTxXDR, status.Code()) diff --git a/internal/tss/store/store_test.go b/internal/tss/store/store_test.go index 2987a27..ce9989b 100644 --- a/internal/tss/store/store_test.go +++ b/internal/tss/store/store_test.go @@ -19,7 +19,7 @@ func TestUpsertTransaction(t *testing.T) { dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) require.NoError(t, err) defer dbConnectionPool.Close() - store := NewStore(dbConnectionPool) + store, _ := NewStore(dbConnectionPool) t.Run("insert", func(t *testing.T) { _ = store.UpsertTransaction(context.Background(), "www.stellar.org", "hash", "xdr", tss.RPCTXStatus{OtherStatus: tss.NewStatus}) @@ -52,7 +52,7 @@ func TestUpsertTry(t *testing.T) { dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) require.NoError(t, err) defer dbConnectionPool.Close() - store := NewStore(dbConnectionPool) + store, _ := NewStore(dbConnectionPool) t.Run("insert", func(t *testing.T) { code := tss.RPCTXCode{OtherCodes: tss.NewCode} _ = store.UpsertTry(context.Background(), "hash", "feebumptxhash", "feebumptxxdr", code)