From 01b7cac0014538d3ab70bcbe35a15cfaf4f97689 Mon Sep 17 00:00:00 2001 From: Jithin James Date: Tue, 15 Oct 2024 02:58:51 +0530 Subject: [PATCH] feat: small refactors and cleanups (#1493) --- .../metrics => applications}/cost.ipynb | 0 docs/references/evaluation_schema.md | 2 +- mkdocs.yml | 2 +- src/ragas/dataset_schema.py | 183 +++++++++++++++++- src/ragas/evaluation.py | 158 ++------------- src/ragas/integrations/llama_index.py | 4 +- src/ragas/messages.py | 2 +- src/ragas/testset/synthesizers/generate.py | 16 +- .../testset/synthesizers/specific_query.py | 5 + src/ragas/utils.py | 8 +- tests/unit/test_dataset_schema.py | 14 ++ 11 files changed, 227 insertions(+), 167 deletions(-) rename docs/howtos/{customizations/metrics => applications}/cost.ipynb (100%) diff --git a/docs/howtos/customizations/metrics/cost.ipynb b/docs/howtos/applications/cost.ipynb similarity index 100% rename from docs/howtos/customizations/metrics/cost.ipynb rename to docs/howtos/applications/cost.ipynb diff --git a/docs/references/evaluation_schema.md b/docs/references/evaluation_schema.md index 18f70dacb..f5da60e86 100644 --- a/docs/references/evaluation_schema.md +++ b/docs/references/evaluation_schema.md @@ -6,7 +6,7 @@ options: members_order: "source" -::: ragas.evaluation.Result +::: ragas.evaluation.EvaluationResult options: show_root_heading: True diff --git a/mkdocs.yml b/mkdocs.yml index 029c2926e..d064ca475 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -85,7 +85,7 @@ nav: - Seed Generation with Production Data: howtos/customizations/testset_generation/seed_generation_with_production_data.md - Applications: - howtos/applications/index.md - - Cost Analysis: howtos/applications/cost.md + - Cost Analysis: howtos/applications/_cost.md - Integrations: - howtos/integrations/index.md - Migrations: diff --git a/src/ragas/dataset_schema.py b/src/ragas/dataset_schema.py index 635f00449..3a32da2e0 100644 --- a/src/ragas/dataset_schema.py +++ b/src/ragas/dataset_schema.py @@ -2,15 +2,21 @@ import json import typing as t +from dataclasses import dataclass, field +from datasets import Dataset as HFDataset from pydantic import BaseModel, field_validator +from ragas.cost import CostCallbackHandler from ragas.messages import AIMessage, HumanMessage, ToolCall, ToolMessage +from ragas.utils import safe_nanmean if t.TYPE_CHECKING: from datasets import Dataset as HFDataset from pandas import DataFrame as PandasDataframe + from ragas.cost import TokenUsage + class BaseSample(BaseModel): """ @@ -145,7 +151,7 @@ def validate_samples(cls, samples: t.List[BaseSample]) -> t.List[BaseSample]: return samples - def get_sample_type(self): + def get_sample_type(self) -> t.Type[Sample]: """Returns the type of the samples in the dataset.""" return type(self.samples[0]) @@ -175,7 +181,7 @@ def to_hf_dataset(self) -> HFDataset: return HFDataset.from_list(self._to_list()) @classmethod - def from_hf_dataset(cls, dataset: HFDataset) -> "RagasDataset[Sample]": + def from_hf_dataset(cls, dataset: HFDataset): """Creates an EvaluationDataset from a Hugging Face Dataset.""" return cls.from_list(dataset.to_list()) @@ -266,11 +272,17 @@ def __iter__(self) -> t.Iterator[Sample]: # type: ignore def __len__(self) -> int: return len(self.samples) - def __getitem__(self, idx: int) -> Sample: - return self.samples[idx] + def __str__(self) -> str: + return f"EvaluationDataset(features={self.features()}, len={len(self.samples)})" + + def __repr__(self) -> str: + return self.__str__() + +SingleTurnSampleOrMultiTurnSample = t.Union[SingleTurnSample, 
MultiTurnSample] -class EvaluationDataset(RagasDataset[BaseSample]): + +class EvaluationDataset(RagasDataset[SingleTurnSampleOrMultiTurnSample]): """ Represents a dataset of evaluation samples. @@ -295,6 +307,165 @@ class EvaluationDataset(RagasDataset[BaseSample]): Creates an EvaluationDataset from a list of dictionaries. from_dict(mapping) Creates an EvaluationDataset from a dictionary. + from_csv(path) + Creates an EvaluationDataset from a CSV file. + to_csv(path) + Converts the dataset to a CSV file. + to_jsonl(path) + Converts the dataset to a JSONL file. + from_jsonl(path) + Creates an EvaluationDataset from a JSONL file. """ - pass + @t.overload + def __getitem__(self, idx: int) -> SingleTurnSampleOrMultiTurnSample: ... + + @t.overload + def __getitem__(self, idx: slice) -> "EvaluationDataset": ... + + def __getitem__( + self, idx: t.Union[int, slice] + ) -> t.Union[SingleTurnSampleOrMultiTurnSample, "EvaluationDataset"]: + if isinstance(idx, int): + return self.samples[idx] + elif isinstance(idx, slice): + return type(self)(samples=self.samples[idx]) + else: + raise TypeError("Index must be int or slice") + + +@dataclass +class EvaluationResult: + """ + A class to store and process the results of the evaluation. + + Attributes + ---------- + scores : Dataset + The dataset containing the scores of the evaluation. + dataset : Dataset, optional + The original dataset used for the evaluation. Default is None. + binary_columns : list of str, optional + List of columns that are binary metrics. Default is an empty list. + cost_cb : CostCallbackHandler, optional + The callback handler for cost computation. Default is None. + """ + + scores: t.List[t.Dict[str, t.Any]] + dataset: t.Optional[EvaluationDataset] = None + binary_columns: t.List[str] = field(default_factory=list) + cost_cb: t.Optional[CostCallbackHandler] = None + + def __post_init__(self): + # transform scores from list of dicts to dict of lists + self._scores_dict = { + k: [d[k] for d in self.scores] for k in self.scores[0].keys() + } + + values = [] + self._repr_dict = {} + for metric_name in self._scores_dict.keys(): + value = safe_nanmean(self._scores_dict[metric_name]) + self._repr_dict[metric_name] = value + if metric_name not in self.binary_columns: + value = t.cast(float, value) + values.append(value + 1e-10) + + def to_pandas(self, batch_size: int | None = None, batched: bool = False): + """ + Convert the result to a pandas DataFrame. + + Parameters + ---------- + batch_size : int, optional + The batch size for conversion. Default is None. + batched : bool, optional + Whether to convert in batches. Default is False. + + Returns + ------- + pandas.DataFrame + The result as a pandas DataFrame. + + Raises + ------ + ValueError + If the dataset is not provided. + """ + try: + import pandas as pd + except ImportError: + raise ImportError( + "pandas is not installed. Please install it to use this function." + ) + + if self.dataset is None: + raise ValueError("dataset is not provided for the results class") + assert len(self.scores) == len(self.dataset) + # convert both to pandas dataframes and concatenate + scores_df = pd.DataFrame(self.scores) + dataset_df = self.dataset.to_pandas() + return pd.concat([dataset_df, scores_df], axis=1) + + def total_tokens(self) -> t.Union[t.List[TokenUsage], TokenUsage]: + """ + Compute the total tokens used in the evaluation. + + Returns + ------- + list of TokenUsage or TokenUsage + The total tokens used. + + Raises + ------ + ValueError + If the cost callback handler is not provided. 
+ """ + if self.cost_cb is None: + raise ValueError( + "The evaluate() run was not configured for computing cost. Please provide a token_usage_parser function to evaluate() to compute cost." + ) + return self.cost_cb.total_tokens() + + def total_cost( + self, + cost_per_input_token: t.Optional[float] = None, + cost_per_output_token: t.Optional[float] = None, + per_model_costs: t.Dict[str, t.Tuple[float, float]] = {}, + ) -> float: + """ + Compute the total cost of the evaluation. + + Parameters + ---------- + cost_per_input_token : float, optional + The cost per input token. Default is None. + cost_per_output_token : float, optional + The cost per output token. Default is None. + per_model_costs : dict of str to tuple of float, optional + The per model costs. Default is an empty dictionary. + + Returns + ------- + float + The total cost of the evaluation. + + Raises + ------ + ValueError + If the cost callback handler is not provided. + """ + if self.cost_cb is None: + raise ValueError( + "The evaluate() run was not configured for computing cost. Please provide a token_usage_parser function to evaluate() to compute cost." + ) + return self.cost_cb.total_cost( + cost_per_input_token, cost_per_output_token, per_model_costs + ) + + def __repr__(self) -> str: + score_strs = [f"'{k}': {v:0.4f}" for k, v in self._repr_dict.items()] + return "{" + ", ".join(score_strs) + "}" + + def __getitem__(self, key: str) -> t.List[float]: + return self._scores_dict[key] diff --git a/src/ragas/evaluation.py b/src/ragas/evaluation.py index 68e3871c9..6ee234296 100644 --- a/src/ragas/evaluation.py +++ b/src/ragas/evaluation.py @@ -1,18 +1,21 @@ from __future__ import annotations import typing as t -from dataclasses import dataclass, field import numpy as np -from datasets import Dataset, concatenate_datasets +from datasets import Dataset from langchain_core.callbacks import BaseCallbackHandler, BaseCallbackManager from langchain_core.embeddings import Embeddings as LangchainEmbeddings from langchain_core.language_models import BaseLanguageModel as LangchainLLM from ragas._analytics import EvaluationEvent, track, track_was_completed from ragas.callbacks import new_group -from ragas.cost import TokenUsage -from ragas.dataset_schema import EvaluationDataset, MultiTurnSample, SingleTurnSample +from ragas.dataset_schema import ( + EvaluationDataset, + EvaluationResult, + MultiTurnSample, + SingleTurnSample, +) from ragas.embeddings.base import ( BaseRagasEmbeddings, LangchainEmbeddingsWrapper, @@ -34,12 +37,7 @@ is_reproducable, ) from ragas.run_config import RunConfig -from ragas.utils import ( - convert_v1_to_v2_dataset, - convert_v2_to_v1_dataset, - get_feature_language, - safe_nanmean, -) +from ragas.utils import convert_v1_to_v2_dataset, get_feature_language from ragas.validation import ( remap_column_names, validate_required_columns, @@ -67,13 +65,13 @@ def evaluate( raise_exceptions: bool = False, column_map: t.Optional[t.Dict[str, str]] = None, show_progress: bool = True, -) -> Result: +) -> EvaluationResult: """ Run the evaluation on the dataset with different metrics Parameters ---------- - dataset : Dataset[question: list[str], contexts: list[list[str]], answer: list[str], ground_truth: list[list[str]]] + dataset : Dataset, EvaluationDataset The dataset in the format of ragas which the metrics will use to score the RAG pipeline with metrics : list[Metric] , optional @@ -167,10 +165,8 @@ def evaluate( metrics = [answer_relevancy, context_precision, faithfulness, context_recall] - v1_input = False if 
isinstance(dataset, Dataset): # remap column names from the dataset - v1_input = True dataset = remap_column_names(dataset, column_map) dataset = convert_v1_to_v2_dataset(dataset) # validation @@ -255,7 +251,7 @@ def evaluate( sample_type = dataset.get_sample_type() for i, sample in enumerate(dataset): - row = t.cast(t.Dict[str, t.Any], sample.dict()) + row = t.cast(t.Dict[str, t.Any], sample.model_dump()) row_rm, row_group_cm = new_group( name=f"row {i}", inputs=row, @@ -289,7 +285,7 @@ def evaluate( else: raise ValueError(f"Unsupported sample type {sample_type}") - scores = [] + scores: t.List[t.Dict[str, t.Any]] = [] try: # get the results results = executor.results() @@ -316,14 +312,9 @@ def evaluate( else: # evalution run was successful # now lets process the results - # convert to v.1 dataset - dataset = dataset.to_hf_dataset() - if v1_input: - dataset = convert_v2_to_v1_dataset(dataset) - cost_cb = ragas_callbacks["cost_cb"] if "cost_cb" in ragas_callbacks else None - result = Result( - scores=Dataset.from_list(scores), + result = EvaluationResult( + scores=scores, dataset=dataset, binary_columns=binary_metrics, cost_cb=t.cast( @@ -362,124 +353,3 @@ def evaluate( ) ) return result - - -@dataclass -class Result(dict): - """ - A class to store and process the results of the evaluation. - - Attributes - ---------- - scores : Dataset - The dataset containing the scores of the evaluation. - dataset : Dataset, optional - The original dataset used for the evaluation. Default is None. - binary_columns : list of str, optional - List of columns that are binary metrics. Default is an empty list. - cost_cb : CostCallbackHandler, optional - The callback handler for cost computation. Default is None. - """ - - scores: Dataset - dataset: t.Optional[Dataset] = None - binary_columns: t.List[str] = field(default_factory=list) - cost_cb: t.Optional[CostCallbackHandler] = None - - def __post_init__(self): - values = [] - for cn in self.scores[0].keys(): - value = safe_nanmean(self.scores[cn]) - self[cn] = value - if cn not in self.binary_columns: - value = t.cast(float, value) - values.append(value + 1e-10) - - def to_pandas(self, batch_size: int | None = None, batched: bool = False): - """ - Convert the result to a pandas DataFrame. - - Parameters - ---------- - batch_size : int, optional - The batch size for conversion. Default is None. - batched : bool, optional - Whether to convert in batches. Default is False. - - Returns - ------- - pandas.DataFrame - The result as a pandas DataFrame. - - Raises - ------ - ValueError - If the dataset is not provided. - """ - if self.dataset is None: - raise ValueError("dataset is not provided for the results class") - assert self.scores.shape[0] == self.dataset.shape[0] - result_ds = concatenate_datasets([self.dataset, self.scores], axis=1) - - return result_ds.to_pandas(batch_size=batch_size, batched=batched) - - def total_tokens(self) -> t.Union[t.List[TokenUsage], TokenUsage]: - """ - Compute the total tokens used in the evaluation. - - Returns - ------- - list of TokenUsage or TokenUsage - The total tokens used. - - Raises - ------ - ValueError - If the cost callback handler is not provided. - """ - if self.cost_cb is None: - raise ValueError( - "The evaluate() run was not configured for computing cost. Please provide a token_usage_parser function to evaluate() to compute cost." 
- ) - return self.cost_cb.total_tokens() - - def total_cost( - self, - cost_per_input_token: t.Optional[float] = None, - cost_per_output_token: t.Optional[float] = None, - per_model_costs: t.Dict[str, t.Tuple[float, float]] = {}, - ) -> float: - """ - Compute the total cost of the evaluation. - - Parameters - ---------- - cost_per_input_token : float, optional - The cost per input token. Default is None. - cost_per_output_token : float, optional - The cost per output token. Default is None. - per_model_costs : dict of str to tuple of float, optional - The per model costs. Default is an empty dictionary. - - Returns - ------- - float - The total cost of the evaluation. - - Raises - ------ - ValueError - If the cost callback handler is not provided. - """ - if self.cost_cb is None: - raise ValueError( - "The evaluate() run was not configured for computing cost. Please provide a token_usage_parser function to evaluate() to compute cost." - ) - return self.cost_cb.total_cost( - cost_per_input_token, cost_per_output_token, per_model_costs - ) - - def __repr__(self) -> str: - scores = self.copy() - score_strs = [f"'{k}': {v:0.4f}" for k, v in scores.items()] - return "{" + ", ".join(score_strs) + "}" diff --git a/src/ragas/integrations/llama_index.py b/src/ragas/integrations/llama_index.py index dda4a8844..3a5b6e8b5 100644 --- a/src/ragas/integrations/llama_index.py +++ b/src/ragas/integrations/llama_index.py @@ -19,7 +19,7 @@ ) from llama_index.core.base.llms.base import BaseLLM as LlamaindexLLM - from ragas.evaluation import Result + from ragas.evaluation import EvaluationResult from ragas.metrics.base import Metric @@ -35,7 +35,7 @@ def evaluate( raise_exceptions: bool = False, column_map: t.Optional[t.Dict[str, str]] = None, run_config: t.Optional[RunConfig] = None, -) -> Result: +) -> EvaluationResult: column_map = column_map or {} # wrap llms and embeddings diff --git a/src/ragas/messages.py b/src/ragas/messages.py index 6829b8e61..d0957568f 100644 --- a/src/ragas/messages.py +++ b/src/ragas/messages.py @@ -105,7 +105,7 @@ class AIMessage(Message): tool_calls: t.Optional[t.List[ToolCall]] = None metadata: t.Optional[t.Dict[str, t.Any]] = None - def dict(self, **kwargs): + def to_dict(self, **kwargs): """ Returns a dictionary representation of the AI message. """ diff --git a/src/ragas/testset/synthesizers/generate.py b/src/ragas/testset/synthesizers/generate.py index 455b8b325..e23ce42e1 100644 --- a/src/ragas/testset/synthesizers/generate.py +++ b/src/ragas/testset/synthesizers/generate.py @@ -58,7 +58,7 @@ def from_langchain( def generate_with_langchain_docs( self, documents: t.Sequence[LCDocument], - test_size: int, + testset_size: int, transforms: t.Optional[Transforms] = None, query_distribution: t.Optional[QueryDistribution] = None, run_config: t.Optional[RunConfig] = None, @@ -90,7 +90,7 @@ def generate_with_langchain_docs( self.knowledge_graph = kg return self.generate( - test_size=test_size, + testset_size=testset_size, query_distribution=query_distribution, run_config=run_config, callbacks=callbacks, @@ -100,7 +100,7 @@ def generate_with_langchain_docs( def generate( self, - test_size: int, + testset_size: int, query_distribution: t.Optional[QueryDistribution] = None, run_config: t.Optional[RunConfig] = None, callbacks: t.Optional[Callbacks] = None, @@ -112,7 +112,7 @@ def generate( Parameters ---------- - test_size : int + testset_size : int The number of samples to generate. 
query_distribution : Optional[QueryDistribution], optional A list of tuples containing scenario simulators and their probabilities. @@ -147,7 +147,7 @@ def generate( # new group for Testset Generation testset_generation_rm, testset_generation_grp = new_group( name=RAGAS_TESTSET_GENERATION_GROUP_NAME, - inputs={"test_size": test_size}, + inputs={"testset_size": testset_size}, callbacks=callbacks, ) @@ -160,7 +160,7 @@ def generate( patch_logger("ragas.experimental.testset.transforms", logging.DEBUG) splits, _ = calculate_split_values( - [prob for _, prob in query_distribution], test_size + [prob for _, prob in query_distribution], testset_size ) # new group for Generation of Scenarios scenario_generation_rm, scenario_generation_grp = new_group( @@ -178,7 +178,7 @@ def generate( ) # generate samples splits, _ = calculate_split_values( - [prob for _, prob in query_distribution], test_size + [prob for _, prob in query_distribution], testset_size ) for i, (scenario, _) in enumerate(query_distribution): exec.submit(scenario.generate_scenarios, splits[i], self.knowledge_graph) @@ -229,7 +229,7 @@ def generate( e.__class__.__name__.lower() for e, _ in query_distribution ], evolution_percentages=[p for _, p in query_distribution], - num_rows=test_size, + num_rows=testset_size, language="english", ) ) diff --git a/src/ragas/testset/synthesizers/specific_query.py b/src/ragas/testset/synthesizers/specific_query.py index 67b5e8077..10eb9b6ed 100644 --- a/src/ragas/testset/synthesizers/specific_query.py +++ b/src/ragas/testset/synthesizers/specific_query.py @@ -35,6 +35,11 @@ class SpecificQuerySynthesizer(QuerySynthesizer): """ Synthesizes specific queries by choosing specific chunks and generating a keyphrase from them and then generating queries based on that. + + Attributes + ---------- + generate_query_prompt : PydanticPrompt + The prompt used for generating the query. """ generate_query_prompt: PydanticPrompt = field(default_factory=SpecificQuery) diff --git a/src/ragas/utils.py b/src/ragas/utils.py index 92a3bc73e..ad3066d98 100644 --- a/src/ragas/utils.py +++ b/src/ragas/utils.py @@ -37,16 +37,16 @@ def get_debug_mode() -> bool: return False -def safe_nanmean(arr): +def safe_nanmean(arr: t.List[float]) -> float: if len(arr) == 0: return np.nan # or some other value or behavior for empty arrays - arr = np.asarray(arr) # Ensure input is a numpy array + arr_numpy = np.asarray(arr) # Ensure input is a numpy array - if np.isnan(arr).all(): + if np.isnan(arr_numpy).all(): return np.nan # or some other value or behavior for all-NaN arrays - return np.nanmean(arr) + return float(np.nanmean(arr_numpy)) def check_if_sum_is_close( diff --git a/tests/unit/test_dataset_schema.py b/tests/unit/test_dataset_schema.py index 63fb4963c..ccb55654d 100644 --- a/tests/unit/test_dataset_schema.py +++ b/tests/unit/test_dataset_schema.py @@ -75,3 +75,17 @@ def test_evaluation_dataset_iter(): for sample in dataset: assert sample == single_turn_sample + + +def test_evaluation_dataset_type(): + single_turn_sample = SingleTurnSample(user_input="What is X", response="Y") + multi_turn_sample = MultiTurnSample( + user_input=[{"content": "What is X"}], + response="Y", # type: ignore (this type error is what we want to test) + ) + + dataset = EvaluationDataset(samples=[single_turn_sample]) + assert dataset.get_sample_type() == SingleTurnSample + + dataset = EvaluationDataset(samples=[multi_turn_sample]) + assert dataset.get_sample_type() == MultiTurnSample
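
A minimal usage sketch of the API surface touched by this patch: the new int/slice __getitem__ on EvaluationDataset and the Result -> EvaluationResult move into ragas.dataset_schema. The sample texts, metric name, and score values below are assumptions for illustration; to_pandas() assumes pandas is installed, and an EvaluationResult is normally produced by ragas.evaluate() rather than built by hand.

# Usage sketch (illustrative values only): exercises the EvaluationDataset
# indexing and the EvaluationResult container added in this patch.
from ragas.dataset_schema import EvaluationDataset, EvaluationResult, SingleTurnSample

samples = [
    SingleTurnSample(user_input="What is X?", response="X is ..."),
    SingleTurnSample(user_input="What is Y?", response="Y is ..."),
]
dataset = EvaluationDataset(samples=samples)

# __getitem__ now supports both int and slice indexing.
first = dataset[0]      # -> SingleTurnSample
subset = dataset[:1]    # -> EvaluationDataset containing one sample
assert dataset.get_sample_type() is SingleTurnSample

# EvaluationResult stores scores as a list of per-row dicts; it is constructed
# by hand here only to show the accessors (evaluate() returns this object).
result = EvaluationResult(
    scores=[{"faithfulness": 1.0}, {"faithfulness": 0.5}],
    dataset=dataset,
)
print(result)                  # {'faithfulness': 0.7500} -- per-metric mean
print(result["faithfulness"])  # [1.0, 0.5] -- raw per-row scores
df = result.to_pandas()        # dataset columns plus one column per metric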