BLUE-283: IPFS-publisher's PM2 process & separate DB added #89

Open · wants to merge 1 commit into base: BLUE-280
2 changes: 1 addition & 1 deletion .gitignore
@@ -10,4 +10,4 @@ archiver-logs
data-logs
.direnv/
.secrets
src/utils/esm/bundle.js
src/ipfsPublisher/esm/bundle.js
3 changes: 1 addition & 2 deletions archiver-config.json
@@ -62,7 +62,6 @@
"web3Storage": {
"rootDID": "did:key:REPLACE_ME",
"adminEmail": ""
},
"enableSavingToWeb3Storage": false
}
}
}
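With the enableSavingToWeb3Storage flag removed (publishing is now a dedicated process rather than a toggle inside the txDigester), the web3Storage block in archiver-config.json reduces to:

"web3Storage": {
  "rootDID": "did:key:REPLACE_ME",
  "adminEmail": ""
}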
5 changes: 3 additions & 2 deletions package.json
@@ -16,8 +16,9 @@
},
"scripts": {
"start": "npm run prepare && node build/server.js",
"w3s-build": "esbuild --platform=node src/utils/esm/index.ts --bundle --outfile=src/utils/esm/bundle.js",
"txDigestCronServer": "npm run w3s-build && npm run prepare && node build/txDigester.js",
"w3s-build": "esbuild --platform=node src/ipfsPublisher/esm/index.ts --bundle --outfile=src/ipfsPublisher/esm/bundle.js",
"ipfsPublisher": "npm run w3s-build && npm run prepare && node build/ipfsPublisher/index.js",
"txDigestCronServer": "npm run prepare && node build/txDigester.js",
"txDigestApiServer": "npm run prepare && node build/txDigestAPIserver.js",
"release": "npm run prepare && np --no-cleanup --no-tests --no-yarn --any-branch",
"check": "gts check",
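The PR title promises a dedicated PM2 process for the publisher, but only the ipfsPublisher npm script appears in this diff, so the ecosystem entry below is a hypothetical sketch of how it might be wired up (the file and process name are assumptions, not part of this PR):

// ecosystem.config.js (hypothetical sketch; not part of this diff)
module.exports = {
  apps: [
    {
      name: 'ipfs-publisher', // assumed process name
      script: 'npm',
      args: 'run ipfsPublisher', // w3s-build + prepare, then node build/ipfsPublisher/index.js
      autorestart: true,
    },
  ],
}

Ad hoc, the same process could be started with: pm2 start npm --name ipfs-publisher -- run ipfsPublisher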
4 changes: 2 additions & 2 deletions src/Config.ts
@@ -21,6 +21,7 @@ export interface Config {
originalTxDataDB: string
processedTxDB: string
txDigestDB: string
ipfsRecordsDB: string
}
DATASENDER_TIMEOUT: number
RATE_LIMIT: number // number of allowed request per second,
@@ -98,7 +99,6 @@ export interface Config {
rootDID: string
adminEmail: string
}
enableSavingToWeb3Storage: boolean
}
workerProcessesDebugLog: boolean // To enable debug logs for worker processes managed by the main process
restrictFirstNodeSelectionByPublicKey: boolean // The flag to pick the first node that matches the PUBLIC_KEY specified in the firstNodeInfo
@@ -121,6 +121,7 @@ let config: Config = {
originalTxDataDB: 'originalTxsData.sqlite3',
processedTxDB: 'processedTransactions.sqlite3',
txDigestDB: 'txDigest.sqlite3',
ipfsRecordsDB: 'ipfsRecords.sqlite3',
},
DATASENDER_TIMEOUT: 1000 * 60 * 5,
RATE_LIMIT: 100, // 100 req per second,
@@ -193,7 +194,6 @@ let config: Config = {
rootDID: 'did:key:', // Should be in the format: did:key:<DID>
adminEmail: '', // Make sure the email here is the one to which the intended Web3.Storage Account is linked to
},
enableSavingToWeb3Storage: false, // Set rootDID and adminEmail when you enable this
},
workerProcessesDebugLog: false,
restrictFirstNodeSelectionByPublicKey: false,
80 changes: 80 additions & 0 deletions src/ipfsPublisher/dbStore.ts
@@ -0,0 +1,80 @@
import { Database } from 'sqlite3'
import * as db from '../dbstore/sqlite3storage'

import { Config, config } from '../Config'
import { createDirectories } from '../Utils'
import { initializeDB } from '../txDigester/index'
import { TransactionDigest } from '../txDigester/txDigests'
import { createDB, runCreate, close } from '../dbstore/sqlite3storage'

export let ipfsRecordsDB: Database

export interface IPFSRecord extends Omit<TransactionDigest, 'txCount'> {
cid: string
spaceDID: string
timestamp: number
}

export const initTxDigestDB = async (config: Config): Promise<void> => initializeDB(config)

export const initializeIPFSRecordsDB = async (config: Config): Promise<void> => {
createDirectories(config.ARCHIVER_DB)
ipfsRecordsDB = await createDB(
`${config.ARCHIVER_DB}/${config.ARCHIVER_DATA.ipfsRecordsDB}`,
'IPFSRecordsDB'
)
await runCreate(
ipfsRecordsDB,
'CREATE TABLE if not exists `ipfsRecords` (`CID` TEXT NOT NULL, `cycleStart` NUMBER NOT NULL UNIQUE, `cycleEnd` NUMBER NOT NULL UNIQUE, `hash` TEXT NOT NULL, `timestamp` BIGINT NOT NULL, `spaceDID` TEXT NOT NULL, PRIMARY KEY (`cycleEnd`))'
)
}

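// Upsert keyed on cycleEnd: re-publishing a digest for the same cycle range
// replaces the earlier IPFS record instead of tripping the UNIQUE constraint.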
export async function insertUploadedIPFSRecord(ipfsRecord: IPFSRecord): Promise<void> {
try {
const fields = Object.keys(ipfsRecord).join(', ')
const placeholders = Object.keys(ipfsRecord).fill('?').join(', ')
const values = db.extractValues(ipfsRecord)
const sql =
'INSERT INTO ipfsRecords (' +
fields +
') VALUES (' +
placeholders +
') ON CONFLICT (cycleEnd) DO UPDATE SET ' +
'CID = excluded.CID, ' +
'cycleStart = excluded.cycleStart, ' +
'hash = excluded.hash, ' +
'timestamp = excluded.timestamp, ' +
'spaceDID = excluded.spaceDID'

await db.run(ipfsRecordsDB, sql, values)
if (config.VERBOSE) {
console.log(
`Successfully inserted ipfsRecord for cycle records from ${ipfsRecord.cycleStart} to ${ipfsRecord.cycleEnd}`
)
}
} catch (e) {
console.error(e)
throw new Error(
`Unable to Insert ipfsRecord for cycle records from ${ipfsRecord.cycleStart} to ${ipfsRecord.cycleEnd}`
)
}
}

export async function getLastUploadedIPFSRecord(): Promise<IPFSRecord> {
try {
const sql = `SELECT * FROM ipfsRecords ORDER BY cycleEnd DESC LIMIT 1`
const lastUploadedDigest = (await db.get(ipfsRecordsDB, sql)) as IPFSRecord
if (config.VERBOSE) {
console.log('Last Digest Uploaded to IPFS: ')
console.dir(lastUploadedDigest)
}
return lastUploadedDigest
} catch (e) {
console.error('Error while Fetching last uploaded IPFS Record: ', e)
return null
}
}

export const closeDatabase = async (): Promise<void> => {
await close(ipfsRecordsDB, 'IPFSRecordsDB')
}
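For reference, a minimal round-trip through this store might look like the following sketch (all field values are hypothetical):

// Hypothetical usage sketch; not part of this diff.
import { config } from '../Config'
import { initializeIPFSRecordsDB, insertUploadedIPFSRecord, getLastUploadedIPFSRecord } from './dbStore'

const demo = async (): Promise<void> => {
  await initializeIPFSRecordsDB(config)
  await insertUploadedIPFSRecord({
    cycleStart: 100, // hypothetical cycle range
    cycleEnd: 110,
    hash: '0xabc...', // digest hash for that range
    cid: 'bafybei...', // CID returned by Web3.Storage
    spaceDID: 'did:key:z6Mk...',
    timestamp: Date.now(),
  })
  // A second insert with the same cycleEnd updates the row via ON CONFLICT.
  const last = await getLastUploadedIPFSRecord() // row with the highest cycleEnd
  console.log(last?.cid)
}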
File renamed without changes.
104 changes: 104 additions & 0 deletions src/ipfsPublisher/index.ts
@@ -0,0 +1,104 @@
import * as cron from 'node-cron'
import { sleep } from '../Utils'
import { readFileSync } from 'fs'
import * as Crypto from '../Crypto'
import * as Logger from '../Logger'
import { join, resolve } from 'path'
import { startSaving } from '../saveConsoleOutput'
import { Utils as StringUtils } from '@shardus/types'
import { overrideDefaultConfig, config } from '../Config'
import { getLastProcessedTxDigest } from '../txDigester/txDigests'
import { getTxDigestsForACycleRange } from '../txDigester/txDigestFunctions'

import {
init as initPublisher,
uploadDigestToIPFS,
processFailedDigestUploads,
failedDigests,
REST_PERIOD_BETWEEN_UPLOADS,
} from './ipfsPublisher'
import {
initTxDigestDB,
insertUploadedIPFSRecord,
initializeIPFSRecordsDB,
getLastUploadedIPFSRecord,
} from './dbStore'

let isCronActive = false

const configFile = join(process.cwd(), 'archiver-config.json')

;(async (): Promise<void> => {
overrideDefaultConfig(configFile)
const BASE_DIR = '.'
const LOG_DIR = `${config.ARCHIVER_LOGS}/ipfsPublisher`

Crypto.setCryptoHashKey(config.ARCHIVER_HASH_KEY)
let logsConfig
try {
logsConfig = StringUtils.safeJsonParse(
readFileSync(resolve(__dirname, '../../archiver-log.json'), 'utf8')
)
logsConfig.dir = LOG_DIR
} catch (err) {
console.log('Failed to parse archiver log file:', err)
}

Logger.initLogger(BASE_DIR, logsConfig)
if (logsConfig.saveConsoleOutput) {
startSaving(join(BASE_DIR, logsConfig.dir))
}

await initTxDigestDB(config)
await initializeIPFSRecordsDB(config)
await initPublisher()

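// Each tick: retry previously failed uploads first, then publish every digest
// newer than the last record stored in ipfsRecordsDB.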
cron.schedule(config.txDigest.txCronSchedule, async () => {
try {
if (isCronActive) {
console.log('IPFS Publisher Cron Job already running....')
return
}
isCronActive = true
console.log('Running IPFS Publisher Cron Job....')

if (failedDigests.length > 0) {
console.log(`⏳ Retrying ${failedDigests.length} failed digest upload(s)...`)
await processFailedDigestUploads()
}

const lastTxDigest = await getLastProcessedTxDigest()
let latestDigestPushedToIPFS = (await getLastUploadedIPFSRecord()) || { cycleEnd: 0 }

console.log(
`Latest Cycle of Tx-Digest Processed vs Pushed to IPFS: ${lastTxDigest.cycleEnd} vs ${latestDigestPushedToIPFS.cycleEnd}`
)

if (latestDigestPushedToIPFS) {
if (latestDigestPushedToIPFS.cycleEnd < lastTxDigest.cycleEnd) {
const txDigests = await getTxDigestsForACycleRange(
latestDigestPushedToIPFS.cycleEnd,
lastTxDigest.cycleEnd
)
console.log('TX-Digests to Upload: ', txDigests.length)
for (const digest of txDigests) {
const uploadedDigest = await uploadDigestToIPFS(digest)
if (uploadedDigest) {
await insertUploadedIPFSRecord(uploadedDigest)
latestDigestPushedToIPFS = uploadedDigest
console.log(
`✅ Tx-Digest till Cycle ${latestDigestPushedToIPFS.cycleEnd} pushed to IPFS (CID: ${uploadedDigest.cid}).`
)
console.log(`Waiting for ${REST_PERIOD_BETWEEN_UPLOADS}ms before next upload...`)
await sleep(REST_PERIOD_BETWEEN_UPLOADS)
}
}
}
}
  } catch (error) {
    console.error('Error in ipfsPublisher Cron Job:', error)
  } finally {
    // Always release the guard so a failed run cannot wedge future cron ticks.
    isCronActive = false
  }
})
})()
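The schedule reuses the txDigester's cron expression (config.txDigest.txCronSchedule), a standard node-cron pattern; a value like "*/5 * * * *" (hypothetical) would run the publisher every five minutes.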
src/txDigester/ipfsPublisher.ts → src/ipfsPublisher/ipfsPublisher.ts (renamed and modified)
@@ -1,12 +1,15 @@
import type { Client } from '@web3-storage/w3up-client'
import { BlobLike } from '@web3-storage/w3up-client/dist/src/types'

import { join } from 'path'
import { Blob } from 'buffer'
import { TransactionDigest } from './txDigests'
import type { Client } from '@web3-storage/w3up-client'
import { overrideDefaultConfig, config } from '../Config'
import { mainLogger } from '../Logger'
import { BlobLike } from '@web3-storage/w3up-client/dist/src/types'
import { IPFSRecord, insertUploadedIPFSRecord } from './dbStore'
import { overrideDefaultConfig, config } from '../Config'
import { TransactionDigest } from '../txDigester/txDigests'

// eslint-disable-next-line @typescript-eslint/no-var-requires
const { W3SClient, Email } = require(join(process.cwd(), '/src/utils/esm/bundle.js'))
const { W3SClient, Email } = require(join(process.cwd(), '/src/ipfsPublisher/esm/bundle.js'))
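// @web3-storage/w3up-client ships as ESM-only; the w3s-build script bundles it
// with esbuild so this CommonJS build can load it via the require() above.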

const configFile = join(process.cwd(), 'archiver-config.json')
overrideDefaultConfig(configFile)
@@ -19,46 +19,66 @@ type Web3StorageAdminEmail = `${string}@${string}`
let client: Client
let lastUploadTime: number | null = null
let isPublisherActive = false
export const failedDigests: TransactionDigest[] = []

// Note: This can be removed in production or in cases where uploads will happen once in 5 or more cycles/minutes
const REST_PERIOD_BETWEEN_UPLOADS = 1000 * 60 * 5 // 5 minutes
export const REST_PERIOD_BETWEEN_UPLOADS = 1000 * 30 // 30 seconds

// Upload a file to the specified space DID
export const uploadDigestToIPFS = async (data: TransactionDigest): Promise<void> => {
export const uploadDigestToIPFS = async (data: TransactionDigest): Promise<IPFSRecord | null> => {
try {
if (!isUploadPossible()) {
console.log(
`Publisher cannot upload to IPFS: ${
isPublisherActive ? 'Another Upload in progress.' : 'Rest Period is Active.'
}`
)
return
return null
}
isPublisherActive = true
console.log(`Uploading TX Digest for Cycle Range ${data.cycleStart} to ${data.cycleEnd}`)
await client.setCurrentSpace(rootDID as Web3StorageRootDID)
console.log(`Uploading Data to Root-DID: ${rootDID}`)
console.log(
`⏳ Uploading TX-Digest for Cycle Range ${data.cycleStart} to ${data.cycleEnd} | Space-DID: ${rootDID}`
)

const { cycleStart: cs, cycleEnd: ce, txCount: tc, hash: h } = data
const optimisedJSON = { cs, ce, tc, h }

const cid = await client.uploadFile(
new Blob([JSON.stringify(optimisedJSON)], { type: 'application/json' }) as BlobLike
)
console.log(
mainLogger.log(
`✅ Uploaded to IPFS Successfully for Cycles (${data.cycleStart} to ${data.cycleEnd}) @ https://${cid}.ipfs.w3s.link`
)
lastUploadTime = Date.now()
removeFailedDigest(data)
const storageUsed = await client.currentSpace()?.usage.get()
console.log(`${Number(storageUsed!.ok)} bytes used on Web3.Storage.`)
isPublisherActive = false
    // Strip txCount via destructuring (IPFSRecord omits it); `delete` on a
    // required property is rejected under strict TypeScript.
    const { txCount: _txCount, ...record } = data
    return { ...record, cid: cid.toString(), spaceDID: client.currentSpace()?.did(), timestamp: lastUploadTime }
} catch (error) {
isPublisherActive = false
console.error(
mainLogger.error(
`❌ Error while Uploading Digest for Cycles: ${data.cycleStart} to ${data.cycleEnd} w/ Hash: ${data.hash}) to IPFS:`
)
console.error(error)
return
mainLogger.error(error)
addFailedDigest(data)
return null
}
}
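// For illustration: a digest covering cycles 100 to 110 would be pinned as the
// key-shortened payload {"cs":100,"ce":110,"tc":42,"h":"0xabc..."} (values
// hypothetical), keeping each uploaded object a few dozen bytes.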

const addFailedDigest = (digest: TransactionDigest): void => {
const index = failedDigests.findIndex((failedDigest) => failedDigest.hash === digest.hash)
if (index === -1) {
failedDigests.push(digest)
}
}

const removeFailedDigest = (digest: TransactionDigest): void => {
const index = failedDigests.findIndex((failedDigest) => failedDigest.hash === digest.hash)
if (index > -1) {
failedDigests.splice(index, 1)
}
}

@@ -81,6 +104,27 @@ const maskedEmail = (email: string): string => {
return `${name.slice(0, 1)}${'*'.repeat(name.length - 2)}${name.slice(-1)}@${domain}`
}

export const processFailedDigestUploads = async (): Promise<void> => {
  try {
    // Iterate over a snapshot: on success, uploadDigestToIPFS() already removes
    // the digest from failedDigests via removeFailedDigest(), and mutating the
    // array mid-iteration would make for...of skip entries.
    for (const failedDigest of [...failedDigests]) {
      const uploadedDigest = await uploadDigestToIPFS(failedDigest)
      if (!uploadedDigest) {
        console.error(
          `❌ Failed to upload Digest for Cycle Range ${failedDigest.cycleStart} to ${failedDigest.cycleEnd}. Failed Digests: ${failedDigests.length}`
        )
      } else {
        // The digest is already gone from failedDigests; only record the upload.
        await insertUploadedIPFSRecord(uploadedDigest)
      }
    }
  } catch (error) {
    mainLogger.error('❌ Error in processFailedDigestUploads():')
    mainLogger.error(error)
  }
}

export const init = async (): Promise<void> => {
try {
isPublisherActive = true
4 changes: 0 additions & 4 deletions src/txDigester.ts
@@ -12,7 +12,6 @@ import { resolve } from 'path'
import * as Logger from './Logger'
import { startSaving } from './saveConsoleOutput'
import axios from 'axios'
import * as ipfsPublisher from './txDigester/ipfsPublisher'
const configFile = join(process.cwd(), 'archiver-config.json')

const start = async (): Promise<void> => {
@@ -38,9 +37,6 @@ const start = async (): Promise<void> => {

await txDigesterDB.initializeDB(config)

// Initialises the Client that publishes Transaction Digest to Web3.Storage
if (config.txDigest.enableSavingToWeb3Storage) ipfsPublisher.init()

const ARCHIVER_STATUS_CHECK_URL = `http://${config.ARCHIVER_IP}:${config.ARCHIVER_PORT}/status`

cron.schedule(config.txDigest.txCronSchedule, async () => {