Upstream20241110 (#51)
* Update from upstream

* Update minimum metaflow requirement

* Use micromamba 1.5.10 to avoid issues with 2.0+ until it stabilizes

* Fix micromamba version

* Compatibility with mamba 2.0; added more tests and more robust retries

* Fix matrix options

* Exclude test combinations that use a too-old Python

* Bump version to 1.2.3
romain-intel authored Nov 11, 2024
1 parent 1c534c5 commit 5df045d
Showing 20 changed files with 694 additions and 271 deletions.
31 changes: 22 additions & 9 deletions .github/workflows/test.yml
@@ -6,22 +6,34 @@ on:
       - main
 
 jobs:
   pre-commit:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
       - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1
       - uses: pre-commit/action@2c7b3805fd2a0fd8c1884dcaebf91fc102a13ecd # v3.0.1
 
   test:
     runs-on: ${{ matrix.os }}
     strategy:
       fail-fast: false
       matrix:
-        os: [ ubuntu-latest, macos-latest ]
-        python-version: [ "3.8", "3.10", "3.12" ]
-        resolver: [ mamba, conda, micromamba ]
+        os: [ubuntu-latest, macos-latest]
+        python-version: ["3.8", "3.10", "3.12"]
+        resolver: [mamba, conda, micromamba]
+        micromamba-version: ["1.5.10-0", "latest"]
+        mamba-version: ["mamba=1.5.10", "mamba"]
+        exclude:
+          - resolver: mamba
+            micromamba-version: "1.5.10-0"
+          - resolver: conda
+            micromamba-version: "1.5.10-0"
+          - resolver: conda
+            mamba-version: "mamba=1.5.10"
+          - resolver: micromamba
+            mamba-version: "mamba=1.5.10"
+          - python-version: "3.8"
+            mamba-version: "mamba=1.5.10"
     env:
       METAFLOW_CONDA_DEPENDENCY_RESOLVER: ${{ matrix.resolver }}
       METAFLOW_CONDA_TEST: 1
@@ -31,18 +43,19 @@ jobs:
 
       - uses: mamba-org/setup-micromamba@f8b8a1e23a26f60a44c853292711bacfd3eac822 # v1.9.0
         with:
-          micromamba-version: latest
+          micromamba-version: ${{ matrix.micromamba-version }}
           environment-file: dev-env.yml
           init-shell: bash
           create-args: >-
             python=${{ matrix.python-version }}
+            ${{ matrix.mamba-version }}
       - name: install nflx-extension
         shell: bash -eo pipefail -l {0}
         run: |
           which pip
           pip install -e . --force-reinstall -U
       - name: install bash
         if: runner.os == 'macOS'
         run: brew install bash
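
As context for the new matrix options, here is a small sketch (not part of the commit) of how GitHub Actions expands this matrix and applies the `exclude` entries: an entry removes every combination whose listed key/value pairs all match, which leaves `mamba=1.5.10` paired only with the `mamba` resolver on Python 3.10/3.12, and micromamba `1.5.10-0` paired only with the `micromamba` resolver.

```python
# Illustrative sketch (not part of the commit) of GitHub Actions matrix
# expansion: an exclude entry drops every combination whose listed
# key/value pairs all match.
from itertools import product

matrix = {
    "os": ["ubuntu-latest", "macos-latest"],
    "python-version": ["3.8", "3.10", "3.12"],
    "resolver": ["mamba", "conda", "micromamba"],
    "micromamba-version": ["1.5.10-0", "latest"],
    "mamba-version": ["mamba=1.5.10", "mamba"],
}
exclude = [
    {"resolver": "mamba", "micromamba-version": "1.5.10-0"},
    {"resolver": "conda", "micromamba-version": "1.5.10-0"},
    {"resolver": "conda", "mamba-version": "mamba=1.5.10"},
    {"resolver": "micromamba", "mamba-version": "mamba=1.5.10"},
    {"python-version": "3.8", "mamba-version": "mamba=1.5.10"},
]

keys = list(matrix)
combos = [dict(zip(keys, values)) for values in product(*matrix.values())]
kept = [
    c
    for c in combos
    if not any(all(c[k] == v for k, v in e.items()) for e in exclude)
]
print(len(combos), "->", len(kept))  # 72 -> 28
```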
5 changes: 3 additions & 2 deletions metaflow_extensions/netflix_ext/cmd/debug/constants.py
@@ -236,8 +236,9 @@ def merge_artifacts(
 
 # Escape hatch stub
 ESCAPE_HATCH_STUB = """
-import sys
-sys.path.insert(0, "{MetaflowEnvEscapeDir}")
+# Uncomment the following lines if os.environ.get("PYTHONPATH") is not set
+# import sys
+# sys.path.insert(0, "{MetaflowEnvEscapeDir}")
"""
 
 # Imports needed to define stubbed classes & Debug steps
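
Since the kernelspec now injects the trampoline directory through PYTHONPATH (see `_update_kernel_pythonpath` below), the stub's lines stay commented out by default. A minimal sketch of the guarded form a user might uncomment instead, with a hypothetical placeholder path:

```python
import os
import sys

# Hypothetical guard: only prepend the escape trampolines when the kernel
# did not already receive them through PYTHONPATH.
if not os.environ.get("PYTHONPATH"):
    sys.path.insert(0, "/path/to/_escape_trampolines")  # placeholder path
```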
27 changes: 27 additions & 0 deletions metaflow_extensions/netflix_ext/cmd/debug/debug_cmd.py
@@ -1,4 +1,5 @@
 import os
+import json
 import tarfile
 import datetime

@@ -287,9 +288,13 @@ def _generate_debug_scripts(
     if generate_notebook:
         kernel_def = _find_kernel_name(python_executable)
         if kernel_def:
+            _update_kernel_pythonpath(kernel_def[2], metaflow_root_dir)
             obj.echo(
                 f"Jupyter kernel name: {kernel_def[0]} with display name: {kernel_def[1]}"
             )
+            obj.echo(
+                f"Added escape trampolines to PYTHONPATH for the kernel {kernel_def[0]}"
+            )
             notebook_json = debug_script_generator.generate_debug_notebook(
                 metaflow_root_dir, debug_file_name, kernel_def
             )
@@ -304,3 +309,25 @@ def _generate_debug_scripts(
     # We copy the stub generators to the metaflow root directory as the stub generators
     # may not be present in the metaflow version that the user is using.
     copy_stub_generator_to_metaflow_root_dir(metaflow_root_dir)
+
+
+def _update_kernel_pythonpath(kernelspec_path, metaflow_root_dir):
+    """
+    Updates the kernelspec with the escape trampolines added to the PYTHONPATH.
+
+    Parameters
+    ----------
+    kernelspec_path : str
+        The kernelspec path.
+    metaflow_root_dir : str
+        The metaflow root directory.
+    """
+    kernel_json_path = os.path.join(kernelspec_path, "kernel.json")
+    with open(kernel_json_path, "r") as f:
+        kernel_json = json.load(f)
+
+    kernel_json.setdefault("env", {})["PYTHONPATH"] = os.path.abspath(
+        os.path.join(metaflow_root_dir, "_escape_trampolines")
+    )
+    with open(kernel_json_path, "w") as f:
+        json.dump(kernel_json, f, indent=4)
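
For illustration only (both paths are hypothetical, not from the commit), calling the new helper rewrites the kernelspec's `kernel.json` roughly like this:

```python
# Hypothetical invocation; both paths are made up for illustration.
_update_kernel_pythonpath(
    "/home/user/.local/share/jupyter/kernels/python3",  # kernelspec directory
    "./mf_debug_root",                                  # metaflow root directory
)
# The kernel.json in that kernelspec directory would afterwards contain:
# {
#     "argv": ["...python", "-m", "ipykernel_launcher", "-f", "{connection_file}"],
#     "display_name": "Python 3",
#     "env": {"PYTHONPATH": "/abs/path/to/mf_debug_root/_escape_trampolines"}
# }
```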
29 changes: 26 additions & 3 deletions metaflow_extensions/netflix_ext/cmd/debug/debug_stub_generator.py
@@ -167,9 +167,32 @@ def get_previous_tasks(self, previous_steps: List[Step]) -> List[Task]:
             return sorted(
                 [task for task in previous_steps[0].tasks()], key=lambda x: x.index
             )
-        # If the step is linear, split-foreach, split-static or a static-join, then we just return the
-        # tasks in the order they were run.
-        return [step.task for step in previous_steps]
+        # TODO: This method is incomplete and incorrect in the general case. We actually
+        # need more information to return the exact subset of tasks that are runtime
+        # parents in some situations with nested foreaches. For now, best effort but
+        # we need to revisit/fix these. The primary limitation right now is that the
+        # `foreach-stack` field we are referring to is capped at some number of characters
+        # which can fail to work for deeply nested foreaches or even shallow ones with
+        # long values.
+        foreach_list = self.task.metadata_dict.get("foreach-stack", [])
+        if not foreach_list:
+            # We are not part of a foreach so return all the previous tasks. This will
+            # be a list of 1 for most everything except for a join.
+            return [step.task for step in previous_steps]
+
+        # We are part of a foreach, we want to list tasks that either have the same
+        # foreach_list or match everything but the last element
+        def _filter(t):
+            t_foreach_list = t.metadata_dict.get("foreach-stack", [])
+            return (
+                len(t_foreach_list) == len(foreach_list)
+                and t_foreach_list == foreach_list
+            ) or t_foreach_list == foreach_list[:-1]
+
+        to_return = []
+        for step in previous_steps:
+            to_return.extend([task for task in step if _filter(task)])
+        return to_return
 
     def get_task_namespace(self) -> str:
         """
10 changes: 7 additions & 3 deletions metaflow_extensions/netflix_ext/cmd/debug/debug_utils.py
@@ -224,15 +224,19 @@ def _find_kernel_name(
     Returns
     -------
-    Optional[Tuple[str, str]]
-        The kernel name and the kernel's display name if found, None otherwise.
+    Optional[Tuple[str, str, str]]
+        The kernel name, kernel display name, and kernelspec path if found, None otherwise.
     """
     try:
         output = subprocess.check_output(["jupyter", "kernelspec", "list", "--json"])
         kernelspecs = json.loads(output)
         for kernel_name, kernel_spec in kernelspecs["kernelspecs"].items():
             if kernel_spec["spec"]["argv"][0] == python_executable:
-                return kernel_name, kernel_spec["spec"]["display_name"]
+                return (
+                    kernel_name,
+                    kernel_spec["spec"]["display_name"],
+                    kernel_spec["resource_dir"],
+                )
     except Exception as e:
         # Ignore the exception and return None as it is a best effort function
         print(f"Error finding kernel name: {traceback.format_exc()}")
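
For reference, the JSON that `jupyter kernelspec list --json` emits has the shape sketched below; the keys are the ones `_find_kernel_name` reads, while the values are illustrative:

```python
# Illustrative shape of `jupyter kernelspec list --json` output; the keys
# mirror what _find_kernel_name reads, the values are made up.
kernelspecs = {
    "kernelspecs": {
        "python3": {
            "resource_dir": "/usr/share/jupyter/kernels/python3",
            "spec": {
                "argv": [
                    "/usr/bin/python3",
                    "-m",
                    "ipykernel_launcher",
                    "-f",
                    "{connection_file}",
                ],
                "display_name": "Python 3",
            },
        }
    }
}

spec = kernelspecs["kernelspecs"]["python3"]
print(spec["spec"]["argv"][0], spec["spec"]["display_name"], spec["resource_dir"])
```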
148 changes: 85 additions & 63 deletions metaflow_extensions/netflix_ext/cmd/environment/environment_cmd.py
@@ -21,6 +21,7 @@
 from metaflow.plugins import DATASTORES
 from metaflow.metaflow_config import (
     CONDA_ALL_ARCHS,
+    CONDA_DEPENDENCY_RESOLVER,
     CONDA_TEST,
     CONDA_SYS_DEPENDENCIES,
     DEFAULT_DATASTORE,
@@ -241,6 +242,13 @@ def environment(
     help="Recreate the environment if it already exists and remove the `into` directory "
     "if it exists",
 )
+@click.option(
+    "--strict/--no-strict",
+    default=True,
+    is_flag=True,
+    show_default=True,
+    help="If True, fails if it cannot install the original Metaflow environment",
+)
 @click.option(
     "--into-dir",
     default=None,
@@ -269,6 +277,7 @@ def create(
     name: Optional[str],
     local_only: bool,
     force: bool,
+    strict: bool,
     into_dir: Optional[str],
     install_notebook: bool,
     pathspec: bool,
@@ -299,26 +308,31 @@ def create(
         else:
             os.makedirs(into_dir)
     if install_notebook and name is None:
-        raise click.BadOptionUsage("--install-notebook requires --name")
+        raise click.BadOptionUsage("install-notebook", "requires --name")
 
     code_pkg = None
-    mf_version = None
+    mf_version = ""
+    mf_extensions_info = None
 
     if pathspec:
         env_name = "step:%s" % env_name
     alias_type, resolved_alias = resolve_env_alias(env_name)
     if alias_type == AliasType.PATHSPEC:
         if not pathspec:
-            raise click.BadOptionUsage(
-                "--pathspec used but environment name is not a pathspec"
-            )
-        task = Step(resolved_alias, _namespace_check=False).task
-        code_pkg = task.code
-        mf_version = task.metadata_dict["metaflow_version"]
+            raise click.BadOptionUsage("pathspec", "environment name is not a pathspec")
+        step = Step(resolved_alias, _namespace_check=False)
+        parameters_task = Step(
+            "%s/_parameters" % step.parent.pathspec, _namespace_check=False
+        ).task
+        code_pkg = step.task.code
+        # We use parameters_task to allow the creation of a environment for a task that
+        # may not have fully run.
+        mf_version = parameters_task.metadata_dict.get("metaflow_version", "")
+        mf_extensions_info = parameters_task["_graph_info"].data.get("extensions")
     else:
         if pathspec:
             raise click.BadOptionUsage(
-                "--pathspec not used but environment name is a pathspec"
+                "pathspec", "missing --pathspec; environment name is a pathspec"
             )
 
     env_id_for_alias = cast(Conda, obj.conda).env_id_from_alias(
@@ -339,61 +353,66 @@ def create(
 
         # We need to install ipykernel into the resolved environment
         obj.echo(" Resolving an environment compatible with Jupyter ...", nl=False)
 
-        # We use envsresolver to properly deal with builder environments and what not
-        resolver = EnvsResolver(obj.conda)
-        # We force the env_type to be the same as the base env since we don't modify that
-        # by adding these deps.
-
-        # We also force the use of use_latest because we are not really doing anything
-        # that would require a re-resolve (ie: the user doesn't really care about the
-        # version of ipykernel most likely).
-        resolver.add_environment(
-            arch_id(),
-            user_deps={
-                "pypi" if env.env_type == EnvType.PYPI_ONLY else "conda": ["ipykernel"]
-            },
-            user_sources={},
-            extras={},
-            base_env=env,
-            local_only=local_only,
-            use_latest=":any:",
-        )
-        resolver.resolve_environments(obj.echo)
-        update_envs = []  # type: List[ResolvedEnvironment]
-        if obj.datastore_type != "local" or CONDA_TEST:
-            # We may need to update caches
-            # Note that it is possible that something we needed to resolve, we don't need
-            # to cache (if we resolved to something already cached).
-            formats = set()  # type: Set[str]
-            for _, resolved_env, f, _ in resolver.need_caching_environments(
-                include_builder_envs=True
-            ):
-                update_envs.append(resolved_env)
-                formats.update(f)
-
-            cast(Conda, obj.conda).cache_environments(
-                update_envs, {"conda": list(formats)}
-            )
-        else:
-            update_envs = [
-                resolved_env
-                for _, resolved_env, _ in resolver.new_environments(
-                    include_builder_envs=True
-                )
-            ]
-        cast(Conda, obj.conda).add_environments(update_envs)
-
-        # Update the default environment
-        for _, resolved_env, _ in resolver.resolved_environments(
-            include_builder_envs=True
-        ):
-            cast(Conda, obj.conda).set_default_environment(resolved_env.env_id)
-
-        cast(Conda, obj.conda).write_out_environments()
-
-        # We are going to be creating this new environment going forward (not the
-        # initial env we got)
-        _, env, _ = next(resolver.resolved_environments())
+        # We first check if `ipykernel` already exists in the environment. If it does, we
+        # can skip the whole resolution process.
+        if not any("ipykernel" == p.package_name for p in env.packages):
+            # We use envsresolver to properly deal with builder environments and what not
+            resolver = EnvsResolver(obj.conda)
+            # We force the env_type to be the same as the base env since we don't modify
+            # that by adding these deps.
+
+            # We also force the use of use_latest because we are not really doing
+            # anything that would require a re-resolve (ie: the user doesn't really
+            # care about the version of ipykernel most likely).
+            resolver.add_environment(
+                arch_id(),
+                user_deps={
+                    "pypi" if env.env_type == EnvType.PYPI_ONLY else "conda": [
+                        "ipykernel"
+                    ]
+                },
+                user_sources={},
+                extras={},
+                base_env=env,
+                local_only=local_only,
+                use_latest=":any:",
+            )
+            resolver.resolve_environments(obj.echo)
+            update_envs = []  # type: List[ResolvedEnvironment]
+            if obj.datastore_type != "local" or CONDA_TEST:
+                # We may need to update caches
+                # Note that it is possible that something we needed to resolve, we don't need
+                # to cache (if we resolved to something already cached).
+                formats = set()  # type: Set[str]
+                for _, resolved_env, f, _ in resolver.need_caching_environments(
+                    include_builder_envs=True
+                ):
+                    update_envs.append(resolved_env)
+                    formats.update(f)
+
+                cast(Conda, obj.conda).cache_environments(
+                    update_envs, {"conda": list(formats)}
+                )
+            else:
+                update_envs = [
+                    resolved_env
+                    for _, resolved_env, _ in resolver.new_environments(
+                        include_builder_envs=True
+                    )
+                ]
+            cast(Conda, obj.conda).add_environments(update_envs)
+
+            # Update the default environment
+            for _, resolved_env, _ in resolver.resolved_environments(
+                include_builder_envs=True
+            ):
+                cast(Conda, obj.conda).set_default_environment(resolved_env.env_id)
+
+            cast(Conda, obj.conda).write_out_environments()
+
+            # We are going to be creating this new environment going forward (not the
+            # initial env we got)
+            _, env, _ = next(resolver.resolved_environments())
 
         delta_time = int(time.time() - start)
         obj.echo(" done in %d second%s." % (delta_time, plural_marker(delta_time)))
@@ -422,10 +441,12 @@ def create(
                 "Step '%s' does not have a code package -- "
                 "downloading active Metaflow version only" % env_name
             )
-        download_mf_version("./__conda_python", mf_version)
+        download_mf_version(
+            "./__conda_python", mf_version, mf_extensions_info, obj.echo, strict
+        )
         obj.echo(
-            "Code package for %s downloaded into '%s' -- `__conda_python` is "
-            "the executable to use" % (env_name, into_dir)
+            "Python executable `__conda_python` for environment '%s' downloaded "
+            "into '%s'" % (env_name, into_dir)
         )
     else:
         python_bin = os.path.join(obj.conda.create_for_name(name, env), "bin", "python")
@@ -498,8 +519,9 @@ def create(
             f.write("\n")
     else:
         obj.echo(
-            "Created environment '%s' locally, activate with `%s activate %s`"
-            % (name, obj.conda.binary("conda"), name)
+            "Conda environment '%s' created locally, activate with "
+            "`CONDA_ENVS_DIRS=%s %s activate %s`"
+            % (name, obj.conda.root_env_dir, CONDA_DEPENDENCY_RESOLVER, name)
         )
     cast(Conda, obj.conda).write_out_environments()