From fb069d8cc1385d42e0d508bc36fdbf5e62a12ee7 Mon Sep 17 00:00:00 2001 From: Sonali Thakur Date: Thu, 21 Nov 2024 11:58:50 +0700 Subject: [PATCH 1/6] put archiver robust query behind flag --- src/Data/Collector.ts | 86 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/src/Data/Collector.ts b/src/Data/Collector.ts index 9026cba..e24fc5b 100644 --- a/src/Data/Collector.ts +++ b/src/Data/Collector.ts @@ -75,6 +75,11 @@ const isReceiptRobust = async ( executionGroupNodes: ConsensusNodeInfo[], minConfirmations: number = config.RECEIPT_CONFIRMATIONS ): Promise<{ success: boolean; newReceipt?: Receipt.ArchiverReceipt }> => { + // If robustQuery is disabled, do offline verification + if (!config.useRobustQueryForReceipt) { + return verifyReceiptOffline(receipt, executionGroupNodes, minConfirmations) + } + const result = { success: false } // Created signedData with full_receipt = false outside of queryReceipt to avoid signing the same data multiple times let signedData = Crypto.sign({ @@ -272,6 +277,87 @@ const isReceiptRobust = async ( return { success: true } } +// Offline receipt verification +const verifyReceiptOffline = async ( + receipt: Receipt.ArchiverReceipt, + executionGroupNodes: ConsensusNodeInfo[], + minConfirmations: number +): Promise<{ success: boolean; newReceipt?: Receipt.ArchiverReceipt }> => { + const { signedReceipt, globalModification } = receipt + + if (globalModification) { + const globalReceipt = signedReceipt as P2PTypes.GlobalAccountsTypes.GlobalTxReceipt + // Verify global modification receipt has enough valid signatures + const validSigners = new Set() + + // Ensure signs array exists and is not empty + if (!globalReceipt.signs || !Array.isArray(globalReceipt.signs) || globalReceipt.signs.length === 0) { + return { success: false } + } + + // Ensure tx exists and has required fields + if (!globalReceipt.tx || !globalReceipt.tx.address) { + return { success: false } + } + + for (const sign of globalReceipt.signs) { + // Skip invalid signatures + if (!sign || !sign.owner) continue + + const node = executionGroupNodes.find(n => n.publicKey === sign.owner) + if (!node) continue + + try { + if (Crypto.verify({ ...globalReceipt.tx, sign })) { + validSigners.add(sign.owner) + } + } catch (error) { + console.error('Error verifying signature:', error) + continue + } + } + + return { + success: validSigners.size >= minConfirmations + } + } + + // Rest of the function remains the same... + // Code for normal receipts verification + const normalReceipt = signedReceipt as Receipt.SignedReceipt + const validSigners = new Set() + + if (!normalReceipt.signaturePack || !Array.isArray(normalReceipt.signaturePack)) { + return { success: false } + } + + for (const signature of normalReceipt.signaturePack) { + if (!signature || !signature.owner) continue + + const node = executionGroupNodes.find(n => n.publicKey === signature.owner) + if (!node) continue + + try { + const voteHash = calculateVoteHash(normalReceipt.proposal) + const appliedVoteHash = { + txid: receipt.tx.txId, + voteHash + } + + if (Crypto.verify({ ...appliedVoteHash, sign: signature })) { + validSigners.add(signature.owner) + } + } catch (error) { + console.error('Error verifying signature:', error) + continue + } + } + + return { + success: validSigners.size >= minConfirmations + } +} + /** * Validate type and field existence of the receipt data before processing it further * @param receipt From 3857e81c76889758e7e61c4a24f092bc78000dff Mon Sep 17 00:00:00 2001 From: Sonali Thakur Date: Thu, 12 Dec 2024 20:05:29 +0530 Subject: [PATCH 2/6] Enhance receipt validation with global support and AJV integration --- src/Data/Collector.ts | 335 ++++++++++++++++----------- src/primary-process/index.ts | 12 +- src/shardeum/calculateAccountHash.ts | 17 +- src/shardeum/verifyAppReceiptData.ts | 21 +- src/types/ajv/Receipts.ts | 1 + src/types/enum/AJVSchemaEnum.ts | 3 +- 6 files changed, 247 insertions(+), 142 deletions(-) diff --git a/src/Data/Collector.ts b/src/Data/Collector.ts index e24fc5b..921b624 100644 --- a/src/Data/Collector.ts +++ b/src/Data/Collector.ts @@ -31,7 +31,6 @@ import { accountSpecificHash, verifyAccountHash } from '../shardeum/calculateAcc import { verifyAppReceiptData } from '../shardeum/verifyAppReceiptData' import { Cycle as DbCycle } from '../dbstore/types' import { Utils as StringUtils } from '@shardus/types' -import { offloadReceipt } from '../primary-process' import { verifyPayload } from '../types/ajv/Helpers' import { AJVSchemaEnum } from '../types/enum/AJVSchemaEnum' @@ -63,6 +62,60 @@ export interface ReceiptVerificationResult { nestedCounterMessages?: string[] } +const verifyReceiptMajority = async ( + receipt: Receipt.ArchiverReceipt, + executionGroupNodes: ConsensusNodeInfo[], + minConfirmations: number = config.RECEIPT_CONFIRMATIONS +): Promise<{ success: boolean; newReceipt?: Receipt.ArchiverReceipt }> => { + /** + * Note: + * Currently, only the non-global receipt flow is implemented in `verifyReceiptMajority`, + * `verifyReceiptOffline`, and `verifyNonGlobalTxReceiptWithValidators`. In the future, + * global receipt methods can be added to maintain consistency. As of now, only offline + * verification for global receipts is available, and `verifyGlobalTxReceiptWithValidators` + * is not yet implemented. + */ + + // If robustQuery is disabled, do offline verification + if (!config.useRobustQueryForReceipt) { + return verifyReceiptOffline(receipt, executionGroupNodes, minConfirmations) + } + return verifyReceiptWithValidators(receipt, executionGroupNodes, minConfirmations) +} + +// Offline receipt verification +/** + * Note: + * The `verifyReceiptWithValidators` function currently supports only + * non-global receipt verification. Future enhancements should include + * validation logic for global receipts to ensure comprehensive receipt + * verification. This will help in maintaining consistency and reliability + * across all types of receipts processed by the system. + */ +const verifyReceiptOffline = async ( + receipt: Receipt.ArchiverReceipt, + executionGroupNodes: ConsensusNodeInfo[], + minConfirmations: number +): Promise<{ success: boolean; newReceipt?: Receipt.ArchiverReceipt }> => { + return verifyNonGlobalTxReceiptOffline(receipt, executionGroupNodes, minConfirmations) +} + +/** + * Note: + * The `verifyReceiptWithValidators` function currently supports only + * non-global receipt verification. Future enhancements should include + * validation logic for global receipts to ensure comprehensive receipt + * verification. This will help in maintaining consistency and reliability + * across all types of receipts processed by the system. + */ +const verifyReceiptWithValidators = async ( + receipt: Receipt.ArchiverReceipt, + executionGroupNodes: ConsensusNodeInfo[], + minConfirmations: number = config.RECEIPT_CONFIRMATIONS +): Promise<{ success: boolean; newReceipt?: Receipt.ArchiverReceipt }> => { + return verifyNonGlobalTxReceiptWithValidators(receipt, executionGroupNodes, minConfirmations) +} + /** * Calls the /get-tx-receipt endpoint of the nodes in the execution group of the receipt to verify the receipt. If "RECEIPT_CONFIRMATIONS" number of nodes return the same receipt, the receipt is deemed valid. * @param receipt @@ -70,16 +123,11 @@ export interface ReceiptVerificationResult { * @param minConfirmations * @returns boolean */ -const isReceiptRobust = async ( +const verifyNonGlobalTxReceiptWithValidators = async ( receipt: Receipt.ArchiverReceipt, executionGroupNodes: ConsensusNodeInfo[], minConfirmations: number = config.RECEIPT_CONFIRMATIONS ): Promise<{ success: boolean; newReceipt?: Receipt.ArchiverReceipt }> => { - // If robustQuery is disabled, do offline verification - if (!config.useRobustQueryForReceipt) { - return verifyReceiptOffline(receipt, executionGroupNodes, minConfirmations) - } - const result = { success: false } // Created signedData with full_receipt = false outside of queryReceipt to avoid signing the same data multiple times let signedData = Crypto.sign({ @@ -277,54 +325,110 @@ const isReceiptRobust = async ( return { success: true } } -// Offline receipt verification -const verifyReceiptOffline = async ( - receipt: Receipt.ArchiverReceipt, - executionGroupNodes: ConsensusNodeInfo[], - minConfirmations: number -): Promise<{ success: boolean; newReceipt?: Receipt.ArchiverReceipt }> => { - const { signedReceipt, globalModification } = receipt - - if (globalModification) { - const globalReceipt = signedReceipt as P2PTypes.GlobalAccountsTypes.GlobalTxReceipt - // Verify global modification receipt has enough valid signatures - const validSigners = new Set() - - // Ensure signs array exists and is not empty - if (!globalReceipt.signs || !Array.isArray(globalReceipt.signs) || globalReceipt.signs.length === 0) { - return { success: false } - } +// Offline global receipt verification +const verifyGlobalTxreceiptOffline = async ( + receipt: Receipt.ArchiverReceipt +): Promise<{ success: boolean; requiredSignatures?: number }> => { + const appliedReceipt = receipt.signedReceipt as P2PTypes.GlobalAccountsTypes.GlobalTxReceipt + const result = { success: false } + const { txId, timestamp } = receipt.tx + const { executionShardKey, cycle } = receipt + const cycleShardData = shardValuesByCycle.get(cycle) + const { homePartition } = ShardFunction.addressToPartition(cycleShardData.shardGlobals, executionShardKey) + const { signs } = appliedReceipt + // Refer to https://github.com/shardeum/shardus-core/blob/7d8877b7e1a5b18140f898a64b932182d8a35298/src/p2p/GlobalAccounts.ts#L397 + let votingGroupCount = cycleShardData.shardGlobals.nodesPerConsenusGroup + if (votingGroupCount > cycleShardData.nodes.length) { + if (nestedCountersInstance) + nestedCountersInstance.countEvent('receipt', 'votingGroupCount_greater_than_nodes_length') + Logger.mainLogger.error( + 'votingGroupCount_greater_than_nodes_length', + votingGroupCount, + cycleShardData.nodes.length + ) + votingGroupCount = cycleShardData.nodes.length + } + let isReceiptMajority = (signs.length / votingGroupCount) * 100 >= config.requiredMajorityVotesPercentage + if (!isReceiptMajority) { + Logger.mainLogger.error( + `Invalid receipt globalModification signs count is less than ${config.requiredMajorityVotesPercentage}% of the votingGroupCount, ${signs.length}, ${votingGroupCount}` + ) + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + `Invalid_receipt_globalModification_signs_count_less_than_${config.requiredMajorityVotesPercentage}%` + ) + return result + } - // Ensure tx exists and has required fields - if (!globalReceipt.tx || !globalReceipt.tx.address) { - return { success: false } + const nodeMap = new Map() + // Fill the map with nodes keyed by their public keys + cycleShardData.nodes.forEach((node) => { + if (node.publicKey) { + nodeMap.set(node.publicKey, node) } - - for (const sign of globalReceipt.signs) { - // Skip invalid signatures - if (!sign || !sign.owner) continue - - const node = executionGroupNodes.find(n => n.publicKey === sign.owner) - if (!node) continue - - try { - if (Crypto.verify({ ...globalReceipt.tx, sign })) { - validSigners.add(sign.owner) - } - } catch (error) { - console.error('Error verifying signature:', error) - continue - } + }) + // Using a set to store the unique signers to avoid duplicates + const uniqueSigners = new Set() + for (const sign of signs) { + const { owner: nodePubKey } = sign + // Get the node id from the public key + const node = nodeMap.get(nodePubKey) + if (node == null) { + Logger.mainLogger.error( + `The node with public key ${nodePubKey} of the receipt ${txId} with ${timestamp} is not in the active nodesList of cycle ${cycle}` + ) + if (nestedCountersInstance) + nestedCountersInstance.countEvent('receipt', 'globalModification_sign_owner_not_in_active_nodesList') + continue } - - return { - success: validSigners.size >= minConfirmations + // Check if the node is in the execution group + if (!cycleShardData.parititionShardDataMap.get(homePartition).coveredBy[node.id]) { + Logger.mainLogger.error( + `The node with public key ${nodePubKey} of the receipt ${txId} with ${timestamp} is not in the execution group of the tx` + ) + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'globalModification_sign_node_not_in_execution_group_of_tx' + ) + continue } + // Check if the signature is valid + if (!Crypto.verify({ ...appliedReceipt.tx, sign: sign })) { + Logger.mainLogger.error(`Invalid receipt globalModification signature verification failed`) + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'Invalid_receipt_globalModification_signature_verification_failed' + ) + continue + } + uniqueSigners.add(nodePubKey) + } + isReceiptMajority = (uniqueSigners.size / votingGroupCount) * 100 >= config.requiredMajorityVotesPercentage + if (!isReceiptMajority) { + Logger.mainLogger.error( + `Invalid receipt globalModification valid signs count is less than votingGroupCount ${uniqueSigners.size}, ${votingGroupCount}` + ) + if (nestedCountersInstance) + nestedCountersInstance.countEvent( + 'receipt', + 'Invalid_receipt_globalModification_valid_signs_count_less_than_votingGroupCount' + ) + return result } + const requiredSignatures = Math.floor(votingGroupCount * (config.requiredMajorityVotesPercentage / 100)) + return { success: true, requiredSignatures } +} - // Rest of the function remains the same... +const verifyNonGlobalTxReceiptOffline = async ( + receipt: Receipt.ArchiverReceipt, + executionGroupNodes: ConsensusNodeInfo[], + minConfirmations: number +): Promise<{ success: boolean; newReceipt?: Receipt.ArchiverReceipt }> => { // Code for normal receipts verification - const normalReceipt = signedReceipt as Receipt.SignedReceipt + const normalReceipt = receipt.signedReceipt as Receipt.SignedReceipt const validSigners = new Set() if (!normalReceipt.signaturePack || !Array.isArray(normalReceipt.signaturePack)) { @@ -334,7 +438,7 @@ const verifyReceiptOffline = async ( for (const signature of normalReceipt.signaturePack) { if (!signature || !signature.owner) continue - const node = executionGroupNodes.find(n => n.publicKey === signature.owner) + const node = executionGroupNodes.find((n) => n.publicKey.toLowerCase() === signature.owner.toLowerCase()) if (!node) continue try { @@ -422,90 +526,22 @@ export const verifyReceiptData = async ( } // Determine the home partition index of the primary account (executionShardKey) const { homePartition } = ShardFunction.addressToPartition(cycleShardData.shardGlobals, executionShardKey) - if (globalModification) { - const appliedReceipt = receipt.signedReceipt as P2PTypes.GlobalAccountsTypes.GlobalTxReceipt - - const { signs } = appliedReceipt - // Refer to https://github.com/shardeum/shardus-core/blob/7d8877b7e1a5b18140f898a64b932182d8a35298/src/p2p/GlobalAccounts.ts#L397 - let votingGroupCount = cycleShardData.shardGlobals.nodesPerConsenusGroup - if (votingGroupCount > cycleShardData.nodes.length) { - if (nestedCountersInstance) - nestedCountersInstance.countEvent('receipt', 'votingGroupCount_greater_than_nodes_length') - Logger.mainLogger.error( - 'votingGroupCount_greater_than_nodes_length', - votingGroupCount, - cycleShardData.nodes.length - ) - votingGroupCount = cycleShardData.nodes.length - } - let isReceiptMajority = - (signs.length / votingGroupCount) * 100 >= config.requiredMajorityVotesPercentage - if (!isReceiptMajority) { - Logger.mainLogger.error( - `Invalid receipt globalModification signs count is less than ${config.requiredMajorityVotesPercentage}% of the votingGroupCount, ${signs.length}, ${votingGroupCount}` - ) - if (nestedCountersInstance) - nestedCountersInstance.countEvent( - 'receipt', - `Invalid_receipt_globalModification_signs_count_less_than_${config.requiredMajorityVotesPercentage}%` - ) - return result - } - - const nodeMap = new Map() - // Fill the map with nodes keyed by their public keys - cycleShardData.nodes.forEach((node) => { - if (node.publicKey) { - nodeMap.set(node.publicKey, node) - } - }) - // Using a set to store the unique signers to avoid duplicates - const uniqueSigners = new Set() - for (const sign of signs) { - const { owner: nodePubKey } = sign - // Get the node id from the public key - const node = nodeMap.get(nodePubKey) - if (node == null) { - Logger.mainLogger.error( - `The node with public key ${nodePubKey} of the receipt ${txId} with ${timestamp} is not in the active nodesList of cycle ${cycle}` - ) - if (nestedCountersInstance) - nestedCountersInstance.countEvent( - 'receipt', - 'globalModification_sign_owner_not_in_active_nodesList' - ) - continue - } - // Check if the node is in the execution group - if (!cycleShardData.parititionShardDataMap.get(homePartition).coveredBy[node.id]) { - Logger.mainLogger.error( - `The node with public key ${nodePubKey} of the receipt ${txId} with ${timestamp} is not in the execution group of the tx` - ) - if (nestedCountersInstance) - nestedCountersInstance.countEvent( - 'receipt', - 'globalModification_sign_node_not_in_execution_group_of_tx' - ) - continue - } - uniqueSigners.add(nodePubKey) - } - isReceiptMajority = - (uniqueSigners.size / votingGroupCount) * 100 >= config.requiredMajorityVotesPercentage - if (!isReceiptMajority) { - Logger.mainLogger.error( - `Invalid receipt globalModification valid signs count is less than votingGroupCount ${uniqueSigners.size}, ${votingGroupCount}` - ) - if (nestedCountersInstance) - nestedCountersInstance.countEvent( - 'receipt', - 'Invalid_receipt_globalModification_valid_signs_count_less_than_votingGroupCount' - ) - return result - } - const requiredSignatures = Math.floor(votingGroupCount * (config.requiredMajorityVotesPercentage / 100)) - return { success: true, requiredSignatures } + let globalReceiptValidationErrors + try { + // Validate if receipt is a global modification receipt using AJV + globalReceiptValidationErrors = verifyPayload(AJVSchemaEnum.GlobalTxReceipt, receipt?.signedReceipt) + } catch (error) { + globalReceiptValidationErrors = true + if (nestedCountersInstance) + nestedCountersInstance.countEvent('receipt', 'Error processing receipt error', error) + Logger.mainLogger.error('Error processing receipt error', error) + return result + } + + // If the receipt is a global modification receipt, validate the receipt + if (!globalReceiptValidationErrors) { + return verifyGlobalTxreceiptOffline(receipt) } const { signaturePack } = receipt.signedReceipt as Receipt.SignedReceipt if (config.newPOQReceipt === false) { @@ -671,7 +707,7 @@ export const verifyReceiptData = async ( nodesPerConsensusGroup > config.RECEIPT_CONFIRMATIONS ? config.RECEIPT_CONFIRMATIONS : Math.ceil(config.RECEIPT_CONFIRMATIONS / 2) // 3 out of 5 nodes - const { success, newReceipt } = await isReceiptRobust(receipt, executionGroupNodes, minConfirmations) + const { success, newReceipt } = await verifyReceiptMajority(receipt, executionGroupNodes, minConfirmations) if (!success) { Logger.mainLogger.error('Invalid receipt: Robust check failed') if (nestedCountersInstance) @@ -691,7 +727,22 @@ const verifyAppliedReceiptSignatures = ( const result = { success: false, failedReasons, nestedCounterMessages } const { globalModification, cycle, executionShardKey } = receipt const { txId: txid, timestamp } = receipt.tx - if (globalModification) { + let globalReceiptValidationErrors // This is used to store the validation errors of the globalTxReceipt + + try { + globalReceiptValidationErrors = verifyPayload(AJVSchemaEnum.GlobalTxReceipt, receipt?.signedReceipt) + } catch (error) { + globalReceiptValidationErrors = true + failedReasons.push( + `Invalid Global Tx Receipt error: ${error}. txId ${receipt.tx.txId} , cycle ${receipt.cycle} , timestamp ${receipt.tx.timestamp}` + ) + nestedCounterMessages.push( + `Invalid Global Tx Receipt error: ${error}. txId ${receipt.tx.txId} , cycle ${receipt.cycle} , timestamp ${receipt.tx.timestamp}` + ) + return result + } + // If the globalReceiptValidationErrors is null, then the receipt is a globalModification receipt + if (!globalReceiptValidationErrors) { const appliedReceipt = receipt.signedReceipt as P2PTypes.GlobalAccountsTypes.GlobalTxReceipt // Refer to https://github.com/shardeum/shardus-core/blob/7d8877b7e1a5b18140f898a64b932182d8a35298/src/p2p/GlobalAccounts.ts#L294 @@ -978,11 +1029,21 @@ export const storeReceiptData = async ( } if (newReceipt) receipt = newReceipt - if (profilerInstance) profilerInstance.profileSectionStart('Offload_receipt') - if (nestedCountersInstance) nestedCountersInstance.countEvent('receipt', 'Offload_receipt') + if (profilerInstance) profilerInstance.profileSectionStart('Verify_archiver_receipt') + if (nestedCountersInstance) nestedCountersInstance.countEvent('receipt', 'Verify_archiver_receipt') const start_time = process.hrtime() // console.log('offloading receipt', txId, timestamp) - const result = await offloadReceipt(txId, timestamp, requiredSignatures, receipt) + // const result = await offloadReceipt(txId, timestamp, requiredSignatures, receipt) + let result + try { + result = await verifyArchiverReceipt(receipt, requiredSignatures) + } catch (error) { + receiptsInValidationMap.delete(txId) + if (nestedCountersInstance) + nestedCountersInstance.countEvent('receipt', 'Invalid_receipt_verification_failed') + if (profilerInstance) profilerInstance.profileSectionEnd('Verify_archiver_receipt') + continue + } // console.log('offload receipt result', txId, timestamp, result) const end_time = process.hrtime(start_time) const time_taken = end_time[0] * 1000 + end_time[1] / 1000000 diff --git a/src/primary-process/index.ts b/src/primary-process/index.ts index be97969..1b578e5 100644 --- a/src/primary-process/index.ts +++ b/src/primary-process/index.ts @@ -6,6 +6,8 @@ import { config } from '../Config' import { EventEmitter } from 'events' import { StateManager, Utils as StringUtils } from '@shardus/types' import * as Utils from '../Utils' +import { verifyPayload } from '../types/ajv/Helpers' +import { AJVSchemaEnum } from '../types/enum/AJVSchemaEnum' const MAX_WORKERS = cpus().length - 1 // Leaving 1 core for the master process @@ -235,9 +237,17 @@ export const offloadReceipt = async ( let verificationResult: ReceiptVerificationResult // Check if offloading is disabled globally or for global modifications + let globalReceiptValidationErrors // This is used to store the validation errors of the globalTxReceipt + try { + globalReceiptValidationErrors = verifyPayload(AJVSchemaEnum.GlobalTxReceipt, receipt?.signedReceipt) + } catch (error) { + console.error('Invalid Global Tx Receipt error: ', error) + globalReceiptValidationErrors = true + } + if ( config.disableOffloadReceipt || - (config.disableOffloadReceiptForGlobalModification && receipt.globalModification) + (config.disableOffloadReceiptForGlobalModification && !globalReceiptValidationErrors) ) { mainProcessReceiptTracker++ if (config.workerProcessesDebugLog) console.log('Verifying on the main program', txId, timestamp) diff --git a/src/shardeum/calculateAccountHash.ts b/src/shardeum/calculateAccountHash.ts index d4f6b20..0688048 100644 --- a/src/shardeum/calculateAccountHash.ts +++ b/src/shardeum/calculateAccountHash.ts @@ -1,5 +1,7 @@ import * as crypto from '../Crypto' import { ArchiverReceipt, SignedReceipt } from '../dbstore/receipts' +import { verifyPayload } from '../types/ajv/Helpers' +import { AJVSchemaEnum } from '../types/enum/AJVSchemaEnum' import { verifyGlobalTxAccountChange } from './verifyGlobalTxReceipt' // account types in Shardeum @@ -64,7 +66,20 @@ export const verifyAccountHash = ( nestedCounterMessages = [] ): boolean => { try { - if (receipt.globalModification) { + let globalReceiptValidationErrors // This is used to store the validation errors of the globalTxReceipt + try { + globalReceiptValidationErrors = verifyPayload(AJVSchemaEnum.GlobalTxReceipt, receipt?.signedReceipt) + } catch (error) { + globalReceiptValidationErrors = true + failedReasons.push( + `Invalid Global Tx Receipt error: ${error}. txId ${receipt.tx.txId} , cycle ${receipt.cycle} , timestamp ${receipt.tx.timestamp}` + ) + nestedCounterMessages.push( + `Invalid Global Tx Receipt error: ${error}. txId ${receipt.tx.txId} , cycle ${receipt.cycle} , timestamp ${receipt.tx.timestamp}` + ) + return false + } + if (!globalReceiptValidationErrors) { const result = verifyGlobalTxAccountChange(receipt, failedReasons, nestedCounterMessages) if (!result) return false return true diff --git a/src/shardeum/verifyAppReceiptData.ts b/src/shardeum/verifyAppReceiptData.ts index 1cbdb0d..c531cd8 100644 --- a/src/shardeum/verifyAppReceiptData.ts +++ b/src/shardeum/verifyAppReceiptData.ts @@ -1,6 +1,8 @@ import * as crypto from '../Crypto' import { ArchiverReceipt, Receipt, SignedReceipt } from '../dbstore/receipts' import { Utils as StringUtils } from '@shardus/types' +import { verifyPayload } from '../types/ajv/Helpers' +import { AJVSchemaEnum } from '../types/enum/AJVSchemaEnum' export type ShardeumReceipt = object & { amountSpent: string @@ -14,8 +16,23 @@ export const verifyAppReceiptData = async ( nestedCounterMessages = [] ): Promise<{ valid: boolean; needToSave: boolean }> => { let result = { valid: false, needToSave: false } - const { appReceiptData, globalModification } = receipt - if (globalModification) return { valid: true, needToSave: true } + const { appReceiptData } = receipt + let globalReceiptValidationErrors // This is used to store the validation errors of the globalTxReceipt + try { + globalReceiptValidationErrors = verifyPayload(AJVSchemaEnum.GlobalTxReceipt, receipt?.signedReceipt) + } catch (error) { + globalReceiptValidationErrors = true + failedReasons.push( + `Invalid Global Tx Receipt error: ${error}. txId ${receipt.tx.txId} , cycle ${receipt.cycle} , timestamp ${receipt.tx.timestamp}` + ) + nestedCounterMessages.push( + `Invalid Global Tx Receipt error: ${error}. txId ${receipt.tx.txId} , cycle ${receipt.cycle} , timestamp ${receipt.tx.timestamp}` + ) + return result + } + if (!globalReceiptValidationErrors) { + return { valid: true, needToSave: true } + } const signedReceipt = receipt.signedReceipt as SignedReceipt const newShardeumReceipt = appReceiptData.data as ShardeumReceipt if (!newShardeumReceipt.amountSpent || !newShardeumReceipt.readableReceipt) { diff --git a/src/types/ajv/Receipts.ts b/src/types/ajv/Receipts.ts index 921742d..dbb6412 100644 --- a/src/types/ajv/Receipts.ts +++ b/src/types/ajv/Receipts.ts @@ -195,4 +195,5 @@ function addSchemas(): void { // addSchema('ReceiptTx', schemaTx); addSchema(AJVSchemaEnum.ArchiverReceipt, schemaArchiverReceipt); addSchema(AJVSchemaEnum.Receipt, schemaReceipt); + addSchema(AJVSchemaEnum.GlobalTxReceipt, schemaGlobalTxReceipt); } diff --git a/src/types/enum/AJVSchemaEnum.ts b/src/types/enum/AJVSchemaEnum.ts index 4a3f1da..10efa34 100644 --- a/src/types/enum/AJVSchemaEnum.ts +++ b/src/types/enum/AJVSchemaEnum.ts @@ -2,5 +2,6 @@ export enum AJVSchemaEnum { Receipt = 'Receipt', AccountsCopy = 'AccountsCopy', ArchiverReceipt = 'ArchiverReceipt', - OriginalTxData = 'OriginalTxData' + OriginalTxData = 'OriginalTxData', + GlobalTxReceipt = 'GlobalTxReceipt', } From fbf539e6db94e6fd88d11bab1a693d253871ad50 Mon Sep 17 00:00:00 2001 From: Sonali Thakur Date: Fri, 13 Dec 2024 11:59:23 +0530 Subject: [PATCH 3/6] Refactor receipt verification functions to accept both Receipt and ArchiverReceipt --- src/Data/Collector.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/Data/Collector.ts b/src/Data/Collector.ts index 921b624..be6fda0 100644 --- a/src/Data/Collector.ts +++ b/src/Data/Collector.ts @@ -63,7 +63,7 @@ export interface ReceiptVerificationResult { } const verifyReceiptMajority = async ( - receipt: Receipt.ArchiverReceipt, + receipt: Receipt.Receipt | Receipt.ArchiverReceipt, executionGroupNodes: ConsensusNodeInfo[], minConfirmations: number = config.RECEIPT_CONFIRMATIONS ): Promise<{ success: boolean; newReceipt?: Receipt.ArchiverReceipt }> => { @@ -93,7 +93,7 @@ const verifyReceiptMajority = async ( * across all types of receipts processed by the system. */ const verifyReceiptOffline = async ( - receipt: Receipt.ArchiverReceipt, + receipt: Receipt.Receipt | Receipt.ArchiverReceipt, executionGroupNodes: ConsensusNodeInfo[], minConfirmations: number ): Promise<{ success: boolean; newReceipt?: Receipt.ArchiverReceipt }> => { @@ -109,7 +109,7 @@ const verifyReceiptOffline = async ( * across all types of receipts processed by the system. */ const verifyReceiptWithValidators = async ( - receipt: Receipt.ArchiverReceipt, + receipt: Receipt.Receipt | Receipt.ArchiverReceipt, executionGroupNodes: ConsensusNodeInfo[], minConfirmations: number = config.RECEIPT_CONFIRMATIONS ): Promise<{ success: boolean; newReceipt?: Receipt.ArchiverReceipt }> => { @@ -124,7 +124,7 @@ const verifyReceiptWithValidators = async ( * @returns boolean */ const verifyNonGlobalTxReceiptWithValidators = async ( - receipt: Receipt.ArchiverReceipt, + receipt: Receipt.Receipt | Receipt.ArchiverReceipt, executionGroupNodes: ConsensusNodeInfo[], minConfirmations: number = config.RECEIPT_CONFIRMATIONS ): Promise<{ success: boolean; newReceipt?: Receipt.ArchiverReceipt }> => { @@ -327,7 +327,7 @@ const verifyNonGlobalTxReceiptWithValidators = async ( // Offline global receipt verification const verifyGlobalTxreceiptOffline = async ( - receipt: Receipt.ArchiverReceipt + receipt: Receipt.Receipt | Receipt.ArchiverReceipt ): Promise<{ success: boolean; requiredSignatures?: number }> => { const appliedReceipt = receipt.signedReceipt as P2PTypes.GlobalAccountsTypes.GlobalTxReceipt const result = { success: false } @@ -423,7 +423,7 @@ const verifyGlobalTxreceiptOffline = async ( } const verifyNonGlobalTxReceiptOffline = async ( - receipt: Receipt.ArchiverReceipt, + receipt: Receipt.Receipt | Receipt.ArchiverReceipt, executionGroupNodes: ConsensusNodeInfo[], minConfirmations: number ): Promise<{ success: boolean; newReceipt?: Receipt.ArchiverReceipt }> => { @@ -496,7 +496,7 @@ export const validateReceiptType = (receipt: Receipt.Receipt | Receipt.ArchiverR export const verifyReceiptData = async ( - receipt: Receipt.ArchiverReceipt, + receipt: Receipt.Receipt | Receipt.ArchiverReceipt, checkReceiptRobust = true ): Promise<{ success: boolean; requiredSignatures?: number; newReceipt?: Receipt.ArchiverReceipt }> => { const result = { success: false } From 780ed888bb31bba232e5c79ac02ae8db50bf9a6d Mon Sep 17 00:00:00 2001 From: Sonali Thakur Date: Fri, 13 Dec 2024 14:15:34 +0530 Subject: [PATCH 4/6] Refactored receipt logic: clarified verifyReceiptOffline, removed offloadReceipt, improved error logs, deprecated setupWorkerProcesses and offloadReceipt. --- src/Data/Collector.ts | 32 +++++----- src/primary-process/index.ts | 113 ++--------------------------------- 2 files changed, 21 insertions(+), 124 deletions(-) diff --git a/src/Data/Collector.ts b/src/Data/Collector.ts index be6fda0..74db31d 100644 --- a/src/Data/Collector.ts +++ b/src/Data/Collector.ts @@ -86,11 +86,15 @@ const verifyReceiptMajority = async ( // Offline receipt verification /** * Note: - * The `verifyReceiptWithValidators` function currently supports only - * non-global receipt verification. Future enhancements should include - * validation logic for global receipts to ensure comprehensive receipt - * verification. This will help in maintaining consistency and reliability + * The `verifyReceiptOffline` function is responsible for verifying receipts + * without querying external validators, which is useful when the robust query + * feature is disabled. It currently supports only non-global receipt verification. + * Future enhancements should include validation logic for global receipts to ensure + * comprehensive receipt verification. This will help maintain consistency and reliability * across all types of receipts processed by the system. + * + * The function delegates the verification process to `verifyNonGlobalTxReceiptOffline` + * based on the whether the receipt is global or not. */ const verifyReceiptOffline = async ( receipt: Receipt.Receipt | Receipt.ArchiverReceipt, @@ -394,16 +398,7 @@ const verifyGlobalTxreceiptOffline = async ( ) continue } - // Check if the signature is valid - if (!Crypto.verify({ ...appliedReceipt.tx, sign: sign })) { - Logger.mainLogger.error(`Invalid receipt globalModification signature verification failed`) - if (nestedCountersInstance) - nestedCountersInstance.countEvent( - 'receipt', - 'Invalid_receipt_globalModification_signature_verification_failed' - ) - continue - } + uniqueSigners.add(nodePubKey) } isReceiptMajority = (uniqueSigners.size / votingGroupCount) * 100 >= config.requiredMajorityVotesPercentage @@ -534,8 +529,13 @@ export const verifyReceiptData = async ( } catch (error) { globalReceiptValidationErrors = true if (nestedCountersInstance) - nestedCountersInstance.countEvent('receipt', 'Error processing receipt error', error) - Logger.mainLogger.error('Error processing receipt error', error) + nestedCountersInstance.countEvent( + 'receipt', + `Failed to validate receipt schema txId: ${txId}, cycle: ${cycle}, timestamp: ${timestamp}, error: ${error}` + ) + Logger.mainLogger.error( + `Failed to validate receipt schema txId: ${txId}, cycle: ${cycle}, timestamp: ${timestamp}, error: ${error}` + ) return result } diff --git a/src/primary-process/index.ts b/src/primary-process/index.ts index 1b578e5..d446971 100644 --- a/src/primary-process/index.ts +++ b/src/primary-process/index.ts @@ -41,6 +41,11 @@ let currentWorker = 0 const emitter = new EventEmitter() +/** + * @deprecated This method is currently used in server.ts but will be removed as part of cleanup. + * Do not use this method in new code. + */ + export const setupWorkerProcesses = (cluster: Cluster): void => { console.log(`Master ${process.pid} is running`) // Set interval to check receipt count every 15 seconds @@ -225,111 +230,3 @@ const forwardReceiptVerificationResult = ( }) }) } - -export const offloadReceipt = async ( - txId: string, - timestamp: number, - requiredSignatures: number, - receipt: ArchiverReceipt -): Promise => { - receivedReceiptCount++ // Increment the counter for each receipt received - receiptLoadTraker++ // Increment the receipt load tracker - let verificationResult: ReceiptVerificationResult - - // Check if offloading is disabled globally or for global modifications - let globalReceiptValidationErrors // This is used to store the validation errors of the globalTxReceipt - try { - globalReceiptValidationErrors = verifyPayload(AJVSchemaEnum.GlobalTxReceipt, receipt?.signedReceipt) - } catch (error) { - console.error('Invalid Global Tx Receipt error: ', error) - globalReceiptValidationErrors = true - } - - if ( - config.disableOffloadReceipt || - (config.disableOffloadReceiptForGlobalModification && !globalReceiptValidationErrors) - ) { - mainProcessReceiptTracker++ - if (config.workerProcessesDebugLog) console.log('Verifying on the main program', txId, timestamp) - verificationResult = await verifyArchiverReceipt(receipt, requiredSignatures) - mainProcessReceiptTracker-- - verifiedReceiptCount++ - if (verificationResult.success) { - successReceiptCount++ - } else { - failureReceiptCount++ - } - return verificationResult - } - - // Existing logic for offloading - if (workers.length === 0 && mainProcessReceiptTracker > config.receiptLoadTrakerLimit) { - // If there are extra workers available, put them to the workers list - if (extraWorkers.size > 0) { - console.log( - `offloadReceipt - Extra workers available: ${extraWorkers.size}, moving them to workers list` - ) - // Move the extra workers to the workers list - for (const [pid, worker] of extraWorkers) { - workers.push(worker) - extraWorkers.delete(pid) - } - } - // // If there are still no workers available, add randon wait time (0-1 second) and proceed - // if (workers.length === 0 && mainProcessReceiptTracker > config.receiptLoadTrakerLimit) { - // await Utils.sleep(Math.floor(Math.random() * 1000)) - // } - } - if (workers.length === 0) { - mainProcessReceiptTracker++ - if (config.workerProcessesDebugLog) console.log('Verifying on the main program 1', txId, timestamp) - verificationResult = await verifyArchiverReceipt(receipt, requiredSignatures) - mainProcessReceiptTracker-- - } else { - mainProcessReceiptTracker = 0 - // Forward the request to a worker in a round-robin fashion - let worker = workers[currentWorker] - currentWorker = (currentWorker + 1) % workers.length - if (!worker) { - console.error('No worker available to process the receipt 1') - worker = workers[currentWorker] - currentWorker = (currentWorker + 1) % workers.length - if (worker) { - console.log('Verifying on the worker process 2', txId, timestamp, worker.process.pid) - const cloneReceipt = Utils.deepCopy(receipt) - delete cloneReceipt.tx.originalTxData - delete cloneReceipt.executionShardKey - const stringifiedReceipt = StringUtils.safeStringify(cloneReceipt) - worker.send({ - type: 'receipt-verification', - data: { stringifiedReceipt, requiredSignatures }, - }) - verificationResult = await forwardReceiptVerificationResult(txId, timestamp, worker) - } else { - console.error('No worker available to process the receipt 2') - // Verifying the receipt in the main thread - console.log('Verifying on the main program 2', txId, timestamp) - verificationResult = await verifyArchiverReceipt(receipt, requiredSignatures) - } - } else { - if (config.workerProcessesDebugLog) - console.log('Verifying on the worker process 1', txId, timestamp, worker.process.pid) - const cloneReceipt = Utils.deepCopy(receipt) - delete cloneReceipt.tx.originalTxData - delete cloneReceipt.executionShardKey - const stringifiedReceipt = StringUtils.safeStringify(cloneReceipt) - worker.send({ - type: 'receipt-verification', - data: { stringifiedReceipt, requiredSignatures }, - }) - verificationResult = await forwardReceiptVerificationResult(txId, timestamp, worker) - } - } - verifiedReceiptCount++ - if (verificationResult.success) { - successReceiptCount++ - } else { - failureReceiptCount++ - } - return verificationResult -} From a8889610d801d5267a5f336f5592b724e2245f78 Mon Sep 17 00:00:00 2001 From: Sonali Thakur Date: Fri, 3 Jan 2025 13:06:34 +0530 Subject: [PATCH 5/6] comment out depreciated worker process setup and initialization --- src/server.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/server.ts b/src/server.ts index 731bffb..b136a9b 100644 --- a/src/server.ts +++ b/src/server.ts @@ -42,8 +42,6 @@ import { setShutdownCycleRecord, cycleRecordWithShutDownMode } from './Data/Cycl import { registerRoutes } from './API' import { Utils as StringUtils } from '@shardus/types' import { healthCheckRouter } from './routes/healthCheck' -import { setupWorkerProcesses } from './primary-process' -import { initWorkerProcess } from './worker-process' import { initializeTickets } from './routes/tickets'; import { initAjvSchemas } from './types/ajv/Helpers' import { initializeSerialization } from './utils/serialization/SchemaHelpers' @@ -93,7 +91,7 @@ async function start(): Promise { if (!cluster.isPrimary) { // Initialize state from config await State.initFromConfig(config, false, false) - await initWorkerProcess() + // await initWorkerProcess() return } @@ -497,7 +495,7 @@ async function startServer(): Promise { Logger.mainLogger.info(`Worker ${process.pid}: Archive-server is listening on http://0.0.0.0:${config.ARCHIVER_PORT}`) State.setActive() Collector.scheduleMissingTxsDataQuery() - setupWorkerProcesses(cluster) + // setupWorkerProcesses(cluster) } ) } From 5425f3e33cddf75262d5993d20208807132638b1 Mon Sep 17 00:00:00 2001 From: Sonali Thakur Date: Fri, 3 Jan 2025 16:53:28 +0530 Subject: [PATCH 6/6] Update cycle property in OriginalTxData schema to allow -1 --- src/types/ajv/OriginalTxData.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/types/ajv/OriginalTxData.ts b/src/types/ajv/OriginalTxData.ts index 707a014..d5054ae 100644 --- a/src/types/ajv/OriginalTxData.ts +++ b/src/types/ajv/OriginalTxData.ts @@ -8,7 +8,7 @@ const schemaOriginalTxData = { properties: { txId: { type: 'string' }, // txId must be a string timestamp: { type: 'integer', minimum: 0 }, // timestamp must be an integer - cycle: { type: 'integer', minimum: 0 }, // cycle must be an integer + cycle: { type: 'integer', minimum: -1 }, // cycle must be an integer originalTxData: { type: 'object' }, // originalTxData must be an object // Uncomment if sign is required: // sign: { type: 'string' } // Sign (if used) must be a string