diff --git a/src/instructlab/sdg/datamixing.py b/src/instructlab/sdg/datamixing.py index 5172fdfb..e6ca8675 100644 --- a/src/instructlab/sdg/datamixing.py +++ b/src/instructlab/sdg/datamixing.py @@ -1,7 +1,8 @@ # SPDX-License-Identifier: Apache-2.0 # Standard -from typing import Dict, List, Optional +from pathlib import Path +from typing import Dict, List, Optional, TypedDict import json import logging import os.path @@ -16,10 +17,24 @@ from instructlab.sdg.utils import GenerateException, pandas from instructlab.sdg.utils.pandas import dataset_from_pandas_dataframe +# XXX(osilkin): This value represents the ratio between knowledge & skills data +# below which we upsample knowledge samples from. This only applies +# when |knowledge| << |skills| +MIN_UPSAMPLE_THRESHOLD = 0.03 ALLOWED_COLS = ["id", "messages", "metadata"] logger = logging.getLogger(__name__) +class DatasetListing(TypedDict): + """ + TypedDict class that represents the dataset listings passed around in the + `.datasets` key of each recipe. + """ + + sampling_size: float + path: str + + def _adjust_train_sample_size(ds: Dataset, num_samples: int): """ Return a dataset with num_samples random samples selected from the @@ -99,7 +114,7 @@ def __init__( # Defaults if no recipe path given or these values don't # exist in the given recipe file - self.datasets = [] + self.datasets: List[DatasetListing] = [] if recipe_path is not None: recipe = self._load_recipe() if "datasets" in recipe: @@ -507,6 +522,47 @@ def _create_phase07_ds( return phase07 +def _total_length_of_datasets(datasets: List[DatasetListing]) -> int: + """ + Iterate through the datasets and return the total number of samples + per the the sampling ratio. + + Args: + datasets (List[DatasetListing]): List containing `DatasetListing` entries. + + Returns: + int: Combined length of all datasets in the given list. + """ + total_length = 0 + for dataset in datasets: + if "path" not in dataset: + # this really shouldn't happen because it'd be weird for a dataset + # listing to not have a path + continue + + ds_path = Path(dataset["path"]) + if not ds_path.exists(): + # we should ideally error out here, but this was the existing functionality + # so we should not introduce this type of error boundary for a hackaround + continue + + # Calculate the length of dataset by reading in the JSONL file and assuming every sample + # is on a single line. Assume also if a line is empty then it is not a sample (should only be the last line). + unscaled_length = sum( + 1 for l in ds_path.read_text("utf-8").splitlines() if l.strip() + ) + sampling_size = dataset["sampling_size"] + if isinstance(sampling_size, float): + total_length += int(unscaled_length * sampling_size) + elif isinstance(sampling_size, int): + total_length += sampling_size + else: + # maybe we should do nothing instead? + raise ValueError(f"invalid type for `sampling_size`: {type(sampling_size)}") + + return total_length + + def _convert_to_leaf_node_messages(sample: dict, sys_prompt: str): """ Convert a sample dictionary to contain a 'messages' column required @@ -549,6 +605,10 @@ def __init__( num_procs, auxiliary_inst=None, ): + # HACK(osilkin): This is used to upsample the knowledge dataset when the **pre-computed** skills dataset + # far exceeds the size of our knowledge samples. This will be removed in the future + # in favor of a smarter way to do the upsampling. + self._precomputed_skills_length: int | None = None self.data_dirs = data_dirs self.output_dir = output_dir self.sys_prompt = sys_prompt @@ -572,9 +632,17 @@ def _load_default_recipe(self, yaml_basename): for d in self.data_dirs: default_recipe_path = os.path.join(d, "default_data_recipes", yaml_basename) if os.path.exists(default_recipe_path): - return Recipe( + recipe = Recipe( recipe_path=default_recipe_path, sys_prompt=self.sys_prompt ) + if "skills" in yaml_basename and recipe.datasets: + # HACK(osilkin): we need to balance out the knowledge such that it doesn't + # get drowned out in the skills dataset. This workaround allows us + # to re-balance such that skills consists of at least 3% skills + self._precomputed_skills_length = _total_length_of_datasets( + recipe.datasets + ) + return recipe return Recipe(sys_prompt=self.sys_prompt) def _gen_leaf_node_data( @@ -615,10 +683,32 @@ def collect( output_file_leaf_skills = ( f"node_datasets_{self.date_suffix}/{leaf_node_path}_p10.jsonl" ) + # HACK(osilkin): `knowledge_upsample_amount` is currently used when the generated knowledge data + # is orders of magnitude smaller (approx. < 3%) than the skills dataset. + # It is used to upsample that dataset so that the model doesn't forget it in training. + # + # This work around is currently hacky as we lack insight into the size of both datasets + # when we generate this data, and it may vary across different scenarios + sampling_size: int | float = 1.0 + if self._precomputed_skills_length: + knowledge_to_skills_ratio = ( + len(skills_phase_data) / self._precomputed_skills_length + ) + if knowledge_to_skills_ratio < MIN_UPSAMPLE_THRESHOLD: + sampling_size = int(self._precomputed_skills_length * 0.03) + + logger.info( + "\033[93mKnowledge detected to be less than %.2f%% of skills (%.2f%%), upsampling to: %d\033[0m", + MIN_UPSAMPLE_THRESHOLD * 100, + knowledge_to_skills_ratio * 100, + sampling_size, + ) + self._gen_leaf_node_data( skills_phase_data, self.skills_recipe, output_file_leaf_skills, + sampling_size=sampling_size, ) else: messages = new_generated_data.map( diff --git a/src/instructlab/sdg/generate_data.py b/src/instructlab/sdg/generate_data.py index 2aac5028..d97cdc27 100644 --- a/src/instructlab/sdg/generate_data.py +++ b/src/instructlab/sdg/generate_data.py @@ -273,7 +273,13 @@ def load_pipeline(yaml_basename): ) -def _mixer_init(ctx, output_dir, date_suffix, knowledge_auxiliary_inst, system_prompt): +def _mixer_init( + ctx, + output_dir, + date_suffix, + knowledge_auxiliary_inst, + system_prompt, +): data_dirs = [os.path.join(xdg_data_home(), "instructlab", "sdg")] data_dirs.extend(os.path.join(dir, "instructlab", "sdg") for dir in xdg_data_dirs()) @@ -391,7 +397,11 @@ def generate_data( mmlu_bench_pipe = mmlubench_pipe_init(mmlu_ctx) mixer = _mixer_init( - ctx, output_dir, date_suffix, knowledge_pipe.auxiliary_inst, system_prompt + ctx, + output_dir, + date_suffix, + knowledge_pipe.auxiliary_inst, + system_prompt, ) if console_output: