Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add partial time-reading and conjugation to HERADataFastReader #869

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 98 additions & 33 deletions hera_cal/io.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2019 the HERA Project
# Licensed under the MIT License
from __future__ import annotations

import numpy as np
from collections import OrderedDict as odict
Expand Down Expand Up @@ -1109,19 +1110,23 @@ def empty_arrays(self):
self.nsample_array = (np.zeros_like(self.nsample_array) if self.nsample_array is not None else None)


def read_hera_hdf5(filenames, bls=None, pols=None, full_read_thresh=0.002,
read_data=True, read_flags=False, read_nsamples=False,
check=False, dtype=np.complex128, verbose=False):
def read_hera_hdf5(
filenames, bls=None, pols=None, keep_times: list[float] | None = None, full_read_thresh=0.002,
read_data=True, read_flags=False, read_nsamples=False,
check=False, dtype=np.complex128, verbose=False
):
'''A potentially faster interface for reading HERA HDF5 files. Only concatenates
along time axis. Puts times in ascending order, but does not check that
files are contiguous. Currently not BDA compatible.

Arguments:
filenames: list of files to read
bls: list of (ant_1, ant_2, [polstr]) tuples to read out of files.
Default: all bls common to all files.
pols: list of pol strings to read out of files. Default: all, but is
superceded by any polstrs listed in bls.
keep_times: list of times to read out of files. Default: all. Note: all times
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to implement some kind of tolerance here? I know pyuvdata has a tolerance for figuring out of times match, perhaps we can use their default under the hood?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I think that should be doable. I'll add it in.

are always read from the files, setting this only down-filters times
after reading.
full_read_thresh (0.002): fractional threshold for reading whole file
instead of baseline by baseline.
read_data (bool, True): read data
Expand All @@ -1130,7 +1135,6 @@ def read_hera_hdf5(filenames, bls=None, pols=None, full_read_thresh=0.002,
check (bool, False): run sanity checks to make sure files match.
dtype (np.complex128): numpy datatype for output complex-valued arrays
verbose: print some progress messages.

Returns:
rv: dict with keys 'info' and optionally 'data', 'flags', and 'nsamples',
based on whether read_data, read_flags, and read_nsamples are true.
Expand All @@ -1148,6 +1152,12 @@ def read_hera_hdf5(filenames, bls=None, pols=None, full_read_thresh=0.002,
inds = {}
# Read file metadata to size up arrays and sort times
filenames = _parse_input_files(filenames, name='input_data')

if bls is not None:
antpairs = [bl[:2] for bl in bls]
else:
antpairs = None
Comment on lines +1156 to +1159
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we ignoring polarization? If I only want to load one bl for one pol, shouldn't that be allowed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you can do that. This is just getting the antpairs for when we check whether all the antpairs are present in the data.


for filename in filenames:
if verbose:
print(f'Reading header of {filename}')
Expand All @@ -1163,12 +1173,20 @@ def read_hera_hdf5(filenames, bls=None, pols=None, full_read_thresh=0.002,
info['freqs'] = h['freq_array'][()] # make 1D instead of 2D
nfreqs = info['freqs'].size
pol_array = h['polarization_array'][()]

npols = pol_array.size
# the following errors if x_orientation not set in this hdf5
x_orient = str(h['x_orientation'][()], encoding='utf-8')
pol_indices = {uvutils.parse_polstr(POL_NUM2STR_DICT[n], x_orientation=x_orient): cnt
for cnt, n in enumerate(pol_array)}

pol_indices = {
uvutils.parse_polstr(POL_NUM2STR_DICT[n], x_orientation=x_orient): cnt
for cnt, n in enumerate(pol_array)
}
if pols is not None:
pols = [uvutils.parse_polstr(p, x_orientation=x_orient) for p in pols]

info['pols'] = list(pol_indices.keys())
info['x_orientation'] = x_orient
info['ants'] = antenna_numbers = h['antenna_numbers'][()]
info['antpos'] = dict(zip(antenna_numbers, h['antenna_positions'][()]))
for coord in ['latitude', 'longitude', 'altitude']:
Expand Down Expand Up @@ -1196,27 +1214,34 @@ def read_hera_hdf5(filenames, bls=None, pols=None, full_read_thresh=0.002,
data_ants.update(set(ant2_array))
_hash = hash((ant1_array.tobytes(), ant2_array.tobytes(), time_first, ntimes))
# map baselines to array indices for each unique antenna order
data_bls = list(zip(ant1_array, ant2_array))

if _hash not in inds:
inds[_hash] = {}

if time_first:
inds[_hash] = {(i, j): slice(n * ntimes, (n + 1) * ntimes)
for n, (i, j) in enumerate(zip(ant1_array,
ant2_array))}
for n, (i, j) in enumerate(data_bls):
slc = slice(n * ntimes, (n + 1) * ntimes)
inds[_hash][(i, j)] = slc
else:
inds[_hash] = {(i, j): slice(n, None, nbls)
for n, (i, j) in enumerate(zip(ant1_array,
ant2_array))}
if bls is not None:
for n, (i, j) in enumerate(data_bls):
slc = slice(n, None, nbls)
inds[_hash][(i, j)] = slc

if antpairs is not None:
# Make sure our baselines of interest are in this file
if not all([bl[:2] in inds[_hash] for bl in bls]):
missing_bls = [bl for bl in bls if bl[:2] not in inds[_hash]]
missing_bls = [(i,j) for (i,j) in antpairs if not ((i,j) in inds[_hash] or (j,i) in inds[_hash])]
if missing_bls:
raise ValueError(f'File {filename} missing:' + str(missing_bls))
assert bl[:2] in inds[_hash]

# Use intersection of bls across files.
if 'bls' not in info:
info['bls'] = set(inds[_hash].keys())
info['data_ants'] = data_ants
else:
info['bls'].intersection_update(set(inds[_hash].keys()))
info['data_ants'].intersection_update(data_ants)
info['bls'] &= set(inds[_hash].keys())
info['data_ants'] &= data_ants

bl2ind[filename] = inds[_hash]

if bls is None:
Expand All @@ -1233,8 +1258,13 @@ def read_hera_hdf5(filenames, bls=None, pols=None, full_read_thresh=0.002,
pols = list(pol_indices.keys())
bls = set(bl for bl in bls if len(bl) == 3)
bls = bls.union([bl + (p,) for bl in bls_len2 for p in pols])

# now, conjugate them if they're not in the data
bls = {bl if bl[:2] in info['bls'] else utils.reverse_bl(bl) for bl in bls}

# record polarizations as total of ones indexed in bls
pols = set(bl[2] for bl in bls)

# sort files by time of first integration
times.sort(key=lambda x: x[0][0])
info['times'] = np.concatenate([t[0] for t in times], axis=0)
Expand All @@ -1251,7 +1281,7 @@ def read_hera_hdf5(filenames, bls=None, pols=None, full_read_thresh=0.002,
# bail here if all we wanted was the info
if len(rv) == 0:
return {'info': info}

t = 0
for _times, filename, _info in times:
inds = bl2ind[filename]
Expand Down Expand Up @@ -1306,23 +1336,47 @@ def index_exp(i, j, p):
else:
for i, j, p in bls:
data[i, j, p][t:t + ntimes] = d[index_exp(i, j, p)]

t += ntimes

# Now subselect the times.
if keep_times is not None:
time_mask = np.array([
np.isclose(
keep_times,
t,
rtol=UVData._time_array.tols[0],
atol=UVData._time_array.tols[1],
).any() for t in info['times']
])
info['times'] = info['times'][time_mask]
for key in ['visdata', 'flags', 'nsamples']:
if key in rv:
for bl in rv[key]:
rv[key][bl] = rv[key][bl][time_mask]

# Quick renaming of data key for niceness
if 'visdata' in rv:
rv['data'] = rv.pop('visdata', [])
info['data_ants'] = np.array(sorted(info['data_ants']))

# Ensure info['bls'] matches input bls.
if antpairs is not None:
info['bls'] = {bl[:2] for bl in bls}

info['data_ants'] = set()
for bl in info['bls']:
info['data_ants'].add(bl[0])
info['data_ants'].add(bl[1])

rv['info'] = info
return rv


class HERADataFastReader():
class HERADataFastReader:
'''Wrapper class around read_hera_hdf5 meant to mimic the functionality of HERAData for drop-in replacement.'''

def __init__(self, input_data, read_metadata=True, check=False, skip_lsts=False):
'''Instantiates a HERADataFastReader object. Only supports reading uvh5 files, not writing them.
Does not support BDA and only supports patial i/o along baselines and polarization axes.

