Skip to content

Commit

Permalink
tss: Webhook handler service (#44)
Browse files Browse the repository at this point in the history
* tables and interfaces for TSS

* TSS tables and the channel interface

* remove empty lines

* update

* adding semicolons

adding semicolons

* moving all migrations to one file

* make hash primary key instead of xdr

* missing ,

* remove the index on try_transaction_xdr and add column/index on try_transaction_hash

...also remove outgoing/incoming_status and have one status column

* Squashed commit of the following:

commit fb807aa
Author: gouthamp-stellar <[email protected]>
Date:   Tue Sep 3 10:28:49 2024 -0700

    remove the index on try_transaction_xdr and add column/index on try_transaction_hash

    ...also remove outgoing/incoming_status and have one status column

commit 6fc0dc2
Author: gouthamp-stellar <[email protected]>
Date:   Tue Sep 3 08:58:46 2024 -0700

    missing ,

commit a9cf4e3
Author: gouthamp-stellar <[email protected]>
Date:   Tue Sep 3 01:55:23 2024 -0700

    make hash primary key instead of xdr

commit c0f9d32
Author: gouthamp-stellar <[email protected]>
Date:   Fri Aug 30 15:18:27 2024 -0700

    moving all migrations to one file

commit 2de9898
Author: gouthamp-stellar <[email protected]>
Date:   Fri Aug 30 15:16:53 2024 -0700

    adding semicolons

    adding semicolons

commit 373c71a
Author: gouthamp-stellar <[email protected]>
Date:   Fri Aug 30 15:12:24 2024 -0700

    update

commit 3f9f9f0
Author: gouthamp-stellar <[email protected]>
Date:   Fri Aug 30 15:12:00 2024 -0700

    remove empty lines

commit 9920f48
Author: gouthamp-stellar <[email protected]>
Date:   Fri Aug 30 15:06:40 2024 -0700

    TSS tables and the channel interface

commit a58d519
Author: gouthamp-stellar <[email protected]>
Date:   Fri Aug 30 10:55:22 2024 -0700

    tables and interfaces for TSS

* commit #2

* changing from RpcIngestTxResponse ->  RpcGetIngestTxResponse

* latest changes

* add tests for ValidateOptions

* changes based on comments

* latest changes based on changes to interface

* string -> RPCTXStatus

* adding a transaction_builder.go which takes a list of operation xdrs and builds a transaction out of it

* moving transaction_service to the utils dir

* upper case Channel methods

* latest changes

* p.txService.NetworkPassPhrase()

* last commit before writing unit tests

* Making the transaction service more injectible and adding fields to the Payload

* typo

* typo

* Update 2024-08-28.0-tss_transactions.sql

* lint errors

* go mod tidy

* test cases + other changes

* remoce unused mocks

* error handler service returns errorHandlerService

* changes based on comments

* lint deadcode error - suppress for now

* removed deadcode

* changes after comments on transaction service pr

* TSS Error Handler Service

* removing print statements

* Update transaction_service.go

* responding to comments

* remove commented code

* tx service changes

* remove comment

* remove comment

* latest tx service changes

* adding a router + utils file

* removing println

* changed function name

* commit #1

* Code() helper function on RPCTXCode

* Code()

* Delete .env.swp

* adding a helpers file

* removing BuildTestFeeBumpTransaction

* adding to serve.go etc

* casing

* better test for Send

* resolving merge conflicts

* removing mockSleep

* delete channels/mocks.go

* sleep -> time.Sleep

* incorporating Daniel's changes + comments

* delete files

* remove unused code

* name change

* remove commented code

* moving the mocks file inside servicesmocks dir

* name changes

* refactor

* changes to serve.go

* checking error on route.Route

* check error

* merging main

* fixing parsesendresp tests

* commit changes

* merging in changes from eror_handler_service

* delete file

* changes based on comments

* removing test case that is not relevant anymore

* changes based on prev pr comments

* remove fmt.Println

* merge latest error_handler_service branch + small changes

* %s -> %w

* U -> u

* variable for channel name

* merge main

* typo in desc string

* millisecond wait time
  • Loading branch information
gouthamp-stellar authored Oct 22, 2024
1 parent c062ffb commit 6553fa9
Show file tree
Hide file tree
Showing 11 changed files with 249 additions and 10 deletions.
5 changes: 5 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func (c *serveCmd) Command() *cobra.Command {
utils.ErrorHandlerServiceNonJitterChannelWaitBtwnRetriesMSOption(&cfg.ErrorHandlerServiceNonJitterChannelWaitBtwnRetriesMS),
utils.ErrorHandlerServiceJitterChannelMaxRetriesOptions(&cfg.ErrorHandlerServiceJitterChannelMaxRetries),
utils.ErrorHandlerServiceNonJitterChannelMaxRetriesOption(&cfg.ErrorHandlerServiceNonJitterChannelMaxRetries),
utils.WebhookHandlerServiceChannelMaxBufferSizeOption(&cfg.WebhookHandlerServiceChannelMaxBufferSize),
utils.WebhookHandlerServiceChannelMaxWorkersOptions(&cfg.WebhookHandlerServiceChannelMaxWorkers),
utils.WebhookHandlerServiceChannelMaxRetriesOption(&cfg.WebhookHandlerServiceChannelMaxRetries),
utils.WebhookHandlerServiceChannelMinWaitBtwnRetriesMSOption(&cfg.WebhookHandlerServiceChannelMinWaitBtwnRetriesMS),

{
Name: "port",
Usage: "Port to listen and serve on",
Expand Down
44 changes: 44 additions & 0 deletions cmd/utils/tss_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,47 @@ func ErrorHandlerServiceNonJitterChannelMaxRetriesOption(configKey *int) *config
Required: true,
}
}

func WebhookHandlerServiceChannelMaxBufferSizeOption(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "webhook-service-channel-max-buffer-size",
Usage: "Set the buffer size of the webhook serive channel.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 100,
Required: true,
}
}

