Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 76 additions & 17 deletions ddtrace/internal/_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,49 +203,83 @@ class PyRef
PyObject* _obj;
};

// Reasons associated with the _request wake channel.
static constexpr unsigned char REQUEST_REASON_NONE = 0;
static constexpr unsigned char REQUEST_REASON_AWAKE = 1 << 0;
static constexpr unsigned char REQUEST_REASON_STOP = 1 << 1;
static constexpr unsigned char REQUEST_REASON_FORK_STOP = 1 << 2;

// ----------------------------------------------------------------------------
class Event
{
public:
void set()
void set(unsigned char reasons = REQUEST_REASON_AWAKE)
{
std::lock_guard<std::mutex> lock(_mutex);

if (_set)
unsigned char old_reasons = _reasons;
_reasons |= reasons;

if (old_reasons == _reasons)
return;

_set = true;
_cond.notify_all();
}

void wait()
{
std::unique_lock<std::mutex> lock(_mutex);
_cond.wait(lock, [this]() { return _set; });
_cond.wait(lock, [this]() { return _reasons != REQUEST_REASON_NONE; });
}

bool wait(std::chrono::milliseconds timeout)
{
std::unique_lock<std::mutex> lock(_mutex);
return _cond.wait_for(lock, timeout, [this]() { return _set; });
return _cond.wait_for(lock, timeout, [this]() { return _reasons != REQUEST_REASON_NONE; });
}

bool wait(std::chrono::time_point<std::chrono::steady_clock> until)
{
std::unique_lock<std::mutex> lock(_mutex);
return _cond.wait_until(lock, until, [this]() { return _set; });
return _cond.wait_until(lock, until, [this]() { return _reasons != REQUEST_REASON_NONE; });
}

void clear()
{
std::lock_guard<std::mutex> lock(_mutex);
_set = false;
_reasons = REQUEST_REASON_NONE;
}

void clear(unsigned char reasons)
{
std::lock_guard<std::mutex> lock(_mutex);
_reasons &= static_cast<unsigned char>(~reasons);
}

unsigned char consume(unsigned char reasons)
{
std::lock_guard<std::mutex> lock(_mutex);

unsigned char matched = _reasons & reasons;
_reasons &= static_cast<unsigned char>(~reasons);

return matched;
}

unsigned char consume_all()
{
std::lock_guard<std::mutex> lock(_mutex);

unsigned char reasons = _reasons;
_reasons = REQUEST_REASON_NONE;

return reasons;
}

private:
std::condition_variable _cond;
std::mutex _mutex;
bool _set = false;
unsigned char _reasons = REQUEST_REASON_NONE;
};

// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -413,18 +447,29 @@ PeriodicThread_start(PeriodicThread* self, PyObject* Py_UNUSED(args))

bool error = false;
if (self->_no_wait_at_start)
self->_request->set();
self->_request->set(REQUEST_REASON_AWAKE);

while (!self->_stopping) {
{
AllowThreads _;

if (self->_request->wait(self->_next_call_time)) {
if (self->_stopping)
if (self->_stopping) {
// _stopping can be set by:
// 1. pre-fork stop: preserve non-fork reasons (e.g. awake)
// so they survive restart;
// 2. regular stop(): consume all pending reasons.
const unsigned char stop_reasons =
self->_request->consume(REQUEST_REASON_FORK_STOP | REQUEST_REASON_STOP);
const bool has_fork_stop = (stop_reasons & REQUEST_REASON_FORK_STOP) != 0;
if (!has_fork_stop)
self->_request->consume_all();
break;
}

// Awake signal
self->_request->clear();
// Request wakeup while running (awake/no_wait_at_start).
// Timer wakeups are the wait(...) == false branch.
self->_request->consume_all();
}
}

Expand Down Expand Up @@ -489,7 +534,8 @@ PeriodicThread_awake(PeriodicThread* self, PyObject* Py_UNUSED(args))
std::lock_guard<std::mutex> lock(*self->_awake_mutex);

self->_served->clear();
self->_request->set();
self->_request->set(REQUEST_REASON_AWAKE);

self->_served->wait();
}

Expand All @@ -506,7 +552,7 @@ PeriodicThread_stop(PeriodicThread* self, PyObject* Py_UNUSED(args))
}

