Skip to content

Commit

Permalink
Migrate everything to the pgmq schema
Browse files Browse the repository at this point in the history
  • Loading branch information
v0idpwn committed Sep 15, 2023
1 parent eb0f704 commit e31bcc5
Show file tree
Hide file tree
Showing 19 changed files with 1,110 additions and 867 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.24.0"
version = "0.25.0"
edition = "2021"
authors = ["Tembo.io"]
description = "Postgres extension for PGMQ"
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq-core"
version = "0.4.1"
version = "0.5.0"
edition = "2021"
authors = ["Tembo.io"]
description = "Core functionality shared between the PGMQ Rust SDK and Postgres Extension"
Expand Down
32 changes: 18 additions & 14 deletions core/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,14 @@ pub fn create_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
))
}

pub fn create_schema() -> String {
format!("CREATE SCHEMA IF NOT EXISTS {PGMQ_SCHEMA}")
}

pub fn create_meta() -> String {
format!(
"
CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_meta (
CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.meta (
queue_name VARCHAR UNIQUE NOT NULL,
is_partitioned BOOLEAN NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
Expand All @@ -76,7 +80,7 @@ pub fn create_meta() -> String {
}

fn grant_stmt(table: &str) -> String {
let grant_seq = match &table.contains("pgmq_meta") {
let grant_seq = match &table.contains("meta") {
true => "".to_string(),
false => {
format!(" EXECUTE 'GRANT SELECT ON SEQUENCE {table}_msg_id_seq TO pg_monitor';")
Expand All @@ -101,7 +105,7 @@ $$ LANGUAGE plpgsql;

// pg_monitor needs to query queue metadata
pub fn grant_pgmon_meta() -> String {
let table = format!("{PGMQ_SCHEMA}.{TABLE_PREFIX}_meta");
let table = format!("{PGMQ_SCHEMA}.meta");
grant_stmt(&table)
}

Expand Down Expand Up @@ -132,10 +136,10 @@ pub fn delete_queue_metadata(name: CheckedName<'_>) -> Result<String, PgmqError>
IF EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_name = '{TABLE_PREFIX}_meta')
WHERE table_name = 'meta' and table_schema = 'pgmq')
THEN
DELETE
FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_meta
FROM {PGMQ_SCHEMA}.meta
WHERE queue_name = '{name}';
END IF;
END $$;
Expand All @@ -154,7 +158,7 @@ pub fn drop_queue_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
pub fn insert_meta(name: CheckedName<'_>, is_partitioned: bool) -> Result<String, PgmqError> {
Ok(format!(
"
INSERT INTO {PGMQ_SCHEMA}.{TABLE_PREFIX}_meta (queue_name, is_partitioned)
INSERT INTO {PGMQ_SCHEMA}.meta (queue_name, is_partitioned)
VALUES ('{name}', {is_partitioned})
ON CONFLICT
DO NOTHING;
Expand Down Expand Up @@ -361,15 +365,15 @@ $$ LANGUAGE plpgsql;
";
assert_eq!(q, expected);

let q = grant_stmt("pgmq_meta");
let q = grant_stmt("meta");
let expected = "
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
WHERE has_table_privilege('pg_monitor', 'pgmq_meta', 'SELECT')
WHERE has_table_privilege('pg_monitor', 'meta', 'SELECT')
) THEN
EXECUTE 'GRANT SELECT ON pgmq_meta TO pg_monitor';
EXECUTE 'GRANT SELECT ON meta TO pg_monitor';
END IF;
END;
Expand All @@ -381,14 +385,14 @@ $$ LANGUAGE plpgsql;
#[test]
fn test_assign() {
let query = assign("my_queue_archive");
assert!(query.contains("WHERE relname = 'pgmq_my_queue_archive'"));
assert!(query.contains("WHERE relname = 'queue_my_queue_archive'"));
}

#[test]
fn test_create() {
let queue_name = CheckedName::new("yolo").unwrap();
let query = create_queue(queue_name);
assert!(query.unwrap().contains("pgmq_yolo"));
assert!(query.unwrap().contains("queue_yolo"));
}

#[test]
Expand All @@ -399,7 +403,7 @@ $$ LANGUAGE plpgsql;
});
msgs.push(msg);
let query = enqueue("yolo", &msgs, &0).unwrap();
assert!(query.contains("pgmq_yolo"));
assert!(query.contains("queue_yolo"));
assert!(query.contains("{\"foo\":\"bar\"}"));
}

Expand All @@ -420,9 +424,9 @@ $$ LANGUAGE plpgsql;
let table_name = "my_valid_table_name";
assert!(check_input(table_name).is_ok());

assert!(check_input(&"a".repeat(58)).is_ok());
assert!(check_input(&"a".repeat(57)).is_ok());

assert!(check_input(&"a".repeat(59)).is_err());
assert!(check_input(&"a".repeat(58)).is_err());
assert!(check_input(&"a".repeat(60)).is_err());
assert!(check_input(&"a".repeat(70)).is_err());
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ pub const POLL_INTERVAL_DEFAULT: Duration = Duration::from_millis(250);

use chrono::serde::ts_seconds::deserialize as from_ts;

pub const TABLE_PREFIX: &str = r#"pgmq"#;
pub const PGMQ_SCHEMA: &str = "public";
pub const TABLE_PREFIX: &str = r#"queue"#;
pub const PGMQ_SCHEMA: &str = "pgmq";

pub struct PGMQueueMeta {
pub queue_name: String,
Expand Down
2 changes: 1 addition & 1 deletion pgmq-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ readme = "README.md"
repository = "https://github.com/tembo-io/pgmq"

[dependencies]
pgmq_core = { package = "pgmq-core", version = "0.4.0" }
pgmq_core = {package = "pgmq-core", path = "../core" }
chrono = { version = "0.4.23", features = [ "serde" ] }
serde = { version = "1.0.152" }
serde_json = { version = "1.0.91", features = [ "raw_value" ] }
Expand Down
Loading

0 comments on commit e31bcc5

Please sign in to comment.