From 959432ac0bd530ea1687b709f75623f86dfadf12 Mon Sep 17 00:00:00 2001 From: Kyle Verhoog Date: Fri, 13 Dec 2024 14:54:04 -0500 Subject: [PATCH] chore(llmobs): refactor to use span events The LLMObs service formerly depended on the TraceProcessor interface in the tracer. This was problematic because that interface is shared with the public API: users could configure a trace filter (which is a trace processor under the hood) and thereby overwrite the LLMObs TraceProcessor. Instead, the tracer now emits span start and finish events, which the LLMObs service listens to and acts on, as proposed here. The caveat is that the LLMObs service no longer has a way to drop traces when run in agentless mode, which only LLMObs supports. Users are instead encouraged to explicitly turn off APM, which has the added benefit of clarity since this behavior was previously implicit. --- ddtrace/_trace/tracer.py | 5 +- ddtrace/llmobs/_llmobs.py | 162 +++++++++++++++--- ddtrace/llmobs/_trace_processor.py | 177 -------------------- ddtrace/llmobs/_utils.py | 2 + tests/llmobs/conftest.py | 1 - tests/llmobs/test_llmobs_service.py | 53 +----- tests/llmobs/test_llmobs_trace_processor.py | 36 ---- 7 files changed, 151 insertions(+), 285 deletions(-) delete mode 100644 ddtrace/llmobs/_trace_processor.py delete mode 100644 tests/llmobs/test_llmobs_trace_processor.py diff --git a/ddtrace/_trace/tracer.py b/ddtrace/_trace/tracer.py index 6027976d6dc..b2d132deb50 100644 --- a/ddtrace/_trace/tracer.py +++ b/ddtrace/_trace/tracer.py @@ -41,6 +41,7 @@ from ddtrace.internal.atexit import register_on_exit_signal from ddtrace.internal.constants import SAMPLING_DECISION_TRACE_TAG_KEY from ddtrace.internal.constants import SPAN_API_DATADOG +from ddtrace.internal.core import dispatch from ddtrace.internal.dogstatsd import get_dogstatsd_client from ddtrace.internal.logger import get_logger from ddtrace.internal.peer_service.processor import PeerServiceProcessor @@ -866,7 +867,7 @@ def _start_span( for p in chain(self._span_processors, 
SpanProcessor.__processors__, self._deferred_processors): p.on_span_start(span) self._hooks.emit(self.__class__.start_span, span) - + dispatch("trace.span_start", (span,)) return span start_span = _start_span @@ -883,6 +884,8 @@ def _on_span_finish(self, span: Span) -> None: for p in chain(self._span_processors, SpanProcessor.__processors__, self._deferred_processors): p.on_span_finish(span) + dispatch("trace.span_finish", (span,)) + if log.isEnabledFor(logging.DEBUG): log.debug("finishing span %s (enabled:%s)", span._pprint(), self.enabled) diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py index 49815151118..49dae967f4a 100644 --- a/ddtrace/llmobs/_llmobs.py +++ b/ddtrace/llmobs/_llmobs.py @@ -3,7 +3,9 @@ import time from typing import Any from typing import Dict +from typing import List from typing import Optional +from typing import Tuple from typing import Union import ddtrace @@ -13,6 +15,7 @@ from ddtrace._trace.context import Context from ddtrace.ext import SpanTypes from ddtrace.internal import atexit +from ddtrace.internal import core from ddtrace.internal import forksafe from ddtrace.internal._rand import rand64bits from ddtrace.internal.compat import ensure_text @@ -45,11 +48,11 @@ from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING from ddtrace.llmobs._constants import TAGS from ddtrace.llmobs._evaluators.runner import EvaluatorRunner -from ddtrace.llmobs._trace_processor import LLMObsTraceProcessor from ddtrace.llmobs._utils import AnnotationContext from ddtrace.llmobs._utils import _get_llmobs_parent_id from ddtrace.llmobs._utils import _get_ml_app from ddtrace.llmobs._utils import _get_session_id +from ddtrace.llmobs._utils import _get_span_name from ddtrace.llmobs._utils import _inject_llmobs_parent_id from ddtrace.llmobs._utils import safe_json from ddtrace.llmobs._utils import validate_prompt @@ -60,6 +63,11 @@ from ddtrace.llmobs.utils import Messages from ddtrace.propagation.http import HTTPPropagator +from 
..constants import ERROR_MSG +from ..constants import ERROR_STACK +from ..constants import ERROR_TYPE +from . import _constants as constants + log = get_logger(__name__) @@ -81,34 +89,157 @@ class LLMObs(Service): def __init__(self, tracer=None): super(LLMObs, self).__init__() self.tracer = tracer or ddtrace.tracer - self._llmobs_span_writer = None - self._llmobs_span_writer = LLMObsSpanWriter( is_agentless=config._llmobs_agentless_enabled, interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)), timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)), ) - self._llmobs_eval_metric_writer = LLMObsEvalMetricWriter( site=config._dd_site, api_key=config._dd_api_key, interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)), timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 5.0)), ) - self._evaluator_runner = EvaluatorRunner( interval=float(os.getenv("_DD_LLMOBS_EVALUATOR_INTERVAL", 1.0)), llmobs_service=self, ) - self._trace_processor = LLMObsTraceProcessor(self._llmobs_span_writer, self._evaluator_runner) forksafe.register(self._child_after_fork) self._annotations = [] self._annotation_context_lock = forksafe.RLock() - self.tracer.on_start_span(self._do_annotations) - def _do_annotations(self, span): + # Register hooks for span events + core.on("trace.span_start", self._do_annotations) + core.on("trace.span_finish", self._on_span_finish) + + def _on_span_finish(self, span): + if self.enabled and span.span_type == SpanTypes.LLM: + self._submit_llmobs_span(span) + + def _submit_llmobs_span(self, span: Span) -> None: + """Generate and submit an LLMObs span event to be sent to LLMObs.""" + span_event = None + is_llm_span = span._get_ctx_item(SPAN_KIND) == "llm" + is_ragas_integration_span = False + try: + span_event, is_ragas_integration_span = self._llmobs_span_event(span) + self._llmobs_span_writer.enqueue(span_event) + except (KeyError, TypeError): + log.error( + "Error generating LLMObs span event for span %s, likely due to malformed span", span, 
exc_info=True + ) + finally: + if not span_event or not is_llm_span or is_ragas_integration_span: + return + if self._evaluator_runner: + self._evaluator_runner.enqueue(span_event, span) + + @classmethod + def _llmobs_span_event(cls, span: Span) -> Tuple[Dict[str, Any], bool]: + """Span event object structure.""" + span_kind = span._get_ctx_item(SPAN_KIND) + if not span_kind: + raise KeyError("Span kind not found in span context") + meta: Dict[str, Any] = {"span.kind": span_kind, "input": {}, "output": {}} + if span_kind in ("llm", "embedding") and span._get_ctx_item(MODEL_NAME) is not None: + meta["model_name"] = span._get_ctx_item(MODEL_NAME) + meta["model_provider"] = (span._get_ctx_item(MODEL_PROVIDER) or "custom").lower() + meta["metadata"] = span._get_ctx_item(METADATA) or {} + if span._get_ctx_item(INPUT_PARAMETERS): + meta["input"]["parameters"] = span._get_ctx_item(INPUT_PARAMETERS) + if span_kind == "llm" and span._get_ctx_item(INPUT_MESSAGES) is not None: + meta["input"]["messages"] = span._get_ctx_item(INPUT_MESSAGES) + if span._get_ctx_item(INPUT_VALUE) is not None: + meta["input"]["value"] = safe_json(span._get_ctx_item(INPUT_VALUE)) + if span_kind == "llm" and span._get_ctx_item(OUTPUT_MESSAGES) is not None: + meta["output"]["messages"] = span._get_ctx_item(OUTPUT_MESSAGES) + if span_kind == "embedding" and span._get_ctx_item(INPUT_DOCUMENTS) is not None: + meta["input"]["documents"] = span._get_ctx_item(INPUT_DOCUMENTS) + if span._get_ctx_item(OUTPUT_VALUE) is not None: + meta["output"]["value"] = safe_json(span._get_ctx_item(OUTPUT_VALUE)) + if span_kind == "retrieval" and span._get_ctx_item(OUTPUT_DOCUMENTS) is not None: + meta["output"]["documents"] = span._get_ctx_item(OUTPUT_DOCUMENTS) + if span._get_ctx_item(INPUT_PROMPT) is not None: + prompt_json_str = span._get_ctx_item(INPUT_PROMPT) + if span_kind != "llm": + log.warning( + "Dropping prompt on non-LLM span kind, annotating prompts is only supported for LLM span kinds." 
+ ) + else: + meta["input"]["prompt"] = prompt_json_str + if span.error: + meta.update( + { + ERROR_MSG: span.get_tag(ERROR_MSG), + ERROR_STACK: span.get_tag(ERROR_STACK), + ERROR_TYPE: span.get_tag(ERROR_TYPE), + } + ) + if not meta["input"]: + meta.pop("input") + if not meta["output"]: + meta.pop("output") + metrics = span._get_ctx_item(METRICS) or {} + ml_app = _get_ml_app(span) + + is_ragas_integration_span = False + + if ml_app.startswith(constants.RAGAS_ML_APP_PREFIX): + is_ragas_integration_span = True + + span._set_ctx_item(ML_APP, ml_app) + parent_id = str(_get_llmobs_parent_id(span) or "undefined") + + llmobs_span_event = { + "trace_id": "{:x}".format(span.trace_id), + "span_id": str(span.span_id), + "parent_id": parent_id, + "name": _get_span_name(span), + "start_ns": span.start_ns, + "duration": span.duration_ns, + "status": "error" if span.error else "ok", + "meta": meta, + "metrics": metrics, + } + session_id = _get_session_id(span) + if session_id is not None: + span._set_ctx_item(SESSION_ID, session_id) + llmobs_span_event["session_id"] = session_id + + llmobs_span_event["tags"] = cls._llmobs_tags( + span, ml_app, session_id, is_ragas_integration_span=is_ragas_integration_span + ) + return llmobs_span_event, is_ragas_integration_span + + @staticmethod + def _llmobs_tags( + span: Span, ml_app: str, session_id: Optional[str] = None, is_ragas_integration_span: bool = False + ) -> List[str]: + tags = { + "version": config.version or "", + "env": config.env or "", + "service": span.service or "", + "source": "integration", + "ml_app": ml_app, + "ddtrace.version": ddtrace.__version__, + "language": "python", + "error": span.error, + } + err_type = span.get_tag(ERROR_TYPE) + if err_type: + tags["error_type"] = err_type + if session_id: + tags["session_id"] = session_id + if is_ragas_integration_span: + tags[constants.RUNNER_IS_INTEGRATION_SPAN_TAG] = "ragas" + existing_tags = span._get_ctx_item(TAGS) + if existing_tags is not None: + 
tags.update(existing_tags) + return ["{}:{}".format(k, v) for k, v in tags.items()] + + def _do_annotations(self, span: Span) -> None: # get the current span context # only do the annotations if it matches the context if span.span_type != SpanTypes.LLM: # do this check to avoid the warning log in `annotate` @@ -120,20 +251,14 @@ def _do_annotations(self, span): if current_context_id == context_id: self.annotate(span, **annotation_kwargs) - def _child_after_fork(self): + def _child_after_fork(self) -> None: self._llmobs_span_writer = self._llmobs_span_writer.recreate() self._llmobs_eval_metric_writer = self._llmobs_eval_metric_writer.recreate() self._evaluator_runner = self._evaluator_runner.recreate() - self._trace_processor._span_writer = self._llmobs_span_writer - self._trace_processor._evaluator_runner = self._evaluator_runner if self.enabled: self._start_service() def _start_service(self) -> None: - tracer_filters = self.tracer._filters - if not any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in tracer_filters): - tracer_filters += [self._trace_processor] - self.tracer.configure(settings={"FILTERS": tracer_filters}) try: self._llmobs_span_writer.start() self._llmobs_eval_metric_writer.start() @@ -160,11 +285,7 @@ def _stop_service(self) -> None: except ServiceStatusError: log.debug("Error stopping LLMObs writers") - try: - forksafe.unregister(self._child_after_fork) - self.tracer.shutdown() - except Exception: - log.warning("Failed to shutdown tracer", exc_info=True) + forksafe.unregister(self._child_after_fork) @classmethod def enable( @@ -265,7 +386,6 @@ def disable(cls) -> None: cls._instance.stop() cls.enabled = False - cls._instance.tracer.deregister_on_start_span(cls._instance._do_annotations) telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.LLMOBS, False) log.debug("%s disabled", cls.__name__) diff --git a/ddtrace/llmobs/_trace_processor.py b/ddtrace/llmobs/_trace_processor.py deleted file mode 100644 index 
231d53d7626..00000000000 --- a/ddtrace/llmobs/_trace_processor.py +++ /dev/null @@ -1,177 +0,0 @@ -from typing import Any -from typing import Dict -from typing import List -from typing import Optional -from typing import Tuple - -import ddtrace -from ddtrace import Span -from ddtrace import config -from ddtrace._trace.processor import TraceProcessor -from ddtrace.constants import ERROR_MSG -from ddtrace.constants import ERROR_STACK -from ddtrace.constants import ERROR_TYPE -from ddtrace.ext import SpanTypes -from ddtrace.internal.logger import get_logger -from ddtrace.llmobs._constants import INPUT_DOCUMENTS -from ddtrace.llmobs._constants import INPUT_MESSAGES -from ddtrace.llmobs._constants import INPUT_PARAMETERS -from ddtrace.llmobs._constants import INPUT_PROMPT -from ddtrace.llmobs._constants import INPUT_VALUE -from ddtrace.llmobs._constants import METADATA -from ddtrace.llmobs._constants import METRICS -from ddtrace.llmobs._constants import ML_APP -from ddtrace.llmobs._constants import MODEL_NAME -from ddtrace.llmobs._constants import MODEL_PROVIDER -from ddtrace.llmobs._constants import OUTPUT_DOCUMENTS -from ddtrace.llmobs._constants import OUTPUT_MESSAGES -from ddtrace.llmobs._constants import OUTPUT_VALUE -from ddtrace.llmobs._constants import RAGAS_ML_APP_PREFIX -from ddtrace.llmobs._constants import RUNNER_IS_INTEGRATION_SPAN_TAG -from ddtrace.llmobs._constants import SESSION_ID -from ddtrace.llmobs._constants import SPAN_KIND -from ddtrace.llmobs._constants import TAGS -from ddtrace.llmobs._utils import _get_llmobs_parent_id -from ddtrace.llmobs._utils import _get_ml_app -from ddtrace.llmobs._utils import _get_session_id -from ddtrace.llmobs._utils import _get_span_name -from ddtrace.llmobs._utils import safe_json - - -log = get_logger(__name__) - - -class LLMObsTraceProcessor(TraceProcessor): - """ - Processor that extracts LLM-type spans in a trace to submit as separate LLMObs span events to LLM Observability. 
- """ - - def __init__(self, llmobs_span_writer, evaluator_runner=None): - self._span_writer = llmobs_span_writer - self._evaluator_runner = evaluator_runner - - def process_trace(self, trace: List[Span]) -> Optional[List[Span]]: - if not trace: - return None - for span in trace: - if span.span_type == SpanTypes.LLM: - self.submit_llmobs_span(span) - return None if config._llmobs_agentless_enabled else trace - - def submit_llmobs_span(self, span: Span) -> None: - """Generate and submit an LLMObs span event to be sent to LLMObs.""" - span_event = None - is_llm_span = span._get_ctx_item(SPAN_KIND) == "llm" - is_ragas_integration_span = False - try: - span_event, is_ragas_integration_span = self._llmobs_span_event(span) - self._span_writer.enqueue(span_event) - except (KeyError, TypeError): - log.error("Error generating LLMObs span event for span %s, likely due to malformed span", span) - finally: - if not span_event or not is_llm_span or is_ragas_integration_span: - return - if self._evaluator_runner: - self._evaluator_runner.enqueue(span_event, span) - - def _llmobs_span_event(self, span: Span) -> Tuple[Dict[str, Any], bool]: - """Span event object structure.""" - span_kind = span._get_ctx_item(SPAN_KIND) - if not span_kind: - raise KeyError("Span kind not found in span context") - meta: Dict[str, Any] = {"span.kind": span_kind, "input": {}, "output": {}} - if span_kind in ("llm", "embedding") and span._get_ctx_item(MODEL_NAME) is not None: - meta["model_name"] = span._get_ctx_item(MODEL_NAME) - meta["model_provider"] = (span._get_ctx_item(MODEL_PROVIDER) or "custom").lower() - meta["metadata"] = span._get_ctx_item(METADATA) or {} - if span._get_ctx_item(INPUT_PARAMETERS): - meta["input"]["parameters"] = span._get_ctx_item(INPUT_PARAMETERS) - if span_kind == "llm" and span._get_ctx_item(INPUT_MESSAGES) is not None: - meta["input"]["messages"] = span._get_ctx_item(INPUT_MESSAGES) - if span._get_ctx_item(INPUT_VALUE) is not None: - meta["input"]["value"] = 
safe_json(span._get_ctx_item(INPUT_VALUE)) - if span_kind == "llm" and span._get_ctx_item(OUTPUT_MESSAGES) is not None: - meta["output"]["messages"] = span._get_ctx_item(OUTPUT_MESSAGES) - if span_kind == "embedding" and span._get_ctx_item(INPUT_DOCUMENTS) is not None: - meta["input"]["documents"] = span._get_ctx_item(INPUT_DOCUMENTS) - if span._get_ctx_item(OUTPUT_VALUE) is not None: - meta["output"]["value"] = safe_json(span._get_ctx_item(OUTPUT_VALUE)) - if span_kind == "retrieval" and span._get_ctx_item(OUTPUT_DOCUMENTS) is not None: - meta["output"]["documents"] = span._get_ctx_item(OUTPUT_DOCUMENTS) - if span._get_ctx_item(INPUT_PROMPT) is not None: - prompt_json_str = span._get_ctx_item(INPUT_PROMPT) - if span_kind != "llm": - log.warning( - "Dropping prompt on non-LLM span kind, annotating prompts is only supported for LLM span kinds." - ) - else: - meta["input"]["prompt"] = prompt_json_str - if span.error: - meta.update( - { - ERROR_MSG: span.get_tag(ERROR_MSG), - ERROR_STACK: span.get_tag(ERROR_STACK), - ERROR_TYPE: span.get_tag(ERROR_TYPE), - } - ) - if not meta["input"]: - meta.pop("input") - if not meta["output"]: - meta.pop("output") - metrics = span._get_ctx_item(METRICS) or {} - ml_app = _get_ml_app(span) - - is_ragas_integration_span = False - - if ml_app.startswith(RAGAS_ML_APP_PREFIX): - is_ragas_integration_span = True - - span._set_ctx_item(ML_APP, ml_app) - parent_id = str(_get_llmobs_parent_id(span) or "undefined") - - llmobs_span_event = { - "trace_id": "{:x}".format(span.trace_id), - "span_id": str(span.span_id), - "parent_id": parent_id, - "name": _get_span_name(span), - "start_ns": span.start_ns, - "duration": span.duration_ns, - "status": "error" if span.error else "ok", - "meta": meta, - "metrics": metrics, - } - session_id = _get_session_id(span) - if session_id is not None: - span._set_ctx_item(SESSION_ID, session_id) - llmobs_span_event["session_id"] = session_id - - llmobs_span_event["tags"] = self._llmobs_tags( - span, ml_app, 
session_id, is_ragas_integration_span=is_ragas_integration_span - ) - return llmobs_span_event, is_ragas_integration_span - - @staticmethod - def _llmobs_tags( - span: Span, ml_app: str, session_id: Optional[str] = None, is_ragas_integration_span: bool = False - ) -> List[str]: - tags = { - "version": config.version or "", - "env": config.env or "", - "service": span.service or "", - "source": "integration", - "ml_app": ml_app, - "ddtrace.version": ddtrace.__version__, - "language": "python", - "error": span.error, - } - err_type = span.get_tag(ERROR_TYPE) - if err_type: - tags["error_type"] = err_type - if session_id: - tags["session_id"] = session_id - if is_ragas_integration_span: - tags[RUNNER_IS_INTEGRATION_SPAN_TAG] = "ragas" - existing_tags = span._get_ctx_item(TAGS) - if existing_tags is not None: - tags.update(existing_tags) - return ["{}:{}".format(k, v) for k, v in tags.items()] diff --git a/ddtrace/llmobs/_utils.py b/ddtrace/llmobs/_utils.py index c1b1c4a776c..4b1d4f1ac60 100644 --- a/ddtrace/llmobs/_utils.py +++ b/ddtrace/llmobs/_utils.py @@ -135,6 +135,7 @@ def _get_ml_app(span: Span) -> str: ml_app = span._get_ctx_item(ML_APP) if ml_app: return ml_app + # TODO: go up the span tree to find the nearest LLMObs span with an ml_app nearest_llmobs_ancestor = _get_nearest_llmobs_ancestor(span) if nearest_llmobs_ancestor: ml_app = nearest_llmobs_ancestor._get_ctx_item(ML_APP) @@ -149,6 +150,7 @@ def _get_session_id(span: Span) -> Optional[str]: session_id = span._get_ctx_item(SESSION_ID) if session_id: return session_id + # TODO: go up the span tree to find the nearest LLMObs span with session nearest_llmobs_ancestor = _get_nearest_llmobs_ancestor(span) if nearest_llmobs_ancestor: session_id = nearest_llmobs_ancestor._get_ctx_item(SESSION_ID) diff --git a/tests/llmobs/conftest.py b/tests/llmobs/conftest.py index a7d467b3985..7a2a940e5c8 100644 --- a/tests/llmobs/conftest.py +++ b/tests/llmobs/conftest.py @@ -251,7 +251,6 @@ def llmobs(monkeypatch, tracer, 
llmobs_env, llmobs_span_writer): with override_global_config(dict(_llmobs_ml_app=llmobs_env.get("DD_LLMOBS_ML_APP"))): llmobs_service.enable(_tracer=tracer) llmobs_service._instance._llmobs_span_writer = llmobs_span_writer - llmobs_service._instance._trace_processor._span_writer = llmobs_span_writer yield llmobs llmobs_service.disable() diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py index 98748250c3a..1721f79ef97 100644 --- a/tests/llmobs/test_llmobs_service.py +++ b/tests/llmobs/test_llmobs_service.py @@ -7,9 +7,7 @@ import ddtrace from ddtrace._trace.context import Context -from ddtrace._trace.span import Span from ddtrace.ext import SpanTypes -from ddtrace.filters import TraceFilter from ddtrace.internal.service import ServiceStatus from ddtrace.llmobs import LLMObs as llmobs_service from ddtrace.llmobs._constants import INPUT_DOCUMENTS @@ -31,7 +29,6 @@ from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING from ddtrace.llmobs._constants import TAGS from ddtrace.llmobs._llmobs import SUPPORTED_LLMOBS_INTEGRATIONS -from ddtrace.llmobs._llmobs import LLMObsTraceProcessor from ddtrace.llmobs.utils import Prompt from tests.llmobs._utils import _expected_llmobs_eval_metric_event from tests.llmobs._utils import _expected_llmobs_llm_span_event @@ -48,13 +45,9 @@ def mock_logs(): def run_llmobs_trace_filter(dummy_tracer): - for trace_filter in dummy_tracer._filters: - if isinstance(trace_filter, LLMObsTraceProcessor): - root_llm_span = Span(name="span1", span_type=SpanTypes.LLM) - root_llm_span.set_tag_str(SPAN_KIND, "llm") - trace1 = [root_llm_span] - return trace_filter.process_trace(trace1) - raise ValueError("LLMObsTraceProcessor not found in tracer filters.") + with dummy_tracer.trace("span1", span_type=SpanTypes.LLM) as span: + span.set_tag_str(SPAN_KIND, "llm") + return dummy_tracer.writer.pop() def test_service_enable(): @@ -65,7 +58,6 @@ def test_service_enable(): assert llmobs_instance is not None 
assert llmobs_service.enabled assert llmobs_instance.tracer == dummy_tracer - assert any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in dummy_tracer._filters) assert run_llmobs_trace_filter(dummy_tracer) is not None llmobs_service.disable() @@ -79,7 +71,6 @@ def test_service_enable_with_apm_disabled(monkeypatch): assert llmobs_instance is not None assert llmobs_service.enabled assert llmobs_instance.tracer == dummy_tracer - assert any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in dummy_tracer._filters) assert run_llmobs_trace_filter(dummy_tracer) is None llmobs_service.disable() @@ -139,7 +130,6 @@ def test_service_enable_already_enabled(mock_logs): assert llmobs_instance is not None assert llmobs_service.enabled assert llmobs_instance.tracer == dummy_tracer - assert any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in dummy_tracer._filters) llmobs_service.disable() mock_logs.debug.assert_has_calls([mock.call("%s already enabled", "LLMObs")]) @@ -1667,42 +1657,6 @@ def test_llmobs_fork_evaluator_runner_run(monkeypatch): llmobs_service.disable() -def test_llmobs_fork_custom_filter(monkeypatch): - """Test that forking a process correctly keeps any custom filters.""" - - class CustomFilter(TraceFilter): - def process_trace(self, trace): - return trace - - monkeypatch.setenv("_DD_LLMOBS_WRITER_INTERVAL", 5.0) - with mock.patch("ddtrace.internal.writer.HTTPWriter._send_payload"): - tracer = DummyTracer() - custom_filter = CustomFilter() - tracer.configure(settings={"FILTERS": [custom_filter]}) - llmobs_service.enable(_tracer=tracer, ml_app="test_app") - assert custom_filter in llmobs_service._instance.tracer._filters - pid = os.fork() - if pid: # parent - assert custom_filter in llmobs_service._instance.tracer._filters - assert any( - isinstance(tracer_filter, LLMObsTraceProcessor) - for tracer_filter in llmobs_service._instance.tracer._filters - ) - else: # child - assert custom_filter in 
llmobs_service._instance.tracer._filters - assert any( - isinstance(tracer_filter, LLMObsTraceProcessor) - for tracer_filter in llmobs_service._instance.tracer._filters - ) - llmobs_service.disable() - os._exit(12) - - _, status = os.waitpid(pid, 0) - exit_code = os.WEXITSTATUS(status) - assert exit_code == 12 - llmobs_service.disable() - - def test_llmobs_fork_disabled(monkeypatch): """Test that after being disabled the service remains disabled when forking""" monkeypatch.setenv("DD_LLMOBS_ENABLED", "0") @@ -1994,3 +1948,4 @@ def test_service_enable_does_not_start_evaluator_runner(): assert llmobs_service._instance._llmobs_span_writer.status.value == "running" assert llmobs_service._instance._evaluator_runner.status.value == "stopped" llmobs_service.disable() + diff --git a/tests/llmobs/test_llmobs_trace_processor.py b/tests/llmobs/test_llmobs_trace_processor.py deleted file mode 100644 index b55286d49c8..00000000000 --- a/tests/llmobs/test_llmobs_trace_processor.py +++ /dev/null @@ -1,36 +0,0 @@ -import mock - -from ddtrace._trace.span import Span -from ddtrace.ext import SpanTypes -from ddtrace.llmobs._constants import SPAN_KIND -from ddtrace.llmobs._trace_processor import LLMObsTraceProcessor -from tests.utils import override_global_config - - -def test_processor_returns_all_traces_by_default(): - """Test that the LLMObsTraceProcessor returns all traces by default.""" - trace_filter = LLMObsTraceProcessor(llmobs_span_writer=mock.MagicMock()) - root_llm_span = Span(name="span1", span_type=SpanTypes.LLM) - root_llm_span._set_ctx_item(SPAN_KIND, "llm") - trace1 = [root_llm_span] - assert trace_filter.process_trace(trace1) == trace1 - - -def test_processor_returns_all_traces_if_not_agentless(): - """Test that the LLMObsTraceProcessor returns all traces if DD_LLMOBS_AGENTLESS_ENABLED is not set to true.""" - with override_global_config(dict(_llmobs_agentless_enabled=False)): - trace_filter = LLMObsTraceProcessor(llmobs_span_writer=mock.MagicMock()) - root_llm_span = 
Span(name="span1", span_type=SpanTypes.LLM) - root_llm_span._set_ctx_item(SPAN_KIND, "llm") - trace1 = [root_llm_span] - assert trace_filter.process_trace(trace1) == trace1 - - -def test_processor_returns_none_in_agentless_mode(): - """Test that the LLMObsTraceProcessor returns None if DD_LLMOBS_AGENTLESS_ENABLED is set to true.""" - with override_global_config(dict(_llmobs_agentless_enabled=True)): - trace_filter = LLMObsTraceProcessor(llmobs_span_writer=mock.MagicMock()) - root_llm_span = Span(name="span1", span_type=SpanTypes.LLM) - root_llm_span._set_ctx_item(SPAN_KIND, "llm") - trace1 = [root_llm_span] - assert trace_filter.process_trace(trace1) is None