Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: #645 added run_at_time_zone field to chain and add_job #646

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/pgengine/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (pge *PgEngine) SelectRebootChains(ctx context.Context, dest *[]Chain) erro

// SelectChains returns a list of chains should be executed at the current moment
func (pge *PgEngine) SelectChains(ctx context.Context, dest *[]Chain) error {
const sqlSelectChains = sqlSelectLiveChains + ` AND NOT COALESCE(starts_with(run_at, '@'), FALSE) AND timetable.is_cron_in_time(run_at, now())`
const sqlSelectChains = sqlSelectLiveChains + ` AND NOT COALESCE(starts_with(run_at, '@'), FALSE) AND timetable.is_cron_in_time(run_at, now() AT TIME ZONE run_at_time_zone)`
rows, err := pge.ConfigDb.Query(ctx, sqlSelectChains, pge.ClientName)
if err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions internal/pgengine/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ var Migrations func() migrator.Option = func() migrator.Option {
return ExecuteMigrationScript(ctx, tx, "00629.sql")
},
},
&migrator.Migration{
Name: "00645 Add option to specify time zone per chain",
Func: func(ctx context.Context, tx pgx.Tx) error {
return ExecuteMigrationScript(ctx, tx, "00645.sql")
},
},
// adding new migration here, update "timetable"."migration" in "sql/init.sql"
// and "dbapi" variable in main.go!

Expand Down
2 changes: 1 addition & 1 deletion internal/pgengine/pgengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestInitAndTestConfigDBConnection(t *testing.T) {
funcNames := []string{"_validate_json_schema_type(text, jsonb)",
"validate_json_schema(jsonb, jsonb, jsonb)",
"add_task(timetable.command_kind, TEXT, BIGINT, DOUBLE PRECISION)",
"add_job(TEXT, timetable.cron, TEXT, JSONB, timetable.command_kind, TEXT, INTEGER, BOOLEAN, BOOLEAN, BOOLEAN, BOOLEAN, TEXT)",
"add_job(TEXT, timetable.cron, TEXT, JSONB, timetable.command_kind, TEXT, INTEGER, BOOLEAN, BOOLEAN, BOOLEAN, BOOLEAN, TEXT, TEXT)",
"is_cron_in_time(timetable.cron, timestamptz)"}
for _, funcName := range funcNames {
err := pge.ConfigDb.QueryRow(ctx, fmt.Sprintf("SELECT COALESCE(to_regprocedure('timetable.%s'), 0) :: int", funcName)).Scan(&oid)
Expand Down
2 changes: 1 addition & 1 deletion internal/pgengine/sql/cron.sql
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ COMMENT ON DOMAIN timetable.cron IS 'Extended CRON-style notation with support o
-- is_cron_in_time returns TRUE if timestamp is listed in cron expression
CREATE OR REPLACE FUNCTION timetable.is_cron_in_time(
run_at timetable.cron,
ts timestamptz
ts timestamp
gaslitbytech marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

) RETURNS BOOLEAN AS $$
SELECT
CASE WHEN run_at IS NULL THEN
Expand Down
3 changes: 2 additions & 1 deletion internal/pgengine/sql/ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ CREATE TABLE timetable.chain (
self_destruct BOOLEAN DEFAULT FALSE,
exclusive_execution BOOLEAN DEFAULT FALSE,
client_name TEXT,
on_error TEXT
on_error TEXT,
run_at_time_zone TEXT NOT NULL DEFAULT current_setting('TIMEZONE')
gaslitbytech marked this conversation as resolved.
Show resolved Hide resolved
);

COMMENT ON TABLE timetable.chain IS
Expand Down
3 changes: 2 additions & 1 deletion internal/pgengine/sql/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ VALUES
(10, '00560 Alter txid column to bigint'),
(11, '00573 Add ability to start a chain with delay'),
(12, '00575 Add on_error handling'),
(13, '00629 Add ignore_error column to timetable.execution_log');
(13, '00629 Add ignore_error column to timetable.execution_log'),
(14, '00645 Add option to specify time zone per chain');
7 changes: 4 additions & 3 deletions internal/pgengine/sql/job_functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ CREATE OR REPLACE FUNCTION timetable.add_job(
job_self_destruct BOOLEAN DEFAULT FALSE,
job_ignore_errors BOOLEAN DEFAULT TRUE,
job_exclusive BOOLEAN DEFAULT FALSE,
job_on_error TEXT DEFAULT NULL
job_on_error TEXT DEFAULT NULL,
job_time_zone TEXT DEFAULT current_setting('TIMEZONE')
) RETURNS BIGINT AS $$
WITH
cte_chain (v_chain_id) AS (
INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct, client_name, exclusive_execution, on_error)
VALUES (job_name, job_schedule,job_max_instances, job_live, job_self_destruct, job_client_name, job_exclusive, job_on_error)
INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct, client_name, exclusive_execution, on_error, run_at_time_zone)
VALUES (job_name, job_schedule,job_max_instances, job_live, job_self_destruct, job_client_name, job_exclusive, job_on_error, job_time_zone)
RETURNING chain_id
),
cte_task(v_task_id) AS (
Expand Down
58 changes: 58 additions & 0 deletions internal/pgengine/sql/migrations/00645.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
CREATE OR REPLACE FUNCTION timetable.is_cron_in_time(
run_at timetable.cron,
ts timestamp
) RETURNS BOOLEAN AS $$
SELECT
CASE WHEN run_at IS NULL THEN
TRUE
ELSE
date_part('month', ts) = ANY(a.months)
AND (date_part('dow', ts) = ANY(a.dow) OR date_part('isodow', ts) = ANY(a.dow))
AND date_part('day', ts) = ANY(a.days)
AND date_part('hour', ts) = ANY(a.hours)
AND date_part('minute', ts) = ANY(a.mins)
END
FROM
timetable.cron_split_to_arrays(run_at) a
$$ LANGUAGE SQL;

DROP FUNCTION timetable.is_cron_in_time(timetable.cron, timestamp with time zone);

ALTER TABLE timetable.chain
ADD COLUMN run_at_time_zone TEXT DEFAULT current_setting('TIMEZONE') NOT NULL;

CREATE OR REPLACE FUNCTION timetable.add_job(
job_name TEXT,
job_schedule timetable.cron,
job_command TEXT,
job_parameters JSONB DEFAULT NULL,
job_kind timetable.command_kind DEFAULT 'SQL'::timetable.command_kind,
job_client_name TEXT DEFAULT NULL,
job_max_instances INTEGER DEFAULT NULL,
job_live BOOLEAN DEFAULT TRUE,
job_self_destruct BOOLEAN DEFAULT FALSE,
job_ignore_errors BOOLEAN DEFAULT TRUE,
job_exclusive BOOLEAN DEFAULT FALSE,
job_on_error TEXT DEFAULT NULL,
job_time_zone TEXT DEFAULT current_setting('TIMEZONE')
) RETURNS BIGINT AS $$
WITH
cte_chain (v_chain_id) AS (
INSERT INTO timetable.chain (chain_name, run_at, max_instances, live, self_destruct, client_name, exclusive_execution, on_error, run_at_time_zone)
VALUES (job_name, job_schedule,job_max_instances, job_live, job_self_destruct, job_client_name, job_exclusive, job_on_error, job_time_zone)
RETURNING chain_id
),
cte_task(v_task_id) AS (
INSERT INTO timetable.task (chain_id, task_order, kind, command, ignore_error, autonomous)
SELECT v_chain_id, 10, job_kind, job_command, job_ignore_errors, TRUE
FROM cte_chain
RETURNING task_id
),
cte_param AS (
INSERT INTO timetable.parameter (task_id, order_id, value)
SELECT v_task_id, 1, job_parameters FROM cte_task, cte_chain
)
SELECT v_chain_id FROM cte_chain
$$ LANGUAGE SQL;

DROP FUNCTION timetable.add_job(TEXT, timetable.cron, TEXT, JSONB, timetable.command_kind, TEXT, INTEGER, BOOLEAN, BOOLEAN, BOOLEAN, BOOLEAN, TEXT);
Loading