From 88ef9a9fa640da406fa009168992ed6479c423d5 Mon Sep 17 00:00:00 2001 From: chrisjrd Date: Fri, 13 Dec 2024 15:12:26 -0800 Subject: [PATCH] feat(audit): query improvements query version 1.1 for CSLC. use temporal time instead of native-id query. Refs #1041 --- tools/ops/cmr_audit/cmr_audit_hls.py | 41 +++++++++++++++++++--------- tools/ops/cmr_audit/cmr_audit_slc.py | 17 ++++++------ 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/tools/ops/cmr_audit/cmr_audit_hls.py b/tools/ops/cmr_audit/cmr_audit_hls.py index 71e0a477..03e07dcc 100644 --- a/tools/ops/cmr_audit/cmr_audit_hls.py +++ b/tools/ops/cmr_audit/cmr_audit_hls.py @@ -7,11 +7,15 @@ import os import re import sys +import urllib.parse from collections import defaultdict +from typing import Union, Iterable import aiohttp import more_itertools from dotenv import dotenv_values +from more_itertools import always_iterable + from tools.ops.cmr_audit.cmr_audit_utils import async_get_cmr_granules, get_cmr_audit_granules @@ -20,7 +24,7 @@ format="%(levelname)7s: %(relativeCreated)7d %(name)s:%(filename)s:%(funcName)s:%(lineno)s - %(message)s", # alternative format which displays time elapsed. # format="%(asctime)s %(levelname)7s %(name)4s:%(filename)8s:%(funcName)22s:%(lineno)3s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", - level=logging.DEBUG) + level=logging.INFO) logger = logging.getLogger(__name__) config = { @@ -88,35 +92,46 @@ async def async_get_cmr_granules_hls_s30(temporal_date_start: str, temporal_date platform_short_name=["Sentinel-2A", "Sentinel-2B"]) +async def async_get_cmr_dswx(rtc_native_id_patterns: set, temporal_date_start: str, temporal_date_end: str): + return await async_get_cmr(rtc_native_id_patterns, collection_short_name="OPERA_L3_DSWX-HLS_PROVISIONAL_V1", + temporal_date_start=temporal_date_start, temporal_date_end=temporal_date_end) + -async def async_get_cmr_dswx(dswx_native_id_patterns: set): - logger.debug(f"entry({len(dswx_native_id_patterns)=:,})") +async def async_get_cmr( + native_id_patterns: set, + collection_short_name: Union[str, Iterable[str]], + temporal_date_start: str, temporal_date_end: str, + chunk_size=1000 +): + logger.debug(f"entry({len(native_id_patterns)=:,})") # batch granules-requests due to CMR limitation. 1000 native-id clauses seems to be near the limit. - dswx_native_id_patterns = more_itertools.always_iterable(dswx_native_id_patterns) - dswx_native_id_pattern_batches = list(more_itertools.chunked(dswx_native_id_patterns, 1000)) # 1000 == 55,100 length + native_id_patterns = more_itertools.always_iterable(native_id_patterns) + native_id_pattern_batches = list(more_itertools.chunked(native_id_patterns, chunk_size)) # 1000 == 55,100 length request_url = "https://cmr.earthdata.nasa.gov/search/granules.umm_json" sem = asyncio.Semaphore(15) async with aiohttp.ClientSession() as session: post_cmr_tasks = [] - for i, dswx_native_id_pattern_batch in enumerate(dswx_native_id_pattern_batches, start=1): - dswx_native_id_patterns_query_params = "&native_id[]=" + "&native_id[]=".join(dswx_native_id_pattern_batch) + for i, native_id_pattern_batch in enumerate(native_id_pattern_batches, start=1): + # native_id_patterns_query_params = "&native_id[]=" + "&native_id[]=".join(native_id_pattern_batch) request_body = ( "provider=POCLOUD" - "&short_name[]=OPERA_L3_DSWX-HLS_PROVISIONAL_V1" + f'{"&short_name[]=" + "&short_name[]=".join(always_iterable(collection_short_name))}' "&options[native-id][pattern]=true" - f"{dswx_native_id_patterns_query_params}" + # f"{native_id_patterns_query_params}" + f"&temporal[]={urllib.parse.quote(temporal_date_start, safe='/:')},{urllib.parse.quote(temporal_date_end, safe='/:')}" ) - logger.debug(f"Creating request task {i} of {len(dswx_native_id_pattern_batches)}") + logger.debug(f"Creating request task {i} of {len(native_id_pattern_batches)}") post_cmr_tasks.append(get_cmr_audit_granules(request_url, request_body, session, sem)) + break logger.debug(f"Number of requests to make: {len(post_cmr_tasks)=}") # issue requests in batches logger.debug("Batching tasks") - dswx_granules = set() + cmr_granules = set() task_chunks = list(more_itertools.chunked(post_cmr_tasks, len(post_cmr_tasks))) # CMR recommends 2-5 threads. for i, task_chunk in enumerate(task_chunks, start=1): logger.info(f"Processing batch {i} of {len(task_chunks)}") @@ -125,8 +140,8 @@ async def async_get_cmr_dswx(dswx_native_id_patterns: set): await asyncio.gather(*task_chunk, return_exceptions=False) ) for post_cmr_tasks_result in post_cmr_tasks_results: - dswx_granules.update(post_cmr_tasks_result[0]) - return dswx_granules + cmr_granules.update(post_cmr_tasks_result[0]) + return cmr_granules def hls_granule_ids_to_dswx_native_id_patterns(cmr_granules: set[str], input_to_outputs_map: defaultdict, output_to_inputs_map: defaultdict): diff --git a/tools/ops/cmr_audit/cmr_audit_slc.py b/tools/ops/cmr_audit/cmr_audit_slc.py index f7cf61ec..7745814e 100644 --- a/tools/ops/cmr_audit/cmr_audit_slc.py +++ b/tools/ops/cmr_audit/cmr_audit_slc.py @@ -95,26 +95,24 @@ async def async_get_cmr_granules_slc_s1b(temporal_date_start: str, temporal_date async def async_get_cmr_cslc(cslc_native_id_patterns: set, temporal_date_start: str, temporal_date_end: str): - return await async_get_cmr(cslc_native_id_patterns, collection_short_name="OPERA_L2_CSLC-S1_V1", collection_concept_id="C1257337155-ASF", + return await async_get_cmr(cslc_native_id_patterns, collection_short_name="OPERA_L2_CSLC-S1_V1", temporal_date_start=temporal_date_start, temporal_date_end=temporal_date_end, chunk_size=100) async def async_get_cmr_rtc(rtc_native_id_patterns: set, temporal_date_start: str, temporal_date_end: str): - return await async_get_cmr(rtc_native_id_patterns, collection_short_name="OPERA_L2_RTC-S1_V1", collection_concept_id="C1257337044-ASF", + return await async_get_cmr(rtc_native_id_patterns, collection_short_name="OPERA_L2_RTC-S1_V1", temporal_date_start=temporal_date_start, temporal_date_end=temporal_date_end, chunk_size=100) async def async_get_cmr( native_id_patterns: set, collection_short_name: Union[str, Iterable[str]], - collection_concept_id: str, temporal_date_start: str, temporal_date_end: str, chunk_size=1000): # 1000 ~= 55,100 length """ Issue CMR query requests. :param native_id_patterns: the native ID patterns to use in the query. Corresponds to query param `&native-id[]`. Allows use of wildcards "*" and "?", but is descouraged. :param collection_short_name: CMR collection short name. Typically found in PCM's settings.yaml - :param collection_concept_id: CMR collection concept ID for faster queries. :param temporal_date_start: temporal start date. Corresponds to query param `&temporal[]=,` :param temporal_date_end: temporal end date. Corresponds to query param `&temporal[]=,` :param chunk_size: split queries across N native-id patterns per request. CMR request bodies have an implicit size limit of 55,100 length. Must be a value in the interval [1,1000]. @@ -130,8 +128,8 @@ async def async_get_cmr( sem = asyncio.Semaphore(15) async with aiohttp.ClientSession() as session: post_cmr_tasks = [] - for i, rtc_native_id_pattern_batch in enumerate(native_id_pattern_batches, start=1): - native_id_patterns_query_params = "&native_id[]=" + "&native_id[]=".join(rtc_native_id_pattern_batch) + for i, native_id_pattern_batch in enumerate(native_id_pattern_batches, start=1): + # native_id_patterns_query_params = "&native_id[]=" + "&native_id[]=".join(native_id_pattern_batch) request_body = ( "provider=ASF" @@ -139,12 +137,13 @@ async def async_get_cmr( "&platform[]=Sentinel-1A" "&platform[]=Sentinel-1B" "&bounding_box=-180,-60,180,90" - "&options[native-id][pattern]=true" - f"{native_id_patterns_query_params}" + # "&options[native-id][pattern]=true" + # f"{native_id_patterns_query_params}" f"&temporal[]={urllib.parse.quote(temporal_date_start, safe='/:')},{urllib.parse.quote(temporal_date_end, safe='/:')}" ) logger.debug(f"Creating request task {i} of {len(native_id_pattern_batches)}") post_cmr_tasks.append(get_cmr_audit_granules(request_url, request_body, session, sem)) + break logger.debug(f"Number of requests to make: {len(post_cmr_tasks)=}") # issue requests in batches @@ -184,7 +183,7 @@ def slc_granule_ids_to_cslc_native_id_patterns(cmr_granules: set[str], input_to_ cslc_acquisition_dt_str = m.group("start_ts") # OPERA_L2_CSLC-S1_*_20231124T124529Z_*_S1* - rtc_native_id_pattern = f'OPERA_L2_CSLC-S1_*_{cslc_acquisition_dt_str}Z_*_S1*' + rtc_native_id_pattern = f'OPERA_L2_CSLC-S1_*_{cslc_acquisition_dt_str}Z_*_S1*v1.1' rtc_native_id_patterns.add(rtc_native_id_pattern) # bi-directional mapping of HLS-DSWx inputs and outputs