Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

try removing a dispatch_with_results call #11529

Draft
wants to merge 35 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
77a2a05
try removing a dispatch_with_results call
emmettbutler Nov 25, 2024
8bb2619
types
emmettbutler Nov 25, 2024
42cd261
try another
emmettbutler Nov 25, 2024
e0c1e6f
typoe
emmettbutler Nov 25, 2024
4e9cecf
remove a bunch of dispatch_with_results calls
emmettbutler Dec 2, 2024
fbacb6f
typos
emmettbutler Dec 2, 2024
aee612c
unused import
emmettbutler Dec 2, 2024
9afcfe3
replace some more
emmettbutler Dec 2, 2024
34915dd
replace some more
emmettbutler Dec 2, 2024
0e2bd76
replace some more
emmettbutler Dec 3, 2024
c8e3e62
replace some more
emmettbutler Dec 3, 2024
acc1cb5
replace some more
emmettbutler Dec 3, 2024
251fec2
replace some more
emmettbutler Dec 3, 2024
fc98d79
replace some more
emmettbutler Dec 4, 2024
7f4b4bc
replace some more
emmettbutler Dec 4, 2024
08fe533
missed a spot
emmettbutler Dec 4, 2024
8038ead
fix type
emmettbutler Dec 4, 2024
d930964
use a known key
emmettbutler Dec 4, 2024
160776c
keys were getting clobbered
emmettbutler Dec 4, 2024
2cad931
another pass at asgi
emmettbutler Dec 5, 2024
f348f1c
another pass at asgi
emmettbutler Dec 5, 2024
875606d
another pass at wsgi
emmettbutler Dec 5, 2024
45b00ae
another pass at django.auth
emmettbutler Dec 5, 2024
40ad855
another pass at dbm execute
emmettbutler Dec 5, 2024
60fc5eb
another pass at dbm execute
emmettbutler Dec 5, 2024
7af7461
another pass at dbm execute
emmettbutler Dec 5, 2024
a61674e
remove dispatch_with_results calls from django utils
emmettbutler Dec 5, 2024
1a65865
remove dispatch_with_results calls from aiomysql
emmettbutler Dec 5, 2024
bc12530
remove dispatch_with_results calls from asyncpg
emmettbutler Dec 5, 2024
cba9497
remove dispatch_with_results call from flask
emmettbutler Dec 5, 2024
3d6870f
remove dispatch_with_results call from flask
emmettbutler Dec 5, 2024
4319227
remove dispatch_with_results call from flask
emmettbutler Dec 5, 2024
ad739bc
allow listeners to be coroutines
emmettbutler Dec 6, 2024
5342383
oops
emmettbutler Dec 6, 2024
601f456
oops
emmettbutler Dec 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,10 @@ def _on_request_complete(ctx, closing_iterable, app_is_iterator):
)
modifier(resp_span, closing_iterable)

return _TracedIterable(closing_iterable, resp_span, req_span, wrapped_is_iterator=app_is_iterator)
ctx.set_item(
"wsgi.request.complete",
_TracedIterable(closing_iterable, resp_span, req_span, wrapped_is_iterator=app_is_iterator),
)


def _on_response_prepared(resp_span, response):
Expand Down Expand Up @@ -660,10 +663,10 @@ def _on_test_visibility_disable() -> None:
CIVisibility.disable()


def _on_test_visibility_is_enabled() -> bool:
def _on_test_visibility_is_enabled() -> None:
from ddtrace.internal.ci_visibility import CIVisibility

return CIVisibility.enabled
core.set_item("civisibility_enabled", CIVisibility.enabled)


def _set_span_pointer(span: "Span", span_pointer_description: _SpanPointerDescription) -> None:
Expand Down
10 changes: 8 additions & 2 deletions ddtrace/appsec/_asm_request_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ def get_blocked() -> Dict[str, Any]:
env = _get_asm_context()
if env is None:
return {}
return env.blocked or {}
blocked = env.blocked or {}
core.set_item("asm.get_blocked", blocked)
return blocked


def _use_html(headers) -> bool:
Expand Down Expand Up @@ -496,6 +498,7 @@ def _on_wrapped_view(kwargs):
from ddtrace.appsec._iast._taint_tracking import taint_pyobject

if not is_iast_request_enabled():
core.set_item("flask.wrapped_view", return_value)
return return_value

