Skip to content

Commit

Permalink
Abstractions for reasoning about steps outside of converted code.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Jul 20, 2024
1 parent 5811bc5 commit 0d71530
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 102 deletions.
109 changes: 8 additions & 101 deletions gxformat2/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@
import argparse
import copy
import json
import logging
import os
import sys
import uuid
from typing import Any, Dict, Optional

from ._labels import Labels
from .model import (
clean_connection,
convert_dict_to_id_list_if_needed,
ensure_step_position,
inputs_as_native_steps,
pop_connect_from_step_dict,
setup_connected_values,
SUPPORT_LEGACY_CONNECTIONS,
with_step_ids,
)
from .yaml import ordered_load
Expand All @@ -22,8 +25,6 @@
Convert a Format 2 Galaxy workflow description into a native format.
"""

# source: step#output and $link: step#output instead of outputSource: step/output and $link: step/output
SUPPORT_LEGACY_CONNECTIONS = os.environ.get("GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS") == "1"
STEP_TYPES = [
"subworkflow",
"data_input",
Expand Down Expand Up @@ -82,22 +83,11 @@
},
}

log = logging.getLogger(__name__)


def rename_arg(argument):
return argument


def clean_connection(value):
if value and "#" in value and SUPPORT_LEGACY_CONNECTIONS:
# Hope these are just used by Galaxy testing workflows and such, and not in production workflows.
log.warn(f"Legacy workflow syntax for connections [{value}] will not be supported in the future")
value = value.replace("#", "/", 1)
else:
return value


class ImportOptions:

def __init__(self):
Expand Down Expand Up @@ -381,7 +371,7 @@ def transform_pause(context, step, default_name="Pause for dataset review"):
"name": name
}

connect = _init_connect_dict(step)
connect = pop_connect_from_step_dict(step)
_populate_input_connections(context, step, connect)
_populate_tool_state(step, tool_state)

Expand All @@ -398,7 +388,7 @@ def transform_subworkflow(context, step):
tool_state = {
}

connect = _init_connect_dict(step)
connect = pop_connect_from_step_dict(step)
_populate_input_connections(context, step, connect)
_populate_tool_state(step, tool_state)

Expand All @@ -407,10 +397,6 @@ def _runtime_value():
return {"__class__": "RuntimeValue"}


def _connected_value():
return {"__class__": "ConnectedValue"}


def transform_tool(context, step):
if "tool_id" not in step:
raise Exception("Tool steps must define a tool_id.")
Expand All @@ -428,48 +414,13 @@ def transform_tool(context, step):
"__page__": 0,
}

connect = _init_connect_dict(step)

def append_link(key, value):
if key not in connect:
connect[key] = []
assert "$link" in value
link_value = value["$link"]
connect[key].append(clean_connection(link_value))

def replace_links(value, key=""):
if _is_link(value):
append_link(key, value)
# Filled in by the connection, so to force late
# validation of the field just mark as ConnectedValue,
# which should be further validated by Galaxy
return _connected_value()
if isinstance(value, dict):
new_values = {}
for k, v in value.items():
new_key = _join_prefix(key, k)
new_values[k] = replace_links(v, new_key)
return new_values
elif isinstance(value, list):
new_values = []
for i, v in enumerate(value):
# If we are a repeat we need to modify the key
# but not if values are actually $links.
if _is_link(v):
append_link(key, v)
new_values.append(None)
else:
new_key = "%s_%d" % (key, i)
new_values.append(replace_links(v, new_key))
return new_values
else:
return value
connect = pop_connect_from_step_dict(step)

# TODO: handle runtime inputs and state together.
runtime_inputs = step.get("runtime_inputs", [])
if "state" in step or runtime_inputs:
step_state = step.pop("state", {})
step_state = replace_links(step_state)
step_state = setup_connected_values(step_state, append_to=connect)

for key, value in step_state.items():
tool_state[key] = json.dumps(value)
Expand Down Expand Up @@ -629,50 +580,6 @@ def _action(type, name, arguments):
}


def _is_link(value):
return isinstance(value, dict) and "$link" in value


def _join_prefix(prefix, key):
if prefix:
new_key = f"{prefix}|{key}"
else:
new_key = key
return new_key


def _init_connect_dict(step):
if "connect" not in step:
step["connect"] = {}

connect = step["connect"]
del step["connect"]

# handle CWL-style in dict connections.
if "in" in step:
step_in = step["in"]
assert isinstance(step_in, dict)
connection_keys = set()
for key, value in step_in.items():
# TODO: this can be a list right?
if isinstance(value, dict) and 'source' in value:
value = value["source"]
elif isinstance(value, dict) and 'default' in value:
continue
elif isinstance(value, dict):
raise KeyError(f'step input must define either source or default {value}')
connect[key] = [value]
connection_keys.add(key)

for key in connection_keys:
del step_in[key]

if len(step_in) == 0:
del step['in']

return connect


def _populate_input_connections(context, step, connect):
_ensure_inputs_connections(step)
input_connections = step["input_connections"]
Expand Down
135 changes: 134 additions & 1 deletion gxformat2/model.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,140 @@
"""Abstractions for dealing with Format2 data."""
from typing import cast, Dict, List, Union
import logging
import os
from typing import (
Any,
Callable,
cast,
Dict,
List,
Optional,
Union,
)

from typing_extensions import TypedDict

log = logging.getLogger(__name__)

DictOrList = Union[Dict, List]
ConnectDict = dict


EmbeddedLink = TypedDict("EmbeddedLink", {"$link": str})

# source: step#output and $link: step#output instead of outputSource: step/output and $link: step/output
SUPPORT_LEGACY_CONNECTIONS = os.environ.get("GXFORMAT2_SUPPORT_LEGACY_CONNECTIONS") == "1"


def pop_connect_from_step_dict(step: dict) -> ConnectDict:
"""Merge 'in' and 'connect' keys into a unified connection dict separated from state.
Meant to be used an initial processing step in reasoning about connections defined by the
format2 step description.
"""
if "connect" not in step:
step["connect"] = {}

connect = step["connect"]
del step["connect"]

# handle CWL-style in dict connections.
if "in" in step:
step_in = step["in"]
assert isinstance(step_in, dict)
connection_keys = set()
for key, value in step_in.items():
# TODO: this can be a list right?
if isinstance(value, dict) and 'source' in value:
value = value["source"]
elif isinstance(value, dict) and 'default' in value:
continue
elif isinstance(value, dict):
raise KeyError(f'step input must define either source or default {value}')
connect[key] = [value]
connection_keys.add(key)

for key in connection_keys:
del step_in[key]

if len(step_in) == 0:
del step['in']

return connect


AppendLinkCallable = Callable[[str, EmbeddedLink], None]


def setup_connected_values(value, key: str = "", append_to: Optional[Dict[str, list]] = None) -> Any:
"""Replace links with connected value."""

def append_link(key: str, value: dict):
if append_to is None:
return

if key not in append_to:
append_to[key] = []

assert "$link" in value
link_value = value["$link"]
append_to[key].append(clean_connection(link_value))

def recurse(sub_value, sub_key) -> Any:
return setup_connected_values(sub_value, sub_key, append_to=append_to)

if _is_link(value):
append_link(key, value)
# Filled in by the connection, so to force late
# validation of the field just mark as ConnectedValue,
# which should be further validated by Galaxy
return _connected_value()
if isinstance(value, dict):
new_dict_values: Dict[str, Any] = {}
for dict_k, dict_v in value.items():
new_key = _join_prefix(key, dict_k)
new_dict_values[dict_k] = recurse(dict_v, new_key)
return new_dict_values
elif isinstance(value, list):
new_list_values: List[Any] = []
for i, list_v in enumerate(value):
# If we are a repeat we need to modify the key
# but not if values are actually $links.
if _is_link(list_v):
assert isinstance(list_v, dict)
append_link(key, list_v)
new_list_values.append(None)
else:
new_key = "%s_%d" % (key, i)
new_list_values.append(recurse(list_v, new_key))
return new_list_values
else:
return value


def clean_connection(value: str) -> str:
"""Convert legacy style connection targets with modern CWL-style ones."""
if value and "#" in value and SUPPORT_LEGACY_CONNECTIONS:
# Hope these are just used by Galaxy testing workflows and such, and not in production workflows.
log.warn(f"Legacy workflow syntax for connections [{value}] will not be supported in the future")
value = value.replace("#", "/", 1)

return value


def _connected_value():
return {"__class__": "ConnectedValue"}


def _is_link(value: Any) -> bool:
return isinstance(value, dict) and "$link" in value


def _join_prefix(prefix: Optional[str], key: str):
if prefix:
new_key = f"{prefix}|{key}"
else:
new_key = key
return new_key


def convert_dict_to_id_list_if_needed(
Expand Down
49 changes: 49 additions & 0 deletions tests/test_model_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from gxformat2.model import (
pop_connect_from_step_dict,
setup_connected_values,
)


def test_pop_connect():
raw_step = {
"in": {
"bar": {
"source": "foo/moo",
},
},
}
connect = pop_connect_from_step_dict(raw_step)
assert connect["bar"] == ["foo/moo"]
assert "in" not in raw_step


def test_pop_connect_preserves_defaults():
raw_step = {
"in": {
"bar": {
"default": 7,
},
},
}
connect = pop_connect_from_step_dict(raw_step)
assert "bar" not in connect
assert "in" in raw_step


def test_setup_connected_values():
raw_state = {
"input": {"$link": "moo/cow"},
}
connect = {}
setup_connected_values(raw_state, append_to=connect)
assert connect["input"][0] == "moo/cow"


def test_setup_connected_values_in_array():
raw_state = {
"input": [{"$link": "moo/cow"}, {"$link": "moo/cow2"}],
}
connect = {}
setup_connected_values(raw_state, append_to=connect)
assert connect["input"][0] == "moo/cow"
assert connect["input"][1] == "moo/cow2"

0 comments on commit 0d71530

Please sign in to comment.