Skip to content

A Go (Golang) client for Postgres Message Queue (PGMQ)

License

Notifications You must be signed in to change notification settings

craigpastro/pgmq-go

Folders and files

NameName
Last commit message
Last commit date
Aug 10, 2023
Feb 22, 2025
Oct 9, 2023
Aug 8, 2023
Feb 22, 2025
Feb 22, 2025
Mar 1, 2025
Mar 1, 2025
Feb 22, 2025
Feb 22, 2025
Feb 22, 2025

Repository files navigation

pgmq-go

Go Reference Go Report Card CI codecov

A Go (Golang) client for Postgres Message Queue (PGMQ). Based loosely on the Rust client.

pgmq-go works with pgx. The second argument of most functions only needs to satisfy the DB interface, which means it can take, among others, a *pgx.Conn, *pgxpool.Pool, or pgx.Tx.

Usage

Start a Postgres instance with the PGMQ extension installed:

docker run -d --name postgres -e POSTGRES_PASSWORD=password -p 5432:5432 quay.io/tembo/pgmq-pg:latest

Then

package main

import (
    "context"
    "fmt"

    "github.com/craigpastro/pgmq-go"
)

func main() {
    ctx := context.Background()

    pool, err := pgmq.NewPgxPool(ctx, "postgres://postgres:password@localhost:5432/postgres")
    if err != nil {
        panic(err)
    }

    err = pgmq.CreatePGMQExtension(ctx, pool)
    if err != nil {
        panic(err)
    }

    err = pgmq.CreateQueue(ctx, pool, "my_queue")
    if err != nil {
        panic(err)
    }

    // We can perform various queue operations using a transaction.
    tx, err := pool.Begin(ctx)
    if err != nil {
        panic(err)
    }

    id, err := pgmq.Send(ctx, tx, "my_queue", json.RawMessage(`{"foo": "bar"}`))
    if err != nil {
        panic(err)
    }

    msg, err := pgmq.Read(ctx, tx, "my_queue", 30)
    if err != nil {
        panic(err)
    }

    // Archive the message by moving it to the "pgmq.a_<queue_name>" table.
    // Alternatively, you can `Delete` the message, or read and delete in one
    // call by using `Pop`.
    _, err = pgmq.Archive(ctx, tx, "my_queue", id)
    if err != nil {
        panic(err)
    }

    // Commit the transaction.
    err = tx.Commit(ctx)
    if err != nil {
        panic(err)
    }

    // Close the connection pool.
    pool.Close()
}

Contributions

We ❤️ contributions.

See also