From 60a4a6f0b4c3033d1547d1dbcf13ddd1ffc658ed Mon Sep 17 00:00:00 2001
From: alexau
Date: Sun, 3 Dec 2023 20:47:31 +0800
Subject: [PATCH] updated the test

---
 .env                                          |  2 +-
 Makefile                                      |  6 +-
 consumer/.dockerignore                        |  3 +
 .../adapters/file_parse_iot_records/csv.py    |  1 +
 .../test_csv/utils.py                         | 12 ++-
 database/.dockerignore                        |  3 +
 docker-compose.yml                            | 15 +++-
 producer/.dockerignore                        |  3 +
 producer/dockerfile                           |  3 +-
 producer/src/deployments/script/main.py       |  4 +
 .../src/deployments/script/setup_logging.py   | 78 +++++++++----------
 11 files changed, 80 insertions(+), 50 deletions(-)
 create mode 100644 consumer/.dockerignore
 create mode 100644 database/.dockerignore
 create mode 100644 producer/.dockerignore

diff --git a/.env b/.env
index 4bb87c8..dae9792 100644
--- a/.env
+++ b/.env
@@ -34,7 +34,7 @@ CONSUMER_LOG_RETENTION=7
 CONSUMER_LOG_ROTATION=midnight
 CONSUMER_REPLICAS=2
 
-CSV_PARSER_RECOGNIZED_DATETIME_FORMATS="%Y-%m-%dT%H:%M:%S.%f%z"
+CSV_PARSER_RECOGNIZED_DATETIME_FORMATS="%Y-%m-%dT%H:%M:%S%z"
 CSV_PARSER_DELIMITER=","
 CSV_PARSER_FILE_EXTENSION=.csv
 
diff --git a/Makefile b/Makefile
index 9082133..dbc154d 100644
--- a/Makefile
+++ b/Makefile
@@ -7,9 +7,9 @@ build:
 	docker compose pull --ignore-buildable
 	docker compose build
 up:
-	docker compose up
-up_d:
-	docker compose up -d
+	docker compose up -d --wait && docker compose logs -f --tail 100 records_producer records_consumer
+logs:
+	docker compose logs -f --tail 100 records_producer records_consumer
 down:
 	docker compose down
 stats:
diff --git a/consumer/.dockerignore b/consumer/.dockerignore
new file mode 100644
index 0000000..4a643f0
--- /dev/null
+++ b/consumer/.dockerignore
@@ -0,0 +1,3 @@
+__pycache__
+.pytest_cache
+.mypy_cache
diff --git a/consumer/src/adapters/file_parse_iot_records/csv.py b/consumer/src/adapters/file_parse_iot_records/csv.py
index eb2f4f8..a09c10b 100644
--- a/consumer/src/adapters/file_parse_iot_records/csv.py
+++ b/consumer/src/adapters/file_parse_iot_records/csv.py
@@ -52,6 +52,7 @@ def parse_stream(self, filename: str) -> Iterator[IOTRecord]:
             self._basic_file_check(filename)
             with open(filename) as csvfile:
                 reader = csv.reader(csvfile, delimiter=self._delimiter, strict=True)
+                next(reader)  # skip header
                 yield from self._parse_iter(reader)
         except OSError as e:
             logging.exception(e)
