-
Notifications
You must be signed in to change notification settings - Fork 118
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Section for Version Tracking, Restart Behavior, and Backfilling #769
Changes from 4 commits
008df20
dcc2f6d
b62fb58
1c8891f
04358b0
db4cb4c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
--- | ||
title: "Version Tracking" | ||
--- | ||
|
||
# Version Tracking, Restart Behavior, and Backfilling | ||
|
||
## Version Tracking | ||
The `VersionTrackerStep` is a [common step in the SDK](https://github.com/aptos-labs/aptos-indexer-processor-sdk/blob/main/aptos-indexer-processors-sdk/sdk/src/common_steps/version_tracker_step.rs#L29C12-L29C30) that checkpoints indexer progress. The struct requires implementation of the [`ProcessorStatusSaver` trait](https://github.com/aptos-labs/aptos-indexer-processor-sdk/blob/main/aptos-indexer-processors-sdk/sdk/src/common_steps/version_tracker_step.rs#L16) and can be used with the same [`.connect_to()`](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/processors/events_processor.rs#L117) method as other steps. Upon a successfully processed batch, the `VersionTrackerStep` will [call](https://github.com/aptos-labs/aptos-indexer-processor-sdk/blob/main/aptos-indexer-processors-sdk/sdk/src/common_steps/version_tracker_step.rs#L57) the trait implementation of `save_processor_status()`. | ||
|
||
### ProcessorStatusSaver | ||
The `ProcessorStatusSaver` trait requires the implementation of the method `save_processor_status` with the following signature: | ||
|
||
```rust | ||
async fn save_processor_status( | ||
&self, | ||
last_success_batch: &TransactionContext<()>, | ||
) -> Result<(), ProcessorError>; | ||
``` | ||
This method is where checkpointing should be written. Examples can be found [here](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/steps/common/processor_status_saver.rs#L23). It is possible to checkpoint progress in different ways by using enums. [This example](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/steps/common/processor_status_saver.rs#L131) inserts into Postgres using a simple [`processor_status` model](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/processor/src/db/postgres/models/processor_status.rs). | ||
|
||
|
||
## Restart Behavior | ||
Now that the processor successfully writes to the chosen store for version tracking, upon restarting it needs to retrieve the latest successful version from that store. [Here is an example](https://github.com/aptos-labs/aptos-indexer-processors/blob/29f4e9c2266db11cbec2177c17fc210d4c687d32/rust/sdk-processor/src/utils/starting_version.rs#L26) of a `get_starting_version()` method that returns the latest processed version saved. This `starting_version: u64` can then be used as below. If there is no checkpoint, the processor will start from the beginning of the chain. | ||
|
||
```rust | ||
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { | ||
starting_version: Some(starting_version), | ||
..self.config.transaction_stream_config.clone() | ||
}) | ||
.await?; | ||
``` | ||
|
||
|
||
## Backfilling | ||
To enable backfilling, `IndexerProcessorConfig`, `ProcessorStatusSaver` and `get_starting_version()` need some updates. Without these changes, it is difficult to run a live processor at the latest transaction version as well as a backfill processor. | ||
|
||
### Updates to Config | ||
[Add an additional field](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/config/indexer_processor_config.rs#L44) on your `IndexerProcessorConfig` for a `BackfillConfig` which contains a single field, `backfill_alias`. The introduction of this field will be used to determine whether the processor is a backfill processor or regular one. | ||
|
||
```rust | ||
#[derive(Clone, Debug, Deserialize, Serialize)] | ||
#[serde(deny_unknown_fields)] | ||
pub struct IndexerProcessorConfig { | ||
pub processor_config: ProcessorConfig, | ||
pub transaction_stream_config: TransactionStreamConfig, | ||
pub db_config: DbConfig, | ||
pub backfill_config: Option<BackfillConfig>, | ||
} | ||
|
||
#[derive(Debug, Clone, Serialize, Deserialize)] | ||
pub struct BackfillConfig { | ||
pub backfill_alias: String, | ||
} | ||
``` | ||
|
||
### Backfill Processor Status Table | ||
To avoid write conflicts with the regular `processor_status` table, introduce a `backfill_processor_status_table` that will use `backfill_alias` as the primary key instead of `processor_name`. Expand on this [implementation](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs). This model introduces a new state, `BackfillStatus`, which is either `InProgress` or `Complete` which will determine the backfilling restart behavior. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might also help to explain why it's important to avoid write conflicts - usually we run head and backfill processors at the same time so that data still gets indexed at head. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good call out. addded |
||
|
||
### Updates to ProcessorStatusSaver | ||
Expand your `ProcessorStatusSaverEnum` to include a `Backfill` variant that extracts the `backfill_alias` from the `BackfillConfig`, and the `backfill_start_version` `backfill_end_version` from `IndexerProcessorConfig.transaction_stream_config` [like this](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/steps/common/processor_status_saver.rs#L27). Update the corresponding write query to write to the new `backfill_processor_status` table. | ||
|
||
### Updates to get_starting_version | ||
Add a [statement](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/utils/starting_version.rs#L143) in your `get_starting_version` method to query the `backfill_processor_status_table` when the `BackfillConfig` field is present in `IndexerProcessorConfig` . | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
[Re: line +65]
nit: can we add a small section on how to update the `config.yaml` to run the processor in backfill mode? See this comment inline on Graphite. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been linking to the example repo event processor implementation since it's an easier starting point for beginners. We could link to the core processors if they need more examples.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated