Skip to content

Commit

Permalink
Added tests for rabbitmq
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-au-922 committed Dec 2, 2023
1 parent 4ec9480 commit 53e4a08
Show file tree
Hide file tree
Showing 24 changed files with 512 additions and 63 deletions.
Binary file added .coverage_consumer
Binary file not shown.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ RABBITMQ_USERNAME=rabbitmq
RABBITMQ_PASSWORD=rabbitmq
RABBITMQ_PORT=5672
RABBITMQ_WEBAPP_PORT=15672
RABBITMQ_POLLING_TIMEOUT=60

QUEUE_NAME=filenames

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ test_consumer:
export RABBITMQ_USERNAME=$(RABBITMQ_USERNAME) && \
export RABBITMQ_PASSWORD=$(RABBITMQ_PASSWORD) && \
export QUEUE_NAME=$(QUEUE_NAME) && \
COVERAGE_FILE=.coverage_consumer coverage run -m pytest -vxs consumer/tests
COVERAGE_FILE=.coverage_consumer coverage run -m pytest -vxs consumer/tests/test_adapters/test_fetch_filenames/test_rabbitmq
coverage_report:
coverage combine .coverage_producer .coverage_consumer && \
coverage report -m --omit="*/tests/*"
Expand Down
51 changes: 31 additions & 20 deletions consumer/src/adapters/fetch_filenames/rabbitmq.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from contextlib import contextmanager
from datetime import datetime
import time
from ...usecases import FetchFilenameClient
import pika
from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic, BasicProperties
from pika.connection import Connection
from typing import Iterator, Optional
from typing import Generator, Iterator, Optional
from typing_extensions import override
from collections.abc import Callable
import logging
Expand All @@ -25,6 +27,7 @@ def __init__(
self._queue = queue
self._conn: Optional[Connection] = None
self._polling_timeout = polling_timeout
self._last_poll_time: Optional[datetime] = None

def _reset_conn(self) -> None:
self._conn = None
Expand All @@ -42,36 +45,44 @@ def _get_amqp_conn(self) -> Iterator[pika.BaseConnection]:
self._conn = pika.BlockingConnection(conn_parameters)
yield self._conn

def _wait(self) -> None:
time.sleep(0.5)

@override
def fetch(self) -> Iterator[str]:
def fetch(self) -> Generator[str, None, None]:
while True:
try:
method: Optional[Basic.Deliver] = None
with self._get_amqp_conn() as connection:
channel: BlockingChannel = connection.channel()
channel.queue_declare(queue=self._queue, durable=True)

method: Optional[Basic.Deliver]
properties: Optional[BasicProperties]
body: Optional[bytes]
for method, properties, body in channel.consume(
queue=self._queue, inactivity_timeout=self._polling_timeout
):
if method == None and properties == None and body == None:
raise StopIteration
try:
yield body.decode("utf-8")
channel.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
logging.exception(e)
channel.basic_nack(delivery_tag=method.delivery_tag)
raise e
except StopIteration:
logging.info("No more filenames to fetch")
break

method, properties, body = channel.basic_get(
queue=self._queue, auto_ack=False
)

if method is None and properties is None and body is None:
if self._last_poll_time is None:
self._last_poll_time = datetime.now()
if (
datetime.now() - self._last_poll_time
).total_seconds() > self._polling_timeout:
break
self._wait()
continue

self._last_poll_time = None

yield body.decode()

channel.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
logging.exception(e)
if method is not None:
channel.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
self._reset_conn()
raise e

@override
def close(self) -> bool:
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class RabbitMQConfig:
USERNAME = os.getenv("RABBITMQ_USERNAME", "guest")
PASSWORD = os.getenv("RABBITMQ_PASSWORD", "guest")
QUEUE = os.getenv("RABBITMQ_QUEUE", "filenames")
POLLING_TIMEOUT = int(os.getenv("RABBITMQ_POLLING_TIMEOUT", 600))
POLLING_TIMEOUT = int(os.getenv("RABBITMQ_POLLING_TIMEOUT", 10))


class PostgresConfig:
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions consumer/src/usecases/fetch_filenames.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from abc import ABC, abstractmethod
from typing import Iterator
from typing import Generator


class FetchFilenameClient(ABC):
@abstractmethod
def fetch(self) -> Iterator[str]:
def fetch(self) -> Generator[str, None, None]:
...

@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from src.adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient
from src.deployments.scripts.config import RabbitMQConfig
from src.deployments.script.config import RabbitMQConfig
import pika
import pytest
from pytest import MonkeyPatch


@pytest.fixture(scope="function")
def rabbitmq_fetch_filenames_client() -> RabbitMQFetchFilenamesClient:
def rabbitmq_fetch_filenames_client() -> RabbitMQConfig:
return RabbitMQFetchFilenamesClient(
host=RabbitMQConfig.HOST,
port=RabbitMQConfig.PORT,
Expand All @@ -16,6 +16,17 @@ def rabbitmq_fetch_filenames_client() -> RabbitMQFetchFilenamesClient:
)


@pytest.fixture(scope="function")
def rabbitmq_fetch_filenames_no_wait_client() -> RabbitMQConfig:
return RabbitMQFetchFilenamesClient(
host=RabbitMQConfig.HOST,
port=RabbitMQConfig.PORT,
credentials_service=lambda: (RabbitMQConfig.USERNAME, RabbitMQConfig.PASSWORD),
queue=RabbitMQConfig.QUEUE,
polling_timeout=0,
)


@pytest.fixture(scope="function")
def raw_rabbitmq_pika_conn_config() -> tuple[pika.BaseConnection, str]:
pika_conn = pika.BlockingConnection(
Expand All @@ -28,3 +39,16 @@ def raw_rabbitmq_pika_conn_config() -> tuple[pika.BaseConnection, str]:
)
)
return pika_conn, RabbitMQConfig.QUEUE


