
Commit 16ea80e

feat(tracing): add common processor
1 parent 028438d commit 16ea80e


4 files changed (+789, -16 lines)


src/uipath/tracing/_otel_exporters.py

Lines changed: 359 additions & 6 deletions
@@ -12,6 +12,7 @@
     SpanExporter,
     SpanExportResult,
 )
+from typing_extensions import override

 from uipath._utils._ssl_context import get_httpx_client_kwargs

@@ -99,17 +100,369 @@ class BaseSpanProcessor(ABC):
     Defines the interface for processing spans with a single abstract method.
     """

-    @abstractmethod
-    def process_span(self, span_data: MutableMapping[str, Any]) -> Dict[str, Any]:
-        """Process a span and return the transformed data.
+    def __init__(
+        self,
+        dump_attributes_as_string: bool = True,
+        unflatten_attributes: bool = True,
+        map_json_fields: bool = True,
+    ):
+        self._dump_attributes_as_string = dump_attributes_as_string
+        self._unflatten_attributes = unflatten_attributes
+        self._map_json_fields = map_json_fields
+
+    def try_convert_json(self, flat_dict: Dict[str, Any]) -> Dict[str, Any]:
+        """Tries to convert stringified JSON values in a flattened dictionary back to their original types.

         Args:
-            span_data: The span data to process
+            flat_dict: A dictionary with potentially stringified JSON values.

         Returns:
-            Processed span data
+            A new dictionary with JSON strings converted to their original types.
         """
-        pass
+        result = {}
+        for key, value in flat_dict.items():
+            if isinstance(value, str):
+                try:
+                    result[key] = json.loads(value)
+                except json.JSONDecodeError:
+                    result[key] = value
+            else:
+                result[key] = value
+        return result
+
+    def unflatten_dict(self, flat_dict: Dict[str, Any]) -> Dict[str, Any]:
+        """Converts a flattened dictionary with dot-separated keys into a nested dictionary.
+
+        Args:
+            flat_dict: Dictionary with dot-separated keys (e.g., 'llm.output_messages.0.message.content')
+
+        Returns:
+            Nested dictionary structure
+
+        Example:
+            Input: {'llm.output_messages.0.message.content': 'hello', 'llm.model': 'gpt-4'}
+            Output: {'llm': {'output_messages': [{'message': {'content': 'hello'}}], 'model': 'gpt-4'}}
+        """
+        result = {}
+
+        for key, value in flat_dict.items():
+            # Split the key by dots
+            parts = key.split(".")
+            current = result
+
+            # Navigate/create the nested structure
+            for i, part in enumerate(parts[:-1]):
+                # Check if this part represents an array index
+                if part.isdigit():
+                    # Convert to integer index
+                    index = int(part)
+                    # Ensure the parent is a list
+                    if not isinstance(current, list):
+                        raise ValueError(
+                            f"Expected list but found {type(current)} for key: {key}"
+                        )
+                    # Extend the list if necessary
+                    while len(current) <= index:
+                        current.append(None)
+
+                    # If the current element is None, we need to create a structure for it
+                    if current[index] is None:
+                        # Look ahead to see if the next part is a digit (array index)
+                        next_part = parts[i + 1] if i + 1 < len(parts) else None
+                        if next_part and next_part.isdigit():
+                            current[index] = []
+                        else:
+                            current[index] = {}
+
+                    current = current[index]
+                else:
+                    # Regular dictionary key
+                    if part not in current:
+                        # Look ahead to see if the next part is a digit (array index)
+                        next_part = parts[i + 1] if i + 1 < len(parts) else None
+                        if next_part and next_part.isdigit():
+                            current[part] = []
+                        else:
+                            current[part] = {}
+                    current = current[part]
+
+            # Set the final value
+            final_key = parts[-1]
+            if final_key.isdigit():
+                # If the final key is a digit, we're setting an array element
+                index = int(final_key)
+                if not isinstance(current, list):
+                    raise ValueError(
+                        f"Expected list but found {type(current)} for key: {key}"
+                    )
+                while len(current) <= index:
+                    current.append(None)
+                current[index] = value
+            else:
+                # Regular key assignment
+                current[final_key] = value
+
+        return result
+
+    def safe_get(self, data: Dict[str, Any], path: str, default=None):
+        """Safely get nested value using dot notation."""
+        keys = path.split(".")
+        current = data
+        for key in keys:
+            if isinstance(current, dict) and key in current:
+                current = current[key]
+            else:
+                return default
+        return current
+
+    def safe_parse_json(self, value):
+        """Safely parse JSON string."""
+        if isinstance(value, str):
+            try:
+                return json.loads(value.replace("'", '"'))
+            except json.JSONDecodeError:
+                return value
+        return value
+
+    @abstractmethod
+    def process_span(self, span_data: MutableMapping[str, Any]) -> Dict[str, Any]:
+        return span_data
+
+
+class CommonSpanProcessor(BaseSpanProcessor):
+    """A class to process spans, applying custom attribute and type mappings.
+
+    This processor can transform flattened attribute keys (e.g., 'llm.output_messages.0.message.role')
+    into nested dictionary structures for easier access and processing.
+
+    Example usage:
+        # With unflattening enabled
+        processor = CommonSpanProcessor(unflatten_attributes=True, dump_attributes_as_string=False)
+        processed_span = processor.process_span(span_data)
+
+        # Access nested attributes naturally:
+        role = processed_span['attributes']['llm']['output_messages'][0]['message']['role']
+
+        # Without unflattening (original behavior)
+        processor = CommonSpanProcessor(unflatten_attributes=False)
+        processed_span = processor.process_span(span_data)
+
+        # Access with flattened keys:
+        role = processed_span['attributes']['llm.output_messages.0.message.role']
+    """
+
+    # Mapping of old attribute names to new attribute names or (new name, function)
+    ATTRIBUTE_MAPPING = {
+        "input.value": ("input", lambda s: json.loads(s)),
+        "output.value": ("output", lambda s: json.loads(s)),
+        "llm.model_name": "model",
+    }
+
+    # Mapping of span types
+    SPAN_TYPE_MAPPING = {
+        "LLM": "completion",
+        "TOOL": "toolCall",
+        # Add more mappings as needed
+    }
+
+    def __init__(
+        self,
+        dump_attributes_as_string: bool = True,
+        unflatten_attributes: bool = True,
+        map_json_fields: bool = True,
+    ):
+        """Initializes the CommonSpanProcessor.
+
+        Args:
+            dump_attributes_as_string: If True, dumps attributes as a JSON string.
+                Otherwise, attributes are set as a dictionary.
+            unflatten_attributes: If True, converts flattened dot-separated keys
+                into nested dictionary structures.
+            map_json_fields: If True, applies JSON field mapping transformations
+                for tool calls and LLM calls.
+        """
+        self._dump_attributes_as_string = dump_attributes_as_string
+        self._unflatten_attributes = unflatten_attributes
+        self._map_json_fields = map_json_fields
+
+    def extract_attributes(self, span_data: MutableMapping[str, Any]) -> Dict[str, Any]:
+        """Extract and parse attributes from span_data, checking both 'Attributes' and 'attributes' keys."""
+        for key in ["Attributes", "attributes"]:
+            if key in span_data:
+                value = span_data.pop(key)
+                if isinstance(value, str):
+                    try:
+                        parsed_value = json.loads(value)
+                        return parsed_value if isinstance(parsed_value, dict) else {}
+                    except json.JSONDecodeError:
+                        logger.warning(f"Failed to parse attributes JSON: {value}")
+                        return {}
+                elif isinstance(value, dict):
+                    return value
+                else:
+                    return {}
+        return {}
+
+    @override
+    def process_span(self, span_data: MutableMapping[str, Any]) -> Dict[str, Any]:
+        logger.info(f"Processing span: {span_data}")
+        attributes = self.extract_attributes(span_data)
+
+        if attributes and isinstance(attributes, dict):
+            if "openinference.span.kind" in attributes:
+                # Remove the span kind attribute
+                span_type = attributes["openinference.span.kind"]
+                # Map span type using SPAN_TYPE_MAPPING
+                span_data["SpanType"] = self.SPAN_TYPE_MAPPING.get(span_type, span_type)
+                del attributes["openinference.span.kind"]
+
+            # Apply the transformation logic
+            for old_key, mapping in self.ATTRIBUTE_MAPPING.items():
+                if old_key in attributes:
+                    if isinstance(mapping, tuple):
+                        new_key, func = mapping
+                        try:
+                            attributes[new_key] = func(attributes[old_key])
+                        except Exception:
+                            attributes[new_key] = attributes[old_key]
+                    else:
+                        new_key = mapping
+                        attributes[new_key] = attributes[old_key]
+                    del attributes[old_key]
+
+        if attributes:
+            # Apply unflattening if requested (before JSON field mapping)
+            if self._unflatten_attributes:
+                try:
+                    attributes = self.try_convert_json(attributes)
+                    attributes = self.unflatten_dict(attributes)
+                except Exception as e:
+                    logger.warning(f"Failed to unflatten attributes: {e}")
+
+            # Set attributes in span_data as dictionary for JSON field mapping
+            span_data["attributes"] = attributes
+
+            # Apply JSON field mapping before final serialization
+            if self._map_json_fields:
+                span_data = self.map_json_fields_from_attributes(span_data)
+
+            # Convert back to JSON string if requested (after all transformations)
+            if self._dump_attributes_as_string:
+                span_data["attributes"] = json.dumps(span_data["attributes"])
+
+        return span_data
+
+    def map_tool_call_attributes(self, attributes: Dict[str, Any]) -> Dict[str, Any]:
+        """Simple tool call mapping - just add new fields."""
+        result = attributes.copy()  # Keep originals
+
+        # Add new fields
+        result["type"] = "toolCall"
+        result["callId"] = attributes.get("call_id") or attributes.get("id")
+        result["toolName"] = self.safe_get(attributes, "tool.name")
+        result["arguments"] = self.safe_parse_json(attributes.get("input", "{}"))
+        result["toolType"] = "Integration"
+        result["result"] = self.safe_parse_json(attributes.get("output"))
+        result["error"] = None
+
+        return result
+
+    def map_llm_call_attributes(self, attributes: Dict[str, Any]) -> Dict[str, Any]:
+        """Simple LLM call mapping - just add new fields."""
+        result = attributes.copy()  # Keep originals
+
+        # Transform token usage data if present (after unflattening)
+        # Use safe_get to extract token count values from nested structure
+        prompt_tokens = self.safe_get(attributes, "llm.token_count.prompt")
+        completion_tokens = self.safe_get(attributes, "llm.token_count.completion")
+        total_tokens = self.safe_get(attributes, "llm.token_count.total")
+
+        usage = {
+            "promptTokens": prompt_tokens,
+            "completionTokens": completion_tokens,
+            "totalTokens": total_tokens,
+            "isByoExecution": False,
+            "executionDeploymentType": "PAYGO",
+            "isPiiMasked": False,
+        }
+
+        # remove None values
+        usage = {k: v for k, v in usage.items() if v is not None}
+
+        result["usage"] = usage
+
+        # Add new fields
+        result["input"] = self.safe_get(attributes, "llm.input_messages")
+        result["output"] = self.safe_get(attributes, "llm.output_messages")
+
+        result["type"] = "completion"
+        result["model"] = self.safe_get(attributes, "llm.invocation_parameters.model")
+
+        # Settings
+        settings = {}
+        max_tokens = self.safe_get(attributes, "llm.invocation_parameters.max_tokens")
+        temperature = self.safe_get(attributes, "llm.invocation_parameters.temperature")
+        if max_tokens:
+            settings["maxTokens"] = max_tokens
+        if temperature is not None:
+            settings["temperature"] = temperature
+        if settings:
+            result["settings"] = settings
+
+        # Tool calls (simplified)
+        tool_calls = []
+        output_msgs = self.safe_get(attributes, "llm.output_messages", [])
+        for msg in output_msgs:
+            msg_tool_calls = self.safe_get(msg, "message.tool_calls", [])
+            for tc in msg_tool_calls:
+                tool_call_data = tc.get("tool_call", {})
+                tool_calls.append(
+                    {
+                        "id": tool_call_data.get("id"),
+                        "name": self.safe_get(tool_call_data, "function.name"),
+                        "arguments": self.safe_get(
+                            tool_call_data, "function.arguments", {}
+                        ),
+                    }
+                )
+        if tool_calls:
+            result["toolCalls"] = tool_calls
+
+        # Usage (enhance existing if not created above)
+        if "usage" in result:
+            usage = result["usage"]
+            if isinstance(usage, dict):
+                usage.setdefault("isByoExecution", False)
+                usage.setdefault("executionDeploymentType", "PAYGO")
+                usage.setdefault("isPiiMasked", False)
+
+        return result
+
+    def map_json_fields_from_attributes(
+        self, span_data: Dict[str, Any]
+    ) -> Dict[str, Any]:
+        """Simple mapping dispatcher."""
+        if "attributes" not in span_data:
+            return span_data
+
+        attributes = span_data["attributes"]
+
+        # Parse if string
+        if isinstance(attributes, str):
+            try:
+                attributes = json.loads(attributes)
+            except json.JSONDecodeError:
+                return span_data
+
+        if not isinstance(attributes, dict):
+            return span_data
+
+        # Simple detection and mapping
+        if "tool" in attributes or span_data.get("SpanType") == "toolCall":
+            span_data["attributes"] = self.map_tool_call_attributes(attributes)
+        elif "llm" in attributes or span_data.get("SpanType") == "completion":
+            span_data["attributes"] = self.map_llm_call_attributes(attributes)
+
+        return span_data


 class UiPathSpanExporterBase(SpanExporter, ABC):
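
Below is a minimal usage sketch of the new CommonSpanProcessor for reviewers; it is not part of the commit. The import path uipath.tracing._otel_exporters is inferred from the file path above, and the flattened span is invented purely for illustration. With the default dump_attributes_as_string=True, the processed attributes come back as a JSON string rather than the dict inspected here.

# Hypothetical usage sketch, not part of this commit.
# Assumes the import path mirrors src/uipath/tracing/_otel_exporters.py.
import json

from uipath.tracing._otel_exporters import CommonSpanProcessor

# Invented span with flattened, partly stringified OpenInference-style attributes.
span_data = {
    "attributes": json.dumps(
        {
            "openinference.span.kind": "LLM",
            "llm.model_name": "gpt-4o",
            "llm.invocation_parameters": '{"model": "gpt-4o", "temperature": 0.2}',
            "llm.token_count.prompt": 12,
            "llm.token_count.completion": 3,
            "llm.token_count.total": 15,
            "llm.output_messages.0.message.role": "assistant",
            "llm.output_messages.0.message.content": "hello",
        }
    )
}

# Keep attributes as a dict so the nested result is easy to inspect.
processor = CommonSpanProcessor(dump_attributes_as_string=False)
processed = processor.process_span(span_data)

print(processed["SpanType"])                      # "completion" (mapped from "LLM")
attrs = processed["attributes"]
print(attrs["usage"]["promptTokens"])             # 12
print(attrs["output"][0]["message"]["content"])   # "hello"

# The unflatten helper can also be used on its own:
nested = processor.unflatten_dict({"llm.output_messages.0.message.content": "hi"})
print(nested)  # {'llm': {'output_messages': [{'message': {'content': 'hi'}}]}}
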

src/uipath/tracing/_otel_processors.py

Whitespace-only changes.

0 commit comments
