Skip to content

Commit

Permalink
all: TSS Error handler service (#43)
Browse files Browse the repository at this point in the history
* tables and interfaces for TSS

* TSS tables and the channel interface

* remove empty lines

* update

* adding semicolons

adding semicolons

* moving all migrations to one file

* make hash primary key instead of xdr

* missing ,

* remove the index on try_transaction_xdr and add column/index on try_transaction_hash

...also remove outgoing/incoming_status and have one status column

* Squashed commit of the following:

commit fb807aa
Author: gouthamp-stellar <[email protected]>
Date:   Tue Sep 3 10:28:49 2024 -0700

    remove the index on try_transaction_xdr and add column/index on try_transaction_hash

    ...also remove outgoing/incoming_status and have one status column

commit 6fc0dc2
Author: gouthamp-stellar <[email protected]>
Date:   Tue Sep 3 08:58:46 2024 -0700

    missing ,

commit a9cf4e3
Author: gouthamp-stellar <[email protected]>
Date:   Tue Sep 3 01:55:23 2024 -0700

    make hash primary key instead of xdr

commit c0f9d32
Author: gouthamp-stellar <[email protected]>
Date:   Fri Aug 30 15:18:27 2024 -0700

    moving all migrations to one file

commit 2de9898
Author: gouthamp-stellar <[email protected]>
Date:   Fri Aug 30 15:16:53 2024 -0700

    adding semicolons

    adding semicolons

commit 373c71a
Author: gouthamp-stellar <[email protected]>
Date:   Fri Aug 30 15:12:24 2024 -0700

    update

commit 3f9f9f0
Author: gouthamp-stellar <[email protected]>
Date:   Fri Aug 30 15:12:00 2024 -0700

    remove empty lines

commit 9920f48
Author: gouthamp-stellar <[email protected]>
Date:   Fri Aug 30 15:06:40 2024 -0700

    TSS tables and the channel interface

commit a58d519
Author: gouthamp-stellar <[email protected]>
Date:   Fri Aug 30 10:55:22 2024 -0700

    tables and interfaces for TSS

* commit #2

* changing from RpcIngestTxResponse ->  RpcGetIngestTxResponse

* latest changes

* add tests for ValidateOptions

* changes based on comments

* latest changes based on changes to interface

* string -> RPCTXStatus

* adding a transaction_builder.go which takes a list of operation xdrs and builds a transaction out of it

* moving transaction_service to the utils dir

* upper case Channel methods

* latest changes

* p.txService.NetworkPassPhrase()

* last commit before writing unit tests

* Making the transaction service more injectible and adding fields to the Payload

* typo

* typo

* Update 2024-08-28.0-tss_transactions.sql

* lint errors

* go mod tidy

* test cases + other changes

* remoce unused mocks

* error handler service returns errorHandlerService

* changes based on comments

* lint deadcode error - suppress for now

* removed deadcode

* changes after comments on transaction service pr

* TSS Error Handler Service

* removing print statements

* Update transaction_service.go

* responding to comments

* remove commented code

* tx service changes

* remove comment

* remove comment

* latest tx service changes

* adding a router + utils file

* removing println

* changed function name

* Code() helper function on RPCTXCode

* Code()

* adding a helpers file

* removing BuildTestFeeBumpTransaction

* casing

* better test for Send

* resolving merge conflicts

* removing mockSleep

* delete channels/mocks.go

* incorporating Daniel's changes + comments

* delete files

* remove unused code

* name change

* remove commented code

* moving the mocks file inside servicesmocks dir

* name changes

* refactor

* changes to serve.go

* checking error on route.Route

* check error

* merging main

* fixing parsesendresp tests

* changes based on comments

* removing test case that is not relevant anymore

* changes based on prev pr comments

* remove fmt.Println

* U -> u

* making sure the pools implement the channel interface

* responding to comments

* adding a log for when max retry limit is reached

* %e -> %v for log statements

* merged latest main

* go mod tidy
  • Loading branch information
gouthamp-stellar authored Oct 9, 2024
1 parent dd833aa commit 4e06683
Show file tree
Hide file tree
Showing 13 changed files with 676 additions and 62 deletions.
9 changes: 9 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ func (c *serveCmd) Command() *cobra.Command {
utils.ChannelAccountEncryptionPassphraseOption(&cfg.EncryptionPassphrase),
utils.SentryDSNOption(&sentryDSN),
utils.StellarEnvironmentOption(&stellarEnvironment),
utils.RPCURLOption(&cfg.RPCURL),
utils.ErrorHandlerServiceJitterChannelBufferSizeOption(&cfg.ErrorHandlerServiceJitterChannelBufferSize),
utils.ErrorHandlerServiceJitterChannelMaxWorkersOption(&cfg.ErrorHandlerServiceJitterChannelMaxWorkers),
utils.ErrorHandlerServiceNonJitterChannelBufferSizeOption(&cfg.ErrorHandlerServiceNonJitterChannelBufferSize),
utils.ErrorHandlerServiceNonJitterChannelMaxWorkersOption(&cfg.ErrorHandlerServiceNonJitterChannelMaxWorkers),
utils.ErrorHandlerServiceJitterChannelMinWaitBtwnRetriesMSOption(&cfg.ErrorHandlerServiceJitterChannelMinWaitBtwnRetriesMS),
utils.ErrorHandlerServiceNonJitterChannelWaitBtwnRetriesMSOption(&cfg.ErrorHandlerServiceNonJitterChannelWaitBtwnRetriesMS),
utils.ErrorHandlerServiceJitterChannelMaxRetriesOptions(&cfg.ErrorHandlerServiceJitterChannelMaxRetries),
utils.ErrorHandlerServiceNonJitterChannelMaxRetriesOption(&cfg.ErrorHandlerServiceNonJitterChannelMaxRetries),
{
Name: "port",
Usage: "Port to listen and serve on",
Expand Down
21 changes: 0 additions & 21 deletions cmd/utils/global_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,6 @@ func RPCURLOption(configKey *string) *config.ConfigOption {
}
}

func RPCCallerServiceChannelBufferSizeOption(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "tss-rpc-caller-service-channel-buffer-size",
Usage: "Set the buffer size for TSS RPC Caller Service channel.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 1000,
}
}

func RPCCallerServiceMaxWorkersOption(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "tss-rpc-caller-service-channel-max-workers",
Usage: "Set the maximum number of workers for TSS RPC Caller Service channel.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 100,
}

}

