Skip to content

Commit

Permalink
fix(pool): tweaks and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
abonander committed Nov 11, 2024
1 parent 80831e8 commit d671b98
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 31 deletions.
2 changes: 1 addition & 1 deletion sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pin-project-lite = "0.2.14"

[dev-dependencies]
sqlx = { workspace = true, features = ["postgres", "sqlite", "mysql", "migrate", "macros", "time", "uuid"] }
tokio = { version = "1", features = ["rt"] }
tokio = { version = "1", features = ["rt", "sync"] }

[lints]
workspace = true
16 changes: 9 additions & 7 deletions sqlx-core/src/pool/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::pool::PoolConnection;
use crate::rt::JoinHandle;
use crate::Error;
use ease_off::EaseOff;
use event_listener::Event;
use event_listener::{listener, Event};
use std::fmt::{Display, Formatter};
use std::future::Future;
use std::ptr;
Expand Down Expand Up @@ -50,7 +50,7 @@ use std::io;
/// let database_url = database_url.clone();
/// async move {
/// println!(
/// "opening connection {}, attempt {}; elapsed time: {}",
/// "opening connection {}, attempt {}; elapsed time: {:?}",
/// meta.pool_size,
/// meta.num_attempts + 1,
/// meta.start.elapsed()
Expand Down Expand Up @@ -96,10 +96,10 @@ use std::io;
///
/// let pool = PgPoolOptions::new()
/// .connect_with_connector(move |meta: PoolConnectMetadata| {
/// let connect_opts_ = connect_opts.clone();
/// let connect_opts = connect_opts_.clone();
/// async move {
/// println!(
/// "opening connection {}, attempt {}; elapsed time: {}",
/// "opening connection {}, attempt {}; elapsed time: {:?}",
/// meta.pool_size,
/// meta.num_attempts + 1,
/// meta.start.elapsed()
Expand Down Expand Up @@ -318,7 +318,8 @@ impl ConnectionCounter {

pub async fn drain(&self) {
while self.count.load(Ordering::Acquire) > 0 {
self.connect_available.listen().await;
listener!(self.connect_available => permit_released);
permit_released.await;
}
}

Expand Down Expand Up @@ -386,13 +387,14 @@ impl ConnectionCounter {
return acquired;
}

self.connect_available.listen().await;

if attempt == 2 {
tracing::warn!(
"unable to acquire a connect permit after sleeping; this may indicate a bug"
);
}

listener!(self.connect_available => connect_available);
connect_available.await;
}

panic!("BUG: was never able to acquire a connection despite waking many times")
Expand Down
5 changes: 4 additions & 1 deletion sqlx-core/src/pool/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use futures_util::FutureExt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use event_listener::listener;

