-
Notifications
You must be signed in to change notification settings - Fork 121
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Section for Version Tracking, Restart Behavior, and Backfilling (#…
…769) * add content * formatting * Change some headers and remove new lines * Change wording * Update reference to example repo and add sections on write conflicts + config.yaml
- Loading branch information
1 parent
be13ea4
commit a5fa73b
Showing
3 changed files
with
87 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
78 changes: 78 additions & 0 deletions
78
apps/nextra/pages/en/build/indexer/indexer-sdk/documentation/version-tracking.mdx
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
--- | ||
title: "Version Tracking" | ||
--- | ||
|
||
# Version Tracking, Restart Behavior, and Backfilling | ||
|
||
## Version Tracking | ||
The `VersionTrackerStep` is a [common step in the SDK](https://github.com/aptos-labs/aptos-indexer-processor-sdk/blob/main/aptos-indexer-processors-sdk/sdk/src/common_steps/version_tracker_step.rs#L29C12-L29C30) that checkpoints indexer progress. The struct requires implementation of the [`ProcessorStatusSaver` trait](https://github.com/aptos-labs/aptos-indexer-processor-sdk/blob/main/aptos-indexer-processors-sdk/sdk/src/common_steps/version_tracker_step.rs#L16) and can be used with the same [`.connect_to()`](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/processors/events_processor.rs#L117) method as other steps. Upon a successfully processed batch, the `VersionTrackerStep` will [call](https://github.com/aptos-labs/aptos-indexer-processor-sdk/blob/main/aptos-indexer-processors-sdk/sdk/src/common_steps/version_tracker_step.rs#L57) the trait implementation of `save_processor_status()`. | ||
|
||
### ProcessorStatusSaver | ||
The `ProcessorStatusSaver` trait requires the implementation of the method `save_processor_status` with the following signature: | ||
|
||
```rust | ||
async fn save_processor_status( | ||
&self, | ||
last_success_batch: &TransactionContext<()>, | ||
) -> Result<(), ProcessorError>; | ||
``` | ||
This method is where checkpointing should be written. Examples can be found [here](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/steps/common/processor_status_saver.rs#L23). It is possible to checkpoint progress in different ways by using enums. [This example](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/steps/common/processor_status_saver.rs#L131) inserts into Postgres using a simple [`processor_status` model](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/processor/src/db/postgres/models/processor_status.rs). | ||
|
||
|
||
## Restart Behavior | ||
Now that the processor successfully writes to the chosen store for version tracking, upon restarting it needs to retrieve the latest successful version from that store. [Here is an example](https://github.com/aptos-labs/aptos-indexer-processors/blob/29f4e9c2266db11cbec2177c17fc210d4c687d32/rust/sdk-processor/src/utils/starting_version.rs#L26) of a `get_starting_version()` method that returns the latest processed version saved. This `starting_version: u64` can then be used as below. If there is no checkpoint, the processor will start from the beginning of the chain. | ||
|
||
```rust | ||
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { | ||
starting_version: Some(starting_version), | ||
..self.config.transaction_stream_config.clone() | ||
}) | ||
.await?; | ||
``` | ||
|
||
|
||
## Backfilling | ||
To enable backfilling, `IndexerProcessorConfig`, `ProcessorStatusSaver` and `get_starting_version()` need some updates. Without these changes, it is difficult to run a live processor at the latest transaction version as well as a backfill processor. | ||
|
||
### Updates to Config | ||
[Add an additional field](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/main/aptos-indexer-processor-example/src/config/indexer_processor_config.rs#L20) on your `IndexerProcessorConfig` for a `BackfillConfig` which contains a single field, `backfill_alias`. The introduction of this field will be used to determine whether the processor is a backfill processor or regular one. | ||
|
||
```rust | ||
#[derive(Clone, Debug, Deserialize, Serialize)] | ||
#[serde(deny_unknown_fields)] | ||
pub struct IndexerProcessorConfig { | ||
pub processor_config: ProcessorConfig, | ||
pub transaction_stream_config: TransactionStreamConfig, | ||
pub db_config: DbConfig, | ||
pub backfill_config: Option<BackfillConfig>, | ||
} | ||
|
||
#[derive(Debug, Clone, Serialize, Deserialize)] | ||
pub struct BackfillConfig { | ||
pub backfill_alias: String, | ||
} | ||
``` | ||
|
||
### Updates to `config.yaml` | ||
Add the `backfill_config` section to `server_config` in your yaml file to set `backfill_alias`. | ||
|
||
```yaml | ||
server_config: | ||
processor_config: | ||
type: "events_processor" | ||
... | ||
backfill_config: | ||
backfill_alias: "events_processor_backfill" | ||
``` | ||
### Backfill Processor Status Table | ||
Use a separate table for backfill processor status to avoid write conflicts. This table (`backfill_processor_status_table`) uses `backfill_alias` as the primary key instead of `processor_name` to prevent conflicts with the main `processor_status` table when running head and backfill processors concurrently. Create multiple backfill processors with differing `backfill_alias` and transaction version ranges for a faster backfill. | ||
Expand on this [implementation](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs). This model introduces a new state, `BackfillStatus`, which is either `InProgress` or `Complete` which will determine the backfilling restart behavior. | ||
|
||
### Updates to ProcessorStatusSaver | ||
Expand your `ProcessorStatusSaverEnum` to include a `Backfill` variant that extracts the `backfill_alias` from the `BackfillConfig`, and the `backfill_start_version` `backfill_end_version` from `IndexerProcessorConfig.transaction_stream_config` [like this](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/steps/common/processor_status_saver.rs#L27). Update the corresponding write query to write to the new `backfill_processor_status` table. | ||
|
||
### Updates to get_starting_version | ||
Add a [statement](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/utils/starting_version.rs#L143) in your `get_starting_version` method to query the `backfill_processor_status_table` when the `BackfillConfig` field is present in `IndexerProcessorConfig` . | ||
|