Integrate CLOC Benchmark #313

Merged 26 commits into main from feature/MaxiBoether/cloc on Nov 18, 2023

Commits (26)
0acb463
start working on CLOC benchmark
MaxiBoether Oct 13, 2023
bd51ed8
progress
MaxiBoether Oct 16, 2023
0cb455b
work
MaxiBoether Oct 16, 2023
9ba980d
formatting
MaxiBoether Oct 16, 2023
416f8f6
all flag
MaxiBoether Oct 16, 2023
967bc0a
Merge remote-tracking branch 'origin/main' into feature/MaxiBoether/cloc
MaxiBoether Oct 16, 2023
1ff03aa
add readme
MaxiBoether Oct 16, 2023
1b3bb74
add example pipeline (yet to be tested)
MaxiBoether Oct 17, 2023
ba33727
parallel download
MaxiBoether Oct 18, 2023
e199e89
unnecessary print
MaxiBoether Oct 18, 2023
3338ea0
add skip unzip flag
MaxiBoether Oct 19, 2023
45ad57a
use processes for downloading
MaxiBoether Oct 19, 2023
8c554f4
move function to root
MaxiBoether Oct 19, 2023
a39bde9
add try catch
MaxiBoether Oct 19, 2023
1ba7a0a
Merge branch 'main' into feature/MaxiBoether/cloc
MaxiBoether Oct 26, 2023
fa52e91
try using tmpdir
MaxiBoether Oct 26, 2023
222ec9f
more workers
MaxiBoether Oct 26, 2023
7658a36
small fix
MaxiBoether Oct 31, 2023
6db5f22
another fix
MaxiBoether Oct 31, 2023
49b6081
fix not so optimal naming
MaxiBoether Oct 31, 2023
a04afc8
Merge branch 'main' into feature/MaxiBoether/cloc
MaxiBoether Oct 31, 2023
740bd15
port some cloc fixes
MaxiBoether Nov 15, 2023
6c26ccb
Merge branch 'main' into feature/MaxiBoether/cloc
MaxiBoether Nov 15, 2023
a424a82
Merge branch 'main' into feature/MaxiBoether/cloc
MaxiBoether Nov 17, 2023
4466938
add working cloc pipeline
MaxiBoether Nov 17, 2023
22bed86
Merge branch 'main' into feature/MaxiBoether/cloc
MaxiBoether Nov 18, 2023
1 change: 1 addition & 0 deletions benchmark/cloc/.gitignore
@@ -0,0 +1 @@
cloc_data/
38 changes: 38 additions & 0 deletions benchmark/cloc/README.md
@@ -0,0 +1,38 @@
# CLOC Data

In this directory, you can find the files necessary to run experiments with the CLOC dataset.
The dataset was introduced [in this paper](https://arxiv.org/pdf/2108.09020.pdf), and we make use of the [CLDatasets](https://github.com/hammoudhasan/CLDatasets) mirror.

## Data Generation
To run the download script, you need to install the `google-cloud-storage` package.
Then, you can use the `data_generation.py` script to download the data and set the file timestamps accordingly.
Use the `-h` flag to find out more about the available options.
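
For example, a full download into a local `cloc_data` directory might look as follows (the paths here are placeholders; adjust them to your setup):

```bash
pip install google-cloud-storage

# Download, unzip, and label the entire dataset (including validation/test data),
# and append the dummy year so that Modyn also triggers on the last real samples.
python data_generation.py ./cloc_data --all --dummyyear --tmpdir /scratch/cloc_tmp
```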

## License

The CLOC dataset comes with the MIT License.
The CLDatasets repository does not have an explicit license but allows you to "adapt the code to suit your specific requirements".

### CLOC License Copy

MIT License

Copyright (c) 2021 Intel Labs

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
333 changes: 333 additions & 0 deletions benchmark/cloc/data_generation.py
@@ -0,0 +1,333 @@
# This code is partially adapted from https://github.com/hammoudhasan/CLDatasets/blob/main/src/downloader.py, which is provided license-free.
# We thank the authors for their work and for hosting the data.
# This code requires the google-cloud-storage pip package.

import argparse
import logging
import os
import pathlib
import time
import zipfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from shutil import copy, which

import torch
from google.cloud import storage
from tqdm import tqdm

logging.basicConfig(
    level=logging.NOTSET,
    format="[%(asctime)s] [%(filename)15s:%(lineno)4d] %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%d:%H:%M:%S",
)
logger = logging.getLogger(__name__)

DAY_LENGTH_SECONDS = 24 * 60 * 60


def extract_single_zip(directory: str, target: str, zip_file: str) -> None:
    """Extract a single zip archive into its own subdirectory of `target`."""
    zip_path = os.path.join(directory, zip_file)
    output_dir = os.path.join(target, os.path.splitext(zip_file)[0])

    os.makedirs(output_dir, exist_ok=True)

    try:
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(output_dir)
    except Exception as e:
        logger.error(f"Error while extracting file {zip_path}")
        logger.error(e)


def setup_argparser() -> argparse.ArgumentParser:
    parser_ = argparse.ArgumentParser(description="CLOC Benchmark Storage Script")
    parser_.add_argument(
        "dir", type=pathlib.Path, action="store", help="Path to data directory"
    )
    parser_.add_argument(
        "--dummyyear",
        action="store_true",
        help="Add a final dummy year to train also on the last trigger in Modyn",
    )
    parser_.add_argument(
        "--all",
        action="store_true",
        help="Store all the available data, including the validation and test sets.",
    )
    parser_.add_argument(
        "--test",
        action="store_true",
        help="Enable test mode (just download one zip file)",
    )
    parser_.add_argument(
        "--skip_download",
        action="store_true",
        help="Skips the download and only (re)creates labels and timestamps.",
    )
    parser_.add_argument(
        "--skip_unzip",
        action="store_true",
        help="Skips the unzipping and only (re)creates labels and timestamps.",
    )
    parser_.add_argument(
        "--skip_labels",
        action="store_true",
        help="Skips the labeling",
    )
    parser_.add_argument(
        "--tmpdir",
        type=pathlib.Path,
        action="store",
        help="If given, use a different directory for storing temporary data",
    )
    parser_.add_argument(
        "--keep_zips",
        action="store_true",
        help="Keep the downloaded zip files.",
    )
    return parser_


def main():
    parser = setup_argparser()
    args = parser.parse_args()

    # argparse always sets args.tmpdir (it is None when the flag is not given),
    # so we must check for None instead of using hasattr.
    tmpdir = args.tmpdir if args.tmpdir is not None else args.dir

    logger.info(f"Final destination is {args.dir}; download destination is {tmpdir}")

    downloader = CLDatasets(str(args.dir), str(tmpdir), test_mode=args.test, keep_zips=args.keep_zips)
    if not args.skip_download:
        logger.info("Starting download")
        downloader.download_dataset()

    if not args.skip_unzip:
        logger.info("Starting extraction")
        downloader.extract()

    if not args.skip_labels:
        logger.info("Starting labeling")
        downloader.convert_labels_and_timestamps(args.all)

        downloader.remove_images_without_label()

    if args.dummyyear:
        downloader.add_dummy_year()

    logger.info("Done.")


def is_tool(name):
    """Check whether `name` is on PATH and marked as executable."""
    return which(name) is not None


class CLDatasets:
    """
    A class for downloading datasets from Google Cloud Storage.
    """

    def __init__(self, directory: str, tmpdir: str, test_mode: bool = False, unzip: bool = True, keep_zips: bool = False):
        """
        Initialize the CLDatasets object.

        Args:
            directory (str): The directory where the dataset will be saved.
            tmpdir (str): The directory used for downloads and temporary data.
            test_mode (bool): If True, only download a single zip file.
            unzip (bool): Whether to extract the downloaded zip files.
            keep_zips (bool): Whether to keep the zip files after extraction.
        """

        self.dataset = "CLOC"
        self.directory = directory
        self.tmpdir = tmpdir
        self.unzip = unzip
        self.test_mode = test_mode
        self.keep_zips = keep_zips
        self.max_timestamp = 0
        self.example_path = ""
        self.example_label_path = ""

        if not os.path.exists(self.directory):
            os.makedirs(self.directory)

        if not os.path.exists(self.tmpdir):
            os.makedirs(self.tmpdir)


    def extract(self):
        if self.unzip:
            self.unzip_data_files(self.tmpdir + "/CLOC/data")

    def convert_labels_and_timestamps(self, all_data: bool):
        self.convert_labels_and_timestamps_impl(
            self.tmpdir + "/CLOC_torchsave_order_files/train_store_loc.torchSave",
            self.tmpdir + "/CLOC_torchsave_order_files/train_labels.torchSave",
            self.tmpdir + "/CLOC_torchsave_order_files/train_time.torchSave",
        )

        if all_data:
            logger.info("Converting all data")
            self.convert_labels_and_timestamps_impl(
                self.tmpdir + "/CLOC_torchsave_order_files/cross_val_store_loc.torchSave",
                self.tmpdir + "/CLOC_torchsave_order_files/cross_val_labels.torchSave",
                self.tmpdir + "/CLOC_torchsave_order_files/cross_val_time.torchSave",
            )


    def remove_images_without_label(self):
        print("Removing images without label...")
        removed_files = 0

        image_paths = pathlib.Path(self.directory).glob("**/*.jpg")
        for filename in tqdm(image_paths):
            file_path = pathlib.Path(filename)
            label_path = pathlib.Path(file_path.parent / f"{file_path.stem}.label")

            if not label_path.exists():
                removed_files += 1
                file_path.unlink()

        print(f"Removed {removed_files} images that do not have a label.")

    def convert_labels_and_timestamps_impl(
        self, store_loc_path, labels_path, timestamps_path
    ):
        logger.info("Loading labels and timestamps.")
        store_loc = torch.load(store_loc_path)
        labels = torch.load(labels_path)
        timestamps = torch.load(timestamps_path)

        assert len(store_loc) == len(labels)
        assert len(store_loc) == len(timestamps)

        warned_once = False

        logger.info("Labels and timestamps loaded, applying")
        missing_files = 0
        for store_location, label, timestamp in tqdm(
            zip(store_loc, labels, timestamps), total=len(store_loc)
        ):
            path = pathlib.Path(
                self.directory + "/" + store_location.strip().replace("\n", "")
            )

            if not path.exists():
                if not self.test_mode:
                    raise FileNotFoundError(f"Cannot find file {path}")
                if not warned_once:
                    logger.warning(
                        f"Cannot find file {path}, but we are in test mode. Will not repeat this warning."
                    )
                    warned_once = True
                missing_files += 1
                continue

            label_path = pathlib.Path(path.parent / f"{path.stem}.label")
            with open(label_path, "w+", encoding="utf-8") as file:
                file.write(str(int(label)))

            # Note: The timestamps in the order files are (very likely) seconds since 2004
            # (offset 1072911600), so we shift them into absolute Unix timestamps.
            actual_timestamp = timestamp + 1072911600
            self.max_timestamp = max(self.max_timestamp, actual_timestamp)

            self.example_label_path = label_path
            self.example_path = path

            os.utime(path, (actual_timestamp, actual_timestamp))

        logger.info(f"Missing files for {store_loc_path}: {missing_files}/{len(store_loc)}")

    def add_dummy_year(self):
        dummy_path = pathlib.Path(self.directory + "/dummy.jpg")
        dummy_label_path = pathlib.Path(self.directory + "/dummy.label")

        assert not dummy_path.exists() and not dummy_label_path.exists()

        copy(self.example_path, dummy_path)
        copy(self.example_label_path, dummy_label_path)

        # Place the dummy sample two years in the future
        dummy_timestamp = self.max_timestamp + (DAY_LENGTH_SECONDS * 365 * 2)

        os.utime(dummy_path, (dummy_timestamp, dummy_timestamp))

    def download_directory_from_gcloud(self, prefix):
        bucket_name = "cl-datasets"
        dl_dir = pathlib.Path(self.tmpdir)
        storage_client = storage.Client.create_anonymous_client()
        bucket = storage_client.bucket(bucket_name=bucket_name)
        blobs = bucket.list_blobs(prefix=prefix)  # Get list of files
        first_zip_downloaded = False
        blobs_to_download = []

        for blob in blobs:
            if blob.name.endswith("/"):
                continue
            if blob.name.endswith("zip"):
                # In test mode, only the first zip file is downloaded.
                if first_zip_downloaded and self.test_mode:
                    continue
                first_zip_downloaded = True

            file_split = blob.name.split("/")
            directory = "/".join(file_split[0:-1])
            pathlib.Path(dl_dir / directory).mkdir(parents=True, exist_ok=True)
            target = dl_dir / blob.name

            if not target.exists():
                blobs_to_download.append((target, blob))
            else:
                print(f"Skipping {target} as it already exists")

        with ThreadPoolExecutor(max_workers=16) as executor, tqdm(total=len(blobs_to_download)) as pbar:
            futures_list = []

            for target, blob in blobs_to_download:
                future = executor.submit(blob.download_to_filename, target)
                future.add_done_callback(lambda p: pbar.update(1))
                futures_list.append(future)

            # Wait for all tasks to complete
            for future in futures_list:
                future.result()

    def download_dataset(self):
        """
        Download the order files and data archives from Google Cloud Storage.
        """
        print("Dataset files are being downloaded...")
        start_time = time.time()

        self.download_directory_from_gcloud(self.dataset)

        elapsed_time = time.time() - start_time
        print("Elapsed time:", elapsed_time)

    def unzip_data_files(self, zip_file_directory: str) -> None:
        """
        Extracts the contents of zip files in a directory into nested folders.

        Args:
            zip_file_directory: The path to the directory containing the zip files.

        Returns:
            None
        """

        zip_files = [file for file in os.listdir(zip_file_directory) if file.endswith(".zip")]

        with ProcessPoolExecutor(max_workers=96) as executor, tqdm(total=len(zip_files)) as pbar:
            futures_list = []
            for zip_file in zip_files:
                future = executor.submit(extract_single_zip, zip_file_directory, self.directory, zip_file)
                future.add_done_callback(lambda p: pbar.update(1))
                futures_list.append(future)

            # Wait for all tasks to complete
            for future in futures_list:
                future.result()

        if not self.keep_zips:
            # Remove the zip files after extraction
            for zip_path in pathlib.Path(f"{self.tmpdir}/{self.dataset}/data").glob("*.zip"):
                zip_path.unlink()


if __name__ == "__main__":
main()