Awkward Queries #307

Open
BenGalewsky opened this issue Sep 13, 2023 · 4 comments

@BenGalewsky
Contributor

As an analyzer, I want to specify my ServiceX queries using awkward syntax, so I can perform row-level cuts without learning a new language.

Description

We will use dask-awkward to build a task graph for the selections, along with its necessary_columns function to determine which properties to include in the results. The graph will then be translated into Qastle and passed on to the code generators.
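
To make the "find the columns, then translate" step concrete, here is a minimal stdlib-only sketch. It does not use dask-awkward or qastle. Instead, the hypothetical helpers necessary_columns and to_sexpr parse an awkward-style cut with Python's ast module, collect the attribute paths read below events, and render the expression as a qastle-like s-expression. The node spellings (attr, call, keyword) are an assumption loosely modeled on qastle and have not been checked against its specification.

```python
import ast


def attribute_path(node):
    """Return a dotted path like 'events.Muon.pt', or None if not rooted at a name."""
    parts = []
    while isinstance(node, ast.Attribute):
        parts.append(node.attr)
        node = node.value
    if not isinstance(node, ast.Name):
        return None
    parts.append(node.id)
    return ".".join(reversed(parts))


def necessary_columns(expr, root="events"):
    """Hypothetical helper: collect the columns below `root` that an expression reads."""
    columns = set()

    def visit(node):
        if isinstance(node, ast.Attribute):
            path = attribute_path(node)
            if path is not None:
                if path.startswith(root + "."):
                    columns.add(path[len(root) + 1:])
                # A full path was found; its prefixes are not separate columns.
                return
        for child in ast.iter_child_nodes(node):
            visit(child)

    visit(ast.parse(expr, mode="eval"))
    return sorted(columns)


# Comparison operators supported by this sketch.
_OPS = {ast.Gt: ">", ast.GtE: ">=", ast.Lt: "<", ast.LtE: "<=", ast.Eq: "==", ast.NotEq: "!="}


def to_sexpr(node):
    """Hypothetical helper: render a small Python subset as a qastle-like s-expression."""
    if isinstance(node, ast.Expression):
        return to_sexpr(node.body)
    if isinstance(node, ast.Name):
        return node.id
    if isinstance(node, ast.Constant):
        return repr(node.value)
    if isinstance(node, ast.Attribute):
        return f"(attr {to_sexpr(node.value)} '{node.attr}')"
    if isinstance(node, ast.Compare) and len(node.ops) == 1 and type(node.ops[0]) in _OPS:
        op = _OPS[type(node.ops[0])]
        return f"({op} {to_sexpr(node.left)} {to_sexpr(node.comparators[0])})"
    if isinstance(node, ast.Call):
        parts = [to_sexpr(node.func)] + [to_sexpr(arg) for arg in node.args]
        parts += [f"(keyword {kw.arg} {to_sexpr(kw.value)})" for kw in node.keywords]
        return "(call " + " ".join(parts) + ")"
    raise NotImplementedError(f"unsupported node: {type(node).__name__}")


cut = "ak.any(events.Muon.pt > 30, axis=1)"
print(necessary_columns(cut))  # ['Muon.pt']
print(to_sexpr(ast.parse(cut, mode="eval")))
# (call (attr ak 'any') (> (attr (attr events 'Muon') 'pt') 30) (keyword axis 1))
```

In the real design, dask-awkward would supply the column list from its task graph rather than from re-parsing source text. The sketch only illustrates the shape of the two outputs the code generators would need.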

We can add annotations to the task graph to indicate where the selection goes beyond what ServiceX can handle.
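
One way to use such annotations is to split the graph into a part that ServiceX runs and a remainder that stays local. The sketch below is a stdlib-only toy, not dask's HighLevelGraph: layers are a plain dict mapping a hypothetical layer name to (dependencies, annotations), and split_pushdown is a hypothetical helper. A layer is pushed down only if it carries pushdown="servicex" and all of its dependencies were pushed down too, so the ServiceX part is closed under dependencies.

```python
from graphlib import TopologicalSorter


def split_pushdown(layers, target="servicex"):
    """Partition layers into (pushdown, local) lists, each in topological order.

    `layers` maps name -> (dependencies, annotations); every dependency must
    itself be a key of `layers`. Annotations may be None, as in dask.
    """
    graph = {name: deps for name, (deps, _) in layers.items()}
    pushdown, local, pushed = [], [], set()
    for name in TopologicalSorter(graph).static_order():
        deps, annotations = layers[name]
        wants_pushdown = (annotations or {}).get("pushdown") == target
        # Push down only if every input is also computed inside ServiceX.
        if wants_pushdown and all(dep in pushed for dep in deps):
            pushdown.append(name)
            pushed.add(name)
        else:
            local.append(name)
    return pushdown, local


# Hypothetical layer names mirroring the muon cut plus histogram fill.
layers = {
    "read-root": ((), {"pushdown": "servicex"}),
    "muon-pt-gt-30": (("read-root",), {"pushdown": "servicex"}),
    "any-mask": (("muon-pt-gt-30",), {"pushdown": "servicex"}),
    "filter-events": (("read-root", "any-mask"), {"pushdown": "servicex"}),
    "abs-eta": (("filter-events",), None),
    "hist-fill": (("abs-eta",), None),
}
print(split_pushdown(layers))
# (['read-root', 'muon-pt-gt-30', 'any-mask', 'filter-events'], ['abs-eta', 'hist-fill'])
```

The closure rule is a design choice made for this sketch: an annotated layer that depends on a local result cannot run before the local part, so it is demoted to local.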

Assumptions

  1. It will only do row-level filtering.
  2. For the first pass, we won't attempt to unify the selections between the ServiceX parts and the coffea parts.
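
To pin down what "row-level filtering" means in assumption 1, here is a plain-Python sketch in which events are hypothetical dicts with a Muon_pt list. A row-level cut, like events[ak.any(events.Muon.pt > 30, axis=1)], keeps or drops whole events and leaves the muon lists inside kept events untouched. The object-level cut is shown only for contrast; it is not covered by this first pass.

```python
def row_level_filter(events, threshold=30.0):
    """Keep whole events that contain at least one muon above threshold."""
    return [ev for ev in events if any(pt > threshold for pt in ev["Muon_pt"])]


def object_level_filter(events, threshold=30.0):
    """For contrast only: trim the muons inside every event, keeping all events."""
    return [{**ev, "Muon_pt": [pt for pt in ev["Muon_pt"] if pt > threshold]} for ev in events]


events = [{"Muon_pt": [45.0, 12.0]}, {"Muon_pt": [8.0]}, {"Muon_pt": []}]
print(row_level_filter(events))     # [{'Muon_pt': [45.0, 12.0]}]
print(object_level_filter(events))  # [{'Muon_pt': [45.0]}, {'Muon_pt': []}, {'Muon_pt': []}]
```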
@BenGalewsky
Contributor Author

This code snippet was submitted by Lindsey Gray:

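```python
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
from distributed import Client
import dask
import dask_awkward
import awkward as ak
import hist.dask as hda


def extract_pushdown(coll):
    """Print the layers of coll's task graph that carry a "pushdown" annotation."""
    # Layer names in topological order (uses a private HighLevelGraph helper).
    hlg_sorted = coll.dask._toposort_layers()
    pushdown_deps = []
    for key in hlg_sorted:
        annotations = coll.dask.layers[key].annotations
        if annotations is not None and "pushdown" in annotations:
            #print(key, coll.dask.layers[key].annotations)
            # Prepending leaves the list in reverse topological order.
            pushdown_deps = [key] + pushdown_deps
    for dep in pushdown_deps:
        layer = coll.dask.layers[dep]
        # First task in the layer; element 0 of a task tuple is its callable.
        fcn = list(layer.dsk.values())[0][0]
        if isinstance(layer, dask_awkward.layers.AwkwardBlockwiseLayer) and not isinstance(
            layer, dask_awkward.layers.AwkwardInputLayer
        ):
            # Blockwise compute layer: dump the wrapped function and its argument repacker.
            print(dir(layer))
            print(layer.dsk)
            print(list(layer.keys()))
            print(dep, fcn.fn)
            print(dir(fcn))
            print(fcn.arg_repackers[0])
        else:
            # Input layer (or any other layer type): just show the callable.
            print(dep, fcn)


if __name__ == "__main__":
    # client = Client()  # uncomment to run on a dask.distributed cluster

    # Turn on dask-awkward's column optimization and make failures raise.
    dask.config.set({
        "awkward.optimization.enabled": True,
        "awkward.raise-failed-meta": True,
        "awkward.optimization.on-fail": "raise",
    })

    # Layers created inside this block are annotated as candidates for ServiceX.
    with dask.annotate(pushdown="servicex"):
        events = NanoEventsFactory.from_root(
            {"tests/samples/nano_dy.root": "Events"},
            metadata={"dataset": "nano_dy"},
            schemaclass=NanoAODSchema,
            permit_dask=True,
        ).events()

        # Row-level cut: keep events with at least one muon above 30 GeV.
        mask = events.Muon.pt > 30
        events = events[ak.any(mask, axis=1)]

    # The histogram is built outside the annotation, so it stays on the coffea side.
    myhist = hda.Hist.new.Regular(50, -2.5, 2.5, name="abseta").Double()
    myhist.fill(abseta=abs(events.Muon.eta))

    extract_pushdown(myhist)
```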

@ponyisi
Collaborator

ponyisi commented Jun 5, 2024

We now have significant support for expressions and filtering in awkward syntax, using the uproot-raw codegen.

@ponyisi
Collaborator

ponyisi commented Sep 5, 2024

Following some discussion with Jim Pivarski, here is a thought about a first way to tie ServiceX and dask-awkward together:

  • we need to provide dask-awkward with the schema of at least one input file. I would imagine a separate microservice that used the DID finder to look up the dataset files and extract metadata from one of them, then returning the schema to the user. (It could also determine the number of files, I guess.)
  • dask-awkward can then compute the columns that are necessary for its operations.
  • at some point dask-awkward might be smart enough to come up with a cut expression that can be interpreted with uproot.open, but as a zeroth-order solution we might just ask users to specify this cut as an argument to their servicex.dask_awkward() call.
  • We then submit the ServiceX transformation and the dask tasks. Some magic is needed so that the dask inputs can be created once the corresponding per-file transformations are done.
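
The "magic" in the last step amounts to starting each downstream task as soon as its own input file is ready, rather than waiting for the whole transformation. A minimal stdlib sketch of that pattern follows, assuming a ThreadPoolExecutor stands in for the cluster. transform_file and analyze are hypothetical placeholders, not ServiceX or dask APIs.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def transform_file(path):
    """Hypothetical stand-in for one per-file ServiceX transformation."""
    return path.replace(".root", ".parquet")


def analyze(output_path):
    """Hypothetical stand-in for the dask task that reads one transformed file."""
    return f"analyzed {output_path}"


def run_pipeline(paths, max_workers=4):
    """Chain each analysis task onto its own file's transformation."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        transforms = {pool.submit(transform_file, p): p for p in paths}
        downstream = {}
        # as_completed yields transformations in the order they finish, so an
        # analysis task can start while other files are still being transformed.
        for done in as_completed(transforms):
            downstream[pool.submit(analyze, done.result())] = transforms[done]
        return {path: fut.result() for fut, path in downstream.items()}


print(run_pipeline(["a.root", "b.root"]))
```

With a real cluster, the same idea could be expressed with dask.distributed futures, whose as_completed helper plays the same role.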

@BenGalewsky
Contributor Author

  • I would imagine a separate microservice that used the DID finder to look up the dataset files and extract metadata from one of them, then returning the schema to the user.

The return of the preflight check! We used to have a service that would review a sample file to decide if the transform would work before committing the rest of the workers. We decided it wasn't worth the effort and removed that functionality.

@ponyisi added this to the 3.2 New milestone on Oct 2, 2024