Skip to content

Commit

Permalink
changing the names for better consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-au-922 committed Dec 3, 2023
1 parent b4d0c70 commit 08d3cd5
Show file tree
Hide file tree
Showing 20 changed files with 884 additions and 45 deletions.
3 changes: 2 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ RABBITMQ_PORT=5672
RABBITMQ_WEBAPP_PORT=15672
RABBITMQ_POLLING_TIMEOUT=60

QUEUE_NAME=filenames
RABBITMQ_QUEUE_NAME=filenames

AMAZON_LINUX_VERSION_TAG=2023.2.20231113.0

Expand All @@ -35,3 +35,4 @@ CONSUMER_LOG_ROTATION=midnight

CSV_PARSER_RECOGNIZED_DATETIME_FORMATS="%Y-%m-%dT%H:%M:%S.%f%z"
CSV_PARSER_DELIMITER=","
CSV_PARSER_FILE_EXTENSION=.csv
23 changes: 19 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ jobs:
load-dotenv:
runs-on: ubuntu-latest
outputs:
target-file-dir: ${{ steps.load-dotenv.outputs.TARGET_FILE_DIR }}
target-file-extension: ${{ steps.load-dotenv.outputs.TARGET_FILE_EXTENSION }}
postgres-version-tag: ${{ steps.load-dotenv.outputs.POSTGRES_VERSION_TAG }}
postgres-port: ${{ steps.load-dotenv.outputs.POSTGRES_PORT }}
postgres-username: ${{ steps.load-dotenv.outputs.POSTGRES_USERNAME }}
Expand All @@ -16,7 +18,10 @@ jobs:
rabbitmq-port: ${{ steps.load-dotenv.outputs.RABBITMQ_PORT }}
rabbitmq-username: ${{ steps.load-dotenv.outputs.RABBITMQ_USERNAME }}
rabbitmq-password: ${{ steps.load-dotenv.outputs.RABBITMQ_PASSWORD }}
queue-name: ${{ steps.load-dotenv.outputs.QUEUE_NAME }}
rabbitmq-queue-name: ${{ steps.load-dotenv.outputs.RABBITMQ_QUEUE_NAME }}
csv-parser-recognized-datetime-formats: ${{ steps.load-dotenv.outputs.CSV_PARSER_RECOGNIZED_DATETIME_FORMATS }}
csv-parser-delimiter: ${{ steps.load-dotenv.outputs.CSV_PARSER_DELIMITER }}
csv-parser-file-extension: ${{ steps.load-dotenv.outputs.CSV_PARSER_FILE_EXTENSION }}
steps:
- name: Checkout
uses: actions/checkout@v4
Expand All @@ -26,6 +31,8 @@ jobs:
set -o allexport
source .env
set +o allexport
echo "TARGET_FILE_DIR=$TARGET_FILE_DIR" >> $GITHUB_OUTPUT
echo "TARGET_FILE_EXTENSION=$TARGET_FILE_EXTENSION" >> $GITHUB_OUTPUT
echo "POSTGRES_VERSION_TAG=$POSTGRES_VERSION_TAG" >> $GITHUB_OUTPUT
echo "POSTGRES_PORT=$POSTGRES_PORT" >> $GITHUB_OUTPUT
echo "POSTGRES_USERNAME=$POSTGRES_USERNAME" >> $GITHUB_OUTPUT
Expand All @@ -35,7 +42,10 @@ jobs:
echo "RABBITMQ_PORT=$RABBITMQ_PORT" >> $GITHUB_OUTPUT
echo "RABBITMQ_USERNAME=$RABBITMQ_USERNAME" >> $GITHUB_OUTPUT
echo "RABBITMQ_PASSWORD=$RABBITMQ_PASSWORD" >> $GITHUB_OUTPUT
echo "QUEUE_NAME=$QUEUE_NAME" >> $GITHUB_OUTPUT
echo "RABBITMQ_QUEUE_NAME=$RABBITMQ_QUEUE_NAME" >> $GITHUB_OUTPUT
echo "CSV_PARSER_RECOGNIZED_DATETIME_FORMATS=$CSV_PARSER_RECOGNIZED_DATETIME_FORMATS" >> $GITHUB_OUTPUT
echo "CSV_PARSER_DELIMITER=$CSV_PARSER_DELIMITER" >> $GITHUB_OUTPUT
echo "CSV_PARSER_FILE_EXTENSION=$CSV_PARSER_FILE_EXTENSION" >> $GITHUB_OUTPUT
test-producer:
needs: load-dotenv
runs-on: ubuntu-latest
Expand Down Expand Up @@ -90,7 +100,9 @@ jobs:
RABBITMQ_PORT: ${{ needs.load-dotenv.outputs.rabbitmq-port }}
RABBITMQ_USERNAME: ${{ needs.load-dotenv.outputs.rabbitmq-username }}
RABBITMQ_PASSWORD: ${{ needs.load-dotenv.outputs.rabbitmq-password }}
QUEUE_NAME: ${{ needs.load-dotenv.outputs.queue-name }}
RABBITMQ_QUEUE_NAME: ${{ needs.load-dotenv.outputs.rabbitmq-queue-name }}
TARGET_FILE_DIR: ${{ needs.load-dotenv.outputs.target-file-dir }}
TARGET_FILE_EXTENSION: ${{ needs.load-dotenv.outputs.target-file-extension }}
- name: Output coverage file
id: output-coverage-file
run: |
Expand Down Expand Up @@ -168,7 +180,10 @@ jobs:
RABBITMQ_PORT: ${{ needs.load-dotenv.outputs.rabbitmq-port }}
RABBITMQ_USERNAME: ${{ needs.load-dotenv.outputs.rabbitmq-username }}
RABBITMQ_PASSWORD: ${{ needs.load-dotenv.outputs.rabbitmq-password }}
QUEUE_NAME: ${{ needs.load-dotenv.outputs.queue-name }}
RABBITMQ_QUEUE_NAME: ${{ needs.load-dotenv.outputs.rabbitmq-queue-name }}
CSV_PARSER_RECOGNIZED_DATETIME_FORMATS: ${{ needs.load-dotenv.outputs.csv-parser-recognized-datetime-formats }}
CSV_PARSER_DELIMITER: ${{ needs.load-dotenv.outputs.csv-parser-delimiter }}
CSV_PARSER_FILE_EXTENSION: ${{ needs.load-dotenv.outputs.csv-parser-file-extension }}
- name: Output coverage file
id: output-coverage-file
run: |
Expand Down
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ test_consumer:
export RABBITMQ_USERNAME=$(RABBITMQ_USERNAME) && \
export RABBITMQ_PASSWORD=$(RABBITMQ_PASSWORD) && \
export QUEUE_NAME=$(QUEUE_NAME) && \
export CSV_PARSER_RECOGNIZED_DATETIME_FORMATS=$(CSV_PARSER_RECOGNIZED_DATETIME_FORMATS) && \
export CSV_PARSER_DELIMITER=$(CSV_PARSER_DELIMITER) && \
COVERAGE_FILE=.coverage_consumer coverage run -m pytest -vxs consumer/tests
coverage_report:
coverage combine .coverage_producer .coverage_consumer && \
Expand Down
1 change: 0 additions & 1 deletion consumer/src/adapters/fetch_filenames/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ def fetch(self) -> Generator[str, None, None]:
method, properties, body = channel.basic_get(
queue=self._queue, auto_ack=False
)

