diff --git a/.env b/.env index 750e8fd..ee2c7c9 100644 --- a/.env +++ b/.env @@ -32,6 +32,7 @@ CONSUMER_LOG_DATE_FORMAT="%Y-%m-%d %H:%M:%S" CONSUMER_LOG_DIR=./logs/producer CONSUMER_LOG_RETENTION=7 CONSUMER_LOG_ROTATION=midnight +CONSUMER_REPLICAS=16 CSV_PARSER_RECOGNIZED_DATETIME_FORMATS="%Y-%m-%dT%H:%M:%S.%f%z" CSV_PARSER_DELIMITER="," diff --git a/Makefile b/Makefile index 86d53ca..78c985f 100644 --- a/Makefile +++ b/Makefile @@ -45,7 +45,7 @@ test_consumer: export QUEUE_NAME=$(QUEUE_NAME) && \ export CSV_PARSER_RECOGNIZED_DATETIME_FORMATS=$(CSV_PARSER_RECOGNIZED_DATETIME_FORMATS) && \ export CSV_PARSER_DELIMITER=$(CSV_PARSER_DELIMITER) && \ - COVERAGE_FILE=.coverage_consumer coverage run -m pytest -vxs consumer/tests + COVERAGE_FILE=.coverage_consumer coverage run -m pytest -vx --last-failed consumer/tests coverage_report: coverage combine .coverage_producer .coverage_consumer && \ coverage report -m --omit="*/tests/*" diff --git a/consumer/src/adapters/fetch_filenames/rabbitmq.py b/consumer/src/adapters/fetch_filenames/rabbitmq.py deleted file mode 100644 index fe858bc..0000000 --- a/consumer/src/adapters/fetch_filenames/rabbitmq.py +++ /dev/null @@ -1,94 +0,0 @@ -from contextlib import contextmanager -from datetime import datetime -import time -from ...usecases import FetchFilenameClient -import pika -from pika.adapters.blocking_connection import BlockingChannel -from pika.spec import Basic, BasicProperties -from pika.connection import Connection -from typing import Generator, Iterator, Optional -from typing_extensions import override -from collections.abc import Callable -import logging - - -class RabbitMQFetchFilenamesClient(FetchFilenameClient): - def __init__( - self, - host: str, - port: int, - credentials_service: Callable[[], tuple[str, str]], - queue: str = "filenames", - polling_timeout: int = 10, - ) -> None: - self._host = host - self._port = port - self._credentials_service = credentials_service - self._queue = queue - self._conn: Optional[Connection] = None - self._polling_timeout = polling_timeout - self._last_poll_time: Optional[datetime] = None - - def _reset_conn(self) -> None: - self._conn = None - - @contextmanager - def _get_amqp_conn(self) -> Iterator[pika.BaseConnection]: - if self._conn is None or self._conn.is_closed: - username, password = self._credentials_service() - credentials = pika.PlainCredentials(username, password) - conn_parameters = pika.ConnectionParameters( - host=self._host, - port=self._port, - credentials=credentials, - ) - self._conn = pika.BlockingConnection(conn_parameters) - yield self._conn - - def _wait(self) -> None: - time.sleep(0.5) - - @override - def fetch(self) -> Generator[str, None, None]: - while True: - try: - method: Optional[Basic.Deliver] = None - with self._get_amqp_conn() as connection: - channel: BlockingChannel = connection.channel() - channel.queue_declare(queue=self._queue, durable=True) - properties: Optional[BasicProperties] - body: Optional[bytes] - - method, properties, body = channel.basic_get( - queue=self._queue, auto_ack=False - ) - if method is None and properties is None and body is None: - if self._last_poll_time is None: - self._last_poll_time = datetime.now() - if ( - datetime.now() - self._last_poll_time - ).total_seconds() > self._polling_timeout: - break - self._wait() - continue - - self._last_poll_time = None - - yield body.decode() - - channel.basic_ack(delivery_tag=method.delivery_tag) - except Exception as e: - logging.exception(e) - if method is not None: - 
channel.basic_reject(delivery_tag=method.delivery_tag, requeue=True) - self._reset_conn() - - @override - def close(self) -> bool: - try: - if self._conn is not None: - self._conn.close() - return True - except Exception as e: - logging.exception(e) - return False diff --git a/consumer/src/adapters/fetch_filenames/__init__.py b/consumer/src/adapters/fetch_filenames_stream/__init__.py similarity index 100% rename from consumer/src/adapters/fetch_filenames/__init__.py rename to consumer/src/adapters/fetch_filenames_stream/__init__.py diff --git a/consumer/src/adapters/fetch_filenames_stream/rabbitmq.py b/consumer/src/adapters/fetch_filenames_stream/rabbitmq.py new file mode 100644 index 0000000..bb27e75 --- /dev/null +++ b/consumer/src/adapters/fetch_filenames_stream/rabbitmq.py @@ -0,0 +1,165 @@ +from concurrent.futures import ThreadPoolExecutor +from contextlib import contextmanager +from datetime import datetime +import time +from ...usecases import FetchFilenameStreamClient +import pika +from pika.adapters.blocking_connection import BlockingChannel +from pika.spec import Basic, BasicProperties +from pika.connection import Connection +from typing import Generator, Iterator, Optional, Sequence, cast, overload +from typing_extensions import override +from collections.abc import Callable +import logging + + +class RabbitMQFetchFilenameStreamClient(FetchFilenameStreamClient[int]): + def __init__( + self, + host: str, + port: int, + credentials_service: Callable[[], tuple[str, str]], + queue: str = "filenames", + polling_timeout: int = 10, + ) -> None: + self._host = host + self._port = port + self._credentials_service = credentials_service + self._queue = queue + self._conn: Optional[Connection] = None + self._channel: Optional[BlockingChannel] = None + self._polling_timeout = polling_timeout + self._last_poll_time: Optional[datetime] = None + + @overload + def ack(self, message_receipt: int) -> bool: + ... + + @overload + def ack(self, message_receipt: Sequence[int]) -> list[bool]: + ... + + @override + def ack(self, message_receipt: int | Sequence[int]) -> bool | list[bool]: + if isinstance(message_receipt, int): + return self._ack_single(message_receipt) + return self._ack_batch(message_receipt) + + def _ack_single(self, message_receipt: int) -> bool: + try: + with self._get_channel() as channel: + channel.basic_ack(delivery_tag=message_receipt, multiple=False) + return True + except Exception as e: + logging.exception(e) + return False + + def _ack_batch(self, message_receipts: Sequence[int]) -> list[bool]: + #! RabbitMQ is not thread-safe, so we have to use a single thread to ack + results: list[bool] = [] + for receipt in message_receipts: + results.append(self._ack_single(receipt)) + return results + + @overload + def reject(self, message_receipt: int) -> bool: + ... + + @overload + def reject(self, message_receipt: Sequence[int]) -> list[bool]: + ... + + @override + def reject(self, message_receipt: int | Sequence[int]) -> bool | list[bool]: + if isinstance(message_receipt, int): + return self._reject_single(message_receipt) + return self._reject_batch(message_receipt) + + def _reject_single(self, message_receipt: int) -> bool: + try: + with self._get_channel() as channel: + channel.basic_nack(delivery_tag=message_receipt, requeue=True) + return True + except Exception as e: + logging.exception(e) + return False + + def _reject_batch(self, message_receipts: Sequence[int]) -> list[bool]: + #! 
RabbitMQ is not thread-safe, so we have to use a single thread to ack + results: list[bool] = [] + for receipt in message_receipts: + results.append(self._reject_single(receipt)) + return results + + def _reset_conn(self) -> None: + self._conn = None + self._channel = None + + @contextmanager + def _get_amqp_conn(self) -> Iterator[Connection]: + if self._conn is None or self._conn.is_closed: + username, password = self._credentials_service() + credentials = pika.PlainCredentials(username, password) + conn_parameters = pika.ConnectionParameters( + host=self._host, + port=self._port, + credentials=credentials, + ) + self._conn = pika.BlockingConnection(conn_parameters) + yield self._conn + + @contextmanager + def _get_channel(self) -> Iterator[BlockingChannel]: + if self._channel is None or self._channel.is_closed: + with self._get_amqp_conn() as connection: + self._channel = connection.channel() + yield self._channel + + def _wait(self) -> None: + time.sleep(0.5) + + @override + def fetch_stream(self) -> Generator[tuple[str, int], None, None]: + while True: + try: + method: Optional[Basic.Deliver] = None + with self._get_channel() as channel: + channel.queue_declare(queue=self._queue, durable=True) + properties: Optional[BasicProperties] + body: Optional[bytes] + + method, properties, body = channel.basic_get( + queue=self._queue, auto_ack=False + ) + + if method is None and properties is None and body is None: + if self._last_poll_time is None: + self._last_poll_time = datetime.now() + if ( + datetime.now() - self._last_poll_time + ).total_seconds() > self._polling_timeout: + break + self._wait() + continue + + self._last_poll_time = None + + yield body.decode(), cast(int, method.delivery_tag) + + except Exception as e: + logging.exception(e) + if method is not None: + self.reject(method.delivery_tag) + self._reset_conn() + + @override + def close(self) -> bool: + try: + if self._channel is not None: + self._channel.close() + if self._conn is not None: + self._conn.close() + return True + except Exception as e: + logging.exception(e) + return False diff --git a/consumer/src/adapters/file_parse_iot_records/csv.py b/consumer/src/adapters/file_parse_iot_records/csv.py index e38d0ca..eb2f4f8 100644 --- a/consumer/src/adapters/file_parse_iot_records/csv.py +++ b/consumer/src/adapters/file_parse_iot_records/csv.py @@ -8,6 +8,7 @@ from ...usecases import FileParseIOTRecordsClient import csv import logging +from pathlib import Path class CSVParseIOTRecordsClient(FileParseIOTRecordsClient): @@ -22,29 +23,40 @@ def __init__( self._file_extension = file_extension @overload - def parse(self, filename: str) -> list[IOTRecord]: + def parse(self, filename: str) -> Optional[list[IOTRecord]]: ... @overload - def parse(self, filename: Sequence[str]) -> list[list[IOTRecord]]: + def parse(self, filename: Sequence[str]) -> list[Optional[list[IOTRecord]]]: ... 
@override def parse( self, filename: str | Sequence[str] - ) -> list[IOTRecord] | list[list[IOTRecord]]: + ) -> Optional[list[IOTRecord]] | list[Optional[list[IOTRecord]]]: if isinstance(filename, str): return self._parse_single(filename) return self._parse_batch(filename) + def _basic_file_check(self, filename: str) -> None: + if not Path(filename).exists(): + raise ValueError("File path must exist!") + if not Path(filename).is_file(): + raise ValueError("File path must be a file!") + if not filename.endswith(self._file_extension): + raise ValueError(f"File extension must be {self._file_extension}") + @override def parse_stream(self, filename: str) -> Iterator[IOTRecord]: try: - if not filename.endswith(self._file_extension): - raise ValueError(f"File extension must be {self._file_extension}") + self._basic_file_check(filename) with open(filename) as csvfile: reader = csv.reader(csvfile, delimiter=self._delimiter, strict=True) yield from self._parse_iter(reader) + except OSError as e: + logging.exception(e) + logging.error(f"Failed to read stream from {filename}!") + raise e except Exception as e: logging.error(f"Failed to parse {filename}") logging.exception(e) @@ -84,17 +96,16 @@ def _parse_iter(self, reader: Iterator[list[str]]) -> Iterator[IOTRecord]: ) return iot_records - def _parse_single(self, filename: str) -> list[IOTRecord]: + def _parse_single(self, filename: str) -> Optional[list[IOTRecord]]: try: - if not filename.endswith(self._file_extension): - raise ValueError(f"File extension must be {self._file_extension}") + self._basic_file_check(filename) with open(filename) as csvfile: reader = csv.reader(csvfile, delimiter=self._delimiter) return list(self._parse_iter(reader)) except Exception as e: - logging.error(f"Failed to parse {filename}") logging.exception(e) - return [] + logging.error(f"Failed to parse {filename}") + return None def _parse_batch(self, filenames: Sequence[str]) -> list[list[IOTRecord]]: with ThreadPoolExecutor() as executor: diff --git a/consumer/src/deployments/script/main.py b/consumer/src/deployments/script/main.py index 09d0022..57cd029 100644 --- a/consumer/src/deployments/script/main.py +++ b/consumer/src/deployments/script/main.py @@ -1,54 +1,85 @@ -from ...adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient +from ...adapters.fetch_filenames_stream.rabbitmq import ( + RabbitMQFetchFilenameStreamClient, +) from ...adapters.file_parse_iot_records.csv import CSVParseIOTRecordsClient from ...adapters.upsert_iot_records.postgres import PostgresUpsertIOTRecordsClient from .config import RabbitMQConfig, PostgresConfig, CSVParserConfig -from setup_logging import setup_logging -import logging +from .setup_logging import setup_logging from ...entities import IOTRecord +import logging setup_logging() -fetch_filenames_client = RabbitMQFetchFilenamesClient( - host=RabbitMQConfig.HOST, - port=RabbitMQConfig.PORT, - username=RabbitMQConfig.USERNAME, - password=RabbitMQConfig.PASSWORD, - queue=RabbitMQConfig.QUEUE, - polling_timeout=RabbitMQConfig.POLLING_TIMEOUT, -) +logging.getLogger("pika").setLevel(logging.ERROR) -file_parse_iot_records_client = CSVParseIOTRecordsClient( - recognized_datetime_formats=CSVParserConfig.RECOGNIZED_DATETIME_FORMATS, - delimiter=CSVParserConfig.DELIMITER, - file_extension=CSVParserConfig.FILE_EXTENSION, -) -upsert_iot_records_client = PostgresUpsertIOTRecordsClient( - host=PostgresConfig.HOST, - port=PostgresConfig.PORT, - credentials_service=lambda: (PostgresConfig.USERNAME, PostgresConfig.PASSWORD), -
database=PostgresConfig.DATABASE, - batch_upsert_size=PostgresConfig.BATCH_UPSERT_SIZE, -) +def _upsert_iot_records_buffer( + iot_records_buffer: list[IOTRecord], + upsert_iot_records_client: PostgresUpsertIOTRecordsClient, +) -> None: + successes = upsert_iot_records_client.upsert(iot_records_buffer) + + if not all(successes): + raise Exception("Failed to upsert all records!") def main() -> None: - filestream_buffer: list[IOTRecord] = [] + fetch_filenames_stream_client = RabbitMQFetchFilenameStreamClient( + host=RabbitMQConfig.HOST, + port=RabbitMQConfig.PORT, + credentials_service=lambda: (RabbitMQConfig.USERNAME, RabbitMQConfig.PASSWORD), + queue=RabbitMQConfig.QUEUE, + polling_timeout=RabbitMQConfig.POLLING_TIMEOUT, + ) + + file_parse_iot_records_client = CSVParseIOTRecordsClient( + recognized_datetime_formats=CSVParserConfig.RECOGNIZED_DATETIME_FORMATS, + delimiter=CSVParserConfig.DELIMITER, + file_extension=CSVParserConfig.FILE_EXTENSION, + ) + + upsert_iot_records_client = PostgresUpsertIOTRecordsClient( + host=PostgresConfig.HOST, + port=PostgresConfig.PORT, + credentials_service=lambda: (PostgresConfig.USERNAME, PostgresConfig.PASSWORD), + database=PostgresConfig.DATABASE, + batch_upsert_size=PostgresConfig.BATCH_UPSERT_SIZE, + ) + try: - for filename in fetch_filenames_client.fetch(): - for iot_record in file_parse_iot_records_client.parse_stream(filename): - filestream_buffer.append(iot_record) - if len(filestream_buffer) >= PostgresConfig.BATCH_UPSERT_SIZE: - upsert_iot_records_client.upsert(filestream_buffer) - filestream_buffer.clear() - if filestream_buffer: - upsert_iot_records_client.upsert(filestream_buffer) - filestream_buffer.clear() + for filename, receipt in fetch_filenames_stream_client.fetch_stream(): + logging.info(f"Upserting {filename}...") + iot_records_buffer: list[IOTRecord] = [] + try: + for iot_record in file_parse_iot_records_client.parse_stream(filename): + iot_records_buffer.append(iot_record) + + if len(iot_records_buffer) < PostgresConfig.BATCH_UPSERT_SIZE: + continue + + _upsert_iot_records_buffer( + iot_records_buffer, upsert_iot_records_client + ) + iot_records_buffer.clear() + + if len(iot_records_buffer) > 0: + _upsert_iot_records_buffer( + iot_records_buffer, upsert_iot_records_client + ) + + logging.info(f"Successfully upserted {filename}!") + fetch_filenames_stream_client.ack(receipt) + except Exception as e: + logging.exception(e) + fetch_filenames_stream_client.reject(receipt) + logging.error(f"Failed to upsert {filename}!") + finally: + iot_records_buffer.clear() except Exception as e: logging.exception(e) raise e finally: - fetch_filenames_client.close() + fetch_filenames_stream_client.close() upsert_iot_records_client.close() diff --git a/consumer/src/deployments/script/setup_logging.py b/consumer/src/deployments/script/setup_logging.py index 161394c..dcae074 100644 --- a/consumer/src/deployments/script/setup_logging.py +++ b/consumer/src/deployments/script/setup_logging.py @@ -1,6 +1,6 @@ import logging from logging.handlers import TimedRotatingFileHandler -from config import LoggingConfig +from .config import LoggingConfig import pathlib diff --git a/consumer/src/usecases/__init__.py b/consumer/src/usecases/__init__.py index 4265028..1fc456a 100644 --- a/consumer/src/usecases/__init__.py +++ b/consumer/src/usecases/__init__.py @@ -1,3 +1,3 @@ -from .fetch_filenames import FetchFilenameClient +from .fetch_filenames_stream import FetchFilenameStreamClient from .file_parse_iot_records import FileParseIOTRecordsClient from .upsert_iot_records 
import UpsertIOTRecordsClient diff --git a/consumer/src/usecases/fetch_filenames.py b/consumer/src/usecases/fetch_filenames.py deleted file mode 100644 index c63f791..0000000 --- a/consumer/src/usecases/fetch_filenames.py +++ /dev/null @@ -1,12 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Generator - - -class FetchFilenameClient(ABC): - @abstractmethod - def fetch(self) -> Generator[str, None, None]: - ... - - @abstractmethod - def close(self) -> bool: - ... diff --git a/consumer/src/usecases/fetch_filenames_stream.py b/consumer/src/usecases/fetch_filenames_stream.py new file mode 100644 index 0000000..d96527c --- /dev/null +++ b/consumer/src/usecases/fetch_filenames_stream.py @@ -0,0 +1,38 @@ +from abc import ABC, abstractmethod +from typing import Generator, Sequence, overload, TypeVar, Generic + +T = TypeVar("T") + + +class FetchFilenameStreamClient(ABC, Generic[T]): + @overload + def ack(self, message_receipt: T) -> bool: + ... + + @overload + def ack(self, message_receipt: Sequence[T]) -> list[bool]: + ... + + @abstractmethod + def ack(self, message_receipt: T | Sequence[T]) -> bool | list[bool]: + ... + + @overload + def reject(self, message_receipt: T) -> bool: + ... + + @overload + def reject(self, message_receipt: Sequence[T]) -> list[bool]: + ... + + @abstractmethod + def reject(self, message_receipt: T | Sequence[T]) -> bool | list[bool]: + ... + + @abstractmethod + def fetch_stream(self) -> Generator[tuple[str, T], None, None]: + ... + + @abstractmethod + def close(self) -> bool: + ... diff --git a/consumer/src/usecases/file_parse_iot_records.py b/consumer/src/usecases/file_parse_iot_records.py index ca2276c..d64690c 100644 --- a/consumer/src/usecases/file_parse_iot_records.py +++ b/consumer/src/usecases/file_parse_iot_records.py @@ -1,21 +1,21 @@ from abc import ABC, abstractmethod -from typing import Iterator, overload, Sequence +from typing import Iterator, Optional, overload, Sequence from ..entities import IOTRecord class FileParseIOTRecordsClient(ABC): @overload - def parse(self, filename: str) -> list[IOTRecord]: + def parse(self, filename: str) -> Optional[list[IOTRecord]]: ... @overload - def parse(self, filename: Sequence[str]) -> list[list[IOTRecord]]: + def parse(self, filename: Sequence[str]) -> list[Optional[list[IOTRecord]]]: ... @abstractmethod def parse( self, filename: str | Sequence[str] - ) -> list[IOTRecord] | list[list[IOTRecord]]: + ) -> Optional[list[IOTRecord]] | list[Optional[list[IOTRecord]]]: ... 
@abstractmethod diff --git a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/conftest.py b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/conftest.py index 7ca45ac..0c38d86 100644 --- a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/conftest.py +++ b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/conftest.py @@ -1,13 +1,14 @@ -from src.adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient +from src.adapters.fetch_filenames_stream.rabbitmq import ( + RabbitMQFetchFilenameStreamClient, +) from src.deployments.script.config import RabbitMQConfig import pika import pytest -from pytest import MonkeyPatch @pytest.fixture(scope="function") -def rabbitmq_fetch_filenames_client() -> RabbitMQConfig: - return RabbitMQFetchFilenamesClient( +def rabbitmq_fetch_filenames_stream_client() -> RabbitMQFetchFilenameStreamClient: + return RabbitMQFetchFilenameStreamClient( host=RabbitMQConfig.HOST, port=RabbitMQConfig.PORT, credentials_service=lambda: (RabbitMQConfig.USERNAME, RabbitMQConfig.PASSWORD), @@ -17,8 +18,8 @@ def rabbitmq_fetch_filenames_client() -> RabbitMQConfig: @pytest.fixture(scope="function") -def rabbitmq_fetch_filenames_no_wait_client() -> RabbitMQConfig: - return RabbitMQFetchFilenamesClient( +def rabbitmq_fetch_filenames_stream_no_wait_client() -> RabbitMQFetchFilenameStreamClient: + return RabbitMQFetchFilenameStreamClient( host=RabbitMQConfig.HOST, port=RabbitMQConfig.PORT, credentials_service=lambda: (RabbitMQConfig.USERNAME, RabbitMQConfig.PASSWORD), @@ -48,7 +49,9 @@ def setup_teardown_rabbitmq_queue( pika_conn, queue = raw_rabbitmq_pika_conn_config channel = pika_conn.channel() + channel.queue_delete(queue=queue) channel.queue_declare(queue=queue, durable=True) channel.queue_purge(queue=queue) yield channel.queue_purge(queue=queue) + channel.queue_delete(queue=queue) diff --git a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_ack_failed.py b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_ack_failed.py new file mode 100644 index 0000000..03f6f49 --- /dev/null +++ b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_ack_failed.py @@ -0,0 +1,41 @@ +import pytest +from .utils import random_csv_filenames +from src.adapters.fetch_filenames_stream.rabbitmq import ( + RabbitMQFetchFilenameStreamClient, +) +import pika +from pytest import MonkeyPatch + + +@pytest.mark.parametrize("filename", random_csv_filenames()) +def test_fetch_single_ack_failed( + rabbitmq_fetch_filenames_stream_no_wait_client: RabbitMQFetchFilenameStreamClient, + raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], + filename: str, + monkeypatch: MonkeyPatch, +): + conn, queue = raw_rabbitmq_pika_conn_config + + channel = conn.channel() + + channel.queue_declare(queue=queue, durable=True) + + channel.basic_publish( + exchange="", + routing_key=rabbitmq_fetch_filenames_stream_no_wait_client._queue, + body=filename, + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + + for ( + fetched_filename, + receipt, + ) in rabbitmq_fetch_filenames_stream_no_wait_client.fetch_stream(): + assert fetched_filename == filename + + def mock_ack(self, *args, **kwargs): + raise Exception("Failed to ack!") + + monkeypatch.setattr(pika.channel.Channel, "basic_ack", mock_ack) + + assert not rabbitmq_fetch_filenames_stream_no_wait_client.ack(receipt) diff --git a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_ack_remove_data_in_stream.py
b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_ack_remove_data_in_stream.py new file mode 100644 index 0000000..9ca8041 --- /dev/null +++ b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_ack_remove_data_in_stream.py @@ -0,0 +1,81 @@ +import pytest +from .utils import random_csv_filenames +from src.adapters.fetch_filenames_stream.rabbitmq import ( + RabbitMQFetchFilenameStreamClient, +) +import pika + + +@pytest.mark.smoke +@pytest.mark.parametrize("filename", random_csv_filenames()) +def test_fetch_single_ack_remove_data_in_stream( + rabbitmq_fetch_filenames_stream_no_wait_client: RabbitMQFetchFilenameStreamClient, + raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], + filename: str, +): + conn, queue = raw_rabbitmq_pika_conn_config + + channel = conn.channel() + + channel.queue_declare(queue=queue, durable=True) + + channel.basic_publish( + exchange="", + routing_key=rabbitmq_fetch_filenames_stream_no_wait_client._queue, + body=filename, + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + + for ( + fetched_filename, + receipt, + ) in rabbitmq_fetch_filenames_stream_no_wait_client.fetch_stream(): + assert fetched_filename == filename + + assert rabbitmq_fetch_filenames_stream_no_wait_client.ack(receipt) + + method_frame, _, body = channel.basic_get(queue=queue) + assert method_frame is None + assert body is None + + +@pytest.mark.smoke +@pytest.mark.parametrize( + "filenames", + [random_csv_filenames() for _ in range(5)], +) +def test_fetch_batch_ack_remove_data_in_stream( + rabbitmq_fetch_filenames_stream_no_wait_client: RabbitMQFetchFilenameStreamClient, + raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], + filenames: list[str], +): + conn, queue = raw_rabbitmq_pika_conn_config + + channel = conn.channel() + + channel.queue_declare(queue=queue, durable=True) + + for filename in filenames: + channel.basic_publish( + exchange="", + routing_key=queue, + body=filename, + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + + all_filenames = [] + all_receipts = [] + for ( + filename, + receipt, + ) in rabbitmq_fetch_filenames_stream_no_wait_client.fetch_stream(): + all_filenames.append(filename) + all_receipts.append(receipt) + + assert all(rabbitmq_fetch_filenames_stream_no_wait_client.ack(all_receipts)) + + assert sorted(all_filenames) == sorted(filenames) + + method_frame, _, body = channel.basic_get(queue=queue) + assert method_frame is None + assert body is None diff --git a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_close_conn_failed.py b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_close_conn_failed.py index ee46fa5..d36b105 100644 --- a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_close_conn_failed.py +++ b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_close_conn_failed.py @@ -1,11 +1,13 @@ from pytest import MonkeyPatch, LogCaptureFixture import pika -from src.adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient +from src.adapters.fetch_filenames_stream.rabbitmq import ( + RabbitMQFetchFilenameStreamClient, +) from .utils import random_csv_filenames def test_close_conn_failed( - rabbitmq_fetch_filenames_no_wait_client: RabbitMQFetchFilenamesClient, + rabbitmq_fetch_filenames_stream_no_wait_client: RabbitMQFetchFilenameStreamClient, raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], monkeypatch: MonkeyPatch, caplog: LogCaptureFixture, ): @@ -15,20 +17,24 @@
def test_close_conn_failed( channel = conn.channel() channel.queue_declare( - queue=rabbitmq_fetch_filenames_no_wait_client._queue, durable=True + queue=rabbitmq_fetch_filenames_stream_no_wait_client._queue, durable=True ) channel.basic_publish( exchange="", - routing_key=rabbitmq_fetch_filenames_no_wait_client._queue, + routing_key=rabbitmq_fetch_filenames_stream_no_wait_client._queue, body=random_csv_filenames()[0], properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), ) - for filename in rabbitmq_fetch_filenames_no_wait_client.fetch(): + for ( + filename, + receipt, + ) in rabbitmq_fetch_filenames_stream_no_wait_client.fetch_stream(): assert filename is not None + assert rabbitmq_fetch_filenames_stream_no_wait_client.ack(receipt) - assert rabbitmq_fetch_filenames_no_wait_client._conn is not None + assert rabbitmq_fetch_filenames_stream_no_wait_client._conn is not None def mock_failed_close( self, @@ -40,5 +46,5 @@ def mock_failed_close( monkeypatch.setattr(pika.BlockingConnection, "close", mock_failed_close) with caplog.at_level("ERROR"): - assert not rabbitmq_fetch_filenames_no_wait_client.close() + assert not rabbitmq_fetch_filenames_stream_no_wait_client.close() assert "Failed to close!" in caplog.text diff --git a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_close_conn_successful.py b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_close_conn_successful.py index 756d329..ebc7583 100644 --- a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_close_conn_successful.py +++ b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_close_conn_successful.py @@ -1,16 +1,18 @@ -from src.adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient +from src.adapters.fetch_filenames_stream.rabbitmq import ( + RabbitMQFetchFilenameStreamClient, +) def test_close_conn_successful( - rabbitmq_fetch_filenames_no_wait_client: RabbitMQFetchFilenamesClient, + rabbitmq_fetch_filenames_stream_no_wait_client: RabbitMQFetchFilenameStreamClient, ): - for _ in rabbitmq_fetch_filenames_no_wait_client.fetch(): + for _ in rabbitmq_fetch_filenames_stream_no_wait_client.fetch_stream(): pass - assert rabbitmq_fetch_filenames_no_wait_client._conn is not None - assert rabbitmq_fetch_filenames_no_wait_client.close() + assert rabbitmq_fetch_filenames_stream_no_wait_client._conn is not None + assert rabbitmq_fetch_filenames_stream_no_wait_client.close() def test_none_conn_close_successful( - rabbitmq_fetch_filenames_client: RabbitMQFetchFilenamesClient, + rabbitmq_fetch_filenames_stream_client: RabbitMQFetchFilenameStreamClient, ): - assert rabbitmq_fetch_filenames_client.close() + assert rabbitmq_fetch_filenames_stream_client.close() diff --git a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_failed_conn.py b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_failed_conn.py index e4d9e2c..ec30bb6 100644 --- a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_failed_conn.py +++ b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_failed_conn.py @@ -1,6 +1,8 @@ import pytest from .utils import random_csv_filenames -from src.adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient +from src.adapters.fetch_filenames_stream.rabbitmq import ( + RabbitMQFetchFilenameStreamClient, +) from src.deployments.script.config import RabbitMQConfig import pika from pytest import MonkeyPatch @@ -8,7 +10,7 @@ @pytest.mark.smoke def 
test_fetch_failed_conn( - rabbitmq_fetch_filenames_client: RabbitMQFetchFilenamesClient, + rabbitmq_fetch_filenames_stream_client: RabbitMQFetchFilenameStreamClient, monkeypatch: MonkeyPatch, ): def mocked_failed_conn( @@ -20,10 +22,12 @@ def mocked_failed_conn( monkeypatch.setattr(pika.BlockingConnection, "__init__", mocked_failed_conn) - monkeypatch.setattr(RabbitMQFetchFilenamesClient, "_reset_conn", mocked_failed_conn) + monkeypatch.setattr( + RabbitMQFetchFilenameStreamClient, "_reset_conn", mocked_failed_conn + ) with pytest.raises(Exception, match="^Failed to connect$"): - next(rabbitmq_fetch_filenames_client.fetch()) + next(rabbitmq_fetch_filenames_stream_client.fetch_stream()) monkeypatch.undo() monkeypatch.undo() @@ -33,7 +37,7 @@ def mocked_failed_conn( def test_fetch_wrong_credentials( monkeypatch: MonkeyPatch, ): - rabbitmq_fetch_filenames_client = RabbitMQFetchFilenamesClient( + rabbitmq_fetch_filenames_stream_client = RabbitMQFetchFilenameStreamClient( host=RabbitMQConfig.HOST, port=RabbitMQConfig.PORT, credentials_service=lambda: ("wrong", "wrong"), @@ -48,10 +52,12 @@ def mocked_failed_conn( ) -> None: raise Exception("Failed to connect") - monkeypatch.setattr(RabbitMQFetchFilenamesClient, "_reset_conn", mocked_failed_conn) + monkeypatch.setattr( + RabbitMQFetchFilenameStreamClient, "_reset_conn", mocked_failed_conn + ) with pytest.raises(Exception, match="^Failed to connect$"): - next(rabbitmq_fetch_filenames_client.fetch()) + next(rabbitmq_fetch_filenames_stream_client.fetch_stream()) monkeypatch.undo() @@ -61,7 +67,7 @@ def mocked_failed_conn( def test_publish_single_wrong_host( monkeypatch: MonkeyPatch, ): - rabbitmq_fetch_filenames_client = RabbitMQFetchFilenamesClient( + rabbitmq_fetch_filenames_stream_client = RabbitMQFetchFilenameStreamClient( host="wrong", port=RabbitMQConfig.PORT, credentials_service=lambda: (RabbitMQConfig.USERNAME, RabbitMQConfig.PASSWORD), @@ -76,17 +82,19 @@ def mocked_failed_conn( ) -> None: raise Exception("Failed to connect") - monkeypatch.setattr(RabbitMQFetchFilenamesClient, "_reset_conn", mocked_failed_conn) + monkeypatch.setattr( + RabbitMQFetchFilenameStreamClient, "_reset_conn", mocked_failed_conn + ) with pytest.raises(Exception, match="^Failed to connect$") as e: - next(rabbitmq_fetch_filenames_client.fetch()) + next(rabbitmq_fetch_filenames_stream_client.fetch_stream()) monkeypatch.undo() @pytest.mark.slow def test_fetch_failed_conn_reset_conn( - rabbitmq_fetch_filenames_no_wait_client: RabbitMQFetchFilenamesClient, + rabbitmq_fetch_filenames_stream_no_wait_client: RabbitMQFetchFilenameStreamClient, raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], monkeypatch: MonkeyPatch, ): @@ -101,20 +109,23 @@ def test_fetch_failed_conn_reset_conn( channel.basic_publish( exchange="", - routing_key=rabbitmq_fetch_filenames_no_wait_client._queue, + routing_key=rabbitmq_fetch_filenames_stream_no_wait_client._queue, body=first_published_filename, properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), ) - for i, filename in enumerate(rabbitmq_fetch_filenames_no_wait_client.fetch()): + for i, (filename, receipt) in enumerate( + rabbitmq_fetch_filenames_stream_no_wait_client.fetch_stream() + ): if i == 0: - assert rabbitmq_fetch_filenames_no_wait_client._conn is not None - conn = rabbitmq_fetch_filenames_no_wait_client._conn + assert rabbitmq_fetch_filenames_stream_no_wait_client._conn is not None + conn = rabbitmq_fetch_filenames_stream_no_wait_client._conn assert filename == first_published_filename + assert 
rabbitmq_fetch_filenames_stream_no_wait_client.ack(receipt) channel.basic_publish( exchange="", - routing_key=rabbitmq_fetch_filenames_no_wait_client._queue, + routing_key=rabbitmq_fetch_filenames_stream_no_wait_client._queue, body=second_published_filename, properties=pika.BasicProperties( delivery_mode=pika.DeliveryMode.Persistent @@ -138,5 +149,6 @@ def mock_failed_fetch( monkeypatch.setattr(pika.channel.Channel, "basic_get", mock_failed_fetch) if i == 1: assert filename == second_published_filename - assert rabbitmq_fetch_filenames_no_wait_client._conn is not None - assert rabbitmq_fetch_filenames_no_wait_client._conn != conn + assert rabbitmq_fetch_filenames_stream_no_wait_client.ack(receipt) + assert rabbitmq_fetch_filenames_stream_no_wait_client._conn is not None + assert rabbitmq_fetch_filenames_stream_no_wait_client._conn != conn diff --git a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_failed_fetch.py b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_failed_fetch.py index 9cddf90..6658b1b 100644 --- a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_failed_fetch.py +++ b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_failed_fetch.py @@ -1,6 +1,8 @@ import pytest from .utils import random_csv_filenames -from src.adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient +from src.adapters.fetch_filenames_stream.rabbitmq import ( + RabbitMQFetchFilenameStreamClient, +) import pika import pytest from pytest import LogCaptureFixture, MonkeyPatch @@ -9,7 +11,7 @@ @pytest.mark.smoke @pytest.mark.parametrize("filename", random_csv_filenames()) def test_fetch_single_exception_resilience( - rabbitmq_fetch_filenames_no_wait_client: RabbitMQFetchFilenamesClient, + rabbitmq_fetch_filenames_stream_no_wait_client: RabbitMQFetchFilenameStreamClient, raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], filename: str, monkeypatch: MonkeyPatch, @@ -39,8 +41,12 @@ def mock_failed_fetch( monkeypatch.setattr(pika.channel.Channel, "basic_get", mock_failed_fetch) with caplog.at_level("ERROR"): - for fetched_filename in rabbitmq_fetch_filenames_no_wait_client.fetch(): + for ( + fetched_filename, + receipt, + ) in rabbitmq_fetch_filenames_stream_no_wait_client.fetch_stream(): assert fetched_filename == filename + assert rabbitmq_fetch_filenames_stream_no_wait_client.ack(receipt) assert "Failed to fetch!" 
in caplog.text @@ -50,7 +56,7 @@ def mock_failed_fetch( [random_csv_filenames() for _ in range(5)], ) def test_fetch_batch_exception_resilience( - rabbitmq_fetch_filenames_no_wait_client: RabbitMQFetchFilenamesClient, + rabbitmq_fetch_filenames_stream_no_wait_client: RabbitMQFetchFilenameStreamClient, raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], filenames: list[str], monkeypatch: MonkeyPatch, @@ -67,7 +73,7 @@ def test_fetch_batch_exception_resilience( for filename in filenames: channel.basic_publish( exchange="", - routing_key=rabbitmq_fetch_filenames_no_wait_client._queue, + routing_key=rabbitmq_fetch_filenames_stream_no_wait_client._queue, body=filename, properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), ) @@ -91,8 +97,12 @@ def mock_failed_fetch( all_filenames = [] with caplog.at_level("ERROR"): - for fetched_filename in rabbitmq_fetch_filenames_no_wait_client.fetch(): + for ( + fetched_filename, + receipt, + ) in rabbitmq_fetch_filenames_stream_no_wait_client.fetch_stream(): all_filenames.append(fetched_filename) + assert rabbitmq_fetch_filenames_stream_no_wait_client.ack(receipt) assert "Failed to fetch!" in caplog.text assert sorted(all_filenames) == sorted(filenames) @@ -100,16 +110,16 @@ def mock_failed_fetch( @pytest.mark.parametrize("filename", random_csv_filenames()) def test_fetch_single_ack_exception_resilience( - rabbitmq_fetch_filenames_client: RabbitMQFetchFilenamesClient, + rabbitmq_fetch_filenames_stream_client: RabbitMQFetchFilenameStreamClient, raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], filename: str, monkeypatch: MonkeyPatch, ): - new_rabbitmq_fetch_filenames_client = RabbitMQFetchFilenamesClient( - host=rabbitmq_fetch_filenames_client._host, - port=rabbitmq_fetch_filenames_client._port, - credentials_service=rabbitmq_fetch_filenames_client._credentials_service, - queue=rabbitmq_fetch_filenames_client._queue, + new_rabbitmq_fetch_filenames_stream_client = RabbitMQFetchFilenameStreamClient( + host=rabbitmq_fetch_filenames_stream_client._host, + port=rabbitmq_fetch_filenames_stream_client._port, + credentials_service=rabbitmq_fetch_filenames_stream_client._credentials_service, + queue=rabbitmq_fetch_filenames_stream_client._queue, polling_timeout=1, ) @@ -123,7 +133,7 @@ def test_fetch_single_ack_exception_resilience( channel.basic_publish( exchange="", - routing_key=rabbitmq_fetch_filenames_client._queue, + routing_key=rabbitmq_fetch_filenames_stream_client._queue, body=filename, properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), ) @@ -144,5 +154,10 @@ def mock_failed_ack( monkeypatch.setattr(pika.channel.Channel, "basic_ack", mock_failed_ack) - for fetched_filename in new_rabbitmq_fetch_filenames_client.fetch(): + for ( + fetched_filename, + receipt, + ) in new_rabbitmq_fetch_filenames_stream_client.fetch_stream(): + monkeypatch.undo() assert fetched_filename == filename + assert new_rabbitmq_fetch_filenames_stream_client.ack(receipt) diff --git a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_poll_until_timeout.py b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_poll_until_timeout.py index 6039dab..7972df3 100644 --- a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_poll_until_timeout.py +++ b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_poll_until_timeout.py @@ -1,7 +1,9 @@ import time import pytest from .utils import random_csv_filenames -from src.adapters.fetch_filenames.rabbitmq import 
RabbitMQFetchFilenamesClient +from src.adapters.fetch_filenames_stream.rabbitmq import ( + RabbitMQFetchFilenameStreamClient, +) import pika import pytest @@ -9,15 +11,15 @@ @pytest.mark.smoke @pytest.mark.parametrize("timeout", [0.5 * i for i in range(1, 5)]) def test_fetch_none_wait_timeout( - rabbitmq_fetch_filenames_client: RabbitMQFetchFilenamesClient, + rabbitmq_fetch_filenames_stream_client: RabbitMQFetchFilenameStreamClient, raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], timeout: int, ): - new_rabbitmq_fetch_filenames_client = RabbitMQFetchFilenamesClient( - host=rabbitmq_fetch_filenames_client._host, - port=rabbitmq_fetch_filenames_client._port, - credentials_service=rabbitmq_fetch_filenames_client._credentials_service, - queue=rabbitmq_fetch_filenames_client._queue, + new_rabbitmq_fetch_filenames_stream_client = RabbitMQFetchFilenameStreamClient( + host=rabbitmq_fetch_filenames_stream_client._host, + port=rabbitmq_fetch_filenames_stream_client._port, + credentials_service=rabbitmq_fetch_filenames_stream_client._credentials_service, + queue=rabbitmq_fetch_filenames_stream_client._queue, polling_timeout=timeout, ) @@ -38,8 +40,12 @@ def test_fetch_none_wait_timeout( start_time = time.perf_counter() - for fetched_filename in new_rabbitmq_fetch_filenames_client.fetch(): + for ( + fetched_filename, + receipt, + ) in new_rabbitmq_fetch_filenames_stream_client.fetch_stream(): assert fetched_filename == filename + assert new_rabbitmq_fetch_filenames_stream_client.ack(receipt) end_time = time.perf_counter() diff --git a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_reject_failed.py b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_reject_failed.py new file mode 100644 index 0000000..ab28578 --- /dev/null +++ b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_reject_failed.py @@ -0,0 +1,41 @@ +import pytest +from .utils import random_csv_filenames +from src.adapters.fetch_filenames_stream.rabbitmq import ( + RabbitMQFetchFilenameStreamClient, +) +import pika +from pytest import MonkeyPatch + + +@pytest.mark.parametrize("filename", random_csv_filenames()) +def test_fetch_single_reject_failed( + rabbitmq_fetch_filenames_stream_no_wait_client: RabbitMQFetchFilenameStreamClient, + raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], + filename: str, + monkeypatch: MonkeyPatch, +): + conn, queue = raw_rabbitmq_pika_conn_config + + channel = conn.channel() + + channel.queue_declare(queue=queue, durable=True) + + channel.basic_publish( + exchange="", + routing_key=rabbitmq_fetch_filenames_stream_no_wait_client._queue, + body=filename, + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + + for ( + fetched_filename, + receipt, + ) in rabbitmq_fetch_filenames_stream_no_wait_client.fetch_stream(): + assert fetched_filename == filename + + def mock_reject(self, *args, **kwargs): + raise Exception("Failed to reject!") + + monkeypatch.setattr(pika.channel.Channel, "basic_nack", mock_reject) + + assert not rabbitmq_fetch_filenames_stream_no_wait_client.reject(receipt) diff --git a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_reject_retain_data_in_stream.py b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_reject_retain_data_in_stream.py new file mode 100644 index 0000000..716f4d9 --- /dev/null +++ b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_reject_retain_data_in_stream.py @@ -0,0 +1,94 @@ +import pytest
+from .utils import random_csv_filenames +from src.adapters.fetch_filenames_stream.rabbitmq import ( + RabbitMQFetchFilenameStreamClient, +) +import pika +import pytest + + +@pytest.mark.smoke +@pytest.mark.parametrize("filename", random_csv_filenames()) +def test_fetch_single_reject_retain_data_in_stream( + rabbitmq_fetch_filenames_stream_no_wait_client: RabbitMQFetchFilenameStreamClient, + raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], + filename: str, +): + conn, queue = raw_rabbitmq_pika_conn_config + + channel = conn.channel() + + channel.queue_declare(queue=queue, durable=True) + + channel.basic_publish( + exchange="", + routing_key=rabbitmq_fetch_filenames_stream_no_wait_client._queue, + body=filename, + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + + for ( + fetched_filename, + receipt, + ) in rabbitmq_fetch_filenames_stream_no_wait_client.fetch_stream(): + assert fetched_filename == filename + + assert rabbitmq_fetch_filenames_stream_no_wait_client.reject(receipt) + + for ( + fetched_filename, + receipt, + ) in rabbitmq_fetch_filenames_stream_no_wait_client.fetch_stream(): + assert fetched_filename == filename + assert rabbitmq_fetch_filenames_stream_no_wait_client.ack(receipt) + + +@pytest.mark.smoke +@pytest.mark.parametrize( + "filenames", + [random_csv_filenames() for _ in range(5)], +) +def test_fetch_batch_reject_retain_data_in_stream( + rabbitmq_fetch_filenames_stream_no_wait_client: RabbitMQFetchFilenameStreamClient, + raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], + filenames: list[str], +): + conn, queue = raw_rabbitmq_pika_conn_config + + channel = conn.channel() + + channel.queue_declare(queue=queue, durable=True) + + for filename in filenames: + channel.basic_publish( + exchange="", + routing_key=queue, + body=filename, + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + + all_filenames = [] + all_receipts = [] + for ( + filename, + receipt, + ) in rabbitmq_fetch_filenames_stream_no_wait_client.fetch_stream(): + all_filenames.append(filename) + all_receipts.append(receipt) + + assert sorted(all_filenames) == sorted(filenames) + + assert all(rabbitmq_fetch_filenames_stream_no_wait_client.reject(all_receipts)) + + new_all_filenames = [] + new_all_receipts = [] + for ( + filename, + receipt, + ) in rabbitmq_fetch_filenames_stream_no_wait_client.fetch_stream(): + new_all_filenames.append(filename) + new_all_receipts.append(receipt) + + assert sorted(new_all_filenames) == sorted(all_filenames) + + assert all(rabbitmq_fetch_filenames_stream_no_wait_client.ack(new_all_receipts)) diff --git a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_successful_fetch.py b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_successful_fetch.py index 8fdeb4c..4b880ce 100644 --- a/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_successful_fetch.py +++ b/consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq/test_successful_fetch.py @@ -1,6 +1,8 @@ import pytest from .utils import random_csv_filenames -from src.adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient +from src.adapters.fetch_filenames_stream.rabbitmq import ( + RabbitMQFetchFilenameStreamClient, +) import pika import pytest @@ -8,7 +10,7 @@ @pytest.mark.smoke @pytest.mark.parametrize("filename", random_csv_filenames()) def test_fetch_single_success( - rabbitmq_fetch_filenames_no_wait_client: RabbitMQFetchFilenamesClient, + 
rabbitmq_fetch_filenames_stream_no_wait_client: RabbitMQFetchFilenameStreamClient, raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], filename: str, ): @@ -20,13 +22,17 @@ def test_fetch_single_success( channel.basic_publish( exchange="", - routing_key=rabbitmq_fetch_filenames_no_wait_client._queue, + routing_key=rabbitmq_fetch_filenames_stream_no_wait_client._queue, body=filename, properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), ) - for fetched_filename in rabbitmq_fetch_filenames_no_wait_client.fetch(): + for ( + fetched_filename, + receipt, + ) in rabbitmq_fetch_filenames_stream_no_wait_client.fetch_stream(): assert fetched_filename == filename + assert rabbitmq_fetch_filenames_stream_no_wait_client.ack(receipt) @pytest.mark.smoke @@ -34,8 +40,8 @@ def test_fetch_single_success( "filenames", [random_csv_filenames() for _ in range(5)], ) -def test_publish_batch_success( - rabbitmq_fetch_filenames_no_wait_client: RabbitMQFetchFilenamesClient, +def test_fetch_batch_success( + rabbitmq_fetch_filenames_stream_no_wait_client: RabbitMQFetchFilenameStreamClient, raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], filenames: list[str], ): @@ -54,7 +60,11 @@ def test_publish_batch_success( ) all_filenames = [] - for filename in rabbitmq_fetch_filenames_no_wait_client.fetch(): + for ( + filename, + receipt, + ) in rabbitmq_fetch_filenames_stream_no_wait_client.fetch_stream(): all_filenames.append(filename) + assert rabbitmq_fetch_filenames_stream_no_wait_client.ack(receipt) assert sorted(all_filenames) == sorted(filenames) diff --git a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/conftest.py b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/conftest.py index 663ddf9..c964d6c 100644 --- a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/conftest.py +++ b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/conftest.py @@ -84,4 +84,5 @@ def csv_parse_iot_records_client() -> CSVParseIOTRecordsClient: return CSVParseIOTRecordsClient( recognized_datetime_formats=CSVParserConfig.RECOGNIZED_DATETIME_FORMATS, delimiter=CSVParserConfig.DELIMITER, + file_extension=CSVParserConfig.FILE_EXTENSION, ) diff --git a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_file_error_return_none.py b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_file_error_return_none.py new file mode 100644 index 0000000..0cc315c --- /dev/null +++ b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_file_error_return_none.py @@ -0,0 +1,68 @@ +import pytest +from src.adapters.file_parse_iot_records.csv import CSVParseIOTRecordsClient +from pytest import FixtureRequest, MonkeyPatch, LogCaptureFixture + + +@pytest.mark.parametrize( + "fixture_name", + [ + "random_valid_csv_file", + "random_invalid_datetime_and_value_csv_file", + "random_invalid_datetime_csv_file", + "random_invalid_value_csv_file", + ] + * 5, +) +def test_parse_single_failed_open_file_return_none( + csv_parse_iot_records_client: CSVParseIOTRecordsClient, + fixture_name: str, + request: FixtureRequest, + caplog: LogCaptureFixture, + monkeypatch: MonkeyPatch, +): + random_csv_file: str = request.getfixturevalue(fixture_name) + + def mock_open(*args, **kwargs): + raise OSError("Failed to open file!") + + monkeypatch.setattr("builtins.open", mock_open) + + with caplog.at_level("ERROR"): + assert csv_parse_iot_records_client.parse(random_csv_file) is None + assert "Failed 
to open file!" in caplog.text + + +@pytest.mark.parametrize( + "fixture_names", + [ + tuple( + [ + "random_valid_csv_file", + "random_invalid_datetime_and_value_csv_file", + "random_invalid_datetime_csv_file", + "random_invalid_value_csv_file", + ] + ) + for _ in range(5) + ], +) +def test_parse_batch_failed_open_file_return_none( + csv_parse_iot_records_client: CSVParseIOTRecordsClient, + fixture_names: tuple[str, ...], + request: FixtureRequest, + caplog: LogCaptureFixture, + monkeypatch: MonkeyPatch, +): + random_csv_files: list[str] = [ + request.getfixturevalue(fixture_name) for fixture_name in fixture_names + ] + + def mock_open(*args, **kwargs): + raise OSError("Failed to open file!") + + monkeypatch.setattr("builtins.open", mock_open) + + with caplog.at_level("ERROR"): + for parsed_record in csv_parse_iot_records_client.parse(random_csv_files): + assert parsed_record is None + assert "Failed to open file!" in caplog.text diff --git a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_file_not_exists.py b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_file_not_exists.py new file mode 100644 index 0000000..6a672b0 --- /dev/null +++ b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_file_not_exists.py @@ -0,0 +1,82 @@ +import pytest +from src.adapters.file_parse_iot_records.csv import CSVParseIOTRecordsClient +from pytest import FixtureRequest, LogCaptureFixture +from src.entities import IOTRecord + + +@pytest.mark.parametrize( + "fixture_name", + [ + "random_valid_csv_file", + ] + * 5, +) +def test_parse_single_file_not_exists_failed( + csv_parse_iot_records_client: CSVParseIOTRecordsClient, + fixture_name: str, + request: FixtureRequest, + caplog: LogCaptureFixture, +): + random_file: str = request.getfixturevalue(fixture_name) + random_file = random_file.replace(".csv", "") + + with caplog.at_level("ERROR"): + iot_records = csv_parse_iot_records_client.parse(random_file) + assert iot_records is None + assert f"Failed to parse {random_file}" in caplog.text + assert "File path must exist!" in caplog.text + + +@pytest.mark.parametrize( + "fixture_name", + ["random_valid_csv_file"] * 5, +) +def test_parse_stream_file_not_exists_failed( + csv_parse_iot_records_client: CSVParseIOTRecordsClient, + fixture_name: str, + request: FixtureRequest, + caplog: LogCaptureFixture, +): + random_file: str = request.getfixturevalue(fixture_name) + random_file = random_file.replace(".csv", "") + + all_iot_records: list[IOTRecord] = [] + with caplog.at_level("ERROR"): + for iot_record in csv_parse_iot_records_client.parse_stream(random_file): + assert isinstance(iot_record, IOTRecord) + all_iot_records.append(iot_record) + assert len(all_iot_records) == 0 + assert f"Failed to parse {random_file}" in caplog.text + assert "File path must exist!" 
in caplog.text + + +@pytest.mark.parametrize( + "fixture_names", + [ + tuple( + [ + "random_valid_csv_file", + ] + * 5 + ) + for _ in range(5) + ], +) +def test_parse_batch_file_not_exists_failed( + csv_parse_iot_records_client: CSVParseIOTRecordsClient, + fixture_names: tuple[str, ...], + request: FixtureRequest, + caplog: LogCaptureFixture, +): + random_files: list[str] = [ + request.getfixturevalue(fixture_name) for fixture_name in fixture_names + ] + + random_files = [random_file.replace(".csv", "") for random_file in random_files] + + with caplog.at_level("ERROR"): + iot_records = csv_parse_iot_records_client.parse(random_files) + for random_file, iot_record in zip(random_files, iot_records): + assert iot_record is None + assert f"Failed to parse {random_file}" in caplog.text + assert "File path must exist!" in caplog.text diff --git a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_file_stream_error_raise.py b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_file_stream_error_raise.py new file mode 100644 index 0000000..34aec8b --- /dev/null +++ b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_file_stream_error_raise.py @@ -0,0 +1,35 @@ +import pytest +from src.adapters.file_parse_iot_records.csv import CSVParseIOTRecordsClient +from pytest import FixtureRequest, MonkeyPatch, LogCaptureFixture + + +@pytest.mark.smoke +@pytest.mark.parametrize( + "fixture_name", + [ + "random_valid_csv_file", + "random_invalid_datetime_and_value_csv_file", + "random_invalid_datetime_csv_file", + "random_invalid_value_csv_file", + ] + * 5, +) +def test_parse_stream_failed_open_file_raise( + csv_parse_iot_records_client: CSVParseIOTRecordsClient, + fixture_name: str, + request: FixtureRequest, + caplog: LogCaptureFixture, + monkeypatch: MonkeyPatch, +): + random_csv_file: str = request.getfixturevalue(fixture_name) + + def mock_open(*args, **kwargs): + raise OSError("Failed to open file!") + + monkeypatch.setattr("builtins.open", mock_open) + + with caplog.at_level("ERROR"): + with pytest.raises(Exception, match="^Failed to open file!$"): + for _ in csv_parse_iot_records_client.parse_stream(random_csv_file): + pass + assert "Failed to open file!" 
in caplog.text diff --git a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_open_file.py b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_open_file.py deleted file mode 100644 index ee1a637..0000000 --- a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_open_file.py +++ /dev/null @@ -1,110 +0,0 @@ -import pytest -from src.adapters.file_parse_iot_records.csv import CSVParseIOTRecordsClient -from pytest import FixtureRequest, MonkeyPatch, LogCaptureFixture -from src.entities import IOTRecord - - -@pytest.mark.smoke -@pytest.mark.parametrize( - "fixture_name", - [ - "random_valid_csv_file", - "random_invalid_datetime_and_value_csv_file", - "random_invalid_datetime_csv_file", - "random_invalid_value_csv_file", - ] - * 5, -) -def test_parse_single_failed_open_file( - csv_parse_iot_records_client: CSVParseIOTRecordsClient, - fixture_name: str, - request: FixtureRequest, - caplog: LogCaptureFixture, - monkeypatch: MonkeyPatch, -): - random_csv_file: str = request.getfixturevalue(fixture_name) - - def mock_open(*args, **kwargs): - raise FileNotFoundError("Failed to open file!") - - monkeypatch.setattr("builtins.open", mock_open) - - with caplog.at_level("ERROR"): - iot_records = csv_parse_iot_records_client.parse(random_csv_file) - assert len(iot_records) == 0 - assert f"Failed to parse {random_csv_file}" in caplog.text - assert "Failed to open file!" in caplog.text - - -@pytest.mark.smoke -@pytest.mark.parametrize( - "fixture_name", - [ - "random_valid_csv_file", - "random_invalid_datetime_and_value_csv_file", - "random_invalid_datetime_csv_file", - "random_invalid_value_csv_file", - ] - * 5, -) -def test_parse_stream_failed_open_file( - csv_parse_iot_records_client: CSVParseIOTRecordsClient, - fixture_name: str, - request: FixtureRequest, - caplog: LogCaptureFixture, - monkeypatch: MonkeyPatch, -): - random_csv_file: str = request.getfixturevalue(fixture_name) - - def mock_open(*args, **kwargs): - raise FileNotFoundError("Failed to open file!") - - monkeypatch.setattr("builtins.open", mock_open) - - all_iot_records: list[IOTRecord] = [] - with caplog.at_level("ERROR"): - for iot_record in csv_parse_iot_records_client.parse_stream(random_csv_file): - assert isinstance(iot_record, IOTRecord) - all_iot_records.append(iot_record) - assert len(all_iot_records) == 0 - assert f"Failed to parse {random_csv_file}" in caplog.text - assert "Failed to open file!" in caplog.text - - -@pytest.mark.smoke -@pytest.mark.parametrize( - "fixture_names", - [ - tuple( - [ - "random_valid_csv_file", - "random_invalid_datetime_and_value_csv_file", - "random_invalid_datetime_csv_file", - "random_invalid_value_csv_file", - ] - ) - for _ in range(5) - ], -) -def test_parse_batch_failed_open_file( - csv_parse_iot_records_client: CSVParseIOTRecordsClient, - fixture_names: tuple[str, ...], - request: FixtureRequest, - caplog: LogCaptureFixture, - monkeypatch: MonkeyPatch, -): - random_csv_files: list[str] = [ - request.getfixturevalue(fixture_name) for fixture_name in fixture_names - ] - - def mock_open(*args, **kwargs): - raise FileNotFoundError("Failed to open file!") - - monkeypatch.setattr("builtins.open", mock_open) - - with caplog.at_level("ERROR"): - iot_records = csv_parse_iot_records_client.parse(random_csv_files) - for random_csv_file, iot_record in zip(random_csv_files, iot_records): - assert len(iot_record) == 0 - assert f"Failed to parse {random_csv_file}" in caplog.text - assert "Failed to open file!" 
in caplog.text diff --git a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_other_file_formats.py b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_other_file_formats.py index 9f1188d..5b32c3b 100644 --- a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_other_file_formats.py +++ b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_other_file_formats.py @@ -29,7 +29,7 @@ def test_parse_single_other_format_failed( with caplog.at_level("ERROR"): iot_records = csv_parse_iot_records_client.parse(random_file) - assert len(iot_records) == 0 + assert iot_records is None assert f"Failed to parse {random_file}" in caplog.text assert ( f"File extension must be {csv_parse_iot_records_client._file_extension}" @@ -105,7 +105,7 @@ def test_parse_batch_other_format_failed( with caplog.at_level("ERROR"): iot_records = csv_parse_iot_records_client.parse(random_files) for random_file, iot_record in zip(random_files, iot_records): - assert len(iot_record) == 0 + assert iot_record is None assert f"Failed to parse {random_file}" in caplog.text assert ( f"File extension must be {csv_parse_iot_records_client._file_extension}" diff --git a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_parse_dir.py b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_parse_dir.py new file mode 100644 index 0000000..f37141d --- /dev/null +++ b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_parse_dir.py @@ -0,0 +1,53 @@ +import pytest +from src.adapters.file_parse_iot_records.csv import CSVParseIOTRecordsClient +from pytest import FixtureRequest, LogCaptureFixture +from src.entities import IOTRecord +from pathlib import Path + + +def test_parse_single_dir_failed( + csv_parse_iot_records_client: CSVParseIOTRecordsClient, + caplog: LogCaptureFixture, + tmp_path: Path, +): + with caplog.at_level("ERROR"): + iot_records = csv_parse_iot_records_client.parse(str(tmp_path)) + assert iot_records is None + assert f"Failed to parse {tmp_path}" in caplog.text + assert "File path must be a file!" in caplog.text + + +def test_parse_stream_dir_failed( + csv_parse_iot_records_client: CSVParseIOTRecordsClient, + caplog: LogCaptureFixture, + tmp_path: Path, +): + all_iot_records: list[IOTRecord] = [] + with caplog.at_level("ERROR"): + for iot_record in csv_parse_iot_records_client.parse_stream(str(tmp_path)): + assert isinstance(iot_record, IOTRecord) + all_iot_records.append(iot_record) + assert len(all_iot_records) == 0 + assert f"Failed to parse {tmp_path}" in caplog.text + assert "File path must be a file!" in caplog.text + + +def test_parse_batch_dir_failed( + csv_parse_iot_records_client: CSVParseIOTRecordsClient, + caplog: LogCaptureFixture, + tmp_path: Path, +): + tmp_paths = [] + for i in range(5): + new_tmp_path = tmp_path / f"random_valid_tsv_file{i}" + new_tmp_path.mkdir(parents=True, exist_ok=True) + tmp_paths.append(new_tmp_path) + + tmp_paths_str = [str(tmp_path) for tmp_path in tmp_paths] + + with caplog.at_level("ERROR"): + iot_records = csv_parse_iot_records_client.parse(tmp_paths_str) + for random_file, iot_record in zip(tmp_paths_str, iot_records): + assert iot_record is None + assert f"Failed to parse {random_file}" in caplog.text + assert "File path must be a file!" 
in caplog.text diff --git a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/utils.py b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/utils.py index fa5b48d..e7980b2 100644 --- a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/utils.py +++ b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/utils.py @@ -80,10 +80,8 @@ def random_invalid_value_rows() -> list[tuple[str, ...]]: def random_invalid_datetime_and_value_rows() -> list[tuple[str, ...]]: rows = [] all_datetime_formats = [ - "%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M%z", - "%Y-%m-%d %H:%M:%S.%f%z", "%Y-%m-%d %H:%M:%S%z", "%Y-%m-%d %H:%M%z", ] diff --git a/consumer/tests/test_deployments/__init__.py b/consumer/tests/test_deployments/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/consumer/tests/test_deployments/test_script/__init__.py b/consumer/tests/test_deployments/test_script/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/consumer/tests/test_deployments/test_script/test_main/__init__.py b/consumer/tests/test_deployments/test_script/test_main/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/consumer/tests/test_deployments/test_script/test_main/conftest.py b/consumer/tests/test_deployments/test_script/test_main/conftest.py new file mode 100644 index 0000000..1cb77e5 --- /dev/null +++ b/consumer/tests/test_deployments/test_script/test_main/conftest.py @@ -0,0 +1,170 @@ +from src.deployments.script.config import RabbitMQConfig, PostgresConfig +import pika +import pytest +import psycopg2 +from pytest import TempdirFactory +from pathlib import Path +from .utils import ( + random_csv_file, + random_tsv_file, + random_ndjson_file, + random_invalid_datetime_rows, + random_invalid_datetime_and_value_rows, + random_invalid_value_rows, + random_valid_format_rows, +) + + +@pytest.fixture(scope="session") +def setup_tempdir(tmpdir_factory: TempdirFactory) -> Path: + return Path(tmpdir_factory.mktemp("artifact")) + + +@pytest.fixture(scope="session") +def raw_postgres_psycopg2_conn_config() -> psycopg2.extensions.connection: + with psycopg2.connect( + host=PostgresConfig.HOST, + port=PostgresConfig.PORT, + user=PostgresConfig.USERNAME, + password=PostgresConfig.PASSWORD, + database=PostgresConfig.DATABASE, + ) as conn: + yield conn + + +@pytest.fixture(scope="session") +def raw_rabbitmq_pika_conn_config() -> tuple[pika.BaseConnection, str]: + pika_conn = pika.BlockingConnection( + pika.ConnectionParameters( + host=RabbitMQConfig.HOST, + port=RabbitMQConfig.PORT, + credentials=pika.PlainCredentials( + RabbitMQConfig.USERNAME, RabbitMQConfig.PASSWORD + ), + ) + ) + return pika_conn, RabbitMQConfig.QUEUE + + +@pytest.fixture(scope="session", autouse=True) +def init_postgres_tables( + raw_postgres_psycopg2_conn_config: psycopg2.extensions.connection, +) -> None: + with raw_postgres_psycopg2_conn_config.cursor() as cursor: + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS records ( + record_time TIMESTAMPTZ NOT NULL, + sensor_id TEXT NOT NULL, + value DOUBLE PRECISION NOT NULL, + PRIMARY KEY(record_time, sensor_id) + ); + + CREATE INDEX IF NOT EXISTS idx_records_record_time ON records USING BRIN (record_time); + CREATE INDEX IF NOT EXISTS idx_records_sensor_id ON records USING HASH (sensor_id); + """ + ) + raw_postgres_psycopg2_conn_config.commit() + + +@pytest.fixture(scope="function", autouse=True) +def setup_teardown_rabbitmq_queue( + raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, 
str], +) -> None: + pika_conn, queue = raw_rabbitmq_pika_conn_config + + channel = pika_conn.channel() + channel.queue_declare(queue=queue, durable=True) + channel.queue_purge(queue=queue) + yield + channel.queue_purge(queue=queue) + + +@pytest.fixture(scope="function", autouse=True) +def setup_teardown_postgres_tables( + raw_postgres_psycopg2_conn_config: psycopg2.extensions.connection, +) -> None: + with raw_postgres_psycopg2_conn_config.cursor() as cursor: + try: + cursor.execute( + """ + TRUNCATE TABLE records; + """ + ) + raw_postgres_psycopg2_conn_config.commit() + yield + except Exception as e: + raw_postgres_psycopg2_conn_config.rollback() + raise e + finally: + cursor.execute( + """ + TRUNCATE TABLE records; + """ + ) + raw_postgres_psycopg2_conn_config.commit() + + +@pytest.fixture(scope="session") +def setup_tempdir(tmpdir_factory: TempdirFactory) -> Path: + return Path(tmpdir_factory.mktemp("artifact")) + + +@pytest.fixture(scope="function") +def random_valid_csv_file(setup_tempdir: Path) -> Path: + return random_csv_file(setup_tempdir, random_valid_format_rows()) + + +@pytest.fixture(scope="function") +def random_invalid_datetime_and_value_csv_file(setup_tempdir: Path) -> Path: + return random_csv_file(setup_tempdir, random_invalid_datetime_and_value_rows()) + + +@pytest.fixture(scope="function") +def random_invalid_datetime_csv_file(setup_tempdir: Path) -> Path: + return random_csv_file(setup_tempdir, random_invalid_datetime_rows()) + + +@pytest.fixture(scope="function") +def random_invalid_value_csv_file(setup_tempdir: Path) -> Path: + return random_csv_file(setup_tempdir, random_invalid_value_rows()) + + +@pytest.fixture(scope="function") +def random_valid_tsv_file(setup_tempdir: Path) -> Path: + return random_tsv_file(setup_tempdir, random_valid_format_rows()) + + +@pytest.fixture(scope="function") +def random_invalid_datetime_and_value_tsv_file(setup_tempdir: Path) -> Path: + return random_tsv_file(setup_tempdir, random_invalid_datetime_and_value_rows()) + + +@pytest.fixture(scope="function") +def random_invalid_datetime_tsv_file(setup_tempdir: Path) -> Path: + return random_tsv_file(setup_tempdir, random_invalid_datetime_rows()) + + +@pytest.fixture(scope="function") +def random_invalid_value_tsv_file(setup_tempdir: Path) -> Path: + return random_tsv_file(setup_tempdir, random_invalid_value_rows()) + + +@pytest.fixture(scope="function") +def random_valid_ndjson_file(setup_tempdir: Path) -> Path: + return random_ndjson_file(setup_tempdir, random_valid_format_rows()) + + +@pytest.fixture(scope="function") +def random_invalid_datetime_and_value_ndjson_file(setup_tempdir: Path) -> Path: + return random_ndjson_file(setup_tempdir, random_invalid_datetime_and_value_rows()) + + +@pytest.fixture(scope="function") +def random_invalid_datetime_ndjson_file(setup_tempdir: Path) -> Path: + return random_ndjson_file(setup_tempdir, random_invalid_datetime_rows()) + + +@pytest.fixture(scope="function") +def random_invalid_value_ndjson_file(setup_tempdir: Path) -> Path: + return random_ndjson_file(setup_tempdir, random_invalid_value_rows()) diff --git a/consumer/tests/test_deployments/test_script/test_main/test_main_failed_read_stream_raise.py b/consumer/tests/test_deployments/test_script/test_main/test_main_failed_read_stream_raise.py new file mode 100644 index 0000000..7c598c8 --- /dev/null +++ b/consumer/tests/test_deployments/test_script/test_main/test_main_failed_read_stream_raise.py @@ -0,0 +1,48 @@ +from typing import Iterator +from src.deployments.script.main import main +from 
src.deployments.script.config import RabbitMQConfig +from src.adapters.fetch_filenames_stream.rabbitmq import ( + RabbitMQFetchFilenameStreamClient, +) +import pytest +from pytest import MonkeyPatch, FixtureRequest +import pika + + +@pytest.mark.smoke +@pytest.mark.parametrize( + "fixture_name", + ["random_valid_csv_file"] * 5, +) +def test_main_flow_single_read_stream_failed_raise( + raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], + fixture_name: str, + request: FixtureRequest, + monkeypatch: MonkeyPatch, +): + random_csv_file: str = request.getfixturevalue(fixture_name) + + conn, queue = raw_rabbitmq_pika_conn_config + channel = conn.channel() + channel.queue_declare(queue=queue, durable=True) + + channel.basic_publish( + exchange="", + routing_key=queue, + body=random_csv_file, + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + + def mock_read(self, *args, **kwargs) -> Iterator[str]: + raise IOError("Cannot read stream!") + + monkeypatch.setattr(RabbitMQFetchFilenameStreamClient, "fetch_stream", mock_read) + + monkeypatch.setattr(RabbitMQConfig, "POLLING_TIMEOUT", 1) + + with pytest.raises(IOError, match="^Cannot read stream!$"): + main() + + method_frame, _, body = channel.basic_get(queue=queue) + assert method_frame is not None + assert body.decode() == random_csv_file diff --git a/consumer/tests/test_deployments/test_script/test_main/test_main_read_file_resilience.py b/consumer/tests/test_deployments/test_script/test_main/test_main_read_file_resilience.py new file mode 100644 index 0000000..6883813 --- /dev/null +++ b/consumer/tests/test_deployments/test_script/test_main/test_main_read_file_resilience.py @@ -0,0 +1,87 @@ +from src.deployments.script.main import main +from src.deployments.script.config import RabbitMQConfig +import pytest +from pytest import MonkeyPatch, LogCaptureFixture, FixtureRequest +import pika +import psycopg2 +import csv +from datetime import datetime +from decimal import Decimal + + +@pytest.mark.parametrize( + "fixture_name", + ["random_valid_csv_file"] * 5, +) +def test_main_flow_single_cannot_read_file_throw_error( + raw_postgres_psycopg2_conn_config: psycopg2.extensions.connection, + raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], + fixture_name: str, + request: FixtureRequest, + monkeypatch: MonkeyPatch, + caplog: LogCaptureFixture, +): + random_csv_file: str = request.getfixturevalue(fixture_name) + + conn, queue = raw_rabbitmq_pika_conn_config + channel = conn.channel() + channel.queue_declare(queue=queue, durable=True) + + channel.basic_publish( + exchange="", + routing_key=queue, + body=random_csv_file, + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + + monkeypatch.setattr(RabbitMQConfig, "POLLING_TIMEOUT", 1) + + counter = 0 + + def mock_open(*args, **kwargs): + nonlocal counter + counter += 1 + if counter == 1: + monkeypatch.undo() + raise OSError("Cannot read file!") + + monkeypatch.setattr("builtins.open", mock_open) + + with caplog.at_level("INFO"): + main() + assert "Cannot read file!" in caplog.text + assert f"Failed to upsert {random_csv_file}!" in caplog.text + assert f"Successfully upserted {random_csv_file}!" 
in caplog.text + + with open(random_csv_file, "r") as f: + reader = csv.reader(f) + for row in reader: + record_time, sensor_id, value = row + + record_time_dt = datetime.fromisoformat(record_time) + value_dec = Decimal(value) + + with raw_postgres_psycopg2_conn_config.cursor() as cursor: + cursor.execute( + """ + SELECT record_time, sensor_id, value + FROM records + WHERE record_time = %s AND sensor_id = %s; + """, + (record_time_dt, sensor_id), + ) + + ( + fetched_record_time, + fetched_sensor_id, + fetched_value, + ) = cursor.fetchone() + + assert fetched_record_time == record_time_dt + assert fetched_sensor_id == sensor_id + assert pytest.approx(value_dec) == fetched_value + + method_frame, header_frame, body = channel.basic_get(queue=queue) + assert method_frame is None + assert header_frame is None + assert body is None diff --git a/consumer/tests/test_deployments/test_script/test_main/test_main_successful.py b/consumer/tests/test_deployments/test_script/test_main/test_main_successful.py new file mode 100644 index 0000000..e140c9b --- /dev/null +++ b/consumer/tests/test_deployments/test_script/test_main/test_main_successful.py @@ -0,0 +1,279 @@ +from src.deployments.script.main import main +from src.deployments.script.config import RabbitMQConfig, PostgresConfig +import pytest +from pytest import MonkeyPatch, LogCaptureFixture, FixtureRequest +import pika +import psycopg2 +import csv +from datetime import datetime +from decimal import Decimal + + +@pytest.mark.parametrize( + "fixture_name", + ["random_valid_csv_file"] * 5, +) +def test_main_flow_single_no_failed_files( + raw_postgres_psycopg2_conn_config: psycopg2.extensions.connection, + raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], + fixture_name: str, + request: FixtureRequest, + monkeypatch: MonkeyPatch, + caplog: LogCaptureFixture, +): + random_csv_file: str = request.getfixturevalue(fixture_name) + + conn, queue = raw_rabbitmq_pika_conn_config + channel = conn.channel() + channel.queue_declare(queue=queue, durable=True) + + channel.basic_publish( + exchange="", + routing_key=queue, + body=random_csv_file, + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + + monkeypatch.setattr(RabbitMQConfig, "POLLING_TIMEOUT", 1) + + with caplog.at_level("INFO"): + main() + assert f"Successfully upserted {random_csv_file}!" 
in caplog.text + + with open(random_csv_file, "r") as f: + reader = csv.reader(f) + for row in reader: + record_time, sensor_id, value = row + + record_time_dt = datetime.fromisoformat(record_time) + value_dec = Decimal(value) + + with raw_postgres_psycopg2_conn_config.cursor() as cursor: + cursor.execute( + """ + SELECT record_time, sensor_id, value + FROM records + WHERE record_time = %s AND sensor_id = %s; + """, + (record_time_dt, sensor_id), + ) + + ( + fetched_record_time, + fetched_sensor_id, + fetched_value, + ) = cursor.fetchone() + + assert fetched_record_time == record_time_dt + assert fetched_sensor_id == sensor_id + assert pytest.approx(value_dec) == fetched_value + + method_frame, header_frame, body = channel.basic_get(queue=queue) + assert method_frame is None + assert header_frame is None + assert body is None + + +@pytest.mark.parametrize( + "fixture_names", + [tuple(["random_valid_csv_file"] * 5) for _ in range(5)], +) +def test_main_flow_batch_no_failed_files( + raw_postgres_psycopg2_conn_config: psycopg2.extensions.connection, + raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], + fixture_names: tuple[str, ...], + request: FixtureRequest, + monkeypatch: MonkeyPatch, + caplog: LogCaptureFixture, +): + random_csv_files: list[str] = [ + request.getfixturevalue(fixture_name) for fixture_name in fixture_names + ] + + conn, queue = raw_rabbitmq_pika_conn_config + channel = conn.channel() + channel.queue_declare(queue=queue, durable=True) + + for random_csv_file in random_csv_files: + channel.basic_publish( + exchange="", + routing_key=queue, + body=random_csv_file, + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + + monkeypatch.setattr(RabbitMQConfig, "POLLING_TIMEOUT", 1) + + with caplog.at_level("INFO"): + main() + assert f"Successfully upserted {random_csv_file}!" 
in caplog.text + + for random_csv_file in random_csv_files: + with open(random_csv_file, "r") as f: + reader = csv.reader(f) + for row in reader: + record_time, sensor_id, value = row + + record_time_dt = datetime.fromisoformat(record_time) + value_dec = Decimal(value) + + with raw_postgres_psycopg2_conn_config.cursor() as cursor: + cursor.execute( + """ + SELECT record_time, sensor_id, value + FROM records + WHERE record_time = %s AND sensor_id = %s; + """, + (record_time_dt, sensor_id), + ) + + ( + fetched_record_time, + fetched_sensor_id, + fetched_value, + ) = cursor.fetchone() + + assert fetched_record_time == record_time_dt + assert fetched_sensor_id == sensor_id + assert pytest.approx(value_dec) == fetched_value + + method_frame, header_frame, body = channel.basic_get(queue=queue) + assert method_frame is None + assert header_frame is None + assert body is None + + +@pytest.mark.parametrize( + "fixture_name", + ["random_valid_csv_file"] * 5, +) +def test_main_flow_single_in_batch_no_failed_files( + raw_postgres_psycopg2_conn_config: psycopg2.extensions.connection, + raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], + fixture_name: str, + request: FixtureRequest, + monkeypatch: MonkeyPatch, + caplog: LogCaptureFixture, +): + random_csv_file: str = request.getfixturevalue(fixture_name) + + conn, queue = raw_rabbitmq_pika_conn_config + channel = conn.channel() + channel.queue_declare(queue=queue, durable=True) + + channel.basic_publish( + exchange="", + routing_key=queue, + body=random_csv_file, + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + + monkeypatch.setattr(RabbitMQConfig, "POLLING_TIMEOUT", 1) + + monkeypatch.setattr(PostgresConfig, "BATCH_UPSERT_SIZE", 3) + + with caplog.at_level("INFO"): + main() + assert f"Successfully upserted {random_csv_file}!" 
in caplog.text + + with open(random_csv_file, "r") as f: + reader = csv.reader(f) + for row in reader: + record_time, sensor_id, value = row + + record_time_dt = datetime.fromisoformat(record_time) + value_dec = Decimal(value) + + with raw_postgres_psycopg2_conn_config.cursor() as cursor: + cursor.execute( + """ + SELECT record_time, sensor_id, value + FROM records + WHERE record_time = %s AND sensor_id = %s; + """, + (record_time_dt, sensor_id), + ) + + result = cursor.fetchone() + + fetched_record_time, fetched_sensor_id, fetched_value = result + + assert fetched_record_time == record_time_dt + assert fetched_sensor_id == sensor_id + assert pytest.approx(value_dec) == fetched_value + + method_frame, header_frame, body = channel.basic_get(queue=queue) + assert method_frame is None + assert header_frame is None + assert body is None + + +@pytest.mark.parametrize( + "fixture_names", + [tuple(["random_valid_csv_file"] * 5) for _ in range(5)], +) +def test_main_flow_batch_in_batch_no_failed_files( + raw_postgres_psycopg2_conn_config: psycopg2.extensions.connection, + raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], + fixture_names: tuple[str, ...], + request: FixtureRequest, + monkeypatch: MonkeyPatch, + caplog: LogCaptureFixture, +): + random_csv_files: list[str] = [ + request.getfixturevalue(fixture_name) for fixture_name in fixture_names + ] + + conn, queue = raw_rabbitmq_pika_conn_config + channel = conn.channel() + channel.queue_declare(queue=queue, durable=True) + + for random_csv_file in random_csv_files: + channel.basic_publish( + exchange="", + routing_key=queue, + body=random_csv_file, + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + + monkeypatch.setattr(RabbitMQConfig, "POLLING_TIMEOUT", 1) + + monkeypatch.setattr(PostgresConfig, "BATCH_UPSERT_SIZE", 2) + + with caplog.at_level("INFO"): + main() + assert f"Successfully upserted {random_csv_file}!" 
in caplog.text + + for random_csv_file in random_csv_files: + with open(random_csv_file, "r") as f: + reader = csv.reader(f) + for row in reader: + record_time, sensor_id, value = row + + record_time_dt = datetime.fromisoformat(record_time) + value_dec = Decimal(value) + + with raw_postgres_psycopg2_conn_config.cursor() as cursor: + cursor.execute( + """ + SELECT record_time, sensor_id, value + FROM records + WHERE record_time = %s AND sensor_id = %s; + """, + (record_time_dt, sensor_id), + ) + + ( + fetched_record_time, + fetched_sensor_id, + fetched_value, + ) = cursor.fetchone() + + assert fetched_record_time == record_time_dt + assert fetched_sensor_id == sensor_id + assert pytest.approx(value_dec) == fetched_value + + method_frame, header_frame, body = channel.basic_get(queue=queue) + assert method_frame is None + assert header_frame is None + assert body is None diff --git a/consumer/tests/test_deployments/test_script/test_main/test_main_upsert_record_resilience.py b/consumer/tests/test_deployments/test_script/test_main/test_main_upsert_record_resilience.py new file mode 100644 index 0000000..5fab317 --- /dev/null +++ b/consumer/tests/test_deployments/test_script/test_main/test_main_upsert_record_resilience.py @@ -0,0 +1,168 @@ +from src.deployments.script.main import main +from src.deployments.script.config import RabbitMQConfig +import pytest +from pytest import MonkeyPatch, LogCaptureFixture, FixtureRequest +from src.adapters.upsert_iot_records.postgres import PostgresUpsertIOTRecordsClient +import pika +import psycopg2 +import csv +from datetime import datetime +from decimal import Decimal + + +@pytest.mark.parametrize( + "fixture_name", + ["random_valid_csv_file"] * 5, +) +def test_main_flow_single_upsert_record_failed_resilience( + raw_postgres_psycopg2_conn_config: psycopg2.extensions.connection, + raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], + fixture_name: str, + request: FixtureRequest, + monkeypatch: MonkeyPatch, + caplog: LogCaptureFixture, +): + random_csv_file: str = request.getfixturevalue(fixture_name) + + conn, queue = raw_rabbitmq_pika_conn_config + channel = conn.channel() + channel.queue_declare(queue=queue, durable=True) + + channel.basic_publish( + exchange="", + routing_key=queue, + body=random_csv_file, + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + + monkeypatch.setattr(RabbitMQConfig, "POLLING_TIMEOUT", 1) + + counter = 0 + + def mock_upsert(*args, **kwargs): + nonlocal counter + counter += 1 + if counter == 1: + monkeypatch.undo() + raise Exception("Failed to upsert!") + + monkeypatch.setattr( + "src.deployments.script.main._upsert_iot_records_buffer", mock_upsert + ) + + with caplog.at_level("INFO"): + main() + assert "Failed to upsert!" in caplog.text + assert f"Failed to upsert {random_csv_file}!" in caplog.text + assert f"Successfully upserted {random_csv_file}!" 
in caplog.text + + with open(random_csv_file, "r") as f: + reader = csv.reader(f) + for row in reader: + record_time, sensor_id, value = row + + record_time_dt = datetime.fromisoformat(record_time) + value_dec = Decimal(value) + + with raw_postgres_psycopg2_conn_config.cursor() as cursor: + cursor.execute( + """ + SELECT record_time, sensor_id, value + FROM records + WHERE record_time = %s AND sensor_id = %s; + """, + (record_time_dt, sensor_id), + ) + + ( + fetched_record_time, + fetched_sensor_id, + fetched_value, + ) = cursor.fetchone() + + assert fetched_record_time == record_time_dt + assert fetched_sensor_id == sensor_id + assert pytest.approx(value_dec) == fetched_value + + method_frame, header_frame, body = channel.basic_get(queue=queue) + assert method_frame is None + assert header_frame is None + assert body is None + + +@pytest.mark.parametrize( + "fixture_name", + ["random_valid_csv_file"] * 5, +) +def test_main_flow_single_upsert_step_failed_resilience( + raw_postgres_psycopg2_conn_config: psycopg2.extensions.connection, + raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str], + fixture_name: str, + request: FixtureRequest, + monkeypatch: MonkeyPatch, + caplog: LogCaptureFixture, +): + random_csv_file: str = request.getfixturevalue(fixture_name) + + conn, queue = raw_rabbitmq_pika_conn_config + channel = conn.channel() + channel.queue_declare(queue=queue, durable=True) + + channel.basic_publish( + exchange="", + routing_key=queue, + body=random_csv_file, + properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent), + ) + + monkeypatch.setattr(RabbitMQConfig, "POLLING_TIMEOUT", 1) + + counter = 0 + + def mock_upsert(self, records) -> list[bool]: + nonlocal counter + counter += 1 + if counter == 1: + monkeypatch.undo() + return [False] * len(records) + + monkeypatch.setattr(PostgresUpsertIOTRecordsClient, "_upsert_batch", mock_upsert) + + with caplog.at_level("INFO"): + main() + assert "Failed to upsert all records!" in caplog.text + assert f"Failed to upsert {random_csv_file}!" in caplog.text + assert f"Successfully upserted {random_csv_file}!" 
in caplog.text + + with open(random_csv_file, "r") as f: + reader = csv.reader(f) + for row in reader: + record_time, sensor_id, value = row + + record_time_dt = datetime.fromisoformat(record_time) + value_dec = Decimal(value) + + with raw_postgres_psycopg2_conn_config.cursor() as cursor: + cursor.execute( + """ + SELECT record_time, sensor_id, value + FROM records + WHERE record_time = %s AND sensor_id = %s; + """, + (record_time_dt, sensor_id), + ) + + ( + fetched_record_time, + fetched_sensor_id, + fetched_value, + ) = cursor.fetchone() + + assert fetched_record_time == record_time_dt + assert fetched_sensor_id == sensor_id + assert pytest.approx(value_dec) == fetched_value + + method_frame, header_frame, body = channel.basic_get(queue=queue) + assert method_frame is None + assert header_frame is None + assert body is None diff --git a/consumer/tests/test_deployments/test_script/test_main/utils.py b/consumer/tests/test_deployments/test_script/test_main/utils.py new file mode 100644 index 0000000..56fce36 --- /dev/null +++ b/consumer/tests/test_deployments/test_script/test_main/utils.py @@ -0,0 +1,134 @@ +import random +import string +from pathlib import Path +from datetime import datetime, timedelta +from zoneinfo import ZoneInfo +import zoneinfo +import random +import json +from decimal import Decimal +import csv + + +def random_valid_format_rows() -> list[tuple[str, ...]]: + rows = [] + for _ in range(10): + random_timezone = random.choice(list(zoneinfo.available_timezones())) + random_time_delta = timedelta( + hours=random.randint(0, 24), + minutes=random.randint(0, 60), + seconds=random.randint(0, 60), + ) + random_datetime = datetime.now(tz=ZoneInfo(random_timezone)) - random_time_delta + random_sensor_id = "".join(random.choices(string.ascii_letters, k=10)) + random_value = Decimal(random.random() * 100) + rows.append((random_datetime.isoformat(), random_sensor_id, str(random_value))) + return rows + + +def random_invalid_datetime_rows() -> list[tuple[str, ...]]: + rows = [] + all_datetime_formats = [ + "%Y-%m-%dT%H:%M:%S%z", + "%Y-%m-%dT%H:%M%z", + "%Y-%m-%d %H:%M:%S%z", + "%Y-%m-%d %H:%M%z", + ] + for _ in range(10): + random_timezone = random.choice(list(zoneinfo.available_timezones())) + random_time_delta = timedelta( + hours=random.randint(0, 24), + minutes=random.randint(0, 60), + seconds=random.randint(0, 60), + ) + random_datetime = datetime.now(tz=ZoneInfo(random_timezone)) - random_time_delta + random_sensor_id = "".join(random.choices(string.ascii_letters, k=10)) + random_value = Decimal(random.random() * 100) + random_datetime_format = random.choice(all_datetime_formats) + rows.append( + ( + random_datetime.strftime(random_datetime_format), + random_sensor_id, + str(random_value), + ) + ) + return rows + + +def random_invalid_value_rows() -> list[tuple[str, ...]]: + rows = [] + for _ in range(10): + random_timezone = random.choice(list(zoneinfo.available_timezones())) + random_time_delta = timedelta( + hours=random.randint(0, 24), + minutes=random.randint(0, 60), + seconds=random.randint(0, 60), + ) + random_datetime = datetime.now(tz=ZoneInfo(random_timezone)) - random_time_delta + random_sensor_id = "".join(random.choices(string.ascii_letters, k=10)) + random_value = "".join(random.choices(string.ascii_letters, k=10)) + rows.append( + ( + random_datetime.isoformat(), + random_sensor_id, + random_value, + ) + ) + return rows + + +def random_invalid_datetime_and_value_rows() -> list[tuple[str, ...]]: + rows = [] + all_datetime_formats = [ + "%Y-%m-%dT%H:%M:%S%z", + 
"%Y-%m-%dT%H:%M%z", + "%Y-%m-%d %H:%M:%S%z", + "%Y-%m-%d %H:%M%z", + ] + for _ in range(10): + random_timezone = random.choice(list(zoneinfo.available_timezones())) + random_time_delta = timedelta( + hours=random.randint(0, 24), + minutes=random.randint(0, 60), + seconds=random.randint(0, 60), + ) + random_datetime = datetime.now(tz=ZoneInfo(random_timezone)) - random_time_delta + random_sensor_id = "".join(random.choices(string.ascii_letters, k=10)) + random_value = "".join(random.choices(string.ascii_letters, k=10)) + random_datetime_format = random.choice(all_datetime_formats) + rows.append( + ( + random_datetime.strftime(random_datetime_format), + random_sensor_id, + str(random_value), + ) + ) + return rows + + +def random_csv_file(base_dir: Path, rows: list[tuple[str, ...]]) -> str: + filename = "".join(random.choices(string.ascii_letters, k=10)) + ".csv" + filepath = base_dir.joinpath(filename) + with open(filepath, "w") as csvfile: + writer = csv.writer(csvfile, delimiter=",") + writer.writerows(rows) + return str(filepath) + + +def random_tsv_file(base_dir: Path, rows: list[tuple[str, ...]]) -> str: + filename = "".join(random.choices(string.ascii_letters, k=10)) + ".tsv" + filepath = base_dir.joinpath(filename) + with open(filepath, "w") as csvfile: + writer = csv.writer(csvfile, delimiter="\t") + writer.writerows(rows) + return str(filepath) + + +def random_ndjson_file(base_dir: Path, rows: list[tuple[str, ...]]) -> str: + filename = "".join(random.choices(string.ascii_letters, k=10)) + ".tsv" + filepath = base_dir.joinpath(filename) + with open(filepath, "w") as csvfile: + for row in rows: + json.dump(row, csvfile) + csvfile.write("\n") + return str(filepath) diff --git a/docker-compose.yml b/docker-compose.yml index fd4a64f..b19bfce 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -75,3 +75,6 @@ services: CSV_PARSER_RECOGNIZED_DATETIME_FORMATS: ${CSV_PARSER_RECOGNIZED_DATETIME_FORMATS} CSV_PARSER_DELIMITER: ${CSV_PARSER_DELIMITER} CSV_PARSER_FILE_EXTENSION: ${CSV_PARSER_FILE_EXTENSION} + deploy: + mode: replicated + replicas: ${CONSUMER_REPLICAS} diff --git a/test_generator.py b/test_generator.py new file mode 100644 index 0000000..58b64be --- /dev/null +++ b/test_generator.py @@ -0,0 +1,7 @@ +import argparse +import csv +import datetime + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser