Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SHARD-44: GlobalTx fixes #103

Open
wants to merge 6 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
329 changes: 238 additions & 91 deletions src/Data/Collector.ts

Large diffs are not rendered by default.

107 changes: 7 additions & 100 deletions src/primary-process/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { config } from '../Config'
import { EventEmitter } from 'events'
import { StateManager, Utils as StringUtils } from '@shardus/types'
import * as Utils from '../Utils'
import { verifyPayload } from '../types/ajv/Helpers'
import { AJVSchemaEnum } from '../types/enum/AJVSchemaEnum'

const MAX_WORKERS = cpus().length - 1 // Leaving 1 core for the master process

Expand Down Expand Up @@ -39,6 +41,11 @@ let currentWorker = 0

const emitter = new EventEmitter()

/**
* @deprecated This method is currently used in server.ts but will be removed as part of cleanup.
* Do not use this method in new code.
*/

export const setupWorkerProcesses = (cluster: Cluster): void => {
console.log(`Master ${process.pid} is running`)
// Set interval to check receipt count every 15 seconds
Expand Down Expand Up @@ -223,103 +230,3 @@ const forwardReceiptVerificationResult = (
})
})
}

export const offloadReceipt = async (
txId: string,
timestamp: number,
requiredSignatures: number,
receipt: ArchiverReceipt
): Promise<ReceiptVerificationResult> => {
receivedReceiptCount++ // Increment the counter for each receipt received
receiptLoadTraker++ // Increment the receipt load tracker
let verificationResult: ReceiptVerificationResult

// Check if offloading is disabled globally or for global modifications
if (
config.disableOffloadReceipt ||
(config.disableOffloadReceiptForGlobalModification && receipt.globalModification)
) {
mainProcessReceiptTracker++
if (config.workerProcessesDebugLog) console.log('Verifying on the main program', txId, timestamp)
verificationResult = await verifyArchiverReceipt(receipt, requiredSignatures)
mainProcessReceiptTracker--
verifiedReceiptCount++
if (verificationResult.success) {
successReceiptCount++
} else {
failureReceiptCount++
}
return verificationResult
}

// Existing logic for offloading
if (workers.length === 0 && mainProcessReceiptTracker > config.receiptLoadTrakerLimit) {
// If there are extra workers available, put them to the workers list
if (extraWorkers.size > 0) {
console.log(
`offloadReceipt - Extra workers available: ${extraWorkers.size}, moving them to workers list`
)
// Move the extra workers to the workers list
for (const [pid, worker] of extraWorkers) {
workers.push(worker)
extraWorkers.delete(pid)
}
}
// // If there are still no workers available, add randon wait time (0-1 second) and proceed
// if (workers.length === 0 && mainProcessReceiptTracker > config.receiptLoadTrakerLimit) {
// await Utils.sleep(Math.floor(Math.random() * 1000))
// }
}
if (workers.length === 0) {
mainProcessReceiptTracker++
if (config.workerProcessesDebugLog) console.log('Verifying on the main program 1', txId, timestamp)
verificationResult = await verifyArchiverReceipt(receipt, requiredSignatures)
mainProcessReceiptTracker--
} else {
mainProcessReceiptTracker = 0
// Forward the request to a worker in a round-robin fashion
let worker = workers[currentWorker]
currentWorker = (currentWorker + 1) % workers.length
if (!worker) {
console.error('No worker available to process the receipt 1')
worker = workers[currentWorker]
currentWorker = (currentWorker + 1) % workers.length
if (worker) {
console.log('Verifying on the worker process 2', txId, timestamp, worker.process.pid)
const cloneReceipt = Utils.deepCopy(receipt)
delete cloneReceipt.tx.originalTxData
delete cloneReceipt.executionShardKey
const stringifiedReceipt = StringUtils.safeStringify(cloneReceipt)
worker.send({
type: 'receipt-verification',
data: { stringifiedReceipt, requiredSignatures },
})
verificationResult = await forwardReceiptVerificationResult(txId, timestamp, worker)
} else {
console.error('No worker available to process the receipt 2')
// Verifying the receipt in the main thread
console.log('Verifying on the main program 2', txId, timestamp)
verificationResult = await verifyArchiverReceipt(receipt, requiredSignatures)
}
} else {
if (config.workerProcessesDebugLog)
console.log('Verifying on the worker process 1', txId, timestamp, worker.process.pid)
const cloneReceipt = Utils.deepCopy(receipt)
delete cloneReceipt.tx.originalTxData
delete cloneReceipt.executionShardKey
const stringifiedReceipt = StringUtils.safeStringify(cloneReceipt)
worker.send({
type: 'receipt-verification',
data: { stringifiedReceipt, requiredSignatures },
})
verificationResult = await forwardReceiptVerificationResult(txId, timestamp, worker)
}
}
verifiedReceiptCount++
if (verificationResult.success) {
successReceiptCount++
} else {
failureReceiptCount++
}
return verificationResult
}
6 changes: 2 additions & 4 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ import { setShutdownCycleRecord, cycleRecordWithShutDownMode } from './Data/Cycl
import { registerRoutes } from './API'
import { Utils as StringUtils } from '@shardus/types'
import { healthCheckRouter } from './routes/healthCheck'
import { setupWorkerProcesses } from './primary-process'
import { initWorkerProcess } from './worker-process'
import { initializeTickets } from './routes/tickets';
import { initAjvSchemas } from './types/ajv/Helpers'
import { initializeSerialization } from './utils/serialization/SchemaHelpers'
Expand Down Expand Up @@ -93,7 +91,7 @@ async function start(): Promise<void> {
if (!cluster.isPrimary) {
// Initialize state from config
await State.initFromConfig(config, false, false)
await initWorkerProcess()
// await initWorkerProcess()
return
}

Expand Down Expand Up @@ -497,7 +495,7 @@ async function startServer(): Promise<void> {
Logger.mainLogger.info(`Worker ${process.pid}: Archive-server is listening on http://0.0.0.0:${config.ARCHIVER_PORT}`)
State.setActive()
Collector.scheduleMissingTxsDataQuery()
setupWorkerProcesses(cluster)
// setupWorkerProcesses(cluster)
}
)
}
Expand Down
17 changes: 16 additions & 1 deletion src/shardeum/calculateAccountHash.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import * as crypto from '../Crypto'
import { ArchiverReceipt, SignedReceipt } from '../dbstore/receipts'
import { verifyPayload } from '../types/ajv/Helpers'
import { AJVSchemaEnum } from '../types/enum/AJVSchemaEnum'
import { verifyGlobalTxAccountChange } from './verifyGlobalTxReceipt'

