Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove latest epoch id and add some todo documentation comments #26

Merged
merged 1 commit into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions optd-persistent/src/cost_model/catalog/mock_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub struct MockTrigger {
pub function: String,
}

/// TODO: documentation
#[derive(Default)]
pub struct MockCatalog {
pub databases: Vec<MockDatabaseMetadata>,
Expand Down
16 changes: 10 additions & 6 deletions optd-persistent/src/cost_model/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,44 +11,51 @@ use sea_orm_migration::prelude::*;
use serde_json::json;
use std::sync::Arc;

/// Selects the catalog handed to `update_stats_from_catalog`.
///
/// A value of this type is the `c` argument of
/// `CostModelStorageLayer::update_stats_from_catalog`.
pub enum CatalogSource {
// NOTE(review): `Iceberg()` is an empty tuple variant, so it has to be written
// with the parentheses. Presumably a placeholder for a future payload — confirm.
Iceberg(),
// NOTE(review): presumably selects the in-crate `MockCatalog`; verify against
// the `update_stats_from_catalog` implementation.
Mock,
}

/// Attribute type tags exposed by the cost-model storage interface.
///
/// NOTE(review): presumably the data type of a table attribute (column).
/// Nothing visible in this part of the file shows where the enum is consumed
/// or how it maps to stored values — confirm before relying on it.
pub enum AttrType {
Integer,
Float,
Varchar,
Boolean,
}

/// Index kind tags exposed by the cost-model storage interface.
///
/// NOTE(review): presumably the physical structure of a catalog index
/// (B-tree vs. hash). No use site is visible in this part of the file — confirm.
pub enum IndexType {
BTree,
Hash,
}

/// Constraint kind tags exposed by the cost-model storage interface.
///
/// NOTE(review): the variant names match the usual SQL table constraints
/// (primary key, foreign key, unique, check). No use site is visible in this
/// part of the file — confirm how they are stored.
pub enum ConstraintType {
PrimaryKey,
ForeignKey,
Unique,
Check,
}

/// Statistic kind tags exposed by the cost-model storage interface.
///
/// NOTE(review): `Stat::stat_type` is declared as a plain `i32`, not as this
/// enum, so any mapping between the two lives elsewhere. Confirm the intended
/// discriminant values before casting.
pub enum StatType {
Count,
Cardinality,
Min,
Max,
}

/// Tells `update_stats` which epoch a statistic update is recorded under.
///
/// `update_stats` matches on this value: `Existed` supplies the epoch id to
/// use directly, while `New` makes it build a fresh `event` record first.
#[derive(PartialEq)]
pub enum EpochOption {
/// Id of an epoch that already exists; `update_stats` uses it as-is.
// TODO(lanlou): Could I make i32 -> EpochId?
Existed(i32),
/// Request a new epoch. The first field is stored as the event's
/// `source_variant`; the second is bound as `data` (presumably the event
/// payload, as in `create_new_epoch` — confirm).
New(String, String),
}

/// TODO: documentation
#[derive(Clone)]
pub struct Stat {
pub stat_type: i32,
Expand All @@ -58,6 +65,7 @@ pub struct Stat {
pub name: String,
}

/// TODO: documentation
#[trait_variant::make(Send)]
pub trait CostModelStorageLayer {
type GroupId;
Expand All @@ -68,11 +76,7 @@ pub trait CostModelStorageLayer {
type StatId;

// TODO: Change EpochId to event::Model::epoch_id
async fn create_new_epoch(
&mut self,
source: String,
data: String,
) -> StorageResult<Self::EpochId>;
async fn create_new_epoch(&self, source: String, data: String) -> StorageResult<Self::EpochId>;

async fn update_stats_from_catalog(
&self,
Expand All @@ -81,7 +85,7 @@ pub trait CostModelStorageLayer {
) -> StorageResult<()>;

async fn update_stats(
&mut self,
&self,
stat: Stat,
epoch_option: EpochOption,
) -> StorageResult<Option<Self::EpochId>>;
Expand Down
33 changes: 11 additions & 22 deletions optd-persistent/src/cost_model/orm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,19 @@ impl CostModelStorageLayer for BackendManager {
type EpochId = i32;
type StatId = i32;

async fn create_new_epoch(
&mut self,
source: String,
data: String,
) -> StorageResult<Self::EpochId> {
/// TODO: documentation
async fn create_new_epoch(&self, source: String, data: String) -> StorageResult<Self::EpochId> {
let new_event = event::ActiveModel {
source_variant: sea_orm::ActiveValue::Set(source),
timestamp: sea_orm::ActiveValue::Set(Utc::now()),
data: sea_orm::ActiveValue::Set(sea_orm::JsonValue::String(data)),
..Default::default()
};
let insert_res = Event::insert(new_event).exec(&self.db).await?;
self.latest_epoch_id.store(
insert_res.last_insert_id as usize,
std::sync::atomic::Ordering::Relaxed,
);
Ok(insert_res.last_insert_id)
}

/// TODO: documentation
async fn update_stats_from_catalog(
&self,
c: CatalogSource,
Expand Down Expand Up @@ -154,6 +148,7 @@ impl CostModelStorageLayer for BackendManager {
}
}

/// TODO: improve the documentation
/* Update the statistics in the database.
* The statistic can be newly inserted or updated. If the statistic value
* is the same as the latest existing one, the update will be ignored, and
Expand All @@ -166,14 +161,14 @@ impl CostModelStorageLayer for BackendManager {
* If the statistic value is the same as the latest existing one, this function
* won't create a new epoch.
*
* For batch updates, if the caller can directly call this function with
* For batch updates, the caller can call this function with the `New`
* epoch option for the first statistic. If an epoch_id is returned, the
* caller can reuse that epoch_id (as an `Existed` option) for the rest of
* the updates. If no epoch_id is returned, the caller should keep using
* the `New` epoch option for the next statistic update.
*/
async fn update_stats(
&mut self,
&self,
stat: Stat,
epoch_option: EpochOption,
) -> StorageResult<Option<Self::EpochId>> {
Expand Down Expand Up @@ -270,11 +265,9 @@ impl CostModelStorageLayer for BackendManager {
}
};
// 1. Insert into attr_stats and related junction tables.
let mut insert_new_epoch = false;
let epoch_id = match epoch_option {
EpochOption::Existed(e) => e,
EpochOption::New(source, data) => {
insert_new_epoch = true;
let new_event = event::ActiveModel {
source_variant: sea_orm::ActiveValue::Set(source),
timestamp: sea_orm::ActiveValue::Set(Utc::now()),
Expand Down Expand Up @@ -317,19 +310,11 @@ impl CostModelStorageLayer for BackendManager {
.exec(&transaction)
.await?;

// TODO(lanlou): consider the update conflict for latest_epoch_id in multiple threads
// Assume the txn fails to commit, and the epoch_id is updated. But the epoch_id
// is always increasing and won't be overwritten even if the record is deleted, it
// might be fine.
if insert_new_epoch {
self.latest_epoch_id
.store(epoch_id as usize, std::sync::atomic::Ordering::Relaxed);
}

transaction.commit().await?;
Ok(Some(epoch_id))
}

/// TODO: documentation
async fn store_expr_stats_mappings(
&self,
expr_id: Self::ExprId,
Expand All @@ -350,6 +335,7 @@ impl CostModelStorageLayer for BackendManager {
Ok(())
}

/// TODO: documentation
async fn get_stats_for_table(
&self,
table_id: i32,
Expand Down Expand Up @@ -377,6 +363,7 @@ impl CostModelStorageLayer for BackendManager {
}
}

/// TODO: documentation
async fn get_stats_for_attr(
&self,
mut attr_ids: Vec<Self::AttrId>,
Expand Down Expand Up @@ -414,6 +401,7 @@ impl CostModelStorageLayer for BackendManager {
}
}

/// TODO: documentation
async fn get_cost_analysis(
&self,
expr_id: Self::ExprId,
Expand All @@ -440,6 +428,7 @@ impl CostModelStorageLayer for BackendManager {
Ok(cost.map(|c| c.cost))
}

/// TODO: documentation
async fn store_cost(
&self,
physical_expression_id: Self::ExprId,
Expand Down
6 changes: 2 additions & 4 deletions optd-persistent/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(dead_code)]

use std::{cell::LazyCell, sync::atomic::AtomicUsize};
use std::cell::LazyCell;

use sea_orm::*;
use sea_orm_migration::prelude::*;
Expand Down Expand Up @@ -47,7 +47,7 @@ pub enum CostModelError {
pub enum BackendError {
CostModel(CostModelError),
Database(DbErr),
// Add other variants as needed for different error types
// TODO: Add other variants as needed for different error types
}

impl From<CostModelError> for BackendError {
Expand All @@ -64,15 +64,13 @@ impl From<DbErr> for BackendError {

pub struct BackendManager {
db: DatabaseConnection,
latest_epoch_id: AtomicUsize,
}

impl BackendManager {
/// Creates a new `BackendManager`.
pub async fn new(database_url: Option<&str>) -> StorageResult<Self> {
Ok(Self {
db: Database::connect(database_url.unwrap_or(DATABASE_URL)).await?,
latest_epoch_id: AtomicUsize::new(0),
})
}
}
Expand Down
Loading