Skip to content

Commit

Permalink
Add prepared queries and tiny benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
Einliterflasche committed Sep 25, 2023
1 parent 3aff64e commit 02d10fc
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 54 deletions.
7 changes: 6 additions & 1 deletion pg-worm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ uuid = ["tokio-postgres/with-uuid-1", "pg-worm-derive/uuid"]
time = ["tokio-postgres/with-time-0_3", "pg-worm-derive/time"]

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
criterion = { version = "0.5", features = ["async_tokio"] }
tokio = { version = "1", features = ["full"] }

[[bench]]
name = "bench"
harness = false
49 changes: 49 additions & 0 deletions pg-worm/benches/bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use criterion::{criterion_group, criterion_main, Criterion};
use tokio::runtime::Runtime;

use pg_worm::{prelude::*, query::Prepared};

#[allow(dead_code)]
#[derive(Model)]
struct Book {
#[column(primary_key, auto)]
id: i64,
title: String,
author_id: i64,
}

#[allow(dead_code)]
#[derive(Model)]
struct Author {
id: i64,
title: String,
}

fn setup() {
// Use the tokio runtime to complete the setup in an async block
tokio::runtime::Runtime::new().unwrap().block_on(async {
Connection::build("postgres://postgres:postgres@localhost:5432")
.connect()
.await
.expect("benchmark setup: failed to connect");

force_create_table!(Book, Author)
.await
.expect("benchmark setup: failed to create tables");
});
}

fn bench_main(criterion: &mut Criterion) {
setup();

criterion.bench_function("all-books", |bench| {
bench
.to_async(Runtime::new().expect("failed to create runtime"))
.iter(|| async {
Book::select().await.expect("failed to query");
})
});
}

criterion_group!(benches, bench_main);
criterion_main!(benches);
6 changes: 4 additions & 2 deletions pg-worm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ and you are ready to go!
Here's a quick example:
```ignore
```
// Import the prelude to get started quickly
use pg_worm::prelude::*;
Expand All @@ -51,7 +51,9 @@ struct Author {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// First create a connection. This can be only done once.
Connection::build("postgres://postgres:postgres@localhost:5432").connect()?;
Connection::build("postgres://postgres:postgres@localhost:5432")
.connect()
.await?;
// Then, create tables for your models.
// Use `try_create_table!` if you want to fail if a
Expand Down
25 changes: 15 additions & 10 deletions pg-worm/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,18 @@ pub async fn fetch_prepared_client() -> Result<&'static Client, Error> {
PREPARED_CLIENT.get().ok_or(Error::NotConnected)
}