func ChannelAccountEncryptionPassphraseOption(configKey *string) *config.ConfigOption {
return &config.ConfigOption{
Name: "channel-account-encryption-passphrase",
Expand Down
118 changes: 118 additions & 0 deletions cmd/utils/tss_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package utils

import (
"go/types"

"github.com/stellar/go/support/config"
)

func RPCCallerServiceChannelBufferSizeOption(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "tss-rpc-caller-service-channel-buffer-size",
Usage: "Set the buffer size for TSS RPC Caller Service channel.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 1000,
}
}

func RPCCallerServiceMaxWorkersOption(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "tss-rpc-caller-service-channel-max-workers",
Usage: "Set the maximum number of workers for TSS RPC Caller Service channel.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 100,
}

}

func ErrorHandlerServiceJitterChannelBufferSizeOption(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "error-handler-service-jitter-channel-buffer-size",
Usage: "Set the buffer size of the Error Handler Service Jitter channel.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 100,
Required: true,
}
}

func ErrorHandlerServiceJitterChannelMaxWorkersOption(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "error-handler-service-jitter-channel-max-workers",
Usage: "Set the maximum number of workers for the Error Handler Service Jitter channel.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 10,
Required: true,
}
}

func ErrorHandlerServiceNonJitterChannelBufferSizeOption(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "error-handler-service-non-jitter-channel-buffer-size",
Usage: "Set the buffer size of the Error Handler Service Non Jitter channel.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 100,
Required: true,
}

}

func ErrorHandlerServiceNonJitterChannelMaxWorkersOption(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "error-handler-service-non-jitter-channel-max-workers",
Usage: "Set the maximum number of workers for the Error Handler Service Non Jitter channel.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 10,
Required: true,
}
}

func ErrorHandlerServiceJitterChannelMinWaitBtwnRetriesMSOption(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "error-handler-service-jitter-channel-min-wait-between-retries",
Usage: "Set the minimum amount of time in ms between retries for the Error Handler Service Jitter channel.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 10,
Required: true,
}
}

func ErrorHandlerServiceNonJitterChannelWaitBtwnRetriesMSOption(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "error-handler-service-non-jitter-channel-wait-between-retries",
Usage: "Set the amount of time in ms between retries for the Error Handler Service Non Jitter channel.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 10,
Required: true,
}
}

