# Methods of RabbitMQFetchFilenamesClient, reconstructed from the patch's
# post-image of consumer/src/adapters/fetch_filenames/rabbitmq.py.

def _wait(self) -> None:
    """Sleep briefly between empty polls so the loop does not spin."""
    time.sleep(0.5)

@override
def fetch(self) -> Generator[str, None, None]:
    """Yield filenames from the queue until it stays empty past the timeout.

    Polls with ``basic_get`` (manual ack). A message is acked only after the
    consumer resumes the generator, so a crash mid-processing leaves it
    unacked and it is redelivered. On any error the in-flight message (if
    one was taken) is rejected with ``requeue=True``, the connection is
    reset, and polling resumes.
    """
    while True:
        # Reset per iteration so the except-branch knows whether a message
        # was actually taken during *this* attempt.
        method: Optional[Basic.Deliver] = None
        channel: Optional[BlockingChannel] = None
        try:
            with self._get_amqp_conn() as connection:
                channel = connection.channel()
                channel.queue_declare(queue=self._queue, durable=True)

                properties: Optional[BasicProperties]
                body: Optional[bytes]
                method, properties, body = channel.basic_get(
                    queue=self._queue, auto_ack=False
                )

                if method is None and properties is None and body is None:
                    # Empty poll: start (or continue) the idle clock and
                    # stop once the queue has been quiet for the timeout.
                    if self._last_poll_time is None:
                        self._last_poll_time = datetime.now()
                    if (
                        datetime.now() - self._last_poll_time
                    ).total_seconds() > self._polling_timeout:
                        break
                    self._wait()
                    continue

                # Got a message; restart the idle clock.
                self._last_poll_time = None

                yield body.decode()

                channel.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            logging.exception(e)
            # BUGFIX: the reject itself can raise (e.g. the channel died,
            # which is often exactly why we are here); the original let that
            # propagate out of fetch(), killing the retry loop. The unacked
            # message is redelivered by the broker anyway once the
            # connection drops, so a failed reject is safe to swallow.
            if method is not None and channel is not None:
                try:
                    channel.basic_reject(
                        delivery_tag=method.delivery_tag, requeue=True
                    )
                except Exception as reject_error:
                    logging.exception(reject_error)
            self._reset_conn()
b/consumer/src/deployments/script/config.py similarity index 99% rename from consumer/src/deployments/scripts/config.py rename to consumer/src/deployments/script/config.py index 071018d..a7ed314 100644 --- a/consumer/src/deployments/scripts/config.py +++ b/consumer/src/deployments/script/config.py @@ -18,7 +18,7 @@ class RabbitMQConfig: USERNAME = os.getenv("RABBITMQ_USERNAME", "guest") PASSWORD = os.getenv("RABBITMQ_PASSWORD", "guest") QUEUE = os.getenv("RABBITMQ_QUEUE", "filenames") - POLLING_TIMEOUT = int(os.getenv("RABBITMQ_POLLING_TIMEOUT", 600)) + POLLING_TIMEOUT = int(os.getenv("RABBITMQ_POLLING_TIMEOUT", 10)) class PostgresConfig: diff --git a/consumer/src/deployments/scripts/main.py b/consumer/src/deployments/script/main.py similarity index 100% rename from consumer/src/deployments/scripts/main.py rename to consumer/src/deployments/script/main.py diff --git a/consumer/src/deployments/scripts/setup_logging.py b/consumer/src/deployments/script/setup_logging.py similarity index 100% rename from consumer/src/deployments/scripts/setup_logging.py rename to consumer/src/deployments/script/setup_logging.py diff --git a/consumer/src/usecases/fetch_filenames.py b/consumer/src/usecases/fetch_filenames.py index 39f0594..c63f791 100644 --- a/consumer/src/usecases/fetch_filenames.py +++ b/consumer/src/usecases/fetch_filenames.py @@ -1,10 +1,10 @@ from abc import ABC, abstractmethod -from typing import Iterator +from typing import Generator class FetchFilenameClient(ABC): @abstractmethod - def fetch(self) -> Iterator[str]: + def fetch(self) -> Generator[str, None, None]: ... 
# Reconstruction of consumer/tests/test_adapters/test_fetch_filenames/
# test_rabbitmq/conftest.py (post-image of the patch).
from typing import Iterator

