# REF: Reuse async loading logic for DataTree and open_groups (Fixes #11131) (#11149)
## Changes from all commits
**`doc/whats-new.rst`**

```diff
@@ -14,6 +14,10 @@ v2026.03.0 (unreleased)

 New Features
 ~~~~~~~~~~~~

+- Added ``max_concurrency`` parameter to :py:func:`open_datatree` to control
+  the maximum number of concurrent I/O operations when opening groups in parallel
+  with the Zarr backend (:pull:`10742`).
+  By `Alfonso Ladino <https://github.com/aladinor>`_.
+
 Breaking Changes
 ~~~~~~~~~~~~~~~~

@@ -239,6 +243,9 @@ Documentation

 Performance
 ~~~~~~~~~~~

+- Improve performance of :py:func:`open_datatree` for zarr stores by using async/concurrent
+  loading of groups and indexes (:pull:`10742`).
+  By `Alfonso Ladino <https://github.com/aladinor>`_.
 - Add a fastpath to the backend plugin system for standard engines (:issue:`10178`, :pull:`10937`).
   By `Sam Levang <https://github.com/slevang>`_.
 - Optimize :py:class:`~xarray.coding.variables.CFMaskCoder` decoder (:pull:`11105`).
```

*(A review comment targets lines +246 to +248; see the end of this page.)*
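For context, this is how the new keyword is meant to be used; a minimal sketch, assuming a hypothetical nested Zarr store:

```python
import xarray as xr

# Open a nested Zarr store as a DataTree while capping how many groups
# are loaded concurrently (the store path here is hypothetical).
tree = xr.open_datatree(
    "s3://example-bucket/nested.zarr",
    engine="zarr",
    max_concurrency=4,
)
```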
**`xarray/backends/api.py`**

```diff
@@ -65,6 +65,7 @@
     NestedSequence,
     T_Chunks,
 )
+from xarray.core.variable import Variable

 T_NetcdfEngine = Literal["netcdf4", "scipy", "h5netcdf"]
 T_Engine = Union[
```
```diff
@@ -349,7 +350,37 @@ def _datatree_from_backend_datatree(
     _protect_datatree_variables_inplace(backend_tree, cache)
     if create_default_indexes:
-        tree = backend_tree.map_over_datasets(_maybe_create_default_indexes)
+        _use_zarr_async = False
+        if engine == "zarr":
+            from xarray.backends.zarr import _zarr_v3
+
+            _use_zarr_async = _zarr_v3()
+
+        if _use_zarr_async:
+            from zarr.core.sync import sync as zarr_sync
+
+            async def create_indexes_async() -> dict[str, Dataset]:
+                import asyncio
+
+                results: dict[str, Dataset] = {}
+                tasks = [
+                    _create_index_for_node(path, node.dataset)
+                    for path, [node] in group_subtrees(backend_tree)
+                ]
+                for fut in asyncio.as_completed(tasks):
+                    path, ds = await fut
+                    results[path] = ds
+                return results
+
+            async def _create_index_for_node(
+                path: str, ds: Dataset
+            ) -> tuple[str, Dataset]:
+                return path, await _maybe_create_default_indexes_async(ds)
+
+            results = zarr_sync(create_indexes_async())
+            tree = DataTree.from_dict(results, name=backend_tree.name)
+        else:
+            tree = backend_tree.map_over_datasets(_maybe_create_default_indexes)
     else:
         tree = backend_tree
     if chunks is not None:
```

*(A review comment targets lines +362 to +373.)*
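For readers unfamiliar with the fan-out pattern used in `create_indexes_async`, here is a standalone sketch of the same idea: schedule one coroutine per group and collect results as they finish. The `fetch_group` coroutine and its paths are illustrative stand-ins, not xarray APIs:

```python
import asyncio


async def fetch_group(path: str, delay: float) -> tuple[str, str]:
    # Stand-in for asynchronously loading one group's coordinate data.
    await asyncio.sleep(delay)
    return path, f"dataset for {path}"


async def load_all_groups() -> dict[str, str]:
    tasks = [fetch_group("/a", 0.2), fetch_group("/b", 0.1), fetch_group("/a/c", 0.1)]
    results: dict[str, str] = {}
    # as_completed yields awaitables in completion order, not submission
    # order, so results are keyed by path and reassembled afterwards
    # (the PR does the same with DataTree.from_dict).
    for fut in asyncio.as_completed(tasks):
        path, ds = await fut
        results[path] = ds
    return results


print(asyncio.run(load_all_groups()))
```

The PR drives its coroutine with `zarr.core.sync.sync` rather than `asyncio.run`, presumably so the index loading reuses Zarr's own event-loop management and behaves consistently with the rest of Zarr v3's async I/O.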
```diff
@@ -386,6 +417,36 @@ def _datatree_from_backend_datatree(
     return tree


+async def _maybe_create_default_indexes_async(ds: Dataset) -> Dataset:
+    """Create default indexes for dimension coordinates asynchronously.
+
+    This function parallelizes both data loading and index creation,
+    which can significantly speed up opening datasets with many coordinates.
+    """
+    import asyncio
+
+    to_index_names = [
+        name
+        for name, coord in ds.coords.items()
+        if coord.dims == (name,) and name not in ds.xindexes
+    ]
+
+    if not to_index_names:
+        return ds
+
+    async def load_var(var: Variable) -> Variable:
+        try:
+            return await var.load_async()
+        except NotImplementedError:
+            return await asyncio.to_thread(var.load)
+
+    await asyncio.gather(*[load_var(ds.variables[name]) for name in to_index_names])
+
+    variables = {name: ds.variables[name] for name in to_index_names}
+    new_coords = Coordinates(variables)
+    return ds.assign_coords(new_coords)
+
+
 def open_dataset(
     filename_or_obj: T_PathFileOrDataStore,
     *,
```
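The `load_var` helper prefers native async loading and degrades gracefully for backends that do not support it. The same defensive pattern in isolation (the `LegacyVar` class is a made-up stand-in, not an xarray type):

```python
import asyncio


class LegacyVar:
    """Stand-in for a variable whose backend lacks async loading."""

    def load(self) -> str:
        return "loaded in a worker thread"

    async def load_async(self) -> str:
        raise NotImplementedError


async def load_any(var: LegacyVar) -> str:
    try:
        # Prefer native async I/O when the backend implements it.
        return await var.load_async()
    except NotImplementedError:
        # Otherwise run the blocking load() in a worker thread so the
        # event loop stays free to service other variables.
        return await asyncio.to_thread(var.load)


print(asyncio.run(load_any(LegacyVar())))
```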
```diff
@@ -882,6 +943,7 @@ def open_datatree(
     chunked_array_type: str | None = None,
     from_array_kwargs: dict[str, Any] | None = None,
     backend_kwargs: dict[str, Any] | None = None,
+    max_concurrency: int | None = None,
     **kwargs,
 ) -> DataTree:
     """
```
```diff
@@ -1014,6 +1076,13 @@ def open_datatree(
         chunked arrays, via whichever chunk manager is specified through the `chunked_array_type` kwarg.
         For example if :py:func:`dask.array.Array` objects are used for chunking, additional kwargs will be passed
         to :py:func:`dask.array.from_array`. Experimental API that should not be relied upon.
+    max_concurrency : int, optional
+        Maximum number of concurrent I/O operations when opening groups in
+        parallel. This limits the number of groups that are loaded simultaneously.
+        Useful for controlling resource usage with large datatrees or stores
+        that may have limitations on concurrent access (e.g., icechunk).
+        Only used by backends that support parallel loading (currently Zarr v3).
+        If None (default), the backend uses its default value (typically 10).
     backend_kwargs: dict
         Additional keyword arguments passed on to the engine open function,
         equivalent to `**kwargs`.
```
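The docstring does not pin down how the backend enforces the cap; a semaphore is the usual mechanism, sketched below under that assumption (the actual Zarr backend implementation may differ):

```python
import asyncio
from collections.abc import Awaitable, Iterable
from typing import TypeVar

T = TypeVar("T")


async def bounded_gather(
    coros: Iterable[Awaitable[T]], max_concurrency: int = 10
) -> list[T]:
    # At most `max_concurrency` coroutines hold the semaphore at once;
    # the rest wait their turn before starting their awaited work.
    sem = asyncio.Semaphore(max_concurrency)

    async def run(coro: Awaitable[T]) -> T:
        async with sem:
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))
```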
```diff
@@ -1074,6 +1143,9 @@ def open_datatree(
     )
     overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)

+    if max_concurrency is not None:
+        kwargs["max_concurrency"] = max_concurrency
+
     backend_tree = backend.open_datatree(
         filename_or_obj,
         drop_variables=drop_variables,
```
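A note on the design: forwarding `max_concurrency` through `kwargs` only when the caller actually sets it means engines that do not accept the keyword never receive it, so the new parameter stays backward compatible with third-party backends.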
**Review comment** on `doc/whats-new.rst`, lines +246 to +248:

> This whats-new entry attributes the new `open_datatree(max_concurrency=...)` parameter to :pull:`10742`, but this PR appears to be the one introducing that public API. Please update the referenced pull number to the current PR so the changelog accurately reflects where the feature was added.