Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add view_url for DagBundles #45126

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions airflow/dag_processing/bundles/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,11 @@ def get_current_version(self) -> str | None:
@abstractmethod
def refresh(self) -> None:
"""Retrieve the latest version of the files in the bundle."""

def view_url(self, version: str | None = None) -> str | None:
"""
URL to view the bundle.

:param version: Version to view
:return: URL to view the bundle
"""
26 changes: 26 additions & 0 deletions airflow/dag_processing/bundles/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,29 @@ def refresh(self) -> None:

self.bare_repo.remotes.origin.fetch("+refs/heads/*:refs/heads/*")
self.repo.remotes.origin.pull()

def _convert_git_ssh_url_to_https(self) -> str:
if self.repo_url.startswith("git@"):
parts = self.repo_url.split(":")
domain = parts[0].replace("git@", "https://")
repo_path = parts[1].replace(".git", "")
return f"{domain}/{repo_path}"
raise ValueError(f"Invalid git SSH URL: {self.repo_url}")

def view_url(self, version: str | None = None) -> str:
if not version:
raise AirflowException("Version is required to view the repository")
if not self._has_version(self.repo, version):
raise AirflowException(f"Version {version} not found in the repository")
url = self.repo_url
if url.startswith("git@"):
url = self._convert_git_ssh_url_to_https()
if url.endswith(".git"):
url = url[:-4]
if "github" in url:
return f"{url}/tree/{version}"
if "gitlab" in url:
return f"{url}/-/tree/{version}"
if "bitbucket" in url:
return f"{url}/src/{version}"
return f"{url}/tree/{version}"
4 changes: 4 additions & 0 deletions airflow/dag_processing/bundles/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,7 @@ def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle:
bundle_config = self.bundle_configs[name]
bundle_class = import_string(bundle_config["classpath"])
return bundle_class(name=name, version=version, **bundle_config["kwargs"])

def view_url(self, name: str, version: str | None = None) -> str | None:
bundle = self.get_bundle(name, version)
return bundle.view_url(version=version)
11 changes: 11 additions & 0 deletions tests/dag_processing/bundles/test_dag_bundle_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airflow.models.dagbundle import DagBundleModel
from airflow.utils.session import create_session

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_dag_bundles


Expand Down Expand Up @@ -107,6 +108,16 @@ def test_get_bundle():
assert bundle.version is None


@conf_vars({("dag_bundles", "testbundle"): json.dumps(BASIC_BUNDLE_CONFIG)})
@pytest.mark.parametrize("version", [None, "hello"])
def test_view_url(version):
"""Test that view_url calls the bundle's view_url method."""
bundle_manager = DagBundlesManager()
with patch.object(BaseDagBundle, "view_url") as view_url_mock:
bundle_manager.view_url("testbundle", version=version)
view_url_mock.assert_called_once_with(version=version)


def test_get_all_dag_bundles():
"""Test that get_all_dag_bundles returns all bundles."""

Expand Down
47 changes: 47 additions & 0 deletions tests/dag_processing/test_dag_bundles.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import tempfile
from pathlib import Path
from unittest import mock

import pytest
from git import Repo
Expand Down Expand Up @@ -265,3 +266,49 @@ def test_subdir(self, git_repo):
files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
assert str(bundle.path).endswith(subdir)
assert {"some_new_file.py"} == files_in_repo

@pytest.mark.parametrize(
"repo_url, expected_url",
[
("[email protected]:apache/airflow.git", "https://github.com/apache/airflow/tree/0f0f0f"),
("[email protected]:apache/airflow.git", "https://gitlab.com/apache/airflow/-/tree/0f0f0f"),
("[email protected]:apache/airflow.git", "https://bitbucket.org/apache/airflow/src/0f0f0f"),
],
)
@mock.patch("airflow.dag_processing.bundles.git.Repo")
@mock.patch.object(GitDagBundle, "_has_version")
def test_view_url(self, mock_has_version, mock_gitrepo, repo_url, expected_url):
mock_has_version.return_value = True

bundle = GitDagBundle(
name="test",
refresh_interval=300,
repo_url=repo_url,
tracking_ref="main",
)
view_url = bundle.view_url("0f0f0f")
assert view_url == expected_url

@mock.patch("airflow.dag_processing.bundles.git.Repo")
def test_view_url_raises_if_no_version(self, mock_gitrepo):
bundle = GitDagBundle(
name="test",
refresh_interval=300,
repo_url="[email protected]:apache/airflow.git",
tracking_ref="main",
)
with pytest.raises(AirflowException, match="Version is required to view the repository"):
bundle.view_url(None)

@mock.patch("airflow.dag_processing.bundles.git.Repo")
@mock.patch.object(GitDagBundle, "_has_version")
def test_view_url_raises_if_version_not_found(self, mock_has_version, mock_gitrepo):
mock_has_version.return_value = False
bundle = GitDagBundle(
name="test",
refresh_interval=300,
repo_url="[email protected]:apache/airflow.git",
tracking_ref="main",
)
with pytest.raises(AirflowException, match="Version not_found not found in the repository"):
bundle.view_url("not_found")
Loading