if method is None and properties is None and body is None:
if self._last_poll_time is None:
self._last_poll_time = datetime.now()
Expand Down
41 changes: 25 additions & 16 deletions consumer/src/adapters/file_parse_iot_records/csv.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from decimal import Decimal
from decimal import InvalidOperation
from typing import Iterator, Optional, overload, Sequence
from typing_extensions import override
from ...entities import IOTRecord
Expand All @@ -14,9 +15,11 @@ def __init__(
self,
recognized_datetime_formats: Sequence[str],
delimiter: str = ",",
file_extension: str = ".csv",
) -> None:
self._delimiter = delimiter
self._recognized_datetime_formats = recognized_datetime_formats
self._file_extension = file_extension

@overload
def parse(self, filename: str) -> list[IOTRecord]:
Expand All @@ -37,10 +40,13 @@ def parse(
@override
def parse_stream(self, filename: str) -> Iterator[IOTRecord]:
try:
if not filename.endswith(self._file_extension):
raise ValueError(f"File extension must be {self._file_extension}")
with open(filename) as csvfile:
reader = csv.reader(csvfile, delimiter=self._delimiter)
reader = csv.reader(csvfile, delimiter=self._delimiter, strict=True)
yield from self._parse_iter(reader)
except Exception as e:
logging.error(f"Failed to parse {filename}")
logging.exception(e)

def _parse_datetime(self, datetime_str: str) -> Optional[datetime]:
Expand All @@ -54,36 +60,39 @@ def _parse_datetime(self, datetime_str: str) -> Optional[datetime]:
def _parse_value(self, value_str: str) -> Optional[Decimal]:
try:
return Decimal(value_str)
except ValueError:
except InvalidOperation:
return None

def _parse_iter(self, reader: Iterator[list[str]]) -> Iterator[IOTRecord]:
iot_records: list[IOTRecord] = []
for row in reader:
try:
parsed_datetime = self._parse_datetime(row[0])
if parsed_datetime is None:
raise ValueError(f"Unrecognized datetime format: {row[0]}")
parsed_datetime = self._parse_datetime(row[0])
if parsed_datetime is None:
logging.warning(f"Unrecognized datetime format: {row[0]}")

parsed_value = self._parse_value(row[2])
if parsed_value is None:
logging.warning(f"Unrecognized value format: {row[2]}")

parsed_value = self._parse_value(row[2])
if parsed_value is None:
raise ValueError(f"Unrecognized value format: {row[2]}")
if parsed_datetime is None or parsed_value is None:
continue

yield IOTRecord(
datetime=parsed_datetime,
sensor_id=str(row[1]),
value=parsed_value,
)
except Exception as e:
logging.exception(e)
yield IOTRecord(
record_time=parsed_datetime,
sensor_id=str(row[1]),
value=parsed_value,
)
return iot_records

def _parse_single(self, filename: str) -> list[IOTRecord]:
try:
if not filename.endswith(self._file_extension):
raise ValueError(f"File extension must be {self._file_extension}")
with open(filename) as csvfile:
reader = csv.reader(csvfile, delimiter=self._delimiter)
return list(self._parse_iter(reader))
except Exception as e:
logging.error(f"Failed to parse {filename}")
logging.exception(e)
return []

Expand Down
3 changes: 2 additions & 1 deletion consumer/src/deployments/script/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class RabbitMQConfig:
PORT = int(os.getenv("RABBITMQ_PORT", 5672))
USERNAME = os.getenv("RABBITMQ_USERNAME", "guest")
PASSWORD = os.getenv("RABBITMQ_PASSWORD", "guest")
QUEUE = os.getenv("RABBITMQ_QUEUE", "filenames")
QUEUE = os.getenv("RABBITMQ_QUEUE_NAME", "filenames")
POLLING_TIMEOUT = int(os.getenv("RABBITMQ_POLLING_TIMEOUT", 10))


Expand All @@ -35,3 +35,4 @@ class CSVParserConfig:
"CSV_PARSER_RECOGNIZED_DATETIME_FORMATS", ""
).split(",")
DELIMITER = os.getenv("CSV_PARSER_DELIMITER", ",")
FILE_EXTENSION = os.getenv("CSV_PARSER_FILE_EXTENSION", ".csv")
1 change: 1 addition & 0 deletions consumer/src/deployments/script/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
file_parse_iot_records_client = CSVParseIOTRecordsClient(
recognized_datetime_formats=CSVParserConfig.RECOGNIZED_DATETIME_FORMATS,
delimiter=CSVParserConfig.DELIMITER,
file_extension=CSVParserConfig.FILE_EXTENSION,
)

