diff --git a/.gitignore b/.gitignore
index e21dfa5d..31a2a314 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,4 +10,4 @@ archiver-logs
 data-logs
 .direnv/
 .secrets
-src/utils/esm/bundle.js
\ No newline at end of file
+src/ipfsPublisher/esm/bundle.js
\ No newline at end of file
diff --git a/archiver-config.json b/archiver-config.json
index bdcb1026..cb75ed72 100644
--- a/archiver-config.json
+++ b/archiver-config.json
@@ -62,7 +62,6 @@
     "web3Storage": {
      "rootDID": "did:key:REPLACE_ME",
      "adminEmail": ""
-    },
-    "enableSavingToWeb3Storage": false
+    }
   }
 }
diff --git a/package.json b/package.json
index ba14da3e..b48876c3 100644
--- a/package.json
+++ b/package.json
@@ -16,8 +16,9 @@
   },
   "scripts": {
     "start": "npm run prepare && node build/server.js",
-    "w3s-build": "esbuild --platform=node src/utils/esm/index.ts --bundle --outfile=src/utils/esm/bundle.js",
-    "txDigestCronServer": "npm run w3s-build && npm run prepare && node build/txDigester.js",
+    "w3s-build": "esbuild --platform=node src/ipfsPublisher/esm/index.ts --bundle --outfile=src/ipfsPublisher/esm/bundle.js",
+    "ipfsPublisher": "npm run w3s-build && npm run prepare && node build/ipfsPublisher/index.js",
+    "txDigestCronServer": "npm run prepare && node build/txDigester.js",
     "txDigestApiServer": "npm run prepare && node build/txDigestAPIserver.js",
     "release": "npm run prepare && np --no-cleanup --no-tests --no-yarn --any-branch",
     "check": "gts check",
diff --git a/src/Config.ts b/src/Config.ts
index f6133f11..9daab38e 100644
--- a/src/Config.ts
+++ b/src/Config.ts
@@ -21,6 +21,7 @@ export interface Config {
     originalTxDataDB: string
     processedTxDB: string
     txDigestDB: string
+    ipfsRecordsDB: string
   }
   DATASENDER_TIMEOUT: number
   RATE_LIMIT: number // number of allowed request per second,
@@ -98,7 +99,6 @@ export interface Config {
       rootDID: string
       adminEmail: string
     }
-    enableSavingToWeb3Storage: boolean
   }
   workerProcessesDebugLog: boolean // To enable debug logs for worker processes managed by the main process
   restrictFirstNodeSelectionByPublicKey: boolean // The flag to pick the first node that matches the PUBLIC_KEY specified in the firstNodeInfo
@@ -121,6 +121,7 @@ let config: Config = {
     originalTxDataDB: 'originalTxsData.sqlite3',
     processedTxDB: 'processedTransactions.sqlite3',
     txDigestDB: 'txDigest.sqlite3',
+    ipfsRecordsDB: 'ipfsRecords.sqlite3',
   },
   DATASENDER_TIMEOUT: 1000 * 60 * 5,
   RATE_LIMIT: 100, // 100 req per second,
@@ -193,7 +194,6 @@ let config: Config = {
      rootDID: 'did:key:', // Should be in the format: did:key:
      adminEmail: '', // Make sure the email here is the one to which the intended Web3.Storage Account is linked to
    },
-    enableSavingToWeb3Storage: false, // Set rootDID and adminEmail when you enable this
   },
   workerProcessesDebugLog: false,
   restrictFirstNodeSelectionByPublicKey: false,