self->_stopping = true;
self->_request->set();
self->_request->set(REQUEST_REASON_STOP);

Py_RETURN_NONE;
}
Expand Down Expand Up @@ -584,8 +630,10 @@ PeriodicThread__after_fork(PeriodicThread* self, PyObject* Py_UNUSED(args))
self->_atexit = false;
self->_skip_shutdown = false;

// We don't clear the request event because we might have pending awake
// requests.
// During prefork, stop() sets _request to wake the thread promptly so it
// can exit before fork. That wakeup should not trigger a synthetic
// periodic() run right after restart.
self->_request->clear(REQUEST_REASON_FORK_STOP);
self->_started->clear();
self->_stopped->clear();
self->_served->clear();
Expand All @@ -601,7 +649,18 @@ PeriodicThread__before_fork(PeriodicThread* self, PyObject* Py_UNUSED(args))
{
self->_skip_shutdown = true;

PeriodicThread_stop(self, NULL);
// Synchronize with awake() so there is no window where _stopping is visible
// before the fork-stop wake reason is published.
{
AllowThreads _;
std::lock_guard<std::mutex> lock(*self->_awake_mutex);

// Equivalent to PeriodicThread_stop(), with an explicit fork-stop
// reason. Keep this order so the worker cannot consume fork-stop as a
// normal wakeup.
self->_stopping = true;
self->_request->set(REQUEST_REASON_FORK_STOP);
}

Py_RETURN_NONE;
}
Expand Down
64 changes: 64 additions & 0 deletions tests/internal/test_periodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import platform
from threading import Event
from threading import Thread
from time import monotonic
from time import sleep

Expand Down Expand Up @@ -163,6 +164,69 @@ def periodic(self):
assert exit_code == 42


def test_periodic_service_no_immediate_run_after_fork():
periodic_ran = Event()

class EveryMinute(periodic.PeriodicService):
def periodic(self):
periodic_ran.set()

# Use a long interval so periodic() can only run on an explicit wakeup.
service = EveryMinute(60)
service.start()

try:
assert service._worker is not None

# Simulate fork stop/restart around the worker.
service._worker._before_fork()
service._worker.join()
service._worker._after_fork()

# Prefork stop wakeup must not trigger a synthetic run after restart.
assert not periodic_ran.wait(timeout=0.5)

# A real wakeup after restart must still run periodic().
service._worker.awake()
assert periodic_ran.wait(timeout=1)
finally:
service.stop()
service.join()


def test_periodic_thread_preserves_awake_during_restart_window():
"""Ensure awake() isn't lost while a periodic thread is paused for fork.

The fork-safe runtime stops periodic threads before fork and restarts them
afterwards. A stop wakeup from that path should not trigger an immediate
periodic() run after restart, but a real awake() call made during this
restart window must still be honored.
"""
periodic_ran = Event()
awake_done = Event()

def _run_periodic():
periodic_ran.set()

t = periodic.PeriodicThread(60, _run_periodic)
t.start()
t._before_fork()
t.join()

awaker = Thread(target=lambda: (t.awake(), awake_done.set()))
awaker.start()

# Simulate thread restart after fork in parent.
t._after_fork()

assert awake_done.wait(timeout=1), "awake() should complete after restart"
assert periodic_ran.wait(timeout=1), "periodic() should run for the awake request"

t.stop()
t.join()
awaker.join(timeout=1)


def test_timer():
end = 0

Expand Down
2 changes: 1 addition & 1 deletion tests/profiling/collector/test_sample_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async def hello():
p.stop()

output_filename = os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid())
files = glob.glob(output_filename + ".*.internal_metadata.json")
files = sorted(glob.glob(output_filename + ".*.internal_metadata.json"))

found_at_least_one_with_more_samples_than_sampling_events = False
for i, f in enumerate(files):
Expand Down
10 changes: 10 additions & 0 deletions tests/profiling/gunicorn_count_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# -*- encoding: utf-8 -*-
import os
import threading
from typing import Callable


def app(environ: dict[str, str], start_response: Callable[[str, list[tuple[str, str]]], None]) -> list[bytes]:
response_body = f"ok pid {os.getpid()} tid {threading.get_ident()}".encode("utf-8")
start_response("200 OK", [("Content-type", "text/plain")])
return [response_body]
Loading