Skip to content

Commit

Permalink
refactor: split collection_reader file
Browse files Browse the repository at this point in the history
  • Loading branch information
PaulFarault committed Oct 31, 2024
1 parent f169b3a commit effa3d8
Show file tree
Hide file tree
Showing 16 changed files with 615 additions and 513 deletions.
282 changes: 0 additions & 282 deletions tdp/core/collection.py

This file was deleted.

9 changes: 9 additions & 0 deletions tdp/core/collections/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Copyright 2022 TOSIT.IO
# SPDX-License-Identifier: Apache-2.0

from .check_collection_structure import (
MissingMandatoryDirectoryError,
PathDoesNotExistsError,
PathIsNotADirectoryError,
)
from .collections import Collections
57 changes: 57 additions & 0 deletions tdp/core/collections/check_collection_structure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright 2022 TOSIT.IO
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import logging
from pathlib import Path

from tdp.core.constants import (
DAG_DIRECTORY_NAME,
DEFAULT_VARS_DIRECTORY_NAME,
PLAYBOOKS_DIRECTORY_NAME,
)

MANDATORY_DIRECTORIES = [
DAG_DIRECTORY_NAME,
DEFAULT_VARS_DIRECTORY_NAME,
PLAYBOOKS_DIRECTORY_NAME,
]


class PathDoesNotExistsError(Exception):
pass


class PathIsNotADirectoryError(Exception):
pass


class MissingMandatoryDirectoryError(Exception):
pass


logger = logging.getLogger(__name__)


def check_collection_structure(path: Path) -> None:
"""Check the structure of a collection.
Args:
path: Path to the collection.
Raises:
PathDoesNotExistsError: If the path does not exists.
PathIsNotADirectoryError: If the path is not a directory.
MissingMandatoryDirectoryError: If the collection does not contain a mandatory directory.
"""
if not path.exists():
raise PathDoesNotExistsError(f"{path} does not exists.")
if not path.is_dir():
raise PathIsNotADirectoryError(f"{path} is not a directory.")
for mandatory_directory in MANDATORY_DIRECTORIES:
mandatory_path = path / mandatory_directory
if not mandatory_path.exists() or not mandatory_path.is_dir():
raise MissingMandatoryDirectoryError(
f"{path} does not contain the mandatory directory {mandatory_directory}.",
)
Loading

0 comments on commit effa3d8

Please sign in to comment.