from src.adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient
from src.deployments.script.config import RabbitMQConfig
import pika
import pytest
from pytest import MonkeyPatch


@pytest.fixture(scope="function")
def rabbitmq_fetch_filenames_client() -> RabbitMQFetchFilenamesClient:
    """Client using the configured polling timeout.

    BUGFIX: the patch changed this annotation to ``-> RabbitMQConfig``;
    the fixture returns a ``RabbitMQFetchFilenamesClient``.
    """
    return RabbitMQFetchFilenamesClient(
        host=RabbitMQConfig.HOST,
        port=RabbitMQConfig.PORT,
        # NOTE(review): the middle arguments of this fixture are elided by
        # the diff hunk; reconstructed from the no-wait fixture below and
        # from test_poll_until_timeout's attribute access — confirm.
        credentials_service=lambda: (RabbitMQConfig.USERNAME, RabbitMQConfig.PASSWORD),
        queue=RabbitMQConfig.QUEUE,
        polling_timeout=RabbitMQConfig.POLLING_TIMEOUT,
    )


@pytest.fixture(scope="function")
def rabbitmq_fetch_filenames_no_wait_client() -> RabbitMQFetchFilenamesClient:
    """Client with polling_timeout=0 so fetch() exits on the first empty poll."""
    return RabbitMQFetchFilenamesClient(
        host=RabbitMQConfig.HOST,
        port=RabbitMQConfig.PORT,
        credentials_service=lambda: (RabbitMQConfig.USERNAME, RabbitMQConfig.PASSWORD),
        queue=RabbitMQConfig.QUEUE,
        polling_timeout=0,
    )


@pytest.fixture(scope="function")
def raw_rabbitmq_pika_conn_config() -> tuple[pika.BaseConnection, str]:
    """Raw pika connection + queue name for publishing test messages."""
    pika_conn = pika.BlockingConnection(
        pika.ConnectionParameters(
            host=RabbitMQConfig.HOST,
            port=RabbitMQConfig.PORT,
            credentials=pika.PlainCredentials(
                RabbitMQConfig.USERNAME, RabbitMQConfig.PASSWORD
            ),
        )
    )
    return pika_conn, RabbitMQConfig.QUEUE


@pytest.fixture(scope="function", autouse=True)
def setup_teardown_rabbitmq_queue(
    raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str],
) -> Iterator[None]:
    # BUGFIX: annotated -> None in the patch, but this fixture yields.
    """Declare the queue and purge it before and after every test."""
    pika_conn, queue = raw_rabbitmq_pika_conn_config

    channel = pika_conn.channel()
    channel.queue_declare(queue=queue, durable=True)
    channel.queue_purge(queue=queue)
    yield
    channel.queue_purge(queue=queue)
# Reconstruction of consumer/tests/test_adapters/test_fetch_filenames/
# test_rabbitmq/test_close_conn_failed.py (post-image of the patch).
from pytest import MonkeyPatch, LogCaptureFixture
import pika
from src.adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient
from .utils import random_csv_filenames


def test_close_conn_failed(
    rabbitmq_fetch_filenames_no_wait_client: RabbitMQFetchFilenamesClient,
    raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str],
    monkeypatch: MonkeyPatch,
    caplog: LogCaptureFixture,
):
    """close() returns False and logs the error when the pika close raises."""
    client = rabbitmq_fetch_filenames_no_wait_client
    publisher_conn, _ = raw_rabbitmq_pika_conn_config

    publish_channel = publisher_conn.channel()
    publish_channel.queue_declare(queue=client._queue, durable=True)
    publish_channel.basic_publish(
        exchange="",
        routing_key=client._queue,
        body=random_csv_filenames()[0],
        properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
    )

    # Drain the queue so the client has opened (and cached) a connection.
    for fetched in client.fetch():
        assert fetched is not None
    assert client._conn is not None

    def mock_failed_close(self, *args, **kwargs) -> None:
        raise Exception("Failed to close!")

    monkeypatch.setattr(pika.BlockingConnection, "close", mock_failed_close)

    with caplog.at_level("ERROR"):
        assert not client.close()
        assert "Failed to close!" in caplog.text
