Skip to content

Commit

Permalink
all: TSS RPC Caller Service (#40)
Browse files Browse the repository at this point in the history
What
This is TSS's RPC Caller Service as described in this doc:
https://docs.google.com/document/d/1xcX86-w8lwT_60flCuYBK9X9co-108-5X6D9e5epw0U/edit#heading=h.76emp0zcbvdy

Why
This service is responsible for handling incoming transaction submission requests from clients and calling RPC (for the very first time) to submit those transactions to the network
  • Loading branch information
gouthamp-stellar authored Oct 9, 2024
1 parent 1e7ab45 commit dd833aa
Show file tree
Hide file tree
Showing 24 changed files with 1,122 additions and 19 deletions.
2 changes: 1 addition & 1 deletion cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (c *ingestCmd) Command() *cobra.Command {
utils.NetworkPassphraseOption(&cfg.NetworkPassphrase),
utils.SentryDSNOption(&sentryDSN),
utils.StellarEnvironmentOption(&stellarEnvironment),
utils.RPCClientURLOption(&cfg.RPCURL),
utils.RPCURLOption(&cfg.RPCURL),
{
Name: "captive-core-bin-path",
Usage: "Path to Captive Core's binary file.",
Expand Down
3 changes: 3 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ func (c *serveCmd) Command() *cobra.Command {
utils.NetworkPassphraseOption(&cfg.NetworkPassphrase),
utils.BaseFeeOption(&cfg.BaseFee),
utils.HorizonClientURLOption(&cfg.HorizonClientURL),
utils.RPCURLOption(&cfg.RPCURL),
utils.RPCCallerServiceChannelBufferSizeOption(&cfg.RPCCallerServiceChannelBufferSize),
utils.RPCCallerServiceMaxWorkersOption(&cfg.RPCCallerServiceChannelMaxWorkers),
utils.ChannelAccountEncryptionPassphraseOption(&cfg.EncryptionPassphrase),
utils.SentryDSNOption(&sentryDSN),
utils.StellarEnvironmentOption(&stellarEnvironment),
Expand Down
34 changes: 28 additions & 6 deletions cmd/utils/global_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,38 @@ func HorizonClientURLOption(configKey *string) *config.ConfigOption {
}
}

func RPCClientURLOption(configKey *string) *config.ConfigOption {
func RPCURLOption(configKey *string) *config.ConfigOption {
return &config.ConfigOption{
Name: "rpc-url",
Usage: "The URL of the Stellar RPC server which this application will communicate with.",
OptType: types.String,
ConfigKey: configKey,
Required: true,
Name: "rpc-url",
Usage: "The URL of the RPC Server.",
OptType: types.String,
ConfigKey: configKey,
FlagDefault: "localhost:8080",
Required: true,
}
}

func RPCCallerServiceChannelBufferSizeOption(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "tss-rpc-caller-service-channel-buffer-size",
Usage: "Set the buffer size for TSS RPC Caller Service channel.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 1000,
}
}

func RPCCallerServiceMaxWorkersOption(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "tss-rpc-caller-service-channel-max-workers",
Usage: "Set the maximum number of workers for TSS RPC Caller Service channel.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 100,
}

}

func ChannelAccountEncryptionPassphraseOption(configKey *string) *config.ConfigOption {
return &config.ConfigOption{
Name: "channel-account-encryption-passphrase",
Expand Down
4 changes: 4 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ services:

# Channel Account
CHANNEL_ACCOUNT_ENCRYPTION_PASSPHRASE: ${CHANNEL_ACCOUNT_ENCRYPTION_PASSPHRASE}
TRACKER_DSN: ${TRACKER_DSN}
STELLAR_ENVIRONMENT: ${STELLAR_ENVIRONMENT}
ingest:
image: stellar/wallet-backend:development
depends_on:
Expand All @@ -63,6 +65,8 @@ services:
- ./wallet-backend ingest
environment:
DATABASE_URL: postgres://postgres@db:5432/wallet-backend?sslmode=disable
TRACKER_DSN: ${TRACKER_DSN}
STELLAR_ENVIRONMENT: ${STELLAR_ENVIRONMENT}
volumes:
postgres-db:
driver: local
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/stellar/wallet-backend
go 1.23.2

require (
github.com/alitto/pond v1.9.2
github.com/aws/aws-sdk-go v1.55.5
github.com/getsentry/sentry-go v0.29.0
github.com/go-chi/chi v4.1.2+incompatible
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migc
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/alitto/pond v1.9.2 h1:9Qb75z/scEZVCoSU+osVmQ0I0JOeLfdTDafrbcJ8CLs=
github.com/alitto/pond v1.9.2/go.mod h1:xQn3P/sHTYcU/1BR3i86IGIrilcrGC2LiS+E2+CJWsI=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so=
Expand Down
63 changes: 61 additions & 2 deletions internal/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ import (
"github.com/stellar/wallet-backend/internal/signing"
"github.com/stellar/wallet-backend/internal/signing/store"
signingutils "github.com/stellar/wallet-backend/internal/signing/utils"
"github.com/stellar/wallet-backend/internal/tss"
tsschannel "github.com/stellar/wallet-backend/internal/tss/channels"
tssrouter "github.com/stellar/wallet-backend/internal/tss/router"
tssservices "github.com/stellar/wallet-backend/internal/tss/services"
tssstore "github.com/stellar/wallet-backend/internal/tss/store"
)

// NOTE: perhaps move this to a environment variable.
Expand Down Expand Up @@ -58,7 +63,10 @@ type Configs struct {
HorizonClientURL string
DistributionAccountSignatureClient signing.SignatureClient
ChannelAccountSignatureClient signing.SignatureClient

// TSS
RPCURL string
RPCCallerServiceChannelBufferSize int
RPCCallerServiceChannelMaxWorkers int
// Error Tracker
AppTracker apptracker.AppTracker
}
Expand All @@ -74,7 +82,11 @@ type handlerDeps struct {
AccountService services.AccountService
AccountSponsorshipService services.AccountSponsorshipService
PaymentService services.PaymentService
AppTracker apptracker.AppTracker
// TSS
RPCCallerServiceChannel tss.Channel
TSSRouter tssrouter.Router
// Error Tracker
AppTracker apptracker.AppTracker
}

func Serve(cfg Configs) error {
Expand All @@ -92,6 +104,7 @@ func Serve(cfg Configs) error {
},
OnStopping: func() {
log.Info("Stopping Wallet Backend server")
deps.RPCCallerServiceChannel.Stop()
},
})

Expand Down Expand Up @@ -155,6 +168,49 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) {
}
go ensureChannelAccounts(channelAccountService, int64(cfg.NumberOfChannelAccounts))

// TSS
txServiceOpts := tssservices.TransactionServiceOptions{
DistributionAccountSignatureClient: cfg.DistributionAccountSignatureClient,
ChannelAccountSignatureClient: cfg.ChannelAccountSignatureClient,
HorizonClient: &horizonClient,
BaseFee: int64(cfg.BaseFee),
}
tssTxService, err := tssservices.NewTransactionService(txServiceOpts)
if err != nil {
return handlerDeps{}, fmt.Errorf("instantiating tss transaction service: %w", err)
}
httpClient := http.Client{Timeout: time.Duration(30 * time.Second)}
rpcService, err := services.NewRPCService(cfg.RPCURL, &httpClient)
if err != nil {
return handlerDeps{}, fmt.Errorf("instantiating rpc service: %w", err)
}

store, err := tssstore.NewStore(dbConnectionPool)
if err != nil {
return handlerDeps{}, fmt.Errorf("instantiating tss store: %w", err)
}
txManager := tssservices.NewTransactionManager(tssservices.TransactionManagerConfigs{
TxService: tssTxService,
RPCService: rpcService,
Store: store,
})

rpcCallerServiceChannel := tsschannel.NewRPCCallerChannel(tsschannel.RPCCallerChannelConfigs{
TxManager: txManager,
Store: store,
MaxBufferSize: cfg.RPCCallerServiceChannelBufferSize,
MaxWorkers: cfg.RPCCallerServiceChannelMaxWorkers,
})

router := tssrouter.NewRouter(tssrouter.RouterConfigs{
RPCCallerChannel: rpcCallerServiceChannel,
ErrorJitterChannel: nil,
ErrorNonJitterChannel: nil,
WebhookChannel: nil,
})

rpcCallerServiceChannel.SetRouter(router)

return handlerDeps{
Models: models,
SignatureVerifier: signatureVerifier,
Expand All @@ -163,6 +219,9 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) {
AccountSponsorshipService: accountSponsorshipService,
PaymentService: paymentService,
AppTracker: cfg.AppTracker,
// TSS
RPCCallerServiceChannel: rpcCallerServiceChannel,
TSSRouter: router,
}, nil
}

Expand Down
23 changes: 23 additions & 0 deletions internal/services/servicesmocks/rpc_service_mocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package servicesmocks

import (
"github.com/stellar/wallet-backend/internal/entities"
"github.com/stellar/wallet-backend/internal/services"
"github.com/stretchr/testify/mock"
)

type RPCServiceMock struct {
mock.Mock
}

var _ services.RPCService = (*RPCServiceMock)(nil)

func (r *RPCServiceMock) SendTransaction(transactionXdr string) (entities.RPCSendTransactionResult, error) {
args := r.Called(transactionXdr)
return args.Get(0).(entities.RPCSendTransactionResult), args.Error(1)
}

func (r *RPCServiceMock) GetTransaction(transactionHash string) (entities.RPCGetTransactionResult, error) {
args := r.Called(transactionHash)
return args.Get(0).(entities.RPCGetTransactionResult), args.Error(1)
}
80 changes: 80 additions & 0 deletions internal/tss/channels/rpc_caller_channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package channels

import (
"context"

"github.com/alitto/pond"

"github.com/stellar/go/support/log"
"github.com/stellar/wallet-backend/internal/tss"
"github.com/stellar/wallet-backend/internal/tss/router"
"github.com/stellar/wallet-backend/internal/tss/services"
"github.com/stellar/wallet-backend/internal/tss/store"
)

type RPCCallerChannelConfigs struct {
TxManager services.TransactionManager
Router router.Router
Store store.Store
MaxBufferSize int
MaxWorkers int
}

type rpcCallerPool struct {
Pool *pond.WorkerPool
TxManager services.TransactionManager
Router router.Router
Store store.Store
}

var RPCCallerChannelName = "RPCCallerChannel"

var _ tss.Channel = (*rpcCallerPool)(nil)

func NewRPCCallerChannel(cfg RPCCallerChannelConfigs) *rpcCallerPool {
pool := pond.New(cfg.MaxBufferSize, cfg.MaxWorkers, pond.Strategy(pond.Balanced()))
return &rpcCallerPool{
Pool: pool,
TxManager: cfg.TxManager,
Store: cfg.Store,
Router: cfg.Router,
}

}

func (p *rpcCallerPool) Send(payload tss.Payload) {
p.Pool.Submit(func() {
p.Receive(payload)
})
}

func (p *rpcCallerPool) Receive(payload tss.Payload) {

ctx := context.Background()
// Create a new transaction record in the transactions table.
err := p.Store.UpsertTransaction(ctx, payload.WebhookURL, payload.TransactionHash, payload.TransactionXDR, tss.RPCTXStatus{OtherStatus: tss.NewStatus})

if err != nil {
log.Errorf("%s: unable to upsert transaction into transactions table: %e", RPCCallerChannelName, err)
return
}
rpcSendResp, err := p.TxManager.BuildAndSubmitTransaction(ctx, RPCCallerChannelName, payload)

if err != nil {
log.Errorf("%s: unable to sign and submit transaction: %e", RPCCallerChannelName, err)
return
}
payload.RpcSubmitTxResponse = rpcSendResp
err = p.Router.Route(payload)
if err != nil {
log.Errorf("%s: unable to route payload: %e", RPCCallerChannelName, err)
}
}

func (p *rpcCallerPool) SetRouter(router router.Router) {
p.Router = router
}

func (p *rpcCallerPool) Stop() {
p.Pool.StopAndWait()
}
Loading

0 comments on commit dd833aa

Please sign in to comment.