diff --git a/services/blockchain-connector/shared/sdk/cache.js b/services/blockchain-connector/shared/sdk/cache.js index 25aa70e87f..725a36f2fd 100644 --- a/services/blockchain-connector/shared/sdk/cache.js +++ b/services/blockchain-connector/shared/sdk/cache.js @@ -72,7 +72,7 @@ const cacheBlocksFromWaitlist = async () => { const getBlockByIDFromCache = async (id) => { const blocksCache = await getBlocksCache(); - const resultSet = await blocksCache.find({ id }, ['block']); + const resultSet = await blocksCache.find({ id, limit: 1 }, ['block']); if (!resultSet.length) return null; const [{ block }] = resultSet; @@ -82,7 +82,7 @@ const getBlockByIDFromCache = async (id) => { const getTransactionByIDFromCache = async (transactionID) => { const trxIDToBlockIDCache = await getTrxIDtoBlockIDCache(); - const resultSet = await trxIDToBlockIDCache.find({ transactionID }, ['blockID']); + const resultSet = await trxIDToBlockIDCache.find({ transactionID, limit: 1 }, ['blockID']); if (!resultSet.length) return null; const [{ blockID }] = resultSet; diff --git a/services/blockchain-coordinator/config.js b/services/blockchain-coordinator/config.js index a4b04c8636..2a9cc69ce0 100644 --- a/services/blockchain-coordinator/config.js +++ b/services/blockchain-coordinator/config.js @@ -77,7 +77,7 @@ config.job = { indexMissingBlocks: { interval: process.env.INDEX_MISSING_BLOCKS_INTERVAL || 0, schedule: process.env.INDEX_MISSING_BLOCKS_SCHEDULE || '*/15 * * * *', - skipThreshold: process.env.INDEX_MISSING_BLOCKS_SKIP_THRESHOLD || 100, + skipThreshold: process.env.INDEX_MISSING_BLOCKS_SKIP_THRESHOLD || 1000, }, }; diff --git a/services/blockchain-coordinator/shared/scheduler.js b/services/blockchain-coordinator/shared/scheduler.js index 12b07bebf5..1f35bafdd3 100644 --- a/services/blockchain-coordinator/shared/scheduler.js +++ b/services/blockchain-coordinator/shared/scheduler.js @@ -57,19 +57,54 @@ const getInProgressJobCount = async (queue) => { return count; }; +let intervalID; + +const waitForJobCountToFallBelowThreshold = (resolve) => new Promise((res) => { + if (!resolve) resolve = res; + if (intervalID) { + clearInterval(intervalID); + intervalID = null; + } + + return getInProgressJobCount(blockIndexQueue) + .then((count) => count < config.job.indexMissingBlocks.skipThreshold + ? resolve(true) + : (() => { + const REFRESH_INTERVAL = 30000; + logger.info(`In progress job count not yet below the threshold. Waiting for ${REFRESH_INTERVAL} ms to re-check the job count before scheduling the next batch.`); + intervalID = setInterval( + waitForJobCountToFallBelowThreshold.bind(null, resolve), + REFRESH_INTERVAL, + ); + })()); +}); + const scheduleBlocksIndexing = async (heights) => { const blockHeights = Array.isArray(heights) ? heights : [heights]; - blockHeights.sort((h1, h2) => h1 - h2); // sort by ascending height + blockHeights.sort((h1, h2) => h1 - h2); // sort heights in ascending order + + // Schedule indexing in batches when the list is too long to avoid OOM + const MAX_BATCH_SIZE = 20000; + const numBatches = Math.ceil(blockHeights.length / MAX_BATCH_SIZE); + if (numBatches > 1) logger.info(`Scheduling the blocks indexing in ${numBatches} smaller batches.`); + + for (let i = 0; i < numBatches; i++) { + /* eslint-disable no-await-in-loop */ + + const blockHeightsBatch = blockHeights.slice(i * MAX_BATCH_SIZE, (i + 1) * MAX_BATCH_SIZE); + + // eslint-disable-next-line no-restricted-syntax + for (const height of blockHeightsBatch) { + logger.trace(`Scheduling indexing for block at height: ${height}.`); + await blockIndexQueue.add({ height }); + logger.debug(`Scheduled indexing for block at height: ${height}.`); + } - // eslint-disable-next-line no-restricted-syntax - for (const height of blockHeights) { - logger.trace(`Scheduling indexing for block at height: ${height}.`); - // eslint-disable-next-line no-await-in-loop - await blockIndexQueue.add({ height }); - logger.debug(`Scheduled indexing for block at height: ${height}.`); + await waitForJobCountToFallBelowThreshold(); + /* eslint-enable no-await-in-loop */ } }; diff --git a/services/blockchain-indexer/shared/dataService/business/blocks.js b/services/blockchain-indexer/shared/dataService/business/blocks.js index 4dd6e04019..43bc750a08 100644 --- a/services/blockchain-indexer/shared/dataService/business/blocks.js +++ b/services/blockchain-indexer/shared/dataService/business/blocks.js @@ -77,7 +77,7 @@ const normalizeBlock = async (originalBlock) => { const { numberOfEvents, reward } = await (async () => { const [dbResponse] = await blocksTable.find( - { height: block.height }, + { height: block.height, limit: 1 }, ['numberOfEvents', 'reward'], ); diff --git a/services/blockchain-indexer/shared/dataService/business/events.js b/services/blockchain-indexer/shared/dataService/business/events.js index 9ed7039041..4f384ef5cb 100644 --- a/services/blockchain-indexer/shared/dataService/business/events.js +++ b/services/blockchain-indexer/shared/dataService/business/events.js @@ -54,10 +54,10 @@ const getEventsByHeight = async (height) => { // Get from DB only when isPersistEvents is enabled if (config.isPersistEvents) { const eventsTable = await getEventsTable(); - const dbEventStrs = await eventsTable.find({ height }, ['eventStr']); + const dbEventStrings = await eventsTable.find({ height }, ['eventStr']); - if (dbEventStrs.length) { - const dbEvents = dbEventStrs + if (dbEventStrings.length) { + const dbEvents = dbEventStrings .map(({ eventStr }) => eventStr ? JSON.parse(eventStr) : eventStr); await eventCache.set(height, JSON.stringify(dbEvents)); return dbEvents; @@ -122,7 +122,7 @@ const getEvents = async (params) => { if (params.blockID) { const { blockID, ...remParams } = params; params = remParams; - const [block] = await blocksTable.find({ id: blockID }, ['height']); + const [block] = await blocksTable.find({ id: blockID, limit: 1 }, ['height']); if ('height' in params && params.height !== block.height) { throw new NotFoundException(`Invalid combination of blockID: ${blockID} and height: ${params.height}`); } @@ -156,7 +156,7 @@ const getEvents = async (params) => { event = eventsFromCache.find(entry => entry.index === index); } - const [{ id, timestamp } = {}] = await blocksTable.find({ height }, ['id', 'timestamp']); + const [{ id, timestamp } = {}] = await blocksTable.find({ height, limit: 1 }, ['id', 'timestamp']); return parseToJSONCompatObj({ ...event, diff --git a/services/blockchain-indexer/shared/dataService/business/invoke.js b/services/blockchain-indexer/shared/dataService/business/invoke.js index 23d9f15e3f..c5639c606c 100644 --- a/services/blockchain-indexer/shared/dataService/business/invoke.js +++ b/services/blockchain-indexer/shared/dataService/business/invoke.js @@ -27,8 +27,8 @@ const { } = require('../../constants'); const checkIfEndpointRegistered = async (endpoint) => { - const allregisteredEndpoints = await getAllRegisteredEndpoints(); - return allregisteredEndpoints.includes(endpoint); + const allRegisteredEndpoints = await getAllRegisteredEndpoints(); + return allRegisteredEndpoints.includes(endpoint); }; const validateEndpointParams = async (invokeEndpointParams) => { diff --git a/services/blockchain-indexer/shared/dataService/business/token.js b/services/blockchain-indexer/shared/dataService/business/token.js index c2b7957b9b..c8555ec218 100644 --- a/services/blockchain-indexer/shared/dataService/business/token.js +++ b/services/blockchain-indexer/shared/dataService/business/token.js @@ -112,7 +112,10 @@ const getTokenTopBalances = async (params) => { }]; } - const tokenInfos = await accountBalancesTable.find(params, [`${accountBalancesTableSchema.tableName}.balance`, `${accountBalancesTableSchema.tableName}.address`, `${accountTableSchema.tableName}.publicKey`, `${accountTableSchema.tableName}.name`]); + const tokenInfos = await accountBalancesTable.find( + params, + [`${accountBalancesTableSchema.tableName}.balance`, `${accountBalancesTableSchema.tableName}.address`, `${accountTableSchema.tableName}.publicKey`, `${accountTableSchema.tableName}.name`], + ); const filteredTokenInfos = []; // eslint-disable-next-line no-restricted-syntax @@ -231,7 +234,8 @@ const getAvailableTokenIDs = async (params) => { const accountBalancesTable = await getAccountBalancesTable(); const tokenInfos = await accountBalancesTable.find( - { ...params, distinct: 'tokenID' }, ['tokenID'], + { ...params, distinct: 'tokenID' }, + ['tokenID'], ); response.data.tokenIDs = tokenInfos.map(tokenInfo => tokenInfo.tokenID); diff --git a/services/blockchain-indexer/shared/dataService/pos/validators.js b/services/blockchain-indexer/shared/dataService/pos/validators.js index bc5afb9d9d..83563fdf8f 100644 --- a/services/blockchain-indexer/shared/dataService/pos/validators.js +++ b/services/blockchain-indexer/shared/dataService/pos/validators.js @@ -212,7 +212,7 @@ const getPosValidators = async params => { filteredValidators, async validator => { const [validatorInfo = {}] = await validatorsTable.find( - { address: validator.address }, + { address: validator.address, limit: 1 }, ['generatedBlocks', 'totalCommission', 'totalSelfStakeRewards'], ); const { diff --git a/services/blockchain-indexer/shared/indexer/blockchainIndex.js b/services/blockchain-indexer/shared/indexer/blockchainIndex.js index 3df52b18be..3a699417f2 100644 --- a/services/blockchain-indexer/shared/indexer/blockchainIndex.js +++ b/services/blockchain-indexer/shared/indexer/blockchainIndex.js @@ -89,11 +89,13 @@ const indexBlock = async job => { const { block } = job.data; if (!validateBlock(block)) throw new Error(`Invalid block ${block.id} at height ${block.height}.`); + logger.info(block.height); + const blocksTable = await getBlocksTable(); // Check if previous block is indexed, index previous block if not indexed if (block.height !== await getGenesisHeight()) { - const [{ height: lastIndexedHeight } = {}] = await blocksTable.find( + const [lastIndexedBlock = {}] = await blocksTable.find( { propBetweens: [{ property: 'height', @@ -102,15 +104,19 @@ const indexBlock = async job => { sort: 'height:desc', limit: 1, }, - ['height'], + ['height', 'isFinal'], ); const heightsToIndex = range( - (lastIndexedHeight || await getGenesisHeight()) + 1, + (lastIndexedBlock.height || await getGenesisHeight()) + 1, block.height + 1, // '+ 1' as 'to' is non-inclusive 1, ); + if (block.height <= lastIndexedBlock.height && lastIndexedBlock.isFinal) { + return; + } + if (heightsToIndex.length > 1) { await BluebirdPromise.map( heightsToIndex, @@ -279,7 +285,7 @@ const deleteIndexedBlocks = async job => { blocks, async block => { // Check if deleted block is indexed - const [deletedBlockFromDB] = await blocksTable.find({ height: block.height }); + const [deletedBlockFromDB] = await blocksTable.find({ height: block.height, limit: 1 }); // Reschedule job if not deleted block is not indexed if (!deletedBlockFromDB) { diff --git a/services/transaction-statistics/shared/transactionStatistics.js b/services/transaction-statistics/shared/transactionStatistics.js index 15d2c11aa5..e22c379429 100644 --- a/services/transaction-statistics/shared/transactionStatistics.js +++ b/services/transaction-statistics/shared/transactionStatistics.js @@ -200,7 +200,7 @@ const getTransactionsStatistics = async params => { transactionsStatistics.data = { timeline, distributionByType, distributionByAmount }; - const [{ date: minDate } = {}] = await transactionStatisticsTable.find({ sort: 'date:asc' }, 'date'); + const [{ date: minDate } = {}] = await transactionStatisticsTable.find({ sort: 'date:asc', limit: 1 }, 'date'); const total = minDate ? moment().diff(moment.unix(minDate), params.interval) : 0; transactionsStatistics.meta = {