Arguments:
input_data: path or list of paths to uvh5 files.
read_metadata (bool, True): reads metadata from file and stores it internally to try to match HERAData
Expand All @@ -1346,6 +1400,8 @@ def __init__(self, input_data, read_metadata=True, check=False, skip_lsts=False)
else:
setattr(self, meta, None)

self.x_orientation = rv['info'].get('x_orientation', None)

# create functions that error informatively when trying to use standard HERAData/UVData methods
for funcname in list(dir(HERAData)):
if funcname.startswith('__') and funcname.endswith('__'):
Expand All @@ -1366,22 +1422,24 @@ def _adapt_metadata(self, info_dict, skip_lsts=False):
info_dict['data_antpos'] = {ant: info_dict['antpos'][ant] for ant in info_dict['data_ants']}
info_dict['times'] = np.unique(info_dict['times'])
info_dict['times_by_bl'] = {ap: info_dict['times'] for ap in info_dict['antpairs']}
info_dict['times_by_bl'].update({(a2, a1): info_dict['times'] for (a1, a2) in info_dict['antpairs']})
if not skip_lsts:
info_dict['lsts'] = JD2LST(info_dict['times'], info_dict['latitude'], info_dict['longitude'], info_dict['altitude'])
info_dict['lsts_by_bl'] = {ap: info_dict['lsts'] for ap in info_dict['antpairs']}

