Skip to content

Commit

Permalink
feat(cat-gateway): Add simple signed_docs select query, update inse…
Browse files Browse the repository at this point in the history
…rt query. (#1393)

* add upsert signed docs query

* add integration tests for signed docs query

* wip

* fix

* wip

* wip

* wip

* remake query from upsert to insert

* add simple select signed docs query

* cleanup jinja, add jinit initialization

* update tests

* fix

* fix spelling

* add advanced select query

* fix docs

* add QueryLimits

* wip

* wip

* update tests

* fix docs

* fix cardano-chain-follower dep, remove init of jinja

* refactor QueryLimits struct

* revert select_signed_docs query

* refactor QueryLimits

* wip

* rename QueryLimits methods

* replace `new` with TryFrom impl

* wip

* wip

* wip

* refactor

* fix fmt

* wip

* move docs query filters into different mod

* refactor `filtered_select_signed_docs` function

* wip

* replace query_stmt with trait Display impl

* rename methods

* add query_stream fn

* fix clippy

* fix test

* get rid of usage of `pin!`

---------

Co-authored-by: Steven Johnson <[email protected]>
  • Loading branch information
Mr-Leshiy and stevenj authored Jan 3, 2025
1 parent 2a42d8a commit 6b25bd9
Show file tree
Hide file tree
Showing 17 changed files with 618 additions and 77 deletions.
1 change: 1 addition & 0 deletions .config/dictionaries/project.dic
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ metamap
mgrybyk
miniaturizable
minicbor
minijinja
mithril
mitigations
mocktail
Expand Down
1 change: 1 addition & 0 deletions catalyst-gateway/bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ jsonschema = "0.26.1"
bech32 = "0.11.0"
const_format = "0.2.33"
regex = "1.11.1"
minijinja = "2.5.0"

[dev-dependencies]
proptest = "1.5.0"
Expand Down
3 changes: 3 additions & 0 deletions catalyst-gateway/bin/src/db/event/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! Reusable common database objects
pub(crate) mod query_limits;
60 changes: 60 additions & 0 deletions catalyst-gateway/bin/src/db/event/common/query_limits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! `QueryLimits` query argument object.
#![allow(dead_code)]

use std::fmt::Display;

use crate::service::common::types::generic::query::pagination::{Limit, Page};

/// A query limits struct.
pub(crate) struct QueryLimits(QueryLimitsInner);

/// `QueryLimits` inner enum representation.
enum QueryLimitsInner {
/// Return all entries without any `LIMIT` and `OFFSET` parameters
All,
/// Specifies `LIMIT` parameter
Limit(u64),
/// Specifies `LIMIT` and `OFFSET` parameters
LimitAndOffset(u64, u64),
}

impl Display for QueryLimits {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.0 {
QueryLimitsInner::All => write!(f, ""),
QueryLimitsInner::Limit(limit) => write!(f, "LIMIT {limit}"),
QueryLimitsInner::LimitAndOffset(limit, offset) => {
write!(f, "LIMIT {limit} OFFSET {offset}")
},
}
}
}

