diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index 203f998..c8dc11b 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -220,83 +220,83 @@ func TestIngestPayments(t *testing.T) { }) } -//func TestIngest_LatestSyncedLedgerBehindRPC(t *testing.T) { -// dbt := dbtest.Open(t) -// defer dbt.Close() -// -// dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) -// require.NoError(t, err) -// defer dbConnectionPool.Close() -// -// models, _ := data.NewModels(dbConnectionPool) -// mockAppTracker := apptracker.MockAppTracker{} -// mockRPCService := RPCServiceMock{} -// mockRouter := tssrouter.MockRouter{} -// -// tssStore, err := tssstore.NewStore(dbConnectionPool) -// require.NoError(t, err) -// -// ingestService, err := NewIngestService(models, "ingestionLedger", &mockAppTracker, &mockRPCService, &mockRouter, tssStore) -// require.NoError(t, err) -// -// srcAccount := keypair.MustRandom().Address() -// destAccount := keypair.MustRandom().Address() -// -// mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ -// Status: "healthy", -// LatestLedger: 100, -// OldestLedger: 50, -// }, nil) -// paymentOp := txnbuild.Payment{ -// SourceAccount: srcAccount, -// Destination: destAccount, -// Amount: "10", -// Asset: txnbuild.NativeAsset{}, -// } -// transaction, _ := txnbuild.NewTransaction(txnbuild.TransactionParams{ -// SourceAccount: &txnbuild.SimpleAccount{ -// AccountID: keypair.MustRandom().Address(), -// }, -// Operations: []txnbuild.Operation{&paymentOp}, -// Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewTimeout(10)}, -// }) -// txEnvXDR, _ := transaction.Base64() -// mockResult := entities.RPCGetTransactionsResult{ -// Transactions: []entities.Transaction{{ -// Status: entities.SuccessStatus, -// Hash: "abcd", -// ApplicationOrder: 1, -// FeeBump: false, -// EnvelopeXDR: txEnvXDR, -// ResultXDR: "AAAAAAAAAMj////9AAAAAA==", -// Ledger: 50, -// }, { -// Status: entities.SuccessStatus, -// Hash: "abcd", -// ApplicationOrder: 1, -// FeeBump: false, -// EnvelopeXDR: txEnvXDR, -// ResultXDR: "AAAAAAAAAMj////9AAAAAA==", -// Ledger: 51, -// }}, -// LatestLedger: int64(100), -// LatestLedgerCloseTime: int64(1), -// OldestLedger: int64(50), -// OldestLedgerCloseTime: int64(1), -// } -// mockRPCService.On("GetTransactions", int64(50), "", 50).Return(mockResult, nil).Once() -// //mockAppTracker.On("CaptureMessage", "ingestion service stale for over 1m0s").Maybe() -// -// err = ingestService.Run(context.Background(), uint32(49), uint32(50)) -// require.NoError(t, err) -// -// mockRPCService.AssertNotCalled(t, "GetTransactions", int64(49), "", int64(50)) -// mockRPCService.AssertExpectations(t) -// -// ledger, err := models.Payments.GetLatestLedgerSynced(context.Background(), "ingestionLedger") -// require.NoError(t, err) -// assert.Equal(t, uint32(50), ledger) -//} +func TestIngest_LatestSyncedLedgerBehindRPC(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + + models, _ := data.NewModels(dbConnectionPool) + mockAppTracker := apptracker.MockAppTracker{} + mockRPCService := RPCServiceMock{} + mockRouter := tssrouter.MockRouter{} + + tssStore, err := tssstore.NewStore(dbConnectionPool) + require.NoError(t, err) + + ingestService, err := NewIngestService(models, "ingestionLedger", &mockAppTracker, &mockRPCService, &mockRouter, tssStore) + require.NoError(t, err) + + srcAccount := keypair.MustRandom().Address() + destAccount := keypair.MustRandom().Address() + + mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ + Status: "healthy", + LatestLedger: 100, + OldestLedger: 50, + }, nil) + paymentOp := txnbuild.Payment{ + SourceAccount: srcAccount, + Destination: destAccount, + Amount: "10", + Asset: txnbuild.NativeAsset{}, + } + transaction, _ := txnbuild.NewTransaction(txnbuild.TransactionParams{ + SourceAccount: &txnbuild.SimpleAccount{ + AccountID: keypair.MustRandom().Address(), + }, + Operations: []txnbuild.Operation{&paymentOp}, + Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewTimeout(10)}, + }) + txEnvXDR, _ := transaction.Base64() + mockResult := entities.RPCGetTransactionsResult{ + Transactions: []entities.Transaction{{ + Status: entities.SuccessStatus, + Hash: "abcd", + ApplicationOrder: 1, + FeeBump: false, + EnvelopeXDR: txEnvXDR, + ResultXDR: "AAAAAAAAAMj////9AAAAAA==", + Ledger: 50, + }, { + Status: entities.SuccessStatus, + Hash: "abcd", + ApplicationOrder: 1, + FeeBump: false, + EnvelopeXDR: txEnvXDR, + ResultXDR: "AAAAAAAAAMj////9AAAAAA==", + Ledger: 51, + }}, + LatestLedger: int64(100), + LatestLedgerCloseTime: int64(1), + OldestLedger: int64(50), + OldestLedgerCloseTime: int64(1), + } + mockRPCService.On("GetTransactions", int64(50), "", 50).Return(mockResult, nil).Once() + //mockAppTracker.On("CaptureMessage", "ingestion service stale for over 1m0s").Maybe() + + err = ingestService.Run(context.Background(), uint32(49), uint32(50)) + require.NoError(t, err) + + mockRPCService.AssertNotCalled(t, "GetTransactions", int64(49), "", int64(50)) + mockRPCService.AssertExpectations(t) + + ledger, err := models.Payments.GetLatestLedgerSynced(context.Background(), "ingestionLedger") + require.NoError(t, err) + assert.Equal(t, uint32(50), ledger) +} func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { mockRPCService := &RPCServiceMock{}