diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index 045d18d24..a5f6bc320 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -120,19 +120,23 @@ const instantiateNewClient = async () => { } }; +let isClientStatLogIntervalSet = false; let isReInstantiateIntervalRunning = false; const initClientPool = async poolSize => { // Set the intervals only at application init if (clientPool.length === 0) { - setInterval(() => { - const stats = getApiClientStats(); - logger.info(`API client instantiation stats: ${JSON.stringify(stats)}`); - if (stats.activePoolSize < stats.expectedPoolSize) { - logger.warn( - 'activePoolSize should catch up with the expectedPoolSize, once the node is under less stress.', - ); - } - }, 5 * 60 * 1000); + if (!isClientStatLogIntervalSet) { + isClientStatLogIntervalSet = true; + setInterval(() => { + const stats = getApiClientStats(); + logger.info(`API client instantiation stats: ${JSON.stringify(stats)}`); + if (stats.activePoolSize < stats.expectedPoolSize) { + logger.warn( + 'activePoolSize should catch up with the expectedPoolSize, once the node is under less stress.', + ); + } + }, 5 * 60 * 1000); + } // Re-instantiate interval: Replaces nulls in clientPool with new active apiClients // isReInstantiateIntervalRunning is the safety check to skip callback execution if the previous one is already in-progress diff --git a/services/blockchain-indexer/config.js b/services/blockchain-indexer/config.js index fc57b117f..dac3db7b6 100644 --- a/services/blockchain-indexer/config.js +++ b/services/blockchain-indexer/config.js @@ -69,12 +69,18 @@ config.debug = process.env.SERVICE_LOG_LEVEL === 'debug'; * Message queue options */ config.queue = { - defaultJobOptions: { - attempts: 5, - timeout: 5 * 60 * 1000, // millisecs - removeOnComplete: true, - removeOnFail: true, - stackTraceLimit: 0, + defaultOptions: { + defaultJobOptions: { + attempts: 5, + timeout: 5 * 60 * 1000, // millisecs + removeOnComplete: true, + removeOnFail: true, + stackTraceLimit: 0, + }, + limiter: { + max: 30, + duration: 10 * 1000, // in millisecs + }, }, // Inter-microservice message queues diff --git a/services/blockchain-indexer/shared/indexer/blockchainIndex.js b/services/blockchain-indexer/shared/indexer/blockchainIndex.js index cf6f430f1..b57471809 100644 --- a/services/blockchain-indexer/shared/indexer/blockchainIndex.js +++ b/services/blockchain-indexer/shared/indexer/blockchainIndex.js @@ -699,6 +699,7 @@ const initBlockProcessingQueues = async () => { config.queue.indexBlocks.name, indexBlock, config.queue.indexBlocks.concurrency, + config.queue.defaultOptions, ); deleteIndexedBlocksQueue = Queue( @@ -706,6 +707,7 @@ const initBlockProcessingQueues = async () => { config.queue.deleteIndexedBlocks.name, deleteIndexedBlocksWrapper, config.queue.deleteIndexedBlocks.concurrency, + config.queue.defaultOptions, ); }; diff --git a/services/blockchain-indexer/shared/messageProcessor.js b/services/blockchain-indexer/shared/messageProcessor.js index e37004c24..688ab12bf 100644 --- a/services/blockchain-indexer/shared/messageProcessor.js +++ b/services/blockchain-indexer/shared/messageProcessor.js @@ -45,17 +45,17 @@ const STATS_INTERVAL = 1 * 60 * 1000; // ms const accountMessageQueue = new MessageQueue( config.queue.account.name, config.endpoints.messageQueue, - { defaultJobOptions: config.queue.defaultJobOptions }, + { defaultJobOptions: config.queue.defaultOptions.defaultJobOptions }, ); // Missing blocks const blockMessageQueue = new MessageQueue(config.queue.block.name, config.endpoints.messageQueue, { - defaultJobOptions: config.queue.defaultJobOptions, + defaultJobOptions: config.queue.defaultOptions.defaultJobOptions, }); // Newly generated blocks const eventMessageQueue = new MessageQueue(config.queue.event.name, config.endpoints.messageQueue, { - defaultJobOptions: config.queue.defaultJobOptions, + defaultJobOptions: config.queue.defaultOptions.defaultJobOptions, }); const queueStatus = async messageQueue => {