@pytest.fixture(scope="function", autouse=True)
def setup_teardown_rabbitmq_queue(
raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str],
) -> None:
pika_conn, queue = raw_rabbitmq_pika_conn_config

channel = pika_conn.channel()
channel.queue_declare(queue=queue, durable=True)
channel.queue_purge(queue=queue)
yield
channel.queue_purge(queue=queue)
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from pytest import MonkeyPatch, LogCaptureFixture
import pika
from src.adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient
from .utils import random_csv_filenames


def test_close_conn_failed(
rabbitmq_fetch_filenames_no_wait_client: RabbitMQFetchFilenamesClient,
raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str],
monkeypatch: MonkeyPatch,
caplog: LogCaptureFixture,
):
conn, _ = raw_rabbitmq_pika_conn_config

channel = conn.channel()

channel.queue_declare(
queue=rabbitmq_fetch_filenames_no_wait_client._queue, durable=True
)

channel.basic_publish(
exchange="",
routing_key=rabbitmq_fetch_filenames_no_wait_client._queue,
body=random_csv_filenames()[0],
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
)

for filename in rabbitmq_fetch_filenames_no_wait_client.fetch():
assert filename is not None

assert rabbitmq_fetch_filenames_no_wait_client._conn is not None

def mock_failed_close(
self,
*args,
**kwargs,
) -> None:
raise Exception("Failed to close!")

monkeypatch.setattr(pika.BlockingConnection, "close", mock_failed_close)

with caplog.at_level("ERROR"):
assert not rabbitmq_fetch_filenames_no_wait_client.close()
assert "Failed to close!" in caplog.text
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from src.adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient


def test_close_conn_successful(
rabbitmq_fetch_filenames_no_wait_client: RabbitMQFetchFilenamesClient,
):
for _ in rabbitmq_fetch_filenames_no_wait_client.fetch():
pass
assert rabbitmq_fetch_filenames_no_wait_client._conn is not None
assert rabbitmq_fetch_filenames_no_wait_client.close()


def test_none_conn_close_successful(
rabbitmq_fetch_filenames_client: RabbitMQFetchFilenamesClient,
):
assert rabbitmq_fetch_filenames_client.close()
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import pytest
from .utils import random_csv_filenames
from src.adapters.fetch_filenames.rabbitmq import RabbitMQFetchFilenamesClient
from src.deployments.script.config import RabbitMQConfig
import pika
from pytest import MonkeyPatch