impl QueryLimits {
/// Create a `QueryLimits` object without the any limits.
pub(crate) const ALL: QueryLimits = Self(QueryLimitsInner::All);
/// Create a `QueryLimits` object with the limit equals to `1`.
pub(crate) const ONE: QueryLimits = Self(QueryLimitsInner::Limit(1));

/// Create a `QueryLimits` object from the service `Limit` and `Page` values.
///
/// # Errors
/// - Invalid `limit` value, must be more than `0`.
/// - Invalid arguments, `limit` must be provided when `page` is not None.
pub(crate) fn new(limit: Option<Limit>, page: Option<Page>) -> anyhow::Result<Self> {
match (limit, page) {
(Some(limit), Some(page)) => {
Ok(Self(QueryLimitsInner::LimitAndOffset(
limit.into(),
page.into(),
)))
},
(Some(limit), None) => Ok(Self(QueryLimitsInner::Limit(limit.into()))),
(None, None) => Ok(Self(QueryLimitsInner::All)),
(None, Some(_)) => {
anyhow::bail!("Invalid arguments, `limit` must be provided when `page` is not None")
},
}
}
}
37 changes: 33 additions & 4 deletions catalyst-gateway/bin/src/db/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ use std::{

use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use error::NotFoundError;
use futures::{Stream, StreamExt, TryStreamExt};
use tokio_postgres::{types::ToSql, NoTls, Row};
use tracing::{debug, debug_span, error, Instrument};

use crate::settings::Settings;

pub(crate) mod common;
pub(crate) mod config;
pub(crate) mod error;
pub(crate) mod legacy;
Expand Down Expand Up @@ -71,11 +74,11 @@ impl EventDB {
///
/// # Returns
///
/// `Result<Vec<Row>, anyhow::Error>`
/// `anyhow::Result<Vec<Row>>`
#[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
pub(crate) async fn query(
stmt: &str, params: &[&(dyn ToSql + Sync)],
) -> Result<Vec<Row>, anyhow::Error> {
) -> anyhow::Result<Vec<Row>> {
if Self::is_deep_query_enabled() {
Self::explain_analyze_rollback(stmt, params).await?;
}
Expand All @@ -85,6 +88,32 @@ impl EventDB {
Ok(rows)
}

/// Query the database and return a async stream of rows.
///
/// If deep query inspection is enabled, this will log the query plan inside a
/// rolled-back transaction, before running the query.
///
/// # Arguments
///
/// * `stmt` - `&str` SQL statement.
/// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
///
/// # Returns
///
/// `anyhow::Result<impl Stream<Item = anyhow::Result<Row>>>`
#[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
pub(crate) async fn query_stream(
stmt: &str, params: &[&(dyn ToSql + Sync)],
) -> anyhow::Result<impl Stream<Item = anyhow::Result<Row>>> {
if Self::is_deep_query_enabled() {
Self::explain_analyze_rollback(stmt, params).await?;
}
let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
let conn = pool.get().await?;
let rows = conn.query_raw(stmt, params.iter().copied()).await?;
Ok(rows.map_err(Into::into).boxed())
}

/// Query the database for a single row.
///
/// # Arguments
Expand All @@ -98,13 +127,13 @@ impl EventDB {
#[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
pub(crate) async fn query_one(
stmt: &str, params: &[&(dyn ToSql + Sync)],
) -> Result<Row, anyhow::Error> {
) -> anyhow::Result<Row> {
if Self::is_deep_query_enabled() {
Self::explain_analyze_rollback(stmt, params).await?;
}
let pool = EVENT_DB_POOL.get().ok_or(Error::DbPoolUninitialized)?;
let conn = pool.get().await?;
let row = conn.query_one(stmt, params).await?;
let row = conn.query_opt(stmt, params).await?.ok_or(NotFoundError)?;
Ok(row)
}

Expand Down
156 changes: 156 additions & 0 deletions catalyst-gateway/bin/src/db/event/signed_docs/full_signed_doc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
//! `FullSignedDoc` struct implementation.
use super::SignedDocBody;
use crate::{
db::event::{EventDB, NotFoundError},
jinja::{get_template, JinjaTemplateSource},
};

/// Insert sql query
const INSERT_SIGNED_DOCS: &str = include_str!("./sql/insert_signed_documents.sql");

/// Select sql query jinja template
pub(crate) const SELECT_SIGNED_DOCS_TEMPLATE: JinjaTemplateSource = JinjaTemplateSource {
name: "select_signed_documents.jinja.template",
source: include_str!("./sql/select_signed_documents.sql.jinja"),
};

/// Full signed doc event db struct
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct FullSignedDoc {
/// Signed doc body
body: SignedDocBody,
/// `signed_doc` table `payload` field
payload: Option<serde_json::Value>,
/// `signed_doc` table `raw` field
raw: Vec<u8>,
}

impl FullSignedDoc {
/// Creates a `FullSignedDoc` instance.
#[allow(dead_code)]
pub(crate) fn new(
body: SignedDocBody, payload: Option<serde_json::Value>, raw: Vec<u8>,
) -> Self {
Self { body, payload, raw }
}

/// Returns the document id.
pub(crate) fn id(&self) -> &uuid::Uuid {
self.body.id()
}

/// Returns the document version.
pub(crate) fn ver(&self) -> &uuid::Uuid {
self.body.ver()
}

/// Returns the document author.
#[allow(dead_code)]
pub(crate) fn author(&self) -> &String {
self.body.author()
}

/// Returns the `SignedDocBody`.
#[allow(dead_code)]
pub(crate) fn body(&self) -> &SignedDocBody {
&self.body
}

/// Uploads a `FullSignedDoc` to the event db.
///
/// Make an insert query into the `event-db` by adding data into the `signed_docs`
/// table.
///
/// * IF the record primary key (id,ver) does not exist, then add the new record.
/// Return success.
/// * IF the record does exist, but all values are the same as stored, return Success.
/// * Otherwise return an error. (Can not over-write an existing record with new
/// data).
///
/// # Arguments:
/// - `id` is a UUID v7
/// - `ver` is a UUID v7
/// - `doc_type` is a UUID v4
#[allow(dead_code)]
pub(crate) async fn store(&self) -> anyhow::Result<()> {
match Self::retrieve(self.id(), Some(self.ver())).await {
Ok(res_doc) => {
anyhow::ensure!(
&res_doc == self,
"Document with the same `id` and `ver` already exists"
);
return Ok(());
},
Err(err) if err.is::<NotFoundError>() => {},
Err(err) => return Err(err),
}

EventDB::modify(INSERT_SIGNED_DOCS, &self.postgres_db_fields()).await?;
Ok(())
}

/// Loads a `FullSignedDoc` from the event db.
///
/// Make a select query into the `event-db` by getting data from the `signed_docs`
/// table.
///
/// * This returns a single document. All data from the document is returned,
/// including the `payload` and `raw` fields.
/// * `ver` should be able to be optional, in which case get the latest ver of the
/// given `id`.
///
/// # Arguments:
/// - `id` is a UUID v7
/// - `ver` is a UUID v7
#[allow(dead_code)]
pub(crate) async fn retrieve(
id: &uuid::Uuid, ver: Option<&uuid::Uuid>,
) -> anyhow::Result<Self> {
let query_template = get_template(&SELECT_SIGNED_DOCS_TEMPLATE)?;
let query = query_template.render(serde_json::json!({
"id": id,
"ver": ver,
}))?;
let row = EventDB::query_one(&query, &[]).await?;

Self::from_row(id, ver, &row)
}

/// Returns all signed document fields for the event db queries
fn postgres_db_fields(&self) -> [&(dyn tokio_postgres::types::ToSql + Sync); 7] {
let body_fields = self.body.postgres_db_fields();
[
body_fields[0],
body_fields[1],
body_fields[2],
body_fields[3],
body_fields[4],
&self.payload,
&self.raw,
]
}

/// Creates a `FullSignedDoc` from postgresql row object.
fn from_row(
id: &uuid::Uuid, ver: Option<&uuid::Uuid>, row: &tokio_postgres::Row,
) -> anyhow::Result<Self> {
let ver = if let Some(ver) = ver {
*ver
} else {
row.try_get("ver")?
};

Ok(FullSignedDoc {
body: SignedDocBody::new(
*id,
ver,
row.try_get("type")?,
row.try_get("author")?,
row.try_get("metadata")?,
),
payload: row.try_get("payload")?,
raw: row.try_get("raw")?,
})
}
}
34 changes: 7 additions & 27 deletions catalyst-gateway/bin/src/db/event/signed_docs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,12 @@
//! Signed docs queries
mod full_signed_doc;
mod query_filter;
mod signed_doc_body;
#[cfg(test)]
mod tests;

use super::EventDB;

/// Insert sql query
const INSERT_SIGNED_DOCS: &str = include_str!("./sql/insert_signed_documents.sql");

/// Make an insert query into the `event-db` by adding data into the `signed_docs` table
///
/// * IF the record primary key (id,ver) does not exist, then add the new record. Return
/// success.
/// * IF the record does exist, but all values are the same as stored, return Success.
/// * Otherwise return an error. (Can not over-write an existing record with new data).
///
/// # Arguments:
/// - `id` is a UUID v7
/// - `ver` is a UUID v7
/// - `doc_type` is a UUID v4
#[allow(dead_code)]
pub(crate) async fn insert_signed_docs(
id: &uuid::Uuid, ver: &uuid::Uuid, doc_type: &uuid::Uuid, author: &String,
metadata: &Option<serde_json::Value>, payload: &Option<serde_json::Value>, raw: &Vec<u8>,
) -> anyhow::Result<()> {
EventDB::modify(INSERT_SIGNED_DOCS, &[
id, ver, doc_type, author, metadata, payload, raw,
])
.await?;
Ok(())
}
#[allow(unused_imports)]
pub(crate) use full_signed_doc::{FullSignedDoc, SELECT_SIGNED_DOCS_TEMPLATE};
pub(crate) use query_filter::DocsQueryFilter;
pub(crate) use signed_doc_body::{SignedDocBody, FILTERED_SELECT_SIGNED_DOCS_TEMPLATE};
32 changes: 32 additions & 0 deletions catalyst-gateway/bin/src/db/event/signed_docs/query_filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//! `DocsQueryFilter` struct implementation.
use std::fmt::Display;

/// A `select_signed_docs` query filtering argument.
#[allow(dead_code)]
pub(crate) enum DocsQueryFilter {
/// All entries
All,
/// Select docs with the specific `type` field
DocType(uuid::Uuid),
/// Select docs with the specific `id` field
DocId(uuid::Uuid),
/// Select docs with the specific `id` and `ver` field
DocVer(uuid::Uuid, uuid::Uuid),
/// Select docs with the specific `author` field
Author(String),
}

impl Display for DocsQueryFilter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::All => write!(f, "TRUE"),
Self::DocType(doc_type) => write!(f, "signed_docs.type = '{doc_type}'"),
Self::DocId(id) => write!(f, "signed_docs.id = '{id}'"),
Self::DocVer(id, ver) => {
write!(f, "signed_docs.id = '{id}' AND signed_docs.ver = '{ver}'")
},
Self::Author(author) => write!(f, "signed_docs.author = '{author}'"),
}
}
}
Loading

0 comments on commit 6b25bd9

Please sign in to comment.