Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Section for Version Tracking, Restart Behavior, and Backfilling #769

Merged
merged 6 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apps/nextra/next.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,12 @@ export default withBundleAnalyzer(
"/en/build/indexer/indexer-sdk/documentation/connect-steps",
permanent: true,
},
{
source: "/indexer/indexer-sdk/documentation/version-tracking",
destination:
"/en/build/indexer/indexer-sdk/documentation/version-tracking",
permanent: true,
},
{
source: "/indexer/txn-stream/labs-hosted",
destination: "/en/build/indexer/api/labs-hosted",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ export default {
"connect-steps": {
title: "Connecting Steps",
},
"version-tracking": {
title: "Version Tracking, Restart Behavior, and Backfilling",
},
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
---
title: "Version Tracking, Restart Behavior, and Backfilling"
---

# Version Tracking, Restart Behavior, and Backfilling

## Version Tracking
The `VersionTrackerStep` is a [common step in the SDK](https://github.com/aptos-labs/aptos-indexer-processor-sdk/blob/main/aptos-indexer-processors-sdk/sdk/src/common_steps/version_tracker_step.rs#L29C12-L29C30) that checkpoints indexer progress. The struct
requires implementation of the [`ProcessorStatusSaver` trait](https://github.com/aptos-labs/aptos-indexer-processor-sdk/blob/main/aptos-indexer-processors-sdk/sdk/src/common_steps/version_tracker_step.rs#L16) and can be used with the same [`.connect_to()`](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/processors/events_processor.rs#L117) method as other steps. Upon a successfully processed batch,
the `VersionTrackerStep` will [call](https://github.com/aptos-labs/aptos-indexer-processor-sdk/blob/main/aptos-indexer-processors-sdk/sdk/src/common_steps/version_tracker_step.rs#L57) the trait implementation of `save_processor_status()`.

### `ProcessorStatusSaver`
The `ProcessorStatusSaver` trait requires the implementation of the method `save_processor_status` with the following signature:

```rust
async fn save_processor_status(
&self,
last_success_batch: &TransactionContext<()>,
) -> Result<(), ProcessorError>;
```
This method is where checkpointing should be written. Examples can be found [here](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/steps/common/processor_status_saver.rs#L23).
Note that it is possible to checkpoint progress in different ways by using enums. [This example](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/steps/common/processor_status_saver.rs#L131) inserts into Postgres using a simple [processor_status model](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/processor/src/db/postgres/models/processor_status.rs).

## Restart Behavior
Now that the processor successfully writes to the chosen store for checkpointing, upon restarting it needs to retrieve the latest successful version from that store. Here is an example of a `get_starting_version()` method that returns the latest processed version saved. This `starting_version: u64` can then be used as below. If there is no checkpoint, the processor will start from the beginning of the chain.

```rust
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
starting_version: Some(starting_version),
..self.config.transaction_stream_config.clone()
})
.await?;
```

## Backfilling
The `IndexerProcessorConfig`, `ProcessorStatusSaver` and `get_starting_version()` can all be modified to allow for convenient backfills. Without these changes, it is difficult to run a live processor at the latest transaction version as well as a backfill processor.

### Updates to Config
[Add an additional field](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/config/indexer_processor_config.rs#L44) on your `IndexerProcessorConfig` for a `BackfillConfig` which contains a single field, `backfill_alias` . The introduction of this field will be used to determine whether the processor is a backfill processor or regular one.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the extra spaces around the period in backfill_alias .

Spotted by Graphite Reviewer

Is this helpful? React 👍 or 👎 to let us know.


```rust
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct IndexerProcessorConfig {
pub processor_config: ProcessorConfig,
pub transaction_stream_config: TransactionStreamConfig,
pub db_config: DbConfig,
pub backfill_config: Option<BackfillConfig>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackfillConfig {
pub backfill_alias: String,
}
```

### Backfill Processor Status Table
To avoid write conflicts with the regular `processor_status` table, introduce a `backfill_processor_status_table` that will use `backfill_alias` as the primary key instead of `processor_name`.
Expand on this [implementation](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs). This model introduces a new state, `BackfillStatus`, which is either `InProgress` or `Complete` which will determine the backfilling restart behavior as expected.


### Updates to ProcessorStatusSaver
Expand your `ProcessorStatusSaverEnum` to include a `Backfill` variant that extracts the `backfill_alias` from the `BackfillConfig`, and the `backfill_start_version` `backfill_end_version` from `IndexerProcessorConfig.transaction_stream_config` [like this](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/steps/common/processor_status_saver.rs#L27). Update the corresponding write query to write to the new `backfill_processor_status` table.

### Updates to `get_starting_version()`
Add a [statement](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/utils/starting_version.rs#L143) in your `get_starting_version` method to query the `backfill_processor_status_table` when the `BackfillConfig` field is present in `IndexerProcessorConfig` .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Re: line +65]

nit: can we add a small section on how to update the `config.yaml` to run the processor in backfill mode?

See this comment inline on Graphite.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

12 changes: 7 additions & 5 deletions packages/aptos-nextra-components/src/utils/mdast/mdast.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ test("keyRotation.md parse and fix AST", () => {
markdownToMdx(tree);
const markdown = astToMarkdown(tree);
expect(markdown).toEqual(
astToMarkdown(readFileAsTree("./examples/keyRotation.expect.md")).toString()
astToMarkdown(
readFileAsTree("./examples/keyRotation.expect.md"),
).toString(),
);
});

Expand All @@ -18,8 +20,8 @@ test("account_snippet.md parse and fix AST", () => {
const markdown = astToMarkdown(tree);
expect(markdown).toEqual(
astToMarkdown(
readFileAsTree("./examples/account_snippet.expect.md")
).toString()
readFileAsTree("./examples/account_snippet.expect.md"),
).toString(),
);
});

Expand All @@ -29,8 +31,8 @@ test("vector_snippet.md parse and fix AST", () => {
const markdown = astToMarkdown(tree);
expect(markdown).toEqual(
astToMarkdown(
readFileAsTree("./examples/vector_snippet.expect.md")
).toString()
readFileAsTree("./examples/vector_snippet.expect.md"),
).toString(),
);
});

Expand Down
Loading