From 7db90ee06ad072b706b8792d22ecc65c48c2a764 Mon Sep 17 00:00:00 2001 From: felipe stival <14948182+v0idpwn@users.noreply.github.com> Date: Fri, 15 Sep 2023 20:52:19 +0300 Subject: [PATCH] Migrate schemas (#115) * Fix not creating is_partitioned column on extensionless * Migrate everything to the pgmq schema * Update python client for extension 0.25 * Update ruby example for pgmq 0.25 * Update README for 0.25 * Change prefixes/sufixes to q_ and a_ prefixes * Rename TABLE_PREFIX constant to QUEUE_PREFIX --- Cargo.lock | 4 +- Cargo.toml | 2 +- README.md | 50 +- core/Cargo.toml | 2 +- core/src/query.rs | 89 +-- core/src/types.rs | 5 +- core/src/util.rs | 4 +- examples/ruby.rb | 12 +- pgmq-rs/Cargo.toml | 2 +- pgmq-rs/sqlx-data.json | 226 +++---- pgmq-rs/src/pg_ext.rs | 44 +- pgmq-rs/src/query.rs | 2 + pgmq-rs/tests/integration_test.rs | 33 +- pgmq-rs/tests/pg_ext_integration_test.rs | 16 +- pgmq.control | 1 + sql/pgmq--0.24.0--0.25.0.sql | 280 ++++++++ src/api.rs | 629 ++++++++++++++++- src/lib.rs | 673 +------------------ src/metrics.rs | 10 +- src/partition.rs | 12 +- src/sql_src.sql | 16 + tembo-pgmq-python/benches/bench.py | 12 +- tembo-pgmq-python/tembo_pgmq_python/queue.py | 16 +- tests/integration_tests.rs | 40 +- 24 files changed, 1225 insertions(+), 955 deletions(-) create mode 100644 sql/pgmq--0.24.0--0.25.0.sql create mode 100644 src/sql_src.sql diff --git a/Cargo.lock b/Cargo.lock index 999d6804..da9b2178 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1243,7 +1243,7 @@ dependencies = [ [[package]] name = "pgmq" -version = "0.24.0" +version = "0.25.0" dependencies = [ "chrono", "pgmq-core", @@ -1260,7 +1260,7 @@ dependencies = [ [[package]] name = "pgmq-core" -version = "0.4.0" +version = "0.5.0" dependencies = [ "chrono", "log", diff --git a/Cargo.toml b/Cargo.toml index 0e957fd9..18558427 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgmq" -version = "0.24.0" +version = "0.25.0" edition = "2021" authors = ["Tembo.io"] description = "Postgres extension for PGMQ" diff --git a/README.md b/README.md index 9c8f71ec..bb247ecc 100644 --- a/README.md +++ b/README.md @@ -66,16 +66,16 @@ CREATE EXTENSION pgmq; ### Creating a queue -Every queue is its own table in Postgres. The table name is the queue name prefixed with `pgmq_`. - For example, `pgmq_my_queue` is the table for the queue `my_queue`. +Every queue is its own table in Postgres. The table name is the queue name prefixed with `queue_`. +For example, `queue_my_mq` is the table for the queue `my_mq`. ```sql -- creates the queue -SELECT pgmq_create('my_queue'); +SELECT pgmq.create('my_queue'); ``` ```text - pgmq_create + create ------------- (1 row) @@ -85,19 +85,19 @@ SELECT pgmq_create('my_queue'); ```sql -- messages are sent as JSON -SELECT * from pgmq_send('my_queue', '{"foo": "bar1"}'); -SELECT * from pgmq_send('my_queue', '{"foo": "bar2"}'); +SELECT * from pgmq.send('my_queue', '{"foo": "bar1"}'); +SELECT * from pgmq.send('my_queue', '{"foo": "bar2"}'); ``` The message id is returned from the send function. ```text - pgmq_send + send ----------- 1 (1 row) - pgmq_send + send ----------- 2 (1 row) @@ -110,7 +110,7 @@ Read `2` message from the queue. Make them invisible for `30` seconds. and can be read by another consumer. ```sql -SELECT * from pgmq_read('my_queue', 30, 2); +SELECT pgmq.read('my_queue', 30, 2); ``` ```text @@ -123,7 +123,7 @@ SELECT * from pgmq_read('my_queue', 30, 2); If the queue is empty, or if all messages are currently invisible, no rows will be returned. ```sql -SELECT * from pgmq_read('my_queue', 30, 1); +SELECT pgmq.read('my_queue', 30, 1); ``` ```text @@ -135,7 +135,7 @@ SELECT * from pgmq_read('my_queue', 30, 1); ```sql -- Read a message and immediately delete it from the queue. Returns `None` if the queue is empty. -SELECT * from pgmq_pop('my_queue'); +SELECT pgmq.pop('my_queue'); ``` ```text @@ -151,18 +151,18 @@ Archiving a message removes it from the queue, and inserts it to the archive tab Archive message with msg_id=2. ```sql -SELECT * from pgmq_archive('my_queue', 2); +SELECT pgmq.archive('my_queue', 2); ``` ```text - pgmq_archive + archive -------------- t (1 row) ``` ```sql -SELECT * from pgmq_my_queue_archive; +SELECT pgmq.my_queue_archive; ``` ```text @@ -176,11 +176,11 @@ SELECT * from pgmq_my_queue_archive; Send another message, so that we can delete it. ```sql -SELECT * from pgmq_send('my_queue', '{"foo": "bar3"}'); +SELECT pgmq.send('my_queue', '{"foo": "bar3"}'); ``` ```text - pgmq_send + send ----------- 3 (1 row) @@ -189,11 +189,11 @@ SELECT * from pgmq_send('my_queue', '{"foo": "bar3"}'); Delete the message with id `3` from the queue named `my_queue`. ```sql -SELECT pgmq_delete('my_queue', 3); +SELECT pgmq.delete('my_queue', 3); ``` ```text - pgmq_delete + delete ------------- t (1 row) @@ -204,11 +204,11 @@ SELECT pgmq_delete('my_queue', 3); Delete the queue `my_queue`. ```sql -SELECT pgmq_drop_queue('my_queue'); +SELECT pgmq.drop_queue('my_queue'); ``` ```text - pgmq_drop_queue + drop_queue ----------------- t (1 row) @@ -220,18 +220,18 @@ SELECT pgmq_drop_queue('my_queue'); You will need to install [pg_partman](https://github.com/pgpartman/pg_partman/) if you want to use `pgmq` partitioned queues. -`pgmq` queue tables can be created as a partitioned table by using `pgmq_create_partitioned()`. [pg_partman](https://github.com/pgpartman/pg_partman/) +`pgmq` queue tables can be created as a partitioned table by using `pgmq.create_partitioned()`. [pg_partman](https://github.com/pgpartman/pg_partman/) handles all maintenance of queue tables. This includes creating new partitions and dropping old partitions. -Partitions behavior is configured at the time queues are created, via `pgmq_create_partitioned()`. This function has three parameters: +Partitions behavior is configured at the time queues are created, via `pgmq.create_partitioned()`. This function has three parameters: -`queue_name: text`: The name of the queue. Queues are Postgres tables prepended with `pgmq_`. For example, `pgmq_my_queue`. +`queue_name: text`: The name of the queue. Queues are Postgres tables prepended with `queue_`. For example, `queue_my_mq`. `partition_interval: text` - The interval at which partitions are created. This can be either any valid Postgres `Duration` supported by pg_partman, or an integer value. When it is a duration, queues are partitioned by the time at which messages are sent to the table (`enqueued_at`). A value of `'daily'` would create a new partition each day. When it is an integer value, queues are partitioned by the `msg_id`. A value of `'100'` will create a new partition every 100 messages. The value must agree with `retention_interval` (time based or numeric). The default value is `daily`. -`retention_interval: text` - The interval for retaining partitions. This can be either any valid Postgres `Duration` supported by pg_partman, or an integer value. When it is a duration, partitions containing data greater than the duration will be dropped. When it is an integer value, any messages that have a `msg_id` less than `max(msg_id) - retention_interval` will be dropped. For example, if the max `msg_id` is 100 and the `retention_interval` is 60, any partitions with `msg_id` values less than 40 will be dropped. The value must agree with `partition_interval` (time based or numeric). The default is `'5 days'`. Note: `retention_interval` does not apply to messages that have been deleted via `pgmq_delete()` or archived with `pgmq_archive()`. `pgmq_delete()` removes messages forever and `pgmq_archive()` moves messages to the corresponding archive table forever (for example, `pgmq_my_queue_archive`). +`retention_interval: text` - The interval for retaining partitions. This can be either any valid Postgres `Duration` supported by pg_partman, or an integer value. When it is a duration, partitions containing data greater than the duration will be dropped. When it is an integer value, any messages that have a `msg_id` less than `max(msg_id) - retention_interval` will be dropped. For example, if the max `msg_id` is 100 and the `retention_interval` is 60, any partitions with `msg_id` values less than 40 will be dropped. The value must agree with `partition_interval` (time based or numeric). The default is `'5 days'`. Note: `retention_interval` does not apply to messages that have been deleted via `pgmq.delete()` or archived with `pgmq.archive()`. `pgmq.delete()` removes messages forever and `pgmq.archive()` moves messages to the corresponding archive table forever (for example, `queue_my_mq_archive`). In order for automatic partition maintenance to take place, several settings must be added to the `postgresql.conf` file, which is typically located in the postgres `DATADIR`. @@ -252,7 +252,7 @@ pg_partman_bgw.dbname = 'postgres' ## Visibility Timeout (vt) -pgmq guarantees exactly once delivery of a message within a visibility timeout. The visibility timeout is the amount of time a message is invisible to other consumers after it has been read by a consumer. If the message is NOT deleted or archived within the visibility timeout, it will become visible again and can be read by another consumer. The visibility timeout is set when a message is read from the queue, via `pgmq_read()`. It is recommended to set a `vt` value that is greater than the expected time it takes to process a message. After the application successfully processes the message, it should call `pgmq_delete()` to completely remove the message from the queue or `pgmq_archive()` to move it to the archive table for the queue. +pgmq guarantees exactly once delivery of a message within a visibility timeout. The visibility timeout is the amount of time a message is invisible to other consumers after it has been read by a consumer. If the message is NOT deleted or archived within the visibility timeout, it will become visible again and can be read by another consumer. The visibility timeout is set when a message is read from the queue, via `pgmq.read()`. It is recommended to set a `vt` value that is greater than the expected time it takes to process a message. After the application successfully processes the message, it should call `pgmq.delete()` to completely remove the message from the queue or `pgmq.archive()` to move it to the archive table for the queue. ## ✨ Contributors diff --git a/core/Cargo.toml b/core/Cargo.toml index 87836ee6..31fe08cd 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgmq-core" -version = "0.4.1" +version = "0.5.0" edition = "2021" authors = ["Tembo.io"] description = "Core functionality shared between the PGMQ Rust SDK and Postgres Extension" diff --git a/core/src/query.rs b/core/src/query.rs index 5b9ac9df..1df53362 100644 --- a/core/src/query.rs +++ b/core/src/query.rs @@ -2,7 +2,7 @@ use crate::{ errors::PgmqError, - types::{PGMQ_SCHEMA, TABLE_PREFIX}, + types::{ARCHIVE_PREFIX, PGMQ_SCHEMA, QUEUE_PREFIX}, util::check_input, util::CheckedName, }; @@ -37,7 +37,7 @@ pub fn destroy_queue(name: &str) -> Result, PgmqError> { pub fn create_queue(name: CheckedName<'_>) -> Result { Ok(format!( " - CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} ( + CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} ( msg_id BIGSERIAL PRIMARY KEY, read_ct INT DEFAULT 0 NOT NULL, enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, @@ -51,7 +51,7 @@ pub fn create_queue(name: CheckedName<'_>) -> Result { pub fn create_archive(name: CheckedName<'_>) -> Result { Ok(format!( " - CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive ( + CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name} ( msg_id BIGSERIAL PRIMARY KEY, read_ct INT DEFAULT 0 NOT NULL, enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, @@ -63,11 +63,16 @@ pub fn create_archive(name: CheckedName<'_>) -> Result { )) } +pub fn create_schema() -> String { + format!("CREATE SCHEMA IF NOT EXISTS {PGMQ_SCHEMA}") +} + pub fn create_meta() -> String { format!( " - CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_meta ( + CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.meta ( queue_name VARCHAR UNIQUE NOT NULL, + is_partitioned BOOLEAN NOT NULL, created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL ); " @@ -75,7 +80,7 @@ pub fn create_meta() -> String { } fn grant_stmt(table: &str) -> String { - let grant_seq = match &table.contains("pgmq_meta") { + let grant_seq = match &table.contains("meta") { true => "".to_string(), false => { format!(" EXECUTE 'GRANT SELECT ON SEQUENCE {table}_msg_id_seq TO pg_monitor';") @@ -100,25 +105,25 @@ $$ LANGUAGE plpgsql; // pg_monitor needs to query queue metadata pub fn grant_pgmon_meta() -> String { - let table = format!("{PGMQ_SCHEMA}.{TABLE_PREFIX}_meta"); + let table = format!("{PGMQ_SCHEMA}.meta"); grant_stmt(&table) } // pg_monitor needs to query queue tables pub fn grant_pgmon_queue(name: CheckedName<'_>) -> Result { - let table = format!("{PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}"); + let table = format!("{PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}"); Ok(grant_stmt(&table)) } pub fn grant_pgmon_queue_seq(name: CheckedName<'_>) -> Result { - let table = format!("{PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_msg_id_seq"); + let table = format!("{PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}_msg_id_seq"); Ok(grant_stmt(&table)) } pub fn drop_queue(name: CheckedName<'_>) -> Result { Ok(format!( " - DROP TABLE IF EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}; + DROP TABLE IF EXISTS {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}; " )) } @@ -131,10 +136,10 @@ pub fn delete_queue_metadata(name: CheckedName<'_>) -> Result IF EXISTS ( SELECT 1 FROM information_schema.tables - WHERE table_name = '{TABLE_PREFIX}_meta') + WHERE table_name = 'meta' and table_schema = 'pgmq') THEN DELETE - FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_meta + FROM {PGMQ_SCHEMA}.meta WHERE queue_name = '{name}'; END IF; END $$; @@ -145,7 +150,7 @@ pub fn delete_queue_metadata(name: CheckedName<'_>) -> Result pub fn drop_queue_archive(name: CheckedName<'_>) -> Result { Ok(format!( " - DROP TABLE IF EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive; + DROP TABLE IF EXISTS {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name}; " )) } @@ -153,7 +158,7 @@ pub fn drop_queue_archive(name: CheckedName<'_>) -> Result { pub fn insert_meta(name: CheckedName<'_>, is_partitioned: bool) -> Result { Ok(format!( " - INSERT INTO {PGMQ_SCHEMA}.{TABLE_PREFIX}_meta (queue_name, is_partitioned) + INSERT INTO {PGMQ_SCHEMA}.meta (queue_name, is_partitioned) VALUES ('{name}', {is_partitioned}) ON CONFLICT DO NOTHING; @@ -164,7 +169,7 @@ pub fn insert_meta(name: CheckedName<'_>, is_partitioned: bool) -> Result) -> Result { Ok(format!( " - CREATE INDEX IF NOT EXISTS archived_at_idx_{name} ON {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive (archived_at); + CREATE INDEX IF NOT EXISTS archived_at_idx_{name} ON {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name} (archived_at); " )) } @@ -173,14 +178,14 @@ pub fn create_archive_index(name: CheckedName<'_>) -> Result pub fn create_index(name: CheckedName<'_>) -> Result { Ok(format!( " - CREATE INDEX IF NOT EXISTS {TABLE_PREFIX}_{name}_vt_idx ON {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} (vt ASC); + CREATE INDEX IF NOT EXISTS {QUEUE_PREFIX}_{name}_vt_idx ON {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} (vt ASC); " )) } pub fn purge_queue(name: &str) -> Result { check_input(name)?; - Ok(format!("DELETE FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name};")) + Ok(format!("DELETE FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name};")) } pub fn enqueue( @@ -199,7 +204,7 @@ pub fn enqueue( values.pop(); Ok(format!( " - INSERT INTO {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} (vt, message) + INSERT INTO {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} (vt, message) VALUES {values} RETURNING msg_id; " @@ -213,13 +218,13 @@ pub fn read(name: &str, vt: i32, limit: i32) -> Result { WITH cte AS ( SELECT msg_id - FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} + FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} WHERE vt <= clock_timestamp() ORDER BY msg_id ASC LIMIT {limit} FOR UPDATE SKIP LOCKED ) - UPDATE {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} t + UPDATE {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} t SET vt = clock_timestamp() + interval '{vt} seconds', read_ct = read_ct + 1 @@ -234,7 +239,7 @@ pub fn set_vt(name: &str, msg_id: i64, vt: chrono::DateTime) -> Result Result { Ok(format!( " - DELETE FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} + DELETE FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} WHERE msg_id = ANY($1) RETURNING msg_id; " @@ -262,11 +267,11 @@ pub fn archive_batch(name: &str) -> Result { Ok(format!( " WITH archived AS ( - DELETE FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} + DELETE FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} WHERE msg_id = ANY($1) RETURNING msg_id, vt, read_ct, enqueued_at, message ) - INSERT INTO {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive (msg_id, vt, read_ct, enqueued_at, message) + INSERT INTO {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name} (msg_id, vt, read_ct, enqueued_at, message) SELECT msg_id, vt, read_ct, enqueued_at, message FROM archived RETURNING msg_id; @@ -281,13 +286,13 @@ pub fn pop(name: &str) -> Result { WITH cte AS ( SELECT msg_id - FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} + FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} WHERE vt <= now() ORDER BY msg_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) - DELETE from {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} + DELETE from {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} WHERE msg_id = (select msg_id from cte) RETURNING *; " @@ -295,22 +300,22 @@ pub fn pop(name: &str) -> Result { } pub fn assign_queue(name: CheckedName<'_>) -> Result { - Ok(assign(name.as_ref())) + Ok(assign(&format!("{QUEUE_PREFIX}_{name}"))) } pub fn assign_archive(name: CheckedName<'_>) -> Result { - Ok(assign(&format!("{name}_archive"))) + Ok(assign(&format!("{ARCHIVE_PREFIX}_{name}"))) } pub fn unassign_queue(name: CheckedName<'_>) -> Result { Ok(format!( - "ALTER EXTENSION pgmq DROP TABLE {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}; " + "ALTER EXTENSION pgmq DROP TABLE {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}; " )) } pub fn unassign_archive(name: CheckedName<'_>) -> Result { Ok(format!( - "ALTER EXTENSION pgmq DROP TABLE {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive; " + "ALTER EXTENSION pgmq DROP TABLE {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name}; " )) } @@ -328,10 +333,10 @@ pub fn assign(table_name: &str) -> String { AND objid = ( SELECT oid FROM pg_class - WHERE relname = '{TABLE_PREFIX}_{table_name}' + WHERE relname = '{table_name}' ) ) THEN - EXECUTE 'ALTER EXTENSION pgmq ADD TABLE {PGMQ_SCHEMA}.{TABLE_PREFIX}_{table_name}'; + EXECUTE 'ALTER EXTENSION pgmq ADD TABLE {PGMQ_SCHEMA}.{table_name}'; END IF; END $$; " @@ -360,15 +365,15 @@ $$ LANGUAGE plpgsql; "; assert_eq!(q, expected); - let q = grant_stmt("pgmq_meta"); + let q = grant_stmt("meta"); let expected = " DO $$ BEGIN IF NOT EXISTS ( SELECT 1 - WHERE has_table_privilege('pg_monitor', 'pgmq_meta', 'SELECT') + WHERE has_table_privilege('pg_monitor', 'meta', 'SELECT') ) THEN - EXECUTE 'GRANT SELECT ON pgmq_meta TO pg_monitor'; + EXECUTE 'GRANT SELECT ON meta TO pg_monitor'; END IF; END; @@ -379,15 +384,15 @@ $$ LANGUAGE plpgsql; #[test] fn test_assign() { - let query = assign("my_queue_archive"); - assert!(query.contains("WHERE relname = 'pgmq_my_queue_archive'")); + let query = assign("a_my_queue_archive"); + assert!(query.contains("WHERE relname = 'a_my_queue_archive'")); } #[test] fn test_create() { let queue_name = CheckedName::new("yolo").unwrap(); let query = create_queue(queue_name); - assert!(query.unwrap().contains("pgmq_yolo")); + assert!(query.unwrap().contains("q_yolo")); } #[test] @@ -398,7 +403,7 @@ $$ LANGUAGE plpgsql; }); msgs.push(msg); let query = enqueue("yolo", &msgs, &0).unwrap(); - assert!(query.contains("pgmq_yolo")); + assert!(query.contains("q_yolo")); assert!(query.contains("{\"foo\":\"bar\"}")); } @@ -418,11 +423,9 @@ $$ LANGUAGE plpgsql; fn check_input_rejects_names_too_large() { let table_name = "my_valid_table_name"; assert!(check_input(table_name).is_ok()); - - assert!(check_input(&"a".repeat(58)).is_ok()); - - assert!(check_input(&"a".repeat(59)).is_err()); - assert!(check_input(&"a".repeat(60)).is_err()); + assert!(check_input(&"a".repeat(61)).is_ok()); + assert!(check_input(&"a".repeat(62)).is_err()); + assert!(check_input(&"a".repeat(64)).is_err()); assert!(check_input(&"a".repeat(70)).is_err()); } diff --git a/core/src/types.rs b/core/src/types.rs index d2eb3963..39d538f6 100644 --- a/core/src/types.rs +++ b/core/src/types.rs @@ -11,8 +11,9 @@ pub const POLL_INTERVAL_DEFAULT: Duration = Duration::from_millis(250); use chrono::serde::ts_seconds::deserialize as from_ts; -pub const TABLE_PREFIX: &str = r#"pgmq"#; -pub const PGMQ_SCHEMA: &str = "public"; +pub const QUEUE_PREFIX: &str = r#"q"#; +pub const ARCHIVE_PREFIX: &str = r#"a"#; +pub const PGMQ_SCHEMA: &str = "pgmq"; pub struct PGMQueueMeta { pub queue_name: String, diff --git a/core/src/util.rs b/core/src/util.rs index f32fd500..616d9fd9 100644 --- a/core/src/util.rs +++ b/core/src/util.rs @@ -2,7 +2,7 @@ use std::fmt::Display; use crate::{ errors::PgmqError, - types::{Message, TABLE_PREFIX}, + types::{Message, QUEUE_PREFIX}, }; use log::LevelFilter; use serde::Deserialize; @@ -105,7 +105,7 @@ pub fn check_input(input: &str) -> Result<(), PgmqError> { // Longer names can be used in commands, but they'll be truncated const MAX_IDENTIFIER_LEN: usize = NAMEDATALEN - 1; // The max length of a PGMQ table, considering its prefix and the underline after it (e.g. "pgmq_") - const MAX_PGMQ_TABLE_LEN: usize = MAX_IDENTIFIER_LEN - TABLE_PREFIX.len() - 1; + const MAX_PGMQ_TABLE_LEN: usize = MAX_IDENTIFIER_LEN - QUEUE_PREFIX.len() - 1; let is_short_enough = input.len() <= MAX_PGMQ_TABLE_LEN; let has_valid_characters = input diff --git a/examples/ruby.rb b/examples/ruby.rb index 617c0fc7..ffc6ed59 100755 --- a/examples/ruby.rb +++ b/examples/ruby.rb @@ -33,10 +33,10 @@ conn.exec( "CREATE EXTENSION if not exists pgmq CASCADE;" ) # create the queue (will create a table pg_ using the queue name) -conn.exec( "select * from pgmq_create('#{QUEUE_NAME}')" ) +conn.exec( "select * from pgmq.create('#{QUEUE_NAME}')" ) # list queues -list_queues = conn.exec( "select * from pgmq_list_queues()" ) +list_queues = conn.exec( "select * from pgmq.list_queues()" ) $stderr.puts '---', "### Queues ###" @@ -44,7 +44,7 @@ # send a message msg = "{yolo: 42}".to_json -msg_result = conn.exec( "select * from pgmq_send('#{QUEUE_NAME}', '#{msg}') as msg_id;" ) +msg_result = conn.exec( "select * from pgmq.send('#{QUEUE_NAME}', '#{msg}') as msg_id;" ) msg_id = msg_result.first["msg_id"] $stderr.puts '---', @@ -52,7 +52,7 @@ msg_id # read a message (making it unavailable for 1 second) -msg_result = conn.exec( "select * from pgmq_read('#{QUEUE_NAME}', #{LOCK_TIMEOUT}, #{NUM_MSGS})" ) +msg_result = conn.exec( "select * from pgmq.read('#{QUEUE_NAME}', #{LOCK_TIMEOUT}, #{NUM_MSGS})" ) msg_row = msg_result.first $stderr.puts '---', @@ -60,13 +60,13 @@ "msg_id: #{msg_row['msg_id']}, value: #{JSON.parse(msg_row['message']).to_s}" # delete a message (for a given ID) -msg_result = conn.exec( "select pgmq_delete('#{QUEUE_NAME}', #{msg_id})" ) +msg_result = conn.exec( "select pgmq.delete('#{QUEUE_NAME}', #{msg_id})" ) $stderr.puts '---', "### msg delete: #{msg_id} ###", msg_result.values.flatten.first.to_s == "t" ? "true" : "false" # read up to 1000 messages -msg_result = conn.exec( "select * from pgmq_read('#{QUEUE_NAME}', #{LOCK_TIMEOUT}, 1000)" ) +msg_result = conn.exec( "select * from pgmq.read('#{QUEUE_NAME}', #{LOCK_TIMEOUT}, 1000)" ) if msg_result.any? $stderr.puts '---', diff --git a/pgmq-rs/Cargo.toml b/pgmq-rs/Cargo.toml index 3a987bb1..0c2fcc33 100644 --- a/pgmq-rs/Cargo.toml +++ b/pgmq-rs/Cargo.toml @@ -12,7 +12,7 @@ readme = "README.md" repository = "https://github.com/tembo-io/pgmq" [dependencies] -pgmq_core = { package = "pgmq-core", version = "0.4.0" } +pgmq_core = {package = "pgmq-core", path = "../core" } chrono = { version = "0.4.23", features = [ "serde" ] } serde = { version = "1.0.152" } serde_json = { version = "1.0.91", features = [ "raw_value" ] } diff --git a/pgmq-rs/sqlx-data.json b/pgmq-rs/sqlx-data.json index c09bf634..3063ec0a 100644 --- a/pgmq-rs/sqlx-data.json +++ b/pgmq-rs/sqlx-data.json @@ -1,12 +1,12 @@ { "db": "PostgreSQL", - "0c81d8ee07fac75c18d9bdebd9b2ac5e473b4ef43a5d332f5d58ae6a0e01e600": { + "001695518a3a4908b757c91599f168ead31456c9b327e0468ef42c170be3a087": { "describe": { "columns": [ { - "name": "pgmq_delete", + "name": "create_partitioned", "ordinal": 0, - "type_info": "Bool" + "type_info": "Void" } ], "nullable": [ @@ -14,14 +14,13 @@ ], "parameters": { "Left": [ - "Text", - "Int8" + "Text" ] } }, - "query": "SELECT * from pgmq_delete($1::text, $2::bigint)" + "query": "SELECT * from pgmq.create_partitioned($1::text);" }, - "0e13957cc5c1f96a07ec242ce393f0738d62d79d34182e3c9333e9f9712d4cbb": { + "02debe6ad1b36abb3aef2a6736059fbb04fae4e3868afb84b175bcb73760ad0c": { "describe": { "columns": [ { @@ -59,43 +58,23 @@ ], "parameters": { "Left": [ - "Text" + "Text", + "Int4", + "Int4", + "Int4", + "Int4" ] } }, - "query": "SELECT * from pgmq_pop($1::text)" - }, - "10ea79c92326fbee074db2d9629f60abb320be01d069b12b2360816267a64e3a": { - "describe": { - "columns": [ - { - "name": "queue_name", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "created_at", - "ordinal": 1, - "type_info": "Timestamptz" - } - ], - "nullable": [ - null, - null - ], - "parameters": { - "Left": [] - } - }, - "query": "SELECT * from pgmq_list_queues();" + "query": "SELECT * from pgmq.read_with_poll($1::text, $2, $3, $4, $5)" }, - "213c722c2cab09b2042dc989130c4131b322b5d0b616dbb91f8e8e8f411406f9": { + "07917815df25f1ac06675a645a812d9791b3c1d32098248cf0f34aead4dcebc8": { "describe": { "columns": [ { - "name": "pgmq_create", + "name": "delete", "ordinal": 0, - "type_info": "Void" + "type_info": "Bool" } ], "nullable": [ @@ -103,17 +82,18 @@ ], "parameters": { "Left": [ - "Text" + "Text", + "Int8" ] } }, - "query": "SELECT * from pgmq_create($1::text);" + "query": "SELECT * from pgmq.delete($1::text, $2::bigint)" }, - "36b2ecea83484248d99c8d091f8d9b7fa6ce368d7db8fe14e3ab491eb52c723c": { + "23e056e8442ba9a1b2f4922d8b0f0b90a7b9d63a40be3345047450d0405b83b6": { "describe": { "columns": [ { - "name": "pgmq_archive", + "name": "archive", "ordinal": 0, "type_info": "Bool" } @@ -128,7 +108,7 @@ ] } }, - "query": "SELECT * from pgmq_archive($1::text, $2::bigint[])" + "query": "SELECT * from pgmq.archive($1::text, $2::bigint[])" }, "3a3440841fba7d0f8744f5873cdb93a64b6e2f6481c3b14af701c5189f5e73f0": { "describe": { @@ -140,7 +120,7 @@ }, "query": "CREATE EXTENSION IF NOT EXISTS pgmq CASCADE;" }, - "68bcfc3f45820da1b9fb7ed449d516297b9986d1c36316656d7acccb32f171a9": { + "60f9928c156667a7d22f0d0d2a7ac2f5272c48228d8ebd03c40bc23162bf5d58": { "describe": { "columns": [ { @@ -159,36 +139,61 @@ ] } }, - "query": "SELECT pgmq_send as msg_id from pgmq_send($1::text, $2::jsonb, 0::integer);" + "query": "SELECT send as msg_id from pgmq.send($1::text, $2::jsonb, 0::integer);" }, - "8e4f6635dc4cfb5ed42ddb87930b985793e81243b8a5609dec4ab566aaab4e9c": { + "775c8aa14d49076963fd996b465ff96213b44421787a615d5502e145b9a71005": { "describe": { "columns": [ { - "name": "pgmq_delete", + "name": "msg_id", "ordinal": 0, - "type_info": "Bool" + "type_info": "Int8" + }, + { + "name": "read_ct", + "ordinal": 1, + "type_info": "Int4" + }, + { + "name": "enqueued_at", + "ordinal": 2, + "type_info": "Timestamptz" + }, + { + "name": "vt", + "ordinal": 3, + "type_info": "Timestamptz" + }, + { + "name": "message", + "ordinal": 4, + "type_info": "Jsonb" } ], "nullable": [ + null, + null, + null, + null, null ], "parameters": { "Left": [ "Text", - "Int8Array" + "Int8", + "Int4" ] } }, - "query": "SELECT * from pgmq_delete($1::text, $2::bigint[])" + "query": "SELECT * from pgmq.set_vt($1::text, $2::bigint, $3::integer);" }, - "98463803e1b4548d758476cce697814fb8411f47f899043ece8a7a6bee71d84d": { + "879cb18f9c3fec28c6d88ced573bc5694e734159f818f5626b21ff64595039ef": { "describe": { "columns": [ { - "name": "pgmq_purge_queue", + "name": "delete", "ordinal": 0, - "type_info": "Int8" + "type_info": "Bool" } ], "nullable": [ @@ -196,19 +201,20 @@ ], "parameters": { "Left": [ - "Text" + "Text", + "Int8Array" ] } }, - "query": "SELECT * from pgmq_purge_queue($1::text);" + "query": "SELECT * from pgmq.delete($1::text, $2::bigint[])" }, - "9919286cee87946b387f69e67df94f94eb0acdd3b5f4848faf092c55d484b61a": { + "8bf48b3cab42cc04047e8cc16cdd4064c3768328a673d8a278002afa85b2b1d4": { "describe": { "columns": [ { - "name": "pgmq_drop_queue", + "name": "create", "ordinal": 0, - "type_info": "Bool" + "type_info": "Void" } ], "nullable": [ @@ -220,9 +226,9 @@ ] } }, - "query": "SELECT * from pgmq_drop_queue($1::text);" + "query": "SELECT * from pgmq.create($1::text);" }, - "b42aa546a8e1f83b4da63a6a20aaa51778e01d8525b4b9b307576b2921eec305": { + "b4624572ccb21a9bf45340b3d438175cb333f8d251264dff74f8032ce15ac89f": { "describe": { "columns": [ { @@ -260,15 +266,33 @@ ], "parameters": { "Left": [ - "Text", - "Int8", - "Int4" + "Text" + ] + } + }, + "query": "SELECT * from pgmq.pop($1::text)" + }, + "b6226444041b43eec27dbf3271075c0ac8530f43116ef02219c1d734904363cb": { + "describe": { + "columns": [ + { + "name": "drop_queue", + "ordinal": 0, + "type_info": "Bool" + } + ], + "nullable": [ + null + ], + "parameters": { + "Left": [ + "Text" ] } }, - "query": "SELECT * from pgmq_set_vt($1::text, $2::bigint, $3::integer);" + "query": "SELECT * from pgmq.drop_queue($1::text);" }, - "c89ad5584222a1f9c0d505fe58bf035d68d033047176b106e289ad6842f49972": { + "bcd20bd8b48f51b401b6bf908d47952ee410cf53fc85dba2192ae16f811de8a9": { "describe": { "columns": [ { @@ -312,13 +336,13 @@ ] } }, - "query": "SELECT * from pgmq_read($1::text, $2, $3)" + "query": "SELECT * from pgmq.read($1::text, $2, $3)" }, - "cb7072cc0f81a187953b6210a40d799b0ff301b73ad873f767c29fd383724a32": { + "cb31699ff46d61c834893d93416c5321ea403e3001cdededf85b9fef2a5906b8": { "describe": { "columns": [ { - "name": "pgmq_archive", + "name": "archive", "ordinal": 0, "type_info": "Bool" } @@ -333,85 +357,59 @@ ] } }, - "query": "SELECT * from pgmq_archive($1::text, $2::bigint)" + "query": "SELECT * from pgmq.archive($1::text, $2::bigint)" }, - "d0f732f460be14cefbdf967f2a64e93eb0cec49121decbcb18b26fa1c1f4e399": { + "ecde748537ee27524a682a71ac5bfd9237c8b44300c6809cc5808cc1ba497d60": { "describe": { "columns": [ { - "name": "msg_id", + "name": "queue_name", "ordinal": 0, - "type_info": "Int8" + "type_info": "Text" + }, + { + "name": "created_at", + "ordinal": 1, + "type_info": "Timestamptz" } ], "nullable": [ + null, null ], "parameters": { - "Left": [ - "Text", - "Jsonb", - "Int4" - ] + "Left": [] } }, - "query": "SELECT pgmq_send as msg_id from pgmq_send($1::text, $2::jsonb, $3::int);" + "query": "SELECT * from pgmq.list_queues();" }, - "e4c38347b44aed05aa890d3351a362d3b6f81387e98fc564ec922cefa1e96f71": { + "f3f011e1a531861630c09d79a2f8f1b11a9543e30bfcbb7654ce9e30f0cf51e3": { "describe": { "columns": [ { - "name": "msg_id", + "name": "purge_queue", "ordinal": 0, "type_info": "Int8" - }, - { - "name": "read_ct", - "ordinal": 1, - "type_info": "Int4" - }, - { - "name": "enqueued_at", - "ordinal": 2, - "type_info": "Timestamptz" - }, - { - "name": "vt", - "ordinal": 3, - "type_info": "Timestamptz" - }, - { - "name": "message", - "ordinal": 4, - "type_info": "Jsonb" } ], "nullable": [ - null, - null, - null, - null, null ], "parameters": { "Left": [ - "Text", - "Int4", - "Int4", - "Int4", - "Int4" + "Text" ] } }, - "query": "SELECT * from pgmq_read_with_poll($1::text, $2, $3, $4, $5)" + "query": "SELECT * from pgmq.purge_queue($1::text);" }, - "ed8b7aacd0d94fe647899b6d2fe61a29372cd7d6dbc28bf59ac6bb3118e3fe6c": { + "f686f11720bb96004b7d51cf1989919f7fe714bfaa5a2c95d1b7f89cd01e29b7": { "describe": { "columns": [ { - "name": "pgmq_create_partitioned", + "name": "msg_id", "ordinal": 0, - "type_info": "Void" + "type_info": "Int8" } ], "nullable": [ @@ -419,10 +417,12 @@ ], "parameters": { "Left": [ - "Text" + "Text", + "Jsonb", + "Int4" ] } }, - "query": "SELECT * from pgmq_create_partitioned($1::text);" + "query": "SELECT send as msg_id from pgmq.send($1::text, $2::jsonb, $3::int);" } } \ No newline at end of file diff --git a/pgmq-rs/src/pg_ext.rs b/pgmq-rs/src/pg_ext.rs index 6580840a..a103b4f2 100644 --- a/pgmq-rs/src/pg_ext.rs +++ b/pgmq-rs/src/pg_ext.rs @@ -1,7 +1,7 @@ use log::info; use pgmq_core::{ errors::PgmqError, - types::{Message, TABLE_PREFIX}, + types::{Message, QUEUE_PREFIX}, util::{check_input, connect}, }; use serde::{Deserialize, Serialize}; @@ -51,7 +51,7 @@ impl PGMQueueExt { /// Errors when there is any database error and Ok(false) when the queue already exists. pub async fn create(&self, queue_name: &str) -> Result { check_input(queue_name)?; - sqlx::query!("SELECT * from pgmq_create($1::text);", queue_name) + sqlx::query!("SELECT * from pgmq.create($1::text);", queue_name) .execute(&self.connection) .await?; Ok(true) @@ -61,7 +61,7 @@ impl PGMQueueExt { /// Errors when there is any database error and Ok(false) when the queue already exists. pub async fn create_partitioned(&self, queue_name: &str) -> Result { check_input(queue_name)?; - let queue_table = format!("public.{TABLE_PREFIX}_{queue_name}"); + let queue_table = format!("pgmq.{QUEUE_PREFIX}_{queue_name}"); // we need to check whether the queue exists first // pg_partman create operations are currently unable to be idempotent let exists_stmt = format!( @@ -76,7 +76,7 @@ impl PGMQueueExt { Ok(false) } else { sqlx::query!( - "SELECT * from pgmq_create_partitioned($1::text);", + "SELECT * from pgmq.create_partitioned($1::text);", queue_name ) .execute(&self.connection) @@ -90,7 +90,7 @@ impl PGMQueueExt { check_input(queue_name)?; self.connection .execute(sqlx::query!( - "SELECT * from pgmq_drop_queue($1::text);", + "SELECT * from pgmq.drop_queue($1::text);", queue_name )) .await?; @@ -101,16 +101,16 @@ impl PGMQueueExt { /// Drop an existing queue table. pub async fn purge_queue(&self, queue_name: &str) -> Result { check_input(queue_name)?; - let purged = sqlx::query!("SELECT * from pgmq_purge_queue($1::text);", queue_name) + let purged = sqlx::query!("SELECT * from pgmq.purge_queue($1::text);", queue_name) .fetch_one(&self.connection) .await?; - Ok(purged.pgmq_purge_queue.expect("no purged count")) + Ok(purged.purge_queue.expect("no purged count")) } /// List all queues in the Postgres instance. pub async fn list_queues(&self) -> Result>, PgmqError> { - let queues = sqlx::query!("SELECT * from pgmq_list_queues();") + let queues = sqlx::query!("SELECT * from pgmq.list_queues();") .fetch_all(&self.connection) .await?; if queues.is_empty() { @@ -136,7 +136,7 @@ impl PGMQueueExt { ) -> Result, PgmqError> { check_input(queue_name)?; let updated = sqlx::query!( - "SELECT * from pgmq_set_vt($1::text, $2::bigint, $3::integer);", + "SELECT * from pgmq.set_vt($1::text, $2::bigint, $3::integer);", queue_name, msg_id, vt @@ -163,7 +163,7 @@ impl PGMQueueExt { check_input(queue_name)?; let msg = serde_json::json!(&message); let sent = sqlx::query!( - "SELECT pgmq_send as msg_id from pgmq_send($1::text, $2::jsonb, 0::integer);", + "SELECT send as msg_id from pgmq.send($1::text, $2::jsonb, 0::integer);", queue_name, msg ) @@ -181,7 +181,7 @@ impl PGMQueueExt { check_input(queue_name)?; let msg = serde_json::json!(&message); let sent = sqlx::query!( - "SELECT pgmq_send as msg_id from pgmq_send($1::text, $2::jsonb, $3::int);", + "SELECT send as msg_id from pgmq.send($1::text, $2::jsonb, $3::int);", queue_name, msg, delay as i32 @@ -198,7 +198,7 @@ impl PGMQueueExt { ) -> Result>, PgmqError> { check_input(queue_name)?; let row = sqlx::query!( - "SELECT * from pgmq_read($1::text, $2, $3)", + "SELECT * from pgmq.read($1::text, $2, $3)", queue_name, vt, 1 @@ -240,7 +240,7 @@ impl PGMQueueExt { let poll_interval_ms = poll_interval.map_or(DEFAULT_POLL_INTERVAL_MS, |i| i.as_millis() as i32); let result = sqlx::query!( - "SELECT * from pgmq_read_with_poll($1::text, $2, $3, $4, $5)", + "SELECT * from pgmq.read_with_poll($1::text, $2, $3, $4, $5)", queue_name, vt, max_batch_size, @@ -282,13 +282,13 @@ impl PGMQueueExt { pub async fn archive(&self, queue_name: &str, msg_id: i64) -> Result { check_input(queue_name)?; let arch = sqlx::query!( - "SELECT * from pgmq_archive($1::text, $2::bigint)", + "SELECT * from pgmq.archive($1::text, $2::bigint)", queue_name, msg_id ) .fetch_one(&self.connection) .await?; - Ok(arch.pgmq_archive.expect("no archive result")) + Ok(arch.archive.expect("no archive result")) } /// Move a message to the archive table. @@ -299,13 +299,13 @@ impl PGMQueueExt { ) -> Result { check_input(queue_name)?; let arch = sqlx::query!( - "SELECT * from pgmq_archive($1::text, $2::bigint[])", + "SELECT * from pgmq.archive($1::text, $2::bigint[])", queue_name, msg_ids ) .fetch_one(&self.connection) .await?; - Ok(arch.pgmq_archive.expect("no archive result")) + Ok(arch.archive.expect("no archive result")) } // Read and message and immediately delete it. @@ -314,7 +314,7 @@ impl PGMQueueExt { queue_name: &str, ) -> Result>, PgmqError> { check_input(queue_name)?; - let row = sqlx::query!("SELECT * from pgmq_pop($1::text)", queue_name,) + let row = sqlx::query!("SELECT * from pgmq.pop($1::text)", queue_name,) .fetch_optional(&self.connection) .await?; match row { @@ -342,24 +342,24 @@ impl PGMQueueExt { // Delete a message by message id. pub async fn delete(&self, queue_name: &str, msg_id: i64) -> Result { let row = sqlx::query!( - "SELECT * from pgmq_delete($1::text, $2::bigint)", + "SELECT * from pgmq.delete($1::text, $2::bigint)", queue_name, msg_id ) .fetch_one(&self.connection) .await?; - Ok(row.pgmq_delete.expect("no delete result")) + Ok(row.delete.expect("no delete result")) } // Delete with a slice of message ids pub async fn delete_batch(&self, queue_name: &str, msg_id: &[i64]) -> Result { let row = sqlx::query!( - "SELECT * from pgmq_delete($1::text, $2::bigint[])", + "SELECT * from pgmq.delete($1::text, $2::bigint[])", queue_name, msg_id ) .fetch_one(&self.connection) .await?; - Ok(row.pgmq_delete.expect("no delete result")) + Ok(row.delete.expect("no delete result")) } } diff --git a/pgmq-rs/src/query.rs b/pgmq-rs/src/query.rs index c57f7f78..f53cbaf7 100644 --- a/pgmq-rs/src/query.rs +++ b/pgmq-rs/src/query.rs @@ -5,6 +5,7 @@ use pgmq_core::{errors, query, util}; pub fn init_queue_client_only(name: &str) -> Result, errors::PgmqError> { let name = util::CheckedName::new(name)?; Ok(vec![ + query::create_schema(), query::create_meta(), query::create_queue(name)?, query::create_index(name)?, @@ -19,6 +20,7 @@ pub fn init_queue_client_only(name: &str) -> Result, errors::PgmqErr pub fn destroy_queue_client_only(name: &str) -> Result, errors::PgmqError> { let name = util::CheckedName::new(name)?; Ok(vec![ + query::create_schema(), query::drop_queue(name)?, query::drop_queue_archive(name)?, query::delete_queue_metadata(name)?, diff --git a/pgmq-rs/tests/integration_test.rs b/pgmq-rs/tests/integration_test.rs index 405d775a..ee13404f 100644 --- a/pgmq-rs/tests/integration_test.rs +++ b/pgmq-rs/tests/integration_test.rs @@ -1,7 +1,7 @@ use chrono::{Duration, Utc}; use pgmq_core::{ errors::PgmqError, - types::{Message, TABLE_PREFIX}, + types::{Message, ARCHIVE_PREFIX, PGMQ_SCHEMA, QUEUE_PREFIX}, }; use rand::Rng; use serde::{Deserialize, Serialize}; @@ -48,7 +48,16 @@ struct YoloMessage { } async fn rowcount(qname: &str, connection: &Pool) -> i64 { - let row_ct_query = format!("SELECT count(*) as ct FROM {TABLE_PREFIX}_{qname}"); + let row_ct_query = format!("SELECT count(*) as ct FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{qname}"); + sqlx::query(&row_ct_query) + .fetch_one(connection) + .await + .unwrap() + .get::(0) +} + +async fn archive_rowcount(qname: &str, connection: &Pool) -> i64 { + let row_ct_query = format!("SELECT count(*) as ct FROM {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{qname}"); sqlx::query(&row_ct_query) .fetch_one(connection) .await @@ -60,7 +69,18 @@ async fn rowcount(qname: &str, connection: &Pool) -> i64 { // simple solution: our existing rowcount() helper will fail // wrap it in a Result<> so we can use it async fn fallible_rowcount(qname: &str, connection: &Pool) -> Result { - let row_ct_query = format!("SELECT count(*) as ct FROM {TABLE_PREFIX}_{qname}"); + let row_ct_query = format!("SELECT count(*) as ct FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{qname}"); + Ok(sqlx::query(&row_ct_query) + .fetch_one(connection) + .await? + .get::(0)) +} + +async fn fallible_archive_rowcount( + qname: &str, + connection: &Pool, +) -> Result { + let row_ct_query = format!("SELECT count(*) as ct FROM {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{qname}"); Ok(sqlx::query(&row_ct_query) .fetch_one(connection) .await? @@ -581,7 +601,7 @@ async fn test_archive() { let num_rows_queue = rowcount(&test_queue, &queue.connection).await; // archived record is no longer on the queue assert_eq!(num_rows_queue, 0); - let num_rows_archive = rowcount(&format!("{test_queue}_archive"), &queue.connection).await; + let num_rows_archive = archive_rowcount(&test_queue, &queue.connection).await; // archived record is now on the archive table assert_eq!(num_rows_archive, 1); } @@ -703,14 +723,13 @@ async fn test_destroy() { // the queue and the queue archive should no longer exist let queue_table = fallible_rowcount(&test_queue, &queue.connection).await; assert!(queue_table.is_err()); - let archive_table = - fallible_rowcount(&format!("{test_queue}_archive"), &queue.connection).await; + let archive_table = fallible_archive_rowcount(&test_queue, &queue.connection).await; assert!(archive_table.is_err()); // queue must not be present on pgmq_meta let pgmq_meta_query = format!( "SELECT count(*) as ct - FROM {TABLE_PREFIX}_meta + FROM {PGMQ_SCHEMA}.meta WHERE queue_name = '{test_queue}'", ); let rowcount = sqlx::query(&pgmq_meta_query) diff --git a/pgmq-rs/tests/pg_ext_integration_test.rs b/pgmq-rs/tests/pg_ext_integration_test.rs index a0d29a3f..3ef294a5 100644 --- a/pgmq-rs/tests/pg_ext_integration_test.rs +++ b/pgmq-rs/tests/pg_ext_integration_test.rs @@ -1,4 +1,4 @@ -use pgmq_core::types::TABLE_PREFIX; +use pgmq_core::types::{ARCHIVE_PREFIX, PGMQ_SCHEMA, QUEUE_PREFIX}; use rand::Rng; use serde::{Deserialize, Serialize}; use sqlx::{Pool, Postgres, Row}; @@ -61,7 +61,16 @@ struct YoloMessage { } async fn rowcount(qname: &str, connection: &Pool) -> i64 { - let row_ct_query = format!("SELECT count(*) as ct FROM {TABLE_PREFIX}_{qname}"); + let row_ct_query = format!("SELECT count(*) as ct FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{qname}"); + sqlx::query(&row_ct_query) + .fetch_one(connection) + .await + .unwrap() + .get::(0) +} + +async fn archive_rowcount(qname: &str, connection: &Pool) -> i64 { + let row_ct_query = format!("SELECT count(*) as ct FROM {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{qname}"); sqlx::query(&row_ct_query) .fetch_one(connection) .await @@ -249,7 +258,6 @@ async fn test_ext_archive_batch() { "test_ext_archive_batch_{}", rand::thread_rng().gen_range(0..100000) ); - let test_queue_archive = format!("{}_archive", test_queue); let queue = init_queue_ext(&test_queue).await; let msg = MyMessage::default(); @@ -267,7 +275,7 @@ async fn test_ext_archive_batch() { assert_eq!(post_archive_rowcount, 0); assert_eq!(archive_result, true); - let post_archive_archive_rowcount = rowcount(&test_queue_archive, &queue.connection).await; + let post_archive_archive_rowcount = archive_rowcount(&test_queue, &queue.connection).await; assert_eq!(post_archive_archive_rowcount, 3); } diff --git a/pgmq.control b/pgmq.control index 9c57d6d7..6c559c0e 100644 --- a/pgmq.control +++ b/pgmq.control @@ -1,5 +1,6 @@ comment = 'A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.' default_version = '@CARGO_VERSION@' module_pathname = '$libdir/pgmq' +schema = 'pgmq' relocatable = false superuser = false diff --git a/sql/pgmq--0.24.0--0.25.0.sql b/sql/pgmq--0.24.0--0.25.0.sql new file mode 100644 index 00000000..bab5981c --- /dev/null +++ b/sql/pgmq--0.24.0--0.25.0.sql @@ -0,0 +1,280 @@ +-- Dropping replaced functions from pgmq schema +DROP FUNCTION pgmq_archive(text, bigint); +DROP FUNCTION pgmq_archive(text, bigint[]); +DROP FUNCTION pgmq_create(text); +DROP FUNCTION pgmq_create_non_partitioned(text); +DROP FUNCTION pgmq_create_partitioned(text, text, text); +DROP FUNCTION pgmq_delete(text, bigint); +DROP FUNCTION pgmq_delete(text, bigint[]); +DROP FUNCTION pgmq_drop_queue(text, boolean); +DROP FUNCTION pgmq_list_queues(); +DROP FUNCTION pgmq_metrics(text); +DROP FUNCTION pgmq_metrics_all(); +DROP FUNCTION pgmq_pop(text); +DROP FUNCTION pgmq_purge_queue(text); +DROP FUNCTION pgmq_read(text, integer, integer); +DROP FUNCTION pgmq_read_with_poll(text, integer, integer, integer, integer); +DROP FUNCTION pgmq_send(text, jsonb, integer); +DROP FUNCTION pgmq_send_batch(text, jsonb[], integer); +DROP FUNCTION pgmq_set_vt(text, bigint, integer); + +-- Creating the new pgmq schema +CREATE SCHEMA pgmq; + +-- Moving pgmq_meta to pgmq schema +ALTER TABLE pgmq_meta SET SCHEMA pgmq; +ALTER TABLE pgmq.pgmq_meta RENAME TO meta; + +-- Moving other tables to pgmq schema, and adopting new prefixes +DO $$ +DECLARE q_row RECORD; +DECLARE part TEXT; +BEGIN + FOR q_row IN (SELECT queue_name, is_partitioned FROM pgmq.meta) + LOOP + EXECUTE FORMAT('ALTER TABLE public.pgmq_%s SET SCHEMA pgmq', q_row.queue_name); + EXECUTE FORMAT('ALTER TABLE public.pgmq_%s_archive SET SCHEMA pgmq', q_row.queue_name); + EXECUTE FORMAT('ALTER SEQUENCE pgmq.pgmq_%s_msg_id_seq RENAME TO q_%s_msg_id_seq', q_row.queue_name, q_row.queue_name); + EXECUTE FORMAT('ALTER TABLE pgmq.pgmq_%s RENAME TO q_%s', q_row.queue_name, q_row.queue_name); + EXECUTE FORMAT('ALTER TABLE pgmq.pgmq_%s_archive RENAME TO a_%s', q_row.queue_name, q_row.queue_name); + EXECUTE FORMAT('ALTER SEQUENCE pgmq.pgmq_%s_archive_msg_id_seq RENAME TO a_%s_msg_id_seq', q_row.queue_name, q_row.queue_name); + IF q_row.is_partitioned THEN + UPDATE part_config + SET + parent_table = FORMAT('pgmq.q_%s', q_row.queue_name), + template_table = FORMAT('pgmq.template_pgmq_q_%s', q_row.queue_name) + WHERE parent_table = FORMAT('public.pgmq_%s', q_row.queue_name); + EXECUTE FORMAT('ALTER TABLE public.template_public_pgmq_%s SET SCHEMA pgmq', q_row.queue_name); + EXECUTE FORMAT('ALTER TABLE pgmq.template_public_pgmq_%s RENAME TO template_q_%s', q_row.queue_name, q_row.queue_name); + FOR part in (SELECT partition_tablename from show_partitions(FORMAT('pgmq.q_%s', q_row.queue_name))) + LOOP + EXECUTE FORMAT('ALTER TABLE public.%s SET SCHEMA pgmq', part); + END LOOP; + END IF; + END LOOP; +END $$; + +-- Create all functions in pgmq schema. This was copied from generated pgrx schema. +-- src/api.rs:432 +-- pgmq::api::set_vt +CREATE FUNCTION pgmq."set_vt"( + "queue_name" TEXT, /* &str */ + "msg_id" bigint, /* i64 */ + "vt_offset" INT /* i32 */ +) RETURNS TABLE ( + "msg_id" bigint, /* i64 */ + "read_ct" INT, /* i32 */ + "enqueued_at" timestamp with time zone, /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */ + "vt" timestamp with time zone, /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */ + "message" jsonb /* pgrx::datum::json::JsonB */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_set_vt_wrapper'; + +-- src/api.rs:81 +-- pgmq::api::send_batch +CREATE FUNCTION pgmq."send_batch"( + "queue_name" TEXT, /* &str */ + "messages" jsonb[], /* alloc::vec::Vec */ + "delay" INT DEFAULT 0 /* i32 */ +) RETURNS TABLE ( + "msg_id" bigint /* i64 */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_send_batch_wrapper'; + +-- src/api.rs:165 +-- pgmq::api::send +CREATE FUNCTION pgmq."send"( + "queue_name" TEXT, /* &str */ + "message" jsonb, /* pgrx::datum::json::JsonB */ + "delay" INT DEFAULT 0 /* i32 */ +) RETURNS bigint /* core::result::Result, pgmq::errors::PgmqExtError> */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_send_wrapper'; + +-- src/api.rs:201 +-- pgmq::api::read_with_poll +CREATE FUNCTION pgmq."read_with_poll"( + "queue_name" TEXT, /* &str */ + "vt" INT, /* i32 */ + "limit" INT, /* i32 */ + "poll_timeout_s" INT DEFAULT 5, /* i32 */ + "poll_interval_ms" INT DEFAULT 250 /* i32 */ +) RETURNS TABLE ( + "msg_id" bigint, /* i64 */ + "read_ct" INT, /* i32 */ + "enqueued_at" timestamp with time zone, /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */ + "vt" timestamp with time zone, /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */ + "message" jsonb /* pgrx::datum::json::JsonB */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_read_with_poll_wrapper'; + +-- src/api.rs:179 +-- pgmq::api::read +CREATE FUNCTION pgmq."read"( + "queue_name" TEXT, /* &str */ + "vt" INT, /* i32 */ + "limit" INT /* i32 */ +) RETURNS TABLE ( + "msg_id" bigint, /* i64 */ + "read_ct" INT, /* i32 */ + "enqueued_at" timestamp with time zone, /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */ + "vt" timestamp with time zone, /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */ + "message" jsonb /* pgrx::datum::json::JsonB */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_read_wrapper'; + +-- src/api.rs:102 +-- pgmq::api::purge_queue +CREATE FUNCTION pgmq."purge_queue"( + "queue_name" TEXT /* alloc::string::String */ +) RETURNS bigint /* core::result::Result */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_purge_queue_wrapper'; + +-- src/api.rs:371 +-- pgmq::api::pop +CREATE FUNCTION pgmq."pop"( + "queue_name" TEXT /* &str */ +) RETURNS TABLE ( + "msg_id" bigint, /* i64 */ + "read_ct" INT, /* i32 */ + "enqueued_at" timestamp with time zone, /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */ + "vt" timestamp with time zone, /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */ + "message" jsonb /* pgrx::datum::json::JsonB */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_pop_wrapper'; + +-- src/metrics.rs:39 +-- pgmq::metrics::metrics_all +CREATE FUNCTION pgmq."metrics_all"() RETURNS TABLE ( + "queue_name" TEXT, /* alloc::string::String */ + "queue_length" bigint, /* i64 */ + "newest_msg_age_sec" INT, /* core::option::Option */ + "oldest_msg_age_sec" INT, /* core::option::Option */ + "total_messages" bigint, /* i64 */ + "scrape_time" timestamp with time zone /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_metrics_all_wrapper'; + +-- src/metrics.rs:18 +-- pgmq::metrics::metrics +CREATE FUNCTION pgmq."metrics"( + "queue_name" TEXT /* &str */ +) RETURNS TABLE ( + "queue_name" TEXT, /* alloc::string::String */ + "queue_length" bigint, /* i64 */ + "newest_msg_age_sec" INT, /* core::option::Option */ + "oldest_msg_age_sec" INT, /* core::option::Option */ + "total_messages" bigint, /* i64 */ + "scrape_time" timestamp with time zone /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_metrics_wrapper'; + +-- src/api.rs:49 +-- pgmq::api::list_queues +CREATE FUNCTION pgmq."list_queues"() RETURNS TABLE ( + "queue_name" TEXT, /* alloc::string::String */ + "created_at" timestamp with time zone /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_list_queues_wrapper'; + +-- src/api.rs:20 +-- pgmq::api::drop_queue +CREATE FUNCTION pgmq."drop_queue"( + "queue_name" TEXT, /* alloc::string::String */ + "partitioned" bool DEFAULT false /* bool */ +) RETURNS bool /* core::result::Result */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_drop_queue_wrapper'; + +-- src/api.rs:281 +-- pgmq::api::delete +CREATE FUNCTION pgmq."delete"( + "queue_name" TEXT, /* &str */ + "msg_ids" bigint[] /* alloc::vec::Vec */ +) RETURNS TABLE ( + "delete" bool /* bool */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_delete_batch_wrapper'; + +-- src/api.rs:276 +-- pgmq::api::delete +CREATE FUNCTION pgmq."delete"( + "queue_name" TEXT, /* &str */ + "msg_id" bigint /* i64 */ +) RETURNS bool /* core::result::Result, pgmq::errors::PgmqExtError> */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_delete_wrapper'; + +-- src/api.rs:123 +-- pgmq::api::create_partitioned +CREATE FUNCTION pgmq."create_partitioned"( + "queue_name" TEXT, /* &str */ + "partition_interval" TEXT DEFAULT '10000', /* alloc::string::String */ + "retention_interval" TEXT DEFAULT '100000' /* alloc::string::String */ +) RETURNS VOID /* core::result::Result<(), pgmq::errors::PgmqExtError> */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_create_partitioned_wrapper'; + +-- src/api.rs:111 +-- pgmq::api::create_non_partitioned +CREATE FUNCTION pgmq."create_non_partitioned"( + "queue_name" TEXT /* &str */ +) RETURNS VOID /* core::result::Result<(), pgmq::errors::PgmqExtError> */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_create_non_partitioned_wrapper'; + +-- src/api.rs:151 +-- pgmq::api::create +CREATE FUNCTION pgmq."create"( + "queue_name" TEXT /* &str */ +) RETURNS VOID /* core::result::Result<(), pgmq::errors::PgmqExtError> */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_create_wrapper'; + +-- src/api.rs:328 +-- pgmq::api::archive +CREATE FUNCTION pgmq."archive"( + "queue_name" TEXT, /* &str */ + "msg_ids" bigint[] /* alloc::vec::Vec */ +) RETURNS TABLE ( + "archive" bool /* bool */ +) +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_archive_batch_wrapper'; + +-- src/api.rs:323 +-- pgmq::api::archive +CREATE FUNCTION pgmq."archive"( + "queue_name" TEXT, /* &str */ + "msg_id" bigint /* i64 */ +) RETURNS bool /* core::result::Result, pgmq::errors::PgmqExtError> */ +STRICT +LANGUAGE c /* Rust */ +AS 'MODULE_PATHNAME', 'pgmq_archive_wrapper'; diff --git a/src/api.rs b/src/api.rs index 1c47637e..ca3b5727 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,16 +1,22 @@ use pgrx::prelude::*; use pgrx::spi; use pgrx::spi::SpiTupleTable; +use pgrx::warning; use crate::errors::PgmqExtError; +use crate::partition; use crate::partition::PARTMAN_SCHEMA; use crate::util; + use pgmq_core::{ - query::{destroy_queue, enqueue, purge_queue}, - types::{PGMQ_SCHEMA, TABLE_PREFIX}, + query::{destroy_queue, enqueue, init_queue}, + types::{PGMQ_SCHEMA, QUEUE_PREFIX}, + util::check_input, }; -#[pg_extern] +use std::time::Duration; + +#[pg_extern(name = "drop_queue")] fn pgmq_drop_queue( queue_name: String, partitioned: default!(bool, false), @@ -25,7 +31,7 @@ pub fn delete_queue(queue_name: String, partitioned: bool) -> Result<(), PgmqExt // this should go out before 1.0 let mut queries = destroy_queue(&queue_name)?; if partitioned { - let queue_table = format!("{PGMQ_SCHEMA}.{TABLE_PREFIX}_{queue_name}"); + let queue_table = format!("{PGMQ_SCHEMA}.{QUEUE_PREFIX}_{queue_name}"); queries.push(format!( "DELETE FROM {PARTMAN_SCHEMA}.part_config where parent_table = '{queue_table}';" )) @@ -39,7 +45,7 @@ pub fn delete_queue(queue_name: String, partitioned: bool) -> Result<(), PgmqExt Ok(()) } -#[pg_extern] +#[pg_extern(name = "list_queues")] fn pgmq_list_queues() -> Result< TableIterator< 'static, @@ -56,7 +62,7 @@ fn pgmq_list_queues() -> Result< pub fn listit() -> Result, spi::Error> { let mut results: Vec<(String, TimestampWithTimeZone)> = Vec::new(); - let query = format!("SELECT * FROM {PGMQ_SCHEMA}.pgmq_meta"); + let query = format!("SELECT * FROM {PGMQ_SCHEMA}.meta"); let _: Result<(), spi::Error> = Spi::connect(|client| { let tup_table: SpiTupleTable = client.select(&query, None, None)?; for row in tup_table { @@ -71,7 +77,7 @@ pub fn listit() -> Result, spi::Error> { Ok(results) } -#[pg_extern] +#[pg_extern(name = "send_batch")] fn pgmq_send_batch( queue_name: &str, messages: Vec, @@ -92,11 +98,616 @@ fn pgmq_send_batch( Ok(TableIterator::new(results)) } -#[pg_extern] +#[pg_extern(name = "purge_queue")] fn pgmq_purge_queue(queue_name: String) -> Result { Spi::connect(|mut client| { - let query = purge_queue(&queue_name)?; + let query = pgmq_core::query::purge_queue(&queue_name)?; let tup_table = client.update(query.as_str(), None, None)?; Ok(tup_table.len() as i64) }) } + +#[pg_extern(name = "create_non_partitioned")] +fn pgmq_create_non_partitioned(queue_name: &str) -> Result<(), PgmqExtError> { + let setup = init_queue(queue_name)?; + let ran: Result<_, spi::Error> = Spi::connect(|mut c| { + for q in setup { + let _ = c.update(&q, None, None)?; + } + Ok(()) + }); + Ok(ran?) +} + +#[pg_extern(name = "create_partitioned")] +fn pgmq_create_partitioned( + queue_name: &str, + partition_interval: default!(String, "'10000'"), + retention_interval: default!(String, "'100000'"), +) -> Result<(), PgmqExtError> { + // validate pg_partman is installed + match Spi::get_one::(&partition::partman_installed())? + .expect("could not query extensions table") + { + true => (), + false => { + warning!("pg_partman not installed. Install https://github.com/pgpartman/pg_partman and then run `CREATE EXTENSION pg_partman;`"); + return Err(PgmqExtError::MissingDependency("pg_partman".to_owned())); + } + }; + validate_same_type(&partition_interval, &retention_interval)?; + let setup = + partition::init_partitioned_queue(queue_name, &partition_interval, &retention_interval)?; + let ran: Result<_, spi::Error> = Spi::connect(|mut c| { + for q in setup { + let _ = c.update(&q, None, None)?; + } + Ok(()) + }); + Ok(ran?) +} + +#[pg_extern(name = "create")] +fn pgmq_create(queue_name: &str) -> Result<(), PgmqExtError> { + pgmq_create_non_partitioned(queue_name) +} + +pub fn validate_same_type(a: &str, b: &str) -> Result<(), PgmqExtError> { + // either both can be ints, or not not ints + match (a.parse::(), b.parse::()) { + (Ok(_), Ok(_)) => Ok(()), + (Err(_), Err(_)) => Ok(()), + _ => Err(PgmqExtError::TypeErrorError("".to_owned())), + } +} + +#[pg_extern(name = "send")] +fn pgmq_send( + queue_name: &str, + message: pgrx::JsonB, + delay: default!(i32, 0), +) -> Result, PgmqExtError> { + let delay = util::delay_to_u64(delay)?; + let query = enqueue(queue_name, &[message.0], &delay)?; + Spi::connect(|mut client| { + let tup_table: SpiTupleTable = client.update(&query, None, None)?; + Ok(tup_table.first().get_one::()?) + }) +} + +#[pg_extern(name = "read")] +fn pgmq_read( + queue_name: &str, + vt: i32, + limit: i32, +) -> Result< + TableIterator< + 'static, + ( + name!(msg_id, i64), + name!(read_ct, i32), + name!(enqueued_at, TimestampWithTimeZone), + name!(vt, TimestampWithTimeZone), + name!(message, pgrx::JsonB), + ), + >, + spi::Error, +> { + let results = readit(queue_name, vt, limit)?; + Ok(TableIterator::new(results)) +} + +#[pg_extern(name = "read_with_poll")] +fn pgmq_read_with_poll( + queue_name: &str, + vt: i32, + limit: i32, + poll_timeout_s: default!(i32, 5), + poll_interval_ms: default!(i32, 250), +) -> Result< + TableIterator< + 'static, + ( + name!(msg_id, i64), + name!(read_ct, i32), + name!(enqueued_at, TimestampWithTimeZone), + name!(vt, TimestampWithTimeZone), + name!(message, pgrx::JsonB), + ), + >, + spi::Error, +> { + let start_time = std::time::Instant::now(); + let poll_timeout_ms = (poll_timeout_s * 1000) as u128; + loop { + let results = readit(queue_name, vt, limit)?; + if results.is_empty() && start_time.elapsed().as_millis() < poll_timeout_ms { + std::thread::sleep(Duration::from_millis(poll_interval_ms.try_into().unwrap())); + continue; + } else { + break Ok(TableIterator::new(results)); + } + } +} + +fn readit( + queue_name: &str, + vt: i32, + limit: i32, +) -> Result< + Vec<( + i64, + i32, + TimestampWithTimeZone, + TimestampWithTimeZone, + pgrx::JsonB, + )>, + spi::Error, +> { + let mut results: Vec<( + i64, + i32, + TimestampWithTimeZone, + TimestampWithTimeZone, + pgrx::JsonB, + )> = Vec::new(); + + let _: Result<(), PgmqExtError> = Spi::connect(|mut client| { + let query = pgmq_core::query::read(queue_name, vt, limit)?; + let tup_table: SpiTupleTable = client.update(&query, None, None)?; + results.reserve_exact(tup_table.len()); + + for row in tup_table { + let msg_id = row["msg_id"].value::()?.expect("no msg_id"); + let read_ct = row["read_ct"].value::()?.expect("no read_ct"); + let vt = row["vt"].value::()?.expect("no vt"); + let enqueued_at = row["enqueued_at"] + .value::()? + .expect("no enqueue time"); + let message = row["message"].value::()?.expect("no message"); + results.push((msg_id, read_ct, enqueued_at, vt, message)); + } + Ok(()) + }); + Ok(results) +} + +#[pg_extern(name = "delete")] +fn pgmq_delete(queue_name: &str, msg_id: i64) -> Result, PgmqExtError> { + pgmq_delete_batch(queue_name, vec![msg_id]).map(|mut iter| iter.next().map(|b| b.0)) +} + +#[pg_extern(name = "delete")] +fn pgmq_delete_batch( + queue_name: &str, + msg_ids: Vec, +) -> Result, PgmqExtError> { + let query = pgmq_core::query::delete_batch(queue_name)?; + + let mut deleted: Vec = Vec::new(); + let _: Result<(), spi::Error> = Spi::connect(|mut client| { + let tup_table = client.update( + &query, + None, + Some(vec![( + PgBuiltInOids::INT8ARRAYOID.oid(), + msg_ids.clone().into_datum(), + )]), + )?; + + deleted.reserve_exact(tup_table.len()); + + for row in tup_table { + let msg_id = row["msg_id"].value::()?.expect("no msg_id"); + deleted.push(msg_id); + } + Ok(()) + }); + + let results = msg_ids + .iter() + .map(|msg_id| { + if deleted.contains(msg_id) { + (true,) + } else { + (false,) + } + }) + .collect::>(); + + Ok(TableIterator::new(results)) +} + +/// archive a message forever instead of deleting it +#[pg_extern(name = "archive")] +fn pgmq_archive(queue_name: &str, msg_id: i64) -> Result, PgmqExtError> { + pgmq_archive_batch(queue_name, vec![msg_id]).map(|mut iter| iter.next().map(|b| b.0)) +} + +#[pg_extern(name = "archive")] +fn pgmq_archive_batch( + queue_name: &str, + msg_ids: Vec, +) -> Result, PgmqExtError> { + let query = pgmq_core::query::archive_batch(queue_name)?; + + let mut archived: Vec = Vec::new(); + + let _: Result<(), spi::Error> = Spi::connect(|mut client| { + let tup_table: SpiTupleTable = client.update( + &query, + None, + Some(vec![( + PgBuiltInOids::INT8ARRAYOID.oid(), + msg_ids.clone().into_datum(), + )]), + )?; + + archived.reserve_exact(tup_table.len()); + + for row in tup_table { + let msg_id = row["msg_id"].value::()?.expect("no msg_id"); + archived.push(msg_id); + } + Ok(()) + }); + + let results = msg_ids + .iter() + .map(|msg_id| { + if archived.contains(&msg_id) { + (true,) + } else { + (false,) + } + }) + .collect::>(); + + Ok(TableIterator::new(results)) +} + +// reads and deletes at same time +#[pg_extern(name = "pop")] +fn pgmq_pop( + queue_name: &str, +) -> Result< + TableIterator< + 'static, + ( + name!(msg_id, i64), + name!(read_ct, i32), + name!(enqueued_at, TimestampWithTimeZone), + name!(vt, TimestampWithTimeZone), + name!(message, pgrx::JsonB), + ), + >, + PgmqExtError, +> { + let results = popit(queue_name)?; + Ok(TableIterator::new(results)) +} + +fn popit( + queue_name: &str, +) -> Result< + Vec<( + i64, + i32, + TimestampWithTimeZone, + TimestampWithTimeZone, + pgrx::JsonB, + )>, + PgmqExtError, +> { + let mut results: Vec<( + i64, + i32, + TimestampWithTimeZone, + TimestampWithTimeZone, + pgrx::JsonB, + )> = Vec::new(); + let _: Result<(), PgmqExtError> = Spi::connect(|mut client| { + let query = pgmq_core::query::pop(queue_name)?; + let tup_table: SpiTupleTable = client.update(&query, None, None)?; + results.reserve_exact(tup_table.len()); + for row in tup_table { + let msg_id = row["msg_id"].value::()?.expect("no msg_id"); + let read_ct = row["read_ct"].value::()?.expect("no read_ct"); + let vt = row["vt"].value::()?.expect("no vt"); + let enqueued_at = row["enqueued_at"] + .value::()? + .expect("no enqueue time"); + let message = row["message"].value::()?.expect("no message"); + results.push((msg_id, read_ct, enqueued_at, vt, message)); + } + Ok(()) + }); + Ok(results) +} + +/// change the visibility time on an existing message +/// vt_offset is a time relative to now that the message will be visible +/// accepts positive or negative integers +#[pg_extern(name = "set_vt")] +fn pgmq_set_vt( + queue_name: &str, + msg_id: i64, + vt_offset: i32, +) -> Result< + TableIterator< + 'static, + ( + name!(msg_id, i64), + name!(read_ct, i32), + name!(enqueued_at, TimestampWithTimeZone), + name!(vt, TimestampWithTimeZone), + name!(message, pgrx::JsonB), + ), + >, + PgmqExtError, +> { + check_input(queue_name)?; + let mut results: Vec<( + i64, + i32, + TimestampWithTimeZone, + TimestampWithTimeZone, + pgrx::JsonB, + )> = Vec::new(); + + let query = format!( + " + UPDATE {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{queue_name} + SET vt = (now() + interval '{vt_offset} seconds') + WHERE msg_id = $1 + RETURNING *; + " + ); + let args = vec![(PgBuiltInOids::INT8OID.oid(), msg_id.into_datum())]; + let res: Result<(), spi::Error> = Spi::connect(|mut client| { + let tup_table: SpiTupleTable = client.update(&query, None, Some(args))?; + for row in tup_table { + let msg_id = row["msg_id"].value::()?.expect("no msg_id"); + let read_ct = row["read_ct"].value::()?.expect("no read_ct"); + let vt = row["vt"].value::()?.expect("no vt"); + let enqueued_at = row["enqueued_at"] + .value::()? + .expect("no enqueue time"); + let message = row["message"].value::()?.expect("no message"); + results.push((msg_id, read_ct, enqueued_at, vt, message)); + } + Ok(()) + }); + res?; + Ok(TableIterator::new(results)) +} + +#[cfg(any(test, feature = "pg_test"))] +#[pg_schema] +mod tests { + use super::*; + use pgmq_core::types::ARCHIVE_PREFIX; + + #[pg_test] + fn test_create_non_partitioned() { + let qname = r#"test_queue"#; + let _ = pgmq_create_non_partitioned(&qname).unwrap(); + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 0); + let _ = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":"y"})), 0).unwrap(); + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 1); + } + + // assert an invisible message is not readable + #[pg_test] + fn test_default() { + let qname = r#"test_default"#; + let _ = pgmq_create_non_partitioned(&qname); + let init_count = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); + // should not be any messages initially + assert_eq!(init_count.unwrap(), 0); + + // put a message on the queue + let _ = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":"y"})), 0); + + // read the message with the pg_extern, sets message invisible + let _ = pgmq_read(&qname, 10_i32, 1_i32); + // but still one record on the table + let init_count = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); + assert_eq!(init_count.unwrap(), 1); + + // pop the message, must not panic + let popped = pgmq_pop(&qname); + assert!(popped.is_ok()); + } + + // validate all internal functions + + /// lifecycle test for partitioned queues + #[pg_test] + fn test_partitioned() { + let qname = r#"test_internal"#; + + let partition_interval = "2".to_owned(); + let retention_interval = "2".to_owned(); + + let _ = + Spi::run("DROP EXTENSION IF EXISTS pg_partman").expect("Failed dropping pg_partman"); + + let failed = pgmq_create_partitioned( + &qname, + partition_interval.clone(), + retention_interval.clone(), + ); + assert!(failed.is_err()); + + let _ = Spi::run("CREATE EXTENSION IF NOT EXISTS pg_partman") + .expect("Failed creating pg_partman"); + let _ = pgmq_create_partitioned(&qname, partition_interval, retention_interval).unwrap(); + + let queues = listit().unwrap(); + assert_eq!(queues.len(), 1); + + // put two message on the queue + let msg_id1 = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":1})), 0) + .unwrap() + .unwrap(); + let msg_id2 = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":2})), 0) + .unwrap() + .unwrap(); + assert_eq!(msg_id1, 1); + assert_eq!(msg_id2, 2); + + // read first message + let msg1 = readit(&qname, 1_i32, 1_i32).unwrap(); + // pop the second message + let msg2 = popit(&qname).unwrap(); + assert_eq!(msg1.len(), 1); + assert_eq!(msg2.len(), 1); + assert_eq!(msg1[0].0, msg_id1); + assert_eq!(msg2[0].0, msg_id2); + + // read again, should be no messages + let nothing = readit(&qname, 2_i32, 1_i32).unwrap(); + assert_eq!(nothing.len(), 0); + + // but still one record on the table + let init_count = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); + assert_eq!(init_count.unwrap(), 1); + + // delete the messages + let delete1 = pgmq_delete(&qname, msg_id1).unwrap().unwrap(); + assert!(delete1); + + // delete when message is gone returns False + let delete1 = pgmq_delete(&qname, msg_id1).unwrap().unwrap(); + assert!(!delete1); + + // no records after delete + let init_count = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); + assert_eq!(init_count.unwrap(), 0); + } + + #[pg_test] + fn test_archive() { + let qname = r#"test_archive"#; + let _ = pgmq_create_non_partitioned(&qname).unwrap(); + // no messages in the queue + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 0); + // no messages in queue archive + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 0); + // put a message on the queue + let msg_id = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":"y"})), 0).unwrap(); + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 1); + + // archive the message + let archived = pgmq_archive(&qname, msg_id.unwrap()).unwrap().unwrap(); + assert!(archived); + // should be no messages left on the queue table + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 0); + // but one on the archive table + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 1); + } + + #[pg_test] + fn test_archive_batch() { + let qname = r#"test_archive_batch"#; + let _ = pgmq_create_non_partitioned(&qname).unwrap(); + // no messages in the queue + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 0); + // no messages in queue archive + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 0); + // put messages on the queue + let msg_id1 = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":1})), 0) + .unwrap() + .unwrap(); + let msg_id2 = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":2})), 0) + .unwrap() + .unwrap(); + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 2); + + // archive the message. The first two exist so should return true, the + // last one doesn't so should return false. + let mut archived = pgmq_archive_batch(&qname, vec![msg_id1, msg_id2, -1]).unwrap(); + assert!(archived.next().unwrap().0); + assert!(archived.next().unwrap().0); + assert!(!archived.next().unwrap().0); + + // should be no messages left on the queue table + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 0); + // but two on the archive table + let retval = Spi::get_one::(&format!( + "SELECT count(*) FROM {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{qname}" + )) + .expect("SQL select failed"); + assert_eq!(retval.unwrap(), 2); + } + + #[pg_test] + fn test_validate_same_type() { + let invalid = validate_same_type("10", "daily"); + assert!(invalid.is_err()); + let invalid = validate_same_type("daily", "10"); + assert!(invalid.is_err()); + + let valid = validate_same_type("10", "10"); + assert!(valid.is_ok()); + let valid = validate_same_type("daily", "weekly"); + assert!(valid.is_ok()); + } +} diff --git a/src/lib.rs b/src/lib.rs index e2370350..d196a953 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,3 @@ -use pgrx::prelude::*; -use pgrx::spi; -use pgrx::spi::SpiTupleTable; -use pgrx::warning; - pgrx::pg_module_magic!(); pub mod api; @@ -11,673 +6,9 @@ pub mod metrics; pub mod partition; pub mod util; -use pgmq_core::{ - query::{archive_batch, delete_batch, enqueue, init_queue, pop, read}, - types::TABLE_PREFIX, - util::check_input, -}; - -use errors::PgmqExtError; -use std::time::Duration; - -extension_sql!( - " -CREATE TABLE public.pgmq_meta ( - queue_name VARCHAR UNIQUE NOT NULL, - is_partitioned BOOLEAN NOT NULL, - created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL -); - -DO $$ -BEGIN - IF NOT EXISTS ( - SELECT 1 - WHERE has_table_privilege('pg_monitor', 'public.pgmq_meta', 'SELECT') - ) THEN - EXECUTE 'GRANT SELECT ON public.pgmq_meta TO pg_monitor'; - END IF; -END; -$$ LANGUAGE plpgsql; -", - name = "bootstrap" -); - -#[pg_extern] -fn pgmq_create_non_partitioned(queue_name: &str) -> Result<(), PgmqExtError> { - let setup = init_queue(queue_name)?; - let ran: Result<_, spi::Error> = Spi::connect(|mut c| { - for q in setup { - let _ = c.update(&q, None, None)?; - } - Ok(()) - }); - Ok(ran?) -} - -#[pg_extern] -fn pgmq_create_partitioned( - queue_name: &str, - partition_interval: default!(String, "'10000'"), - retention_interval: default!(String, "'100000'"), -) -> Result<(), PgmqExtError> { - // validate pg_partman is installed - match Spi::get_one::(&partition::partman_installed())? - .expect("could not query extensions table") - { - true => (), - false => { - warning!("pg_partman not installed. Install https://github.com/pgpartman/pg_partman and then run `CREATE EXTENSION pg_partman;`"); - return Err(PgmqExtError::MissingDependency("pg_partman".to_owned())); - } - }; - validate_same_type(&partition_interval, &retention_interval)?; - let setup = - partition::init_partitioned_queue(queue_name, &partition_interval, &retention_interval)?; - let ran: Result<_, spi::Error> = Spi::connect(|mut c| { - for q in setup { - let _ = c.update(&q, None, None)?; - } - Ok(()) - }); - Ok(ran?) -} - -#[pg_extern] -fn pgmq_create(queue_name: &str) -> Result<(), PgmqExtError> { - pgmq_create_non_partitioned(queue_name) -} - -fn validate_same_type(a: &str, b: &str) -> Result<(), PgmqExtError> { - // either both can be ints, or not not ints - match (a.parse::(), b.parse::()) { - (Ok(_), Ok(_)) => Ok(()), - (Err(_), Err(_)) => Ok(()), - _ => Err(PgmqExtError::TypeErrorError("".to_owned())), - } -} - -#[pg_extern] -fn pgmq_send( - queue_name: &str, - message: pgrx::JsonB, - delay: default!(i32, 0), -) -> Result, PgmqExtError> { - let delay = util::delay_to_u64(delay)?; - let query = enqueue(queue_name, &[message.0], &delay)?; - Spi::connect(|mut client| { - let tup_table: SpiTupleTable = client.update(&query, None, None)?; - Ok(tup_table.first().get_one::()?) - }) -} - -#[pg_extern] -fn pgmq_read( - queue_name: &str, - vt: i32, - limit: i32, -) -> Result< - TableIterator< - 'static, - ( - name!(msg_id, i64), - name!(read_ct, i32), - name!(enqueued_at, TimestampWithTimeZone), - name!(vt, TimestampWithTimeZone), - name!(message, pgrx::JsonB), - ), - >, - spi::Error, -> { - let results = readit(queue_name, vt, limit)?; - Ok(TableIterator::new(results)) -} - -#[pg_extern] -fn pgmq_read_with_poll( - queue_name: &str, - vt: i32, - limit: i32, - poll_timeout_s: default!(i32, 5), - poll_interval_ms: default!(i32, 250), -) -> Result< - TableIterator< - 'static, - ( - name!(msg_id, i64), - name!(read_ct, i32), - name!(enqueued_at, TimestampWithTimeZone), - name!(vt, TimestampWithTimeZone), - name!(message, pgrx::JsonB), - ), - >, - spi::Error, -> { - let start_time = std::time::Instant::now(); - let poll_timeout_ms = (poll_timeout_s * 1000) as u128; - loop { - let results = readit(queue_name, vt, limit)?; - if results.is_empty() && start_time.elapsed().as_millis() < poll_timeout_ms { - std::thread::sleep(Duration::from_millis(poll_interval_ms.try_into().unwrap())); - continue; - } else { - break Ok(TableIterator::new(results)); - } - } -} - -fn readit( - queue_name: &str, - vt: i32, - limit: i32, -) -> Result< - Vec<( - i64, - i32, - TimestampWithTimeZone, - TimestampWithTimeZone, - pgrx::JsonB, - )>, - spi::Error, -> { - let mut results: Vec<( - i64, - i32, - TimestampWithTimeZone, - TimestampWithTimeZone, - pgrx::JsonB, - )> = Vec::new(); - - let _: Result<(), PgmqExtError> = Spi::connect(|mut client| { - let query = read(queue_name, vt, limit)?; - let tup_table: SpiTupleTable = client.update(&query, None, None)?; - results.reserve_exact(tup_table.len()); - - for row in tup_table { - let msg_id = row["msg_id"].value::()?.expect("no msg_id"); - let read_ct = row["read_ct"].value::()?.expect("no read_ct"); - let vt = row["vt"].value::()?.expect("no vt"); - let enqueued_at = row["enqueued_at"] - .value::()? - .expect("no enqueue time"); - let message = row["message"].value::()?.expect("no message"); - results.push((msg_id, read_ct, enqueued_at, vt, message)); - } - Ok(()) - }); - Ok(results) -} - -#[pg_extern] -fn pgmq_delete(queue_name: &str, msg_id: i64) -> Result, PgmqExtError> { - pgmq_delete_batch(queue_name, vec![msg_id]).map(|mut iter| iter.next().map(|b| b.0)) -} - -#[pg_extern(name = "pgmq_delete")] -fn pgmq_delete_batch( - queue_name: &str, - msg_ids: Vec, -) -> Result, PgmqExtError> { - let query = delete_batch(queue_name)?; - - let mut deleted: Vec = Vec::new(); - let _: Result<(), spi::Error> = Spi::connect(|mut client| { - let tup_table = client.update( - &query, - None, - Some(vec![( - PgBuiltInOids::INT8ARRAYOID.oid(), - msg_ids.clone().into_datum(), - )]), - )?; - - deleted.reserve_exact(tup_table.len()); - - for row in tup_table { - let msg_id = row["msg_id"].value::()?.expect("no msg_id"); - deleted.push(msg_id); - } - Ok(()) - }); - - let results = msg_ids - .iter() - .map(|msg_id| { - if deleted.contains(msg_id) { - (true,) - } else { - (false,) - } - }) - .collect::>(); - - Ok(TableIterator::new(results)) -} - -/// archive a message forever instead of deleting it -#[pg_extern] -fn pgmq_archive(queue_name: &str, msg_id: i64) -> Result, PgmqExtError> { - pgmq_archive_batch(queue_name, vec![msg_id]).map(|mut iter| iter.next().map(|b| b.0)) -} - -#[pg_extern(name = "pgmq_archive")] -fn pgmq_archive_batch( - queue_name: &str, - msg_ids: Vec, -) -> Result, PgmqExtError> { - let query = archive_batch(queue_name)?; - - let mut archived: Vec = Vec::new(); - - let _: Result<(), spi::Error> = Spi::connect(|mut client| { - let tup_table: SpiTupleTable = client.update( - &query, - None, - Some(vec![( - PgBuiltInOids::INT8ARRAYOID.oid(), - msg_ids.clone().into_datum(), - )]), - )?; - - archived.reserve_exact(tup_table.len()); - - for row in tup_table { - let msg_id = row["msg_id"].value::()?.expect("no msg_id"); - archived.push(msg_id); - } - Ok(()) - }); - - let results = msg_ids - .iter() - .map(|msg_id| { - if archived.contains(&msg_id) { - (true,) - } else { - (false,) - } - }) - .collect::>(); - - Ok(TableIterator::new(results)) -} - -// reads and deletes at same time -#[pg_extern] -fn pgmq_pop( - queue_name: &str, -) -> Result< - TableIterator< - 'static, - ( - name!(msg_id, i64), - name!(read_ct, i32), - name!(enqueued_at, TimestampWithTimeZone), - name!(vt, TimestampWithTimeZone), - name!(message, pgrx::JsonB), - ), - >, - PgmqExtError, -> { - let results = popit(queue_name)?; - Ok(TableIterator::new(results)) -} - -fn popit( - queue_name: &str, -) -> Result< - Vec<( - i64, - i32, - TimestampWithTimeZone, - TimestampWithTimeZone, - pgrx::JsonB, - )>, - PgmqExtError, -> { - let mut results: Vec<( - i64, - i32, - TimestampWithTimeZone, - TimestampWithTimeZone, - pgrx::JsonB, - )> = Vec::new(); - let _: Result<(), PgmqExtError> = Spi::connect(|mut client| { - let query = pop(queue_name)?; - let tup_table: SpiTupleTable = client.update(&query, None, None)?; - results.reserve_exact(tup_table.len()); - for row in tup_table { - let msg_id = row["msg_id"].value::()?.expect("no msg_id"); - let read_ct = row["read_ct"].value::()?.expect("no read_ct"); - let vt = row["vt"].value::()?.expect("no vt"); - let enqueued_at = row["enqueued_at"] - .value::()? - .expect("no enqueue time"); - let message = row["message"].value::()?.expect("no message"); - results.push((msg_id, read_ct, enqueued_at, vt, message)); - } - Ok(()) - }); - Ok(results) -} - -/// change the visibility time on an existing message -/// vt_offset is a time relative to now that the message will be visible -/// accepts positive or negative integers -#[pg_extern] -fn pgmq_set_vt( - queue_name: &str, - msg_id: i64, - vt_offset: i32, -) -> Result< - TableIterator< - 'static, - ( - name!(msg_id, i64), - name!(read_ct, i32), - name!(enqueued_at, TimestampWithTimeZone), - name!(vt, TimestampWithTimeZone), - name!(message, pgrx::JsonB), - ), - >, - PgmqExtError, -> { - check_input(queue_name)?; - let mut results: Vec<( - i64, - i32, - TimestampWithTimeZone, - TimestampWithTimeZone, - pgrx::JsonB, - )> = Vec::new(); - - let query = format!( - " - UPDATE {TABLE_PREFIX}_{queue_name} - SET vt = (now() + interval '{vt_offset} seconds') - WHERE msg_id = $1 - RETURNING *; - " - ); - let args = vec![(PgBuiltInOids::INT8OID.oid(), msg_id.into_datum())]; - let res: Result<(), spi::Error> = Spi::connect(|mut client| { - let tup_table: SpiTupleTable = client.update(&query, None, Some(args))?; - for row in tup_table { - let msg_id = row["msg_id"].value::()?.expect("no msg_id"); - let read_ct = row["read_ct"].value::()?.expect("no read_ct"); - let vt = row["vt"].value::()?.expect("no vt"); - let enqueued_at = row["enqueued_at"] - .value::()? - .expect("no enqueue time"); - let message = row["message"].value::()?.expect("no message"); - results.push((msg_id, read_ct, enqueued_at, vt, message)); - } - Ok(()) - }); - res?; - Ok(TableIterator::new(results)) -} - -#[cfg(any(test, feature = "pg_test"))] -#[pg_schema] -mod tests { - use crate::*; - use pgmq_core::types::TABLE_PREFIX; - - #[pg_test] - fn test_create_non_partitioned() { - let qname = r#"test_queue"#; - let _ = pgmq_create_non_partitioned(&qname).unwrap(); - let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); - assert_eq!(retval.unwrap(), 0); - let _ = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":"y"})), 0).unwrap(); - let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); - assert_eq!(retval.unwrap(), 1); - } - - // assert an invisible message is not readable - #[pg_test] - fn test_default() { - let qname = r#"test_default"#; - let _ = pgmq_create_non_partitioned(&qname); - let init_count = - Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); - // should not be any messages initially - assert_eq!(init_count.unwrap(), 0); - - // put a message on the queue - let _ = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":"y"})), 0); - - // read the message with the pg_extern, sets message invisible - let _ = pgmq_read(&qname, 10_i32, 1_i32); - // but still one record on the table - let init_count = - Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); - assert_eq!(init_count.unwrap(), 1); - - // pop the message, must not panic - let popped = pgmq_pop(&qname); - assert!(popped.is_ok()); - } - - // validate all internal functions - // e.g. readit, popit, listit - #[pg_test] - fn test_internal() { - let qname = r#"test_internal"#; - let _ = pgmq_create_non_partitioned(&qname).unwrap(); - - let queues = api::listit().unwrap(); - assert_eq!(queues.len(), 1); - - // put two message on the queue - let msg_id1 = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":1})), 0) - .unwrap() - .unwrap(); - let msg_id2 = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":2})), 0) - .unwrap() - .unwrap(); - assert_eq!(msg_id1, 1); - assert_eq!(msg_id2, 2); - - // read first message - let msg1 = readit(&qname, 1_i32, 1_i32).unwrap(); - // pop the second message - let msg2 = popit(&qname).unwrap(); - assert_eq!(msg1.len(), 1); - assert_eq!(msg2.len(), 1); - assert_eq!(msg1[0].0, msg_id1); - assert_eq!(msg2[0].0, msg_id2); - - // read again, should be no messages - let nothing = readit(&qname, 2_i32, 1_i32).unwrap(); - assert_eq!(nothing.len(), 0); - - // but still one record on the table - let init_count = - Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); - assert_eq!(init_count.unwrap(), 1); - - // delete the messages - let delete1 = pgmq_delete(&qname, msg_id1).unwrap().unwrap(); - assert!(delete1); - - // delete when message is gone returns False - let delete1 = pgmq_delete(&qname, msg_id1).unwrap().unwrap(); - assert!(!delete1); - - // no records after delete - let init_count = - Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); - assert_eq!(init_count.unwrap(), 0); - } - - /// lifecycle test for partitioned queues - #[pg_test] - fn test_partitioned() { - let qname = r#"test_internal"#; +use crate::errors::PgmqExtError; - let partition_interval = "2".to_owned(); - let retention_interval = "2".to_owned(); - - let _ = - Spi::run("DROP EXTENSION IF EXISTS pg_partman").expect("Failed dropping pg_partman"); - - let failed = pgmq_create_partitioned( - &qname, - partition_interval.clone(), - retention_interval.clone(), - ); - assert!(failed.is_err()); - - let _ = Spi::run("CREATE EXTENSION IF NOT EXISTS pg_partman") - .expect("Failed creating pg_partman"); - let _ = pgmq_create_partitioned(&qname, partition_interval, retention_interval).unwrap(); - - let queues = api::listit().unwrap(); - assert_eq!(queues.len(), 1); - - // put two message on the queue - let msg_id1 = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":1})), 0) - .unwrap() - .unwrap(); - let msg_id2 = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":2})), 0) - .unwrap() - .unwrap(); - assert_eq!(msg_id1, 1); - assert_eq!(msg_id2, 2); - - // read first message - let msg1 = readit(&qname, 1_i32, 1_i32).unwrap(); - // pop the second message - let msg2 = popit(&qname).unwrap(); - assert_eq!(msg1.len(), 1); - assert_eq!(msg2.len(), 1); - assert_eq!(msg1[0].0, msg_id1); - assert_eq!(msg2[0].0, msg_id2); - - // read again, should be no messages - let nothing = readit(&qname, 2_i32, 1_i32).unwrap(); - assert_eq!(nothing.len(), 0); - - // but still one record on the table - let init_count = - Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); - assert_eq!(init_count.unwrap(), 1); - - // delete the messages - let delete1 = pgmq_delete(&qname, msg_id1).unwrap().unwrap(); - assert!(delete1); - - // delete when message is gone returns False - let delete1 = pgmq_delete(&qname, msg_id1).unwrap().unwrap(); - assert!(!delete1); - - // no records after delete - let init_count = - Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); - assert_eq!(init_count.unwrap(), 0); - } - - #[pg_test] - fn test_archive() { - let qname = r#"test_archive"#; - let _ = pgmq_create_non_partitioned(&qname).unwrap(); - // no messages in the queue - let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); - assert_eq!(retval.unwrap(), 0); - // no messages in queue archive - let retval = Spi::get_one::(&format!( - "SELECT count(*) FROM {TABLE_PREFIX}_{qname}_archive" - )) - .expect("SQL select failed"); - assert_eq!(retval.unwrap(), 0); - // put a message on the queue - let msg_id = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":"y"})), 0).unwrap(); - let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); - assert_eq!(retval.unwrap(), 1); - - // archive the message - let archived = pgmq_archive(&qname, msg_id.unwrap()).unwrap().unwrap(); - assert!(archived); - // should be no messages left on the queue table - let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); - assert_eq!(retval.unwrap(), 0); - // but one on the archive table - let retval = Spi::get_one::(&format!( - "SELECT count(*) FROM {TABLE_PREFIX}_{qname}_archive" - )) - .expect("SQL select failed"); - assert_eq!(retval.unwrap(), 1); - } - - #[pg_test] - fn test_archive_batch() { - let qname = r#"test_archive_batch"#; - let _ = pgmq_create_non_partitioned(&qname).unwrap(); - // no messages in the queue - let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); - assert_eq!(retval.unwrap(), 0); - // no messages in queue archive - let retval = Spi::get_one::(&format!( - "SELECT count(*) FROM {TABLE_PREFIX}_{qname}_archive" - )) - .expect("SQL select failed"); - assert_eq!(retval.unwrap(), 0); - // put messages on the queue - let msg_id1 = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":1})), 0) - .unwrap() - .unwrap(); - let msg_id2 = pgmq_send(&qname, pgrx::JsonB(serde_json::json!({"x":2})), 0) - .unwrap() - .unwrap(); - let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); - assert_eq!(retval.unwrap(), 2); - - // archive the message. The first two exist so should return true, the - // last one doesn't so should return false. - let mut archived = pgmq_archive_batch(&qname, vec![msg_id1, msg_id2, -1]).unwrap(); - assert!(archived.next().unwrap().0); - assert!(archived.next().unwrap().0); - assert!(!archived.next().unwrap().0); - - // should be no messages left on the queue table - let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) - .expect("SQL select failed"); - assert_eq!(retval.unwrap(), 0); - // but two on the archive table - let retval = Spi::get_one::(&format!( - "SELECT count(*) FROM {TABLE_PREFIX}_{qname}_archive" - )) - .expect("SQL select failed"); - assert_eq!(retval.unwrap(), 2); - } - - #[pg_test] - fn test_validate_same_type() { - let invalid = validate_same_type("10", "daily"); - assert!(invalid.is_err()); - let invalid = validate_same_type("daily", "10"); - assert!(invalid.is_err()); - - let valid = validate_same_type("10", "10"); - assert!(valid.is_ok()); - let valid = validate_same_type("daily", "weekly"); - assert!(valid.is_ok()); - } -} +pgrx::extension_sql_file!("./sql_src.sql", name = "bootstrap"); #[cfg(test)] pub mod pg_test { diff --git a/src/metrics.rs b/src/metrics.rs index 24e77f89..508bfb15 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,11 +1,9 @@ -/// Metric definitions -/// use pgrx::prelude::*; use pgrx::spi::SpiTupleTable; use pgrx::warning; use crate::api::listit; -use pgmq_core::types::{PGMQ_SCHEMA, TABLE_PREFIX}; +use pgmq_core::types::{PGMQ_SCHEMA, QUEUE_PREFIX}; type MetricResult = Vec<( String, @@ -16,7 +14,7 @@ type MetricResult = Vec<( TimestampWithTimeZone, )>; -#[pg_extern] +#[pg_extern(name = "metrics")] fn pgmq_metrics( queue_name: &str, ) -> Result< @@ -37,7 +35,7 @@ fn pgmq_metrics( Ok(TableIterator::new(results)) } -#[pg_extern] +#[pg_extern(name = "metrics_all")] fn pgmq_metrics_all() -> Result< TableIterator< 'static, @@ -98,7 +96,7 @@ fn query_summary(queue_name: &str) -> Result } fn build_summary_query(queue_name: &str) -> String { - let fq_table = format!("{PGMQ_SCHEMA}.{TABLE_PREFIX}_{queue_name}"); + let fq_table = format!("{PGMQ_SCHEMA}.{QUEUE_PREFIX}_{queue_name}"); format!( "SELECT * FROM (SELECT diff --git a/src/partition.rs b/src/partition.rs index 5806c46e..aa7cbb93 100644 --- a/src/partition.rs +++ b/src/partition.rs @@ -7,7 +7,7 @@ use pgmq_core::{ assign_archive, assign_queue, create_archive, create_archive_index, create_index, create_meta, grant_pgmon_meta, grant_pgmon_queue, grant_pgmon_queue_seq, insert_meta, }, - types::{PGMQ_SCHEMA, TABLE_PREFIX}, + types::{PGMQ_SCHEMA, QUEUE_PREFIX}, util::CheckedName, }; @@ -80,7 +80,7 @@ fn create_partitioned_queue( ) -> Result { Ok(format!( " - CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{queue} ( + CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{queue} ( msg_id BIGSERIAL NOT NULL, read_ct INT DEFAULT 0 NOT NULL, enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, @@ -97,7 +97,7 @@ pub fn create_partitioned_index( ) -> Result { Ok(format!( " - CREATE INDEX IF NOT EXISTS pgmq_partition_idx_{queue} ON {PGMQ_SCHEMA}.{TABLE_PREFIX}_{queue} ({partiton_col}); + CREATE INDEX IF NOT EXISTS pgmq_partition_idx_{queue} ON {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{queue} ({partiton_col}); " )) } @@ -109,7 +109,7 @@ fn create_partitioned_table( ) -> Result { Ok(format!( " - SELECT {PARTMAN_SCHEMA}.create_parent('{PGMQ_SCHEMA}.{TABLE_PREFIX}_{queue}', '{partition_col}', 'native', '{partition_interval}'); + SELECT {PARTMAN_SCHEMA}.create_parent('{PGMQ_SCHEMA}.{QUEUE_PREFIX}_{queue}', '{partition_col}', 'native', '{partition_interval}'); " )) } @@ -123,13 +123,13 @@ fn create_partitioned_table( fn set_retention_config(queue: CheckedName<'_>, retention: &str) -> Result { Ok(format!( " - UPDATE {PGMQ_SCHEMA}.part_config + UPDATE {PARTMAN_SCHEMA}.part_config SET retention = '{retention}', retention_keep_table = false, retention_keep_index = true, automatic_maintenance = 'on' - WHERE parent_table = '{PGMQ_SCHEMA}.{TABLE_PREFIX}_{queue}'; + WHERE parent_table = '{PGMQ_SCHEMA}.{QUEUE_PREFIX}_{queue}'; " )) } diff --git a/src/sql_src.sql b/src/sql_src.sql new file mode 100644 index 00000000..4bcd17b5 --- /dev/null +++ b/src/sql_src.sql @@ -0,0 +1,16 @@ +CREATE TABLE pgmq.meta ( + queue_name VARCHAR UNIQUE NOT NULL, + is_partitioned BOOLEAN NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL +); + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 + WHERE has_table_privilege('pg_monitor', 'pgmq.meta', 'SELECT') + ) THEN + EXECUTE 'GRANT SELECT ON pgmq.meta TO pg_monitor'; + END IF; +END; +$$ LANGUAGE plpgsql; diff --git a/tembo-pgmq-python/benches/bench.py b/tembo-pgmq-python/benches/bench.py index 7cb58976..4b1f4ca0 100644 --- a/tembo-pgmq-python/benches/bench.py +++ b/tembo-pgmq-python/benches/bench.py @@ -11,7 +11,7 @@ from scipy.ndimage import gaussian_filter1d from sqlalchemy import create_engine, text -from tembo_pgmq_python import PGMQueue +from tembo_pgmq.python import PGMQueue logging.basicConfig(level=logging.INFO) @@ -49,7 +49,7 @@ def produce( while running_duration < duration_seconds: send_start = time.perf_counter() - cur.execute(f"""select * from pgmq_send('{queue_name}', '{{"hello": "world"}}')""") + cur.execute(f"""select * from pgmq.send('{queue_name}', '{{"hello": "world"}}')""") msg_id = cur.fetchall()[0][0] send_duration = time.perf_counter() - send_start all_results.append({"operation": "write", "duration": send_duration, "msg_id": msg_id, "epoch": time.time()}) @@ -94,10 +94,10 @@ def consume(queue_name: str, connection_info: dict): results = [] no_message_timeout = 0 while no_message_timeout < 5: - stmt = f"select * from pgmq_read('{queue_name}', 1, 1)" + stmt = f"select * from pgmq.read('{queue_name}', 1, 1)" read_start = time.perf_counter() cur.execute(stmt) - # cur.execute("select * from pgmq_read(%s, %s, %s);", [queue_name, 1, 1]) + # cur.execute("select * from pgmq.read(%s, %s, %s);", [queue_name, 1, 1]) read_duration = time.perf_counter() - read_start message = cur.fetchall() @@ -114,7 +114,7 @@ def consume(queue_name: str, connection_info: dict): results.append({"operation": "read", "duration": read_duration, "msg_id": msg_id, "epoch": time.time()}) archive_start = time.perf_counter() - cur.execute("select * from pgmq_archive(%s, %s);", [queue_name, msg_id]) + cur.execute("select * from pgmq.archive(%s, %s);", [queue_name, msg_id]) cur.fetchall() archive_duration = time.perf_counter() - archive_start @@ -161,7 +161,7 @@ def queue_depth(queue_name: str, connection_info: dict, kill_flag: multiprocessi start = time.time() while not kill_flag.value: - cur.execute(f"select * from pgmq_metrics('{queue_name}')") + cur.execute(f"select * from pgmq.metrics('{queue_name}')") metrics = cur.fetchall()[0] depth = metrics[1] total_messages = metrics[-2] diff --git a/tembo-pgmq-python/tembo_pgmq_python/queue.py b/tembo-pgmq-python/tembo_pgmq_python/queue.py index 0a50e76e..2d2376fe 100644 --- a/tembo-pgmq-python/tembo_pgmq_python/queue.py +++ b/tembo-pgmq-python/tembo_pgmq_python/queue.py @@ -65,7 +65,7 @@ def create_partitioned_queue( """ with self.pool.connection() as conn: - conn.execute("select pgmq_create(%s, %s::text, %s::text);", [queue, partition_interval, retention_interval]) + conn.execute("select pgmq.create(%s, %s::text, %s::text);", [queue, partition_interval, retention_interval]) def create_queue(self, queue: str) -> None: """Create a new queue @@ -74,7 +74,7 @@ def create_queue(self, queue: str) -> None: """ with self.pool.connection() as conn: - conn.execute("select pgmq_create(%s);", [queue]) + conn.execute("select pgmq.create(%s);", [queue]) def send(self, queue: str, message: dict, delay: Optional[int] = None) -> int: """Send a message to a queue""" @@ -84,7 +84,7 @@ def send(self, queue: str, message: dict, delay: Optional[int] = None) -> int: # TODO(chuckend): implement send_delay in pgmq raise NotImplementedError("send_delay is not implemented in pgmq") message = conn.execute( - "select * from pgmq_send(%s, %s);", + "select * from pgmq.send(%s, %s);", [queue, Jsonb(message)], # type: ignore ).fetchall() return message[0][0] @@ -92,7 +92,7 @@ def send(self, queue: str, message: dict, delay: Optional[int] = None) -> int: def read(self, queue: str, vt: Optional[int] = None) -> Optional[Message]: """Read a message from a queue""" with self.pool.connection() as conn: - rows = conn.execute("select * from pgmq_read(%s, %s, %s);", [queue, vt or self.vt, 1]).fetchall() + rows = conn.execute("select * from pgmq.read(%s, %s, %s);", [queue, vt or self.vt, 1]).fetchall() messages = [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows] return messages[0] if len(messages) == 1 else None @@ -100,14 +100,14 @@ def read(self, queue: str, vt: Optional[int] = None) -> Optional[Message]: def read_batch(self, queue: str, vt: Optional[int] = None, batch_size=1) -> Optional[list[Message]]: """Read abatch of messages from a queue""" with self.pool.connection() as conn: - rows = conn.execute("select * from pgmq_read(%s, %s, %s);", [queue, vt or self.vt, batch_size]).fetchall() + rows = conn.execute("select * from pgmq.read(%s, %s, %s);", [queue, vt or self.vt, batch_size]).fetchall() return [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows] def pop(self, queue: str) -> Message: """Read a message from a queue""" with self.pool.connection() as conn: - rows = conn.execute("select * from pgmq_pop(%s);", [queue]).fetchall() + rows = conn.execute("select * from pgmq.pop(%s);", [queue]).fetchall() messages = [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows] return messages[0] @@ -115,13 +115,13 @@ def pop(self, queue: str) -> Message: def delete(self, queue: str, msg_id: int) -> bool: """Delete a message from a queue""" with self.pool.connection() as conn: - row = conn.execute("select pgmq_delete(%s, %s);", [queue, msg_id]).fetchall() + row = conn.execute("select pgmq.delete(%s, %s);", [queue, msg_id]).fetchall() return row[0][0] def archive(self, queue: str, msg_id: int) -> bool: """Archive a message from a queue""" with self.pool.connection() as conn: - row = conn.execute("select pgmq_archive(%s, %s);", [queue, msg_id]).fetchall() + row = conn.execute("select pgmq.archive(%s, %s);", [queue, msg_id]).fetchall() return row[0][0] diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 204d699f..9586ad9e 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -50,20 +50,20 @@ async fn test_lifecycle() { // CREATE with default retention and partition strategy let test_default_queue = format!("test_default_{test_num}"); - let _ = sqlx::query(&format!("SELECT pgmq_create('{test_default_queue}');")) + let _ = sqlx::query(&format!("SELECT pgmq.create('{test_default_queue}');")) .execute(&conn) .await .expect("failed to create queue"); // creating a queue must be idempotent // create with same name again, must be no error - let _ = sqlx::query(&format!("SELECT pgmq_create('{test_default_queue}');")) + let _ = sqlx::query(&format!("SELECT pgmq.create('{test_default_queue}');")) .execute(&conn) .await .expect("failed to create queue"); let msg_id = sqlx::query(&format!( - "SELECT * from pgmq_send('{test_default_queue}', '{{\"hello\": \"world\"}}');" + "SELECT * from pgmq.send('{test_default_queue}', '{{\"hello\": \"world\"}}');" )) .fetch_one(&conn) .await @@ -73,7 +73,7 @@ async fn test_lifecycle() { // read message // vt=2, limit=1 - let query = &format!("SELECT * from pgmq_read('{test_default_queue}', 2, 1);"); + let query = &format!("SELECT * from pgmq.read('{test_default_queue}', 2, 1);"); let message = fetch_one_message::(query, &conn) .await @@ -82,7 +82,7 @@ async fn test_lifecycle() { assert_eq!(message.msg_id, 1); // set VT to in 10 seconds - let query = &format!("SELECT * from pgmq_set_vt('{test_default_queue}', {msg_id}, 5);"); + let query = &format!("SELECT * from pgmq.set_vt('{test_default_queue}', {msg_id}, 5);"); let message = fetch_one_message::(query, &conn) .await .expect("failed reading message") @@ -93,14 +93,14 @@ async fn test_lifecycle() { assert!(message.vt > now + chrono::Duration::seconds(4)); // read again, assert no messages because we just set VT to the future - let query = &format!("SELECT * from pgmq_read('{test_default_queue}', 2, 1);"); + let query = &format!("SELECT * from pgmq.read('{test_default_queue}', 2, 1);"); let message = fetch_one_message::(query, &conn) .await .expect("failed reading message"); assert!(message.is_none()); // read again, now using poll to block until message is ready - let query = &format!("SELECT * from pgmq_read_with_poll('{test_default_queue}', 10, 1, 10);"); + let query = &format!("SELECT * from pgmq.read_with_poll('{test_default_queue}', 10, 1, 10);"); let message = fetch_one_message::(query, &conn) .await .expect("failed reading message") @@ -108,7 +108,7 @@ async fn test_lifecycle() { assert_eq!(message.msg_id, 1); // after reading it, set VT to now - let query = &format!("SELECT * from pgmq_set_vt('{test_default_queue}', {msg_id}, 0);"); + let query = &format!("SELECT * from pgmq.set_vt('{test_default_queue}', {msg_id}, 0);"); let message = fetch_one_message::(query, &conn) .await .expect("failed reading message") @@ -116,7 +116,7 @@ async fn test_lifecycle() { assert_eq!(message.msg_id, 1); // read again, should have msg_id 1 again - let query = &format!("SELECT * from pgmq_read('{test_default_queue}', 2, 1);"); + let query = &format!("SELECT * from pgmq.read('{test_default_queue}', 2, 1);"); let message = fetch_one_message::(query, &conn) .await .expect("failed reading message") @@ -125,18 +125,18 @@ async fn test_lifecycle() { // send a batch of 2 messages let batch_queue = format!("test_batch_{test_num}"); - let _ = sqlx::query(&format!("SELECT pgmq_create('{batch_queue}');")) + let _ = sqlx::query(&format!("SELECT pgmq.create('{batch_queue}');")) .execute(&conn) .await .expect("failed to create queue"); let msg_ids = sqlx::query( - &format!("select pgmq_send_batch('{batch_queue}', ARRAY['{{\"hello\": \"world_0\"}}'::jsonb, '{{\"hello\": \"world_1\"}}'::jsonb])") + &format!("select pgmq.send_batch('{batch_queue}', ARRAY['{{\"hello\": \"world_0\"}}'::jsonb, '{{\"hello\": \"world_1\"}}'::jsonb])") ) .fetch_all(&conn).await.expect("failed to send batch"); assert_eq!(msg_ids.len(), 2); assert_eq!(msg_ids[0].get::(0), 1); assert_eq!(msg_ids[1].get::(0), 2); - let rowcount: i64 = sqlx::query_scalar(&format!("SELECT count(*) from pgmq_{batch_queue}")) + let rowcount: i64 = sqlx::query_scalar(&format!("SELECT count(*) from pgmq.q_{batch_queue}")) .fetch_one(&conn) .await .expect("failed to get rowcount"); @@ -149,7 +149,7 @@ async fn test_lifecycle() { // CREATE with 5 seconds per partition, 10 seconds retention let test_duration_queue = format!("test_duration_{test_num}"); - let q = format!("SELECT \"pgmq_create_partitioned\"('{test_duration_queue}'::text, '5 seconds'::text, '10 seconds'::text);"); + let q = format!("SELECT pgmq.create_partitioned('{test_duration_queue}'::text, '5 seconds'::text, '10 seconds'::text);"); let _ = sqlx::query(&q) .execute(&conn) .await @@ -158,7 +158,7 @@ async fn test_lifecycle() { // CREATE with 10 messages per partition, 20 messages retention let test_numeric_queue = format!("test_numeric_{test_num}"); let _ = sqlx::query(&format!( - "SELECT \"pgmq_create_partitioned\"('{test_numeric_queue}'::text, '10'::text, '20'::text);" + "SELECT pgmq.create_partitioned('{test_numeric_queue}'::text, '10'::text, '20'::text);" )) .execute(&conn) .await @@ -177,7 +177,7 @@ async fn test_lifecycle() { // get metrics let rows = sqlx::query_as::<_, MetricsRow>(&format!( - "SELECT * from pgmq_metrics('{test_duration_queue}'::text);" + "SELECT * from pgmq.metrics('{test_duration_queue}'::text);" )) .fetch_all(&conn) .await @@ -185,7 +185,7 @@ async fn test_lifecycle() { assert_eq!(rows.len(), 1); // get metrics all - let rows = sqlx::query_as::<_, MetricsRow>(&format!("SELECT * from pgmq_metrics_all();")) + let rows = sqlx::query_as::<_, MetricsRow>(&format!("SELECT * from pgmq.metrics_all();")) .fetch_all(&conn) .await .expect("failed creating numeric interval queue"); @@ -200,13 +200,13 @@ async fn test_lifecycle() { // delete partitioned queues for queue in [test_duration_queue, test_numeric_queue].iter() { - sqlx::query(&format!("select pgmq_drop_queue('{}', true);", &queue)) + sqlx::query(&format!("select pgmq.drop_queue('{}', true);", &queue)) .execute(&conn) .await .expect("failed to drop partitioned queues"); } - let queues = sqlx::query_as::<_, QueueMeta>("select queue_name from pgmq_list_queues();") + let queues = sqlx::query_as::<_, QueueMeta>("select queue_name from pgmq.list_queues();") .fetch_all(&conn) .await .expect("failed to list queues"); @@ -214,13 +214,13 @@ async fn test_lifecycle() { // drop the rest of the queues for queue in queues { let q = queue.queue_name; - sqlx::query(&format!("select pgmq_drop_queue('{}');", &q)) + sqlx::query(&format!("select pgmq.drop_queue('{}');", &q)) .execute(&conn) .await .expect("failed to drop standard queues"); } - let queues = sqlx::query_as::<_, QueueMeta>("select queue_name from pgmq_list_queues();") + let queues = sqlx::query_as::<_, QueueMeta>("select queue_name from pgmq.list_queues();") .fetch_all(&conn) .await .expect("failed to list queues");