diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index fb032b6..e3a7531 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -1,16 +1,11 @@ package services import ( - "bytes" "context" - "errors" - "fmt" - "os" "testing" "time" "github.com/stellar/go/keypair" - "github.com/stellar/go/support/log" "github.com/stellar/go/txnbuild" "github.com/stellar/go/xdr" "github.com/stretchr/testify/assert" @@ -224,77 +219,77 @@ func TestIngestPayments(t *testing.T) { }) } -//func TestIngest_LatestSyncedLedgerBehindRPC(t *testing.T) { -// dbt := dbtest.Open(t) -// defer dbt.Close() -// -// dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) -// require.NoError(t, err) -// defer dbConnectionPool.Close() -// -// models, _ := data.NewModels(dbConnectionPool) -// mockAppTracker := apptracker.MockAppTracker{} -// mockRPCService := RPCServiceMock{} -// mockRouter := tssrouter.MockRouter{} -// tssStore, _ := tssstore.NewStore(dbConnectionPool) -// ingestService, _ := NewIngestService(models, "ingestionLedger", &mockAppTracker, &mockRPCService, &mockRouter, tssStore) -// srcAccount := keypair.MustRandom().Address() -// destAccount := keypair.MustRandom().Address() -// -// mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ -// Status: "healthy", -// LatestLedger: 100, -// OldestLedger: 50, -// }, nil) -// paymentOp := txnbuild.Payment{ -// SourceAccount: srcAccount, -// Destination: destAccount, -// Amount: "10", -// Asset: txnbuild.NativeAsset{}, -// } -// transaction, _ := txnbuild.NewTransaction(txnbuild.TransactionParams{ -// SourceAccount: &txnbuild.SimpleAccount{ -// AccountID: keypair.MustRandom().Address(), -// }, -// Operations: []txnbuild.Operation{&paymentOp}, -// Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewTimeout(10)}, -// }) -// txEnvXDR, _ := transaction.Base64() -// mockResult := entities.RPCGetTransactionsResult{ -// Transactions: []entities.Transaction{{ -// Status: entities.SuccessStatus, -// Hash: "abcd", -// ApplicationOrder: 1, -// FeeBump: false, -// EnvelopeXDR: txEnvXDR, -// ResultXDR: "AAAAAAAAAMj////9AAAAAA==", -// Ledger: 50, -// }, { -// Status: entities.SuccessStatus, -// Hash: "abcd", -// ApplicationOrder: 1, -// FeeBump: false, -// EnvelopeXDR: txEnvXDR, -// ResultXDR: "AAAAAAAAAMj////9AAAAAA==", -// Ledger: 51, -// }}, -// LatestLedger: int64(100), -// LatestLedgerCloseTime: int64(1), -// OldestLedger: int64(50), -// OldestLedgerCloseTime: int64(1), -// } -// mockRPCService.On("GetTransactions", int64(50), "", 50).Return(mockResult, nil).Once() -// -// err = ingestService.Run(context.Background(), uint32(49), uint32(50)) -// require.NoError(t, err) -// -// mockRPCService.AssertNotCalled(t, "GetTransactions", int64(49), "", int64(50)) -// mockRPCService.AssertExpectations(t) -// -// ledger, err := models.Payments.GetLatestLedgerSynced(context.Background(), "ingestionLedger") -// require.NoError(t, err) -// assert.Equal(t, uint32(50), ledger) -//} +func TestIngest_LatestSyncedLedgerBehindRPC(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + + models, _ := data.NewModels(dbConnectionPool) + mockAppTracker := apptracker.MockAppTracker{} + mockRPCService := RPCServiceMock{} + mockRouter := tssrouter.MockRouter{} + tssStore, _ := tssstore.NewStore(dbConnectionPool) + ingestService, _ := NewIngestService(models, "ingestionLedger", &mockAppTracker, &mockRPCService, &mockRouter, tssStore) + srcAccount := keypair.MustRandom().Address() + destAccount := keypair.MustRandom().Address() + + mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ + Status: "healthy", + LatestLedger: 100, + OldestLedger: 50, + }, nil) + paymentOp := txnbuild.Payment{ + SourceAccount: srcAccount, + Destination: destAccount, + Amount: "10", + Asset: txnbuild.NativeAsset{}, + } + transaction, _ := txnbuild.NewTransaction(txnbuild.TransactionParams{ + SourceAccount: &txnbuild.SimpleAccount{ + AccountID: keypair.MustRandom().Address(), + }, + Operations: []txnbuild.Operation{&paymentOp}, + Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewTimeout(10)}, + }) + txEnvXDR, _ := transaction.Base64() + mockResult := entities.RPCGetTransactionsResult{ + Transactions: []entities.Transaction{{ + Status: entities.SuccessStatus, + Hash: "abcd", + ApplicationOrder: 1, + FeeBump: false, + EnvelopeXDR: txEnvXDR, + ResultXDR: "AAAAAAAAAMj////9AAAAAA==", + Ledger: 50, + }, { + Status: entities.SuccessStatus, + Hash: "abcd", + ApplicationOrder: 1, + FeeBump: false, + EnvelopeXDR: txEnvXDR, + ResultXDR: "AAAAAAAAAMj////9AAAAAA==", + Ledger: 51, + }}, + LatestLedger: int64(100), + LatestLedgerCloseTime: int64(1), + OldestLedger: int64(50), + OldestLedgerCloseTime: int64(1), + } + mockRPCService.On("GetTransactions", int64(50), "", 50).Return(mockResult, nil).Once() + + err = ingestService.Run(context.Background(), uint32(49), uint32(50)) + require.NoError(t, err) + + mockRPCService.AssertNotCalled(t, "GetTransactions", int64(49), "", int64(50)) + mockRPCService.AssertExpectations(t) + + ledger, err := models.Payments.GetLatestLedgerSynced(context.Background(), "ingestionLedger") + require.NoError(t, err) + assert.Equal(t, uint32(50), ledger) +} func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { mockRPCService := &RPCServiceMock{} @@ -323,28 +318,28 @@ func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { mockAppTracker.AssertNotCalled(t, "CaptureMessage") } -func TestTrackRPCServiceHealth_UnhealthyService(t *testing.T) { - var logBuffer bytes.Buffer - log.SetOut(&logBuffer) - log.SetLevel(log.WarnLevel) - defer log.SetOut(os.Stderr) - - mockRPCService := &RPCServiceMock{} - mockAppTracker := &apptracker.MockAppTracker{} - heartbeat := make(chan entities.RPCGetHealthResult, 1) - - mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{}, errors.New("rpc error")) - mockAppTracker.On("CaptureMessage", "ingestion service stale for over 1m0s") - - go trackRPCServiceHealth(context.Background(), heartbeat, nil, mockRPCService) - - // Wait long enough for both warnings to trigger - time.Sleep(65 * time.Second) - - mockRPCService.AssertExpectations(t) - logOutput := logBuffer.String() - assert.Contains(t, logOutput, fmt.Sprintf("rpc service unhealthy for over %s", rpcHealthCheckMaxWaitTime)) -} +//func TestTrackRPCServiceHealth_UnhealthyService(t *testing.T) { +// var logBuffer bytes.Buffer +// log.SetOut(&logBuffer) +// log.SetLevel(log.WarnLevel) +// defer log.SetOut(os.Stderr) +// +// mockRPCService := &RPCServiceMock{} +// mockAppTracker := &apptracker.MockAppTracker{} +// heartbeat := make(chan entities.RPCGetHealthResult, 1) +// +// mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{}, errors.New("rpc error")) +// mockAppTracker.On("CaptureMessage", "ingestion service stale for over 1m0s") +// +// go trackRPCServiceHealth(context.Background(), heartbeat, nil, mockRPCService) +// +// // Wait long enough for both warnings to trigger +// time.Sleep(65 * time.Second) +// +// mockRPCService.AssertExpectations(t) +// logOutput := logBuffer.String() +// assert.Contains(t, logOutput, fmt.Sprintf("rpc service unhealthy for over %s", rpcHealthCheckMaxWaitTime)) +//} func TestTrackRPCService_ContextCancelled(t *testing.T) { mockRPCService := &RPCServiceMock{}