Skip to content

Commit

Permalink
services: rpc service (#45)
Browse files Browse the repository at this point in the history
What
Creates a new RPC service, extracting the logic from the TSS package into a more general one to be available and used across the application.

Why
The Ingest Service for the new Balances Ingestion feature will also require RPC communication, therefore the RPC functions are being extracted into its own service.
  • Loading branch information
daniel-burghardt authored Sep 26, 2024
1 parent a9a63a2 commit e9cdaa2
Show file tree
Hide file tree
Showing 11 changed files with 659 additions and 573 deletions.
48 changes: 48 additions & 0 deletions internal/entities/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package entities

import (
"encoding/json"
)

type RPCStatus string

const (
// sendTransaction statuses
PendingStatus RPCStatus = "PENDING"
DuplicateStatus RPCStatus = "DUPLICATE"
TryAgainLaterStatus RPCStatus = "TRY_AGAIN_LATER"
ErrorStatus RPCStatus = "ERROR"
// getTransaction statuses
NotFoundStatus RPCStatus = "NOT_FOUND"
FailedStatus RPCStatus = "FAILED"
SuccessStatus RPCStatus = "SUCCESS"
)

type RPCResponse struct {
Result json.RawMessage `json:"result"`
JSONRPC string `json:"jsonrpc"`
ID int64 `json:"id"`
}

type RPCGetTransactionResult struct {
Status RPCStatus `json:"status"`
LatestLedger int64 `json:"latestLedger"`
LatestLedgerCloseTime string `json:"latestLedgerCloseTime"`
OldestLedger int64 `json:"oldestLedger"`
OldestLedgerCloseTime string `json:"oldestLedgerCloseTime"`
ApplicationOrder int64 `json:"applicationOrder"`
EnvelopeXDR string `json:"envelopeXdr"`
ResultXDR string `json:"resultXdr"`
ResultMetaXDR string `json:"resultMetaXdr"`
Ledger int64 `json:"ledger"`
CreatedAt string `json:"createdAt"`
ErrorResultXDR string `json:"errorResultXdr"`
}

type RPCSendTransactionResult struct {
Status RPCStatus `json:"status"`
LatestLedger int64 `json:"latestLedger"`
LatestLedgerCloseTime string `json:"latestLedgerCloseTime"`
Hash string `json:"hash"`
ErrorResultXDR string `json:"errorResultXdr"`
}
28 changes: 18 additions & 10 deletions internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"net/http"
"os"
"path"
"time"

"github.com/sirupsen/logrus"
"github.com/stellar/go/ingest/ledgerbackend"
Expand All @@ -27,6 +29,7 @@ type Configs struct {
EndLedger int
LogLevel logrus.Level
AppTracker apptracker.AppTracker
RPCURL string
}

func Ingest(cfg Configs) error {
Expand All @@ -44,15 +47,15 @@ func Ingest(cfg Configs) error {
return nil
}

func setupDeps(cfg Configs) (*services.IngestManager, error) {
func setupDeps(cfg Configs) (services.IngestService, error) {
// Open DB connection pool
dbConnectionPool, err := db.OpenDBConnectionPool(cfg.DatabaseURL)
if err != nil {
return nil, fmt.Errorf("error connecting to the database: %w", err)
return nil, fmt.Errorf("connecting to the database: %w", err)
}
models, err := data.NewModels(dbConnectionPool)
if err != nil {
return nil, fmt.Errorf("error creating models for Serve: %w", err)
return nil, fmt.Errorf("creating models: %w", err)
}

// Setup Captive Core backend
Expand All @@ -65,13 +68,18 @@ func setupDeps(cfg Configs) (*services.IngestManager, error) {
return nil, fmt.Errorf("creating captive core backend: %w", err)
}

return &services.IngestManager{
NetworkPassphrase: cfg.NetworkPassphrase,
LedgerCursorName: cfg.LedgerCursorName,
LedgerBackend: ledgerBackend,
PaymentModel: models.Payments,
AppTracker: cfg.AppTracker,
}, nil
httpClient := &http.Client{Timeout: 30 * time.Second}
rpcService, err := services.NewRPCService(cfg.RPCURL, httpClient)
if err != nil {
return nil, fmt.Errorf("instantiating rpc service: %w", err)
}

ingestService, err := services.NewIngestService(models, ledgerBackend, cfg.NetworkPassphrase, cfg.LedgerCursorName, cfg.AppTracker, rpcService)
if err != nil {
return nil, fmt.Errorf("instantiating ingest service: %w", err)
}

return ingestService, nil
}

const (
Expand Down
83 changes: 64 additions & 19 deletions internal/services/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package services

import (
"context"
"errors"
"fmt"
"io"
"time"
Expand All @@ -16,25 +17,69 @@ import (
"github.com/stellar/wallet-backend/internal/utils"
)

type IngestManager struct {
PaymentModel *data.PaymentModel
LedgerBackend ledgerbackend.LedgerBackend
NetworkPassphrase string
LedgerCursorName string
AppTracker apptracker.AppTracker
type IngestService interface {
Run(ctx context.Context, start, end uint32) error
}

func (m *IngestManager) Run(ctx context.Context, start, end uint32) error {
var _ IngestService = (*ingestService)(nil)

type ingestService struct {
models *data.Models
ledgerBackend ledgerbackend.LedgerBackend
networkPassphrase string
ledgerCursorName string
appTracker apptracker.AppTracker
rpcService RPCService
}

func NewIngestService(
models *data.Models,
ledgerBackend ledgerbackend.LedgerBackend,
networkPassphrase string,
ledgerCursorName string,
appTracker apptracker.AppTracker,
rpcService RPCService,
) (*ingestService, error) {
if models == nil {
return nil, errors.New("models cannot be nil")
}
if ledgerBackend == nil {
return nil, errors.New("ledgerBackend cannot be nil")
}
if networkPassphrase == "" {
return nil, errors.New("networkPassphrase cannot be nil")
}
if ledgerCursorName == "" {
return nil, errors.New("ledgerCursorName cannot be nil")
}
if appTracker == nil {
return nil, errors.New("appTracker cannot be nil")
}
if rpcService == nil {
return nil, errors.New("rpcService cannot be nil")
}

return &ingestService{
models: models,
ledgerBackend: ledgerBackend,
networkPassphrase: networkPassphrase,
ledgerCursorName: ledgerCursorName,
appTracker: appTracker,
rpcService: rpcService,
}, nil
}

func (m *ingestService) Run(ctx context.Context, start, end uint32) error {
var ingestLedger uint32

if start == 0 {
lastSyncedLedger, err := m.PaymentModel.GetLatestLedgerSynced(ctx, m.LedgerCursorName)
lastSyncedLedger, err := m.models.Payments.GetLatestLedgerSynced(ctx, m.ledgerCursorName)
if err != nil {
return fmt.Errorf("getting last ledger synced: %w", err)
}

if lastSyncedLedger == 0 {
// Captive Core is not able to process genesis ledger and often has trouble processing ledger 2, so we start ingestion at ledger 3
// Captive Core is not able to process genesis ledger (1) and often has trouble processing ledger 2, so we start ingestion at ledger 3
log.Ctx(ctx).Info("No last synced ledger cursor found, initializing ingestion at ledger 3")
ingestLedger = 3
} else {
Expand All @@ -54,12 +99,12 @@ func (m *IngestManager) Run(ctx context.Context, start, end uint32) error {
}

heartbeat := make(chan any)
go trackServiceHealth(heartbeat, m.AppTracker)
go trackServiceHealth(heartbeat, m.appTracker)

for ; end == 0 || ingestLedger <= end; ingestLedger++ {
log.Ctx(ctx).Infof("waiting for ledger %d", ingestLedger)

ledgerMeta, err := m.LedgerBackend.GetLedger(ctx, ingestLedger)
ledgerMeta, err := m.ledgerBackend.GetLedger(ctx, ingestLedger)
if err != nil {
return fmt.Errorf("getting ledger meta for ledger %d: %w", ingestLedger, err)
}
Expand All @@ -77,21 +122,21 @@ func (m *IngestManager) Run(ctx context.Context, start, end uint32) error {
return nil
}

func (m *IngestManager) maybePrepareRange(ctx context.Context, from, to uint32) error {
func (m *ingestService) maybePrepareRange(ctx context.Context, from, to uint32) error {
var ledgerRange ledgerbackend.Range
if to == 0 {
ledgerRange = ledgerbackend.UnboundedRange(from)
} else {
ledgerRange = ledgerbackend.BoundedRange(from, to)
}

prepared, err := m.LedgerBackend.IsPrepared(ctx, ledgerRange)
prepared, err := m.ledgerBackend.IsPrepared(ctx, ledgerRange)
if err != nil {
return fmt.Errorf("checking prepared range: %w", err)
}

if !prepared {
err = m.LedgerBackend.PrepareRange(ctx, ledgerRange)
err = m.ledgerBackend.PrepareRange(ctx, ledgerRange)
if err != nil {
return fmt.Errorf("preparing range: %w", err)
}
Expand Down Expand Up @@ -121,16 +166,16 @@ func trackServiceHealth(heartbeat chan any, tracker apptracker.AppTracker) {
}
}

func (m *IngestManager) processLedger(ctx context.Context, ledger uint32, ledgerMeta xdr.LedgerCloseMeta) (err error) {
reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(m.NetworkPassphrase, ledgerMeta)
func (m *ingestService) processLedger(ctx context.Context, ledger uint32, ledgerMeta xdr.LedgerCloseMeta) (err error) {
reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(m.networkPassphrase, ledgerMeta)
if err != nil {
return fmt.Errorf("creating ledger reader: %w", err)
}

ledgerCloseTime := time.Unix(int64(ledgerMeta.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime), 0).UTC()
ledgerSequence := ledgerMeta.LedgerSequence()

return db.RunInTransaction(ctx, m.PaymentModel.DB, nil, func(dbTx db.Transaction) error {
return db.RunInTransaction(ctx, m.models.Payments.DB, nil, func(dbTx db.Transaction) error {
for {
tx, err := reader.Read()
if err == io.EOF {
Expand Down Expand Up @@ -176,14 +221,14 @@ func (m *IngestManager) processLedger(ctx context.Context, ledger uint32, ledger
continue
}

err = m.PaymentModel.AddPayment(ctx, dbTx, payment)
err = m.models.Payments.AddPayment(ctx, dbTx, payment)
if err != nil {
return fmt.Errorf("adding payment for ledger %d, tx %s (%d), operation %s (%d): %w", ledgerSequence, txHash, tx.Index, payment.OperationID, opIdx, err)
}
}
}

err = m.PaymentModel.UpdateLatestLedgerSynced(ctx, m.LedgerCursorName, ledger)
err = m.models.Payments.UpdateLatestLedgerSynced(ctx, m.ledgerCursorName, ledger)
if err != nil {
return err
}
Expand Down
18 changes: 9 additions & 9 deletions internal/services/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ func TestProcessLedger(t *testing.T) {
require.NoError(t, err)
defer dbConnectionPool.Close()

m := &IngestManager{
PaymentModel: &data.PaymentModel{
DB: dbConnectionPool,
},
NetworkPassphrase: network.TestNetworkPassphrase,
LedgerCursorName: "last_synced_ledger",
LedgerBackend: nil,
models, _ := data.NewModels(dbConnectionPool)
service := &ingestService{
models: models,
networkPassphrase: network.TestNetworkPassphrase,
ledgerCursorName: "last_synced_ledger",
ledgerBackend: nil,
rpcService: nil,
}

ctx := context.Background()
Expand Down Expand Up @@ -111,12 +111,12 @@ func TestProcessLedger(t *testing.T) {

// Compute transaction hash and inject into ledger meta
components := ledgerMeta.V1.TxSet.V1TxSet.Phases[0].V0Components
xdrHash, err := network.HashTransactionInEnvelope((*components)[0].TxsMaybeDiscountedFee.Txs[0], m.NetworkPassphrase)
xdrHash, err := network.HashTransactionInEnvelope((*components)[0].TxsMaybeDiscountedFee.Txs[0], service.networkPassphrase)
require.NoError(t, err)
ledgerMeta.V1.TxProcessing[0].Result.TransactionHash = xdrHash

// Run ledger ingestion
err = m.processLedger(ctx, 1, ledgerMeta)
err = service.processLedger(ctx, 1, ledgerMeta)
require.NoError(t, err)

// Assert payment properly persisted to database
Expand Down
Loading

0 comments on commit e9cdaa2

Please sign in to comment.