Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load esmvalcore.dataset.Dataset objects in parallel using Dask #2517

Open
wants to merge 15 commits into
base: main
Choose a base branch
from

Conversation

bouweandela
Copy link
Member

@bouweandela bouweandela commented Sep 9, 2024

Description

Load the individual files in a dataset in parallel using Dask and add the option to get a dask.delayed.Delayed back from esmvalcore.dataset.Dataset that can be fed to dask.compute to get an iris.cube.Cube. This can considerably speed up loading datasets that consist of many files or, when used with the delayed option, speed up loading multiple datasets.

Related to #2300 and #2316

Link to documentation: https://esmvaltool--2517.org.readthedocs.build/projects/ESMValCore/en/2517/api/esmvalcore.dataset.html#esmvalcore.dataset.Dataset.load


Before you get started

Checklist

It is the responsibility of the author to make sure the pull request is ready to review. The icons indicate whether the item will be subject to the 🛠 Technical or 🧪 Scientific review.


To help with the number pull requests:

@bouweandela bouweandela added the dask related to improvements using Dask label Sep 9, 2024
Copy link

codecov bot commented Sep 9, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 94.75%. Comparing base (4c36a0c) to head (c7da0a9).

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2517   +/-   ##
=======================================
  Coverage   94.74%   94.75%           
=======================================
  Files         252      252           
  Lines       14509    14521   +12     
=======================================
+ Hits        13747    13759   +12     
  Misses        762      762           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@valeriupredoi valeriupredoi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is brilliant, bud! I've been meaning to get delayed in places in Core for some time. Got one possible nagging comment through - from https://docs.dask.org/en/stable/delayed-best-practices.html they say "Every delayed task has an overhead of a few hundred microseconds. Usually this is ok, but it can become a problem if you apply dask.delayed too finely. In this case, it’s often best to break up your many tasks into batches or use one of the Dask collections to help you." - I am guessing this applies to O(millions) (at least) but can we maybe run a test with one of those mega recipes that loads hundreds of datasets?

@valeriupredoi
Copy link
Contributor

oh and maybe a line or two in the documentation perhaps? Bit of an advanced topic, so maybe a very short reference

@bouweandela
Copy link
Member Author

bouweandela commented Oct 23, 2024

This may need a bit more testing. The recipe below fails with the distributed scheduler and the iris main branch.

# ESMValTool
# recipe_python.yml
#
# See https://docs.esmvaltool.org/en/latest/recipes/recipe_examples.html
# for a description of this recipe.
#
# See https://docs.esmvaltool.org/projects/esmvalcore/en/latest/recipe/overview.html
# for a description of the recipe format.
---
documentation:
  description: |
    Example recipe that plots a map and timeseries of temperature.

  title: Recipe that runs an example diagnostic written in Python.

  authors:
    - andela_bouwe
    - righi_mattia

  maintainer:
    - schlund_manuel

  references:
    - acknow_project

  projects:
    - esmval
    - c3s-magic

datasets:
  - {dataset: FGOALS-f3-L, ensemble: 'r1i1p1f1', grid: gn}

preprocessors:
  # See https://docs.esmvaltool.org/projects/esmvalcore/en/latest/recipe/preprocessor.html
  # for a description of the preprocessor functions.

  annual_mean_global:
    area_statistics:
      operator: mean
    annual_statistics:
      operator: mean
    convert_units:
      units: degrees_C

diagnostics:

  timeseries:
    description: Annual mean temperature in Amsterdam and global mean since 1850.
    themes:
      - phys
    realms:
      - atmos
    variables:
      tos_global:
        short_name: tos
        mip: Omon
        project: CMIP6
        exp: [historical, ssp585]
        preprocessor: annual_mean_global
        timerange: 1850/2100
        caption: Annual global mean {long_name} according to {dataset}.
    scripts:
      script1:
        script: examples/diagnostic.py
        quickplot:
          plot_type: plot

@bouweandela
Copy link
Member Author

This issue mentioned above is fixed by SciTools/iris#6187.

Copy link
Contributor

@schlunma schlunma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I fear that this will break existing recipes due to changes to the preprocessing pipeline. Will remove this block once resolved.

@bouweandela
Copy link
Member Author

Sorry, I fear that this will break existing recipes due to changes to the preprocessing pipeline. Will remove this block once resolved.

Thanks for reviewing @schlunma! As far as I can see it does not change the preprocessing pipeline, but maybe you can find a case where it does? Maybe you could run a recipe that you think could potentially be broken as a test and report back the result?

@schlunma
Copy link
Contributor

schlunma commented Dec 3, 2024

I tested recipe_schlund20esd.yml with this (using a Dask distributed scheduler):

esmvaltool run ~/ESMValTool/esmvaltool/recipes/recipe_schlund20esd.yml --search_esgf=when_missing

