Merge branch 'main' into zachg/handle_cross_org_propagation
ZStriker19 authored Dec 20, 2024
2 parents ffd8fe2 + b253aa3 commit 34190b1
Showing 11 changed files with 850 additions and 47 deletions.
File renamed without changes.
37 changes: 15 additions & 22 deletions tests/profiling/collector/test_stack.py
@@ -254,13 +254,12 @@ def test_ignore_profiler_single():


@pytest.mark.skipif(not TESTING_GEVENT, reason="Not testing gevent")
@pytest.mark.subprocess(ddtrace_run=True)
@pytest.mark.subprocess(ddtrace_run=True, env=dict(DD_PROFILING_IGNORE_PROFILER="1", DD_PROFILING_API_TIMEOUT="0.1"))
def test_ignore_profiler_gevent_task():
import gevent.monkey

gevent.monkey.patch_all()

import os
import time

from ddtrace.profiling import collector # noqa:F401
@@ -282,28 +281,22 @@ def collect(self):
_fib(22)
return []

for ignore in (True, False):
os.environ["DD_PROFILING_API_TIMEOUT"] = "0.1"
os.environ["DD_PROFILING_IGNORE_PROFILER"] = str(ignore)
p = profiler.Profiler()
p.start()
        # This test is particularly useful with gevent enabled: create a test collector that runs often and long enough
        # that we're sure to catch it with the StackProfiler and that it's not ignored.
c = CollectorTest(p._profiler._recorder, interval=0.00001)
c.start()
p = profiler.Profiler()
p.start()
    # This test is particularly useful with gevent enabled: create a test collector that runs often and long enough
    # that we're sure to catch it with the StackProfiler and that it's not ignored.
c = CollectorTest(p._profiler._recorder, interval=0.00001)
c.start()

for _ in range(100):
events = p._profiler._recorder.reset()
ids = {e.task_id for e in events[stack_event.StackSampleEvent]}
if (c._worker.ident in ids) != str(ignore):
break
# Give some time for gevent to switch greenlets
time.sleep(0.1)
else:
raise AssertionError("ignore == " + ignore)
for _ in range(100):
events = p._profiler._recorder.reset()
ids = {e.task_id for e in events[stack_event.StackSampleEvent]}
if c._worker.ident in ids:
raise AssertionError("Collector thread found")
time.sleep(0.1)

c.stop()
p.stop(flush=False)
c.stop()
p.stop(flush=False)


def test_collect():
45 changes: 21 additions & 24 deletions tests/profiling/test_accuracy.py
@@ -31,16 +31,16 @@ def spend_16():


def spend_cpu_2():
now = time.monotonic_ns()
now = time.process_time_ns()
# Active wait for 2 seconds
while time.monotonic_ns() - now < 2e9:
while time.process_time_ns() - now < 2e9:
pass


def spend_cpu_3():
# Active wait for 3 seconds
now = time.monotonic_ns()
while time.monotonic_ns() - now < 3e9:
now = time.process_time_ns()
while time.process_time_ns() - now < 3e9:
pass
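
For context on the clock change above (a minimal standalone sketch, not part of the diff): time.monotonic_ns() advances with wall-clock time, so a busy loop bounded by it can consume less CPU than intended whenever the OS deschedules the process, whereas time.process_time_ns() only advances while the process is actually running on a CPU, which is what these spend_cpu_* helpers are meant to measure.

import time

def busy_wait_cpu_seconds(seconds):
    # Burns roughly `seconds` of CPU time: process_time_ns() only advances
    # while this process is executing, so preemption does not cut it short.
    start = time.process_time_ns()
    while time.process_time_ns() - start < seconds * 1e9:
        pass

def busy_wait_wall_seconds(seconds):
    # Ends after `seconds` of wall-clock time; the CPU time actually consumed
    # can be lower if the scheduler runs other work in between.
    start = time.monotonic_ns()
    while time.monotonic_ns() - start < seconds * 1e9:
        pass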


@@ -51,8 +51,12 @@ def spend_cpu_3():
CPU_TOLERANCE = 0.05


def almost_equal(value, target, tolerance=TOLERANCE):
return abs(value - target) / target <= tolerance
def assert_almost_equal(value, target, tolerance=TOLERANCE):
if abs(value - target) / target > tolerance:
raise AssertionError(
f"Assertion failed: {value} is not approximately equal to {target} "
f"within tolerance={tolerance}, actual error={abs(value - target) / target}"
)
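
A quick usage sketch of the new helper (the numbers below are made up for illustration): unlike the old boolean almost_equal, a failing comparison now raises with the observed relative error in the message.

# Relative error 0.025 <= 0.05, so this returns without raising.
assert_almost_equal(2.05e9, 2e9, tolerance=0.05)

# Relative error 0.25 > 0.05, so this raises AssertionError and reports
# the actual error in its message.
assert_almost_equal(2.5e9, 2e9, tolerance=0.05)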


def total_time(time_data, funcname):
@@ -66,7 +70,7 @@ def test_accuracy():
from ddtrace.profiling import profiler
from ddtrace.profiling.collector import stack_event
from tests.profiling.test_accuracy import CPU_TOLERANCE
from tests.profiling.test_accuracy import almost_equal
from tests.profiling.test_accuracy import assert_almost_equal
from tests.profiling.test_accuracy import spend_16
from tests.profiling.test_accuracy import total_time

@@ -85,20 +89,13 @@ def test_accuracy():
time_spent_ns[idx][frame[2]] += event.wall_time_ns
cpu_spent_ns[idx][frame[2]] += event.cpu_time_ns

assert almost_equal(total_time(time_spent_ns, "spend_3"), 9e9)
assert almost_equal(total_time(time_spent_ns, "spend_1"), 2e9)
assert almost_equal(total_time(time_spent_ns, "spend_4"), 4e9)
assert almost_equal(total_time(time_spent_ns, "spend_16"), 16e9)
assert almost_equal(total_time(time_spent_ns, "spend_7"), 7e9)

try:
from time import monotonic_ns # noqa:F401
except ImportError:
# If we don't have access to high resolution clocks, we can't really test accurately things as it's spread in
# various Python implementation of monotonic, etc.
pass
else:
assert almost_equal(total_time(time_spent_ns, "spend_cpu_2"), 2e9)
assert almost_equal(total_time(time_spent_ns, "spend_cpu_3"), 3e9)
assert almost_equal(total_time(time_spent_ns, "spend_cpu_2"), 2e9, CPU_TOLERANCE)
assert almost_equal(total_time(time_spent_ns, "spend_cpu_3"), 3e9, CPU_TOLERANCE)
assert_almost_equal(total_time(time_spent_ns, "spend_3"), 9e9)
assert_almost_equal(total_time(time_spent_ns, "spend_1"), 2e9)
assert_almost_equal(total_time(time_spent_ns, "spend_4"), 4e9)
assert_almost_equal(total_time(time_spent_ns, "spend_16"), 16e9)
assert_almost_equal(total_time(time_spent_ns, "spend_7"), 7e9)

assert_almost_equal(total_time(time_spent_ns, "spend_cpu_2"), 2e9)
assert_almost_equal(total_time(time_spent_ns, "spend_cpu_3"), 3e9)
assert_almost_equal(total_time(cpu_spent_ns, "spend_cpu_2"), 2e9, CPU_TOLERANCE)
assert_almost_equal(total_time(cpu_spent_ns, "spend_cpu_3"), 3e9, CPU_TOLERANCE)
131 changes: 130 additions & 1 deletion tests/profiling_v2/collector/test_stack.py
@@ -13,6 +13,7 @@
from ddtrace.profiling.collector import stack
from ddtrace.settings.profiling import config
from tests.profiling.collector import pprof_utils
from tests.profiling.collector import test_collector


# Python 3.11.9 is not compatible with gevent, https://github.com/gevent/gevent/issues/2040
@@ -24,6 +25,43 @@
)


# Use subprocess as ddup config persists across tests.
@pytest.mark.subprocess(
env=dict(
DD_PROFILING_MAX_FRAMES="5",
DD_PROFILING_OUTPUT_PPROF="/tmp/test_collect_truncate",
DD_PROFILING_STACK_V2_ENABLED="1",
)
)
@pytest.mark.skipif(sys.version_info[:2] == (3, 7), reason="stack_v2 is not supported on Python 3.7")
def test_collect_truncate():
import os

from ddtrace.profiling import profiler
from tests.profiling.collector import pprof_utils
from tests.profiling.collector.test_stack import func1

pprof_prefix = os.environ["DD_PROFILING_OUTPUT_PPROF"]
output_filename = pprof_prefix + "." + str(os.getpid())

max_nframes = int(os.environ["DD_PROFILING_MAX_FRAMES"])

p = profiler.Profiler()
p.start()

func1()

p.stop()

profile = pprof_utils.parse_profile(output_filename)
samples = pprof_utils.get_samples_with_value_type(profile, "wall-time")
assert len(samples) > 0
for sample in samples:
# stack v2 adds one extra frame for "%d frames omitted" message
# Also, it allows max_nframes + 1 frames, so we add 2 here.
assert len(sample.location_id) <= max_nframes + 2, len(sample.location_id)


@pytest.mark.parametrize("stack_v2_enabled", [True, False])
def test_stack_locations(stack_v2_enabled, tmp_path):
if sys.version_info[:2] == (3, 7) and stack_v2_enabled:
@@ -651,8 +689,23 @@ def _dofib():
assert checked_thread, "No samples found for the expected threads"


def test_max_time_usage():
with pytest.raises(ValueError):
stack.StackCollector(None, max_time_usage_pct=0)


def test_max_time_usage_over():
with pytest.raises(ValueError):
stack.StackCollector(None, max_time_usage_pct=200)


@pytest.mark.parametrize(
("stack_v2_enabled", "ignore_profiler"), [(True, True), (True, False), (False, True), (False, False)]
"stack_v2_enabled",
[True, False],
)
@pytest.mark.parametrize(
"ignore_profiler",
[True, False],
)
def test_ignore_profiler(stack_v2_enabled, ignore_profiler, tmp_path):
if sys.version_info[:2] == (3, 7) and stack_v2_enabled:
@@ -691,3 +744,79 @@ def test_ignore_profiler(stack_v2_enabled, ignore_profiler, tmp_path):
assert collector_worker_thread_id in thread_ids
else:
assert collector_worker_thread_id not in thread_ids
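
On the parametrize change above (an illustrative sketch, not part of the diff): stacking two @pytest.mark.parametrize decorators makes pytest generate the cross-product of the two value lists, which yields the same four combinations as the removed single decorator that listed the tuples explicitly.

import pytest

@pytest.mark.parametrize("stack_v2_enabled", [True, False])
@pytest.mark.parametrize("ignore_profiler", [True, False])
def test_cross_product(stack_v2_enabled, ignore_profiler):
    # pytest runs this four times: (True, True), (True, False),
    # (False, True), (False, False).
    assert isinstance(stack_v2_enabled, bool)
    assert isinstance(ignore_profiler, bool)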


# TODO: support ignore profiler with stack_v2 and update this test
@pytest.mark.skipif(not TESTING_GEVENT, reason="Not testing gevent")
@pytest.mark.subprocess(
ddtrace_run=True,
env=dict(DD_PROFILING_IGNORE_PROFILER="1", DD_PROFILING_OUTPUT_PPROF="/tmp/test_ignore_profiler_gevent_task"),
)
def test_ignore_profiler_gevent_task():
import gevent.monkey

gevent.monkey.patch_all()

import os
import time
import typing

from ddtrace.profiling import collector
from ddtrace.profiling import event as event_mod
from ddtrace.profiling import profiler
from ddtrace.profiling.collector import stack
from tests.profiling.collector import pprof_utils

def _fib(n):
if n == 1:
return 1
elif n == 0:
return 0
else:
return _fib(n - 1) + _fib(n - 2)

class CollectorTest(collector.PeriodicCollector):
def collect(self) -> typing.Iterable[typing.Iterable[event_mod.Event]]:
_fib(22)
return []

output_filename = os.environ["DD_PROFILING_OUTPUT_PPROF"]

p = profiler.Profiler()

p.start()

for c in p._profiler._collectors:
if isinstance(c, stack.StackCollector):
c.ignore_profiler

c = CollectorTest(None, interval=0.00001)
c.start()

time.sleep(3)

worker_ident = c._worker.ident

c.stop()
p.stop()

profile = pprof_utils.parse_profile(output_filename + "." + str(os.getpid()))

samples = pprof_utils.get_samples_with_value_type(profile, "cpu-time")

thread_ids = set()
for sample in samples:
thread_id_label = pprof_utils.get_label_with_key(profile.string_table, sample, "thread id")
thread_id = int(thread_id_label.num)
thread_ids.add(thread_id)

assert worker_ident not in thread_ids


def test_repr():
test_collector._test_repr(
stack.StackCollector,
"StackCollector(status=<ServiceStatus.STOPPED: 'stopped'>, "
"recorder=Recorder(default_max_events=16384, max_events={}), min_interval_time=0.01, max_time_usage_pct=1.0, "
"nframes=64, ignore_profiler=False, endpoint_collection_enabled=None, tracer=None)",
)
32 changes: 32 additions & 0 deletions tests/profiling_v2/simple_program.py
@@ -0,0 +1,32 @@
#!/usr/bin/env python
import os
import sys
import time

from ddtrace.internal import service
from ddtrace.profiling import bootstrap
from ddtrace.profiling.collector import stack


for running_collector in bootstrap.profiler._profiler._collectors:
if isinstance(running_collector, stack.StackCollector):
break
else:
raise AssertionError("Unable to find stack collector")


print("hello world")
assert running_collector.status == service.ServiceStatus.RUNNING
print(running_collector.interval)

t0 = time.time()
while time.time() - t0 < (running_collector.interval * 10):
pass

# Do some serious memory allocations!
for _ in range(5000000):
object()

print(os.getpid())
print(bootstrap.profiler._profiler._stack_v2_enabled)
sys.exit(42)
32 changes: 32 additions & 0 deletions tests/profiling_v2/simple_program_fork.py
@@ -0,0 +1,32 @@
import os
import sys
import threading

from ddtrace.internal import service
import ddtrace.profiling.auto
import ddtrace.profiling.bootstrap
import ddtrace.profiling.profiler


lock = threading.Lock()
lock.acquire()


assert ddtrace.profiling.bootstrap.profiler.status == service.ServiceStatus.RUNNING


child_pid = os.fork()
if child_pid == 0:
# Release it
lock.release()

# We track this one though
lock = threading.Lock()
lock.acquire()
lock.release()
else:
lock.release()
assert ddtrace.profiling.bootstrap.profiler.status == service.ServiceStatus.RUNNING
print(child_pid)
pid, status = os.waitpid(child_pid, 0)
sys.exit(os.WEXITSTATUS(status))
34 changes: 34 additions & 0 deletions tests/profiling_v2/simple_program_gevent.py
@@ -0,0 +1,34 @@
# Import from ddtrace before monkey patching to ensure that we grab all the
# necessary references to the unpatched modules.
import ddtrace.auto # noqa: F401, I001
import ddtrace.profiling.auto # noqa:F401


import gevent.monkey # noqa:F402

gevent.monkey.patch_all()

import threading # noqa: E402, F402, I001
import time # noqa: E402, F402


def fibonacci(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fibonacci(n - 1) + fibonacci(n - 2)


i = 1
for _ in range(20):
threads = []
for _ in range(10):
t = threading.Thread(target=fibonacci, args=(i,))
t.start()
threads.append(t)
i += 1
for t in threads:
t.join()
time.sleep(0.1)