diff --git a/src/ipfsPublisher/dbStore.ts b/src/ipfsPublisher/dbStore.ts
new file mode 100644
index 00000000..9b6a0b26
--- /dev/null
+++ b/src/ipfsPublisher/dbStore.ts
@@ -0,0 +1,80 @@
+import { Database } from 'sqlite3'
+import * as db from '../dbstore/sqlite3storage'
+
+import { Config, config } from '../Config'
+import { createDirectories } from '../Utils'
+import { initializeDB } from '../txDigester/index'
+import { TransactionDigest } from '../txDigester/txDigests'
+import { createDB, runCreate, close } from '../dbstore/sqlite3storage'
+
+export let ipfsRecordsDB: Database
+
+export interface IPFSRecord extends Omit<TransactionDigest, 'txCount'> {
+  cid: string
+  spaceDID: string
+  timestamp: number
+}
+
+export const initTxDigestDB = async (config: Config): Promise<void> => initializeDB(config)
+
+export const initializeIPFSRecordsDB = async (config: Config): Promise<void> => {
+  createDirectories(config.ARCHIVER_DB)
+  ipfsRecordsDB = await createDB(
+    `${config.ARCHIVER_DB}/${config.ARCHIVER_DATA.ipfsRecordsDB}`,
+    'IPFSRecordsDB'
+  )
+  await runCreate(
+    ipfsRecordsDB,
+    'CREATE TABLE if not exists `ipfsRecords` (`CID` TEXT NOT NULL, `cycleStart` NUMBER NOT NULL UNIQUE, `cycleEnd` NUMBER NOT NULL UNIQUE, `hash` TEXT NOT NULL, `timestamp` BIGINT NOT NULL, `spaceDID` TEXT NOT NULL, PRIMARY KEY (`cycleEnd`))'
+  )
+}
+
+export async function insertUploadedIPFSRecord(ipfsRecord: IPFSRecord): Promise<void> {
+  try {
+    const fields = Object.keys(ipfsRecord).join(', ')
+    const placeholders = Object.keys(ipfsRecord).fill('?').join(', ')
+    const values = db.extractValues(ipfsRecord)
+    const sql =
+      'INSERT INTO ipfsRecords (' +
+      fields +
+      ') VALUES (' +
+      placeholders +
+      ') ON CONFLICT (cycleEnd) DO UPDATE SET ' +
+      'CID = excluded.CID, ' +
+      'cycleStart = excluded.cycleStart, ' +
+      'hash = excluded.hash, ' +
+      'timestamp = excluded.timestamp, ' +
+      'spaceDID = excluded.spaceDID'
+
+    await db.run(ipfsRecordsDB, sql, values)
+    if (config.VERBOSE) {
+      console.log(
+        `Successfully inserted ipfsRecord for cycle records from ${ipfsRecord.cycleStart} to ${ipfsRecord.cycleEnd}`
+      )
+    }
+  } catch (e) {
+    console.error(e)
+    throw new Error(
+      `Unable to Insert ipfsRecord for cycle records from ${ipfsRecord.cycleStart} to ${ipfsRecord.cycleEnd}`
+    )
+  }
+}
+
+export async function getLastUploadedIPFSRecord(): Promise<IPFSRecord | null> {
+  try {
+    const sql = `SELECT * FROM ipfsRecords ORDER BY cycleEnd DESC LIMIT 1`
+    const lastUploadedDigest = (await db.get(ipfsRecordsDB, sql)) as IPFSRecord
+    if (config.VERBOSE) {
+      console.log('Last Digest Uploaded to IPFS: ')
+      console.dir(lastUploadedDigest)
+    }
+    return lastUploadedDigest
+  } catch (e) {
+    console.error('Error while Fetching last uploaded IPFS Record: ', e)
+    return null
+  }
+}
+
+export const closeDatabase = async (): Promise<void> => {
+  await close(ipfsRecordsDB, 'IPFSRecordsDB')
+}
diff --git a/src/utils/esm/index.ts b/src/ipfsPublisher/esm/index.ts
similarity index 100%
rename from src/utils/esm/index.ts
rename to src/ipfsPublisher/esm/index.ts
diff --git a/src/ipfsPublisher/index.ts b/src/ipfsPublisher/index.ts
new file mode 100644
index 00000000..2d5acdc2
--- /dev/null
+++ b/src/ipfsPublisher/index.ts
@@ -0,0 +1,104 @@
+import * as cron from 'node-cron'
+import { sleep } from '../Utils'
+import { readFileSync } from 'fs'
+import * as Crypto from '../Crypto'
+import * as Logger from '../Logger'
+import { join, resolve } from 'path'
+import { startSaving } from '../saveConsoleOutput'
+import { Utils as StringUtils } from '@shardus/types'
+import { overrideDefaultConfig, config } from '../Config'
+import { getLastProcessedTxDigest } from '../txDigester/txDigests'
+import { getTxDigestsForACycleRange } from '../txDigester/txDigestFunctions'
+
+import {
+  init as initPublisher,
+  uploadDigestToIPFS,
+  processFailedDigestUploads,
+  failedDigests,
+  REST_PERIOD_BETWEEN_UPLOADS,
+} from './ipfsPublisher'
+import {
+  initTxDigestDB,
+  insertUploadedIPFSRecord,
+  initializeIPFSRecordsDB,
+  getLastUploadedIPFSRecord,
+} from './dbStore'
+
+let isCronActive = false
+
+const configFile = join(process.cwd(), 'archiver-config.json')
+
+;(async (): Promise<void> => {
+  overrideDefaultConfig(configFile)
+  const BASE_DIR = '.'
+  const LOG_DIR = `${config.ARCHIVER_LOGS}/ipfsPublisher`
+
+  Crypto.setCryptoHashKey(config.ARCHIVER_HASH_KEY)
+  let logsConfig
+  try {
+    logsConfig = StringUtils.safeJsonParse(
+      readFileSync(resolve(__dirname, '../../archiver-log.json'), 'utf8')
+    )
+    logsConfig.dir = LOG_DIR
+  } catch (err) {
+    console.log('Failed to parse archiver log file:', err)
+  }
+
+  Logger.initLogger(BASE_DIR, logsConfig)
+  if (logsConfig.saveConsoleOutput) {
+    startSaving(join(BASE_DIR, logsConfig.dir))
+  }
+
+  await initTxDigestDB(config)
+  await initializeIPFSRecordsDB(config)
+  await initPublisher()
+
+  cron.schedule(config.txDigest.txCronSchedule, async () => {
+    try {
+      if (isCronActive) {
+        console.log('IPFS Publisher Cron Job already running....')
+        return
+      }
+      isCronActive = true
+      console.log('Running IPFS Publisher Cron Job....')
+
+      if (failedDigests.length > 0) {
+        console.log(`⏳ Re-trying: ${failedDigests.length} Failed Digest Uploads...`)
+        await processFailedDigestUploads()
+      }
+
+      const lastTxDigest = await getLastProcessedTxDigest()
+      let latestDigestPushedToIPFS = (await getLastUploadedIPFSRecord()) || { cycleEnd: 0 }
+
+      console.log(
+        `Latest Cycle of Tx-Digest Processed vs Pushed to IPFS: ${lastTxDigest.cycleEnd} vs ${latestDigestPushedToIPFS.cycleEnd}`
+      )
+
+      if (latestDigestPushedToIPFS) {
+        if (latestDigestPushedToIPFS.cycleEnd < lastTxDigest.cycleEnd) {
+          const txDigests = await getTxDigestsForACycleRange(
+            latestDigestPushedToIPFS.cycleEnd,
+            lastTxDigest.cycleEnd
+          )
+          console.log('TX-Digests to Upload: ', txDigests.length)
+          for (const digest of txDigests) {
+            const uploadedDigest = await uploadDigestToIPFS(digest)
+            if (uploadedDigest) {
+              await insertUploadedIPFSRecord(uploadedDigest)
+              latestDigestPushedToIPFS = uploadedDigest
+              console.log(
+                `✅ Tx-Digest till Cycle ${latestDigestPushedToIPFS.cycleEnd} pushed to IPFS (CID: ${uploadedDigest.cid}).`
+              )
+              console.log(`Waiting for ${REST_PERIOD_BETWEEN_UPLOADS}ms before next upload...`)
+              await sleep(REST_PERIOD_BETWEEN_UPLOADS)
+            }
+          }
+        }
+      }
+      isCronActive = false
+    } catch (error) {
+      isCronActive = false
+      console.error('Error in ipfsPublisher Cron Job:', error)
+    }
+  })
+})()
diff --git a/src/txDigester/ipfsPublisher.ts b/src/ipfsPublisher/ipfsPublisher.ts
similarity index 65%
rename from src/txDigester/ipfsPublisher.ts
rename to src/ipfsPublisher/ipfsPublisher.ts
index 54de4fc7..538fe2d6 100644
--- a/src/txDigester/ipfsPublisher.ts
+++ b/src/ipfsPublisher/ipfsPublisher.ts
@@ -1,12 +1,15 @@
+import type { Client } from '@web3-storage/w3up-client'
+import { BlobLike } from '@web3-storage/w3up-client/dist/src/types'
+
 import { join } from 'path'
 import { Blob } from 'buffer'
