Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions ddtrace/contrib/internal/celery/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.internal import core
from ddtrace.internal.forksafe import ddtrace_after_in_parent
from ddtrace.internal.forksafe import ddtrace_before_fork
from ddtrace.internal.logger import get_logger
from ddtrace.trace import tracer

Expand Down Expand Up @@ -51,6 +53,12 @@ def patch_app(app, pin=None):
# Patch apply_async
trace_utils.wrap("celery.app.task", "Task.apply_async", _traced_apply_async_function(config.celery, "apply_async"))

# When celery starts the beat process it closes all open file descriptors with `close_open_fds`.
# This causes panics as it closes the native runtime file descriptors.
# To prevent this we treat the `close_open_fds` method as a fork,
# calling the fork hooks before and after to make sure the native runtime is shut down.
trace_utils.wrap("celery.platforms", "close_open_fds", _patched_close_open_fds)

# connect to the Signal framework
signals.task_prerun.connect(trace_prerun, weak=False)
signals.task_postrun.connect(trace_postrun, weak=False)
Expand All @@ -76,6 +84,7 @@ def unpatch_app(app):
trace_utils.unwrap(celery.beat.Scheduler, "apply_entry")
trace_utils.unwrap(celery.beat.Scheduler, "tick")
trace_utils.unwrap(celery.app.task.Task, "apply_async")
trace_utils.unwrap(celery.platforms, "close_open_fds")

signals.task_prerun.disconnect(trace_prerun)
signals.task_postrun.disconnect(trace_postrun)
Expand Down Expand Up @@ -142,3 +151,21 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
task_span.finish()

return _traced_apply_async_inner


def _patched_close_open_fds(func, instance, args, kwargs):
    """
    Wrapper for ``celery.platforms.close_open_fds``.

    Celery closes all open file descriptors to isolate some fork child.
    This causes the native runtime to panic because it expects to have a valid fd.
    We call fork hook to avoid panics when the native runtime interacts with closed fds.
    """
    # Treat the fd teardown like a fork: stop the native runtime first ...
    log.debug("Shutting down native runtime before closing fds")
    ddtrace_before_fork()

    try:
        return func(*args, **kwargs)
    finally:
        # ... and bring it back up once the descriptors have been closed,
        # even if the wrapped call raised.
        ddtrace_after_in_parent()
        log.debug("Restarting native runtime after closing fds")
4 changes: 4 additions & 0 deletions releasenotes/notes/fix-celery-panic-03dd47d624a6d447.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
    celery: This fix resolves panics in the NativeWriter caused by Celery closing file descriptors when starting the beat scheduler.
25 changes: 25 additions & 0 deletions tests/contrib/celery/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,3 +796,28 @@ def test_thread_start_during_fork(self):
if not err:
break
assert b"SIGSEGV" not in err

def test_celery_beat_no_panic(self):
    """Test that celery worker with --beat option doesn't cause panics.

    When celery starts the beat scheduler, it closes file descriptors which
    can cause panics in the native writer if not properly handled. This test
    ensures that the native runtime is properly shut down before FDs are closed.
    """
    with self.override_env(
        dict(
            DD_RUNTIME_METRICS_ENABLED="true",
        )
    ):
        celery_proc = subprocess.Popen(
            ["ddtrace-run", "celery", "--app=tests.contrib.celery.tasks", "worker", "--beat", "--loglevel=info"],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        # Give the worker enough time to start beat and close the fds.
        sleep(5)
        celery_proc.terminate()
        try:
            # communicate() both reaps the process and drains the pipe.
            # Calling wait() first and reading stdout afterwards can
            # deadlock if the child fills the pipe buffer before exiting
            # (see the subprocess documentation's PIPE/wait() warning).
            output, _ = celery_proc.communicate(timeout=10)
        except subprocess.TimeoutExpired:
            # Don't leak the worker if it ignores SIGTERM.
            celery_proc.kill()
            output, _ = celery_proc.communicate()

        # Check for panics in the output
        assert b"panic" not in output.lower(), f"Found panic in celery beat output:\n{output}"
Loading