diff --git a/airflow/dag_processing/bundles/base.py b/airflow/dag_processing/bundles/base.py index a0061d97e8f21..ea560f1be26b0 100644 --- a/airflow/dag_processing/bundles/base.py +++ b/airflow/dag_processing/bundles/base.py @@ -82,3 +82,11 @@ def get_current_version(self) -> str | None: @abstractmethod def refresh(self) -> None: """Retrieve the latest version of the files in the bundle.""" + + def view_url(self, version: str | None = None) -> str | None: + """ + URL to view the bundle. + + :param version: Version to view + :return: URL to view the bundle + """ diff --git a/airflow/dag_processing/bundles/git.py b/airflow/dag_processing/bundles/git.py index 6547bc8ecad05..ad7a900d14331 100644 --- a/airflow/dag_processing/bundles/git.py +++ b/airflow/dag_processing/bundles/git.py @@ -126,3 +126,29 @@ def refresh(self) -> None: self.bare_repo.remotes.origin.fetch("+refs/heads/*:refs/heads/*") self.repo.remotes.origin.pull() + + def _convert_git_ssh_url_to_https(self) -> str: + if self.repo_url.startswith("git@"): + parts = self.repo_url.split(":") + domain = parts[0].replace("git@", "https://") + repo_path = parts[1].replace(".git", "") + return f"{domain}/{repo_path}" + raise ValueError(f"Invalid git SSH URL: {self.repo_url}") + + def view_url(self, version: str | None = None) -> str: + if not version: + raise AirflowException("Version is required to view the repository") + if not self._has_version(self.repo, version): + raise AirflowException(f"Version {version} not found in the repository") + url = self.repo_url + if url.startswith("git@"): + url = self._convert_git_ssh_url_to_https() + if url.endswith(".git"): + url = url[:-4] + if "github" in url: + return f"{url}/tree/{version}" + if "gitlab" in url: + return f"{url}/-/tree/{version}" + if "bitbucket" in url: + return f"{url}/src/{version}" + return f"{url}/tree/{version}" diff --git a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py index 4f8b59b956e18..63eabb996e543 100644 --- a/airflow/dag_processing/bundles/manager.py +++ b/airflow/dag_processing/bundles/manager.py @@ -94,3 +94,7 @@ def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle: bundle_config = self.bundle_configs[name] bundle_class = import_string(bundle_config["classpath"]) return bundle_class(name=name, version=version, **bundle_config["kwargs"]) + + def view_url(self, name: str, version: str | None = None) -> str | None: + bundle = self.get_bundle(name, version) + return bundle.view_url(version=version) diff --git a/tests/dag_processing/bundles/test_dag_bundle_manager.py b/tests/dag_processing/bundles/test_dag_bundle_manager.py index c79acfc2ed2a5..1dd961aec52a8 100644 --- a/tests/dag_processing/bundles/test_dag_bundle_manager.py +++ b/tests/dag_processing/bundles/test_dag_bundle_manager.py @@ -29,6 +29,7 @@ from airflow.models.dagbundle import DagBundleModel from airflow.utils.session import create_session +from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_dag_bundles @@ -107,6 +108,16 @@ def test_get_bundle(): assert bundle.version is None +@conf_vars({("dag_bundles", "testbundle"): json.dumps(BASIC_BUNDLE_CONFIG)}) +@pytest.mark.parametrize("version", [None, "hello"]) +def test_view_url(version): + """Test that view_url calls the bundle's view_url method.""" + bundle_manager = DagBundlesManager() + with patch.object(BaseDagBundle, "view_url") as view_url_mock: + bundle_manager.view_url("testbundle", version=version) + view_url_mock.assert_called_once_with(version=version) + + def test_get_all_dag_bundles(): """Test that get_all_dag_bundles returns all bundles.""" diff --git a/tests/dag_processing/test_dag_bundles.py b/tests/dag_processing/test_dag_bundles.py index 343663fb58238..d33c44b341897 100644 --- a/tests/dag_processing/test_dag_bundles.py +++ b/tests/dag_processing/test_dag_bundles.py @@ -19,6 +19,7 @@ import tempfile from pathlib import Path +from unittest import mock import pytest from git import Repo @@ -265,3 +266,25 @@ def test_subdir(self, git_repo): files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()} assert str(bundle.path).endswith(subdir) assert {"some_new_file.py"} == files_in_repo + + @pytest.mark.parametrize( + "repo_url, expected_url", + [ + ("git@github.com:apache/airflow.git", "https://github.com/apache/airflow/tree/0f0f0f"), + ("git@gitlab.com:apache/airflow.git", "https://gitlab.com/apache/airflow/-/tree/0f0f0f"), + ("git@bitbucket.org:apache/airflow.git", "https://bitbucket.org/apache/airflow/src/0f0f0f"), + ], + ) + @mock.patch("airflow.dag_processing.bundles.git.Repo") + @mock.patch.object(GitDagBundle, "_has_version") + def test_view_url(self, mock_has_version, mock_gitrepo, repo_url, expected_url): + mock_has_version.return_value = True + + bundle = GitDagBundle( + name="test", + refresh_interval=300, + repo_url=repo_url, + tracking_ref="main", + ) + view_url = bundle.view_url("0f0f0f") + assert view_url == expected_url