_kwargs = {}
Expand All @@ -504,6 +507,7 @@ def _on_wrapped_view(kwargs):
pyobject=v, source_name=k, source_value=v, source_origin=OriginType.PATH_PARAMETER
)
return_value[1] = _kwargs
core.set_item("flask.wrapped_view", return_value)
return return_value


Expand Down Expand Up @@ -584,7 +588,9 @@ def _on_block_decided(callback):

def _get_headers_if_appsec():
if asm_config._asm_enabled:
return get_headers()
headers = get_headers()
core.set_item("django.extract_body", headers)
return headers


def asm_listen():
Expand Down
14 changes: 11 additions & 3 deletions ddtrace/appsec/_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,13 @@ def _on_set_http_meta(
# ASGI


async def _on_asgi_request_parse_body(receive, headers):
async def _on_asgi_request_parse_body(ctx, receive, headers):
key = "asgi.request.parse.body"
if asm_config._asm_enabled:
try:
data_received = await receive()
except Exception:
ctx.set_item(key, (receive, None))
return receive, None

async def receive_wrapped(once=[True]):
Expand All @@ -107,10 +109,13 @@ async def receive_wrapped(once=[True]):
req_body = None
else:
req_body = parse_form_multipart(body.decode(), headers) or None
ctx.set_item(key, (receive_wrapped, req_body))
return receive_wrapped, req_body
except Exception:
ctx.set_item(key, (receive_wrapped, None))
return receive_wrapped, None

ctx.set_item(key, (receive, None))
return receive, None


Expand Down Expand Up @@ -168,6 +173,7 @@ def _on_request_span_modifier(
wsgi_input.seek(0)
else:
environ["wsgi.input"] = io.BytesIO(body)
ctx.set_item("flask.request_call_modifier", req_body)
return req_body


Expand Down Expand Up @@ -197,7 +203,7 @@ def _wsgi_make_block_content(ctx, construct_url):
headers = ctx.get_item("headers")
environ = ctx.get_item("environ")
if req_span is None:
raise ValueError("request span not found")
return
block_config = get_blocked()
desired_type = block_config.get("type", "auto")
ctype = None
Expand Down Expand Up @@ -228,6 +234,7 @@ def _wsgi_make_block_content(ctx, construct_url):
except Exception as e:
log.warning("Could not set some span tags on blocked request: %s", str(e)) # noqa: G200
resp_headers.append(("Content-Length", str(len(content))))
ctx.set_item("wsgi.block.started", (status, resp_headers, content))
return status, resp_headers, content


Expand All @@ -237,7 +244,7 @@ def _asgi_make_block_content(ctx, url):
headers = ctx.get_item("headers")
environ = ctx.get_item("environ")
if req_span is None:
raise ValueError("request span not found")
return
block_config = get_blocked()
desired_type = block_config.get("type", "auto")
ctype = None
Expand Down Expand Up @@ -271,6 +278,7 @@ def _asgi_make_block_content(ctx, url):
except Exception as e:
log.warning("Could not set some span tags on blocked request: %s", str(e)) # noqa: G200
resp_headers.append((b"Content-Length", str(len(content)).encode()))
ctx.set_item("asgi.block.started", (status, resp_headers, content))
return status, resp_headers, content


Expand Down
2 changes: 2 additions & 0 deletions ddtrace/appsec/_trace_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ def _on_django_login(

def _on_django_auth(result_user, mode, kwargs, pin, info_retriever, django_config):
if not asm_config._asm_enabled:
core.set_item("django.auth", (True, result_user))
return True, result_user

userid_list = info_retriever.possible_user_id_fields + info_retriever.possible_login_fields
Expand All @@ -356,6 +357,7 @@ def _on_django_auth(result_user, mode, kwargs, pin, info_retriever, django_confi
else:
track_user_login_failure_event(pin.tracer, user_id=user_id, login_events_mode=mode, exists=False)

core.set_item("django.auth", (False, None))
return False, None


Expand Down
11 changes: 6 additions & 5 deletions ddtrace/contrib/dbapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def _trace_method(self, method, name, resource, extra_tags, dbm_propagator, *arg

with pin.tracer.trace(
name, service=ext_service(pin, self._self_config), resource=resource, span_type=SpanTypes.SQL
) as s:
) as s, core.context_with_data("dbapi.trace_method") as ctx:
if measured:
s.set_tag(SPAN_MEASURED_KEY)
# No reason to tag the query since it is set as the resource by the agent. See:
Expand Down Expand Up @@ -123,11 +123,12 @@ def _trace_method(self, method, name, resource, extra_tags, dbm_propagator, *arg
# dispatch DBM
if dbm_propagator:
# this check is necessary to prevent fetch methods from trying to add dbm propagation
result = core.dispatch_with_results(
f"{self._self_config.integration_name}.execute", (self._self_config, s, args, kwargs)
).result
core.dispatch(
f"{self._self_config.integration_name}.execute", (ctx, self._self_config, s, args, kwargs)
)
result = core.get_item(f"{self._self_config.integration_name}.execute")
if result:
s, args, kwargs = result.value
s, args, kwargs = result

try:
return method(*args, **kwargs)
Expand Down
11 changes: 6 additions & 5 deletions ddtrace/contrib/dbapi_async/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def _trace_method(self, method, name, resource, extra_tags, dbm_propagator

with pin.tracer.trace(
name, service=ext_service(pin, self._self_config), resource=resource, span_type=SpanTypes.SQL
) as s:
) as s, core.context_with_data("dbapi_async.traced_method") as ctx:
if measured:
s.set_tag(SPAN_MEASURED_KEY)
# No reason to tag the query since it is set as the resource by the agent. See:
Expand Down Expand Up @@ -95,11 +95,12 @@ async def _trace_method(self, method, name, resource, extra_tags, dbm_propagator
# dispatch DBM
if dbm_propagator:
# this check is necessary to prevent fetch methods from trying to add dbm propagation
result = core.dispatch_with_results(
f"{self._self_config.integration_name}.execute", [self._self_config, s, args, kwargs]
).result
core.dispatch(
f"{self._self_config.integration_name}.execute", (ctx, self._self_config, s, args, kwargs)
)
result = core.get_item(f"{self._self_config.integration_name}.execute")
if result:
s, args, kwargs = result.value
s, args, kwargs = result

try:
return await method(*args, **kwargs)
Expand Down
7 changes: 4 additions & 3 deletions ddtrace/contrib/internal/aiomysql/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async def _trace_method(self, method, resource, extra_tags, *args, **kwargs):
service=trace_utils.ext_service(pin, config.aiomysql),
resource=resource,
span_type=SpanTypes.SQL,
) as s:
) as s, core.context_with_data("aiomysql.execute") as ctx:
s.set_tag_str(COMPONENT, config.aiomysql.integration_name)

# set span.kind to the type of request being performed
Expand All @@ -89,9 +89,10 @@ async def _trace_method(self, method, resource, extra_tags, *args, **kwargs):
s.set_tag(_ANALYTICS_SAMPLE_RATE_KEY, config.aiomysql.get_analytics_sample_rate())

# dispatch DBM
result = core.dispatch_with_results("aiomysql.execute", (config.aiomysql, s, args, kwargs)).result
core.dispatch("aiomysql.execute", (ctx, config.aiomysql, s, args, kwargs))
result = ctx.get_item("aiomysql.execute")
if result:
s, args, kwargs = result.value
s, args, kwargs = result

try:
result = await method(*args, **kwargs)
Expand Down
10 changes: 6 additions & 4 deletions ddtrace/contrib/internal/asgi/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,10 @@ async def __call__(self, scope, receive, send):
if not self.integration_config.trace_query_string:
query_string = None
body = None
result = core.dispatch_with_results("asgi.request.parse.body", (receive, headers)).await_receive_and_body
await core.dispatch_async("asgi.request.parse.body", (ctx, receive, headers))
result = ctx.get_item("asgi.request.parse.body")
if result:
receive, body = await result.value
receive, body = result

client = scope.get("client")
if isinstance(client, list) and len(client) and is_valid_ip(client[0]):
Expand Down Expand Up @@ -272,9 +273,10 @@ async def wrapped_send(message):
span.finish()

async def wrapped_blocked_send(message):
result = core.dispatch_with_results("asgi.block.started", (ctx, url)).status_headers_content
core.dispatch("asgi.block.started", (ctx, url))
result = ctx.get_item("asgi.block.started")
if result:
status, headers, content = result.value
status, headers, content = result
else:
status, headers, content = 403, [], b""
if span and message.get("type") == "http.response.start":
Expand Down
7 changes: 4 additions & 3 deletions ddtrace/contrib/internal/asyncpg/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async def _traced_query(pin, method, query, args, kwargs):
resource=query,
service=ext_service(pin, config.asyncpg),
span_type=SpanTypes.SQL,
) as span:
) as span, core.context_with_data("asyncpg.execute") as ctx:
span.set_tag_str(COMPONENT, config.asyncpg.integration_name)
span.set_tag_str(db.SYSTEM, DBMS_NAME)

Expand All @@ -121,9 +121,10 @@ async def _traced_query(pin, method, query, args, kwargs):
span.set_tags(pin.tags)

# dispatch DBM
result = core.dispatch_with_results("asyncpg.execute", (config.asyncpg, method, span, args, kwargs)).result
core.dispatch("asyncpg.execute", (ctx, config.asyncpg, method, span, args, kwargs))
result = ctx.get_item("asyncpg.execute")
if result:
span, args, kwargs = result.value
span, args, kwargs = result

return await method(*args, **kwargs)

Expand Down
12 changes: 8 additions & 4 deletions ddtrace/contrib/internal/django/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,12 +796,16 @@ def traced_authenticate(django, pin, func, instance, args, kwargs):
if mode == "disabled":
return result_user
try:
result = core.dispatch_with_results(
core.dispatch(
"django.auth",
(result_user, mode, kwargs, pin, _DjangoUserInfoRetriever(result_user, credentials=kwargs), config.django),
).user
if result and result.value[0]:
return result.value[1]
)
result = core.get_item(
"django.auth",
(result_user, mode, kwargs, pin, _DjangoUserInfoRetriever(result_user, credentials=kwargs), config.django),
)
if result and result[0]:
return result[1]
except Exception:
log.debug("Error while trying to trace Django authenticate", exc_info=True)

Expand Down
6 changes: 4 additions & 2 deletions ddtrace/contrib/internal/django/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ def _extract_body(request):
if request.method in _BODY_METHODS:
req_body = None
content_type = request.content_type if hasattr(request, "content_type") else request.META.get("CONTENT_TYPE")
headers = core.dispatch_with_results("django.extract_body").headers.value
core.dispatch("django.extract_body")
headers = core.get_item("django.extract_body")
try:
if content_type == "application/x-www-form-urlencoded":
req_body = parse_form_params(request.body.decode("UTF-8", errors="ignore"))
Expand Down Expand Up @@ -377,7 +378,8 @@ def _after_request_tags(pin, span: Span, request, response):

url = get_request_uri(request)

request_headers = core.dispatch_with_results("django.after_request_headers").headers.value
core.dispatch("django.after_request_headers")
request_headers = core.get_item("django.extract_body")
if not request_headers:
request_headers = _get_request_headers(request)

Expand Down
12 changes: 7 additions & 5 deletions ddtrace/contrib/internal/flask/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ def _wrapped_start_response(self, start_response, ctx, status_code, headers, exc
core.dispatch("flask.start_response", ("Flask",))
if get_blocked():
# response code must be set here, or it will be too late
result_content = core.dispatch_with_results("flask.block.request.content", ()).block_requested
core.dispatch("flask.block.request.content", ())
result_content = core.get_item("wsgi.block.started")
if result_content:
_, status, response_headers = result_content.value
_, status, response_headers = result_content
result = start_response(str(status), response_headers)
else:
block_config = get_blocked()
Expand All @@ -141,7 +142,7 @@ def _request_call_modifier(self, ctx, parsed_headers=None):
request = _RequestType(environ)

req_body = None
result = core.dispatch_with_results(
result = core.dispatch(
"flask.request_call_modifier",
(
ctx,
Expand All @@ -153,9 +154,10 @@ def _request_call_modifier(self, ctx, parsed_headers=None):
flask_version_str,
BadRequest,
),
).request_body
)
result = ctx.get_item("flask.request_call_modifier")
if result:
req_body = result.value
req_body = result
core.dispatch("flask.request_call_modifier.post", (ctx, config.flask, request, req_body))


Expand Down
5 changes: 3 additions & 2 deletions ddtrace/contrib/internal/flask/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ def _wrap_call(
tags=tags,
) as ctx, ctx.span:
if do_dispatch:
result = core.dispatch_with_results("flask.wrapped_view", (kwargs,)).callback_and_args
core.dispatch("flask.wrapped_view", (kwargs,))
result = core.get_item("flask.wrapped_view")
if result:
callback_block, _kwargs = result.value
callback_block, _kwargs = result
if callback_block:
return callback_block()
if _kwargs:
Expand Down
Loading
Loading