From ecd28b61d224bae9b43e2d4ac92c968b9707c53e Mon Sep 17 00:00:00 2001 From: tamirms Date: Mon, 29 Jul 2024 10:04:59 +0100 Subject: [PATCH] services/horizon/internal/db2/history: Optimize query for reaping lookup tables (#5393) --- .../horizon/internal/db2/history/key_value.go | 45 ++++ services/horizon/internal/db2/history/main.go | 225 +++++++++--------- .../horizon/internal/db2/history/main_test.go | 46 +--- .../horizon/internal/db2/history/reap_test.go | 120 +++++++--- services/horizon/internal/ingest/main.go | 12 +- services/horizon/internal/ingest/main_test.go | 4 +- 6 files changed, 259 insertions(+), 193 deletions(-) diff --git a/services/horizon/internal/db2/history/key_value.go b/services/horizon/internal/db2/history/key_value.go index a2a170a4b1..3d23451937 100644 --- a/services/horizon/internal/db2/history/key_value.go +++ b/services/horizon/internal/db2/history/key_value.go @@ -3,9 +3,12 @@ package history import ( "context" "database/sql" + "fmt" "strconv" + "strings" sq "github.com/Masterminds/squirrel" + "github.com/stellar/go/support/errors" ) @@ -18,6 +21,7 @@ const ( stateInvalid = "exp_state_invalid" offerCompactionSequence = "offer_compaction_sequence" liquidityPoolCompactionSequence = "liquidity_pool_compaction_sequence" + lookupTableReapOffsetSuffix = "_reap_offset" ) // GetLastLedgerIngestNonBlocking works like GetLastLedgerIngest but @@ -203,6 +207,47 @@ func (q *Q) getValueFromStore(ctx context.Context, key string, forUpdate bool) ( return value, nil } +type KeyValuePair struct { + Key string `db:"key"` + Value string `db:"value"` +} + +func (q *Q) getLookupTableReapOffsets(ctx context.Context) (map[string]int64, error) { + keys := make([]string, 0, len(historyLookupTables)) + for table := range historyLookupTables { + keys = append(keys, table+lookupTableReapOffsetSuffix) + } + offsets := map[string]int64{} + var pairs []KeyValuePair + query := sq.Select("key", "value"). + From("key_value_store"). + Where(map[string]interface{}{ + "key": keys, + }) + err := q.Select(ctx, &pairs, query) + if err != nil { + return nil, err + } + for _, pair := range pairs { + table := strings.TrimSuffix(pair.Key, lookupTableReapOffsetSuffix) + if _, ok := historyLookupTables[table]; !ok { + return nil, fmt.Errorf("invalid key: %s", pair.Key) + } + + var offset int64 + offset, err = strconv.ParseInt(pair.Value, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid offset: %s", pair.Value) + } + offsets[table] = offset + } + return offsets, err +} + +func (q *Q) updateLookupTableReapOffset(ctx context.Context, table string, offset int64) error { + return q.updateValueInStore(ctx, table+lookupTableReapOffsetSuffix, strconv.FormatInt(offset, 10)) +} + // updateValueInStore updates a value for a given key in KV store func (q *Q) updateValueInStore(ctx context.Context, key, value string) error { query := sq.Insert("key_value_store"). diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 47e8952b07..e9d8ffb185 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -282,7 +282,7 @@ type IngestionQ interface { NewTradeBatchInsertBuilder() TradeBatchInsertBuilder RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error - ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]LookupTableReapResult, error) + ReapLookupTables(ctx context.Context, batchSize int) (map[string]LookupTableReapResult, error) CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error) QTransactions QTrustLines @@ -981,7 +981,7 @@ type LookupTableReapResult struct { // which aren't used (orphaned), i.e. history entries for them were reaped. // This method must be executed inside ingestion transaction. Otherwise it may // create invalid state in lookup and history tables. -func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( +func (q *Q) ReapLookupTables(ctx context.Context, batchSize int) ( map[string]LookupTableReapResult, error, ) { @@ -989,80 +989,19 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( return nil, errors.New("cannot be called outside of an ingestion transaction") } - const batchSize = 1000 + offsets, err := q.getLookupTableReapOffsets(ctx) + if err != nil { + return nil, fmt.Errorf("could not obtain offsets: %w", err) + } results := map[string]LookupTableReapResult{} - for table, historyTables := range map[string][]tableObjectFieldPair{ - "history_accounts": { - { - name: "history_effects", - objectField: "history_account_id", - }, - { - name: "history_operation_participants", - objectField: "history_account_id", - }, - { - name: "history_trades", - objectField: "base_account_id", - }, - { - name: "history_trades", - objectField: "counter_account_id", - }, - { - name: "history_transaction_participants", - objectField: "history_account_id", - }, - }, - "history_assets": { - { - name: "history_trades", - objectField: "base_asset_id", - }, - { - name: "history_trades", - objectField: "counter_asset_id", - }, - { - name: "history_trades_60000", - objectField: "base_asset_id", - }, - { - name: "history_trades_60000", - objectField: "counter_asset_id", - }, - }, - "history_claimable_balances": { - { - name: "history_operation_claimable_balances", - objectField: "history_claimable_balance_id", - }, - { - name: "history_transaction_claimable_balances", - objectField: "history_claimable_balance_id", - }, - }, - "history_liquidity_pools": { - { - name: "history_operation_liquidity_pools", - objectField: "history_liquidity_pool_id", - }, - { - name: "history_transaction_liquidity_pools", - objectField: "history_liquidity_pool_id", - }, - }, - } { + for table, historyTables := range historyLookupTables { startTime := time.Now() - query, err := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table]) - if err != nil { - return nil, errors.Wrap(err, "error constructing a query") - } + query := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table]) // Find new offset before removing the rows var newOffset int64 - err = q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize)) + err := q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize)) if err != nil { if q.NoRows(err) { newOffset = 0 @@ -1079,6 +1018,10 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( return nil, errors.Wrapf(err, "error running query: %s", query) } + if err = q.updateLookupTableReapOffset(ctx, table, newOffset); err != nil { + return nil, fmt.Errorf("error updating offset: %w", err) + } + rows, err := res.RowsAffected() if err != nil { return nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query) @@ -1093,22 +1036,86 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( return results, nil } +var historyLookupTables = map[string][]tableObjectFieldPair{ + "history_accounts": { + { + name: "history_transaction_participants", + objectField: "history_account_id", + }, + + { + name: "history_effects", + objectField: "history_account_id", + }, + { + name: "history_operation_participants", + objectField: "history_account_id", + }, + { + name: "history_trades", + objectField: "base_account_id", + }, + { + name: "history_trades", + objectField: "counter_account_id", + }, + }, + "history_assets": { + { + name: "history_trades", + objectField: "base_asset_id", + }, + { + name: "history_trades", + objectField: "counter_asset_id", + }, + { + name: "history_trades_60000", + objectField: "base_asset_id", + }, + { + name: "history_trades_60000", + objectField: "counter_asset_id", + }, + }, + "history_claimable_balances": { + { + name: "history_transaction_claimable_balances", + objectField: "history_claimable_balance_id", + }, + { + name: "history_operation_claimable_balances", + objectField: "history_claimable_balance_id", + }, + }, + "history_liquidity_pools": { + { + name: "history_transaction_liquidity_pools", + objectField: "history_liquidity_pool_id", + }, + { + name: "history_operation_liquidity_pools", + objectField: "history_liquidity_pool_id", + }, + }, +} + // constructReapLookupTablesQuery creates a query like (using history_claimable_balances // as an example): // -// delete from history_claimable_balances where id in +// delete from history_claimable_balances where id in ( // -// (select id from -// (select id, -// (select 1 from history_operation_claimable_balances -// where history_claimable_balance_id = hcb.id limit 1) as c1, -// (select 1 from history_transaction_claimable_balances -// where history_claimable_balance_id = hcb.id limit 1) as c2, -// 1 as cx, -// from history_claimable_balances hcb where id > 1000 order by id limit 100) -// as sub where c1 IS NULL and c2 IS NULL and 1=1); +// WITH ha_batch AS ( +// SELECT id +// FROM history_claimable_balances +// WHERE id >= 1000 +// ORDER BY id limit 1000 +// ) SELECT e1.id as id FROM ha_batch e1 +// WHERE NOT EXISTS (SELECT 1 FROM history_transaction_claimable_balances WHERE history_transaction_claimable_balances.history_claimable_balance_id = id limit 1) +// AND NOT EXISTS (SELECT 1 FROM history_operation_claimable_balances WHERE history_operation_claimable_balances.history_claimable_balance_id = id limit 1) +// ) // -// In short it checks the 100 rows omitting 1000 row of history_claimable_balances +// In short it checks the 1000 rows omitting 1000 row of history_claimable_balances // and counts occurrences of each row in corresponding history tables. // If there are no history rows for a given id, the row in // history_claimable_balances is removed. @@ -1118,45 +1125,29 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( // possible that rows will be skipped from deletion. But offset is reset // when it reaches the table size so eventually all orphaned rows are // deleted. -func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize, offset int64) (string, error) { - var sb strings.Builder - var err error - _, err = fmt.Fprintf(&sb, "delete from %s where id IN (select id from (select id, ", table) - if err != nil { - return "", err - } - - for i, historyTable := range historyTables { - _, err = fmt.Fprintf( - &sb, - `(select 1 from %s where %s = hcb.id limit 1) as c%d, `, - historyTable.name, - historyTable.objectField, - i, +func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize int, offset int64) string { + var conditions []string + + for _, historyTable := range historyTables { + conditions = append( + conditions, + fmt.Sprintf( + "NOT EXISTS ( SELECT 1 as row FROM %s WHERE %s.%s = id LIMIT 1)", + historyTable.name, + historyTable.name, historyTable.objectField, + ), ) - if err != nil { - return "", err - } } - _, err = fmt.Fprintf(&sb, "1 as cx from %s hcb where id >= %d order by id limit %d) as sub where ", table, offset, batchSize) - if err != nil { - return "", err - } - - for i := range historyTables { - _, err = fmt.Fprintf(&sb, "c%d IS NULL and ", i) - if err != nil { - return "", err - } - } - - _, err = sb.WriteString("1=1);") - if err != nil { - return "", err - } - - return sb.String(), nil + return fmt.Sprintf( + "DELETE FROM %s WHERE id IN ("+ + "WITH ha_batch AS (SELECT id FROM %s WHERE id >= %d ORDER BY id limit %d) "+ + "SELECT e1.id as id FROM ha_batch e1 WHERE ", + table, + table, + offset, + batchSize, + ) + strings.Join(conditions, " AND ") + ")" } // DeleteRangeAll deletes a range of rows from all history tables between diff --git a/services/horizon/internal/db2/history/main_test.go b/services/horizon/internal/db2/history/main_test.go index 792f9826aa..1a28b9e584 100644 --- a/services/horizon/internal/db2/history/main_test.go +++ b/services/horizon/internal/db2/history/main_test.go @@ -4,9 +4,9 @@ import ( "testing" "time" - "github.com/stellar/go/services/horizon/internal/test" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + + "github.com/stellar/go/services/horizon/internal/test" ) func TestLatestLedger(t *testing.T) { @@ -70,43 +70,19 @@ func TestElderLedger(t *testing.T) { } func TestConstructReapLookupTablesQuery(t *testing.T) { - query, err := constructReapLookupTablesQuery( + query := constructReapLookupTablesQuery( "history_accounts", - []tableObjectFieldPair{ - { - name: "history_effects", - objectField: "history_account_id", - }, - { - name: "history_operation_participants", - objectField: "history_account_id", - }, - { - name: "history_trades", - objectField: "base_account_id", - }, - { - name: "history_trades", - objectField: "counter_account_id", - }, - { - name: "history_transaction_participants", - objectField: "history_account_id", - }, - }, + historyLookupTables["history_accounts"], 10, 0, ) - require.NoError(t, err) assert.Equal(t, - "delete from history_accounts where id IN "+ - "(select id from "+ - "(select id, (select 1 from history_effects where history_account_id = hcb.id limit 1) as c0, "+ - "(select 1 from history_operation_participants where history_account_id = hcb.id limit 1) as c1, "+ - "(select 1 from history_trades where base_account_id = hcb.id limit 1) as c2, "+ - "(select 1 from history_trades where counter_account_id = hcb.id limit 1) as c3, "+ - "(select 1 from history_transaction_participants where history_account_id = hcb.id limit 1) as c4, "+ - "1 as cx from history_accounts hcb where id >= 0 order by id limit 10) as sub "+ - "where c0 IS NULL and c1 IS NULL and c2 IS NULL and c3 IS NULL and c4 IS NULL and 1=1);", query) + "DELETE FROM history_accounts WHERE id IN ("+ + "WITH ha_batch AS (SELECT id FROM history_accounts WHERE id >= 0 ORDER BY id limit 10) SELECT e1.id as id FROM ha_batch e1 "+ + "WHERE NOT EXISTS ( SELECT 1 as row FROM history_transaction_participants WHERE history_transaction_participants.history_account_id = id LIMIT 1) "+ + "AND NOT EXISTS ( SELECT 1 as row FROM history_effects WHERE history_effects.history_account_id = id LIMIT 1) "+ + "AND NOT EXISTS ( SELECT 1 as row FROM history_operation_participants WHERE history_operation_participants.history_account_id = id LIMIT 1) "+ + "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.base_account_id = id LIMIT 1) "+ + "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.counter_account_id = id LIMIT 1))", query) } diff --git a/services/horizon/internal/db2/history/reap_test.go b/services/horizon/internal/db2/history/reap_test.go index 0f033c3629..5601cd19b6 100644 --- a/services/horizon/internal/db2/history/reap_test.go +++ b/services/horizon/internal/db2/history/reap_test.go @@ -30,21 +30,18 @@ func TestReapLookupTables(t *testing.T) { prevLiquidityPools, curLiquidityPools int ) - // Prev - { - err := db.GetRaw(tt.Ctx, &prevLedgers, `SELECT COUNT(*) FROM history_ledgers`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &prevAccounts, `SELECT COUNT(*) FROM history_accounts`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &prevAssets, `SELECT COUNT(*) FROM history_assets`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &prevClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &prevLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) - tt.Require.NoError(err) - } - - err := reaper.DeleteUnretainedHistory(tt.Ctx) + err := db.GetRaw(tt.Ctx, &prevLedgers, `SELECT COUNT(*) FROM history_ledgers`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &prevAccounts, `SELECT COUNT(*) FROM history_accounts`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &prevAssets, `SELECT COUNT(*) FROM history_assets`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &prevClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &prevLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) + tt.Require.NoError(err) + + err = reaper.DeleteUnretainedHistory(tt.Ctx) tt.Require.NoError(err) q := &history.Q{tt.HorizonSession()} @@ -52,36 +49,33 @@ func TestReapLookupTables(t *testing.T) { err = q.Begin(tt.Ctx) tt.Require.NoError(err) - results, err := q.ReapLookupTables(tt.Ctx, nil) + results, err := q.ReapLookupTables(tt.Ctx, 5) tt.Require.NoError(err) err = q.Commit() tt.Require.NoError(err) - // cur - { - err := db.GetRaw(tt.Ctx, &curLedgers, `SELECT COUNT(*) FROM history_ledgers`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &curClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &curLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) - tt.Require.NoError(err) - } + err = db.GetRaw(tt.Ctx, &curLedgers, `SELECT COUNT(*) FROM history_ledgers`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) + tt.Require.NoError(err) tt.Assert.Equal(61, prevLedgers, "prevLedgers") tt.Assert.Equal(1, curLedgers, "curLedgers") tt.Assert.Equal(25, prevAccounts, "prevAccounts") - tt.Assert.Equal(1, curAccounts, "curAccounts") - tt.Assert.Equal(int64(24), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) + tt.Assert.Equal(21, curAccounts, "curAccounts") + tt.Assert.Equal(int64(4), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) tt.Assert.Equal(7, prevAssets, "prevAssets") - tt.Assert.Equal(0, curAssets, "curAssets") - tt.Assert.Equal(int64(7), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) + tt.Assert.Equal(2, curAssets, "curAssets") + tt.Assert.Equal(int64(5), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances") tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances") @@ -91,6 +85,66 @@ func TestReapLookupTables(t *testing.T) { tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools") tt.Assert.Equal(int64(1), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) + tt.Assert.Len(results, 4) + tt.Assert.Equal(int64(6), results["history_accounts"].Offset) + tt.Assert.Equal(int64(6), results["history_assets"].Offset) + tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) + tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) + + err = q.Begin(tt.Ctx) + tt.Require.NoError(err) + + results, err = q.ReapLookupTables(tt.Ctx, 5) + tt.Require.NoError(err) + + err = q.Commit() + tt.Require.NoError(err) + + err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) + tt.Require.NoError(err) + + tt.Assert.Equal(16, curAccounts, "curAccounts") + tt.Assert.Equal(int64(5), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) + + tt.Assert.Equal(0, curAssets, "curAssets") + tt.Assert.Equal(int64(2), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) + + tt.Assert.Equal(int64(0), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) + + tt.Assert.Equal(int64(0), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) + + tt.Assert.Len(results, 4) + tt.Assert.Equal(int64(11), results["history_accounts"].Offset) + tt.Assert.Equal(int64(0), results["history_assets"].Offset) + tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) + tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) + + err = q.Begin(tt.Ctx) + tt.Require.NoError(err) + + results, err = q.ReapLookupTables(tt.Ctx, 1000) + tt.Require.NoError(err) + + err = q.Commit() + tt.Require.NoError(err) + + err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) + tt.Require.NoError(err) + + tt.Assert.Equal(1, curAccounts, "curAccounts") + tt.Assert.Equal(int64(15), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) + + tt.Assert.Equal(0, curAssets, "curAssets") + tt.Assert.Equal(int64(0), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) + + tt.Assert.Equal(int64(0), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) + + tt.Assert.Equal(int64(0), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) + tt.Assert.Len(results, 4) tt.Assert.Equal(int64(0), results["history_accounts"].Offset) tt.Assert.Equal(int64(0), results["history_assets"].Offset) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 1a54e6843c..64e4558723 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -73,12 +73,13 @@ const ( // * Reaping (requires 2 connections, the extra connection is used for holding the advisory lock) MaxDBConnections = 5 - defaultCoreCursorName = "HORIZON" stateVerificationErrorThreshold = 3 // 100 ledgers per flush has shown in stress tests // to be best point on performance curve, default to that. MaxLedgersPerFlush uint32 = 100 + + reapLookupTablesBatchSize = 1000 ) var log = logpkg.DefaultLogger.WithField("service", "ingest") @@ -253,7 +254,6 @@ type system struct { runStateVerificationOnLedger func(uint32) bool - reapOffsetByTable map[string]int64 maxLedgerPerFlush uint32 reaper *Reaper @@ -369,7 +369,6 @@ func NewSystem(config Config) (System, error) { config.ReapConfig, config.HistorySession, ), - reapOffsetByTable: map[string]int64{}, } system.initMetrics() @@ -843,7 +842,7 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { defer cancel() reapStart := time.Now() - results, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsetByTable) + results, err := s.historyQ.ReapLookupTables(ctx, reapLookupTablesBatchSize) if err != nil { log.WithError(err).Warn("Error reaping lookup tables") return @@ -859,8 +858,9 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { reapLog := log for table, result := range results { totalDeleted += result.RowsDeleted - reapLog = reapLog.WithField(table, result) - s.reapOffsetByTable[table] = result.Offset + reapLog = reapLog.WithField(table+"_offset", result.Offset) + reapLog = reapLog.WithField(table+"_duration", result.Duration) + reapLog = reapLog.WithField(table+"_rows_deleted", result.RowsDeleted) s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": table}).Observe(float64(result.RowsDeleted)) s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table}).Observe(result.Duration.Seconds()) } diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index fde8e40a9c..d5733ee5e4 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -562,8 +562,8 @@ func (m *mockDBQ) NewTradeBatchInsertBuilder() history.TradeBatchInsertBuilder { return args.Get(0).(history.TradeBatchInsertBuilder) } -func (m *mockDBQ) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]history.LookupTableReapResult, error) { - args := m.Called(ctx, offsets) +func (m *mockDBQ) ReapLookupTables(ctx context.Context, batchSize int) (map[string]history.LookupTableReapResult, error) { + args := m.Called(ctx, batchSize) var r1 map[string]history.LookupTableReapResult if args.Get(0) != nil { r1 = args.Get(0).(map[string]history.LookupTableReapResult)