Skip to content

Commit

Permalink
workers: task-driver: state_migration: purge historical state startup…
Browse files Browse the repository at this point in the history
… task (#896)
  • Loading branch information
akirillo authored Jan 24, 2025
1 parent 4e93520 commit ebf5588
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 45 deletions.
56 changes: 56 additions & 0 deletions state/src/storage/tx/order_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ impl<'db> StateTxn<'db, RW> {
self.write_order_history(wallet_id, orders)
}

/// Purges terminal orders from a wallet's order history
pub fn purge_terminal_orders(&self, wallet_id: &WalletIdentifier) -> Result<(), StorageError> {
let mut orders = self.get_order_history(wallet_id)?;
orders.retain(|o| !o.state.is_terminal());
self.write_order_history(wallet_id, orders)
}

/// Write the order history for a given wallet
#[allow(clippy::needless_pass_by_value)]
fn write_order_history(
Expand Down Expand Up @@ -276,4 +283,53 @@ mod tests {
let orders = tx.get_order_history(&wallet_id).unwrap();
assert_eq!(orders, history);
}

/// Test purging a terminal order
#[test]
fn test_purge_terminal_order() {
const N: usize = 100;
let db = mock_db();
let wallet_id = Uuid::new_v4();
let mut history = random_order_history(N);

// Choose a random order to mark as terminal (cancelled)
let mut rng = rand::thread_rng();
let index = rng.gen_range(0..history.len());
history[index].state = OrderState::Cancelled;
let order_id = history[index].id;

// Setup the history
let tx = db.new_write_tx().unwrap();
for order_md in history.iter().cloned() {
tx.push_order_history(&wallet_id, order_md).unwrap();
}
tx.commit().unwrap();

// Check that the order can be fetched
let tx = db.new_read_tx().unwrap();
let res = tx.get_order_metadata(wallet_id, order_id).unwrap();
tx.commit().unwrap();
assert!(res.is_some());

// Purge terminal orders
let tx = db.new_write_tx().unwrap();
tx.purge_terminal_orders(&wallet_id).unwrap();
tx.commit().unwrap();

// Check that the order can no longer be fetched
let tx = db.new_read_tx().unwrap();
let res = tx.get_order_metadata(wallet_id, order_id).unwrap();
tx.commit().unwrap();
assert!(res.is_none());

// Check that the remaining orders are unchanged

// Filter out the removed order & sort the history by descending creation time
history.retain(|o| o.id != order_id);
history.sort_by_key(|o| Reverse(o.created));

let tx = db.new_read_tx().unwrap();
let orders = tx.get_order_history(&wallet_id).unwrap();
assert_eq!(orders, history);
}
}
38 changes: 38 additions & 0 deletions state/src/storage/tx/task_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ impl<'db> StateTxn<'db, RW> {
self.inner.write(TASK_HISTORY_TABLE, &key, &tasks)?;
Ok(())
}

/// Purge the task history for a given task queue
pub fn purge_task_history(&self, key: &TaskQueueKey) -> Result<(), StorageError> {
let key = task_history_key(key);
self.inner.delete(TASK_HISTORY_TABLE, &key).map(|_| ())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -141,4 +147,36 @@ mod tests {
assert_eq!(a.id, b.id);
}
}

/// Tests purging task history
#[test]
fn test_purge_history() {
const N: usize = 100;
let db = mock_db();
let wallet_id = WalletIdentifier::new_v4();

let tasks = (0..N).map(|_| mock_historical_task()).collect_vec();
let tx = db.new_write_tx().unwrap();
for task in tasks.iter() {
tx.append_task_to_history(&wallet_id, task.clone()).unwrap();
}
tx.commit().unwrap();

// Assert current length of task history
let tx = db.new_read_tx().unwrap();
let history = tx.get_task_history(&wallet_id).unwrap();
tx.commit().unwrap();
assert_eq!(history.len(), N);

// Purge the history
let tx = db.new_write_tx().unwrap();
tx.purge_task_history(&wallet_id).unwrap();
tx.commit().unwrap();

// Assert that the history is now empty
let tx = db.new_read_tx().unwrap();
let history = tx.get_task_history(&wallet_id).unwrap();
tx.commit().unwrap();
assert!(history.is_empty());
}
}
2 changes: 2 additions & 0 deletions workers/task-driver/src/state_migration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@
//!
//! These migrations should be idempotent, and defined as need be
mod purge_historical_state;
mod remove_phantom_orders;
pub(crate) use purge_historical_state::purge_historical_state;
pub(crate) use remove_phantom_orders::remove_phantom_orders;
58 changes: 58 additions & 0 deletions workers/task-driver/src/state_migration/purge_historical_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//! Purges historical state
//!
//! This includes terminal historical orders and task history
use common::types::wallet::WalletIdentifier;
use state::State;
use tracing::error;