@pytest.mark.smoke
def test_fetch_failed_conn(
rabbitmq_fetch_filenames_client: RabbitMQFetchFilenamesClient,
monkeypatch: MonkeyPatch,
):
def mocked_failed_conn(
self,
*args,
**kwargs,
) -> None:
raise Exception("Failed to connect")

monkeypatch.setattr(pika.BlockingConnection, "__init__", mocked_failed_conn)

monkeypatch.setattr(RabbitMQFetchFilenamesClient, "_reset_conn", mocked_failed_conn)

with pytest.raises(Exception, match="^Failed to connect$"):
next(rabbitmq_fetch_filenames_client.fetch())

monkeypatch.undo()
monkeypatch.undo()


@pytest.mark.smoke
def test_fetch_wrong_credentials(
monkeypatch: MonkeyPatch,
):
rabbitmq_fetch_filenames_client = RabbitMQFetchFilenamesClient(
host=RabbitMQConfig.HOST,
port=RabbitMQConfig.PORT,
credentials_service=lambda: ("wrong", "wrong"),
queue=RabbitMQConfig.QUEUE,
polling_timeout=RabbitMQConfig.POLLING_TIMEOUT,
)

def mocked_failed_conn(
self,
*args,
**kwargs,
) -> None:
raise Exception("Failed to connect")

monkeypatch.setattr(RabbitMQFetchFilenamesClient, "_reset_conn", mocked_failed_conn)

with pytest.raises(Exception, match="^Failed to connect$"):
next(rabbitmq_fetch_filenames_client.fetch())

monkeypatch.undo()


@pytest.mark.slow
@pytest.mark.smoke
def test_publish_single_wrong_host(
monkeypatch: MonkeyPatch,
):
rabbitmq_fetch_filenames_client = RabbitMQFetchFilenamesClient(
host="wrong",
port=RabbitMQConfig.PORT,
credentials_service=lambda: (RabbitMQConfig.USERNAME, RabbitMQConfig.PASSWORD),
queue=RabbitMQConfig.QUEUE,
polling_timeout=RabbitMQConfig.POLLING_TIMEOUT,
)

def mocked_failed_conn(
self,
*args,
**kwargs,
) -> None:
raise Exception("Failed to connect")

monkeypatch.setattr(RabbitMQFetchFilenamesClient, "_reset_conn", mocked_failed_conn)

with pytest.raises(Exception, match="^Failed to connect$") as e:
next(rabbitmq_fetch_filenames_client.fetch())

monkeypatch.undo()


@pytest.mark.slow
def test_fetch_failed_conn_reset_conn(
rabbitmq_fetch_filenames_no_wait_client: RabbitMQFetchFilenamesClient,
raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str],
monkeypatch: MonkeyPatch,
):
conn, queue = raw_rabbitmq_pika_conn_config

channel = conn.channel()

channel.queue_declare(queue=queue, durable=True)

first_published_filename = random_csv_filenames()[0]
second_published_filename = random_csv_filenames()[1]

channel.basic_publish(
exchange="",
routing_key=rabbitmq_fetch_filenames_no_wait_client._queue,
body=first_published_filename,
properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent),
)

for i, filename in enumerate(rabbitmq_fetch_filenames_no_wait_client.fetch()):
if i == 0:
assert rabbitmq_fetch_filenames_no_wait_client._conn is not None
conn = rabbitmq_fetch_filenames_no_wait_client._conn

assert filename == first_published_filename
channel.basic_publish(
exchange="",
routing_key=rabbitmq_fetch_filenames_no_wait_client._queue,
body=second_published_filename,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
),
)

counter = 0

def mock_failed_fetch(
self,
*args,
**kwargs,
) -> None:
nonlocal counter

if counter == 0:
counter += 1
monkeypatch.undo()
raise Exception("Failed to fetch!")

monkeypatch.setattr(pika.channel.Channel, "basic_get", mock_failed_fetch)
if i == 1:
assert filename == second_published_filename
assert rabbitmq_fetch_filenames_no_wait_client._conn is not None
assert rabbitmq_fetch_filenames_no_wait_client._conn != conn
Loading

0 comments on commit 53e4a08

Please sign in to comment.