diff --git a/dest.c b/dest.c index 4053c9b..afe8446 100644 --- a/dest.c +++ b/dest.c @@ -3,6 +3,7 @@ enum PIPES {READ, WRITE}; extern Task task; +static emit_log_hook_type emit_log_hook_prev = NULL; static int stdout_fd; static int stdout_pipe[2]; @@ -95,7 +96,7 @@ DestReceiver myDestReceiver = { .mydest = DestDebug, }; -DestReceiver *CreateDestReceiverMy(CommandDest dest) { +static DestReceiver *CreateDestReceiverMy(CommandDest dest) { elog(DEBUG1, "id = %li", task.shared->id); if ((stdout_fd = dup(STDOUT_FILENO)) < 0) ereport(ERROR, (errcode_for_socket_access(), errmsg("dup < 0"), errdetail("%m"))); if (pipe(stdout_pipe) < 0) ereport(ERROR, (errcode_for_socket_access(), errmsg("pipe < 0"), errdetail("%m"))); @@ -109,20 +110,20 @@ DestReceiver *CreateDestReceiverMy(CommandDest dest) { #endif } -void ReadyForQueryMy(CommandDest dest) { +static void ReadyForQueryMy(CommandDest dest) { elog(DEBUG1, "id = %li", task.shared->id); } -void NullCommandMy(CommandDest dest) { +static void NullCommandMy(CommandDest dest) { elog(DEBUG1, "id = %li", task.shared->id); } #if PG_VERSION_NUM >= 130000 -void BeginCommandMy(CommandTag commandTag, CommandDest dest) { +static void BeginCommandMy(CommandTag commandTag, CommandDest dest) { elog(DEBUG1, "id = %li, commandTag = %s", task.shared->id, GetCommandTagName(commandTag)); } -void EndCommandMy(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output) { +static void EndCommandMy(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output) { char completionTag[COMPLETION_TAG_BUFSIZE]; CommandTag tag = qc->commandTag; const char *tagname = GetCommandTagName(tag); @@ -136,11 +137,11 @@ void EndCommandMy(const QueryCompletion *qc, CommandDest dest, bool force_undeco } } #else -void BeginCommandMy(const char *commandTag, CommandDest dest) { +static void BeginCommandMy(const char *commandTag, CommandDest dest) { elog(DEBUG1, "id = %li, commandTag = %s", task.shared->id, commandTag); } -void EndCommandMy(const char *commandTag, CommandDest dest) { +static void EndCommandMy(const char *commandTag, CommandDest dest) { elog(DEBUG1, "id = %li, commandTag = %s", task.shared->id, commandTag); if (task.skip) task.skip = 0; else { if (!task.output.data) initStringInfoMy(&task.output); @@ -149,3 +150,46 @@ void EndCommandMy(const char *commandTag, CommandDest dest) { } } #endif + +#if PG_VERSION_NUM < 90500 +#define PQArgBlock undef +#endif + +#include + +void task_execute(void) { + MemoryContext oldMemoryContext = MemoryContextSwitchTo(MessageContext); + MemoryContextResetAndDeleteChildren(MessageContext); + InvalidateCatalogSnapshotConditionally(); + MemoryContextSwitchTo(oldMemoryContext); + whereToSendOutput = DestDebug; + ReadyForQueryMy(whereToSendOutput); + SetCurrentStatementStartTimestamp(); + exec_simple_query(task.input); + if (IsTransactionState()) exec_simple_query(SQL(COMMIT)); + if (IsTransactionState()) ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), errmsg("still active sql transaction"))); +} + +void task_catch(void) { + HOLD_INTERRUPTS(); + disable_all_timeouts(false); + QueryCancelPending = false; + emit_log_hook_prev = emit_log_hook; + emit_log_hook = task_error; + EmitErrorReport(); + debug_query_string = NULL; + AbortOutOfAnyTransaction(); +#if PG_VERSION_NUM >= 110000 + PortalErrorCleanup(); +#endif + if (MyReplicationSlot) ReplicationSlotRelease(); +#if PG_VERSION_NUM >= 100000 + ReplicationSlotCleanup(); +#endif +#if PG_VERSION_NUM >= 110000 + jit_reset_after_error(); +#endif + FlushErrorState(); + xact_started = false; + RESUME_INTERRUPTS(); +} diff --git a/dest.h b/dest.h deleted file mode 100644 index c927599..0000000 --- a/dest.h +++ /dev/null @@ -1,22 +0,0 @@ -#ifndef _DEST_H -#define _DEST_H - -#include - -#include - -DestReceiver *CreateDestReceiverMy(CommandDest dest); -#if PG_VERSION_NUM >= 130000 -void BeginCommandMy(CommandTag commandTag, CommandDest dest); -#else -void BeginCommandMy(const char *commandTag, CommandDest dest); -#endif -#if PG_VERSION_NUM >= 130000 -void EndCommandMy(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output); -#else -void EndCommandMy(const char *commandTag, CommandDest dest); -#endif -void NullCommandMy(CommandDest dest); -void ReadyForQueryMy(CommandDest dest); - -#endif // _DEST_H diff --git a/include.h b/include.h index 6a760fa..ed85762 100644 --- a/include.h +++ b/include.h @@ -71,7 +71,7 @@ extern void SignalHandlerForShutdownRequest(SIGNAL_ARGS); #include #include -#include "dest.h" +//#include "dest.h" #ifdef GP_VERSION_NUM #include "cdb/cdbvars.h" @@ -240,7 +240,9 @@ void SPI_cursor_fetch_my(const char *src, Portal portal, bool forward, long coun void SPI_execute_plan_my(const char *src, SPIPlanPtr plan, Datum *values, const char *nulls, int res); void SPI_execute_with_args_my(const char *src, int nargs, Oid *argtypes, Datum *values, const char *nulls, int res); void SPI_finish_my(void); +void task_catch(void); void task_error(ErrorData *edata); +void task_execute(void); void task_free(Task *t); void taskshared_free(int slot); void workshared_free(int slot); diff --git a/task.c b/task.c index 964a957..4fd5dd5 100644 --- a/task.c +++ b/task.c @@ -1,11 +1,5 @@ #include "include.h" -#if PG_VERSION_NUM < 90500 -#define PQArgBlock undef -#endif - -#include - extern char *task_null; extern int task_fetch; extern TaskShared *taskshared; @@ -290,19 +284,6 @@ void task_error(ErrorData *edata) { } } -static void task_execute(void) { - MemoryContext oldMemoryContext = MemoryContextSwitchTo(MessageContext); - MemoryContextResetAndDeleteChildren(MessageContext); - InvalidateCatalogSnapshotConditionally(); - MemoryContextSwitchTo(oldMemoryContext); - whereToSendOutput = DestDebug; - ReadyForQueryMy(whereToSendOutput); - SetCurrentStatementStartTimestamp(); - exec_simple_query(task.input); - if (IsTransactionState()) exec_simple_query(SQL(COMMIT)); - if (IsTransactionState()) ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), errmsg("still active sql transaction"))); -} - void taskshared_free(int slot) { LWLockAcquire(BackgroundWorkerLock, LW_EXCLUSIVE); pg_read_barrier(); @@ -315,30 +296,6 @@ static void task_shmem_exit(int code, Datum arg) { taskshared_free(DatumGetInt32(arg)); } -static void task_catch(void) { - HOLD_INTERRUPTS(); - disable_all_timeouts(false); - QueryCancelPending = false; - emit_log_hook_prev = emit_log_hook; - emit_log_hook = task_error; - EmitErrorReport(); - debug_query_string = NULL; - AbortOutOfAnyTransaction(); -#if PG_VERSION_NUM >= 110000 - PortalErrorCleanup(); -#endif - if (MyReplicationSlot) ReplicationSlotRelease(); -#if PG_VERSION_NUM >= 100000 - ReplicationSlotCleanup(); -#endif -#if PG_VERSION_NUM >= 110000 - jit_reset_after_error(); -#endif - FlushErrorState(); - xact_started = false; - RESUME_INTERRUPTS(); -} - static void task_latch(void) { ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS();