diff --git a/benchmark/cloc/.gitignore b/benchmark/cloc/.gitignore
new file mode 100644
index 000000000..fe30281ee
--- /dev/null
+++ b/benchmark/cloc/.gitignore
@@ -0,0 +1 @@
+cloc_data/
\ No newline at end of file
diff --git a/benchmark/cloc/README.md b/benchmark/cloc/README.md
new file mode 100644
index 000000000..c86139e60
--- /dev/null
+++ b/benchmark/cloc/README.md
@@ -0,0 +1,38 @@
+# CLOC Data
+
+In this directory, you can find the files necessary to run experiments with the CLOC dataset.
+The dataset was introduced [in this paper](https://arxiv.org/pdf/2108.09020.pdf), and we make use of the [CLDatasets](https://github.com/hammoudhasan/CLDatasets) mirror.
+
+## Data Generation
+To run the download script, you need to install the `google-cloud-storage` package.
+Then, you can use the `data_generation.py` script to download the data and set the file timestamps accordingly.
+Use the `-h` flag to find out more.
+
+## License
+
+The CLOC dataset is released under the MIT License.
+The CLDatasets repository does not have an explicit license but allows you to "adapt the code to suit your specific requirements".
+
+### CLOC License Copy
+
+MIT License
+
+Copyright (c) 2021 Intel Labs
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/benchmark/cloc/data_generation.py b/benchmark/cloc/data_generation.py
new file mode 100644
index 000000000..29493d8ce
--- /dev/null
+++ b/benchmark/cloc/data_generation.py
@@ -0,0 +1,333 @@
+# This code is partially adapted from https://github.com/hammoudhasan/CLDatasets/blob/main/src/downloader.py, which is provided license-free.
+# We thank the authors for their work and for hosting the data.
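+#
+# Overview (comment added for clarity): the script runs up to four stages, each of which can be skipped via a flag:
+#   1. Download the CLOC image zips and order files from the public "cl-datasets" GCS bucket (--skip_download).
+#   2. Extract the zips into the target directory (--skip_unzip).
+#   3. Write a <image>.label file next to every image and set the file's modification time to the
+#      image's upload timestamp (--skip_labels).
+#   4. Optionally copy one sample two years past the last timestamp so the final trigger is also trained on (--dummyyear).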
+# This code requires the `google-cloud-storage` pip package.
+
+import argparse
+import glob
+import logging
+import os
+import pathlib
+import time
+import zipfile
+from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
+from shutil import copy, which
+
+import torch
+from google.cloud import storage
+from tqdm import tqdm
+
+logging.basicConfig(
+    level=logging.NOTSET,
+    format="[%(asctime)s] [%(filename)15s:%(lineno)4d] %(levelname)-8s %(message)s",
+    datefmt="%Y-%m-%d:%H:%M:%S",
+)
+logger = logging.getLogger(__name__)
+
+DAY_LENGTH_SECONDS = 24 * 60 * 60
+
+
+def extract_single_zip(directory: str, target: str, zip_file: str) -> None:
+    zip_path = os.path.join(directory, zip_file)
+    output_dir = os.path.join(target, os.path.splitext(zip_file)[0])
+
+    os.makedirs(output_dir, exist_ok=True)
+
+    try:
+        with zipfile.ZipFile(zip_path, "r") as zip_ref:
+            zip_ref.extractall(output_dir)
+    except Exception as e:
+        logger.error(f"Error while extracting file {zip_path}")
+        logger.error(e)
+
+
+def setup_argparser() -> argparse.ArgumentParser:
+    parser_ = argparse.ArgumentParser(description="CLOC Benchmark Storage Script")
+    parser_.add_argument(
+        "dir", type=pathlib.Path, action="store", help="Path to data directory"
+    )
+    parser_.add_argument(
+        "--dummyyear",
+        action="store_true",
+        help="Add a final dummy year to train also on the last trigger in Modyn",
+    )
+    parser_.add_argument(
+        "--all",
+        action="store_true",
+        help="Store all the available data, including the validation and test sets.",
+    )
+    parser_.add_argument(
+        "--test",
+        action="store_true",
+        help="Enable test mode (just download one zip file)",
+    )
+    parser_.add_argument(
+        "--skip_download",
+        action="store_true",
+        help="Skips the download and only (re)creates labels and timestamps.",
+    )
+    parser_.add_argument(
+        "--skip_unzip",
+        action="store_true",
+        help="Skips the unzipping and only (re)creates labels and timestamps.",
+    )
+    parser_.add_argument(
+        "--skip_labels",
+        action="store_true",
+        help="Skips the labeling",
+    )
+    parser_.add_argument(
+        "--tmpdir", type=pathlib.Path, action="store", help="If given, use a different directory for storing temporary data"
+    )
+    parser_.add_argument(
+        "--keep_zips",
+        action="store_true",
+        help="Keep the downloaded zipfiles.",
+    )
+    return parser_
+
+
+def main():
+    parser = setup_argparser()
+    args = parser.parse_args()
+
+    # argparse always sets the attribute (None if the flag was not given), so check the value instead of hasattr
+    tmpdir = args.tmpdir if args.tmpdir is not None else args.dir
+
+    logger.info(f"Final destination is {args.dir}; download destination is {tmpdir}")
+
+    downloader = CLDatasets(str(args.dir), str(tmpdir), test_mode=args.test, keep_zips=args.keep_zips)
+    if not args.skip_download:
+        logger.info("Starting download")
+        downloader.download_dataset()
+
+    if not args.skip_unzip:
+        logger.info("Starting extraction")
+        downloader.extract()
+
+    if not args.skip_labels:
+        logger.info("Starting labeling")
+        downloader.convert_labels_and_timestamps(args.all)
+
+    downloader.remove_images_without_label()
+
+    if args.dummyyear:
+        downloader.add_dummy_year()
+
+    logger.info("Done.")
+
+
+def is_tool(name):
+    """Check whether `name` is on PATH and marked as executable."""
+    return which(name) is not None
+
+
+class CLDatasets:
+    """
+    A class for downloading datasets from Google Cloud Storage.
+    """
+
+    def __init__(self, directory: str, tmpdir: str, test_mode: bool = False, unzip: bool = True, keep_zips: bool = False):
+        """
+        Initialize the CLDatasets object.
+
+        Args:
+            directory (str): The directory where the dataset will be saved.
+        """
+
+        self.dataset = "CLOC"
+        self.directory = directory
+        self.tmpdir = tmpdir
+        self.unzip = unzip
+        self.test_mode = test_mode
+        self.keep_zips = keep_zips
+        self.max_timestamp = 0
+        self.example_path = ""
+        self.example_label_path = ""
+
+        if not os.path.exists(self.directory):
+            os.makedirs(self.directory)
+
+        if not os.path.exists(self.tmpdir):
+            os.makedirs(self.tmpdir)
+
+    def extract(self):
+        if self.unzip:
+            self.unzip_data_files(self.tmpdir + "/CLOC/data")
+
+    def convert_labels_and_timestamps(self, all_data: bool):
+        self.convert_labels_and_timestamps_impl(
+            self.tmpdir + "/CLOC_torchsave_order_files/train_store_loc.torchSave",
+            self.tmpdir + "/CLOC_torchsave_order_files/train_labels.torchSave",
+            self.tmpdir + "/CLOC_torchsave_order_files/train_time.torchSave",
+        )
+
+        if all_data:
+            logger.info("Converting all data")
+            self.convert_labels_and_timestamps_impl(
+                self.tmpdir + "/CLOC_torchsave_order_files/cross_val_store_loc.torchSave",
+                self.tmpdir + "/CLOC_torchsave_order_files/cross_val_labels.torchSave",
+                self.tmpdir + "/CLOC_torchsave_order_files/cross_val_time.torchSave",
+            )
+
+    def remove_images_without_label(self):
+        print("Removing images without label...")
+        removed_files = 0
+
+        image_paths = pathlib.Path(self.directory).glob("**/*.jpg")
+        for filename in tqdm(image_paths):
+            file_path = pathlib.Path(filename)
+            label_path = pathlib.Path(file_path.parent / f"{file_path.stem}.label")
+
+            if not label_path.exists():
+                removed_files += 1
+                file_path.unlink()
+
+        print(f"Removed {removed_files} images that do not have a label.")
+
+    def convert_labels_and_timestamps_impl(self, store_loc_path, labels_path, timestamps_path):
+        logger.info("Loading labels and timestamps.")
+        store_loc = torch.load(store_loc_path)
+        labels = torch.load(labels_path)
+        timestamps = torch.load(timestamps_path)
+
+        assert len(store_loc) == len(labels)
+        assert len(store_loc) == len(timestamps)
+
+        warned_once = False
+
+        logger.info("Labels and timestamps loaded, applying")
+        missing_files = 0
+        for store_location, label, timestamp in tqdm(
+            zip(store_loc, labels, timestamps), total=len(store_loc)
+        ):
+            path = pathlib.Path(
+                self.directory + "/" + store_location.strip().replace("\n", "")
+            )
+
+            if not path.exists():
+                if not self.test_mode:
+                    raise FileNotFoundError(f"Cannot find file {path}")
+                if not warned_once:
+                    logger.warning(
+                        f"Cannot find file {path}, but we are in test mode. Will not repeat this warning."
+                    )
+                    warned_once = True
+                missing_files += 1
+                continue
+
+            label_path = pathlib.Path(path.parent / f"{path.stem}.label")
+            with open(label_path, "w+", encoding="utf-8") as file:
+                file.write(str(int(label)))
+
+            # Note: The timestamps in the hdf5 file are (very likely) seconds since 2004 (GMT timestamp 1072911600)
+            actual_timestamp = timestamp + 1072911600
+            self.max_timestamp = max(self.max_timestamp, actual_timestamp)
+
+            self.example_label_path = label_path
+            self.example_path = path
+
+            os.utime(path, (actual_timestamp, actual_timestamp))
+
+        logger.info(f"Missing files for {store_loc_path} = {missing_files}/{len(store_loc)}")
+
+    def add_dummy_year(self):
+        dummy_path = pathlib.Path(self.directory + "/dummy.jpg")
+        dummy_label_path = pathlib.Path(self.directory + "/dummy.label")
+
+        assert not dummy_path.exists() and not dummy_label_path.exists()
+
+        copy(self.example_path, dummy_path)
+        copy(self.example_label_path, dummy_label_path)
+        # Two years in the future
+        dummy_timestamp = self.max_timestamp + (DAY_LENGTH_SECONDS * 365 * 2)
+
+        os.utime(dummy_path, (dummy_timestamp, dummy_timestamp))
+
+    def download_directory_from_gcloud(self, prefix):
+        bucket_name = "cl-datasets"
+        dl_dir = pathlib.Path(self.tmpdir)
+        storage_client = storage.Client.create_anonymous_client()
+        bucket = storage_client.bucket(bucket_name=bucket_name)
+        blobs = bucket.list_blobs(prefix=prefix)  # Get list of files
+        first_zip_downloaded = False
+        blobs_to_download = []
+
+        for blob in blobs:
+            if blob.name.endswith("/"):
+                continue
+            if blob.name.endswith("zip"):
+                if first_zip_downloaded and self.test_mode:
+                    continue
+                else:
+                    first_zip_downloaded = True
+
+            file_split = blob.name.split("/")
+            directory = "/".join(file_split[0:-1])
+            pathlib.Path(dl_dir / directory).mkdir(parents=True, exist_ok=True)
+            target = dl_dir / blob.name
+
+            if not target.exists():
+                blobs_to_download.append((target, blob))
+            else:
+                print(f"Skipping {target} as it already exists")
+
+        with ThreadPoolExecutor(max_workers=16) as executor, tqdm(total=len(blobs_to_download)) as pbar:
+            futures_list = []
+            download_blob = lambda target, blob: blob.download_to_filename(target)
+
+            for target_and_blob in blobs_to_download:
+                future = executor.submit(download_blob, *target_and_blob)
+                future.add_done_callback(lambda p: pbar.update(1))
+                futures_list.append(future)
+
+            # Wait for all tasks to complete
+            for future in futures_list:
+                future.result()
+
+    def download_dataset(self):
+        """
+        Download the order files from Google Cloud Storage.
+        """
+        print("Order files are being downloaded...")
+        start_time = time.time()
+
+        self.download_directory_from_gcloud(self.dataset)
+
+        elapsed_time = time.time() - start_time
+        print("Elapsed time:", elapsed_time)
+
+    def unzip_data_files(self, zip_file_directory: str) -> None:
+        """
+        Extracts the contents of zip files in a directory into nested folders.
+
+        Args:
+            zip_file_directory: The path to the directory containing the zip files.
+
+        Returns:
+            None
+        """
+
+        zip_files = [file for file in os.listdir(zip_file_directory) if file.endswith(".zip")]
+
+        with ProcessPoolExecutor(max_workers=96) as executor, tqdm(total=len(zip_files)) as pbar:
+            futures_list = []
+            for zip_file in zip_files:
+                future = executor.submit(extract_single_zip, zip_file_directory, self.directory, zip_file)
+                future.add_done_callback(lambda p: pbar.update(1))
+                futures_list.append(future)
+
+            # Wait for all tasks to complete
+            for future in futures_list:
+                future.result()
+
+        if not self.keep_zips:
+            # Remove zip files
+            remove_command = f"rm {self.tmpdir}/{self.dataset}/data/*.zip"
+            os.system(remove_command)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/benchmark/cloc/example_pipeline.yml b/benchmark/cloc/example_pipeline.yml
new file mode 100644
index 000000000..65d103e53
--- /dev/null
+++ b/benchmark/cloc/example_pipeline.yml
@@ -0,0 +1,58 @@
+pipeline:
+  name: CLOC Pipeline
+  version: 1.0.0
+model:
+  id: ResNet50
+  config:
+    num_classes: 713
+model_storage:
+  full_model_strategy:
+    name: "PyTorchFullModel"
+training:
+  gpus: 1
+  device: "cuda:0"
+  dataloader_workers: 8
+  num_prefetched_partitions: 4
+  parallel_prefetch_requests: 4
+  use_previous_model: True
+  initial_model: random
+  initial_pass:
+    activated: False
+  batch_size: 256
+  optimizers:
+    - name: "default"
+      algorithm: "SGD"
+      source: "PyTorch"
+      param_groups:
+        - module: "model"
+          config:
+            lr: 0.025
+            weight_decay: 0.0001
+            momentum: 0.9
+  optimization_criterion:
+    name: "CrossEntropyLoss"
+  checkpointing:
+    activated: False
+  selection_strategy:
+    name: NewDataStrategy
+    # With that setting, there is most often only 1 partition
+    maximum_keys_in_memory: 1000000
+    config:
+      storage_backend: "database"
+      limit: -1
+      reset_after_trigger: True
+data:
+  dataset_id: cloc
+  transformations: ["transforms.RandomResizedCrop(224)",
+                    "transforms.RandomHorizontalFlip()",
+                    "transforms.ToTensor()",
+                    "transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])"]
+  bytes_parser_function: |
+    from PIL import Image
+    import io
+    def bytes_parser_function(data: bytes) -> Image:
+      return Image.open(io.BytesIO(data)).convert("RGB")
+trigger:
+  id: TimeTrigger
+  trigger_config:
+    trigger_every: "52w"
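Note (not part of the patch): after running `data_generation.py`, every image should have a sibling `.label` file, and its modification time should carry the shifted CLOC upload timestamp that the script sets via `os.utime`. Below is a minimal sketch for sanity-checking a few samples; the `cloc_data/` path is a placeholder for whatever directory was passed to the script.

```python
import pathlib
from datetime import datetime, timezone

DATA_DIR = pathlib.Path("cloc_data")  # hypothetical path; use the directory passed to data_generation.py

# Inspect the first few images: every .jpg must have a .label next to it,
# and its mtime encodes the upload time (seconds since 2004, shifted to a Unix timestamp).
for image_path in sorted(DATA_DIR.glob("**/*.jpg"))[:5]:
    label_path = image_path.parent / f"{image_path.stem}.label"
    assert label_path.exists(), f"{image_path} has no label file"

    label = int(label_path.read_text(encoding="utf-8"))
    upload_time = datetime.fromtimestamp(image_path.stat().st_mtime, tz=timezone.utc)
    print(f"{image_path.name}: label={label}, upload time={upload_time:%Y-%m-%d}")
```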