func WebhookHandlerServiceChannelMaxWorkersOptions(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "webhook-service-channel-max-workers",
Usage: "Set the max number of workers for the webhook serive channel.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 10,
Required: true,
}
}

func WebhookHandlerServiceChannelMaxRetriesOption(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "webhook-service-channel-max-retries",
Usage: "Set the max number of times to ping a webhook before quitting.",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 3,
Required: true,
}
}

func WebhookHandlerServiceChannelMinWaitBtwnRetriesMSOption(configKey *int) *config.ConfigOption {
return &config.ConfigOption{
Name: "webhook-service-channel-min-wait-between-retries",
Usage: "The minumum amout of time to wait before resending the payload to the webhook url",
OptType: types.Int,
ConfigKey: configKey,
FlagDefault: 10,
Required: true,
}
}
27 changes: 22 additions & 5 deletions internal/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ type Configs struct {
ErrorHandlerServiceNonJitterChannelWaitBtwnRetriesMS int
ErrorHandlerServiceJitterChannelMaxRetries int
ErrorHandlerServiceNonJitterChannelMaxRetries int
WebhookHandlerServiceChannelMaxBufferSize int
WebhookHandlerServiceChannelMaxWorkers int
WebhookHandlerServiceChannelMaxRetries int
WebhookHandlerServiceChannelMinWaitBtwnRetriesMS int

// Error Tracker
AppTracker apptracker.AppTracker
}
Expand All @@ -94,6 +99,7 @@ type handlerDeps struct {
RPCCallerChannel tss.Channel
ErrorJitterChannel tss.Channel
ErrorNonJitterChannel tss.Channel
WebhookChannel tss.Channel
TSSRouter tssrouter.Router
// Error Tracker
AppTracker apptracker.AppTracker
Expand All @@ -117,6 +123,7 @@ func Serve(cfg Configs) error {
deps.ErrorJitterChannel.Stop()
deps.ErrorNonJitterChannel.Stop()
deps.RPCCallerChannel.Stop()
deps.WebhookChannel.Stop()
},
})