# Reconstruction of consumer/tests/test_adapters/test_fetch_filenames/
# test_rabbitmq/test_failed_conn.py (post-image of the patch).
import pytest
from .utils import random_csv_filenames
from src.adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient
from src.deployments.script.config import RabbitMQConfig
import pika
from pytest import MonkeyPatch


@pytest.mark.smoke
def test_fetch_failed_conn(
    rabbitmq_fetch_filenames_client: RabbitMQFetchFilenamesClient,
    monkeypatch: MonkeyPatch,
):
    """fetch() propagates the error when connecting and resetting both fail."""

    def mocked_failed_conn(self, *args, **kwargs) -> None:
        raise Exception("Failed to connect")

    monkeypatch.setattr(pika.BlockingConnection, "__init__", mocked_failed_conn)
    # Patch _reset_conn too so fetch() cannot silently retry forever.
    monkeypatch.setattr(RabbitMQFetchFilenamesClient, "_reset_conn", mocked_failed_conn)

    with pytest.raises(Exception, match="^Failed to connect$"):
        next(rabbitmq_fetch_filenames_client.fetch())
    # BUGFIX: the original called monkeypatch.undo() twice per test; a single
    # undo() already reverts *all* patches, and the monkeypatch fixture's
    # teardown does so anyway — the explicit calls were redundant.


@pytest.mark.smoke
def test_fetch_wrong_credentials(
    monkeypatch: MonkeyPatch,
):
    """Invalid credentials surface as a connection failure."""
    rabbitmq_fetch_filenames_client = RabbitMQFetchFilenamesClient(
        host=RabbitMQConfig.HOST,
        port=RabbitMQConfig.PORT,
        credentials_service=lambda: ("wrong", "wrong"),
        queue=RabbitMQConfig.QUEUE,
        polling_timeout=RabbitMQConfig.POLLING_TIMEOUT,
    )

    def mocked_failed_conn(self, *args, **kwargs) -> None:
        raise Exception("Failed to connect")

    # Stop the internal retry loop at the first reset attempt.
    monkeypatch.setattr(RabbitMQFetchFilenamesClient, "_reset_conn", mocked_failed_conn)

    with pytest.raises(Exception, match="^Failed to connect$"):
        next(rabbitmq_fetch_filenames_client.fetch())


@pytest.mark.slow
@pytest.mark.smoke
def test_fetch_single_wrong_host(
    monkeypatch: MonkeyPatch,
):
    """An unresolvable host surfaces as a connection failure.

    BUGFIX: renamed from test_publish_single_wrong_host — a copy-paste from
    the producer suite; this module tests fetch. The unused `as e` binding
    on pytest.raises was also dropped.
    """
    rabbitmq_fetch_filenames_client = RabbitMQFetchFilenamesClient(
        host="wrong",
        port=RabbitMQConfig.PORT,
        credentials_service=lambda: (RabbitMQConfig.USERNAME, RabbitMQConfig.PASSWORD),
        queue=RabbitMQConfig.QUEUE,
        polling_timeout=RabbitMQConfig.POLLING_TIMEOUT,
    )

    def mocked_failed_conn(self, *args, **kwargs) -> None:
        raise Exception("Failed to connect")

    monkeypatch.setattr(RabbitMQFetchFilenamesClient, "_reset_conn", mocked_failed_conn)

    with pytest.raises(Exception, match="^Failed to connect$"):
        next(rabbitmq_fetch_filenames_client.fetch())