upsert_iot_records_client = PostgresUpsertIOTRecordsClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,53 @@ def mock_failed_fetch(
assert "Failed to fetch!" in caplog.text

assert sorted(all_filenames) == sorted(filenames)


@pytest.mark.parametrize("filename", random_csv_filenames())
def test_fetch_single_ack_exception_resilience(
    rabbitmq_fetch_filenames_client: RabbitMQFetchFilenamesClient,
    raw_rabbitmq_pika_conn_config: tuple[pika.BaseConnection, str],
    filename: str,
    monkeypatch: MonkeyPatch,
):
    """Publish one filename, make the first ``basic_ack`` raise, then fetch.

    Every filename yielded by ``fetch()`` must equal the published one.
    """
    source_client = rabbitmq_fetch_filenames_client

    # Same connection settings as the fixture client, but with
    # polling_timeout=1 instead of whatever the fixture was built with.
    short_poll_client = RabbitMQFetchFilenamesClient(
        host=source_client._host,
        port=source_client._port,
        credentials_service=source_client._credentials_service,
        queue=source_client._queue,
        polling_timeout=1,
    )

    conn, queue = raw_rabbitmq_pika_conn_config

    channel = conn.channel()
    channel.queue_declare(queue=queue, durable=True)

    persistent_properties = pika.BasicProperties(
        delivery_mode=pika.DeliveryMode.Persistent
    )
    channel.basic_publish(
        exchange="",
        routing_key=source_client._queue,
        body=filename,
        properties=persistent_properties,
    )

    ack_failures = 0

    def mock_failed_ack(self, *args, **kwargs) -> None:
        # Fail exactly once; undo the patch first so later acks reach the
        # real ``basic_ack`` again.
        nonlocal ack_failures

        if ack_failures != 0:
            return
        ack_failures += 1
        monkeypatch.undo()
        raise Exception("Failed to ack!")

    monkeypatch.setattr(pika.channel.Channel, "basic_ack", mock_failed_ack)

    for fetched_filename in short_poll_client.fetch():
        assert fetched_filename == filename
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from .utils import (
random_csv_file,
random_tsv_file,
random_ndjson_file,
random_invalid_datetime_rows,
random_invalid_datetime_and_value_rows,
random_invalid_value_rows,
random_valid_format_rows,
)
import pytest
from pytest import TempdirFactory
from pathlib import Path
from src.adapters.file_parse_iot_records.csv import CSVParseIOTRecordsClient
from src.deployments.script.config import CSVParserConfig


@pytest.fixture(scope="session")
def setup_tempdir(tmpdir_factory: TempdirFactory) -> Path:
    """Create a session-scoped temp directory (base name "artifact") as a ``Path``."""
    artifact_dir = tmpdir_factory.mktemp("artifact")
    return Path(artifact_dir)


@pytest.fixture(scope="function")
def random_valid_csv_file(setup_tempdir: Path) -> Path:
    """Return ``random_csv_file`` applied to the temp dir and ``random_valid_format_rows()``."""
    rows = random_valid_format_rows()
    return random_csv_file(setup_tempdir, rows)


@pytest.fixture(scope="function")
def random_invalid_datetime_and_value_csv_file(setup_tempdir: Path) -> Path:
    """Return ``random_csv_file`` applied to the temp dir and ``random_invalid_datetime_and_value_rows()``."""
    rows = random_invalid_datetime_and_value_rows()
    return random_csv_file(setup_tempdir, rows)


@pytest.fixture(scope="function")
def random_invalid_datetime_csv_file(setup_tempdir: Path) -> Path:
    """Return ``random_csv_file`` applied to the temp dir and ``random_invalid_datetime_rows()``."""
    rows = random_invalid_datetime_rows()
    return random_csv_file(setup_tempdir, rows)


@pytest.fixture(scope="function")
def random_invalid_value_csv_file(setup_tempdir: Path) -> Path:
    """Return ``random_csv_file`` applied to the temp dir and ``random_invalid_value_rows()``."""
    rows = random_invalid_value_rows()
    return random_csv_file(setup_tempdir, rows)


@pytest.fixture(scope="function")
def random_valid_tsv_file(setup_tempdir: Path) -> Path:
    """Return ``random_tsv_file`` applied to the temp dir and ``random_valid_format_rows()``."""
    rows = random_valid_format_rows()
    return random_tsv_file(setup_tempdir, rows)


@pytest.fixture(scope="function")
def random_invalid_datetime_and_value_tsv_file(setup_tempdir: Path) -> Path:
    """Return ``random_tsv_file`` applied to the temp dir and ``random_invalid_datetime_and_value_rows()``."""
    rows = random_invalid_datetime_and_value_rows()
    return random_tsv_file(setup_tempdir, rows)


@pytest.fixture(scope="function")
def random_invalid_datetime_tsv_file(setup_tempdir: Path) -> Path:
    """Return ``random_tsv_file`` applied to the temp dir and ``random_invalid_datetime_rows()``."""
    rows = random_invalid_datetime_rows()
    return random_tsv_file(setup_tempdir, rows)


@pytest.fixture(scope="function")
def random_invalid_value_tsv_file(setup_tempdir: Path) -> Path:
    """Return ``random_tsv_file`` applied to the temp dir and ``random_invalid_value_rows()``."""
    rows = random_invalid_value_rows()
    return random_tsv_file(setup_tempdir, rows)


@pytest.fixture(scope="function")
def random_valid_ndjson_file(setup_tempdir: Path) -> Path:
    """Return ``random_ndjson_file`` applied to the temp dir and ``random_valid_format_rows()``."""
    rows = random_valid_format_rows()
    return random_ndjson_file(setup_tempdir, rows)


@pytest.fixture(scope="function")
def random_invalid_datetime_and_value_ndjson_file(setup_tempdir: Path) -> Path:
    """Return ``random_ndjson_file`` applied to the temp dir and ``random_invalid_datetime_and_value_rows()``."""
    rows = random_invalid_datetime_and_value_rows()
    return random_ndjson_file(setup_tempdir, rows)


@pytest.fixture(scope="function")
def random_invalid_datetime_ndjson_file(setup_tempdir: Path) -> Path:
    """Return ``random_ndjson_file`` applied to the temp dir and ``random_invalid_datetime_rows()``."""
    rows = random_invalid_datetime_rows()
    return random_ndjson_file(setup_tempdir, rows)


@pytest.fixture(scope="function")
def random_invalid_value_ndjson_file(setup_tempdir: Path) -> Path:
    """Return ``random_ndjson_file`` applied to the temp dir and ``random_invalid_value_rows()``."""
    rows = random_invalid_value_rows()
    return random_ndjson_file(setup_tempdir, rows)


@pytest.fixture(scope="function")
def csv_parse_iot_records_client() -> CSVParseIOTRecordsClient:
    """Build a fresh ``CSVParseIOTRecordsClient`` for each test from ``CSVParserConfig``.

    The datetime formats, delimiter and file extension are all taken from the
    config, so the client's extension check uses the configured value rather
    than silently falling back to the constructor default.
    """
    return CSVParseIOTRecordsClient(
        recognized_datetime_formats=CSVParserConfig.RECOGNIZED_DATETIME_FORMATS,
        delimiter=CSVParserConfig.DELIMITER,
        file_extension=CSVParserConfig.FILE_EXTENSION,
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import pytest
from src.adapters.file_parse_iot_records.csv import CSVParseIOTRecordsClient
from pytest import FixtureRequest


@pytest.mark.smoke
@pytest.mark.parametrize(
    "fixture_name",
    [
        "random_valid_csv_file",
        "random_invalid_datetime_and_value_csv_file",
        "random_invalid_datetime_csv_file",
        "random_invalid_value_csv_file",
    ]
    * 5,
)
def test_close_always_successful(
    csv_parse_iot_records_client: CSVParseIOTRecordsClient,
    fixture_name: str,
    request: FixtureRequest,
):
    """Check that ``close()`` returns a truthy value after parsing any CSV fixture.

    Each named file fixture is resolved dynamically; the list is repeated five
    times so every fixture is exercised against several generated files.
    """
    # The file fixtures are annotated as returning ``Path``, while ``parse``
    # takes a ``str`` filename. Convert explicitly so the parser receives the
    # type it is declared to accept.
    csv_file_path = str(request.getfixturevalue(fixture_name))

    csv_parse_iot_records_client.parse(csv_file_path)

    assert csv_parse_iot_records_client.close()
Loading

0 comments on commit 08d3cd5

Please sign in to comment.