Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gleam.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ gleam_otp = ">= 1.0.0 and < 2.0.0"
gleam_stdlib = ">= 0.51.0 and < 2.0.0"
gleam_time = ">= 1.0.0 and < 2.0.0"
exception = ">= 2.1.0 and < 3.0.0"
pgo = ">= 0.14.0 and < 1.0.0"
pgo = ">= 0.16.0 and < 1.0.0"

[dev-dependencies]
gleeunit = ">= 1.0.0 and < 2.0.0"
Expand Down
8 changes: 4 additions & 4 deletions manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ packages = [
{ name = "gleam_stdlib", version = "0.61.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "3DC407D6EDA98FCE089150C11F3AD892B6F4C3CA77C87A97BAE8D5AB5E41F331" },
{ name = "gleam_time", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_time", source = "hex", outer_checksum = "D71F1AFF7FEB534FF55E5DC58E534E9201BA75A444619788A2E4DEA4EBD87D16" },
{ name = "gleeunit", version = "1.6.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "63022D81C12C17B7F1A60E029964E830A4CBD846BBC6740004FC1F1031AE0326" },
{ name = "opentelemetry_api", version = "1.4.0", build_tools = ["rebar3", "mix"], requirements = [], otp_app = "opentelemetry_api", source = "hex", outer_checksum = "3DFBBFAA2C2ED3121C5C483162836C4F9027DEF469C41578AF5EF32589FCFC58" },
{ name = "pg_types", version = "0.4.0", build_tools = ["rebar3"], requirements = [], otp_app = "pg_types", source = "hex", outer_checksum = "B02EFA785CAECECF9702C681C80A9CA12A39F9161A846CE17B01FB20AEEED7EB" },
{ name = "pgo", version = "0.14.0", build_tools = ["rebar3"], requirements = ["backoff", "opentelemetry_api", "pg_types"], otp_app = "pgo", source = "hex", outer_checksum = "71016C22599936E042DC0012EE4589D24C71427D266292F775EBF201D97DF9C9" },
{ name = "opentelemetry_api", version = "1.5.0", build_tools = ["rebar3", "mix"], requirements = [], otp_app = "opentelemetry_api", source = "hex", outer_checksum = "F53EC8A1337AE4A487D43AC89DA4BD3A3C99DDF576655D071DEED8B56A2D5DDA" },
{ name = "pg_types", version = "0.6.0", build_tools = ["rebar3"], requirements = [], otp_app = "pg_types", source = "hex", outer_checksum = "9949A4849DD13408FA249AB7B745E0D2DFDB9532AEE2B9722326E33CD082A778" },
{ name = "pgo", version = "0.20.0", build_tools = ["rebar3"], requirements = ["backoff", "opentelemetry_api", "pg_types"], otp_app = "pgo", source = "hex", outer_checksum = "2F11E6649CEB38E569EF56B16BE1D04874AE5B11A02867080A2817CE423C683B" },
]

[requirements]
Expand All @@ -21,4 +21,4 @@ gleam_otp = { version = ">= 1.0.0 and < 2.0.0" }
gleam_stdlib = { version = ">= 0.51.0 and < 2.0.0" }
gleam_time = { version = ">= 1.0.0 and < 2.0.0" }
gleeunit = { version = ">= 1.0.0 and < 2.0.0" }
pgo = { version = ">= 0.14.0 and < 1.0.0" }
pgo = { version = ">= 0.16.0 and < 1.0.0" }
79 changes: 79 additions & 0 deletions src/pog.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,18 @@ pub opaque type Connection {
SingleConnection(SingleConnection)
}

pub opaque type NotificationsConnection {
NotificationsConnection(Name(Message))
}

type SingleConnection

pub type Message

pub type Notification {
Notify(pid: Pid, reference: Reference, channel: String, payload: String)
}

/// Create a reference to a pool using the pool's name.
///
/// If no pool has been started using this name then queries using this
Expand All @@ -42,6 +50,17 @@ pub fn named_connection(name: Name(Message)) -> Connection {
Pool(name)
}

/// Create a reference to a pool using the pool's name.
///
/// If no notifications process has been started using this name then
/// listeners using this connection will fail.
///
pub fn named_notifications_connection(
name: Name(Message),
) -> NotificationsConnection {
NotificationsConnection(name)
}

/// The configuration for a pool of connections.
pub type Config {
Config(
Expand Down Expand Up @@ -343,6 +362,28 @@ pub fn start(config: Config) -> actor.StartResult(Connection) {
@external(erlang, "pog_ffi", "start")
fn start_tree(config: Config) -> Result(Pid, dynamic.Dynamic)

/// Start a process holding a connection to the database that can be used to
/// `LISTEN` to channels for notifications. Most the time you want to use
/// `notifications_supervised` and add the process to your supervision tree
/// instead of using this function directly.
///
/// The process will asynchronously connect to the PostgreSQL instance specified
/// in the config. If the configuration is invalid or it cannot connect for
/// another reason it will continue to attempt to connect, and any queries made
/// using the connection pool will fail.
///
pub fn start_notifications(
config: Config,
) -> actor.StartResult(NotificationsConnection) {
case start_tree_notifications(config) {
Ok(pid) -> Ok(actor.Started(pid, NotificationsConnection(config.pool_name)))
Error(reason) -> Error(actor.InitExited(process.Abnormal(reason)))
}
}

@external(erlang, "pog_ffi", "start_notifications")
fn start_tree_notifications(config: Config) -> Result(Pid, dynamic.Dynamic)

/// Start a database connection pool by adding it to your supervision tree.
///
/// Use the `named_connection` function to create a connection to query this
Expand All @@ -358,6 +399,23 @@ pub fn supervised(config: Config) -> supervision.ChildSpecification(Connection)
supervision.supervisor(fn() { start(config) })
}

/// Start a database connection pool by adding it to your supervision tree.
///
/// Use the `named_connection` function to create a connection to query this
/// pool with if your supervisor does not pass back the return value of
/// creating the pool.
///
/// The pool is started in a new process and will asynchronously connect to the
/// PostgreSQL instance specified in the config. If the configuration is invalid
/// or it cannot connect for another reason it will continue to attempt to
/// connect, and any queries made using the connection pool will fail.
///
pub fn notifications_supervised(
config: Config,
) -> supervision.ChildSpecification(NotificationsConnection) {
supervision.supervisor(fn() { start_notifications(config) })
}

/// A value that can be sent to PostgreSQL as one of the arguments to a
/// parameterised SQL query.
pub type Value
Expand Down Expand Up @@ -512,6 +570,27 @@ fn run_query_extended(
query: String,
) -> Result(#(Int, List(Dynamic)), QueryError)

@external(erlang, "pog_ffi", "listen")
pub fn listen(conn: NotificationsConnection, channel: String) -> Result(Reference, Nil)

@external(erlang, "pog_ffi", "unlisten")
pub fn unlisten(conn: NotificationsConnection, listener: Reference) -> Nil

@external(erlang, "pog_ffi", "decode_notification")
fn decode_notification(dyn: dynamic.Dynamic) -> Result(Notification, Nil)

type Tag {
Notification
}

pub fn notification_selector() -> process.Selector(Notification) {
process.new_selector()
|> process.select_record(tag: Notification, fields: 4, mapping: fn(tuple) {
let assert Ok(result) = decode_notification(tuple)
result
})
}

pub type QueryError {
/// The query failed as a database constraint would have been violated by the
/// change.
Expand Down
90 changes: 89 additions & 1 deletion src/pog_ffi.erl
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
-module(pog_ffi).

-export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1]).
-export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1, start_notifications/1, listen/2, unlisten/2, decode_notification/1]).

-include_lib("pog/include/pog_Config.hrl").
-include_lib("pog/include/pog_Notify.hrl").
-include_lib("pg_types/include/pg_types.hrl").

null() ->
Expand All @@ -11,6 +12,16 @@ null() ->
coerce(Value) ->
Value.

decode_notification({notification, Pid, Ref, Channel, Payload}) ->
{ok, #notify{
pid=Pid,
reference=Ref,
channel=Channel,
payload=Payload
}};
decode_notification(_) ->
{error, nil}.

%% Use correct defaults for SSL connections when SSL is enabled.
%% Peers have to be verified & cacerts are fetched directly from the system.
%%
Expand Down Expand Up @@ -82,6 +93,68 @@ start(Config) ->
end,
pgo_pool:start_link(PoolName, Options2).

start_notifications(Config) ->
% Unfortunately this has to be supplied via global mutable state currently.
application:set_env(pg_types, timestamp_config, integer_system_time_microseconds),
#config{
pool_name = PoolName,
host = Host,
port = Port,
database = Database,
user = User,
password = Password,
ssl = Ssl,
connection_parameters = ConnectionParameters,
pool_size = PoolSize,
queue_target = QueueTarget,
queue_interval = QueueInterval,
idle_interval = IdleInterval,
trace = Trace,
ip_version = IpVersion,
rows_as_map = RowsAsMap
} = Config,
{SslActivated, SslOptions} = default_ssl_options(Host, Ssl),
Options1 = #{
host => Host,
port => Port,
database => Database,
user => User,
ssl => SslActivated,
ssl_options => SslOptions,
connection_parameters => ConnectionParameters,
pool_size => PoolSize,
queue_target => QueueTarget,
queue_interval => QueueInterval,
idle_interval => IdleInterval,
trace => Trace,
decode_opts => [{return_rows_as_maps, RowsAsMap}],
socket_options => case IpVersion of
ipv4 -> [];
ipv6 -> [inet6]
end
},
Options2 = case Password of
{some, Pw} -> maps:put(password, Pw, Options1);
none -> Options1
end,
Options3 = normalize_pool_config(Options2),
pgo_notifications:start_link({local, PoolName}, Options3).

