Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Add handling for txpool_newTransaction event #2009

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions services/blockchain-connector/events/controller/blockchain.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ const { Logger, Signals } = require('lisk-service-framework');
const { MODULE_NAME_POS } = require('../../shared/sdk/constants/names');

const { getBlockByID } = require('../../shared/sdk/endpoints');
const { formatBlock: formatBlockFromFormatter } = require('../../shared/sdk/formatter');
const {
formatBlock: formatBlockFromFormatter,
formatTransaction,
} = require('../../shared/sdk/formatter');

const EMPTY_TREE_ROOT_HASH = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855';
const logger = Logger();
Expand All @@ -43,7 +46,11 @@ const appNetworkEventController = async cb => {
};

const txpoolNewTransactionController = async cb => {
const txpoolNewTransactionListener = async payload => cb(payload);
const txpoolNewTransactionListener = async payload =>
cb({
...payload,
transaction: formatTransaction(payload.transaction),
nagdahimanshu marked this conversation as resolved.
Show resolved Hide resolved
});
Signals.get('txpoolNewTransaction').add(txpoolNewTransactionListener);
};

Expand Down
4 changes: 4 additions & 0 deletions services/blockchain-coordinator/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ const app = Microservice({
logger.debug("Received a 'systemNodeInfo' moleculer event from connecter.");
Signals.get('nodeInfo').dispatch(payload);
},
txpoolNewTransaction: async payload => {
logger.debug("Received a 'txpoolNewTransaction' moleculer event from connecter.");
Signals.get('txpoolNewTransaction').dispatch(payload);
},
},
dependencies: ['connector', 'indexer'],
});
Expand Down
11 changes: 10 additions & 1 deletion services/blockchain-coordinator/shared/eventsScheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@ const scheduleDeleteBlock = async payload => {
const scheduleUpdatesOnNewRound = async payload => {
logger.debug('Scheduling updates on new round.');
await eventMessageQueue.add({ ...payload, isNewRound: true });
logger.debug('Finished scheduling updates on new round}.');
logger.debug('Finished scheduling updates on new round.');
};

const scheduleUpdatesOnNewTransaction = async payload => {
logger.debug('Scheduling updates on new transaction in the pool.');
await eventMessageQueue.add({ ...payload, isTxPoolNewTransaction: true });
logger.debug('Finished scheduling updates on new transaction in the pool.');
};

const initEventsScheduler = async () => {
Expand All @@ -53,6 +59,9 @@ const initEventsScheduler = async () => {

const newRoundListener = async payload => scheduleUpdatesOnNewRound(payload);
Signals.get('newRound').add(newRoundListener);

const txpoolNewTransactionListener = async payload => scheduleUpdatesOnNewTransaction(payload);
Signals.get('txpoolNewTransaction').add(txpoolNewTransactionListener);
};

module.exports = {
Expand Down
4 changes: 4 additions & 0 deletions services/blockchain-indexer/events/blockchain.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ module.exports = [
logger.error(`Error occurred when processing 'transactions.new' event:\n${err.stack}`);
}
};

const txPoolNewTransactionListener = async payload => callback(payload);

Signals.get('newBlock').add(newTransactionsListener);
Signals.get('txPoolNewTransaction').add(txPoolNewTransactionListener);
},
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ const {
formatTransactionsInBlock,
} = require('./transactions');

const { getPendingTransactions, loadAllPendingTransactions } = require('./pendingTransactions');
const {
getPendingTransactions,
loadAllPendingTransactions,
formatPendingTransaction,
} = require('./pendingTransactions');

const {
getBlockchainApps,
Expand Down Expand Up @@ -127,6 +131,7 @@ module.exports = {
normalizeTransaction,
getPendingTransactions,
loadAllPendingTransactions,
formatPendingTransaction,
postTransactions,
dryRunTransactions,
estimateTransactionFees,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const BluebirdPromise = require('bluebird');
const {
Logger,
Exceptions: { ValidationException },
Signals,
} = require('lisk-service-framework');

const logger = Logger();
Expand All @@ -26,52 +27,55 @@ const { normalizeTransaction } = require('./transactions');
const { getIndexedAccountInfo } = require('../utils/account');
const { requestConnector } = require('../../utils/request');
const { getLisk32AddressFromPublicKey } = require('../../utils/account');
const { TRANSACTION_STATUS } = require('../../constants');
const { indexAccountPublicKey } = require('../../indexer/accountIndex');

let pendingTransactionsList = [];

const getPendingTransactionsFromCore = async () => {
const formatPendingTransaction = async transaction => {
const normalizedTransaction = await normalizeTransaction(transaction);
const senderAddress = getLisk32AddressFromPublicKey(normalizedTransaction.senderPublicKey);
const account = await getIndexedAccountInfo({ address: senderAddress }, ['name']);

normalizedTransaction.sender = {
address: senderAddress,
publicKey: normalizedTransaction.senderPublicKey,
name: account.name || null,
};

if (normalizedTransaction.params.recipientAddress) {
const recipientAccount = await getIndexedAccountInfo(
{ address: normalizedTransaction.params.recipientAddress },
['publicKey', 'name'],
);

normalizedTransaction.meta = {
recipient: {
address: normalizedTransaction.params.recipientAddress,
publicKey: recipientAccount ? recipientAccount.publicKey : null,
name: recipientAccount ? recipientAccount.name : null,
},
};
}

indexAccountPublicKey(normalizedTransaction.senderPublicKey);
normalizedTransaction.executionStatus = TRANSACTION_STATUS.PENDING;
return normalizedTransaction;
};

const getPendingTransactionsFromNode = async () => {
const response = await requestConnector('getTransactionsFromPool');
const pendingTx = await BluebirdPromise.map(
response,
async transaction => {
const normalizedTransaction = await normalizeTransaction(transaction);
const senderAddress = getLisk32AddressFromPublicKey(normalizedTransaction.senderPublicKey);
const account = await getIndexedAccountInfo({ address: senderAddress }, ['name']);

normalizedTransaction.sender = {
address: senderAddress,
publicKey: normalizedTransaction.senderPublicKey,
name: account.name || null,
};

if (normalizedTransaction.params.recipientAddress) {
const recipientAccount = await getIndexedAccountInfo(
{ address: normalizedTransaction.params.recipientAddress },
['publicKey', 'name'],
);

normalizedTransaction.meta = {
recipient: {
address: normalizedTransaction.params.recipientAddress,
publicKey: recipientAccount ? recipientAccount.publicKey : null,
name: recipientAccount ? recipientAccount.name : null,
},
};
}

indexAccountPublicKey(normalizedTransaction.senderPublicKey);
normalizedTransaction.executionStatus = 'pending';
return normalizedTransaction;
},
async transaction => formatPendingTransaction(transaction),
{ concurrency: response.length },
);
return pendingTx;
};

const loadAllPendingTransactions = async () => {
try {
pendingTransactionsList = await getPendingTransactionsFromCore();
pendingTransactionsList = await getPendingTransactionsFromNode();
logger.info(
`Updated pending transaction cache with ${pendingTransactionsList.length} transactions.`,
);
Expand Down Expand Up @@ -156,7 +160,7 @@ const getPendingTransactions = async params => {
.slice(offset, offset + limit)
.map(transaction => {
// Set the 'executionStatus'
transaction.executionStatus = 'pending';
transaction.executionStatus = TRANSACTION_STATUS.PENDING;
return transaction;
});

Expand All @@ -169,9 +173,16 @@ const getPendingTransactions = async params => {
return pendingTransactions;
};

const txPoolNewTransactionListener = async payload => {
const [transaction] = payload.data;
pendingTransactionsList.push(transaction);
};
Signals.get('txPoolNewTransaction').add(txPoolNewTransactionListener);

module.exports = {
getPendingTransactions,
loadAllPendingTransactions,
formatPendingTransaction,

// For unit test
validateParams,
Expand Down
2 changes: 2 additions & 0 deletions services/blockchain-indexer/shared/dataService/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const {
getBlockByHeight,
getBlockByID,
loadAllPendingTransactions,
formatPendingTransaction,
getTransactionIDsByBlockID,
getTransactionsByIDs,
normalizeTransaction,
Expand Down Expand Up @@ -141,6 +142,7 @@ module.exports = {
getTransactions,
getPendingTransactions,
reloadAllPendingTransactions,
formatPendingTransaction,
postTransactions,
getTransactionsByBlockID,
dryRunTransactions,
Expand Down
15 changes: 14 additions & 1 deletion services/blockchain-indexer/shared/messageProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const {
reloadValidatorCache,
getGenerators,
getNumberOfGenerators,
formatPendingTransaction,
} = require('./dataService');
const { accountAddrUpdateQueue } = require('./indexer/accountIndex');

Expand Down Expand Up @@ -117,6 +118,15 @@ const newRoundProcessor = async () => {
logger.info(`Finished performing all updates on new round.`);
};

const txPoolNewTransactionProcessor = async transaction => {
logger.debug(`New transaction (${transaction.id}) received.`);
const formattedTransaction = await formatPendingTransaction(transaction);
Signals.get('txPoolNewTransaction').dispatch({
data: [formattedTransaction],
meta: { count: 1, total: 1, offset: 0 },
});
};

const initMessageProcessors = async () => {
logger.info(`Registering job processor for ${accountMessageQueue.name} message queue.`);
accountMessageQueue.process(async job => {
Expand All @@ -138,7 +148,7 @@ const initMessageProcessors = async () => {

eventMessageQueue.process(async job => {
logger.debug('Subscribed to the events from coordinator.');
const { isNewBlock, isDeleteBlock, isNewRound } = job.data;
const { isNewBlock, isDeleteBlock, isNewRound, isTxPoolNewTransaction } = job.data;

if (isNewBlock) {
const { block } = job.data;
Expand All @@ -154,6 +164,9 @@ const initMessageProcessors = async () => {
}
} else if (isNewRound) {
await newRoundProcessor();
} else if (isTxPoolNewTransaction) {
const { transaction } = job.data;
await txPoolNewTransactionProcessor(transaction);
}
});

Expand Down