diff --git a/gleam.toml b/gleam.toml index 977f761..7f8e444 100644 --- a/gleam.toml +++ b/gleam.toml @@ -21,7 +21,7 @@ gleam_otp = ">= 1.0.0 and < 2.0.0" gleam_stdlib = ">= 0.51.0 and < 2.0.0" gleam_time = ">= 1.0.0 and < 2.0.0" exception = ">= 2.1.0 and < 3.0.0" -pgo = ">= 0.14.0 and < 1.0.0" +pgo = ">= 0.16.0 and < 1.0.0" [dev-dependencies] gleeunit = ">= 1.0.0 and < 2.0.0" diff --git a/manifest.toml b/manifest.toml index 751b6dc..5007d4f 100644 --- a/manifest.toml +++ b/manifest.toml @@ -9,9 +9,9 @@ packages = [ { name = "gleam_stdlib", version = "0.61.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "3DC407D6EDA98FCE089150C11F3AD892B6F4C3CA77C87A97BAE8D5AB5E41F331" }, { name = "gleam_time", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_time", source = "hex", outer_checksum = "D71F1AFF7FEB534FF55E5DC58E534E9201BA75A444619788A2E4DEA4EBD87D16" }, { name = "gleeunit", version = "1.6.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "63022D81C12C17B7F1A60E029964E830A4CBD846BBC6740004FC1F1031AE0326" }, - { name = "opentelemetry_api", version = "1.4.0", build_tools = ["rebar3", "mix"], requirements = [], otp_app = "opentelemetry_api", source = "hex", outer_checksum = "3DFBBFAA2C2ED3121C5C483162836C4F9027DEF469C41578AF5EF32589FCFC58" }, - { name = "pg_types", version = "0.4.0", build_tools = ["rebar3"], requirements = [], otp_app = "pg_types", source = "hex", outer_checksum = "B02EFA785CAECECF9702C681C80A9CA12A39F9161A846CE17B01FB20AEEED7EB" }, - { name = "pgo", version = "0.14.0", build_tools = ["rebar3"], requirements = ["backoff", "opentelemetry_api", "pg_types"], otp_app = "pgo", source = "hex", outer_checksum = "71016C22599936E042DC0012EE4589D24C71427D266292F775EBF201D97DF9C9" }, + { name = "opentelemetry_api", version = "1.5.0", build_tools = ["rebar3", "mix"], requirements = [], otp_app = "opentelemetry_api", source = "hex", outer_checksum = "F53EC8A1337AE4A487D43AC89DA4BD3A3C99DDF576655D071DEED8B56A2D5DDA" }, + { name = "pg_types", version = "0.6.0", build_tools = ["rebar3"], requirements = [], otp_app = "pg_types", source = "hex", outer_checksum = "9949A4849DD13408FA249AB7B745E0D2DFDB9532AEE2B9722326E33CD082A778" }, + { name = "pgo", version = "0.20.0", build_tools = ["rebar3"], requirements = ["backoff", "opentelemetry_api", "pg_types"], otp_app = "pgo", source = "hex", outer_checksum = "2F11E6649CEB38E569EF56B16BE1D04874AE5B11A02867080A2817CE423C683B" }, ] [requirements] @@ -21,4 +21,4 @@ gleam_otp = { version = ">= 1.0.0 and < 2.0.0" } gleam_stdlib = { version = ">= 0.51.0 and < 2.0.0" } gleam_time = { version = ">= 1.0.0 and < 2.0.0" } gleeunit = { version = ">= 1.0.0 and < 2.0.0" } -pgo = { version = ">= 0.14.0 and < 1.0.0" } +pgo = { version = ">= 0.16.0 and < 1.0.0" } diff --git a/src/pog.gleam b/src/pog.gleam index 2cb6c61..d22872e 100644 --- a/src/pog.gleam +++ b/src/pog.gleam @@ -29,10 +29,18 @@ pub opaque type Connection { SingleConnection(SingleConnection) } +pub opaque type NotificationsConnection { + NotificationsConnection(Name(Message)) +} + type SingleConnection pub type Message +pub type Notification { + Notify(pid: Pid, reference: Reference, channel: String, payload: String) +} + /// Create a reference to a pool using the pool's name. /// /// If no pool has been started using this name then queries using this @@ -42,6 +50,17 @@ pub fn named_connection(name: Name(Message)) -> Connection { Pool(name) } +/// Create a reference to a pool using the pool's name. +/// +/// If no notifications process has been started using this name then +/// listeners using this connection will fail. +/// +pub fn named_notifications_connection( + name: Name(Message), +) -> NotificationsConnection { + NotificationsConnection(name) +} + /// The configuration for a pool of connections. pub type Config { Config( @@ -343,6 +362,28 @@ pub fn start(config: Config) -> actor.StartResult(Connection) { @external(erlang, "pog_ffi", "start") fn start_tree(config: Config) -> Result(Pid, dynamic.Dynamic) +/// Start a process holding a connection to the database that can be used to +/// `LISTEN` to channels for notifications. Most the time you want to use +/// `notifications_supervised` and add the process to your supervision tree +/// instead of using this function directly. +/// +/// The process will asynchronously connect to the PostgreSQL instance specified +/// in the config. If the configuration is invalid or it cannot connect for +/// another reason it will continue to attempt to connect, and any queries made +/// using the connection pool will fail. +/// +pub fn start_notifications( + config: Config, +) -> actor.StartResult(NotificationsConnection) { + case start_tree_notifications(config) { + Ok(pid) -> Ok(actor.Started(pid, NotificationsConnection(config.pool_name))) + Error(reason) -> Error(actor.InitExited(process.Abnormal(reason))) + } +} + +@external(erlang, "pog_ffi", "start_notifications") +fn start_tree_notifications(config: Config) -> Result(Pid, dynamic.Dynamic) + /// Start a database connection pool by adding it to your supervision tree. /// /// Use the `named_connection` function to create a connection to query this @@ -358,6 +399,23 @@ pub fn supervised(config: Config) -> supervision.ChildSpecification(Connection) supervision.supervisor(fn() { start(config) }) } +/// Start a database connection pool by adding it to your supervision tree. +/// +/// Use the `named_connection` function to create a connection to query this +/// pool with if your supervisor does not pass back the return value of +/// creating the pool. +/// +/// The pool is started in a new process and will asynchronously connect to the +/// PostgreSQL instance specified in the config. If the configuration is invalid +/// or it cannot connect for another reason it will continue to attempt to +/// connect, and any queries made using the connection pool will fail. +/// +pub fn notifications_supervised( + config: Config, +) -> supervision.ChildSpecification(NotificationsConnection) { + supervision.supervisor(fn() { start_notifications(config) }) +} + /// A value that can be sent to PostgreSQL as one of the arguments to a /// parameterised SQL query. pub type Value @@ -512,6 +570,27 @@ fn run_query_extended( query: String, ) -> Result(#(Int, List(Dynamic)), QueryError) +@external(erlang, "pog_ffi", "listen") +pub fn listen(conn: NotificationsConnection, channel: String) -> Result(Reference, Nil) + +@external(erlang, "pog_ffi", "unlisten") +pub fn unlisten(conn: NotificationsConnection, listener: Reference) -> Nil + +@external(erlang, "pog_ffi", "decode_notification") +fn decode_notification(dyn: dynamic.Dynamic) -> Result(Notification, Nil) + +type Tag { + Notification +} + +pub fn notification_selector() -> process.Selector(Notification) { + process.new_selector() + |> process.select_record(tag: Notification, fields: 4, mapping: fn(tuple) { + let assert Ok(result) = decode_notification(tuple) + result + }) +} + pub type QueryError { /// The query failed as a database constraint would have been violated by the /// change. diff --git a/src/pog_ffi.erl b/src/pog_ffi.erl index 9c486a2..e750ae8 100644 --- a/src/pog_ffi.erl +++ b/src/pog_ffi.erl @@ -1,8 +1,9 @@ -module(pog_ffi). --export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1]). +-export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1, start_notifications/1, listen/2, unlisten/2, decode_notification/1]). -include_lib("pog/include/pog_Config.hrl"). +-include_lib("pog/include/pog_Notify.hrl"). -include_lib("pg_types/include/pg_types.hrl"). null() -> @@ -11,6 +12,16 @@ null() -> coerce(Value) -> Value. +decode_notification({notification, Pid, Ref, Channel, Payload}) -> + {ok, #notify{ + pid=Pid, + reference=Ref, + channel=Channel, + payload=Payload + }}; +decode_notification(_) -> + {error, nil}. + %% Use correct defaults for SSL connections when SSL is enabled. %% Peers have to be verified & cacerts are fetched directly from the system. %% @@ -82,6 +93,68 @@ start(Config) -> end, pgo_pool:start_link(PoolName, Options2). +start_notifications(Config) -> + % Unfortunately this has to be supplied via global mutable state currently. + application:set_env(pg_types, timestamp_config, integer_system_time_microseconds), + #config{ + pool_name = PoolName, + host = Host, + port = Port, + database = Database, + user = User, + password = Password, + ssl = Ssl, + connection_parameters = ConnectionParameters, + pool_size = PoolSize, + queue_target = QueueTarget, + queue_interval = QueueInterval, + idle_interval = IdleInterval, + trace = Trace, + ip_version = IpVersion, + rows_as_map = RowsAsMap + } = Config, + {SslActivated, SslOptions} = default_ssl_options(Host, Ssl), + Options1 = #{ + host => Host, + port => Port, + database => Database, + user => User, + ssl => SslActivated, + ssl_options => SslOptions, + connection_parameters => ConnectionParameters, + pool_size => PoolSize, + queue_target => QueueTarget, + queue_interval => QueueInterval, + idle_interval => IdleInterval, + trace => Trace, + decode_opts => [{return_rows_as_maps, RowsAsMap}], + socket_options => case IpVersion of + ipv4 -> []; + ipv6 -> [inet6] + end + }, + Options2 = case Password of + {some, Pw} -> maps:put(password, Pw, Options1); + none -> Options1 + end, + Options3 = normalize_pool_config(Options2), + pgo_notifications:start_link({local, PoolName}, Options3). + +% NOTE: +% Copied from https://github.com/erleans/pgo/blob/main/src/pgo_pool.erl +% This may be better to occur within pgo_notifications, but they have chosen +% to omit this (despite doing it in pgo_pool which we use in the other case), +% so we must do it ourselves here. +normalize_pool_config(PoolConfig) when is_list(PoolConfig) -> + normalize_pool_config(maps:from_list(PoolConfig)); +normalize_pool_config(PoolConfig) -> + maps:map(fun normalize_pool_config_value/2, PoolConfig). + +normalize_pool_config_value(_, V) when is_binary(V) -> + binary_to_list(V); +normalize_pool_config_value(_, V) -> + V. + query(Pool, Sql, Arguments, Timeout) -> Res = case Pool of {single_connection, Conn} -> @@ -116,6 +189,21 @@ checkout(Name) when is_atom(Name) -> {error, Error} -> {error, convert_error(Error)} end. +listen(Conn, Channel) -> + case Conn of + {notifications_connection, Name} -> + case pgo_notifications:listen(Name, Channel) of + {ok, Ref} -> {ok, Ref}; + error -> {error, nil} + end + end. + +unlisten(Conn, Ref) -> + case Conn of + {notifications_connection, Name} -> + pgo_notifications:unlisten(Name, Ref) + end. + convert_error(none_available) -> connection_unavailable; convert_error({pgo_protocol, {parameters, Expected, Got}}) -> diff --git a/test/pog_test.gleam b/test/pog_test.gleam index 3a6f7ad..6a64dfa 100644 --- a/test/pog_test.gleam +++ b/test/pog_test.gleam @@ -615,3 +615,34 @@ pub fn transaction_commit_test() { disconnect(db) } + +pub fn notifications_test() { + let db = start_default() + let assert Ok(notifications) = + process.new_name("pog_test_notifications") + |> default_config + |> pog.start_notifications + + let assert Ok(listener) = pog.listen(notifications.data, "the_channel") + + let assert Ok(_) = + pog.query("NOTIFY the_channel, 'first payload'") + |> pog.execute(db.data) + + let assert Ok(_) = + pog.notification_selector() + |> process.selector_receive(100) + + pog.unlisten(notifications.data, listener) + + let assert Ok(_) = + pog.query("NOTIFY the_channel, 'second payload'") + |> pog.execute(db.data) + + let assert Error(Nil) = + pog.notification_selector() + |> process.selector_receive(10) + + disconnect(db) + disconnect(notifications) +}