Skip to content

Commit

Permalink
Merge branch 'main' into main-AhmadMasry
Browse files Browse the repository at this point in the history
  • Loading branch information
ZStriker19 authored Jan 30, 2025
2 parents 9c9484a + af9098c commit 4a82114
Show file tree
Hide file tree
Showing 37 changed files with 443 additions and 315 deletions.
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,14 @@ ddtrace/internal/remoteconfig @DataDog/remote-config @DataDog/apm-core-pyt
tests/internal/remoteconfig @DataDog/remote-config @DataDog/apm-core-python

# API SDK
ddtrace/trace/ @DataDog/apm-sdk-api-python
ddtrace/_trace/ @DataDog/apm-sdk-api-python
ddtrace/opentelemetry/ @DataDog/apm-sdk-api-python
ddtrace/internal/opentelemetry @DataDog/apm-sdk-api-python
ddtrace/opentracer/ @DataDog/apm-sdk-api-python
ddtrace/propagation/ @DataDog/apm-sdk-api-python
ddtrace/filters.py @DataDog/apm-sdk-api-python
ddtrace/provider.py @DataDog/apm-sdk-api-python
ddtrace/pin.py @DataDog/apm-sdk-api-python
ddtrace/sampler.py @DataDog/apm-sdk-api-python
ddtrace/sampling_rule.py @DataDog/apm-sdk-api-python
Expand Down
27 changes: 27 additions & 0 deletions .github/workflows/check_safe_main_merge.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: Check for Safe main Merge

on:
pull_request:
branches:
- '3.x-staging'

jobs:
check-merge:
runs-on: ubuntu-latest
steps:
# Step 1: Checkout the repository
- name: Checkout repository
uses: actions/checkout@v4

# Step 2: Fetch the main branch
- name: Fetch main branch
run: git fetch origin main

# Step 3: Attempt to merge
- name: Check merge conflicts
run: |
git merge --no-commit --no-ff origin/main || exit 1
# Step 4: Clean up the merge (optional)
- name: Abort merge
if: failure()
run: git merge --abort
1 change: 1 addition & 0 deletions .gitlab/benchmarks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ benchmark-serverless:
tags: ["arch:amd64"]
when: on_success
needs: [ "benchmark-serverless-trigger" ]
allow_failure: true
script:
- git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.ddbuild.io/DataDog/serverless-tools.git ./serverless-tools && cd ./serverless-tools
- ./ci/check_trigger_status.sh
Expand Down
5 changes: 5 additions & 0 deletions .gitlab/prepare-oci-package.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
#!/bin/bash
set -eo pipefail

if [ "$OS" != "linux" ]; then
echo "Only linux packages are supported. Exiting"
exit 0
fi

