Skip to content

Commit

Permalink
Add unittests for ingestion - 1
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya1702 committed Nov 15, 2024
1 parent 85d4ea5 commit 9119641
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 14 deletions.
24 changes: 14 additions & 10 deletions internal/services/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
)

const (
rpcHealthCheckMaxWaitTime = 60 * time.Second
rpcHealthCheckSleepTime = 5 * time.Second
ingestHealthCheckWaitTime = 60 * time.Second
rpcHealthCheckSleepTime = 5 * time.Second
rpcHealthCheckMaxWaitTime = 60 * time.Second
ingestHealthCheckMaxWaitTime = 60 * time.Second
)

type IngestService interface {
Expand Down Expand Up @@ -84,7 +84,7 @@ func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger u
ingestHeartbeat := make(chan any, 1)

// Start service health trackers
go m.trackRPCHealth(ctx, rpcHeartbeat, m.appTracker)
go trackRPCServiceHealth(ctx, rpcHeartbeat, m.appTracker, m.rpcService)
go trackIngestServiceHealth(ctx, ingestHeartbeat, m.appTracker)

if startLedger == 0 {
Expand All @@ -102,8 +102,12 @@ func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger u
return fmt.Errorf("context cancelled: %w", ctx.Err())
case resp := <-rpcHeartbeat:
switch {
// Case-1: wallet-backend is running behind rpc's oldest ledger. In this case, we start
// ingestion from rpc's oldest ledger.
case ingestLedger < resp.OldestLedger:
ingestLedger = resp.OldestLedger
// Case-2: rpc is running behind wallet-backend's latest synced ledger. We wait for rpc to
// catch back up to wallet-backend.
case ingestLedger > resp.LatestLedger:
log.Debugf("waiting for RPC to catchup to ledger %d (latest: %d)",
ingestLedger, resp.LatestLedger)
Expand Down Expand Up @@ -275,7 +279,7 @@ func (m *ingestService) processTSSTransactions(ctx context.Context, ledgerTransa
return nil
}

func (m *ingestService) trackRPCHealth(ctx context.Context, heartbeat chan entities.RPCGetHealthResult, tracker apptracker.AppTracker) {
func trackRPCServiceHealth(ctx context.Context, heartbeat chan entities.RPCGetHealthResult, tracker apptracker.AppTracker, rpcService RPCService) {
healthCheckTicker := time.NewTicker(rpcHealthCheckSleepTime)
warningTicker := time.NewTicker(rpcHealthCheckMaxWaitTime)
defer func() {
Expand All @@ -298,7 +302,7 @@ func (m *ingestService) trackRPCHealth(ctx context.Context, heartbeat chan entit
}
warningTicker.Reset(rpcHealthCheckMaxWaitTime)
case <-healthCheckTicker.C:
result, err := m.rpcService.GetHealth()
result, err := rpcService.GetHealth()
if err != nil {
log.Warnf("rpc health check failed: %v", err)
continue
Expand All @@ -310,7 +314,7 @@ func (m *ingestService) trackRPCHealth(ctx context.Context, heartbeat chan entit
}

func trackIngestServiceHealth(ctx context.Context, heartbeat chan any, tracker apptracker.AppTracker) {
ticker := time.NewTicker(ingestHealthCheckWaitTime)
ticker := time.NewTicker(ingestHealthCheckMaxWaitTime)
defer func() {
ticker.Stop()
close(heartbeat)
Expand All @@ -321,16 +325,16 @@ func trackIngestServiceHealth(ctx context.Context, heartbeat chan any, tracker a
case <-ctx.Done():
return
case <-ticker.C:
warn := fmt.Sprintf("ingestion service stale for over %s", ingestHealthCheckWaitTime)
warn := fmt.Sprintf("ingestion service stale for over %s", ingestHealthCheckMaxWaitTime)
log.Warn(warn)
if tracker != nil {
tracker.CaptureMessage(warn)
} else {
log.Warn("App Tracker is nil")
}
ticker.Reset(ingestHealthCheckWaitTime)
ticker.Reset(ingestHealthCheckMaxWaitTime)
case <-heartbeat:
ticker.Reset(ingestHealthCheckWaitTime)
ticker.Reset(ingestHealthCheckMaxWaitTime)
}
}
}
Expand Down
142 changes: 138 additions & 4 deletions internal/services/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ package services

import (
"context"
"errors"
"testing"
"time"

"github.com/stellar/go/keypair"
"github.com/stellar/go/txnbuild"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/stellar/wallet-backend/internal/apptracker"
"github.com/stellar/wallet-backend/internal/data"
"github.com/stellar/wallet-backend/internal/db"
Expand All @@ -15,9 +21,6 @@ import (
"github.com/stellar/wallet-backend/internal/tss"
tssrouter "github.com/stellar/wallet-backend/internal/tss/router"
tssstore "github.com/stellar/wallet-backend/internal/tss/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestGetLedgerTransactions(t *testing.T) {
Expand Down Expand Up @@ -138,7 +141,7 @@ func TestProcessTSSTransactions(t *testing.T) {
ResultXDR: "AAAAAAAAAMj////9AAAAAA==",
ResultMetaXDR: "meta",
Ledger: 123456,
DiagnosticEventsXDR: "diag",
DiagnosticEventsXDR: []string{"diag"},
CreatedAt: 1695939098,
},
}
Expand Down Expand Up @@ -169,6 +172,7 @@ func TestIngestPayments(t *testing.T) {
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
require.NoError(t, err)
defer dbConnectionPool.Close()

models, _ := data.NewModels(dbConnectionPool)
mockAppTracker := apptracker.MockAppTracker{}
mockRPCService := RPCServiceMock{}
Expand Down Expand Up @@ -215,3 +219,133 @@ func TestIngestPayments(t *testing.T) {
assert.Equal(t, payments[0].TransactionHash, "abcd")
})
}

func TestIngest_LatestSyncedLedgerBehindRPC(t *testing.T) {
dbt := dbtest.Open(t)
defer dbt.Close()

dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
require.NoError(t, err)
defer dbConnectionPool.Close()

models, _ := data.NewModels(dbConnectionPool)
mockAppTracker := apptracker.MockAppTracker{}
mockRPCService := RPCServiceMock{}
mockRouter := tssrouter.MockRouter{}
tssStore, _ := tssstore.NewStore(dbConnectionPool)
ingestService, _ := NewIngestService(models, "ingestionLedger", &mockAppTracker, &mockRPCService, &mockRouter, tssStore)
srcAccount := keypair.MustRandom().Address()
destAccount := keypair.MustRandom().Address()

mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{
Status: "healthy",
LatestLedger: 100,
OldestLedger: 50,
}, nil)
paymentOp := txnbuild.Payment{
SourceAccount: srcAccount,
Destination: destAccount,
Amount: "10",
Asset: txnbuild.NativeAsset{},
}
transaction, _ := txnbuild.NewTransaction(txnbuild.TransactionParams{
SourceAccount: &txnbuild.SimpleAccount{
AccountID: keypair.MustRandom().Address(),
},
Operations: []txnbuild.Operation{&paymentOp},
Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewTimeout(10)},
})
txEnvXDR, _ := transaction.Base64()
mockResult := entities.RPCGetTransactionsResult{
Transactions: []entities.Transaction{{
Status: entities.SuccessStatus,
Hash: "abcd",
ApplicationOrder: 1,
FeeBump: false,
EnvelopeXDR: txEnvXDR,
ResultXDR: "AAAAAAAAAMj////9AAAAAA==",
Ledger: 50,
}, {
Status: entities.SuccessStatus,
Hash: "abcd",
ApplicationOrder: 1,
FeeBump: false,
EnvelopeXDR: txEnvXDR,
ResultXDR: "AAAAAAAAAMj////9AAAAAA==",
Ledger: 51,
}},
LatestLedger: int64(100),
LatestLedgerCloseTime: int64(1),
OldestLedger: int64(50),
OldestLedgerCloseTime: int64(1),
}
mockRPCService.On("GetTransactions", int64(50), "", 50).Return(mockResult, nil).Once()

err = ingestService.Run(context.Background(), uint32(49), uint32(50))
require.NoError(t, err)

mockRPCService.AssertNotCalled(t, "GetTransactions", int64(49), "", int64(50))
mockRPCService.AssertExpectations(t)

ledger, err := models.Payments.GetLatestLedgerSynced(context.Background(), "ingestionLedger")
require.NoError(t, err)
assert.Equal(t, uint32(50), ledger)
}

func TestTrackRPCServiceHealth(t *testing.T) {
mockAppTracker := &apptracker.MockAppTracker{}

t.Run("healthy_rpc_service", func(t *testing.T) {
mockRPCService := &RPCServiceMock{}
heartbeat := make(chan entities.RPCGetHealthResult, 1)
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

healthResult := entities.RPCGetHealthResult{
Status: "healthy",
LatestLedger: 100,
OldestLedger: 1,
LedgerRetentionWindow: 0,
}
mockRPCService.On("GetHealth").Return(healthResult, nil)

go trackRPCServiceHealth(ctx, heartbeat, mockAppTracker, mockRPCService)

select {
case result := <-heartbeat:
assert.Equal(t, healthResult, result)
case <-ctx.Done():
t.Fatal("timeout waiting for heartbeat")
}

mockRPCService.AssertExpectations(t)
mockAppTracker.AssertNotCalled(t, "CaptureMessage")
})

t.Run("unhealthy_rpc_service", func(t *testing.T) {
mockRPCService := &RPCServiceMock{}
heartbeat := make(chan entities.RPCGetHealthResult, 1)
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

expectedWarning := "rpc service unhealthy for over 1m0s"
mockAppTracker.On("CaptureMessage", expectedWarning)
mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{}, errors.New("rpc error"))

go trackRPCServiceHealth(ctx, heartbeat, mockAppTracker, mockRPCService)
time.Sleep(rpcHealthCheckMaxWaitTime + 100*time.Millisecond)

mockAppTracker.AssertCalled(t, "CaptureMessage", expectedWarning)
})

t.Run("context_cancelled", func(t *testing.T) {
mockRPCService := &RPCServiceMock{}
heartbeat := make(chan entities.RPCGetHealthResult, 1)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

go trackRPCServiceHealth(ctx, heartbeat, mockAppTracker, mockRPCService)
mockRPCService.AssertNotCalled(t, "GetHealth")
mockAppTracker.AssertNotCalled(t, "CaptureMessage")
})
}

0 comments on commit 9119641

Please sign in to comment.