Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(span): make fields immutable after serialization #11783

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions ddtrace/_trace/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,16 +335,21 @@ def on_span_finish(self, span: Span) -> None:
finished[0].set_metric("_dd.py.partial_flush", num_finished)

spans: Optional[List[Span]] = finished
if not spans:
return
for tp in self._trace_processors:
try:
if spans is None:
return
spans = tp.process_trace(spans)
except Exception:
log.error("error applying processor %r", tp, exc_info=True)

self._queue_span_count_metrics("spans_finished", "integration_name")
self._writer.write(spans)
# At this point the span object has been sent to the writer for encoding
# and should be considered immutable. Updating the span after this point
# will may not be reflected in the encoded span.
for span in spans:
span._serialized = False
return

log.debug("trace %d has %d spans, %d finished", span.trace_id, len(trace.spans), trace.num_finished)
Expand Down
161 changes: 137 additions & 24 deletions ddtrace/_trace/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,25 +93,38 @@ def _get_64_highest_order_bits_as_hex(large_int: int) -> str:
return "{:032x}".format(large_int)[:16]


def _is_serialized(func):
def wrapper(self, *args, **kwargs):
if self._serialized:
func(self, *args, **kwargs)
else:
log.warning(
"Span with the name %s has been serialized and is now immutable, ignoring call to Span.%s",
self.name,
func.__name__,
)

return wrapper


