Skip to content

Commit

Permalink
Make ImapQueue more robust against connection loss
Browse files Browse the repository at this point in the history
This will reconnect on every polling cycle as this was easier to
implement than only reconnecting on a lost connection. However, for my
use case there is only a low volume of incoming messages, so that it
makes sense to have a long polling interval in which case individual
connections to poll might be even better than a persistent connection.

Due to limitations in the aioimaplib [1], a timeout in one of the
commands will leave a cancelled, but pending task around. This also,
produces warnings in the tests that cannot be silenced.

[1]: iroco-co/aioimaplib#58
  • Loading branch information
jgosmann committed Dec 30, 2020
1 parent e3343be commit bf02e9b
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 22 deletions.
49 changes: 30 additions & 19 deletions dmarc_metrics_exporter/imap_queue.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import email.policy
import logging
import re
from asyncio.tasks import Task
from dataclasses import astuple, dataclass
Expand All @@ -9,6 +10,8 @@

from aioimaplib import aioimaplib

logger = logging.getLogger(__name__)


@dataclass
class ConnectionConfig:
Expand All @@ -31,43 +34,51 @@ def __init__(
*,
connection: ConnectionConfig,
folders: QueueFolders = QueueFolders(),
poll_interval_seconds=60,
poll_interval_seconds: int = 60,
timeout_seconds: int = 60,
):
self.connection = connection
self.folders = folders
self.poll_interval_seconds = poll_interval_seconds
self.timeout_seconds = timeout_seconds
self._stop = False
self._consumer: Optional[Task[Any]] = None

def consume(self, handler: Callable[[Any], Awaitable[None]]):
self._consumer = asyncio.create_task(self._consume(handler))

async def _consume(self, handler: Callable[[Any], Awaitable[None]]):
async with ImapClient(self.connection) as client:
for folder in astuple(self.folders):
await client.create_if_not_exists(folder)

while not self._stop:
msg_count = await client.select(self.folders.inbox)
if msg_count > 0:
async for uid, msg in client.fetch(1, msg_count):
try:
await handler(msg)
except Exception: # pylint: disable=broad-except
await client.uid_move(uid, self.folders.error)
else:
await client.uid_move(uid, self.folders.done)
else:
await asyncio.sleep(self.poll_interval_seconds)
while not self._stop:
try:
async with ImapClient(self.connection, self.timeout_seconds) as client:
for folder in astuple(self.folders):
await client.create_if_not_exists(folder)

msg_count = await client.select(self.folders.inbox)
if msg_count > 0:
async for uid, msg in client.fetch(1, msg_count):
try:
await handler(msg)
except Exception: # pylint: disable=broad-except
logger.exception(
"Handler for message in IMAP queue failed."
)
await client.uid_move(uid, self.folders.error)
else:
await client.uid_move(uid, self.folders.done)
except (asyncio.TimeoutError, Exception): # pylint: disable=broad-except
logger.exception("Error during IMAP queue polling.")
await asyncio.sleep(self.poll_interval_seconds)

async def stop_consumer(self):
self._stop = True
await self._consumer


class ImapClient:
def __init__(self, connection: ConnectionConfig):
def __init__(self, connection: ConnectionConfig, timeout_seconds: int = 10):
self.connection = connection
self.timeout_seconds = timeout_seconds
self._client = aioimaplib.IMAP4_SSL(
host=self.connection.host, port=self.connection.port
)
Expand All @@ -84,7 +95,7 @@ async def __aexit__(self, exc_type, exc, traceback):
await self._check("LOGOUT", self._client.logout())

async def _check(self, command: str, awaitable: Awaitable[Tuple[str, Any]]) -> Any:
res, data = await awaitable
res, data = await asyncio.wait_for(awaitable, self.timeout_seconds)
if res != "OK":
raise ImportError(command, res, data)
return data
Expand Down
13 changes: 11 additions & 2 deletions dmarc_metrics_exporter/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import asyncio
import smtplib
import time
from contextlib import contextmanager
from dataclasses import astuple, dataclass
from email.message import EmailMessage
from typing import Any, Awaitable, Callable, Union
from typing import Any, Awaitable, Callable, Generator, Union

