From c71b5b77a1b66ed1c8c7c83ba2aab908695bca08 Mon Sep 17 00:00:00 2001 From: Philip Yoon Date: Wed, 21 Aug 2024 18:38:29 -0700 Subject: [PATCH 1/3] #961: Unbroke historical chunking logic for disp-s1. See associated bug for details --- data_subscriber/cslc/cslc_query.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/data_subscriber/cslc/cslc_query.py b/data_subscriber/cslc/cslc_query.py index c31e4e29..ebfe71f5 100644 --- a/data_subscriber/cslc/cslc_query.py +++ b/data_subscriber/cslc/cslc_query.py @@ -437,7 +437,6 @@ def query_cmr(self, args, token, cmr, settings, timerange, now): def create_download_job_params(self, query_timerange, chunk_batch_ids): '''Same as base class except inject batch_ids for k granules''' - assert len(chunk_batch_ids) == 1 chunk_batch_ids.extend(list(self.k_batch_ids[chunk_batch_ids[0]])) return super().create_download_job_params(query_timerange, chunk_batch_ids) @@ -477,17 +476,22 @@ def eliminate_none_frames(self, granules): def get_download_chunks(self, batch_id_to_urls_map): '''For CSLC chunks we must group them by the batch_id that were determined at the time of triggering''' + frame_id, _ = split_download_batch_id(list(batch_id_to_urls_map)[0]) chunk_map = defaultdict(list) for batch_chunk in batch_id_to_urls_map.items(): - chunk_map[batch_chunk[0]].append(batch_chunk) # We don't actually care about the URLs, we only care about the batch_id - - '''indices = self.download_batch_ids[batch_chunk[0]] - for index in indices: - chunk_map[index].append(batch_chunk) - if (len(chunk_map[index]) > self.args.k): - logger.error([chunk for chunk, data in chunk_map[index]]) - err_str = f"Number of download batches {len(chunk_map[index])} for frame {index} is greater than K {self.args.k}." - raise AssertionError(err_str)''' + + # Chunking is done differently between historical and forward/reprocessing + if self.proc_mode == "historical": + chunk_map[frame_id].append(batch_chunk) + else: + chunk_map[batch_chunk[0]].append( + batch_chunk) # We don't actually care about the URLs, we only care about the batch_id + + if self.proc_mode == "historical": + if (len(chunk_map[frame_id]) != self.args.k): + logger.error([chunk for chunk, data in chunk_map[frame_id]]) + err_str = f"Number of download batches {len(chunk_map[frame_id])} for frame {frame_id} does not equal K {self.args.k}." + raise AssertionError(err_str) return chunk_map.values() From a566d67788b32adb341d2349a7862915a0cb357d Mon Sep 17 00:00:00 2001 From: Philip Yoon Date: Thu, 22 Aug 2024 08:15:38 -0700 Subject: [PATCH 2/3] #961: Handling case of no data when figuring out chunking in DISP-S1 query --- data_subscriber/cslc/cslc_query.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/data_subscriber/cslc/cslc_query.py b/data_subscriber/cslc/cslc_query.py index ebfe71f5..7292c3e9 100644 --- a/data_subscriber/cslc/cslc_query.py +++ b/data_subscriber/cslc/cslc_query.py @@ -476,8 +476,12 @@ def eliminate_none_frames(self, granules): def get_download_chunks(self, batch_id_to_urls_map): '''For CSLC chunks we must group them by the batch_id that were determined at the time of triggering''' - frame_id, _ = split_download_batch_id(list(batch_id_to_urls_map)[0]) chunk_map = defaultdict(list) + if len(list(batch_id_to_urls_map)) == 0: + return chunk_map.values() + + frame_id, _ = split_download_batch_id(list(batch_id_to_urls_map)[0]) + for batch_chunk in batch_id_to_urls_map.items(): # Chunking is done differently between historical and forward/reprocessing From 42b98d2968c5e0ede09d1eb88e12e0f8f2cdeb66 Mon Sep 17 00:00:00 2001 From: Philip Yoon Date: Thu, 22 Aug 2024 08:37:31 -0700 Subject: [PATCH 3/3] #961: For DISP-S1 historical download works differently than other modes when downloading for k granules --- data_subscriber/asf_cslc_download.py | 33 ++++++++++++++++++---------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/data_subscriber/asf_cslc_download.py b/data_subscriber/asf_cslc_download.py index 5c4495e7..8974b789 100644 --- a/data_subscriber/asf_cslc_download.py +++ b/data_subscriber/asf_cslc_download.py @@ -288,19 +288,30 @@ def get_downloads(self, args, es_conn): all_downloads = [] - # Download CSLC granules - downloads = es_conn.get_download_granule_revision(batch_ids[-1]) - logger.info(f"Got {len(downloads)=} cslc granules downloads for batch_id={batch_ids[-1]}") - assert len(downloads) > 0, f"No downloads found for batch_id={batch_ids[-1]}!" - all_downloads.extend(downloads) - - # Download K-CSLC granules - for batch_id in batch_ids[:-1]: - downloads = k_es_conn.get_download_granule_revision(batch_id) - logger.info(f"Got {len(downloads)=} k cslc downloads for {batch_id=}") - assert len(downloads) > 0, f"No downloads found for batch_id={batch_id}!" + # Historical mode stores all granules in normal cslc_catalog + if "proc_mode" in args and args.proc_mode == "historical": + logger.info("Downloading cslc files for historical mode") + for batch_id in batch_ids: + downloads = es_conn.get_download_granule_revision(batch_id) + logger.info(f"Got {len(downloads)=} cslc downloads for {batch_id=}") + assert len(downloads) > 0, f"No downloads found for batch_id={batch_id}!" + all_downloads.extend(downloads) + + # Forward and reprocessing modes store all granules in k_cslc_catalog + else: + logger.info("Downloading cslc files for forward/reprocessing mode") + downloads = es_conn.get_download_granule_revision(batch_ids[-1]) + logger.info(f"Got {len(downloads)=} cslc granules downloads for batch_id={batch_ids[-1]}") + assert len(downloads) > 0, f"No downloads found for batch_id={batch_ids[-1]}!" all_downloads.extend(downloads) + # Download K-CSLC granules + for batch_id in batch_ids[:-1]: + downloads = k_es_conn.get_download_granule_revision(batch_id) + logger.info(f"Got {len(downloads)=} k cslc downloads for {batch_id=}") + assert len(downloads) > 0, f"No downloads found for batch_id={batch_id}!" + all_downloads.extend(downloads) + return all_downloads def query_cslc_static_files_for_cslc_batch(self, cslc_files, args, token, job_id, settings):