@pytest.mark.slow
def test_fetch_failed_conn_reset_conn(
    rabbitmq_fetch_filenames_no_wait_client: RabbitMQFetchFilenamesClient,
    raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str],
    monkeypatch: MonkeyPatch,
):
    """A mid-stream failure requeues the message and rebuilds the connection."""
    conn, queue = raw_rabbitmq_pika_conn_config

    channel = conn.channel()
    channel.queue_declare(queue=queue, durable=True)

    first_published_filename = random_csv_filenames()[0]
    second_published_filename = random_csv_filenames()[1]

    channel.basic_publish(
        exchange="",
        routing_key=rabbitmq_fetch_filenames_no_wait_client._queue,
        body=first_published_filename,
        properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
    )

    for i, filename in enumerate(rabbitmq_fetch_filenames_no_wait_client.fetch()):
        if i == 0:
            assert rabbitmq_fetch_filenames_no_wait_client._conn is not None
            conn = rabbitmq_fetch_filenames_no_wait_client._conn

            assert filename == first_published_filename
            channel.basic_publish(
                exchange="",
                routing_key=rabbitmq_fetch_filenames_no_wait_client._queue,
                body=second_published_filename,
                properties=pika.BasicProperties(
                    delivery_mode=pika.DeliveryMode.Persistent
                ),
            )

            counter = 0

            def mock_failed_fetch(self, *args, **kwargs) -> None:
                # Fail exactly once, then restore the real basic_get so the
                # client's recovery path can fetch the second message.
                nonlocal counter
                if counter == 0:
                    counter += 1
                    monkeypatch.undo()
                    raise Exception("Failed to fetch!")

            monkeypatch.setattr(pika.channel.Channel, "basic_get", mock_failed_fetch)
        if i == 1:
            assert filename == second_published_filename
            # The client must have replaced the broken connection object.
            assert rabbitmq_fetch_filenames_no_wait_client._conn is not None
            assert rabbitmq_fetch_filenames_no_wait_client._conn != conn
# Reconstruction of consumer/tests/test_adapters/test_fetch_filenames/
# test_rabbitmq/test_failed_fetch.py (post-image of the patch).
# BUGFIX: duplicate `import pytest` removed.
import pytest
from .utils import random_csv_filenames
from src.adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient
import pika
from pytest import LogCaptureFixture, MonkeyPatch


@pytest.mark.smoke
@pytest.mark.parametrize("filename", random_csv_filenames())
def test_fetch_single_exception_resilience(
    rabbitmq_fetch_filenames_no_wait_client: RabbitMQFetchFilenamesClient,
    raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str],
    filename: str,
    monkeypatch: MonkeyPatch,
    caplog: LogCaptureFixture,
):
    """One basic_get failure is logged and the message is still delivered."""
    conn, queue = raw_rabbitmq_pika_conn_config

    channel = conn.channel()
    channel.queue_declare(
        queue=queue,
        durable=True,
    )

    # BUGFIX: the original never published `filename`, so the fetch loop
    # body (and its assertion) never executed and the test passed vacuously.
    channel.basic_publish(
        exchange="",
        routing_key=rabbitmq_fetch_filenames_no_wait_client._queue,
        body=filename,
        properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
    )

    counter = 0

    def mock_failed_fetch(self, *args, **kwargs) -> None:
        # Fail exactly once, then restore the real basic_get.
        nonlocal counter
        if counter == 0:
            counter += 1
            monkeypatch.undo()
            raise Exception("Failed to fetch!")

    monkeypatch.setattr(pika.channel.Channel, "basic_get", mock_failed_fetch)

    fetched_filenames = []
    with caplog.at_level("ERROR"):
        for fetched_filename in rabbitmq_fetch_filenames_no_wait_client.fetch():
            fetched_filenames.append(fetched_filename)
            assert "Failed to fetch!" in caplog.text
    # The message must survive the injected failure exactly once.
    assert fetched_filenames == [filename]