This fails with ValueError: Incompatible units: delta and surface_air_pressure must have the same units.

In the main branch it runs successfully.

2024-12-03 16:16:05,162 UTC [2888825] ERROR   Program terminated abnormally, see stack trace below for more information:                                                                                                                                                      
multiprocessing.pool.RemoteTraceback:                                                                                                                                                                                                                                         
"""                                                                                                                                                                                                                                                                           
Traceback (most recent call last):                                                                                                                                                                                                                                            
  File "/work/bd0854/b309141/micromamba/envs/esm/lib/python3.12/multiprocessing/pool.py", line 125, in worker                                                                                                                                                                 
    result = (True, func(*args, **kwds))                                                                                                                                                                                                                                      
                    ^^^^^^^^^^^^^^^^^^^                                                                                                                                                                                                                                       
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/_task.py", line 911, in _run_task                                                                                                                                                                                         
    output_files = task.run()                                                                                                                                                                                                                                                 
                   ^^^^^^^^^^                                                                                                                                                                                                                                                 
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/_task.py", line 291, in run                                                                                                                                                                                               
    self.output_files = self._run(input_files)                                                                                                                                                                                                                                
                        ^^^^^^^^^^^^^^^^^^^^^^                                                                                                                                                                                                                                
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/preprocessor/__init__.py", line 726, in _run                                                                                                                                                                              
    product.apply(step, self.debug)                                                                                                                                                                                                                                           
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/preprocessor/__init__.py", line 524, in apply                                                                                                                                                                             
    self.cubes = preprocess(                                                                                                                                                                                                                                                  
                 ^^^^^^^^^^^                                                                                                                                                                                                                                                  
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/preprocessor/__init__.py", line 428, in preprocess                                                                                                                                                                        
    _run_preproc_function(                                                                                                                                                                                                                                                    
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/preprocessor/__init__.py", line 366, in _run_preproc_function                                                                                                                                                             
    return function(items, **kwargs)                                                                                                                                                                                                                                          
           ^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                                                                                                                                                          
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/preprocessor/_shared.py", line 237, in wrapper                                                                                                                                                                            
    result = func(data, *args, **kwargs)                                                                                                                                                                                                                                      
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                                                                                                                                                      
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/preprocessor/_regrid.py", line 879, in regrid                                                                                                                                                                             
    cube = regridder(cube)                                                                                                                                                                                                                                                    
           ^^^^^^^^^^^^^^^                                                                                                                                                                                                                                                    
  File "/work/bd0854/b309141/micromamba/envs/esm/lib/python3.12/site-packages/iris/analysis/_regrid.py", line 983, in __call__                                                                                                                                                
    result = _create_cube(                                                                                                                                                                                                                                                    
             ^^^^^^^^^^^^^                                                                                                                                                                                                                                                    
  File "/work/bd0854/b309141/micromamba/envs/esm/lib/python3.12/site-packages/iris/analysis/_regrid.py", line 1120, in _create_cube                                                                                                                                           
    result.add_aux_factory(factory.updated(coord_mapping))                                                                                                                                                                                                                    
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                                                                                                                                     
  File "/work/bd0854/b309141/micromamba/envs/esm/lib/python3.12/site-packages/iris/aux_factory.py", line 176, in updated                                                                                                                                                      
    return type(self)(**new_dependencies)                                                                                                                                                                                                                                     
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                                                                                                                                                     
  File "/work/bd0854/b309141/micromamba/envs/esm/lib/python3.12/site-packages/iris/aux_factory.py", line 738, in __init__                                                                                                                                                     
    self._check_dependencies(delta, sigma, surface_air_pressure)                                                                                                                                                                                                              
  File "/work/bd0854/b309141/micromamba/envs/esm/lib/python3.12/site-packages/iris/aux_factory.py", line 790, in _check_dependencies                                                                                                                                          
    raise ValueError(msg)
ValueError: Incompatible units: delta and surface_air_pressure must have the same units.
"""

The above exception was the direct cause of the following exception:                                                                                                                                                                                                          
                                                                                                                                                                                                                                                                              
