diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c842c79..330d572 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -6,22 +6,34 @@ on: - main jobs: - pre-commit: runs-on: ubuntu-latest steps: - - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 - - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 - - uses: pre-commit/action@2c7b3805fd2a0fd8c1884dcaebf91fc102a13ecd # v3.0.1 + - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7 + - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + - uses: pre-commit/action@2c7b3805fd2a0fd8c1884dcaebf91fc102a13ecd # v3.0.1 test: runs-on: ${{ matrix.os }} strategy: fail-fast: false matrix: - os: [ ubuntu-latest, macos-latest ] - python-version: [ "3.8", "3.10", "3.12" ] - resolver: [ mamba, conda, micromamba ] + os: [ubuntu-latest, macos-latest] + python-version: ["3.8", "3.10", "3.12"] + resolver: [mamba, conda, micromamba] + micromamba-version: ["1.5.10-0", "latest"] + mamba-version: ["mamba=1.5.10", "mamba"] + exclude: + - resolver: mamba + micromamba-version: "1.5.10-0" + - resolver: conda + micromamba-version: "1.5.10-0" + - resolver: conda + mamba-version: "mamba=1.5.10" + - resolver: micromamba + mamba-version: "mamba=1.5.10" + - python-version: "3.8" + mamba-version: "mamba=1.5.10" env: METAFLOW_CONDA_DEPENDENCY_RESOLVER: ${{ matrix.resolver }} METAFLOW_CONDA_TEST: 1 @@ -31,18 +43,19 @@ jobs: - uses: mamba-org/setup-micromamba@f8b8a1e23a26f60a44c853292711bacfd3eac822 # v1.9.0 with: - micromamba-version: latest + micromamba-version: ${{ matrix.micromamba-version }} environment-file: dev-env.yml init-shell: bash create-args: >- python=${{ matrix.python-version }} + ${{ matrix.mamba-version }} - name: install nflx-extension shell: bash -eo pipefail -l {0} run: | which pip pip install -e . --force-reinstall -U - + - name: install bash if: runner.os == 'macOS' run: brew install bash diff --git a/metaflow_extensions/netflix_ext/cmd/debug/constants.py b/metaflow_extensions/netflix_ext/cmd/debug/constants.py index 1812544..2d9bd8b 100644 --- a/metaflow_extensions/netflix_ext/cmd/debug/constants.py +++ b/metaflow_extensions/netflix_ext/cmd/debug/constants.py @@ -236,8 +236,9 @@ def merge_artifacts( # Escape hatch stub ESCAPE_HATCH_STUB = """ - import sys - sys.path.insert(0, "{MetaflowEnvEscapeDir}") + # Uncomment the following lines if os.environ.get("PYTHONPATH") is not set + # import sys + # sys.path.insert(0, "{MetaflowEnvEscapeDir}") """ # Imports needed to define stubbed classes & Debug steps diff --git a/metaflow_extensions/netflix_ext/cmd/debug/debug_cmd.py b/metaflow_extensions/netflix_ext/cmd/debug/debug_cmd.py index a3a6d28..5b75bac 100644 --- a/metaflow_extensions/netflix_ext/cmd/debug/debug_cmd.py +++ b/metaflow_extensions/netflix_ext/cmd/debug/debug_cmd.py @@ -1,4 +1,5 @@ import os +import json import tarfile import datetime @@ -287,9 +288,13 @@ def _generate_debug_scripts( if generate_notebook: kernel_def = _find_kernel_name(python_executable) if kernel_def: + _update_kernel_pythonpath(kernel_def[2], metaflow_root_dir) obj.echo( f"Jupyter kernel name: {kernel_def[0]} with display name: {kernel_def[1]}" ) + obj.echo( + f"Added escape trampolines to PYTHONPATH for the kernel {kernel_def[0]}" + ) notebook_json = debug_script_generator.generate_debug_notebook( metaflow_root_dir, debug_file_name, kernel_def ) @@ -304,3 +309,25 @@ def _generate_debug_scripts( # We copy the stub generators to the metaflow root directory as the stub generators # may not be present in the metaflow version that the user is using. copy_stub_generator_to_metaflow_root_dir(metaflow_root_dir) + + +def _update_kernel_pythonpath(kernelspec_path, metaflow_root_dir): + """ + Updates the kernelspec with the escape trampolines added to the PYTHONPATH. + + Parameters + ---------- + kernelspec_path : str + The kernelspec path. + metaflow_root_dir : str + The metaflow root directory. + """ + kernel_json_path = os.path.join(kernelspec_path, "kernel.json") + with open(kernel_json_path, "r") as f: + kernel_json = json.load(f) + + _ = kernel_json.setdefault("env", {})["PYTHONPATH"] = os.path.abspath( + os.path.join(metaflow_root_dir, "_escape_trampolines") + ) + with open(kernel_json_path, "w") as f: + json.dump(kernel_json, f, indent=4) diff --git a/metaflow_extensions/netflix_ext/cmd/debug/debug_stub_generator.py b/metaflow_extensions/netflix_ext/cmd/debug/debug_stub_generator.py index 6a869fa..37ab268 100644 --- a/metaflow_extensions/netflix_ext/cmd/debug/debug_stub_generator.py +++ b/metaflow_extensions/netflix_ext/cmd/debug/debug_stub_generator.py @@ -167,9 +167,32 @@ def get_previous_tasks(self, previous_steps: List[Step]) -> List[Task]: return sorted( [task for task in previous_steps[0].tasks()], key=lambda x: x.index ) - # If the step is linear, split-foreach, split-static or a static-join, then we just return the - # tasks in the order they were run. - return [step.task for step in previous_steps] + # TODO: This method is incomplete and incorrect in the general case. We actually + # need more information to return the exact subset of tasks that are runtime + # parents in some situations with nested foreaches. For now, best effort but + # we need to revisit/fix these. The primary limitation right now is that the + # `foreach-stack` field we are referring to is capped at some number of characters + # which can fail to work for deeply nested foreaches or even shallow ones with + # long values. + foreach_list = self.task.metadata_dict.get("foreach-stack", []) + if not foreach_list: + # We are not part of a foreach so return all the previous tasks. This will + # be a list of 1 for most everything except for a join. + return [step.task for step in previous_steps] + + # We are part of a foreach, we want to list tasks that either have the same + # foreach_list or match everything but the last element + def _filter(t): + t_foreach_list = t.metadata_dict.get("foreach-stack", []) + return ( + len(t_foreach_list) == len(foreach_list) + and t_foreach_list == foreach_list + ) or t_foreach_list == foreach_list[:-1] + + to_return = [] + for step in previous_steps: + to_return.extend([task for task in step if _filter(task)]) + return to_return def get_task_namespace(self) -> str: """ diff --git a/metaflow_extensions/netflix_ext/cmd/debug/debug_utils.py b/metaflow_extensions/netflix_ext/cmd/debug/debug_utils.py index 816e591..9c396fd 100644 --- a/metaflow_extensions/netflix_ext/cmd/debug/debug_utils.py +++ b/metaflow_extensions/netflix_ext/cmd/debug/debug_utils.py @@ -224,15 +224,19 @@ def _find_kernel_name( Returns ------- - Optional[Tuple[str, str]] - The kernel name and the kernel's display name if found, None otherwise. + Optional[Tuple[str, str, str]] + The kernel name, kernel display name, and kernelspec path if found, None otherwise. """ try: output = subprocess.check_output(["jupyter", "kernelspec", "list", "--json"]) kernelspecs = json.loads(output) for kernel_name, kernel_spec in kernelspecs["kernelspecs"].items(): if kernel_spec["spec"]["argv"][0] == python_executable: - return kernel_name, kernel_spec["spec"]["display_name"] + return ( + kernel_name, + kernel_spec["spec"]["display_name"], + kernel_spec["resource_dir"], + ) except Exception as e: # Ignore the exception and return None as it is a best effort function print(f"Error finding kernel name: {traceback.format_exc()}") diff --git a/metaflow_extensions/netflix_ext/cmd/environment/environment_cmd.py b/metaflow_extensions/netflix_ext/cmd/environment/environment_cmd.py index f2af6c8..899343a 100644 --- a/metaflow_extensions/netflix_ext/cmd/environment/environment_cmd.py +++ b/metaflow_extensions/netflix_ext/cmd/environment/environment_cmd.py @@ -21,6 +21,7 @@ from metaflow.plugins import DATASTORES from metaflow.metaflow_config import ( CONDA_ALL_ARCHS, + CONDA_DEPENDENCY_RESOLVER, CONDA_TEST, CONDA_SYS_DEPENDENCIES, DEFAULT_DATASTORE, @@ -241,6 +242,13 @@ def environment( help="Recreate the environment if it already exists and remove the `into` directory " "if it exists", ) +@click.option( + "--strict/--no-strict", + default=True, + is_flag=True, + show_default=True, + help="If True, fails if it cannot install the original Metaflow environment", +) @click.option( "--into-dir", default=None, @@ -269,6 +277,7 @@ def create( name: Optional[str], local_only: bool, force: bool, + strict: bool, into_dir: Optional[str], install_notebook: bool, pathspec: bool, @@ -299,26 +308,31 @@ def create( else: os.makedirs(into_dir) if install_notebook and name is None: - raise click.BadOptionUsage("--install-notebook requires --name") + raise click.BadOptionUsage("install-notebook", "requires --name") code_pkg = None - mf_version = None + mf_version = "" + mf_extensions_info = None if pathspec: env_name = "step:%s" % env_name alias_type, resolved_alias = resolve_env_alias(env_name) if alias_type == AliasType.PATHSPEC: if not pathspec: - raise click.BadOptionUsage( - "--pathspec used but environment name is not a pathspec" - ) - task = Step(resolved_alias, _namespace_check=False).task - code_pkg = task.code - mf_version = task.metadata_dict["metaflow_version"] + raise click.BadOptionUsage("pathspec", "environment name is not a pathspec") + step = Step(resolved_alias, _namespace_check=False) + parameters_task = Step( + "%s/_parameters" % step.parent.pathspec, _namespace_check=False + ).task + code_pkg = step.task.code + # We use parameters_task to allow the creation of a environment for a task that + # may not have fully run. + mf_version = parameters_task.metadata_dict.get("metaflow_version", "") + mf_extensions_info = parameters_task["_graph_info"].data.get("extensions") else: if pathspec: raise click.BadOptionUsage( - "--pathspec not used but environment name is a pathspec" + "pathspec", "missing --pathspec; environment name is a pathspec" ) env_id_for_alias = cast(Conda, obj.conda).env_id_from_alias( @@ -339,61 +353,66 @@ def create( # We need to install ipykernel into the resolved environment obj.echo(" Resolving an environment compatible with Jupyter ...", nl=False) - # We use envsresolver to properly deal with builder environments and what not - resolver = EnvsResolver(obj.conda) - # We force the env_type to be the same as the base env since we don't modify that - # by adding these deps. - - # We also force the use of use_latest because we are not really doing anything - # that would require a re-resolve (ie: the user doesn't really care about the - # version of ipykernel most likely). - resolver.add_environment( - arch_id(), - user_deps={ - "pypi" if env.env_type == EnvType.PYPI_ONLY else "conda": ["ipykernel"] - }, - user_sources={}, - extras={}, - base_env=env, - local_only=local_only, - use_latest=":any:", - ) - resolver.resolve_environments(obj.echo) - update_envs = [] # type: List[ResolvedEnvironment] - if obj.datastore_type != "local" or CONDA_TEST: - # We may need to update caches - # Note that it is possible that something we needed to resolve, we don't need - # to cache (if we resolved to something already cached). - formats = set() # type: Set[str] - for _, resolved_env, f, _ in resolver.need_caching_environments( - include_builder_envs=True - ): - update_envs.append(resolved_env) - formats.update(f) - - cast(Conda, obj.conda).cache_environments( - update_envs, {"conda": list(formats)} + # We first check if `ipykernel` already exists in the environment. If it does, we + # can skip the whole resolution process. + if not any("ipykernel" == p.package_name for p in env.packages): + # We use envsresolver to properly deal with builder environments and what not + resolver = EnvsResolver(obj.conda) + # We force the env_type to be the same as the base env since we don't modify + # that by adding these deps. + + # We also force the use of use_latest because we are not really doing + # anything that would require a re-resolve (ie: the user doesn't really + # care about the version of ipykernel most likely). + resolver.add_environment( + arch_id(), + user_deps={ + "pypi" if env.env_type == EnvType.PYPI_ONLY else "conda": [ + "ipykernel" + ] + }, + user_sources={}, + extras={}, + base_env=env, + local_only=local_only, + use_latest=":any:", ) - else: - update_envs = [ - resolved_env - for _, resolved_env, _ in resolver.new_environments( + resolver.resolve_environments(obj.echo) + update_envs = [] # type: List[ResolvedEnvironment] + if obj.datastore_type != "local" or CONDA_TEST: + # We may need to update caches + # Note that it is possible that something we needed to resolve, we don't need + # to cache (if we resolved to something already cached). + formats = set() # type: Set[str] + for _, resolved_env, f, _ in resolver.need_caching_environments( include_builder_envs=True + ): + update_envs.append(resolved_env) + formats.update(f) + + cast(Conda, obj.conda).cache_environments( + update_envs, {"conda": list(formats)} ) - ] - cast(Conda, obj.conda).add_environments(update_envs) + else: + update_envs = [ + resolved_env + for _, resolved_env, _ in resolver.new_environments( + include_builder_envs=True + ) + ] + cast(Conda, obj.conda).add_environments(update_envs) - # Update the default environment - for _, resolved_env, _ in resolver.resolved_environments( - include_builder_envs=True - ): - cast(Conda, obj.conda).set_default_environment(resolved_env.env_id) + # Update the default environment + for _, resolved_env, _ in resolver.resolved_environments( + include_builder_envs=True + ): + cast(Conda, obj.conda).set_default_environment(resolved_env.env_id) - cast(Conda, obj.conda).write_out_environments() + cast(Conda, obj.conda).write_out_environments() - # We are going to be creating this new environment going forward (not the - # initial env we got) - _, env, _ = next(resolver.resolved_environments()) + # We are going to be creating this new environment going forward (not the + # initial env we got) + _, env, _ = next(resolver.resolved_environments()) delta_time = int(time.time() - start) obj.echo(" done in %d second%s." % (delta_time, plural_marker(delta_time))) @@ -422,10 +441,12 @@ def create( "Step '%s' does not have a code package -- " "downloading active Metaflow version only" % env_name ) - download_mf_version("./__conda_python", mf_version) + download_mf_version( + "./__conda_python", mf_version, mf_extensions_info, obj.echo, strict + ) obj.echo( - "Code package for %s downloaded into '%s' -- `__conda_python` is " - "the executable to use" % (env_name, into_dir) + "Python executable `__conda_python` for environment '%s' downloaded " + "into '%s'" % (env_name, into_dir) ) else: python_bin = os.path.join(obj.conda.create_for_name(name, env), "bin", "python") @@ -498,8 +519,9 @@ def create( f.write("\n") else: obj.echo( - "Created environment '%s' locally, activate with `%s activate %s`" - % (name, obj.conda.binary("conda"), name) + "Conda environment '%s' created locally, activate with " + "`CONDA_ENVS_DIRS=%s %s activate %s`" + % (name, obj.conda.root_env_dir, CONDA_DEPENDENCY_RESOLVER, name) ) cast(Conda, obj.conda).write_out_environments() diff --git a/metaflow_extensions/netflix_ext/cmd/environment/utils.py b/metaflow_extensions/netflix_ext/cmd/environment/utils.py index 55f4eec..5d4892a 100644 --- a/metaflow_extensions/netflix_ext/cmd/environment/utils.py +++ b/metaflow_extensions/netflix_ext/cmd/environment/utils.py @@ -1,20 +1,21 @@ import json import os import re +import shutil import subprocess -from typing import Dict, List, Optional - -from metaflow import Step -from metaflow_extensions.netflix_ext.plugins.conda.conda import Conda -from metaflow_extensions.netflix_ext.plugins.conda.env_descr import ( - EnvID, - ResolvedEnvironment, - TStr, +import sys +from typing import Any, Dict, Optional + +from metaflow.extension_support import ( + dump_module_info, + get_extensions_in_dir, + update_package_info, ) -from metaflow_extensions.netflix_ext.plugins.conda.utils import arch_id +from metaflow.info_file import INFO_FILE -_deps_parse = re.compile(r"([^<>=!~]+)(.*)") +# _deps_parse = re.compile(r"([^<>=!~]+)(.*)") _ext_parse = re.compile(r"([-_\w]+)\(([^)]+)\)") +_git_version = re.compile(r"-git([0-9a-f]+)(-dirty)?$") name_to_pkg = {"netflix-ext": "metaflow-netflixext"} @@ -72,8 +73,41 @@ # return local_instances -def download_mf_version(executable: str, version_str: str): - def _install_pkg(pkg: str, ver: str): +def _merge_directories(src_dir, dest_dir): + # Due to a bug in PIP, we can't use --target to install namespace packages + # so we hack around it by merging directories manually. + + for root, dirs, files in os.walk(src_dir): + # Determine the path of the current directory relative to src_dir + relative_path = os.path.relpath(root, src_dir) + # Determine the corresponding path in the destination directory + dest_path = os.path.join(dest_dir, relative_path) + + # Create directories in the destination directory + for dir_name in dirs: + dest_dir_path = os.path.join(dest_path, dir_name) + if not os.path.exists(dest_dir_path): + os.makedirs(dest_dir_path) + + # Copy files to the destination directory + for file_name in files: + src_file_path = os.path.join(root, file_name) + dest_file_path = os.path.join(dest_path, file_name) + shutil.copy2(src_file_path, dest_file_path) + + +def download_mf_version( + executable: str, version_str: str, extension_info: Dict[str, Any], echo, fail_hard +): + def echo_or_fail(msg): + if fail_hard: + raise RuntimeError( + msg + + ". Use --no-strict to install latest versions if possible instead." + ) + echo("WARNING: " + msg + " -- installing latest version if possible.") + + def _install_pkg(pkg: str, ver: Optional[str]): try: subprocess.check_call( [ @@ -81,29 +115,118 @@ def _install_pkg(pkg: str, ver: str): "-m", "pip", "install", + "--quiet", "-t", ".", "--no-deps", - "%s==%s" % (pkg, ver), + "%s==%s" % (pkg, ver) if ver else pkg, ] ) except subprocess.CalledProcessError as e: raise RuntimeError( "Could not install version '%s' of '%s': %s" % (ver, pkg, e.stderr) - ) + ) from e + + if not version_str: + raise ValueError("Unknown version of Metaflow") s = version_str.split("+", 1) - _install_pkg("metaflow", s[0]) + if _git_version.search(s[0]): + # This is not a "public" release so we install the latest + echo_or_fail("Metaflow's version is non public (%s)" % s[0]) + _install_pkg("metaflow", None) + else: + _install_pkg("metaflow", s[0]) if len(s) == 1: return - # We now install the other packages, they are in the format name(ver);name(ver)... - s = s[1].split(";") - for pkg_desc in s: - m = _ext_parse.match(pkg_desc) - if not m: - raise ValueError("Metaflow extension '%s' is not a valid format" % pkg_desc) - pkg_name, pkg_version = m.groups() - pkg = name_to_pkg.get(pkg_name) - if pkg is None: - raise ValueError("Metaflow extension '%s' is not known" % pkg_name) - _install_pkg(pkg, pkg_version) + # We now install the other packages (extensions). + # If we have extension_info, we can get that information from there. + # That is ideal if we have that. If not, we do our best from the version string + # where packages are in the form name(vers);name(vers) but name is not necessarily + # the package name. + if extension_info: + wrong_version_info = set() + first_round = True + for pkg_name, pkg_info in extension_info["installed"].items(): + if pkg_name.startswith("_pythonpath"): + # Local package with *zero* information + echo_or_fail("Unknown extension present at runtime") + continue + if pkg_info["package_version"] == "" or _git_version.search( + pkg_info["package_version"] + ): + echo_or_fail( + "Extension '%s' has a non-public version (%s)" + % (pkg_info["extension_name"], pkg_info["package_version"]) + ) + pkg_version = pkg_info["dist_version"] + wrong_version_info.add(pkg_name) + _install_pkg(pkg_name, pkg_version) + if first_round: + shutil.move("metaflow_extensions", "metaflow_extensions_tmp") + first_round = False + else: + _merge_directories("metaflow_extensions", "metaflow_extensions_tmp") + shutil.rmtree("metaflow_extensions") + else: + s = s[1].split(";") + first_round = True + for pkg_desc in s: + m = _ext_parse.match(pkg_desc) + if not m: + # In some cases (older Metaflow), the version is not recorded so + # we just install the latest + echo_or_fail("Extension '%s' does not have a version" % pkg_desc) + pkg_name, pkg_version = pkg_desc, None + else: + pkg_name, pkg_version = m.groups() + pkg = name_to_pkg.get(pkg_name) + if pkg is None: + raise ValueError("Metaflow extension '%s' is not known" % pkg_name) + _install_pkg(pkg, pkg_version) + if first_round: + shutil.move("metaflow_extensions", "metaflow_extensions_tmp") + first_round = False + else: + _merge_directories("metaflow_extensions", "metaflow_extensions_tmp") + shutil.rmtree("metaflow_extensions") + # We now do a few things to make sure the Metaflow environment is recreated + # as closely as possible: + # - add a __init__.py file to the metaflow_extensions directory to prevent + # other extensions from being loaded + # - create a INFO file with the extension information. This will allow for the + # __init__.py file (since otherwise it is an error) and will also remove + # conflicts when trying to load the extensions. + # - we clean up all the dist-info directories that were created as part of the + # pip install. This is not strictly necessary but it is cleaner. + shutil.move("metaflow_extensions_tmp", "metaflow_extensions") + sys.path.insert(0, ".") + installed_packages, pkgs_per_extension_point = get_extensions_in_dir(os.getcwd()) + # Update the information with the reported version and name from extension_info + for pkg_name, pkg_info in extension_info["installed"].items(): + if pkg_name in installed_packages: + update_package_info( + pkg_to_update=installed_packages[pkg_name], + dist_version=pkg_info["dist_version"], + extension_name=pkg_info["extension_name"], + ) + if pkg_name not in wrong_version_info: + update_package_info( + pkg_to_update=installed_packages[pkg_name], + package_version=pkg_info["package_version"], + ) + + key, val = dump_module_info(installed_packages, pkgs_per_extension_point) + sys.path.pop(0) + + with open("metaflow_extensions/__init__.py", "w+", encoding="utf-8") as f: + f.write("# This file is automatically generated by Metaflow\n") + + with open(os.path.basename(INFO_FILE), "w+", encoding="utf-8") as f: + json.dump({key: val}, f) + + # Clean up the dist-info directories + for root, dirs, _ in os.walk(".", topdown=False): + for d in dirs: + if d.endswith(".dist-info"): + shutil.rmtree(os.path.join(root, d)) diff --git a/metaflow_extensions/netflix_ext/plugins/conda/conda.py b/metaflow_extensions/netflix_ext/plugins/conda/conda.py index 8f34a91..ec2093a 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/conda.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/conda.py @@ -36,6 +36,7 @@ from shutil import which from requests.auth import AuthBase +from urllib3 import Retry from metaflow.plugins.datastores.local_storage import LocalStorage from metaflow.datastore.datastore_storage import DataStoreStorage @@ -58,6 +59,8 @@ CONDA_USE_REMOTE_LATEST, ) from metaflow.metaflow_environment import InvalidEnvironmentException + +from metaflow.system import _system_logger, _system_monitor from metaflow.util import get_username from metaflow._vendor.packaging.version import parse as parse_version @@ -122,6 +125,9 @@ def _modified_logger(*args: Any, **kwargs: Any): self._mode = mode self._bins = None # type: Optional[Dict[str, Optional[str]]] self._conda_executable_type = None # type: Optional[str] + # True when using micromamba or mamba 2.0+ which doesn't wrap + # conda anymore + self.is_non_conda_exec = False # type: bool self._have_micromamba_server = False # type: bool self._micromamba_server_port = None # type: Optional[int] @@ -269,10 +275,7 @@ def call_conda( if ( args and args[0] not in ("package", "info") - and ( - self._conda_executable_type == "micromamba" - or binary == "micromamba" - ) + and (self.is_non_conda_exec or binary == "micromamba") ): args.extend(["-r", self.root_prefix, "--json"]) debug.conda_exec("Conda call: %s" % str([self._bins[binary]] + args)) @@ -389,12 +392,50 @@ def create_for_step( try: env_name = self._env_directory_from_envid(env.env_id) - return self.create_for_name(env_name, env, do_symlink) + to_return = None + s = time.time() + with _system_monitor.measure("metaflow.conda.create_for_step"): + to_return = self.create_for_name(env_name, env, do_symlink) + + _system_logger.log_event( + level="info", + module="netflix_ext.conda", + name="env_create_for_step", + payload={ + "qualifier_name": str(env.env_id), + "msg": "Environment created in %d seconds" % (time.time() - s), + # We log the step name in case its not available in the event logger context + "step_name": step_name, + # Override the log stream to be the default metrics stream + "log_stream": "metrics", + }, + ) + return to_return except CondaException as e: + import traceback + + with _system_monitor.count("metaflow.conda.create_for_step.error"): + _system_logger.log_event( + level="error", + module="netflix_ext.conda", + name="env_create_for_step.error", + payload={ + "qualifier_name": str(env.env_id), + "msg": traceback.format_exc(), + # We log the step name in case its not available in the event logger context + "step_name": step_name, + # Override the log stream to be the default metrics stream + "log_stream": "metrics", + }, + ) raise CondaStepException(e, [step_name]) from None def create_for_name( - self, name: str, env: ResolvedEnvironment, do_symlink: bool = False + self, + name: str, + env: ResolvedEnvironment, + do_symlink: bool = False, + quiet: bool = False, ) -> str: """ Creates a local instance of the resolved environment @@ -408,6 +449,9 @@ def create_for_name( do_symlink : bool, optional If True, creates a `__conda_python` symlink in the current directory pointing to the created Conda Python executable, by default False + quiet : bool, optional + If True, does not print status messages when creating the environment, + by default False Returns ------- @@ -430,7 +474,12 @@ def create_for_name( with CondaLockMultiDir( self.echo, self._package_dirs, self._package_dir_lockfile_name ): + if quiet: + techo = self.echo + self.echo = self._no_echo env_path = self._create(env, name) + if quiet: + self.echo = techo if do_symlink: os.symlink( @@ -442,12 +491,11 @@ def create_for_name( def create_builder_env(self, builder_env: ResolvedEnvironment) -> str: # A helper to build a named environment specifically for builder environments. # We are more quiet and have a specific name for it - techo = self.echo - self.echo = self._no_echo r = self.create_for_name( - self._env_builder_directory_from_envid(builder_env.env_id), builder_env + self._env_builder_directory_from_envid(builder_env.env_id), + builder_env, + quiet=True, ) - self.echo = techo return r @@ -1191,7 +1239,8 @@ def _cache_pkg(pkg: PackageSpecification, pkg_fmt: str, local_path: str) -> str: self._upload_to_ds(upload_files) delta_time = int(time.time() - start) self.echo( - " done in %d second%s." % (delta_time, plural_marker(delta_time)) + " done in %d second%s." % (delta_time, plural_marker(delta_time)), + timestamp=False, ) else: self.echo( @@ -1239,7 +1288,8 @@ def _cache_pkg(pkg: PackageSpecification, pkg_fmt: str, local_path: str) -> str: self._upload_to_ds(upload_files) delta_time = int(time.time() - start) self.echo( - " done in %d second%s." % (delta_time, plural_marker(delta_time)) + " done in %d second%s." % (delta_time, plural_marker(delta_time)), + timestamp=False, ) else: self.echo( @@ -1585,7 +1635,7 @@ def _micromamba_transmute(src_file: str, dst_file: str, dst_format: str): a = requests.adapters.HTTPAdapter( pool_connections=executor._max_workers, pool_maxsize=executor._max_workers, - max_retries=3, + max_retries=Retry(total=5, backoff_factor=0.1), ) s.mount("https://", a) download_results = [ @@ -1797,7 +1847,7 @@ def _ensure_micromamba(self) -> str: "if ! type micromamba >/dev/null 2>&1; then " "mkdir -p ~/.local/bin >/dev/null 2>&1; " 'python -c "import requests, bz2, sys; ' - "data = requests.get('https://micro.mamba.pm/api/micromamba/%s/1.5.7').content; " + "data = requests.get('https://micro.mamba.pm/api/micromamba/%s/1.5.10').content; " 'sys.stdout.buffer.write(bz2.decompress(data))" | ' "tar -xv -C ~/.local/bin/ --strip-components=1 bin/micromamba > /dev/null 2>&1; " "echo $HOME/.local/bin/micromamba; " @@ -1872,6 +1922,7 @@ def _ensure_remote_conda(self): self._bins = {"conda": self._ensure_micromamba()} self._bins["micromamba"] = self._bins["conda"] self._conda_executable_type = "micromamba" + self.is_non_conda_exec = True def _install_remote_conda(self): # We download the installer and return a path to it @@ -1922,6 +1973,7 @@ def _install_remote_conda(self): os.sync() self._bins = {"conda": final_path, "micromamba": final_path} self._conda_executable_type = "micromamba" + self.is_non_conda_exec = True def _validate_conda_installation(self) -> Optional[Exception]: # If this is installed in CONDA_LOCAL_PATH look for special marker @@ -1993,6 +2045,16 @@ def _validate_conda_installation(self) -> Optional[Exception]: return InvalidEnvironmentException( self._install_message_for_resolver("micromamba") ) + else: + self.is_non_conda_exec = True + elif "mamba version" in self._info_no_lock: + # Mamba 2.0+ has mamba version but no conda version + if parse_version(self._info_no_lock) < parse_version("2.0.0"): + return InvalidEnvironmentException( + self._install_message_for_resolver("mamba") + ) + else: + self.is_non_conda_exec = True else: if parse_version(self._info_no_lock["conda_version"]) < parse_version( "4.14.0" @@ -2059,19 +2121,16 @@ def _check_match(dir_name: str) -> Optional[EnvID]: self._remove(os.path.basename(dir_name)) return None - if ( - self._conda_executable_type == "micromamba" - or CONDA_LOCAL_PATH is not None - or CONDA_TEST - ): - # Micromamba does not record created environments so we look around for them + if self.is_non_conda_exec or CONDA_LOCAL_PATH is not None or CONDA_TEST: + # Micromamba (or Mamba 2.0+) does not record created environments so we look + # around for them # in the root env directory. We also do this if had a local installation # because we don't want to look around at other environments created outside # of that local installation. Finally, we also do this in test mode for # similar reasons -- we only want to search the ones we created. # For micromamba OR if we are using a specific conda installation # (so with CONDA_LOCAL_PATH), only search there - env_dir = self._root_env_dir + env_dir = self.root_env_dir with CondaLock(self.echo, self._env_lock_file(os.path.join(env_dir, "_"))): # Grab a lock *once* on the parent directory so we pick anyname for # the "directory". @@ -2206,7 +2265,7 @@ def _package_dirs(self) -> List[str]: return info["pkgs_dirs"] @property - def _root_env_dir(self) -> str: + def root_env_dir(self) -> str: info = self._info # We rely on the first directory existing. This should be a fairly # easy check. @@ -2224,7 +2283,7 @@ def _info(self) -> Dict[str, Any]: def _info_no_lock(self) -> Dict[str, Any]: if self._cached_info is None: self._cached_info = json.loads(self.call_conda(["info", "--json"])) - if self._conda_executable_type == "micromamba": + if "root_prefix" not in self._cached_info: # Micromamba and Mamba 2.0+ self._cached_info["root_prefix"] = self._cached_info["base environment"] self._cached_info["envs_dirs"] = self._cached_info["envs directories"] self._cached_info["pkgs_dirs"] = self._cached_info["package cache"] @@ -2233,7 +2292,7 @@ def _info_no_lock(self) -> Dict[str, Any]: def _create(self, env: ResolvedEnvironment, env_name: str) -> str: # We first check to see if the environment exists -- if it does, we skip it - env_dir = os.path.join(self._root_env_dir, env_name) + env_dir = os.path.join(self.root_env_dir, env_name) self._cached_info = None @@ -2374,13 +2433,12 @@ def _create(self, env: ResolvedEnvironment, env_name: str) -> str: "--offline", "--no-deps", ] - if self._conda_executable_type == "micromamba": - # micromamba seems to have a bug when compiling .py files. In some + if self.is_non_conda_exec: + # Micromamba (some version) seems to have a bug when compiling .py files. In some # circumstances, it just hangs forever. We avoid this by not compiling # any file and letting things get compiled lazily. This may have the # added benefit of a faster environment creation. - # This option is only available for micromamba so we don't add it - # for anything else. This should cover all remote installations though. + # This works with Micromamba and Mamba 2.0+. args.append("--no-pyc") args.extend( [ @@ -2418,7 +2476,7 @@ def _create(self, env: ResolvedEnvironment, env_name: str) -> str: "--no-deps", "--no-input", ] - if self._conda_executable_type == "micromamba": + if self.is_non_conda_exec: # Be consistent with what we install with micromamba arg_list.append("--no-compile") arg_list.extend(["-r", pypi_list.name]) @@ -2484,10 +2542,10 @@ def paths_and_handles(): def _env_lock_file(self, env_directory: str): # env_directory is either a name or a directory -- if name, it is assumed - # to be rooted at _root_env_dir + # to be rooted at root_env_dir parent_dir = os.path.split(env_directory)[0] if parent_dir == "": - parent_dir = self._root_env_dir + parent_dir = self.root_env_dir return os.path.join(parent_dir, "mf_env-creation.lock") @property @@ -2660,6 +2718,7 @@ def _acquire(self) -> None: try_count = 0 while True: try: + debug.conda_exec("Attempting to create lock at %s" % self.lock) self.fd = os.open(self.lock, os.O_CREAT | os.O_EXCL | os.O_RDWR) self.locked = True break @@ -2667,15 +2726,16 @@ def _acquire(self) -> None: if e.errno != errno.EEXIST: raise - if try_count < 3: - try_count += 1 - elif try_count == 3: + debug.conda_exec( + "Lock at %s already exists -- try %d" % (self.lock, try_count + 1) + ) + try_count += 1 + if try_count % 3 == 0: self.echo( "Waited %ds to acquire lock at '%s' -- if unexpected, " "please remove that file and retry" % (try_count * self.delay, self.lock) ) - try_count += 1 if self.timeout is None: raise CondaException( @@ -2724,6 +2784,7 @@ def __init__( def _acquire(self) -> None: start = time.time() + debug.conda_exec("Will acquire locks on %s" % ", ".join(self.dirs)) for d in self.dirs: full_file = os.path.join(d, self.lockfile) try_count = 0 @@ -2734,6 +2795,8 @@ def _acquire(self) -> None: raise while True: try: + debug.conda_exec("Attempting to create lock at %s" % full_file) + self.fd.append( os.open(full_file, os.O_CREAT | os.O_EXCL | os.O_RDWR) ) @@ -2742,15 +2805,17 @@ def _acquire(self) -> None: if e.errno != errno.EEXIST: raise - if try_count < 3: - try_count += 1 - elif try_count == 3: + debug.conda_exec( + "Lock at %s already exists -- try %d" + % (full_file, try_count + 1) + ) + try_count += 1 + if try_count % 3 == 0: self.echo( "Waited %ds to acquire lock at '%s' -- if unexpected, " "please remove that file and retry" % (try_count * self.delay, full_file) ) - try_count += 1 if self.timeout is None: raise CondaException( diff --git a/metaflow_extensions/netflix_ext/plugins/conda/conda_environment.py b/metaflow_extensions/netflix_ext/plugins/conda/conda_environment.py index 20f06fc..79f8a57 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/conda_environment.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/conda_environment.py @@ -139,6 +139,7 @@ def init_environment(self, echo: Callable[..., None]): ) resolver.resolve_environments(echo) + self._log_conda_events(resolver) update_envs = [] # type: List[ResolvedEnvironment] if self._datastore_type != "local" or CONDA_TEST: @@ -672,3 +673,76 @@ def _get_executable(self, step_name: str) -> Optional[str]: # environment. return os.path.join(".", "__conda_python") return None + + def _log_conda_events(self, resolver): + from metaflow.system import _system_logger + from metaflow import current + + def _format_packages(packages): + return { + package.package_name: { + "package_version": package.package_version, + "filename": package.filename, + "package_url": package.url, + "package_detailed_version": package.package_detailed_version, + "package_format": package.url_format, + } + for package in packages + } + + for ( + newly_resolved_env_id, + newly_resolved_env, + needed_in_steps, + ) in resolver.new_environments(): + # Log the requested packages + _system_logger.log_event( + level="info", + module="netflix_ext.conda", + name="requested_packages", + payload={ + "full_id": newly_resolved_env_id.full_id, + "arch": newly_resolved_env_id.arch, + "req_id": newly_resolved_env_id.req_id, + "msg": str(newly_resolved_env.deps), + }, + ) + + # Log the sources + _system_logger.log_event( + level="info", + module="netflix_ext.conda", + name="sources", + payload={ + "full_id": newly_resolved_env_id.full_id, + "arch": newly_resolved_env_id.arch, + "req_id": newly_resolved_env_id.req_id, + "msg": str(newly_resolved_env.sources), + }, + ) + + # Log the sys packages + _system_logger.log_event( + level="info", + module="netflix_ext.conda", + name="sys_packages", + payload={ + "full_id": newly_resolved_env_id.full_id, + "arch": newly_resolved_env_id.arch, + "req_id": newly_resolved_env_id.req_id, + "msg": str(newly_resolved_env.extras), + }, + ) + + # Log the resolved packages + _system_logger.log_event( + level="info", + module="netflix_ext.conda", + name="resolved_packages", + payload={ + "full_id": newly_resolved_env_id.full_id, + "arch": newly_resolved_env_id.arch, + "req_id": newly_resolved_env_id.req_id, + "msg": str(_format_packages(newly_resolved_env.packages)), + }, + ) diff --git a/metaflow_extensions/netflix_ext/plugins/conda/conda_step_decorator.py b/metaflow_extensions/netflix_ext/plugins/conda/conda_step_decorator.py index 4cffbf1..ba40167 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/conda_step_decorator.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/conda_step_decorator.py @@ -24,8 +24,7 @@ from metaflow.extension_support import EXT_PKG from metaflow.flowspec import FlowSpec from metaflow.graph import FlowGraph -from metaflow.metadata import MetaDatum -from metaflow.metadata.metadata import MetadataProvider +from metaflow.metadata_provider import MetaDatum, MetadataProvider from metaflow.metaflow_config import CONDA_REMOTE_COMMANDS from metaflow.metaflow_environment import ( InvalidEnvironmentException, diff --git a/metaflow_extensions/netflix_ext/plugins/conda/env_descr.py b/metaflow_extensions/netflix_ext/plugins/conda/env_descr.py index 0894930..5cd5d88 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/env_descr.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/env_descr.py @@ -52,10 +52,23 @@ class EnvType(Enum): + """Type of environment""" + CONDA_ONLY = "conda-only" + """Environment only has conda packages + """ + PYPI_ONLY = "pypi-only" + """Environment only has Pypi packages + """ + PIP_ONLY = "pip-only" # Here for legacy reasons -- we now use pypi-only + """Alias to PYPI_ONLY + """ + MIXED = "mixed" + """Environment has mixed Conda and Pypi packages + """ class TStr: diff --git a/metaflow_extensions/netflix_ext/plugins/conda/envsresolver.py b/metaflow_extensions/netflix_ext/plugins/conda/envsresolver.py index 31d5fe0..395fee2 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/envsresolver.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/envsresolver.py @@ -30,6 +30,7 @@ CONDA_USE_REMOTE_LATEST, ) from metaflow.metaflow_environment import InvalidEnvironmentException +from metaflow.system import _system_monitor, _system_logger from metaflow._vendor.packaging.version import parse as parse_version from .env_descr import ( @@ -221,21 +222,34 @@ def resolve_environments(self, echo: Callable[..., None]): echo : Callable[..., None] Method to use to print things to the console """ - # At this point, we check in our backend storage if we have the files we need - need_resolution = [ - env_id - for env_id, req in self._requested_envs.items() - if req["resolved"] is None - ] - if debug.conda: - debug.conda_exec("Resolving environments:") - for env_id in need_resolution: - info = self._requested_envs[env_id] - debug.conda_exec( - "%s (%s): %s" % (env_id.req_id, env_id.full_id, str(info)) - ) - if len(need_resolution): - self._resolve_environments(echo, need_resolution) + with _system_monitor.measure("metaflow.conda.all_resolve"): + start = time.time() + # At this point, we check in our backend storage if we have the files we need + need_resolution = [ + env_id + for env_id, req in self._requested_envs.items() + if req["resolved"] is None + ] + if debug.conda: + debug.conda_exec("Resolving environments:") + for env_id in need_resolution: + info = self._requested_envs[env_id] + debug.conda_exec( + "%s (%s): %s" % (env_id.req_id, env_id.full_id, str(info)) + ) + if len(need_resolution): + self._resolve_environments(echo, need_resolution) + _system_logger.log_event( + level="info", + module="netflix_ext.conda", + name="all_envs_resolved", + payload={ + "msg": "All environment resolved and cached in %d seconds" + % (time.time() - start), + # Override the log stream to be the default metrics stream + "log_stream": "metrics", + }, + ) def all_environments( self, include_builder_envs: bool = False @@ -640,91 +654,106 @@ def _resolve( env_desc: Mapping[str, Any], builder_environments: Optional[Dict[str, List[EnvID]]], ) -> Tuple[EnvID, ResolvedEnvironment, Optional[List[ResolvedEnvironment]]]: - env_id = cast(EnvID, env_desc["id"]) - if builder_environments is None: - builder_environments = {} - - builder_envs = [ - self._builder_envs[builder_env_id]["resolved"] - for builder_env_id in builder_environments.get(env_id.req_id, []) - ] - - # Figure out the env_type - env_type = cast( - EnvType, env_desc.get("env_type") or env_type_for_deps(env_desc["deps"]) - ) + with _system_monitor.measure("metaflow.conda.resolve"): + start = time.time() + env_id = cast(EnvID, env_desc["id"]) + if builder_environments is None: + builder_environments = {} + + builder_envs = [ + self._builder_envs[builder_env_id]["resolved"] + for builder_env_id in builder_environments.get(env_id.req_id, []) + ] - # Create the resolver object - resolver = self.get_resolver(env_type)(self._conda) - - # Resolve the environment - if env_type == EnvType.PYPI_ONLY: - # Pypi only mode - # In this mode, we also allow (as a workaround for poor support for - # more advanced options in conda-lock (like git repo, local support, - # etc)) the inclusion of conda packages that are *not* python packages. - # To ensure this, we check the npconda packages, create an environment - # for it and check if that environment doesn't contain python deps. - # If that is the case, we then create the actual environment including - # both conda and npconda packages and re-resolve. We could maybe - # optimize to not resolve from scratch twice but given this is a rare - # situation and the cost is only during resolution, it doesn't seem - # worth it. - npconda_deps = env_desc["deps"].get("npconda", []) - if npconda_deps: - npcondaenv, _ = self.get_resolver(EnvType.CONDA_ONLY)( - self._conda - ).resolve( - EnvType.CONDA_ONLY, - {"npconda": npconda_deps}, - env_desc["sources"], - {}, - env_id.arch, - ) - if any((p.filename.startswith("python-") for p in npcondaenv.packages)): - raise InvalidEnvironmentException( - "Cannot specify a non-python Conda dependency that uses " - "python: %s. Please use the mixed mode instead." - % ", ".join([d.value for d in npconda_deps]) - ) - resolved_env, builder_envs = resolver.resolve( - env_type, - env_desc["deps"], - env_desc["sources"], - env_desc["extras"], - env_id.arch, - builder_envs, - env_desc["base"], - ) + # Figure out the env_type + env_type = cast( + EnvType, env_desc.get("env_type") or env_type_for_deps(env_desc["deps"]) + ) - if env_desc["base"]: - # We try to copy things over from the base environment as it contains - # potential caching information we don't need to rebuild. This also - # properly sets the user dependencies to what we want as opposed to - # including everything we resolved for. - merged_packages = [] # type: List[PackageSpecification] - base_packages = { - p.filename: p.to_dict() - for p in cast(ResolvedEnvironment, env_desc["base"]).packages - } - for p in resolved_env.packages: - existing_info = base_packages.get(p.filename) - if existing_info: - merged_packages.append( - PackageSpecification.from_dict(existing_info) + # Create the resolver object + resolver = self.get_resolver(env_type)(self._conda) + + # Resolve the environment + if env_type == EnvType.PYPI_ONLY: + # Pypi only mode + # In this mode, we also allow (as a workaround for poor support for + # more advanced options in conda-lock (like git repo, local support, + # etc)) the inclusion of conda packages that are *not* python packages. + # To ensure this, we check the npconda packages, create an environment + # for it and check if that environment doesn't contain python deps. + # If that is the case, we then create the actual environment including + # both conda and npconda packages and re-resolve. We could maybe + # optimize to not resolve from scratch twice but given this is a rare + # situation and the cost is only during resolution, it doesn't seem + # worth it. + npconda_deps = env_desc["deps"].get("npconda", []) + if npconda_deps: + npcondaenv, _ = self.get_resolver(EnvType.CONDA_ONLY)( + self._conda + ).resolve( + EnvType.CONDA_ONLY, + {"npconda": npconda_deps}, + env_desc["sources"], + {}, + env_id.arch, ) - else: - merged_packages.append(p) - resolved_env = ResolvedEnvironment( - env_desc["user_deps"], + if any( + (p.filename.startswith("python-") for p in npcondaenv.packages) + ): + raise InvalidEnvironmentException( + "Cannot specify a non-python Conda dependency that uses " + "python: %s. Please use the mixed mode instead." + % ", ".join([d.value for d in npconda_deps]) + ) + resolved_env, builder_envs = resolver.resolve( + env_type, + env_desc["deps"], env_desc["sources"], env_desc["extras"], env_id.arch, - all_packages=merged_packages, - env_type=resolved_env.env_type, - accurate_source=env_desc["base_accurate"], + builder_envs, + env_desc["base"], + ) + + if env_desc["base"]: + # We try to copy things over from the base environment as it contains + # potential caching information we don't need to rebuild. This also + # properly sets the user dependencies to what we want as opposed to + # including everything we resolved for. + merged_packages = [] # type: List[PackageSpecification] + base_packages = { + p.filename: p.to_dict() + for p in cast(ResolvedEnvironment, env_desc["base"]).packages + } + for p in resolved_env.packages: + existing_info = base_packages.get(p.filename) + if existing_info: + merged_packages.append( + PackageSpecification.from_dict(existing_info) + ) + else: + merged_packages.append(p) + resolved_env = ResolvedEnvironment( + env_desc["user_deps"], + env_desc["sources"], + env_desc["extras"], + env_id.arch, + all_packages=merged_packages, + env_type=resolved_env.env_type, + accurate_source=env_desc["base_accurate"], + ) + _system_logger.log_event( + level="info", + module="nflx.conda", + name="env_resolved", + payload={ + "qualifier_name": str(env_id), + "msg": "Environment resolved in %d seconds" % (time.time() - start), + # Override the log stream to be the default metrics stream + "log_stream": "metrics", + }, ) - return env_id, resolved_env, builder_envs + return env_id, resolved_env, builder_envs @staticmethod def extract_info_from_base( diff --git a/metaflow_extensions/netflix_ext/plugins/conda/pypi_package_builder.py b/metaflow_extensions/netflix_ext/plugins/conda/pypi_package_builder.py index d7999ef..22529f8 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/pypi_package_builder.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/pypi_package_builder.py @@ -8,8 +8,8 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Set, Tuple, cast if TYPE_CHECKING: - from metaflow.datastore.datastore_storage import DataStoreStorage - from .conda import Conda + import metaflow.datastore.datastore_storage + import metaflow_extensions.netflix_ext.plugins.conda.conda from metaflow.debug import debug @@ -53,8 +53,8 @@ def __init__( def build_pypi_packages( - conda: Conda, - storage: DataStoreStorage, + conda: "metaflow_extensions.netflix_ext.plugins.conda.conda.Conda", + storage: "metaflow.datastore.datastore_storage.DataStoreStorage", python_version: str, to_build_pkg_info: Dict[str, PackageToBuild], builder_envs: Optional[List[ResolvedEnvironment]], @@ -321,7 +321,7 @@ def build_pypi_packages( def _build_with_pip( - conda: Conda, + conda: "metaflow_extensions.netflix_ext.plugins.conda.conda.Conda", binary: str, dest_path: str, key: str, diff --git a/metaflow_extensions/netflix_ext/plugins/conda/resolvers/__init__.py b/metaflow_extensions/netflix_ext/plugins/conda/resolvers/__init__.py index 15c0e34..d7d91d5 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/resolvers/__init__.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/resolvers/__init__.py @@ -5,9 +5,8 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Type if TYPE_CHECKING: - from ..conda import Conda - from ..env_descr import ResolvedEnvironment - from ..env_descr import EnvType + import metaflow_extensions.netflix_ext.plugins.conda.conda + import metaflow_extensions.netflix_ext.plugins.conda.env_descr from ..utils import CondaException @@ -35,19 +34,34 @@ def get_resolver(cls, resolver_type: str): ) return resolver - def __init__(self, conda: Conda): + def __init__( + self, conda: "metaflow_extensions.netflix_ext.plugins.conda.conda.Conda" + ): self._conda = conda def resolve( self, - env_type: EnvType, + env_type: "metaflow_extensions.netflix_ext.plugins.conda.env_descr.EnvType", deps: Dict[str, List[str]], sources: Dict[str, List[str]], extras: Dict[str, List[str]], architecture: str, - builder_envs: Optional[List[ResolvedEnvironment]] = None, - base_env: Optional[ResolvedEnvironment] = None, - ) -> Tuple[ResolvedEnvironment, Optional[List[ResolvedEnvironment]]]: + builder_envs: Optional[ + List[ + "metaflow_extensions.netflix_ext.plugins.conda.env_descr.ResolvedEnvironment" + ] + ] = None, + base_env: Optional[ + "metaflow_extensions.netflix_ext.plugins.conda.env_descr.ResolvedEnvironment" + ] = None, + ) -> Tuple[ + "metaflow_extensions.netflix_ext.plugins.conda.env_descr.ResolvedEnvironment", + Optional[ + List[ + "metaflow_extensions.netflix_ext.plugins.conda.env_descr.ResolvedEnvironment" + ] + ], + ]: """ Resolves the environment specified by the dependencies, the sources (channels or indices), extra information (used for Pypi resolvers) for the given diff --git a/metaflow_extensions/netflix_ext/plugins/conda/resolvers/conda_lock_resolver.py b/metaflow_extensions/netflix_ext/plugins/conda/resolvers/conda_lock_resolver.py index e142b7d..dcbc004 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/resolvers/conda_lock_resolver.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/resolvers/conda_lock_resolver.py @@ -228,15 +228,10 @@ def resolve( # else: conda_exec_type = self._conda.conda_executable_type if conda_exec_type: - if conda_exec_type == "conda": - args.append(cast(str, self._conda.binary(conda_exec_type))) - else: - args.extend( - [ - cast(str, self._conda.binary(conda_exec_type)), - "--%s" % conda_exec_type, - ] - ) + args.append(cast(str, self._conda.binary(conda_exec_type))) + + if conda_exec_type != "conda": + args.append("--%s" % conda_exec_type) else: raise CondaException("Could not find conda binary for conda-lock") diff --git a/metaflow_extensions/netflix_ext/plugins/conda/resolvers/conda_resolver.py b/metaflow_extensions/netflix_ext/plugins/conda/resolvers/conda_resolver.py index 077164e..f3f47b0 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/resolvers/conda_resolver.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/resolvers/conda_resolver.py @@ -13,7 +13,12 @@ PackageSpecification, ResolvedEnvironment, ) -from ..utils import CondaException, channel_or_url, parse_explicit_url_conda +from ..utils import ( + CondaException, + channel_or_url, + clean_up_double_equal, + parse_explicit_url_conda, +) from . import Resolver @@ -40,7 +45,9 @@ def resolve( % ", ".join([p.package_name for p in local_packages]) ) sys_overrides = {k: v for d in deps.get("sys", []) for k, v in [d.split("==")]} - real_deps = list(chain(deps.get("conda", []), deps.get("npconda", []))) + real_deps = clean_up_double_equal( + chain(deps.get("conda", []), deps.get("npconda", [])) + ) packages = [] # type: List[PackageSpecification] with tempfile.TemporaryDirectory() as mamba_dir: args = [ @@ -86,7 +93,8 @@ def resolve( # - actions: # - FETCH: List of objects to fetch -- this is where we get hash and URL # - LINK: Packages to actually install (in that order) - # On micromamba, we can just use the LINK blob since it has all information we need + # On micromamba (or Mamba 2+), we can just use the LINK blob since it has all + # information we need if not conda_result["success"]: print( "Pretty-printed Conda create result:\n%s" % conda_result, @@ -96,7 +104,7 @@ def resolve( "Could not resolve environment -- see above pretty-printed error." ) - if self._conda.conda_executable_type == "micromamba": + if self._conda.is_non_conda_exec: for lnk in conda_result["actions"]["LINK"]: parse_result = parse_explicit_url_conda( "%s#%s" % (lnk["url"], lnk["md5"]) diff --git a/metaflow_extensions/netflix_ext/plugins/conda/resolvers/pip_resolver.py b/metaflow_extensions/netflix_ext/plugins/conda/resolvers/pip_resolver.py index 925b690..06244d1 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/resolvers/pip_resolver.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/resolvers/pip_resolver.py @@ -24,6 +24,7 @@ from ..utils import ( CondaException, arch_id, + clean_up_double_equal, correct_splitext, get_glibc_version, parse_explicit_path_pypi, @@ -254,6 +255,7 @@ def resolve( # Unfortunately, pip doesn't like things like ==<= so we need to strip # the == + args.extend(clean_up_double_equal(real_deps)) for d in real_deps: splits = d.split("==", 1) if len(splits) == 1: diff --git a/metaflow_extensions/netflix_ext/plugins/conda/utils.py b/metaflow_extensions/netflix_ext/plugins/conda/utils.py index 828331c..0d3375f 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/utils.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/utils.py @@ -18,6 +18,7 @@ Any, Dict, FrozenSet, + Iterable, List, Mapping, NamedTuple, @@ -62,7 +63,7 @@ from metaflow.metaflow_environment import InvalidEnvironmentException if TYPE_CHECKING: - from .env_descr import TStr + import metaflow_extensions.netflix_ext.plugins.conda.env_descr # NOTA: Most of the code does not assume that there are only two formats BUT the # transmute code does (since you can only specify the infile -- the outformat and file @@ -100,6 +101,8 @@ class AliasType(Enum): CONDA_FORMATS = _ALL_CONDA_FORMATS # type: Tuple[str, ...] FAKEURL_PATHCOMPONENT = "_fake" +_double_equal_match = re.compile("==(?=[<=>!~])") + class CondaException(MetaflowException): headline = "Conda ran into an error while setting up environment." @@ -441,7 +444,9 @@ def is_alias_mutable(alias_type: AliasType, resolved_alias: str) -> bool: return len(splits) == 2 and splits[1] in ("latest", "candidate", "stable") -def dict_to_tstr(deps: Dict[str, List[str]]) -> List[TStr]: +def dict_to_tstr( + deps: Dict[str, List[str]] +) -> List["metaflow_extensions.netflix_ext.plugins.conda.env_descr.TStr"]: from .env_descr import TStr # Avoid circular import result = [] # type: List[TStr] @@ -450,7 +455,9 @@ def dict_to_tstr(deps: Dict[str, List[str]]) -> List[TStr]: return result -def tstr_to_dict(deps: List[TStr]) -> Dict[str, List[str]]: +def tstr_to_dict( + deps: List["metaflow_extensions.netflix_ext.plugins.conda.env_descr.TStr"], +) -> Dict[str, List[str]]: result = {} # type: Dict[str, List[str]] for dep in deps: result.setdefault(dep.category, []).append(dep.value) @@ -468,6 +475,10 @@ def split_into_dict(deps: List[str]) -> Dict[str, str]: return result +def clean_up_double_equal(deps: Iterable[str]) -> List[str]: + return [_double_equal_match.sub("", d) for d in deps] + + def merge_dep_dicts( d1: Dict[str, str], d2: Dict[str, str], only_last_deps: bool = False ) -> Dict[str, str]: diff --git a/metaflow_extensions/netflix_ext/toplevel/netflixext_version.py b/metaflow_extensions/netflix_ext/toplevel/netflixext_version.py index 408444a..9002870 100644 --- a/metaflow_extensions/netflix_ext/toplevel/netflixext_version.py +++ b/metaflow_extensions/netflix_ext/toplevel/netflixext_version.py @@ -1 +1 @@ -netflixext_version = "1.2.2" +netflixext_version = "1.2.3" diff --git a/setup.py b/setup.py index 0ab011b..774868b 100644 --- a/setup.py +++ b/setup.py @@ -39,5 +39,5 @@ "metaflow_extensions.netflix_ext.plugins.conda.resources": ["*.png", "*.svg"] }, python_requires=">=3.7.2", - install_requires=["metaflow>=2.10.0"], + install_requires=["metaflow>=2.12.29"], )