From eb90683a48ef5ed4edd3f859582e796faf4c523c Mon Sep 17 00:00:00 2001 From: jal347 Date: Fri, 27 Sep 2024 14:58:10 -0700 Subject: [PATCH 1/4] test chembl dumper --- src/hub/dataload/sources/chembl/__init__.py | 2 + src/hub/dataload/sources/chembl/dump.py | 218 ++++++++++++++++++++ src/hub/dataload/sources/chembl/upload.py | 5 + 3 files changed, 225 insertions(+) create mode 100644 src/hub/dataload/sources/chembl/__init__.py create mode 100644 src/hub/dataload/sources/chembl/dump.py create mode 100644 src/hub/dataload/sources/chembl/upload.py diff --git a/src/hub/dataload/sources/chembl/__init__.py b/src/hub/dataload/sources/chembl/__init__.py new file mode 100644 index 0000000..15ef098 --- /dev/null +++ b/src/hub/dataload/sources/chembl/__init__.py @@ -0,0 +1,2 @@ +from .dump import ChemblDumper +from .upload import ChemblUploader diff --git a/src/hub/dataload/sources/chembl/dump.py b/src/hub/dataload/sources/chembl/dump.py new file mode 100644 index 0000000..1c97235 --- /dev/null +++ b/src/hub/dataload/sources/chembl/dump.py @@ -0,0 +1,218 @@ +import glob +import itertools +import json +import os +import os.path + +import biothings + +import config + +biothings.config_for_app(config) + +from biothings.hub.dataload.dumper import HTTPDumper +from biothings.utils.common import iter_n + +from config import DATA_ARCHIVE_ROOT + + +class ChemblDumper(HTTPDumper): + + SRC_NAME = "chembl" + SRC_ROOT_FOLDER = os.path.join(DATA_ARCHIVE_ROOT, SRC_NAME) + + SRC_VERSION_URL = "https://www.ebi.ac.uk/chembl/api/data/status.json" + + """ + As the code is written, we have: + - 13,382 "target" json objects + """ + SRC_DATA_URLS = { + # Used to join with `mechanism` by `target_chembl_id` + "target": "https://www.ebi.ac.uk/chembl/api/data/target.json", + } + + # SCHEDULE = "0 12 * * *" + SLEEP_BETWEEN_DOWNLOAD = 0.1 + MAX_PARALLEL_DUMP = 5 + + # number of documents in each download job, i.e. number of documents in each .part* file + TO_DUMP_DOWNLOAD_SIZE = 1000 + # number of .part* files to be merged together after download + POST_DUMP_MERGE_SIZE = 100 + + def get_total_count_of_documents(self, src_data_name): + """ + Get the total count of documents from the first page of the url specified by `src_data_name`. + `total_count` is a member of the `page_meta` member of the root json object. + + Args: + src_data_name (str): must be a key to self.__class__.SRC_DATA_URLS + + Returns: + int: the total count of documents + """ + if src_data_name not in self.__class__.SRC_DATA_URLS: + raise KeyError( + "Cannot recognize src_data_name={}. Must be one of {{{}}}".format( + src_data_name, ", ".join(self.__class__.SRC_DATA_URLS.keys()) + ) + ) + + data = self.load_json_from_file(self.__class__.SRC_DATA_URLS[src_data_name]) + return data["page_meta"]["total_count"] + + def load_json_from_file(self, file) -> dict: + """ + Read the content of `file` and return the json object + + Args: + file (str): could either be an URL ("remotefile") or a path to a local text file ("localfile") + + Returns: + object: the json object read from the `file` + """ + + """ + Note that: + + - `json.loads(string)` deserializes string + - `json.load(file)` deserializes a file object + """ + if file.startswith("http://") or file.startswith("https://"): # file is an URL + data = json.loads(self.client.get(file).text) + else: # file is a local path + data = json.load(open(file)) + + return data + + def remote_is_better(self, remotefile, localfile): + remote_data = self.load_json_from_file(remotefile) + assert "chembl_db_version" in remote_data + assert remote_data["status"] == "UP" # API is working correctly + self.release = remote_data["chembl_db_version"] + + if localfile is None: + # ok we have the release, we can't compare further so we need to download + return True + + local_data = self.load_json_from_file(localfile) + self.logger.info( + "ChEMBL DB version: remote=={}, local=={}".format( + remote_data["chembl_db_version"], local_data["chembl_db_version"] + ) + ) + + # comparing strings should work since it's formatted as "ChEMBL_xxx" + if remote_data["chembl_db_version"] > local_data["chembl_db_version"]: + return True + else: + return False + + def create_todump_list(self, force=False, **kwargs): + version_filename = os.path.basename(self.__class__.SRC_VERSION_URL) + try: + current_localfile = os.path.join(self.current_data_folder, version_filename) + if not os.path.exists(current_localfile): + current_localfile = None + except TypeError: + # current data folder doesn't even exist + current_localfile = None + + remote_better = self.remote_is_better( + self.__class__.SRC_VERSION_URL, current_localfile + ) + self.logger.info( + "ChEMBL Dump: force=={}, current_localfile=={}, remote_better=={}".format( + force, current_localfile, remote_better + ) + ) + + if force or current_localfile is None or remote_better: + new_localfile = os.path.join(self.new_data_folder, version_filename) + self.to_dump.append( + {"remote": self.__class__.SRC_VERSION_URL, "local": new_localfile} + ) + + """ + Now we need to scroll the API endpoints. Let's get the total number of records + and generate URLs for each call to parallelize the downloads for each type of source data, + i.e. "molecule", "mechanism", "drug_indication", "target" and "binding_site". + + The partition size is set to 1000 json objects (represented by `TO_DUMP_DOWNLOAD_SIZE`). + + E.g. suppose for "molecule" data we have a `total_count` of 2500 json objects, and then we'll have, + in the process of iteration: + + - (part_index, part_start) = (0, 0) + - (part_index, part_start) = (1, 1000) + - (part_index, part_start) = (2, 2000) + + Therefore we would download 3 files, i.e. "molecule.part0", "molecule.part1", and "molecule.part2". + """ + part_size = self.__class__.TO_DUMP_DOWNLOAD_SIZE + for src_data_name in self.__class__.SRC_DATA_URLS: + total_count = self.get_total_count_of_documents(src_data_name) + for part_index, part_start in enumerate( + range(0, total_count, part_size) + ): + remote = ( + self.__class__.SRC_DATA_URLS[src_data_name] + + "?limit=" + + str(part_size) + + "&offset=" + + str(part_start) + ) + local = os.path.join( + self.new_data_folder, + "{}.part{}".format(src_data_name, part_index), + ) + self.to_dump.append({"remote": remote, "local": local}) + + def post_dump(self, *args, **kwargs): + """ + In the post-dump phase, for each type of source data, we merge each chunk of 100 .part* files + into one .*.json file. (This way we won't have a small number of huge files nor a pile of small files.) + + E.g. as the code is written, we have 1,961,462 "molecule" json objects. + Therefore we would download 1,962 files, i.e. "molecule.part0", ..., "molecule.part1961". + For each chunk of 100 such files, e.g. "molecule.part0", ..., "molecule.part99", we merge them into one + json file, e.g. "molecule.100.json". + + We'll also remove metadata (useless now) + """ + self.logger.info("Merging JSON documents in '%s'" % self.new_data_folder) + + chunk_size = self.__class__.POST_DUMP_MERGE_SIZE + for src_data_name in self.__class__.SRC_DATA_URLS: + part_files = glob.iglob( + os.path.join(self.new_data_folder, "{}.part*".format(src_data_name)) + ) + for chunk, cnt in iter_n(part_files, chunk_size, with_cnt=True): + outfile = os.path.join( + self.new_data_folder, "{}.{}.json".format(src_data_name, cnt) + ) + + """ + For each "molecule" json object, we only fetch the value associated with the "molecules" key. + This rule also applies to "mechanism", "drug_indication", "target" and "binding_site" + json objects. + """ + data_key = src_data_name + "s" + merged_value = itertools.chain.from_iterable( + self.load_json_from_file(f)[data_key] for f in chunk + ) + merged_data = {data_key: list(merged_value)} + + json.dump(merged_data, open(outfile, "w")) + self.logger.info("Merged %s %s files" % (src_data_name, cnt)) + + # now we can delete the part files + self.logger.info("Deleting part files") + part_files = glob.iglob( + os.path.join(self.new_data_folder, "{}.part*".format(src_data_name)) + ) + for f in part_files: + os.remove(f) + + self.logger.info("Post-dump merge done") diff --git a/src/hub/dataload/sources/chembl/upload.py b/src/hub/dataload/sources/chembl/upload.py new file mode 100644 index 0000000..5921721 --- /dev/null +++ b/src/hub/dataload/sources/chembl/upload.py @@ -0,0 +1,5 @@ +import biothings.hub.dataload.uploader as uploader + + +class ChemblUploader(uploader.DummySourceUploader): + name = "chembl" From 3e78548da6cfa4202fb25d94f1f4303df9cef49b Mon Sep 17 00:00:00 2001 From: jal347 Date: Fri, 27 Sep 2024 15:06:53 -0700 Subject: [PATCH 2/4] updated mapping --- src/hub/dataload/sources/chembl/upload.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/hub/dataload/sources/chembl/upload.py b/src/hub/dataload/sources/chembl/upload.py index 5921721..b1c980f 100644 --- a/src/hub/dataload/sources/chembl/upload.py +++ b/src/hub/dataload/sources/chembl/upload.py @@ -3,3 +3,13 @@ class ChemblUploader(uploader.DummySourceUploader): name = "chembl" + + @classmethod + def get_mapping(klass): + mapping = { + "chembl_target": { + "type": "keyword", + "normalizer": "keyword_lowercase_normalizer", + } + } + return mapping From c5cf6402a1d259e6b3af7d5b4b7333922fa04aa1 Mon Sep 17 00:00:00 2001 From: jal347 Date: Fri, 27 Sep 2024 15:19:07 -0700 Subject: [PATCH 3/4] test dummydumper to get source to register --- src/hub/dataload/sources/chembl/dump.py | 212 +------------------ src/hub/dataload/sources/chembl/old_dump.py | 218 ++++++++++++++++++++ 2 files changed, 220 insertions(+), 210 deletions(-) create mode 100644 src/hub/dataload/sources/chembl/old_dump.py diff --git a/src/hub/dataload/sources/chembl/dump.py b/src/hub/dataload/sources/chembl/dump.py index 1c97235..a10f06e 100644 --- a/src/hub/dataload/sources/chembl/dump.py +++ b/src/hub/dataload/sources/chembl/dump.py @@ -1,218 +1,10 @@ -import glob -import itertools -import json -import os -import os.path - import biothings import config biothings.config_for_app(config) - -from biothings.hub.dataload.dumper import HTTPDumper -from biothings.utils.common import iter_n - -from config import DATA_ARCHIVE_ROOT +import biothings.hub.dataload.dumper as dumper -class ChemblDumper(HTTPDumper): - +class ChemblDumper(dumper.DummyDumper): SRC_NAME = "chembl" - SRC_ROOT_FOLDER = os.path.join(DATA_ARCHIVE_ROOT, SRC_NAME) - - SRC_VERSION_URL = "https://www.ebi.ac.uk/chembl/api/data/status.json" - - """ - As the code is written, we have: - - 13,382 "target" json objects - """ - SRC_DATA_URLS = { - # Used to join with `mechanism` by `target_chembl_id` - "target": "https://www.ebi.ac.uk/chembl/api/data/target.json", - } - - # SCHEDULE = "0 12 * * *" - SLEEP_BETWEEN_DOWNLOAD = 0.1 - MAX_PARALLEL_DUMP = 5 - - # number of documents in each download job, i.e. number of documents in each .part* file - TO_DUMP_DOWNLOAD_SIZE = 1000 - # number of .part* files to be merged together after download - POST_DUMP_MERGE_SIZE = 100 - - def get_total_count_of_documents(self, src_data_name): - """ - Get the total count of documents from the first page of the url specified by `src_data_name`. - `total_count` is a member of the `page_meta` member of the root json object. - - Args: - src_data_name (str): must be a key to self.__class__.SRC_DATA_URLS - - Returns: - int: the total count of documents - """ - if src_data_name not in self.__class__.SRC_DATA_URLS: - raise KeyError( - "Cannot recognize src_data_name={}. Must be one of {{{}}}".format( - src_data_name, ", ".join(self.__class__.SRC_DATA_URLS.keys()) - ) - ) - - data = self.load_json_from_file(self.__class__.SRC_DATA_URLS[src_data_name]) - return data["page_meta"]["total_count"] - - def load_json_from_file(self, file) -> dict: - """ - Read the content of `file` and return the json object - - Args: - file (str): could either be an URL ("remotefile") or a path to a local text file ("localfile") - - Returns: - object: the json object read from the `file` - """ - - """ - Note that: - - - `json.loads(string)` deserializes string - - `json.load(file)` deserializes a file object - """ - if file.startswith("http://") or file.startswith("https://"): # file is an URL - data = json.loads(self.client.get(file).text) - else: # file is a local path - data = json.load(open(file)) - - return data - - def remote_is_better(self, remotefile, localfile): - remote_data = self.load_json_from_file(remotefile) - assert "chembl_db_version" in remote_data - assert remote_data["status"] == "UP" # API is working correctly - self.release = remote_data["chembl_db_version"] - - if localfile is None: - # ok we have the release, we can't compare further so we need to download - return True - - local_data = self.load_json_from_file(localfile) - self.logger.info( - "ChEMBL DB version: remote=={}, local=={}".format( - remote_data["chembl_db_version"], local_data["chembl_db_version"] - ) - ) - - # comparing strings should work since it's formatted as "ChEMBL_xxx" - if remote_data["chembl_db_version"] > local_data["chembl_db_version"]: - return True - else: - return False - - def create_todump_list(self, force=False, **kwargs): - version_filename = os.path.basename(self.__class__.SRC_VERSION_URL) - try: - current_localfile = os.path.join(self.current_data_folder, version_filename) - if not os.path.exists(current_localfile): - current_localfile = None - except TypeError: - # current data folder doesn't even exist - current_localfile = None - - remote_better = self.remote_is_better( - self.__class__.SRC_VERSION_URL, current_localfile - ) - self.logger.info( - "ChEMBL Dump: force=={}, current_localfile=={}, remote_better=={}".format( - force, current_localfile, remote_better - ) - ) - - if force or current_localfile is None or remote_better: - new_localfile = os.path.join(self.new_data_folder, version_filename) - self.to_dump.append( - {"remote": self.__class__.SRC_VERSION_URL, "local": new_localfile} - ) - - """ - Now we need to scroll the API endpoints. Let's get the total number of records - and generate URLs for each call to parallelize the downloads for each type of source data, - i.e. "molecule", "mechanism", "drug_indication", "target" and "binding_site". - - The partition size is set to 1000 json objects (represented by `TO_DUMP_DOWNLOAD_SIZE`). - - E.g. suppose for "molecule" data we have a `total_count` of 2500 json objects, and then we'll have, - in the process of iteration: - - - (part_index, part_start) = (0, 0) - - (part_index, part_start) = (1, 1000) - - (part_index, part_start) = (2, 2000) - - Therefore we would download 3 files, i.e. "molecule.part0", "molecule.part1", and "molecule.part2". - """ - part_size = self.__class__.TO_DUMP_DOWNLOAD_SIZE - for src_data_name in self.__class__.SRC_DATA_URLS: - total_count = self.get_total_count_of_documents(src_data_name) - for part_index, part_start in enumerate( - range(0, total_count, part_size) - ): - remote = ( - self.__class__.SRC_DATA_URLS[src_data_name] - + "?limit=" - + str(part_size) - + "&offset=" - + str(part_start) - ) - local = os.path.join( - self.new_data_folder, - "{}.part{}".format(src_data_name, part_index), - ) - self.to_dump.append({"remote": remote, "local": local}) - - def post_dump(self, *args, **kwargs): - """ - In the post-dump phase, for each type of source data, we merge each chunk of 100 .part* files - into one .*.json file. (This way we won't have a small number of huge files nor a pile of small files.) - - E.g. as the code is written, we have 1,961,462 "molecule" json objects. - Therefore we would download 1,962 files, i.e. "molecule.part0", ..., "molecule.part1961". - For each chunk of 100 such files, e.g. "molecule.part0", ..., "molecule.part99", we merge them into one - json file, e.g. "molecule.100.json". - - We'll also remove metadata (useless now) - """ - self.logger.info("Merging JSON documents in '%s'" % self.new_data_folder) - - chunk_size = self.__class__.POST_DUMP_MERGE_SIZE - for src_data_name in self.__class__.SRC_DATA_URLS: - part_files = glob.iglob( - os.path.join(self.new_data_folder, "{}.part*".format(src_data_name)) - ) - for chunk, cnt in iter_n(part_files, chunk_size, with_cnt=True): - outfile = os.path.join( - self.new_data_folder, "{}.{}.json".format(src_data_name, cnt) - ) - - """ - For each "molecule" json object, we only fetch the value associated with the "molecules" key. - This rule also applies to "mechanism", "drug_indication", "target" and "binding_site" - json objects. - """ - data_key = src_data_name + "s" - merged_value = itertools.chain.from_iterable( - self.load_json_from_file(f)[data_key] for f in chunk - ) - merged_data = {data_key: list(merged_value)} - - json.dump(merged_data, open(outfile, "w")) - self.logger.info("Merged %s %s files" % (src_data_name, cnt)) - - # now we can delete the part files - self.logger.info("Deleting part files") - part_files = glob.iglob( - os.path.join(self.new_data_folder, "{}.part*".format(src_data_name)) - ) - for f in part_files: - os.remove(f) - - self.logger.info("Post-dump merge done") diff --git a/src/hub/dataload/sources/chembl/old_dump.py b/src/hub/dataload/sources/chembl/old_dump.py new file mode 100644 index 0000000..1c97235 --- /dev/null +++ b/src/hub/dataload/sources/chembl/old_dump.py @@ -0,0 +1,218 @@ +import glob +import itertools +import json +import os +import os.path + +import biothings + +import config + +biothings.config_for_app(config) + +from biothings.hub.dataload.dumper import HTTPDumper +from biothings.utils.common import iter_n + +from config import DATA_ARCHIVE_ROOT + + +class ChemblDumper(HTTPDumper): + + SRC_NAME = "chembl" + SRC_ROOT_FOLDER = os.path.join(DATA_ARCHIVE_ROOT, SRC_NAME) + + SRC_VERSION_URL = "https://www.ebi.ac.uk/chembl/api/data/status.json" + + """ + As the code is written, we have: + - 13,382 "target" json objects + """ + SRC_DATA_URLS = { + # Used to join with `mechanism` by `target_chembl_id` + "target": "https://www.ebi.ac.uk/chembl/api/data/target.json", + } + + # SCHEDULE = "0 12 * * *" + SLEEP_BETWEEN_DOWNLOAD = 0.1 + MAX_PARALLEL_DUMP = 5 + + # number of documents in each download job, i.e. number of documents in each .part* file + TO_DUMP_DOWNLOAD_SIZE = 1000 + # number of .part* files to be merged together after download + POST_DUMP_MERGE_SIZE = 100 + + def get_total_count_of_documents(self, src_data_name): + """ + Get the total count of documents from the first page of the url specified by `src_data_name`. + `total_count` is a member of the `page_meta` member of the root json object. + + Args: + src_data_name (str): must be a key to self.__class__.SRC_DATA_URLS + + Returns: + int: the total count of documents + """ + if src_data_name not in self.__class__.SRC_DATA_URLS: + raise KeyError( + "Cannot recognize src_data_name={}. Must be one of {{{}}}".format( + src_data_name, ", ".join(self.__class__.SRC_DATA_URLS.keys()) + ) + ) + + data = self.load_json_from_file(self.__class__.SRC_DATA_URLS[src_data_name]) + return data["page_meta"]["total_count"] + + def load_json_from_file(self, file) -> dict: + """ + Read the content of `file` and return the json object + + Args: + file (str): could either be an URL ("remotefile") or a path to a local text file ("localfile") + + Returns: + object: the json object read from the `file` + """ + + """ + Note that: + + - `json.loads(string)` deserializes string + - `json.load(file)` deserializes a file object + """ + if file.startswith("http://") or file.startswith("https://"): # file is an URL + data = json.loads(self.client.get(file).text) + else: # file is a local path + data = json.load(open(file)) + + return data + + def remote_is_better(self, remotefile, localfile): + remote_data = self.load_json_from_file(remotefile) + assert "chembl_db_version" in remote_data + assert remote_data["status"] == "UP" # API is working correctly + self.release = remote_data["chembl_db_version"] + + if localfile is None: + # ok we have the release, we can't compare further so we need to download + return True + + local_data = self.load_json_from_file(localfile) + self.logger.info( + "ChEMBL DB version: remote=={}, local=={}".format( + remote_data["chembl_db_version"], local_data["chembl_db_version"] + ) + ) + + # comparing strings should work since it's formatted as "ChEMBL_xxx" + if remote_data["chembl_db_version"] > local_data["chembl_db_version"]: + return True + else: + return False + + def create_todump_list(self, force=False, **kwargs): + version_filename = os.path.basename(self.__class__.SRC_VERSION_URL) + try: + current_localfile = os.path.join(self.current_data_folder, version_filename) + if not os.path.exists(current_localfile): + current_localfile = None + except TypeError: + # current data folder doesn't even exist + current_localfile = None + + remote_better = self.remote_is_better( + self.__class__.SRC_VERSION_URL, current_localfile + ) + self.logger.info( + "ChEMBL Dump: force=={}, current_localfile=={}, remote_better=={}".format( + force, current_localfile, remote_better + ) + ) + + if force or current_localfile is None or remote_better: + new_localfile = os.path.join(self.new_data_folder, version_filename) + self.to_dump.append( + {"remote": self.__class__.SRC_VERSION_URL, "local": new_localfile} + ) + + """ + Now we need to scroll the API endpoints. Let's get the total number of records + and generate URLs for each call to parallelize the downloads for each type of source data, + i.e. "molecule", "mechanism", "drug_indication", "target" and "binding_site". + + The partition size is set to 1000 json objects (represented by `TO_DUMP_DOWNLOAD_SIZE`). + + E.g. suppose for "molecule" data we have a `total_count` of 2500 json objects, and then we'll have, + in the process of iteration: + + - (part_index, part_start) = (0, 0) + - (part_index, part_start) = (1, 1000) + - (part_index, part_start) = (2, 2000) + + Therefore we would download 3 files, i.e. "molecule.part0", "molecule.part1", and "molecule.part2". + """ + part_size = self.__class__.TO_DUMP_DOWNLOAD_SIZE + for src_data_name in self.__class__.SRC_DATA_URLS: + total_count = self.get_total_count_of_documents(src_data_name) + for part_index, part_start in enumerate( + range(0, total_count, part_size) + ): + remote = ( + self.__class__.SRC_DATA_URLS[src_data_name] + + "?limit=" + + str(part_size) + + "&offset=" + + str(part_start) + ) + local = os.path.join( + self.new_data_folder, + "{}.part{}".format(src_data_name, part_index), + ) + self.to_dump.append({"remote": remote, "local": local}) + + def post_dump(self, *args, **kwargs): + """ + In the post-dump phase, for each type of source data, we merge each chunk of 100 .part* files + into one .*.json file. (This way we won't have a small number of huge files nor a pile of small files.) + + E.g. as the code is written, we have 1,961,462 "molecule" json objects. + Therefore we would download 1,962 files, i.e. "molecule.part0", ..., "molecule.part1961". + For each chunk of 100 such files, e.g. "molecule.part0", ..., "molecule.part99", we merge them into one + json file, e.g. "molecule.100.json". + + We'll also remove metadata (useless now) + """ + self.logger.info("Merging JSON documents in '%s'" % self.new_data_folder) + + chunk_size = self.__class__.POST_DUMP_MERGE_SIZE + for src_data_name in self.__class__.SRC_DATA_URLS: + part_files = glob.iglob( + os.path.join(self.new_data_folder, "{}.part*".format(src_data_name)) + ) + for chunk, cnt in iter_n(part_files, chunk_size, with_cnt=True): + outfile = os.path.join( + self.new_data_folder, "{}.{}.json".format(src_data_name, cnt) + ) + + """ + For each "molecule" json object, we only fetch the value associated with the "molecules" key. + This rule also applies to "mechanism", "drug_indication", "target" and "binding_site" + json objects. + """ + data_key = src_data_name + "s" + merged_value = itertools.chain.from_iterable( + self.load_json_from_file(f)[data_key] for f in chunk + ) + merged_data = {data_key: list(merged_value)} + + json.dump(merged_data, open(outfile, "w")) + self.logger.info("Merged %s %s files" % (src_data_name, cnt)) + + # now we can delete the part files + self.logger.info("Deleting part files") + part_files = glob.iglob( + os.path.join(self.new_data_folder, "{}.part*".format(src_data_name)) + ) + for f in part_files: + os.remove(f) + + self.logger.info("Post-dump merge done") From c6082e0e98f5b6a599aff069ca647a1c7cd990db Mon Sep 17 00:00:00 2001 From: jal347 Date: Fri, 27 Sep 2024 15:34:18 -0700 Subject: [PATCH 4/4] reverted code --- src/hub/dataload/sources/chembl/dump.py | 212 ++++++++++++++++++- src/hub/dataload/sources/chembl/old_dump.py | 218 -------------------- 2 files changed, 210 insertions(+), 220 deletions(-) delete mode 100644 src/hub/dataload/sources/chembl/old_dump.py diff --git a/src/hub/dataload/sources/chembl/dump.py b/src/hub/dataload/sources/chembl/dump.py index a10f06e..1c97235 100644 --- a/src/hub/dataload/sources/chembl/dump.py +++ b/src/hub/dataload/sources/chembl/dump.py @@ -1,10 +1,218 @@ +import glob +import itertools +import json +import os +import os.path + import biothings import config biothings.config_for_app(config) -import biothings.hub.dataload.dumper as dumper + +from biothings.hub.dataload.dumper import HTTPDumper +from biothings.utils.common import iter_n + +from config import DATA_ARCHIVE_ROOT -class ChemblDumper(dumper.DummyDumper): +class ChemblDumper(HTTPDumper): + SRC_NAME = "chembl" + SRC_ROOT_FOLDER = os.path.join(DATA_ARCHIVE_ROOT, SRC_NAME) + + SRC_VERSION_URL = "https://www.ebi.ac.uk/chembl/api/data/status.json" + + """ + As the code is written, we have: + - 13,382 "target" json objects + """ + SRC_DATA_URLS = { + # Used to join with `mechanism` by `target_chembl_id` + "target": "https://www.ebi.ac.uk/chembl/api/data/target.json", + } + + # SCHEDULE = "0 12 * * *" + SLEEP_BETWEEN_DOWNLOAD = 0.1 + MAX_PARALLEL_DUMP = 5 + + # number of documents in each download job, i.e. number of documents in each .part* file + TO_DUMP_DOWNLOAD_SIZE = 1000 + # number of .part* files to be merged together after download + POST_DUMP_MERGE_SIZE = 100 + + def get_total_count_of_documents(self, src_data_name): + """ + Get the total count of documents from the first page of the url specified by `src_data_name`. + `total_count` is a member of the `page_meta` member of the root json object. + + Args: + src_data_name (str): must be a key to self.__class__.SRC_DATA_URLS + + Returns: + int: the total count of documents + """ + if src_data_name not in self.__class__.SRC_DATA_URLS: + raise KeyError( + "Cannot recognize src_data_name={}. Must be one of {{{}}}".format( + src_data_name, ", ".join(self.__class__.SRC_DATA_URLS.keys()) + ) + ) + + data = self.load_json_from_file(self.__class__.SRC_DATA_URLS[src_data_name]) + return data["page_meta"]["total_count"] + + def load_json_from_file(self, file) -> dict: + """ + Read the content of `file` and return the json object + + Args: + file (str): could either be an URL ("remotefile") or a path to a local text file ("localfile") + + Returns: + object: the json object read from the `file` + """ + + """ + Note that: + + - `json.loads(string)` deserializes string + - `json.load(file)` deserializes a file object + """ + if file.startswith("http://") or file.startswith("https://"): # file is an URL + data = json.loads(self.client.get(file).text) + else: # file is a local path + data = json.load(open(file)) + + return data + + def remote_is_better(self, remotefile, localfile): + remote_data = self.load_json_from_file(remotefile) + assert "chembl_db_version" in remote_data + assert remote_data["status"] == "UP" # API is working correctly + self.release = remote_data["chembl_db_version"] + + if localfile is None: + # ok we have the release, we can't compare further so we need to download + return True + + local_data = self.load_json_from_file(localfile) + self.logger.info( + "ChEMBL DB version: remote=={}, local=={}".format( + remote_data["chembl_db_version"], local_data["chembl_db_version"] + ) + ) + + # comparing strings should work since it's formatted as "ChEMBL_xxx" + if remote_data["chembl_db_version"] > local_data["chembl_db_version"]: + return True + else: + return False + + def create_todump_list(self, force=False, **kwargs): + version_filename = os.path.basename(self.__class__.SRC_VERSION_URL) + try: + current_localfile = os.path.join(self.current_data_folder, version_filename) + if not os.path.exists(current_localfile): + current_localfile = None + except TypeError: + # current data folder doesn't even exist + current_localfile = None + + remote_better = self.remote_is_better( + self.__class__.SRC_VERSION_URL, current_localfile + ) + self.logger.info( + "ChEMBL Dump: force=={}, current_localfile=={}, remote_better=={}".format( + force, current_localfile, remote_better + ) + ) + + if force or current_localfile is None or remote_better: + new_localfile = os.path.join(self.new_data_folder, version_filename) + self.to_dump.append( + {"remote": self.__class__.SRC_VERSION_URL, "local": new_localfile} + ) + + """ + Now we need to scroll the API endpoints. Let's get the total number of records + and generate URLs for each call to parallelize the downloads for each type of source data, + i.e. "molecule", "mechanism", "drug_indication", "target" and "binding_site". + + The partition size is set to 1000 json objects (represented by `TO_DUMP_DOWNLOAD_SIZE`). + + E.g. suppose for "molecule" data we have a `total_count` of 2500 json objects, and then we'll have, + in the process of iteration: + + - (part_index, part_start) = (0, 0) + - (part_index, part_start) = (1, 1000) + - (part_index, part_start) = (2, 2000) + + Therefore we would download 3 files, i.e. "molecule.part0", "molecule.part1", and "molecule.part2". + """ + part_size = self.__class__.TO_DUMP_DOWNLOAD_SIZE + for src_data_name in self.__class__.SRC_DATA_URLS: + total_count = self.get_total_count_of_documents(src_data_name) + for part_index, part_start in enumerate( + range(0, total_count, part_size) + ): + remote = ( + self.__class__.SRC_DATA_URLS[src_data_name] + + "?limit=" + + str(part_size) + + "&offset=" + + str(part_start) + ) + local = os.path.join( + self.new_data_folder, + "{}.part{}".format(src_data_name, part_index), + ) + self.to_dump.append({"remote": remote, "local": local}) + + def post_dump(self, *args, **kwargs): + """ + In the post-dump phase, for each type of source data, we merge each chunk of 100 .part* files + into one .*.json file. (This way we won't have a small number of huge files nor a pile of small files.) + + E.g. as the code is written, we have 1,961,462 "molecule" json objects. + Therefore we would download 1,962 files, i.e. "molecule.part0", ..., "molecule.part1961". + For each chunk of 100 such files, e.g. "molecule.part0", ..., "molecule.part99", we merge them into one + json file, e.g. "molecule.100.json". + + We'll also remove metadata (useless now) + """ + self.logger.info("Merging JSON documents in '%s'" % self.new_data_folder) + + chunk_size = self.__class__.POST_DUMP_MERGE_SIZE + for src_data_name in self.__class__.SRC_DATA_URLS: + part_files = glob.iglob( + os.path.join(self.new_data_folder, "{}.part*".format(src_data_name)) + ) + for chunk, cnt in iter_n(part_files, chunk_size, with_cnt=True): + outfile = os.path.join( + self.new_data_folder, "{}.{}.json".format(src_data_name, cnt) + ) + + """ + For each "molecule" json object, we only fetch the value associated with the "molecules" key. + This rule also applies to "mechanism", "drug_indication", "target" and "binding_site" + json objects. + """ + data_key = src_data_name + "s" + merged_value = itertools.chain.from_iterable( + self.load_json_from_file(f)[data_key] for f in chunk + ) + merged_data = {data_key: list(merged_value)} + + json.dump(merged_data, open(outfile, "w")) + self.logger.info("Merged %s %s files" % (src_data_name, cnt)) + + # now we can delete the part files + self.logger.info("Deleting part files") + part_files = glob.iglob( + os.path.join(self.new_data_folder, "{}.part*".format(src_data_name)) + ) + for f in part_files: + os.remove(f) + + self.logger.info("Post-dump merge done") diff --git a/src/hub/dataload/sources/chembl/old_dump.py b/src/hub/dataload/sources/chembl/old_dump.py deleted file mode 100644 index 1c97235..0000000 --- a/src/hub/dataload/sources/chembl/old_dump.py +++ /dev/null @@ -1,218 +0,0 @@ -import glob -import itertools -import json -import os -import os.path - -import biothings - -import config - -biothings.config_for_app(config) - -from biothings.hub.dataload.dumper import HTTPDumper -from biothings.utils.common import iter_n - -from config import DATA_ARCHIVE_ROOT - - -class ChemblDumper(HTTPDumper): - - SRC_NAME = "chembl" - SRC_ROOT_FOLDER = os.path.join(DATA_ARCHIVE_ROOT, SRC_NAME) - - SRC_VERSION_URL = "https://www.ebi.ac.uk/chembl/api/data/status.json" - - """ - As the code is written, we have: - - 13,382 "target" json objects - """ - SRC_DATA_URLS = { - # Used to join with `mechanism` by `target_chembl_id` - "target": "https://www.ebi.ac.uk/chembl/api/data/target.json", - } - - # SCHEDULE = "0 12 * * *" - SLEEP_BETWEEN_DOWNLOAD = 0.1 - MAX_PARALLEL_DUMP = 5 - - # number of documents in each download job, i.e. number of documents in each .part* file - TO_DUMP_DOWNLOAD_SIZE = 1000 - # number of .part* files to be merged together after download - POST_DUMP_MERGE_SIZE = 100 - - def get_total_count_of_documents(self, src_data_name): - """ - Get the total count of documents from the first page of the url specified by `src_data_name`. - `total_count` is a member of the `page_meta` member of the root json object. - - Args: - src_data_name (str): must be a key to self.__class__.SRC_DATA_URLS - - Returns: - int: the total count of documents - """ - if src_data_name not in self.__class__.SRC_DATA_URLS: - raise KeyError( - "Cannot recognize src_data_name={}. Must be one of {{{}}}".format( - src_data_name, ", ".join(self.__class__.SRC_DATA_URLS.keys()) - ) - ) - - data = self.load_json_from_file(self.__class__.SRC_DATA_URLS[src_data_name]) - return data["page_meta"]["total_count"] - - def load_json_from_file(self, file) -> dict: - """ - Read the content of `file` and return the json object - - Args: - file (str): could either be an URL ("remotefile") or a path to a local text file ("localfile") - - Returns: - object: the json object read from the `file` - """ - - """ - Note that: - - - `json.loads(string)` deserializes string - - `json.load(file)` deserializes a file object - """ - if file.startswith("http://") or file.startswith("https://"): # file is an URL - data = json.loads(self.client.get(file).text) - else: # file is a local path - data = json.load(open(file)) - - return data - - def remote_is_better(self, remotefile, localfile): - remote_data = self.load_json_from_file(remotefile) - assert "chembl_db_version" in remote_data - assert remote_data["status"] == "UP" # API is working correctly - self.release = remote_data["chembl_db_version"] - - if localfile is None: - # ok we have the release, we can't compare further so we need to download - return True - - local_data = self.load_json_from_file(localfile) - self.logger.info( - "ChEMBL DB version: remote=={}, local=={}".format( - remote_data["chembl_db_version"], local_data["chembl_db_version"] - ) - ) - - # comparing strings should work since it's formatted as "ChEMBL_xxx" - if remote_data["chembl_db_version"] > local_data["chembl_db_version"]: - return True - else: - return False - - def create_todump_list(self, force=False, **kwargs): - version_filename = os.path.basename(self.__class__.SRC_VERSION_URL) - try: - current_localfile = os.path.join(self.current_data_folder, version_filename) - if not os.path.exists(current_localfile): - current_localfile = None - except TypeError: - # current data folder doesn't even exist - current_localfile = None - - remote_better = self.remote_is_better( - self.__class__.SRC_VERSION_URL, current_localfile - ) - self.logger.info( - "ChEMBL Dump: force=={}, current_localfile=={}, remote_better=={}".format( - force, current_localfile, remote_better - ) - ) - - if force or current_localfile is None or remote_better: - new_localfile = os.path.join(self.new_data_folder, version_filename) - self.to_dump.append( - {"remote": self.__class__.SRC_VERSION_URL, "local": new_localfile} - ) - - """ - Now we need to scroll the API endpoints. Let's get the total number of records - and generate URLs for each call to parallelize the downloads for each type of source data, - i.e. "molecule", "mechanism", "drug_indication", "target" and "binding_site". - - The partition size is set to 1000 json objects (represented by `TO_DUMP_DOWNLOAD_SIZE`). - - E.g. suppose for "molecule" data we have a `total_count` of 2500 json objects, and then we'll have, - in the process of iteration: - - - (part_index, part_start) = (0, 0) - - (part_index, part_start) = (1, 1000) - - (part_index, part_start) = (2, 2000) - - Therefore we would download 3 files, i.e. "molecule.part0", "molecule.part1", and "molecule.part2". - """ - part_size = self.__class__.TO_DUMP_DOWNLOAD_SIZE - for src_data_name in self.__class__.SRC_DATA_URLS: - total_count = self.get_total_count_of_documents(src_data_name) - for part_index, part_start in enumerate( - range(0, total_count, part_size) - ): - remote = ( - self.__class__.SRC_DATA_URLS[src_data_name] - + "?limit=" - + str(part_size) - + "&offset=" - + str(part_start) - ) - local = os.path.join( - self.new_data_folder, - "{}.part{}".format(src_data_name, part_index), - ) - self.to_dump.append({"remote": remote, "local": local}) - - def post_dump(self, *args, **kwargs): - """ - In the post-dump phase, for each type of source data, we merge each chunk of 100 .part* files - into one .*.json file. (This way we won't have a small number of huge files nor a pile of small files.) - - E.g. as the code is written, we have 1,961,462 "molecule" json objects. - Therefore we would download 1,962 files, i.e. "molecule.part0", ..., "molecule.part1961". - For each chunk of 100 such files, e.g. "molecule.part0", ..., "molecule.part99", we merge them into one - json file, e.g. "molecule.100.json". - - We'll also remove metadata (useless now) - """ - self.logger.info("Merging JSON documents in '%s'" % self.new_data_folder) - - chunk_size = self.__class__.POST_DUMP_MERGE_SIZE - for src_data_name in self.__class__.SRC_DATA_URLS: - part_files = glob.iglob( - os.path.join(self.new_data_folder, "{}.part*".format(src_data_name)) - ) - for chunk, cnt in iter_n(part_files, chunk_size, with_cnt=True): - outfile = os.path.join( - self.new_data_folder, "{}.{}.json".format(src_data_name, cnt) - ) - - """ - For each "molecule" json object, we only fetch the value associated with the "molecules" key. - This rule also applies to "mechanism", "drug_indication", "target" and "binding_site" - json objects. - """ - data_key = src_data_name + "s" - merged_value = itertools.chain.from_iterable( - self.load_json_from_file(f)[data_key] for f in chunk - ) - merged_data = {data_key: list(merged_value)} - - json.dump(merged_data, open(outfile, "w")) - self.logger.info("Merged %s %s files" % (src_data_name, cnt)) - - # now we can delete the part files - self.logger.info("Deleting part files") - part_files = glob.iglob( - os.path.join(self.new_data_folder, "{}.part*".format(src_data_name)) - ) - for f in part_files: - os.remove(f) - - self.logger.info("Post-dump merge done")