% NOTE:
% Copied from https://github.com/erleans/pgo/blob/main/src/pgo_pool.erl
% This may be better to occur within pgo_notifications, but they have chosen
% to omit this (despite doing it in pgo_pool which we use in the other case),
% so we must do it ourselves here.
normalize_pool_config(PoolConfig) when is_list(PoolConfig) ->
normalize_pool_config(maps:from_list(PoolConfig));
normalize_pool_config(PoolConfig) ->
maps:map(fun normalize_pool_config_value/2, PoolConfig).

normalize_pool_config_value(_, V) when is_binary(V) ->
binary_to_list(V);
normalize_pool_config_value(_, V) ->
V.

query(Pool, Sql, Arguments, Timeout) ->
Res = case Pool of
{single_connection, Conn} ->
Expand Down Expand Up @@ -116,6 +189,21 @@ checkout(Name) when is_atom(Name) ->
{error, Error} -> {error, convert_error(Error)}
end.

listen(Conn, Channel) ->
case Conn of
{notifications_connection, Name} ->
case pgo_notifications:listen(Name, Channel) of
{ok, Ref} -> {ok, Ref};
error -> {error, nil}
end
end.

unlisten(Conn, Ref) ->
case Conn of
{notifications_connection, Name} ->
pgo_notifications:unlisten(Name, Ref)
end.

convert_error(none_available) ->
connection_unavailable;
convert_error({pgo_protocol, {parameters, Expected, Got}}) ->
Expand Down
31 changes: 31 additions & 0 deletions test/pog_test.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -615,3 +615,34 @@ pub fn transaction_commit_test() {

disconnect(db)
}

pub fn notifications_test() {
let db = start_default()
let assert Ok(notifications) =
process.new_name("pog_test_notifications")
|> default_config
|> pog.start_notifications

let assert Ok(listener) = pog.listen(notifications.data, "the_channel")

let assert Ok(_) =
pog.query("NOTIFY the_channel, 'first payload'")
|> pog.execute(db.data)

let assert Ok(_) =
pog.notification_selector()
|> process.selector_receive(100)

pog.unlisten(notifications.data, listener)

let assert Ok(_) =
pog.query("NOTIFY the_channel, 'second payload'")
|> pog.execute(db.data)

let assert Error(Nil) =
pog.notification_selector()
|> process.selector_receive(10)

disconnect(db)
disconnect(notifications)
}