diff --git a/pyproject.toml b/pyproject.toml
index 1b08a27eb..f9f3c8b9e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -91,3 +91,9 @@ addopts = "-n 0"
 asyncio_default_fixture_loop_scope = "function"
 [pytest]
 testpaths = ["tests"]
+
+[dependency-groups]
+dev = [
+    "arize-phoenix>=6.1.0",
+    "openinference-instrumentation-langchain>=0.1.29",
+]
diff --git a/src/ragas/config.py b/src/ragas/config.py
index b12e9b2a2..edea0dd8e 100644
--- a/src/ragas/config.py
+++ b/src/ragas/config.py
@@ -1,9 +1,11 @@
+from __future__ import annotations
+
 import typing as t

-from pydantic import BaseModel, Field
+from pydantic import BaseModel, Field, field_validator

-from ragas.embeddings import BaseRagasEmbeddings
-from ragas.llms import BaseRagasLLM
+from ragas.embeddings.base import BaseRagasEmbeddings
+from ragas.llms.base import BaseRagasLLM
 from ragas.losses import Loss
 from ragas.optimizers import GeneticOptimizer, Optimizer

@@ -11,17 +13,27 @@ class DemonstrationConfig(BaseModel):
+    embedding: t.Any  # this has to be of type Any because BaseRagasEmbeddings is an ABC
     enabled: bool = True
     top_k: int = 3
+    threshold: float = 0.7
     technique: t.Literal["random", "similarity"] = "similarity"
-    embedding: t.Optional[BaseRagasEmbeddings] = None
+
+    @field_validator("embedding")
+    def validate_embedding(cls, v):
+        if not isinstance(v, BaseRagasEmbeddings):
+            raise ValueError("embedding must be an instance of BaseRagasEmbeddings")
+        return v


 class InstructionConfig(BaseModel):
+    llm: BaseRagasLLM
     enabled: bool = True
     loss: t.Optional[Loss] = None
     optimizer: Optimizer = GeneticOptimizer()
     optimizer_config: t.Dict[str, t.Any] = Field(
         default_factory=lambda: DEFAULT_OPTIMIZER_CONFIG
     )
-    llm: t.Optional[BaseRagasLLM] = None
+
+
+InstructionConfig.model_rebuild()
diff --git a/src/ragas/dataset_schema.py b/src/ragas/dataset_schema.py
index a03984233..662dc181e 100644
--- a/src/ragas/dataset_schema.py
+++ b/src/ragas/dataset_schema.py
@@ -554,7 +554,7 @@ class PromptAnnotation(BaseModel):
     prompt_input: t.Dict[str, t.Any]
     prompt_output: t.Dict[str, t.Any]
     is_accepted: bool
-    edited_output: t.Union[t.Dict[str, t.Any], None]
+    edited_output: t.Optional[t.Dict[str, t.Any]] = None

     def __getitem__(self, key):
         return getattr(self, key)
@@ -801,3 +801,13 @@ def stratified_batches(
             all_batches.append(batch)

         return all_batches
+
+    def get_prompt_annotations(self) -> t.Dict[str, t.List[PromptAnnotation]]:
+        """
+        Get all the prompt annotations for each prompt as a list.
+        """
+        prompt_annotations = defaultdict(list)
+        for sample in self.samples:
+            for prompt_name, prompt_annotation in sample.prompts.items():
+                prompt_annotations[prompt_name].append(prompt_annotation)
+        return prompt_annotations
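
The config changes above make the dependencies explicit: InstructionConfig now requires an LLM up front, and DemonstrationConfig requires an embeddings object that is checked at runtime by the field_validator rather than by the type annotation. A minimal construction sketch (the LangChain/OpenAI-backed wrappers are illustrative assumptions, not taken from the diff):

from langchain_openai import ChatOpenAI, OpenAIEmbeddings

from ragas.config import DemonstrationConfig, InstructionConfig
from ragas.embeddings import LangchainEmbeddingsWrapper
from ragas.llms import LangchainLLMWrapper

# llm moved from an Optional field to a required one
instruction_config = InstructionConfig(llm=LangchainLLMWrapper(ChatOpenAI()))

# embedding is typed t.Any, but the field_validator still rejects anything
# that is not a BaseRagasEmbeddings instance
demonstration_config = DemonstrationConfig(
    embedding=LangchainEmbeddingsWrapper(OpenAIEmbeddings()),
    top_k=3,
    threshold=0.7,
)
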
+ """ + prompt_annotations = defaultdict(list) + for sample in self.samples: + for prompt_name, prompt_annotation in sample.prompts.items(): + prompt_annotations[prompt_name].append(prompt_annotation) + return prompt_annotations diff --git a/src/ragas/metrics/base.py b/src/ragas/metrics/base.py index 52838b0b2..76dc13fc7 100644 --- a/src/ragas/metrics/base.py +++ b/src/ragas/metrics/base.py @@ -8,14 +8,16 @@ from dataclasses import dataclass, field from enum import Enum +from pydantic import ValidationError from pysbd import Segmenter +from tqdm import tqdm from ragas._analytics import EvaluationEvent, _analytics_batcher from ragas.callbacks import ChainType, new_group from ragas.dataset_schema import MetricAnnotation, MultiTurnSample, SingleTurnSample from ragas.executor import is_event_loop_running from ragas.losses import BinaryMetricLoss, MSELoss -from ragas.prompt import PromptMixin +from ragas.prompt import FewShotPydanticPrompt, PromptMixin from ragas.run_config import RunConfig from ragas.utils import ( RAGAS_SUPPORTED_LANGUAGE_CODES, @@ -230,48 +232,30 @@ def init(self, run_config: RunConfig): ) self.llm.set_run_config(run_config) - def train( + def _optimize_instruction( self, - path: str, - demonstration_config: t.Optional[DemonstrationConfig] = None, - instruction_config: t.Optional[InstructionConfig] = None, - callbacks: t.Optional[Callbacks] = None, - run_config: t.Optional[RunConfig] = None, - batch_size: t.Optional[int] = None, - with_debugging_logs=False, - raise_exceptions: bool = True, - ) -> None: - - if not path.endswith(".json"): - raise ValueError("Train data must be in json format") - - if instruction_config is None: - from ragas.config import InstructionConfig - - instruction_config = InstructionConfig() - - if demonstration_config is None: - from ragas.config import DemonstrationConfig - - demonstration_config = DemonstrationConfig() - - dataset = MetricAnnotation.from_json(path, metric_name=self.name) - - optimizer = instruction_config.optimizer - llm = instruction_config.llm or self.llm - if llm is None: + instruction_config: InstructionConfig, + dataset: MetricAnnotation, + callbacks: Callbacks, + run_config: RunConfig, + batch_size: t.Optional[int], + with_debugging_logs: bool, + raise_exceptions: bool, + ): + if self.llm is None: raise ValueError( f"Metric '{self.name}' has no valid LLM provided (self.llm is None). Please initantiate a the metric with an LLM to run." # noqa ) + optimizer = instruction_config.optimizer if optimizer.llm is None: - optimizer.llm = llm + optimizer.llm = instruction_config.llm + # figure out the loss function if instruction_config.loss is None: if self.output_type is None: raise ValueError( f"Output type for metric '{self.name}' is not defined. Please set the output type in the metric or in the instruction config." 
@@ -286,8 +270,8 @@ def train(
             else:
                 loss_fun = instruction_config.loss

+        # Optimize the prompts
         optimizer.metric = self
-        optimizer_config = instruction_config.optimizer_config or {}

         optimized_prompts = optimizer.optimize(
             dataset[self.name],
@@ -299,11 +283,111 @@ def train(
             with_debugging_logs=with_debugging_logs,
             raise_exceptions=raise_exceptions,
         )
+
+        # replace the instruction in the metric with the optimized instruction
         prompts = self.get_prompts()
         for key, val in optimized_prompts.items():
             prompts[key].instruction = val
         self.set_prompts(**prompts)
-        return
+
+    def _optimize_demonstration(
+        self, demonstration_config: DemonstrationConfig, dataset: MetricAnnotation
+    ):
+        # get the prompt annotations for this metric
+        prompt_annotations = dataset[self.name].get_prompt_annotations()
+        prompts = self.get_prompts()
+        for prompt_name, prompt_annotation_list in prompt_annotations.items():
+            # create a new FewShotPydanticPrompt with these annotations
+            if prompt_name not in prompts:
+                raise ValueError(
+                    f"Prompt '{prompt_name}' not found in metric '{self.name}'. Please check the prompt names in the annotation dataset."
+                )
+            pydantic_prompt = prompts[prompt_name]
+            input_model, output_model = (
+                pydantic_prompt.input_model,
+                pydantic_prompt.output_model,
+            )
+            # convert annotations into examples
+            input_examples, output_examples = [], []
+            for i, prompt_annotation in enumerate(prompt_annotation_list):
+                try:
+                    # skip if the prompt is not accepted
+                    if not prompt_annotation.is_accepted:
+                        continue
+                    input_examples.append(
+                        input_model.model_validate(prompt_annotation.prompt_input)
+                    )
+                    # use the edited output if it is provided
+                    if prompt_annotation.edited_output is not None:
+                        output_examples.append(
+                            output_model.model_validate(prompt_annotation.edited_output)
+                        )
+                    else:
+                        output_examples.append(
+                            output_model.model_validate(prompt_annotation.prompt_output)
+                        )
+                except ValidationError as e:
+                    logger.warning(
+                        f"Skipping prompt '{prompt_name}' example {i} because of validation error: {e}"
+                    )
+                    continue
+            embedding_model = demonstration_config.embedding
+            few_shot_prompt = FewShotPydanticPrompt.from_pydantic_prompt(
+                pydantic_prompt=pydantic_prompt,
+                embeddings=embedding_model,
+            )
+
+            # add the top k examples to the few shot prompt
+            few_shot_prompt.top_k_for_examples = demonstration_config.top_k
+            few_shot_prompt.threshold_for_examples = demonstration_config.threshold
+
+            # add examples to the few shot prompt
+            for input_example, output_example in tqdm(
+                zip(input_examples, output_examples),
+                total=len(input_examples),
+                desc=f"Few-shot examples [{prompt_name}]",
+            ):
+                few_shot_prompt.add_example(input_example, output_example)
+            prompts[prompt_name] = few_shot_prompt
+        self.set_prompts(**prompts)
+
+    def train(
+        self,
+        path: str,
+        demonstration_config: t.Optional[DemonstrationConfig] = None,
+        instruction_config: t.Optional[InstructionConfig] = None,
+        callbacks: t.Optional[Callbacks] = None,
+        run_config: t.Optional[RunConfig] = None,
+        batch_size: t.Optional[int] = None,
+        with_debugging_logs=False,
+        raise_exceptions: bool = True,
+    ) -> None:
+        run_config = run_config or RunConfig()
+        callbacks = callbacks or []
+
+        # load the dataset from path
+        if not path.endswith(".json"):
+            raise ValueError("Train data must be in json format")
+        dataset = MetricAnnotation.from_json(path, metric_name=self.name)
+
+        # only optimize the instruction if instruction_config is provided
+        if instruction_config is not None:
+            self._optimize_instruction(
+                instruction_config=instruction_config,
+                dataset=dataset,
+                callbacks=callbacks,
+                run_config=run_config,
+                batch_size=batch_size,
+                with_debugging_logs=with_debugging_logs,
+                raise_exceptions=raise_exceptions,
+            )
+
+        # if demonstration_config is provided, optimize the demonstrations
+        if demonstration_config is not None:
+            self._optimize_demonstration(
+                demonstration_config=demonstration_config,
+                dataset=dataset,
+            )


 @dataclass
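
With this split, train() becomes a thin driver: it loads the annotation JSON once and runs only the optimization steps whose configs are passed. A sketch of the resulting call pattern (the metric choice, annotation path, and OpenAI-backed wrappers are illustrative assumptions, not taken from the diff):

from langchain_openai import ChatOpenAI, OpenAIEmbeddings

from ragas.config import DemonstrationConfig, InstructionConfig
from ragas.embeddings import LangchainEmbeddingsWrapper
from ragas.llms import LangchainLLMWrapper
from ragas.metrics import AspectCritic

llm = LangchainLLMWrapper(ChatOpenAI())
embeddings = LangchainEmbeddingsWrapper(OpenAIEmbeddings())

metric = AspectCritic(name="clarity", definition="Is the answer clear and direct?", llm=llm)
metric.train(
    path="annotations/clarity.json",  # train() only accepts .json annotation exports
    instruction_config=InstructionConfig(llm=llm),
    demonstration_config=DemonstrationConfig(embedding=embeddings),
)
# Omitting either config now skips that optimization step entirely,
# instead of falling back to a default config as the old train() did.
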
diff --git a/src/ragas/optimizers/genetic.py b/src/ragas/optimizers/genetic.py
index 9dd7cf538..fd8e4e9f6 100644
--- a/src/ragas/optimizers/genetic.py
+++ b/src/ragas/optimizers/genetic.py
@@ -36,7 +36,6 @@ class FormattedExamples(BaseModel):

     @classmethod
     def from_examples(cls, examples: t.List[example_type]) -> "FormattedExamples":
-
         formated_examples = []
         for example in examples:
             input_, output = example.values()
@@ -151,7 +150,6 @@ def optimize(
         with_debugging_logs=False,
         raise_exceptions: bool = True,
     ) -> t.Dict[str, str]:
-
         callbacks = callbacks or []

         if self.metric is None:
@@ -187,7 +185,6 @@ def optimize(
         with tqdm(
             total=total_steps, desc="Overall Progress", dynamic_ncols=True
         ) as parent_pbar:
-
             parent_pbar.set_description(f"{stages[0]['name']} Step 1/{len(stages)}")
             initial_population = self.initialize_population(
                 dataset=dataset,
@@ -262,7 +259,6 @@ def initialize_population(
         raise_exceptions: bool = True,
         parent_pbar: t.Optional[tqdm] = None,
     ) -> t.List[t.Dict[str, str]]:
-
         initialize_population_rm, initialize_population_grp = new_group(
             name="Initializing Population",
             inputs={"population_size": population_size},
@@ -308,7 +304,6 @@ def initialize_population(
     async def _reverse_engineer_instruction(
         self, batch: t.List[SampleAnnotation], callbacks: Callbacks = None
     ) -> t.Dict[str, str]:
-
         if self.llm is None:
             raise ValueError("No llm provided for optimization.")

@@ -344,7 +339,6 @@ async def _reverse_engineer_instruction(
     async def _cross_over_prompts(
         self, parent_1: str, parent_2: str, callbacks: Callbacks = None
     ) -> str:
-
         if self.llm is None:
             raise ValueError("No llm provided for optimization.")

@@ -373,7 +367,6 @@ def feedback_mutation(
         raise_exceptions: bool = True,
         parent_pbar: t.Optional[tqdm] = None,
     ) -> t.List[t.Dict[str, str]]:
-
         if self.metric is None:
             raise ValueError("No metric provided for optimization.")

@@ -430,7 +423,6 @@ async def _feedback_mutation(
         raise_exceptions: bool = True,
         parent_pbar: t.Optional[tqdm] = None,
     ) -> t.Dict[str, str]:
-
         if self.llm is None:
             raise ValueError("No llm provided for optimization.")

@@ -470,7 +462,6 @@ async def _implement_feedbacks(
         feedbacks: t.Dict[str, t.List[str]],
         callbacks: Callbacks = None,
     ) -> t.Dict[str, str]:
-
         if self.llm is None:
             raise ValueError("No llm provided for optimization.")

@@ -501,7 +492,6 @@ async def _get_feedbacks(
         target: t.List[float],
         callbacks: Callbacks = None,
     ) -> t.Dict[str, t.List[str]]:
-
         def dict_to_str(dict: t.Dict[str, t.Any]) -> str:
             return "".join(f"\n{key}:\n\t{val}\n" for key, val in dict.items())

@@ -549,7 +539,6 @@ def dict_to_str(dict: t.Dict[str, t.Any]) -> str:
     def _get_evaluation_dataset(
         self, dataset: SingleMetricAnnotation
     ) -> t.Tuple[EvaluationDataset, t.List[float]]:
-
         if self.metric is None:
             raise ValueError("No metric provided for optimization.")

@@ -582,7 +571,6 @@ def evaluate_candidate(
         run_id: t.Optional[UUID] = None,
         parent_pbar: t.Optional[tqdm] = None,
     ) -> EvaluationResult:
-
         if self.metric is None:
             raise ValueError("No metric provided for optimization.")

@@ -620,7 +608,6 @@ def evaluate_fitness(
         raise_exceptions: bool = True,
         parent_pbar: t.Optional[tqdm] = None,
     ) -> t.List[float]:
-
         if self.metric is None:
             raise ValueError("No metric provided for optimization.")

@@ -635,7 +622,6 @@ def evaluate_fitness(
         )
         run_id = initialize_population_rm.run_id
         for candidate in candidates:
-
             results = self.evaluate_candidate(
                 candidate=candidate,
                 eval_dataset=eval_dataset,
@@ -660,7 +646,6 @@ async def _cross_over_chain(
         parent_y: t.Dict[str, str],
         callbacks: Callbacks,
     ):
-
         if parent_x.keys() != parent_y.keys():
             raise ValueError("The parents must have the same prompt names.")

@@ -684,7 +669,6 @@ def cross_over_mutation(
         raise_exceptions: bool = True,
         parent_pbar: t.Optional[tqdm] = None,
     ):
-
         if self.metric is None:
             raise ValueError("No metric provided for optimization.")

@@ -701,7 +685,6 @@ def cross_over_mutation(
         run_id = cross_over_rm.run_id
         prediction_vectors = []
         for candidate in candidates:
-
             results = self.evaluate_candidate(
                 candidate=candidate,
                 eval_dataset=eval_dataset,
diff --git a/src/ragas/prompt/__init__.py b/src/ragas/prompt/__init__.py
index 5743ea22c..ac113afe9 100644
--- a/src/ragas/prompt/__init__.py
+++ b/src/ragas/prompt/__init__.py
@@ -1,4 +1,9 @@
 from .base import BasePrompt, BoolIO, StringIO, StringPrompt
+from .few_shot_pydantic_prompt import (
+    ExampleStore,
+    FewShotPydanticPrompt,
+    InMemoryExampleStore,
+)
 from .mixin import PromptMixin
 from .multi_modal_prompt import ImageTextPrompt, ImageTextPromptValue
 from .pydantic_prompt import InputModel, OutputModel, PydanticPrompt
@@ -9,6 +14,9 @@
     "PydanticPrompt",
     "StringIO",
     "StringPrompt",
+    "ExampleStore",
+    "FewShotPydanticPrompt",
+    "InMemoryExampleStore",
     "PromptMixin",
     "InputModel",
     "OutputModel",
diff --git a/src/ragas/prompt/few_shot_pydantic_prompt.py b/src/ragas/prompt/few_shot_pydantic_prompt.py
new file mode 100644
index 000000000..02b20d402
--- /dev/null
+++ b/src/ragas/prompt/few_shot_pydantic_prompt.py
@@ -0,0 +1,137 @@
+from __future__ import annotations
+
+import typing as t
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+
+import numpy as np
+from pydantic import BaseModel
+
+from ragas.llms.base import BaseRagasLLM
+from ragas.prompt.pydantic_prompt import PydanticPrompt
+
+if t.TYPE_CHECKING:
+    from langchain_core.callbacks import Callbacks
+
+    from ragas.embeddings.base import BaseRagasEmbeddings
+    from ragas.llms.base import BaseRagasLLM
+
+# type variables for input and output models
+InputModel = t.TypeVar("InputModel", bound=BaseModel)
+OutputModel = t.TypeVar("OutputModel", bound=BaseModel)
+
+
+class ExampleStore(ABC):
+    @abstractmethod
+    def get_examples(
+        self, data: BaseModel, top_k: int = 5
+    ) -> t.Sequence[t.Tuple[BaseModel, BaseModel]]:
+        pass
+
+    @abstractmethod
+    def add_example(self, input: BaseModel, output: BaseModel):
+        pass
+
+
+@dataclass
+class InMemoryExampleStore(ExampleStore):
+    embeddings: BaseRagasEmbeddings
+    _examples_list: t.List[t.Tuple[BaseModel, BaseModel]] = field(
+        default_factory=list, repr=False
+    )
+    _embeddings_of_examples: t.List[t.List[float]] = field(
+        default_factory=list, repr=False
+    )
+
+    def add_example(self, input: BaseModel, output: BaseModel):
+        # get json string for input
+        input_json = input.model_dump_json()
+        self._embeddings_of_examples.append(self.embeddings.embed_query(input_json))
+        self._examples_list.append((input, output))
+
+    def get_examples(
+        self, data: BaseModel, top_k: int = 5, threshold: float = 0.7
+    ) -> t.Sequence[t.Tuple[BaseModel, BaseModel]]:
+        data_embedding = self.embeddings.embed_query(data.model_dump_json())
+        return [
+            self._examples_list[i]
+            for i in self.get_nearest_examples(
+                data_embedding, self._embeddings_of_examples, top_k, threshold
+            )
+        ]
+
+    @staticmethod
+    def get_nearest_examples(
+        query_embedding: t.List[float],
+        embeddings: t.List[t.List[float]],
+        top_k: int = 3,
+        threshold: float = 0.7,
+    ) -> t.List[int]:
+        # Convert to numpy arrays for efficient computation
+        query = np.array(query_embedding)
+        embed_matrix = np.array(embeddings)
+
+        # Calculate cosine similarity
+        similarities = np.dot(embed_matrix, query) / (
+            np.linalg.norm(embed_matrix, axis=1) * np.linalg.norm(query) + 1e-8
+        )
+
+        # Get indices of similarities above threshold
+        valid_indices = np.where(similarities >= threshold)[0]
+
+        # Sort by similarity and get top-k
+        top_indices = valid_indices[np.argsort(similarities[valid_indices])[-top_k:]]
+
+        return top_indices.tolist()
+
+    def __repr__(self):
+        return f"InMemoryExampleStore(n_examples={len(self._examples_list)})"
+
+
+@dataclass
+class FewShotPydanticPrompt(PydanticPrompt, t.Generic[InputModel, OutputModel]):
+    example_store: ExampleStore
+    top_k_for_examples: int = 5
+    threshold_for_examples: float = 0.7
+
+    def __post_init__(self):
+        self.examples: t.Sequence[t.Tuple[InputModel, OutputModel]] = []
+
+    def add_example(self, input: InputModel, output: OutputModel):
+        self.example_store.add_example(input, output)
+
+    async def generate_multiple(
+        self,
+        llm: BaseRagasLLM,
+        data: InputModel,
+        n: int = 1,
+        temperature: t.Optional[float] = None,
+        stop: t.Optional[t.List[str]] = None,
+        callbacks: t.Optional[Callbacks] = None,
+        retries_left: int = 3,
+    ) -> t.List[OutputModel]:
+        # Ensure get_examples returns a sequence of tuples (InputModel, OutputModel)
+        self.examples = self.example_store.get_examples(data, self.top_k_for_examples)  # type: ignore
+        return await super().generate_multiple(
+            llm, data, n, temperature, stop, callbacks, retries_left
+        )
+
+    @classmethod
+    def from_pydantic_prompt(
+        cls,
+        pydantic_prompt: PydanticPrompt[InputModel, OutputModel],
+        embeddings: BaseRagasEmbeddings,
+    ) -> FewShotPydanticPrompt[InputModel, OutputModel]:
+        # add examples to the example store
+        example_store = InMemoryExampleStore(embeddings=embeddings)
+        for example in pydantic_prompt.examples:
+            example_store.add_example(example[0], example[1])
+        few_shot_prompt = cls(
+            example_store=example_store,
+        )
+        few_shot_prompt.name = pydantic_prompt.name
+        few_shot_prompt.language = pydantic_prompt.language
+        few_shot_prompt.instruction = pydantic_prompt.instruction
+        few_shot_prompt.input_model = pydantic_prompt.input_model
+        few_shot_prompt.output_model = pydantic_prompt.output_model
+        return few_shot_prompt
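
Example retrieval in InMemoryExampleStore is plain cosine similarity over the stored input embeddings: anything below the threshold is dropped, then the top_k most similar survive. A small self-contained sketch of that selection rule with made-up vectors:

from ragas.prompt.few_shot_pydantic_prompt import InMemoryExampleStore

stored_embeddings = [[1.0, 0.0], [0.8, 0.6], [0.0, 1.0]]
query_embedding = [1.0, 0.1]

# Only embeddings whose cosine similarity to the query clears the threshold
# survive; of those, the top_k most similar indices are returned,
# sorted by ascending similarity (most similar last).
indices = InMemoryExampleStore.get_nearest_examples(
    query_embedding, stored_embeddings, top_k=2, threshold=0.7
)
print(indices)  # the first two stored vectors pass the 0.7 threshold; the third does not
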
diff --git a/src/ragas/prompt/pydantic_prompt.py b/src/ragas/prompt/pydantic_prompt.py
index 3e5c225da..3f239e100 100644
--- a/src/ragas/prompt/pydantic_prompt.py
+++ b/src/ragas/prompt/pydantic_prompt.py
@@ -31,6 +31,7 @@


 class PydanticPrompt(BasePrompt, t.Generic[InputModel, OutputModel]):
+    # these are class attributes
     input_model: t.Type[InputModel]
     output_model: t.Type[OutputModel]
     instruction: str
diff --git a/tests/unit/test_prompt.py b/tests/unit/test_prompt.py
index 3d550a628..6f4f29d74 100644
--- a/tests/unit/test_prompt.py
+++ b/tests/unit/test_prompt.py
@@ -1,5 +1,7 @@
 import copy
+import typing as t

+import numpy as np
 import pytest
 from langchain_core.outputs import Generation, LLMResult
 from langchain_core.prompt_values import StringPromptValue
@@ -226,3 +228,37 @@ class Prompt(PydanticPrompt[StringIO, OutputModel]):
         data=StringIO(text="this prompt will be echoed back as invalid JSON"),
         llm=echo_llm,
     )
+
+
+def cosine_similarity(v1: t.List[float], v2: t.List[float]) -> float:
+    """Calculate cosine similarity between two vectors."""
+    v1_array = np.array(v1)
+    v2_array = np.array(v2)
+    return np.dot(v1_array, v2_array) / (
+        np.linalg.norm(v1_array) * np.linalg.norm(v2_array)
+    )
+
+
+@pytest.mark.skip(reason="TODO: Implement embedding calculation")
+def test_in_memory_example_store():
+    from ragas.prompt import InMemoryExampleStore
+
+    class FakeInputModel(BaseModel):
+        text: str
+        embedding: t.List[float]
+
+    class FakeOutputModel(BaseModel):
+        text: str
+
+    store = InMemoryExampleStore()
+    store.add_example(
+        FakeInputModel(text="hello", embedding=[1, 2, 3]),
+        FakeOutputModel(text="hello"),
+    )
+    store.add_example(
+        FakeInputModel(text="world", embedding=[1, 2, 4]),
+        FakeOutputModel(text="world"),
+    )
+    assert store.get_examples(FakeInputModel(text="hello", embedding=[1, 2, 3])) == [
+        FakeOutputModel(text="hello")
+    ]
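
The skipped test above still constructs InMemoryExampleStore() without its required embeddings field and compares against bare output models, although get_examples returns (input, output) tuples. One way the TODO could be completed — a sketch under those assumptions, not part of the diff — is to back the store with a deterministic fake embeddings object:

import json
import typing as t

from pydantic import BaseModel

from ragas.prompt import InMemoryExampleStore


class FakeEmbeddings:
    """Embeds a model's JSON dump by reading its `embedding` field back out."""

    def embed_query(self, text: str) -> t.List[float]:
        return json.loads(text)["embedding"]


class FakeInputModel(BaseModel):
    text: str
    embedding: t.List[float]


class FakeOutputModel(BaseModel):
    text: str


def test_in_memory_example_store_with_fake_embeddings():
    store = InMemoryExampleStore(embeddings=FakeEmbeddings())  # type: ignore[arg-type]
    store.add_example(
        FakeInputModel(text="hello", embedding=[1, 2, 3]), FakeOutputModel(text="hello")
    )
    store.add_example(
        FakeInputModel(text="world", embedding=[-3, 2, -1]), FakeOutputModel(text="world")
    )
    # get_examples returns (input, output) tuples; the dissimilar example is
    # filtered out by the 0.7 cosine-similarity threshold
    examples = store.get_examples(FakeInputModel(text="hello", embedding=[1, 2, 3]), top_k=1)
    assert [output for _, output in examples] == [FakeOutputModel(text="hello")]
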