diff --git a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/utils.py b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/utils.py
index e7980b2..aaec3f4 100644
--- a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/utils.py
+++ b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/utils.py
@@ -80,7 +80,7 @@ def random_invalid_value_rows() -> list[tuple[str, ...]]:
 def random_invalid_datetime_and_value_rows() -> list[tuple[str, ...]]:
     rows = []
     all_datetime_formats = [
-        "%Y-%m-%dT%H:%M:%S%z",
+        "%Y-%m-%dT%H:%M:%S.%f%z",
         "%Y-%m-%dT%H:%M%z",
         "%Y-%m-%d %H:%M:%S%z",
         "%Y-%m-%d %H:%M%z",
@@ -110,7 +110,10 @@ def random_csv_file(base_dir: Path, rows: list[tuple[str, ...]]) -> str:
     filename = "".join(random.choices(string.ascii_letters, k=10)) + ".csv"
     filepath = base_dir.joinpath(filename)
     with open(filepath, "w") as csvfile:
-        writer = csv.writer(csvfile, delimiter=",")
+        writer = csv.DictWriter(
+            csvfile, delimiter=",", fieldnames=["record_time", "sensor_id", "value"]
+        )
+        writer.writeheader()
         writer.writerows(rows)
     return str(filepath)
 
@@ -119,7 +122,10 @@ def random_tsv_file(base_dir: Path, rows: list[tuple[str, ...]]) -> str:
     filename = "".join(random.choices(string.ascii_letters, k=10)) + ".tsv"
     filepath = base_dir.joinpath(filename)
     with open(filepath, "w") as csvfile:
-        writer = csv.writer(csvfile, delimiter="\t")
+        writer = csv.DictWriter(
+            csvfile, delimiter="\t", fieldnames=["record_time", "sensor_id", "value"]
+        )
+        writer.writeheader()
         writer.writerows(rows)
     return str(filepath)
 
diff --git a/database/.dockerignore b/database/.dockerignore
new file mode 100644
index 0000000..4a643f0
--- /dev/null
+++ b/database/.dockerignore
@@ -0,0 +1,3 @@
+__pycache__
+.pytest_cache
+.mypy_cache
diff --git a/docker-compose.yml b/docker-compose.yml
index f57054c..4ca485c 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -56,11 +56,15 @@ services:
       LOG_DIR: ${PRODUCER_LOG_DIR}
       LOG_RETENTION: ${PRODUCER_LOG_RETENTION}
       LOG_ROTATION: ${PRODUCER_LOG_ROTATION}
-      RABBITMQ_HOST: records_rabbitmq
+      RABBITMQ_HOST: localhost
       RABBITMQ_PORT: 5672
       RABBITMQ_USERNAME: ${RABBITMQ_USERNAME}
       RABBITMQ_PASSWORD: ${RABBITMQ_PASSWORD}
       RABBITMQ_QUEUE_NAME: ${RABBITMQ_QUEUE_NAME}
+    network_mode: host
+    volumes:
+      - ./${TARGET_FILE_DIR}:/home/app/${TARGET_FILE_DIR}:ro
+      # - ./${PRODUCER_LOG_DIR}:/home/app/${PRODUCER_LOG_DIR}
     depends_on:
       records_postgres:
         condition: service_healthy
@@ -81,12 +85,13 @@
       LOG_DIR: ${CONSUMER_LOG_DIR}
       LOG_RETENTION: ${CONSUMER_LOG_RETENTION}
       LOG_ROTATION: ${CONSUMER_LOG_ROTATION}
-      RABBITMQ_HOST: records_rabbitmq
+      RABBITMQ_HOST: localhost
       RABBITMQ_PORT: 5672
       RABBITMQ_USERNAME: ${RABBITMQ_USERNAME}
       RABBITMQ_PASSWORD: ${RABBITMQ_PASSWORD}
       RABBITMQ_QUEUE_NAME: ${RABBITMQ_QUEUE_NAME}
-      POSTGRES_HOST: records_postgres
+      RABBITMQ_POLLING_TIMEOUT: ${RABBITMQ_POLLING_TIMEOUT}
+      POSTGRES_HOST: localhost
       POSTGRES_PORT: 5432
       POSTGRES_USERNAME: ${POSTGRES_USERNAME}
       POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
@@ -95,6 +100,10 @@
       CSV_PARSER_RECOGNIZED_DATETIME_FORMATS: ${CSV_PARSER_RECOGNIZED_DATETIME_FORMATS}
       CSV_PARSER_DELIMITER: ${CSV_PARSER_DELIMITER}
       CSV_PARSER_FILE_EXTENSION: ${CSV_PARSER_FILE_EXTENSION}
+    network_mode: host
+    volumes:
+      - ./${TARGET_FILE_DIR}:/home/app/${TARGET_FILE_DIR}:ro
+      # - ./${CONSUMER_LOG_DIR}:/home/app/src/${CONSUMER_LOG_DIR}
     deploy:
       mode: replicated
       replicas: ${CONSUMER_REPLICAS}
diff --git a/producer/.dockerignore b/producer/.dockerignore
new file mode 100644
index 0000000..4a643f0
--- /dev/null
+++ b/producer/.dockerignore
@@ -0,0 +1,3 @@
+__pycache__
+.pytest_cache
+.mypy_cache
diff --git a/producer/dockerfile b/producer/dockerfile
index 9f0a359..7cdf707 100644
--- a/producer/dockerfile
+++ b/producer/dockerfile
@@ -28,6 +28,8 @@ RUN yum install -y \
 RUN adduser app
 WORKDIR /home/app
 
+COPY src ./src/
+
 RUN chown -R app:app /home/app
 
 USER app
@@ -36,5 +38,4 @@ WORKDIR ${HOME}
 
 COPY --from=build /home/app/.local /home/app/.local
 
-COPY src ./src/
 CMD python3.11 -m src.deployments.script.main
diff --git a/producer/src/deployments/script/main.py b/producer/src/deployments/script/main.py
index e66c393..10bea4e 100644
--- a/producer/src/deployments/script/main.py
+++ b/producer/src/deployments/script/main.py
@@ -27,7 +27,10 @@ def main() -> None:
 
     successes_map = {}
     try:
+        logging.info("Publishing filenames...")
+
         for filename in traverse_files():
+            logging.info(f"Publishing {filename}...")
             successes_map[filename] = publish_filenames_client.publish(filename)
 
         failed_filenames = [
@@ -40,6 +43,7 @@
         logging.exception(e)
         raise e
     finally:
+        logging.info("Closing publish filenames client...")
         publish_filenames_client.close()
 
 
diff --git a/producer/src/deployments/script/setup_logging.py b/producer/src/deployments/script/setup_logging.py
index dcae074..e41bc91 100644
--- a/producer/src/deployments/script/setup_logging.py
+++ b/producer/src/deployments/script/setup_logging.py
@@ -20,50 +20,50 @@ def setup_logging() -> None:
     stream_handler.setLevel(LoggingConfig.LOG_LEVEL)
     handlers.append(stream_handler)
 
-    if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.INFO:
-        info_handler = TimedRotatingFileHandler(
-            filename=f"{LoggingConfig.LOG_DIR}/info.log",
-            when=LoggingConfig.LOG_ROTATION,
-            interval=1,
-            backupCount=LoggingConfig.LOG_RETENTION,
-        )
-        info_handler.setFormatter(
-            logging.Formatter(
-                LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT
-            )
+    # if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.INFO:
+    info_handler = TimedRotatingFileHandler(
+        filename=f"{LoggingConfig.LOG_DIR}/info.log",
+        when=LoggingConfig.LOG_ROTATION,
+        interval=1,
+        backupCount=LoggingConfig.LOG_RETENTION,
+    )
+    info_handler.setFormatter(
+        logging.Formatter(
+            LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT
         )
-        info_handler.setLevel(logging.INFO)
-        handlers.append(info_handler)
+    )
+    info_handler.setLevel(logging.INFO)
+    handlers.append(info_handler)
 
-    if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.WARNING:
-        warning_handler = TimedRotatingFileHandler(
-            filename=f"{LoggingConfig.LOG_DIR}/warning.log",
-            when=LoggingConfig.LOG_ROTATION,
-            interval=1,
-            backupCount=LoggingConfig.LOG_RETENTION,
-        )
-        warning_handler.setFormatter(
-            logging.Formatter(
-                LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT
-            )
+    # if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.WARNING:
+    warning_handler = TimedRotatingFileHandler(
+        filename=f"{LoggingConfig.LOG_DIR}/warning.log",
+        when=LoggingConfig.LOG_ROTATION,
+        interval=1,
+        backupCount=LoggingConfig.LOG_RETENTION,
+    )
+    warning_handler.setFormatter(
+        logging.Formatter(
+            LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT
         )
-        warning_handler.setLevel(logging.WARNING)
-        handlers.append(warning_handler)
+    )
+    warning_handler.setLevel(logging.WARNING)
+    handlers.append(warning_handler)
 
-    if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.ERROR:
-        error_handler = TimedRotatingFileHandler(
-            filename=f"{LoggingConfig.LOG_DIR}/error.log",
-            when=LoggingConfig.LOG_ROTATION,
-            interval=1,
-            backupCount=LoggingConfig.LOG_RETENTION,
-        )
-        error_handler.setFormatter(
-            logging.Formatter(
-                LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT
-            )
+    # if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.ERROR:
+    error_handler = TimedRotatingFileHandler(
+        filename=f"{LoggingConfig.LOG_DIR}/error.log",
+        when=LoggingConfig.LOG_ROTATION,
+        interval=1,
+        backupCount=LoggingConfig.LOG_RETENTION,
+    )
+    error_handler.setFormatter(
+        logging.Formatter(
+            LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT
        )
-        error_handler.setLevel(logging.ERROR)
-        handlers.append(error_handler)
+    )
+    error_handler.setLevel(logging.ERROR)
+    handlers.append(error_handler)
 
     root_logger = logging.getLogger()
     root_logger.setLevel(LoggingConfig.LOG_LEVEL)