diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d36d10..2f0fe8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,21 @@ # Changelog +## v4.0.0 - Unreleased + +- Starting a pool no longer generates an atom, instead a name is taken as an + argument. +- The `Connection` type has been removed. A subject is now used instead. +- `connect` and `disconnect` have been removed in favour of `start` and `supervised`. +- `url_config` and `default_config` now take a name parameter. +- The `default_timeout` function has been removed. +- The callback given to the `transaction` function now receives the connection + as an argument. Queries made using this connection will be in the + transaction, while the pool can still be used to run queries outside the + transaction. +- The `gleam_time` package is now used for time types and functions. +- The `TransactionError` type is now parameterised with an error type rather + being specific to strings. + ## v3.3.0 - 2025-07-03 - Updated `result.then` to `result.try` to resolve deprecation warnings. diff --git a/README.md b/README.md index d3e6411..75d1714 100644 --- a/README.md +++ b/README.md @@ -5,20 +5,44 @@ A PostgreSQL database client for Gleam, based on [PGO][erlang-pgo]. [erlang-pgo]: https://github.com/erleans/pgo ```gleam +gleam add pog@4 +``` + +Add a pool to your OTP supervision tree, before any siblings that will need to +use the database. + +Pools are named with a `Name` from `gleam/erlang/process`, so create one +outside of your supervision tree and pass it down to the creation of the pool. + +```gleam +import gleam/otp/static_supervisor import pog -import gleam/dynamic/decode -import gleeunit/should -pub fn main() { - // Start a database connection pool. - // Typically you will want to create one pool for use in your program - let db = - pog.default_config() +pub fn start_application_supervisor(pool_name: process.Name(pog.Message)) { + let pool_child = + pog.defaut_config(pool_name) |> pog.host("localhost") |> pog.database("my_database") |> pog.pool_size(15) - |> pog.connect + |> pog.supervised + + supervisor.new(supervisor.RestForOne) + |> supervisor.add(pool_child) + // |> supervisor.add(other) + // |> supervisor.add(application) + // |> supervisor.add(children) + |> supervisor.start +} +``` + +Then in your application you can use a connection created from that name with +the `pog.named_connection` to make queries: + +```gleam +import pog +import gleam/dynamic/decode +pub fn run(db: pog.Connection) { // An SQL statement to run. It takes one int as a parameter let sql_query = " select @@ -39,28 +63,18 @@ pub fn main() { // Run the query against the PostgreSQL database // The int `1` is given as a parameter - let assert Ok(response) = + let assert Ok(data) = pog.query(sql_query) |> pog.parameter(pog.int(1)) |> pog.returning(row_decoder) |> pog.execute(db) // And then do something with the returned results - response.count - |> should.equal(2) - response.rows - |> should.equal([ - #("Nubi", 3, "black", ["Al", "Cutlass"]), - ]) + assert data.count == 2 + assert data.rows == [#("Nubi", 3, "black", ["Al", "Cutlass"])]) } ``` -## Installation - -```sh -gleam add pog -``` - ## Support of connection URI Configuring a Postgres connection is done by using `Config` type in `pog`. @@ -76,15 +90,14 @@ connection URI from the environment. ```gleam import envoy +import gleam/erlang/process.{type Name} import pog -/// Read the DATABASE_URL environment variable. -/// Generate the pog.Config from that database URL. -/// Finally, connect to database. -pub fn read_connection_uri() -> Result(pog.Connection, Nil) { +/// Read the DATABASE_URL environment variable and then +/// build the pog.Config from that database URL. +pub fn read_connection_uri(name: Name(pog.Message)) -> Result(pog.Config, Nil) { use database_url <- result.try(envoy.get("DATABASE_URL")) - use config <- result.try(pog.url_config(database_url)) - Ok(pog.connect(config)) + pog.url_config(name, database_url) } ``` @@ -112,15 +125,6 @@ By default, `pgo` will return every selected value from your query as a tuple. In case you want a different output, you can activate `rows_as_maps` in `Config`. Once activated, every returned rows will take the form of a `Dict`. -## Atom generation - -Creating a connection pool with the `pog.connect` function dynamically generates -an Erlang atom. Atoms are not garbage collected and only a certain number of -them can exist in an Erlang VM instance, and hitting this limit will result in -the VM crashing. Due to this limitation you should not dynamically open new -connection pools, instead create the pools you need when your application starts -and reuse them throughout the lifetime of your program. - ## SSL As for the rest of the web, you should try to use SSL connections with any @@ -172,16 +176,6 @@ in `pog.Config`. The different options are `SslDisabled`, `SslUnverified` & to your database should be highly secured to protect you against man-in-the-middle attacks, you should always try to use the most secured setting. -```gleam -import pog - -pub fn connect() { - pog.default_config() - |> pog.ssl(pog.SslVerified) - |> pog.connect -} -``` - ### Need some help? You tried to setup a secured connection, but it does not work? Your container diff --git a/gleam.toml b/gleam.toml index 2ab8acd..563c3d4 100644 --- a/gleam.toml +++ b/gleam.toml @@ -1,6 +1,6 @@ name = "pog" -version = "3.3.0" -gleam = ">= 1.4.0" +version = "4.0.0" +gleam = ">= 1.11.0" licences = ["Apache-2.0"] description = "A PostgreSQL database client for Gleam, based on PGO" @@ -16,13 +16,15 @@ pages = [ ] [dependencies] +gleam_erlang = ">= 1.2.0 and < 2.0.0" +gleam_otp = ">= 1.0.0 and < 2.0.0" gleam_stdlib = ">= 0.51.0 and < 2.0.0" -pgo = ">= 0.12.0 and < 2.0.0" +gleam_time = ">= 1.0.0 and < 2.0.0" +exception = ">= 2.1.0 and < 3.0.0" +pgo = ">= 0.14.0 and < 1.0.0" [dev-dependencies] gleeunit = ">= 1.0.0 and < 2.0.0" -exception = ">= 2.0.0 and < 3.0.0" -gleam_erlang = ">= 0.30.0 and < 1.0.0" [erlang] # Starting an SSL connection relies on ssl application to be started. diff --git a/manifest.toml b/manifest.toml index e04d732..751b6dc 100644 --- a/manifest.toml +++ b/manifest.toml @@ -3,18 +3,22 @@ packages = [ { name = "backoff", version = "1.1.6", build_tools = ["rebar3"], requirements = [], otp_app = "backoff", source = "hex", outer_checksum = "CF0CFFF8995FB20562F822E5CC47D8CCF664C5ECDC26A684CBE85C225F9D7C39" }, - { name = "exception", version = "2.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "exception", source = "hex", outer_checksum = "F5580D584F16A20B7FCDCABF9E9BE9A2C1F6AC4F9176FA6DD0B63E3B20D450AA" }, - { name = "gleam_erlang", version = "0.33.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "A1D26B80F01901B59AABEE3475DD4C18D27D58FA5C897D922FCB9B099749C064" }, - { name = "gleam_stdlib", version = "0.51.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "14AFA8D3DDD7045203D422715DBB822D1725992A31DF35A08D97389014B74B68" }, - { name = "gleeunit", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "F7A7228925D3EE7D0813C922E062BFD6D7E9310F0BEE585D3A42F3307E3CFD13" }, + { name = "exception", version = "2.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "exception", source = "hex", outer_checksum = "329D269D5C2A314F7364BD2711372B6F2C58FA6F39981572E5CA68624D291F8C" }, + { name = "gleam_erlang", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "F91CE62A2D011FA13341F3723DB7DB118541AAA5FE7311BD2716D018F01EF9E3" }, + { name = "gleam_otp", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "7020E652D18F9ABAC9C877270B14160519FA0856EE80126231C505D719AD68DA" }, + { name = "gleam_stdlib", version = "0.61.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "3DC407D6EDA98FCE089150C11F3AD892B6F4C3CA77C87A97BAE8D5AB5E41F331" }, + { name = "gleam_time", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_time", source = "hex", outer_checksum = "D71F1AFF7FEB534FF55E5DC58E534E9201BA75A444619788A2E4DEA4EBD87D16" }, + { name = "gleeunit", version = "1.6.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "63022D81C12C17B7F1A60E029964E830A4CBD846BBC6740004FC1F1031AE0326" }, { name = "opentelemetry_api", version = "1.4.0", build_tools = ["rebar3", "mix"], requirements = [], otp_app = "opentelemetry_api", source = "hex", outer_checksum = "3DFBBFAA2C2ED3121C5C483162836C4F9027DEF469C41578AF5EF32589FCFC58" }, { name = "pg_types", version = "0.4.0", build_tools = ["rebar3"], requirements = [], otp_app = "pg_types", source = "hex", outer_checksum = "B02EFA785CAECECF9702C681C80A9CA12A39F9161A846CE17B01FB20AEEED7EB" }, { name = "pgo", version = "0.14.0", build_tools = ["rebar3"], requirements = ["backoff", "opentelemetry_api", "pg_types"], otp_app = "pgo", source = "hex", outer_checksum = "71016C22599936E042DC0012EE4589D24C71427D266292F775EBF201D97DF9C9" }, ] [requirements] -exception = { version = ">= 2.0.0 and < 3.0.0" } -gleam_erlang = { version = ">= 0.30.0 and < 1.0.0" } +exception = { version = ">= 2.1.0 and < 3.0.0" } +gleam_erlang = { version = ">= 1.2.0 and < 2.0.0" } +gleam_otp = { version = ">= 1.0.0 and < 2.0.0" } gleam_stdlib = { version = ">= 0.51.0 and < 2.0.0" } +gleam_time = { version = ">= 1.0.0 and < 2.0.0" } gleeunit = { version = ">= 1.0.0 and < 2.0.0" } -pgo = { version = ">= 0.12.0 and < 2.0.0" } +pgo = { version = ">= 0.14.0 and < 1.0.0" } diff --git a/src/pog.gleam b/src/pog.gleam index abe011d..11f35fe 100644 --- a/src/pog.gleam +++ b/src/pog.gleam @@ -2,24 +2,51 @@ //// //// Gleam wrapper around pgo library -// TODO: add time and timestamp with zone once pgo supports them +// TODO: add time things with zone once pgo supports them +import exception import gleam/dynamic.{type Dynamic} import gleam/dynamic/decode.{type Decoder} +import gleam/erlang/process.{type Name, type Pid} +import gleam/erlang/reference.{type Reference} import gleam/float import gleam/int import gleam/list import gleam/option.{type Option, None, Some} +import gleam/otp/actor +import gleam/otp/supervision import gleam/result import gleam/string +import gleam/time/calendar.{type Date, type TimeOfDay} +import gleam/time/timestamp.{type Timestamp} import gleam/uri.{Uri} /// The port that will be used when none is specified. const default_port: Int = 5432 +pub opaque type Connection { + Pool(Name(Message)) + SingleConnection(SingleConnection) +} + +type SingleConnection + +pub type Message + +/// Create a reference to a pool using the pool's name. +/// +/// If no pool has been started using this name then queries using this +/// connection will fail. +/// +pub fn named_connection(name: Name(Message)) -> Connection { + Pool(name) +} + /// The configuration for a pool of connections. pub type Config { Config( + /// The Erlang name to register the pool with. + pool_name: Name(Message), /// (default: 127.0.0.1): Database server hostname. host: String, /// (default: 5432): Port the server is listening on. @@ -58,9 +85,6 @@ pub type Config { /// (default: False) By default, pgo will return a n-tuple, in the order of the query. /// By setting `rows_as_map` to `True`, the result will be `Dict`. rows_as_map: Bool, - /// (default: 5000): Default time in milliseconds to wait before the query - /// is considered timeout. Timeout can be edited per query. - default_timeout: Int, ) } @@ -186,13 +210,6 @@ pub fn rows_as_map(config: Config, rows_as_map: Bool) -> Config { Config(..config, rows_as_map:) } -/// By default, pog have a default value of 5000ms as timeout. -/// By setting `default_timeout`, every queries will now use that timeout. -/// The timeout is given in milliseconds. -pub fn default_timeout(config: Config, default_timeout: Int) -> Config { - Config(..config, default_timeout:) -} - /// The internet protocol version to use. pub type IpVersion { /// Internet Protocol version 4 (IPv4) @@ -204,8 +221,9 @@ pub type IpVersion { /// The default configuration for a connection pool, with a single connection. /// You will likely want to increase the size of the pool for your application. /// -pub fn default_config() -> Config { +pub fn default_config(pool_name pool_name: Name(Message)) -> Config { Config( + pool_name:, host: "127.0.0.1", port: default_port, database: "postgres", @@ -220,12 +238,14 @@ pub fn default_config() -> Config { trace: False, ip_version: Ipv4, rows_as_map: False, - default_timeout: 5000, ) } /// Parse a database url into configuration that can be used to start a pool. -pub fn url_config(database_url: String) -> Result(Config, Nil) { +pub fn url_config( + name: Name(Message), + database_url: String, +) -> Result(Config, Nil) { use uri <- result.try(uri.parse(database_url)) let uri = case uri.port { Some(_) -> uri @@ -254,7 +274,7 @@ pub fn url_config(database_url: String) -> Result(Config, Nil) { ["", database] -> Ok( Config( - ..default_config(), + ..default_config(name), host: host, port: db_port, database: database, @@ -279,10 +299,15 @@ fn extract_user_password( } /// Expects `sslmode` to be `require`, `verify-ca`, `verify-full` or `disable`. +/// /// If `sslmode` is set, but not one of those value, fails. +/// /// If `sslmode` is `verify-ca` or `verify-full`, returns `SslVerified`. +/// /// If `sslmode` is `require`, returns `SslUnverified`. +/// /// If `sslmode` is unset, returns `SslDisabled`. +/// fn extract_ssl_mode(query: option.Option(String)) -> Result(Ssl, Nil) { case query { option.None -> Ok(SslDisabled) @@ -299,25 +324,39 @@ fn extract_ssl_mode(query: option.Option(String)) -> Result(Ssl, Nil) { } } -/// A pool of one or more database connections against which queries can be -/// made. +/// Start a database connection pool. Most the time you want to use +/// `supervised` and add the pool to your supervision tree instead of using this +/// function directly. /// -/// Created using the `connect` function and shut-down with the `disconnect` -/// function. -pub type Connection +/// The pool is started in a new process and will asynchronously connect to the +/// PostgreSQL instance specified in the config. If the configuration is invalid +/// or it cannot connect for another reason it will continue to attempt to +/// connect, and any queries made using the connection pool will fail. +/// +pub fn start(config: Config) -> actor.StartResult(Connection) { + case start_tree(config) { + Ok(pid) -> Ok(actor.Started(pid, Pool(config.pool_name))) + Error(reason) -> Error(actor.InitExited(process.Abnormal(reason))) + } +} + +@external(erlang, "pog_ffi", "start") +fn start_tree(config: Config) -> Result(Pid, dynamic.Dynamic) -/// Start a database connection pool. +/// Start a database connection pool by adding it to your supervision tree. +/// +/// Use the `named_connection` function to create a connection to query this +/// pool with if your supervisor does not pass back the return value of +/// creating the pool. /// /// The pool is started in a new process and will asynchronously connect to the /// PostgreSQL instance specified in the config. If the configuration is invalid /// or it cannot connect for another reason it will continue to attempt to /// connect, and any queries made using the connection pool will fail. -@external(erlang, "pog_ffi", "connect") -pub fn connect(a: Config) -> Connection - -/// Shut down a connection pool. -@external(erlang, "pog_ffi", "disconnect") -pub fn disconnect(a: Connection) -> Nil +/// +pub fn supervised(config: Config) -> supervision.ChildSpecification(Connection) { + supervision.supervisor(fn() { start(config) }) +} /// A value that can be sent to PostgreSQL as one of the arguments to a /// parameterised SQL query. @@ -347,25 +386,35 @@ pub fn array(converter: fn(a) -> Value, values: List(a)) -> Value { } pub fn timestamp(timestamp: Timestamp) -> Value { - coerce_value(#(date(timestamp.date), time(timestamp.time))) + let #(seconds, nanoseconds) = + timestamp.to_unix_seconds_and_nanoseconds(timestamp) + coerce_value(seconds * 1_000_000 + nanoseconds / 1000) +} + +pub fn timestamp_decoder() -> decode.Decoder(Timestamp) { + use microseconds <- decode.map(decode.int) + let seconds = microseconds / 1_000_000 + let nanoseconds = { microseconds % 1_000_000 } * 1000 + timestamp.from_unix_seconds_and_nanoseconds(seconds, nanoseconds) } -pub fn date(date: Date) -> Value { - coerce_value(#(date.year, date.month, date.day)) +pub fn calendar_date(date: Date) -> Value { + let month = calendar.month_to_int(date.month) + coerce_value(#(date.year, month, date.day)) } -pub fn time(time: Time) -> Value { +pub fn calendar_time_of_day(time: TimeOfDay) -> Value { let seconds = int.to_float(time.seconds) - let seconds = seconds +. int.to_float(time.microseconds) /. 1_000_000.0 + let seconds = seconds +. int.to_float(time.nanoseconds) /. 1_000_000_000.0 coerce_value(#(time.hours, time.minutes, seconds)) } @external(erlang, "pog_ffi", "coerce") fn coerce_value(a: anything) -> Value -pub type TransactionError { +pub type TransactionError(error) { TransactionQueryError(QueryError) - TransactionRolledBack(String) + TransactionRolledBack(error) } /// Runs a function within a PostgreSQL transaction. @@ -374,11 +423,68 @@ pub type TransactionError { /// /// If the function returns an `Error` or panics then the transaction is rolled /// back. -@external(erlang, "pog_ffi", "transaction") pub fn transaction( pool: Connection, - callback: fn(Connection) -> Result(t, String), -) -> Result(t, TransactionError) + callback: fn(Connection) -> Result(t, error), +) -> Result(t, TransactionError(error)) { + case pool { + SingleConnection(conn) -> { + transaction_layer(conn, callback) + } + Pool(name) -> { + // Check out a single connection from the pool + use #(ref, conn) <- result.try( + checkout(name) |> result.map_error(TransactionQueryError), + ) + + // Make a best attempt to check back in the connection, even if this + // process crashes + use <- exception.defer(fn() { checkin(ref, conn) }) + + transaction_layer(conn, callback) + } + } +} + +fn transaction_layer( + conn: SingleConnection, + callback: fn(Connection) -> Result(t, error), +) -> Result(t, TransactionError(error)) { + let do = fn(conn, sql) { + run_query_extended(conn, sql) + |> result.map_error(TransactionQueryError) + } + + // Start a transaction with the single connection + use _ <- result.try(do(conn, "begin")) + + // When the callback crashes we want to roll back the transaction + use <- exception.on_crash(fn() { + let assert Ok(_) = do(conn, "rollback") as "rollback exec failed" + }) + + case callback(SingleConnection(conn)) { + // The callback was OK, commit the transaction + Ok(t) -> { + use _ <- result.try(do(conn, "commit")) + Ok(t) + } + + Error(error) -> { + // The callback failed, roll-back the transaction + use _ <- result.try(do(conn, "rollback")) + Error(TransactionRolledBack(error)) + } + } +} + +@external(erlang, "pog_ffi", "checkout") +fn checkout( + pool: Name(Message), +) -> Result(#(Reference, SingleConnection), QueryError) + +@external(erlang, "pgo", "checkin") +fn checkin(ref: Reference, conn: SingleConnection) -> Dynamic pub fn nullable(inner_type: fn(a) -> Value, value: Option(a)) -> Value { case value { @@ -397,7 +503,13 @@ fn run_query( a: Connection, b: String, c: List(Value), - timeout: Option(Int), + timeout: Int, +) -> Result(#(Int, List(Dynamic)), QueryError) + +@external(erlang, "pog_ffi", "query_extended") +fn run_query_extended( + connection: SingleConnection, + query: String, ) -> Result(#(Int, List(Dynamic)), QueryError) pub type QueryError { @@ -427,7 +539,7 @@ pub opaque type Query(row_type) { sql: String, parameters: List(Value), row_decoder: Decoder(row_type), - timeout: option.Option(Int), + timeout: Int, ) } @@ -435,12 +547,7 @@ pub opaque type Query(row_type) { /// functions. /// pub fn query(sql: String) -> Query(Nil) { - Query( - sql:, - parameters: [], - row_decoder: decode.success(Nil), - timeout: option.None, - ) + Query(sql:, parameters: [], row_decoder: decode.success(Nil), timeout: 5000) } /// Set the decoder to use for the type of row returned by executing this @@ -460,11 +567,14 @@ pub fn parameter(query: Query(t1), parameter: Value) -> Query(t1) { Query(..query, parameters: [parameter, ..query.parameters]) } -/// Use a custom timeout for the query. This timeout will take precedence over +/// Use a custom timeout for the query, in milliseconds. /// the default connection timeout. -/// The timeout is given in milliseconds. +/// +/// If this function is not used to give a timeout then default of 5000 ms is +/// used. +/// pub fn timeout(query: Query(t1), timeout: Int) -> Query(t1) { - Query(..query, timeout: Some(timeout)) + Query(..query, timeout:) } /// Run a query against a PostgreSQL database. @@ -760,24 +870,22 @@ pub fn error_code_name(error_code: String) -> Result(String, Nil) { } } -pub fn timestamp_decoder() -> decode.Decoder(Timestamp) { - use date <- decode.field(0, date_decoder()) - use time <- decode.field(1, time_decoder()) - decode.success(Timestamp(date, time)) -} - -pub fn date_decoder() -> decode.Decoder(Date) { +pub fn calendar_date_decoder() -> decode.Decoder(Date) { use year <- decode.field(0, decode.int) use month <- decode.field(1, decode.int) use day <- decode.field(2, decode.int) - decode.success(Date(year:, month:, day:)) + case calendar.month_from_int(month) { + Ok(month) -> decode.success(calendar.Date(year:, month:, day:)) + Error(_) -> + decode.failure(calendar.Date(0, calendar.January, 1), "Calendar date") + } } -pub fn time_decoder() -> decode.Decoder(Time) { +pub fn calendar_time_of_day_decoder() -> decode.Decoder(TimeOfDay) { use hours <- decode.field(0, decode.int) use minutes <- decode.field(1, decode.int) - use #(seconds, microseconds) <- decode.field(2, seconds_decoder()) - decode.success(Time(hours:, minutes:, seconds:, microseconds:)) + use #(seconds, nanoseconds) <- decode.field(2, seconds_decoder()) + decode.success(calendar.TimeOfDay(hours:, minutes:, seconds:, nanoseconds:)) } fn seconds_decoder() -> decode.Decoder(#(Int, Int)) { @@ -790,21 +898,9 @@ fn seconds_decoder() -> decode.Decoder(#(Int, Int)) { |> decode.map(fn(f) { let floored = float.floor(f) let seconds = float.round(floored) - let microseconds = float.round({ f -. floored } *. 1_000_000.0) + let microseconds = float.round({ f -. floored } *. 1_000_000_000.0) #(seconds, microseconds) }) } decode.one_of(int, [float]) } - -pub type Date { - Date(year: Int, month: Int, day: Int) -} - -pub type Time { - Time(hours: Int, minutes: Int, seconds: Int, microseconds: Int) -} - -pub type Timestamp { - Timestamp(date: Date, time: Time) -} diff --git a/src/pog_ffi.erl b/src/pog_ffi.erl index f012650..9c486a2 100644 --- a/src/pog_ffi.erl +++ b/src/pog_ffi.erl @@ -1,8 +1,6 @@ -module(pog_ffi). --export([query/4, connect/1, disconnect/1, coerce/1, null/0, transaction/2]). - --record(pog_pool, {name, pid, default_timeout}). +-export([query/4, query_extended/2, start/1, coerce/1, null/0, checkout/1]). -include_lib("pog/include/pog_Config.hrl"). -include_lib("pg_types/include/pg_types.hrl"). @@ -38,10 +36,11 @@ default_ssl_options(Host, Ssl) -> ]} end. -connect(Config) -> - Id = integer_to_list(erlang:unique_integer([positive])), - PoolName = list_to_atom("pog_pool_" ++ Id), +start(Config) -> + % Unfortunately this has to be supplied via global mutable state currently. + application:set_env(pg_types, timestamp_config, integer_system_time_microseconds), #config{ + pool_name = PoolName, host = Host, port = Port, database = Database, @@ -55,8 +54,7 @@ connect(Config) -> idle_interval = IdleInterval, trace = Trace, ip_version = IpVersion, - rows_as_map = RowsAsMap, - default_timeout = DefaultTimeout + rows_as_map = RowsAsMap } = Config, {SslActivated, SslOptions} = default_ssl_options(Host, Ssl), Options1 = #{ @@ -82,36 +80,29 @@ connect(Config) -> {some, Pw} -> maps:put(password, Pw, Options1); none -> Options1 end, - {ok, Pid} = pgo_pool:start_link(PoolName, Options2), - #pog_pool{name = PoolName, pid = Pid, default_timeout = DefaultTimeout}. - -disconnect(#pog_pool{pid = Pid}) -> - erlang:exit(Pid, normal), - nil. + pgo_pool:start_link(PoolName, Options2). -transaction(#pog_pool{name = Name} = Conn, Callback) -> - F = fun() -> - case Callback(Conn) of - {ok, T} -> {ok, T}; - {error, Reason} -> error({pog_rollback_transaction, Reason}) - end +query(Pool, Sql, Arguments, Timeout) -> + Res = case Pool of + {single_connection, Conn} -> + pgo_handler:extended_query(Conn, Sql, Arguments, #{}); + {pool, Name} -> + Options = #{ + pool => Name, + pool_options => [{timeout, Timeout}] + }, + pgo:query(Sql, Arguments, Options) end, - try - pgo:transaction(Name, F, #{}) - catch - error:{pog_rollback_transaction, Reason} -> - {error, {transaction_rolled_back, Reason}} - end. + case Res of + #{rows := Rows, num_rows := NumRows} -> + {ok, {NumRows, Rows}}; + {error, Error} -> + {error, convert_error(Error)} + end. -query(#pog_pool{name = Name, default_timeout = DefaultTimeout}, Sql, Arguments, Timeout) -> - Timeout1 = case Timeout of - none -> DefaultTimeout; - {some, QueryTimeout} -> QueryTimeout - end, - Options = #{pool => Name, pool_options => [{timeout, Timeout1}]}, - Res = pgo:query(Sql, Arguments, Options), - case Res of +query_extended(Conn, Sql) -> + case pgo_handler:extended_query(Conn, Sql, [], #{queue_time => undefined}) of #{rows := Rows, num_rows := NumRows} -> {ok, {NumRows, Rows}}; @@ -119,6 +110,12 @@ query(#pog_pool{name = Name, default_timeout = DefaultTimeout}, Sql, Arguments, {error, convert_error(Error)} end. +checkout(Name) when is_atom(Name) -> + case pgo:checkout(Name) of + {ok, Ref, Conn} -> {ok, {Ref, Conn}}; + {error, Error} -> {error, convert_error(Error)} + end. + convert_error(none_available) -> connection_unavailable; convert_error({pgo_protocol, {parameters, Expected, Got}}) -> diff --git a/test/pog_test.gleam b/test/pog_test.gleam index 4370848..728ec0e 100644 --- a/test/pog_test.gleam +++ b/test/pog_test.gleam @@ -1,96 +1,98 @@ import exception import gleam/dynamic/decode.{type Decoder} -import gleam/erlang/atom +import gleam/erlang/process import gleam/option.{None, Some} +import gleam/otp/actor +import gleam/time/calendar +import gleam/time/timestamp import gleeunit -import gleeunit/should import pog pub fn main() { gleeunit.main() } -pub fn run_with_timeout(time: Int, next: fn() -> a) { - let assert Ok(timeout) = atom.from_string("timeout") - #(timeout, time, next) +fn disconnect(db: actor.Started(a)) -> Nil { + process.send_exit(db.pid) +} + +fn start_default() -> actor.Started(pog.Connection) { + let assert Ok(started) = + process.new_name("pog_test") + |> default_config + |> pog.start + started +} + +fn default_config(name) { + pog.Config( + ..pog.default_config(name), + database: "gleam_pog_test", + password: Some("postgres"), + pool_size: 1, + ) } pub fn url_config_everything_test() { + let name = process.new_name("pog_test") let expected = - pog.default_config() + pog.default_config(name) |> pog.host("db.test") |> pog.port(1234) |> pog.database("my_db") |> pog.user("u") |> pog.password(Some("p")) - pog.url_config("postgres://u:p@db.test:1234/my_db") - |> should.equal(Ok(expected)) + assert pog.url_config(name, "postgres://u:p@db.test:1234/my_db") + == Ok(expected) } pub fn url_config_alternative_postgres_protocol_test() { + let name = process.new_name("pog_test") let expected = - pog.default_config() + pog.default_config(name) |> pog.host("db.test") |> pog.port(1234) |> pog.database("my_db") |> pog.user("u") |> pog.password(Some("p")) - pog.url_config("postgresql://u:p@db.test:1234/my_db") - |> should.equal(Ok(expected)) + assert pog.url_config(name, "postgresql://u:p@db.test:1234/my_db") + == Ok(expected) } pub fn url_config_not_postgres_protocol_test() { - pog.url_config("foo://u:p@db.test:1234/my_db") - |> should.equal(Error(Nil)) + let name = process.new_name("pog_test") + assert pog.url_config(name, "foo://u:p@db.test:1234/my_db") == Error(Nil) } pub fn url_config_no_password_test() { + let name = process.new_name("pog_test") let expected = - pog.default_config() + pog.default_config(name) |> pog.host("db.test") |> pog.port(1234) |> pog.database("my_db") |> pog.user("u") |> pog.password(None) - pog.url_config("postgres://u@db.test:1234/my_db") - |> should.equal(Ok(expected)) + assert pog.url_config(name, "postgres://u@db.test:1234/my_db") == Ok(expected) } pub fn url_config_no_port_test() { + let name = process.new_name("pog_test") let expected = - pog.default_config() + pog.default_config(name) |> pog.host("db.test") |> pog.port(5432) |> pog.database("my_db") |> pog.user("u") |> pog.password(None) - pog.url_config("postgres://u@db.test/my_db") - |> should.equal(Ok(expected)) + assert pog.url_config(name, "postgres://u@db.test/my_db") == Ok(expected) } pub fn url_config_path_slash_test() { - pog.url_config("postgres://u:p@db.test:1234/my_db/foo") - |> should.equal(Error(Nil)) -} - -fn start_default() { - pog.Config( - ..pog.default_config(), - database: "gleam_pog_test", - password: Some("postgres"), - pool_size: 1, - ) - |> pog.connect -} - -fn default_config() { - pog.Config( - ..pog.default_config(), - database: "gleam_pog_test", - password: Some("postgres"), - pool_size: 1, - ) + let name = process.new_name("pog_test") + assert pog.url_config(name, "postgres://u:p@db.test:1234/my_db/foo") + == Error(Nil) } pub fn inserting_new_rows_test() { @@ -102,14 +104,12 @@ pub fn inserting_new_rows_test() { VALUES (DEFAULT, 'bill', true, ARRAY ['black'], now(), '2020-03-04'), (DEFAULT, 'felix', false, ARRAY ['grey'], now(), '2020-03-05')" - let assert Ok(returned) = pog.query(sql) |> pog.execute(db) + let assert Ok(returned) = pog.query(sql) |> pog.execute(db.data) - returned.count - |> should.equal(2) - returned.rows - |> should.equal([]) + assert returned.count == 2 + assert returned.rows == [] - pog.disconnect(db) + disconnect(db) } pub fn inserting_new_rows_and_returning_test() { @@ -126,14 +126,12 @@ pub fn inserting_new_rows_and_returning_test() { let assert Ok(returned) = pog.query(sql) |> pog.returning(decode.at([0], decode.string)) - |> pog.execute(db) + |> pog.execute(db.data) - returned.count - |> should.equal(2) - returned.rows - |> should.equal(["bill", "felix"]) + assert returned.count == 2 + assert returned.rows == ["bill", "felix"] - pog.disconnect(db) + disconnect(db) } pub fn selecting_rows_test() { @@ -150,7 +148,7 @@ pub fn selecting_rows_test() { let assert Ok(pog.Returned(rows: [id], ..)) = pog.query(sql) |> pog.returning(decode.at([0], decode.int)) - |> pog.execute(db) + |> pog.execute(db.data) let assert Ok(returned) = pog.query("SELECT * FROM cats WHERE id = $1") @@ -161,26 +159,29 @@ pub fn selecting_rows_test() { use x2 <- decode.field(2, decode.bool) use x3 <- decode.field(3, decode.list(decode.string)) use x4 <- decode.field(4, pog.timestamp_decoder()) - use x5 <- decode.field(5, pog.date_decoder()) + use x5 <- decode.field(5, pog.calendar_date_decoder()) decode.success(#(x0, x1, x2, x3, x4, x5)) }) - |> pog.execute(db) - - returned.count - |> should.equal(1) - returned.rows - |> should.equal([ - #( - id, - "neo", - True, - ["black"], - pog.Timestamp(pog.Date(2022, 10, 10), pog.Time(11, 30, 30, 100_000)), - pog.Date(2020, 3, 4), - ), - ]) - - pog.disconnect(db) + |> pog.execute(db.data) + + assert returned.count == 1 + assert returned.rows + == [ + #( + id, + "neo", + True, + ["black"], + timestamp.from_calendar( + calendar.Date(2022, calendar.October, 10), + calendar.TimeOfDay(11, 30, 30, 100_000_000), + calendar.utc_offset, + ), + calendar.Date(2020, calendar.March, 4), + ), + ] + + disconnect(db) } pub fn invalid_sql_test() { @@ -188,16 +189,13 @@ pub fn invalid_sql_test() { let sql = "select select" let assert Error(pog.PostgresqlError(code, name, message)) = - pog.query(sql) |> pog.execute(db) + pog.query(sql) |> pog.execute(db.data) - code - |> should.equal("42601") - name - |> should.equal("syntax_error") - message - |> should.equal("syntax error at or near \"select\"") + assert code == "42601" + assert name == "syntax_error" + assert message == "syntax error at or near \"select\"" - pog.disconnect(db) + disconnect(db) } pub fn insert_constraint_error_test() { @@ -211,20 +209,16 @@ pub fn insert_constraint_error_test() { (900, 'felix', false, ARRAY ['black'], now(), '2020-03-05')" let assert Error(pog.ConstraintViolated(message, constraint, detail)) = - pog.query(sql) |> pog.execute(db) + pog.query(sql) |> pog.execute(db.data) - constraint - |> should.equal("cats_pkey") + assert constraint == "cats_pkey" - detail - |> should.equal("Key (id)=(900) already exists.") + assert detail == "Key (id)=(900) already exists." - message - |> should.equal( - "duplicate key value violates unique constraint \"cats_pkey\"", - ) + assert message + == "duplicate key value violates unique constraint \"cats_pkey\"" - pog.disconnect(db) + disconnect(db) } pub fn select_from_unknown_table_test() { @@ -232,16 +226,13 @@ pub fn select_from_unknown_table_test() { let sql = "SELECT * FROM unknown" let assert Error(pog.PostgresqlError(code, name, message)) = - pog.query(sql) |> pog.execute(db) + pog.query(sql) |> pog.execute(db.data) - code - |> should.equal("42P01") - name - |> should.equal("undefined_table") - message - |> should.equal("relation \"unknown\" does not exist") + assert code == "42P01" + assert name == "undefined_table" + assert message == "relation \"unknown\" does not exist" - pog.disconnect(db) + disconnect(db) } pub fn insert_with_incorrect_type_test() { @@ -253,62 +244,57 @@ pub fn insert_with_incorrect_type_test() { VALUES (true, true, true, true)" let assert Error(pog.PostgresqlError(code, name, message)) = - pog.query(sql) |> pog.execute(db) - - code - |> should.equal("42804") - name - |> should.equal("datatype_mismatch") - message - |> should.equal( - "column \"id\" is of type integer but expression is of type boolean", - ) + pog.query(sql) |> pog.execute(db.data) - pog.disconnect(db) + assert code == "42804" + assert name == "datatype_mismatch" + assert message + == "column \"id\" is of type integer but expression is of type boolean" + + disconnect(db) } pub fn execute_with_wrong_number_of_arguments_test() { let db = start_default() let sql = "SELECT * FROM cats WHERE id = $1" - pog.query(sql) - |> pog.execute(db) - |> should.equal(Error(pog.UnexpectedArgumentCount(expected: 1, got: 0))) + assert pog.execute(pog.query(sql), db.data) + == Error(pog.UnexpectedArgumentCount(expected: 1, got: 0)) - pog.disconnect(db) + disconnect(db) } fn assert_roundtrip( - db: pog.Connection, + db: actor.Started(_), value: a, type_name: String, encoder: fn(a) -> pog.Value, decoder: Decoder(a), -) -> pog.Connection { - pog.query("select $1::" <> type_name) - |> pog.parameter(encoder(value)) - |> pog.returning(decode.at([0], decoder)) - |> pog.execute(db) - |> should.equal(Ok(pog.Returned(count: 1, rows: [value]))) +) -> actor.Started(_) { + assert pog.query("select $1::" <> type_name) + |> pog.parameter(encoder(value)) + |> pog.returning(decode.at([0], decoder)) + |> pog.execute(db.data) + == Ok(pog.Returned(count: 1, rows: [value])) db } pub fn null_test() { let db = start_default() - pog.query("select $1") - |> pog.parameter(pog.null()) - |> pog.returning(decode.at([0], decode.optional(decode.int))) - |> pog.execute(db) - |> should.equal(Ok(pog.Returned(count: 1, rows: [None]))) + assert pog.query("select $1") + |> pog.parameter(pog.null()) + |> pog.returning(decode.at([0], decode.optional(decode.int))) + |> pog.execute(db.data) + == Ok(pog.Returned(count: 1, rows: [None])) - pog.disconnect(db) + disconnect(db) } pub fn bool_test() { start_default() |> assert_roundtrip(True, "bool", pog.bool, decode.bool) |> assert_roundtrip(False, "bool", pog.bool, decode.bool) - |> pog.disconnect + |> disconnect } pub fn int_test() { @@ -326,7 +312,7 @@ pub fn int_test() { |> assert_roundtrip(-4, "int", pog.int, decode.int) |> assert_roundtrip(-5, "int", pog.int, decode.int) |> assert_roundtrip(10_000_000, "int", pog.int, decode.int) - |> pog.disconnect + |> disconnect } pub fn float_test() { @@ -344,7 +330,7 @@ pub fn float_test() { |> assert_roundtrip(-4.654, "float", pog.float, decode.float) |> assert_roundtrip(-5.654, "float", pog.float, decode.float) |> assert_roundtrip(10_000_000.0, "float", pog.float, decode.float) - |> pog.disconnect + |> disconnect } pub fn text_test() { @@ -352,7 +338,7 @@ pub fn text_test() { |> assert_roundtrip("", "text", pog.text, decode.string) |> assert_roundtrip("✨", "text", pog.text, decode.string) |> assert_roundtrip("Hello, Joe!", "text", pog.text, decode.string) - |> pog.disconnect + |> disconnect } pub fn bytea_test() { @@ -367,7 +353,7 @@ pub fn bytea_test() { ) |> assert_roundtrip(<<1>>, "bytea", pog.bytea, decode.bit_array) |> assert_roundtrip(<<1, 2, 3>>, "bytea", pog.bytea, decode.bit_array) - |> pog.disconnect + |> disconnect } pub fn array_test() { @@ -382,29 +368,18 @@ pub fn array_test() { pog.array(pog.int, _), decode.list(decode.int), ) - |> pog.disconnect -} - -pub fn datetime_test() { - start_default() - |> assert_roundtrip( - pog.Timestamp(pog.Date(2022, 10, 12), pog.Time(11, 30, 33, 101)), - "timestamp", - pog.timestamp, - pog.timestamp_decoder(), - ) - |> pog.disconnect + |> disconnect } pub fn date_test() { start_default() |> assert_roundtrip( - pog.Date(2022, 10, 11), + calendar.Date(2022, calendar.October, 11), "date", - pog.date, - pog.date_decoder(), + pog.calendar_date, + pog.calendar_date_decoder(), ) - |> pog.disconnect + |> disconnect } pub fn nullable_test() { @@ -433,80 +408,84 @@ pub fn nullable_test() { pog.nullable(pog.int, _), decode.optional(decode.int), ) - |> pog.disconnect + |> disconnect } pub fn expected_argument_type_test() { let db = start_default() - pog.query("select $1::int") - |> pog.returning(decode.at([0], decode.string)) - |> pog.parameter(pog.float(1.2)) - |> pog.execute(db) - |> should.equal(Error(pog.UnexpectedArgumentType("int4", "1.2"))) + assert pog.query("select $1::int") + |> pog.returning(decode.at([0], decode.string)) + |> pog.parameter(pog.float(1.2)) + |> pog.execute(db.data) + == Error(pog.UnexpectedArgumentType("int4", "1.2")) - pog.disconnect(db) + disconnect(db) } pub fn expected_return_type_test() { let db = start_default() - pog.query("select 1") - |> pog.returning(decode.at([0], decode.string)) - |> pog.execute(db) - |> should.equal( - Error( + assert pog.query("select 1") + |> pog.returning(decode.at([0], decode.string)) + |> pog.execute(db.data) + == Error( pog.UnexpectedResultType([ decode.DecodeError(expected: "String", found: "Int", path: ["0"]), ]), - ), - ) + ) - pog.disconnect(db) + disconnect(db) } pub fn expected_five_millis_timeout_test() { - use <- run_with_timeout(20) let db = start_default() - pog.query("select sub.ret from (select pg_sleep(0.05), 'OK' as ret) as sub") - |> pog.timeout(5) - |> pog.returning(decode.at([0], decode.string)) - |> pog.execute(db) - |> should.equal(Error(pog.QueryTimeout)) + assert pog.query( + "select sub.ret from (select pg_sleep(0.05), 'OK' as ret) as sub", + ) + |> pog.timeout(5) + |> pog.returning(decode.at([0], decode.string)) + |> pog.execute(db.data) + == Error(pog.QueryTimeout) - pog.disconnect(db) + disconnect(db) } pub fn expected_ten_millis_no_timeout_test() { - use <- run_with_timeout(20) let db = start_default() - pog.query("select sub.ret from (select pg_sleep(0.01), 'OK' as ret) as sub") - |> pog.timeout(30) - |> pog.returning(decode.at([0], decode.string)) - |> pog.execute(db) - |> should.equal(Ok(pog.Returned(1, ["Ok"]))) + assert pog.query( + "select sub.ret from (select pg_sleep(0.01), 'OK' as ret) as sub", + ) + |> pog.timeout(50) + |> pog.returning(decode.at([0], decode.string)) + |> pog.execute(db.data) + == Ok(pog.Returned(1, ["OK"])) - pog.disconnect(db) + disconnect(db) } pub fn expected_ten_millis_no_default_timeout_test() { - use <- run_with_timeout(20) - let db = - default_config() - |> pog.default_timeout(30) - |> pog.connect - - pog.query("select sub.ret from (select pg_sleep(0.01), 'OK' as ret) as sub") - |> pog.returning(decode.at([0], decode.string)) - |> pog.execute(db) - |> should.equal(Ok(pog.Returned(1, ["Ok"]))) + let name = process.new_name("pog_test") + let assert Ok(db) = + default_config(name) + |> pog.start + + assert pog.query( + "select sub.ret from (select pg_sleep(0.01), 'OK' as ret) as sub", + ) + |> pog.returning(decode.at([0], decode.string)) + |> pog.execute(db.data) + == Ok(pog.Returned(1, ["OK"])) - pog.disconnect(db) + disconnect(db) } pub fn expected_maps_test() { - let db = pog.Config(..default_config(), rows_as_map: True) |> pog.connect + let name = process.new_name("pog_test") + let assert Ok(db) = + pog.Config(..default_config(name), rows_as_map: True) + |> pog.start let sql = " @@ -520,7 +499,7 @@ pub fn expected_maps_test() { let assert Ok(pog.Returned(rows: [id], ..)) = pog.query(sql) |> pog.returning(decode.at(["id"], decode.int)) - |> pog.execute(db) + |> pog.execute(db.data) let assert Ok(returned) = pog.query("SELECT * FROM cats WHERE id = $1") @@ -534,32 +513,35 @@ pub fn expected_maps_test() { "last_petted_at", pog.timestamp_decoder(), ) - use birthday <- decode.field("birthday", pog.date_decoder()) + use birthday <- decode.field("birthday", pog.calendar_date_decoder()) decode.success(#(id, name, is_cute, colors, last_petted_at, birthday)) }) - |> pog.execute(db) - - returned.count - |> should.equal(1) - returned.rows - |> should.equal([ - #( - id, - "neo", - True, - ["black"], - pog.Timestamp(pog.Date(2022, 10, 10), pog.Time(11, 30, 30, 0)), - pog.Date(2020, 3, 4), - ), - ]) - - pog.disconnect(db) + |> pog.execute(db.data) + + assert returned.count == 1 + assert returned.rows + == [ + #( + id, + "neo", + True, + ["black"], + timestamp.from_calendar( + calendar.Date(2022, calendar.October, 10), + calendar.TimeOfDay(11, 30, 30, 0), + calendar.utc_offset, + ), + calendar.Date(2020, calendar.March, 4), + ), + ] + + disconnect(db) } pub fn transaction_commit_test() { let db = start_default() let id_decoder = decode.at([0], decode.int) - let assert Ok(_) = pog.query("truncate table cats") |> pog.execute(db) + let assert Ok(_) = pog.query("truncate table cats") |> pog.execute(db.data) let insert = fn(db, name) { let sql = " @@ -577,7 +559,7 @@ pub fn transaction_commit_test() { // A succeeding transaction let assert Ok(#(id1, id2)) = - pog.transaction(db, fn(db) { + pog.transaction(db.data, fn(db) { let id1 = insert(db, "one") let id2 = insert(db, "two") Ok(#(id1, id2)) @@ -585,7 +567,7 @@ pub fn transaction_commit_test() { // An error returning transaction, it gets rolled back let assert Error(pog.TransactionRolledBack("Nah bruv!")) = - pog.transaction(db, fn(db) { + pog.transaction(db.data, fn(db) { let _id1 = insert(db, "two") let _id2 = insert(db, "three") Error("Nah bruv!") @@ -594,7 +576,7 @@ pub fn transaction_commit_test() { // A crashing transaction, it gets rolled back let _ = exception.rescue(fn() { - pog.transaction(db, fn(db) { + pog.transaction(db.data, fn(db) { let _id1 = insert(db, "four") let _id2 = insert(db, "five") panic as "testing rollbacks" @@ -604,11 +586,11 @@ pub fn transaction_commit_test() { let assert Ok(returned) = pog.query("select id from cats order by id") |> pog.returning(id_decoder) - |> pog.execute(db) + |> pog.execute(db.data) let assert [got1, got2] = returned.rows let assert True = id1 == got1 let assert True = id2 == got2 - pog.disconnect(db) + disconnect(db) }