func ErrorHandlerServiceJitterChannelMaxRetriesOptions(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "error-handler-service-jitter-channel-max-retries",
Usage: "Set the number of retries for each task in the Error Handler Service Jitter channel.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 10,
Required: true,
}

}

func ErrorHandlerServiceNonJitterChannelMaxRetriesOption(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "error-handler-service-non-jitter-channel-max-retries",
Usage: "Set the number of retries for each task in the Error Handler Service Non Jitter channel.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 10,
Required: true,
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/stellar/go v0.0.0-20240416222646-fd107948e6c4
github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2
github.com/stretchr/testify v1.9.0
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/term v0.25.0
)

Expand Down Expand Up @@ -88,7 +89,6 @@ require (
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
Expand Down
58 changes: 45 additions & 13 deletions internal/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,17 @@ type Configs struct {
DistributionAccountSignatureClient signing.SignatureClient
ChannelAccountSignatureClient signing.SignatureClient
// TSS
RPCURL string
RPCCallerServiceChannelBufferSize int
RPCCallerServiceChannelMaxWorkers int
RPCURL string
RPCCallerServiceChannelBufferSize int
RPCCallerServiceChannelMaxWorkers int
ErrorHandlerServiceJitterChannelBufferSize int
ErrorHandlerServiceJitterChannelMaxWorkers int
ErrorHandlerServiceNonJitterChannelBufferSize int
ErrorHandlerServiceNonJitterChannelMaxWorkers int
ErrorHandlerServiceJitterChannelMinWaitBtwnRetriesMS int
ErrorHandlerServiceNonJitterChannelWaitBtwnRetriesMS int
ErrorHandlerServiceJitterChannelMaxRetries int
ErrorHandlerServiceNonJitterChannelMaxRetries int
// Error Tracker
AppTracker apptracker.AppTracker
}
Expand All @@ -83,8 +91,10 @@ type handlerDeps struct {
AccountSponsorshipService services.AccountSponsorshipService
PaymentService services.PaymentService
// TSS
RPCCallerServiceChannel tss.Channel
TSSRouter tssrouter.Router
RPCCallerChannel tss.Channel
ErrorJitterChannel tss.Channel
ErrorNonJitterChannel tss.Channel
TSSRouter tssrouter.Router
// Error Tracker
AppTracker apptracker.AppTracker
}
Expand All @@ -104,7 +114,9 @@ func Serve(cfg Configs) error {
},
OnStopping: func() {
log.Info("Stopping Wallet Backend server")
deps.RPCCallerServiceChannel.Stop()
deps.ErrorJitterChannel.Stop()
deps.ErrorNonJitterChannel.Stop()
deps.RPCCallerChannel.Stop()
},
})

Expand Down Expand Up @@ -195,21 +207,39 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) {
Store: store,
})

