Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
🐛 Kickstart event subscription if the eventSubscriptionClient is renewed
Browse files Browse the repository at this point in the history
  • Loading branch information
sameersubudhi committed Jan 10, 2024
1 parent e9f9260 commit 42d7def
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 6 deletions.
2 changes: 1 addition & 1 deletion services/blockchain-connector/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ config.apiClient = {

// Every n milliseconds, verify if client connection is alive
config.clientConnVerifyInterval =
Number(process.env.CLIENT_CONNECTION_VERIFY_INTERVAL) || 60 * 1000; // in millisecs
Number(process.env.CLIENT_CONNECTION_VERIFY_INTERVAL) || 30 * 1000; // in millisecs

// Backdoor config to restart the connector if the stall issue pops up - disabled by default
const exitDelay = Number(process.env.CONNECTOR_EXIT_DELAY_IN_HOURS); // in hours
Expand Down
18 changes: 13 additions & 5 deletions services/blockchain-connector/shared/sdk/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ const clientInstantiationStats = {
};
let requestCount = 0;

const checkIsClientAlive = client => client && client._channel && client._channel.isAlive;

const getApiClientStats = () => ({
...clientInstantiationStats,
currentPoolSize: clientPool.length,
activePoolSize: clientPool.filter(client => checkIsClientAlive(client)).length,
expectedPoolSize: MAX_CLIENT_POOL_SIZE,
numEndpointInvocations: requestCount,
});

const checkIsClientAlive = client => client && client._channel && client._channel.isAlive;

const pingListener = apiClient => {
if (!isObject(apiClient)) {
logger.warn(`apiClient is ${JSON.stringify(apiClient)}. Cannot register a pingListener.`);
Expand Down Expand Up @@ -125,20 +125,28 @@ const initClientPool = async poolSize => {
// Set the intervals only at application init
if (clientPool.length === 0) {
setInterval(() => {
logger.info(`API client instantiation stats: ${JSON.stringify(getApiClientStats())}`);
const stats = getApiClientStats();
logger.info(`API client instantiation stats: ${JSON.stringify(stats)}`);
if (stats.activePoolSize < stats.expectedPoolSize) {
logger.warn(
'activePoolSize should catch up with the expectedPoolSize, once the node is under less stress.',
);
}
}, 5 * 60 * 1000);

setInterval(() => {
clientPool.forEach(async (apiClient, index) => {
if (isObject(apiClient)) return;

// Re-instantiate when null
clientPool[index] = await instantiateNewClient()
const newApiClient = await instantiateNewClient()
.then(client => {
client.poolIndex = index;
return client;
})
.catch(() => null);
clientPool[index] = newApiClient;
if (newApiClient) Signals.get('newApiClient').dispatch(newApiClient.poolIndex);
});
}, WS_SERVER_PING_INTERVAL);
}
Expand Down
5 changes: 5 additions & 0 deletions services/blockchain-connector/shared/sdk/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ const ensureAPIClientLiveness = () => {
logger.debug(
`Dispatched 'resetApiClient' signal for the event subscription API client ${apiClient.poolIndex}.`,
);
} else {
logger.debug('Triggered subscribeToAllRegisteredEvents from ensureAPIClientLiveness.');
await subscribeToAllRegisteredEvents();
}
}
} catch (_) {
Expand All @@ -174,11 +177,13 @@ const ensureAPIClientLiveness = () => {
};

const nodeIsSyncedListener = () => {
logger.debug('Node is now synced with the network.');
isNodeSynced = true;
ensureAPIClientLiveness();
};

const genesisBlockDownloadedListener = () => {
logger.debug('Genesis block is now downloaded.');
isGenesisBlockDownloaded = true;
ensureAPIClientLiveness();
};
Expand Down

0 comments on commit 42d7def

Please sign in to comment.