def _HERAData_error(self, *args, **kwargs):
raise NotImplementedError('HERADataFastReader does not support this method. Try HERAData instead.')

def read(self, bls=None, pols=None, full_read_thresh=0.002, read_data=True, read_flags=True,
def read(self, bls=None, pols=None, keep_times: list[float] | None = None,
full_read_thresh=0.002, read_data=True, read_flags=True,
read_nsamples=True, check=False, dtype=np.complex128, verbose=False, skip_lsts=False):
'''A faster read that only concatenates along the time axis. Puts times in ascending order, but does not
check that files are contiguous. Currently not BDA compatible.

Arguments:
bls: list of (ant_1, ant_2, [polstr]) tuples to read out of files. Default: all bls common to all files.
pols: list of pol strings to read out of files. Default: all, but is superceded by any polstrs listed in bls.
keep_times: list of times to read out of files. Default: all. Note: all times
are always read from the files, setting this only down-filters times
after reading.
full_read_thresh (0.002): fractional threshold for reading whole file instead of baseline by baseline.
read_data (bool, True): read data
read_flags (bool, True): read flags
Expand All @@ -1390,25 +1448,32 @@ def read(self, bls=None, pols=None, full_read_thresh=0.002, read_data=True, read
dtype (np.complex128): numpy datatype for output complex-valued arrays
verbose: print some progress messages.
skip_lsts (bool, False): save time by not computing LSTs from JDs

Returns:
data: DataContainer mapping baseline keys to complex visibility waterfalls (if read_data is True, else None)
flags: DataContainer mapping baseline keys to boolean flag waterfalls (if read_flags is True, else None)
nsamples: DataContainer mapping baseline keys to interger Nsamples waterfalls (if read_nsamples is True, else None)
'''
rv = read_hera_hdf5(self.filepaths, bls=bls, pols=pols, full_read_thresh=full_read_thresh,
read_data=read_data, read_flags=read_flags, read_nsamples=read_nsamples,
check=check, dtype=dtype, verbose=verbose)
rv = read_hera_hdf5(
self.filepaths, bls=bls, pols=pols, keep_times=keep_times,
full_read_thresh=full_read_thresh,
read_data=read_data, read_flags=read_flags, read_nsamples=read_nsamples,
check=check, dtype=dtype, verbose=verbose
)
self._adapt_metadata(rv['info'], skip_lsts=skip_lsts)
print("IN: ", rv['info']['antpairs'])

# make autocorrleations real by taking the abs, matches UVData._fix_autos()
# make autocorrelations real by taking the abs, matches UVData._fix_autos()
if 'data' in rv:
for bl in rv['data']:
if split_bl(bl)[0] == split_bl(bl)[1]:
rv['data'][bl] = np.abs(rv['data'][bl])

# construct datacontainers from result
return self._make_datacontainer(rv, 'data'), self._make_datacontainer(rv, 'flags'), self._make_datacontainer(rv, 'nsamples')
return (
self._make_datacontainer(rv, 'data'),
self._make_datacontainer(rv, 'flags'),
self._make_datacontainer(rv, 'nsamples')
)

def _make_datacontainer(self, rv, key='data'):
'''Converts outputs from read_hera_hdf5 to a more standard HERAData output.'''
Expand Down
Loading