Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Add handling for txpool_newTransaction event #2009

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions services/blockchain-connector/events/controller/blockchain.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ const { Logger, Signals } = require('lisk-service-framework');
const { MODULE_NAME_POS } = require('../../shared/sdk/constants/names');

const { getBlockByID } = require('../../shared/sdk/endpoints');
const { formatBlock: formatBlockFromFormatter } = require('../../shared/sdk/formatter');
const {
formatBlock: formatBlockFromFormatter,
formatTransaction,
} = require('../../shared/sdk/formatter');

const EMPTY_TREE_ROOT_HASH = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855';
const logger = Logger();
Expand All @@ -43,7 +46,10 @@ const appNetworkEventController = async cb => {
};

const txpoolNewTransactionController = async cb => {
const txpoolNewTransactionListener = async payload => cb(payload);
const txpoolNewTransactionListener = async payload =>
cb({
transaction: formatTransaction(payload.transaction),
nagdahimanshu marked this conversation as resolved.
Show resolved Hide resolved
});
Signals.get('txpoolNewTransaction').add(txpoolNewTransactionListener);
};

Expand Down
4 changes: 4 additions & 0 deletions services/blockchain-coordinator/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ const app = Microservice({
logger.debug("Received a 'systemNodeInfo' moleculer event from connecter.");
Signals.get('nodeInfo').dispatch(payload);
},
txpoolNewTransaction: async payload => {
logger.debug("Received a 'txpoolNewTransaction' moleculer event from connecter.");
Signals.get('txpoolNewTransaction').dispatch(payload);
},
},
dependencies: ['connector', 'indexer'],
});
Expand Down
11 changes: 10 additions & 1 deletion services/blockchain-coordinator/shared/eventsScheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@ const scheduleDeleteBlock = async payload => {
const scheduleUpdatesOnNewRound = async payload => {
logger.debug('Scheduling updates on new round.');
await eventMessageQueue.add({ ...payload, isNewRound: true });
logger.debug('Finished scheduling updates on new round}.');
logger.debug('Finished scheduling updates on new round.');
};

const scheduleUpdatesOnNewTransaction = async payload => {
logger.debug('Scheduling updates on new transaction in the pool.');
await eventMessageQueue.add({ ...payload, isTxPoolNewTransaction: true });
logger.debug('Finished scheduling updates on new transaction in the pool.');
};

const initEventsScheduler = async () => {
Expand All @@ -53,6 +59,9 @@ const initEventsScheduler = async () => {

const newRoundListener = async payload => scheduleUpdatesOnNewRound(payload);
Signals.get('newRound').add(newRoundListener);

const txpoolNewTransactionListener = async payload => scheduleUpdatesOnNewTransaction(payload);
Signals.get('txpoolNewTransaction').add(txpoolNewTransactionListener);
};

module.exports = {
Expand Down
4 changes: 4 additions & 0 deletions services/blockchain-indexer/events/blockchain.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ module.exports = [
logger.error(`Error occurred when processing 'transactions.new' event:\n${err.stack}`);
}
};

const txPoolNewTransactionListener = async payload => callback(payload);

Signals.get('newBlock').add(newTransactionsListener);
Signals.get('txPoolNewTransaction').add(txPoolNewTransactionListener);
},
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ const {
formatTransactionsInBlock,
} = require('./transactions');

const { getPendingTransactions, loadAllPendingTransactions } = require('./pendingTransactions');
const {
getPendingTransactions,
loadAllPendingTransactions,
formatPendingTransaction,
} = require('./pendingTransactions');

const {
getBlockchainApps,
Expand Down Expand Up @@ -127,6 +131,7 @@ module.exports = {
normalizeTransaction,
getPendingTransactions,
loadAllPendingTransactions,
formatPendingTransaction,
postTransactions,
dryRunTransactions,
estimateTransactionFees,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,39 +30,44 @@ const { indexAccountPublicKey } = require('../../indexer/accountIndex');

let pendingTransactionsList = [];

const formatPendingTransaction = async transaction => {
const normalizedTransaction = await normalizeTransaction(transaction);
const senderAddress = getLisk32AddressFromPublicKey(normalizedTransaction.senderPublicKey);
const account = await getIndexedAccountInfo({ address: senderAddress }, ['name']);

normalizedTransaction.sender = {
address: senderAddress,
publicKey: normalizedTransaction.senderPublicKey,
name: account.name || null,
};

if (normalizedTransaction.params.recipientAddress) {
const recipientAccount = await getIndexedAccountInfo(
{ address: normalizedTransaction.params.recipientAddress },
['publicKey', 'name'],
);

normalizedTransaction.meta = {
recipient: {
address: normalizedTransaction.params.recipientAddress,
publicKey: recipientAccount ? recipientAccount.publicKey : null,
name: recipientAccount ? recipientAccount.name : null,
},
};
}

indexAccountPublicKey(normalizedTransaction.senderPublicKey);
normalizedTransaction.executionStatus = 'pending';
nagdahimanshu marked this conversation as resolved.
Show resolved Hide resolved
return normalizedTransaction;
};

const getPendingTransactionsFromCore = async () => {
nagdahimanshu marked this conversation as resolved.
Show resolved Hide resolved
const response = await requestConnector('getTransactionsFromPool');
const pendingTx = await BluebirdPromise.map(
response,
async transaction => {
nagdahimanshu marked this conversation as resolved.
Show resolved Hide resolved
const normalizedTransaction = await normalizeTransaction(transaction);
const senderAddress = getLisk32AddressFromPublicKey(normalizedTransaction.senderPublicKey);
const account = await getIndexedAccountInfo({ address: senderAddress }, ['name']);

normalizedTransaction.sender = {
address: senderAddress,
publicKey: normalizedTransaction.senderPublicKey,
name: account.name || null,
};

if (normalizedTransaction.params.recipientAddress) {
const recipientAccount = await getIndexedAccountInfo(
{ address: normalizedTransaction.params.recipientAddress },
['publicKey', 'name'],
);

normalizedTransaction.meta = {
recipient: {
address: normalizedTransaction.params.recipientAddress,
publicKey: recipientAccount ? recipientAccount.publicKey : null,
name: recipientAccount ? recipientAccount.name : null,
},
};
}

indexAccountPublicKey(normalizedTransaction.senderPublicKey);
normalizedTransaction.executionStatus = 'pending';
return normalizedTransaction;
const formattedTransaction = await formatPendingTransaction(transaction);
return formattedTransaction;
},
{ concurrency: response.length },
nagdahimanshu marked this conversation as resolved.
Show resolved Hide resolved
);
Expand Down Expand Up @@ -172,6 +177,7 @@ const getPendingTransactions = async params => {
module.exports = {
getPendingTransactions,
loadAllPendingTransactions,
formatPendingTransaction,

// For unit test
validateParams,
Expand Down
2 changes: 2 additions & 0 deletions services/blockchain-indexer/shared/dataService/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const {
getBlockByHeight,
getBlockByID,
loadAllPendingTransactions,
formatPendingTransaction,
getTransactionIDsByBlockID,
getTransactionsByIDs,
normalizeTransaction,
Expand Down Expand Up @@ -141,6 +142,7 @@ module.exports = {
getTransactions,
getPendingTransactions,
reloadAllPendingTransactions,
formatPendingTransaction,
postTransactions,
getTransactionsByBlockID,
dryRunTransactions,
Expand Down
15 changes: 14 additions & 1 deletion services/blockchain-indexer/shared/messageProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const {
reloadValidatorCache,
getGenerators,
getNumberOfGenerators,
formatPendingTransaction,
} = require('./dataService');
const { accountAddrUpdateQueue } = require('./indexer/accountIndex');

Expand Down Expand Up @@ -117,6 +118,15 @@ const newRoundProcessor = async () => {
logger.info(`Finished performing all updates on new round.`);
};

const txPoolNewTransactionProcessor = async transaction => {
logger.debug(`New transaction (${transaction.id}) received.`);
const formattedTransaction = await formatPendingTransaction(transaction);
Signals.get('txPoolNewTransaction').dispatch({
data: [formattedTransaction],
meta: { count: 1, total: 1, offset: 0 },
});
};

const initMessageProcessors = async () => {
logger.info(`Registering job processor for ${accountMessageQueue.name} message queue.`);
accountMessageQueue.process(async job => {
Expand All @@ -138,7 +148,7 @@ const initMessageProcessors = async () => {

eventMessageQueue.process(async job => {
logger.debug('Subscribed to the events from coordinator.');
const { isNewBlock, isDeleteBlock, isNewRound } = job.data;
const { isNewBlock, isDeleteBlock, isNewRound, isTxPoolNewTransaction } = job.data;

if (isNewBlock) {
const { block } = job.data;
Expand All @@ -154,6 +164,9 @@ const initMessageProcessors = async () => {
}
} else if (isNewRound) {
await newRoundProcessor();
} else if (isTxPoolNewTransaction) {
const { transaction } = job.data;
await txPoolNewTransactionProcessor(transaction);
}
});

Expand Down