From 6b4d92c2796ea78417678fbca6a7a8f2381d0635 Mon Sep 17 00:00:00 2001 From: Aditya Anand M C Date: Thu, 5 Sep 2024 17:09:27 +0530 Subject: [PATCH 01/13] have fallback IPFS --- indexer-compose.yml | 4 +-- src/config.ts | 13 ++++---- src/index.ts | 74 +++++++++++++++++++++++++++++++++------------ 3 files changed, 63 insertions(+), 28 deletions(-) diff --git a/indexer-compose.yml b/indexer-compose.yml index d2cf9758..a2a33f3b 100644 --- a/indexer-compose.yml +++ b/indexer-compose.yml @@ -20,7 +20,7 @@ services: ENABLE_RESOURCE_MONITOR: ${ENABLE_RESOURCE_MONITOR} ESTIMATES_LINEARQF_WORKER_POOL_SIZE: ${ESTIMATES_LINEARQF_WORKER_POOL_SIZE} PINO_PRETTY: ${PINO_PRETTY} - IPFS_GATEWAY: ${IPFS_GATEWAY} + IPFS_GATEWAYS: ${IPFS_GATEWAYS} COINGECKO_API_KEY: ${COINGECKO_API_KEY} GRAPHILE_LICENSE: ${GRAPHILE_LICENSE} SEPOLIA_RPC_URL: ${SEPOLIA_RPC_URL} @@ -62,7 +62,7 @@ services: ENABLE_RESOURCE_MONITOR: ${ENABLE_RESOURCE_MONITOR} ESTIMATES_LINEARQF_WORKER_POOL_SIZE: ${ESTIMATES_LINEARQF_WORKER_POOL_SIZE} PINO_PRETTY: ${PINO_PRETTY} - IPFS_GATEWAY: ${IPFS_GATEWAY} + IPFS_GATEWAYS: ${IPFS_GATEWAYS} COINGECKO_API_KEY: ${COINGECKO_API_KEY} GRAPHILE_LICENSE: ${GRAPHILE_LICENSE} SEPOLIA_RPC_URL: ${SEPOLIA_RPC_URL} diff --git a/src/config.ts b/src/config.ts index 2fae6422..bc4c9605 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1487,7 +1487,7 @@ const CHAINS: Chain[] = [ .default("https://evm-rpc.sei-apis.com") .parse(process.env.SEI_MAINNET_RPC_URL), pricesFromTimestamp: Date.UTC(2024, 0, 1, 0, 0, 0), - maxGetLogsRange: 10000, + maxGetLogsRange: 1000, tokens: [ { code: "SEI", @@ -1818,7 +1818,7 @@ export type Config = { httpServerWaitForSync: boolean; httpServerEnabled: boolean; indexerEnabled: boolean; - ipfsGateway: string; + ipfsGateways: string[]; coingeckoApiKey: string | null; coingeckoApiUrl: string; chains: Chain[]; @@ -1981,10 +1981,11 @@ export function getConfig(): Config { const runOnce = z.boolean().default(false).parse(args["run-once"]); - const ipfsGateway = z + const ipfsGateways = z .string() - .default("https://ipfs.io") - .parse(process.env.IPFS_GATEWAY); + .array() + .default(["https://ipfs.io"]) + .parse(JSON.parse(process.env.IPFS_GATEWAYS!)); const sentryDsn = z .union([z.string(), z.null()]) @@ -2041,7 +2042,7 @@ export function getConfig(): Config { cacheDir, logLevel, runOnce, - ipfsGateway, + ipfsGateways, passportScorerId, apiHttpPort, pinoPretty, diff --git a/src/index.ts b/src/index.ts index 5062a1aa..67d1f3c5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -467,29 +467,63 @@ async function catchupAndWatchChain( return undefined; } - const url = `${config.ipfsGateway}/ipfs/${cid}`; + // Fetch from a single IPFS gateway + const fetchFromGateway = async (url: string): Promise => { + try { + const res = await fetch(url, { + timeout: 2000, + onRetry(cause) { + chainLogger.debug({ + msg: "Retrying IPFS request", + url: url, + err: cause, + }); + }, + retry: { retries: 3, minTimeout: 2000, maxTimeout: 60 * 10000 }, + // IPFS data is immutable, we can rely entirely on the cache when present + cache: "force-cache", + cachePath: + config.cacheDir !== null + ? path.join(config.cacheDir, "ipfs") + : undefined, + }); - // chainLogger.trace(`Fetching ${url}`); + if (res.ok) { + return (await res.json()) as T; // Return the fetched data + } else { + chainLogger.warn( + `Failed to fetch from ${url}, status: ${res.status} ${res.statusText}` + ); + } + } catch (err) { + chainLogger.error( + `Error fetching from gateway ${url}: ${String(err)}` + ); + } + }; - const res = await fetch(url, { - timeout: 2000, - onRetry(cause) { - chainLogger.debug({ - msg: "Retrying IPFS request", - url: url, - err: cause, - }); - }, - retry: { retries: 3, minTimeout: 2000, maxTimeout: 60 * 10000 }, - // IPFS data is immutable, we can rely entirely on the cache when present - cache: "force-cache", - cachePath: - config.cacheDir !== null - ? path.join(config.cacheDir, "ipfs") - : undefined, - }); + // Iterate through each gateway and attempt to fetch data + for (const gateway of config.ipfsGateways) { + const url = `${gateway}/ipfs/${cid}`; + chainLogger.info(`Trying IPFS gateway: ${gateway} for CID: ${cid}`); - return (await res.json()) as T; + const result = await fetchFromGateway(url); + if (result !== undefined) { + chainLogger.info( + `Fetch successful from gateway: ${gateway} for CID: ${cid}` + ); + return result; // Return the result if fetched successfully + } else { + chainLogger.warn( + `IPFS fetch failed for gateway ${gateway} for CID ${cid}` + ); + } + } + + chainLogger.error( + `Failed to fetch IPFS data for CID ${cid} from all gateways.` + ); + return undefined; // Return undefined if all gateways fail }; chainLogger.info("DEBUG: catching up with blockchain events"); From b389c5d694bb077ad17582321a5b7e9240bbfbe0 Mon Sep 17 00:00:00 2001 From: Aditya Anand M C Date: Thu, 5 Sep 2024 21:59:00 +0530 Subject: [PATCH 02/13] feat: store ipfs data in db --- docs/reindexing.md | 8 +- src/config.ts | 14 +++ src/database/changeset.ts | 5 + src/database/index.ts | 201 +++++++++++++++++++++++++++++++------- src/database/migrate.ts | 10 ++ src/database/schema.ts | 10 ++ src/http/api/v1/status.ts | 3 +- src/index.ts | 38 ++++++- 8 files changed, 248 insertions(+), 41 deletions(-) diff --git a/docs/reindexing.md b/docs/reindexing.md index 00f6fc67..5f114bcc 100644 --- a/docs/reindexing.md +++ b/docs/reindexing.md @@ -12,9 +12,13 @@ When deploying changes to the indexer, it's important to clarify the results you - The indexer will create a new schema in Postgres named `chain_data_${version}`. If this schema does not exist, it will be created, all necessary tables will be set up, and indexing will start from scratch. - If the schema already exists, the indexer will resume indexing from the last indexed block unless the `--drop-db` flag is specified via the CLI. This will drop the existing database and start fresh. -### Using `--drop-db` in Development +### Using `--drop-db` | `--drop-chain-db` | `--drop-ipfs-db` in Development -- During development, you can use the `--drop-db` flag to ensure the indexer always deletes the existing schema and migrates from scratch. This can be useful for testing schema changes and event handler modifications without retaining old data. +- During development, you can use the `--drop-db` flag to ensure the indexer always deletes all existing schema and migrates from scratch. This can be useful for testing schema changes and event handler modifications without retaining old data. + +- During development, you can use the `--drop-chain-db` flag to ensure the indexer always deletes chain schema and migrates from scratch. + +- During development, you can use the `--drop-ipfs-db` flag to ensure the indexer always deletes ipfs schema and migrates from scratch. ### Important Notes diff --git a/src/config.ts b/src/config.ts index bc4c9605..db3451de 100644 --- a/src/config.ts +++ b/src/config.ts @@ -21,6 +21,7 @@ type CoingeckoSupportedChainId = | 1088; const CHAIN_DATA_VERSION = "81"; +const IPFS_DATA_VERSION = "1"; export type Token = { code: string; @@ -1829,11 +1830,15 @@ export type Config = { readOnlyDatabaseUrl: string; dataVersion: string; databaseSchemaName: string; + ipfsDataVersion: string; + ipfsDatabaseSchemaName: string; hostname: string; pinoPretty: boolean; deploymentEnvironment: "local" | "development" | "staging" | "production"; enableResourceMonitor: boolean; dropDb: boolean; + dropChainDb: boolean; + dropIpfsDb: boolean; removeCache: boolean; estimatesLinearQfWorkerPoolSize: number | null; }; @@ -2002,7 +2007,12 @@ export function getConfig(): Config { const dataVersion = CHAIN_DATA_VERSION; const databaseSchemaName = `chain_data_${dataVersion}`; + const ipfsDataVersion = IPFS_DATA_VERSION; + const ipfsDatabaseSchemaName = `ipfs_data_${ipfsDataVersion}`; + const dropDb = z.boolean().default(false).parse(args["drop-db"]); + const dropChainDb = z.boolean().default(false).parse(args["drop-chain-db"]); + const dropIpfsDb = z.boolean().default(false).parse(args["drop-ipfs-db"]); const removeCache = z.boolean().default(false).parse(args["rm-cache"]); @@ -2051,9 +2061,13 @@ export function getConfig(): Config { databaseUrl, readOnlyDatabaseUrl, dropDb, + dropChainDb, + dropIpfsDb, removeCache, dataVersion, databaseSchemaName, + ipfsDataVersion, + ipfsDatabaseSchemaName, httpServerWaitForSync, httpServerEnabled, indexerEnabled, diff --git a/src/database/changeset.ts b/src/database/changeset.ts index 5eb7f12e..c53d2a14 100644 --- a/src/database/changeset.ts +++ b/src/database/changeset.ts @@ -16,6 +16,7 @@ import { NewPrice, NewLegacyProject, NewApplicationPayout, + NewIpfsData, } from "./schema.js"; export type DataChange = @@ -140,4 +141,8 @@ export type DataChange = | { type: "InsertApplicationPayout"; payout: NewApplicationPayout; + } + | { + type: "InsertIpfsData"; + ipfs: NewIpfsData; }; diff --git a/src/database/index.ts b/src/database/index.ts index 2b749ff2..68a5c161 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -14,8 +14,9 @@ import { NewDonation, LegacyProjectTable, ApplicationPayout, + IpfsDataTable, } from "./schema.js"; -import { migrate } from "./migrate.js"; +import { migrate, migrateDataFetcher } from "./migrate.js"; import { encodeJsonWithBigInts } from "../utils/index.js"; import type { DataChange } from "./changeset.js"; import { Logger } from "pino"; @@ -37,6 +38,7 @@ interface Tables { prices: PriceTable; legacyProjects: LegacyProjectTable; applicationsPayouts: ApplicationPayout; + ipfsData: IpfsDataTable; } type KyselyDb = Kysely; @@ -53,13 +55,15 @@ export class Database { #statsTimeout: ReturnType | null = null; #logger: Logger; - readonly databaseSchemaName: string; + readonly chainDataSchemaName: string; + readonly ipfsDataSchemaName: string; constructor(options: { statsUpdaterEnabled: boolean; logger: Logger; connectionPool: Pool; - schemaName: string; + chainDataSchemaName: string; + ipfsDataSchemaName: string; }) { const dialect = new PostgresDialect({ pool: options.connectionPool, @@ -72,10 +76,11 @@ export class Database { plugins: [new CamelCasePlugin()], }); - this.#db = this.#db.withSchema(options.schemaName); + // Initialize schema names + this.chainDataSchemaName = options.chainDataSchemaName; + this.ipfsDataSchemaName = options.ipfsDataSchemaName; this.#logger = options.logger; - this.databaseSchemaName = options.schemaName; this.scheduleDonationQueueFlush(); @@ -87,21 +92,40 @@ export class Database { async acquireWriteLock() { const client = await this.#connectionPool.connect(); - // generate lock id based on schema - const lockId = this.databaseSchemaName.split("").reduce((acc, char) => { - return acc + char.charCodeAt(0); - }, 0); + // Helper function to generate lock ID based on schema name + const generateLockId = (schemaName: string): number => { + return schemaName.split("").reduce((acc, char) => { + return acc + char.charCodeAt(0); + }, 0); + }; - try { + // Helper function to acquire a lock for a specific schema + const acquireLockForSchema = async (lockId: number) => { const result = await client.query( `SELECT pg_try_advisory_lock(${lockId}) as lock` ); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - if (result.rows[0].lock === true) { + return result.rows[0].lock === true; + }; + + // Helper function to release a lock for a specific schema + const releaseLockForSchema = async (lockId: number) => { + await client.query(`SELECT pg_advisory_unlock(${lockId})`); + }; + + // Acquire locks for both schemas + const chainDataLockId = generateLockId(this.chainDataSchemaName); + const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName); + + try { + const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); + const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); + + if (chainDataLockAcquired && ipfsDataLockAcquired) { return { release: async () => { - await client.query(`SELECT pg_advisory_unlock(${lockId})`); + await releaseLockForSchema(chainDataLockId); + await releaseLockForSchema(ipfsDataLockId); client.release(); }, client, @@ -132,12 +156,12 @@ export class Database { } private async updateStats() { - const donationsTableRef = `"${this.databaseSchemaName}"."donations"`; + const donationsTableRef = `"${this.chainDataSchemaName}"."donations"`; await sql .raw( ` - UPDATE "${this.databaseSchemaName}"."rounds" AS r + UPDATE "${this.chainDataSchemaName}"."rounds" AS r SET total_amount_donated_in_usd = d.total_amount, total_donations_count = d.donation_count, @@ -160,7 +184,7 @@ export class Database { await sql .raw( ` - UPDATE "${this.databaseSchemaName}"."applications" AS a + UPDATE "${this.chainDataSchemaName}"."applications" AS a SET total_amount_donated_in_usd = d.total_amount, total_donations_count = d.donation_count, @@ -223,38 +247,71 @@ export class Database { } } - async dropSchemaIfExists() { + async dropChainDataSchemaIfExists() { await this.#db.schema - .dropSchema(this.databaseSchemaName) + .withSchema(this.chainDataSchemaName) + .dropSchema(this.chainDataSchemaName) .ifExists() .cascade() .execute(); } - async createSchemaIfNotExists(logger: Logger) { + async dropIpfsDataSchemaIfExists() { + await this.#db.schema + .withSchema(this.ipfsDataSchemaName) + .dropSchema(this.ipfsDataSchemaName) + .ifExists() + .cascade() + .execute(); + } + + async dropAllSchemaIfExists() { + await this.dropChainDataSchemaIfExists(); + await this.dropIpfsDataSchemaIfExists(); + } + + async createSchemaIfNotExists( + schemaName: string, + migrateFn: (tx: any, schemaName: string) => Promise, + logger: Logger + ) { const exists = await sql<{ exists: boolean }>` - SELECT EXISTS ( - SELECT 1 FROM information_schema.schemata - WHERE schema_name = ${this.databaseSchemaName} - )`.execute(this.#db); + SELECT EXISTS ( + SELECT 1 FROM information_schema.schemata + WHERE schema_name = ${schemaName} + )`.execute(this.#db.withSchema(schemaName)); if (exists.rows.length > 0 && exists.rows[0].exists) { logger.info({ - msg: `schema "${this.databaseSchemaName}" exists, skipping creation`, + msg: `schema "${schemaName}" exists, skipping creation`, }); - return; } logger.info({ - msg: `schema "${this.databaseSchemaName}" does not exist, creating schema`, + msg: `schema "${schemaName}" does not exist, creating schema`, }); - await this.#db.transaction().execute(async (tx) => { - await tx.schema.createSchema(this.databaseSchemaName).execute(); + await this.#db + .withSchema(schemaName) + .transaction() + .execute(async (tx) => { + await tx.schema.createSchema(schemaName).execute(); + await migrateFn(tx, schemaName); + }); + } - await migrate(tx, this.databaseSchemaName); - }); + async createAllSchemas(logger: Logger) { + await this.createSchemaIfNotExists( + this.chainDataSchemaName, + migrate, + logger + ); + await this.createSchemaIfNotExists( + this.ipfsDataSchemaName, + migrateDataFetcher, + logger + ); } async applyChanges(changes: DataChange[]): Promise { @@ -267,6 +324,7 @@ export class Database { switch (change.type) { case "InsertPendingProjectRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("pendingProjectRoles") .values(change.pendingProjectRole) .execute(); @@ -275,6 +333,7 @@ export class Database { case "DeletePendingProjectRoles": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("pendingProjectRoles") .where("id", "in", change.ids) .execute(); @@ -283,6 +342,7 @@ export class Database { case "InsertPendingRoundRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("pendingRoundRoles") .values(change.pendingRoundRole) .execute(); @@ -291,6 +351,7 @@ export class Database { case "DeletePendingRoundRoles": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("pendingRoundRoles") .where("id", "in", change.ids) .execute(); @@ -298,12 +359,17 @@ export class Database { } case "InsertProject": { - await this.#db.insertInto("projects").values(change.project).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("projects") + .values(change.project) + .execute(); break; } case "UpdateProject": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("projects") .set(change.project) .where("id", "=", change.projectId) @@ -314,6 +380,7 @@ export class Database { case "InsertProjectRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("projectRoles") .values(change.projectRole) .execute(); @@ -322,6 +389,7 @@ export class Database { case "DeleteAllProjectRolesByRole": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("projectRoles") .where("chainId", "=", change.projectRole.chainId) .where("projectId", "=", change.projectRole.projectId) @@ -332,6 +400,7 @@ export class Database { case "DeleteAllProjectRolesByRoleAndAddress": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("projectRoles") .where("chainId", "=", change.projectRole.chainId) .where("projectId", "=", change.projectRole.projectId) @@ -342,12 +411,17 @@ export class Database { } case "InsertRound": { - await this.#db.insertInto("rounds").values(change.round).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("rounds") + .values(change.round) + .execute(); break; } case "UpdateRound": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set(change.round) .where("chainId", "=", change.chainId) @@ -358,6 +432,7 @@ export class Database { case "IncrementRoundFundedAmount": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ fundedAmount: eb("fundedAmount", "+", change.fundedAmount), @@ -375,6 +450,7 @@ export class Database { case "UpdateRoundByStrategyAddress": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set(change.round) .where("chainId", "=", change.chainId) @@ -385,6 +461,7 @@ export class Database { case "InsertRoundRole": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("roundRoles") .values(change.roundRole) .execute(); @@ -393,6 +470,7 @@ export class Database { case "DeleteAllRoundRolesByRoleAndAddress": { await this.#db + .withSchema(this.chainDataSchemaName) .deleteFrom("roundRoles") .where("chainId", "=", change.roundRole.chainId) .where("roundId", "=", change.roundRole.roundId) @@ -411,7 +489,11 @@ export class Database { }; } - await this.#db.insertInto("applications").values(application).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("applications") + .values(application) + .execute(); break; } @@ -425,6 +507,7 @@ export class Database { } await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("applications") .set(application) .where("chainId", "=", change.chainId) @@ -441,6 +524,7 @@ export class Database { case "InsertManyDonations": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("donations") .values(change.donations) .onConflict((c) => c.column("id").doNothing()) @@ -449,12 +533,17 @@ export class Database { } case "InsertManyPrices": { - await this.#db.insertInto("prices").values(change.prices).execute(); + await this.#db + .withSchema(this.chainDataSchemaName) + .insertInto("prices") + .values(change.prices) + .execute(); break; } case "IncrementRoundDonationStats": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ totalAmountDonatedInUsd: eb( @@ -472,6 +561,7 @@ export class Database { case "IncrementRoundTotalDistributed": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("rounds") .set((eb) => ({ totalDistributed: eb("totalDistributed", "+", change.amount), @@ -484,6 +574,7 @@ export class Database { case "IncrementApplicationDonationStats": { await this.#db + .withSchema(this.chainDataSchemaName) .updateTable("applications") .set((eb) => ({ totalAmountDonatedInUsd: eb( @@ -502,6 +593,7 @@ export class Database { case "NewLegacyProject": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("legacyProjects") .values(change.legacyProject) .execute(); @@ -510,12 +602,22 @@ export class Database { case "InsertApplicationPayout": { await this.#db + .withSchema(this.chainDataSchemaName) .insertInto("applicationsPayouts") .values(change.payout) .execute(); break; } + case "InsertIpfsData": { + await this.#db + .withSchema(this.ipfsDataSchemaName) + .insertInto("ipfsData") + .values(change.ipfs) + .execute(); + break; + } + default: throw new Error(`Unknown changeset type`); } @@ -523,6 +625,7 @@ export class Database { async getPendingProjectRolesByRole(chainId: ChainId, role: string) { const pendingProjectRole = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("pendingProjectRoles") .where("chainId", "=", chainId) .where("role", "=", role) @@ -534,6 +637,7 @@ export class Database { async getPendingRoundRolesByRole(chainId: ChainId, role: string) { const pendingRoundRole = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("pendingRoundRoles") .where("chainId", "=", chainId) .where("role", "=", role) @@ -545,6 +649,7 @@ export class Database { async getProjectById(chainId: ChainId, projectId: string) { const project = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .where("id", "=", projectId) @@ -556,6 +661,7 @@ export class Database { async getProjectByAnchor(chainId: ChainId, anchorAddress: Address) { const project = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .where("anchorAddress", "=", anchorAddress) @@ -567,6 +673,7 @@ export class Database { async getRoundById(chainId: ChainId, roundId: string) { const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("id", "=", roundId) @@ -578,6 +685,7 @@ export class Database { async getRoundByStrategyAddress(chainId: ChainId, strategyAddress: Address) { const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("strategyAddress", "=", strategyAddress) @@ -593,6 +701,7 @@ export class Database { roleValue: string ) { const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where(`${roleName}Role`, "=", roleValue) @@ -615,6 +724,7 @@ export class Database { } const round = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .where("id", "=", roundId) @@ -631,6 +741,7 @@ export class Database { async getAllChainRounds(chainId: ChainId) { const rounds = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("rounds") .where("chainId", "=", chainId) .selectAll() @@ -641,6 +752,7 @@ export class Database { async getAllRoundApplications(chainId: ChainId, roundId: string) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -650,6 +762,7 @@ export class Database { async getAllRoundDonations(chainId: ChainId, roundId: string) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("donations") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -663,6 +776,7 @@ export class Database { applicationId: string ) { const application = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -679,6 +793,7 @@ export class Database { projectId: string ) { const application = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -695,6 +810,7 @@ export class Database { anchorAddress: Address ) { const application = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("applications") .where("chainId", "=", chainId) .where("roundId", "=", roundId) @@ -707,6 +823,7 @@ export class Database { async getLatestPriceTimestampForChain(chainId: ChainId) { const latestPriceTimestamp = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .orderBy("timestamp", "desc") @@ -723,6 +840,7 @@ export class Database { blockNumber: bigint | "latest" ) { let priceQuery = this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .where("tokenAddress", "=", tokenAddress) @@ -741,6 +859,7 @@ export class Database { async getAllChainPrices(chainId: ChainId) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .orderBy("blockNumber", "asc") @@ -750,6 +869,7 @@ export class Database { async getAllChainProjects(chainId: ChainId) { return await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("projects") .where("chainId", "=", chainId) .selectAll() @@ -761,6 +881,7 @@ export class Database { donorAddress: Address ) { const donations = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("donations") .where("donations.donorAddress", "=", donorAddress) .where("donations.chainId", "=", chainId) @@ -784,6 +905,7 @@ export class Database { async getV2ProjectIdByV1ProjectId(v1ProjectId: string) { const result = await this.#db + .withSchema(this.chainDataSchemaName) .selectFrom("legacyProjects") .where("v1ProjectId", "=", v1ProjectId) .select("v2ProjectId") @@ -791,4 +913,15 @@ export class Database { return result ?? null; } + + async getDataByCid(cId: string) { + const metadata = await this.#db + .withSchema(this.ipfsDataSchemaName) + .selectFrom("ipfsData") + .where("cid", "=", cId) + .selectAll() + .executeTakeFirst(); + + return metadata ?? null; + } } diff --git a/src/database/migrate.ts b/src/database/migrate.ts index c473d710..c59d6bd2 100644 --- a/src/database/migrate.ts +++ b/src/database/migrate.ts @@ -392,3 +392,13 @@ export async function migrate(db: Kysely, schemaName: string) { $$ language sql stable; `.execute(db); } + +export async function migrateDataFetcher(db: Kysely, schemaName: string) { + const schema = db.withSchema(schemaName).schema; + + await schema + .createTable("ipfs_data") + .addColumn("cid", "text") + .addColumn("data", "jsonb") + .execute(); +} diff --git a/src/database/schema.ts b/src/database/schema.ts index 06b9a906..8e50c133 100644 --- a/src/database/schema.ts +++ b/src/database/schema.ts @@ -125,6 +125,11 @@ export type ProjectTable = { projectType: ProjectType; }; +export type IpfsDataTable = { + cid: string; + data: unknown; +}; + export type Project = Selectable; export type NewProject = Insertable; export type PartialProject = Updateable; @@ -253,3 +258,8 @@ export type ApplicationPayout = { }; export type NewApplicationPayout = Insertable; + +export type NewIpfsData = { + cid: string; + data: unknown; +}; diff --git a/src/http/api/v1/status.ts b/src/http/api/v1/status.ts index e4ad3c08..be6910b1 100644 --- a/src/http/api/v1/status.ts +++ b/src/http/api/v1/status.ts @@ -9,7 +9,8 @@ export const createHandler = (config: HttpApiConfig): express.Router => { res.json({ hostname: config.hostname, buildTag: config.buildTag, - databaseSchema: config.db.databaseSchemaName, + chainDataSchemaName: config.db.chainDataSchemaName, + ipfsDataSchema: config.db.ipfsDataSchemaName, }); }); diff --git a/src/index.ts b/src/index.ts index 67d1f3c5..205c9923 100644 --- a/src/index.ts +++ b/src/index.ts @@ -153,7 +153,8 @@ async function main(): Promise { logger: baseLogger.child({ subsystem: "Database" }), statsUpdaterEnabled: config.indexerEnabled, connectionPool: databaseConnectionPool, - schemaName: config.databaseSchemaName, + chainDataSchemaName: config.databaseSchemaName, + ipfsDataSchemaName: config.ipfsDatabaseSchemaName, }); baseLogger.info({ @@ -246,11 +247,17 @@ async function main(): Promise { if (isFirstRun) { if (config.dropDb) { - baseLogger.info("dropping schema"); - await db.dropSchemaIfExists(); + baseLogger.info("dropping all schemas"); + await db.dropAllSchemaIfExists(); + } else if (config.dropChainDb) { + baseLogger.info("resetting chain data schema"); + await db.dropChainDataSchemaIfExists(); + } else if (config.dropIpfsDb) { + baseLogger.info("resetting ipfs data schema"); + await db.dropIpfsDataSchemaIfExists(); } - await db.createSchemaIfNotExists(baseLogger); + await db.createAllSchemas(baseLogger); await subscriptionStore.init(); } @@ -467,6 +474,13 @@ async function catchupAndWatchChain( return undefined; } + // Check if data is already in the IPFS database + const ipfsData = await db.getDataByCid(cid); + if (ipfsData) { + chainLogger.info(`Found IPFS data in database for CID: ${cid}`); + return Promise.resolve(ipfsData.data as string as T); + } + // Fetch from a single IPFS gateway const fetchFromGateway = async (url: string): Promise => { try { @@ -512,6 +526,22 @@ async function catchupAndWatchChain( chainLogger.info( `Fetch successful from gateway: ${gateway} for CID: ${cid}` ); + + // Save to IpfsData table + try { + await db.applyChange({ + type: "InsertIpfsData", + ipfs: { + cid, + data: result, // TODO: check is JSON.parse is needed + }, + }); + } catch (err) { + chainLogger.error( + `Error saving IPFS data to database: ${String(err)}` + ); + } + return result; // Return the result if fetched successfully } else { chainLogger.warn( From eaaaad43569fd88fb838d1e66e90c588434f9ee4 Mon Sep 17 00:00:00 2001 From: Aditya Anand M C Date: Fri, 6 Sep 2024 12:26:15 +0530 Subject: [PATCH 03/13] chore: move prices to it's own schema --- docs/reindexing.md | 4 +++- src/config.ts | 11 +++++++++++ src/database/index.ts | 39 ++++++++++++++++++++++++++++++++------- src/database/migrate.ts | 39 +++++++++++++++++++++++---------------- src/http/api/v1/status.ts | 1 + src/index.ts | 10 +++++++++- 6 files changed, 79 insertions(+), 25 deletions(-) diff --git a/docs/reindexing.md b/docs/reindexing.md index 5f114bcc..8ea7689e 100644 --- a/docs/reindexing.md +++ b/docs/reindexing.md @@ -12,7 +12,7 @@ When deploying changes to the indexer, it's important to clarify the results you - The indexer will create a new schema in Postgres named `chain_data_${version}`. If this schema does not exist, it will be created, all necessary tables will be set up, and indexing will start from scratch. - If the schema already exists, the indexer will resume indexing from the last indexed block unless the `--drop-db` flag is specified via the CLI. This will drop the existing database and start fresh. -### Using `--drop-db` | `--drop-chain-db` | `--drop-ipfs-db` in Development +### Dropping Schemas in Development - During development, you can use the `--drop-db` flag to ensure the indexer always deletes all existing schema and migrates from scratch. This can be useful for testing schema changes and event handler modifications without retaining old data. @@ -20,6 +20,8 @@ When deploying changes to the indexer, it's important to clarify the results you - During development, you can use the `--drop-ipfs-db` flag to ensure the indexer always deletes ipfs schema and migrates from scratch. +- During development, you can use the `--drop-price-db` flag to ensure the indexer always deletes price schema and migrates from scratch. + ### Important Notes - **Reindexing Time**: Deployments involving reindexing will take significantly longer. Plan accordingly to minimize downtime or performance impact. diff --git a/src/config.ts b/src/config.ts index db3451de..9811cf1f 100644 --- a/src/config.ts +++ b/src/config.ts @@ -22,6 +22,7 @@ type CoingeckoSupportedChainId = const CHAIN_DATA_VERSION = "81"; const IPFS_DATA_VERSION = "1"; +const PRICE_DATA_VERSION = "1"; export type Token = { code: string; @@ -1832,6 +1833,8 @@ export type Config = { databaseSchemaName: string; ipfsDataVersion: string; ipfsDatabaseSchemaName: string; + priceDataVersion: string; + priceDatabaseSchemaName: string; hostname: string; pinoPretty: boolean; deploymentEnvironment: "local" | "development" | "staging" | "production"; @@ -1839,6 +1842,7 @@ export type Config = { dropDb: boolean; dropChainDb: boolean; dropIpfsDb: boolean; + dropPriceDb: boolean; removeCache: boolean; estimatesLinearQfWorkerPoolSize: number | null; }; @@ -2010,9 +2014,13 @@ export function getConfig(): Config { const ipfsDataVersion = IPFS_DATA_VERSION; const ipfsDatabaseSchemaName = `ipfs_data_${ipfsDataVersion}`; + const priceDataVersion = PRICE_DATA_VERSION; + const priceDatabaseSchemaName = `price_data_${priceDataVersion}`; + const dropDb = z.boolean().default(false).parse(args["drop-db"]); const dropChainDb = z.boolean().default(false).parse(args["drop-chain-db"]); const dropIpfsDb = z.boolean().default(false).parse(args["drop-ipfs-db"]); + const dropPriceDb = z.boolean().default(false).parse(args["drop-price-db"]); const removeCache = z.boolean().default(false).parse(args["rm-cache"]); @@ -2063,11 +2071,14 @@ export function getConfig(): Config { dropDb, dropChainDb, dropIpfsDb, + dropPriceDb, removeCache, dataVersion, databaseSchemaName, ipfsDataVersion, ipfsDatabaseSchemaName, + priceDataVersion, + priceDatabaseSchemaName, httpServerWaitForSync, httpServerEnabled, indexerEnabled, diff --git a/src/database/index.ts b/src/database/index.ts index 68a5c161..58e33d97 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -16,7 +16,7 @@ import { ApplicationPayout, IpfsDataTable, } from "./schema.js"; -import { migrate, migrateDataFetcher } from "./migrate.js"; +import { migrate, migrateDataFetcher, migratePriceFetcher } from "./migrate.js"; import { encodeJsonWithBigInts } from "../utils/index.js"; import type { DataChange } from "./changeset.js"; import { Logger } from "pino"; @@ -57,6 +57,7 @@ export class Database { readonly chainDataSchemaName: string; readonly ipfsDataSchemaName: string; + readonly priceDataSchemaName: string; constructor(options: { statsUpdaterEnabled: boolean; @@ -64,6 +65,7 @@ export class Database { connectionPool: Pool; chainDataSchemaName: string; ipfsDataSchemaName: string; + priceDataSchemaName: string; }) { const dialect = new PostgresDialect({ pool: options.connectionPool, @@ -79,6 +81,7 @@ export class Database { // Initialize schema names this.chainDataSchemaName = options.chainDataSchemaName; this.ipfsDataSchemaName = options.ipfsDataSchemaName; + this.priceDataSchemaName = options.priceDataSchemaName; this.#logger = options.logger; @@ -113,19 +116,26 @@ export class Database { await client.query(`SELECT pg_advisory_unlock(${lockId})`); }; - // Acquire locks for both schemas + // Acquire locks for all schemas const chainDataLockId = generateLockId(this.chainDataSchemaName); const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName); + const priceDataLockId = generateLockId(this.priceDataSchemaName); try { const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); + const priceDataLockAcquired = await acquireLockForSchema(priceDataLockId); - if (chainDataLockAcquired && ipfsDataLockAcquired) { + if ( + chainDataLockAcquired && + ipfsDataLockAcquired && + priceDataLockAcquired + ) { return { release: async () => { await releaseLockForSchema(chainDataLockId); await releaseLockForSchema(ipfsDataLockId); + await releaseLockForSchema(priceDataLockId); client.release(); }, client, @@ -265,9 +275,19 @@ export class Database { .execute(); } + async dropPriceDataSchemaIfExists() { + await this.#db.schema + .withSchema(this.priceDataSchemaName) + .dropSchema(this.priceDataSchemaName) + .ifExists() + .cascade() + .execute(); + } + async dropAllSchemaIfExists() { await this.dropChainDataSchemaIfExists(); await this.dropIpfsDataSchemaIfExists(); + await this.dropPriceDataSchemaIfExists(); } async createSchemaIfNotExists( @@ -312,6 +332,11 @@ export class Database { migrateDataFetcher, logger ); + await this.createSchemaIfNotExists( + this.priceDataSchemaName, + migratePriceFetcher, + logger + ); } async applyChanges(changes: DataChange[]): Promise { @@ -534,7 +559,7 @@ export class Database { case "InsertManyPrices": { await this.#db - .withSchema(this.chainDataSchemaName) + .withSchema(this.priceDataSchemaName) .insertInto("prices") .values(change.prices) .execute(); @@ -823,7 +848,7 @@ export class Database { async getLatestPriceTimestampForChain(chainId: ChainId) { const latestPriceTimestamp = await this.#db - .withSchema(this.chainDataSchemaName) + .withSchema(this.priceDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .orderBy("timestamp", "desc") @@ -840,7 +865,7 @@ export class Database { blockNumber: bigint | "latest" ) { let priceQuery = this.#db - .withSchema(this.chainDataSchemaName) + .withSchema(this.priceDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .where("tokenAddress", "=", tokenAddress) @@ -859,7 +884,7 @@ export class Database { async getAllChainPrices(chainId: ChainId) { return await this.#db - .withSchema(this.chainDataSchemaName) + .withSchema(this.priceDataSchemaName) .selectFrom("prices") .where("chainId", "=", chainId) .orderBy("blockNumber", "asc") diff --git a/src/database/migrate.ts b/src/database/migrate.ts index c59d6bd2..bedfb571 100644 --- a/src/database/migrate.ts +++ b/src/database/migrate.ts @@ -297,22 +297,6 @@ export async function migrate(db: Kysely, schemaName: string) { .columns(["chainId", "roundId", "applicationId"]) .execute(); - await schema - .createTable("prices") - .addColumn("id", "serial", (cb) => cb.primaryKey()) - .addColumn("chainId", CHAIN_ID_TYPE) - .addColumn("tokenAddress", ADDRESS_TYPE) - .addColumn("priceInUSD", "real") - .addColumn("timestamp", "timestamptz") - .addColumn("blockNumber", BIGINT_TYPE) - .execute(); - - await db.schema - .createIndex("idx_prices_chain_token_block") - .on("prices") - .expression(sql`chain_id, token_address, block_number DESC`) - .execute(); - await schema .createTable("legacy_projects") .addColumn("id", "serial", (col) => col.primaryKey()) @@ -402,3 +386,26 @@ export async function migrateDataFetcher(db: Kysely, schemaName: string) { .addColumn("data", "jsonb") .execute(); } + +export async function migratePriceFetcher( + db: Kysely, + schemaName: string +) { + const schema = db.withSchema(schemaName).schema; + + await schema + .createTable("prices") + .addColumn("id", "serial", (cb) => cb.primaryKey()) + .addColumn("chainId", CHAIN_ID_TYPE) + .addColumn("tokenAddress", ADDRESS_TYPE) + .addColumn("priceInUSD", "real") + .addColumn("timestamp", "timestamptz") + .addColumn("blockNumber", BIGINT_TYPE) + .execute(); + + await db.schema + .createIndex("idx_prices_chain_token_block") + .on("prices") + .expression(sql`chain_id, token_address, block_number DESC`) + .execute(); +} diff --git a/src/http/api/v1/status.ts b/src/http/api/v1/status.ts index be6910b1..f29c33a9 100644 --- a/src/http/api/v1/status.ts +++ b/src/http/api/v1/status.ts @@ -11,6 +11,7 @@ export const createHandler = (config: HttpApiConfig): express.Router => { buildTag: config.buildTag, chainDataSchemaName: config.db.chainDataSchemaName, ipfsDataSchema: config.db.ipfsDataSchemaName, + priceDataSchema: config.db.priceDataSchemaName, }); }); diff --git a/src/index.ts b/src/index.ts index 205c9923..d57fa9d5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -155,6 +155,7 @@ async function main(): Promise { connectionPool: databaseConnectionPool, chainDataSchemaName: config.databaseSchemaName, ipfsDataSchemaName: config.ipfsDatabaseSchemaName, + priceDataSchemaName: config.priceDatabaseSchemaName, }); baseLogger.info({ @@ -255,6 +256,9 @@ async function main(): Promise { } else if (config.dropIpfsDb) { baseLogger.info("resetting ipfs data schema"); await db.dropIpfsDataSchemaIfExists(); + } else if (config.dropPriceDb) { + baseLogger.info("resetting price data schema"); + await db.dropPriceDataSchemaIfExists(); } await db.createAllSchemas(baseLogger); @@ -333,7 +337,11 @@ async function main(): Promise { const graphqlHandler = postgraphile( readOnlyDatabaseConnectionPool, - config.databaseSchemaName, + [ + config.databaseSchemaName, + config.ipfsDatabaseSchemaName, + config.priceDatabaseSchemaName, + ], { watchPg: false, graphqlRoute: "/graphql", From d3747eec295549b4d7be922bc087b94ffa307a38 Mon Sep 17 00:00:00 2001 From: Aditya Anand M C Date: Mon, 9 Sep 2024 14:57:25 +0530 Subject: [PATCH 04/13] Refactor lock acquisition and release logic (#661) * update lock logic * forceRelease of schema --- src/database/index.ts | 63 +++++++++++++++++++++++++++++-------------- 1 file changed, 43 insertions(+), 20 deletions(-) diff --git a/src/database/index.ts b/src/database/index.ts index 58e33d97..0226366d 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -116,36 +116,59 @@ export class Database { await client.query(`SELECT pg_advisory_unlock(${lockId})`); }; + // Helper function to force release a lock for a specific schema + const forceReleaseLockForSchema = async (lockId: number) => { + await client.query(` + SELECT pg_terminate_backend(pid) + FROM pg_locks + WHERE locktype = 'advisory' + AND objid = ${lockId} + AND pid != pg_backend_pid() + `); + }; + // Acquire locks for all schemas const chainDataLockId = generateLockId(this.chainDataSchemaName); const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName); const priceDataLockId = generateLockId(this.priceDataSchemaName); + // Lock acquisition status + let chainDataLockAcquired = false; + let ipfsDataLockAcquired = false; + let priceDataLockAcquired = false; + try { - const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); - const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); - const priceDataLockAcquired = await acquireLockForSchema(priceDataLockId); - - if ( - chainDataLockAcquired && - ipfsDataLockAcquired && - priceDataLockAcquired - ) { - return { - release: async () => { + chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); + ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); + priceDataLockAcquired = await acquireLockForSchema(priceDataLockId); + + return { + release: async () => { + if (chainDataLockAcquired) { await releaseLockForSchema(chainDataLockId); - await releaseLockForSchema(ipfsDataLockId); - await releaseLockForSchema(priceDataLockId); - client.release(); - }, - client, - }; - } + } + if (ipfsDataLockAcquired) { + await forceReleaseLockForSchema(ipfsDataLockId); + // await releaseLockForSchema(ipfsDataLockId); + } + if (priceDataLockAcquired) { + await forceReleaseLockForSchema(priceDataLockId); + // await releaseLockForSchema(priceDataLockId); + } + client.release(); + }, + client, + }; } catch (error) { this.#logger.error({ error }, "Failed to acquire write lock"); - } + } finally { + // Ensure any acquired locks are released if they were not all acquired + if (chainDataLockAcquired) await releaseLockForSchema(chainDataLockId); + if (ipfsDataLockAcquired) await releaseLockForSchema(ipfsDataLockId); + if (priceDataLockAcquired) await releaseLockForSchema(priceDataLockId); - client.release(); + client.release(); + } return null; } From 1fb850209f350fe7b810feae8e260d25ddd99682 Mon Sep 17 00:00:00 2001 From: Kurt Date: Mon, 9 Sep 2024 13:06:47 +0200 Subject: [PATCH 05/13] index single chain (#657) * update schema name * updates * fix * Revert "Refactor lock acquisition and release logic (#661)" This reverts commit 9a2ffe10209a4f93c37c00aef4f258e5a7b046fc. --- .env.example | 3 +- src/config.ts | 2 +- src/database/index.ts | 150 ++++++++++++++++++++++++++++------------ src/http/app.ts | 157 ++++++++++++++++++++++++++++++++++++++++++ src/index.ts | 1 + 5 files changed, 268 insertions(+), 45 deletions(-) diff --git a/.env.example b/.env.example index 397a0336..743de715 100644 --- a/.env.example +++ b/.env.example @@ -42,7 +42,8 @@ DATABASE_URL=postgres://postgres:postgres@localhost:5432/grants_stack_indexer # METIS_ANDROMEDA_RPC_URL #COINGECKO_API_KEY= -#IPFS_GATEWAY= +#IPFS_GATEWAYs=[] +#WHITELISTED_ADDRESSES=["0x123..","0x456.."] # optional, enable the Postgraphile Pro plugin: https://www.npmjs.com/package/@graphile/pro #GRAPHILE_LICENSE diff --git a/src/config.ts b/src/config.ts index 9811cf1f..862f9e64 100644 --- a/src/config.ts +++ b/src/config.ts @@ -20,7 +20,7 @@ type CoingeckoSupportedChainId = | 42220 | 1088; -const CHAIN_DATA_VERSION = "81"; +const CHAIN_DATA_VERSION = "83"; const IPFS_DATA_VERSION = "1"; const PRICE_DATA_VERSION = "1"; diff --git a/src/database/index.ts b/src/database/index.ts index 0226366d..2a37240f 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -116,60 +116,37 @@ export class Database { await client.query(`SELECT pg_advisory_unlock(${lockId})`); }; - // Helper function to force release a lock for a specific schema - const forceReleaseLockForSchema = async (lockId: number) => { - await client.query(` - SELECT pg_terminate_backend(pid) - FROM pg_locks - WHERE locktype = 'advisory' - AND objid = ${lockId} - AND pid != pg_backend_pid() - `); - }; - // Acquire locks for all schemas const chainDataLockId = generateLockId(this.chainDataSchemaName); const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName); const priceDataLockId = generateLockId(this.priceDataSchemaName); - // Lock acquisition status - let chainDataLockAcquired = false; - let ipfsDataLockAcquired = false; - let priceDataLockAcquired = false; - try { - chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); - ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); - priceDataLockAcquired = await acquireLockForSchema(priceDataLockId); - - return { - release: async () => { - if (chainDataLockAcquired) { + const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); + const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); + const priceDataLockAcquired = await acquireLockForSchema(priceDataLockId); + + if ( + chainDataLockAcquired && + ipfsDataLockAcquired && + priceDataLockAcquired + ) { + return { + release: async () => { await releaseLockForSchema(chainDataLockId); - } - if (ipfsDataLockAcquired) { - await forceReleaseLockForSchema(ipfsDataLockId); - // await releaseLockForSchema(ipfsDataLockId); - } - if (priceDataLockAcquired) { - await forceReleaseLockForSchema(priceDataLockId); - // await releaseLockForSchema(priceDataLockId); - } - client.release(); - }, - client, - }; + await releaseLockForSchema(ipfsDataLockId); + await releaseLockForSchema(priceDataLockId); + client.release(); + }, + client, + }; + } } catch (error) { this.#logger.error({ error }, "Failed to acquire write lock"); - } finally { - // Ensure any acquired locks are released if they were not all acquired - if (chainDataLockAcquired) await releaseLockForSchema(chainDataLockId); - if (ipfsDataLockAcquired) await releaseLockForSchema(ipfsDataLockId); - if (priceDataLockAcquired) await releaseLockForSchema(priceDataLockId); - - client.release(); } + client.release(); + return null; } @@ -962,6 +939,93 @@ export class Database { return result ?? null; } + async deleteChainData(chainId: ChainId) { + this.#logger.info("Deleting chain data for chainId:", chainId); + + await this.#db.transaction().execute(async (trx) => { + this.#logger.info("Deleting pending round roles"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("pendingRoundRoles") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting round roles"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("roundRoles") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting pending project roles"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("pendingProjectRoles") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting project roles"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("projectRoles") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting applications"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("applications") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting applications donations"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("donations") + .where("chainId", "=", chainId) + .execute(); + + // this.#logger.info("Deleting donation prices"); + // await trx + // .withSchema(this.priceDataSchemaName) + // .deleteFrom("prices") + // .where("chainId", "=", chainId) + // .execute(); + + this.#logger.info("Deleting applications"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("applications") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting rounds"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("rounds") + .where("chainId", "=", chainId) + .execute(); + + this.#logger.info("Deleting projects"); + await trx + .withSchema(this.chainDataSchemaName) + .deleteFrom("projects") + .where("chainId", "=", chainId) + .execute(); + }); + + this.#logger.info("Updating subscriptions indexed_to_block"); + const sqlQuery = ` + UPDATE ${this.chainDataSchemaName}.subscriptions + SET indexed_to_block = 0::bigint + WHERE chain_id = ${chainId} + `; + + await sql.raw(sqlQuery).execute(this.#db); + + this.#logger.info("Deleted chain data for chainId:", chainId); + } + async getDataByCid(cId: string) { const metadata = await this.#db .withSchema(this.ipfsDataSchemaName) diff --git a/src/http/app.ts b/src/http/app.ts index d5ef33ea..602f7fc7 100644 --- a/src/http/app.ts +++ b/src/http/app.ts @@ -13,6 +13,8 @@ import { PassportProvider } from "../passport/index.js"; import { DataProvider } from "../calculator/dataProvider/index.js"; import { Chain } from "../config.js"; import { Database } from "../database/index.js"; +import { Indexer } from "chainsauce"; +import { recoverMessageAddress } from "viem"; type AsyncRequestHandler = ( req: express.Request, @@ -38,6 +40,7 @@ export interface HttpApiConfig { | { type: "in-thread" } | { type: "worker-pool"; workerPoolSize: number }; }; + indexedChains?: Indexer[] | null; } interface HttpApi { @@ -100,6 +103,109 @@ export const createHttpApi = (config: HttpApiConfig): HttpApi => { res.send(config.dataVersion); }); + app.get("/config", (_req, res) => { + res.send(config); + }); + + app.post("/index", (req, res) => { + try { + const { chainId, address, timestamp, signature } = req.body as { + chainId: string; + address: string; + timestamp: number; + signature: `0x${string}`; + }; + + const reindex = async () => { + if (!chainId || !config.indexedChains) { + return res.status(400).send("chainId is required"); + } + + try { + const isAuthenticated = await recoverEthereumAddress({ + address, + timestamp, + signature, + }); + + config.logger.info( + `Reindexing chain ${chainId} requested by ${address} at ${timestamp}` + ); + + if (isAuthenticated) { + await config.db.deleteChainData(Number(chainId)); + + const filteredIndexedChains = config.indexedChains.filter( + (chain) => + (chain as { context: { chainId: number } }).context.chainId === + Number(chainId) + ); + + if (filteredIndexedChains.length === 0) { + config.logger.error(`Chain ${chainId} not found`); + return res.status(400).send("chain not found"); + } + + const filteredChains = config.chains.filter( + (chain) => chain.id === Number(chainId) + ); + + if (filteredChains.length === 0) { + config.logger.error(`Chain ${chainId} not found`); + return res.status(400).send("chain not found"); + } + + const chain = filteredChains[0]; + const indexedChain = filteredIndexedChains[0]; + + chain.subscriptions.forEach((subscription) => { + indexedChain.unsubscribeFromContract({ + address: subscription.address, + }); + + const contractName = subscription.contractName; + const subscriptionFromBlock = + subscription.fromBlock === undefined + ? undefined + : BigInt(subscription.fromBlock); + + indexedChain.subscribeToContract({ + contract: contractName, + address: subscription.address, + fromBlock: subscriptionFromBlock || BigInt(0), + }); + }); + } else { + config.logger.error( + `Reindexing chain ${chainId} requested by ${address} at ${timestamp} failed authentication` + ); + return res.status(401).send("Authentication failed"); + } + } catch { + config.logger.error( + `Reindexing chain ${chainId} requested by ${address} at ${timestamp} failed with error` + ); + return res.status(500).send("An error occurred"); + } + }; + + reindex() + .then(() => { + config.logger.info(`Reindexing of chain ${chainId} finished`); + res.send("Reindexing finished"); + }) + .catch(() => { + config.logger.error( + `Reindexing of chain ${chainId} failed with error` + ); + res.status(500).send("An error occurred"); + }); + } catch { + config.logger.error(`Reindexing failed with error`); + res.status(500).send("An error occurred"); + } + }); + app.use("/api/v1", api); if (config.graphqlHandler !== undefined) { @@ -149,3 +255,54 @@ function staticJsonDataHandler(dataProvider: DataProvider) { } }; } + +const VALIDITY_PERIOD = 1 * 60 * 1000; // 1 minute validity period + +const recoverEthereumAddress = async ({ + address, + timestamp, + signature, +}: { + address: string; + timestamp: number; + signature: `0x${string}`; +}) => { + if (!address || !timestamp || !signature) { + return false; + } + const whitelistedAddresses: string[] = JSON.parse( + process.env.WHITELISTED_ADDRESSES! + ); + + // Check timestamp validity + const currentTime = Date.now(); + if (currentTime - timestamp > VALIDITY_PERIOD) { + return false; + } + + // Construct the expected message to be signed + const expectedMessage = `Authenticate with timestamp: ${timestamp}`; + try { + // Recover address from signature and expected message + const recoveredAddress = await recoverMessageAddress({ + message: expectedMessage, + signature, + }); + + const whitelistedAddressesLowercase = whitelistedAddresses.map((addr) => + addr.toLowerCase() + ); + + if ( + recoveredAddress.toLowerCase() === address.toLowerCase() && + whitelistedAddressesLowercase.includes(address.toLowerCase()) + ) { + return true; + } else { + return false; + } + } catch (error) { + console.error("Error verifying signature:", error); + return false; + } +}; diff --git a/src/index.ts b/src/index.ts index d57fa9d5..868671f7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -418,6 +418,7 @@ async function main(): Promise { workerPoolSize: config.estimatesLinearQfWorkerPoolSize, }, }, + indexedChains: await indexChainsPromise, }); await httpApi.start(); From a07c1a22e3f6e6da238f706faa06f52287f4f3eb Mon Sep 17 00:00:00 2001 From: Aditya Anand M C Date: Mon, 9 Sep 2024 16:54:50 +0530 Subject: [PATCH 06/13] chore: avoid duplicate write + cleanup (#664) * chore: avoid duplicate write * cleanup --- src/config.ts | 2 +- src/index.ts | 8 ++++---- src/prices/provider.ts | 17 +++++++++++++---- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/src/config.ts b/src/config.ts index 862f9e64..631b7bd2 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1489,7 +1489,7 @@ const CHAINS: Chain[] = [ .default("https://evm-rpc.sei-apis.com") .parse(process.env.SEI_MAINNET_RPC_URL), pricesFromTimestamp: Date.UTC(2024, 0, 1, 0, 0, 0), - maxGetLogsRange: 1000, + maxGetLogsRange: 10000, tokens: [ { code: "SEI", diff --git a/src/index.ts b/src/index.ts index 868671f7..cbf3bb70 100644 --- a/src/index.ts +++ b/src/index.ts @@ -528,13 +528,13 @@ async function catchupAndWatchChain( // Iterate through each gateway and attempt to fetch data for (const gateway of config.ipfsGateways) { const url = `${gateway}/ipfs/${cid}`; - chainLogger.info(`Trying IPFS gateway: ${gateway} for CID: ${cid}`); + // chainLogger.info(`Trying IPFS gateway: ${gateway} for CID: ${cid}`); const result = await fetchFromGateway(url); if (result !== undefined) { - chainLogger.info( - `Fetch successful from gateway: ${gateway} for CID: ${cid}` - ); + // chainLogger.info( + // `Fetch successful from gateway: ${gateway} for CID: ${cid}` + // ); // Save to IpfsData table try { diff --git a/src/prices/provider.ts b/src/prices/provider.ts index 01f75656..a8d48091 100644 --- a/src/prices/provider.ts +++ b/src/prices/provider.ts @@ -211,10 +211,19 @@ export function createPriceProvider( }); } - await db.applyChange({ - type: "InsertManyPrices", - prices: [newPrice], - }); + // Check if the price is already in the database + const existingPrice = await db.getTokenPriceByBlockNumber( + chainId, + newPrice.tokenAddress, + blockNumber + ); + + if (!existingPrice) { + await db.applyChange({ + type: "InsertManyPrices", + prices: [newPrice], + }); + } return { ...newPrice, tokenDecimals: token.decimals }; } From fb5ac79e9a9be70ec4dd71cf45dfd4124588300f Mon Sep 17 00:00:00 2001 From: Aditya Anand M C Date: Mon, 9 Sep 2024 20:52:34 +0530 Subject: [PATCH 07/13] improve locking (#665) add forciblyAcquireLockForSchema --- src/database/index.ts | 56 +++++++++++++++++++++++++++++-------------- src/http/app.ts | 12 ++++++---- src/index.ts | 2 -- 3 files changed, 46 insertions(+), 24 deletions(-) diff --git a/src/database/index.ts b/src/database/index.ts index 2a37240f..2fed6a0f 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -102,6 +102,12 @@ export class Database { }, 0); }; + // Helper function to forcibly acquire a lock for a specific schema + const forciblyAcquireLockForSchema = async (lockId: number) => { + // This will block until the lock is acquired + await client.query(`SELECT pg_advisory_lock(${lockId})`); + }; + // Helper function to acquire a lock for a specific schema const acquireLockForSchema = async (lockId: number) => { const result = await client.query( @@ -121,26 +127,40 @@ export class Database { const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName); const priceDataLockId = generateLockId(this.priceDataSchemaName); + // Track acquired locks + const acquiredLocks: number[] = []; + try { const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); - const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); - const priceDataLockAcquired = await acquireLockForSchema(priceDataLockId); - - if ( - chainDataLockAcquired && - ipfsDataLockAcquired && - priceDataLockAcquired - ) { - return { - release: async () => { - await releaseLockForSchema(chainDataLockId); - await releaseLockForSchema(ipfsDataLockId); - await releaseLockForSchema(priceDataLockId); - client.release(); - }, - client, - }; - } + if (chainDataLockAcquired) acquiredLocks.push(chainDataLockId); + + // const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); + // if (ipfsDataLockAcquired) acquiredLocks.push(ipfsDataLockId); + // const priceDataLockAcquired = await acquireLockForSchema(priceDataLockId); + // if (priceDataLockAcquired) acquiredLocks.push(priceDataLockId); + + // NOTE: We are forcibly acquiring locks for IPFS and Price data schemas + await forciblyAcquireLockForSchema(priceDataLockId); + await forciblyAcquireLockForSchema(priceDataLockId); + acquiredLocks.push(ipfsDataLockId); + acquiredLocks.push(priceDataLockId); + + // this.#logger.info(`Lock Status => + // Chain Data (${chainDataLockId}): ${chainDataLockAcquired}, + // IPFS Data (${ipfsDataLockId}): ${ipfsDataLockAcquired}, + // Price Data (${priceDataLockId}): ${priceDataLockAcquired} + // `); + + return { + release: async () => { + for (const lockId of acquiredLocks) { + await releaseLockForSchema(lockId); + } + client.release(); + }, + client, + acquiredLocks, + }; } catch (error) { this.#logger.error({ error }, "Failed to acquire write lock"); } diff --git a/src/http/app.ts b/src/http/app.ts index 602f7fc7..22a36bd1 100644 --- a/src/http/app.ts +++ b/src/http/app.ts @@ -173,7 +173,7 @@ export const createHttpApi = (config: HttpApiConfig): HttpApi => { contract: contractName, address: subscription.address, fromBlock: subscriptionFromBlock || BigInt(0), - }); + }); }); } else { config.logger.error( @@ -270,9 +270,13 @@ const recoverEthereumAddress = async ({ if (!address || !timestamp || !signature) { return false; } - const whitelistedAddresses: string[] = JSON.parse( - process.env.WHITELISTED_ADDRESSES! - ); + const whitelistedAddresses: string[] = process.env.WHITELISTED_ADDRESSES + ? (JSON.parse(process.env.WHITELISTED_ADDRESSES) as string[]) + : []; + + if (!whitelistedAddresses) { + return false; + } // Check timestamp validity const currentTime = Date.now(); diff --git a/src/index.ts b/src/index.ts index cbf3bb70..1fe23af2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -244,8 +244,6 @@ async function main(): Promise { const lock = await db.acquireWriteLock(); if (lock !== null) { - baseLogger.info("acquired write lock"); - if (isFirstRun) { if (config.dropDb) { baseLogger.info("dropping all schemas"); From aa2a8d8ad6d4d1996f95e087903c7fd64ea7b7c8 Mon Sep 17 00:00:00 2001 From: Aditya Anand M C Date: Tue, 10 Sep 2024 15:53:15 +0530 Subject: [PATCH 08/13] test --- src/database/index.ts | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/database/index.ts b/src/database/index.ts index 2fed6a0f..3f858008 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -124,8 +124,8 @@ export class Database { // Acquire locks for all schemas const chainDataLockId = generateLockId(this.chainDataSchemaName); - const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName); - const priceDataLockId = generateLockId(this.priceDataSchemaName); + // const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName); + // const priceDataLockId = generateLockId(this.priceDataSchemaName); // Track acquired locks const acquiredLocks: number[] = []; @@ -140,16 +140,16 @@ export class Database { // if (priceDataLockAcquired) acquiredLocks.push(priceDataLockId); // NOTE: We are forcibly acquiring locks for IPFS and Price data schemas - await forciblyAcquireLockForSchema(priceDataLockId); - await forciblyAcquireLockForSchema(priceDataLockId); - acquiredLocks.push(ipfsDataLockId); - acquiredLocks.push(priceDataLockId); - - // this.#logger.info(`Lock Status => - // Chain Data (${chainDataLockId}): ${chainDataLockAcquired}, - // IPFS Data (${ipfsDataLockId}): ${ipfsDataLockAcquired}, - // Price Data (${priceDataLockId}): ${priceDataLockAcquired} - // `); + // await forciblyAcquireLockForSchema(ipfsDataLockId); + // await forciblyAcquireLockForSchema(priceDataLockId); + // acquiredLocks.push(ipfsDataLockId); + // acquiredLocks.push(priceDataLockId); + + this.#logger.info(`Lock Status => + Chain Data (${chainDataLockId}): ${chainDataLockAcquired} + `); + // IPFS Data (${ipfsDataLockId}): ${ipfsDataLockAcquired}, + // Price Data (${priceDataLockId}): ${priceDataLockAcquired} return { release: async () => { From e9282205e36a40a0736abfc601bfb4ba5fe6d60d Mon Sep 17 00:00:00 2001 From: Aditya Anand M C Date: Tue, 10 Sep 2024 15:57:27 +0530 Subject: [PATCH 09/13] remove unused code --- src/database/index.ts | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/src/database/index.ts b/src/database/index.ts index 3f858008..ae9d94cb 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -102,12 +102,6 @@ export class Database { }, 0); }; - // Helper function to forcibly acquire a lock for a specific schema - const forciblyAcquireLockForSchema = async (lockId: number) => { - // This will block until the lock is acquired - await client.query(`SELECT pg_advisory_lock(${lockId})`); - }; - // Helper function to acquire a lock for a specific schema const acquireLockForSchema = async (lockId: number) => { const result = await client.query( @@ -124,8 +118,6 @@ export class Database { // Acquire locks for all schemas const chainDataLockId = generateLockId(this.chainDataSchemaName); - // const ipfsDataLockId = generateLockId(this.ipfsDataSchemaName); - // const priceDataLockId = generateLockId(this.priceDataSchemaName); // Track acquired locks const acquiredLocks: number[] = []; @@ -134,22 +126,9 @@ export class Database { const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); if (chainDataLockAcquired) acquiredLocks.push(chainDataLockId); - // const ipfsDataLockAcquired = await acquireLockForSchema(ipfsDataLockId); - // if (ipfsDataLockAcquired) acquiredLocks.push(ipfsDataLockId); - // const priceDataLockAcquired = await acquireLockForSchema(priceDataLockId); - // if (priceDataLockAcquired) acquiredLocks.push(priceDataLockId); - - // NOTE: We are forcibly acquiring locks for IPFS and Price data schemas - // await forciblyAcquireLockForSchema(ipfsDataLockId); - // await forciblyAcquireLockForSchema(priceDataLockId); - // acquiredLocks.push(ipfsDataLockId); - // acquiredLocks.push(priceDataLockId); - this.#logger.info(`Lock Status => Chain Data (${chainDataLockId}): ${chainDataLockAcquired} `); - // IPFS Data (${ipfsDataLockId}): ${ipfsDataLockAcquired}, - // Price Data (${priceDataLockId}): ${priceDataLockAcquired} return { release: async () => { From f2c5ab5173e047376f72c308a64ba24f884ec3e9 Mon Sep 17 00:00:00 2001 From: Aditya Anand M C Date: Tue, 10 Sep 2024 16:16:29 +0530 Subject: [PATCH 10/13] add unique constraint --- src/database/migrate.ts | 6 ++++++ src/prices/provider.ts | 16 ++++++++++++---- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/database/migrate.ts b/src/database/migrate.ts index bedfb571..db6e2b88 100644 --- a/src/database/migrate.ts +++ b/src/database/migrate.ts @@ -384,6 +384,7 @@ export async function migrateDataFetcher(db: Kysely, schemaName: string) { .createTable("ipfs_data") .addColumn("cid", "text") .addColumn("data", "jsonb") + .addUniqueConstraint("unique_cid", ["cid"]) .execute(); } @@ -401,6 +402,11 @@ export async function migratePriceFetcher( .addColumn("priceInUSD", "real") .addColumn("timestamp", "timestamptz") .addColumn("blockNumber", BIGINT_TYPE) + .addUniqueConstraint("unique_chainId_tokenAddress_blockNumber", [ + "chainId", + "tokenAddress", + "blockNumber", + ]) .execute(); await db.schema diff --git a/src/prices/provider.ts b/src/prices/provider.ts index a8d48091..8279432f 100644 --- a/src/prices/provider.ts +++ b/src/prices/provider.ts @@ -219,10 +219,18 @@ export function createPriceProvider( ); if (!existingPrice) { - await db.applyChange({ - type: "InsertManyPrices", - prices: [newPrice], - }); + try { + await db.applyChange({ + type: "InsertManyPrices", + prices: [newPrice], + }); + } catch (e) { + logger.error({ + msg: "Failed to insert price", + error: e, + price: newPrice, + }); + } } return { ...newPrice, tokenDecimals: token.decimals }; From f4bf5675bdbc9e638f149bea4f6d695a49e05168 Mon Sep 17 00:00:00 2001 From: Hussein Martinez Date: Tue, 10 Sep 2024 18:17:44 +0700 Subject: [PATCH 11/13] chore: revert acquireWriteLock function --- src/database/index.ts | 57 ++++++++++++------------------------------- 1 file changed, 16 insertions(+), 41 deletions(-) diff --git a/src/database/index.ts b/src/database/index.ts index ae9d94cb..e767b354 100644 --- a/src/database/index.ts +++ b/src/database/index.ts @@ -95,51 +95,26 @@ export class Database { async acquireWriteLock() { const client = await this.#connectionPool.connect(); - // Helper function to generate lock ID based on schema name - const generateLockId = (schemaName: string): number => { - return schemaName.split("").reduce((acc, char) => { - return acc + char.charCodeAt(0); - }, 0); - }; - - // Helper function to acquire a lock for a specific schema - const acquireLockForSchema = async (lockId: number) => { + // generate lock id based on schema + const lockId = this.chainDataSchemaName.split("").reduce((acc, char) => { + return acc + char.charCodeAt(0); + }, 0); + + try { const result = await client.query( `SELECT pg_try_advisory_lock(${lockId}) as lock` ); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - return result.rows[0].lock === true; - }; - - // Helper function to release a lock for a specific schema - const releaseLockForSchema = async (lockId: number) => { - await client.query(`SELECT pg_advisory_unlock(${lockId})`); - }; - - // Acquire locks for all schemas - const chainDataLockId = generateLockId(this.chainDataSchemaName); - // Track acquired locks - const acquiredLocks: number[] = []; - - try { - const chainDataLockAcquired = await acquireLockForSchema(chainDataLockId); - if (chainDataLockAcquired) acquiredLocks.push(chainDataLockId); - - this.#logger.info(`Lock Status => - Chain Data (${chainDataLockId}): ${chainDataLockAcquired} - `); - - return { - release: async () => { - for (const lockId of acquiredLocks) { - await releaseLockForSchema(lockId); - } - client.release(); - }, - client, - acquiredLocks, - }; + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + if (result.rows[0].lock === true) { + return { + release: async () => { + await client.query(`SELECT pg_advisory_unlock(${lockId})`); + client.release(); + }, + client, + }; + } } catch (error) { this.#logger.error({ error }, "Failed to acquire write lock"); } From e08593084356451394c6f1d560aceba9a82e55f5 Mon Sep 17 00:00:00 2001 From: Aditya Anand M C Date: Tue, 10 Sep 2024 16:54:25 +0530 Subject: [PATCH 12/13] f --- src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/index.ts b/src/index.ts index 1fe23af2..23136f74 100644 --- a/src/index.ts +++ b/src/index.ts @@ -484,7 +484,7 @@ async function catchupAndWatchChain( // Check if data is already in the IPFS database const ipfsData = await db.getDataByCid(cid); if (ipfsData) { - chainLogger.info(`Found IPFS data in database for CID: ${cid}`); + // chainLogger.info(`Found IPFS data in database for CID: ${cid}`); return Promise.resolve(ipfsData.data as string as T); } From 3a9df730da930096034029af30dd02a811b74aed Mon Sep 17 00:00:00 2001 From: Hussein Martinez Date: Wed, 11 Sep 2024 20:15:05 +0700 Subject: [PATCH 13/13] fix: args parser --- src/config.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/config.ts b/src/config.ts index 631b7bd2..8a114905 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1856,9 +1856,18 @@ export function getConfig(): Config { "from-block": { type: "string", }, + "drop-chain-db": { + type: "boolean", + }, + "drop-ipfs-db": { + type: "boolean", + }, "drop-db": { type: "boolean", }, + "drop-price-db": { + type: "boolean", + }, "rm-cache": { type: "boolean", },