Traceback (most recent call last):                                                                                                                                                                                                                                            
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/_main.py", line 618, in run                                                                                                                                                                                               
    fire.Fire(ESMValTool())                                                                                                                                                                                                                                                   
  File "/work/bd0854/b309141/micromamba/envs/esm/lib/python3.12/site-packages/fire/core.py", line 135, in Fire                                                                                                                                                                
    component_trace = _Fire(component, args, parsed_flag_args, context, name)                                                                                                                                                                                                 
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                                                                                                                 
  File "/work/bd0854/b309141/micromamba/envs/esm/lib/python3.12/site-packages/fire/core.py", line 468, in _Fire                                                                                                                                                               
    component, remaining_args = _CallAndUpdateTrace(                                                                                                                                                                                                                          
                                ^^^^^^^^^^^^^^^^^^^^                                                                                                                                                                                                                          
  File "/work/bd0854/b309141/micromamba/envs/esm/lib/python3.12/site-packages/fire/core.py", line 684, in _CallAndUpdateTrace                                                                                                                                                 
    component = fn(*varargs, **kwargs)                                                                                                                                                                                                                                        
                ^^^^^^^^^^^^^^^^^^^^^^                                                                                                                                                                                                                                        
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/_main.py", line 446, in run                                                                                                                                                                                               
    self._run(recipe, session, cli_config_dir)                                                                                                                                                                                                                                
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/_main.py", line 501, in _run                                                                                                                                                                                              
    process_recipe(recipe_file=recipe, session=session)                                                                                                                                                                                                                       
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/_main.py", line 143, in process_recipe                                                                                                                                                                                    
    recipe.run()                                                                                                                                                                                                                                                              
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/_recipe/recipe.py", line 1163, in run                                                                                                                                                                                     
    self.tasks.run(max_parallel_tasks=self.session["max_parallel_tasks"])                                                                                                                                                                                                     
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/_task.py", line 827, in run                                                                                                                                                                                               
    self._run_parallel(address, max_parallel_tasks)                                                                                                                                                                                                                           
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/_task.py", line 873, in _run_parallel                                                                                                                                                                                     
    _copy_results(task, running[task])                                                                                                                                                                                                                                        
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/_task.py", line 900, in _copy_results                                                                                                                                                                                     
    task.output_files, task.products = future.get()                                                                                                                                                                                                                           
                                       ^^^^^^^^^^^^                                                                                                                                                                                                                           
  File "/work/bd0854/b309141/micromamba/envs/esm/lib/python3.12/multiprocessing/pool.py", line 774, in get
    raise self._value
  File "/work/bd0854/b309141/micromamba/envs/esm/lib/python3.12/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
    ^^^^^^^^^^^^^^^^^
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/_task.py", line 911, in _run_task
    output_files = task.run()
    ^^^^^^^^^^^^^^^^^
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/_task.py", line 291, in run
    self.output_files = self._run(input_files)
    ^^^^^^^^^^^^^^^^^
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/preprocessor/__init__.py", line 726, in _run
    product.apply(step, self.debug)
    ^^^^^^^^^^^^^^^^^
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/preprocessor/__init__.py", line 524, in apply
    self.cubes = preprocess(
    ^^^^^^^^^^^^^^^^^
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/preprocessor/__init__.py", line 428, in preprocess
    _run_preproc_function(
    ^^^^^^^^^^^^^^^^^
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/preprocessor/__init__.py", line 366, in _run_preproc_function
    return function(items, **kwargs)
    ^^^^^^^^^^^^^^^^^
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/preprocessor/_shared.py", line 237, in wrapper
    result = func(data, *args, **kwargs)
    ^^^^^^^^^^^^^^^^^
  File "/home/b/b309141/repos/ESMValCore/esmvalcore/preprocessor/_regrid.py", line 879, in regrid
    cube = regridder(cube)
      ^^^^^^^^^^^^^^^^^
  File "/work/bd0854/b309141/micromamba/envs/esm/lib/python3.12/site-packages/iris/analysis/_regrid.py", line 983, in __call__
    result = _create_cube(
    ^^^^^^^^^^^^^^^^^
  File "/work/bd0854/b309141/micromamba/envs/esm/lib/python3.12/site-packages/iris/analysis/_regrid.py", line 1120, in _create_cube
    result.add_aux_factory(factory.updated(coord_mapping))
    ^^^^^^^^^^^^^^^^^
  File "/work/bd0854/b309141/micromamba/envs/esm/lib/python3.12/site-packages/iris/aux_factory.py", line 176, in updated
    return type(self)(**new_dependencies)
    ^^^^^^^^^^^^^^^^^
  File "/work/bd0854/b309141/micromamba/envs/esm/lib/python3.12/site-packages/iris/aux_factory.py", line 738, in __init__
    self._check_dependencies(delta, sigma, surface_air_pressure)
    ^^^^^^^^^^^^^^^^^
  File "/work/bd0854/b309141/micromamba/envs/esm/lib/python3.12/site-packages/iris/aux_factory.py", line 790, in _check_dependencies
    raise ValueError(msg)
    ^^^^^^^^^^^^^^^^^
ValueError: Incompatible units: delta and surface_air_pressure must have the same units.

Full log:
job_14076031.out.log

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backwards incompatible change dask related to improvements using Dask
Projects
No open projects
Development

Successfully merging this pull request may close these issues.

3 participants