@pytest.mark.smoke
@pytest.mark.parametrize(
    "filenames",
    [random_csv_filenames() for _ in range(5)],
)
def test_fetch_batch_exception_resilience(
    rabbitmq_fetch_filenames_no_wait_client: RabbitMQFetchFilenamesClient,
    raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str],
    filenames: list[str],
    monkeypatch: MonkeyPatch,
    caplog: LogCaptureFixture,
):
    """One failure mid-batch must not lose any of the published messages."""
    conn, queue = raw_rabbitmq_pika_conn_config

    channel = conn.channel()
    channel.queue_declare(
        queue=queue,
        durable=True,
    )

    for filename in filenames:
        channel.basic_publish(
            exchange="",
            routing_key=rabbitmq_fetch_filenames_no_wait_client._queue,
            body=filename,
            properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
        )

    counter = 0

    def mock_failed_fetch(self, *args, **kwargs) -> None:
        # Fail exactly once, then restore the real basic_get.
        nonlocal counter
        if counter == 0:
            counter += 1
            monkeypatch.undo()
            raise Exception("Failed to fetch!")

    monkeypatch.setattr(pika.channel.Channel, "basic_get", mock_failed_fetch)

    all_filenames = []

    with caplog.at_level("ERROR"):
        for fetched_filename in rabbitmq_fetch_filenames_no_wait_client.fetch():
            all_filenames.append(fetched_filename)
            assert "Failed to fetch!" in caplog.text

    assert sorted(all_filenames) == sorted(filenames)
# Reconstruction of consumer/tests/test_adapters/test_fetch_filenames/
# test_rabbitmq/test_poll_until_timeout.py (post-image of the patch).
# BUGFIX: duplicate `import pytest` removed.
import time
import pytest
from .utils import random_csv_filenames
from src.adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient
import pika


@pytest.mark.smoke
@pytest.mark.parametrize("timeout", [0.5 * i for i in range(1, 5)])
def test_fetch_none_wait_timeout(
    rabbitmq_fetch_filenames_client: RabbitMQFetchFilenamesClient,
    raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str],
    timeout: float,  # BUGFIX: parametrized values are floats, was annotated int
):
    """fetch() keeps polling at least `timeout` seconds after the queue empties."""
    # Clone the fixture client with the parametrized polling timeout.
    new_rabbitmq_fetch_filenames_client = RabbitMQFetchFilenamesClient(
        host=rabbitmq_fetch_filenames_client._host,
        port=rabbitmq_fetch_filenames_client._port,
        credentials_service=rabbitmq_fetch_filenames_client._credentials_service,
        queue=rabbitmq_fetch_filenames_client._queue,
        polling_timeout=timeout,
    )

    conn, queue = raw_rabbitmq_pika_conn_config

    channel = conn.channel()
    channel.queue_declare(queue=queue, durable=True)

    filename = random_csv_filenames()[0]
    channel.basic_publish(
        exchange="",
        routing_key=queue,
        body=filename,
        properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
    )

    start_time = time.perf_counter()

    for fetched_filename in new_rabbitmq_fetch_filenames_client.fetch():
        assert fetched_filename == filename

    end_time = time.perf_counter()

    # The generator exits only once the idle clock exceeds the timeout.
    assert end_time - start_time >= timeout
# Reconstruction of consumer/tests/test_adapters/test_fetch_filenames/
# test_rabbitmq/test_successful_fetch.py (post-image of the patch).
# BUGFIX: duplicate `import pytest` removed.
import pytest
from .utils import random_csv_filenames
from src.adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient
import pika


@pytest.mark.smoke
@pytest.mark.parametrize("filename", random_csv_filenames())
def test_fetch_single_success(
    rabbitmq_fetch_filenames_no_wait_client: RabbitMQFetchFilenamesClient,
    raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str],
    filename: str,
):
    """A single published filename is fetched back verbatim."""
    conn, queue = raw_rabbitmq_pika_conn_config

    channel = conn.channel()
    channel.queue_declare(queue=queue, durable=True)

    channel.basic_publish(
        exchange="",
        routing_key=rabbitmq_fetch_filenames_no_wait_client._queue,
        body=filename,
        properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
    )

    for fetched_filename in rabbitmq_fetch_filenames_no_wait_client.fetch():
        assert fetched_filename == filename


