REF: Reuse async loading logic for DataTree and open_groups (Fixes #11131) #11149
Alexxis222 wants to merge 50 commits into pydata:main from async-dtreec
Conversation
…o async-dtreec
Changes:
- Refactor open_datatree() to use zarr_sync() with async implementation for concurrent dataset and index creation across groups
- Add _open_datatree_from_stores_async() helper that opens datasets and creates indexes concurrently using asyncio.gather with a semaphore to limit concurrency (avoids deadlocks with stores like Icechunk)
- Add open_datatree_async() method for explicit async API
- Remove duplicate _maybe_create_default_indexes_async from zarr.py, now imports from api.py (single source of truth)

This significantly improves performance when opening DataTrees from high-latency storage backends (e.g., ~2 seconds vs sequential loading).
Remove the asyncio.Semaphore that was limiting concurrency to 10 concurrent operations. Investigation showed:
- Zarr already has built-in concurrency control (async.concurrency=10)
- The semaphore only applied to asyncio.to_thread() calls, not zarr I/O
- Removing it improves performance by ~30-40% (~2s -> ~1.2-1.4s)

The semaphore was defensive code for a problem that doesn't exist - zarr and icechunk handle their own concurrency limits internally.
The async implementation uses zarr.core.sync, which only exists in zarr v3. Add a conditional check using _zarr_v3() to:
- Use the async path with zarr_sync() for zarr v3 (concurrent loading)
- Fall back to sequential loading for zarr v2

This fixes CI failures on the min-versions environment, which uses zarr v2.
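As a rough, self-contained sketch of that version gate (the helpers below are stand-ins only; xarray's real check is `_zarr_v3()` in its zarr backend, and the real sync bridge is `zarr_sync()` rather than `asyncio.run()`):

```python
import asyncio
from importlib.metadata import PackageNotFoundError, version


def zarr_is_v3() -> bool:
    # Stand-in for xarray's _zarr_v3() check.
    try:
        return int(version("zarr").split(".")[0]) >= 3
    except PackageNotFoundError:
        return False


def open_all_groups(paths: list[str]) -> dict[str, str]:
    async def open_one(path: str) -> tuple[str, str]:
        await asyncio.sleep(0)  # placeholder for async store I/O
        return path, f"dataset at {path}"

    async def open_concurrently() -> dict[str, str]:
        return dict(await asyncio.gather(*(open_one(p) for p in paths)))

    if zarr_is_v3():
        # zarr v3: drive the async implementation (xarray uses zarr_sync() here).
        return asyncio.run(open_concurrently())
    # zarr v2: no zarr.core.sync available, fall back to a sequential loop.
    return {p: f"dataset at {p}" for p in paths}


print(open_all_groups(["/", "/child"]))
```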
…o async-dtreec
The previous commit (0ee2a73) removed the semaphore thinking zarr handles its own concurrency, but icechunk can deadlock when too many asyncio.to_thread() calls try to access it simultaneously. This was discovered when testing with larger stores (23+ groups) where all threads would start but never complete. The semaphore limits concurrent to_thread calls to 10, which prevents the deadlock while still providing significant performance benefits over sequential loading.
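A self-contained sketch of the restored pattern: a semaphore caps how many `asyncio.to_thread()` offloads are in flight, so a store that serializes access internally never sees an unbounded burst. All names here are illustrative stand-ins, not xarray's actual helpers.

```python
import asyncio
import time


def open_group_blocking(path: str) -> str:
    time.sleep(0.01)  # stand-in for blocking store I/O
    return f"dataset:{path}"


async def open_groups_bounded(paths: list[str], limit: int = 10) -> dict[str, str]:
    sem = asyncio.Semaphore(limit)

    async def open_one(path: str) -> tuple[str, str]:
        # At most `limit` to_thread offloads run concurrently.
        async with sem:
            return path, await asyncio.to_thread(open_group_blocking, path)

    return dict(await asyncio.gather(*(open_one(p) for p in paths)))


if __name__ == "__main__":
    groups = [f"/group_{i}" for i in range(23)]  # 23+ groups, as in the deadlock report
    print(len(asyncio.run(open_groups_bounded(groups))))
```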
- Add helper methods _build_group_members and _create_stores_from_members to reduce code duplication between sync and async store opening
- Use zarr_sync() to run async index creation in _datatree_from_backend_datatree for zarr engine, making open_datatree fully async behind the scenes
- Fix missing chunks validation and source encoding in open_datatree_async
- Add tests for chunks validation, source encoding, and chunks parameter
- Add type annotations to nested async functions in _datatree_from_backend_datatree to fix mypy annotation-unchecked notes breaking pytest-mypy-plugins tests
- Use os.path.join and os.path.normpath in test_async_source_encoding for cross-platform compatibility on Windows
Add type annotations to _maybe_create_default_indexes_async and its nested functions (load_var, create_index, _create) to satisfy mypy's annotation-unchecked checks. Also add Variable and Hashable imports to the TYPE_CHECKING block. This fixes pytest-mypy-plugins tests that were failing due to mypy emitting annotation-unchecked notes for untyped nested functions.
- Remove open_datatree_async() from api.py (public API)
- Remove open_datatree_async() from zarr.py (backend method)
- Keep internal async optimization in _datatree_from_backend_datatree()
- Use _zarr_v3() for proper zarr version check instead of ImportError
- Update tests to only test internal async functionality
- Add test to verify sync open_datatree uses async internally for zarr v3

The async optimization is now internal only - users call the sync open_datatree(), which automatically uses async index creation for zarr v3 backends.
Co-authored-by: Justus Magin <keewis@users.noreply.github.com>
Benchmarking showed async index creation provides no measurable benefit since it's CPU-bound work. Simplified to sync loop per reviewer feedback.
for more information, see https://pre-commit.ci
Co-authored-by: Justus Magin <keewis@users.noreply.github.com>
for more information, see https://pre-commit.ci
Co-authored-by: Justus Magin <keewis@users.noreply.github.com>
for more information, see https://pre-commit.ci
- Replace asyncio.gather with asyncio.TaskGroup for better error handling (cancels outstanding tasks on error; see the sketch after this list)
- Add max_concurrency parameter to open_datatree for controlling parallel I/O operations (defaults to 10)
- Add StoreBackendEntrypoint.open_dataset_async method
- Add test for open_dataset_async equivalence
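A minimal sketch of the `asyncio.gather` → `asyncio.TaskGroup` change described in the first item above (Python 3.11+): if one task fails, the TaskGroup cancels its still-pending siblings and re-raises the failure as an `ExceptionGroup`. Names are illustrative, not xarray's.

```python
import asyncio


async def load_group(path: str) -> tuple[str, str]:
    await asyncio.sleep(0.01)  # stand-in for async store I/O
    if path == "/broken":
        raise OSError(f"cannot read {path}")
    return path, f"dataset:{path}"


async def load_all(paths: list[str]) -> dict[str, str]:
    results: dict[str, str] = {}

    async def load_into_results(path: str) -> None:
        key, ds = await load_group(path)
        results[key] = ds

    # On error, remaining tasks are cancelled instead of running to completion.
    async with asyncio.TaskGroup() as tg:
        for path in paths:
            tg.create_task(load_into_results(path))
    return results


if __name__ == "__main__":
    print(asyncio.run(load_all(["/", "/child_a", "/child_b"])))
```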
Force-pushed from 7148848 to a07e5d8
Force-pushed from a07e5d8 to c204229
at the moment it's pretty hard to review this, so I think we should try to merge #10742 first and rebase on main

Sounds good. I'll put this on hold until #10742 is merged, and then I'll rebase on main. Thanks for the guidance!
Pull request overview
Refactors Zarr async group loading so open_datatree and async group-opening share a single, more robust execution path (bounded concurrency, TaskGroup, and async dataset opening/indexing), and exposes a max_concurrency control to callers.
Changes:
- Added `StoreBackendEntrypoint.open_dataset_async()` and `ZarrStore.open_store_async()` to support non-blocking async loading.
- Extracted the shared async group-opening core into `ZarrBackendEntrypoint._open_groups_from_stores_async()` and wired it into `open_datatree` and `open_groups_as_dict_async`.
- Added a `max_concurrency` parameter to `open_datatree`, plus a changelog entry and new async-focused tests (see the usage sketch below).
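If the API lands as summarized above, user-facing calls would look roughly like this sketch. Only `max_concurrency` is the new knob from this PR; the store path is a placeholder and the default of 10 is taken from the commit messages above, so treat this as a proposed-API illustration rather than documented behavior.

```python
import xarray as xr

# Proposed usage (not yet in released xarray): cap concurrent group loads
# when opening a DataTree from a high-latency zarr store.
tree = xr.open_datatree(
    "s3://example-bucket/example-store.zarr",  # placeholder store path
    engine="zarr",
    max_concurrency=10,  # parameter proposed in this PR; defaults to 10
)
print(tree.groups)
```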
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
Summary per file:
| File | Description |
|---|---|
| xarray/backends/zarr.py | Adds async store discovery and shared async group-opening core; wires concurrency-limited loading into open_datatree / open_groups_as_dict_async. |
| xarray/backends/store.py | Introduces open_dataset_async() via asyncio.to_thread() to avoid blocking the event loop. |
| xarray/backends/api.py | Adds async default-index creation helper and uses it for zarr-v3 datatree index creation; threads max_concurrency into backend kwargs. |
| xarray/tests/test_backends_zarr_async.py | Adds targeted tests for async zarr group opening and async default-index creation. |
| doc/whats-new.rst | Documents new open_datatree(max_concurrency=...) feature and performance note. |
Context:

    return found_groups

    # Start discovery from rootshalgive
Typo in comment: "rootshalgive" looks accidental and should be removed or corrected (e.g., "root").
Suggested change:

    - # Start discovery from rootshalgive
    + # Start discovery from root
| """Async version of open_store using flat group discovery. | ||
|
|
||
| This method uses store.list() to discover all groups in a single | ||
| async call, which is significantly faster than recursive traversal | ||
| for stores that support listing (like icechunk). |
open_store_async docstring claims group discovery happens via a single store.list() call, but the implementation uses list_dir plus per-directory store.get calls (and recursive traversal). Please update the docstring to reflect the actual behavior so callers have accurate performance expectations.
| """Async version of open_store using flat group discovery. | |
| This method uses store.list() to discover all groups in a single | |
| async call, which is significantly faster than recursive traversal | |
| for stores that support listing (like icechunk). | |
| """Async version of open_store using asynchronous group discovery. | |
| This method discovers all groups by recursively traversing the store, | |
| issuing asynchronous directory listing calls (for example via | |
| store.list_dir when available) and per-directory store.get calls. | |
| As a result, group discovery may involve multiple asynchronous | |
| operations and round-trips, especially for large or deeply nested | |
| hierarchies. |
Context:

    # Run all subdirectory checks in parallel
    results = await asyncio.gather(*[check_subdir(sd) for sd in subdirs])
_iter_zarr_groups_async uses asyncio.gather over every immediate subdirectory (results = await asyncio.gather(...)) with no concurrency bound. For stores with many directories/groups this can trigger a very large burst of concurrent metadata reads. Consider adding a semaphore / bounded gather similar to the max_concurrency approach used elsewhere.
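One possible shape for the bounded gather this comment suggests, as a standalone sketch; the helper name `bounded_gather` and the `check_subdir` stand-in are mine, not part of xarray or zarr.

```python
import asyncio
from collections.abc import Awaitable, Iterable
from typing import TypeVar

T = TypeVar("T")


async def bounded_gather(aws: Iterable[Awaitable[T]], limit: int) -> list[T]:
    """Like asyncio.gather, but with at most `limit` awaitables running at once."""
    sem = asyncio.Semaphore(limit)

    async def run_one(aw: Awaitable[T]) -> T:
        async with sem:
            return await aw

    return await asyncio.gather(*(run_one(aw) for aw in aws))


async def check_subdir(name: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for a per-directory metadata read
    return name


if __name__ == "__main__":
    subdirs = [f"group_{i}" for i in range(100)]
    found = asyncio.run(bounded_gather((check_subdir(s) for s in subdirs), limit=10))
    print(len(found))
```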
Context:

    - Added ``max_concurrency`` parameter to :py:func:`open_datatree` to control
      the maximum number of concurrent I/O operations when opening groups in parallel
      with the Zarr backend (:pull:`10742`).
      By `Alfonso Ladino <https://github.com/aladinor>`_.
This whats-new entry attributes the new open_datatree(max_concurrency=...) parameter to :pull:10742, but this PR appears to be the one introducing that public API. Please update the referenced pull number to the current PR so the changelog accurately reflects where the feature was added.
Context:

    - Improve performance of :py:func:`open_datatree` for zarr stores by using async/concurrent
      loading of groups and indexes (:pull:`10742`).
      By `Alfonso Ladino <https://github.com/aladinor>`_.
This performance note references :pull:10742, but the async/concurrent loading behavior and the new max_concurrency control are being changed in this PR. Please confirm the correct PR reference here (likely the current PR) so release notes stay accurate.
Context:

    # Create indexes concurrently
    ds = await _maybe_create_default_indexes_async(ds)
    if group:
_open_groups_from_stores_async unconditionally calls _maybe_create_default_indexes_async, which means indexes will be created even when the public open_datatree/open_groups callers pass create_default_indexes=False (index creation is supposed to be controlled in backends/api.py). This also makes open_groups_as_dict_async diverge from open_groups_as_dict (sync) behavior. Consider removing index creation from this backend helper (let the API layer handle it), or add an explicit flag to control it and ensure create_default_indexes=False yields datasets with empty xindexes.
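A sketch of the second option in this comment: give the shared helper an explicit `create_default_indexes` flag so the API layer keeps control. The signature and the dict-based "dataset" below are hypothetical; only the idea of plumbing the flag through comes from the comment.

```python
import asyncio
from typing import Any


async def open_groups_from_stores(
    stores: dict[str, Any],
    *,
    create_default_indexes: bool = True,  # hypothetical flag plumbed down from backends/api.py
) -> dict[str, dict[str, Any]]:
    async def open_one(path: str, store: Any) -> tuple[str, dict[str, Any]]:
        ds: dict[str, Any] = {"store": store, "xindexes": {}}  # stand-in for a Dataset
        if create_default_indexes:
            # Index creation is opt-in, so create_default_indexes=False leaves
            # xindexes empty, matching the sync open_groups_as_dict path.
            ds["xindexes"] = {"x": "default index"}
        return path, ds

    return dict(await asyncio.gather(*(open_one(p, s) for p, s in stores.items())))


if __name__ == "__main__":
    out = asyncio.run(open_groups_from_stores({"/": None}, create_default_indexes=False))
    print(out["/"]["xindexes"])  # {} -> no default indexes were created
```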
Context:

    from xarray.backends.api import _maybe_create_default_indexes_async

    if max_concurrency is None:
        max_concurrency = 10
max_concurrency is used to construct asyncio.Semaphore(max_concurrency) but there is no validation for non-positive values. Passing 0 (or a negative int) will deadlock tasks waiting on the semaphore. Please validate that max_concurrency is None or >= 1 and raise a clear ValueError otherwise.
Suggested change:

    -    max_concurrency = 10
    +    max_concurrency = 10
    +elif max_concurrency < 1:
    +    raise ValueError(
    +        f"max_concurrency must be None or an integer >= 1, got {max_concurrency!r}"
    +    )
Context:

    async def create_indexes_async() -> dict[str, Dataset]:
        import asyncio

        results: dict[str, Dataset] = {}
        tasks = [
            _create_index_for_node(path, node.dataset)
            for path, [node] in group_subtrees(backend_tree)
        ]
        for fut in asyncio.as_completed(tasks):
            path, ds = await fut
            results[path] = ds
        return results
The zarr-v3 async index creation path schedules one task per node without any concurrency limit. For large DataTrees this can create a very large number of simultaneous loads and thread offloads, which can overwhelm the event loop / threadpool and negate the benefit of max_concurrency. Consider bounding concurrency here as well (e.g., reuse a semaphore and/or plumb max_concurrency through), similar to the backend’s bounded loading.
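A sketch of what bounding that loop could look like: the same as_completed shape, with a semaphore sized by `max_concurrency` wrapped around each per-node task. Apart from that pattern, every name below (including the fake `create_index_for_node`) is an illustrative stand-in.

```python
import asyncio


async def create_index_for_node(path: str, dataset: str) -> tuple[str, str]:
    await asyncio.sleep(0.01)  # stand-in for loading coordinates / building indexes
    return path, f"{dataset}+indexes"


async def create_indexes_bounded(
    nodes: dict[str, str], max_concurrency: int = 10
) -> dict[str, str]:
    sem = asyncio.Semaphore(max_concurrency)

    async def bounded(path: str, dataset: str) -> tuple[str, str]:
        # No more than max_concurrency index-creation tasks run at once.
        async with sem:
            return await create_index_for_node(path, dataset)

    results: dict[str, str] = {}
    tasks = [asyncio.create_task(bounded(p, d)) for p, d in nodes.items()]
    for fut in asyncio.as_completed(tasks):
        path, ds = await fut
        results[path] = ds
    return results


if __name__ == "__main__":
    tree = {f"/group_{i}": f"dataset_{i}" for i in range(50)}
    print(len(asyncio.run(create_indexes_bounded(tree))))
```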
Fixes #11131.
Previously, `_open_datatree_from_stores_async` (robust) and `open_groups_as_dict_async` (naive) contained duplicated logic with inconsistent behavior. The former handled semaphores and async indexing correctly, while the latter did not.

Changes
- Extracted the robust async loading logic (`TaskGroup`, `open_dataset_async`, `_maybe_create_default_indexes_async`) into a shared private method: `_open_groups_from_stores_async`.
- Refactored `open_groups_as_dict_async` to open stores asynchronously and then delegate to the shared core.
- Refactored `open_datatree` to delegate to the shared core (via `zarr_sync`), removing the need for `_open_datatree_from_stores_async`.

Implementation Details
I chose to extract a shared `_open_groups_from_stores_async` instead of making one public function call the other directly. This allows `open_datatree` (which holds pre-opened sync stores) and `open_groups_as_dict_async` (which opens stores internally) to share the exact same execution engine.
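A structural sketch of the extraction described above: both entry points funnel into one private async core, and the sync path drives it with an event-loop runner. The real code would use `zarr_sync()` and actual stores/datasets; `asyncio.run()` and the placeholder bodies here just keep the example self-contained.

```python
import asyncio
from typing import Any


async def open_groups_from_stores_async(stores: dict[str, Any]) -> dict[str, str]:
    # Shared execution engine: bounded concurrency, async dataset opening and
    # async index creation would all live here (placeholder body).
    return {path: f"dataset for {path}" for path in stores}


async def open_groups_as_dict_async(filename: str) -> dict[str, str]:
    stores = {"/": filename, "/child": filename}  # placeholder for async store discovery
    return await open_groups_from_stores_async(stores)


def open_datatree(filename: str) -> dict[str, str]:
    stores = {"/": filename, "/child": filename}  # placeholder for pre-opened sync stores
    # The sync entry point delegates to the same async core (xarray: zarr_sync()).
    return asyncio.run(open_groups_from_stores_async(stores))


if __name__ == "__main__":
    print(open_datatree("store.zarr"))
    print(asyncio.run(open_groups_as_dict_async("store.zarr")))
```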
Verification
Ran `pytest xarray/tests/test_backends.py xarray/tests/test_datatree.py -k "zarr or async"`.