Add a new instructlab.sdg.taxonomy_to_samples API
Take a first pass at separating out the data preprocessing steps from
generation by adding a new top-level API (and temporary CLI) to
invoke preprocessing but not generation.

Signed-off-by: Ben Browning <[email protected]>
bbrowning committed Dec 10, 2024
1 parent dcbabc5 commit a80a3f7
Showing 9 changed files with 356 additions and 134 deletions.
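The new top-level entry point can also be called directly from Python. Below is a minimal sketch, not taken from the commit itself: the arguments mirror the temporary CLI wrapper added below, and the default values shown in comments are assumptions (the real values live in the DEFAULT_* constants in instructlab.sdg.taxonomy).

# Hypothetical direct use of the new preprocessing API.
from instructlab.sdg import taxonomy_to_samples

sample_files = taxonomy_to_samples(
    "/path/to/my/taxonomy",   # path to an InstructLab taxonomy checkout
    "/path/to/my/output",     # directory the processed samples are written into
    chunk_word_count=1000,    # assumed default; see DEFAULT_CHUNK_WORD_COUNT
    server_ctx_size=4096,     # assumed default; see DEFAULT_SERVER_CTX_SIZE
    taxonomy_base="empty",    # 'empty' = treat every taxonomy file as changed
    yaml_rules=None,          # optional custom YAML lint rules
)
# Per the generate_data.py changes below, the call writes sample files to
# disk and returns the list of files it created.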
2 changes: 2 additions & 0 deletions src/instructlab/sdg/__init__.py
@@ -29,6 +29,7 @@
"FULL_PIPELINES_PACKAGE",
"SIMPLE_PIPELINES_PACKAGE",
"generate_data",
"taxonomy_to_samples",
)

# Local
@@ -61,5 +62,6 @@
PipelineContext,
)
from .registry import BlockRegistry, PromptRegistry
from .taxonomy import taxonomy_to_samples
from .utils import GenerateException
from .utils.taxonomy import TaxonomyReadingException
82 changes: 82 additions & 0 deletions src/instructlab/sdg/cli/taxonomy_to_samples.py
@@ -0,0 +1,82 @@
# SPDX-License-Identifier: Apache-2.0

# Standard
import os

# First Party
from instructlab.sdg.taxonomy import (
DEFAULT_CHUNK_WORD_COUNT,
DEFAULT_SERVER_CTX_SIZE,
DEFAULT_TAXONOMY_BASE,
taxonomy_to_samples,
)
from instructlab.sdg.utils.logging import setup_logger

if __name__ == "__main__":
# Standard
import argparse

parser = argparse.ArgumentParser(
description="Turn a taxonomy into json samples suitable for use as input to data generate pipelines"
)

# Required args
parser.add_argument(
"--output-dir",
type=str,
required=True,
help="Directory to write the processed dataset samples into",
)
parser.add_argument(
"--taxonomy-path",
type=str,
required=True,
help="Path to your InstructLab taxonomy",
)

# Optional args
parser.add_argument(
"--chunk-word-count",
type=int,
default=DEFAULT_CHUNK_WORD_COUNT,
help="Number of words per document chunk",
)
parser.add_argument(
"--log-level",
type=str,
default=os.getenv("LOG_LEVEL", "INFO"),
help="Logging level",
)
parser.add_argument(
"--server-ctx-size",
type=int,
default=DEFAULT_SERVER_CTX_SIZE,
help="The maximum number of tokens the inference server can handle.",
)
parser.add_argument(
"--taxonomy-base",
type=str,
default=DEFAULT_TAXONOMY_BASE,
help="Taxonomy based used to determine what has changed - defaults to 'empty' which means consider all the taxonomy files as changed and process all of them",
)
parser.add_argument(
"--yaml-rules",
type=str,
default=None,
help="Path to custom rules file for YAML linting",
)

args = parser.parse_args()
setup_logger(args.log_level)
taxonomy_to_samples(
args.taxonomy_path,
args.output_dir,
chunk_word_count=args.chunk_word_count,
server_ctx_size=args.server_ctx_size,
taxonomy_base=args.taxonomy_base,
yaml_rules=args.yaml_rules,
)

"""
python -m instructlab.sdg.cli.taxonomy_to_samples --taxonomy-path /path/to/my/taxonomy --output-dir /path/to/my/output
"""
156 changes: 67 additions & 89 deletions src/instructlab/sdg/generate_data.py
@@ -13,9 +13,9 @@

# Third Party
# instructlab - All of these need to go away (other than sdg) - issue #6
from datasets import Dataset
from xdg_base_dirs import xdg_data_dirs, xdg_data_home
import openai
import yaml

# First Party
from instructlab.sdg.blocks.llmblock import DEFAULT_MAX_NUM_TOKENS
@@ -27,12 +27,9 @@
Pipeline,
PipelineContext,
)
from instructlab.sdg.taxonomy import taxonomy_to_samples
from instructlab.sdg.utils import GenerateException, models
from instructlab.sdg.utils.json import jldump
from instructlab.sdg.utils.taxonomy import (
leaf_node_to_samples,
read_taxonomy_leaf_nodes,
)
from instructlab.sdg.utils.json import jldump, jlload

logger = logging.getLogger(__name__)

@@ -115,20 +112,21 @@ def _gen_train_data(

def _knowledge_seed_example_to_test_data(seed_example, system_prompt):
res = []
for qna in seed_example["questions_and_answers"]:
user = qna["question"] + "\n" + seed_example["context"]
for i in range(3):
idx = i + 1
user = seed_example[f"icl_query_{idx}"] + "\n" + seed_example["icl_document"]
res.append(
{
"system": system_prompt,
"user": _unescape(user),
"assistant": _unescape(qna["answer"]),
"assistant": _unescape(seed_example[f"icl_response_{idx}"]),
}
)
return res


def _gen_test_data(
leaf_nodes,
seed_examples,
output_file_test,
system_prompt,
):
@@ -137,30 +135,29 @@ def _gen_test_data(
in instructlab/instructlab.
"""
test_data = []
for _, leaf_node in leaf_nodes.items():
for seed_example in leaf_node:
if "questions_and_answers" in seed_example:
test_data.extend(
_knowledge_seed_example_to_test_data(seed_example, system_prompt)
)
continue
for seed_example in seed_examples:
if "icl_query_1" in seed_example:
test_data.extend(
_knowledge_seed_example_to_test_data(seed_example, system_prompt)
)
continue

# skill seed example
# skill seed example

user = seed_example["instruction"] # question
user = seed_example["seed_question"] # question

if len(seed_example["input"]) > 0:
user += "\n" + seed_example["input"] # context
if seed_example["leaf_node_type"] == "grounded_skill":
user += "\n" + seed_example["seed_context"] # context

test_data.append(
{
"system": system_prompt,
"user": _unescape(user),
"assistant": _unescape(seed_example["output"]), # answer
}
)
test_data.append(
{
"system": system_prompt,
"user": _unescape(user),
"assistant": _unescape(seed_example["seed_response"]), # answer
}
)

jldump(test_data, output_file_test)
jldump(test_data, output_file_test)


def _check_pipeline_dir(pipeline):
@@ -208,23 +205,6 @@ def _sdg_init(ctx, pipeline):
data_dirs = [os.path.join(xdg_data_home(), "instructlab", "sdg")]
data_dirs.extend(os.path.join(dir, "instructlab", "sdg") for dir in xdg_data_dirs())

docling_model_path = None
sdg_models_path = docling_model_path
for d in data_dirs:
if os.path.exists(os.path.join(d, "models")):
sdg_models_path = os.path.join(d, "models")
break

if sdg_models_path is not None:
try:
with open(
os.path.join(sdg_models_path, "config.yaml"), "r", encoding="utf-8"
) as file:
config = yaml.safe_load(file)
docling_model_path = config["models"][0]["path"]
except (FileNotFoundError, NotADirectoryError, PermissionError) as e:
logger.warning(f"unable to read docling models path from config.yaml {e}")

for d in data_dirs:
pipeline_path = os.path.join(d, "pipelines", pipeline)
if os.path.exists(pipeline_path):
@@ -256,7 +236,6 @@ def load_pipeline(yaml_basename):
load_pipeline("knowledge.yaml"),
load_pipeline("freeform_skills.yaml"),
load_pipeline("grounded_skills.yaml"),
docling_model_path,
)


@@ -326,28 +305,32 @@ def generate_data(
if batch_size is None:
batch_size = 0

if not os.path.exists(output_dir):
os.mkdir(output_dir)

if not (taxonomy and os.path.exists(taxonomy)):
raise GenerateException(f"Error: taxonomy ({taxonomy}) does not exist.")

output_dir = Path(output_dir)
output_dir.mkdir(exist_ok=True)
date_suffix = datetime.now().replace(microsecond=0).isoformat().replace(":", "_")
document_output_dir = Path(output_dir) / f"documents-{date_suffix}"

leaf_nodes = read_taxonomy_leaf_nodes(
taxonomy, taxonomy_base, yaml_rules, document_output_dir
preprocessed_output_dir = output_dir.joinpath(f"preprocessed_{date_suffix}")

# This writes samples to disk in our output_dir and returns the
# list of files created
sample_files = taxonomy_to_samples(
taxonomy,
preprocessed_output_dir,
chunk_word_count=chunk_word_count,
server_ctx_size=server_ctx_size,
taxonomy_base=taxonomy_base,
yaml_rules=yaml_rules,
)
if not leaf_nodes:
raise GenerateException("Error: No new leaf nodes found in the taxonomy.")

name = Path(model_name).stem # Just in case it is a file path
output_file_messages = f"messages_{name}_{date_suffix}.jsonl"
output_file_test = f"test_{name}_{date_suffix}.jsonl"
output_file_train = f"train_{name}_{date_suffix}.jsonl"

all_samples = []
for sample_file in sample_files:
all_samples.extend(jlload(sample_file))
_gen_test_data(
leaf_nodes,
all_samples,
os.path.join(output_dir, output_file_test),
system_prompt,
)
@@ -368,8 +351,8 @@ def generate_data(
max_num_tokens=max_num_tokens,
)

knowledge_pipe, freeform_skills_pipe, grounded_skills_pipe, docling_model_path = (
_sdg_init(ctx, pipeline)
knowledge_pipe, freeform_skills_pipe, grounded_skills_pipe = _sdg_init(
ctx, pipeline
)

# Make sure checkpointing is disabled (we don't want this pipeline to load checkpoints from the main pipeline)
@@ -390,39 +373,34 @@
)

generated_data = []
empty_sdg_leaf_nodes = []
for leaf_node in leaf_nodes.values():
is_knowledge = False
leaf_node_path = leaf_node[0]["taxonomy_path"].replace("->", "_")
samples = leaf_node_to_samples(
leaf_node,
taxonomy,
server_ctx_size,
chunk_word_count,
document_output_dir,
model_name,
docling_model_path=docling_model_path,
)

empty_input_sample_files = []
for sample_file in sample_files:
logger.debug("Generating data from input sample file: %s", sample_file)
samples = jlload(sample_file)
if not samples:
raise GenerateException("Error: No samples found in leaf node.")

if "document" in samples.column_names:
raise GenerateException(
"Error: No samples found in input file {sample_file}"
)
# For now we assume every sample in the file is the same type
first_sample = samples[0]
leaf_node_path = first_sample["leaf_node_path"]
leaf_node_type = first_sample["leaf_node_type"]
is_knowledge = False
if leaf_node_type == "knowledge":
pipe = knowledge_pipe
is_knowledge = True

elif "seed_context" in samples.column_names:
elif leaf_node_type == "grounded_skill":
pipe = grounded_skills_pipe

else:
pipe = freeform_skills_pipe

logger.debug("Samples: %s", samples)
samples_ds = Dataset.from_list(samples)
logger.debug("Samples: %s", samples_ds)

new_generated_data = pipe.generate(samples, leaf_node_path)
new_generated_data = pipe.generate(samples_ds, leaf_node_path)
if len(new_generated_data) == 0:
empty_sdg_leaf_nodes.append(leaf_node_path)
logger.warning("Empty dataset for qna node: %s", leaf_node_path)
empty_input_sample_files.append(sample_file)
logger.warning("Empty generated dataset for sample file: %s", sample_file)
continue
generated_data.append(new_generated_data)

@@ -457,9 +435,9 @@ def generate_data(

generate_duration = time.time() - generate_start
logger.info(f"Generation took {generate_duration:.2f}s")
if len(empty_sdg_leaf_nodes) > 0:
if len(empty_input_sample_files) > 0:
logger.warning(
"Leaf nodes with empty sdg output: {}".format(
" ".join(empty_sdg_leaf_nodes)
"Input sample files with empty sdg output: {}".format(
" ".join(empty_input_sample_files)
)
)
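To summarize the reworked flow without the diff interleaving: generate_data now preprocesses the taxonomy once, then drives generation from the sample files on disk. Below is a condensed, post-commit view of the dispatch loop, assuming jlload is the JSON Lines counterpart of the existing jldump (i.e., it reads a .jsonl file into a list of dicts); it is a sketch, not verbatim commit code.

# Condensed sketch of the new per-file generation loop.
from datasets import Dataset

from instructlab.sdg.utils import GenerateException
from instructlab.sdg.utils.json import jlload

def _generate_from_sample_files(sample_files, knowledge_pipe,
                                freeform_skills_pipe, grounded_skills_pipe):
    generated_data, empty_input_sample_files = [], []
    for sample_file in sample_files:
        samples = jlload(sample_file)  # assumed: one dict per JSONL line
        if not samples:
            raise GenerateException(
                f"Error: No samples found in input file {sample_file}"
            )
        # Every sample in a file is assumed to share one leaf node type.
        leaf_node_type = samples[0]["leaf_node_type"]
        if leaf_node_type == "knowledge":
            pipe = knowledge_pipe
        elif leaf_node_type == "grounded_skill":
            pipe = grounded_skills_pipe
        else:
            pipe = freeform_skills_pipe
        new_data = pipe.generate(
            Dataset.from_list(samples), samples[0]["leaf_node_path"]
        )
        if len(new_data) == 0:
            empty_input_sample_files.append(sample_file)
            continue
        generated_data.append(new_data)
    return generated_data, empty_input_sample_files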
(Diffs for the remaining 6 changed files are not shown.)