Skip to content

Commit

Permalink
changes based on comments
Browse files Browse the repository at this point in the history
  • Loading branch information
gouthamp-stellar committed Sep 30, 2024
1 parent 12425f9 commit 6b2a6f7
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 43 deletions.
10 changes: 0 additions & 10 deletions internal/entities/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,12 @@ const (
SuccessStatus RPCStatus = "SUCCESS"
)

type RPCEntry struct {
Key string `json:"key"`
XDR string `json:"xdr"`
LastModifiedLedgerSeq int64 `json:"lastModifiedLedgerSeq"`
}

type RPCResponse struct {
Result json.RawMessage `json:"result"`
JSONRPC string `json:"jsonrpc"`
ID int64 `json:"id"`
}

type RPCGetLedgerEntriesResult struct {
Entries []RPCEntry `json:"entries"`
}

type RPCGetTransactionResult struct {
Status RPCStatus `json:"status"`
LatestLedger int64 `json:"latestLedger"`
Expand Down
17 changes: 10 additions & 7 deletions internal/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ type handlerDeps struct {
// TSS
RPCCallerServiceChannel tss.Channel
TSSRouter tssrouter.Router
AppTracker apptracker.AppTracker
// Error Tracker
AppTracker apptracker.AppTracker
}

func Serve(cfg Configs) error {
Expand Down Expand Up @@ -172,7 +173,7 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) {
DistributionAccountSignatureClient: cfg.DistributionAccountSignatureClient,
ChannelAccountSignatureClient: cfg.ChannelAccountSignatureClient,
HorizonClient: &horizonClient,
BaseFee: int64(cfg.BaseFee), // Reuse horizon base fee for RPC??
BaseFee: int64(cfg.BaseFee),
}
tssTxService, err := tssservices.NewTransactionService(txServiceOpts)
if err != nil {
Expand All @@ -184,20 +185,22 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) {
return handlerDeps{}, fmt.Errorf("instantiating rpc service: %w", err)
}

// re-use same context as above??
store := tssstore.NewStore(dbConnectionPool)
store, err := tssstore.NewStore(dbConnectionPool)
if err != nil {
return handlerDeps{}, fmt.Errorf("instantiating tss store: %w", err)
}
txManager := tssservices.NewTransactionManager(tssservices.TransactionManagerConfigs{
TxService: tssTxService,
RPCService: rpcService,
Store: store,
})
tssChannelConfigs := tsschannel.RPCCallerChannelConfigs{

rpcCallerServiceChannel := tsschannel.NewRPCCallerChannel(tsschannel.RPCCallerChannelConfigs{
TxManager: txManager,
Store: store,
MaxBufferSize: cfg.RPCCallerServiceChannelBufferSize,
MaxWorkers: cfg.RPCCallerServiceChannelMaxWorkers,
}
rpcCallerServiceChannel := tsschannel.NewRPCCallerChannel(tssChannelConfigs)
})

router := tssrouter.NewRouter(tssrouter.RouterConfigs{
RPCCallerChannel: rpcCallerServiceChannel,
Expand Down
4 changes: 0 additions & 4 deletions internal/services/servicesmocks/rpc_service_mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,3 @@ func (r *RPCServiceMock) GetTransaction(transactionHash string) (entities.RPCGet
args := r.Called(transactionHash)
return args.Get(0).(entities.RPCGetTransactionResult), args.Error(1)
}

type TransactionManagerMock struct {
mock.Mock
}
13 changes: 5 additions & 8 deletions internal/tss/channels/rpc_caller_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/alitto/pond"

"github.com/stellar/go/support/log"
"github.com/stellar/wallet-backend/internal/entities"
"github.com/stellar/wallet-backend/internal/tss"
"github.com/stellar/wallet-backend/internal/tss/router"
"github.com/stellar/wallet-backend/internal/tss/services"
Expand Down Expand Up @@ -54,21 +53,19 @@ func (p *rpcCallerPool) Receive(payload tss.Payload) {
err := p.Store.UpsertTransaction(ctx, payload.WebhookURL, payload.TransactionHash, payload.TransactionXDR, tss.RPCTXStatus{OtherStatus: tss.NewStatus})

if err != nil {
log.Errorf("%s: Unable to upsert transaction into transactions table: %e", RPCCallerChannelName, err)
log.Errorf("%s: unable to upsert transaction into transactions table: %e", RPCCallerChannelName, err)
return
}
rpcSendResp, err := p.TxManager.BuildAndSubmitTransaction(ctx, RPCCallerChannelName, payload)

if err != nil {
log.Errorf("%s: Unable to sign and submit transaction: %e", RPCCallerChannelName, err)
log.Errorf("%s: unable to sign and submit transaction: %e", RPCCallerChannelName, err)
return
}
payload.RpcSubmitTxResponse = rpcSendResp
if rpcSendResp.Status.RPCStatus == entities.TryAgainLaterStatus || rpcSendResp.Status.RPCStatus == entities.ErrorStatus {
err = p.Router.Route(payload)
if err != nil {
log.Errorf("%s: Unable to route payload: %e", RPCCallerChannelName, err)
}
err = p.Router.Route(payload)
if err != nil {
log.Errorf("%s: unable to route payload: %e", RPCCallerChannelName, err)
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/tss/channels/rpc_caller_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestSend(t *testing.T) {
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
require.NoError(t, err)
defer dbConnectionPool.Close()
store := store.NewStore(dbConnectionPool)
store, _ := store.NewStore(dbConnectionPool)
txManagerMock := services.TransactionManagerMock{}
routerMock := router.MockRouter{}
cfgs := RPCCallerChannelConfigs{
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestReceivee(t *testing.T) {
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
require.NoError(t, err)
defer dbConnectionPool.Close()
store := store.NewStore(dbConnectionPool)
store, _ := store.NewStore(dbConnectionPool)
txManagerMock := services.TransactionManagerMock{}
routerMock := router.MockRouter{}
cfgs := RPCCallerChannelConfigs{
Expand Down
2 changes: 1 addition & 1 deletion internal/tss/services/transaction_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (t *transactionManager) BuildAndSubmitTransaction(ctx context.Context, chan
return rpcSendResp, fmt.Errorf("%s: RPC fail: %w", channelName, parseErr)
}

if rpcErr != nil && rpcSendResp.Code.OtherCodes == tss.RPCFailCode || rpcSendResp.Code.OtherCodes == tss.UnmarshalBinaryCode {
if parseErr != nil && rpcSendResp.Code.OtherCodes == tss.RPCFailCode || rpcSendResp.Code.OtherCodes == tss.UnmarshalBinaryCode {
return tss.RPCSendTxResponse{}, fmt.Errorf("%s: RPC fail: %w", channelName, rpcErr)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/tss/services/transaction_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestBuildAndSubmitTransaction(t *testing.T) {
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
require.NoError(t, err)
defer dbConnectionPool.Close()
store := store.NewStore(dbConnectionPool)
store, _ := store.NewStore(dbConnectionPool)
txServiceMock := TransactionServiceMock{}
rpcServiceMock := servicesmocks.RPCServiceMock{}
txManager := NewTransactionManager(TransactionManagerConfigs{
Expand Down
19 changes: 11 additions & 8 deletions internal/tss/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ type store struct {
DB db.ConnectionPool
}

func NewStore(db db.ConnectionPool) Store {
func NewStore(db db.ConnectionPool) (Store, error) {
if db == nil {
return nil, fmt.Errorf("db cannot be nil")
}
return &store{
DB: db,
}
}, nil
}

func (s *store) UpsertTransaction(ctx context.Context, webhookURL string, txHash string, txXDR string, status tss.RPCTXStatus) error {
Expand All @@ -33,9 +36,9 @@ func (s *store) UpsertTransaction(ctx context.Context, webhookURL string, txHash
($1, $2, $3, $4)
ON CONFLICT (transaction_hash)
DO UPDATE SET
transaction_xdr = $2,
webhook_url = $3,
current_status = $4,
transaction_xdr = EXCLUDED.transaction_xdr,
webhook_url = EXCLUDED.webhook_url,
current_status = EXCLUDED.current_status,
updated_at = NOW();
`
_, err := s.DB.ExecContext(ctx, q, txHash, txXDR, webhookURL, status.Status())
Expand All @@ -53,9 +56,9 @@ func (s *store) UpsertTry(ctx context.Context, txHash string, feeBumpTxHash stri
($1, $2, $3, $4)
ON CONFLICT (try_transaction_hash)
DO UPDATE SET
original_transaction_hash = $1,
try_transaction_xdr = $3,
status = $4,
original_transaction_hash = EXCLUDED.original_transaction_hash,
try_transaction_xdr = EXCLUDED.try_transaction_xdr,
status = EXCLUDED.status,
updated_at = NOW();
`
_, err := s.DB.ExecContext(ctx, q, txHash, feeBumpTxHash, feeBumpTxXDR, status.Code())
Expand Down
4 changes: 2 additions & 2 deletions internal/tss/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestUpsertTransaction(t *testing.T) {
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
require.NoError(t, err)
defer dbConnectionPool.Close()
store := NewStore(dbConnectionPool)
store, _ := NewStore(dbConnectionPool)
t.Run("insert", func(t *testing.T) {
_ = store.UpsertTransaction(context.Background(), "www.stellar.org", "hash", "xdr", tss.RPCTXStatus{OtherStatus: tss.NewStatus})

Expand Down Expand Up @@ -52,7 +52,7 @@ func TestUpsertTry(t *testing.T) {
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
require.NoError(t, err)
defer dbConnectionPool.Close()
store := NewStore(dbConnectionPool)
store, _ := NewStore(dbConnectionPool)
t.Run("insert", func(t *testing.T) {
code := tss.RPCTXCode{OtherCodes: tss.NewCode}
_ = store.UpsertTry(context.Background(), "hash", "feebumptxhash", "feebumptxxdr", code)
Expand Down

0 comments on commit 6b2a6f7

Please sign in to comment.