diff --git a/services/blockchain-connector/events/controller/blockchain.js b/services/blockchain-connector/events/controller/blockchain.js index df5a0c9fbd..72fb20af8f 100644 --- a/services/blockchain-connector/events/controller/blockchain.js +++ b/services/blockchain-connector/events/controller/blockchain.js @@ -46,11 +46,15 @@ const appNetworkEventController = async cb => { }; const txpoolNewTransactionController = async cb => { - const txpoolNewTransactionListener = async payload => - cb({ - ...payload, - transaction: formatTransaction(payload.transaction), - }); + const txpoolNewTransactionListener = async payload => { + try { + const transaction = formatTransaction(payload.transaction); + cb({ ...payload, transaction }); + } catch (_) { + // No actions necessary + // Safety check added in case txpool_newTransaction event arrives before init is called + } + }; Signals.get('txpoolNewTransaction').add(txpoolNewTransactionListener); }; @@ -73,37 +77,42 @@ const formatBlock = payload => const chainNewBlockController = async cb => { const chainNewBlockListener = async payload => { - const { blockHeader } = payload; - let transactions = []; - let assets = []; - - if ( - blockHeader.transactionRoot !== EMPTY_TREE_ROOT_HASH || - blockHeader.assetRoot !== EMPTY_TREE_ROOT_HASH - ) { - try { - const block = await getBlockByID(blockHeader.id); - transactions = block.transactions; - assets = block.assets; - } catch (err) { - logger.warn( - `Could not fetch block ${blockHeader.id} within chainNewBlockListener due to: ${err.message}`, - ); - logger.debug(err.stack); + try { + const { blockHeader } = payload; + let transactions = []; + let assets = []; + + if ( + blockHeader.transactionRoot !== EMPTY_TREE_ROOT_HASH || + blockHeader.assetRoot !== EMPTY_TREE_ROOT_HASH + ) { + try { + const block = await getBlockByID(blockHeader.id); + transactions = block.transactions; + assets = block.assets; + } catch (err) { + logger.warn( + `Could not fetch block ${blockHeader.id} within chainNewBlockListener due to: ${err.message}`, + ); + logger.debug(err.stack); + } } - } - - cb( - formatBlock({ - blockHeader, - assets, - transactions, - }), - ); - // Reload validators cache on pos module transactions - if (transactions.some(t => t.module === MODULE_NAME_POS)) { - Signals.get('reloadAllPosValidators').dispatch(); + cb( + formatBlock({ + blockHeader, + assets, + transactions, + }), + ); + + // Reload validators cache on pos module transactions + if (transactions.some(t => t.module === MODULE_NAME_POS)) { + Signals.get('reloadAllPosValidators').dispatch(); + } + } catch (_) { + // No actions necessary + // Safety check added in case txpool_newTransaction event arrives before init is called } }; Signals.get('chainNewBlock').add(chainNewBlockListener); @@ -111,8 +120,13 @@ const chainNewBlockController = async cb => { const chainDeleteBlockController = async cb => { const chainDeleteBlockListener = async payload => { - cb(formatBlock(payload)); - Signals.get('reloadAllPosValidators').dispatch(); + try { + cb(formatBlock(payload)); + Signals.get('reloadAllPosValidators').dispatch(); + } catch (_) { + // No actions necessary + // Safety check added in case txpool_newTransaction event arrives before init is called + } }; Signals.get('chainDeleteBlock').add(chainDeleteBlockListener); }; diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index ff56665441..045d18d240 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -18,14 +18,13 @@ const { Signals, HTTP, Exceptions: { TimeoutException }, - Utils: { isObject, waitForIt }, + Utils: { delay, isObject, waitForIt }, } = require('lisk-service-framework'); const { createWSClient, createIPCClient } = require('@liskhq/lisk-api-client'); const crypto = require('crypto'); const config = require('../../config'); -const delay = require('../utils/delay'); const logger = Logger(); @@ -56,15 +55,15 @@ const clientInstantiationStats = { }; let requestCount = 0; +const checkIsClientAlive = client => client && client._channel && client._channel.isAlive; + const getApiClientStats = () => ({ ...clientInstantiationStats, - currentPoolSize: clientPool.length, + activePoolSize: clientPool.filter(client => checkIsClientAlive(client)).length, expectedPoolSize: MAX_CLIENT_POOL_SIZE, numEndpointInvocations: requestCount, }); -const checkIsClientAlive = client => client && client._channel && client._channel.isAlive; - const pingListener = apiClient => { if (!isObject(apiClient)) { logger.warn(`apiClient is ${JSON.stringify(apiClient)}. Cannot register a pingListener.`); @@ -121,25 +120,45 @@ const instantiateNewClient = async () => { } }; +let isReInstantiateIntervalRunning = false; const initClientPool = async poolSize => { // Set the intervals only at application init if (clientPool.length === 0) { setInterval(() => { - logger.info(`API client instantiation stats: ${JSON.stringify(getApiClientStats())}`); + const stats = getApiClientStats(); + logger.info(`API client instantiation stats: ${JSON.stringify(stats)}`); + if (stats.activePoolSize < stats.expectedPoolSize) { + logger.warn( + 'activePoolSize should catch up with the expectedPoolSize, once the node is under less stress.', + ); + } }, 5 * 60 * 1000); - setInterval(() => { - clientPool.forEach(async (apiClient, index) => { - if (isObject(apiClient)) return; + // Re-instantiate interval: Replaces nulls in clientPool with new active apiClients + // isReInstantiateIntervalRunning is the safety check to skip callback execution if the previous one is already in-progress + setInterval(async () => { + if (isReInstantiateIntervalRunning) return; + isReInstantiateIntervalRunning = true; + + for (let index = 0; index < clientPool.length; index++) { + const apiClient = clientPool[index]; + + // eslint-disable-next-line no-continue + if (isObject(apiClient)) continue; // Re-instantiate when null - clientPool[index] = await instantiateNewClient() + const newApiClient = await instantiateNewClient() .then(client => { client.poolIndex = index; return client; }) - .catch(() => null); - }); + // Delay to lower stress on the node + .catch(() => delay(Math.ceil(2 * WS_SERVER_PING_INTERVAL), null)); + clientPool[index] = newApiClient; + if (newApiClient) Signals.get('newApiClient').dispatch(newApiClient.poolIndex); + } + + isReInstantiateIntervalRunning = false; }, WS_SERVER_PING_INTERVAL); } @@ -183,7 +202,7 @@ const getApiClient = async poolIndex => { `Dispatched 'resetApiClient' signal from getApiClient for API client ${apiClient.poolIndex}.`, ); } - return waitForIt(getApiClient, 10); + return waitForIt(getApiClient, Math.ceil(WS_SERVER_PING_INTERVAL / MAX_CLIENT_POOL_SIZE)); })(); }; @@ -191,6 +210,7 @@ const resetApiClient = async (apiClient, isEventSubscriptionClient = false) => { // Replace the dead API client in the pool if (!isObject(apiClient)) { logger.warn(`apiClient is ${JSON.stringify(apiClient)}. Cannot reset.`); + if (isEventSubscriptionClient) Signals.get('eventSubscriptionClientReset').dispatch(); return; } diff --git a/services/blockchain-connector/shared/sdk/events.js b/services/blockchain-connector/shared/sdk/events.js index 1d92131d44..6e8b025b6a 100644 --- a/services/blockchain-connector/shared/sdk/events.js +++ b/services/blockchain-connector/shared/sdk/events.js @@ -139,27 +139,34 @@ const ensureAPIClientLiveness = () => { if (isNodeSynced && isGenesisBlockDownloaded) { setInterval(async () => { - if (typeof eventsCounter === 'number' && eventsCounter > 0) { - eventsCounter = 0; - } else { - if (typeof eventsCounter !== 'number') { - logger.warn( - `eventsCounter ended up with non-numeric value: ${JSON.stringify( - eventsCounter, - null, - '\t', - )}.`, - ); + try { + if (typeof eventsCounter === 'number' && eventsCounter > 0) { eventsCounter = 0; - } + } else { + if (typeof eventsCounter !== 'number') { + logger.warn( + `eventsCounter ended up with non-numeric value: ${JSON.stringify( + eventsCounter, + null, + '\t', + )}.`, + ); + eventsCounter = 0; + } - if (typeof eventSubscribeClientPoolIndex === 'number') { - const apiClient = await getApiClient(eventSubscribeClientPoolIndex); - Signals.get('resetApiClient').dispatch(apiClient, true); - logger.debug( - `Dispatched 'resetApiClient' signal for the event subscription API client ${apiClient.poolIndex}.`, - ); + if (typeof eventSubscribeClientPoolIndex === 'number') { + const apiClient = await getApiClient(eventSubscribeClientPoolIndex); + Signals.get('resetApiClient').dispatch(apiClient, true); + logger.debug( + `Dispatched 'resetApiClient' signal for the event subscription API client ${apiClient.poolIndex}.`, + ); + } else { + logger.debug('Triggered subscribeToAllRegisteredEvents from ensureAPIClientLiveness.'); + await subscribeToAllRegisteredEvents(); + } } + } catch (_) { + // No action required } }, config.clientConnVerifyInterval); } else { @@ -170,11 +177,13 @@ const ensureAPIClientLiveness = () => { }; const nodeIsSyncedListener = () => { + logger.debug('Node is now synced with the network.'); isNodeSynced = true; ensureAPIClientLiveness(); }; const genesisBlockDownloadedListener = () => { + logger.debug('Genesis block is now downloaded.'); isGenesisBlockDownloaded = true; ensureAPIClientLiveness(); }; diff --git a/services/blockchain-connector/shared/sdk/interoperability.js b/services/blockchain-connector/shared/sdk/interoperability.js index 0170d9bee1..7389fd6087 100644 --- a/services/blockchain-connector/shared/sdk/interoperability.js +++ b/services/blockchain-connector/shared/sdk/interoperability.js @@ -16,6 +16,7 @@ const { invokeEndpoint } = require('./client'); const { getNodeInfo } = require('./endpoints_1'); +let isThisMainchain; let mainchainID; let registrationFee; @@ -36,6 +37,14 @@ const getMainchainID = async () => { return mainchainID; }; +const isMainchain = async () => { + if (typeof isThisMainchain !== 'boolean') { + const { chainID } = await getNodeInfo(); + isThisMainchain = chainID === (await getMainchainID()); + } + return isThisMainchain; +}; + const getChannel = async chainID => { const channelInfo = await invokeEndpoint('interoperability_getChannel', { chainID }); return channelInfo; @@ -51,6 +60,7 @@ const getRegistrationFee = async () => { module.exports = { getChainAccount, getMainchainID, + isMainchain, getChannel, getRegistrationFee, }; diff --git a/services/blockchain-connector/shared/sdk/pos.js b/services/blockchain-connector/shared/sdk/pos.js index d502fc1f78..0a772fd841 100644 --- a/services/blockchain-connector/shared/sdk/pos.js +++ b/services/blockchain-connector/shared/sdk/pos.js @@ -46,7 +46,13 @@ const getAllPosValidators = async isForceReload => { return allPosValidators; }; -Signals.get('reloadAllPosValidators').add(() => getAllPosValidators(true)); +Signals.get('reloadAllPosValidators').add(() => + getAllPosValidators(true).catch(err => { + logger.warn( + `Could not force reload the PoS validators list. Will retry again later.\nError: ${err.message}`, + ); + }), +); const getPosValidatorsByStake = async limit => { const validators = await invokeEndpoint('pos_getValidatorsByStake', { limit }); diff --git a/services/blockchain-connector/shared/sdk/token.js b/services/blockchain-connector/shared/sdk/token.js index f4a18e6fc6..11468ad978 100644 --- a/services/blockchain-connector/shared/sdk/token.js +++ b/services/blockchain-connector/shared/sdk/token.js @@ -14,6 +14,7 @@ * */ const { invokeEndpoint } = require('./client'); +const { isMainchain } = require('./interoperability'); let escrowedAmounts; let supportedTokens; @@ -68,7 +69,7 @@ const hasEscrowAccount = async ({ tokenID, escrowChainID }) => const updateTokenInfo = async () => { escrowedAmounts = await getEscrowedAmounts(true); - supportedTokens = await getSupportedTokens(true); + if (!(await isMainchain()) || !supportedTokens) supportedTokens = await getSupportedTokens(true); totalSupply = await getTotalSupply(true); }; diff --git a/services/blockchain-coordinator/shared/scheduler.js b/services/blockchain-coordinator/shared/scheduler.js index fa1ba165d6..5cf61b2fc3 100644 --- a/services/blockchain-coordinator/shared/scheduler.js +++ b/services/blockchain-coordinator/shared/scheduler.js @@ -98,7 +98,7 @@ const waitForGenesisBlockIndexing = (resolve, reject) => const jobCount = await getLiveIndexingJobCount(); if (jobCount >= 1) { logger.info( - `Genesis block indexing is still in progress. Waiting for ${REFRESH_INTERVAL}ms to re-check the genesis block indexing status.`, + `Genesis block indexing still in progress. Waiting for ${REFRESH_INTERVAL}ms to re-check the genesis block indexing status.`, ); intervalID = setInterval( waitForGenesisBlockIndexing.bind(null, resolve, reject), diff --git a/services/blockchain-indexer/shared/indexer/genesisBlock.js b/services/blockchain-indexer/shared/indexer/genesisBlock.js index 8a15b5b9b6..8dfe8ff79c 100644 --- a/services/blockchain-indexer/shared/indexer/genesisBlock.js +++ b/services/blockchain-indexer/shared/indexer/genesisBlock.js @@ -19,20 +19,18 @@ const { DB: { MySQL: { getTableInstance }, }, - Signals, Logger, } = require('lisk-service-framework'); const { MODULE, MODULE_SUB_STORE, getGenesisHeight } = require('../constants'); const { updateTotalStake, updateTotalSelfStake } = require('./transactionProcessor/pos/stake'); -const { updateAccountBalances } = require('./accountBalanceIndex'); const { indexAccountPublicKey, triggerAccountUpdates } = require('./accountIndex'); const { updateTotalLockedAmounts } = require('./utils/blockchainIndex'); -const { getIndexStats } = require('./indexStatus'); const requestAll = require('../utils/requestAll'); const config = require('../../config'); const accountsTableSchema = require('../database/schema/accounts'); +const accountBalancesTableSchema = require('../database/schema/accountBalances'); const stakesTableSchema = require('../database/schema/stakes'); const commissionsTableSchema = require('../database/schema/commissions'); @@ -46,11 +44,11 @@ const MYSQL_ENDPOINT = config.endpoints.mysql; const getStakesTable = () => getTableInstance(stakesTableSchema, MYSQL_ENDPOINT); const getAccountsTable = () => getTableInstance(accountsTableSchema, MYSQL_ENDPOINT); +const getAccountBalancesTable = () => getTableInstance(accountBalancesTableSchema, MYSQL_ENDPOINT); const getCommissionsTable = () => getTableInstance(commissionsTableSchema, MYSQL_ENDPOINT); -const allAccountsAddresses = []; let intervalTimeout; -let isTokensBalanceIndexed = false; +const genesisAccountBalances = []; const getGenesisAssetIntervalTimeout = () => intervalTimeout; @@ -72,7 +70,11 @@ const indexTokenModuleAssets = async dbTrx => { // eslint-disable-next-line no-restricted-syntax for (const userInfo of userSubStoreInfos) { - const { tokenID } = userInfo; + const { address, availableBalance: balance, tokenID } = userInfo; + + // Add entry to index the genesis account balances + const accountBalanceEntry = { address, tokenID, balance }; + genesisAccountBalances.push(accountBalanceEntry); // eslint-disable-next-line no-restricted-syntax for (const lockedBalance of userInfo.lockedBalances) { @@ -81,9 +83,6 @@ const indexTokenModuleAssets = async dbTrx => { } tokenIDLockedAmountChangeMap[tokenID] += BigInt(lockedBalance.amount); } - - // Index account balance - allAccountsAddresses.push(userInfo.address); } await updateTotalLockedAmounts(tokenIDLockedAmountChangeMap, dbTrx); @@ -95,6 +94,7 @@ const isGeneratorKeyValid = generatorKey => generatorKey !== INVALID_ED25519_KEY const indexPosValidatorsInfo = async (numValidators, dbTrx) => { logger.debug('Starting to index the validators information from the genesis PoS module assets.'); if (numValidators > 0) { + const accountsTable = await getAccountsTable(); const commissionsTable = await getCommissionsTable(); const posModuleData = await requestAll( @@ -117,7 +117,6 @@ const indexPosValidatorsInfo = async (numValidators, dbTrx) => { publicKey: validator.generatorKey, }; - const accountsTable = await getAccountsTable(); await accountsTable .upsert(account) .catch(() => indexAccountPublicKey(validator.generatorKey)); @@ -211,27 +210,42 @@ const indexGenesisBlockAssets = async dbTrx => { logger.info('Finished indexing all the genesis assets.'); }; -const indexTokenBalances = async () => { - // eslint-disable-next-line no-restricted-syntax - for (const address of allAccountsAddresses) { - await updateAccountBalances(address).catch(err => { - const errorMessage = `Updating account balance for ${address} failed. Retrying.\nError: ${err.message}`; - logger.warn(errorMessage); - logger.debug(err.stack); - - allAccountsAddresses.push(address); - }); - } - isTokensBalanceIndexed = true; -}; +let indexedGenesisAccountBalances; +const interval = setInterval(async () => { + try { + if (genesisAccountBalances.length) { + if (indexedGenesisAccountBalances === false) return; + } else { + if (indexedGenesisAccountBalances === true) clearInterval(interval); + return; + } + indexedGenesisAccountBalances = false; + + logger.info('Started indexing genesis account balances.'); + let numEntries = 0; + const accountBalancesTable = await getAccountBalancesTable(); + while (genesisAccountBalances.length) { + const accountBalanceEntry = genesisAccountBalances.shift(); + await accountBalancesTable + .upsert(accountBalanceEntry) + .then(() => { + numEntries++; + }) + .catch(err => { + numEntries--; + genesisAccountBalances.push(accountBalanceEntry); + logger.warn( + `Updating account balance for ${accountBalanceEntry.address} failed. Will retry.\nError: ${err.message}`, + ); + }); + } -const indexTokenBalancesListener = async () => { - const indexStatus = await getIndexStats(); - if (Number(indexStatus.percentage) === 100 && !isTokensBalanceIndexed) { - indexTokenBalances(); + indexedGenesisAccountBalances = true; + logger.info(`Finished indexing genesis account balances. Added ${numEntries} entries.`); + } catch (_) { + // No actions required } -}; -Signals.get('chainNewBlock').add(indexTokenBalancesListener); +}, 5 * 60 * 1000); module.exports = { getGenesisAssetIntervalTimeout,