Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Optimise implementation for indexing genesis accounts balance #2012

86 changes: 50 additions & 36 deletions services/blockchain-connector/events/controller/blockchain.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ const appNetworkEventController = async cb => {
};

const txpoolNewTransactionController = async cb => {
const txpoolNewTransactionListener = async payload =>
cb({
...payload,
transaction: formatTransaction(payload.transaction),
});
const txpoolNewTransactionListener = async payload => {
try {
const transaction = formatTransaction(payload.transaction);
cb({ ...payload, transaction });
} catch (_) {
// No actions necessary
// Safety check added in case txpool_newTransaction event arrives before init is called
}
};
Signals.get('txpoolNewTransaction').add(txpoolNewTransactionListener);
};

Expand All @@ -73,46 +77,56 @@ const formatBlock = payload =>

const chainNewBlockController = async cb => {
const chainNewBlockListener = async payload => {
const { blockHeader } = payload;
let transactions = [];
let assets = [];

if (
blockHeader.transactionRoot !== EMPTY_TREE_ROOT_HASH ||
blockHeader.assetRoot !== EMPTY_TREE_ROOT_HASH
) {
try {
const block = await getBlockByID(blockHeader.id);
transactions = block.transactions;
assets = block.assets;
} catch (err) {
logger.warn(
`Could not fetch block ${blockHeader.id} within chainNewBlockListener due to: ${err.message}`,
);
logger.debug(err.stack);
try {
const { blockHeader } = payload;
let transactions = [];
let assets = [];

if (
blockHeader.transactionRoot !== EMPTY_TREE_ROOT_HASH ||
blockHeader.assetRoot !== EMPTY_TREE_ROOT_HASH
) {
try {
const block = await getBlockByID(blockHeader.id);
transactions = block.transactions;
assets = block.assets;
} catch (err) {
logger.warn(
`Could not fetch block ${blockHeader.id} within chainNewBlockListener due to: ${err.message}`,
);
logger.debug(err.stack);
}
}
}

cb(
formatBlock({
blockHeader,
assets,
transactions,
}),
);

// Reload validators cache on pos module transactions
if (transactions.some(t => t.module === MODULE_NAME_POS)) {
Signals.get('reloadAllPosValidators').dispatch();
cb(
formatBlock({
blockHeader,
assets,
transactions,
}),
);

// Reload validators cache on pos module transactions
if (transactions.some(t => t.module === MODULE_NAME_POS)) {
Signals.get('reloadAllPosValidators').dispatch();
}
} catch (_) {
// No actions necessary
// Safety check added in case txpool_newTransaction event arrives before init is called
}
};
Signals.get('chainNewBlock').add(chainNewBlockListener);
};

const chainDeleteBlockController = async cb => {
const chainDeleteBlockListener = async payload => {
cb(formatBlock(payload));
Signals.get('reloadAllPosValidators').dispatch();
try {
cb(formatBlock(payload));
Signals.get('reloadAllPosValidators').dispatch();
} catch (_) {
// No actions necessary
// Safety check added in case txpool_newTransaction event arrives before init is called
}
};
Signals.get('chainDeleteBlock').add(chainDeleteBlockListener);
};
Expand Down
46 changes: 33 additions & 13 deletions services/blockchain-connector/shared/sdk/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ const {
Signals,
HTTP,
Exceptions: { TimeoutException },
Utils: { isObject, waitForIt },
Utils: { delay, isObject, waitForIt },
} = require('lisk-service-framework');
const { createWSClient, createIPCClient } = require('@liskhq/lisk-api-client');

const crypto = require('crypto');

const config = require('../../config');
const delay = require('../utils/delay');

const logger = Logger();

Expand Down Expand Up @@ -56,15 +55,15 @@ const clientInstantiationStats = {
};
let requestCount = 0;

const checkIsClientAlive = client => client && client._channel && client._channel.isAlive;

const getApiClientStats = () => ({
...clientInstantiationStats,
currentPoolSize: clientPool.length,
activePoolSize: clientPool.filter(client => checkIsClientAlive(client)).length,
expectedPoolSize: MAX_CLIENT_POOL_SIZE,
numEndpointInvocations: requestCount,
});

const checkIsClientAlive = client => client && client._channel && client._channel.isAlive;

const pingListener = apiClient => {
if (!isObject(apiClient)) {
logger.warn(`apiClient is ${JSON.stringify(apiClient)}. Cannot register a pingListener.`);
Expand Down Expand Up @@ -121,25 +120,45 @@ const instantiateNewClient = async () => {
}
};

let isReInstantiateIntervalRunning = false;
const initClientPool = async poolSize => {
// Set the intervals only at application init
if (clientPool.length === 0) {
setInterval(() => {
logger.info(`API client instantiation stats: ${JSON.stringify(getApiClientStats())}`);
const stats = getApiClientStats();
logger.info(`API client instantiation stats: ${JSON.stringify(stats)}`);
if (stats.activePoolSize < stats.expectedPoolSize) {
logger.warn(
'activePoolSize should catch up with the expectedPoolSize, once the node is under less stress.',
);
}
}, 5 * 60 * 1000);

setInterval(() => {
clientPool.forEach(async (apiClient, index) => {
if (isObject(apiClient)) return;
// Re-instantiate interval: Replaces nulls in clientPool with new active apiClients
// isReInstantiateIntervalRunning is the safety check to skip callback execution if the previous one is already in-progress
setInterval(async () => {
if (isReInstantiateIntervalRunning) return;
isReInstantiateIntervalRunning = true;

for (let index = 0; index < clientPool.length; index++) {
const apiClient = clientPool[index];

// eslint-disable-next-line no-continue
if (isObject(apiClient)) continue;

// Re-instantiate when null
clientPool[index] = await instantiateNewClient()
const newApiClient = await instantiateNewClient()
.then(client => {
client.poolIndex = index;
return client;
})
.catch(() => null);
});
// Delay to lower stress on the node
.catch(() => delay(Math.ceil(2 * WS_SERVER_PING_INTERVAL), null));
clientPool[index] = newApiClient;
if (newApiClient) Signals.get('newApiClient').dispatch(newApiClient.poolIndex);
}

isReInstantiateIntervalRunning = false;
}, WS_SERVER_PING_INTERVAL);
}

Expand Down Expand Up @@ -183,14 +202,15 @@ const getApiClient = async poolIndex => {
`Dispatched 'resetApiClient' signal from getApiClient for API client ${apiClient.poolIndex}.`,
);
}
return waitForIt(getApiClient, 10);
return waitForIt(getApiClient, Math.ceil(WS_SERVER_PING_INTERVAL / MAX_CLIENT_POOL_SIZE));
})();
};

const resetApiClient = async (apiClient, isEventSubscriptionClient = false) => {
// Replace the dead API client in the pool
if (!isObject(apiClient)) {
logger.warn(`apiClient is ${JSON.stringify(apiClient)}. Cannot reset.`);
if (isEventSubscriptionClient) Signals.get('eventSubscriptionClientReset').dispatch();
return;
}

Expand Down
45 changes: 27 additions & 18 deletions services/blockchain-connector/shared/sdk/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,27 +139,34 @@ const ensureAPIClientLiveness = () => {

if (isNodeSynced && isGenesisBlockDownloaded) {
setInterval(async () => {
if (typeof eventsCounter === 'number' && eventsCounter > 0) {
eventsCounter = 0;
} else {
if (typeof eventsCounter !== 'number') {
logger.warn(
`eventsCounter ended up with non-numeric value: ${JSON.stringify(
eventsCounter,
null,
'\t',
)}.`,
);
try {
if (typeof eventsCounter === 'number' && eventsCounter > 0) {
eventsCounter = 0;
}
} else {
if (typeof eventsCounter !== 'number') {
logger.warn(
`eventsCounter ended up with non-numeric value: ${JSON.stringify(
eventsCounter,
null,
'\t',
)}.`,
);
eventsCounter = 0;
}

if (typeof eventSubscribeClientPoolIndex === 'number') {
const apiClient = await getApiClient(eventSubscribeClientPoolIndex);
Signals.get('resetApiClient').dispatch(apiClient, true);
logger.debug(
`Dispatched 'resetApiClient' signal for the event subscription API client ${apiClient.poolIndex}.`,
);
if (typeof eventSubscribeClientPoolIndex === 'number') {
const apiClient = await getApiClient(eventSubscribeClientPoolIndex);
Signals.get('resetApiClient').dispatch(apiClient, true);
logger.debug(
`Dispatched 'resetApiClient' signal for the event subscription API client ${apiClient.poolIndex}.`,
);
} else {
logger.debug('Triggered subscribeToAllRegisteredEvents from ensureAPIClientLiveness.');
await subscribeToAllRegisteredEvents();
}
}
} catch (_) {
// No action required
}
}, config.clientConnVerifyInterval);
} else {
Expand All @@ -170,11 +177,13 @@ const ensureAPIClientLiveness = () => {
};

const nodeIsSyncedListener = () => {
logger.debug('Node is now synced with the network.');
isNodeSynced = true;
ensureAPIClientLiveness();
};

const genesisBlockDownloadedListener = () => {
logger.debug('Genesis block is now downloaded.');
isGenesisBlockDownloaded = true;
ensureAPIClientLiveness();
};
Expand Down
10 changes: 10 additions & 0 deletions services/blockchain-connector/shared/sdk/interoperability.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
const { invokeEndpoint } = require('./client');
const { getNodeInfo } = require('./endpoints_1');

let isThisMainchain;
let mainchainID;
let registrationFee;

Expand All @@ -36,6 +37,14 @@ const getMainchainID = async () => {
return mainchainID;
};

const isMainchain = async () => {
if (typeof isThisMainchain !== 'boolean') {
const { chainID } = await getNodeInfo();
isThisMainchain = chainID === (await getMainchainID());
}
return isThisMainchain;
};

const getChannel = async chainID => {
const channelInfo = await invokeEndpoint('interoperability_getChannel', { chainID });
return channelInfo;
Expand All @@ -51,6 +60,7 @@ const getRegistrationFee = async () => {
module.exports = {
getChainAccount,
getMainchainID,
isMainchain,
getChannel,
getRegistrationFee,
};
8 changes: 7 additions & 1 deletion services/blockchain-connector/shared/sdk/pos.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ const getAllPosValidators = async isForceReload => {
return allPosValidators;
};

Signals.get('reloadAllPosValidators').add(() => getAllPosValidators(true));
Signals.get('reloadAllPosValidators').add(() =>
getAllPosValidators(true).catch(err => {
logger.warn(
`Could not force reload the PoS validators list. Will retry again later.\nError: ${err.message}`,
);
}),
);

const getPosValidatorsByStake = async limit => {
const validators = await invokeEndpoint('pos_getValidatorsByStake', { limit });
Expand Down
3 changes: 2 additions & 1 deletion services/blockchain-connector/shared/sdk/token.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*
*/
const { invokeEndpoint } = require('./client');
const { isMainchain } = require('./interoperability');

let escrowedAmounts;
let supportedTokens;
Expand Down Expand Up @@ -68,7 +69,7 @@ const hasEscrowAccount = async ({ tokenID, escrowChainID }) =>

const updateTokenInfo = async () => {
escrowedAmounts = await getEscrowedAmounts(true);
supportedTokens = await getSupportedTokens(true);
if (!(await isMainchain()) || !supportedTokens) supportedTokens = await getSupportedTokens(true);
totalSupply = await getTotalSupply(true);
};

Expand Down
2 changes: 1 addition & 1 deletion services/blockchain-coordinator/shared/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ const waitForGenesisBlockIndexing = (resolve, reject) =>
const jobCount = await getLiveIndexingJobCount();
if (jobCount >= 1) {
logger.info(
`Genesis block indexing is still in progress. Waiting for ${REFRESH_INTERVAL}ms to re-check the genesis block indexing status.`,
`Genesis block indexing still in progress. Waiting for ${REFRESH_INTERVAL}ms to re-check the genesis block indexing status.`,
);
intervalID = setInterval(
waitForGenesisBlockIndexing.bind(null, resolve, reject),
Expand Down
Loading