Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cat-gateway): Add simple signed_docs select query, update insert query. #1393

Merged
merged 49 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
bc762dc
add upsert signed docs query
Mr-Leshiy Dec 12, 2024
8686038
add integration tests for signed docs query
Mr-Leshiy Dec 13, 2024
c966f61
wip
Mr-Leshiy Dec 13, 2024
40d039e
fix
Mr-Leshiy Dec 13, 2024
7e11ced
wip
Mr-Leshiy Dec 13, 2024
dfee564
wip
Mr-Leshiy Dec 13, 2024
0d2dc80
wip
Mr-Leshiy Dec 13, 2024
d14c428
remake query from upsert to insert
Mr-Leshiy Dec 13, 2024
c872de4
add simple select signed docs query
Mr-Leshiy Dec 13, 2024
ea95f13
Merge branch 'main' into feat/select-signed-docs
Mr-Leshiy Dec 13, 2024
facdcb5
cleanup jinja, add jinit initialization
Mr-Leshiy Dec 13, 2024
0c491b0
Merge branch 'main' into feat/select-signed-docs
Mr-Leshiy Dec 13, 2024
4844246
update tests
Mr-Leshiy Dec 13, 2024
7a4c3c2
fix
Mr-Leshiy Dec 13, 2024
2f83960
fix spelling
Mr-Leshiy Dec 13, 2024
a7587c7
add advanced select query
Mr-Leshiy Dec 13, 2024
6bdb60b
fix docs
Mr-Leshiy Dec 14, 2024
d0fd295
add QueryLimits
Mr-Leshiy Dec 16, 2024
c52828e
wip
Mr-Leshiy Dec 16, 2024
d866f71
wip
Mr-Leshiy Dec 16, 2024
9fc43b4
update tests
Mr-Leshiy Dec 16, 2024
b088b06
fix docs
Mr-Leshiy Dec 16, 2024
3f3eccf
fix cardano-chain-follower dep, remove init of jinja
Mr-Leshiy Dec 18, 2024
e1181dd
refactor QueryLimits struct
Mr-Leshiy Dec 18, 2024
c6bb226
revert select_signed_docs query
Mr-Leshiy Dec 18, 2024
d26b843
Merge branch 'main' into feat/select-signed-docs
Mr-Leshiy Dec 18, 2024
5315e59
refactor QueryLimits
Mr-Leshiy Dec 18, 2024
eaa0e36
wip
Mr-Leshiy Dec 18, 2024
e14fe96
rename QueryLimits methods
Mr-Leshiy Dec 19, 2024
b46043a
replace `new` with TryFrom impl
Mr-Leshiy Dec 19, 2024
1dbdd50
wip
Mr-Leshiy Dec 19, 2024
7e09a23
wip
Mr-Leshiy Dec 19, 2024
780c7da
wip
Mr-Leshiy Dec 19, 2024
e0e78ac
refactor
Mr-Leshiy Dec 19, 2024
6904aae
fix fmt
Mr-Leshiy Dec 19, 2024
0b15dd0
wip
Mr-Leshiy Dec 19, 2024
38d1606
move docs query filters into different mod
Mr-Leshiy Dec 19, 2024
b787ae0
refactor `filtered_select_signed_docs` function
Mr-Leshiy Dec 20, 2024
eb1441d
wip
Mr-Leshiy Dec 20, 2024
5e40d91
Merge branch 'main' into feat/select-signed-docs
Mr-Leshiy Dec 20, 2024
8ce23c7
replace query_stmt with trait Display impl
Mr-Leshiy Jan 2, 2025
8c076e7
rename methods
Mr-Leshiy Jan 2, 2025
d97601e
add query_stream fn
Mr-Leshiy Jan 2, 2025
74e2962
fix clippy
Mr-Leshiy Jan 2, 2025
f35f850
Merge branch 'main' into feat/select-signed-docs
Mr-Leshiy Jan 2, 2025
1074e01
fix test
Mr-Leshiy Jan 2, 2025
8af1cef
get rid of usage of `pin!`
Mr-Leshiy Jan 2, 2025
febece0
Merge branch 'main' into feat/select-signed-docs
stevenj Jan 3, 2025
246f4f1
Merge branch 'main' into feat/select-signed-docs
stevenj Jan 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .config/dictionaries/project.dic
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ metamap
mgrybyk
miniaturizable
minicbor
minijinja
mithril
mitigations
mocktail
Expand Down
1 change: 1 addition & 0 deletions catalyst-gateway/bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ jsonschema = "0.26.1"
bech32 = "0.11.0"
const_format = "0.2.33"
regex = "1.11.1"
minijinja = "2.5.0"