-import { TransactionDigest } from './txDigests'
-import type { Client } from '@web3-storage/w3up-client'
-import { overrideDefaultConfig, config } from '../Config'
 import { mainLogger } from '../Logger'
-import { BlobLike } from '@web3-storage/w3up-client/dist/src/types'
+import { IPFSRecord, insertUploadedIPFSRecord } from './dbStore'
+import { overrideDefaultConfig, config } from '../Config'
+import { TransactionDigest } from '../txDigester/txDigests'
+
 // eslint-disable-next-line @typescript-eslint/no-var-requires
-const { W3SClient, Email } = require(join(process.cwd(), '/src/utils/esm/bundle.js'))
+const { W3SClient, Email } = require(join(process.cwd(), '/src/ipfsPublisher/esm/bundle.js'))
 
 const configFile = join(process.cwd(), 'archiver-config.json')
 overrideDefaultConfig(configFile)
@@ -19,25 +22,27 @@ type Web3StorageAdminEmail =
 `${string}@${string}`
 let client: Client
 let lastUploadTime: number | null = null
 let isPublisherActive = false
+export const failedDigests: TransactionDigest[] = []
 // Note: This can be removed in production or in cases where uploads will happen once in 5 or more cycles/minutes
-const REST_PERIOD_BETWEEN_UPLOADS = 1000 * 60 * 5 // 5 minutes
+export const REST_PERIOD_BETWEEN_UPLOADS = 1000 * 30 // 30 seconds
 
 // Upload a file to the specified space DID
