diff --git a/.riot/requirements/118ee6f.txt b/.riot/requirements/118ee6f.txt deleted file mode 100644 index d90521f4f87..00000000000 --- a/.riot/requirements/118ee6f.txt +++ /dev/null @@ -1,37 +0,0 @@ -# -# This file is autogenerated by pip-compile with Python 3.10 -# by the following command: -# -# pip-compile --no-annotate .riot/requirements/118ee6f.in -# -amqp==5.2.0 -attrs==23.1.0 -billiard==4.2.0 -celery==5.3.6 -click==8.1.7 -click-didyoumean==0.3.0 -click-plugins==1.1.1 -click-repl==0.3.0 -coverage[toml]==7.3.4 -exceptiongroup==1.2.0 -hypothesis==6.45.0 -iniconfig==2.0.0 -kombu==5.3.4 -mock==5.1.0 -more-itertools==8.10.0 -opentracing==2.4.0 -packaging==23.2 -pluggy==1.3.0 -prompt-toolkit==3.0.43 -pytest==7.4.3 -pytest-cov==4.1.0 -pytest-mock==3.12.0 -pytest-randomly==3.15.0 -python-dateutil==2.8.2 -redis==3.5.3 -six==1.16.0 -sortedcontainers==2.4.0 -tomli==2.0.1 -tzdata==2023.3 -vine==5.1.0 -wcwidth==0.2.12 diff --git a/.riot/requirements/138c2b7.txt b/.riot/requirements/138c2b7.txt deleted file mode 100644 index 91995855c74..00000000000 --- a/.riot/requirements/138c2b7.txt +++ /dev/null @@ -1,35 +0,0 @@ -# -# This file is autogenerated by pip-compile with Python 3.11 -# by the following command: -# -# pip-compile --no-annotate .riot/requirements/138c2b7.in -# -amqp==5.2.0 -attrs==23.1.0 -billiard==4.2.0 -celery==5.3.6 -click==8.1.7 -click-didyoumean==0.3.0 -click-plugins==1.1.1 -click-repl==0.3.0 -coverage[toml]==7.3.4 -hypothesis==6.45.0 -iniconfig==2.0.0 -kombu==5.3.4 -mock==5.1.0 -more-itertools==8.10.0 -opentracing==2.4.0 -packaging==23.2 -pluggy==1.3.0 -prompt-toolkit==3.0.43 -pytest==7.4.3 -pytest-cov==4.1.0 -pytest-mock==3.12.0 -pytest-randomly==3.15.0 -python-dateutil==2.8.2 -redis==3.5.3 -six==1.16.0 -sortedcontainers==2.4.0 -tzdata==2023.3 -vine==5.1.0 -wcwidth==0.2.12 diff --git a/.riot/requirements/1509aa1.txt b/.riot/requirements/1509aa1.txt new file mode 100644 index 00000000000..e05463aeb8f --- /dev/null +++ b/.riot/requirements/1509aa1.txt @@ -0,0 +1,35 @@ +# +# This file is autogenerated by pip-compile with Python 3.12 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/1509aa1.in +# +amqp==5.3.1 +attrs==24.2.0 +billiard==4.2.1 +celery[redis]==5.4.0 +click==8.1.7 +click-didyoumean==0.3.1 +click-plugins==1.1.1 +click-repl==0.3.0 +coverage[toml]==7.6.8 +hypothesis==6.45.0 +iniconfig==2.0.0 +kombu==5.4.2 +mock==5.1.0 +more-itertools==8.10.0 +opentracing==2.4.0 +packaging==24.2 +pluggy==1.5.0 +prompt-toolkit==3.0.48 +pytest==8.3.3 +pytest-cov==6.0.0 +pytest-mock==3.14.0 +pytest-randomly==3.16.0 +python-dateutil==2.9.0.post0 +redis==5.2.0 +six==1.16.0 +sortedcontainers==2.4.0 +tzdata==2024.2 +vine==5.1.0 +wcwidth==0.2.13 diff --git a/.riot/requirements/1df4aa0.txt b/.riot/requirements/1df4aa0.txt new file mode 100644 index 00000000000..af3e88e2542 --- /dev/null +++ b/.riot/requirements/1df4aa0.txt @@ -0,0 +1,35 @@ +# +# This file is autogenerated by pip-compile with Python 3.11 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/1df4aa0.in +# +amqp==5.3.1 +attrs==24.2.0 +billiard==4.2.1 +celery[redis]==5.4.0 +click==8.1.7 +click-didyoumean==0.3.1 +click-plugins==1.1.1 +click-repl==0.3.0 +coverage[toml]==7.6.8 +hypothesis==6.45.0 +iniconfig==2.0.0 +kombu==5.4.2 +mock==5.1.0 +more-itertools==8.10.0 +opentracing==2.4.0 +packaging==24.2 +pluggy==1.5.0 +prompt-toolkit==3.0.48 +pytest==8.3.3 +pytest-cov==6.0.0 +pytest-mock==3.14.0 +pytest-randomly==3.16.0 +python-dateutil==2.9.0.post0 +redis==5.2.0 +six==1.16.0 +sortedcontainers==2.4.0 +tzdata==2024.2 +vine==5.1.0 +wcwidth==0.2.13 diff --git a/.riot/requirements/654f8c0.txt b/.riot/requirements/654f8c0.txt new file mode 100644 index 00000000000..6723fdabd37 --- /dev/null +++ b/.riot/requirements/654f8c0.txt @@ -0,0 +1,38 @@ +# +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/654f8c0.in +# +amqp==5.3.1 +async-timeout==5.0.1 +attrs==24.2.0 +billiard==4.2.1 +celery[redis]==5.4.0 +click==8.1.7 +click-didyoumean==0.3.1 +click-plugins==1.1.1 +click-repl==0.3.0 +coverage[toml]==7.6.8 +exceptiongroup==1.2.2 +hypothesis==6.45.0 +iniconfig==2.0.0 +kombu==5.4.2 +mock==5.1.0 +more-itertools==8.10.0 +opentracing==2.4.0 +packaging==24.2 +pluggy==1.5.0 +prompt-toolkit==3.0.48 +pytest==8.3.3 +pytest-cov==6.0.0 +pytest-mock==3.14.0 +pytest-randomly==3.16.0 +python-dateutil==2.9.0.post0 +redis==5.2.0 +six==1.16.0 +sortedcontainers==2.4.0 +tomli==2.1.0 +tzdata==2024.2 +vine==5.1.0 +wcwidth==0.2.13 diff --git a/.riot/requirements/91a1ee4.txt b/.riot/requirements/91a1ee4.txt deleted file mode 100644 index 27ba75912fc..00000000000 --- a/.riot/requirements/91a1ee4.txt +++ /dev/null @@ -1,35 +0,0 @@ -# -# This file is autogenerated by pip-compile with Python 3.12 -# by the following command: -# -# pip-compile --no-annotate .riot/requirements/91a1ee4.in -# -amqp==5.2.0 -attrs==23.1.0 -billiard==4.2.0 -celery==5.3.6 -click==8.1.7 -click-didyoumean==0.3.0 -click-plugins==1.1.1 -click-repl==0.3.0 -coverage[toml]==7.3.4 -hypothesis==6.45.0 -iniconfig==2.0.0 -kombu==5.3.4 -mock==5.1.0 -more-itertools==8.10.0 -opentracing==2.4.0 -packaging==23.2 -pluggy==1.3.0 -prompt-toolkit==3.0.43 -pytest==7.4.3 -pytest-cov==4.1.0 -pytest-mock==3.12.0 -pytest-randomly==3.15.0 -python-dateutil==2.8.2 -redis==3.5.3 -six==1.16.0 -sortedcontainers==2.4.0 -tzdata==2023.3 -vine==5.1.0 -wcwidth==0.2.12 diff --git a/ddtrace/contrib/internal/celery/signals.py b/ddtrace/contrib/internal/celery/signals.py index 6341bed9bbf..f3f5c7a8ee7 100644 --- a/ddtrace/contrib/internal/celery/signals.py +++ b/ddtrace/contrib/internal/celery/signals.py @@ -1,5 +1,7 @@ +from urllib.parse import urlparse + +from celery import current_app from celery import registry -from celery.utils import nodenames from ddtrace import Pin from ddtrace import config @@ -167,9 +169,21 @@ def trace_after_publish(*args, **kwargs): if span is None: return else: - nodename = span.get_tag("celery.hostname") - if nodename is not None: - _, host = nodenames.nodesplit(nodename) + broker_url = current_app.conf.broker_url + + if broker_url == "memory://": + host = broker_url + else: + parsed_url = urlparse(broker_url) + + host = None + if parsed_url.hostname: + host = parsed_url.hostname + + if parsed_url.port: + span.set_metric(net.TARGET_PORT, parsed_url.port) + + if host: span.set_tag_str(net.TARGET_HOST, host) span.finish() diff --git a/releasenotes/notes/update-celery-out-host-tag-be8da4f2ab88b4cf.yaml b/releasenotes/notes/update-celery-out-host-tag-be8da4f2ab88b4cf.yaml new file mode 100644 index 00000000000..3bb4841c9bd --- /dev/null +++ b/releasenotes/notes/update-celery-out-host-tag-be8da4f2ab88b4cf.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + celery: Changes celery ``out.host`` span tag to point towards broker host url instead of local celery process hostname. Fixes + inferred service representation issues when using celery. diff --git a/riotfile.py b/riotfile.py index 5166dbe10ce..a3d91d23446 100644 --- a/riotfile.py +++ b/riotfile.py @@ -716,10 +716,9 @@ def select_pys(min_version: str = MIN_PYTHON_VERSION, max_version: str = MAX_PYT "PYTEST_PLUGINS": "celery.contrib.pytest", }, pkgs={ - "celery": [ + "celery[redis]": [ latest, ], - "redis": "~=3.5", }, ), ], diff --git a/tests/contrib/celery/base.py b/tests/contrib/celery/base.py index 65c1348ac49..dbe12f24f32 100644 --- a/tests/contrib/celery/base.py +++ b/tests/contrib/celery/base.py @@ -8,6 +8,7 @@ from ddtrace.contrib.celery import unpatch from tests.utils import TracerTestCase +from ..config import RABBITMQ_CONFIG from ..config import REDIS_CONFIG @@ -15,6 +16,11 @@ BROKER_URL = "{redis}/{db}".format(redis=REDIS_URL, db=0) BACKEND_URL = "{redis}/{db}".format(redis=REDIS_URL, db=1) +AMQP_URL = "amqp://{user}:{password}@127.0.0.1:{port}".format( + user=RABBITMQ_CONFIG["user"], password=RABBITMQ_CONFIG["password"], port=RABBITMQ_CONFIG["port"] +) +AMQP_BROKER_URL = "{amqp}//".format(amqp=AMQP_URL) + @pytest.fixture(scope="session") def celery_config(): diff --git a/tests/contrib/celery/test_integration.py b/tests/contrib/celery/test_integration.py index 5486587311c..96f4a5d1981 100644 --- a/tests/contrib/celery/test_integration.py +++ b/tests/contrib/celery/test_integration.py @@ -1,6 +1,5 @@ from collections import Counter import os -import socket import subprocess from time import sleep @@ -194,7 +193,7 @@ def fn_task_parameters(user, force_logout=False): assert async_span.get_tag("celery.routing_key") == "celery" assert async_span.get_tag("component") == "celery" assert async_span.get_tag("span.kind") == "producer" - assert async_span.get_tag("out.host") == socket.gethostname() + assert async_span.get_tag("out.host") == "memory://" else: assert 1 == len(traces) assert 1 == len(traces[0]) @@ -239,7 +238,7 @@ def fn_task_parameters(user, force_logout=False): assert async_span.get_tag("celery.routing_key") == "celery" assert async_span.get_tag("component") == "celery" assert async_span.get_tag("span.kind") == "producer" - assert async_span.get_tag("out.host") == socket.gethostname() + assert async_span.get_tag("out.host") == "memory://" else: assert 1 == len(traces) assert 1 == len(traces[0]) @@ -600,7 +599,7 @@ def fn_task_parameters(user, force_logout=False): assert async_span.get_tag("celery.routing_key") == "celery" assert async_span.get_tag("component") == "celery" assert async_span.get_tag("span.kind") == "producer" - assert async_span.get_tag("out.host") == socket.gethostname() + assert async_span.get_tag("out.host") == "memory://" run_span = self.find_span(name="celery.run") assert run_span.name == "celery.run" diff --git a/tests/contrib/celery/test_tagging.py b/tests/contrib/celery/test_tagging.py new file mode 100644 index 00000000000..af40c4f9209 --- /dev/null +++ b/tests/contrib/celery/test_tagging.py @@ -0,0 +1,142 @@ +import socket +import time + +from celery import Celery +from celery.contrib.testing.worker import start_worker +import pytest + +from ddtrace import Pin +from ddtrace.contrib.celery import patch +from ddtrace.contrib.celery import unpatch +from tests.utils import DummyTracer + +from .base import AMQP_BROKER_URL +from .base import BACKEND_URL +from .base import BROKER_URL + + +redis_celery_app = Celery( + "mul_celery", + broker=BROKER_URL, + backend=BACKEND_URL, +) + + +@redis_celery_app.task +def multiply(x, y): + return x * y + + +amqp_celery_app = Celery( + "add_celery", + broker=AMQP_BROKER_URL, + backend="rpc://", +) + + +@amqp_celery_app.task +def add(x, y): + return x + y + + +@pytest.fixture(autouse=False) +def instrument_celery(): + # Instrument Celery and create an app with Broker and Result backends + patch() + yield + # Remove instrumentation from Celery + unpatch() + + +@pytest.fixture(scope="session") +def celery_config(): + return {"broker_url": BROKER_URL} + + +@pytest.fixture +def dummy_tracer(): + return DummyTracer() + + +@pytest.fixture(autouse=False) +def traced_redis_celery_app(instrument_celery, dummy_tracer): + Pin.get_from(redis_celery_app) + Pin.override(redis_celery_app, tracer=dummy_tracer) + yield redis_celery_app + + +@pytest.fixture(autouse=False) +def traced_amqp_celery_app(instrument_celery, dummy_tracer): + Pin.get_from(amqp_celery_app) + Pin.override(amqp_celery_app, tracer=dummy_tracer) + yield amqp_celery_app + + +def test_redis_task(traced_redis_celery_app): + tracer = Pin.get_from(traced_redis_celery_app).tracer + + with start_worker( + traced_redis_celery_app, + pool="solo", + loglevel="info", + perform_ping_check=False, + shutdown_timeout=30, + ): + t = multiply.delay(4, 4) + assert t.get(timeout=2) == 16 + + # wait for spans to be received + time.sleep(3) + + assert_traces(tracer, "multiply", t, 6379) + + +def test_amqp_task(instrument_celery, traced_amqp_celery_app): + tracer = Pin.get_from(traced_amqp_celery_app).tracer + + with start_worker( + traced_amqp_celery_app, + pool="solo", + loglevel="info", + perform_ping_check=False, + shutdown_timeout=30, + ): + t = add.delay(4, 4) + assert t.get(timeout=2) == 8 + + # wait for spans to be received + time.sleep(3) + + assert_traces(tracer, "add", t, 5672) + + +def assert_traces(tracer, task_name, task, port): + traces = tracer.pop_traces() + + assert 2 == len(traces) + assert 1 == len(traces[0]) + assert 1 == len(traces[1]) + async_span = traces[0][0] + run_span = traces[1][0] + + assert async_span.error == 0 + assert async_span.name == "celery.apply" + assert async_span.resource == f"tests.contrib.celery.test_tagging.{task_name}" + assert async_span.service == "celery-producer" + assert async_span.get_tag("celery.id") == task.task_id + assert async_span.get_tag("celery.action") == "apply_async" + assert async_span.get_tag("celery.routing_key") == "celery" + assert async_span.get_tag("component") == "celery" + assert async_span.get_tag("span.kind") == "producer" + assert async_span.get_tag("out.host") == "127.0.0.1" + assert async_span.get_metric("network.destination.port") == port + + assert run_span.error == 0 + assert run_span.name == "celery.run" + assert run_span.resource == f"tests.contrib.celery.test_tagging.{task_name}" + assert run_span.service == "celery-worker" + assert run_span.get_tag("celery.id") == task.task_id + assert run_span.get_tag("celery.action") == "run" + assert run_span.get_tag("component") == "celery" + assert run_span.get_tag("span.kind") == "consumer" + assert socket.gethostname() in run_span.get_tag("celery.hostname") diff --git a/tests/contrib/jobspec.yml b/tests/contrib/jobspec.yml index 32f05864383..9c05772e019 100644 --- a/tests/contrib/jobspec.yml +++ b/tests/contrib/jobspec.yml @@ -31,9 +31,7 @@ celery: env: SUITE_NAME: celery LOG_LEVEL: DEBUG - SNAPSHOT_DIR: /snapshots PORT: 9126 - SNAPSHOT_CI: 1 DD_POOL_TRACE_CHECK_FAILURES: true DD_DISABLE_ERROR_RESPONSES: true ENABLED_CHECKS: trace_stall,meta_tracer_version_header,trace_content_length,trace_peer_service,trace_dd_service # disable flaky content length check