Skip to content

Commit

Permalink
mv
Browse files Browse the repository at this point in the history
  • Loading branch information
RekGRpth committed Nov 26, 2023
1 parent 2f05d19 commit 6a62d57
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 73 deletions.
58 changes: 51 additions & 7 deletions dest.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

enum PIPES {READ, WRITE};
extern Task task;
static emit_log_hook_type emit_log_hook_prev = NULL;
static int stdout_fd;
static int stdout_pipe[2];

Expand Down Expand Up @@ -95,7 +96,7 @@ DestReceiver myDestReceiver = {
.mydest = DestDebug,
};

DestReceiver *CreateDestReceiverMy(CommandDest dest) {
static DestReceiver *CreateDestReceiverMy(CommandDest dest) {
elog(DEBUG1, "id = %li", task.shared->id);
if ((stdout_fd = dup(STDOUT_FILENO)) < 0) ereport(ERROR, (errcode_for_socket_access(), errmsg("dup < 0"), errdetail("%m")));
if (pipe(stdout_pipe) < 0) ereport(ERROR, (errcode_for_socket_access(), errmsg("pipe < 0"), errdetail("%m")));
Expand All @@ -109,20 +110,20 @@ DestReceiver *CreateDestReceiverMy(CommandDest dest) {
#endif
}

void ReadyForQueryMy(CommandDest dest) {
static void ReadyForQueryMy(CommandDest dest) {
elog(DEBUG1, "id = %li", task.shared->id);
}

void NullCommandMy(CommandDest dest) {
static void NullCommandMy(CommandDest dest) {
elog(DEBUG1, "id = %li", task.shared->id);
}

#if PG_VERSION_NUM >= 130000
void BeginCommandMy(CommandTag commandTag, CommandDest dest) {
static void BeginCommandMy(CommandTag commandTag, CommandDest dest) {
elog(DEBUG1, "id = %li, commandTag = %s", task.shared->id, GetCommandTagName(commandTag));
}

void EndCommandMy(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output) {
static void EndCommandMy(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output) {
char completionTag[COMPLETION_TAG_BUFSIZE];
CommandTag tag = qc->commandTag;
const char *tagname = GetCommandTagName(tag);
Expand All @@ -136,11 +137,11 @@ void EndCommandMy(const QueryCompletion *qc, CommandDest dest, bool force_undeco
}
}
#else
void BeginCommandMy(const char *commandTag, CommandDest dest) {
static void BeginCommandMy(const char *commandTag, CommandDest dest) {
elog(DEBUG1, "id = %li, commandTag = %s", task.shared->id, commandTag);
}

void EndCommandMy(const char *commandTag, CommandDest dest) {
static void EndCommandMy(const char *commandTag, CommandDest dest) {
elog(DEBUG1, "id = %li, commandTag = %s", task.shared->id, commandTag);
if (task.skip) task.skip = 0; else {
if (!task.output.data) initStringInfoMy(&task.output);
Expand All @@ -149,3 +150,46 @@ void EndCommandMy(const char *commandTag, CommandDest dest) {
}
}
#endif

#if PG_VERSION_NUM < 90500
#define PQArgBlock undef
#endif

#include <postgres.c>

void task_execute(void) {
MemoryContext oldMemoryContext = MemoryContextSwitchTo(MessageContext);
MemoryContextResetAndDeleteChildren(MessageContext);
InvalidateCatalogSnapshotConditionally();
MemoryContextSwitchTo(oldMemoryContext);
whereToSendOutput = DestDebug;
ReadyForQueryMy(whereToSendOutput);
SetCurrentStatementStartTimestamp();
exec_simple_query(task.input);
if (IsTransactionState()) exec_simple_query(SQL(COMMIT));
if (IsTransactionState()) ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), errmsg("still active sql transaction")));
}

void task_catch(void) {
HOLD_INTERRUPTS();
disable_all_timeouts(false);
QueryCancelPending = false;
emit_log_hook_prev = emit_log_hook;
emit_log_hook = task_error;
EmitErrorReport();
debug_query_string = NULL;
AbortOutOfAnyTransaction();
#if PG_VERSION_NUM >= 110000
PortalErrorCleanup();
#endif
if (MyReplicationSlot) ReplicationSlotRelease();
#if PG_VERSION_NUM >= 100000
ReplicationSlotCleanup();
#endif
#if PG_VERSION_NUM >= 110000
jit_reset_after_error();
#endif
FlushErrorState();
xact_started = false;
RESUME_INTERRUPTS();
}
22 changes: 0 additions & 22 deletions dest.h

This file was deleted.

4 changes: 3 additions & 1 deletion include.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ extern void SignalHandlerForShutdownRequest(SIGNAL_ARGS);
#include <utils/timeout.h>
#include <utils/timestamp.h>

#include "dest.h"
//#include "dest.h"

#ifdef GP_VERSION_NUM
#include "cdb/cdbvars.h"
Expand Down Expand Up @@ -240,7 +240,9 @@ void SPI_cursor_fetch_my(const char *src, Portal portal, bool forward, long coun
void SPI_execute_plan_my(const char *src, SPIPlanPtr plan, Datum *values, const char *nulls, int res);
void SPI_execute_with_args_my(const char *src, int nargs, Oid *argtypes, Datum *values, const char *nulls, int res);
void SPI_finish_my(void);
void task_catch(void);
void task_error(ErrorData *edata);
void task_execute(void);
void task_free(Task *t);
void taskshared_free(int slot);
void workshared_free(int slot);
Expand Down
43 changes: 0 additions & 43 deletions task.c
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
#include "include.h"

#if PG_VERSION_NUM < 90500
#define PQArgBlock undef
#endif

#include <postgres.c>

extern char *task_null;
extern int task_fetch;
extern TaskShared *taskshared;
Expand Down Expand Up @@ -290,19 +284,6 @@ void task_error(ErrorData *edata) {
}
}

static void task_execute(void) {
MemoryContext oldMemoryContext = MemoryContextSwitchTo(MessageContext);
MemoryContextResetAndDeleteChildren(MessageContext);
InvalidateCatalogSnapshotConditionally();
MemoryContextSwitchTo(oldMemoryContext);
whereToSendOutput = DestDebug;
ReadyForQueryMy(whereToSendOutput);
SetCurrentStatementStartTimestamp();
exec_simple_query(task.input);
if (IsTransactionState()) exec_simple_query(SQL(COMMIT));
if (IsTransactionState()) ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), errmsg("still active sql transaction")));
}

void taskshared_free(int slot) {
LWLockAcquire(BackgroundWorkerLock, LW_EXCLUSIVE);
pg_read_barrier();
Expand All @@ -315,30 +296,6 @@ static void task_shmem_exit(int code, Datum arg) {
taskshared_free(DatumGetInt32(arg));
}

static void task_catch(void) {
HOLD_INTERRUPTS();
disable_all_timeouts(false);
QueryCancelPending = false;
emit_log_hook_prev = emit_log_hook;
emit_log_hook = task_error;
EmitErrorReport();
debug_query_string = NULL;
AbortOutOfAnyTransaction();
#if PG_VERSION_NUM >= 110000
PortalErrorCleanup();
#endif
if (MyReplicationSlot) ReplicationSlotRelease();
#if PG_VERSION_NUM >= 100000
ReplicationSlotCleanup();
#endif
#if PG_VERSION_NUM >= 110000
jit_reset_after_error();
#endif
FlushErrorState();
xact_started = false;
RESUME_INTERRUPTS();
}

static void task_latch(void) {
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
Expand Down

0 comments on commit 6a62d57

Please sign in to comment.