-
Notifications
You must be signed in to change notification settings - Fork 9
/
init.c
365 lines (334 loc) · 18.8 KB
/
init.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
#include "include.h"
#if PG_VERSION_NUM < 130000
#include <catalog/pg_type.h>
#include <miscadmin.h>
#endif
#include <pgstat.h>
#include <postmaster/bgworker.h>
#include <storage/ipc.h>
#include <storage/proc.h>
#include <tcop/utility.h>
#include <utils/builtins.h>
#include <utils/memutils.h>
PG_MODULE_MAGIC;
char *task_null;
int conf_close;
int conf_fetch;
int task_fetch;
int task_idle;
int work_close;
int work_fetch;
int work_restart;
static bool task_delete;
static bool task_drift;
static bool task_header;
static bool task_string;
static char *task_active;
static char *task_data;
static char *task_delimiter;
static char *task_escape;
static char *task_group;
static char *task_json;
static char *task_live;
static char *task_quote;
static char *task_repeat;
static char *task_reset;
static char *task_schema;
static char *task_table;
static char *task_timeout;
static char *task_user;
static char *work_active;
static int conf_restart;
static int conf_work;
static int task_count;
static int task_id;
static int task_limit;
static int task_max;
static int task_run;
static int task_sleep;
static int work_task;
#if PG_VERSION_NUM >= 150000
static shmem_request_hook_type prev_shmem_request_hook = NULL;
#endif
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
TaskShared *taskshared = NULL;
WorkShared *workshared = NULL;
#if PG_VERSION_NUM < 130000
#include <signal.h>
volatile sig_atomic_t ShutdownRequestPending = false;
#endif
bool init_oid_is_string(Oid oid) {
switch (oid) {
case BITOID:
case BOOLOID:
case CIDOID:
case FLOAT4OID:
case FLOAT8OID:
case INT2OID:
case INT4OID:
case INT8OID:
case NUMERICOID:
case OIDOID:
case TIDOID:
case XIDOID:
return false;
default: return true;
}
}
bool lock_data_user_hash(Oid data, Oid user, int hash) {
LOCKTAG tag = {data, user, (uint32)hash, 3, LOCKTAG_USERLOCK, USER_LOCKMETHOD};
elog(DEBUG1, "data = %i, user = %i, hash = %i", data, user, hash);
return LockAcquire(&tag, AccessExclusiveLock, true, true) == LOCKACQUIRE_OK;
}
bool lock_data_user(Oid data, Oid user) {
LOCKTAG tag = {data, data, user, 6, LOCKTAG_USERLOCK, USER_LOCKMETHOD};
elog(DEBUG1, "data = %i, user = %i", data, user);
return LockAcquire(&tag, AccessExclusiveLock, true, true) == LOCKACQUIRE_OK;
}
bool lock_table_id(Oid table, int64 id) {
LOCKTAG tag = {table, (uint32)(id >> 32), (uint32)id, 4, LOCKTAG_USERLOCK, USER_LOCKMETHOD};
elog(DEBUG1, "table = %i, id = %li", table, id);
return LockAcquire(&tag, AccessExclusiveLock, true, true) == LOCKACQUIRE_OK;
}
bool lock_table_pid_hash(Oid table, int pid, int hash) {
LOCKTAG tag = {table, (uint32)pid, (uint32)hash, 5, LOCKTAG_USERLOCK, USER_LOCKMETHOD};
elog(DEBUG1, "table = %i, pid = %i, hash = %i", table, pid, hash);
return LockAcquire(&tag, AccessShareLock, true, true) == LOCKACQUIRE_OK;
}
bool unlock_data_user_hash(Oid data, Oid user, int hash) {
LOCKTAG tag = {data, user, (uint32)hash, 3, LOCKTAG_USERLOCK, USER_LOCKMETHOD};
elog(DEBUG1, "data = %i, user = %i, hash = %i", data, user, hash);
return LockRelease(&tag, AccessExclusiveLock, true);
}
bool unlock_data_user(Oid data, Oid user) {
LOCKTAG tag = {data, data, user, 6, LOCKTAG_USERLOCK, USER_LOCKMETHOD};
elog(DEBUG1, "data = %i, user = %i", data, user);
return LockRelease(&tag, AccessExclusiveLock, true);
}
bool unlock_table_id(Oid table, int64 id) {
LOCKTAG tag = {table, (uint32)(id >> 32), (uint32)id, 4, LOCKTAG_USERLOCK, USER_LOCKMETHOD};
elog(DEBUG1, "table = %i, id = %li", table, id);
return LockRelease(&tag, AccessExclusiveLock, true);
}
bool unlock_table_pid_hash(Oid table, int pid, int hash) {
LOCKTAG tag = {table, (uint32)pid, (uint32)hash, 5, LOCKTAG_USERLOCK, USER_LOCKMETHOD};
elog(DEBUG1, "table = %i, pid = %i, hash = %i", table, pid, hash);
return LockRelease(&tag, AccessShareLock, true);
}
static char *text_to_cstring_my(const text *t) {
MemoryContext oldMemoryContext = MemoryContextSwitchTo(TopMemoryContext);
char *result = text_to_cstring(t);
MemoryContextSwitchTo(oldMemoryContext);
return result;
}
char *TextDatumGetCStringMy(Datum datum) {
return datum ? text_to_cstring_my((text *)DatumGetPointer(datum)) : NULL;
}
static text *cstring_to_text_my(const char *s) {
MemoryContext oldMemoryContext = MemoryContextSwitchTo(TopMemoryContext);
text *result = cstring_to_text(s);
MemoryContextSwitchTo(oldMemoryContext);
return result;
}
Datum CStringGetTextDatumMy(const char *s) {
return s ? PointerGetDatum(cstring_to_text_my(s)) : (Datum)NULL;
}
void appendBinaryStringInfoEscapeQuote(StringInfo buf, const char *data, int len, bool string, char escape, char quote) {
if (!string && quote) appendStringInfoChar(buf, quote);
if (len) {
if (!string && escape && quote) for (int i = 0; len-- > 0; i++) {
if (quote == data[i]) appendStringInfoChar(buf, escape);
appendStringInfoChar(buf, data[i]);
} else appendBinaryStringInfo(buf, data, len);
}
if (!string && quote) appendStringInfoChar(buf, quote);
}
void init_conf(bool dynamic) {
BackgroundWorker worker = {0};
size_t len;
elog(DEBUG1, "dynamic = %s", dynamic ? "true" : "false");
if ((len = strlcpy(worker.bgw_function_name, "conf_main", sizeof(worker.bgw_function_name))) >= sizeof(worker.bgw_function_name)) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("strlcpy %li >= %li", len, sizeof(worker.bgw_function_name))));
if ((len = strlcpy(worker.bgw_library_name, "pg_task", sizeof(worker.bgw_library_name))) >= sizeof(worker.bgw_library_name)) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("strlcpy %li >= %li", len, sizeof(worker.bgw_library_name))));
if ((len = strlcpy(worker.bgw_name, "postgres pg_conf", sizeof(worker.bgw_name))) >= sizeof(worker.bgw_name)) ereport(WARNING, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("strlcpy %li >= %li", len, sizeof(worker.bgw_name))));
#if PG_VERSION_NUM >= 110000
if ((len = strlcpy(worker.bgw_type, worker.bgw_name, sizeof(worker.bgw_type))) >= sizeof(worker.bgw_type)) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("strlcpy %li >= %li", len, sizeof(worker.bgw_type))));
#endif
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_restart_time = conf_restart;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
if (dynamic) {
worker.bgw_notify_pid = MyProcPid;
IsUnderPostmaster = true;
if (!RegisterDynamicBackgroundWorker(&worker, NULL)) ereport(ERROR, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("could not register background worker"), errhint("Consider increasing configuration parameter \"max_worker_processes\".")));
IsUnderPostmaster = false;
} else RegisterBackgroundWorker(&worker);
}
static void init_assign_int(const char *var, int newval, void *extra) {
int oldval;
if (PostmasterPid != MyProcPid) return;
if (process_shared_preload_libraries_in_progress) return;
oldval = atoi(GetConfigOption(var, true, true));
if (oldval == newval) return;
elog(DEBUG1, "oldval = %i, newval = %i", oldval, newval);
init_conf(true);
}
static void init_assign_string(const char *var, const char *newval, void *extra) {
bool new_isnull;
bool old_isnull;
const char *oldval;
if (PostmasterPid != MyProcPid) return;
if (process_shared_preload_libraries_in_progress) return;
oldval = GetConfigOption(var, true, true);
old_isnull = !oldval || oldval[0] == '\0';
new_isnull = !newval || newval[0] == '\0';
if (old_isnull && new_isnull) return;
if (!old_isnull && !new_isnull && !strcmp(oldval, newval)) return;
elog(DEBUG1, "oldval = %s, newval = %s", !old_isnull ? oldval : task_null, !new_isnull ? newval : task_null);
init_conf(true);
}
static void init_assign_data(const char *newval, void *extra) { init_assign_string("pg_task.data", newval, extra); }
static void init_assign_json(const char *newval, void *extra) { init_assign_string("pg_task.json", newval, extra); }
static void init_assign_reset(const char *newval, void *extra) { init_assign_string("pg_task.reset", newval, extra); }
static void init_assign_schema(const char *newval, void *extra) { init_assign_string("pg_task.schema", newval, extra); }
static void init_assign_sleep(int newval, void *extra) { init_assign_int("pg_task.sleep", newval, extra); }
static void init_assign_table(const char *newval, void *extra) { init_assign_string("pg_task.table", newval, extra); }
static void init_assign_user(const char *newval, void *extra) { init_assign_string("pg_task.user", newval, extra); }
static size_t init_taskshared_memsize(void) {
return mul_size(work_task, sizeof(*taskshared));
}
static size_t init_workshared_memsize(void) {
return mul_size(conf_work, sizeof(*workshared));
}
#if PG_VERSION_NUM >= 150000
static void init_shmem_request_hook(void) {
if (prev_shmem_request_hook) prev_shmem_request_hook();
RequestAddinShmemSpace(init_taskshared_memsize());
RequestAddinShmemSpace(init_workshared_memsize());
}
#endif
static void init_shmem_startup_hook(void) {
bool found;
if (prev_shmem_startup_hook) prev_shmem_startup_hook();
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
taskshared = ShmemInitStruct("pg_taskshared", init_taskshared_memsize(), &found);
if (!found) MemSet(taskshared, 0, init_taskshared_memsize());
elog(DEBUG1, "pg_taskshared %s found", found ? "" : "not");
workshared = ShmemInitStruct("pg_workshared", init_workshared_memsize(), &found);
if (!found) MemSet(workshared, 0, init_workshared_memsize());
elog(DEBUG1, "pg_workshared %s found", found ? "" : "not");
LWLockRelease(AddinShmemInitLock);
}
void initStringInfoMy(StringInfo buf) {
MemoryContext oldMemoryContext = MemoryContextSwitchTo(TopMemoryContext);
initStringInfo(buf);
MemoryContextSwitchTo(oldMemoryContext);
}
void _PG_init(void) {
if (!process_shared_preload_libraries_in_progress) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("This module can only be loaded via shared_preload_libraries")));
DefineCustomBoolVariable("pg_task.delete", "pg_task delete", "Auto delete task when both output and error are nulls", &task_delete, true, PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomBoolVariable("pg_task.drift", "pg_task drift", "Compute next repeat time by stop time instead by plan time", &task_drift, false, PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomBoolVariable("pg_task.header", "pg_task header", "Show columns headers in output", &task_header, true, PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomBoolVariable("pg_task.string", "pg_task string", "Quote only strings", &task_string, true, PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomIntVariable("pg_conf.close", "pg_conf close", "Close conf, milliseconds", &conf_close, BGW_DEFAULT_RESTART_INTERVAL * 1000, 1, INT_MAX, PGC_SUSET, 0, NULL, NULL, NULL);
DefineCustomIntVariable("pg_conf.fetch", "pg_conf fetch", "Fetch conf rows at once", &conf_fetch, 10, 1, INT_MAX, PGC_SUSET, 0, NULL, NULL, NULL);
DefineCustomIntVariable("pg_conf.restart", "pg_conf restart", "Restart conf interval, seconds", &conf_restart, BGW_DEFAULT_RESTART_INTERVAL, 1, INT_MAX, PGC_SUSET, 0, NULL, NULL, NULL);
DefineCustomIntVariable("pg_conf.work", "pg_conf work", "Maximum work workers", &conf_work, max_worker_processes, 1, max_worker_processes, PGC_POSTMASTER, 0, NULL, NULL, NULL);
DefineCustomIntVariable("pg_task.count", "pg_task count", "Non-negative maximum count of tasks, are executed by current background worker process before exit", &task_count, 0, 0, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomIntVariable("pg_task.fetch", "pg_task fetch", "Fetch task rows at once", &task_fetch, 100, 1, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomIntVariable("pg_task.idle", "pg_task idle", "Idle task count", &task_idle, BGW_DEFAULT_RESTART_INTERVAL, 1, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomIntVariable("pg_task.id", "pg_task id", "Current task id", &task_id, 0, INT_MIN, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomIntVariable("pg_task.limit", "pg_task limit", "Limit task rows at once", &task_limit, 1000, 0, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomIntVariable("pg_task.max", "pg_task max", "Maximum count of concurrently executing tasks in group, negative value means pause between tasks in milliseconds", &task_max, 0, INT_MIN, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomIntVariable("pg_task.run", "pg_task run", "Maximum count of concurrently executing tasks in work", &task_run, INT_MAX, 1, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomIntVariable("pg_task.sleep", "pg_task sleep", "Check tasks every sleep milliseconds", &task_sleep, 1000, 1, INT_MAX, PGC_USERSET, 0, NULL, init_assign_sleep, NULL);
DefineCustomIntVariable("pg_work.close", "pg_work close", "Close work, milliseconds", &work_close, BGW_DEFAULT_RESTART_INTERVAL * 1000, 1, INT_MAX, PGC_SUSET, 0, NULL, NULL, NULL);
DefineCustomIntVariable("pg_work.fetch", "pg_work fetch", "Fetch work rows at once", &work_fetch, 100, 1, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomIntVariable("pg_work.restart", "pg_work restart", "Restart work interval, seconds", &work_restart, BGW_DEFAULT_RESTART_INTERVAL, 1, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomIntVariable("pg_work.task", "pg_work task", "Maximum task workers", &work_task, max_worker_processes, 1, max_worker_processes, PGC_POSTMASTER, 0, NULL, NULL, NULL);
DefineCustomStringVariable("pg_task.active", "pg_task active", "Positive period after plan time, when task is active for executing", &task_active, "1 hour", PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomStringVariable("pg_task.data", "pg_task data", "Database name for tasks table", &task_data, "postgres", PGC_SIGHUP, 0, NULL, init_assign_data, NULL);
DefineCustomStringVariable("pg_task.delimiter", "pg_task delimiter", "Results columns delimiter", &task_delimiter, "\t", PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomStringVariable("pg_task.escape", "pg_task escape", "Results columns escape", &task_escape, "", PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomStringVariable("pg_task.group", "pg_task group", "Task grouping by name", &task_group, "group", PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomStringVariable("pg_task.json", "pg_task json", "Json configuration, available keys: data, reset, schema, table, sleep and user", &task_json, SQL([{"data":"postgres"}]), PGC_SIGHUP, 0, NULL, init_assign_json, NULL);
DefineCustomStringVariable("pg_task.live", "pg_task live", "Non-negative maximum time of live of current background worker process before exit", &task_live, "0 sec", PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomStringVariable("pg_task.null", "pg_task null", "Null text value representation", &task_null, "\\N", PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomStringVariable("pg_task.quote", "pg_task quote", "Results columns quote", &task_quote, "", PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomStringVariable("pg_task.repeat", "pg_task repeat", "Non-negative auto repeat tasks interval", &task_repeat, "0 sec", PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomStringVariable("pg_task.reset", "pg_task reset", "Interval of reset tasks", &task_reset, "1 hour", PGC_USERSET, 0, NULL, init_assign_reset, NULL);
DefineCustomStringVariable("pg_task.schema", "pg_task schema", "Schema name for tasks table", &task_schema, "public", PGC_USERSET, 0, NULL, init_assign_schema, NULL);
DefineCustomStringVariable("pg_task.table", "pg_task table", "Table name for tasks table", &task_table, "task", PGC_USERSET, 0, NULL, init_assign_table, NULL);
DefineCustomStringVariable("pg_task.timeout", "pg_task timeout", "Non-negative allowed time for task run", &task_timeout, "0 sec", PGC_USERSET, 0, NULL, NULL, NULL);
DefineCustomStringVariable("pg_task.user", "pg_task user", "User name for tasks table", &task_user, "postgres", PGC_SIGHUP, 0, NULL, init_assign_user, NULL);
elog(DEBUG1, "json = %s, user = %s, data = %s, schema = %s, table = %s, null = %s, sleep = %i, reset = %s, active = %s", task_json, task_user, task_data, task_schema, task_table, task_null, task_sleep, task_reset, work_active);
#ifdef GP_VERSION_NUM
if (!IS_QUERY_DISPATCHER()) return;
#endif
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = init_shmem_startup_hook;
#if PG_VERSION_NUM >= 150000
prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = init_shmem_request_hook;
#elif PG_VERSION_NUM >= 90600
RequestAddinShmemSpace(init_taskshared_memsize());
RequestAddinShmemSpace(init_workshared_memsize());
#endif
init_conf(false);
}
void append_with_tabs(StringInfo buf, const char *str) {
char ch;
while ((ch = *str++) != '\0') {
appendStringInfoCharMacro(buf, ch);
if (ch == '\n') appendStringInfoCharMacro(buf, '\t');
}
}
const char *error_severity(int elevel) {
const char *prefix;
switch (elevel) {
case DEBUG1: case DEBUG2: case DEBUG3: case DEBUG4: case DEBUG5: prefix = gettext_noop("DEBUG"); break;
case LOG:
#if PG_VERSION_NUM >= 90600
case LOG_SERVER_ONLY:
#endif
prefix = gettext_noop("LOG"); break;
case INFO: prefix = gettext_noop("INFO"); break;
case NOTICE: prefix = gettext_noop("NOTICE"); break;
case WARNING:
#if PG_VERSION_NUM >= 140000
case WARNING_CLIENT_ONLY:
#endif
prefix = gettext_noop("WARNING"); break;
case ERROR: prefix = gettext_noop("ERROR"); break;
case FATAL: prefix = gettext_noop("FATAL"); break;
case PANIC: prefix = gettext_noop("PANIC"); break;
default: prefix = "???"; break;
}
return prefix;
}
int severity_error(const char *error) {
if (!pg_strcasecmp("DEBUG", error)) return DEBUG1;
if (!pg_strcasecmp("ERROR", error)) return ERROR;
if (!pg_strcasecmp("FATAL", error)) return FATAL;
if (!pg_strcasecmp("INFO", error)) return INFO;
if (!pg_strcasecmp("LOG", error)) return LOG;
if (!pg_strcasecmp("NOTICE", error)) return NOTICE;
if (!pg_strcasecmp("PANIC", error)) return PANIC;
if (!pg_strcasecmp("WARNING", error)) return WARNING;
return ERROR;
}
bool is_log_level_output(int elevel, int log_min_level) {
if (elevel == LOG
#if PG_VERSION_NUM >= 90600
|| elevel == LOG_SERVER_ONLY
#endif
) {
if (log_min_level == LOG || log_min_level <= ERROR) return true;
#if PG_VERSION_NUM >= 140000
} else if (elevel == WARNING_CLIENT_ONLY) {
return false; // never sent to log, regardless of log_min_level
#endif
} else if (log_min_level == LOG) {
if (elevel >= FATAL) return true; // elevel != LOG
} else if (elevel >= log_min_level) return true; // Neither is LOG
return false;
}