@pytest.mark.smoke
@pytest.mark.parametrize(
    "filenames",
    [random_csv_filenames() for _ in range(5)],
)
def test_fetch_batch_success(
    rabbitmq_fetch_filenames_no_wait_client: RabbitMQFetchFilenamesClient,
    raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str],
    filenames: list[str],
):
    """All published filenames are fetched back, order-independent.

    BUGFIX: renamed from test_publish_batch_success (copy-paste from the
    producer suite; this module tests fetch), and the fetch-loop variable no
    longer shadows the publish-loop's `filename`.
    """
    conn, queue = raw_rabbitmq_pika_conn_config

    channel = conn.channel()
    channel.queue_declare(queue=queue, durable=True)

    for filename in filenames:
        channel.basic_publish(
            exchange="",
            routing_key=queue,
            body=filename,
            properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
        )

    all_filenames = []
    for fetched_filename in rabbitmq_fetch_filenames_no_wait_client.fetch():
        all_filenames.append(fetched_filename)

    assert sorted(all_filenames) == sorted(filenames)
import random
import string


def random_csv_filenames() -> list[str]:
    """Generate five random 10-letter ``.csv`` filenames for test fixtures."""
    filenames = []
    for _ in range(5):
        stem = "".join(random.choice(string.ascii_letters) for _ in range(10))
        filenames.append(f"{stem}.csv")
    return filenames
