diff --git a/metaflow_extensions/netflix_ext/plugins/conda/conda_common_decorator.py b/metaflow_extensions/netflix_ext/plugins/conda/conda_common_decorator.py index f6d640f..2da0a52 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/conda_common_decorator.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/conda_common_decorator.py @@ -127,6 +127,7 @@ def is_disabled(self) -> Optional[bool]: or self.from_pathspec or self.packages or self.sources + or self.is_fetch_at_exec ): return False return None diff --git a/metaflow_extensions/netflix_ext/plugins/conda/conda_environment.py b/metaflow_extensions/netflix_ext/plugins/conda/conda_environment.py index 3b59700..13dcaeb 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/conda_environment.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/conda_environment.py @@ -321,30 +321,26 @@ def resolve_fetch_at_exec_env( @classmethod def get_env_id(cls, conda: Conda, step_name: str) -> Optional[Union[str, EnvID]]: - # _METAFLOW_CONDA_ENV is set: - # - by remote_bootstrap: - # - if executing on a scheduler (no runtime), this will set it to - # the fully resolved ID even in the case of fetch_at_exec - # - if executing on a remote node with the runtime, it will also set it but - # it is unused - # - in runtime_step_cli: - # - if executing locally, we can use this in our actual execution to create - # the environment in runtime_step_cli - # - if executing remotely, we use this to determine the executable and - # it is used in bootstrap_commands to build the command line + # We either look in _result_for_step for the env id (this only works for the + # initial process that launched Metaflow ie: the first CLI process). This is what + # happens when deploying to argo/airflow or any other scheduler. For everything + # else, we look at _METAFLOW_CONDA_ENV: + # - when deploying to a scheduler: use _result_for_step to build the command line + # - when executing for scheduler, _METAFLOW_CONDA_ENV is set by remote_bootstrap + # and will also fully resolve fetch_at_exec and used in task_pre_step. Note that + # it relies on the value that was hardcoded in the command line when deploying + # to the scheduler. The value may still not be the same due to fetch_at_exec + # - when using the runtime: + # - if executing locally, the environment is created in runtime_step_cli + # and sets _METAFLOW_CONDA_ENV which is then read in task_pre_step (in the + # next process) + # - if executing remotely, _METAFLOW_CONDA_ENV is set by runtime_step_cli. It + # is then used by bootstrap_commands (in conda_environment.py) to build the + # command line. The command line will then call remote_bootstrap which will + # set again _METAFLOW_CONDA_ENV to the fully resolved environment which is + # then used in task_pre_step. step_info = cls._result_for_step.get(step_name) if step_info is None: - # _METAFLOW_CONDA_ENV is set: - # - by remote_bootstrap: - # - if executing on a scheduler (no runtime), this will set it to - # the fully resolved ID even in the case of fetch_at_exec - # - if executing on a remote node with the runtime, it will also set it but - # it is unused - # - in runtime_step_cli: - # - if executing locally, we can use this in our actual execution to create - # the environment in runtime_step_cli - # - if executing remotely, we use this to determine the executable and - # it is used in bootstrap_commands to build the command line resolved_env_id = None t = os.environ.get("_METAFLOW_CONDA_ENV") if t: diff --git a/metaflow_extensions/netflix_ext/plugins/conda/conda_step_decorator.py b/metaflow_extensions/netflix_ext/plugins/conda/conda_step_decorator.py index dca2837..4cffbf1 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/conda_step_decorator.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/conda_step_decorator.py @@ -500,31 +500,26 @@ def runtime_step_cli( max_user_code_retries: int, ubf_context: str, ): - # We also set the env var in remote case for is_fetch_at_exec - # so that it can be used to fill out the bootstrap command with - # the proper environment - if self._is_enabled(UBF_TASK) or self._is_fetch_at_exec(): - # Export this for local runs, we will use it to read the "resolved" - # environment ID in task_pre_step as well as in get_env_id in - # conda_environment.py. This makes it compatible with the remote - # bootstrap which also exports it. We do this even for UBF control tasks as - # this environment variable is then passed to the actual tasks. We don't - # always create the environment for the control task. Note that this is - # determined by _is_enabled - - # Note that in the case of a fetch_at_exec, self._env_id is fully resolved - # (it was resolved in runtime_task_created) so this is the env_id we need - # to use for our task. + # Check if there is a conda environment to use. We check for UBF_TASK (instead of + # ubf_context) because, even if we don't create the environment for a control task, + # we want to pass down the environment variable for the mapper tasks. + if self._is_enabled(UBF_TASK): cli_args.env["_METAFLOW_CONDA_ENV"] = json.dumps(self._env_id) - - # If we are executing remotely, we now have _METAFLOW_CONDA_ENV set-up - # properly so we will be able to use it in _get_env_id in conda_environment - # to figure out what environment we need to execute remotely if self._is_remote or not self._is_enabled(ubf_context): + # If we are executing remotely, we don't need to actually create the + # environment at all. We also don't create it if we are local and + # this is not enabled for the current UBF context. return else: + # In this case, there is no need for a conda environment. Since we can + # be running with a runner *inside* another Metaflow step that *does* have + # a conda environment, we make sure to clear _METAFLOW_CONDA_ENV so that + # the outside environment doesn't "leak" (it could have been set for the external + # process in which case it would get passed down). + cli_args.env["_METAFLOW_CONDA_ENV"] = "" return + # Create the environment only if executing locally (ie: not self._is_remote) conda = cast(Conda, self._env.conda) assert self._env_id entrypoint = None # type: Optional[str]