diff --git a/services/blockchain-connector/events/controller/blockchain.js b/services/blockchain-connector/events/controller/blockchain.js index 447f27ff65..df5a0c9fbd 100644 --- a/services/blockchain-connector/events/controller/blockchain.js +++ b/services/blockchain-connector/events/controller/blockchain.js @@ -17,7 +17,10 @@ const { Logger, Signals } = require('lisk-service-framework'); const { MODULE_NAME_POS } = require('../../shared/sdk/constants/names'); const { getBlockByID } = require('../../shared/sdk/endpoints'); -const { formatBlock: formatBlockFromFormatter } = require('../../shared/sdk/formatter'); +const { + formatBlock: formatBlockFromFormatter, + formatTransaction, +} = require('../../shared/sdk/formatter'); const EMPTY_TREE_ROOT_HASH = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'; const logger = Logger(); @@ -43,7 +46,11 @@ const appNetworkEventController = async cb => { }; const txpoolNewTransactionController = async cb => { - const txpoolNewTransactionListener = async payload => cb(payload); + const txpoolNewTransactionListener = async payload => + cb({ + ...payload, + transaction: formatTransaction(payload.transaction), + }); Signals.get('txpoolNewTransaction').add(txpoolNewTransactionListener); }; diff --git a/services/blockchain-coordinator/app.js b/services/blockchain-coordinator/app.js index bbbc71bf5f..6ce5f2f0e9 100644 --- a/services/blockchain-coordinator/app.js +++ b/services/blockchain-coordinator/app.js @@ -47,6 +47,10 @@ const app = Microservice({ logger.debug("Received a 'systemNodeInfo' moleculer event from connecter."); Signals.get('nodeInfo').dispatch(payload); }, + txpoolNewTransaction: async payload => { + logger.debug("Received a 'txpoolNewTransaction' moleculer event from connecter."); + Signals.get('txpoolNewTransaction').dispatch(payload); + }, }, dependencies: ['connector', 'indexer'], }); diff --git a/services/blockchain-coordinator/shared/eventsScheduler.js b/services/blockchain-coordinator/shared/eventsScheduler.js index 281a207b8d..0e7fadbc67 100644 --- a/services/blockchain-coordinator/shared/eventsScheduler.js +++ b/services/blockchain-coordinator/shared/eventsScheduler.js @@ -41,7 +41,13 @@ const scheduleDeleteBlock = async payload => { const scheduleUpdatesOnNewRound = async payload => { logger.debug('Scheduling updates on new round.'); await eventMessageQueue.add({ ...payload, isNewRound: true }); - logger.debug('Finished scheduling updates on new round}.'); + logger.debug('Finished scheduling updates on new round.'); +}; + +const scheduleUpdatesOnNewTransaction = async payload => { + logger.debug('Scheduling updates on new transaction in the pool.'); + await eventMessageQueue.add({ ...payload, isTxPoolNewTransaction: true }); + logger.debug('Finished scheduling updates on new transaction in the pool.'); }; const initEventsScheduler = async () => { @@ -53,6 +59,9 @@ const initEventsScheduler = async () => { const newRoundListener = async payload => scheduleUpdatesOnNewRound(payload); Signals.get('newRound').add(newRoundListener); + + const txpoolNewTransactionListener = async payload => scheduleUpdatesOnNewTransaction(payload); + Signals.get('txpoolNewTransaction').add(txpoolNewTransactionListener); }; module.exports = { diff --git a/services/blockchain-indexer/events/blockchain.js b/services/blockchain-indexer/events/blockchain.js index 8fec171341..be1cde02bd 100644 --- a/services/blockchain-indexer/events/blockchain.js +++ b/services/blockchain-indexer/events/blockchain.js @@ -82,7 +82,11 @@ module.exports = [ logger.error(`Error occurred when processing 'transactions.new' event:\n${err.stack}`); } }; + + const txPoolNewTransactionListener = async payload => callback(payload); + Signals.get('newBlock').add(newTransactionsListener); + Signals.get('txPoolNewTransaction').add(txPoolNewTransactionListener); }, }, { diff --git a/services/blockchain-indexer/shared/dataService/business/index.js b/services/blockchain-indexer/shared/dataService/business/index.js index 15de7d62fb..cdb786f77b 100644 --- a/services/blockchain-indexer/shared/dataService/business/index.js +++ b/services/blockchain-indexer/shared/dataService/business/index.js @@ -35,7 +35,11 @@ const { formatTransactionsInBlock, } = require('./transactions'); -const { getPendingTransactions, loadAllPendingTransactions } = require('./pendingTransactions'); +const { + getPendingTransactions, + loadAllPendingTransactions, + formatPendingTransaction, +} = require('./pendingTransactions'); const { getBlockchainApps, @@ -127,6 +131,7 @@ module.exports = { normalizeTransaction, getPendingTransactions, loadAllPendingTransactions, + formatPendingTransaction, postTransactions, dryRunTransactions, estimateTransactionFees, diff --git a/services/blockchain-indexer/shared/dataService/business/pendingTransactions.js b/services/blockchain-indexer/shared/dataService/business/pendingTransactions.js index d75376f680..69bc369336 100644 --- a/services/blockchain-indexer/shared/dataService/business/pendingTransactions.js +++ b/services/blockchain-indexer/shared/dataService/business/pendingTransactions.js @@ -17,6 +17,7 @@ const BluebirdPromise = require('bluebird'); const { Logger, Exceptions: { ValidationException }, + Signals, } = require('lisk-service-framework'); const logger = Logger(); @@ -26,44 +27,47 @@ const { normalizeTransaction } = require('./transactions'); const { getIndexedAccountInfo } = require('../utils/account'); const { requestConnector } = require('../../utils/request'); const { getLisk32AddressFromPublicKey } = require('../../utils/account'); +const { TRANSACTION_STATUS } = require('../../constants'); const { indexAccountPublicKey } = require('../../indexer/accountIndex'); let pendingTransactionsList = []; -const getPendingTransactionsFromCore = async () => { +const formatPendingTransaction = async transaction => { + const normalizedTransaction = await normalizeTransaction(transaction); + const senderAddress = getLisk32AddressFromPublicKey(normalizedTransaction.senderPublicKey); + const account = await getIndexedAccountInfo({ address: senderAddress }, ['name']); + + normalizedTransaction.sender = { + address: senderAddress, + publicKey: normalizedTransaction.senderPublicKey, + name: account.name || null, + }; + + if (normalizedTransaction.params.recipientAddress) { + const recipientAccount = await getIndexedAccountInfo( + { address: normalizedTransaction.params.recipientAddress }, + ['publicKey', 'name'], + ); + + normalizedTransaction.meta = { + recipient: { + address: normalizedTransaction.params.recipientAddress, + publicKey: recipientAccount ? recipientAccount.publicKey : null, + name: recipientAccount ? recipientAccount.name : null, + }, + }; + } + + indexAccountPublicKey(normalizedTransaction.senderPublicKey); + normalizedTransaction.executionStatus = TRANSACTION_STATUS.PENDING; + return normalizedTransaction; +}; + +const getPendingTransactionsFromNode = async () => { const response = await requestConnector('getTransactionsFromPool'); const pendingTx = await BluebirdPromise.map( response, - async transaction => { - const normalizedTransaction = await normalizeTransaction(transaction); - const senderAddress = getLisk32AddressFromPublicKey(normalizedTransaction.senderPublicKey); - const account = await getIndexedAccountInfo({ address: senderAddress }, ['name']); - - normalizedTransaction.sender = { - address: senderAddress, - publicKey: normalizedTransaction.senderPublicKey, - name: account.name || null, - }; - - if (normalizedTransaction.params.recipientAddress) { - const recipientAccount = await getIndexedAccountInfo( - { address: normalizedTransaction.params.recipientAddress }, - ['publicKey', 'name'], - ); - - normalizedTransaction.meta = { - recipient: { - address: normalizedTransaction.params.recipientAddress, - publicKey: recipientAccount ? recipientAccount.publicKey : null, - name: recipientAccount ? recipientAccount.name : null, - }, - }; - } - - indexAccountPublicKey(normalizedTransaction.senderPublicKey); - normalizedTransaction.executionStatus = 'pending'; - return normalizedTransaction; - }, + async transaction => formatPendingTransaction(transaction), { concurrency: response.length }, ); return pendingTx; @@ -71,7 +75,7 @@ const getPendingTransactionsFromCore = async () => { const loadAllPendingTransactions = async () => { try { - pendingTransactionsList = await getPendingTransactionsFromCore(); + pendingTransactionsList = await getPendingTransactionsFromNode(); logger.info( `Updated pending transaction cache with ${pendingTransactionsList.length} transactions.`, ); @@ -156,7 +160,7 @@ const getPendingTransactions = async params => { .slice(offset, offset + limit) .map(transaction => { // Set the 'executionStatus' - transaction.executionStatus = 'pending'; + transaction.executionStatus = TRANSACTION_STATUS.PENDING; return transaction; }); @@ -169,9 +173,16 @@ const getPendingTransactions = async params => { return pendingTransactions; }; +const txPoolNewTransactionListener = async payload => { + const [transaction] = payload.data; + pendingTransactionsList.push(transaction); +}; +Signals.get('txPoolNewTransaction').add(txPoolNewTransactionListener); + module.exports = { getPendingTransactions, loadAllPendingTransactions, + formatPendingTransaction, // For unit test validateParams, diff --git a/services/blockchain-indexer/shared/dataService/index.js b/services/blockchain-indexer/shared/dataService/index.js index d620f7cac8..7d828a02b2 100644 --- a/services/blockchain-indexer/shared/dataService/index.js +++ b/services/blockchain-indexer/shared/dataService/index.js @@ -22,6 +22,7 @@ const { getBlockByHeight, getBlockByID, loadAllPendingTransactions, + formatPendingTransaction, getTransactionIDsByBlockID, getTransactionsByIDs, normalizeTransaction, @@ -141,6 +142,7 @@ module.exports = { getTransactions, getPendingTransactions, reloadAllPendingTransactions, + formatPendingTransaction, postTransactions, getTransactionsByBlockID, dryRunTransactions, diff --git a/services/blockchain-indexer/shared/messageProcessor.js b/services/blockchain-indexer/shared/messageProcessor.js index 410dd783cd..e37004c242 100644 --- a/services/blockchain-indexer/shared/messageProcessor.js +++ b/services/blockchain-indexer/shared/messageProcessor.js @@ -36,6 +36,7 @@ const { reloadValidatorCache, getGenerators, getNumberOfGenerators, + formatPendingTransaction, } = require('./dataService'); const { accountAddrUpdateQueue } = require('./indexer/accountIndex'); @@ -117,6 +118,15 @@ const newRoundProcessor = async () => { logger.info(`Finished performing all updates on new round.`); }; +const txPoolNewTransactionProcessor = async transaction => { + logger.debug(`New transaction (${transaction.id}) received.`); + const formattedTransaction = await formatPendingTransaction(transaction); + Signals.get('txPoolNewTransaction').dispatch({ + data: [formattedTransaction], + meta: { count: 1, total: 1, offset: 0 }, + }); +}; + const initMessageProcessors = async () => { logger.info(`Registering job processor for ${accountMessageQueue.name} message queue.`); accountMessageQueue.process(async job => { @@ -138,7 +148,7 @@ const initMessageProcessors = async () => { eventMessageQueue.process(async job => { logger.debug('Subscribed to the events from coordinator.'); - const { isNewBlock, isDeleteBlock, isNewRound } = job.data; + const { isNewBlock, isDeleteBlock, isNewRound, isTxPoolNewTransaction } = job.data; if (isNewBlock) { const { block } = job.data; @@ -154,6 +164,9 @@ const initMessageProcessors = async () => { } } else if (isNewRound) { await newRoundProcessor(); + } else if (isTxPoolNewTransaction) { + const { transaction } = job.data; + await txPoolNewTransactionProcessor(transaction); } });