Expand Down Expand Up @@ -180,14 +187,14 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) {
}
go ensureChannelAccounts(channelAccountService, int64(cfg.NumberOfChannelAccounts))

// TSS
txServiceOpts := tssservices.TransactionServiceOptions{
// TSS setup
tssTxService, err := tssservices.NewTransactionService(tssservices.TransactionServiceOptions{
DistributionAccountSignatureClient: cfg.DistributionAccountSignatureClient,
ChannelAccountSignatureClient: cfg.ChannelAccountSignatureClient,
HorizonClient: &horizonClient,
BaseFee: int64(cfg.BaseFee),
}
tssTxService, err := tssservices.NewTransactionService(txServiceOpts)
})

if err != nil {
return handlerDeps{}, fmt.Errorf("instantiating tss transaction service: %w", err)
}
Expand Down Expand Up @@ -230,11 +237,20 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) {
WaitBtwnRetriesMS: cfg.ErrorHandlerServiceJitterChannelMinWaitBtwnRetriesMS,
})

httpClient = http.Client{Timeout: time.Duration(30 * time.Second)}
webhookChannel := tsschannel.NewWebhookChannel(tsschannel.WebhookChannelConfigs{
HTTPClient: &httpClient,
MaxBufferSize: cfg.WebhookHandlerServiceChannelMaxBufferSize,
MaxWorkers: cfg.WebhookHandlerServiceChannelMaxWorkers,
MaxRetries: cfg.WebhookHandlerServiceChannelMaxRetries,
MinWaitBtwnRetriesMS: cfg.WebhookHandlerServiceChannelMinWaitBtwnRetriesMS,
})

router := tssrouter.NewRouter(tssrouter.RouterConfigs{
RPCCallerChannel: rpcCallerChannel,
ErrorJitterChannel: errorJitterChannel,
ErrorNonJitterChannel: errorNonJitterChannel,
WebhookChannel: nil,
WebhookChannel: webhookChannel,
})

