1+ """
2+ Observation Data Processing and Feature Extraction Pipeline
3+
4+ Author: Azadeh Gholoubi
5+ Organization: NOAA/NCEP/EMC
6+
7+ Description:
8+ Core data processing module for the OCELOT (Observation-Centered Earth Learning
9+ Observation Transformer) GNN model. Handles time-series binning, feature extraction,
10+ normalization, and quality control for multi-instrument observational datasets including
11+ satellites (ATMS, AMSU-A, SSMIS, SEVIRI, AVHRR, ASCAT) and conventional observations
12+ (surface stations, radiosondes).
13+
14+ Key Functions:
15+ - organize_bins_times: Temporal binning of observations into input-target pairs with
16+ support for latent rollout (multiple target sub-windows)
17+ - extract_features: Extracts and normalizes features from zarr files with instrument-
18+ specific QC, metadata handling, and level-specific normalization for radiosondes
19+ - _normalize_by_level_groups: Pressure-stratified normalization for atmospheric soundings
20+
21+ Special Features:
22+ - Radiosonde pressure-level matching with configurable tolerance
23+ - Pressure metadata augmentation for vertical context
24+ - Layer-specific normalization (surface/mid/upper atmosphere)
25+ - Chunked scanning for memory-efficient processing of large zarr datasets
26+ - Reproducible subsampling with stable seeding
27+ - Multi-channel support for satellite instruments
28+ - Quality control integration (QC flags, mask propagation)
29+
30+ Technical Details:
31+ - Supports both single-year and multi-year zarr files
32+ - Handles missing data, fill values, and sentinel values
33+ - Implements cosine transformation for cyclic metadata (wind direction)
34+ - Feature normalization using pre-computed statistics
35+ - Efficient indexing with numpy advanced indexing
36+ """
37+
138import hashlib
239import numpy as np
340import pandas as pd
@@ -293,7 +330,7 @@ def _stats_from_cfg(feature_stats, inst_name, feat_keys):
293330
294331def _normalize_by_level_groups (features , pressures , feature_stats , inst_name , feat_keys ):
295332 """
296- FIX 3: Apply level-specific normalization for radiosondes.
333+ Apply level-specific normalization for radiosondes.
297334 Normalizes features separately for different atmospheric layers (surface, mid, upper).
298335
299336 Args:
@@ -371,8 +408,6 @@ def extract_features(z_dict, data_summary, bin_name, observation_config, feature
371408 Adds per-channel masks for inputs and targets so features can be missing independently.
372409 Inputs: keep a row if ANY feature channel is valid; metadata can be missing (imputed later).
373410 Targets: require metadata row to be valid; features may be missing per-channel.
374-
375- ## MODIFIED to support latent rollout (multiple target windows).
376411 """
377412 print (f"\n Processing { bin_name } ..." )
378413 for obs_type in list (data_summary [bin_name ].keys ()):
@@ -401,7 +436,7 @@ def extract_features(z_dict, data_summary, bin_name, observation_config, feature
401436 matching_mode = level_selection .get ("matching_mode" , "exact" )
402437
403438 if matching_mode == "nearest" :
404- # FIX 1: Nearest-level matching with tolerance
439+ # Nearest-level matching with tolerance
405440 tolerance = level_selection .get ("tolerance_hpa" , 50 )
406441 if input_idx .size :
407442 p_vals = z [col ][input_idx ]
@@ -420,7 +455,7 @@ def extract_features(z_dict, data_summary, bin_name, observation_config, feature
420455 keep_mask_tg |= (np .abs (p_vals_tg - level ) <= tolerance )
421456 target_indices_list [i ] = idx [keep_mask_tg ]
422457 else :
423- # Original exact matching (fallback)
458+ # Exact matching (fallback)
424459 if input_idx .size :
425460 input_idx = input_idx [np .isin (z [col ][input_idx ], levels )]
426461 for i , idx in enumerate (target_indices_list ):
@@ -573,7 +608,7 @@ def _get_feature(arrs, name, idx):
573608 # Extract input features
574609 input_features_raw = np .column_stack ([_get_feature (z , k , input_idx ) for k in feat_keys ]).astype (np .float32 )
575610
576- # FIX 2: Separate computed metadata from zarr-based metadata
611+ # Separate computed metadata from zarr-based metadata
577612 # Computed metadata keys that we compute on-the-fly
578613 computed_meta_keys = {'pressure_normalized' , 'log_pressure_height' }
579614 zarr_meta_keys = [k for k in meta_keys if k not in computed_meta_keys ]
@@ -584,7 +619,7 @@ def _get_feature(arrs, name, idx):
584619 input_lon_raw = z ["longitude" ][input_idx ]
585620 input_times_raw = z ["time" ][input_idx ]
586621
587- # FIX 2: Add pressure-based metadata for radiosondes/conventional obs
622+ # Add pressure-based metadata for radiosondes/conventional obs
588623 if inst_name == 'radiosonde' and 'airPressure' in feat_keys :
589624 # Check if pressure metadata features are requested
590625 pressure_meta_features = []
@@ -627,13 +662,13 @@ def _get_feature(arrs, name, idx):
627662 target_times_raw_list .append (np .array ([], dtype = np .float32 ))
628663 else :
629664 target_features_raw_list .append (np .column_stack ([_get_feature (z , k , target_idx ) for k in feat_keys ]).astype (np .float32 ))
630- # FIX 2: Use only zarr-based metadata keys (filter out computed ones)
665+ # Use only zarr-based metadata keys (filter out computed ones)
631666 target_metadata_raw_list .append (_stack_or_empty (z , zarr_meta_keys , target_idx ))
632667 target_lat_raw_list .append (z ["latitude" ][target_idx ])
633668 target_lon_raw_list .append (z ["longitude" ][target_idx ])
634669 target_times_raw_list .append (z ["time" ][target_idx ])
635670
636- # FIX 2: Add pressure-based metadata for targets (radiosondes)
671+ # Add pressure-based metadata for targets (radiosondes)
637672 if inst_name == 'radiosonde' and 'airPressure' in feat_keys :
638673 pressure_meta_requested = ('pressure_normalized' in meta_keys ) or ('log_pressure_height' in meta_keys )
639674
@@ -972,7 +1007,7 @@ def _apply_relational_qc():
9721007 else :
9731008 # Conventional processing (surface_obs, radiosonde)
9741009
975- # FIX 3: Try level-specific normalization first (for radiosondes)
1010+ # Try level-specific normalization first (for radiosondes)
9761011 input_features_norm = None
9771012 if inst_name == 'radiosonde' and 'airPressure' in feat_keys :
9781013 # Extract pressure values for level grouping
@@ -984,7 +1019,7 @@ def _apply_relational_qc():
9841019 )
9851020
9861021 if input_features_norm is not None :
987- print (f" [{ inst_name } ] Using level-specific normalization (FIX 3) " )
1022+ print (f" [{ inst_name } ] Using level-specific normalization" )
9881023
9891024 # Fall back to global normalization if level-specific not available
9901025 if input_features_norm is None :
@@ -1034,7 +1069,7 @@ def _apply_relational_qc():
10341069 continue
10351070
10361071 # Target normalization with clipping (conventional style)
1037- # FIX 3: Try level-specific normalization for radiosondes
1072+ # Try level-specific normalization for radiosondes
10381073 target_features_norm = None
10391074 if inst_name == 'radiosonde' and 'airPressure' in feat_keys :
10401075 # Extract pressure values for level grouping
0 commit comments