Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consider supporting FIFO + message keys #294

Open
fggrtech opened this issue Aug 9, 2024 · 1 comment
Open

Consider supporting FIFO + message keys #294

fggrtech opened this issue Aug 9, 2024 · 1 comment
Labels
enhancement New feature or request

Comments

@fggrtech
Copy link

fggrtech commented Aug 9, 2024

Firstly, this project is real neat.

Any thoughts on supporting FIFO queues with message key values, similar to SQS FIFO + MessageGroupId ?

Riffing off your existing work, here are the prototype functions which illustrate the idea:

CREATE TYPE message_record AS (
    msg_id BIGINT,
    read_ct INTEGER,
    enqueued_at TIMESTAMP WITH TIME ZONE,
    vt TIMESTAMP WITH TIME ZONE,
    group_id UUID,
    message JSONB
);

CREATE FUNCTION create_queue(queue_name TEXT)
RETURNS void AS $$
BEGIN

  EXECUTE FORMAT(
    $QUERY$
    CREATE TABLE IF NOT EXISTS %s (
        msg_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
        read_ct INT DEFAULT 0 NOT NULL,
        enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
        vt TIMESTAMP WITH TIME ZONE NOT NULL,
        group_id UUID NOT NULL,
        message JSONB NOT NULL
    )
    $QUERY$,
    queue_name
  );

  EXECUTE FORMAT(
    $QUERY$
    CREATE INDEX IF NOT EXISTS %s_vt_idx ON %s (vt ASC);
    $QUERY$,
    queue_name, queue_name
  );

  EXECUTE FORMAT(
    $QUERY$
    CREATE INDEX IF NOT EXISTS %s_group_id_order ON %s (group_id, msg_id);
    $QUERY$,
    queue_name, queue_name
  );

END;
$$ LANGUAGE plpgsql;

CREATE FUNCTION enqueue(
    queue_name TEXT,
    group_id UUID,
    msg JSONB,
    delay INTEGER DEFAULT 0
) RETURNS SETOF BIGINT AS $$
DECLARE
    sql TEXT;
BEGIN
    sql := FORMAT(
        $QUERY$
        INSERT INTO %s (vt, group_id, message)
        VALUES ((clock_timestamp() + interval '%s seconds'), '%s'::uuid, $1)
        RETURNING msg_id;
        $QUERY$,
        queue_name, delay, group_id
    );
    RETURN QUERY EXECUTE sql USING msg;
END;
$$ LANGUAGE plpgsql;

CREATE FUNCTION dequeue(
    queue_name TEXT,
    vt INTEGER,
    qty INTEGER
)
RETURNS SETOF message_record AS $$
DECLARE
    sql TEXT;
BEGIN
    sql := FORMAT(
        $QUERY$

        WITH cte0 AS 
        (
          SELECT group_id, MIN(msg_id) AS msg_id
          FROM %s
          GROUP BY group_id
        ),
        cte1 AS
        (
          SELECT t1.msg_id AS msg_id
          FROM %s AS t1
          JOIN cte0 AS t2
            ON t1.group_id = t2.group_id
            AND t1.msg_id = t2.msg_id
          WHERE vt <= clock_timestamp()
          ORDER BY msg_id ASC
          LIMIT $1
          FOR UPDATE SKIP LOCKED
        )

        UPDATE %s m
        SET
            vt = clock_timestamp() + interval '%s seconds',
            read_ct = read_ct + 1
        FROM cte1
        WHERE m.msg_id = cte1.msg_id
        RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.group_id, m.message;
        $QUERY$,
        queue_name, queue_name, queue_name, vt
    );
    RETURN QUERY EXECUTE sql USING qty;
END;
$$ LANGUAGE plpgsql;
SELECT * FROM create_queue('test');

-- Two enqueued messages with the same group id. These should respect FIFO rules.
SELECT * FROM enqueue('test', '0470da73-c6a4-4273-aa31-f48b05555539', '{"email-address": "[email protected]"}');
SELECT * FROM enqueue('test', '0470da73-c6a4-4273-aa31-f48b05555539', '{"email-address": "[email protected]", "phone-number": 5551112222}');

-- Third enqueued message with different group id.
SELECT * FROM enqueue('test', gen_random_uuid(), '{"email-address": "[email protected]}");

-- This should return message id 1 and 3. Message id 2 will NOT be available until 1 has been deleted.
SELECT * FROM dequeue('test', 5, 5);

Thoughts?

@ChuckHend
Copy link
Contributor

Hi @fggrtech , thank you. I'd like to see features like this make it into the project too.

I'll have to think a bit about how this would impact the existing queue APIs and tables. I don't think we'd want to break the simple queue structure that currently exists and ideally we could find a way to add this functionality without adding overhead or requiring the usage of the new keys, but worst case we add a new API specifically for FIFO.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants