Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Connector creates too many ws connections with the node which stresses out the node #1981

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion services/blockchain-app-registry/events/metadata.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module.exports = [
description: 'Emit event when the database is successfully synchronized',
controller: async callback => {
const updateMetadataListener = async data => {
logger.debug('Database has been successfully synchronized');
logger.debug('Database has been successfully synchronized.');
callback(data);
};
Signals.get('metadataUpdated').add(updateMetadataListener);
Expand Down
8 changes: 7 additions & 1 deletion services/blockchain-connector/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*
*/
const path = require('path');
const { Microservice, Logger, LoggerConfig } = require('lisk-service-framework');
const { Signals, Microservice, Logger, LoggerConfig } = require('lisk-service-framework');

const config = require('./config');

Expand All @@ -31,6 +31,12 @@ const app = Microservice({
transporter: config.transporter,
brokerTimeout: config.brokerTimeout, // in seconds
logger: config.log,
events: {
'update.index.status': async payload => {
logger.debug("Received a 'update.index.status' moleculer event from indexer.");
Signals.get('updateIndexStatus').dispatch(payload);
},
},
});

nodeStatus.waitForNode().then(async () => {
Expand Down
1 change: 1 addition & 0 deletions services/blockchain-connector/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ config.job = {
config.apiClient = {
heartbeatAckMaxWaitTime: Number(process.env.HEARTBEAT_ACK_MAX_WAIT_TIME) || 1000, // in millisecs
aliveAssumptionTime: Number(process.env.CLIENT_ALIVE_ASSUMPTION_TIME) || 5 * 1000, // in millisecs
aliveAssumptionTimeBeforeGenesis: Number(30 * 1000),
instantiation: {
maxWaitTime: Number(process.env.CLIENT_INSTANTIATION_MAX_WAIT_TIME) || 5 * 1000, // in millisecs
retryInterval: Number(process.env.CLIENT_INSTANTIATION_RETRY_INTERVAL) || 1, // in millisecs
Expand Down
4 changes: 2 additions & 2 deletions services/blockchain-connector/shared/geolocation.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const requestData = async requestedIp => {
getFromHttp(ip)
.then(data => {
if (data) cacheRedis.set(key, data.data, GEOIP_TTL);
logger.debug(`Fetched geolocation data from online service for IP ${ip}`);
logger.debug(`Fetched geolocation data from online service for IP ${ip}.`);
refreshSchedule.push(
setTimeout(
() => refreshData(ip),
Expand All @@ -84,7 +84,7 @@ const autoCleanUp = () =>
const tooMuch = refreshSchedule.splice(0, refreshSchedule.length - SCHEDULE_MAX_LENGTH);
tooMuch.forEach(item => clearInterval(item));
logger.debug(
`Cache queue: Removed ${tooMuch.length} items, ${refreshSchedule.length} last elements left`,
`Cache queue: Removed ${tooMuch.length} items, ${refreshSchedule.length} last elements left.`,
);
}, SCHEDULE_CLEANUP_INTERVAL);

Expand Down
12 changes: 6 additions & 6 deletions services/blockchain-connector/shared/sdk/blocksUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,24 @@ const loadConfig = async () => {

if (config.genesisBlockUrl !== config.constants.GENESIS_BLOCK_URL_DEFAULT) {
genesisBlockUrl = config.genesisBlockUrl;
logger.info(`genesisBlockUrl set to ${genesisBlockUrl}`);
logger.info(`genesisBlockUrl set to ${genesisBlockUrl}.`);

genesisBlockFilePath = `./data/${chainID}/genesis_block.json`;
logger.info(`genesisBlockFilePath set to ${genesisBlockFilePath}`);
logger.info(`genesisBlockFilePath set to ${genesisBlockFilePath}.`);
} else {
// Check if current node is running Lisk Core
const [networkConfig] = config.networks.LISK.filter(c => chainID === c.chainID);
if (networkConfig) {
logger.info(`Found config for ${networkConfig.name} (${chainID})`);
logger.info(`Found config for ${networkConfig.name} (${chainID}).`);

genesisBlockUrl = networkConfig.genesisBlockUrl;
logger.info(`genesisBlockUrl set to ${genesisBlockUrl}`);
logger.info(`genesisBlockUrl set to ${genesisBlockUrl}.`);

genesisBlockFilePath = `./data/${chainID}/genesis_block.json`;
logger.info(`genesisBlockFilePath set to ${genesisBlockFilePath}`);
logger.info(`genesisBlockFilePath set to ${genesisBlockFilePath}.`);
} else {
logger.info(
`Network is neither defined in the config, nor in the environment variable (${chainID})`,
`Network is neither defined in the config, nor in the environment variable (${chainID}).`,
);
return;
}
Expand Down
36 changes: 32 additions & 4 deletions services/blockchain-connector/shared/sdk/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const MAX_INSTANTIATION_WAIT_TIME = config.apiClient.instantiation.maxWaitTime;
const NUM_REQUEST_RETRIES = config.apiClient.request.maxRetries;
const ENDPOINT_INVOKE_RETRY_DELAY = config.apiClient.request.retryDelay;
const CLIENT_ALIVE_ASSUMPTION_TIME = config.apiClient.aliveAssumptionTime;
const CLIENT_ALIVE_ASSUMPTION_TIME_BEFORE_GENESIS =
config.apiClient.aliveAssumptionTimeBeforeGenesis;
const HEARTBEAT_ACK_MAX_WAIT_TIME = config.apiClient.heartbeatAckMaxWaitTime;

// Caching and flags
Expand All @@ -42,6 +44,7 @@ let lastClientAliveTime;
let heartbeatCheckBeginTime;
let isInstantiating = false;
let isClientAlive = false;
let isGenesisBlockIndexed = false;

const pongListener = res => {
isClientAlive = true;
Expand Down Expand Up @@ -162,13 +165,38 @@ if (config.isUseLiskIPCClient) {
const resetApiClientListener = async () => instantiateClient(true).catch(() => {});
Signals.get('resetApiClient').add(resetApiClientListener);
} else {
const triggerRegularClientLivelinessChecks = () =>
setInterval(async () => {
let intervalTimeout;
const triggerRegularClientLivelinessChecks = intervalMs => {
intervalTimeout = setInterval(async () => {
const isAlive = await checkIsClientAlive();
if (!isAlive) instantiateClient(true).catch(() => {});
}, CLIENT_ALIVE_ASSUMPTION_TIME);
}, intervalMs);
};

Signals.get('genesisBlockDownloaded').add(triggerRegularClientLivelinessChecks);
const genesisBlockDownloadedListener = () => {
triggerRegularClientLivelinessChecks(CLIENT_ALIVE_ASSUMPTION_TIME_BEFORE_GENESIS);
logger.info(
`API client heartbeat checks scheduled every ${CLIENT_ALIVE_ASSUMPTION_TIME_BEFORE_GENESIS}ms. The frequency will be set to ${CLIENT_ALIVE_ASSUMPTION_TIME}ms after successful indexing of the genesis block.`,
);
};

const genesisBlockIndexedListener = indexStatus => {
if (
!isGenesisBlockIndexed &&
indexStatus.data &&
indexStatus.data.genesisHeight <= indexStatus.data.lastIndexedBlockHeight
) {
clearInterval(intervalTimeout);
triggerRegularClientLivelinessChecks(CLIENT_ALIVE_ASSUMPTION_TIME);
isGenesisBlockIndexed = true;
logger.info(
`API client heartbeat checks re-scheduled to run every ${CLIENT_ALIVE_ASSUMPTION_TIME}ms.`,
);
}
};

Signals.get('genesisBlockDownloaded').add(genesisBlockDownloadedListener);
Signals.get('updateIndexStatus').add(genesisBlockIndexedListener);
}

module.exports = {
Expand Down
12 changes: 6 additions & 6 deletions services/blockchain-connector/shared/utils/download.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ const downloadAndExtractTarball = (url, directoryPath) =>

const downloadJSONFile = (fileUrl, filePath) =>
new Promise((resolve, reject) => {
logger.info(`Downloading JSON file from ${fileUrl} as ${filePath}`);
logger.info(`Downloading JSON file from ${fileUrl} as ${filePath}.`);
request(fileUrl)
.then(async response => {
const block = typeof response === 'string' ? JSON.parse(response).data : response.data;
fs.writeFile(filePath, JSON.stringify(block), () => {
logger.info('File downloaded successfully');
logger.info('File downloaded successfully.');
resolve();
});
})
Expand All @@ -68,15 +68,15 @@ const downloadJSONFile = (fileUrl, filePath) =>

const downloadAndUnzipFile = (fileUrl, filePath) =>
new Promise((resolve, reject) => {
logger.info(`Downloading and extracting file from ${fileUrl} as ${filePath}`);
logger.info(`Downloading and extracting file from ${fileUrl} as ${filePath}.`);
getHTTPProtocolByURL(fileUrl).get(fileUrl, response => {
if (response.statusCode === 200) {
const unzip = zlib.createUnzip();
const writeFile = fs.createWriteStream(filePath);
response.pipe(unzip).pipe(writeFile);
response.on('error', async err => reject(new Error(err)));
response.on('end', async () => {
logger.info('File downloaded successfully');
logger.info('File downloaded successfully.');
resolve();
});
} else {
Expand All @@ -90,7 +90,7 @@ const downloadAndUnzipFile = (fileUrl, filePath) =>

const downloadFile = (url, dirPath) =>
new Promise((resolve, reject) => {
logger.info(`Downloading file from ${url} to ${dirPath}`);
logger.info(`Downloading file from ${url} to ${dirPath}.`);

getHTTPProtocolByURL(url).get(url, response => {
if (response.statusCode === 200) {
Expand All @@ -100,7 +100,7 @@ const downloadFile = (url, dirPath) =>
response.pipe(writeStream);
response.on('error', async err => reject(new Error(err)));
response.on('end', async () => {
logger.info('File downloaded successfully');
logger.info('File downloaded successfully.');
resolve();
});
} else {
Expand Down
8 changes: 4 additions & 4 deletions services/blockchain-coordinator/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,19 @@ const app = Microservice({
logger: config.log,
events: {
chainNewBlock: async payload => {
logger.debug("Received a 'chainNewBlock' event from connecter.");
logger.debug("Received a 'chainNewBlock' moleculer event from connecter.");
Signals.get('newBlock').dispatch(payload);
},
chainDeleteBlock: async payload => {
logger.debug("Received a 'chainDeleteBlock' event from connecter.");
logger.debug("Received a 'chainDeleteBlock' moleculer event from connecter.");
Signals.get('deleteBlock').dispatch(payload);
},
chainValidatorsChange: async payload => {
logger.debug("Received a 'chainValidatorsChange' event from connecter.");
logger.debug("Received a 'chainValidatorsChange' moleculer event from connecter.");
Signals.get('newRound').dispatch(payload);
},
systemNodeInfo: async payload => {
logger.debug("Received a 'systemNodeInfo' event from connecter.");
logger.debug("Received a 'systemNodeInfo' moleculer event from connecter.");
Signals.get('nodeInfo').dispatch(payload);
},
},
Expand Down
6 changes: 3 additions & 3 deletions services/blockchain-indexer/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ const defaultBrokerConfig = {
logger: config.log,
events: {
chainNewBlock: async () => {
logger.debug("Received a 'chainNewBlock' event from connecter.");
logger.debug("Received a 'chainNewBlock' moleculer event from connecter.");
Signals.get('chainNewBlock').dispatch();
},
systemNodeInfo: async payload => {
logger.debug("Received a 'systemNodeInfo' event from connecter.");
logger.debug("Received a 'systemNodeInfo' moleculer event from connecter.");
Signals.get('nodeInfo').dispatch(payload);
},
'update.fee_estimates': async payload => {
logger.debug("Received a 'update.fee_estimates' event from fee-estimator.");
logger.debug("Received a 'update.fee_estimates' moleculer event from fee-estimator.");
await setFeeEstimates(payload);
},
},
Expand Down
10 changes: 5 additions & 5 deletions services/blockchain-indexer/events/blockchain.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ module.exports = [
try {
if (payload && Array.isArray(payload.data)) {
const [block] = payload.data;
logger.debug(`New block arrived (${block.id})...`);
logger.debug(`Received new block (${block.id})...`);
// Fork detection
if (localPreviousBlockId) {
if (localPreviousBlockId !== block.previousBlockId) {
logger.debug(`Fork detected at block height ${localPreviousBlockId}`);
logger.debug(`Fork detected at block height ${localPreviousBlockId}.`);
}
}
localPreviousBlockId = block.id;
Expand Down Expand Up @@ -68,7 +68,7 @@ module.exports = [

if (numberOfTransactions > 0) {
logger.debug(
`Block (${block.id}) arrived containing ${block.numberOfTransactions} new transactions`,
`Received block (${block.id}) containing ${block.numberOfTransactions} new transactions.`,
);

const formattedTransactions = await formatTransactionsInBlock(block);
Expand Down Expand Up @@ -161,7 +161,7 @@ module.exports = [
description: 'Returns true when the index is ready',
controller: callback => {
const indexStatusListener = async payload => {
logger.debug("Dispatching 'index.ready' event over websocket");
logger.debug("Dispatching 'index.ready' event to message broker.");
callback(payload);
};
Signals.get('blockIndexReady').add(indexStatusListener);
Expand All @@ -172,7 +172,7 @@ module.exports = [
description: 'Emit index status updates.',
controller: callback => {
const indexStatusUpdateListener = async payload => {
logger.debug("Dispatching 'update.index.status' event over websocket");
logger.debug("Dispatching 'update.index.status' event to message broker.");
callback(payload);
};
Signals.get('updateIndexStatus').add(indexStatusUpdateListener);
Expand Down
8 changes: 4 additions & 4 deletions services/blockchain-indexer/shared/dataService/blocks.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ const getBlocksFromServer = async params => {
meta: {},
};

if (params.blockID) logger.debug(`Retrieved block with ID ${params.blockID} from Lisk Core`);
if (params.blockID) logger.debug(`Retrieved block with ID ${params.blockID} from the node.`);
else if (params.height)
logger.debug(`Retrieved block with height: ${params.height} from Lisk Core`);
else logger.debug(`Retrieved block with custom search: ${util.inspect(params)} from Lisk Core`);
logger.debug(`Retrieved block with height: ${params.height} from the node.`);
else logger.debug(`Retrieved block with custom search: ${util.inspect(params)} from the node.`);

const response = await business.getBlocks(params);
if (response.data) blocks.data = response.data;
Expand Down Expand Up @@ -140,7 +140,7 @@ const getBlocksAssets = async params => {

const performLastBlockUpdate = async newBlock => {
try {
logger.debug(`Setting last block to height: ${newBlock.height} (id: ${newBlock.id})`);
logger.debug(`Setting last block to height: ${newBlock.height} (id: ${newBlock.id}).`);
await setLastBlock(newBlock);
} catch (err) {
logger.error(`Error occurred when performing last block update:\n${err.stack}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const applyTransaction = async (blockHeader, tx, events, dbTrx) => {
// Process the transaction to create the entityTableEntry
// And, finally, perform DB operations to update the index
await entityTable.upsert(entityTableEntry, dbTrx); // it is important to pass dbTrx
logger.debug('Add custom logs');
logger.debug('Add custom logs.');

Promise.resolve({ blockHeader, tx });
};
Expand All @@ -66,7 +66,7 @@ const revertTransaction = async (blockHeader, tx, events, dbTrx) => {
// Process the transaction to create the entityTableEntry
// And, finally, perform DB operations to update the index and revert the induced changes
await entityTable.delete(entityTableEntry, dbTrx); // it is important to pass dbTrx
logger.debug('Add custom logs');
logger.debug('Add custom logs.');

Promise.resolve({ blockHeader, tx });
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export const applyTransaction = async (blockHeader, tx, events, dbTrx) => {
// Process the transaction to create the entityTableEntry
// And, finally, perform DB operations to update the index
await entityTable.upsert(entityTableEntry, dbTrx); // it is important to pass dbTrx
logger.debug('Add custom logs');
logger.debug('Add custom logs.');

Promise.resolve({ blockHeader, tx });
};
Expand All @@ -66,7 +66,7 @@ export const revertTransaction = async (blockHeader, tx, events, dbTrx) => {
// Process the transaction to create the entityTableEntry
// And, finally, perform DB operations to update the index and revert the induced changes
await entityTable.delete(entityTableEntry, dbTrx); // it is important to pass dbTrx
logger.debug('Add custom logs');
logger.debug('Add custom logs.');

Promise.resolve({ blockHeader, tx });
};
2 changes: 1 addition & 1 deletion services/export/jobs/purge.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ module.exports = [
interval: config.job.purgeCache.interval,
schedule: config.job.purgeCache.schedule,
controller: () => {
logger.info('Performing cache maintenance');
logger.info('Running cache maintenance.');
partials.purge();
staticFiles.purge();
},
Expand Down
2 changes: 1 addition & 1 deletion services/fee-estimator/shared/dynamicFees.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ const calculateEstimateFeePerByteQuick = async () => {
const fromHeight = toHeight - batchSize;

logger.debug(
`Computing quick fee estimate for block ${latestBlock.id} at height ${latestBlock.height}`,
`Computing quick fee estimate for block ${latestBlock.id} at height ${latestBlock.height}.`,
);
const cachedFeeEstPerByteQuick = await checkAndProcessExecution(
fromHeight,
Expand Down
2 changes: 1 addition & 1 deletion services/fee-estimator/shared/utils/dynamicFees.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ const getEstimateFeePerByteForBatch = async (fromHeight, toHeight, cacheKey) =>
await cacheRedisFees.set(cacheKey, feeEstPerByte);

logger.info(
`Recalulated dynamic fees: L: ${feeEstPerByte.low} M: ${feeEstPerByte.med} H: ${feeEstPerByte.high}`,
`Re-calculated dynamic fees: L: ${feeEstPerByte.low} M: ${feeEstPerByte.med} H: ${feeEstPerByte.high}.`,
);

return feeEstPerByte;
Expand Down
2 changes: 1 addition & 1 deletion services/gateway/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ tempApp.run().then(async () => {

if (config.rateLimit.enable) {
logger.info(
`Enabling rate limiter, connLimit: ${config.rateLimit.connectionLimit}, window: ${config.rateLimit.window}`,
`Enabling rate limiter, connLimit: ${config.rateLimit.connectionLimit}, window: ${config.rateLimit.window}.`,
);

gatewayConfig.settings.rateLimit = {
Expand Down
2 changes: 1 addition & 1 deletion services/gateway/shared/ready.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const getReady = () => {
});
return { services: includeSvcForReadiness };
} catch (_) {
logger.error(`Current service status: ${currentSvcStatus}`);
logger.error(`Current service status:\n${JSON.stringify(currentSvcStatus, null, '\t')}`);
throw new MoleculerError('Service Unavailable', 503, 'SERVICES_NOT_READY');
}
};
Expand Down
Loading