diff --git a/cmd/ingest.go b/cmd/ingest.go index 32dab4d..581dcf5 100644 --- a/cmd/ingest.go +++ b/cmd/ingest.go @@ -30,10 +30,10 @@ func (c *ingestCmd) Command() *cobra.Command { utils.RPCURLOption(&cfg.RPCURL), utils.StartLedgerOption(&cfg.StartLedger), utils.EndLedgerOption(&cfg.EndLedger), - utils.WebhookHandlerServiceChannelMaxBufferSizeOption(&cfg.WebhookChannelMaxBufferSize), - utils.WebhookHandlerServiceChannelMaxWorkersOptions(&cfg.WebhookChannelMaxWorkers), - utils.WebhookHandlerServiceChannelMaxRetriesOption(&cfg.WebhookChannelMaxRetries), - utils.WebhookHandlerServiceChannelMinWaitBtwnRetriesMSOption(&cfg.WebhookChannelWaitBtwnTriesMS), + utils.WebhookHandlerChannelMaxBufferSizeOption(&cfg.WebhookChannelMaxBufferSize), + utils.WebhookHandlerChannelMaxWorkersOptions(&cfg.WebhookChannelMaxWorkers), + utils.WebhookHandlerChannelMaxRetriesOption(&cfg.WebhookChannelMaxRetries), + utils.WebhookHandlerChannelMinWaitBtwnRetriesMSOption(&cfg.WebhookChannelWaitBtwnTriesMS), { Name: "ledger-cursor-name", Usage: "Name of last synced ledger cursor, used to keep track of the last ledger ingested by the service. When starting up, ingestion will resume from the ledger number stored in this record. It should be an unique name per container as different containers would overwrite the cursor value of its peers when using the same cursor name.", diff --git a/cmd/serve.go b/cmd/serve.go index 320e798..2108e26 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -30,23 +30,23 @@ func (c *serveCmd) Command() *cobra.Command { utils.BaseFeeOption(&cfg.BaseFee), utils.HorizonClientURLOption(&cfg.HorizonClientURL), utils.RPCURLOption(&cfg.RPCURL), - utils.RPCCallerServiceChannelBufferSizeOption(&cfg.RPCCallerServiceChannelBufferSize), - utils.RPCCallerServiceMaxWorkersOption(&cfg.RPCCallerServiceChannelMaxWorkers), + utils.RPCCallerChannelBufferSizeOption(&cfg.RPCCallerServiceChannelBufferSize), + utils.RPCCallerChannelMaxWorkersOption(&cfg.RPCCallerServiceChannelMaxWorkers), utils.ChannelAccountEncryptionPassphraseOption(&cfg.EncryptionPassphrase), utils.SentryDSNOption(&sentryDSN), utils.StellarEnvironmentOption(&stellarEnvironment), - utils.ErrorHandlerServiceJitterChannelBufferSizeOption(&cfg.ErrorHandlerServiceJitterChannelBufferSize), - utils.ErrorHandlerServiceJitterChannelMaxWorkersOption(&cfg.ErrorHandlerServiceJitterChannelMaxWorkers), - utils.ErrorHandlerServiceNonJitterChannelBufferSizeOption(&cfg.ErrorHandlerServiceNonJitterChannelBufferSize), - utils.ErrorHandlerServiceNonJitterChannelMaxWorkersOption(&cfg.ErrorHandlerServiceNonJitterChannelMaxWorkers), - utils.ErrorHandlerServiceJitterChannelMinWaitBtwnRetriesMSOption(&cfg.ErrorHandlerServiceJitterChannelMinWaitBtwnRetriesMS), - utils.ErrorHandlerServiceNonJitterChannelWaitBtwnRetriesMSOption(&cfg.ErrorHandlerServiceNonJitterChannelWaitBtwnRetriesMS), - utils.ErrorHandlerServiceJitterChannelMaxRetriesOptions(&cfg.ErrorHandlerServiceJitterChannelMaxRetries), - utils.ErrorHandlerServiceNonJitterChannelMaxRetriesOption(&cfg.ErrorHandlerServiceNonJitterChannelMaxRetries), - utils.WebhookHandlerServiceChannelMaxBufferSizeOption(&cfg.WebhookHandlerServiceChannelMaxBufferSize), - utils.WebhookHandlerServiceChannelMaxWorkersOptions(&cfg.WebhookHandlerServiceChannelMaxWorkers), - utils.WebhookHandlerServiceChannelMaxRetriesOption(&cfg.WebhookHandlerServiceChannelMaxRetries), - utils.WebhookHandlerServiceChannelMinWaitBtwnRetriesMSOption(&cfg.WebhookHandlerServiceChannelMinWaitBtwnRetriesMS), + utils.ErrorHandlerJitterChannelBufferSizeOption(&cfg.ErrorHandlerServiceJitterChannelBufferSize), + utils.ErrorHandlerJitterChannelMaxWorkersOption(&cfg.ErrorHandlerServiceJitterChannelMaxWorkers), + utils.ErrorHandlerNonJitterChannelBufferSizeOption(&cfg.ErrorHandlerServiceNonJitterChannelBufferSize), + utils.ErrorHandlerNonJitterChannelMaxWorkersOption(&cfg.ErrorHandlerServiceNonJitterChannelMaxWorkers), + utils.ErrorHandlerJitterChannelMinWaitBtwnRetriesMSOption(&cfg.ErrorHandlerServiceJitterChannelMinWaitBtwnRetriesMS), + utils.ErrorHandlerNonJitterChannelWaitBtwnRetriesMSOption(&cfg.ErrorHandlerServiceNonJitterChannelWaitBtwnRetriesMS), + utils.ErrorHandlerJitterChannelMaxRetriesOptions(&cfg.ErrorHandlerServiceJitterChannelMaxRetries), + utils.ErrorHandlerNonJitterChannelMaxRetriesOption(&cfg.ErrorHandlerServiceNonJitterChannelMaxRetries), + utils.WebhookHandlerChannelMaxBufferSizeOption(&cfg.WebhookHandlerServiceChannelMaxBufferSize), + utils.WebhookHandlerChannelMaxWorkersOptions(&cfg.WebhookHandlerServiceChannelMaxWorkers), + utils.WebhookHandlerChannelMaxRetriesOption(&cfg.WebhookHandlerServiceChannelMaxRetries), + utils.WebhookHandlerChannelMinWaitBtwnRetriesMSOption(&cfg.WebhookHandlerServiceChannelMinWaitBtwnRetriesMS), { Name: "port", diff --git a/cmd/utils/tss_options.go b/cmd/utils/tss_options.go index 7a3e9b0..6cb729b 100644 --- a/cmd/utils/tss_options.go +++ b/cmd/utils/tss_options.go @@ -6,20 +6,20 @@ import ( "github.com/stellar/go/support/config" ) -func RPCCallerServiceChannelBufferSizeOption(configKey *int) *config.ConfigOption { +func RPCCallerChannelBufferSizeOption(configKey *int) *config.ConfigOption { return &config.ConfigOption{ - Name: "tss-rpc-caller-service-channel-buffer-size", - Usage: "Set the buffer size for TSS RPC Caller Service channel.", + Name: "tss-rpc-caller-channel-buffer-size", + Usage: "Set the buffer size for TSS RPC Caller channel.", OptType: types.Int, ConfigKey: configKey, FlagDefault: 1000, } } -func RPCCallerServiceMaxWorkersOption(configKey *int) *config.ConfigOption { +func RPCCallerChannelMaxWorkersOption(configKey *int) *config.ConfigOption { return &config.ConfigOption{ - Name: "tss-rpc-caller-service-channel-max-workers", - Usage: "Set the maximum number of workers for TSS RPC Caller Service channel.", + Name: "tss-rpc-caller-channel-max-workers", + Usage: "Set the maximum number of workers for TSS RPC Caller channel.", OptType: types.Int, ConfigKey: configKey, FlagDefault: 100, @@ -27,10 +27,10 @@ func RPCCallerServiceMaxWorkersOption(configKey *int) *config.ConfigOption { } -func ErrorHandlerServiceJitterChannelBufferSizeOption(configKey *int) *config.ConfigOption { +func ErrorHandlerJitterChannelBufferSizeOption(configKey *int) *config.ConfigOption { return &config.ConfigOption{ - Name: "error-handler-service-jitter-channel-buffer-size", - Usage: "Set the buffer size of the Error Handler Service Jitter channel.", + Name: "error-handler-jitter-channel-buffer-size", + Usage: "Set the buffer size of the Error Handler Jitter channel.", OptType: types.Int, ConfigKey: configKey, FlagDefault: 100, @@ -38,10 +38,10 @@ func ErrorHandlerServiceJitterChannelBufferSizeOption(configKey *int) *config.Co } } -func ErrorHandlerServiceJitterChannelMaxWorkersOption(configKey *int) *config.ConfigOption { +func ErrorHandlerJitterChannelMaxWorkersOption(configKey *int) *config.ConfigOption { return &config.ConfigOption{ - Name: "error-handler-service-jitter-channel-max-workers", - Usage: "Set the maximum number of workers for the Error Handler Service Jitter channel.", + Name: "error-handler-jitter-channel-max-workers", + Usage: "Set the maximum number of workers for the Error Handler Jitter channel.", OptType: types.Int, ConfigKey: configKey, FlagDefault: 10, @@ -49,10 +49,10 @@ func ErrorHandlerServiceJitterChannelMaxWorkersOption(configKey *int) *config.Co } } -func ErrorHandlerServiceNonJitterChannelBufferSizeOption(configKey *int) *config.ConfigOption { +func ErrorHandlerNonJitterChannelBufferSizeOption(configKey *int) *config.ConfigOption { return &config.ConfigOption{ - Name: "error-handler-service-non-jitter-channel-buffer-size", - Usage: "Set the buffer size of the Error Handler Service Non Jitter channel.", + Name: "error-handler-non-jitter-channel-buffer-size", + Usage: "Set the buffer size of the Error Handler Non Jitter channel.", OptType: types.Int, ConfigKey: configKey, FlagDefault: 100, @@ -61,10 +61,10 @@ func ErrorHandlerServiceNonJitterChannelBufferSizeOption(configKey *int) *config } -func ErrorHandlerServiceNonJitterChannelMaxWorkersOption(configKey *int) *config.ConfigOption { +func ErrorHandlerNonJitterChannelMaxWorkersOption(configKey *int) *config.ConfigOption { return &config.ConfigOption{ - Name: "error-handler-service-non-jitter-channel-max-workers", - Usage: "Set the maximum number of workers for the Error Handler Service Non Jitter channel.", + Name: "error-handler-non-jitter-channel-max-workers", + Usage: "Set the maximum number of workers for the Error Handler Non Jitter channel.", OptType: types.Int, ConfigKey: configKey, FlagDefault: 10, @@ -72,10 +72,10 @@ func ErrorHandlerServiceNonJitterChannelMaxWorkersOption(configKey *int) *config } } -func ErrorHandlerServiceJitterChannelMinWaitBtwnRetriesMSOption(configKey *int) *config.ConfigOption { +func ErrorHandlerJitterChannelMinWaitBtwnRetriesMSOption(configKey *int) *config.ConfigOption { return &config.ConfigOption{ - Name: "error-handler-service-jitter-channel-min-wait-between-retries", - Usage: "Set the minimum amount of time in ms between retries for the Error Handler Service Jitter channel.", + Name: "error-handler-jitter-channel-min-wait-between-retries", + Usage: "Set the minimum amount of time in ms between retries for the Error Handler Jitter channel.", OptType: types.Int, ConfigKey: configKey, FlagDefault: 10, @@ -83,10 +83,10 @@ func ErrorHandlerServiceJitterChannelMinWaitBtwnRetriesMSOption(configKey *int) } } -func ErrorHandlerServiceNonJitterChannelWaitBtwnRetriesMSOption(configKey *int) *config.ConfigOption { +func ErrorHandlerNonJitterChannelWaitBtwnRetriesMSOption(configKey *int) *config.ConfigOption { return &config.ConfigOption{ - Name: "error-handler-service-non-jitter-channel-wait-between-retries", - Usage: "Set the amount of time in ms between retries for the Error Handler Service Non Jitter channel.", + Name: "error-handler-non-jitter-channel-wait-between-retries", + Usage: "Set the amount of time in ms between retries for the Error Handler Non Jitter channel.", OptType: types.Int, ConfigKey: configKey, FlagDefault: 10, @@ -94,10 +94,10 @@ func ErrorHandlerServiceNonJitterChannelWaitBtwnRetriesMSOption(configKey *int) } } -func ErrorHandlerServiceJitterChannelMaxRetriesOptions(configKey *int) *config.ConfigOption { +func ErrorHandlerJitterChannelMaxRetriesOptions(configKey *int) *config.ConfigOption { return &config.ConfigOption{ - Name: "error-handler-service-jitter-channel-max-retries", - Usage: "Set the number of retries for each task in the Error Handler Service Jitter channel.", + Name: "error-handler-jitter-channel-max-retries", + Usage: "Set the number of retries for each task in the Error Handler Jitter channel.", OptType: types.Int, ConfigKey: configKey, FlagDefault: 10, @@ -106,10 +106,10 @@ func ErrorHandlerServiceJitterChannelMaxRetriesOptions(configKey *int) *config.C } -func ErrorHandlerServiceNonJitterChannelMaxRetriesOption(configKey *int) *config.ConfigOption { +func ErrorHandlerNonJitterChannelMaxRetriesOption(configKey *int) *config.ConfigOption { return &config.ConfigOption{ - Name: "error-handler-service-non-jitter-channel-max-retries", - Usage: "Set the number of retries for each task in the Error Handler Service Non Jitter channel.", + Name: "error-handler-non-jitter-channel-max-retries", + Usage: "Set the number of retries for each task in the Error Handler Service Jitter channel.", OptType: types.Int, ConfigKey: configKey, FlagDefault: 10, @@ -117,10 +117,10 @@ func ErrorHandlerServiceNonJitterChannelMaxRetriesOption(configKey *int) *config } } -func WebhookHandlerServiceChannelMaxBufferSizeOption(configKey *int) *config.ConfigOption { +func WebhookHandlerChannelMaxBufferSizeOption(configKey *int) *config.ConfigOption { return &config.ConfigOption{ - Name: "webhook-service-channel-max-buffer-size", - Usage: "Set the buffer size of the webhook serive channel.", + Name: "webhook-channel-max-buffer-size", + Usage: "Set the buffer size of the webhook channel.", OptType: types.Int, ConfigKey: configKey, FlagDefault: 100, @@ -128,10 +128,10 @@ func WebhookHandlerServiceChannelMaxBufferSizeOption(configKey *int) *config.Con } } -func WebhookHandlerServiceChannelMaxWorkersOptions(configKey *int) *config.ConfigOption { +func WebhookHandlerChannelMaxWorkersOptions(configKey *int) *config.ConfigOption { return &config.ConfigOption{ - Name: "webhook-service-channel-max-workers", - Usage: "Set the max number of workers for the webhook serive channel.", + Name: "webhook-channel-max-workers", + Usage: "Set the max number of workers for the webhook channel.", OptType: types.Int, ConfigKey: configKey, FlagDefault: 10, @@ -139,9 +139,9 @@ func WebhookHandlerServiceChannelMaxWorkersOptions(configKey *int) *config.Confi } } -func WebhookHandlerServiceChannelMaxRetriesOption(configKey *int) *config.ConfigOption { +func WebhookHandlerChannelMaxRetriesOption(configKey *int) *config.ConfigOption { return &config.ConfigOption{ - Name: "webhook-service-channel-max-retries", + Name: "webhook-channel-max-retries", Usage: "Set the max number of times to ping a webhook before quitting.", OptType: types.Int, ConfigKey: configKey, @@ -150,9 +150,9 @@ func WebhookHandlerServiceChannelMaxRetriesOption(configKey *int) *config.Config } } -func WebhookHandlerServiceChannelMinWaitBtwnRetriesMSOption(configKey *int) *config.ConfigOption { +func WebhookHandlerChannelMinWaitBtwnRetriesMSOption(configKey *int) *config.ConfigOption { return &config.ConfigOption{ - Name: "webhook-service-channel-min-wait-between-retries", + Name: "webhook-channel-min-wait-between-retries", Usage: "The minumum amout of time to wait before resending the payload to the webhook url", OptType: types.Int, ConfigKey: configKey, diff --git a/internal/db/migrations/2024-10-14.0-alter_tss_transaction_tries.sql b/internal/db/migrations/2024-10-14.0-alter_tss_transaction_tries.sql new file mode 100644 index 0000000..4dee34b --- /dev/null +++ b/internal/db/migrations/2024-10-14.0-alter_tss_transaction_tries.sql @@ -0,0 +1,15 @@ +-- +migrate Up +ALTER TABLE tss_transaction_submission_tries + RENAME COLUMN status TO code; + +ALTER TABLE tss_transaction_submission_tries + ADD column status TEXT NOT NULL, + ADD COLUMN result_xdr TEXT NOT NULL; + +-- +migrate Down +ALTER TABLE tss_transaction_submission_tries + DROP COLUMN status, + DROP COLUMN result_xdr; + +ALTER TABLE tss_transaction_submission_tries + RENAME COLUMN code TO status; \ No newline at end of file diff --git a/internal/serve/serve.go b/internal/serve/serve.go index b4e5ba3..bcf25b3 100644 --- a/internal/serve/serve.go +++ b/internal/serve/serve.go @@ -101,6 +101,7 @@ type handlerDeps struct { ErrorNonJitterChannel tss.Channel WebhookChannel tss.Channel TSSRouter tssrouter.Router + PoolPopulator tssservices.PoolPopulator // Error Tracker AppTracker apptracker.AppTracker } @@ -117,6 +118,7 @@ func Serve(cfg Configs) error { Handler: handler(deps), OnStarting: func() { log.Infof("Starting Wallet Backend server on port %d", cfg.Port) + go populatePools(deps.PoolPopulator) }, OnStopping: func() { log.Info("Stopping Wallet Backend server") @@ -240,6 +242,7 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) { httpClient = http.Client{Timeout: time.Duration(30 * time.Second)} webhookChannel := tsschannel.NewWebhookChannel(tsschannel.WebhookChannelConfigs{ HTTPClient: &httpClient, + Store: store, MaxBufferSize: cfg.WebhookHandlerServiceChannelMaxBufferSize, MaxWorkers: cfg.WebhookHandlerServiceChannelMaxWorkers, MaxRetries: cfg.WebhookHandlerServiceChannelMaxRetries, @@ -257,6 +260,11 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) { errorJitterChannel.SetRouter(router) errorNonJitterChannel.SetRouter(router) + poolPopulator, err := tssservices.NewPoolPopulator(router, store, rpcService) + if err != nil { + return handlerDeps{}, fmt.Errorf("instantiating tss pool populator") + } + return handlerDeps{ Models: models, SignatureVerifier: signatureVerifier, @@ -271,9 +279,20 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) { ErrorNonJitterChannel: errorNonJitterChannel, WebhookChannel: webhookChannel, TSSRouter: router, + PoolPopulator: poolPopulator, }, nil } +func populatePools(poolPopulator tssservices.PoolPopulator) { + alertAfter := time.Minute * 10 + ticker := time.NewTicker(alertAfter) + ctx := context.Background() + + for range ticker.C { + poolPopulator.PopulatePools(ctx) + } +} + func ensureChannelAccounts(channelAccountService services.ChannelAccountService, numberOfChannelAccounts int64) { ctx := context.Background() log.Ctx(ctx).Info("Ensuring the number of channel accounts in the database...") diff --git a/internal/services/ingest.go b/internal/services/ingest.go index cae959d..7bf0782 100644 --- a/internal/services/ingest.go +++ b/internal/services/ingest.go @@ -167,6 +167,7 @@ func (m *ingestService) ingestPayments(ctx context.Context, ledgerTransactions [ Memo: txMemo, MemoType: txMemoType, } + switch op.Body.Type { case xdr.OperationTypePayment: fillPayment(&payment, op.Body) @@ -177,6 +178,7 @@ func (m *ingestService) ingestPayments(ctx context.Context, ledgerTransactions [ default: continue } + err = m.models.Payments.AddPayment(ctx, dbTx, payment) if err != nil { return fmt.Errorf("adding payment for ledger %d, tx %s (%d), operation %s (%d): %w", tx.Ledger, tx.Hash, tx.ApplicationOrder, payment.OperationID, opIdx, err) @@ -210,7 +212,7 @@ func (m *ingestService) processTSSTransactions(ctx context.Context, ledgerTransa if err != nil { return fmt.Errorf("error unmarshaling resultxdr: %w", err) } - err = m.tssStore.UpsertTry(ctx, tssTry.OrigTxHash, tssTry.Hash, tssTry.XDR, code) + err = m.tssStore.UpsertTry(ctx, tssTry.OrigTxHash, tssTry.Hash, tssTry.XDR, status, code, tx.ResultXDR) if err != nil { return fmt.Errorf("error updating try: %w", err) } diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index 11ec6bd..330464c 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -144,7 +144,7 @@ func TestProcessTSSTransactions(t *testing.T) { } _ = tssStore.UpsertTransaction(context.Background(), "localhost:8000/webhook", "hash", "xdr", tss.RPCTXStatus{OtherStatus: tss.NewStatus}) - _ = tssStore.UpsertTry(context.Background(), "hash", "feebumphash", "feebumpxdr", tss.RPCTXCode{OtherCodes: tss.NewCode}) + _ = tssStore.UpsertTry(context.Background(), "hash", "feebumphash", "feebumpxdr", tss.RPCTXStatus{OtherStatus: tss.NewStatus}, tss.RPCTXCode{OtherCodes: tss.NewCode}, "") mockRouter. On("Route", mock.AnythingOfType("tss.Payload")). @@ -157,6 +157,7 @@ func TestProcessTSSTransactions(t *testing.T) { updatedTX, _ := tssStore.GetTransaction(context.Background(), "hash") assert.Equal(t, string(entities.SuccessStatus), updatedTX.Status) updatedTry, _ := tssStore.GetTry(context.Background(), "feebumphash") + assert.Equal(t, "AAAAAAAAAMj////9AAAAAA==", updatedTry.ResultXDR) assert.Equal(t, int32(xdr.TransactionResultCodeTxTooLate), updatedTry.Code) }) } @@ -174,7 +175,6 @@ func TestIngestPayments(t *testing.T) { mockRouter := tssrouter.MockRouter{} tssStore, _ := tssstore.NewStore(dbConnectionPool) ingestService, _ := NewIngestService(models, "ingestionLedger", &mockAppTracker, &mockRPCService, &mockRouter, tssStore) - // test these 3 test cases: OperationTypePayment, OperationTypePathPaymentStrictSend, OperationTypePathPaymentStrictReceive srcAccount := keypair.MustRandom().Address() destAccount := keypair.MustRandom().Address() t.Run("test_op_payment", func(t *testing.T) { diff --git a/internal/tss/channels/webhook_channel.go b/internal/tss/channels/webhook_channel.go index 0e8e637..e844f42 100644 --- a/internal/tss/channels/webhook_channel.go +++ b/internal/tss/channels/webhook_channel.go @@ -2,6 +2,7 @@ package channels import ( "bytes" + "context" "encoding/json" "net/http" "time" @@ -9,12 +10,14 @@ import ( "github.com/alitto/pond" "github.com/stellar/go/support/log" "github.com/stellar/wallet-backend/internal/tss" + "github.com/stellar/wallet-backend/internal/tss/store" tssutils "github.com/stellar/wallet-backend/internal/tss/utils" "github.com/stellar/wallet-backend/internal/utils" ) type WebhookChannelConfigs struct { HTTPClient utils.HTTPClient + Store store.Store MaxBufferSize int MaxWorkers int MaxRetries int @@ -23,6 +26,7 @@ type WebhookChannelConfigs struct { type webhookPool struct { Pool *pond.WorkerPool + Store store.Store HTTPClient utils.HTTPClient MaxRetries int MinWaitBtwnRetriesMS int @@ -36,6 +40,7 @@ func NewWebhookChannel(cfg WebhookChannelConfigs) *webhookPool { pool := pond.New(cfg.MaxBufferSize, cfg.MaxWorkers, pond.Strategy(pond.Balanced())) return &webhookPool{ Pool: pool, + Store: cfg.Store, HTTPClient: cfg.HTTPClient, MaxRetries: cfg.MaxRetries, MinWaitBtwnRetriesMS: cfg.MinWaitBtwnRetriesMS, @@ -57,19 +62,35 @@ func (p *webhookPool) Receive(payload tss.Payload) { return } var i int + sent := false + ctx := context.Background() for i = 0; i < p.MaxRetries; i++ { - resp, err := p.HTTPClient.Post(payload.WebhookURL, "application/json", bytes.NewBuffer(jsonData)) + httpResp, err := p.HTTPClient.Post(payload.WebhookURL, "application/json", bytes.NewBuffer(jsonData)) if err != nil { log.Errorf("%s: error making POST request to webhook: %e", WebhookChannelName, err) } - defer resp.Body.Close() + defer httpResp.Body.Close() - if resp.StatusCode == http.StatusOK { - return + if httpResp.StatusCode == http.StatusOK { + sent = true + err := p.Store.UpsertTransaction( + ctx, payload.WebhookURL, payload.TransactionHash, payload.TransactionXDR, tss.RPCTXStatus{OtherStatus: tss.SentStatus}) + if err != nil { + log.Errorf("%s: error updating transaction status: %e", WebhookChannelName, err) + } + break } currentBackoff := p.MinWaitBtwnRetriesMS * (1 << i) time.Sleep(jitter(time.Duration(currentBackoff)) * time.Millisecond) } + if !sent { + err := p.Store.UpsertTransaction( + ctx, payload.WebhookURL, payload.TransactionHash, payload.TransactionXDR, tss.RPCTXStatus{OtherStatus: tss.NotSentStatus}) + if err != nil { + log.Errorf("%s: error updating transaction status: %e", WebhookChannelName, err) + } + } + } func (p *webhookPool) Stop() { diff --git a/internal/tss/channels/webhook_channel_test.go b/internal/tss/channels/webhook_channel_test.go index 88e4cf3..fe20572 100644 --- a/internal/tss/channels/webhook_channel_test.go +++ b/internal/tss/channels/webhook_channel_test.go @@ -2,21 +2,34 @@ package channels import ( "bytes" + "context" "encoding/json" "io" "net/http" "strings" "testing" + "github.com/stellar/wallet-backend/internal/db" + "github.com/stellar/wallet-backend/internal/db/dbtest" "github.com/stellar/wallet-backend/internal/tss" + "github.com/stellar/wallet-backend/internal/tss/store" tssutils "github.com/stellar/wallet-backend/internal/tss/utils" "github.com/stellar/wallet-backend/internal/utils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestWebhookHandlerServiceChannel(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + store, _ := store.NewStore(dbConnectionPool) mockHTTPClient := utils.MockHTTPClient{} cfg := WebhookChannelConfigs{ HTTPClient: &mockHTTPClient, + Store: store, MaxBufferSize: 1, MaxWorkers: 1, MaxRetries: 3, @@ -25,6 +38,8 @@ func TestWebhookHandlerServiceChannel(t *testing.T) { channel := NewWebhookChannel(cfg) payload := tss.Payload{} + payload.TransactionHash = "hash" + payload.TransactionXDR = "xdr" payload.WebhookURL = "www.stellar.org" jsonData, _ := json.Marshal(tssutils.PayloadTOTSSResponse(payload)) @@ -52,4 +67,8 @@ func TestWebhookHandlerServiceChannel(t *testing.T) { channel.Stop() mockHTTPClient.AssertNumberOfCalls(t, "Post", 2) + + tx, err := store.GetTransaction(context.Background(), payload.TransactionHash) + assert.Equal(t, string(tss.SentStatus), tx.Status) + assert.NoError(t, err) } diff --git a/internal/tss/router/router.go b/internal/tss/router/router.go index d548349..ef45e76 100644 --- a/internal/tss/router/router.go +++ b/internal/tss/router/router.go @@ -40,12 +40,12 @@ func NewRouter(cfg RouterConfigs) Router { func (r *router) Route(payload tss.Payload) error { var channel tss.Channel if payload.RpcSubmitTxResponse.Status.Status() != "" { - switch payload.RpcSubmitTxResponse.Status.Status() { - case string(tss.NewStatus): + switch payload.RpcSubmitTxResponse.Status { + case tss.RPCTXStatus{OtherStatus: tss.NewStatus}: channel = r.RPCCallerChannel - case string(entities.TryAgainLaterStatus): + case tss.RPCTXStatus{RPCStatus: entities.TryAgainLaterStatus}: channel = r.ErrorJitterChannel - case string(entities.ErrorStatus): + case tss.RPCTXStatus{RPCStatus: entities.ErrorStatus}: if payload.RpcSubmitTxResponse.Code.OtherCodes == tss.NoCode { if slices.Contains(tss.JitterErrorCodes, payload.RpcSubmitTxResponse.Code.TxResultCode) { channel = r.ErrorJitterChannel @@ -55,6 +55,10 @@ func (r *router) Route(payload tss.Payload) error { channel = r.WebhookChannel } } + case tss.RPCTXStatus{RPCStatus: entities.SuccessStatus}: + channel = r.WebhookChannel + case tss.RPCTXStatus{RPCStatus: entities.FailedStatus}: + channel = r.WebhookChannel default: // Do nothing for PENDING / DUPLICATE statuses return nil diff --git a/internal/tss/router/router_test.go b/internal/tss/router/router_test.go index 8060770..3ba7c4a 100644 --- a/internal/tss/router/router_test.go +++ b/internal/tss/router/router_test.go @@ -33,8 +33,9 @@ func TestRouter(t *testing.T) { Return(). Once() - _ = router.Route(payload) + err := router.Route(payload) + assert.NoError(t, err) rpcCallerChannel.AssertCalled(t, "Send", payload) }) t.Run("status_try_again_later_routes_to_error_jitter_channel", func(t *testing.T) { @@ -46,10 +47,42 @@ func TestRouter(t *testing.T) { Return(). Once() - _ = router.Route(payload) + err := router.Route(payload) + assert.NoError(t, err) errorJitterChannel.AssertCalled(t, "Send", payload) }) + + t.Run("status_failure_routes_to_webhook_channel", func(t *testing.T) { + payload := tss.Payload{} + payload.RpcSubmitTxResponse.Status = tss.RPCTXStatus{RPCStatus: entities.FailedStatus} + + webhookChannel. + On("Send", payload). + Return(). + Once() + + err := router.Route(payload) + + assert.NoError(t, err) + webhookChannel.AssertCalled(t, "Send", payload) + }) + + t.Run("status_success_routes_to_webhook_channel", func(t *testing.T) { + payload := tss.Payload{} + payload.RpcSubmitTxResponse.Status = tss.RPCTXStatus{RPCStatus: entities.SuccessStatus} + + webhookChannel. + On("Send", payload). + Return(). + Once() + + err := router.Route(payload) + + assert.NoError(t, err) + webhookChannel.AssertCalled(t, "Send", payload) + }) + t.Run("status_error_routes_to_error_jitter_channel", func(t *testing.T) { for _, code := range tss.JitterErrorCodes { payload := tss.Payload{ @@ -68,8 +101,9 @@ func TestRouter(t *testing.T) { Return(). Once() - _ = router.Route(payload) + err := router.Route(payload) + assert.NoError(t, err) errorJitterChannel.AssertCalled(t, "Send", payload) } }) @@ -91,14 +125,14 @@ func TestRouter(t *testing.T) { Return(). Once() - _ = router.Route(payload) + err := router.Route(payload) + assert.NoError(t, err) errorNonJitterChannel.AssertCalled(t, "Send", payload) } }) t.Run("status_error_routes_to_webhook_channel", func(t *testing.T) { for _, code := range tss.FinalCodes { - payload := tss.Payload{ RpcSubmitTxResponse: tss.RPCSendTxResponse{ Status: tss.RPCTXStatus{ @@ -114,12 +148,13 @@ func TestRouter(t *testing.T) { Return(). Once() - _ = router.Route(payload) + err := router.Route(payload) + assert.NoError(t, err) webhookChannel.AssertCalled(t, "Send", payload) } }) - t.Run("get_ingest_resp_always_routes_to_webhook_cbannel", func(t *testing.T) { + t.Run("get_ingest_resp_always_routes_to_webhook_channel", func(t *testing.T) { payload := tss.Payload{ RpcGetIngestTxResponse: tss.RPCGetIngestTxResponse{ Status: entities.SuccessStatus, @@ -133,8 +168,8 @@ func TestRouter(t *testing.T) { Return(). Once() - _ = router.Route(payload) - + err := router.Route(payload) + assert.NoError(t, err) webhookChannel.AssertCalled(t, "Send", payload) }) t.Run("nil_channel_does_not_route", func(t *testing.T) { diff --git a/internal/tss/services/pool_populator.go b/internal/tss/services/pool_populator.go new file mode 100644 index 0000000..5a78d44 --- /dev/null +++ b/internal/tss/services/pool_populator.go @@ -0,0 +1,245 @@ +package services + +import ( + "context" + "fmt" + "slices" + "time" + + "github.com/stellar/go/support/log" + "github.com/stellar/go/txnbuild" + "github.com/stellar/go/xdr" + "github.com/stellar/wallet-backend/internal/entities" + "github.com/stellar/wallet-backend/internal/services" + "github.com/stellar/wallet-backend/internal/tss" + "github.com/stellar/wallet-backend/internal/tss/router" + "github.com/stellar/wallet-backend/internal/tss/store" +) + +type PoolPopulator interface { + PopulatePools(ctx context.Context) +} + +type poolPopulator struct { + Router router.Router + Store store.Store + RPCService services.RPCService +} + +func NewPoolPopulator(router router.Router, store store.Store, rpcService services.RPCService) (*poolPopulator, error) { + if router == nil { + return nil, fmt.Errorf("router is nil") + } + if store == nil { + return nil, fmt.Errorf("store is nil") + } + if rpcService == nil { + return nil, fmt.Errorf("rpcservice is nil") + } + return &poolPopulator{ + Router: router, + Store: store, + RPCService: rpcService, + }, nil +} + +func (p *poolPopulator) PopulatePools(ctx context.Context) { + err := p.routeNewTransactions(ctx) + if err != nil { + log.Ctx(ctx).Errorf("error routing new transactions: %v", err) + } + + err = p.routeErrorTransactions(ctx) + if err != nil { + log.Ctx(ctx).Errorf("error routing error transactions: %v", err) + } + + err = p.routeFinalTransactions(ctx, tss.RPCTXStatus{RPCStatus: entities.FailedStatus}) + if err != nil { + log.Ctx(ctx).Errorf("error routing failed transactions: %v", err) + } + + err = p.routeFinalTransactions(ctx, tss.RPCTXStatus{RPCStatus: entities.SuccessStatus}) + if err != nil { + log.Ctx(ctx).Errorf("error routing successful transactions: %v", err) + } + + err = p.routeNotSentTransactions(ctx) + if err != nil { + log.Ctx(ctx).Errorf("error routing not_sent transactions: %v", err) + } +} + +func (p *poolPopulator) routeNewTransactions(ctx context.Context) error { + newTxns, err := p.Store.GetTransactionsWithStatus(ctx, tss.RPCTXStatus{OtherStatus: tss.NewStatus}) + if err != nil { + return fmt.Errorf("unable to get transactions: %w", err) + } + for _, txn := range newTxns { + payload := tss.Payload{ + TransactionHash: txn.Hash, + TransactionXDR: txn.XDR, + WebhookURL: txn.WebhookURL, + } + try, err := p.Store.GetLatestTry(ctx, txn.Hash) + if err != nil { + return fmt.Errorf("getting latest try for transaction: %w", err) + } + if try == (store.Try{}) { + // there is no try for this transactionm - route to RPC caller channel + payload.RpcSubmitTxResponse.Status = tss.RPCTXStatus{OtherStatus: tss.NewStatus} + } else { + /* + if there is a try for this transaction, check to see if it is + submitted to RPC first. If status is NOT_FOUND, make sure + that the latest try for this transaction is past it's timebounds + before trying to re-submit the transaction. If the status is either + SUCCESS or FAILED, build a payload that will be routed to the Webhook + channel directly + */ + getTransactionResult, err := p.RPCService.GetTransaction(try.Hash) + if err != nil { + return fmt.Errorf("getting transaction: %w", err) + } + if getTransactionResult.Status == entities.NotFoundStatus { + genericTx, err := txnbuild.TransactionFromXDR(try.XDR) + if err != nil { + return fmt.Errorf("unmarshaling tx from xdr string: %w", err) + } + feeBumpTx, unpackable := genericTx.FeeBump() + if !unpackable { + return fmt.Errorf("fee bump transaction cannot be unpacked: %w", err) + } + timeBounds := feeBumpTx.InnerTransaction().ToXDR().Preconditions().TimeBounds + if time.Now().Before(time.Unix(int64(timeBounds.MaxTime), 0)) { + continue + } + // route to the RPC Caller channel + payload.RpcSubmitTxResponse.Status = tss.RPCTXStatus{OtherStatus: tss.NewStatus} + } else { + getIngestTxResponse, err := tss.ParseToRPCGetIngestTxResponse(getTransactionResult, err) + if err != nil { + return fmt.Errorf("parsing rpc reponse: %w", err) + } + payload.RpcGetIngestTxResponse = getIngestTxResponse + } + } + err = p.Router.Route(payload) + if err != nil { + return fmt.Errorf("unable to route payload: %w", err) + } + } + return nil +} + +func (p *poolPopulator) routeErrorTransactions(ctx context.Context) error { + errorTxns, err := p.Store.GetTransactionsWithStatus(ctx, tss.RPCTXStatus{RPCStatus: entities.ErrorStatus}) + if err != nil { + return fmt.Errorf("unable to get transactions: %w", err) + } + for _, txn := range errorTxns { + payload := tss.Payload{ + TransactionHash: txn.Hash, + TransactionXDR: txn.XDR, + WebhookURL: txn.WebhookURL, + } + try, err := p.Store.GetLatestTry(ctx, txn.Hash) + if err != nil { + return fmt.Errorf("gretting latest try for transaction: %w", err) + } + if slices.Contains(tss.FinalCodes, xdr.TransactionResultCode(try.Code)) { + // route to webhook channel + payload.RpcSubmitTxResponse = tss.RPCSendTxResponse{ + TransactionHash: try.Hash, + TransactionXDR: try.XDR, + Status: tss.RPCTXStatus{RPCStatus: entities.ErrorStatus}, + Code: tss.RPCTXCode{TxResultCode: xdr.TransactionResultCode(try.Code)}, + ErrorResultXDR: try.ResultXDR, + } + } else if try.Code == int32(tss.RPCFailCode) || try.Code == int32(tss.UnmarshalBinaryCode) { + // check for timebounds first and route iff out of timebounds route to errorchannel + genericTx, err := txnbuild.TransactionFromXDR(try.XDR) + if err != nil { + return fmt.Errorf("unmarshaling tx from xdr string: %w", err) + } + feeBumpTx, unpackable := genericTx.FeeBump() + if !unpackable { + return fmt.Errorf("fee bump transaction cannot be unpacked: %w", err) + } + timeBounds := feeBumpTx.InnerTransaction().ToXDR().Preconditions().TimeBounds + if time.Now().Before(time.Unix(int64(timeBounds.MaxTime), 0)) { + continue + } + payload.RpcSubmitTxResponse = tss.RPCSendTxResponse{ + TransactionHash: try.Hash, + TransactionXDR: try.XDR, + Status: tss.RPCTXStatus{RPCStatus: entities.TryAgainLaterStatus}, + } + + } + err = p.Router.Route(payload) + if err != nil { + return fmt.Errorf("unable to route payload: %w", err) + } + } + return nil +} + +func (p *poolPopulator) routeFinalTransactions(ctx context.Context, status tss.RPCTXStatus) error { + finalTxns, err := p.Store.GetTransactionsWithStatus(ctx, status) + if err != nil { + return fmt.Errorf("unable to get transactions: %w", err) + } + for _, txn := range finalTxns { + payload := tss.Payload{ + TransactionHash: txn.Hash, + TransactionXDR: txn.XDR, + WebhookURL: txn.WebhookURL, + } + try, err := p.Store.GetLatestTry(ctx, txn.Hash) + if err != nil { + return fmt.Errorf("gretting latest try for transaction: %w", err) + } + payload.RpcGetIngestTxResponse = tss.RPCGetIngestTxResponse{ + Status: status.RPCStatus, + Code: tss.RPCTXCode{TxResultCode: xdr.TransactionResultCode(try.Code)}, + EnvelopeXDR: try.XDR, + ResultXDR: try.ResultXDR, + } + err = p.Router.Route(payload) + if err != nil { + return fmt.Errorf("unable to route payload: %w", err) + } + } + return nil +} + +func (p *poolPopulator) routeNotSentTransactions(ctx context.Context) error { + notSentTxns, err := p.Store.GetTransactionsWithStatus(ctx, tss.RPCTXStatus{OtherStatus: tss.NotSentStatus}) + if err != nil { + return fmt.Errorf("unable to get transactions: %w", err) + } + for _, txn := range notSentTxns { + payload := tss.Payload{ + TransactionHash: txn.Hash, + TransactionXDR: txn.XDR, + WebhookURL: txn.WebhookURL, + } + try, err := p.Store.GetLatestTry(ctx, txn.Hash) + if err != nil { + return fmt.Errorf("gretting latest try for transaction: %w", err) + } + payload.RpcSubmitTxResponse = tss.RPCSendTxResponse{ + TransactionHash: try.Hash, + TransactionXDR: try.XDR, + Status: tss.RPCTXStatus{RPCStatus: entities.RPCStatus(try.Status)}, + Code: tss.RPCTXCode{TxResultCode: xdr.TransactionResultCode(try.Code)}, + ErrorResultXDR: try.ResultXDR, + } + err = p.Router.Route(payload) + if err != nil { + return fmt.Errorf("unable to route payload: %w", err) + } + } + return nil +} diff --git a/internal/tss/services/pool_populator_test.go b/internal/tss/services/pool_populator_test.go new file mode 100644 index 0000000..4304dc9 --- /dev/null +++ b/internal/tss/services/pool_populator_test.go @@ -0,0 +1,225 @@ +package services + +import ( + "context" + "testing" + + "github.com/stellar/go/xdr" + "github.com/stellar/wallet-backend/internal/db" + "github.com/stellar/wallet-backend/internal/db/dbtest" + "github.com/stellar/wallet-backend/internal/entities" + "github.com/stellar/wallet-backend/internal/services" + "github.com/stellar/wallet-backend/internal/tss" + "github.com/stellar/wallet-backend/internal/tss/router" + "github.com/stellar/wallet-backend/internal/tss/store" + "github.com/stellar/wallet-backend/internal/tss/utils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRouteNewTransactions(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + store, _ := store.NewStore(dbConnectionPool) + mockRouter := router.MockRouter{} + mockRPCSerive := services.RPCServiceMock{} + populator, _ := NewPoolPopulator(&mockRouter, store, &mockRPCSerive) + t.Run("tx_has_no_try", func(t *testing.T) { + _ = store.UpsertTransaction(context.Background(), "localhost:8000/webhook", "hash", "xdr", tss.RPCTXStatus{OtherStatus: tss.NewStatus}) + + expectedPayload := tss.Payload{ + TransactionHash: "hash", + TransactionXDR: "xdr", + WebhookURL: "localhost:8000/webhook", + RpcSubmitTxResponse: tss.RPCSendTxResponse{Status: tss.RPCTXStatus{OtherStatus: tss.NewStatus}}, + } + mockRouter. + On("Route", expectedPayload). + Return(nil). + Once() + + err := populator.routeNewTransactions(context.Background()) + assert.Empty(t, err) + }) + + t.Run("tx_has_try", func(t *testing.T) { + _ = store.UpsertTransaction(context.Background(), "localhost:8000/webhook", "hash", "xdr", tss.RPCTXStatus{OtherStatus: tss.NewStatus}) + _ = store.UpsertTry(context.Background(), "hash", "feebumphash", "feebumpxdr", tss.RPCTXStatus{OtherStatus: tss.NewStatus}, tss.RPCTXCode{OtherCodes: tss.NewCode}, "ABCD") + + rpcGetTransacrionResp := entities.RPCGetTransactionResult{ + Status: entities.ErrorStatus, + EnvelopeXDR: "envelopexdr", + ResultXDR: "AAAAAAARFy8AAAAAAAAAAQAAAAAAAAAYAAAAAMu8SHUN67hTUJOz3q+IrH9M/4dCVXaljeK6x1Ss20YWAAAAAA==", + CreatedAt: "1234", + } + + mockRPCSerive. + On("GetTransaction", "feebumphash"). + Return(rpcGetTransacrionResp, nil). + Once() + + getIngestTxResp, _ := tss.ParseToRPCGetIngestTxResponse(rpcGetTransacrionResp, nil) + expectedPayload := tss.Payload{ + TransactionHash: "hash", + TransactionXDR: "xdr", + WebhookURL: "localhost:8000/webhook", + RpcGetIngestTxResponse: getIngestTxResp, + } + mockRouter. + On("Route", expectedPayload). + Return(nil). + Once() + + err := populator.routeNewTransactions(context.Background()) + assert.Empty(t, err) + }) + + t.Run("tx_not_found_timebounds_not_exceeded", func(t *testing.T) { + feeBumpTx := utils.BuildTestFeeBumpTransaction() + txXDRStr, _ := feeBumpTx.Base64() + _ = store.UpsertTransaction(context.Background(), "localhost:8000/webhook", "hash", "xdr", tss.RPCTXStatus{OtherStatus: tss.NewStatus}) + _ = store.UpsertTry(context.Background(), "hash", "feebumphash", txXDRStr, tss.RPCTXStatus{OtherStatus: tss.NewStatus}, tss.RPCTXCode{OtherCodes: tss.NewCode}, "ABCD") + + rpcGetTransacrionResp := entities.RPCGetTransactionResult{ + Status: entities.NotFoundStatus, + EnvelopeXDR: "envelopexdr", + } + + mockRPCSerive. + On("GetTransaction", "feebumphash"). + Return(rpcGetTransacrionResp, nil). + Once() + + err := populator.routeNewTransactions(context.Background()) + assert.Empty(t, err) + }) +} + +func TestRouteErrorTransactions(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + store, _ := store.NewStore(dbConnectionPool) + mockRouter := router.MockRouter{} + mockRPCSerive := services.RPCServiceMock{} + populator, _ := NewPoolPopulator(&mockRouter, store, &mockRPCSerive) + + t.Run("tx_has_final_error_code", func(t *testing.T) { + _ = store.UpsertTransaction(context.Background(), "localhost:8000/webhook", "hash", "xdr", tss.RPCTXStatus{RPCStatus: entities.ErrorStatus}) + _ = store.UpsertTry(context.Background(), "hash", "feebumphash", "feebumpxdr", tss.RPCTXStatus{RPCStatus: entities.ErrorStatus}, tss.RPCTXCode{TxResultCode: xdr.TransactionResultCodeTxInsufficientBalance}, "ABCD") + + expectedPayload := tss.Payload{ + TransactionHash: "hash", + TransactionXDR: "xdr", + WebhookURL: "localhost:8000/webhook", + RpcSubmitTxResponse: tss.RPCSendTxResponse{ + TransactionHash: "feebumphash", + TransactionXDR: "feebumpxdr", + Status: tss.RPCTXStatus{RPCStatus: entities.ErrorStatus}, + Code: tss.RPCTXCode{TxResultCode: xdr.TransactionResultCodeTxInsufficientBalance}, + ErrorResultXDR: "ABCD", + }, + } + + mockRouter. + On("Route", expectedPayload). + Return(nil). + Once() + + err := populator.routeErrorTransactions(context.Background()) + assert.Empty(t, err) + }) + t.Run("tx_timebounds_not_exceeded", func(t *testing.T) { + feeBumpTx := utils.BuildTestFeeBumpTransaction() + txXDRStr, _ := feeBumpTx.Base64() + _ = store.UpsertTransaction(context.Background(), "localhost:8000/webhook", "hash", "xdr", tss.RPCTXStatus{RPCStatus: entities.ErrorStatus}) + _ = store.UpsertTry(context.Background(), "hash", "feebumphash", txXDRStr, tss.RPCTXStatus{RPCStatus: entities.ErrorStatus}, tss.RPCTXCode{OtherCodes: tss.RPCFailCode}, "ABCD") + + err := populator.routeErrorTransactions(context.Background()) + assert.Empty(t, err) + }) +} + +func TestRouteFinalTransactions(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + store, _ := store.NewStore(dbConnectionPool) + mockRouter := router.MockRouter{} + mockRPCSerive := services.RPCServiceMock{} + populator, _ := NewPoolPopulator(&mockRouter, store, &mockRPCSerive) + + t.Run("route_successful_tx", func(t *testing.T) { + _ = store.UpsertTransaction(context.Background(), "localhost:8000/webhook", "hash", "xdr", tss.RPCTXStatus{RPCStatus: entities.SuccessStatus}) + _ = store.UpsertTry(context.Background(), "hash", "feebumphash", "feebumpxdr", tss.RPCTXStatus{RPCStatus: entities.SuccessStatus}, tss.RPCTXCode{TxResultCode: xdr.TransactionResultCodeTxSuccess}, "ABCD") + + expectedPayload := tss.Payload{ + TransactionHash: "hash", + TransactionXDR: "xdr", + WebhookURL: "localhost:8000/webhook", + RpcGetIngestTxResponse: tss.RPCGetIngestTxResponse{ + Status: entities.SuccessStatus, + Code: tss.RPCTXCode{TxResultCode: xdr.TransactionResultCodeTxSuccess}, + EnvelopeXDR: "feebumpxdr", + ResultXDR: "ABCD", + }, + } + + mockRouter. + On("Route", expectedPayload). + Return(nil). + Once() + + err = populator.routeFinalTransactions(context.Background(), tss.RPCTXStatus{RPCStatus: entities.SuccessStatus}) + assert.Empty(t, err) + }) +} + +func TestNotSentTransactions(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + store, _ := store.NewStore(dbConnectionPool) + mockRouter := router.MockRouter{} + mockRPCSerive := services.RPCServiceMock{} + populator, _ := NewPoolPopulator(&mockRouter, store, &mockRPCSerive) + + t.Run("routes_not_sent_txns", func(t *testing.T) { + _ = store.UpsertTransaction(context.Background(), "localhost:8000/webhook", "hash", "xdr", tss.RPCTXStatus{OtherStatus: tss.NotSentStatus}) + _ = store.UpsertTry(context.Background(), "hash", "feebumphash", "feebumpxdr", tss.RPCTXStatus{RPCStatus: entities.SuccessStatus}, tss.RPCTXCode{TxResultCode: xdr.TransactionResultCodeTxSuccess}, "ABCD") + + expectedPayload := tss.Payload{ + TransactionHash: "hash", + TransactionXDR: "xdr", + WebhookURL: "localhost:8000/webhook", + RpcSubmitTxResponse: tss.RPCSendTxResponse{ + TransactionHash: "feebumphash", + TransactionXDR: "feebumpxdr", + Status: tss.RPCTXStatus{RPCStatus: entities.SuccessStatus}, + Code: tss.RPCTXCode{TxResultCode: xdr.TransactionResultCodeTxSuccess}, + ErrorResultXDR: "ABCD", + }, + } + + mockRouter. + On("Route", expectedPayload). + Return(nil). + Once() + + err = populator.routeNotSentTransactions(context.Background()) + assert.Empty(t, err) + }) +} diff --git a/internal/tss/services/transaction_manager.go b/internal/tss/services/transaction_manager.go index 5fa3c63..0a88573 100644 --- a/internal/tss/services/transaction_manager.go +++ b/internal/tss/services/transaction_manager.go @@ -48,14 +48,14 @@ func (t *transactionManager) BuildAndSubmitTransaction(ctx context.Context, chan return tss.RPCSendTxResponse{}, fmt.Errorf("%s: Unable to base64 fee bump transaction: %w", channelName, err) } - err = t.Store.UpsertTry(ctx, payload.TransactionHash, feeBumpTxHash, feeBumpTxXDR, tss.RPCTXCode{OtherCodes: tss.NewCode}) + err = t.Store.UpsertTry(ctx, payload.TransactionHash, feeBumpTxHash, feeBumpTxXDR, tss.RPCTXStatus{OtherStatus: tss.NewStatus}, tss.RPCTXCode{OtherCodes: tss.NewCode}, "") if err != nil { return tss.RPCSendTxResponse{}, fmt.Errorf("%s: Unable to upsert try in tries table: %w", channelName, err) } rpcResp, rpcErr := t.RPCService.SendTransaction(feeBumpTxXDR) rpcSendResp, parseErr := tss.ParseToRPCSendTxResponse(feeBumpTxHash, rpcResp, rpcErr) - err = t.Store.UpsertTry(ctx, payload.TransactionHash, feeBumpTxHash, feeBumpTxXDR, rpcSendResp.Code) + err = t.Store.UpsertTry(ctx, payload.TransactionHash, feeBumpTxHash, feeBumpTxXDR, rpcSendResp.Status, rpcSendResp.Code, rpcResp.ErrorResultXDR) if err != nil { return tss.RPCSendTxResponse{}, fmt.Errorf("%s: Unable to upsert try in tries table: %s", channelName, err.Error()) } diff --git a/internal/tss/services/transaction_manager_test.go b/internal/tss/services/transaction_manager_test.go index 9a5742a..0071200 100644 --- a/internal/tss/services/transaction_manager_test.go +++ b/internal/tss/services/transaction_manager_test.go @@ -48,14 +48,13 @@ func TestBuildAndSubmitTransaction(t *testing.T) { Return(nil, errors.New("signing failed")). Once() - _, err := txManager.BuildAndSubmitTransaction(context.Background(), "channel", payload) + txSendResp, err := txManager.BuildAndSubmitTransaction(context.Background(), "channel", payload) + assert.Equal(t, tss.RPCSendTxResponse{}, txSendResp) assert.Equal(t, "channel: Unable to sign/build transaction: signing failed", err.Error()) - var status string - err = dbConnectionPool.GetContext(context.Background(), &status, `SELECT current_status FROM tss_transactions WHERE transaction_hash = $1`, payload.TransactionHash) - require.NoError(t, err) - assert.Equal(t, string(tss.NewStatus), status) + tx, _ := store.GetTransaction(context.Background(), payload.TransactionHash) + assert.Equal(t, string(tss.NewStatus), tx.Status) }) t.Run("rpc_call_fail", func(t *testing.T) { @@ -74,19 +73,18 @@ func TestBuildAndSubmitTransaction(t *testing.T) { Return(sendResp, errors.New("RPC down")). Once() - _, err := txManager.BuildAndSubmitTransaction(context.Background(), "channel", payload) + txSendResp, err := txManager.BuildAndSubmitTransaction(context.Background(), "channel", payload) + assert.Equal(t, entities.ErrorStatus, txSendResp.Status.RPCStatus) + assert.Equal(t, tss.RPCFailCode, txSendResp.Code.OtherCodes) assert.Equal(t, "channel: RPC fail: RPC fail: RPC down", err.Error()) - var txStatus string - err = dbConnectionPool.GetContext(context.Background(), &txStatus, `SELECT current_status FROM tss_transactions WHERE transaction_hash = $1`, payload.TransactionHash) - require.NoError(t, err) - assert.Equal(t, txStatus, string(tss.NewStatus)) + tx, _ := store.GetTransaction(context.Background(), payload.TransactionHash) + assert.Equal(t, string(tss.NewStatus), tx.Status) - var tryStatus int - err = dbConnectionPool.GetContext(context.Background(), &tryStatus, `SELECT status FROM tss_transaction_submission_tries WHERE try_transaction_hash = $1`, feeBumpTxHash) - require.NoError(t, err) - assert.Equal(t, int(tss.RPCFailCode), tryStatus) + try, _ := store.GetTry(context.Background(), feeBumpTxHash) + assert.Equal(t, string(entities.ErrorStatus), try.Status) + assert.Equal(t, int32(tss.RPCFailCode), try.Code) }) t.Run("rpc_resp_empty_errorresult_xdr", func(t *testing.T) { @@ -108,22 +106,20 @@ func TestBuildAndSubmitTransaction(t *testing.T) { Return(sendResp, nil). Once() - resp, err := txManager.BuildAndSubmitTransaction(context.Background(), "channel", payload) + txSendResp, err := txManager.BuildAndSubmitTransaction(context.Background(), "channel", payload) - assert.Equal(t, entities.PendingStatus, resp.Status.RPCStatus) - assert.Equal(t, tss.EmptyCode, resp.Code.OtherCodes) + assert.Equal(t, entities.PendingStatus, txSendResp.Status.RPCStatus) + assert.Equal(t, tss.EmptyCode, txSendResp.Code.OtherCodes) assert.Empty(t, err) - var txStatus string - err = dbConnectionPool.GetContext(context.Background(), &txStatus, `SELECT current_status FROM tss_transactions WHERE transaction_hash = $1`, payload.TransactionHash) - require.NoError(t, err) - assert.Equal(t, txStatus, string(entities.PendingStatus)) + tx, _ := store.GetTransaction(context.Background(), payload.TransactionHash) + assert.Equal(t, string(entities.PendingStatus), tx.Status) - var tryStatus int - err = dbConnectionPool.GetContext(context.Background(), &tryStatus, `SELECT status FROM tss_transaction_submission_tries WHERE try_transaction_hash = $1`, feeBumpTxHash) - require.NoError(t, err) - assert.Equal(t, int(tss.EmptyCode), tryStatus) + try, _ := store.GetTry(context.Background(), feeBumpTxHash) + assert.Equal(t, string(entities.PendingStatus), try.Status) + assert.Equal(t, int32(tss.EmptyCode), try.Code) }) + t.Run("rpc_resp_has_unparsable_errorresult_xdr", func(t *testing.T) { _ = store.UpsertTransaction(context.Background(), payload.WebhookURL, payload.TransactionHash, payload.TransactionXDR, tss.RPCTXStatus{OtherStatus: tss.NewStatus}) sendResp := entities.RPCSendTransactionResult{ @@ -143,20 +139,20 @@ func TestBuildAndSubmitTransaction(t *testing.T) { Return(sendResp, nil). Once() - _, err := txManager.BuildAndSubmitTransaction(context.Background(), "channel", payload) + txSendResp, err := txManager.BuildAndSubmitTransaction(context.Background(), "channel", payload) + assert.Equal(t, entities.ErrorStatus, txSendResp.Status.RPCStatus) + assert.Equal(t, tss.UnmarshalBinaryCode, txSendResp.Code.OtherCodes) assert.Equal(t, "channel: RPC fail: parse error result xdr string: unable to parse: unable to unmarshal errorResultXDR: ABCD", err.Error()) - var txStatus string - err = dbConnectionPool.GetContext(context.Background(), &txStatus, `SELECT current_status FROM tss_transactions WHERE transaction_hash = $1`, payload.TransactionHash) - require.NoError(t, err) - assert.Equal(t, txStatus, string(tss.NewStatus)) + tx, _ := store.GetTransaction(context.Background(), payload.TransactionHash) + assert.Equal(t, string(tss.NewStatus), tx.Status) - var tryStatus int - err = dbConnectionPool.GetContext(context.Background(), &tryStatus, `SELECT status FROM tss_transaction_submission_tries WHERE try_transaction_hash = $1`, feeBumpTxHash) - require.NoError(t, err) - assert.Equal(t, int(tss.UnmarshalBinaryCode), tryStatus) + try, _ := store.GetTry(context.Background(), feeBumpTxHash) + assert.Equal(t, string(entities.ErrorStatus), try.Status) + assert.Equal(t, int32(tss.UnmarshalBinaryCode), try.Code) }) + t.Run("rpc_returns_response", func(t *testing.T) { _ = store.UpsertTransaction(context.Background(), payload.WebhookURL, payload.TransactionHash, payload.TransactionXDR, tss.RPCTXStatus{OtherStatus: tss.NewStatus}) sendResp := entities.RPCSendTransactionResult{ @@ -176,20 +172,17 @@ func TestBuildAndSubmitTransaction(t *testing.T) { Return(sendResp, nil). Once() - resp, err := txManager.BuildAndSubmitTransaction(context.Background(), "channel", payload) + txSendResp, err := txManager.BuildAndSubmitTransaction(context.Background(), "channel", payload) - assert.Equal(t, entities.ErrorStatus, resp.Status.RPCStatus) - assert.Equal(t, xdr.TransactionResultCodeTxTooLate, resp.Code.TxResultCode) + assert.Equal(t, entities.ErrorStatus, txSendResp.Status.RPCStatus) + assert.Equal(t, xdr.TransactionResultCodeTxTooLate, txSendResp.Code.TxResultCode) assert.Empty(t, err) - var txStatus string - err = dbConnectionPool.GetContext(context.Background(), &txStatus, `SELECT current_status FROM tss_transactions WHERE transaction_hash = $1`, payload.TransactionHash) - require.NoError(t, err) - assert.Equal(t, string(entities.ErrorStatus), txStatus) + tx, _ := store.GetTransaction(context.Background(), payload.TransactionHash) + assert.Equal(t, string(entities.ErrorStatus), tx.Status) - var tryStatus int - err = dbConnectionPool.GetContext(context.Background(), &tryStatus, `SELECT status FROM tss_transaction_submission_tries WHERE try_transaction_hash = $1`, feeBumpTxHash) - require.NoError(t, err) - assert.Equal(t, int(xdr.TransactionResultCodeTxTooLate), tryStatus) + try, _ := store.GetTry(context.Background(), feeBumpTxHash) + assert.Equal(t, string(entities.ErrorStatus), try.Status) + assert.Equal(t, int32(xdr.TransactionResultCodeTxTooLate), try.Code) }) } diff --git a/internal/tss/services/transaction_service.go b/internal/tss/services/transaction_service.go index b65bae1..9ecf7d9 100644 --- a/internal/tss/services/transaction_service.go +++ b/internal/tss/services/transaction_service.go @@ -90,7 +90,7 @@ func (t *transactionService) SignAndBuildNewFeeBumpTransaction(ctx context.Conte Operations: originalTx.Operations(), BaseFee: int64(t.BaseFee), Preconditions: txnbuild.Preconditions{ - TimeBounds: txnbuild.NewTimeout(300), + TimeBounds: txnbuild.NewTimeout(120), }, IncrementSequenceNum: true, }, diff --git a/internal/tss/store/store.go b/internal/tss/store/store.go index 8fa0e51..2ef0661 100644 --- a/internal/tss/store/store.go +++ b/internal/tss/store/store.go @@ -14,9 +14,11 @@ import ( type Store interface { GetTransaction(ctx context.Context, hash string) (Transaction, error) UpsertTransaction(ctx context.Context, WebhookURL string, txHash string, txXDR string, status tss.RPCTXStatus) error - UpsertTry(ctx context.Context, transactionHash string, feeBumpTxHash string, feeBumpTxXDR string, status tss.RPCTXCode) error + UpsertTry(ctx context.Context, transactionHash string, feeBumpTxHash string, feeBumpTxXDR string, status tss.RPCTXStatus, code tss.RPCTXCode, resultXDR string) error GetTry(ctx context.Context, hash string) (Try, error) GetTryByXDR(ctx context.Context, xdr string) (Try, error) + GetTransactionsWithStatus(ctx context.Context, status tss.RPCTXStatus) ([]Transaction, error) + GetLatestTry(ctx context.Context, txHash string) (Try, error) } var _ Store = (*store)(nil) @@ -39,7 +41,9 @@ type Try struct { Hash string `db:"try_transaction_hash"` OrigTxHash string `db:"original_transaction_hash"` XDR string `db:"try_transaction_xdr"` - Code int32 `db:"status"` + Status string `db:"status"` + Code int32 `db:"code"` + ResultXDR string `db:"result_xdr"` CreatedAt time.Time `db:"updated_at"` } @@ -72,20 +76,22 @@ func (s *store) UpsertTransaction(ctx context.Context, webhookURL string, txHash return nil } -func (s *store) UpsertTry(ctx context.Context, txHash string, feeBumpTxHash string, feeBumpTxXDR string, status tss.RPCTXCode) error { +func (s *store) UpsertTry(ctx context.Context, txHash string, feeBumpTxHash string, feeBumpTxXDR string, status tss.RPCTXStatus, code tss.RPCTXCode, resultXDR string) error { const q = ` INSERT INTO - tss_transaction_submission_tries (original_transaction_hash, try_transaction_hash, try_transaction_xdr, status) + tss_transaction_submission_tries (original_transaction_hash, try_transaction_hash, try_transaction_xdr, status, code, result_xdr) VALUES - ($1, $2, $3, $4) + ($1, $2, $3, $4, $5, $6) ON CONFLICT (try_transaction_hash) DO UPDATE SET original_transaction_hash = EXCLUDED.original_transaction_hash, try_transaction_xdr = EXCLUDED.try_transaction_xdr, status = EXCLUDED.status, + code = EXCLUDED.code, + result_xdr = EXCLUDED.result_xdr, updated_at = NOW(); ` - _, err := s.DB.ExecContext(ctx, q, txHash, feeBumpTxHash, feeBumpTxXDR, status.Code()) + _, err := s.DB.ExecContext(ctx, q, txHash, feeBumpTxHash, feeBumpTxXDR, status.Status(), code.Code(), resultXDR) if err != nil { return fmt.Errorf("inserting/updating tss try: %w", err) } @@ -93,12 +99,11 @@ func (s *store) UpsertTry(ctx context.Context, txHash string, feeBumpTxHash stri } func (s *store) GetTransaction(ctx context.Context, hash string) (Transaction, error) { - q := `SELECT * from tss_transactions where transaction_hash = $1` + q := `SELECT * FROM tss_transactions WHERE transaction_hash = $1` var transaction Transaction err := s.DB.GetContext(ctx, &transaction, q, hash) if err != nil { if errors.Is(err, sql.ErrNoRows) { - fmt.Println("empty") return Transaction{}, nil } return Transaction{}, fmt.Errorf("getting transaction: %w", err) @@ -107,7 +112,7 @@ func (s *store) GetTransaction(ctx context.Context, hash string) (Transaction, e } func (s *store) GetTry(ctx context.Context, hash string) (Try, error) { - q := `SELECT * from tss_transaction_submission_tries where try_transaction_hash = $1` + q := `SELECT * FROM tss_transaction_submission_tries WHERE try_transaction_hash = $1` var try Try err := s.DB.GetContext(ctx, &try, q, hash) if err != nil { @@ -120,7 +125,7 @@ func (s *store) GetTry(ctx context.Context, hash string) (Try, error) { } func (s *store) GetTryByXDR(ctx context.Context, xdr string) (Try, error) { - q := `SELECT * from tss_transaction_submission_tries where try_transaction_xdr = $1` + q := `SELECT * FROM tss_transaction_submission_tries WHERE try_transaction_xdr = $1` var try Try err := s.DB.GetContext(ctx, &try, q, xdr) if err != nil { @@ -131,3 +136,29 @@ func (s *store) GetTryByXDR(ctx context.Context, xdr string) (Try, error) { } return try, nil } + +func (s *store) GetTransactionsWithStatus(ctx context.Context, status tss.RPCTXStatus) ([]Transaction, error) { + q := `SELECT * FROM tss_transactions WHERE current_status = $1` + var transactions []Transaction + err := s.DB.SelectContext(ctx, &transactions, q, status.Status()) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return []Transaction{}, nil + } + return []Transaction{}, fmt.Errorf("getting transactions: %w", err) + } + return transactions, nil +} + +func (s *store) GetLatestTry(ctx context.Context, txHash string) (Try, error) { + q := `SELECT * FROM tss_transaction_submission_tries WHERE original_transaction_hash = $1 ORDER BY updated_at DESC LIMIT 1` + var try Try + err := s.DB.GetContext(ctx, &try, q, txHash) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return Try{}, nil + } + return Try{}, fmt.Errorf("getting latest trt: %w", err) + } + return try, nil +} diff --git a/internal/tss/store/store_test.go b/internal/tss/store/store_test.go index f5b69c9..cb0a6d5 100644 --- a/internal/tss/store/store_test.go +++ b/internal/tss/store/store_test.go @@ -23,20 +23,18 @@ func TestUpsertTransaction(t *testing.T) { t.Run("insert", func(t *testing.T) { _ = store.UpsertTransaction(context.Background(), "www.stellar.org", "hash", "xdr", tss.RPCTXStatus{OtherStatus: tss.NewStatus}) - var status string - err = dbConnectionPool.GetContext(context.Background(), &status, `SELECT current_status FROM tss_transactions WHERE transaction_hash = $1`, "hash") - require.NoError(t, err) - assert.Equal(t, status, string(tss.NewStatus)) + tx, _ := store.GetTransaction(context.Background(), "hash") + assert.Equal(t, "xdr", tx.XDR) + assert.Equal(t, string(tss.NewStatus), tx.Status) }) t.Run("update", func(t *testing.T) { _ = store.UpsertTransaction(context.Background(), "www.stellar.org", "hash", "xdr", tss.RPCTXStatus{OtherStatus: tss.NewStatus}) _ = store.UpsertTransaction(context.Background(), "www.stellar.org", "hash", "xdr", tss.RPCTXStatus{RPCStatus: entities.SuccessStatus}) - var status string - err = dbConnectionPool.GetContext(context.Background(), &status, `SELECT current_status FROM tss_transactions WHERE transaction_hash = $1`, "hash") - require.NoError(t, err) - assert.Equal(t, status, string(entities.SuccessStatus)) + tx, _ := store.GetTransaction(context.Background(), "hash") + assert.Equal(t, "xdr", tx.XDR) + assert.Equal(t, string(entities.SuccessStatus), tx.Status) var numRows int err = dbConnectionPool.GetContext(context.Background(), &numRows, `SELECT count(*) FROM tss_transactions WHERE transaction_hash = $1`, "hash") @@ -54,24 +52,32 @@ func TestUpsertTry(t *testing.T) { defer dbConnectionPool.Close() store, _ := NewStore(dbConnectionPool) t.Run("insert", func(t *testing.T) { + status := tss.RPCTXStatus{OtherStatus: tss.NewStatus} code := tss.RPCTXCode{OtherCodes: tss.NewCode} - _ = store.UpsertTry(context.Background(), "hash", "feebumptxhash", "feebumptxxdr", code) - - var status int - err = dbConnectionPool.GetContext(context.Background(), &status, `SELECT status FROM tss_transaction_submission_tries WHERE try_transaction_hash = $1`, "feebumptxhash") - require.NoError(t, err) - assert.Equal(t, status, int(tss.NewCode)) + resultXDR := "ABCD//" + err = store.UpsertTry(context.Background(), "hash", "feebumptxhash", "feebumptxxdr", status, code, resultXDR) + + try, _ := store.GetTry(context.Background(), "feebumptxhash") + assert.Equal(t, "hash", try.OrigTxHash) + assert.Equal(t, status.Status(), try.Status) + assert.Equal(t, code.Code(), int(try.Code)) + assert.Equal(t, resultXDR, try.ResultXDR) + assert.Equal(t, status.Status(), try.Status) }) t.Run("update_other_code", func(t *testing.T) { + status := tss.RPCTXStatus{OtherStatus: tss.NewStatus} code := tss.RPCTXCode{OtherCodes: tss.NewCode} - _ = store.UpsertTry(context.Background(), "hash", "feebumptxhash", "feebumptxxdr", code) + resultXDR := "ABCD//" + _ = store.UpsertTry(context.Background(), "hash", "feebumptxhash", "feebumptxxdr", status, code, resultXDR) code = tss.RPCTXCode{OtherCodes: tss.RPCFailCode} - _ = store.UpsertTry(context.Background(), "hash", "feebumptxhash", "feebumptxxdr", code) - var status int - err = dbConnectionPool.GetContext(context.Background(), &status, `SELECT status FROM tss_transaction_submission_tries WHERE try_transaction_hash = $1`, "feebumptxhash") - require.NoError(t, err) - assert.Equal(t, status, int(tss.RPCFailCode)) + _ = store.UpsertTry(context.Background(), "hash", "feebumptxhash", "feebumptxxdr", status, code, resultXDR) + + try, _ := store.GetTry(context.Background(), "feebumptxhash") + assert.Equal(t, "hash", try.OrigTxHash) + assert.Equal(t, status.Status(), try.Status) + assert.Equal(t, code.Code(), int(try.Code)) + assert.Equal(t, resultXDR, try.ResultXDR) var numRows int err = dbConnectionPool.GetContext(context.Background(), &numRows, `SELECT count(*) FROM tss_transaction_submission_tries WHERE try_transaction_hash = $1`, "feebumptxhash") @@ -80,14 +86,18 @@ func TestUpsertTry(t *testing.T) { }) t.Run("update_tx_code", func(t *testing.T) { - code := tss.RPCTXCode{OtherCodes: tss.NewCode} - _ = store.UpsertTry(context.Background(), "hash", "feebumptxhash", "feebumptxxdr", code) + status := tss.RPCTXStatus{RPCStatus: entities.ErrorStatus} + code := tss.RPCTXCode{TxResultCode: xdr.TransactionResultCodeTxInsufficientFee} + resultXDR := "ABCD//" + _ = store.UpsertTry(context.Background(), "hash", "feebumptxhash", "feebumptxxdr", status, code, resultXDR) code = tss.RPCTXCode{TxResultCode: xdr.TransactionResultCodeTxSuccess} - _ = store.UpsertTry(context.Background(), "hash", "feebumptxhash", "feebumptxxdr", code) - var status int - err = dbConnectionPool.GetContext(context.Background(), &status, `SELECT status FROM tss_transaction_submission_tries WHERE try_transaction_hash = $1`, "feebumptxhash") - require.NoError(t, err) - assert.Equal(t, status, int(xdr.TransactionResultCodeTxSuccess)) + _ = store.UpsertTry(context.Background(), "hash", "feebumptxhash", "feebumptxxdr", status, code, resultXDR) + + try, _ := store.GetTry(context.Background(), "feebumptxhash") + assert.Equal(t, "hash", try.OrigTxHash) + assert.Equal(t, status.Status(), try.Status) + assert.Equal(t, code.Code(), int(try.Code)) + assert.Equal(t, resultXDR, try.ResultXDR) var numRows int err = dbConnectionPool.GetContext(context.Background(), &numRows, `SELECT count(*) FROM tss_transaction_submission_tries WHERE try_transaction_hash = $1`, "feebumptxhash") @@ -107,6 +117,7 @@ func TestGetTransaction(t *testing.T) { status := tss.RPCTXStatus{OtherStatus: tss.NewStatus} _ = store.UpsertTransaction(context.Background(), "localhost:8000", "hash", "xdr", status) tx, err := store.GetTransaction(context.Background(), "hash") + assert.Equal(t, "xdr", tx.XDR) assert.Empty(t, err) @@ -126,10 +137,17 @@ func TestGetTry(t *testing.T) { defer dbConnectionPool.Close() store, _ := NewStore(dbConnectionPool) t.Run("try_exists", func(t *testing.T) { + status := tss.RPCTXStatus{OtherStatus: tss.NewStatus} code := tss.RPCTXCode{OtherCodes: tss.NewCode} - _ = store.UpsertTry(context.Background(), "hash", "feebumptxhash", "feebumptxxdr", code) + resultXDR := "ABCD//" + _ = store.UpsertTry(context.Background(), "hash", "feebumptxhash", "feebumptxxdr", status, code, resultXDR) + try, err := store.GetTry(context.Background(), "feebumptxhash") - assert.Equal(t, try.OrigTxHash, "hash") + + assert.Equal(t, "hash", try.OrigTxHash) + assert.Equal(t, status.Status(), try.Status) + assert.Equal(t, code.Code(), int(try.Code)) + assert.Equal(t, resultXDR, try.ResultXDR) assert.Empty(t, err) }) @@ -148,10 +166,17 @@ func TestGetTryByXDR(t *testing.T) { defer dbConnectionPool.Close() store, _ := NewStore(dbConnectionPool) t.Run("try_exists", func(t *testing.T) { + status := tss.RPCTXStatus{OtherStatus: tss.NewStatus} code := tss.RPCTXCode{OtherCodes: tss.NewCode} - _ = store.UpsertTry(context.Background(), "hash", "feebumptxhash", "feebumptxxdr", code) + resultXDR := "ABCD//" + _ = store.UpsertTry(context.Background(), "hash", "feebumptxhash", "feebumptxxdr", status, code, resultXDR) + try, err := store.GetTryByXDR(context.Background(), "feebumptxxdr") - assert.Equal(t, try.OrigTxHash, "hash") + + assert.Equal(t, "hash", try.OrigTxHash) + assert.Equal(t, status.Status(), try.Status) + assert.Equal(t, code.Code(), int(try.Code)) + assert.Equal(t, resultXDR, try.ResultXDR) assert.Empty(t, err) }) @@ -161,3 +186,62 @@ func TestGetTryByXDR(t *testing.T) { assert.Empty(t, err) }) } + +func TestGetTransactionsWithStatus(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + store, _ := NewStore(dbConnectionPool) + + t.Run("transactions_do_not_exist", func(t *testing.T) { + status := tss.RPCTXStatus{OtherStatus: tss.NewStatus} + txns, err := store.GetTransactionsWithStatus(context.Background(), status) + assert.Equal(t, 0, len(txns)) + assert.Empty(t, err) + }) + + t.Run("transactions_exist", func(t *testing.T) { + status := tss.RPCTXStatus{OtherStatus: tss.NewStatus} + _ = store.UpsertTransaction(context.Background(), "localhost:8000", "hash1", "xdr1", status) + _ = store.UpsertTransaction(context.Background(), "localhost:8000", "hash2", "xdr2", status) + + txns, err := store.GetTransactionsWithStatus(context.Background(), status) + + assert.Equal(t, 2, len(txns)) + assert.Equal(t, "hash1", txns[0].Hash) + assert.Equal(t, "hash2", txns[1].Hash) + assert.Empty(t, err) + }) +} + +func TestGetLatestTry(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + store, _ := NewStore(dbConnectionPool) + + t.Run("tries_do_not_exist", func(t *testing.T) { + try, err := store.GetLatestTry(context.Background(), "hash") + + assert.Equal(t, Try{}, try) + assert.Empty(t, err) + }) + + t.Run("tries_exist", func(t *testing.T) { + status := tss.RPCTXStatus{OtherStatus: tss.NewStatus} + code := tss.RPCTXCode{OtherCodes: tss.NewCode} + resultXDR := "ABCD//" + _ = store.UpsertTry(context.Background(), "hash", "feebumptxhash1", "feebumptxxdr1", status, code, resultXDR) + _ = store.UpsertTry(context.Background(), "hash", "feebumptxhash2", "feebumptxxdr2", status, code, resultXDR) + + try, err := store.GetLatestTry(context.Background(), "hash") + + assert.Equal(t, "feebumptxhash2", try.Hash) + assert.Empty(t, err) + }) + +} diff --git a/internal/tss/types.go b/internal/tss/types.go index 7db15ce..4f2088e 100644 --- a/internal/tss/types.go +++ b/internal/tss/types.go @@ -42,7 +42,6 @@ func ParseToRPCGetIngestTxResponse(result entities.RPCGetTransactionResult, err } } getIngestTxResponse.Code, err = TransactionResultXDRToCode(result.ResultXDR) - if err != nil { return getIngestTxResponse, fmt.Errorf("parse error result xdr string: %w", err) } @@ -56,8 +55,10 @@ type OtherCodes int32 type TransactionResultCode int32 const ( - NewStatus OtherStatus = "NEW" - NoStatus OtherStatus = "" + NewStatus OtherStatus = "NEW" + NoStatus OtherStatus = "" + SentStatus OtherStatus = "SENT" + NotSentStatus OtherStatus = "NOT_SENT" ) type RPCTXStatus struct { @@ -122,7 +123,8 @@ type RPCSendTxResponse struct { Status RPCTXStatus // The (optional) error code that is derived by deserialzing the errorResultXdr string in the sendTransaction response // list of possible errror codes: https://developers.stellar.org/docs/data/horizon/api-reference/errors/result-codes/transactions - Code RPCTXCode + Code RPCTXCode + ErrorResultXDR string } func ParseToRPCSendTxResponse(transactionXDR string, result entities.RPCSendTransactionResult, err error) (RPCSendTxResponse, error) { @@ -135,6 +137,7 @@ func ParseToRPCSendTxResponse(transactionXDR string, result entities.RPCSendTran } sendTxResponse.Status.RPCStatus = result.Status sendTxResponse.TransactionHash = result.Hash + sendTxResponse.ErrorResultXDR = result.ErrorResultXDR sendTxResponse.Code, err = TransactionResultXDRToCode(result.ErrorResultXDR) if err != nil { return sendTxResponse, fmt.Errorf("parse error result xdr string: %w", err) diff --git a/internal/tss/types_test.go b/internal/tss/types_test.go index ed489ff..ad5f0f5 100644 --- a/internal/tss/types_test.go +++ b/internal/tss/types_test.go @@ -26,6 +26,7 @@ func TestParseToRPCSendTxResponse(t *testing.T) { }, nil) assert.Equal(t, entities.PendingStatus, resp.Status.RPCStatus) + assert.Equal(t, "", resp.ErrorResultXDR) assert.Equal(t, EmptyCode, resp.Code.OtherCodes) assert.Empty(t, err) }) @@ -44,6 +45,7 @@ func TestParseToRPCSendTxResponse(t *testing.T) { ErrorResultXDR: "AAAAAAAAAMj////9AAAAAA==", }, nil) + assert.Equal(t, "AAAAAAAAAMj////9AAAAAA==", resp.ErrorResultXDR) assert.Equal(t, xdr.TransactionResultCodeTxTooLate, resp.Code.TxResultCode) assert.Empty(t, err) }) diff --git a/internal/tss/utils/helpers.go b/internal/tss/utils/helpers.go index 407d46b..dc50d16 100644 --- a/internal/tss/utils/helpers.go +++ b/internal/tss/utils/helpers.go @@ -13,13 +13,13 @@ func PayloadTOTSSResponse(payload tss.Payload) tss.TSSResponse { response.Status = string(payload.RpcSubmitTxResponse.Status.Status()) response.TransactionResultCode = payload.RpcSubmitTxResponse.Code.TxResultCode.String() response.EnvelopeXDR = payload.RpcSubmitTxResponse.TransactionXDR + response.ResultXDR = payload.RpcSubmitTxResponse.ErrorResultXDR } else if payload.RpcGetIngestTxResponse.Status != "" { response.Status = string(payload.RpcGetIngestTxResponse.Status) response.TransactionResultCode = payload.RpcGetIngestTxResponse.Code.TxResultCode.String() response.EnvelopeXDR = payload.RpcGetIngestTxResponse.EnvelopeXDR response.ResultXDR = payload.RpcGetIngestTxResponse.ResultXDR response.CreatedAt = payload.RpcGetIngestTxResponse.CreatedAt - } return response }