Skip to content

Commit

Permalink
Migrate schemas (#115)
Browse files Browse the repository at this point in the history
* Fix not creating is_partitioned column on extensionless

* Migrate everything to the pgmq schema

* Update python client for extension 0.25

* Update ruby example for pgmq 0.25

* Update README for 0.25

* Change prefixes/sufixes to q_ and a_ prefixes

* Rename TABLE_PREFIX constant to QUEUE_PREFIX
  • Loading branch information
v0idpwn committed Sep 15, 2023
1 parent 7ce03aa commit 7db90ee
Show file tree
Hide file tree
Showing 24 changed files with 1,225 additions and 955 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.24.0"
version = "0.25.0"
edition = "2021"
authors = ["Tembo.io"]
description = "Postgres extension for PGMQ"
Expand Down
50 changes: 25 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,16 @@ CREATE EXTENSION pgmq;

### Creating a queue

Every queue is its own table in Postgres. The table name is the queue name prefixed with `pgmq_`.
For example, `pgmq_my_queue` is the table for the queue `my_queue`.
Every queue is its own table in Postgres. The table name is the queue name prefixed with `queue_`.
For example, `queue_my_mq` is the table for the queue `my_mq`.

```sql
-- creates the queue
SELECT pgmq_create('my_queue');
SELECT pgmq.create('my_queue');
```

```text
pgmq_create
create
-------------
(1 row)
Expand All @@ -85,19 +85,19 @@ SELECT pgmq_create('my_queue');

```sql
-- messages are sent as JSON
SELECT * from pgmq_send('my_queue', '{"foo": "bar1"}');
SELECT * from pgmq_send('my_queue', '{"foo": "bar2"}');
SELECT * from pgmq.send('my_queue', '{"foo": "bar1"}');
SELECT * from pgmq.send('my_queue', '{"foo": "bar2"}');
```

The message id is returned from the send function.

```text
pgmq_send
send
-----------
1
(1 row)
pgmq_send
send
-----------
2
(1 row)
Expand All @@ -110,7 +110,7 @@ Read `2` message from the queue. Make them invisible for `30` seconds.
and can be read by another consumer.

```sql
SELECT * from pgmq_read('my_queue', 30, 2);
SELECT pgmq.read('my_queue', 30, 2);
```

```text
Expand All @@ -123,7 +123,7 @@ SELECT * from pgmq_read('my_queue', 30, 2);
If the queue is empty, or if all messages are currently invisible, no rows will be returned.

```sql
SELECT * from pgmq_read('my_queue', 30, 1);
SELECT pgmq.read('my_queue', 30, 1);
```

```text
Expand All @@ -135,7 +135,7 @@ SELECT * from pgmq_read('my_queue', 30, 1);

```sql
-- Read a message and immediately delete it from the queue. Returns `None` if the queue is empty.
SELECT * from pgmq_pop('my_queue');
SELECT pgmq.pop('my_queue');
```

```text
Expand All @@ -151,18 +151,18 @@ Archiving a message removes it from the queue, and inserts it to the archive tab
Archive message with msg_id=2.

```sql
SELECT * from pgmq_archive('my_queue', 2);
SELECT pgmq.archive('my_queue', 2);
```

```text
pgmq_archive
archive
--------------
t
(1 row)
```

```sql
SELECT * from pgmq_my_queue_archive;
SELECT pgmq.my_queue_archive;
```

```text
Expand All @@ -176,11 +176,11 @@ SELECT * from pgmq_my_queue_archive;
Send another message, so that we can delete it.
```sql
SELECT * from pgmq_send('my_queue', '{"foo": "bar3"}');
SELECT pgmq.send('my_queue', '{"foo": "bar3"}');
```

```text
pgmq_send
send
-----------
3
(1 row)
Expand All @@ -189,11 +189,11 @@ SELECT * from pgmq_send('my_queue', '{"foo": "bar3"}');
Delete the message with id `3` from the queue named `my_queue`.

```sql
SELECT pgmq_delete('my_queue', 3);
SELECT pgmq.delete('my_queue', 3);
```

```text
pgmq_delete
delete
-------------
t
(1 row)
Expand All @@ -204,11 +204,11 @@ SELECT pgmq_delete('my_queue', 3);
Delete the queue `my_queue`.

```sql
SELECT pgmq_drop_queue('my_queue');
SELECT pgmq.drop_queue('my_queue');
```

```text
pgmq_drop_queue
drop_queue
-----------------
t
(1 row)
Expand All @@ -220,18 +220,18 @@ SELECT pgmq_drop_queue('my_queue');

You will need to install [pg_partman](https://github.com/pgpartman/pg_partman/) if you want to use `pgmq` partitioned queues.

`pgmq` queue tables can be created as a partitioned table by using `pgmq_create_partitioned()`. [pg_partman](https://github.com/pgpartman/pg_partman/)
`pgmq` queue tables can be created as a partitioned table by using `pgmq.create_partitioned()`. [pg_partman](https://github.com/pgpartman/pg_partman/)
handles all maintenance of queue tables. This includes creating new partitions and dropping old partitions.

Partitions behavior is configured at the time queues are created, via `pgmq_create_partitioned()`. This function has three parameters:
Partitions behavior is configured at the time queues are created, via `pgmq.create_partitioned()`. This function has three parameters:

`queue_name: text`: The name of the queue. Queues are Postgres tables prepended with `pgmq_`. For example, `pgmq_my_queue`.
`queue_name: text`: The name of the queue. Queues are Postgres tables prepended with `queue_`. For example, `queue_my_mq`.


`partition_interval: text` - The interval at which partitions are created. This can be either any valid Postgres `Duration` supported by pg_partman, or an integer value. When it is a duration, queues are partitioned by the time at which messages are sent to the table (`enqueued_at`). A value of `'daily'` would create a new partition each day. When it is an integer value, queues are partitioned by the `msg_id`. A value of `'100'` will create a new partition every 100 messages. The value must agree with `retention_interval` (time based or numeric). The default value is `daily`.


`retention_interval: text` - The interval for retaining partitions. This can be either any valid Postgres `Duration` supported by pg_partman, or an integer value. When it is a duration, partitions containing data greater than the duration will be dropped. When it is an integer value, any messages that have a `msg_id` less than `max(msg_id) - retention_interval` will be dropped. For example, if the max `msg_id` is 100 and the `retention_interval` is 60, any partitions with `msg_id` values less than 40 will be dropped. The value must agree with `partition_interval` (time based or numeric). The default is `'5 days'`. Note: `retention_interval` does not apply to messages that have been deleted via `pgmq_delete()` or archived with `pgmq_archive()`. `pgmq_delete()` removes messages forever and `pgmq_archive()` moves messages to the corresponding archive table forever (for example, `pgmq_my_queue_archive`).
`retention_interval: text` - The interval for retaining partitions. This can be either any valid Postgres `Duration` supported by pg_partman, or an integer value. When it is a duration, partitions containing data greater than the duration will be dropped. When it is an integer value, any messages that have a `msg_id` less than `max(msg_id) - retention_interval` will be dropped. For example, if the max `msg_id` is 100 and the `retention_interval` is 60, any partitions with `msg_id` values less than 40 will be dropped. The value must agree with `partition_interval` (time based or numeric). The default is `'5 days'`. Note: `retention_interval` does not apply to messages that have been deleted via `pgmq.delete()` or archived with `pgmq.archive()`. `pgmq.delete()` removes messages forever and `pgmq.archive()` moves messages to the corresponding archive table forever (for example, `queue_my_mq_archive`).


In order for automatic partition maintenance to take place, several settings must be added to the `postgresql.conf` file, which is typically located in the postgres `DATADIR`.
Expand All @@ -252,7 +252,7 @@ pg_partman_bgw.dbname = 'postgres'

## Visibility Timeout (vt)

pgmq guarantees exactly once delivery of a message within a visibility timeout. The visibility timeout is the amount of time a message is invisible to other consumers after it has been read by a consumer. If the message is NOT deleted or archived within the visibility timeout, it will become visible again and can be read by another consumer. The visibility timeout is set when a message is read from the queue, via `pgmq_read()`. It is recommended to set a `vt` value that is greater than the expected time it takes to process a message. After the application successfully processes the message, it should call `pgmq_delete()` to completely remove the message from the queue or `pgmq_archive()` to move it to the archive table for the queue.
pgmq guarantees exactly once delivery of a message within a visibility timeout. The visibility timeout is the amount of time a message is invisible to other consumers after it has been read by a consumer. If the message is NOT deleted or archived within the visibility timeout, it will become visible again and can be read by another consumer. The visibility timeout is set when a message is read from the queue, via `pgmq.read()`. It is recommended to set a `vt` value that is greater than the expected time it takes to process a message. After the application successfully processes the message, it should call `pgmq.delete()` to completely remove the message from the queue or `pgmq.archive()` to move it to the archive table for the queue.

## ✨ Contributors

Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq-core"
version = "0.4.1"
version = "0.5.0"
edition = "2021"
authors = ["Tembo.io"]
description = "Core functionality shared between the PGMQ Rust SDK and Postgres Extension"
Expand Down
Loading

0 comments on commit 7db90ee

Please sign in to comment.