rpcCallerChannel.SetRouter(router)
Expand All @@ -253,6 +269,7 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) {
RPCCallerChannel: rpcCallerChannel,
ErrorJitterChannel: errorJitterChannel,
ErrorNonJitterChannel: errorNonJitterChannel,
WebhookChannel: webhookChannel,
TSSRouter: router,
}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/tss/channels/error_jitter_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (p *errorJitterPool) Receive(payload tss.Payload) {
err = p.Router.Route(payload)
if err != nil {
log.Errorf("%s: unable to route payload: %v", ErrorJitterChannelName, err)

return
}
return
Expand All @@ -85,7 +86,6 @@ func (p *errorJitterPool) Receive(payload tss.Payload) {
if err != nil {
log.Errorf("%s: unable to route payload: %v", ErrorJitterChannelName, err)
}

}

func (p *errorJitterPool) SetRouter(router router.Router) {
Expand Down
1 change: 0 additions & 1 deletion internal/tss/channels/error_non_jitter_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func (p *errorNonJitterPool) Receive(payload tss.Payload) {
return
}
}

// Retry limit reached, route the payload to the router so it can re-route it to this pool and keep re-trying
log.Infof("%s: max retry limit reached", ErrorNonJitterChannelName)
err := p.Router.Route(payload)
Expand Down
77 changes: 77 additions & 0 deletions internal/tss/channels/webhook_channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package channels

import (
"bytes"
"encoding/json"
"net/http"
"time"

"github.com/alitto/pond"
"github.com/stellar/go/support/log"
"github.com/stellar/wallet-backend/internal/tss"
tssutils "github.com/stellar/wallet-backend/internal/tss/utils"
"github.com/stellar/wallet-backend/internal/utils"
)

type WebhookChannelConfigs struct {
HTTPClient utils.HTTPClient
MaxBufferSize int
MaxWorkers int
MaxRetries int
MinWaitBtwnRetriesMS int
}

type webhookPool struct {
Pool *pond.WorkerPool
HTTPClient utils.HTTPClient
MaxRetries int
MinWaitBtwnRetriesMS int
}

var WebhookChannelName = "WebhookChannel"

var _ tss.Channel = (*webhookPool)(nil)

func NewWebhookChannel(cfg WebhookChannelConfigs) *webhookPool {
pool := pond.New(cfg.MaxBufferSize, cfg.MaxWorkers, pond.Strategy(pond.Balanced()))
return &webhookPool{
Pool: pool,
HTTPClient: cfg.HTTPClient,
MaxRetries: cfg.MaxRetries,
MinWaitBtwnRetriesMS: cfg.MinWaitBtwnRetriesMS,
}

}

func (p *webhookPool) Send(payload tss.Payload) {
p.Pool.Submit(func() {
p.Receive(payload)
})
}

func (p *webhookPool) Receive(payload tss.Payload) {
resp := tssutils.PayloadTOTSSResponse(payload)
jsonData, err := json.Marshal(resp)
if err != nil {
log.Errorf("%s: error marshaling payload: %e", WebhookChannelName, err)
return
}
var i int
for i = 0; i < p.MaxRetries; i++ {
resp, err := p.HTTPClient.Post(payload.WebhookURL, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
log.Errorf("%s: error making POST request to webhook: %e", WebhookChannelName, err)
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusOK {
return
}
currentBackoff := p.MinWaitBtwnRetriesMS * (1 << i)
time.Sleep(jitter(time.Duration(currentBackoff)) * time.Millisecond)
}
}

func (p *webhookPool) Stop() {
p.Pool.StopAndWait()
}
55 changes: 55 additions & 0 deletions internal/tss/channels/webhook_channel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package channels

import (
"bytes"
"encoding/json"
"io"
"net/http"
"strings"
"testing"

"github.com/stellar/wallet-backend/internal/tss"
tssutils "github.com/stellar/wallet-backend/internal/tss/utils"
"github.com/stellar/wallet-backend/internal/utils"
)

func TestWebhookHandlerServiceChannel(t *testing.T) {
mockHTTPClient := utils.MockHTTPClient{}
cfg := WebhookChannelConfigs{
HTTPClient: &mockHTTPClient,
MaxBufferSize: 1,
MaxWorkers: 1,
MaxRetries: 3,
MinWaitBtwnRetriesMS: 5,
}
channel := NewWebhookChannel(cfg)

payload := tss.Payload{}
payload.WebhookURL = "www.stellar.org"
jsonData, _ := json.Marshal(tssutils.PayloadTOTSSResponse(payload))

httpResponse1 := &http.Response{
StatusCode: http.StatusBadGateway,
Body: io.NopCloser(strings.NewReader(`{"result": {"status": "OK"}}`)),
}

httpResponse2 := &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader(`{"result": {"status": "OK"}}`)),
}

mockHTTPClient.
On("Post", payload.WebhookURL, "application/json", bytes.NewBuffer(jsonData)).
Return(httpResponse1, nil).
Once()

mockHTTPClient.
On("Post", payload.WebhookURL, "application/json", bytes.NewBuffer(jsonData)).
Return(httpResponse2, nil).
Once()

channel.Send(payload)
channel.Stop()

mockHTTPClient.AssertNumberOfCalls(t, "Post", 2)
}
2 changes: 0 additions & 2 deletions internal/tss/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func TestRouter(t *testing.T) {
})
t.Run("status_error_routes_to_error_non_jitter_channel", func(t *testing.T) {
for _, code := range tss.NonJitterErrorCodes {

payload := tss.Payload{
RpcSubmitTxResponse: tss.RPCSendTxResponse{
Status: tss.RPCTXStatus{
Expand All @@ -98,7 +97,6 @@ func TestRouter(t *testing.T) {
}
})
t.Run("status_error_routes_to_webhook_channel", func(t *testing.T) {

for _, code := range tss.FinalErrorCodes {
payload := tss.Payload{
RpcSubmitTxResponse: tss.RPCSendTxResponse{
Expand Down
18 changes: 17 additions & 1 deletion internal/tss/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
type RPCGetIngestTxResponse struct {
// A status that indicated whether this transaction failed or successly made it to the ledger
Status entities.RPCStatus
// The error code that is derived by deserialzing the ResultXdr string in the sendTransaction response
// list of possible errror codes: https://developers.stellar.org/docs/data/horizon/api-reference/errors/result-codes/transactions
Code RPCTXCode
// The raw TransactionEnvelope XDR for this transaction
EnvelopeXDR string
// The raw TransactionResult XDR of the envelopeXdr
Expand All @@ -29,7 +32,7 @@ func ParseToRPCGetIngestTxResponse(result entities.RPCGetTransactionResult, err
}

getIngestTxResponse := RPCGetIngestTxResponse{
Status: entities.RPCStatus(result.Status),
Status: result.Status,
EnvelopeXDR: result.EnvelopeXDR,
ResultXDR: result.ResultXDR,
}
Expand All @@ -39,6 +42,10 @@ func ParseToRPCGetIngestTxResponse(result entities.RPCGetTransactionResult, err
return RPCGetIngestTxResponse{Status: entities.ErrorStatus}, fmt.Errorf("unable to parse createdAt: %w", err)
}
}
getIngestTxResponse.Code, err = parseSendTransactionErrorXDR(result.ResultXDR)
if err != nil {
return getIngestTxResponse, fmt.Errorf("parse error result xdr string: %w", err)
}
return getIngestTxResponse, nil
}

Expand Down Expand Up @@ -156,6 +163,15 @@ func parseSendTransactionErrorXDR(errorResultXDR string) (RPCTXCode, error) {
}, nil
}

type TSSResponse struct {
TransactionHash string `json:"tx_hash"`
TransactionResultCode string `json:"tx_result_code"`
Status string `json:"status"`
CreatedAt int64 `json:"created_at"`
EnvelopeXDR string `json:"envelopeXdr"`
ResultXDR string `json:"resultXdr"`
}

type Payload struct {
WebhookURL string
// The hash of the transaction xdr submitted by the client - the id of the transaction submitted by a client
Expand Down
11 changes: 11 additions & 0 deletions internal/tss/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,15 @@ func TestParseToRPCGetIngestTxResponse(t *testing.T) {
assert.Equal(t, int64(1234567), resp.CreatedAt)
assert.Empty(t, err)
})

t.Run("response_has_errorResultXdr", func(t *testing.T) {
resp, err := ParseToRPCGetIngestTxResponse(entities.RPCGetTransactionResult{
Status: entities.ErrorStatus,
CreatedAt: "1234567",
ResultXDR: "AAAAAAAAAMj////9AAAAAA==",
}, nil)

assert.Equal(t, xdr.TransactionResultCodeTxTooLate, resp.Code.TxResultCode)
assert.Empty(t, err)
})
}
17 changes: 17 additions & 0 deletions internal/tss/utils/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,25 @@ package utils
import (
"github.com/stellar/go/keypair"
"github.com/stellar/go/txnbuild"
"github.com/stellar/wallet-backend/internal/tss"
)

func PayloadTOTSSResponse(payload tss.Payload) tss.TSSResponse {
response := tss.TSSResponse{}
response.TransactionHash = payload.TransactionHash
if payload.RpcSubmitTxResponse.Status.Status() != "" {
response.Status = string(payload.RpcSubmitTxResponse.Status.Status())
response.TransactionResultCode = payload.RpcSubmitTxResponse.Code.TxResultCode.String()
response.EnvelopeXDR = payload.RpcSubmitTxResponse.TransactionXDR
} else if payload.RpcGetIngestTxResponse.Status != "" {
response.Status = string(payload.RpcGetIngestTxResponse.Status)
response.TransactionResultCode = payload.RpcGetIngestTxResponse.Code.TxResultCode.String()
response.EnvelopeXDR = payload.RpcGetIngestTxResponse.EnvelopeXDR
response.ResultXDR = payload.RpcGetIngestTxResponse.ResultXDR
}
return response
}

func BuildTestTransaction() *txnbuild.Transaction {
accountToSponsor := keypair.MustRandom()

Expand Down

0 comments on commit 6553fa9

Please sign in to comment.