/// This functions
#[doc(hidden)]
#[inline]
pub async fn ensure_prepared(statement: &str) -> Result<(), Error> {
let is_prepared = PREPARED_STATEMENTS
pub async fn ensure_prepared(statement: &str) -> Result<Statement, Error> {
let prepared_stmt = PREPARED_STATEMENTS
.lock()
.map_err(|_| Error::NotConnected)?
.contains_key(statement);
.get(statement)
.cloned();

if is_prepared {
return Ok(());
if let Some(statement) = prepared_stmt {
return Ok(statement);
}

let prepared_stmt = fetch_prepared_client().await?.prepare(statement).await?;
Expand All @@ -92,14 +94,17 @@ pub async fn ensure_prepared(statement: &str) -> Result<(), Error> {
PREPARED_STATEMENTS
.lock()
.map_err(|_| Error::NotConnected)?
.insert(owned_stmt, prepared_stmt);
.insert(owned_stmt, prepared_stmt.clone());

Ok(())
Ok(prepared_stmt)
}

/// Hidden function so set the pool from the `config` module.
#[doc(hidden)]
pub fn set_pool(pool: Pool) -> Result<(), Error> {
pub async fn set_pool(pool: Pool) -> Result<(), Error> {
PREPARED_CLIENT
.set(pool.get().await.map_err(|_| Error::ConnectionBuildError)?)
.map_err(|_| Error::AlreadyConnected)?;
POOL.set(pool).map_err(|_| Error::AlreadyConnected)
}

Expand All @@ -115,7 +120,7 @@ impl Connection {
impl ConnectionBuilder {
/// Finish building and set up the pool. Does not actually connect until
/// the first `Client`s are retrieved.
pub fn connect(self) -> Result<(), Error> {
pub async fn connect(self) -> Result<(), Error> {
let pg_config =
PgConfig::from_str(&self.conn_string).map_err(|_| Error::InvalidPoolConfig)?;

Expand All @@ -124,7 +129,7 @@ impl ConnectionBuilder {
let pool = Pool::builder(manager)
.build()
.map_err(|_| Error::InvalidPoolConfig)?;
set_pool(pool)
set_pool(pool).await
}

/// Set the maximum amount of Connections in the pool.
Expand Down
99 changes: 60 additions & 39 deletions pg-worm/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{
use async_trait::async_trait;
use tokio_postgres::{types::ToSql, Row, Transaction as PgTransaction};

use crate::{fetch_client, Client, Error, FromRow};
use crate::{fetch_client, pool::fetch_prepared_client, Client, Error, FromRow};

pub use delete::Delete;
pub use select::Select;
Expand Down Expand Up @@ -61,45 +61,11 @@ pub trait QueryOutcome: Sized {
) -> Result<Self, crate::Error>;
}

///
#[async_trait]
impl QueryOutcome for u64 {
async fn exec_with(
statement: &str,
params: &[&(dyn ToSql + Sync)],
client: impl Executor + Sync + Send,
) -> Result<u64, crate::Error> {
client.execute(statement, params).await
}
}

#[async_trait]
impl<T> QueryOutcome for Vec<T>
where
T: FromRow,
{
async fn exec_with(
statement: &str,
params: &[&(dyn ToSql + Sync)],
client: impl Executor + Sync + Send,
) -> Result<Vec<T>, crate::Error> {
let res = client.query(statement, params).await?;
res.into_iter().map(T::try_from).collect()
}
}

#[async_trait]
impl<T> QueryOutcome for Option<T>
where
T: FromRow,
{
async fn exec_with(
statement: &str,
params: &[&(dyn ToSql + Sync)],
client: impl Executor + Sync + Send,
) -> Result<Option<T>, crate::Error> {
let res = client.query(statement, params).await?;
res.into_iter().map(T::try_from).next().transpose()
}
pub trait Prepared<T>: Sized {
///
async fn prepared(self) -> Result<T, crate::Error>;
}

/// A struct for storing a complete query along with
Expand Down Expand Up @@ -154,6 +120,20 @@ pub enum Where<'a> {
Empty,
}

#[async_trait]
impl<'a, T, U> Prepared<U> for T
where
Query<'a, U>: From<T>,
U: QueryOutcome + Send + Sync,
T: Send + Sync,
{
async fn prepared(self) -> Result<U, crate::Error> {
let client = fetch_prepared_client().await?;
let q: Query<'a, U> = self.into();
U::exec_with(&q.0, q.1.as_slice(), client).await
}
}

/// Replace all `?` placeholders with the Postgres variant
/// `$1`, `$2`, etc.
fn replace_question_marks(stmt: String) -> String {
Expand Down Expand Up @@ -183,6 +163,47 @@ fn replace_question_marks(stmt: String) -> String {
buf
}

#[async_trait]
impl QueryOutcome for u64 {
async fn exec_with(
statement: &str,
params: &[&(dyn ToSql + Sync)],
client: impl Executor + Sync + Send,
) -> Result<u64, crate::Error> {
client.execute(statement, params).await
}
}

#[async_trait]
impl<T> QueryOutcome for Vec<T>
where
T: FromRow,
{
async fn exec_with(
statement: &str,
params: &[&(dyn ToSql + Sync)],
client: impl Executor + Sync + Send,
) -> Result<Vec<T>, crate::Error> {
let res = client.query(statement, params).await?;
res.into_iter().map(T::try_from).collect()
}
}

#[async_trait]
impl<T> QueryOutcome for Option<T>
where
T: FromRow,
{
async fn exec_with(
statement: &str,
params: &[&(dyn ToSql + Sync)],
client: impl Executor + Sync + Send,
) -> Result<Option<T>, crate::Error> {
let res = client.query(statement, params).await?;
res.into_iter().map(T::try_from).next().transpose()
}
}

impl<'a, T> Default for Query<'a, T> {
fn default() -> Self {
Self("".into(), vec![], PhantomData::<T>)
Expand Down
6 changes: 4 additions & 2 deletions pg-worm/tests/connect.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(dead_code)]

use pg_worm::prelude::*;
use pg_worm::{prelude::*, query::Prepared};

#[derive(Model)]
struct Book {
Expand All @@ -24,7 +24,8 @@ async fn complete_procedure() -> Result<(), pg_worm::Error> {
// First create a connection. This can be only done once.
Connection::build("postgres://postgres:postgres@localhost:5432")
.max_pool_size(16)
.connect()?;
.connect()
.await?;
println!("Hello World!");

// Then, create the tables for your models.
Expand Down Expand Up @@ -63,6 +64,7 @@ async fn complete_procedure() -> Result<(), pg_worm::Error> {
.where_(Book::title.eq(&"The Communist Manifesto".into()))
.where_(Book::pages.contains(&"You have nothing to lose but your chains!".into()))
.where_(Book::id.gt(&3))
.prepared()
.await?;
assert!(manifesto.is_none());

Expand Down

0 comments on commit 02d10fc

Please sign in to comment.