Skip to content

Commit

Permalink
WIP migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
bshifter committed Dec 6, 2024
1 parent 5e08999 commit 327b9d5
Show file tree
Hide file tree
Showing 16 changed files with 442 additions and 6 deletions.
74 changes: 68 additions & 6 deletions common/src/database/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ use crate::transformers::output_files_use_path::new;

use super::schema::{Migration, MigrationBase, Version};

const MIGRATION_TABLE_NAME: &str = "__schema_migrations";

#[async_trait]
pub trait RedisMigration: Migration {
/// Called when this migration is to be executed.
Expand Down Expand Up @@ -111,6 +113,18 @@ impl RedisDatabase {

Ok(db)
}

/// Register a migration. If a migration with the same version is already registered, a warning
/// is logged and the registration fails.
pub fn register_migration(&mut self, migration: Arc<dyn RedisMigration + Send + Sync>) {
let version = migration.version();
if let Vacant(e) = self.migrations.entry(version) {
e.insert(migration);
} else {
warn!("Migration with version {:?} is already registered", version);
}
}

async fn get_heartbeats_by_field(
&self,
fields: HashMap<RedisDomain, String>
Expand Down Expand Up @@ -457,28 +471,76 @@ impl Database for RedisDatabase {
Ok(())
}

/// Fails if `setup_schema` hasn't previously been called or if the query otherwise fails.
async fn current_version(&self) -> Result<Option<Version>> {
todo!()
let mut conn = self.pool.get().await.context("Failed to get Redis connection")?;
let key = MIGRATION_TABLE_NAME;
let versions:Vec<String> = conn.zrange(key, -1, -1).await.context("There is no version info stored in DB.")?;
let last_version = versions.last().and_then(|v| v.parse::<i64>().ok());
Ok(last_version)
}

/// Fails if `setup_schema` hasn't previously been called or if the query otherwise fails.
async fn migrated_versions(&self) -> Result<BTreeSet<Version>> {
todo!()
let mut conn = self.pool.get().await.context("Failed to get Redis connection")?;
let key = MIGRATION_TABLE_NAME;
let versions:Vec<String> = conn.zrange(key, 0, -1).await.context("There is no version info stored in DB.")?;
let result : BTreeSet<i64> = versions.into_iter().map(|v| v.parse::<i64>().context(format!("Failed to parse version: {}", v))).collect::<Result<_>>()?;
Ok(result)
}

/// Fails if `setup_schema` hasn't previously been called or if the migration otherwise fails.
async fn apply_migration(&self, version: Version) -> Result<()> {
todo!()
let migration = self
.migrations
.get(&version)
.ok_or_else(|| anyhow!("Could not retrieve migration with version {}", version))?
.clone();
let mut conn = self.pool.get().await.context("Failed to get Redis connection")?;
migration.up(&mut conn).await?;
let key = MIGRATION_TABLE_NAME;
let version = migration.version();
let added_count: i64 = conn.zadd(key, version, version).await.context(format!("Unable to add version: {}", version))?;
if added_count > 0 {
println!("Successfully added version {} to sorted set", version);
} else {
println!("Version {} was not added (it may already exist)", version);
}
Ok(())
}

/// Fails if `setup_schema` hasn't previously been called or if the migration otherwise fails.
async fn revert_migration(&self, version: Version) -> Result<()> {
todo!()
let migration = self
.migrations
.get(&version)
.ok_or_else(|| anyhow!("Could not retrieve migration with version {}", version))?
.clone();
let mut conn = self.pool.get().await.context("Failed to get Redis connection")?;
migration.down(&mut conn).await?;
let key = MIGRATION_TABLE_NAME;
let version = migration.version();
let removed_count: i64 = conn.zrem(key, version).await.context("Failed to remove version")?;
if removed_count > 0 {
println!("Successfully removed version: {}", version);
} else {
println!("Version {} not found in the sorted set.", version);
}
Ok(())
}

/// Create the tables required to keep track of schema state. If the tables already
/// exist, this function has no operation.
async fn setup_schema(&self) -> Result<()> {
todo!()
Ok(())
}

async fn migrations(&self) -> BTreeMap<Version, Arc<dyn Migration + Send + Sync>> {
todo!()
let mut base_migrations = BTreeMap::new();
for (version, migration) in self.migrations.iter() {
base_migrations.insert(*version, migration.to_base());
}
base_migrations
}

async fn get_stats(
Expand Down
1 change: 1 addition & 0 deletions common/src/database/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use super::Database;

pub mod postgres;
pub mod sqlite;
pub mod redis;

/// The version type alias used to uniquely reference migrations.
pub type Version = i64;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use anyhow::{Context, Result};
use async_trait::async_trait;
use redis::AsyncCommands;
use crate::database::redis::RedisMigration;
use crate::database::redisdomain::RedisDomain;
use crate::migration;
use deadpool_redis::*;

pub(super) struct CreateSubscriptionsTable;
migration!(CreateSubscriptionsTable, 1, "create subscriptions table");

#[async_trait]
impl RedisMigration for CreateSubscriptionsTable {
async fn up(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}

async fn down(&self, conn: &mut Connection) -> Result<()> {
let key = format!("{}:{}:{}", RedisDomain::Subscription, RedisDomain::Any, RedisDomain::Any);
let subs : Vec<String> = conn.keys(key).await.context("Unable to list keys")?;
if !subs.is_empty() {
let _: () = conn.del(subs).await.context("Failed to delete subscription data")?;
}
Ok(())
}
}
26 changes: 26 additions & 0 deletions common/src/database/schema/redis/_002_create_bookmarks_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use anyhow::{Context, Result};
use async_trait::async_trait;
use crate::database::redis::RedisMigration;
use crate::database::redisdomain::RedisDomain;
use crate::migration;
use deadpool_redis::*;
use redis::AsyncCommands;

pub(super) struct CreateBookmarksTable;
migration!(CreateBookmarksTable, 2, "create bookmarks table");

#[async_trait]
impl RedisMigration for CreateBookmarksTable {
async fn up(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}

async fn down(&self, conn: &mut Connection) -> Result<()> {
let key = format!("{}:{}:{}", RedisDomain::BookMark, RedisDomain::Any, RedisDomain::Any);
let bms : Vec<String> = conn.keys(key).await.context("Unable to list keys")?;
if !bms.is_empty() {
let _: () = conn.del(bms).await.context("Failed to delete bookmark data")?;
}
Ok(())
}
}
26 changes: 26 additions & 0 deletions common/src/database/schema/redis/_003_create_heartbeats_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use anyhow::{Context, Result};
use async_trait::async_trait;
use crate::database::redis::RedisMigration;
use crate::database::redisdomain::RedisDomain;
use crate::migration;
use deadpool_redis::*;
use redis::AsyncCommands;

pub(super) struct CreateHeartbeatsTable;
migration!(CreateHeartbeatsTable, 3, "create heartbeats table");

#[async_trait]
impl RedisMigration for CreateHeartbeatsTable {
async fn up(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}

async fn down(&self, conn: &mut Connection) -> Result<()> {
let key = format!("{}:{}:{}", RedisDomain::Heartbeat, RedisDomain::Any, RedisDomain::Any);
let hbs : Vec<String> = conn.keys(key).await.context("Unable to list keys")?;
if !hbs.is_empty() {
let _: () = conn.del(hbs).await.context("Failed to delete hearthbeat data")?;
}
Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use anyhow::Result;
use async_trait::async_trait;
use crate::database::redis::RedisMigration;
use crate::migration;
use deadpool_redis::*;

pub(super) struct AddLastEventSeenFieldInHeartbeatsTable;
migration!(
AddLastEventSeenFieldInHeartbeatsTable,
4,
"add last_event_seen field in heartbeats table"
);

#[async_trait]
impl RedisMigration for AddLastEventSeenFieldInHeartbeatsTable {
async fn up(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}

async fn down(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use anyhow::Result;
use async_trait::async_trait;
use crate::database::redis::RedisMigration;
use crate::migration;
use deadpool_redis::*;

pub(super) struct AddUriFieldInSubscriptionsTable;
migration!(
AddUriFieldInSubscriptionsTable,
5,
"add uri field in subscriptions table"
);

#[async_trait]
impl RedisMigration for AddUriFieldInSubscriptionsTable {
async fn up(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}

async fn down(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use anyhow::Result;
use async_trait::async_trait;
use crate::database::redis::RedisMigration;
use crate::migration;
use deadpool_redis::*;

pub(super) struct AddContentFormatFieldInSubscriptionsTable;
migration!(
AddContentFormatFieldInSubscriptionsTable,
6,
"add content_format field in subscriptions table"
);

#[async_trait]
impl RedisMigration for AddContentFormatFieldInSubscriptionsTable {
async fn up(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}

async fn down(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use anyhow::Result;
use async_trait::async_trait;
use crate::database::redis::RedisMigration;
use crate::migration;
use deadpool_redis::*;

pub(super) struct AddIgnoreChannelErrorFieldInSubscriptionsTable;
migration!(
AddIgnoreChannelErrorFieldInSubscriptionsTable,
7,
"add ignore_channel_error field in subscriptions table"
);

#[async_trait]
impl RedisMigration for AddIgnoreChannelErrorFieldInSubscriptionsTable {
async fn up(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}

async fn down(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use anyhow::Result;
use async_trait::async_trait;
use crate::database::redis::RedisMigration;
use crate::migration;
use deadpool_redis::*;

pub(super) struct AddPrincsFilterFieldsInSubscriptionsTable;
migration!(
AddPrincsFilterFieldsInSubscriptionsTable,
8,
"add princs_filter fields in subscriptions table"
);

#[async_trait]
impl RedisMigration for AddPrincsFilterFieldsInSubscriptionsTable {
async fn up(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}

async fn down(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}
}
19 changes: 19 additions & 0 deletions common/src/database/schema/redis/_009_alter_outputs_format.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use anyhow::Result;
use async_trait::async_trait;
use crate::database::redis::RedisMigration;
use crate::migration;
use deadpool_redis::*;

pub(super) struct AlterOutputsFormat;
migration!(AlterOutputsFormat, 9, "alter outputs format");

#[async_trait]
impl RedisMigration for AlterOutputsFormat {
async fn up(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}

async fn down(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use anyhow::Result;
use async_trait::async_trait;
use crate::database::redis::RedisMigration;
use crate::migration;
use deadpool_redis::*;

pub(super) struct AddRevisionFieldInSubscriptionsTable;
migration!(
AddRevisionFieldInSubscriptionsTable,
10,
"add revision field in subscriptions table"
);

#[async_trait]
impl RedisMigration for AddRevisionFieldInSubscriptionsTable {
async fn up(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}

async fn down(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use anyhow::Result;
use async_trait::async_trait;
use crate::database::redis::RedisMigration;
use crate::migration;
use deadpool_redis::*;

pub(super) struct AddLocaleFieldsInSubscriptionsTable;
migration!(
AddLocaleFieldsInSubscriptionsTable,
11,
"add locale fields in subscriptions table"
);

#[async_trait]
impl RedisMigration for AddLocaleFieldsInSubscriptionsTable {
async fn up(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}

async fn down(&self, _conn: &mut Connection) -> Result<()> {
Ok(())
}
}
Loading

0 comments on commit 327b9d5

Please sign in to comment.