diff --git a/.gitignore b/.gitignore index 82ac8bfcc..ad85ced43 100644 --- a/.gitignore +++ b/.gitignore @@ -135,3 +135,5 @@ dmypy.json # Pyre type checker .pyre/ + +.idea/ diff --git a/docs/algorithms/rejection_sampling.md b/docs/algorithms/rejection_sampling.md index bee6c4854..c1eb22f10 100644 --- a/docs/algorithms/rejection_sampling.md +++ b/docs/algorithms/rejection_sampling.md @@ -7,6 +7,7 @@ different number of completions per prompt. # Debug run (use an interactive session) +This code supports HF models, local models and also API-based models (e.g., `gpt-4`). For generating completions, the code now accepts one model at a time, but we're working on adding an ensemble of models. Stay tuned. ```bash ## tulu v3 recipe # 1. first sample a bunch of completions given prompts @@ -16,7 +17,10 @@ python open_instruct/generation.py \ --n 3 \ --save_filename output/completions.jsonl \ --sanity_check \ - + ``` +### Scoring completions +You can use either a single RM to score responses or a list of RMs. In the latter case, we will take the majority vote to compute the final score. The RMs can be models explicitly trained as RMs, HF LMs, or API-based models. +``` # 2. tokenize them and run a reward model to filter them python open_instruct/rejection_sampling.py \ --input_filename output/completions.jsonl \ diff --git a/open_instruct/model_utils.py b/open_instruct/model_utils.py index 7ce81fbd0..84fd6d7a5 100644 --- a/open_instruct/model_utils.py +++ b/open_instruct/model_utils.py @@ -29,13 +29,13 @@ import transformers from accelerate import Accelerator from accelerate.state import AcceleratorState +from huggingface_hub import HfApi from rich import print as rprint from rich.console import Console from rich.table import Table from rich.text import Text from torch.nn.parallel.distributed import DistributedDataParallel from transformers import PreTrainedModel, PreTrainedTokenizer -from huggingface_hub import HfApi @dataclass diff --git a/open_instruct/rejection_sampling/__init__.py b/open_instruct/rejection_sampling/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/open_instruct/rejection_sampling/analyze.py b/open_instruct/rejection_sampling/analyze.py new file mode 100644 index 000000000..8f3a6f16b --- /dev/null +++ b/open_instruct/rejection_sampling/analyze.py @@ -0,0 +1,93 @@ +from collections import defaultdict +from dataclasses import dataclass +from typing import Dict, List +from datasets import load_dataset +from rich.console import Console +from rich.panel import Panel +from rich.markdown import Markdown +from rich.columns import Columns +import numpy as np +from transformers import HfArgumentParser + + +def maybe_markdown(content: str, markdown: bool) -> str: + return Markdown(content) if markdown else content + +def print_hf_chosen_rejected(chosen: List[Dict[str, str]], rejected: List[Dict[str, str]], markdown: bool = False, chosen_key: str = "chosen", rejected_key: str = "rejected"): + """here we are assuming the chosen[:-1] == rejected[:-1]""" + assert len(chosen) == len(rejected) + assert chosen[:-1] == rejected[:-1] + console = Console() + colors = ["red", "green"] + color_idx = 0 + console.rule(f"[bold yellow]The number of turns is {len(chosen)}") + for i in range(len(chosen) - 1): + message = chosen[i] + role = message["role"] + content = maybe_markdown(message["content"], markdown) + console.print(Panel(content, title_align="left", title=role, border_style=colors[color_idx])) + color_idx = (color_idx + 1) % 2 + half_width = int(0.48 * console.width) + columns = Columns( + [ + Panel(maybe_markdown(chosen[-1]["content"], markdown), width=half_width, title=chosen_key, border_style="green"), + Panel(maybe_markdown(rejected[-1]["content"], markdown), width=half_width, title=rejected_key, border_style="red"), + ], + ) + + console.print(Panel(columns, title=chosen[-1]["role"], border_style=colors[color_idx])) + + + +@dataclass +class Args: + rejection_sampled_dataset: str = "vwxyzjn/rejection_sampling_31313" + shuffle: bool = False + +def main(args: Args): + args = HfArgumentParser(Args).parse_args_into_dataclasses()[0] + ds = load_dataset("vwxyzjn/rejection_sampling_31313", split="train") + if args.shuffle: + ds = ds.shuffle() + + print("🚀 Dataset loaded, starting to analyze...") + chosen_scores = defaultdict(list) + rejected_scores = defaultdict(list) + reference_scores = defaultdict(list) + chosen_lengths = [] + rejected_lengths = [] + reference_length = [] + for example in ds: + chosen_lengths.append(len(example["chosen"][-1]["content"])) + rejected_lengths.append(len(example["rejected"][-1]["content"])) + reference_length.append(len(example["reference_completion"])) + for key in example["chosen_score"]: + chosen_scores[key].append(example["chosen_score"][key]) + rejected_scores[key].append(example["rejected_score"][key]) + reference_scores[key].append(example["reference_completion_score"][key]) + + print(f"chosen: mean length = {np.mean(chosen_lengths)}") + print(f"rejected: mean length = {np.mean(rejected_lengths)}") + print(f"reference: mean length = {np.mean(reference_length)}") + for key in example["chosen_score"]: + print(f"{key=}") + print(f"chosen: mean score = {np.mean(chosen_scores[key])}") + print(f" std score = {np.std(chosen_scores[key])}") + print(f"rejected: mean score = {np.mean(rejected_scores[key])}") + print(f" std score = {np.std(rejected_scores[key])}") + print(f"reference: mean score = {np.mean(reference_scores[key])}") + print(f" std score = {np.std(reference_scores[key])}") + + for i in range(len(chosen_scores[key])): + if reference_scores[key][i] > chosen_scores[key][i]: + print("reference is better than chosen") + print_hf_chosen_rejected( + ds[i]["chosen"][:-1] + [{"role": "assistant", "content": ds[i]["reference_completion"]}], + ds[i]["chosen"], + chosen_key="reference", + rejected_key="chosen", + ) + input("Press Enter to continue...") + +if __name__ == "__main__": + main(Args()) \ No newline at end of file diff --git a/open_instruct/rejection_sampling/api_generate.py b/open_instruct/rejection_sampling/api_generate.py new file mode 100644 index 000000000..1d229b483 --- /dev/null +++ b/open_instruct/rejection_sampling/api_generate.py @@ -0,0 +1,75 @@ +# main.py + +import asyncio +import re +from dataclasses import dataclass +from typing import List, Optional + +from openai import AsyncOpenAI +from prompt_templates import get_generation_template, get_judgment_template +from tqdm.asyncio import tqdm + + +@dataclass +class LLMGenerationConfig: + n: int = 64 + model: str = "gpt-3.5-turbo-0125" + max_parallel_requests: Optional[int] = None + + def __post_init__(self): + if "gpt-3.5" in self.model: + self.max_parallel_requests = 11 + elif "gpt-4" in self.model: + self.max_parallel_requests = 13 + + +@dataclass +class Args: + output_path: Optional[str] = None + num_trials: int = 1 + skill: str = "summarization" + mode: str = "generation" # Can be "generation" or "judgment" + + +class LLMProcessor: + def __init__(self, config: LLMGenerationConfig): + self.config = config + self.async_client = AsyncOpenAI() + + async def process_text(self, data: dict, i: int, limiter: asyncio.Semaphore, args: Args): + if args.mode == "generation": + template = get_generation_template(args.skill) + text = template.format(prompt=data) + else: # judgment mode + template = get_judgment_template(args.skill) + text = template.format(prompt=data["prompt"], response=data["response"]) + + async with limiter: + while True: + try: + response = await self.async_client.chat.completions.create( + model=self.config.model, + messages=[ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": text}, + ], + ) + response = response.choices[0].message.content + match = re.search(r"Total score:\s*(\d+)", response) + if match: + total_score = int(match.group(1)) + else: + total_score = -1 + response = total_score + break + except Exception as e: + print(f"Error in {i}: {e}") + await asyncio.sleep(30) + + return response + + async def process_batch(self, data_list: List[dict], args: Args): + limiter = asyncio.Semaphore(self.config.max_parallel_requests) + tasks = [self.process_text(data, i, limiter, args) for i, data in enumerate(data_list)] + # Use tqdm to track progress + return await tqdm.gather(*tasks, total=len(tasks), desc="Processing Batch") diff --git a/open_instruct/generation.py b/open_instruct/rejection_sampling/generation.py similarity index 64% rename from open_instruct/generation.py rename to open_instruct/rejection_sampling/generation.py index 43d805726..3f7ee2829 100644 --- a/open_instruct/generation.py +++ b/open_instruct/rejection_sampling/generation.py @@ -12,15 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. + +import asyncio import copy import json import multiprocessing import os from collections import defaultdict -from dataclasses import dataclass +from dataclasses import asdict, dataclass from typing import Optional import pandas as pd +from api_generate import LLMGenerationConfig, LLMProcessor # Import your classes from datasets import load_dataset from rich.console import Console from rich.pretty import pprint @@ -33,12 +36,13 @@ class Args: model_name_or_path: str = "cleanrl/EleutherAI_pythia-1b-deduped__sft__tldr" save_filename: str = "completions.jsonl" + mode: str = "generation" + skill: str = "chat" @dataclass class GenerationArgs: n: int = 1 - """the number of samples to generate per prompt""" temperature: float = 0.8 response_length: int = 53 tensor_parallel_size: int = 1 @@ -66,27 +70,15 @@ def print_rich_table(df: pd.DataFrame) -> Table: console.print(table) -def main(args: Args, dataset_args: DatasetArgs, gen_args: GenerationArgs): - tokenizer = AutoTokenizer.from_pretrained(args.model_name_or_path) - ds = load_dataset(dataset_args.dataset_name) - if dataset_args.sanity_check: - for key in ds: - ds[key] = ds[key].select(range(min(dataset_args.sanity_check_size, len(ds[key])))) - if dataset_args.dataset_end_idx is None: - dataset_args.dataset_end_idx = len(ds[dataset_args.dataset_train_split]) - for key in ds: - ds[key] = ds[key].select(range(dataset_args.dataset_start_idx, dataset_args.dataset_end_idx)) - pprint([dataset_args, args, gen_args]) +async def generate_with_openai(model_name: str, data_list: list, args: Args, n: int): + config = LLMGenerationConfig(model=model_name, n=n) + processor = LLMProcessor(config) + results = await processor.process_batch(data_list, args) + return results - # DATASET specific logic: in this dataset the prompt is simply just a list of strings - ds = ds.map( - lambda x: {"prompt_token_ids": tokenizer.apply_chat_template(x["messages"][:-1])}, - num_proc=multiprocessing.cpu_count(), - ) - prompt_token_ids = ds[dataset_args.dataset_train_split]["prompt_token_ids"] - # Generate using vLLM - llm = LLM(model=args.model_name_or_path, tensor_parallel_size=gen_args.tensor_parallel_size) +def generate_with_vllm(model_name_or_path: str, prompt_token_ids, gen_args: GenerationArgs): + llm = LLM(model=model_name_or_path, tensor_parallel_size=gen_args.tensor_parallel_size) outputs = llm.generate( prompt_token_ids=prompt_token_ids, sampling_params=SamplingParams( @@ -97,6 +89,56 @@ def main(args: Args, dataset_args: DatasetArgs, gen_args: GenerationArgs): include_stop_str_in_output=True, ), ) + + return [ + { + "outputs": [asdict(out) for out in output.outputs], + "prompt": output.prompt, + "prompt_logprobs": output.prompt_logprobs, + "metrics": output.metrics, + } + for output in outputs + ] + + +def main(args: Args, dataset_args: DatasetArgs, gen_args: GenerationArgs): + + ds = load_dataset(dataset_args.dataset_name) + if dataset_args.sanity_check: + for key in ds: + ds[key] = ds[key].select(range(min(dataset_args.sanity_check_size, len(ds[key])))) + if dataset_args.dataset_end_idx is None: + dataset_args.dataset_end_idx = len(ds[dataset_args.dataset_train_split]) + for key in ds: + ds[key] = ds[key].select(range(dataset_args.dataset_start_idx, dataset_args.dataset_end_idx)) + pprint([dataset_args, args, gen_args]) + + if "gpt-3.5" in args.model_name_or_path or "gpt-4" in args.model_name_or_path: + use_openai = True + else: + use_openai = False + + if not use_openai: + tokenizer = AutoTokenizer.from_pretrained(args.model_name_or_path) + + ds = ds.map( + lambda x: {"prompt_token_ids": tokenizer.apply_chat_template(x["messages"][:-1])}, + num_proc=multiprocessing.cpu_count(), + ) + prompt_token_ids = ds[dataset_args.dataset_train_split]["prompt_token_ids"] + # Generate using vLLM. + outputs = generate_with_vllm(args.model_name_or_path, prompt_token_ids, gen_args) + + else: + tokenizer = AutoTokenizer.from_pretrained("allenai/llama-3-tulu-2-8b") + ds = ds.map( + lambda x: {"prompt": tokenizer.apply_chat_template(x["messages"][:-1], tokenize=False)}, + num_proc=multiprocessing.cpu_count(), + ) + messages = ds[dataset_args.dataset_train_split]["prompt"] + responses = asyncio.run(generate_with_openai(args.model_name_or_path, messages, args, gen_args.n)) + outputs = [{"outputs": [{"text": response}]} for response in responses] + # Assuming we generate n=3 completions per prompt, the outputs will look like: # prompt | completions # -------|------------ @@ -107,19 +149,13 @@ def main(args: Args, dataset_args: DatasetArgs, gen_args: GenerationArgs): # ... table = defaultdict(list) for output, messages in zip(outputs, ds[dataset_args.dataset_train_split]["messages"]): - # if the model completions are exactly the same across all completions, we can skip this - if len(set([item.text for item in output.outputs])) == 1: - continue - - for item in output.outputs: + for item in output["outputs"]: new_messages = copy.deepcopy(messages[:-1]) - new_messages.append({"role": "assistant", "content": item.text}) + new_messages.append({"role": "assistant", "content": item["text"]}) table["messages"].append(new_messages) - table["model_completion"].append(item.text) + table["model_completion"].append(item["text"]) table["reference_completion"].append(messages[-1]["content"]) - # print_rich_table(pd.DataFrame(table)) # uncomment this line to print the table - # Save results os.makedirs(os.path.dirname(args.save_filename), exist_ok=True) with open(args.save_filename, "w") as outfile: @@ -129,6 +165,7 @@ def main(args: Args, dataset_args: DatasetArgs, gen_args: GenerationArgs): "messages": table["messages"][i], "model_completion": table["model_completion"][i], "reference_completion": table["reference_completion"][i], + "model": args.model_name_or_path, }, outfile, ) diff --git a/open_instruct/rejection_sampling/prompt_templates.py b/open_instruct/rejection_sampling/prompt_templates.py new file mode 100644 index 000000000..7361596db --- /dev/null +++ b/open_instruct/rejection_sampling/prompt_templates.py @@ -0,0 +1,118 @@ +# prompt_templates.py + +DEFAULT_SKILL = "summarization" + +GENERATION_TEMPLATES = { + "chat": """ +You are an expert assistant and your goal is to provide the most helpful and accurate response to the following prompt. Please ensure that your response is clear, concise, and addresses all aspects of the prompt. + +### Prompt: +{prompt} + +### Response: +""", + "summarization": """ +Please provide a concise summary of the following text, highlighting the most important points without including unimportant or irrelevant details. + +### Text to Summarize: +{prompt} + +Summary: +""", + "code_generation": """ +Please write a Python function that solves the following problem. Ensure the code is efficient, readable, and follows best practices. + +### Problem: +{prompt} + +Python Code: +""", + "safety": """ +Please provide a safe and appropriate response to the following scenario or question. Ensure your response adheres to ethical guidelines and promotes user safety. + +### Scenario: +{prompt} + +Safe Response: +""", +} + +JUDGMENT_TEMPLATES = { + "chat": """ +You are an evaluator tasked with assessing the response to the prompt based on three criteria: accuracy, instruction following, and tone/presentation. +For each response provided to the prompt, assign a score from 1 to 3 for each criterion, where 1 is poor, 2 is average, and 3 is excellent. +Don't give full scores unless you're mind blown by the response. + +### Prompt: +{prompt} + +### Response: +{response} + +### Evaluation Criteria: +1. **Accuracy**: How factually correct and relevant is the response? +2. **Instruction Following**: How well does the response adhere to the given instructions? +3. **Tone/Presentation**: How clear, appropriate, and well-structured is the response? + +### Scoring Template: +Response: +- **Accuracy**: [Score 1-3] +- **Instruction Following**: [Score 1-3] +- **Tone/Presentation**: [Score 1-3] + +Total score: +""", + "summarization": """ +Compare the following summaries of the given text. Which one does a better job of summarizing the most important points without including unimportant or irrelevant details? Judge based on accuracy, coverage, and coherence. + +### Original Text: +{prompt} + +### Summary A: +{response1} + +### Summary B: +{response2} + +Comparison: +Preferred Summary: +""", + "code_generation": """ +Compare the following two code snippets that aim to solve the given problem. Which one is more efficient, readable, and follows best practices? + +### Problem: +{prompt} + +### Code A: +{response1} + +### Code B: +{response2} + +Code Analysis: +Preferred Code: +""", + "safety": """ +Evaluate the safety implications of the following two responses to the given scenario. Which one is safer and more appropriate? + +### Scenario: +{prompt} + +### Response A: +{response1} + +### Response B: +{response2} + +Safety Analysis: +Preferred Response: +""", +} + + +def get_generation_template(skill: str) -> str: + return GENERATION_TEMPLATES.get(skill, GENERATION_TEMPLATES[DEFAULT_SKILL]) + + +def get_judgment_template(skill: str) -> str: + return JUDGMENT_TEMPLATES.get(skill, JUDGMENT_TEMPLATES[DEFAULT_SKILL]) diff --git a/open_instruct/rejection_sampling.py b/open_instruct/rejection_sampling/rejection_sampling.py similarity index 78% rename from open_instruct/rejection_sampling.py rename to open_instruct/rejection_sampling/rejection_sampling.py index 68915922f..bc6a3ddf1 100644 --- a/open_instruct/rejection_sampling.py +++ b/open_instruct/rejection_sampling/rejection_sampling.py @@ -12,7 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import json +import multiprocessing import time from collections import Counter, defaultdict from dataclasses import dataclass, field @@ -22,6 +24,7 @@ import torch import torch.multiprocessing as mp from datasets import Dataset +from generation import generate_with_openai from huggingface_hub import HfApi from transformers import ( AutoModelForSequenceClassification, @@ -38,9 +41,7 @@ @dataclass class Args: - model_names_or_paths: List[str] = field( - default_factory=lambda: ["cleanrl/EleutherAI_pythia-1b-deduped__reward__tldr"] - ) + model_names_or_paths: List[str] = field(default_factory=lambda: ["gpt-4"]) input_filename: str = "completions.jsonl" save_filename: str = "rejected_sampling_completions.jsonl" n: int = 1 @@ -50,6 +51,8 @@ class Args: hf_entity: Optional[str] = None hf_repo_id: str = "rejection_sampling" add_timestamp: bool = True + mode: str = "judgement" + skill: str = "chat" def process_shard( @@ -70,12 +73,13 @@ def process_shard( Shape: (num_items_in_shard,) torch.Tensor: A tensor containing the reward scores for each reference completion in the shard. """ + # Convert the list of data items (shard) into a Hugging Face Dataset object + raw_ds = Dataset.from_list(shard) + device = torch.device(f"cuda:{rank}") tokenizer = AutoTokenizer.from_pretrained(model_name_or_path, padding_side="right") tokenizer.add_special_tokens({"pad_token": "[PAD]"}) - # Convert the list of data items (shard) into a Hugging Face Dataset object - raw_ds = Dataset.from_list(shard) # Apply a tokenization function to each item in the dataset ds = raw_ds.map( lambda x: {"input_ids": tokenizer.apply_chat_template(x["messages"])}, remove_columns=raw_ds.column_names @@ -106,9 +110,55 @@ def process_shard( reference_completion_scores = batch_processing_scores( args.max_forward_batch_size, device, tokenizer, reference_completion_ds, model, data_collator ) + return scores, reference_completion_scores +def process_shard_api(model_name_or_path: str, args: Args, shard: List[str]) -> Tuple[torch.Tensor, torch.Tensor]: + """ + This function processes a shard (subset) of data using api-based models. + It feeds data through the model to get reward scores, and handles out-of-memory errors by adjusting the batch size. + + Args: + model_name_or_path (str): The path or name of the model to load. + args (Args): The arguments passed to the script, containing various settings. + shard (List[str]): A list of strings representing the shard of data to be processed. + + Returns: + torch.Tensor: A tensor containing the reward scores for each item in the shard. + Shape: (num_items_in_shard,) + torch.Tensor: A tensor containing the reward scores for each reference completion in the shard. + """ + + # Convert the list of data items (shard) into a Hugging Face Dataset object + raw_ds = Dataset.from_list(shard) + + tokenizer = AutoTokenizer.from_pretrained("allenai/llama-3-tulu-2-8b") + ds = raw_ds.map( + lambda x: {"prompt": tokenizer.apply_chat_template(x["messages"][:-1], tokenize=False)}, + num_proc=multiprocessing.cpu_count(), + ) + prompts = ds["prompt"] + model_responses = ds["model_completion"] + reference_responses = ds["reference_completion"] + + data_list_model_responses = [ + {"prompt": prompt, "response": response} for prompt, response in zip(prompts, model_responses) + ] + model_responses_scores = asyncio.run( + generate_with_openai(model_name_or_path, data_list_model_responses, args, args.n) + ) + + data_list_reference_responses = [ + {"prompt": prompt, "response": response} for prompt, response in zip(prompts, reference_responses) + ] + reference_responses_scores = asyncio.run( + generate_with_openai(model_name_or_path, data_list_reference_responses, args, args.n) + ) + + return torch.Tensor(model_responses_scores), torch.Tensor(reference_responses_scores) + + def batch_processing_scores( max_forward_batch_size: int, device: torch.device, @@ -195,18 +245,30 @@ def main(args: Args): worst_offsets_per_model = {} reference_completion_scores_per_model = {} for model_name_or_path in args.model_names_or_paths: - with mp.Pool(args.num_gpus) as pool: - results = [] + if "gpt-3.5" in model_name_or_path or "gpt-4" in model_name_or_path: + use_openai = True + else: + use_openai = False + + results = [] + if not use_openai: + with mp.Pool(args.num_gpus) as pool: + for i in range(args.num_gpus): + results.append(pool.apply_async(process_shard, (i, model_name_or_path, args, shards[i]))) + else: for i in range(args.num_gpus): - results.append(pool.apply_async(process_shard, (i, model_name_or_path, args, shards[i]))) + results.append(process_shard_api(model_name_or_path, args, shards[i])) - # Collect results - scores = [] - reference_completion_scores = [] - for result in results: + # Collect results + scores = [] + reference_completion_scores = [] + for result in results: + if not use_openai: item = result.get() - scores.append(item[0]) - reference_completion_scores.append(item[1]) + else: + item = result + scores.append(item[0]) + reference_completion_scores.append(item[1]) # Combine scores from all GPUs scores = torch.cat(scores) diff --git a/safety-eval b/safety-eval new file mode 160000 index 000000000..029161744 --- /dev/null +++ b/safety-eval @@ -0,0 +1 @@ +Subproject commit 0291617446f4edf32297c313389dd0b0ae1e4cff