postgres_upsert_iot_records_client.upsert(iot_record) - assert e.value == "Failed to connect" with raw_postgres_psycopg2_conn_config.cursor() as cursor: cursor.execute( @@ -65,9 +64,8 @@ def mocked_failed_conn( monkeypatch.setattr(psycopg2, "connect", mocked_failed_conn) - with pytest.raises(Exception) as e: + with pytest.raises(Exception, match="^Failed to connect$"): assert not any(postgres_upsert_iot_records_client.upsert(iot_records)) - assert e.value == "Failed to connect" with raw_postgres_psycopg2_conn_config.cursor() as cursor: stmt = """ @@ -105,9 +103,8 @@ def test_upsert_single_wrong_credentials( batch_upsert_size=1, ) - with pytest.raises(Exception) as e: + with pytest.raises(Exception, match="^.*403.*ACCESS_REFISED.*$"): assert not postgres_upsert_iot_records_client.upsert(iot_record) - assert "ACCESS_REFUSED" in e.value and "403" in e.value with raw_postgres_psycopg2_conn_config.cursor() as cursor: cursor.execute( @@ -141,9 +138,8 @@ def test_upsert_single_wrong_host( batch_upsert_size=1, ) - with pytest.raises(Exception) as e: + with pytest.raises(Exception, match="^.*403.*ACCESS_REFUSED.*$"): assert not postgres_upsert_iot_records_client.upsert(iot_record) - assert "ACCESS_REFUSED" in e.value and "403" in e.value with raw_postgres_psycopg2_conn_config.cursor() as cursor: cursor.execute( diff --git a/consumer/tests/test_adapters/test_upsert_iot_records/test_postgres/test_failed_upsert.py b/consumer/tests/test_adapters/test_upsert_iot_records/test_postgres/test_failed_upsert.py index a5f5425..12b7071 100644 --- a/consumer/tests/test_adapters/test_upsert_iot_records/test_postgres/test_failed_upsert.py +++ b/consumer/tests/test_adapters/test_upsert_iot_records/test_postgres/test_failed_upsert.py @@ -20,9 +20,8 @@ def test_upsert_single_failed( psycopg2, "connect", lambda *args, **kwargs: MockedPostgresConnection() ) - with pytest.raises(Exception) as e: + with pytest.raises(Exception, match="^Failed to execute!$"): assert not 
postgres_upsert_iot_records_client.upsert(iot_record) - assert e.value == "Failed to execute!" with raw_postgres_psycopg2_conn_config.cursor() as cursor: cursor.execute( @@ -56,9 +55,8 @@ def test_upsert_batch_failed( psycopg2, "connect", lambda *args, **kwargs: MockedPostgresConnection() ) - with pytest.raises(Exception) as e: + with pytest.raises(Exception, match="^Failed to execute!$"): assert not any(postgres_upsert_iot_records_client.upsert(iot_records)) - assert e.value == "Failed to execute!" with raw_postgres_psycopg2_conn_config.cursor() as cursor: stmt = """ @@ -141,13 +139,12 @@ def mocked_partially_failed_upsert( MockedPostgresCursor, "executemany", mocked_partially_failed_upsert ) - with pytest.raises(Exception) as e: + with pytest.raises(Exception, match="^Failed to execute!"): upsert_successes = new_postgres_upsert_iot_records_client.upsert(iot_records) assert not all(upsert_successes) assert any(upsert_successes) assert upsert_successes[2] == False - assert e.value == "Failed to execute!" 
successful_records = [ iot_record diff --git a/producer/tests/test_adapters/test_publish_filenames/conftest.py b/producer/tests/test_adapters/test_publish_filenames/conftest.py deleted file mode 100644 index e69de29..0000000 diff --git a/producer/tests/test_adapters/test_publish_filenames/test_rabbitmq/test_failed_conn.py b/producer/tests/test_adapters/test_publish_filenames/test_rabbitmq/test_failed_conn.py index 7e82051..6b7dd0e 100644 --- a/producer/tests/test_adapters/test_publish_filenames/test_rabbitmq/test_failed_conn.py +++ b/producer/tests/test_adapters/test_publish_filenames/test_rabbitmq/test_failed_conn.py @@ -3,7 +3,7 @@ from src.adapters.publish_filenames.rabbitmq import RabbitMQPublishFilenamesClient from src.deployments.script.config import RabbitMQConfig import pika -from pytest import MonkeyPatch +from pytest import LogCaptureFixture, MonkeyPatch @pytest.mark.smoke @@ -13,6 +13,7 @@ def test_publish_single_failed_conn( raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], filename: str, monkeypatch: MonkeyPatch, + caplog: LogCaptureFixture, ): def mocked_failed_conn( self, @@ -23,9 +24,9 @@ def mocked_failed_conn( monkeypatch.setattr(pika.BlockingConnection, "__init__", mocked_failed_conn) - with pytest.raises(Exception) as e: + with caplog.at_level("ERROR"): assert not rabbitmq_publish_filenames_client.publish(filename) - assert e.value == "Failed to connect" + assert "Failed to connect" in caplog.text pika_conn, queue = raw_rabbitmq_pika_conn_config @@ -45,6 +46,7 @@ def test_publish_batch_failed_conn( raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], filenames: list[str], monkeypatch: MonkeyPatch, + caplog: LogCaptureFixture, ): def mocked_failed_conn( self, @@ -55,9 +57,9 @@ def mocked_failed_conn( monkeypatch.setattr(pika.BlockingConnection, "__init__", mocked_failed_conn) - with pytest.raises(Exception) as e: + with caplog.at_level("ERROR"): assert not any(rabbitmq_publish_filenames_client.publish(filenames)) - assert 
e.value == "Failed to connect" + assert "Failed to connect" in caplog.text pika_conn, queue = raw_rabbitmq_pika_conn_config @@ -73,6 +75,7 @@ def mocked_failed_conn( def test_publish_single_wrong_credentials( raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], filename: str, + caplog: LogCaptureFixture, ): rabbitmq_publish_filenames_client = RabbitMQPublishFilenamesClient( host=RabbitMQConfig.HOST, @@ -81,9 +84,9 @@ def test_publish_single_wrong_credentials( queue=RabbitMQConfig.QUEUE, ) - with pytest.raises(Exception) as e: + with caplog.at_level("ERROR"): assert not rabbitmq_publish_filenames_client.publish(filename) - assert "ACCESS_REFUSED" in e.value and "403" in e.value + assert "ACCESS_REFUSED" in caplog.text and "403" in caplog.text pika_conn, queue = raw_rabbitmq_pika_conn_config channel = pika_conn.channel() @@ -98,6 +101,7 @@ def test_publish_single_wrong_credentials( def test_publish_single_wrong_host( raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], filename: str, + caplog: LogCaptureFixture, ): rabbitmq_publish_filenames_client = RabbitMQPublishFilenamesClient( host="wrong", @@ -106,9 +110,9 @@ def test_publish_single_wrong_host( queue=RabbitMQConfig.QUEUE, ) - with pytest.raises(Exception) as e: + with caplog.at_level("ERROR"): assert not rabbitmq_publish_filenames_client.publish(filename) - assert "ACCESS_REFUSED" in e.value and "403" in e.value + assert "Name or service not known" in caplog.text pika_conn, queue = raw_rabbitmq_pika_conn_config channel = pika_conn.channel() diff --git a/producer/tests/test_adapters/test_publish_filenames/test_rabbitmq/test_failed_publish.py b/producer/tests/test_adapters/test_publish_filenames/test_rabbitmq/test_failed_publish.py index 0750723..ff4cd2b 100644 --- a/producer/tests/test_adapters/test_publish_filenames/test_rabbitmq/test_failed_publish.py +++ b/producer/tests/test_adapters/test_publish_filenames/test_rabbitmq/test_failed_publish.py @@ -3,7 +3,7 @@ from 
src.adapters.publish_filenames.rabbitmq import RabbitMQPublishFilenamesClient import pika import pytest -from pytest import MonkeyPatch +from pytest import LogCaptureFixture, MonkeyPatch @pytest.mark.smoke @@ -13,10 +13,11 @@ def test_publish_single_failed( rabbitmq_publish_filenames_client: RabbitMQPublishFilenamesClient, raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], filename: str, + caplog: LogCaptureFixture, ): - with pytest.raises(Exception) as e: + with caplog.at_level("ERROR"): assert not rabbitmq_publish_filenames_client.publish(filename) - assert e.value == "Failed to publish" + assert "Failed to publish" in caplog.text pika_conn, queue = raw_rabbitmq_pika_conn_config @@ -36,10 +37,11 @@ def test_publish_batch_failed( rabbitmq_publish_filenames_client: RabbitMQPublishFilenamesClient, raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], filenames: list[str], + caplog: LogCaptureFixture, ): - with pytest.raises(Exception) as e: + with caplog.at_level("ERROR"): assert not any(rabbitmq_publish_filenames_client.publish(filenames)) - assert e.value == "Failed to publish" + assert "Failed to publish" in caplog.text pika_conn, queue = raw_rabbitmq_pika_conn_config @@ -59,6 +61,7 @@ def test_publish_batch_partial_failed( raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], filenames: list[str], monkeypatch: MonkeyPatch, + caplog: LogCaptureFixture, ): counter = 0 @@ -94,7 +97,7 @@ def mocked_partially_failed_basic_publish( mocked_partially_failed_basic_publish, ) - with pytest.raises(Exception) as e: + with caplog.at_level("ERROR"): publish_successes = rabbitmq_publish_filenames_client.publish(filenames) successes_filenames = [ @@ -105,7 +108,7 @@ def mocked_partially_failed_basic_publish( assert not all(publish_successes) assert any(publish_successes) assert publish_successes[2] == False - assert e.value == "Failed to publish" + assert "Failed to publish" in caplog.text pika_conn, queue = raw_rabbitmq_pika_conn_config diff 
--git a/producer/tests/test_deployments/test_main/test_main_function_failed.py b/producer/tests/test_deployments/test_main/test_main_function_failed.py index 27a18e9..aa708c5 100644 --- a/producer/tests/test_deployments/test_main/test_main_function_failed.py +++ b/producer/tests/test_deployments/test_main/test_main_function_failed.py @@ -36,8 +36,7 @@ def test_main_flow_has_failed_files( "src.adapters.publish_filenames.rabbitmq.RabbitMQPublishFilenamesClient.publish", lambda self, filename: False, ) - caplog.at_level("CRITICAL") - with pytest.raises(Exception) as e: - main() - assert "Failed to publish filenames" in str(e.value) - assert "Failed to publish filenames" in caplog.text + with caplog.at_level("ERROR"): + with pytest.raises(Exception, match="^Failed to publish filenames.*$"): + main() + assert "Failed to publish filenames" in caplog.text