Skip to content

Commit

Permalink
Refactored receipt logic: clarified verifyReceiptOffline, removed off…
Browse files Browse the repository at this point in the history
…loadReceipt, improved error logs, deprecated setupWorkerProcesses and offloadReceipt.
  • Loading branch information
S0naliThakur committed Dec 30, 2024
1 parent fbf539e commit 780ed88
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 124 deletions.
32 changes: 16 additions & 16 deletions src/Data/Collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,15 @@ const verifyReceiptMajority = async (
// Offline receipt verification
/**
* Note:
* The `verifyReceiptWithValidators` function currently supports only
* non-global receipt verification. Future enhancements should include
* validation logic for global receipts to ensure comprehensive receipt
* verification. This will help in maintaining consistency and reliability
* The `verifyReceiptOffline` function is responsible for verifying receipts
* without querying external validators, which is useful when the robust query
* feature is disabled. It currently supports only non-global receipt verification.
* Future enhancements should include validation logic for global receipts to ensure
* comprehensive receipt verification. This will help maintain consistency and reliability
* across all types of receipts processed by the system.
*
* The function delegates the verification process to `verifyNonGlobalTxReceiptOffline`
* based on the whether the receipt is global or not.
*/
const verifyReceiptOffline = async (
receipt: Receipt.Receipt | Receipt.ArchiverReceipt,
Expand Down Expand Up @@ -394,16 +398,7 @@ const verifyGlobalTxreceiptOffline = async (
)
continue
}
// Check if the signature is valid
if (!Crypto.verify({ ...appliedReceipt.tx, sign: sign })) {
Logger.mainLogger.error(`Invalid receipt globalModification signature verification failed`)
if (nestedCountersInstance)
nestedCountersInstance.countEvent(
'receipt',
'Invalid_receipt_globalModification_signature_verification_failed'
)
continue
}

uniqueSigners.add(nodePubKey)
}
isReceiptMajority = (uniqueSigners.size / votingGroupCount) * 100 >= config.requiredMajorityVotesPercentage
Expand Down Expand Up @@ -534,8 +529,13 @@ export const verifyReceiptData = async (
} catch (error) {
globalReceiptValidationErrors = true
if (nestedCountersInstance)
nestedCountersInstance.countEvent('receipt', 'Error processing receipt error', error)
Logger.mainLogger.error('Error processing receipt error', error)
nestedCountersInstance.countEvent(
'receipt',
`Failed to validate receipt schema txId: ${txId}, cycle: ${cycle}, timestamp: ${timestamp}, error: ${error}`
)
Logger.mainLogger.error(
`Failed to validate receipt schema txId: ${txId}, cycle: ${cycle}, timestamp: ${timestamp}, error: ${error}`
)
return result
}

Expand Down
113 changes: 5 additions & 108 deletions src/primary-process/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ let currentWorker = 0

const emitter = new EventEmitter()

/**
* @deprecated This method is currently used in server.ts but will be removed as part of cleanup.
* Do not use this method in new code.
*/

export const setupWorkerProcesses = (cluster: Cluster): void => {
console.log(`Master ${process.pid} is running`)
// Set interval to check receipt count every 15 seconds
Expand Down Expand Up @@ -225,111 +230,3 @@ const forwardReceiptVerificationResult = (
})
})
}

export const offloadReceipt = async (
txId: string,
timestamp: number,
requiredSignatures: number,
receipt: ArchiverReceipt
): Promise<ReceiptVerificationResult> => {
receivedReceiptCount++ // Increment the counter for each receipt received
receiptLoadTraker++ // Increment the receipt load tracker
let verificationResult: ReceiptVerificationResult

// Check if offloading is disabled globally or for global modifications
let globalReceiptValidationErrors // This is used to store the validation errors of the globalTxReceipt
try {
globalReceiptValidationErrors = verifyPayload(AJVSchemaEnum.GlobalTxReceipt, receipt?.signedReceipt)
} catch (error) {
console.error('Invalid Global Tx Receipt error: ', error)
globalReceiptValidationErrors = true
}

if (
config.disableOffloadReceipt ||
(config.disableOffloadReceiptForGlobalModification && !globalReceiptValidationErrors)
) {
mainProcessReceiptTracker++
if (config.workerProcessesDebugLog) console.log('Verifying on the main program', txId, timestamp)
verificationResult = await verifyArchiverReceipt(receipt, requiredSignatures)
mainProcessReceiptTracker--
verifiedReceiptCount++
if (verificationResult.success) {
successReceiptCount++
} else {
failureReceiptCount++
}
return verificationResult
}

// Existing logic for offloading
if (workers.length === 0 && mainProcessReceiptTracker > config.receiptLoadTrakerLimit) {
// If there are extra workers available, put them to the workers list
if (extraWorkers.size > 0) {
console.log(
`offloadReceipt - Extra workers available: ${extraWorkers.size}, moving them to workers list`
)
// Move the extra workers to the workers list
for (const [pid, worker] of extraWorkers) {
workers.push(worker)
extraWorkers.delete(pid)
}
}
// // If there are still no workers available, add randon wait time (0-1 second) and proceed
// if (workers.length === 0 && mainProcessReceiptTracker > config.receiptLoadTrakerLimit) {
// await Utils.sleep(Math.floor(Math.random() * 1000))
// }
}
if (workers.length === 0) {
mainProcessReceiptTracker++
if (config.workerProcessesDebugLog) console.log('Verifying on the main program 1', txId, timestamp)
verificationResult = await verifyArchiverReceipt(receipt, requiredSignatures)
mainProcessReceiptTracker--
} else {
mainProcessReceiptTracker = 0
// Forward the request to a worker in a round-robin fashion
let worker = workers[currentWorker]
currentWorker = (currentWorker + 1) % workers.length
if (!worker) {
console.error('No worker available to process the receipt 1')
worker = workers[currentWorker]
currentWorker = (currentWorker + 1) % workers.length
if (worker) {
console.log('Verifying on the worker process 2', txId, timestamp, worker.process.pid)
const cloneReceipt = Utils.deepCopy(receipt)
delete cloneReceipt.tx.originalTxData
delete cloneReceipt.executionShardKey
const stringifiedReceipt = StringUtils.safeStringify(cloneReceipt)
worker.send({
type: 'receipt-verification',
data: { stringifiedReceipt, requiredSignatures },
})
verificationResult = await forwardReceiptVerificationResult(txId, timestamp, worker)
} else {
console.error('No worker available to process the receipt 2')
// Verifying the receipt in the main thread
console.log('Verifying on the main program 2', txId, timestamp)
verificationResult = await verifyArchiverReceipt(receipt, requiredSignatures)
}
} else {
if (config.workerProcessesDebugLog)
console.log('Verifying on the worker process 1', txId, timestamp, worker.process.pid)
const cloneReceipt = Utils.deepCopy(receipt)
delete cloneReceipt.tx.originalTxData
delete cloneReceipt.executionShardKey
const stringifiedReceipt = StringUtils.safeStringify(cloneReceipt)
worker.send({
type: 'receipt-verification',
data: { stringifiedReceipt, requiredSignatures },
})
verificationResult = await forwardReceiptVerificationResult(txId, timestamp, worker)
}
}
verifiedReceiptCount++
if (verificationResult.success) {
successReceiptCount++
} else {
failureReceiptCount++
}
return verificationResult
}

0 comments on commit 780ed88

Please sign in to comment.