Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add abstraction for popping connection dictionary to model. #96

Merged
merged 1 commit into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 8 additions & 101 deletions gxformat2/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@
import argparse
import copy
import json
import logging
import os
import sys
import uuid
from typing import Any, Dict, Optional

from ._labels import Labels
from .model import (
clean_connection,
convert_dict_to_id_list_if_needed,
ensure_step_position,
inputs_as_native_steps,
pop_connect_from_step_dict,
setup_connected_values,
SUPPORT_LEGACY_CONNECTIONS,
with_step_ids,
)
from .yaml import ordered_load
Expand All @@ -22,8 +25,6 @@
Convert a Format 2 Galaxy workflow description into a native format.
"""

# Legacy connection syntax separates step and output with "#"
# (source: step#output and $link: step#output) instead of "/"
# (outputSource: step/output and $link: step/output).
# Legacy support is enabled only when the environment variable
# GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS is exactly "1".
SUPPORT_LEGACY_CONNECTIONS = os.environ.get("GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS") == "1"
STEP_TYPES = [
"subworkflow",
"data_input",
Expand Down Expand Up @@ -82,22 +83,11 @@
},
}

log = logging.getLogger(__name__)


def rename_arg(argument):
    """Return ``argument`` unchanged; no renaming is applied."""
    return argument


def clean_connection(value):
    """Return ``value`` with legacy ``step#output`` syntax rewritten to ``step/output``.

    The rewrite only happens when ``SUPPORT_LEGACY_CONNECTIONS`` is enabled and
    ``value`` contains ``#``; in that case a warning is logged and the first
    ``#`` is replaced with ``/``. Any other value (including empty or ``None``)
    is returned unchanged.
    """
    if value and "#" in value and SUPPORT_LEGACY_CONNECTIONS:
        # Hope these are just used by Galaxy testing workflows and such, and not in production workflows.
        log.warning("Legacy workflow syntax for connections [%s] will not be supported in the future", value)
        value = value.replace("#", "/", 1)

    # Always return the (possibly rewritten) value; previously the legacy
    # branch fell through and implicitly returned None.
    return value


class ImportOptions:

def __init__(self):
Expand Down Expand Up @@ -381,7 +371,7 @@ def transform_pause(context, step, default_name="Pause for dataset review"):
"name": name
}

connect = _init_connect_dict(step)
connect = pop_connect_from_step_dict(step)
_populate_input_connections(context, step, connect)
_populate_tool_state(step, tool_state)

Expand All @@ -398,7 +388,7 @@ def transform_subworkflow(context, step):
tool_state = {
}

connect = _init_connect_dict(step)
connect = pop_connect_from_step_dict(step)
_populate_input_connections(context, step, connect)
_populate_tool_state(step, tool_state)

Expand All @@ -407,10 +397,6 @@ def _runtime_value():
return {"__class__": "RuntimeValue"}


def _connected_value():
return {"__class__": "ConnectedValue"}


def transform_tool(context, step):
if "tool_id" not in step:
raise Exception("Tool steps must define a tool_id.")
Expand All @@ -428,48 +414,13 @@ def transform_tool(context, step):
"__page__": 0,
}

connect = _init_connect_dict(step)

def append_link(key, value):
if key not in connect:
connect[key] = []
assert "$link" in value
link_value = value["$link"]
connect[key].append(clean_connection(link_value))

def replace_links(value, key=""):
if _is_link(value):
append_link(key, value)
# Filled in by the connection, so to force late
# validation of the field just mark as ConnectedValue,
# which should be further validated by Galaxy
return _connected_value()
if isinstance(value, dict):
new_values = {}
for k, v in value.items():
new_key = _join_prefix(key, k)
new_values[k] = replace_links(v, new_key)
return new_values
elif isinstance(value, list):
new_values = []
for i, v in enumerate(value):
# If we are a repeat we need to modify the key
# but not if values are actually $links.
if _is_link(v):
append_link(key, v)
new_values.append(None)
else:
new_key = "%s_%d" % (key, i)
new_values.append(replace_links(v, new_key))
return new_values
else:
return value
connect = pop_connect_from_step_dict(step)

# TODO: handle runtime inputs and state together.
runtime_inputs = step.get("runtime_inputs", [])
if "state" in step or runtime_inputs:
step_state = step.pop("state", {})
step_state = replace_links(step_state)
step_state = setup_connected_values(step_state, append_to=connect)

for key, value in step_state.items():
tool_state[key] = json.dumps(value)
Expand Down Expand Up @@ -629,50 +580,6 @@ def _action(type, name, arguments):
}


def _is_link(value):
return isinstance(value, dict) and "$link" in value


def _join_prefix(prefix, key):
if prefix:
new_key = f"{prefix}|{key}"
else:
new_key = key
return new_key


def _init_connect_dict(step):
if "connect" not in step:
step["connect"] = {}

connect = step["connect"]
del step["connect"]

# handle CWL-style in dict connections.
if "in" in step:
step_in = step["in"]
assert isinstance(step_in, dict)
connection_keys = set()
for key, value in step_in.items():
# TODO: this can be a list right?
if isinstance(value, dict) and 'source' in value:
value = value["source"]
elif isinstance(value, dict) and 'default' in value:
continue
elif isinstance(value, dict):
raise KeyError(f'step input must define either source or default {value}')
connect[key] = [value]
connection_keys.add(key)

for key in connection_keys:
del step_in[key]

if len(step_in) == 0:
del step['in']

return connect


def _populate_input_connections(context, step, connect):
_ensure_inputs_connections(step)
input_connections = step["input_connections"]
Expand Down
131 changes: 130 additions & 1 deletion gxformat2/model.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,136 @@
"""Abstractions for dealing with Format2 data."""
from typing import cast, Dict, List, Union
import logging
import os
from typing import (
Any,
cast,
Dict,
List,
Optional,
Union,
)

from typing_extensions import TypedDict

# Module-level logger named after this module.
log = logging.getLogger(__name__)

# Alias for a value that is either a dict or a list.
DictOrList = Union[Dict, List]
# Alias used as the return type of pop_connect_from_step_dict.
ConnectDict = dict


# Dict shape of an embedded link: a "$link" key holding a string.
EmbeddedLink = TypedDict("EmbeddedLink", {"$link": str})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jmchilton This EmbeddedLink type seems unused and generates these warnings when building the docs:

WARNING: invalid signature for autoattribute ('gxformat2.model::EmbeddedLink.$link') [autodoc]
WARNING: don't know which module to import for autodocumenting 'gxformat2.model::EmbeddedLink.$link' (try placing a "module" or "currentmodule" directive in the document, or giving an explicit module name) [autodoc]

Can we drop it?


# Legacy connection syntax separates step and output with "#"
# (source: step#output and $link: step#output) instead of "/"
# (outputSource: step/output and $link: step/output).
# Legacy support is enabled only when the environment variable
# GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS is exactly "1".
SUPPORT_LEGACY_CONNECTIONS = os.environ.get("GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS") == "1"


def pop_connect_from_step_dict(step: dict) -> ConnectDict:
    """Merge the 'in' and 'connect' keys of a step into one connection dict.

    Meant to be used as an initial processing step when reasoning about the
    connections defined by a format2 step description. ``step`` is modified
    in place: 'connect' is always removed, and every 'in' entry that names a
    source (a plain value, or a dict with 'source') is moved into the result
    as a one-element list. Dict entries that only carry 'default' stay under
    'in'; 'in' is removed once it is empty.

    Raises ``KeyError`` when a dict entry under 'in' defines neither
    'source' nor 'default'.
    """
    connect = step.pop("connect", {})

    # handle CWL-style in dict connections.
    if "in" not in step:
        return connect

    step_in = step["in"]
    assert isinstance(step_in, dict)
    moved_keys = []
    for input_name, input_def in step_in.items():
        # TODO: this can be a list right?
        if isinstance(input_def, dict):
            if 'source' in input_def:
                input_def = input_def["source"]
            elif 'default' in input_def:
                # A default is state, not a connection; leave it in place.
                continue
            else:
                raise KeyError(f'step input must define either source or default {input_def}')
        connect[input_name] = [input_def]
        moved_keys.append(input_name)

    # Remove moved entries only after iteration has finished.
    for input_name in moved_keys:
        del step_in[input_name]

    if not step_in:
        del step['in']

    return connect


def setup_connected_values(value, key: str = "", append_to: Optional[Dict[str, list]] = None) -> Any:
    """Replace ``$link`` entries in ``value`` with connected-value markers.

    Dicts and lists are rebuilt recursively; any other value is returned
    as-is. When ``append_to`` is given, the cleaned target of every link is
    appended to ``append_to[<key>]``, where the key is built from the path to
    the link: dict entries extend the key through ``_join_prefix`` and
    non-link list items use ``<key>_<index>``. A link that is itself the
    value becomes a connected-value marker; a link sitting directly in a list
    is recorded under the list's own key and replaced by ``None``.
    """

    def record_link(target_key: str, link: dict) -> None:
        # Links are only collected when the caller supplied a dict for them.
        if append_to is not None:
            sources = append_to.setdefault(target_key, [])
            assert "$link" in link
            sources.append(clean_connection(link["$link"]))

    def descend(child, child_key) -> Any:
        return setup_connected_values(child, child_key, append_to=append_to)

    if _is_link(value):
        record_link(key, value)
        # Filled in by the connection, so to force late
        # validation of the field just mark as ConnectedValue,
        # which should be further validated by Galaxy
        return _connected_value()

    if isinstance(value, dict):
        return {name: descend(child, _join_prefix(key, name)) for name, child in value.items()}

    if isinstance(value, list):
        converted: List[Any] = []
        for index, item in enumerate(value):
            # If we are a repeat we need to modify the key
            # but not if values are actually $links.
            if _is_link(item):
                assert isinstance(item, dict)
                record_link(key, item)
                converted.append(None)
            else:
                converted.append(descend(item, "%s_%d" % (key, index)))
        return converted

    return value


def clean_connection(value: str) -> str:
    """Replace a legacy-style connection target with the modern CWL-style one.

    When ``SUPPORT_LEGACY_CONNECTIONS`` is enabled and ``value`` contains
    ``#``, a warning is logged and the first ``#`` is replaced with ``/``.
    In every other case ``value`` is returned unchanged.
    """
    if value and "#" in value and SUPPORT_LEGACY_CONNECTIONS:
        # Hope these are just used by Galaxy testing workflows and such, and not in production workflows.
        # Logger.warn is a deprecated alias; use Logger.warning with lazy %-formatting.
        log.warning("Legacy workflow syntax for connections [%s] will not be supported in the future", value)
        value = value.replace("#", "/", 1)

    return value


def _connected_value():
return {"__class__": "ConnectedValue"}


def _is_link(value: Any) -> bool:
return isinstance(value, dict) and "$link" in value


def _join_prefix(prefix: Optional[str], key: str):
if prefix:
new_key = f"{prefix}|{key}"
else:
new_key = key
return new_key


def convert_dict_to_id_list_if_needed(
Expand Down
49 changes: 49 additions & 0 deletions tests/test_model_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from gxformat2.model import (
pop_connect_from_step_dict,
setup_connected_values,
)


def test_pop_connect():
    """A ``source`` entry under ``in`` becomes a connection and the emptied ``in`` is dropped."""
    step = {"in": {"bar": {"source": "foo/moo"}}}
    connections = pop_connect_from_step_dict(step)
    assert connections["bar"] == ["foo/moo"]
    assert "in" not in step


def test_pop_connect_preserves_defaults():
    """A ``default``-only entry is not a connection and stays under ``in``."""
    step = {"in": {"bar": {"default": 7}}}
    connections = pop_connect_from_step_dict(step)
    assert "bar" not in connections
    assert "in" in step


def test_setup_connected_values():
    """A ``$link`` in the state is recorded under its key in the connection dict."""
    state = {"input": {"$link": "moo/cow"}}
    collected = {}
    setup_connected_values(state, append_to=collected)
    assert collected["input"][0] == "moo/cow"


def test_setup_connected_values_in_array():
    """Links listed in an array are all recorded, in order, under the array's key."""
    state = {"input": [{"$link": "moo/cow"}, {"$link": "moo/cow2"}]}
    collected = {}
    setup_connected_values(state, append_to=collected)
    first, second = collected["input"][0], collected["input"][1]
    assert first == "moo/cow"
    assert second == "moo/cow2"
Loading