101 changes: 100 additions & 1 deletion ddtrace/llmobs/_experiment.py
@@ -6,6 +6,7 @@
from dataclasses import field
import re
import sys
import time
import traceback
from typing import TYPE_CHECKING
from typing import Any
@@ -37,6 +38,7 @@
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs._writer import LLMObsExperimentEvalMetricEvent
from ddtrace.llmobs._writer import LLMObsExperimentsClient
from ddtrace.llmobs.types import ExportedLLMObsSpan


logger = get_logger(__name__)
@@ -737,6 +739,11 @@ class Experiment:
_evaluators: Sequence[Union[EvaluatorType, AsyncEvaluatorType]]
_summary_evaluators: Sequence[Union[SummaryEvaluatorType, AsyncSummaryEvaluatorType]]

@classmethod
def _NO_OP_TASK(cls, input_data, config):
"""No-op task used when initializing distributed experiment objects on remote hosts."""
return None

def __init__(
self,
name: str,
@@ -750,6 +757,7 @@ def __init__(
_llmobs_instance: Optional["LLMObs"] = None,
summary_evaluators: Optional[Sequence[Union[SummaryEvaluatorType, AsyncSummaryEvaluatorType]]] = None,
runs: Optional[int] = None,
is_distributed: Optional[bool] = False,
) -> None:
self.name = name
self._task = task
@@ -765,6 +773,7 @@ def __init__(
self._config: dict[str, JSONType] = config or {}
self._runs: int = runs or 1
self._llmobs_instance = _llmobs_instance
self._is_distributed = is_distributed

if not project_name:
raise ValueError(
@@ -776,6 +785,7 @@ def __init__(
self._project_id: Optional[str] = None
self._id: Optional[str] = None
self._run_name: Optional[str] = None
self.experiment_span: Optional["ExportedLLMObsSpan"] = None

@property
def url(self) -> str:
@@ -974,7 +984,7 @@ def _prepare_summary_evaluator_data(

return inputs, outputs, expected_outputs, metadata_list, eval_results_by_name

def _setup_experiment(self, llmobs_not_enabled_error: str) -> None:
def _setup_experiment(self, llmobs_not_enabled_error: str, ensure_unique: bool = True) -> None:
if not self._llmobs_instance or not self._llmobs_instance.enabled:
raise ValueError(llmobs_not_enabled_error)

@@ -991,6 +1001,7 @@ def _setup_experiment(self, llmobs_not_enabled_error: str) -> None:
convert_tags_dict_to_list(self._tags),
self._description,
self._runs,
ensure_unique,
)
self._id = experiment_id
self._tags["experiment_id"] = str(experiment_id)
@@ -1061,6 +1072,8 @@ async def _process_record(
experiment_name=self.name,
) as span:
span_context = self._llmobs_instance.export_span(span=span)
if self._is_distributed:
self.experiment_span = span_context
if span_context:
span_id = span_context.get("span_id", "")
trace_id = span_context.get("trace_id", "")
@@ -1327,6 +1340,92 @@ async def _evaluate_summary_single(summary_evaluator: Any) -> tuple[str, dict[st

return evaluations

async def _run_task_single_iteration(
self,
jobs: int = 1,
raise_errors: bool = False,
run_iteration: Optional[int] = 1,
) -> ExperimentResult:
run = _ExperimentRunInfo(run_iteration or 1)
self._tags["run_id"] = str(run._id)
self._tags["run_iteration"] = str(run._run_iteration)
task_results = await self._run_task(jobs, run, raise_errors, None)
evaluations = await self._run_evaluators(task_results, raise_errors=raise_errors, jobs=jobs)
run_result = self._merge_results(run, task_results, evaluations, [])
experiment_evals = self._generate_metrics_from_exp_results(run_result)
self._llmobs_instance._dne_client.experiment_eval_post( # type: ignore[union-attr]
cast(str, self._id), experiment_evals, convert_tags_dict_to_list(self._tags)
)
return {
"summary_evaluations": {},
"rows": [],
"runs": [run_result],
}

def _submit_eval_metric(
self,
eval_name: str,
eval_value: JSONType,
span: Optional["ExportedLLMObsSpan"] = None,
timestamp_ms: Optional[int] = None,
is_summary_eval: Optional[bool] = None,
reasoning: Optional[str] = None,
assessment: Optional[str] = None,
metadata: Optional[dict[str, JSONType]] = None,
tags: Optional[dict[str, str]] = None,
) -> None:
"""Submit an evaluation metric for a distributed experiment.

:param eval_name: Name of the evaluation metric
:param eval_value: Value of the evaluation metric
:param span: Optional span context dict with span_id and trace_id. If None and not a
summary eval, uses the last span from _run_task_single_iteration.
:param timestamp_ms: Optional timestamp in milliseconds
:param is_summary_eval: Whether this is a summary-level evaluation
:param reasoning: Optional reasoning string
:param assessment: Optional assessment string
:param metadata: Optional metadata dict
:param tags: Optional tags dict
"""
if not self._is_distributed:
raise ValueError("this method is only used for distributed experiments")

if span is not None and (
not isinstance(span, dict)
or not isinstance(span.get("span_id"), str)
or not isinstance(span.get("trace_id"), str)
):
raise TypeError(
"`span` must be a dictionary containing both span_id and trace_id keys. "
"LLMObs.export_span() can be used to generate this dictionary from a given span."
)

if span is None and not is_summary_eval and self.experiment_span is None:
raise TypeError("unexpected state, must supply span or must run the experiment first")

if span is None and not is_summary_eval:
span = self.experiment_span

# Honor an explicitly supplied timestamp_ms; otherwise stamp with the current time.
timestamp_ns = timestamp_ms * 1_000_000 if timestamp_ms is not None else int(time.time() * 1e9)

eval_metric = self._generate_metric_from_evaluation(
eval_name,
eval_value,
None,
span.get("span_id", "") if span else "",
span.get("trace_id", "") if span else "",
timestamp_ns,
"summary" if is_summary_eval else "custom",
reasoning,
assessment,
metadata,
tags,
)

self._llmobs_instance._dne_client.experiment_eval_post( # type: ignore[union-attr]
cast(str, self._id), [eval_metric], convert_tags_dict_to_list(self._tags)
)


class SyncExperiment:
"""Thin synchronous wrapper around the async-native ``Experiment``.
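For context, `_submit_eval_metric` is only valid on an experiment created with `is_distributed=True`. A minimal usage sketch (not part of this diff; the `experiment` variable, metric names, values, and span IDs are illustrative):

# Per-span evaluation: the span context must carry string span_id / trace_id,
# e.g. the dict returned by LLMObs.export_span().
span_ctx = {"span_id": "1234567890", "trace_id": "0987654321"}
experiment._submit_eval_metric(
    "faithfulness",                  # eval_name
    0.87,                            # eval_value
    span=span_ctx,
    reasoning="answer is grounded in the retrieved context",
)

# Summary-level evaluation: no span is needed.
experiment._submit_eval_metric("accuracy", 0.91, is_summary_eval=True)

# With span=None and is_summary_eval falsy, the metric is attached to
# experiment.experiment_span, i.e. the span exported by the most recent
# _run_task_single_iteration call; if no run has happened yet, this raises.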
69 changes: 69 additions & 0 deletions ddtrace/llmobs/_llmobs.py
@@ -107,8 +107,10 @@
from ddtrace.llmobs._experiment import Dataset
from ddtrace.llmobs._experiment import DatasetRecord
from ddtrace.llmobs._experiment import DatasetRecordInputType
from ddtrace.llmobs._experiment import EvaluatorResult
from ddtrace.llmobs._experiment import EvaluatorType
from ddtrace.llmobs._experiment import Experiment
from ddtrace.llmobs._experiment import ExperimentResult
from ddtrace.llmobs._experiment import JSONType
from ddtrace.llmobs._experiment import Project
from ddtrace.llmobs._experiment import SummaryEvaluatorType
@@ -1321,6 +1323,73 @@ def async_experiment(
runs=runs,
)

@classmethod
def _distributed_experiment(
cls,
name: str,
dataset: Optional[Dataset] = None,
description: str = "",
project_name: Optional[str] = None,
tags: Optional[dict[str, str]] = None,
config: Optional[ConfigType] = None,
runs: Optional[int] = 1,
) -> Experiment:
experiment = Experiment(
name,
Experiment._NO_OP_TASK,
dataset, # type: ignore[arg-type]
[],
project_name=project_name or cls._project_name,
tags=tags,
description=description,
config=config,
_llmobs_instance=cls._instance,
runs=runs,
is_distributed=True,
)
experiment._setup_experiment(
"LLMObs is not enabled. Ensure LLM Observability is enabled via `LLMObs.enable(...)`",
ensure_unique=False,
)
return experiment

@classmethod
def _run_for_experiment(
cls,
experiment_id: str,
task: Callable[[DatasetRecordInputType, Optional[ConfigType]], JSONType],
dataset_records: list[DatasetRecord],
evaluators: list[
Union[
Callable[[DatasetRecordInputType, JSONType, JSONType], Union[JSONType, EvaluatorResult]],
Callable[[], Union[JSONType, EvaluatorResult]],
]
],
jobs: int = 1,
raise_errors: bool = False,
run_iteration: Optional[int] = 1,
tags: Optional[dict[str, str]] = None,
) -> tuple[Experiment, ExperimentResult]:
if not cls._instance or not cls._instance.enabled:
raise ValueError("LLMObs is not enabled. Ensure LLM Observability is enabled via `LLMObs.enable(...)`")
experiment = cls._instance._dne_client.experiment_get(experiment_id)
experiment._llmobs_instance = cls._instance
experiment._dataset._records = dataset_records
experiment._task = task
experiment._evaluators = evaluators # type: ignore[assignment]

coro = experiment._run_task_single_iteration(jobs, raise_errors, run_iteration)
try:
asyncio.get_running_loop()
except RuntimeError:
results = asyncio.run(coro)
else:
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
results = pool.submit(asyncio.run, coro).result()
return experiment, results

@classmethod
def register_processor(cls, processor: Optional[Callable[[LLMObsSpan], Optional[LLMObsSpan]]] = None) -> None:
"""Register a processor to be called on each LLMObs span.
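Putting the two new classmethods together, the intended split is roughly the following. This is a sketch only, not part of the diff; `my_dataset`, `run_model`, `records_for_this_worker`, and `worker_index` are placeholders:

from ddtrace.llmobs import LLMObs

# Coordinator: register the experiment once (no real task), then hand the
# experiment ID to the workers out of band.
exp = LLMObs._distributed_experiment(
    name="distributed-eval",
    dataset=my_dataset,
    project_name="my-project",
    runs=1,
)
experiment_id = exp._id

# Worker: rebuild the experiment from its ID and run one shard of records.
# _run_for_experiment runs the iteration with asyncio.run, or on a one-thread
# pool if an event loop is already running in the caller.
def task(input_data, config):
    return run_model(input_data)

def exact_match(input_data, output_data, expected_output):
    return output_data == expected_output

experiment, result = LLMObs._run_for_experiment(
    experiment_id,
    task,
    records_for_this_worker,
    [exact_match],
    jobs=4,
    run_iteration=worker_index + 1,
)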
65 changes: 64 additions & 1 deletion ddtrace/llmobs/_writer.py
@@ -38,6 +38,7 @@
from ddtrace.llmobs._experiment import Dataset
from ddtrace.llmobs._experiment import DatasetRecord
from ddtrace.llmobs._experiment import DatasetRecordRaw
from ddtrace.llmobs._experiment import Experiment
from ddtrace.llmobs._experiment import JSONType
from ddtrace.llmobs._experiment import Project
from ddtrace.llmobs._experiment import UpdatableDatasetRecord
@@ -647,6 +648,67 @@ def project_create_or_get(self, name: Optional[str] = None) -> Project:

return project

def experiment_get(self, id: str, tag_overrides: Optional[dict[str, str]] = None) -> "Experiment": # noqa: A002
path = f"/api/v2/llm-obs/v1/experiments?filter[id]={id}"
resp = self.request("GET", path)
if resp.status != 200:
raise ValueError(f"Failed to get experiment with ID {id}: {resp.status} {resp.get_json()}")
response_data = resp.get_json()
experiments = response_data.get("data", [])
if len(experiments) < 1:
raise ValueError(f"No experiments found for ID {id}")
experiment = experiments[0]["attributes"]
project_id = experiment["project_id"]
dataset_id = experiment["dataset_id"]

tags: list[str] = experiment["metadata"].get("tags", [])
tags_dict: dict[str, str] = {}
for tag in tags:
kv = tag.split(":", 1)
if len(kv) == 2:
tags_dict[kv[0]] = kv[1]

if tag_overrides:
tags_dict.update(tag_overrides)

# TODO[gh] attempt to find the project & dataset name through tags if possible,
# temporary hack to avoid extra API calls
project_name = tags_dict.get("project_name", project_id)
dataset_name = tags_dict.get("dataset_name", dataset_id)

project = Project(name=project_name, _id=project_id)

dataset = Dataset(
name=dataset_name,
project=project,
dataset_id=dataset_id,
records=[],
# TODO[gh] need to fully pull dataset for this to be accurate, not critical for now
description="",
# TODO[gh] this may be incorrect
latest_version=experiment["dataset_version"],
version=experiment["dataset_version"],
_dne_client=self,
)

experiment_obj = Experiment(
name=experiment["experiment"],
task=Experiment._NO_OP_TASK,
dataset=dataset,
evaluators=[],
project_name=project_name,
tags=tags_dict,
description=experiment["description"],
config=experiment.get("config", {}),
_llmobs_instance=None,
runs=experiment["run_count"],
is_distributed=True,
)
experiment_obj._run_name = experiment["name"]
experiment_obj._id = id
experiment_obj._project_id = project_id
return experiment_obj

def experiment_create(
self,
name: str,
@@ -657,6 +719,7 @@ def experiment_create(
tags: Optional[list[str]] = None,
description: Optional[str] = None,
runs: Optional[int] = 1,
ensure_unique: bool = True,
) -> tuple[str, str]:
path = "/api/unstable/llm-obs/v1/experiments"
resp = self.request(
@@ -673,7 +736,7 @@
"dataset_version": dataset_version,
"config": exp_config or {},
"metadata": {"tags": cast(JSONType, tags or [])},
"ensure_unique": True,
"ensure_unique": ensure_unique,
"run_count": runs,
},
}
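For reference, the tag handling in `experiment_get` appears to be the inverse of `convert_tags_dict_to_list`: tags are stored as "key:value" strings and split on the first colon only, so values containing colons survive the round trip. A small standalone illustration (not part of the diff; values are made up):

stored_tags = ["project_name:my-project", "model:gpt:4o-mini"]
tags_dict = {}
for tag in stored_tags:
    kv = tag.split(":", 1)       # split on the first ":" only
    if len(kv) == 2:
        tags_dict[kv[0]] = kv[1]
# tags_dict == {"project_name": "my-project", "model": "gpt:4o-mini"}

# experiment_get then falls back to the raw IDs when the name tags are absent
# (the temporary hack called out in the TODO above):
project_name = tags_dict.get("project_name", "<project_id>")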