import docker.models
import pytest
Expand Down Expand Up @@ -31,7 +32,15 @@ def fixture_docker_client() -> docker.DockerClient:
@pytest.fixture(name="greenmail")
def fixture_greenmail(
docker_client: docker.DockerClient,
) -> docker.models.containers.Container:
) -> Generator[Greenmail, None, None]:
with run_greenmail(docker_client) as greenmail:
yield greenmail


@contextmanager
def run_greenmail(
docker_client: docker.DockerClient,
) -> Generator[Greenmail, None, None]:
container = docker_client.containers.run(
"greenmail/standalone:1.6.0",
detach=True,
Expand Down
6 changes: 6 additions & 0 deletions dmarc_metrics_exporter/tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ async def __aexit__(self, exc_type, exc, traceback):
pass


async def async_noop():
pass


@pytest.mark.asyncio
async def test_loads_persisted_metrics_and_stores_them_on_shutdown():
metrics = DmarcMetricsCollection()
Expand All @@ -30,6 +34,7 @@ async def test_loads_persisted_metrics_and_stores_them_on_shutdown():
metrics_persister = MagicMock()
metrics_persister.load.return_value = metrics
imap_queue = MagicMock()
imap_queue.stop_consumer.return_value = async_noop()
app = App(
prometheus_addr=("127.0.0.1", 9119),
imap_queue=imap_queue,
Expand Down Expand Up @@ -62,6 +67,7 @@ async def test_metrics_autosave():
metrics_persister = MagicMock()
metrics_persister.load.return_value = metrics
imap_queue = MagicMock()
imap_queue.stop_consumer.return_value = async_noop()
app = App(
prometheus_addr=("127.0.0.1", 9119),
imap_queue=imap_queue,
Expand Down
33 changes: 33 additions & 0 deletions dmarc_metrics_exporter/tests/test_imap_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dmarc_metrics_exporter.imap_queue import ImapClient, ImapQueue

from .conftest import (
run_greenmail,
send_email,
try_until_success,
verify_email_delivered,
Expand Down Expand Up @@ -114,3 +115,35 @@ async def handler(_queue_msg: EmailMessage, is_done=is_done):
async with ImapClient(greenmail.imap) as client:
assert await client.select() == 0
assert await client.select(queue.folders.error) == 1


@pytest.mark.asyncio
async def test_reconnects_if_imap_connection_is_lost(docker_client):
is_done = asyncio.Event()

async def handler(queue_msg: EmailMessage, is_done=is_done):
is_done.set()
assert_emails_equal(queue_msg, msg)

queue = None
try:
with run_greenmail(docker_client) as greenmail:
await try_until_success(lambda: verify_imap_available(greenmail.imap))
queue = ImapQueue(
connection=greenmail.imap,
poll_interval_seconds=0.1,
timeout_seconds=0.5,
)
queue.consume(handler)
msg = create_dummy_email(greenmail.imap.username)
await try_until_success(lambda: send_email(msg, greenmail.smtp))
await asyncio.wait_for(is_done.wait(), 10)

is_done.clear()
with run_greenmail(docker_client) as greenmail:
msg = create_dummy_email(greenmail.imap.username)
await try_until_success(lambda: send_email(msg, greenmail.smtp))
await asyncio.wait_for(is_done.wait(), 10)
finally:
if queue is not None:
await queue.stop_consumer()
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "dmarc-metrics-exporter"
version = "0.1.0"
version = "0.2.0"
description = "Export Prometheus metrics from DMARC reports."
authors = ["Jan Gosmann <[email protected]>"]
license = "MIT"
Expand Down

0 comments on commit bf02e9b

Please sign in to comment.