Skip to content

Commit

Permalink
substra no longer guatranteed finalised block emission
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuripetusko committed Feb 26, 2022
1 parent 1e9d979 commit 6518bc0
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 11 deletions.
1 change: 1 addition & 0 deletions cli/run-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const runListener = async () => {
prefixes: ["0x726d726b", "0x524d524b"],
consolidateFunction,
storageProvider,
loggerEnabled: true
});
const subscriber = listener.initialiseObservable();
subscriber.subscribe((val) => console.log(val));
Expand Down
72 changes: 61 additions & 11 deletions src/rmrk2.0.0/listener.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import "isomorphic-fetch";
import "@polkadot/api-augment";
import { ApiPromise } from "@polkadot/api";
import { Observable, Subscriber } from "rxjs";
import { Header } from "@polkadot/types/interfaces/runtime";
Expand All @@ -16,7 +15,7 @@ import { ConsolidatorReturnType } from "./tools/consolidator/consolidator";
import fetchRemarks from "./tools/fetchRemarks";
import { hexToString } from "@polkadot/util";
import { VERSION } from "./tools/constants";
import { PromiseRpcResult } from "@polkadot/api/types";
import { PromiseRpcResult } from "@polkadot/api-base/types/rpc";

interface IProps {
polkadotApi: ApiPromise | null;
Expand All @@ -26,6 +25,7 @@ interface IProps {
consolidateFunction: (remarks: Remark[]) => Promise<ConsolidatorReturnType>;
storageProvider?: IStorageProvider;
storageKey?: string;
loggerEnabled?: boolean;
}

export interface IStorageProvider {
Expand Down Expand Up @@ -83,6 +83,7 @@ export class RemarkListener {
private missingBlockCallsFetched: boolean;
private prefixes: string[];
private currentBlockNum: number;
private loggerEnabled: boolean;
public storageProvider: IStorageProvider;
private consolidateFunction: (
remarks: Remark[]
Expand All @@ -94,6 +95,7 @@ export class RemarkListener {
consolidateFunction,
storageProvider,
storageKey,
loggerEnabled = false,
}: IProps) {
if (!polkadotApi) {
throw new Error(
Expand All @@ -112,6 +114,7 @@ export class RemarkListener {
this.prefixes = prefixes || ["0x726d726b", "0x524d524b"];
this.consolidateFunction = consolidateFunction;
this.storageProvider = storageProvider || new StorageProvider(storageKey);
this.loggerEnabled = loggerEnabled;
}

private initialize = async () => {
Expand All @@ -129,6 +132,12 @@ export class RemarkListener {
}
};

private logger = (message: string) => {
if (this.loggerEnabled) {
console.log(message);
}
};

/* Rxjs observable for finalised remarks, this will return all of consolidated remarks */
public initialiseObservable = (): Observable<ConsolidatorReturnType> => {
const subscriber = new Observable<ConsolidatorReturnType>((observer) => {
Expand All @@ -153,9 +162,18 @@ export class RemarkListener {
/*
Fetch blocks between last block in dump and last block on chain
*/
public async fetchMissingBlockCalls(latestBlock: number): Promise<Block[]> {
public async fetchMissingBlockCalls(
latestBlock: number,
toBlock?: number
): Promise<Block[]> {
try {
const to = await getLatestFinalizedBlock(this.apiPromise);
const to = toBlock || (await getLatestFinalizedBlock(this.apiPromise));

this.logger(
`Fetching missing or skipped blocks between ${
latestBlock + 1
} and ${to}`
);
return await fetchRemarks(
this.apiPromise,
latestBlock + 1,
Expand Down Expand Up @@ -199,6 +217,16 @@ export class RemarkListener {
...this.latestBlockCallsFinalised,
];

// Logging
if (blockCalls.length > 0 && this.loggerEnabled) {
const blockNums = blockCalls.map((blockCall) => blockCall.block);
this.logger(
`Consolidating block range between: ${blockNums[0]} and ${
blockNums[blockNums.length - 1]
}`
);
}

const remarks = getRemarksFromBlocks(blockCalls, this.prefixes);
this.latestBlockCallsFinalised = [];
this.missingBlockCalls = [];
Expand All @@ -222,7 +250,7 @@ export class RemarkListener {
Subscribe to latest block heads, (finalised, and un-finalised)
Save them to 2 separate arrays, and once block is finalised, remove it from unfinalised array
this.latestBlockCalls is array of unfinalised blocks,
we keep it for reference in case burnr wants to disable remarks that are being interacted with
we keep it for reference in case burner wants to disable remarks that are being interacted with
*/
private async initialiseListener({ finalised }: { finalised: boolean }) {
const headSubscriber = finalised
Expand All @@ -244,13 +272,29 @@ export class RemarkListener {
this.prefixes,
this.apiPromise
);

const filteredCalls = calls.filter((call) => {
return hexToString(call.value).includes(`::${VERSION}::`);
});

const latestFinalisedBlockNum = header.number.toNumber();

if (finalised) {
this.currentBlockNum = header.number.toNumber();
const latestSavedBlock = this.currentBlockNum;
// Compare block sequence order to see if there's a skipped finalised block
if (
latestSavedBlock &&
latestSavedBlock + 1 < latestFinalisedBlockNum &&
this.missingBlockCallsFetched
) {
// Fetch all the missing blocks and save their remarks for next consolidation.
this.missingBlockCallsFetched = false;
this.missingBlockCalls = await this.fetchMissingBlockCalls(
latestSavedBlock,
latestFinalisedBlockNum - 1
);
this.missingBlockCallsFetched = true;
}
this.currentBlockNum = latestFinalisedBlockNum;
}

// Update local db latestBlock
Expand All @@ -260,29 +304,35 @@ export class RemarkListener {
filteredCalls.length === 0
) {
try {
await this.storageProvider.set(header.number.toNumber());
await this.storageProvider.set(latestFinalisedBlockNum);
} catch (e: any) {
console.error(e);
}
}

if (filteredCalls.length > 0) {
const blockCalls: BlockCalls = {
block: header.number.toNumber(),
block: latestFinalisedBlockNum,
calls: filteredCalls,
};

// If we are listening to finalised blocks
if (finalised) {
this.latestBlockCallsFinalised.push(blockCalls);
// Now that block has been finalised,
// remove remarks that we found in it from unfinalised blockCalls array that we keep in memory
// remove remarks that we found in it from unfinalised blockCalls array that we keep in memory or stalled blocks (more than 10 blocks)
this.latestBlockCalls = this.latestBlockCalls.filter(
(item) => item?.block !== blockCalls.block
(item) =>
item?.block !== blockCalls.block ||
blockCalls.block - item.block > 20
);
// Call consolidate to re-consolidate and fire subscription event back to subscriber
await this.consolidate();
} else {
// Filter stalled blocks (20 blocks) to free up memory
this.latestBlockCalls = this.latestBlockCalls.filter(
(item) => blockCalls.block - item.block > 20
);
this.latestBlockCalls.push(blockCalls);
/* If someone is listening to unfinalised blocks, return them here */
if (this.observerUnfinalised) {
Expand Down

0 comments on commit 6518bc0

Please sign in to comment.