Refactor logic for WaitGlobalLatch
mkaruza committed Dec 20, 2024
1 parent 3f45be5 commit ac2e1f2
Showing 2 changed files with 57 additions and 12 deletions.
62 changes: 51 additions & 11 deletions include/pgduckdb/pgduckdb_process_latch.hpp
@@ -2,17 +2,17 @@

#include <mutex>
#include <condition_variable>
#include <atomic>

extern "C" {
struct Latch;
extern struct Latch *MyLatch;
extern int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32_t wait_event_info);
extern void ResetLatch(Latch *latch);

/* Defined in storage/latch.h */
#define WL_LATCH_SET (1 << 0)
#define WL_EXIT_ON_PM_DEATH (1 << 5)

/* Defined in utils/wait_event.h */
#define PG_WAIT_EXTENSION 0x07000000U
@@ -27,17 +27,57 @@ namespace pgduckdb {

struct GlobalProcessLatch {
public:
static void
WaitGlobalLatch() {
    static std::mutex lock;
    static std::condition_variable cv;
    static std::atomic<int> threads_waiting_for_latch_event = 0;
    static bool latch_released = false;
    static std::atomic<bool> latch_event_thread_done = false;

    // We exit immediately if the "latch" thread is waiting to finish
    if (latch_event_thread_done.load()) {
        return;
    }

    if (!threads_waiting_for_latch_event.fetch_add(1)) {
        // We arrived first, so we wait for the latch on this thread (the "latch" thread)
        std::unique_lock lk(lock);

        WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, PG_WAIT_EXTENSION);
        ResetLatch(MyLatch);

        // Used to notify waiting threads that they can exit
        latch_released = true;

        // All new threads arriving at the latch wait will return immediately
        latch_event_thread_done = true;

        // Notify one waiting thread
        cv.notify_one();

        // Wait until we are the only thread left
        cv.wait(lk, [] { return threads_waiting_for_latch_event == 1; });
        threads_waiting_for_latch_event--;

        // Reset variables
        latch_released = false;
        latch_event_thread_done = false;
    } else {
        std::unique_lock lk(lock);

        // Wait for the "latch" thread to notify us
        cv.wait(lk, [] { return latch_released; });

        // This thread is done waiting
        threads_waiting_for_latch_event--;

        lk.unlock();

        // Notify another thread (either a waiting thread or the "latch" thread)
        cv.notify_one();
    }
}
};
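For reference, below is a minimal standalone sketch of the hand-off pattern this refactor implements: the first thread to arrive becomes the "latch" thread and performs the slow wait on behalf of everyone, the remaining threads block on the condition variable, and the "latch" thread drains all waiters before resetting the shared state for the next round. Everything here is illustrative: WaitSharedEvent is a made-up name, a sleep stands in for WaitLatch, and notify_all replaces the commit's notify_one chain to keep the sketch simple.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Illustrative stand-in for WaitGlobalLatch(); not part of the commit.
static void WaitSharedEvent() {
    static std::condition_variable cv;
    static std::mutex lock;
    static std::atomic<int> waiters = 0;
    static bool event_released = false;

    if (!waiters.fetch_add(1)) {
        // First thread in: perform the (slow) wait on behalf of everyone.
        std::unique_lock lk(lock);
        std::this_thread::sleep_for(std::chrono::milliseconds(10)); // stands in for WaitLatch
        event_released = true;
        cv.notify_all(); // release every waiting thread

        // Drain: wait until we are the only registered thread left.
        cv.wait(lk, [] { return waiters.load() == 1; });
        waiters--;
        event_released = false; // reset shared state for the next round
    } else {
        std::unique_lock lk(lock);
        cv.wait(lk, [] { return event_released; });
        waiters--;
        lk.unlock();
        cv.notify_all(); // if we were the last waiter, this wakes the draining thread
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; i++) {
        threads.emplace_back(WaitSharedEvent);
    }
    for (auto &t : threads) {
        t.join();
    }
    std::cout << "all threads released" << std::endl;
}

The predicate-based cv.wait calls are what make the pattern safe against missed notifications: a thread that registers after event_released is already set returns from wait immediately instead of blocking forever.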
7 changes: 6 additions & 1 deletion src/scan/postgres_table_reader.cpp
@@ -272,7 +272,12 @@ PostgresTableReader::GetNextWorkerTuple() {
bool readerdone;

reader = (TupleQueueReader *)parallel_worker_readers[next_parallel_reader];
{
    // We need to take the global lock for the `TupleQueueReaderNext` call
    std::lock_guard<std::mutex> lock(GlobalProcessLock::GetLock());
    minimal_tuple = PostgresFunctionGuard(TupleQueueReaderNext, reader, true, &readerdone);
}

if (readerdone) {
--nreaders;
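GlobalProcessLock::GetLock() is defined elsewhere in pg_duckdb and is not part of this diff. As a hypothetical sketch, assuming it merely exposes one process-wide mutex by reference, the interface the call site relies on could look like this:

#include <mutex>

// Hypothetical sketch only; the real pg_duckdb definition may differ.
// The call site above only needs a static GetLock() that returns a
// mutex compatible with std::lock_guard.
struct GlobalProcessLock {
    static std::mutex &
    GetLock() {
        static std::mutex lock; // shared by every DuckDB thread in this backend
        return lock;
    }
};

Whatever its exact shape, the effect matches the comment in the diff: the TupleQueueReaderNext call into Postgres is serialized behind a single process-wide lock.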
