9 changes: 5 additions & 4 deletions data_prep/README.md
@@ -6,21 +6,22 @@ The primary function for using it is `scripts/gen_data.py`. It can be used to ea
the data type of interest. For example:

```bash
-python gen_data.py -b 2024-04-01 2024-07-31 atms
+python gen_data.py -b 2024-04-01 2024-07-31 atms zarr
```

This will generate Zarr output for the ATMS instrument for the period April 1, 2024 through July 31, 2024, submitted via SLURM
sbatch. The script automatically breaks the job into smaller chunks that fit within the cluster's time limits. It also
supports parallel execution (which does not chunk the job) and a serial mode. Please see the help:

```
-usage: gen_data.py [-h] [-s SUFFIX] [-p] [-b] [-a] [--slurm_account SLURM_ACCOUNT] start_date end_date {all,atms,surface_pressure,radiosonde}
+usage: gen_data.py [-h] [-s SUFFIX] [-p] [-b] [-a] [--slurm_account SLURM_ACCOUNT] start_date end_date {all,atms,surface_pressure,radiosonde} {zarr,parquet}

positional arguments:
start_date Start date in YYYY-MM-DD format
end_date End date in YYYY-MM-DD format
{all,atms,surface_pressure,radiosonde}
Data type to generate. "all" generates all data types.
{zarr,parquet} Output format

options:
-h, --help show this help message and exit
```

@@ -61,9 +62,9 @@ Since the data_prep code is already installed on HERA, you can run the script wi
use it you can do the following:

1) Log into HERA
-2) `cd /scratch1/NCEPDEV/da/Ronald.McLaren/shared/ocelot/`
+2) `cd /scratch3/NCEPDEV/da/Ronald.McLaren/shared/ocelot/`
3) `source ./env.sh` to set up the environment.
4) `cd src/ocelot/data_prep/scripts`
-5) `python gen_data.py -b 2024-04-01 2024-04-07 "atms" -s "my_suffix"` Please define a special suffix if you are playing
+5) `python gen_data.py -b 2024-04-01 2024-04-07 "atms" "zarr" -s "my_suffix"` Please define a special suffix if you are playing
around, as you will otherwise replace the existing data. Please note that the output directory for the data is defined by the
`configs/local_settings.py` file.
8 changes: 8 additions & 0 deletions data_prep/configs/ursa.yaml
@@ -166,6 +166,14 @@ data types:
directory: /scratch3/NCEPDEV/stmp/Andrew.Collard/IASI_PCA/bufr
filename_regex: M03-IASI-IASPCS1C-NA-1\.0-(?P<start_time>\d{14})\.000000000Z-(?P<end_time>\d{14})-4887062-\d{1}\.bfr

- name: cris_pca
type: netCDF
num_tasks: 48
batch_days: 15
mapping: cris_pca.py
directory: /scratch3/NCEPDEV/stmp/Andrew.Collard/CrIS_PCA/data
filename_regex: SNDR\.J1\.CRIS\.(?P<start_time>\d{8}T\d{4})\.m\d+\.[gG]\d+\.PCA_RED\.beta\.v\d+_\d+\.W\.\d+\.nc

- name: snow_cover
type: tank
num_tasks: 4
195 changes: 195 additions & 0 deletions data_prep/mapping/cris_pca.py
@@ -0,0 +1,195 @@
#!/usr/bin/env python3
import os
import numpy as np
import xarray as xr
import bufr
import yaml
import faulthandler

from bufr.obs_builder import ObsBuilder, add_main_functions, map_path

faulthandler.enable()
# Encoder YAML (BUFR schema) – separate from any mapping YAML
ENCODER_YAML = map_path("cris_pca.yaml")


class CrisPcaObsBuilder(ObsBuilder):
"""
CrIS PCA netCDF reader:

* DOES NOT use an ObsBuilder mapping YAML
* DOES use a BUFR encoder YAML (cris_pca.yaml)
* Flattens atrack/xtrack/fov -> location
* Fills a DataContainer matching encoder variable names
"""

def __init__(self):
print("\n*** CrisPcaObsBuilder CONSTRUCTOR ***")
print(" ENCODER_YAML =", ENCODER_YAML)

# --- Load YAML FIRST (before calling super) ---
with open(ENCODER_YAML, "r") as f:
full_yaml = yaml.safe_load(f)

self._encoder_yaml = full_yaml

# Build dimension map
# enc = full_yaml.get("encoder", {})
dim_path_map = {}
for dim in full_yaml.get("dimensions", []):
n = dim["name"]
p = dim["path"]
dim_path_map[n] = p

self._dim_path_map = dim_path_map

print(" DIM PATH MAP:", self._dim_path_map)

# NOW call parent (which calls _make_description)
super().__init__(None, log_name=os.path.basename(__file__))
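        # None here means no separate ObsBuilder mapping YAML; the encoder Description is supplied by _make_description() below.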

# -----------------------------------------------------
# 1) Return a Description using the encoder YAML file
# -----------------------------------------------------
def _make_description(self):
print("*** _make_description(): using ENCODER_YAML ***")
return bufr.encoders.Description(ENCODER_YAML)

# -----------------------------------------------------
def load_input(self, filename):
print(f"*** load_input() CALLED: {filename}")
ds = xr.open_dataset(filename, decode_times=False)
print(" dims:", ds.sizes)
return ds

# -----------------------------------------------------
def preprocess_dataset(self, ds):
print("*** preprocess_dataset() CALLED ***")

required = ["atrack", "xtrack", "fov"]
for d in required:
if d not in ds.sizes:
raise RuntimeError(f"Missing dimension {d}")

na = ds.sizes["atrack"]
nx = ds.sizes["xtrack"]
nf = ds.sizes["fov"]
nlocs = na * nx * nf

print(f" atrack={na}, xtrack={nx}, fov={nf} -> nlocs={nlocs}")

# Build indices
a, x, f = xr.broadcast(
xr.DataArray(np.arange(na), dims="atrack"),
xr.DataArray(np.arange(nx), dims="xtrack"),
xr.DataArray(np.arange(nf), dims="fov"),
)

xtrack = x.values.ravel()
fov = f.values.ravel()

scan_pos = 9 * xtrack + fov
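        # CrIS reports 9 fields of view per cross-track position, so this yields a unique scan index for each flattened location.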

out = xr.Dataset()
out = out.assign_coords(location=np.arange(nlocs))

out["scan_position"] = xr.DataArray(scan_pos, dims=("location",))

# Flatten lat/lon into encoder variable names
for v_in, v_out in [("lat", "latitude"), ("lon", "longitude")]:
if v_in in ds:
print(f" flattening {v_in} -> {v_out}")
out[v_out] = xr.DataArray(
ds[v_in].values.reshape(nlocs),
dims=("location",)
)

# Time
if "obs_time_tai93" in ds:
print(" converting obs_time_tai93 -> UNIX seconds")

time3d = xr.broadcast(ds["obs_time_tai93"], ds["lat"])[0]
time_tai93 = time3d.values.reshape(nlocs)

TAI93_EPOCH = np.datetime64("1993-01-01T00:00:00")
UNIX_EPOCH = np.datetime64("1970-01-01T00:00:00")

offset = (TAI93_EPOCH - UNIX_EPOCH) / np.timedelta64(1, "s")
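            # 1993-01-01 is 8,401 days (725,846,400 s) after the UNIX epoch; leap seconds accumulated since 1993 are not corrected for here.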
time_unix = time_tai93 + offset

out["time"] = xr.DataArray(time_unix, dims=("location",))

# Global PC scores
if "global_pc_score" in ds:
npc = ds.sizes["npc_global"]
print(f" flattening global_pc_score to (location, {npc})")
out["global_pc_score"] = xr.DataArray(
ds["global_pc_score"].values.reshape(nlocs, npc),
dims=("location", "npc_global")
)

print("*** preprocess complete, vars:", list(out.variables))
return out

    # -----------------------------------------------------
    # 2) Build a DataContainer from the flattened Dataset
    # -----------------------------------------------------

def _dims_for_var(self, varname, dims):
"""
Map xarray dimension names (e.g. ('location', 'npc_global'))
to BUFR query strings using the 'dimensions' section in cris_pca.yaml.
"""
dim_paths = []
for d in dims:
if d not in self._dim_path_map:
raise RuntimeError(
f"_dims_for_var: no mapping for dimension '{d}' "
f"in encoder YAML; known: "
f"{list(self._dim_path_map.keys())}"
)
dim_paths.append(self._dim_path_map[d])

print(f" _dims_for_var({varname}, {dims}) -> {dim_paths}")
return dim_paths

def make_obs(self, comm, input_path):
print("***** Entering make_obs *****")
ds = self.load_input(input_path)
ds = self.preprocess_dataset(ds)

container = bufr.DataContainer()

        # Reuse the encoder YAML already parsed in the constructor
enc = self._encoder_yaml["encoder"]
variables = enc["variables"]

print('VARIABLES=', variables)

for v in variables:
name = v["name"]
source = v["source"]

if source not in ds:
print(f"WARNING: source '{source}' not in dataset, skipping")
continue

xr_dims = ds[source].dims
dim_paths = self._dims_for_var(name, xr_dims)

print(f"Adding {name} from {source} with dim_paths {dim_paths}")
print(" shape =", ds[source].values.shape)

container.add(
name,
ds[source].values,
dim_paths
)

return container


add_main_functions(CrisPcaObsBuilder)
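
As an aside for readers new to the flattening trick used in `preprocess_dataset`, here is a minimal, self-contained sketch of the same atrack/xtrack/fov -> location reshaping on a synthetic dataset. The sizes and values are made up for illustration, and it depends only on numpy and xarray, not on the `bufr` package:

```python
import numpy as np
import xarray as xr

# Toy CrIS-like layout: 2 along-track scans, 3 cross-track positions, 9 fields of view.
na, nx, nf = 2, 3, 9
ds = xr.Dataset(
    {
        "lat": (("atrack", "xtrack", "fov"), np.random.uniform(-90, 90, (na, nx, nf))),
        "obs_time_tai93": (("atrack", "xtrack"), np.full((na, nx), 9.0e8)),
    }
)
nlocs = na * nx * nf  # 54 flattened locations

# Broadcast 1-D index arrays to the full (atrack, xtrack, fov) grid, as the builder does.
a, x, f = xr.broadcast(
    xr.DataArray(np.arange(na), dims="atrack"),
    xr.DataArray(np.arange(nx), dims="xtrack"),
    xr.DataArray(np.arange(nf), dims="fov"),
)
scan_position = 9 * x.values.ravel() + f.values.ravel()  # unique index within a scan line

# The 2-D time field is expanded against the 3-D lat field before flattening.
time3d = xr.broadcast(ds["obs_time_tai93"], ds["lat"])[0]
time_flat = time3d.values.reshape(nlocs)

print(scan_position.shape, time_flat.shape)  # both (54,)
```

Each flattened array then shares a single `location` dimension, which is what the `location` entry in the encoder YAML below refers to.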
49 changes: 49 additions & 0 deletions data_prep/mapping/cris_pca.yaml
@@ -0,0 +1,49 @@
dimensions:
- name: location
source: "location"
path: "*"

- name: npc_global
source: npc_global
path: "*/NPCGLOBAL"
Comment on lines +1 to +8 (Contributor): "dimensions" section should be inside "encoder". Otherwise it will not be applied.

encoder:
categories:
- name: cris_pca

globals:
- name: source
type: string
value: "CrIS PCA netCDF (test)"

variables:
- name: latitude
source: latitude
longName: "Latitude"
units: "degree_north"

- name: longitude
source: longitude
longName: "Longitude"
units: "degree_east"

- name: time
source: time
longName: "Time"
units: "seconds since 1970-01-01T00:00:00Z"

- name: timestamp
source: time
longName: "Time"
units: "seconds since 1970-01-01T00:00:00Z"

- name: scan_position
source: scan_position
longName: "Scan position"
units: "1"

- name: global_pc_score
source: global_pc_score
longName: "Global PC score"
units: "1"

15 changes: 15 additions & 0 deletions data_prep/src/config.py
@@ -74,6 +74,19 @@ def filename_regex(self):
return self.config['filename_regex']


class NcdfConfig(DataTypeConfig):
def __init__(self, config):
super().__init__(config, 'netCDF')

@property
def directory(self):
return self.config['directory']

@property
def filename_regex(self):
return self.config['filename_regex']


class Config:
def __init__(self, yaml_path=''):
if not yaml_path:
@@ -104,6 +117,8 @@ def _get_data_types(self):
data_types.append(TankConfig(data_type))
elif data_type['type'] == 'pca':
data_types.append(PcaConfig(data_type))
elif data_type['type'] == 'netCDF':
data_types.append(NcdfConfig(data_type))
else:
assert False, f"Unknown data type {data_type['type']} in config"

45 changes: 45 additions & 0 deletions data_prep/src/runner.py
@@ -148,12 +148,57 @@ def run(self, comm, parameters: Parameters) -> bufr.DataContainer:
return combined_container


class NcdfRunner(Runner):

def __init__(self, data_type, cfg):
super().__init__(data_type, cfg)

# Make sure regex exists
if hasattr(self.type_config, "filename_regex"):
self.regex = re.compile(self.type_config.filename_regex)
else:
print("WARNING: No filename_regex found — matching all files")
self.regex = re.compile(".*")

def run(self, comm, parameters):
print("\n=== NcdfRunner starting ===")

combined_container = bufr.DataContainer()
directory = self.type_config.directory

if not os.path.isdir(directory):
raise RuntimeError(f"Input directory not found: {directory}")

files = sorted(os.listdir(directory))
print("FILES FOUND:", len(files))

for fname in files:
print("CHECKING:", fname)

if not self.regex.match(fname):
print("NO REGEX MATCH")
continue

print("MATCH:", fname)

input_path = os.path.join(directory, fname)
container = self._make_obs(comm, input_path)

container.gather(comm)
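            # gather() above collects this file's data across MPI ranks; it is then appended to the combined container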
combined_container.append(container)

print("DEBUG: NcdfRunner finished")
return combined_container


def run(comm, data_type, parameters: Parameters, cfg=config.Config()) -> (bufr.encoders.Description, bufr.DataContainer):
type_cfg = cfg.get_data_type(data_type)
if type_cfg.type == 'tank':
runner = TankRunner(data_type, cfg)
elif type_cfg.type == 'pca':
runner = PcaRunner(data_type, cfg)
elif type_cfg.type == 'netCDF':
runner = NcdfRunner(data_type, cfg)
else:
raise ValueError(f"Unknown data type {type_cfg.type}")
