From ac2e1f226f70c84f5fad8a70bc86cee3ad679b67 Mon Sep 17 00:00:00 2001
From: mkaruza
Date: Fri, 20 Dec 2024 10:23:54 +0100
Subject: [PATCH] Refactor logic for WaitGlobalLatch

---
 include/pgduckdb/pgduckdb_process_latch.hpp | 74 ++++++++++++++++++---
 src/scan/postgres_table_reader.cpp          |  7 ++-
 2 files changed, 63 insertions(+), 18 deletions(-)

diff --git a/include/pgduckdb/pgduckdb_process_latch.hpp b/include/pgduckdb/pgduckdb_process_latch.hpp
index cd6210c8..14854086 100644
--- a/include/pgduckdb/pgduckdb_process_latch.hpp
+++ b/include/pgduckdb/pgduckdb_process_latch.hpp
@@ -2,17 +2,17 @@
 
 #include <condition_variable>
 #include <mutex>
+#include <atomic>
 
 extern "C" {
 struct Latch;
 extern struct Latch *MyLatch;
-extern int WaitLatch(Latch *latch, int wakeEvents, long timeout,
-                     uint32_t wait_event_info);
+extern int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32_t wait_event_info);
 extern void ResetLatch(Latch *latch);
 
 /* Defined in storage/latch.h */
-#define WL_LATCH_SET (1 << 0)
-#define WL_EXIT_ON_PM_DEATH (1 << 5)
+#define WL_LATCH_SET        (1 << 0)
+#define WL_EXIT_ON_PM_DEATH (1 << 5)
 
 /* Defined in utils/wait_event.h */
 #define PG_WAIT_EXTENSION 0x07000000U
@@ -27,18 +27,58 @@
 namespace pgduckdb {
 
 struct GlobalProcessLatch {
 public:
-	static void WaitGlobalLatch() {
-		static std::condition_variable cv;
-		static std::mutex lock;
-		if (lock.try_lock()) {
-			WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, PG_WAIT_EXTENSION);
-			ResetLatch(MyLatch);
-			cv.notify_all();
-			lock.unlock();
-		} else {
-			std::unique_lock lk(lock);
-			cv.wait(lk);
-		}
-	}
+	static void
+	WaitGlobalLatch() {
+		static std::mutex lock;
+		static std::condition_variable cv;
+		static std::atomic<int> threads_waiting_for_latch_event = 0;
+		static bool latch_released = false;
+		static std::atomic<bool> latch_event_thread_done = false;
+
+		// Return immediately while the previous "latch" thread is still
+		// draining its waiters; callers simply retry on a later invocation.
+		if (latch_event_thread_done.load()) {
+			return;
+		}
+
+		if (threads_waiting_for_latch_event.fetch_add(1) == 0) {
+			// First arriving thread becomes the "latch" thread.
+			std::unique_lock<std::mutex> lk(lock);
+
+			WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, PG_WAIT_EXTENSION);
+			ResetLatch(MyLatch);
+
+			// Used to notify waiting threads to exit
+			latch_released = true;
+
+			// All new threads arriving on latch waiting will return immediately
+			latch_event_thread_done = true;
+
+			// Wake the first waiter; each waiter wakes the next one in turn.
+			cv.notify_one();
+
+			// Wait until we are the only thread left. cv.wait releases the
+			// mutex while blocked, so waiters can make progress.
+			cv.wait(lk, [] { return threads_waiting_for_latch_event.load() == 1; });
+			threads_waiting_for_latch_event--;
+
+			// Reset variables for the next latch round
+			latch_released = false;
+			latch_event_thread_done = false;
+		} else {
+			std::unique_lock<std::mutex> lk(lock);
+
+			// Wait for "latch" thread to notify
+			cv.wait(lk, [] { return latch_released; });
+
+			// We are done with this thread
+			threads_waiting_for_latch_event--;
+
+			lk.unlock();
+
+			// Notify another thread (either waiting thread or "latch" thread)
+			cv.notify_one();
+		}
+	}
 };
diff --git a/src/scan/postgres_table_reader.cpp b/src/scan/postgres_table_reader.cpp
index c8256d3c..50b7ece8 100644
--- a/src/scan/postgres_table_reader.cpp
+++ b/src/scan/postgres_table_reader.cpp
@@ -272,7 +272,12 @@ PostgresTableReader::GetNextWorkerTuple() {
 	bool readerdone;
 
 	reader = (TupleQueueReader *)parallel_worker_readers[next_parallel_reader];
-	minimal_tuple = PostgresFunctionGuard(TupleQueueReaderNext, reader, true, &readerdone);
+
+	{
+		// We need to take global lock for `TupleQueueReaderNext` call
+		std::lock_guard<std::mutex> lock(GlobalProcessLock::GetLock());
+		minimal_tuple = PostgresFunctionGuard(TupleQueueReaderNext, reader, true, &readerdone);
+	}
 
 	if (readerdone) {
 		--nreaders;