[dev-dependencies]
proptest = "1.5.0"
Expand Down
3 changes: 3 additions & 0 deletions catalyst-gateway/bin/src/db/event/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! Reusable common database objects

pub(crate) mod query_limits;
60 changes: 60 additions & 0 deletions catalyst-gateway/bin/src/db/event/common/query_limits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! `QueryLimits` query argument object.

#![allow(dead_code)]

use std::fmt::Display;

use crate::service::common::types::generic::query::pagination::{Limit, Page};

/// A query limits struct.
pub(crate) struct QueryLimits(QueryLimitsInner);

/// `QueryLimits` inner enum representation.
enum QueryLimitsInner {
/// Return all entries without any `LIMIT` and `OFFSET` parameters
All,
/// Specifies `LIMIT` parameter
Limit(u64),
/// Specifies `LIMIT` and `OFFSET` parameters
LimitAndOffset(u64, u64),
}

impl Display for QueryLimits {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.0 {
QueryLimitsInner::All => write!(f, ""),
QueryLimitsInner::Limit(limit) => write!(f, "LIMIT {limit}"),
QueryLimitsInner::LimitAndOffset(limit, offset) => {
write!(f, "LIMIT {limit} OFFSET {offset}")
},
}
}
}

impl QueryLimits {
/// Create a `QueryLimits` object without the any limits.
pub(crate) const ALL: QueryLimits = Self(QueryLimitsInner::All);
/// Create a `QueryLimits` object with the limit equals to `1`.
pub(crate) const ONE: QueryLimits = Self(QueryLimitsInner::Limit(1));

/// Create a `QueryLimits` object from the service `Limit` and `Page` values.
///
/// # Errors
/// - Invalid `limit` value, must be more than `0`.
/// - Invalid arguments, `limit` must be provided when `page` is not None.
pub(crate) fn new(limit: Option<Limit>, page: Option<Page>) -> anyhow::Result<Self> {
match (limit, page) {
(Some(limit), Some(page)) => {
Ok(Self(QueryLimitsInner::LimitAndOffset(
limit.into(),
page.into(),
)))
},
(Some(limit), None) => Ok(Self(QueryLimitsInner::Limit(limit.into()))),
(None, None) => Ok(Self(QueryLimitsInner::All)),
(None, Some(_)) => {
anyhow::bail!("Invalid arguments, `limit` must be provided when `page` is not None")
},
}
}
}
37 changes: 33 additions & 4 deletions catalyst-gateway/bin/src/db/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ use std::{

use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use error::NotFoundError;
use futures::{Stream, StreamExt, TryStreamExt};
use tokio_postgres::{types::ToSql, NoTls, Row};
use tracing::{debug, debug_span, error, Instrument};

use crate::settings::Settings;

pub(crate) mod common;
pub(crate) mod config;
pub(crate) mod error;
pub(crate) mod legacy;
Expand Down Expand Up @@ -71,11 +74,11 @@ impl EventDB {
///
/// # Returns
///
/// `Result<Vec<Row>, anyhow::Error>`
/// `anyhow::Result<Vec<Row>>`
#[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
pub(crate) async fn query(
stmt: &str, params: &[&(dyn ToSql + Sync)],
) -> Result<Vec<Row>, anyhow::Error> {
) -> anyhow::Result<Vec<Row>> {
if Self::is_deep_query_enabled() {
Self::explain_analyze_rollback(stmt, params).await?;
}
Expand All @@ -85,6 +88,32 @@ impl EventDB {
Ok(rows)
}

/// Query the database and return a async stream of rows.
///
/// If deep query inspection is enabled, this will log the query plan inside a
/// rolled-back transaction, before running the query.
///
/// # Arguments
///
/// * `stmt` - `&str` SQL statement.
/// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
///
/// # Returns
///
/// `anyhow::Result<impl Stream<Item = anyhow::Result<Row>>>`
#[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
pub(crate) async fn query_stream(
stmt: &str, params: &[&(dyn ToSql + Sync)],
) -> anyhow::Result<impl Stream<Item = anyhow::Result<Row>>> {
if Self::is_deep_query_enabled() {
Self::explain_analyze_rollback(stmt, params).await?;
}
let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
let conn = pool.get().await?;
let rows = conn.query_raw(stmt, params.iter().copied()).await?;
Ok(rows.map_err(Into::into).boxed())
}

/// Query the database for a single row.
///
/// # Arguments
Expand All @@ -98,13 +127,13 @@ impl EventDB {
#[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
pub(crate) async fn query_one(
stmt: &str, params: &[&(dyn ToSql + Sync)],
) -> Result<Row, anyhow::Error> {
) -> anyhow::Result<Row> {
if Self::is_deep_query_enabled() {
Self::explain_analyze_rollback(stmt, params).await?;
}
let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
let conn = pool.get().await?;
let row = conn.query_one(stmt, params).await?;
let row = conn.query_opt(stmt, params).await?.ok_or(NotFoundError)?;
Ok(row)
}

Expand Down
156 changes: 156 additions & 0 deletions catalyst-gateway/bin/src/db/event/signed_docs/full_signed_doc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
//! `FullSignedDoc` struct implementation.

use super::SignedDocBody;
use crate::{
db::event::{EventDB, NotFoundError},
jinja::{get_template, JinjaTemplateSource},
};

/// Insert sql query
const INSERT_SIGNED_DOCS: &str = include_str!("./sql/insert_signed_documents.sql");

/// Select sql query jinja template
pub(crate) const SELECT_SIGNED_DOCS_TEMPLATE: JinjaTemplateSource = JinjaTemplateSource {
name: "select_signed_documents.jinja.template",
source: include_str!("./sql/select_signed_documents.sql.jinja"),
};

/// Full signed doc event db struct
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct FullSignedDoc {
/// Signed doc body
body: SignedDocBody,
/// `signed_doc` table `payload` field
payload: Option<serde_json::Value>,
/// `signed_doc` table `raw` field
raw: Vec<u8>,
}

impl FullSignedDoc {
/// Creates a `FullSignedDoc` instance.
#[allow(dead_code)]
pub(crate) fn new(
body: SignedDocBody, payload: Option<serde_json::Value>, raw: Vec<u8>,
) -> Self {
Self { body, payload, raw }
}

/// Returns the document id.
pub(crate) fn id(&self) -> &uuid::Uuid {
self.body.id()
}

/// Returns the document version.
pub(crate) fn ver(&self) -> &uuid::Uuid {
self.body.ver()
}

/// Returns the document author.
#[allow(dead_code)]
pub(crate) fn author(&self) -> &String {
self.body.author()
}

/// Returns the `SignedDocBody`.
#[allow(dead_code)]
pub(crate) fn body(&self) -> &SignedDocBody {
&self.body
}

/// Uploads a `FullSignedDoc` to the event db.
///
/// Make an insert query into the `event-db` by adding data into the `signed_docs`
/// table.
///
/// * IF the record primary key (id,ver) does not exist, then add the new record.
/// Return success.
/// * IF the record does exist, but all values are the same as stored, return Success.
/// * Otherwise return an error. (Can not over-write an existing record with new
/// data).
///
/// # Arguments:
/// - `id` is a UUID v7
/// - `ver` is a UUID v7
/// - `doc_type` is a UUID v4
#[allow(dead_code)]
pub(crate) async fn store(&self) -> anyhow::Result<()> {
match Self::retrieve(self.id(), Some(self.ver())).await {
Ok(res_doc) => {
anyhow::ensure!(
&res_doc == self,
"Document with the same `id` and `ver` already exists"
);
return Ok(());
},
Err(err) if err.is::<NotFoundError>() => {},
Err(err) => return Err(err),
}

EventDB::modify(INSERT_SIGNED_DOCS, &self.postgres_db_fields()).await?;
Ok(())
}

/// Loads a `FullSignedDoc` from the event db.
///
/// Make a select query into the `event-db` by getting data from the `signed_docs`
/// table.
///
/// * This returns a single document. All data from the document is returned,
/// including the `payload` and `raw` fields.
/// * `ver` should be able to be optional, in which case get the latest ver of the
/// given `id`.
///
/// # Arguments:
/// - `id` is a UUID v7
/// - `ver` is a UUID v7
#[allow(dead_code)]
pub(crate) async fn retrieve(
id: &uuid::Uuid, ver: Option<&uuid::Uuid>,
) -> anyhow::Result<Self> {
let query_template = get_template(&SELECT_SIGNED_DOCS_TEMPLATE)?;
let query = query_template.render(serde_json::json!({
"id": id,
"ver": ver,
}))?;
let row = EventDB::query_one(&query, &[]).await?;

Self::from_row(id, ver, &row)
}

/// Returns all signed document fields for the event db queries
fn postgres_db_fields(&self) -> [&(dyn tokio_postgres::types::ToSql + Sync); 7] {
let body_fields = self.body.postgres_db_fields();
[
body_fields[0],
body_fields[1],
body_fields[2],
body_fields[3],
body_fields[4],
&self.payload,
&self.raw,
]
}

/// Creates a `FullSignedDoc` from postgresql row object.
fn from_row(
id: &uuid::Uuid, ver: Option<&uuid::Uuid>, row: &tokio_postgres::Row,
) -> anyhow::Result<Self> {
let ver = if let Some(ver) = ver {
*ver
} else {
row.try_get("ver")?
};

Ok(FullSignedDoc {
body: SignedDocBody::new(
*id,
ver,
row.try_get("type")?,
row.try_get("author")?,
row.try_get("metadata")?,
),
payload: row.try_get("payload")?,
raw: row.try_get("raw")?,
})
}
}
34 changes: 7 additions & 27 deletions catalyst-gateway/bin/src/db/event/signed_docs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,12 @@
//! Signed docs queries

mod full_signed_doc;
mod query_filter;
mod signed_doc_body;
#[cfg(test)]
mod tests;

use super::EventDB;

/// Insert sql query
const INSERT_SIGNED_DOCS: &str = include_str!("./sql/insert_signed_documents.sql");

/// Make an insert query into the `event-db` by adding data into the `signed_docs` table
///
/// * IF the record primary key (id,ver) does not exist, then add the new record. Return
/// success.
/// * IF the record does exist, but all values are the same as stored, return Success.
/// * Otherwise return an error. (Can not over-write an existing record with new data).
///
/// # Arguments:
/// - `id` is a UUID v7
/// - `ver` is a UUID v7
/// - `doc_type` is a UUID v4
#[allow(dead_code)]
pub(crate) async fn insert_signed_docs(
id: &uuid::Uuid, ver: &uuid::Uuid, doc_type: &uuid::Uuid, author: &String,
metadata: &Option<serde_json::Value>, payload: &Option<serde_json::Value>, raw: &Vec<u8>,
) -> anyhow::Result<()> {
EventDB::modify(INSERT_SIGNED_DOCS, &[
id, ver, doc_type, author, metadata, payload, raw,
])
.await?;
Ok(())
}
#[allow(unused_imports)]
pub(crate) use full_signed_doc::{FullSignedDoc, SELECT_SIGNED_DOCS_TEMPLATE};
pub(crate) use query_filter::DocsQueryFilter;
pub(crate) use signed_doc_body::{SignedDocBody, FILTERED_SELECT_SIGNED_DOCS_TEMPLATE};
32 changes: 32 additions & 0 deletions catalyst-gateway/bin/src/db/event/signed_docs/query_filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//! `DocsQueryFilter` struct implementation.

use std::fmt::Display;

/// A `select_signed_docs` query filtering argument.
#[allow(dead_code)]
pub(crate) enum DocsQueryFilter {
/// All entries
All,
/// Select docs with the specific `type` field
DocType(uuid::Uuid),
/// Select docs with the specific `id` field
DocId(uuid::Uuid),
/// Select docs with the specific `id` and `ver` field
DocVer(uuid::Uuid, uuid::Uuid),
/// Select docs with the specific `author` field
Author(String),
}

impl Display for DocsQueryFilter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::All => write!(f, "TRUE"),
Self::DocType(doc_type) => write!(f, "signed_docs.type = '{doc_type}'"),
Self::DocId(id) => write!(f, "signed_docs.id = '{id}'"),
Self::DocVer(id, ver) => {
write!(f, "signed_docs.id = '{id}' AND signed_docs.ver = '{ver}'")
},
Self::Author(author) => write!(f, "signed_docs.author = '{author}'"),
}
}
}
Loading
Loading