class Span(object):
__slots__ = [
# Public span attributes
"service",
"name",
"_service",
"_name",
"_resource",
"_span_api",
"span_id",
"trace_id",
"parent_id",
"_span_id",
"_trace_id",
"_parent_id",
"_meta",
"_meta_struct",
"error",
"_error",
"_metrics",
"_store",
"span_type",
"start_ns",
"duration_ns",
# Internal attributes
"_serialized",
"_span_type",
"_start_ns",
"_duration_ns",
"_context",
"_local_root_value",
"_parent",
Expand Down Expand Up @@ -171,29 +184,30 @@ def __init__(
raise TypeError("parent_id must be an integer")
return

self.name = name
self.service = service
self._name = name
self._service = service
self._resource = [resource or name]
self.span_type = span_type
self._span_type = span_type
self._span_api = span_api
self._serialized = True

self._meta: _MetaDictType = {}
self.error = 0
self._error = 0
self._metrics: _MetricDictType = {}

self._meta_struct: Dict[str, Dict[str, Any]] = {}

self.start_ns: int = time_ns() if start is None else int(start * 1e9)
self.duration_ns: Optional[int] = None
self._start_ns: int = time_ns() if start is None else int(start * 1e9)
self._duration_ns: Optional[int] = None

if trace_id is not None:
self.trace_id: int = trace_id
self._trace_id: int = trace_id
elif config._128_bit_trace_id_enabled:
self.trace_id: int = _rand128bits() # type: ignore[no-redef]
self._trace_id: int = _rand128bits() # type: ignore[no-redef]
else:
self.trace_id: int = _rand64bits() # type: ignore[no-redef]
self.span_id: int = span_id or _rand64bits()
self.parent_id: Optional[int] = parent_id
self._trace_id: int = _rand64bits() # type: ignore[no-redef]
self._span_id: int = span_id or _rand64bits()
self._parent_id: Optional[int] = parent_id
self._on_finish_callbacks = [] if on_finish is None else on_finish

self._context: Optional[Context] = context._with_span(self) if context else None
Expand All @@ -215,11 +229,13 @@ def _ignore_exception(self, exc: Type[Exception]) -> None:
else:
self._ignored_exceptions.append(exc)

@_is_serialized
def _set_ctx_item(self, key: str, val: Any) -> None:
if not self._store:
self._store = {}
self._store[key] = val

@_is_serialized
def _set_ctx_items(self, items: Dict[str, Any]) -> None:
if not self._store:
self._store = {}
Expand All @@ -234,6 +250,87 @@ def _get_ctx_item(self, key: str) -> Optional[Any]:
def _trace_id_64bits(self) -> int:
return _get_64_lowest_order_bits_as_int(self.trace_id)

@property
def trace_id(self) -> int:
return self._trace_id

@trace_id.setter
@_is_serialized
def trace_id(self, value: int) -> None:
self._traceid = value

@property
def span_id(self) -> int:
return self._span_id

@span_id.setter
@_is_serialized
def span_id(self, value: int) -> None:
self._span_id = value

@property
def parent_id(self) -> Optional[int]:
return self._parent_id

@parent_id.setter
@_is_serialized
def parent_id(self, value: Optional[int]) -> None:
self._parent_id = value

@property
def name(self) -> str:
return self._name

@name.setter
@_is_serialized
def name(self, value: str) -> None:
self._name = value

@property
def service(self) -> str:
return self._service

@service.setter
@_is_serialized
def service(self, value: str) -> None:
self._service = value

@property
def span_type(self) -> Optional[str]:
return self._span_type

@span_type.setter
@_is_serialized
def span_type(self, value: Optional[str]) -> None:
self._span_type = value

@property
def start_ns(self) -> float:
return self._start_ns

@start_ns.setter
@_is_serialized
def start_ns(self, value: float) -> None:
self._start_ns = value

@property
def duration_ns(self) -> Optional[float]:
return self._duration_ns

@duration_ns.setter
@_is_serialized
def duration_ns(self, value: Optional[float]) -> None:
self._duration_ns = value

@property
def error(self) -> int:
return self._error

@error.setter
@_is_serialized
def error(self, value: int) -> None:
self._error = value

@property
def start(self) -> float:
"""The start timestamp in Unix epoch seconds."""
Expand All @@ -248,6 +345,7 @@ def resource(self) -> str:
return self._resource[0]

@resource.setter
@_is_serialized
def resource(self, value: str) -> None:
self._resource[0] = value

Expand All @@ -256,6 +354,7 @@ def finished(self) -> bool:
return self.duration_ns is not None

@finished.setter
@_is_serialized
def finished(self, value: bool) -> None:
"""Finishes the span if set to a truthy value.

Expand All @@ -276,6 +375,7 @@ def duration(self) -> Optional[float]:
return None

@duration.setter
@_is_serialized
def duration(self, value: float) -> None:
self.duration_ns = int(value * 1e9)

Expand All @@ -294,6 +394,7 @@ def sampled(self) -> Optional[bool]:
return self.context.sampling_priority > 0

@sampled.setter
@_is_serialized
def sampled(self, value: bool) -> None:
deprecate(
"span.sampled is deprecated and will be removed in a future version of the tracer.",
Expand Down Expand Up @@ -331,6 +432,7 @@ def _override_sampling_decision(self, decision: Optional[NumericType]):
if key in self._local_root._metrics:
del self._local_root._metrics[key]

@_is_serialized
def set_tag(self, key: _TagNameType, value: Any = None) -> None:
"""Set a tag key/value pair on the span.

Expand Down Expand Up @@ -396,7 +498,7 @@ def set_tag(self, key: _TagNameType, value: Any = None) -> None:
self._override_sampling_decision(USER_REJECT)
return
elif key == SERVICE_KEY:
self.service = value
self._service = value
elif key == SERVICE_VERSION_KEY:
# Also set the `version` tag to the same value
# DEV: Note that we do no return, we want to set both
Expand All @@ -416,6 +518,7 @@ def set_tag(self, key: _TagNameType, value: Any = None) -> None:
except Exception:
log.warning("error setting tag %s, ignoring it", key, exc_info=True)

@_is_serialized
def set_struct_tag(self, key: str, value: Dict[str, Any]) -> None:
"""
Set a tag key/value pair on the span meta_struct
Expand All @@ -427,6 +530,7 @@ def get_struct_tag(self, key: str) -> Optional[Dict[str, Any]]:
"""Return the given struct or None if it doesn't exist."""
return self._meta_struct.get(key, None)

@_is_serialized
def set_tag_str(self, key: _TagNameType, value: Text) -> None:
"""Set a value for a tag. Values are coerced to unicode in Python 2 and
str in Python 3, with decoding errors in conversion being replaced with
Expand All @@ -447,6 +551,7 @@ def get_tags(self) -> _MetaDictType:
"""Return all tags."""
return self._meta.copy()

@_is_serialized
def set_tags(self, tags: Dict[_TagNameType, Any]) -> None:
"""Set a dictionary of tags on the given span. Keys and values
must be strings (or stringable)
Expand All @@ -455,6 +560,7 @@ def set_tags(self, tags: Dict[_TagNameType, Any]) -> None:
for k, v in iter(tags.items()):
self.set_tag(k, v)

@_is_serialized
def set_metric(self, key: _TagNameType, value: NumericType) -> None:
"""This method sets a numeric tag value for the given key."""
# Enforce a specific constant for `_dd.measured`
Expand Down Expand Up @@ -485,6 +591,7 @@ def set_metric(self, key: _TagNameType, value: NumericType) -> None:
del self._meta[key]
self._metrics[key] = value

@_is_serialized
def set_metrics(self, metrics: _MetricDictType) -> None:
"""Set a dictionary of metrics on the given span. Keys must be
must be strings (or stringable). Values must be numeric.
Expand All @@ -497,6 +604,7 @@ def get_metric(self, key: _TagNameType) -> Optional[NumericType]:
"""Return the given metric or None if it doesn't exist."""
return self._metrics.get(key)

@_is_serialized
def _add_event(
self, name: str, attributes: Optional[Dict[str, str]] = None, timestamp: Optional[int] = None
) -> None:
Expand All @@ -507,6 +615,7 @@ def get_metrics(self) -> _MetricDictType:
"""Return all metrics."""
return self._metrics.copy()

@_is_serialized
def set_traceback(self, limit: Optional[int] = None):
"""If the current stack has an exception, tag the span with the
relevant error info. If not, tag it with the current python stack.
Expand All @@ -522,6 +631,7 @@ def set_traceback(self, limit: Optional[int] = None):
tb = "".join(traceback.format_stack(limit=limit + 1)[:-1])
self._meta[ERROR_STACK] = tb

@_is_serialized
def set_exc_info(
self, exc_type: Type[BaseException], exc_val: BaseException, exc_tb: Optional[TracebackType]
) -> None:
Expand All @@ -536,7 +646,7 @@ def set_exc_info(
if self._ignored_exceptions and any([issubclass(exc_type, e) for e in self._ignored_exceptions]):
return

self.error = 1
self._error = 1

# get the traceback
buff = StringIO()
Expand Down Expand Up @@ -591,13 +701,15 @@ def _local_root(self) -> "Span":
return self._local_root_value

@_local_root.setter
@_is_serialized
def _local_root(self, value: "Span") -> None:
if value is not self:
self._local_root_value = value
else:
self._local_root_value = None

@_local_root.deleter
@_is_serialized
def _local_root(self) -> None:
del self._local_root_value

Expand All @@ -619,6 +731,7 @@ def link_span(self, context: Context, attributes: Optional[Dict[str, Any]] = Non
attributes=attributes,
)

@_is_serialized
def set_link(
self,
trace_id: int,
Expand Down
3 changes: 3 additions & 0 deletions ddtrace/_trace/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
from typing import Union

from ddtrace.internal.compat import NumericType
from ddtrace.internal.logger import get_logger


log = get_logger(__name__)

_TagNameType = Union[Text, bytes]
_MetaDictType = Dict[_TagNameType, Text]
_MetricDictType = Dict[_TagNameType, NumericType]
2 changes: 0 additions & 2 deletions tests/contrib/djangorestframework/test_appsec.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
@pytest.mark.skipif(django.VERSION < (1, 10), reason="requires django version >= 1.10")
def test_djangorest_request_body_urlencoded(client, test_spans, tracer):
with override_global_config(dict(_asm_enabled=True)):
tracer._asm_enabled = True
# Hack: need to pass an argument to configure so that the processors are recreated
tracer.configure(api_version="v0.4")
payload = urlencode({"mytestingbody_key": "mytestingbody_value"})
Expand All @@ -29,7 +28,6 @@ def test_djangorest_request_body_urlencoded(client, test_spans, tracer):
@pytest.mark.skipif(django.VERSION < (1, 10), reason="requires django version >= 1.10")
def test_djangorest_request_body_custom_parser(client, test_spans, tracer):
with override_global_config(dict(_asm_enabled=True)):
tracer._asm_enabled = True
# Hack: need to pass an argument to configure so that the processors are recreated
tracer.configure(api_version="v0.4")
payload, content_type = (
Expand Down
Loading
Loading