if [ -n "$CI_COMMIT_TAG" ] && [ -z "$PYTHON_PACKAGE_VERSION" ]; then
PYTHON_PACKAGE_VERSION=${CI_COMMIT_TAG##v}
fi
Expand Down
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@ Changelogs for versions not listed here can be found at https://github.com/DataD

---

## 2.19.2
### Bug Fixes

- Tracing
- celery: Fixes an issue where `celery.apply` spans from Celery prerun got closed too soon leading to span tags being missing.
- openai: Fixes a patching issue where asynchronous moderation endpoint calls resulted in coroutine scheduling errors.
- openai: Ensures the OpenAI integration is compatible with Python versions 3.12 and 3.13.
- vertexai: Resolves an issue with `chat.send_message()` where the content keyword argument was not parsed correctly.
- LLM Observability
- This fix resolves an issue where annotating a span with non latin-1 (but valid utf-8) input/output values resulted in encoding errors.
- Lib-Injection
- Fixes incorrect telemetry data payload format.

---

## 2.19.1
### Bug Fixes

Expand Down
56 changes: 54 additions & 2 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,14 @@ def _get_parameters_for_new_span_directly_from_context(ctx: core.ExecutionContex
def _start_span(ctx: core.ExecutionContext, call_trace: bool = True, **kwargs) -> "Span":
span_kwargs = _get_parameters_for_new_span_directly_from_context(ctx)
call_trace = ctx.get_item("call_trace", call_trace)
tracer = (ctx.get_item("middleware") or ctx["pin"]).tracer
tracer = ctx.get_item("tracer") or (ctx.get_item("middleware") or ctx["pin"]).tracer
distributed_headers_config = ctx.get_item("distributed_headers_config")
if distributed_headers_config:
trace_utils.activate_distributed_headers(
tracer, int_config=distributed_headers_config, request_headers=ctx["distributed_headers"]
tracer,
int_config=distributed_headers_config,
request_headers=ctx["distributed_headers"],
override=ctx.get_item("distributed_headers_config_override"),
)
distributed_context = ctx.get_item("distributed_context")
if distributed_context and not call_trace:
Expand All @@ -126,6 +129,42 @@ def _start_span(ctx: core.ExecutionContext, call_trace: bool = True, **kwargs) -
return span


def _set_web_frameworks_tags(ctx, span, int_config):
span.set_tag_str(COMPONENT, int_config.integration_name)
span.set_tag_str(SPAN_KIND, SpanKind.SERVER)
span.set_tag(_SPAN_MEASURED_KEY)

analytics_enabled = ctx.get_item("analytics_enabled")
analytics_sample_rate = ctx.get_item("analytics_sample_rate", True)

# Configure trace search sample rate
if (config._analytics_enabled and analytics_enabled is not False) or analytics_enabled is True:
span.set_tag(_ANALYTICS_SAMPLE_RATE_KEY, analytics_sample_rate)


def _on_web_framework_start_request(ctx, int_config):
request_span = ctx.get_item("req_span")
_set_web_frameworks_tags(ctx, request_span, int_config)


def _on_web_framework_finish_request(
span, int_config, method, url, status_code, query, req_headers, res_headers, route, finish
):
trace_utils.set_http_meta(
span=span,
integration_config=int_config,
method=method,
url=url,
status_code=status_code,
query=query,
request_headers=req_headers,
response_headers=res_headers,
route=route,
)
if finish:
span.finish()


def _on_traced_request_context_started_flask(ctx):
current_span = ctx["pin"].tracer.current_span()
if not ctx["pin"].enabled or not current_span:
Expand Down Expand Up @@ -768,6 +807,10 @@ def listen():
core.on("azure.functions.request_call_modifier", _on_azure_functions_request_span_modifier)
core.on("azure.functions.start_response", _on_azure_functions_start_response)

# web frameworks general handlers
core.on("web.request.start", _on_web_framework_start_request)
core.on("web.request.finish", _on_web_framework_finish_request)

core.on("test_visibility.enable", _on_test_visibility_enable)
core.on("test_visibility.disable", _on_test_visibility_disable)
core.on("test_visibility.is_enabled", _on_test_visibility_is_enabled, "is_enabled")
Expand All @@ -776,6 +819,14 @@ def listen():
core.on("rq.queue.enqueue_job", _propagate_context)

for context_name in (
# web frameworks
"aiohttp.request",
"bottle.request",
"cherrypy.request",
"falcon.request",
"molten.request",
"pyramid.request",
"sanic.request",
"flask.call",
"flask.jsonify",
"flask.render_template",
Expand All @@ -786,6 +837,7 @@ def listen():
"django.template.render",
"django.process_exception",
"django.func.wrapped",
# non web frameworks
"botocore.instrumented_api_call",
"botocore.instrumented_lib_function",
"botocore.patched_kinesis_api_call",
Expand Down
3 changes: 3 additions & 0 deletions ddtrace/appsec/_ddwaf/ddwaf_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@

if system() == "Linux":
try:
asm_config._bypass_instrumentation_for_waf = True
ctypes.CDLL(ctypes.util.find_library("rt"), mode=ctypes.RTLD_GLOBAL)
except Exception: # nosec
pass
finally:
asm_config._bypass_instrumentation_for_waf = False

ARCHI = machine().lower()

Expand Down
26 changes: 14 additions & 12 deletions ddtrace/appsec/_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@
from json.decoder import JSONDecodeError
import os
import os.path
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union


if TYPE_CHECKING:
import ddtrace.appsec._ddwaf as ddwaf

import weakref

from ddtrace._trace.processor import SpanProcessor
Expand Down Expand Up @@ -167,14 +173,17 @@ def __post_init__(self) -> None:
def delayed_init(self) -> None:
try:
if self._rules is not None and not hasattr(self, "_ddwaf"):
self._ddwaf = ddwaf.DDWaf(
from ddtrace.appsec._ddwaf import DDWaf # noqa: E402
import ddtrace.appsec._metrics as metrics # noqa: E402

self.metrics = metrics
self._ddwaf = DDWaf(
self._rules, self.obfuscation_parameter_key_regexp, self.obfuscation_parameter_value_regexp
)
_set_waf_init_metric(self._ddwaf.info)
self.metrics._set_waf_init_metric(self._ddwaf.info)
except Exception:
# Partial of DDAS-0005-00
log.warning("[DDAS-0005-00] WAF initialization failed")
raise
self._update_required()

def _update_required(self):
Expand All @@ -193,7 +202,7 @@ def _update_rules(self, new_rules: Dict[str, Any]) -> bool:
if asm_config._asm_static_rule_file is not None:
return result
result = self._ddwaf.update_rules(new_rules)
_set_waf_updates_metric(self._ddwaf.info)
self.metrics._set_waf_updates_metric(self._ddwaf.info)
self._update_required()
return result

Expand Down Expand Up @@ -241,7 +250,7 @@ def waf_callable(custom_data=None, **kwargs):
return self._waf_action(span._local_root or span, ctx, custom_data, **kwargs)

_asm_request_context.set_waf_callback(waf_callable)
_asm_request_context.add_context_callback(_set_waf_request_metrics)
_asm_request_context.add_context_callback(self.metrics._set_waf_request_metrics)
if headers is not None:
_asm_request_context.set_waf_address(SPAN_DATA_NAMES.REQUEST_HEADERS_NO_COOKIES, headers)
_asm_request_context.set_waf_address(
Expand Down Expand Up @@ -436,10 +445,3 @@ def on_span_finish(self, span: Span) -> None:
del self._span_to_waf_ctx[s]
except Exception: # nosec B110
pass


# load waf at the end only to avoid possible circular imports with gevent
import ddtrace.appsec._ddwaf as ddwaf # noqa: E402
from ddtrace.appsec._metrics import _set_waf_init_metric # noqa: E402
from ddtrace.appsec._metrics import _set_waf_request_metrics # noqa: E402
from ddtrace.appsec._metrics import _set_waf_updates_metric # noqa: E402
105 changes: 49 additions & 56 deletions ddtrace/contrib/internal/aiohttp/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@
from aiohttp.web_urldispatcher import SystemRoute

from ddtrace import config
from ddtrace.constants import _ANALYTICS_SAMPLE_RATE_KEY
from ddtrace.constants import _SPAN_MEASURED_KEY
from ddtrace.constants import SPAN_KIND
from ddtrace.contrib import trace_utils
from ddtrace.contrib.asyncio import context_provider
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import http
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal import core
from ddtrace.internal.schema import schematize_url_operation
from ddtrace.internal.schema.span_attribute_schema import SpanDirection

Expand All @@ -35,47 +30,42 @@ async def attach_context(request):
# application configs
tracer = app[CONFIG_KEY]["tracer"]
service = app[CONFIG_KEY]["service"]
distributed_tracing = app[CONFIG_KEY]["distributed_tracing_enabled"]
# Create a new context based on the propagated information.
trace_utils.activate_distributed_headers(
tracer,
int_config=config.aiohttp,
request_headers=request.headers,
override=distributed_tracing,
)

# trace the handler
request_span = tracer.trace(
schematize_url_operation("aiohttp.request", protocol="http", direction=SpanDirection.INBOUND),
service=service,
span_type=SpanTypes.WEB,
)
request_span.set_tag(_SPAN_MEASURED_KEY)

request_span.set_tag_str(COMPONENT, config.aiohttp.integration_name)

# set span.kind tag equal to type of request
request_span.set_tag_str(SPAN_KIND, SpanKind.SERVER)

# Configure trace search sample rate
# DEV: aiohttp is special case maintains separate configuration from config api
analytics_enabled = app[CONFIG_KEY]["analytics_enabled"]
if (config._analytics_enabled and analytics_enabled is not False) or analytics_enabled is True:
request_span.set_tag(_ANALYTICS_SAMPLE_RATE_KEY, app[CONFIG_KEY].get("analytics_sample_rate", True))

# attach the context and the root span to the request; the Context
# may be freely used by the application code
request[REQUEST_CONTEXT_KEY] = request_span.context
request[REQUEST_SPAN_KEY] = request_span
request[REQUEST_CONFIG_KEY] = app[CONFIG_KEY]
try:
response = await handler(request)
if isinstance(response, web.StreamResponse):
request.task.add_done_callback(lambda _: finish_request_span(request, response))
return response
except Exception:
request_span.set_traceback()
raise
# Create a new context based on the propagated information.

with core.context_with_data(
"aiohttp.request",
span_name=schematize_url_operation("aiohttp.request", protocol="http", direction=SpanDirection.INBOUND),
span_type=SpanTypes.WEB,
service=service,
tags={},
tracer=tracer,
distributed_headers=request.headers,
distributed_headers_config=config.aiohttp,
distributed_headers_config_override=app[CONFIG_KEY]["distributed_tracing_enabled"],
headers_case_sensitive=True,
analytics_enabled=analytics_enabled,
analytics_sample_rate=app[CONFIG_KEY].get("analytics_sample_rate", True),
) as ctx:
req_span = ctx.span

ctx.set_item("req_span", req_span)
core.dispatch("web.request.start", (ctx, config.aiohttp))

# attach the context and the root span to the request; the Context
# may be freely used by the application code
request[REQUEST_CONTEXT_KEY] = req_span.context
request[REQUEST_SPAN_KEY] = req_span
request[REQUEST_CONFIG_KEY] = app[CONFIG_KEY]
try:
response = await handler(request)
if isinstance(response, web.StreamResponse):
request.task.add_done_callback(lambda _: finish_request_span(request, response))
return response
except Exception:
req_span.set_traceback()
raise

return attach_context

Expand Down Expand Up @@ -122,19 +112,22 @@ def finish_request_span(request, response):
# SystemRoute objects exist to throw HTTP errors and have no path
route = aiohttp_route.resource.canonical

trace_utils.set_http_meta(
request_span,
config.aiohttp,
method=request.method,
url=str(request.url), # DEV: request.url is a yarl's URL object
status_code=response.status,
request_headers=request.headers,
response_headers=response.headers,
route=route,
core.dispatch(
"web.request.finish",
(
request_span,
config.aiohttp,
request.method,
str(request.url), # DEV: request.url is a yarl's URL object
response.status,
None, # query arg = None
request.headers,
response.headers,
route,
True,
),
)

request_span.finish()


async def on_prepare(request, response):
"""
Expand Down
Loading

0 comments on commit 4a82114

Please sign in to comment.