-export const uploadDigestToIPFS = async (data: TransactionDigest): Promise<void> => {
+export const uploadDigestToIPFS = async (data: TransactionDigest): Promise<IPFSRecord | null> => {
   try {
     if (!isUploadPossible()) {
       console.log(
-        `❌ Publisher cannot upload to IPFS: ${
+        `Publisher cannot upload to IPFS: ${
          isPublisherActive ? 'Another Upload in progress.' : 'Rest Period is Active.'
        }`
      )
-      return
+      return null
    }
     isPublisherActive = true
-    console.log(`Uploading TX Digest for Cycle Range ${data.cycleStart} to ${data.cycleEnd}`)
     await client.setCurrentSpace(rootDID as Web3StorageRootDID)
-    console.log(`Uploading Data to Root-DID: ${rootDID}`)
+    console.log(
+      `⏳ Uploading TX-Digest for Cycle Range ${data.cycleStart} to ${data.cycleEnd} | Space-DID: ${rootDID}`
+    )
 
     const { cycleStart: cs, cycleEnd: ce, txCount: tc, hash: h } = data
     const optimisedJSON = { cs, ce, tc, h }
@@ -45,20 +50,38 @@ export const uploadDigestToIPFS = async (data: TransactionDigest): Promise<void> => {
     const cid = await client.uploadFile(
       new Blob([JSON.stringify(optimisedJSON)], { type: 'application/json' }) as BlobLike
     )
-    console.log(
+    mainLogger.log(
       `✅ Uploaded to IPFS Successfully for Cycles (${data.cycleStart} to ${data.cycleEnd}) @ https://${cid}.ipfs.w3s.link`
     )
     lastUploadTime = Date.now()
+    removeFailedDigest(data)
 
     const storageUsed = await client.currentSpace()?.usage.get()
     console.log(`${Number(storageUsed!.ok)} bytes used on Web3.Storage.`)
     isPublisherActive = false
+    delete data.txCount
+    return { ...data, cid: cid.toString(), spaceDID: client.currentSpace()?.did(), timestamp: lastUploadTime }
   } catch (error) {
     isPublisherActive = false
-    console.error(
+    mainLogger.error(
       `❌ Error while Uploading Digest for Cycles: ${data.cycleStart} to ${data.cycleEnd} w/ Hash: ${data.hash}) to IPFS:`
     )
-    console.error(error)
-    return
+    mainLogger.error(error)
+    addFailedDigest(data)
+    return null
+  }
+}
+
+const addFailedDigest = (digest: TransactionDigest): void => {
+  const index = failedDigests.findIndex((failedDigest) => failedDigest.hash === digest.hash)
+  if (index === -1) {
+    failedDigests.push(digest)
+  }
+}
+
+const removeFailedDigest = (digest: TransactionDigest): void => {
+  const index = failedDigests.findIndex((failedDigest) => failedDigest.hash === digest.hash)
+  if (index > -1) {
+    failedDigests.splice(index, 1)
   }
 }
@@ -81,6 +104,27 @@ const maskedEmail = (email: string): string => {
   return `${name.slice(0, 1)}${'*'.repeat(name.length - 2)}${name.slice(-1)}@${domain}`
 }
 
+export const processFailedDigestUploads = async (): Promise<void> => {
+  try {
+    if (failedDigests.length > 0) {
+      // Iterate over a snapshot copy: uploadDigestToIPFS() removes successful digests from failedDigests, so looping over the live array would skip entries
+      for (const failedDigest of [...failedDigests]) {
+        const uploadedDigest = await uploadDigestToIPFS(failedDigest)
+        if (!uploadedDigest) {
+          console.error(
+            `❌ Failed to upload Digest for Cycle Range ${failedDigest.cycleStart} to ${failedDigest.cycleEnd}. Failed Digests: ${failedDigests.length}`
+          )
+        } else {
+          await insertUploadedIPFSRecord(uploadedDigest)
+        }
+      }
+    }
+  } catch (error) {
+    mainLogger.error(`❌ Error in processFailedDigestUploads():-`)
+    mainLogger.error(error)
+  }
+}
+
 export const init = async (): Promise<void> => {
   try {
     isPublisherActive = true
diff --git a/src/txDigester.ts b/src/txDigester.ts
index a0a11220..f1db4d22 100644
--- a/src/txDigester.ts
+++ b/src/txDigester.ts
@@ -12,7 +12,6 @@ import { resolve } from 'path'
 import * as Logger from './Logger'
 import { startSaving } from './saveConsoleOutput'
 import axios from 'axios'
-import * as ipfsPublisher from './txDigester/ipfsPublisher'
 
 const configFile = join(process.cwd(), 'archiver-config.json')
 
@@ -38,9 +37,6 @@ const start = async (): Promise<void> => {
 
   await txDigesterDB.initializeDB(config)
 
-  // Initialises the Client that publishes Transaction Digest to Web3.Storage
-  if (config.txDigest.enableSavingToWeb3Storage) ipfsPublisher.init()
-
   const ARCHIVER_STATUS_CHECK_URL = `http://${config.ARCHIVER_IP}:${config.ARCHIVER_PORT}/status`
 
   cron.schedule(config.txDigest.txCronSchedule, async () => {
diff --git a/src/txDigester/txDigestFunctions.ts b/src/txDigester/txDigestFunctions.ts
index 4b5b10c1..fd006fe5 100644
--- a/src/txDigester/txDigestFunctions.ts
+++ b/src/txDigester/txDigestFunctions.ts
@@ -2,7 +2,6 @@ import { config } from '../Config'
 import * as processedTxs from '../dbstore/processedTxs'
 import * as txDigest from './txDigests'
 import * as Crypto from '../Crypto'
-import { uploadDigestToIPFS } from './ipfsPublisher'
 
 let lastProcessedTxDigest: txDigest.TransactionDigest = null
 
@@ -58,7 +57,7 @@ export const processAndInsertTxDigests = async (
     return
   }
 
-  if(config.VERBOSE) {
+  if (config.VERBOSE) {
     console.log(`TxIds from ${currentCycle} to ${endCycle} of length ${txIds.length}: `, txIds)
   }
 
@@ -81,9 +80,6 @@ export const processAndInsertTxDigests = async (
 
   try {
     txDigest.insertTransactionDigest(txDigestObj)
-    if (config.txDigest.enableSavingToWeb3Storage) {
-      await uploadDigestToIPFS(txDigestObj)
-    }
   } catch (e) {
     console.error('Failed to insert txDigestObj: ', txDigestObj)
     console.error(e)