/// Purges historical state for all wallets
pub async fn purge_historical_state(state: &State) -> Result<(), String> {
let wallets = state.get_all_wallets().await?;
for wallet in wallets {
purge_wallet_historical_state(state, wallet.wallet_id).await?;
}

Ok(())
}

/// Purges historical state for a single wallet
async fn purge_wallet_historical_state(
state: &State,
wallet_id: WalletIdentifier,
) -> Result<(), String> {
purge_order_history(state, wallet_id).await?;
purge_task_history(state, wallet_id).await
}

/// Purges all terminal orders from a wallet's order history
async fn purge_order_history(state: &State, wallet_id: WalletIdentifier) -> Result<(), String> {
let res = state
.with_write_tx(move |tx| {
tx.purge_terminal_orders(&wallet_id)?;
Ok(())
})
.await;

if let Err(e) = res {
error!("error purging order history for wallet {wallet_id}: {e}");
}

Ok(())
}

/// Purges a wallet's task history
async fn purge_task_history(state: &State, wallet_id: WalletIdentifier) -> Result<(), String> {
let res = state
.with_write_tx(move |tx| {
tx.purge_task_history(&wallet_id)?;
Ok(())
})
.await;

if let Err(e) = res {
error!("error purging task history for wallet {wallet_id}: {e}");
}

Ok(())
}
44 changes: 0 additions & 44 deletions workers/task-driver/src/state_migration/remove_phantom_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ pub async fn remove_phantom_orders(state: &State) -> Result<(), String> {
info!("removing {} phantom orders...", phantom_orders.len());
remove_orders_from_order_book(state, &phantom_orders).await?;

// Remove all local orders that are not part of a wallet
let missing_local_orders = find_missing_local_orders(state).await?;
info!("removing {} missing local orders...", missing_local_orders.len());
remove_local_orders(state, &missing_local_orders).await?;

Ok(())
}

Expand All @@ -37,25 +32,6 @@ async fn find_phantom_orders(state: &State) -> Result<Vec<OrderIdentifier>, Stri
Ok(phantom_orders)
}

/// Find all local orders that are not part of a wallet
async fn find_missing_local_orders(state: &State) -> Result<Vec<OrderIdentifier>, String> {
let local_orders = state
.with_read_tx(|tx| {
let orders = tx.get_local_orders()?;
Ok(orders)
})
.await?;

let mut missing_local_orders = Vec::new();
for order_id in local_orders {
if check_order_missing(state, &order_id).await? {
missing_local_orders.push(order_id);
}
}

Ok(missing_local_orders)
}

/// Check whether an order is missing from its wallet
async fn check_order_missing(state: &State, order_id: &OrderIdentifier) -> Result<bool, String> {
// Check that the order has a wallet ID mapped to it
Expand Down Expand Up @@ -92,23 +68,3 @@ async fn remove_orders_from_order_book(

Ok(())
}

/// Remove a set of orders from the local orders list
async fn remove_local_orders(state: &State, order_ids: &[OrderIdentifier]) -> Result<(), String> {
for order_id in order_ids {
let oid = *order_id;
let res = state
.with_write_tx(move |tx| {
tx.remove_local_order(&oid)?;
Ok(())
})
.await;

// Do not abort on error, just log them
if let Err(e) = res {
error!("error removing local order {order_id}: {e}");
}
}

Ok(())
}
14 changes: 13 additions & 1 deletion workers/task-driver/src/tasks/node_startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use util::{

use crate::{
await_task,
state_migration::remove_phantom_orders,
state_migration::{purge_historical_state, remove_phantom_orders},
task_state::StateWrapper,
traits::{Task, TaskContext, TaskError, TaskState},
utils::ERR_WALLET_NOT_FOUND,
Expand Down Expand Up @@ -457,6 +457,18 @@ impl NodeStartupTask {
info!("done removing phantom orders");
}
});

// Purge historical state for all wallets
let state = self.state.clone();
tokio::task::spawn(async move {
info!("purging historical state for all wallets...");
if let Err(e) = purge_historical_state(&state).await {
error!("error purging historical state: {e}");
} else {
info!("done purging historical state");
}
});

Ok(())
}

Expand Down

0 comments on commit ebf5588

Please sign in to comment.