From ef966c02a94388987c0846493b60293109100447 Mon Sep 17 00:00:00 2001 From: David Micallef Date: Mon, 25 Mar 2024 09:26:05 +1100 Subject: [PATCH 1/3] added run_at_time_zone field to chain and add_job --- internal/pgengine/access.go | 2 +- internal/pgengine/migration.go | 6 +++ internal/pgengine/pgengine_test.go | 2 +- internal/pgengine/sql/cron.sql | 2 +- internal/pgengine/sql/ddl.sql | 3 +- internal/pgengine/sql/init.sql | 3 +- internal/pgengine/sql/job_functions.sql | 7 +-- internal/pgengine/sql/migrations/00645.sql | 58 ++++++++++++++++++++++ 8 files changed, 75 insertions(+), 8 deletions(-) create mode 100644 internal/pgengine/sql/migrations/00645.sql diff --git a/internal/pgengine/access.go b/internal/pgengine/access.go index 9cf4380c..e2ad3e10 100644 --- a/internal/pgengine/access.go +++ b/internal/pgengine/access.go @@ -83,7 +83,7 @@ func (pge *PgEngine) SelectRebootChains(ctx context.Context, dest *[]Chain) erro // SelectChains returns a list of chains should be executed at the current moment func (pge *PgEngine) SelectChains(ctx context.Context, dest *[]Chain) error { - const sqlSelectChains = sqlSelectLiveChains + ` AND NOT COALESCE(starts_with(run_at, '@'), FALSE) AND timetable.is_cron_in_time(run_at, now())` + const sqlSelectChains = sqlSelectLiveChains + ` AND NOT COALESCE(starts_with(run_at, '@'), FALSE) AND timetable.is_cron_in_time(run_at, now() AT TIME ZONE run_at_time_zone)` rows, err := pge.ConfigDb.Query(ctx, sqlSelectChains, pge.ClientName) if err != nil { return err diff --git a/internal/pgengine/migration.go b/internal/pgengine/migration.go index b1852d61..15d7f0ab 100644 --- a/internal/pgengine/migration.go +++ b/internal/pgengine/migration.go @@ -141,6 +141,12 @@ var Migrations func() migrator.Option = func() migrator.Option { return ExecuteMigrationScript(ctx, tx, "00629.sql") }, }, + &migrator.Migration{ + Name: "00645 Change is_cron_in_time ts param to timestamp without time zone", + Func: func(ctx context.Context, tx pgx.Tx) error { + return ExecuteMigrationScript(ctx, tx, "00645.sql") + }, + }, // adding new migration here, update "timetable"."migration" in "sql/init.sql" // and "dbapi" variable in main.go! diff --git a/internal/pgengine/pgengine_test.go b/internal/pgengine/pgengine_test.go index 3a20aedb..9ccc472d 100644 --- a/internal/pgengine/pgengine_test.go +++ b/internal/pgengine/pgengine_test.go @@ -80,7 +80,7 @@ func TestInitAndTestConfigDBConnection(t *testing.T) { funcNames := []string{"_validate_json_schema_type(text, jsonb)", "validate_json_schema(jsonb, jsonb, jsonb)", "add_task(timetable.command_kind, TEXT, BIGINT, DOUBLE PRECISION)", - "add_job(TEXT, timetable.cron, TEXT, JSONB, timetable.command_kind, TEXT, INTEGER, BOOLEAN, BOOLEAN, BOOLEAN, BOOLEAN, TEXT)", + "add_job(TEXT, timetable.cron, TEXT, JSONB, timetable.command_kind, TEXT, INTEGER, BOOLEAN, BOOLEAN, BOOLEAN, BOOLEAN, TEXT, TEXT)", "is_cron_in_time(timetable.cron, timestamptz)"} for _, funcName := range funcNames { err := pge.ConfigDb.QueryRow(ctx, fmt.Sprintf("SELECT COALESCE(to_regprocedure('timetable.%s'), 0) :: int", funcName)).Scan(&oid) diff --git a/internal/pgengine/sql/cron.sql b/internal/pgengine/sql/cron.sql index 96c9e2be..77e50224 100644 --- a/internal/pgengine/sql/cron.sql +++ b/internal/pgengine/sql/cron.sql @@ -144,7 +144,7 @@ COMMENT ON DOMAIN timetable.cron IS 'Extended CRON-style notation with support o -- is_cron_in_time returns TRUE if timestamp is listed in cron expression CREATE OR REPLACE FUNCTION timetable.is_cron_in_time( run_at timetable.cron, - ts timestamptz + ts timestamp ) RETURNS BOOLEAN AS $$ SELECT CASE WHEN run_at IS NULL THEN diff --git a/internal/pgengine/sql/ddl.sql b/internal/pgengine/sql/ddl.sql index 8bff1d76..c95e40a6 100644 --- a/internal/pgengine/sql/ddl.sql +++ b/internal/pgengine/sql/ddl.sql @@ -8,7 +8,8 @@ CREATE TABLE timetable.chain ( self_destruct BOOLEAN DEFAULT FALSE, exclusive_execution BOOLEAN DEFAULT FALSE, client_name TEXT, - on_error TEXT + on_error TEXT, + run_at_time_zone TEXT NOT NULL DEFAULT current_setting('TIMEZONE') ); COMMENT ON TABLE timetable.chain IS diff --git a/internal/pgengine/sql/init.sql b/internal/pgengine/sql/init.sql index 875ea9d0..a6edb095 100644 --- a/internal/pgengine/sql/init.sql +++ b/internal/pgengine/sql/init.sql @@ -26,4 +26,5 @@ VALUES (10, '00560 Alter txid column to bigint'), (11, '00573 Add ability to start a chain with delay'), (12, '00575 Add on_error handling'), - (13, '00629 Add ignore_error column to timetable.execution_log'); \ No newline at end of file + (13, '00629 Add ignore_error column to timetable.execution_log'), + (14, '00645 Change is_cron_in_time ts param to timestamp without time zone'); \ No newline at end of file diff --git a/internal/pgengine/sql/job_functions.sql b/internal/pgengine/sql/job_functions.sql index b8fa821e..1f6b93a7 100644 --- a/internal/pgengine/sql/job_functions.sql +++ b/internal/pgengine/sql/job_functions.sql @@ -25,12 +25,13 @@ CREATE OR REPLACE FUNCTION timetable.add_job( job_self_destruct BOOLEAN DEFAULT FALSE, job_ignore_errors BOOLEAN DEFAULT TRUE, job_exclusive BOOLEAN DEFAULT FALSE, - job_on_error TEXT DEFAULT NULL + job_on_error TEXT DEFAULT NULL, + job_time_zone TEXT DEFAULT current_setting('TIMEZONE') ) RETURNS BIGINT AS $$ WITH cte_chain (v_chain_id) AS ( - INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct, client_name, exclusive_execution, on_error) - VALUES (job_name, job_schedule,job_max_instances, job_live, job_self_destruct, job_client_name, job_exclusive, job_on_error) + INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct, client_name, exclusive_execution, on_error, run_at_time_zone) + VALUES (job_name, job_schedule,job_max_instances, job_live, job_self_destruct, job_client_name, job_exclusive, job_on_error, job_time_zone) RETURNING chain_id ), cte_task(v_task_id) AS ( diff --git a/internal/pgengine/sql/migrations/00645.sql b/internal/pgengine/sql/migrations/00645.sql new file mode 100644 index 00000000..0afd2758 --- /dev/null +++ b/internal/pgengine/sql/migrations/00645.sql @@ -0,0 +1,58 @@ +CREATE OR REPLACE FUNCTION timetable.is_cron_in_time( + run_at timetable.cron, + ts timestamp +) RETURNS BOOLEAN AS $$ + SELECT + CASE WHEN run_at IS NULL THEN + TRUE + ELSE + date_part('month', ts) = ANY(a.months) + AND (date_part('dow', ts) = ANY(a.dow) OR date_part('isodow', ts) = ANY(a.dow)) + AND date_part('day', ts) = ANY(a.days) + AND date_part('hour', ts) = ANY(a.hours) + AND date_part('minute', ts) = ANY(a.mins) + END + FROM + timetable.cron_split_to_arrays(run_at) a +$$ LANGUAGE SQL; + +DROP FUNCTION timetable.is_cron_in_time(timetable.cron, timestamp with time zone); + +ALTER TABLE timetable.chain + ADD COLUMN run_at_time_zone TEXT DEFAULT current_setting('TIMEZONE') NOT NULL USING current_setting('TIMEZONE'); + +CREATE OR REPLACE FUNCTION timetable.add_job( + job_name TEXT, + job_schedule timetable.cron, + job_command TEXT, + job_parameters JSONB DEFAULT NULL, + job_kind timetable.command_kind DEFAULT 'SQL'::timetable.command_kind, + job_client_name TEXT DEFAULT NULL, + job_max_instances INTEGER DEFAULT NULL, + job_live BOOLEAN DEFAULT TRUE, + job_self_destruct BOOLEAN DEFAULT FALSE, + job_ignore_errors BOOLEAN DEFAULT TRUE, + job_exclusive BOOLEAN DEFAULT FALSE, + job_on_error TEXT DEFAULT NULL, + job_time_zone TEXT DEFAULT current_setting('TIMEZONE') +) RETURNS BIGINT AS $$ + WITH + cte_chain (v_chain_id) AS ( + INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct, client_name, exclusive_execution, on_error, run_at_time_zone) + VALUES (job_name, job_schedule,job_max_instances, job_live, job_self_destruct, job_client_name, job_exclusive, job_on_error, job_time_zone) + RETURNING chain_id + ), + cte_task(v_task_id) AS ( + INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous) + SELECT v_chain_id, 10, job_kind, job_command, job_ignore_errors, TRUE + FROM cte_chain + RETURNING task_id + ), + cte_param AS ( + INSERT INTO timetable.parameter (task_id, order_id, value) + SELECT v_task_id, 1, job_parameters FROM cte_task, cte_chain + ) + SELECT v_chain_id FROM cte_chain +$$ LANGUAGE SQL; + +DROP FUNCTION timetable.add_job(TEXT, timetable.cron, TEXT, JSONB, timetable.command_kind, TEXT, INTEGER, BOOLEAN, BOOLEAN, BOOLEAN, BOOLEAN, TEXT); \ No newline at end of file From b05ad829eff6eae6ca3b75e233d79ebaf74c3419 Mon Sep 17 00:00:00 2001 From: David Micallef Date: Mon, 25 Mar 2024 10:27:56 +1100 Subject: [PATCH 2/3] did not need using statement on alter table --- internal/pgengine/sql/migrations/00645.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pgengine/sql/migrations/00645.sql b/internal/pgengine/sql/migrations/00645.sql index 0afd2758..2e9fdc00 100644 --- a/internal/pgengine/sql/migrations/00645.sql +++ b/internal/pgengine/sql/migrations/00645.sql @@ -19,7 +19,7 @@ $$ LANGUAGE SQL; DROP FUNCTION timetable.is_cron_in_time(timetable.cron, timestamp with time zone); ALTER TABLE timetable.chain - ADD COLUMN run_at_time_zone TEXT DEFAULT current_setting('TIMEZONE') NOT NULL USING current_setting('TIMEZONE'); + ADD COLUMN run_at_time_zone TEXT DEFAULT current_setting('TIMEZONE') NOT NULL; CREATE OR REPLACE FUNCTION timetable.add_job( job_name TEXT, From 4744d485ed03bfb1103603874102dda60d894b93 Mon Sep 17 00:00:00 2001 From: David Micallef Date: Tue, 26 Mar 2024 04:40:27 +1100 Subject: [PATCH 3/3] changed name of migration to be more alligned with solution. --- internal/pgengine/migration.go | 2 +- internal/pgengine/sql/init.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/pgengine/migration.go b/internal/pgengine/migration.go index 15d7f0ab..f0d5db3d 100644 --- a/internal/pgengine/migration.go +++ b/internal/pgengine/migration.go @@ -142,7 +142,7 @@ var Migrations func() migrator.Option = func() migrator.Option { }, }, &migrator.Migration{ - Name: "00645 Change is_cron_in_time ts param to timestamp without time zone", + Name: "00645 Add option to specify time zone per chain", Func: func(ctx context.Context, tx pgx.Tx) error { return ExecuteMigrationScript(ctx, tx, "00645.sql") }, diff --git a/internal/pgengine/sql/init.sql b/internal/pgengine/sql/init.sql index a6edb095..d797b2fe 100644 --- a/internal/pgengine/sql/init.sql +++ b/internal/pgengine/sql/init.sql @@ -27,4 +27,4 @@ VALUES (11, '00573 Add ability to start a chain with delay'), (12, '00575 Add on_error handling'), (13, '00629 Add ignore_error column to timetable.execution_log'), - (14, '00645 Change is_cron_in_time ts param to timestamp without time zone'); \ No newline at end of file + (14, '00645 Add option to specify time zone per chain'); \ No newline at end of file