// account types in Shardeum
Expand Down Expand Up @@ -64,7 +66,20 @@ export const verifyAccountHash = (
nestedCounterMessages = []
): boolean => {
try {
if (receipt.globalModification) {
let globalReceiptValidationErrors // This is used to store the validation errors of the globalTxReceipt
try {
globalReceiptValidationErrors = verifyPayload(AJVSchemaEnum.GlobalTxReceipt, receipt?.signedReceipt)
} catch (error) {
globalReceiptValidationErrors = true
failedReasons.push(
`Invalid Global Tx Receipt error: ${error}. txId ${receipt.tx.txId} , cycle ${receipt.cycle} , timestamp ${receipt.tx.timestamp}`
)
nestedCounterMessages.push(
`Invalid Global Tx Receipt error: ${error}. txId ${receipt.tx.txId} , cycle ${receipt.cycle} , timestamp ${receipt.tx.timestamp}`
)
return false
}
if (!globalReceiptValidationErrors) {
const result = verifyGlobalTxAccountChange(receipt, failedReasons, nestedCounterMessages)
if (!result) return false
return true
Expand Down
21 changes: 19 additions & 2 deletions src/shardeum/verifyAppReceiptData.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import * as crypto from '../Crypto'
import { ArchiverReceipt, Receipt, SignedReceipt } from '../dbstore/receipts'
import { Utils as StringUtils } from '@shardus/types'
import { verifyPayload } from '../types/ajv/Helpers'
import { AJVSchemaEnum } from '../types/enum/AJVSchemaEnum'

export type ShardeumReceipt = object & {
amountSpent: string
Expand All @@ -14,8 +16,23 @@ export const verifyAppReceiptData = async (
nestedCounterMessages = []
): Promise<{ valid: boolean; needToSave: boolean }> => {
let result = { valid: false, needToSave: false }
const { appReceiptData, globalModification } = receipt
if (globalModification) return { valid: true, needToSave: true }
const { appReceiptData } = receipt
let globalReceiptValidationErrors // This is used to store the validation errors of the globalTxReceipt
try {
globalReceiptValidationErrors = verifyPayload(AJVSchemaEnum.GlobalTxReceipt, receipt?.signedReceipt)
} catch (error) {
globalReceiptValidationErrors = true
failedReasons.push(
`Invalid Global Tx Receipt error: ${error}. txId ${receipt.tx.txId} , cycle ${receipt.cycle} , timestamp ${receipt.tx.timestamp}`
)
nestedCounterMessages.push(
`Invalid Global Tx Receipt error: ${error}. txId ${receipt.tx.txId} , cycle ${receipt.cycle} , timestamp ${receipt.tx.timestamp}`
)
return result
}
if (!globalReceiptValidationErrors) {
return { valid: true, needToSave: true }
}
const signedReceipt = receipt.signedReceipt as SignedReceipt
const newShardeumReceipt = appReceiptData.data as ShardeumReceipt
if (!newShardeumReceipt.amountSpent || !newShardeumReceipt.readableReceipt) {
Expand Down
2 changes: 1 addition & 1 deletion src/types/ajv/OriginalTxData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const schemaOriginalTxData = {
properties: {
txId: { type: 'string' }, // txId must be a string
timestamp: { type: 'integer', minimum: 0 }, // timestamp must be an integer
cycle: { type: 'integer', minimum: 0 }, // cycle must be an integer
cycle: { type: 'integer', minimum: -1 }, // cycle must be an integer
originalTxData: { type: 'object' }, // originalTxData must be an object
// Uncomment if sign is required:
// sign: { type: 'string' } // Sign (if used) must be a string
Expand Down
1 change: 1 addition & 0 deletions src/types/ajv/Receipts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,5 @@ function addSchemas(): void {
// addSchema('ReceiptTx', schemaTx);
addSchema(AJVSchemaEnum.ArchiverReceipt, schemaArchiverReceipt);
addSchema(AJVSchemaEnum.Receipt, schemaReceipt);
addSchema(AJVSchemaEnum.GlobalTxReceipt, schemaGlobalTxReceipt);
}
3 changes: 2 additions & 1 deletion src/types/enum/AJVSchemaEnum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ export enum AJVSchemaEnum {
Receipt = 'Receipt',
AccountsCopy = 'AccountsCopy',
ArchiverReceipt = 'ArchiverReceipt',
OriginalTxData = 'OriginalTxData'
OriginalTxData = 'OriginalTxData',
GlobalTxReceipt = 'GlobalTxReceipt',
}
Loading