rpcCallerServiceChannel := tsschannel.NewRPCCallerChannel(tsschannel.RPCCallerChannelConfigs{
rpcCallerChannel := tsschannel.NewRPCCallerChannel(tsschannel.RPCCallerChannelConfigs{
TxManager: txManager,
Store: store,
MaxBufferSize: cfg.RPCCallerServiceChannelBufferSize,
MaxWorkers: cfg.RPCCallerServiceChannelMaxWorkers,
})

errorJitterChannel := tsschannel.NewErrorJitterChannel(tsschannel.ErrorJitterChannelConfigs{
TxManager: txManager,
MaxBufferSize: cfg.ErrorHandlerServiceJitterChannelBufferSize,
MaxWorkers: cfg.ErrorHandlerServiceJitterChannelMaxWorkers,
MaxRetries: cfg.ErrorHandlerServiceJitterChannelMaxRetries,
MinWaitBtwnRetriesMS: cfg.ErrorHandlerServiceJitterChannelMinWaitBtwnRetriesMS,
})

errorNonJitterChannel := tsschannel.NewErrorNonJitterChannel(tsschannel.ErrorNonJitterChannelConfigs{
TxManager: txManager,
MaxBufferSize: cfg.ErrorHandlerServiceJitterChannelBufferSize,
MaxWorkers: cfg.ErrorHandlerServiceJitterChannelMaxWorkers,
MaxRetries: cfg.ErrorHandlerServiceJitterChannelMaxRetries,
WaitBtwnRetriesMS: cfg.ErrorHandlerServiceJitterChannelMinWaitBtwnRetriesMS,
})

router := tssrouter.NewRouter(tssrouter.RouterConfigs{
RPCCallerChannel: rpcCallerServiceChannel,
ErrorJitterChannel: nil,
ErrorNonJitterChannel: nil,
RPCCallerChannel: rpcCallerChannel,
ErrorJitterChannel: errorJitterChannel,
ErrorNonJitterChannel: errorNonJitterChannel,
WebhookChannel: nil,
})

rpcCallerServiceChannel.SetRouter(router)
rpcCallerChannel.SetRouter(router)
errorJitterChannel.SetRouter(router)
errorNonJitterChannel.SetRouter(router)

return handlerDeps{
Models: models,
Expand All @@ -220,8 +250,10 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) {
PaymentService: paymentService,
AppTracker: cfg.AppTracker,
// TSS
RPCCallerServiceChannel: rpcCallerServiceChannel,
TSSRouter: router,
RPCCallerChannel: rpcCallerChannel,
ErrorJitterChannel: errorJitterChannel,
ErrorNonJitterChannel: errorNonJitterChannel,
TSSRouter: router,
}, nil
}

Expand Down
97 changes: 97 additions & 0 deletions internal/tss/channels/error_jitter_channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package channels

import (
"context"
"slices"
"time"

"github.com/alitto/pond"
"github.com/stellar/go/support/log"
"github.com/stellar/wallet-backend/internal/tss"
"github.com/stellar/wallet-backend/internal/tss/router"
"github.com/stellar/wallet-backend/internal/tss/services"
"golang.org/x/exp/rand"
)

type ErrorJitterChannelConfigs struct {
TxManager services.TransactionManager
Router router.Router
MaxBufferSize int
MaxWorkers int
MaxRetries int
MinWaitBtwnRetriesMS int
}

type errorJitterPool struct {
Pool *pond.WorkerPool
TxManager services.TransactionManager
Router router.Router
MaxRetries int
MinWaitBtwnRetriesMS int
}

var ErrorJitterChannelName = "ErrorJitterChannel"

var _ tss.Channel = (*errorJitterPool)(nil)

func jitter(dur time.Duration) time.Duration {
halfDur := int64(dur / 2)
delta := rand.Int63n(halfDur) - halfDur/2
return dur + time.Duration(delta)
}

func NewErrorJitterChannel(cfg ErrorJitterChannelConfigs) *errorJitterPool {
pool := pond.New(cfg.MaxBufferSize, cfg.MaxWorkers, pond.Strategy(pond.Balanced()))
return &errorJitterPool{
Pool: pool,
TxManager: cfg.TxManager,
Router: cfg.Router,
MaxRetries: cfg.MaxRetries,
MinWaitBtwnRetriesMS: cfg.MinWaitBtwnRetriesMS,
}
}

func (p *errorJitterPool) Send(payload tss.Payload) {
p.Pool.Submit(func() {
p.Receive(payload)
})
}

func (p *errorJitterPool) Receive(payload tss.Payload) {
ctx := context.Background()
var i int
for i = 0; i < p.MaxRetries; i++ {
currentBackoff := p.MinWaitBtwnRetriesMS * (1 << i)
time.Sleep(jitter(time.Duration(currentBackoff)) * time.Millisecond)
rpcSendResp, err := p.TxManager.BuildAndSubmitTransaction(ctx, ErrorJitterChannelName, payload)
if err != nil {
log.Errorf("%s: unable to sign and submit transaction: %v", ErrorJitterChannelName, err)
return
}
payload.RpcSubmitTxResponse = rpcSendResp
if !slices.Contains(tss.JitterErrorCodes, rpcSendResp.Code.TxResultCode) {
err = p.Router.Route(payload)
if err != nil {
log.Errorf("%s: unable to route payload: %v", ErrorJitterChannelName, err)
return
}
return
}
}

// Retry limit reached, route the payload to the router so it can re-route it to this pool and keep re-trying
log.Infof("%s: max retry limit reached", ErrorJitterChannelName)
err := p.Router.Route(payload)
if err != nil {
log.Errorf("%s: unable to route payload: %v", ErrorJitterChannelName, err)
}

}

func (p *errorJitterPool) SetRouter(router router.Router) {
p.Router = router
}

func (p *errorJitterPool) Stop() {
p.Pool.StopAndWait()
}
Loading

0 comments on commit 4e06683

Please sign in to comment.