pub struct IdleQueue<DB: Database> {
queue: ArrayQueue<Idle<DB>>,
// Keep a separate count because `ArrayQueue::len()` loops until the head and tail pointers
Expand Down Expand Up @@ -36,7 +38,8 @@ impl<DB: Database> IdleQueue<DB> {

for attempt in 1usize.. {
if should_wait {
self.release_event.listen().await;
listener!(self.release_event => release_event);
release_event.await;
}

if let Some(conn) = self.try_acquire(pool) {
Expand Down
17 changes: 11 additions & 6 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::rt::JoinHandle;
use crate::{private_tracing_dynamic_event, rt};
use either::Either;
use futures_util::future::{self, OptionFuture};
use futures_util::{select, FutureExt};
use futures_util::{FutureExt};
use std::time::{Duration, Instant};
use tracing::Level;

Expand Down Expand Up @@ -77,14 +77,19 @@ impl<DB: Database> PoolInner<DB> {

// Keep clearing the idle queue as connections are released until the count reaches zero.
async move {
let mut drained = pin!(self.counter.drain()).fuse();
let mut drained = pin!(self.counter.drain());

loop {
select! {
idle = self.idle.acquire(self) => {
let mut acquire_idle = pin!(self.idle.acquire(self));

// Not using `futures::select!{}` here because it requires a proc-macro dep,
// and frankly it's a little broken.
match future::select(drained.as_mut(), acquire_idle.as_mut()).await {
// *not* `either::Either`; they rolled their own
future::Either::Left(_) => break,
future::Either::Right((idle, _)) => {
idle.close().await;
},
() = drained.as_mut() => break,
}
}
}
}
Expand Down
31 changes: 15 additions & 16 deletions sqlx-core/src/raw_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub struct RawSql<'q>(&'q str);
///
/// See [MySQL manual, section 13.3.3: Statements That Cause an Implicit Commit](https://dev.mysql.com/doc/refman/8.0/en/implicit-commit.html) for details.
/// See also: [MariaDB manual: SQL statements That Cause an Implicit Commit](https://mariadb.com/kb/en/sql-statements-that-cause-an-implicit-commit/).
pub fn raw_sql(sql: &str) -> RawSql<'_> {
pub fn raw_sql<'q>(sql: &'q str) -> RawSql<'q> {
RawSql(sql)
}

Expand All @@ -138,27 +138,26 @@ impl<'q, DB: Database> Execute<'q, DB> for RawSql<'q> {

impl<'q> RawSql<'q> {
/// Execute the SQL string and return the total number of rows affected.
#[inline]
pub async fn execute<'e, E>(
pub async fn execute<'e, 'c: 'e, E>(
self,
executor: E,
) -> crate::Result<<E::Database as Database>::QueryResult>
where
'q: 'e,
E: Executor<'e>,
E: Executor<'c>,
{
executor.execute(self).await
}

/// Execute the SQL string. Returns a stream which gives the number of rows affected for each statement in the string.
#[inline]
pub fn execute_many<'e, E>(
pub fn execute_many<'e, 'c: 'e, E>(
self,
executor: E,
) -> BoxStream<'e, crate::Result<<E::Database as Database>::QueryResult>>
where
'q: 'e,
E: Executor<'e>,
E: Executor<'c>,
{
executor.execute_many(self)
}
Expand All @@ -167,13 +166,13 @@ impl<'q> RawSql<'q> {
///
/// If the string contains multiple statements, their results will be concatenated together.
#[inline]
pub fn fetch<'e, E>(
pub fn fetch<'e, 'c: 'e, E>(
self,
executor: E,
) -> BoxStream<'e, Result<<E::Database as Database>::Row, Error>>
where
'q: 'e,
E: Executor<'e>,
E: Executor<'c>,
{
executor.fetch(self)
}
Expand All @@ -183,7 +182,7 @@ impl<'q> RawSql<'q> {
/// For each query in the stream, any generated rows are returned first,
/// then the `QueryResult` with the number of rows affected.
#[inline]
pub fn fetch_many<'e, E>(
pub fn fetch_many<'e, 'c: 'e, E>(
self,
executor: E,
) -> BoxStream<
Expand All @@ -195,7 +194,7 @@ impl<'q> RawSql<'q> {
>
where
'q: 'e,
E: Executor<'e>,
E: Executor<'c>,
{
executor.fetch_many(self)
}
Expand All @@ -208,13 +207,13 @@ impl<'q> RawSql<'q> {
/// To avoid exhausting available memory, ensure the result set has a known upper bound,
/// e.g. using `LIMIT`.
#[inline]
pub async fn fetch_all<'e, E>(
pub async fn fetch_all<'e, 'c: 'e, E>(
self,
executor: E,
) -> crate::Result<Vec<<E::Database as Database>::Row>>
where
'q: 'e,
E: Executor<'e>,
E: Executor<'c>,
{
executor.fetch_all(self).await
}
Expand All @@ -232,13 +231,13 @@ impl<'q> RawSql<'q> {
///
/// Otherwise, you might want to add `LIMIT 1` to your query.
#[inline]
pub async fn fetch_one<'e, E>(
pub async fn fetch_one<'e, 'c: 'e, E>(
self,
executor: E,
) -> crate::Result<<E::Database as Database>::Row>
where
'q: 'e,
E: Executor<'e>,
E: Executor<'c>,
{
executor.fetch_one(self).await
}
Expand All @@ -256,13 +255,13 @@ impl<'q> RawSql<'q> {
///
/// Otherwise, you might want to add `LIMIT 1` to your query.
#[inline]
pub async fn fetch_optional<'e, E>(
pub async fn fetch_optional<'e, 'c: 'e, E>(
self,
executor: E,
) -> crate::Result<<E::Database as Database>::Row>
where
'q: 'e,
E: Executor<'e>,
E: Executor<'c>,
{
executor.fetch_one(self).await
}
Expand Down

0 comments on commit d671b98

Please sign in to comment.