Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
⚡ Split indexing into smaller batches to avoid OOMs
Browse files Browse the repository at this point in the history
  • Loading branch information
sameersubudhi committed Jun 23, 2023
1 parent 324508f commit fc567b7
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 26 deletions.
4 changes: 2 additions & 2 deletions services/blockchain-connector/shared/sdk/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ const cacheBlocksFromWaitlist = async () => {

const getBlockByIDFromCache = async (id) => {
const blocksCache = await getBlocksCache();
const resultSet = await blocksCache.find({ id }, ['block']);
const resultSet = await blocksCache.find({ id, limit: 1 }, ['block']);
if (!resultSet.length) return null;

const [{ block }] = resultSet;
Expand All @@ -82,7 +82,7 @@ const getBlockByIDFromCache = async (id) => {

const getTransactionByIDFromCache = async (transactionID) => {
const trxIDToBlockIDCache = await getTrxIDtoBlockIDCache();
const resultSet = await trxIDToBlockIDCache.find({ transactionID }, ['blockID']);
const resultSet = await trxIDToBlockIDCache.find({ transactionID, limit: 1 }, ['blockID']);
if (!resultSet.length) return null;

const [{ blockID }] = resultSet;
Expand Down
2 changes: 1 addition & 1 deletion services/blockchain-coordinator/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ config.job = {
indexMissingBlocks: {
interval: process.env.INDEX_MISSING_BLOCKS_INTERVAL || 0,
schedule: process.env.INDEX_MISSING_BLOCKS_SCHEDULE || '*/15 * * * *',
skipThreshold: process.env.INDEX_MISSING_BLOCKS_SKIP_THRESHOLD || 100,
skipThreshold: process.env.INDEX_MISSING_BLOCKS_SKIP_THRESHOLD || 1000,
},
};

Expand Down
49 changes: 42 additions & 7 deletions services/blockchain-coordinator/shared/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,54 @@ const getInProgressJobCount = async (queue) => {
return count;
};

let intervalID;

const waitForJobCountToFallBelowThreshold = (resolve) => new Promise((res) => {
if (!resolve) resolve = res;
if (intervalID) {
clearInterval(intervalID);
intervalID = null;
}

return getInProgressJobCount(blockIndexQueue)
.then((count) => count < config.job.indexMissingBlocks.skipThreshold
? resolve(true)
: (() => {
const REFRESH_INTERVAL = 30000;
logger.info(`In progress job count not yet below the threshold. Waiting for ${REFRESH_INTERVAL} ms to re-check the job count before scheduling the next batch.`);
intervalID = setInterval(
waitForJobCountToFallBelowThreshold.bind(null, resolve),
REFRESH_INTERVAL,
);
})());
});

const scheduleBlocksIndexing = async (heights) => {
const blockHeights = Array.isArray(heights)
? heights
: [heights];

blockHeights.sort((h1, h2) => h1 - h2); // sort by ascending height
blockHeights.sort((h1, h2) => h1 - h2); // sort heights in ascending order

// Schedule indexing in batches when the list is too long to avoid OOM
const MAX_BATCH_SIZE = 20000;
const numBatches = Math.ceil(blockHeights.length / MAX_BATCH_SIZE);
if (numBatches > 1) logger.info(`Scheduling the blocks indexing in ${numBatches} smaller batches.`);

for (let i = 0; i < numBatches; i++) {
/* eslint-disable no-await-in-loop */

const blockHeightsBatch = blockHeights.slice(i * MAX_BATCH_SIZE, (i + 1) * MAX_BATCH_SIZE);

// eslint-disable-next-line no-restricted-syntax
for (const height of blockHeightsBatch) {
logger.trace(`Scheduling indexing for block at height: ${height}.`);
await blockIndexQueue.add({ height });
logger.debug(`Scheduled indexing for block at height: ${height}.`);
}

// eslint-disable-next-line no-restricted-syntax
for (const height of blockHeights) {
logger.trace(`Scheduling indexing for block at height: ${height}.`);
// eslint-disable-next-line no-await-in-loop
await blockIndexQueue.add({ height });
logger.debug(`Scheduled indexing for block at height: ${height}.`);
await waitForJobCountToFallBelowThreshold();
/* eslint-enable no-await-in-loop */
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ const normalizeBlock = async (originalBlock) => {

const { numberOfEvents, reward } = await (async () => {
const [dbResponse] = await blocksTable.find(
{ height: block.height },
{
height: block.height,
limit: 1,
},
['numberOfEvents', 'reward'],
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ const getEventsByHeight = async (height) => {
// Get from DB only when isPersistEvents is enabled
if (config.isPersistEvents) {
const eventsTable = await getEventsTable();
const dbEventStrs = await eventsTable.find({ height }, ['eventStr']);
const dbEventStrings = await eventsTable.find({ height }, ['eventStr']);

if (dbEventStrs.length) {
const dbEvents = dbEventStrs
if (dbEventStrings.length) {
const dbEvents = dbEventStrings
.map(({ eventStr }) => eventStr ? JSON.parse(eventStr) : eventStr);
await eventCache.set(height, JSON.stringify(dbEvents));
return dbEvents;
Expand Down Expand Up @@ -122,7 +122,7 @@ const getEvents = async (params) => {
if (params.blockID) {
const { blockID, ...remParams } = params;
params = remParams;
const [block] = await blocksTable.find({ id: blockID }, ['height']);
const [block] = await blocksTable.find({ id: blockID, limit: 1 }, ['height']);
if ('height' in params && params.height !== block.height) {
throw new NotFoundException(`Invalid combination of blockID: ${blockID} and height: ${params.height}`);
}
Expand Down Expand Up @@ -156,7 +156,7 @@ const getEvents = async (params) => {
event = eventsFromCache.find(entry => entry.index === index);
}

const [{ id, timestamp } = {}] = await blocksTable.find({ height }, ['id', 'timestamp']);
const [{ id, timestamp } = {}] = await blocksTable.find({ height, limit: 1 }, ['id', 'timestamp']);

return parseToJSONCompatObj({
...event,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ const {
} = require('../../constants');

const checkIfEndpointRegistered = async (endpoint) => {
const allregisteredEndpoints = await getAllRegisteredEndpoints();
return allregisteredEndpoints.includes(endpoint);
const allRegisteredEndpoints = await getAllRegisteredEndpoints();
return allRegisteredEndpoints.includes(endpoint);
};

const validateEndpointParams = async (invokeEndpointParams) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ const getTokenTopBalances = async (params) => {
}];
}

const tokenInfos = await accountBalancesTable.find(params, [`${accountBalancesTableSchema.tableName}.balance`, `${accountBalancesTableSchema.tableName}.address`, `${accountTableSchema.tableName}.publicKey`, `${accountTableSchema.tableName}.name`]);
const tokenInfos = await accountBalancesTable.find(
params,
[`${accountBalancesTableSchema.tableName}.balance`, `${accountBalancesTableSchema.tableName}.address`, `${accountTableSchema.tableName}.publicKey`, `${accountTableSchema.tableName}.name`],
);

const filteredTokenInfos = [];
// eslint-disable-next-line no-restricted-syntax
Expand Down Expand Up @@ -231,7 +234,8 @@ const getAvailableTokenIDs = async (params) => {
const accountBalancesTable = await getAccountBalancesTable();

const tokenInfos = await accountBalancesTable.find(
{ ...params, distinct: 'tokenID' }, ['tokenID'],
{ ...params, distinct: 'tokenID' },
['tokenID'],
);

response.data.tokenIDs = tokenInfos.map(tokenInfo => tokenInfo.tokenID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ const getPosValidators = async params => {
filteredValidators,
async validator => {
const [validatorInfo = {}] = await validatorsTable.find(
{ address: validator.address },
{ address: validator.address, limit: 1 },
['generatedBlocks', 'totalCommission', 'totalSelfStakeRewards'],
);
const {
Expand Down
14 changes: 10 additions & 4 deletions services/blockchain-indexer/shared/indexer/blockchainIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,13 @@ const indexBlock = async job => {
const { block } = job.data;
if (!validateBlock(block)) throw new Error(`Invalid block ${block.id} at height ${block.height}.`);

logger.info(block.height);

const blocksTable = await getBlocksTable();

// Check if previous block is indexed, index previous block if not indexed
if (block.height !== await getGenesisHeight()) {
const [{ height: lastIndexedHeight } = {}] = await blocksTable.find(
const [lastIndexedBlock = {}] = await blocksTable.find(
{
propBetweens: [{
property: 'height',
Expand All @@ -102,15 +104,19 @@ const indexBlock = async job => {
sort: 'height:desc',
limit: 1,
},
['height'],
['height', 'isFinal'],
);

const heightsToIndex = range(
(lastIndexedHeight || await getGenesisHeight()) + 1,
(lastIndexedBlock.height || await getGenesisHeight()) + 1,
block.height + 1, // '+ 1' as 'to' is non-inclusive
1,
);

if (block.height <= lastIndexedBlock.height && lastIndexedBlock.isFinal) {
return;
}

if (heightsToIndex.length > 1) {
await BluebirdPromise.map(
heightsToIndex,
Expand Down Expand Up @@ -279,7 +285,7 @@ const deleteIndexedBlocks = async job => {
blocks,
async block => {
// Check if deleted block is indexed
const [deletedBlockFromDB] = await blocksTable.find({ height: block.height });
const [deletedBlockFromDB] = await blocksTable.find({ height: block.height, limit: 1 });

// Reschedule job if not deleted block is not indexed
if (!deletedBlockFromDB) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ const getTransactionsStatistics = async params => {

transactionsStatistics.data = { timeline, distributionByType, distributionByAmount };

const [{ date: minDate } = {}] = await transactionStatisticsTable.find({ sort: 'date:asc' }, 'date');
const [{ date: minDate } = {}] = await transactionStatisticsTable.find({ sort: 'date:asc', limit: 1 }, 'date');
const total = minDate ? moment().diff(moment.unix(minDate), params.interval) : 0;

transactionsStatistics.meta = {
Expand Down

0 comments on commit fc567b7

Please sign in to comment.