From 028438d720a289c541c1e12f920d455d1eb4f119 Mon Sep 17 00:00:00 2001 From: ionmincu Date: Thu, 28 Aug 2025 17:22:24 +0300 Subject: [PATCH 1/2] feat(tracing): add json and sqlite exporters to python --- pyproject.toml | 2 +- src/uipath/tracing/__init__.py | 17 +- src/uipath/tracing/_otel_exporters.py | 388 ++++++++++++++++- tests/tracing/test_otel_exporters.py | 585 +++++++++++++++++++++++++- 4 files changed, 987 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a5726f12..1292a121 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath" -version = "2.1.27" +version = "2.1.28" description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools." readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.10" diff --git a/src/uipath/tracing/__init__.py b/src/uipath/tracing/__init__.py index be604b19..8314be5e 100644 --- a/src/uipath/tracing/__init__.py +++ b/src/uipath/tracing/__init__.py @@ -1,4 +1,17 @@ -from ._otel_exporters import LlmOpsHttpExporter # noqa: D104 +from ._otel_exporters import ( # noqa: D104 + BaseSpanProcessor, + JsonFileExporter, + LlmOpsHttpExporter, + SqliteExporter, +) from ._traced import TracingManager, traced, wait_for_tracers # noqa: D104 -__all__ = ["TracingManager", "traced", "wait_for_tracers", "LlmOpsHttpExporter"] +__all__ = [ + "TracingManager", + "traced", + "wait_for_tracers", + "LlmOpsHttpExporter", + "BaseSpanProcessor", + "JsonFileExporter", + "SqliteExporter", +] diff --git a/src/uipath/tracing/_otel_exporters.py b/src/uipath/tracing/_otel_exporters.py index b6462abb..e5eaeb5f 100644 --- a/src/uipath/tracing/_otel_exporters.py +++ b/src/uipath/tracing/_otel_exporters.py @@ -1,8 +1,10 @@ import json import logging import os +import sqlite3 import time -from typing import Any, Dict, Sequence +from abc import ABC, abstractmethod +from typing import Any, Dict, MutableMapping, Sequence import httpx from opentelemetry.sdk.trace import ReadableSpan @@ -89,3 +91,387 @@ def _get_base_url(self) -> str: uipath_url = uipath_url.rstrip("/") return uipath_url + + +class BaseSpanProcessor(ABC): + """Abstract base class for span processors. + + Defines the interface for processing spans with a single abstract method. + """ + + @abstractmethod + def process_span(self, span_data: MutableMapping[str, Any]) -> Dict[str, Any]: + """Process a span and return the transformed data. + + Args: + span_data: The span data to process + + Returns: + Processed span data + """ + pass + + +class UiPathSpanExporterBase(SpanExporter, ABC): + """Base class for UiPath span exporters.""" + + def __init__( + self, processor: BaseSpanProcessor | None = None, *args: Any, **kwargs: Any + ) -> None: + """Initializes the exporter with an optional span processor.""" + super().__init__(*args, **kwargs) + self._processor = processor + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + """Converts and exports spans. + + This method converts OpenTelemetry ReadableSpan objects to the UiPath + span format and then calls the subclass's implementation of + _export_uipath_spans. 
+ """ + if not spans: + return SpanExportResult.SUCCESS + try: + uipath_spans = [ + _SpanUtils.otel_span_to_uipath_span(span).to_dict() for span in spans + ] + + processed_spans = uipath_spans + + if self._processor: + processed_spans = [ + self._processor.process_span(span) for span in uipath_spans + ] + + return self._export_uipath_spans(processed_spans) + except Exception as e: + logger.error(f"Failed to export spans: {e}", exc_info=True) + return SpanExportResult.FAILURE + + @abstractmethod + def _export_uipath_spans( + self, uipath_spans: list[Dict[str, Any]] + ) -> SpanExportResult: + """Exports a list of spans in UiPath format. + + Subclasses must implement this method to define the export mechanism + (e.g., sending over HTTP, writing to a file). + + Args: + uipath_spans: A list of spans, each represented as a dictionary. + + Returns: + The result of the export operation. + """ + raise NotImplementedError + + +class JsonFileExporter(UiPathSpanExporterBase): + """An exporter that writes spans to a file in JSON Lines format. + + This exporter is useful for debugging and local development. It serializes + each span to a JSON object and appends it as a new line in the specified + file. + """ + + def __init__(self, file_path: str, processor: BaseSpanProcessor | None = None): + """Initializes the JsonFileExporter. + + Args: + file_path: The path to the JSON file where spans will be written. + processor: Optional span processor for transforming spans. + """ + super().__init__(processor) + self.file_path = file_path + # Ensure the directory exists + os.makedirs(os.path.dirname(self.file_path), exist_ok=True) + + def _export_uipath_spans( + self, uipath_spans: list[Dict[str, Any]] + ) -> SpanExportResult: + """Exports UiPath spans to a JSON file. + + Args: + uipath_spans: A list of spans in UiPath format. + + Returns: + The result of the export operation. + """ + try: + with open(self.file_path, "a") as f: + for span in uipath_spans: + f.write(json.dumps(span) + "\n") + return SpanExportResult.SUCCESS + except Exception as e: + logger.error(f"Failed to export spans to {self.file_path}: {e}") + return SpanExportResult.FAILURE + + def shutdown(self) -> None: + """Shuts down the exporter.""" + pass + + +class SqliteExporter(UiPathSpanExporterBase): + """An exporter that writes spans to a SQLite database file. + + This exporter is useful for debugging and local development. It serializes + the spans and inserts them into a 'spans' table in the specified database. + """ + + # Schema version for the SQLite database + SCHEMA_VERSION = "1.0.0" + + def __init__(self, db_path: str, processor: BaseSpanProcessor | None = None): + """Initializes the SqliteExporter. + + Args: + db_path: The path to the SQLite database file. + processor: Optional span processor for transforming spans. 
+ """ + super().__init__(processor) + self.db_path = db_path + # Ensure the directory exists + os.makedirs(os.path.dirname(self.db_path), exist_ok=True) + self._create_tables() + + def _create_tables(self): + """Creates the necessary tables if they don't already exist.""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + + # Create metadata table + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS __uipath_meta ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """ + ) + + # Create spans table + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS spans ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + trace_id TEXT, + span_id TEXT, + parent_span_id TEXT, + name TEXT, + start_time TEXT, + end_time TEXT, + span_type TEXT, + attributes TEXT + ) + """ + ) + + # Initialize or update schema version + self._initialize_metadata(cursor) + conn.commit() + + def _initialize_metadata(self, cursor: sqlite3.Cursor): + """Initialize or update metadata in the database. + + Args: + cursor: The SQLite cursor to use for database operations. + """ + import datetime + + current_time = ( + datetime.datetime.now(datetime.timezone.utc) + .isoformat() + .replace("+00:00", "Z") + ) # Check if schema_version already exists + cursor.execute("SELECT value FROM __uipath_meta WHERE key = 'schema_version'") + existing_version = cursor.fetchone() + + if existing_version: + # Update existing version if different + if existing_version[0] != self.SCHEMA_VERSION: + cursor.execute( + """ + UPDATE __uipath_meta + SET value = ?, updated_at = ? + WHERE key = 'schema_version' + """, + (self.SCHEMA_VERSION, current_time), + ) + logger.info( + f"Updated schema version from {existing_version[0]} to {self.SCHEMA_VERSION}" + ) + else: + # Insert new metadata entries + metadata_entries = [ + ("schema_version", self.SCHEMA_VERSION, current_time, current_time), + ("created_by", "UiPath SQLiteExporter", current_time, current_time), + ("exporter_class", self.__class__.__name__, current_time, current_time), + ] + + cursor.executemany( + """ + INSERT INTO __uipath_meta (key, value, created_at, updated_at) + VALUES (?, ?, ?, ?) + """, + metadata_entries, + ) + logger.info( + f"Initialized database with schema version {self.SCHEMA_VERSION}" + ) + + def get_schema_version(self) -> str | None: + """Get the current schema version from the database. + + Returns: + The schema version string, or None if not found. + """ + try: + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT value FROM __uipath_meta WHERE key = 'schema_version'" + ) + result = cursor.fetchone() + return result[0] if result else None + except (sqlite3.Error, Exception) as e: + logger.warning(f"Failed to get schema version: {e}") + return None + + def get_metadata(self, key: str) -> str | None: + """Get a metadata value by key. + + Args: + key: The metadata key to retrieve. + + Returns: + The metadata value, or None if not found. + """ + try: + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute("SELECT value FROM __uipath_meta WHERE key = ?", (key,)) + result = cursor.fetchone() + return result[0] if result else None + except (sqlite3.Error, Exception) as e: + logger.warning(f"Failed to get metadata for key '{key}': {e}") + return None + + def set_metadata(self, key: str, value: str) -> bool: + """Set a metadata key-value pair. + + Args: + key: The metadata key. + value: The metadata value. 
+ + Returns: + True if successful, False otherwise. + """ + try: + import datetime + + current_time = ( + datetime.datetime.now(datetime.timezone.utc) + .isoformat() + .replace("+00:00", "Z") + ) + + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute( + """ + INSERT OR REPLACE INTO __uipath_meta (key, value, created_at, updated_at) + VALUES ( + ?, ?, + COALESCE((SELECT created_at FROM __uipath_meta WHERE key = ?), ?), + ? + ) + """, + (key, value, key, current_time, current_time), + ) + conn.commit() + return True + except (sqlite3.Error, Exception) as e: + logger.error(f"Failed to set metadata for key '{key}': {e}") + return False + + def list_metadata(self) -> Dict[str, Dict[str, str]]: + """List all metadata entries. + + Returns: + A dictionary mapping keys to metadata information including value and timestamps. + """ + try: + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT key, value, created_at, updated_at FROM __uipath_meta ORDER BY key" + ) + results = cursor.fetchall() + + return { + row[0]: { + "value": row[1], + "created_at": row[2], + "updated_at": row[3], + } + for row in results + } + except (sqlite3.Error, Exception) as e: + logger.warning(f"Failed to list metadata: {e}") + return {} + + def _create_table(self): + """Legacy method for backward compatibility. Use _create_tables instead.""" + self._create_tables() + + def _export_uipath_spans( + self, uipath_spans: list[Dict[str, Any]] + ) -> SpanExportResult: + """Exports UiPath spans to a SQLite database. + + Args: + uipath_spans: A list of spans in UiPath format. + + Returns: + The result of the export operation. + """ + try: + with sqlite3.connect(self.db_path) as conn: + cursor = conn.cursor() + for span in uipath_spans: + # The 'attributes' field is a JSON string, so we store it as TEXT. + attributes_json = span.get("attributes", "{}") + if not isinstance(attributes_json, str): + attributes_json = json.dumps(attributes_json) + + cursor.execute( + """ + INSERT INTO spans ( + trace_id, span_id, parent_span_id, name, + start_time, end_time, span_type, attributes + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
+ """, + ( + span.get("TraceId"), + span.get("SpanId"), + span.get("ParentSpanId"), + span.get("Name"), + span.get("StartTime"), + span.get("EndTime"), + span.get("SpanType"), + attributes_json, + ), + ) + conn.commit() + + return SpanExportResult.SUCCESS + except Exception as e: + logger.error(f"Failed to export spans to {self.db_path}: {e}") + return SpanExportResult.FAILURE + + def shutdown(self) -> None: + """Shuts down the exporter.""" + pass diff --git a/tests/tracing/test_otel_exporters.py b/tests/tracing/test_otel_exporters.py index 06620c72..12ab9aa3 100644 --- a/tests/tracing/test_otel_exporters.py +++ b/tests/tracing/test_otel_exporters.py @@ -1,11 +1,19 @@ +import json import os +import sqlite3 +import tempfile from unittest.mock import MagicMock, patch import pytest from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SpanExportResult -from uipath.tracing._otel_exporters import LlmOpsHttpExporter +from uipath.tracing._otel_exporters import ( + BaseSpanProcessor, + JsonFileExporter, + LlmOpsHttpExporter, + SqliteExporter, +) @pytest.fixture @@ -188,3 +196,578 @@ def test_send_with_retries_success(): exporter.http_client.post.assert_called_once_with( # type: ignore "http://example.com", json=[{"span": "data"}] ) + + +class MockSpanProcessor(BaseSpanProcessor): + """Mock span processor for testing.""" + + def process_span(self, span_data): + """Process span by adding a test field.""" + processed = span_data.copy() + processed["test_processed"] = True + # Also add to attributes to test serialization + if "attributes" not in processed: + processed["attributes"] = {} + if isinstance(processed["attributes"], dict): + processed["attributes"]["test_processed"] = True + return processed + + +@pytest.fixture +def mock_uipath_span(): + """Create a mock UiPath span for testing.""" + return { + "TraceId": "test-trace-id", + "SpanId": "test-span-id", + "ParentSpanId": "test-parent-span-id", + "Name": "test-span", + "StartTime": "2023-01-01T00:00:00Z", + "EndTime": "2023-01-01T00:01:00Z", + "SpanType": "test", + "attributes": {"key": "value"}, + } + + +@pytest.fixture +def mock_readable_span(): + """Create a mock ReadableSpan for testing.""" + span = MagicMock(spec=ReadableSpan) + return span + + +class TestJsonFileExporter: + """Test cases for JsonFileExporter.""" + + def test_init_creates_directory(self): + """Test that initialization creates the directory if it doesn't exist.""" + with tempfile.TemporaryDirectory() as temp_dir: + file_path = os.path.join(temp_dir, "subdir", "spans.jsonl") + exporter = JsonFileExporter(file_path) + + assert os.path.exists(os.path.dirname(file_path)) + assert exporter.file_path == file_path + + def test_init_with_processor(self): + """Test initialization with a span processor.""" + with tempfile.TemporaryDirectory() as temp_dir: + file_path = os.path.join(temp_dir, "spans.jsonl") + processor = MockSpanProcessor() + exporter = JsonFileExporter(file_path, processor) + + assert exporter.file_path == file_path + assert exporter._processor == processor + + def test_export_uipath_spans_success(self, mock_uipath_span): + """Test successful export of UiPath spans to JSON file.""" + with tempfile.NamedTemporaryFile( + mode="w", delete=False, suffix=".jsonl" + ) as temp_file: + file_path = temp_file.name + + try: + exporter = JsonFileExporter(file_path) + result = exporter._export_uipath_spans([mock_uipath_span]) + + assert result == SpanExportResult.SUCCESS + + # Verify file contents + with open(file_path, "r") as f: + lines = 
f.readlines() + assert len(lines) == 1 + loaded_span = json.loads(lines[0].strip()) + assert loaded_span == mock_uipath_span + finally: + os.unlink(file_path) + + def test_export_uipath_spans_multiple(self, mock_uipath_span): + """Test exporting multiple spans to JSON file.""" + span2 = mock_uipath_span.copy() + span2["SpanId"] = "test-span-id-2" + + with tempfile.NamedTemporaryFile( + mode="w", delete=False, suffix=".jsonl" + ) as temp_file: + file_path = temp_file.name + + try: + exporter = JsonFileExporter(file_path) + result = exporter._export_uipath_spans([mock_uipath_span, span2]) + + assert result == SpanExportResult.SUCCESS + + # Verify file contents + with open(file_path, "r") as f: + lines = f.readlines() + assert len(lines) == 2 + assert json.loads(lines[0].strip()) == mock_uipath_span + assert json.loads(lines[1].strip()) == span2 + finally: + os.unlink(file_path) + + def test_export_uipath_spans_append(self, mock_uipath_span): + """Test that spans are appended to existing file.""" + with tempfile.NamedTemporaryFile( + mode="w", delete=False, suffix=".jsonl" + ) as temp_file: + file_path = temp_file.name + # Write initial content + temp_file.write(json.dumps({"initial": "span"}) + "\n") + + try: + exporter = JsonFileExporter(file_path) + result = exporter._export_uipath_spans([mock_uipath_span]) + + assert result == SpanExportResult.SUCCESS + + # Verify file contents + with open(file_path, "r") as f: + lines = f.readlines() + assert len(lines) == 2 + assert json.loads(lines[0].strip()) == {"initial": "span"} + assert json.loads(lines[1].strip()) == mock_uipath_span + finally: + os.unlink(file_path) + + def test_export_with_processor(self, mock_readable_span, mock_uipath_span): + """Test export with span processor integration.""" + with tempfile.NamedTemporaryFile( + mode="w", delete=False, suffix=".jsonl" + ) as temp_file: + file_path = temp_file.name + + try: + processor = MockSpanProcessor() + exporter = JsonFileExporter(file_path, processor) + + with patch( + "uipath.tracing._otel_exporters._SpanUtils.otel_span_to_uipath_span" + ) as mock_converter: + mock_uipath_span_obj = MagicMock() + mock_uipath_span_obj.to_dict.return_value = mock_uipath_span + mock_converter.return_value = mock_uipath_span_obj + + result = exporter.export([mock_readable_span]) + + assert result == SpanExportResult.SUCCESS + + # Verify file contents include processor modification + with open(file_path, "r") as f: + lines = f.readlines() + assert len(lines) == 1 + loaded_span = json.loads(lines[0].strip()) + assert loaded_span["test_processed"] is True + finally: + os.unlink(file_path) + + def test_export_uipath_spans_file_error(self): + """Test export failure when file cannot be written.""" + # Use a Windows-specific invalid path + import platform + + if platform.system() == "Windows": + # Use an invalid character in filename + invalid_path = "C:/invalid<>path/spans.jsonl" + else: + invalid_path = "/invalid/path/spans.jsonl" + + # Don't call constructor as it will try to create directories + exporter = JsonFileExporter.__new__(JsonFileExporter) + exporter.file_path = invalid_path + exporter._processor = None + + result = exporter._export_uipath_spans([{"test": "span"}]) + assert result == SpanExportResult.FAILURE + + def test_shutdown(self): + """Test shutdown method.""" + with tempfile.NamedTemporaryFile(suffix=".jsonl") as temp_file: + exporter = JsonFileExporter(temp_file.name) + # Should not raise any exceptions + exporter.shutdown() + + +class TestSqliteExporter: + """Test cases for 
SqliteExporter.""" + + def test_init_creates_directory_and_table(self): + """Test that initialization creates directory and database table.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as temp_file: + # Create a path in a subdirectory that doesn't exist yet + base_dir = os.path.dirname(temp_file.name) + db_path = os.path.join(base_dir, "subdir", "spans.db") + + try: + exporter = SqliteExporter(db_path) + + assert os.path.exists(os.path.dirname(db_path)) + assert os.path.exists(db_path) + assert exporter.db_path == db_path + + # Verify table exists + with sqlite3.connect(db_path) as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='spans'" + ) + assert cursor.fetchone() is not None + finally: + try: + os.unlink(db_path) + os.rmdir(os.path.dirname(db_path)) # Remove the subdirectory + except (PermissionError, FileNotFoundError, OSError): + pass # Ignore cleanup errors + + def test_init_with_processor(self): + """Test initialization with a span processor.""" + with tempfile.TemporaryDirectory() as temp_dir: + db_path = os.path.join(temp_dir, "spans.db") + processor = MockSpanProcessor() + exporter = SqliteExporter(db_path, processor) + + assert exporter.db_path == db_path + assert exporter._processor == processor + + def test_export_uipath_spans_success(self, mock_uipath_span): + """Test successful export of UiPath spans to SQLite database.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as temp_file: + db_path = temp_file.name + + try: + exporter = SqliteExporter(db_path) + result = exporter._export_uipath_spans([mock_uipath_span]) + + assert result == SpanExportResult.SUCCESS + + # Verify database contents + with sqlite3.connect(db_path) as conn: + cursor = conn.cursor() + cursor.execute("SELECT * FROM spans") + rows = cursor.fetchall() + assert len(rows) == 1 + + row = rows[0] + assert row[1] == mock_uipath_span["TraceId"] # trace_id + assert row[2] == mock_uipath_span["SpanId"] # span_id + assert row[3] == mock_uipath_span["ParentSpanId"] # parent_span_id + assert row[4] == mock_uipath_span["Name"] # name + assert row[5] == mock_uipath_span["StartTime"] # start_time + assert row[6] == mock_uipath_span["EndTime"] # end_time + assert row[7] == mock_uipath_span["SpanType"] # span_type + assert ( + json.loads(row[8]) == mock_uipath_span["attributes"] + ) # attributes + finally: + try: + os.unlink(db_path) + except (PermissionError, FileNotFoundError): + pass # Ignore cleanup errors on Windows + + def test_export_uipath_spans_multiple(self, mock_uipath_span): + """Test exporting multiple spans to SQLite database.""" + span2 = mock_uipath_span.copy() + span2["SpanId"] = "test-span-id-2" + + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as temp_file: + db_path = temp_file.name + + try: + exporter = SqliteExporter(db_path) + result = exporter._export_uipath_spans([mock_uipath_span, span2]) + + assert result == SpanExportResult.SUCCESS + + # Verify database contents + with sqlite3.connect(db_path) as conn: + cursor = conn.cursor() + cursor.execute("SELECT span_id FROM spans ORDER BY id") + rows = cursor.fetchall() + assert len(rows) == 2 + assert rows[0][0] == mock_uipath_span["SpanId"] + assert rows[1][0] == span2["SpanId"] + finally: + try: + os.unlink(db_path) + except (PermissionError, FileNotFoundError): + pass # Ignore cleanup errors on Windows + + def test_export_uipath_spans_attributes_serialization(self): + """Test that complex attributes are properly serialized.""" + 
span_with_complex_attrs = { + "TraceId": "test-trace-id", + "SpanId": "test-span-id", + "ParentSpanId": None, + "Name": "test-span", + "StartTime": "2023-01-01T00:00:00Z", + "EndTime": "2023-01-01T00:01:00Z", + "SpanType": "test", + "attributes": {"nested": {"key": "value"}, "list": [1, 2, 3]}, + } + + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as temp_file: + db_path = temp_file.name + + try: + exporter = SqliteExporter(db_path) + result = exporter._export_uipath_spans([span_with_complex_attrs]) + + assert result == SpanExportResult.SUCCESS + + # Verify attributes serialization + with sqlite3.connect(db_path) as conn: + cursor = conn.cursor() + cursor.execute("SELECT attributes FROM spans") + row = cursor.fetchone() + stored_attrs = json.loads(row[0]) + assert stored_attrs == span_with_complex_attrs["attributes"] + finally: + try: + os.unlink(db_path) + except (PermissionError, FileNotFoundError): + pass # Ignore cleanup errors on Windows + + def test_export_with_processor(self, mock_readable_span, mock_uipath_span): + """Test export with span processor integration.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as temp_file: + db_path = temp_file.name + + try: + processor = MockSpanProcessor() + exporter = SqliteExporter(db_path, processor) + + with patch( + "uipath.tracing._otel_exporters._SpanUtils.otel_span_to_uipath_span" + ) as mock_converter: + mock_uipath_span_obj = MagicMock() + mock_uipath_span_obj.to_dict.return_value = mock_uipath_span + mock_converter.return_value = mock_uipath_span_obj + + result = exporter.export([mock_readable_span]) + + assert result == SpanExportResult.SUCCESS + + # Verify processor was applied + with sqlite3.connect(db_path) as conn: + cursor = conn.cursor() + cursor.execute("SELECT attributes FROM spans") + row = cursor.fetchone() + stored_attrs = json.loads(row[0]) + # MockSpanProcessor adds test_processed field to attributes + assert "test_processed" in stored_attrs + assert stored_attrs["test_processed"] is True + finally: + try: + os.unlink(db_path) + except (PermissionError, FileNotFoundError): + pass # Ignore cleanup errors on Windows + + def test_export_uipath_spans_database_error(self): + """Test export failure when database cannot be accessed.""" + # Use an invalid path that will cause a database error + invalid_path = "/invalid/path/spans.db" + exporter = SqliteExporter.__new__(SqliteExporter) # Skip __init__ + exporter.db_path = invalid_path + exporter._processor = None + + result = exporter._export_uipath_spans([{"test": "span"}]) + assert result == SpanExportResult.FAILURE + + def test_shutdown(self): + """Test shutdown method.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as temp_file: + db_path = temp_file.name + + try: + exporter = SqliteExporter(db_path) + # Should not raise any exceptions + exporter.shutdown() + finally: + try: + os.unlink(db_path) + except (PermissionError, FileNotFoundError): + pass # Ignore cleanup errors on Windows + + def test_create_table_idempotent(self): + """Test that _create_table can be called multiple times safely.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as temp_file: + db_path = temp_file.name + + try: + exporter = SqliteExporter(db_path) + # Call _create_table again - should not raise an error + exporter._create_table() + + # Verify table still exists and is functional + with sqlite3.connect(db_path) as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='spans'" + 
) + assert cursor.fetchone() is not None + finally: + try: + os.unlink(db_path) + except (PermissionError, FileNotFoundError): + pass # Ignore cleanup errors on Windows + + def test_metadata_table_creation(self): + """Test that metadata table is created during initialization.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as temp_file: + db_path = temp_file.name + + try: + exporter = SqliteExporter(db_path) + + # Verify metadata table exists + with sqlite3.connect(db_path) as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='__uipath_meta'" + ) + assert cursor.fetchone() is not None + + # Verify schema version was set + cursor.execute( + "SELECT value FROM __uipath_meta WHERE key = 'schema_version'" + ) + version = cursor.fetchone() + assert version is not None + assert version[0] == SqliteExporter.SCHEMA_VERSION + finally: + try: + os.unlink(db_path) + except (PermissionError, FileNotFoundError): + pass + + def test_get_schema_version(self): + """Test getting the schema version.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as temp_file: + db_path = temp_file.name + + try: + exporter = SqliteExporter(db_path) + version = exporter.get_schema_version() + assert version == SqliteExporter.SCHEMA_VERSION + finally: + try: + os.unlink(db_path) + except (PermissionError, FileNotFoundError): + pass + + def test_get_schema_version_nonexistent_db(self): + """Test getting schema version from non-existent database.""" + invalid_path = "/invalid/path/nonexistent.db" + exporter = SqliteExporter.__new__(SqliteExporter) # Skip __init__ + exporter.db_path = invalid_path + + version = exporter.get_schema_version() + assert version is None + + def test_metadata_operations(self): + """Test setting, getting, and listing metadata.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as temp_file: + db_path = temp_file.name + + try: + exporter = SqliteExporter(db_path) + + # Test setting metadata + assert exporter.set_metadata("test_key", "test_value") is True + assert exporter.set_metadata("another_key", "another_value") is True + + # Test getting metadata + assert exporter.get_metadata("test_key") == "test_value" + assert exporter.get_metadata("another_key") == "another_value" + assert exporter.get_metadata("nonexistent_key") is None + + # Test updating existing metadata + assert exporter.set_metadata("test_key", "updated_value") is True + assert exporter.get_metadata("test_key") == "updated_value" + + # Test listing metadata + metadata = exporter.list_metadata() + assert "test_key" in metadata + assert "another_key" in metadata + assert "schema_version" in metadata + assert "created_by" in metadata + assert "exporter_class" in metadata + + assert metadata["test_key"]["value"] == "updated_value" + assert metadata["another_key"]["value"] == "another_value" + assert "created_at" in metadata["test_key"] + assert "updated_at" in metadata["test_key"] + finally: + try: + os.unlink(db_path) + except (PermissionError, FileNotFoundError): + pass + + def test_metadata_operations_with_database_errors(self): + """Test metadata operations with database errors.""" + invalid_path = "/invalid/path/nonexistent.db" + exporter = SqliteExporter.__new__(SqliteExporter) # Skip __init__ + exporter.db_path = invalid_path + + # Test operations with invalid database + assert exporter.get_metadata("test_key") is None + assert exporter.set_metadata("test_key", "test_value") is False + assert exporter.list_metadata() == {} + 
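+
+    def test_tracer_provider_wiring_smoke(self):
+        """Illustrative sketch: wiring the exporter into a real OpenTelemetry
+        pipeline via the standard TracerProvider/SimpleSpanProcessor APIs.
+        The exported row contents depend on _SpanUtils.otel_span_to_uipath_span,
+        so this only smoke-tests that the export path runs end to end and
+        leaves a queryable database behind.
+        """
+        from opentelemetry.sdk.trace import TracerProvider
+        from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+
+        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as temp_dir:
+            db_path = os.path.join(temp_dir, "spans.db")
+            provider = TracerProvider()
+            provider.add_span_processor(SimpleSpanProcessor(SqliteExporter(db_path)))
+            tracer = provider.get_tracer(__name__)
+
+            with tracer.start_as_current_span("smoke-span"):
+                pass
+            provider.shutdown()
+
+            # Whatever the exporter wrote, the schema must exist and be queryable.
+            conn = sqlite3.connect(db_path)
+            try:
+                count = conn.execute("SELECT COUNT(*) FROM spans").fetchone()[0]
+            finally:
+                conn.close()
+            assert count >= 0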
+ def test_schema_version_update(self): + """Test schema version update when version changes.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as temp_file: + db_path = temp_file.name + + try: + # Create database with initial version + exporter1 = SqliteExporter(db_path) + initial_version = exporter1.get_schema_version() + assert initial_version == SqliteExporter.SCHEMA_VERSION + + # Manually update the version in the database to simulate an older version + with sqlite3.connect(db_path) as conn: + cursor = conn.cursor() + cursor.execute( + "UPDATE __uipath_meta SET value = ? WHERE key = 'schema_version'", + ("0.9.0",), + ) + conn.commit() + + # Verify the old version is stored + assert exporter1.get_schema_version() == "0.9.0" + + # Create a new exporter instance - should update the version + exporter2 = SqliteExporter(db_path) + updated_version = exporter2.get_schema_version() + assert updated_version == SqliteExporter.SCHEMA_VERSION + finally: + try: + os.unlink(db_path) + except (PermissionError, FileNotFoundError): + pass + + def test_default_metadata_entries(self): + """Test that default metadata entries are created.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as temp_file: + db_path = temp_file.name + + try: + exporter = SqliteExporter(db_path) + metadata = exporter.list_metadata() + + # Check default entries + assert metadata["schema_version"]["value"] == SqliteExporter.SCHEMA_VERSION + assert metadata["created_by"]["value"] == "UiPath SQLiteExporter" + assert metadata["exporter_class"]["value"] == "SqliteExporter" + + # Check that timestamps are present + for key in ["schema_version", "created_by", "exporter_class"]: + assert "created_at" in metadata[key] + assert "updated_at" in metadata[key] + # Basic timestamp format check (ISO format) + assert "T" in metadata[key]["created_at"] + assert metadata[key]["created_at"].endswith("Z") + finally: + try: + os.unlink(db_path) + except (PermissionError, FileNotFoundError): + pass From 8752a1baa117b11141ba8554c34e0e60a1e9c9c7 Mon Sep 17 00:00:00 2001 From: ionmincu Date: Fri, 29 Aug 2025 15:24:09 +0300 Subject: [PATCH 2/2] feat(tracing): add common processor --- src/uipath/tracing/__init__.py | 2 + src/uipath/tracing/_otel_exporters.py | 365 +++++++++++++++- src/uipath/tracing/_traced.py | 31 +- .../tracing/test_langchain_span_processor.py | 409 ++++++++++++++++++ 4 files changed, 791 insertions(+), 16 deletions(-) create mode 100644 tests/tracing/test_langchain_span_processor.py diff --git a/src/uipath/tracing/__init__.py b/src/uipath/tracing/__init__.py index 8314be5e..6a21b52c 100644 --- a/src/uipath/tracing/__init__.py +++ b/src/uipath/tracing/__init__.py @@ -1,5 +1,6 @@ from ._otel_exporters import ( # noqa: D104 BaseSpanProcessor, + CommonSpanProcessor, JsonFileExporter, LlmOpsHttpExporter, SqliteExporter, @@ -12,6 +13,7 @@ "wait_for_tracers", "LlmOpsHttpExporter", "BaseSpanProcessor", + "CommonSpanProcessor", "JsonFileExporter", "SqliteExporter", ] diff --git a/src/uipath/tracing/_otel_exporters.py b/src/uipath/tracing/_otel_exporters.py index e5eaeb5f..a1f94f7e 100644 --- a/src/uipath/tracing/_otel_exporters.py +++ b/src/uipath/tracing/_otel_exporters.py @@ -12,6 +12,7 @@ SpanExporter, SpanExportResult, ) +from typing_extensions import override from uipath._utils._ssl_context import get_httpx_client_kwargs @@ -99,17 +100,369 @@ class BaseSpanProcessor(ABC): Defines the interface for processing spans with a single abstract method. 
""" - @abstractmethod - def process_span(self, span_data: MutableMapping[str, Any]) -> Dict[str, Any]: - """Process a span and return the transformed data. + def __init__( + self, + dump_attributes_as_string: bool = True, + unflatten_attributes: bool = True, + map_json_fields: bool = True, + ): + self._dump_attributes_as_string = dump_attributes_as_string + self._unflatten_attributes = unflatten_attributes + self._map_json_fields = map_json_fields + + def try_convert_json(self, flat_dict: Dict[str, Any]) -> Dict[str, Any]: + """Tries to convert stringified JSON values in a flattened dictionary back to their original types. Args: - span_data: The span data to process + flat_dict: A dictionary with potentially stringified JSON values. Returns: - Processed span data + A new dictionary with JSON strings converted to their original types. """ - pass + result = {} + for key, value in flat_dict.items(): + if isinstance(value, str): + try: + result[key] = json.loads(value) + except json.JSONDecodeError: + result[key] = value + else: + result[key] = value + return result + + def unflatten_dict(self, flat_dict: Dict[str, Any]) -> Dict[str, Any]: + """Converts a flattened dictionary with dot-separated keys into a nested dictionary. + + Args: + flat_dict: Dictionary with dot-separated keys (e.g., 'llm.output_messages.0.message.content') + + Returns: + Nested dictionary structure + + Example: + Input: {'llm.output_messages.0.message.content': 'hello', 'llm.model': 'gpt-4'} + Output: {'llm': {'output_messages': [{'message': {'content': 'hello'}}], 'model': 'gpt-4'}} + """ + result = {} + + for key, value in flat_dict.items(): + # Split the key by dots + parts = key.split(".") + current = result + + # Navigate/create the nested structure + for i, part in enumerate(parts[:-1]): + # Check if this part represents an array index + if part.isdigit(): + # Convert to integer index + index = int(part) + # Ensure the parent is a list + if not isinstance(current, list): + raise ValueError( + f"Expected list but found {type(current)} for key: {key}" + ) + # Extend the list if necessary + while len(current) <= index: + current.append(None) + + # If the current element is None, we need to create a structure for it + if current[index] is None: + # Look ahead to see if the next part is a digit (array index) + next_part = parts[i + 1] if i + 1 < len(parts) else None + if next_part and next_part.isdigit(): + current[index] = [] + else: + current[index] = {} + + current = current[index] + else: + # Regular dictionary key + if part not in current: + # Look ahead to see if the next part is a digit (array index) + next_part = parts[i + 1] if i + 1 < len(parts) else None + if next_part and next_part.isdigit(): + current[part] = [] + else: + current[part] = {} + current = current[part] # Set the final value + + final_key = parts[-1] + if final_key.isdigit(): + # If the final key is a digit, we're setting an array element + index = int(final_key) + if not isinstance(current, list): + raise ValueError( + f"Expected list but found {type(current)} for key: {key}" + ) + while len(current) <= index: + current.append(None) + current[index] = value + else: + # Regular key assignment + current[final_key] = value + + return result + + def safe_get(self, data: Dict[str, Any], path: str, default=None): + """Safely get nested value using dot notation.""" + keys = path.split(".") + current = data + for key in keys: + if isinstance(current, dict) and key in current: + current = current[key] + else: + return default + return current + + def 
safe_parse_json(self, value): + """Safely parse JSON string.""" + if isinstance(value, str): + try: + return json.loads(value.replace("'", '"')) + except json.JSONDecodeError: + return value + return value + + @abstractmethod + def process_span(self, span_data: MutableMapping[str, Any]) -> Dict[str, Any]: + return span_data + + +class CommonSpanProcessor(BaseSpanProcessor): + """A class to process spans, applying custom attribute and type mappings. + + This processor can transform flattened attribute keys (e.g., 'llm.output_messages.0.message.role') + into nested dictionary structures for easier access and processing. + + Example usage: + # With unflattening enabled + processor = LangchainSpanProcessor(unflatten_attributes=True, dump_attributes_as_string=False) + processed_span = processor.process_span(span_data) + + # Access nested attributes naturally: + role = processed_span['attributes']['llm']['output_messages'][0]['message']['role'] + + # Without unflattening (original behavior) + processor = LangchainSpanProcessor(unflatten_attributes=False) + processed_span = processor.process_span(span_data) + + # Access with flattened keys: + role = processed_span['attributes']['llm.output_messages.0.message.role'] + """ + + # Mapping of old attribute names to new attribute names or (new name, function) + ATTRIBUTE_MAPPING = { + "input.value": ("input", lambda s: json.loads(s)), + "output.value": ("output", lambda s: json.loads(s)), + "llm.model_name": "model", + } + + # Mapping of span types + SPAN_TYPE_MAPPING = { + "LLM": "completion", + "TOOL": "toolCall", + # Add more mappings as needed + } + + def __init__( + self, + dump_attributes_as_string: bool = True, + unflatten_attributes: bool = True, + map_json_fields: bool = True, + ): + """Initializes the LangchainSpanProcessor. + + Args: + dump_attributes_as_string: If True, dumps attributes as a JSON string. + Otherwise, attributes are set as a dictionary. + unflatten_attributes: If True, converts flattened dot-separated keys + into nested dictionary structures. + map_json_fields: If True, applies JSON field mapping transformations + for tool calls and LLM calls. 
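+
+        Example (illustrative; the exact shape depends on the flags above):
+            processor = CommonSpanProcessor(
+                dump_attributes_as_string=False, map_json_fields=False
+            )
+            span = {"Attributes": {"llm.model_name": "gpt-4o", "session.id": "abc"}}
+            processed = processor.process_span(span)
+            # processed["attributes"] == {"model": "gpt-4o", "session": {"id": "abc"}}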
+ """ + self._dump_attributes_as_string = dump_attributes_as_string + self._unflatten_attributes = unflatten_attributes + self._map_json_fields = map_json_fields + + def extract_attributes(self, span_data: MutableMapping[str, Any]) -> Dict[str, Any]: + """Extract and parse attributes from span_data, checking both 'Attributes' and 'attributes' keys.""" + for key in ["Attributes", "attributes"]: + if key in span_data: + value = span_data.pop(key) + if isinstance(value, str): + try: + parsed_value = json.loads(value) + return parsed_value if isinstance(parsed_value, dict) else {} + except json.JSONDecodeError: + logger.warning(f"Failed to parse attributes JSON: {value}") + return {} + elif isinstance(value, dict): + return value + else: + return {} + return {} + + @override + def process_span(self, span_data: MutableMapping[str, Any]) -> Dict[str, Any]: + logger.info(f"Processing span: {span_data}") + attributes = self.extract_attributes(span_data) + + if attributes and isinstance(attributes, dict): + if "openinference.span.kind" in attributes: + # Remove the span kind attribute + span_type = attributes["openinference.span.kind"] + # Map span type using SPAN_TYPE_MAPPING + span_data["SpanType"] = self.SPAN_TYPE_MAPPING.get(span_type, span_type) + del attributes["openinference.span.kind"] + + # Apply the transformation logic + for old_key, mapping in self.ATTRIBUTE_MAPPING.items(): + if old_key in attributes: + if isinstance(mapping, tuple): + new_key, func = mapping + try: + attributes[new_key] = func(attributes[old_key]) + except Exception: + attributes[new_key] = attributes[old_key] + else: + new_key = mapping + attributes[new_key] = attributes[old_key] + del attributes[old_key] + + if attributes: + # Apply unflattening if requested (before JSON field mapping) + if self._unflatten_attributes: + try: + attributes = self.try_convert_json(attributes) + attributes = self.unflatten_dict(attributes) + except Exception as e: + logger.warning(f"Failed to unflatten attributes: {e}") + + # Set attributes in span_data as dictionary for JSON field mapping + span_data["attributes"] = attributes + + # Apply JSON field mapping before final serialization + if self._map_json_fields: + span_data = self.map_json_fields_from_attributes(span_data) + + # Convert back to JSON string if requested (after all transformations) + if self._dump_attributes_as_string: + span_data["attributes"] = json.dumps(span_data["attributes"]) + + return span_data + + def map_tool_call_attributes(self, attributes: Dict[str, Any]) -> Dict[str, Any]: + """Simple tool call mapping - just add new fields.""" + result = attributes.copy() # Keep originals + + # Add new fields + result["type"] = "toolCall" + result["callId"] = attributes.get("call_id") or attributes.get("id") + result["toolName"] = self.safe_get(attributes, "tool.name") + result["arguments"] = self.safe_parse_json(attributes.get("input", "{}")) + result["toolType"] = "Integration" + result["result"] = self.safe_parse_json(attributes.get("output")) + result["error"] = None + + return result + + def map_llm_call_attributes(self, attributes: Dict[str, Any]) -> Dict[str, Any]: + """Simple LLM call mapping - just add new fields.""" + result = attributes.copy() # Keep originals + + # Transform token usage data if present (after unflattening) + # Use safe_get to extract token count values from nested structure + prompt_tokens = self.safe_get(attributes, "llm.token_count.prompt") + completion_tokens = self.safe_get(attributes, "llm.token_count.completion") + total_tokens = 
self.safe_get(attributes, "llm.token_count.total") + + usage = { + "promptTokens": prompt_tokens, + "completionTokens": completion_tokens, + "totalTokens": total_tokens, + "isByoExecution": False, + "executionDeploymentType": "PAYGO", + "isPiiMasked": False, + } + + # remove None values + usage = {k: v for k, v in usage.items() if v is not None} + + result["usage"] = usage + + # Add new fields + result["input"] = self.safe_get(attributes, "llm.input_messages") + result["output"] = self.safe_get(attributes, "llm.output_messages") + + result["type"] = "completion" + result["model"] = self.safe_get(attributes, "llm.invocation_parameters.model") + + # Settings + settings = {} + max_tokens = self.safe_get(attributes, "llm.invocation_parameters.max_tokens") + temperature = self.safe_get(attributes, "llm.invocation_parameters.temperature") + if max_tokens: + settings["maxTokens"] = max_tokens + if temperature is not None: + settings["temperature"] = temperature + if settings: + result["settings"] = settings + + # Tool calls (simplified) + tool_calls = [] + output_msgs = self.safe_get(attributes, "llm.output_messages", []) + for msg in output_msgs: + msg_tool_calls = self.safe_get(msg, "message.tool_calls", []) + for tc in msg_tool_calls: + tool_call_data = tc.get("tool_call", {}) + tool_calls.append( + { + "id": tool_call_data.get("id"), + "name": self.safe_get(tool_call_data, "function.name"), + "arguments": self.safe_get( + tool_call_data, "function.arguments", {} + ), + } + ) + if tool_calls: + result["toolCalls"] = tool_calls + + # Usage (enhance existing if not created above) + if "usage" in result: + usage = result["usage"] + if isinstance(usage, dict): + usage.setdefault("isByoExecution", False) + usage.setdefault("executionDeploymentType", "PAYGO") + usage.setdefault("isPiiMasked", False) + + return result + + def map_json_fields_from_attributes( + self, span_data: Dict[str, Any] + ) -> Dict[str, Any]: + """Simple mapping dispatcher.""" + if "attributes" not in span_data: + return span_data + + attributes = span_data["attributes"] + + # Parse if string + if isinstance(attributes, str): + try: + attributes = json.loads(attributes) + except json.JSONDecodeError: + return span_data + + if not isinstance(attributes, dict): + return span_data + + # Simple detection and mapping + if "tool" in attributes or span_data.get("SpanType") == "toolCall": + span_data["attributes"] = self.map_tool_call_attributes(attributes) + elif "llm" in attributes or span_data.get("SpanType") == "completion": + span_data["attributes"] = self.map_llm_call_attributes(attributes) + + return span_data class UiPathSpanExporterBase(SpanExporter, ABC): diff --git a/src/uipath/tracing/_traced.py b/src/uipath/tracing/_traced.py index 92bc768b..05f25b1b 100644 --- a/src/uipath/tracing/_traced.py +++ b/src/uipath/tracing/_traced.py @@ -5,7 +5,8 @@ from functools import wraps from typing import Any, Callable, List, Optional, Tuple -from opentelemetry import trace +from opentelemetry import context, trace +from opentelemetry.trace import set_span_in_context from ._utils import _SpanUtils @@ -55,22 +56,32 @@ def register_current_span_provider( @classmethod def get_parent_context(cls): - """Get the parent context using the registered current span provider. + """Get the parent context for starting new spans. Returns: - Context object with the current span set, or None if no provider is registered. + The current OpenTelemetry context if available, or context with external span if registered. 
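+
+            Resolution order: an active recording span in the current context
+            wins; otherwise a span from the registered external provider is
+            attached to the current context; otherwise the (possibly empty)
+            current context is returned as-is.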
""" + # First, get the current OpenTelemetry context + current_context = context.get_current() + + # Check if there's already a span in the current context + current_span = trace.get_current_span(current_context) + + # If we have a valid span in the current context, use it + if current_span is not None and current_span.is_recording(): + return current_context + + # Only fall back to the external span provider if no active OTel span if cls._current_span_provider is not None: try: - current_span = cls._current_span_provider() - if current_span is not None: - from opentelemetry.trace import set_span_in_context - - return set_span_in_context(current_span) + external_span = cls._current_span_provider() + if external_span is not None: + return set_span_in_context(external_span, current_context) except Exception as e: logger.warning(f"Error getting current span from provider: {e}") - return None - return None + + # Return current context (might be empty, but that's fine) + return current_context @classmethod def register_traced_function(cls, original_func, decorated_func, params): diff --git a/tests/tracing/test_langchain_span_processor.py b/tests/tracing/test_langchain_span_processor.py new file mode 100644 index 00000000..a0cba2a8 --- /dev/null +++ b/tests/tracing/test_langchain_span_processor.py @@ -0,0 +1,409 @@ +"""Tests for LangchainSpanProcessor.""" + +import json +from platform import processor + +import pytest + +from uipath.tracing._otel_exporters import CommonSpanProcessor + +processor = CommonSpanProcessor() + + +class TestUnflattenDict: + """Test the unflatten_dict utility function.""" + + def test_simple_unflatten(self): + """Test basic unflattening functionality.""" + flat_dict = {"user.name": "John", "user.age": 30, "settings.theme": "dark"} + + result = processor.unflatten_dict(flat_dict) + + assert result == { + "user": {"name": "John", "age": 30}, + "settings": {"theme": "dark"}, + } + + def test_array_unflatten(self): + """Test unflattening with array indices.""" + flat_dict = { + "items.0.name": "first", + "items.0.value": 1, + "items.1.name": "second", + "items.1.value": 2, + } + + result = processor.unflatten_dict(flat_dict) + + expected = { + "items": [{"name": "first", "value": 1}, {"name": "second", "value": 2}] + } + assert result == expected + + def test_nested_arrays(self): + """Test deeply nested structures with arrays.""" + flat_dict = { + "llm.messages.0.content": "hello", + "llm.messages.0.tools.0.name": "tool1", + "llm.messages.0.tools.1.name": "tool2", + "llm.provider": "azure", + } + + result = processor.unflatten_dict(flat_dict) + + expected = { + "llm": { + "messages": [ + { + "content": "hello", + "tools": [{"name": "tool1"}, {"name": "tool2"}], + } + ], + "provider": "azure", + } + } + assert result == expected + + def test_sparse_arrays(self): + """Test arrays with gaps in indices.""" + flat_dict = {"items.0.name": "first", "items.2.name": "third"} + + result = processor.unflatten_dict(flat_dict) + + expected = {"items": [{"name": "first"}, None, {"name": "third"}]} + assert result == expected + + def test_empty_dict(self): + """Test with empty dictionary.""" + result = processor.unflatten_dict({}) + assert result == {} + + def test_single_level_keys(self): + """Test with keys that don't need unflattening.""" + flat_dict = {"name": "value", "number": 42} + result = processor.unflatten_dict(flat_dict) + assert result == flat_dict + + +class TestLangchainSpanProcessor: + """Test the LangchainSpanProcessor class.""" + + def test_init_defaults(self): + """Test 
initialization with default parameters.""" + processor = CommonSpanProcessor() + assert processor._dump_attributes_as_string is True + assert processor._unflatten_attributes is True + + def test_init_custom_params(self): + """Test initialization with custom parameters.""" + processor = CommonSpanProcessor( + dump_attributes_as_string=False, unflatten_attributes=True + ) + assert processor._dump_attributes_as_string is False + assert processor._unflatten_attributes is True + + def test_process_span_without_attributes(self): + """Test processing span without attributes.""" + processor = CommonSpanProcessor() + span_data = {"Id": "test-id", "Name": "TestSpan"} + + result = processor.process_span(span_data) + assert result == span_data + + def test_process_span_with_unflatten_disabled(self): + """Test processing span with unflattening disabled.""" + processor = CommonSpanProcessor( + dump_attributes_as_string=False, unflatten_attributes=False + ) + + attributes = { + "llm.output_messages.0.role": "assistant", + "llm.provider": "azure", + "model": "gpt-4", + } + + span_data = {"Id": "test-id", "Attributes": json.dumps(attributes)} + + result = processor.process_span(span_data) + + # Should keep flattened structure + assert result["attributes"]["llm.output_messages.0.role"] == "assistant" + assert result["attributes"]["llm.provider"] == "azure" + assert result["attributes"]["model"] == "gpt-4" + + def test_process_span_with_unflatten_and_json_output(self): + """Test processing span with unflattening and JSON string output.""" + processor = CommonSpanProcessor( + dump_attributes_as_string=True, unflatten_attributes=True + ) + + attributes = {"llm.provider": "azure", "llm.messages.0.role": "user"} + + span_data = {"Id": "test-id", "Attributes": json.dumps(attributes)} + + result = processor.process_span(span_data) + + # Should be JSON string + assert isinstance(result["attributes"], str) + + # Parse and verify nested structure + parsed = json.loads(result["attributes"]) + assert parsed["llm"]["provider"] == "azure" + assert parsed["llm"]["messages"][0]["role"] == "user" + + def test_token_usage_processing_with_unflatten(self): + """Test token usage processing with unflattening.""" + processor = CommonSpanProcessor( + dump_attributes_as_string=False, unflatten_attributes=True + ) + + attributes = { + "llm.token_count.prompt": 100, + "llm.token_count.completion": 50, + "llm.token_count.total": 150, + "llm.provider": "azure", + } + + span_data = {"Id": "test-id", "Attributes": json.dumps(attributes)} + + result = processor.process_span(span_data) + attrs = result["attributes"] + + # Check usage structure + assert attrs["usage"]["promptTokens"] == 100 + assert attrs["usage"]["completionTokens"] == 50 + assert attrs["usage"]["totalTokens"] == 150 + assert attrs["usage"]["isByoExecution"] is False + + # Check unflattening of other attributes + assert attrs["llm"]["provider"] == "azure" + + def test_unflatten_error_handling(self): + """Test error handling in unflattening.""" + processor = CommonSpanProcessor( + dump_attributes_as_string=False, unflatten_attributes=True + ) + + # Create a scenario that might cause unflattening issues + # This should be handled gracefully + attributes = {"normal.key": "value", "llm.provider": "azure"} + + span_data = {"Id": "test-id", "Attributes": json.dumps(attributes)} + + # Should not raise an exception + result = processor.process_span(span_data) + assert "attributes" in result + + def test_process_span_with_dict_attributes_unflatten_enabled(self): + """Test 
processing span with dictionary attributes and unflattening enabled.""" + processor = CommonSpanProcessor( + dump_attributes_as_string=False, unflatten_attributes=True + ) + + # Simulate the real-world case where Attributes is already a dictionary + attributes = { + "llm.output_messages.0.message.role": "assistant", + "llm.output_messages.0.message.tool_calls.0.tool_call.id": "call_123", + "llm.output_messages.0.message.tool_calls.0.tool_call.function.name": "get_time", + "llm.provider": "azure", + "model": "gpt-4", + } + + span_data = { + "Id": "test-id", + "Attributes": attributes, # Already a dictionary, not a JSON string + } + + result = processor.process_span(span_data) + + # Should have nested structure + attrs = result["attributes"] + assert attrs["llm"]["output_messages"][0]["message"]["role"] == "assistant" + assert ( + attrs["llm"]["output_messages"][0]["message"]["tool_calls"][0]["tool_call"][ + "id" + ] + == "call_123" + ) + assert ( + attrs["llm"]["output_messages"][0]["message"]["tool_calls"][0]["tool_call"][ + "function" + ]["name"] + == "get_time" + ) + assert attrs["llm"]["provider"] == "azure" + + def test_real_world_trace_unflatten(self): + """Test with real-world trace data to verify unflattening works correctly.""" + processor = CommonSpanProcessor( + dump_attributes_as_string=False, unflatten_attributes=True + ) + + # Real trace data from user's example (dictionary format) + real_trace_attributes = { + "input.mime_type": "application/json", + "output.mime_type": "application/json", + "llm.input_messages.0.message.role": "user", + "llm.input_messages.0.message.content": "You are a helpful assistant with access to various tools. \n The user is asking about: Weather and Technology\n \n Please use the available tools to gather some relevant information. 
For example:\n - Check the current time\n - Generate a random number if relevant\n - Calculate squares of numbers if needed\n - Get weather information for any cities mentioned\n \n Use at least 2-3 tools to demonstrate their functionality.", + "llm.output_messages.0.message.role": "assistant", + "llm.output_messages.0.message.tool_calls.0.tool_call.id": "call_qWaFnNRY8mk2PQjEu0wRLaRd", + "llm.output_messages.0.message.tool_calls.0.tool_call.function.name": "get_current_time", + "llm.output_messages.0.message.tool_calls.1.tool_call.id": "call_3ckaPILSv4SmyeufQf1ovA3H", + "llm.output_messages.0.message.tool_calls.1.tool_call.function.name": "generate_random_number", + "llm.output_messages.0.message.tool_calls.1.tool_call.function.arguments": '{"min_val": 1, "max_val": 10}', + "llm.output_messages.0.message.tool_calls.2.tool_call.id": "call_BjaiJ0NHwWs14fMbCyjDElEX", + "llm.output_messages.0.message.tool_calls.2.tool_call.function.name": "get_weather_info", + "llm.output_messages.0.message.tool_calls.2.tool_call.function.arguments": '{"city": "San Francisco"}', + "llm.invocation_parameters": '{"model": "gpt-4o-mini-2024-07-18", "url": "https://alpha.uipath.com/..."}', + "llm.tools.0.tool.json_schema": '{"type": "function", "function": {"name": "get_current_time", "description": "Get the current date and time.", "parameters": {"properties": {}, "type": "object"}}}', + "llm.tools.1.tool.json_schema": '{"type": "function", "function": {"name": "generate_random_number", "description": "Generate a random number between min_val and max_val (inclusive).", "parameters": {"properties": {"min_val": {"default": 1, "type": "integer"}, "max_val": {"default": 100, "type": "integer"}}, "type": "object"}}}', + "llm.tools.2.tool.json_schema": '{"type": "function", "function": {"name": "calculate_square", "description": "Calculate the square of a given number.", "parameters": {"properties": {"number": {"type": "number"}}, "required": ["number"], "type": "object"}}}', + "llm.tools.3.tool.json_schema": '{"type": "function", "function": {"name": "get_weather_info", "description": "Get mock weather information for a given city.", "parameters": {"properties": {"city": {"type": "string"}}, "required": ["city"], "type": "object"}}}', + "llm.provider": "azure", + "llm.system": "openai", + "session.id": "a879985a-8d39-4f51-94e1-8433423f35db", + "metadata": '{"thread_id": "a879985a-8d39-4f51-94e1-8433423f35db", "langgraph_step": 1, "langgraph_node": "make_tool_calls"}', + "model": "gpt-4o-mini-2024-07-18", + "usage": { + "promptTokens": 219, + "completionTokens": 66, + "totalTokens": 285, + "isByoExecution": False, + }, + } + + span_data = { + "PermissionStatus": 0, + "Id": "7d137190-348c-4ef2-9b19-165295643b82", + "TraceId": "81dbeaf2-c2ba-4b1e-95fd-b722f53dc405", + "ParentId": "f71478d6-f081-4bf6-a942-0944d97ffadb", + "Name": "UiPathChat", + "StartTime": "2025-08-26T16:11:17.276Z", + "EndTime": "2025-08-26T16:11:20.027Z", + "Attributes": real_trace_attributes, # Dictionary format (not JSON string) + "SpanType": "completion", + } + + # Process the span + result = processor.process_span(span_data) + + # Verify the trace data structure is preserved + assert result["Id"] == "7d137190-348c-4ef2-9b19-165295643b82" + assert result["Name"] == "UiPathChat" + assert result["SpanType"] == "completion" + + # Verify attributes are unflattened and accessible + attrs = result["attributes"] + assert isinstance(attrs, dict) + + # Test LLM provider info + assert attrs["llm"]["provider"] == "azure" + assert attrs["llm"]["system"] == 
"openai" + + # Test input messages + input_messages = attrs["llm"]["input_messages"] + assert len(input_messages) == 1 + assert input_messages[0]["message"]["role"] == "user" + assert "helpful assistant" in input_messages[0]["message"]["content"] + + # Test output messages and tool calls + output_messages = attrs["llm"]["output_messages"] + assert len(output_messages) == 1 + assert output_messages[0]["message"]["role"] == "assistant" + + tool_calls = output_messages[0]["message"]["tool_calls"] + assert len(tool_calls) == 3 + + # Verify individual tool calls + assert tool_calls[0]["tool_call"]["function"]["name"] == "get_current_time" + assert tool_calls[0]["tool_call"]["id"] == "call_qWaFnNRY8mk2PQjEu0wRLaRd" + + assert ( + tool_calls[1]["tool_call"]["function"]["name"] == "generate_random_number" + ) + + assert tool_calls[2]["tool_call"]["function"]["name"] == "get_weather_info" + # Test tools schema + tools = attrs["llm"]["tools"] + assert len(tools) == 4 + + # Test session data + assert attrs["session"]["id"] == "a879985a-8d39-4f51-94e1-8433423f35db" + + def test_invalid_json_attributes(self): + """Test handling of invalid JSON in attributes.""" + processor = CommonSpanProcessor(unflatten_attributes=True) + + span_data = {"Id": "test-id", "Attributes": "invalid json {"} + + # Should handle gracefully and return original span + # Note: invalid JSON causes the Attributes key to be removed + result = processor.process_span(span_data) + assert result["Id"] == "test-id" + assert "Attributes" not in result # Attributes key is removed on invalid JSON + + def test_process_span_with_provided_json(self): + """Test processing a span with the user-provided JSON data.""" + # Test with default settings (unflatten=True, dump_as_string=True) + processor = CommonSpanProcessor() + + input_data = { + "Id": "f4b31bcb-caaa-4979-8a33-099ef4eed977", + "TraceId": "8164a1dd-fb29-42fd-b49a-afdccafe486e", + "ParentId": "8d82c004-bb59-497b-b290-ab02b3699543", + "Name": "UiPathChat", + "StartTime": "2025-08-28T15:10:39.972276", + "EndTime": "2025-08-28T15:10:42.270506", + "Attributes": '{"input.value": "{\\"messages\\": [[{\\"lc\\": 1, \\"type\\": \\"constructor\\", \\"id\\": [\\"langchain\\", \\"schema\\", \\"messages\\", \\"HumanMessage\\"], \\"kwargs\\": {\\"content\\": \\"You are a helpful assistant with access to various tools. \\\\n The user is asking about: Weather and Technology\\\\n \\\\n Please use the available tools to gather some relevant information. 
For example:\\\\n - Check the current time\\\\n - Generate a random number if relevant\\\\n - Calculate squares of numbers if needed\\\\n - Get weather information for any cities mentioned\\\\n \\\\n Use at least 2-3 tools to demonstrate their functionality.\\", \\"type\\": \\"human\\"}}]]}", "input.mime_type": "application/json", "output.value": "{\\"generations\\": [[{\\"text\\": \\"\\", \\"generation_info\\": null, \\"type\\": \\"ChatGeneration\\", \\"message\\": {\\"lc\\": 1, \\"type\\": \\"constructor\\", \\"id\\": [\\"langchain\\", \\"schema\\", \\"messages\\", \\"AIMessage\\"], \\"kwargs\\": {\\"content\\": \\"\\", \\"response_metadata\\": {\\"token_usage\\": {\\"completion_tokens\\": 66, \\"prompt_tokens\\": 219, \\"total_tokens\\": 285, \\"cache_read_input_tokens\\": 0}, \\"model_name\\": \\"gpt-4o-mini-2024-07-18\\", \\"finish_reason\\": \\"tool_calls\\", \\"system_fingerprint\\": \\"chatcmpl-C9YYq3MkM9laikeNVJ72xJJvYmWsO\\", \\"created\\": 1756393840}, \\"type\\": \\"ai\\", \\"id\\": \\"run--70aec293-656e-4dcb-a253-8317bdda4295-0\\", \\"tool_calls\\": [{\\"id\\": \\"call_kqqmcYwjzusMABpJC5nQJJM6\\", \\"name\\": \\"get_current_time\\", \\"args\\": {}, \\"type\\": \\"tool_call\\"}, {\\"id\\": \\"call_RsgILurSpogORi2FBPXLfTju\\", \\"name\\": \\"generate_random_number\\", \\"args\\": {\\"min_val\\": 1, \\"max_val\\": 10}, \\"type\\": \\"tool_call\\"}, {\\"id\\": \\"call_BgBqVNMqIzL151D12rMLW4Dg\\", \\"name\\": \\"get_weather_info\\", \\"args\\": {\\"city\\": \\"New York\\"}, \\"type\\": \\"tool_call\\"}], \\"usage_metadata\\": {\\"input_tokens\\": 219, \\"output_tokens\\": 66, \\"total_tokens\\": 285}, \\"invalid_tool_calls\\": []}}}]], \\"llm_output\\": null, \\"run\\": null, \\"type\\": \\"LLMResult\\"}", "output.mime_type": "application/json", "llm.input_messages.0.message.role": "user", "llm.input_messages.0.message.content": "You are a helpful assistant with access to various tools. \\n The user is asking about: Weather and Technology\\n \\n Please use the available tools to gather some relevant information. 
For example:\\n - Check the current time\\n - Generate a random number if relevant\\n - Calculate squares of numbers if needed\\n - Get weather information for any cities mentioned\\n \\n Use at least 2-3 tools to demonstrate their functionality.", "llm.output_messages.0.message.role": "assistant", "llm.output_messages.0.message.tool_calls.0.tool_call.id": "call_kqqmcYwjzusMABpJC5nQJJM6", "llm.output_messages.0.message.tool_calls.0.tool_call.function.name": "get_current_time", "llm.output_messages.0.message.tool_calls.1.tool_call.id": "call_RsgILurSpogORi2FBPXLfTju", "llm.output_messages.0.message.tool_calls.1.tool_call.function.name": "generate_random_number", "llm.output_messages.0.message.tool_calls.1.tool_call.function.arguments": "{\\"min_val\\": 1, \\"max_val\\": 10}", "llm.output_messages.0.message.tool_calls.2.tool_call.id": "call_BgBqVNMqIzL151D12rMLW4Dg", "llm.output_messages.0.message.tool_calls.2.tool_call.function.name": "get_weather_info", "llm.output_messages.0.message.tool_calls.2.tool_call.function.arguments": "{\\"city\\": \\"New York\\"}", "llm.invocation_parameters": "{\\"model\\": \\"gpt-4o-mini-2024-07-18\\", \\"url\\": \\"https://alpha.uipath.com/b7006b1c-11c3-4a80-802e-fee0ebf9c360/6961a069-3392-40ca-bf5d-276f4e54c8ff/agenthub_/llm/api/chat/completions\\", \\"temperature\\": 0.0, \\"max_tokens\\": 1000, \\"frequency_penalty\\": null, \\"presence_penalty\\": null, \\"_type\\": \\"uipath\\", \\"stop\\": null, \\"stream\\": false, \\"tools\\": [{\\"type\\": \\"function\\", \\"function\\": {\\"name\\": \\"get_current_time\\", \\"description\\": \\"Get the current date and time.\\", \\"parameters\\": {\\"properties\\": {}, \\"type\\": \\"object\\"}}}, {\\"type\\": \\"function\\", \\"function\\": {\\"name\\": \\"generate_random_number\\", \\"description\\": \\"Generate a random number between min_val and max_val (inclusive).\\", \\"parameters\\": {\\"properties\\": {\\"min_val\\": {\\"default\\": 1, \\"type\\": \\"integer\\"}, \\"max_val\\": {\\"default\\": 100, \\"type\\": \\"integer\\"}}, \\"type\\": \\"object\\"}}}, {\\"type\\": \\"function\\", \\"function\\": {\\"name\\": \\"calculate_square\\", \\"description\\": \\"Calculate the square of a given number.\\", \\"parameters\\": {\\"properties\\": {\\"number\\": {\\"type\\": \\"number\\"}}, \\"required\\": [\\"number\\"], \\"type\\": \\"object\\"}}}, {\\"type\\": \\"function\\", \\"function\\": {\\"name\\": \\"get_weather_info\\", \\"description\\": \\"Get mock weather information for a given city.\\", \\"parameters\\": {\\"properties\\": {\\"city\\": {\\"type\\": \\"string\\"}}, \\"required\\": [\\"city\\"], \\"type\\": \\"object\\"}}}]}", "llm.tools.0.tool.json_schema": "{\\"type\\": \\"function\\", \\"function\\": {\\"name\\": \\"get_current_time\\", \\"description\\": \\"Get the current date and time.\\", \\"parameters\\": {\\"properties\\": {}, \\"type\\": \\"object\\"}}}", "llm.tools.1.tool.json_schema": "{\\"type\\": \\"function\\", \\"function\\": {\\"name\\": \\"generate_random_number\\", \\"description\\": \\"Generate a random number between min_val and max_val (inclusive).\\", \\"parameters\\": {\\"properties\\": {\\"min_val\\": {\\"default\\": 1, \\"type\\": \\"integer\\"}, \\"max_val\\": {\\"default\\": 100, \\"type\\": \\"integer\\"}}, \\"type\\": \\"object\\"}}}", "llm.tools.2.tool.json_schema": "{\\"type\\": \\"function\\", \\"function\\": {\\"name\\": \\"calculate_square\\", \\"description\\": \\"Calculate the square of a given number.\\", \\"parameters\\": {\\"properties\\": {\\"number\\": 
{\\"type\\": \\"number\\"}}, \\"required\\": [\\"number\\"], \\"type\\": \\"object\\"}}}", "llm.tools.3.tool.json_schema": "{\\"type\\": \\"function\\", \\"function\\": {\\"name\\": \\"get_weather_info\\", \\"description\\": \\"Get mock weather information for a given city.\\", \\"parameters\\": {\\"properties\\": {\\"city\\": {\\"type\\": \\"string\\"}}, \\"required\\": [\\"city\\"], \\"type\\": \\"object\\"}}}", "llm.provider": "azure", "llm.system": "openai", "llm.model_name": "gpt-4o-mini-2024-07-18", "llm.token_count.prompt": 219, "llm.token_count.completion": 66, "llm.token_count.total": 285, "session.id": "2d619d5e-528d-4219-a166-971414eec294", "metadata": "{\\"thread_id\\": \\"2d619d5e-528d-4219-a166-971414eec294\\", \\"langgraph_step\\": 1, \\"langgraph_node\\": \\"make_tool_calls\\", \\"langgraph_triggers\\": [\\"branch:to:make_tool_calls\\"], \\"langgraph_path\\": [\\"__pregel_pull\\", \\"make_tool_calls\\"], \\"langgraph_checkpoint_ns\\": \\"make_tool_calls:e564e2fb-32d5-3cfa-9bb0-53679afa5af0\\", \\"checkpoint_ns\\": \\"make_tool_calls:e564e2fb-32d5-3cfa-9bb0-53679afa5af0\\", \\"ls_provider\\": \\"azure\\", \\"ls_model_name\\": \\"gpt-4o-mini-2024-07-18\\", \\"ls_model_type\\": \\"chat\\", \\"ls_temperature\\": 0.0, \\"ls_max_tokens\\": 1000}", "openinference.span.kind": "LLM"}', + "Status": 1, + "CreatedAt": "2025-08-28T15:10:43.580667Z", + "UpdatedAt": "2025-08-28T15:10:43.580670Z", + "OrganizationId": "b7006b1c-11c3-4a80-802e-fee0ebf9c360", + "TenantId": "6961a069-3392-40ca-bf5d-276f4e54c8ff", + "ExpiryTimeUtc": None, + "FolderKey": "d0e72980-7a97-44e1-93b7-4087689521b7", + "Source": None, + "SpanType": "OpenTelemetry", + "ProcessKey": "65965c09-87e3-4fa3-a7be-3fdb3955bd47", + "JobKey": "2d619d5e-528d-4219-a166-971414eec294", + } + + result = processor.process_span(input_data) + + # Assertions + assert result["SpanType"] == "completion" + assert isinstance(result["attributes"], str) + + attrs = json.loads(result["attributes"]) + + # Check unflattening + assert "llm" in attrs + assert "input_messages" in attrs["llm"] + assert "output_messages" in attrs["llm"] + assert "invocation_parameters" in attrs["llm"] + + # Check attribute mapping + assert "model" in attrs + assert attrs["model"] == "gpt-4o-mini-2024-07-18" + assert "input" in attrs + assert "output" in attrs + + # Check JSON field mapping + assert "usage" in attrs + assert attrs["usage"]["promptTokens"] == 219 + assert attrs["usage"]["completionTokens"] == 66 + assert attrs["usage"]["totalTokens"] == 285 + + assert "toolCalls" in attrs + assert len(attrs["toolCalls"]) == 3 + assert attrs["toolCalls"][0]["name"] == "get_current_time" + assert attrs["toolCalls"][1]["name"] == "generate_random_number" + assert attrs["toolCalls"][2]["name"] == "get_weather_info" + assert attrs["toolCalls"][2]["arguments"] == {"city": "New York"} + + assert "settings" in attrs + assert attrs["settings"]["maxTokens"] == 1000 + assert attrs["settings"]["temperature"] == 0.0 + + +# uipath-langchain==0.0.123.dev1001490444