diff --git a/.env b/.env index dae9792..e4964b2 100644 --- a/.env +++ b/.env @@ -1,9 +1,12 @@ -POSTGRES_VERSION_TAG=15.3-alpine3.17 +TZ=Asia/Hong_Kong + +POSTGRES_VERSION_TAG=13 POSTGRES_PORT=5432 POSTGRES_USERNAME=postgres POSTGRES_PASSWORD=postgres POSTGRES_DATABASE=records -POSTGRES_BATCH_UPSERT_SIZE=1000 +POSTGRES_BATCH_UPSERT_SIZE=5000 +POSTGRES_VOLUME_DIR=./postgres-data RABBITMQ_VERSION_TAG=3.12.10-management RABBITMQ_USERNAME=rabbitmq @@ -11,30 +14,31 @@ RABBITMQ_PASSWORD=rabbitmq RABBITMQ_PORT=5672 RABBITMQ_WEBAPP_PORT=15672 RABBITMQ_POLLING_TIMEOUT=60 +RABBITMQ_SOCKET_TIMEOUT=86400 +RABBITMQ_VOLUME_DIR=./rabbitmq-data RABBITMQ_QUEUE_NAME=filenames AMAZON_LINUX_VERSION_TAG=2023.2.20231113.0 -TARGET_FILE_DIR=./records +TARGET_FILE_DIR=./records_test TARGET_FILE_EXTENSION=.csv PRODUCER_LOG_LEVEL=INFO -PRODUCER_LOG_FORMAT="[%(asctime)s | %(levelname)s | %(name)s] {%(filename)s:%(lineno)d} >> %(message)s" +PRODUCER_LOG_FORMAT="[%(asctime)s | %(levelname)s] {%(filename)s:%(lineno)d} >> %(message)s" PRODUCER_LOG_DATE_FORMAT="%Y-%m-%d %H:%M:%S" PRODUCER_LOG_DIR=./logs/producer PRODUCER_LOG_RETENTION=7 PRODUCER_LOG_ROTATION=midnight CONSUMER_LOG_LEVEL=INFO -CONSUMER_LOG_FORMAT="[%(asctime)s | %(levelname)s | %(name)s] {%(filename)s:%(lineno)d} >> %(message)s" +CONSUMER_LOG_FORMAT="[%(asctime)s | %(levelname)s] {%(filename)s:%(lineno)d} >> %(message)s" CONSUMER_LOG_DATE_FORMAT="%Y-%m-%d %H:%M:%S" -CONSUMER_LOG_DIR=./logs/producer +CONSUMER_LOG_DIR=./logs/consumer CONSUMER_LOG_RETENTION=7 CONSUMER_LOG_ROTATION=midnight -CONSUMER_REPLICAS=2 +CONSUMER_REPLICAS=16 -CSV_PARSER_RECOGNIZED_DATETIME_FORMATS="%Y-%m-%dT%H:%M:%S%z" CSV_PARSER_DELIMITER="," CSV_PARSER_FILE_EXTENSION=.csv diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 1b81d3d..06c8fee 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -19,7 +19,7 @@ jobs: rabbitmq-username: ${{ steps.load-dotenv.outputs.RABBITMQ_USERNAME }} rabbitmq-password: ${{ steps.load-dotenv.outputs.RABBITMQ_PASSWORD }} rabbitmq-queue-name: ${{ steps.load-dotenv.outputs.RABBITMQ_QUEUE_NAME }} - csv-parser-recognized-datetime-formats: ${{ steps.load-dotenv.outputs.CSV_PARSER_RECOGNIZED_DATETIME_FORMATS }} + rabbitmq-socket-timeout: ${{ steps.load-dotenv.outputs.RABBITMQ_SOCKET_TIMEOUT }} csv-parser-delimiter: ${{ steps.load-dotenv.outputs.CSV_PARSER_DELIMITER }} csv-parser-file-extension: ${{ steps.load-dotenv.outputs.CSV_PARSER_FILE_EXTENSION }} steps: @@ -43,7 +43,7 @@ jobs: echo "RABBITMQ_USERNAME=$RABBITMQ_USERNAME" >> $GITHUB_OUTPUT echo "RABBITMQ_PASSWORD=$RABBITMQ_PASSWORD" >> $GITHUB_OUTPUT echo "RABBITMQ_QUEUE_NAME=$RABBITMQ_QUEUE_NAME" >> $GITHUB_OUTPUT - echo "CSV_PARSER_RECOGNIZED_DATETIME_FORMATS=$CSV_PARSER_RECOGNIZED_DATETIME_FORMATS" >> $GITHUB_OUTPUT + echo "RABBITMQ_SOCKET_TIMEOUT=$RABBITMQ_SOCKET_TIMEOUT" >> $GITHUB_OUTPUT echo "CSV_PARSER_DELIMITER=$CSV_PARSER_DELIMITER" >> $GITHUB_OUTPUT echo "CSV_PARSER_FILE_EXTENSION=$CSV_PARSER_FILE_EXTENSION" >> $GITHUB_OUTPUT test-producer: @@ -101,6 +101,7 @@ jobs: RABBITMQ_USERNAME: ${{ needs.load-dotenv.outputs.rabbitmq-username }} RABBITMQ_PASSWORD: ${{ needs.load-dotenv.outputs.rabbitmq-password }} RABBITMQ_QUEUE_NAME: ${{ needs.load-dotenv.outputs.rabbitmq-queue-name }} + RABBITMQ_SOCKET_TIMEOUT: ${{ needs.load-dotenv.outputs.rabbitmq-socket-timeout }} TARGET_FILE_DIR: ${{ needs.load-dotenv.outputs.target-file-dir }} TARGET_FILE_EXTENSION: ${{ needs.load-dotenv.outputs.target-file-extension }} - name: Output coverage file @@ -181,7 +182,7 @@ jobs: 
RABBITMQ_USERNAME: ${{ needs.load-dotenv.outputs.rabbitmq-username }} RABBITMQ_PASSWORD: ${{ needs.load-dotenv.outputs.rabbitmq-password }} RABBITMQ_QUEUE_NAME: ${{ needs.load-dotenv.outputs.rabbitmq-queue-name }} - CSV_PARSER_RECOGNIZED_DATETIME_FORMATS: ${{ needs.load-dotenv.outputs.csv-parser-recognized-datetime-formats }} + RABBITMQ_SOCKET_TIMEOUT: ${{ needs.load-dotenv.outputs.rabbitmq-socket-timeout }} CSV_PARSER_DELIMITER: ${{ needs.load-dotenv.outputs.csv-parser-delimiter }} CSV_PARSER_FILE_EXTENSION: ${{ needs.load-dotenv.outputs.csv-parser-file-extension }} - name: Output coverage file diff --git a/.gitignore b/.gitignore index f8663b2..8f28c95 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,8 @@ __pycache__ .mypy_cache records +records_test +logs +postgres-data +postgres-logs +rabbitmq-data diff --git a/Makefile b/Makefile index dbc154d..d3c8970 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ build: docker compose pull --ignore-buildable docker compose build up: - docker compose up -d --wait && docker compose logs -f --tail 100 records_producer records_consumer + docker compose up -d && docker compose logs -f --tail 100 records_producer records_consumer logs: docker compose logs -f --tail 100 records_producer records_consumer down: @@ -16,6 +16,8 @@ stats: docker stats --format "table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}\t{{.NetIO}}\t{{.BlockIO}}\t{{.PIDs}}" setup_test_env: docker compose -f docker-compose.test.yml up -d +teardown_test_env: + docker compose -f docker-compose.test.yml down test_producer: export POSTGRES_HOST=$(POSTGRES_HOST) && \ export POSTGRES_PORT=$(POSTGRES_PORT) && \ @@ -27,7 +29,7 @@ test_producer: export RABBITMQ_USERNAME=$(RABBITMQ_USERNAME) && \ export RABBITMQ_PASSWORD=$(RABBITMQ_PASSWORD) && \ export QUEUE_NAME=$(QUEUE_NAME) && \ - COVERAGE_FILE=.coverage_producer coverage run -m pytest -vx producer/tests + COVERAGE_FILE=.coverage_producer coverage run -m pytest -vx --last-failed producer/tests test_consumer: export POSTGRES_HOST=$(POSTGRES_HOST) && \ export POSTGRES_PORT=$(POSTGRES_PORT) && \ @@ -47,7 +49,7 @@ coverage_report: coverage report -m --omit="*/tests/*" test: test_producer test_consumer coverage_report -generate_csv: +generate_csv_demo: python test_generator.py \ --num-sensors $(GEN_NUM_SENSORS) \ --num-records $(GEN_NUM_RECORDS) \ @@ -55,3 +57,12 @@ generate_csv: --start-date $(GEN_START_DATE) \ --timezone $(GEN_TIMEZONE) \ --dir $(TARGET_FILE_DIR) + +generate_csv_end_to_end_test: + python test_generator.py \ + --num-sensors 10 \ + --num-records 5 \ + --record-interval 1 \ + --start-date 2021-01-01 \ + --timezone Asia/Shanghai \ + --dir records_test diff --git a/README.md b/README.md index 6b7a7c8..f31345f 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# producer_consumer_csv +# Producer Consumer CSV ![Build Status](https://github.com/github/docs/actions/workflows/test.yml/badge.svg) ![Code Coverage](./coverage.svg) @@ -6,16 +6,64 @@ ## Description This is a simple producer consumer application that reads a csv file and writes the data to a database. The application is written in Python. +## Architecture +The application is composed of the following components: +- Producer (Reads the csv file and sends the data to the message queue) +- Consumer (Reads the data from the message queue and writes the data to the database) +- Persistent Message Queue (RabbitMQ) +- Database (Postgres) + +All the components are running in docker containers. The producer and consumer are running in multiple containers. 
They are scaled using docker compose, and the producer and consumer run in separate containers to simulate a real-world scenario where they would run on different servers.
+
+For performance, you can scale the number of consumer containers by changing the `CONSUMER_REPLICAS` variable in the `.env` file. The default value is 16.
+
+### Requirements
+- Python 3.11
+- Docker
+- Make
+
+### Database Schema
+The initialization script is located in the `database/assets` folder. The script will create the following table:
+
+#### records
+|column|type|description|
+|------|----|-----------|
+|record_time|timestamp with time zone|The time when the record was generated|
+|sensor_id|text|The id of the sensor|
+|value|double precision|The value of the sensor|
+
+### Database Indexes
+|index|columns|
+|-----|-------|
+|PK|sensor_id, record_time|
+|BRIN|record_time|
+|HASH|sensor_id|
+
+A BRIN index is used for the `record_time` column because the data is generated sequentially (time-series data). A HASH index is used for the `sensor_id` column because it is usually queried with equality rather than range predicates, and a HASH index is more efficient than a BTREE index for equality lookups.
+
+### Queue
+The queue is implemented using RabbitMQ, with the broker data stored on a mounted volume (`RABBITMQ_VOLUME_DIR`). The queue is declared as durable and the messages are marked as persistent.
+
+However, to limit the complexity of the application, the `get` operation is preferred over the `consume` operation in this project, which simulates a short-polling queue.
+
+## Test Data
+The test data is generated using the `generate_csv_demo` command. The command will generate one csv file per sensor with the following columns:
+- `record_time`: The time when the record was generated
+- `sensor_id`: The id of the sensor
+- `value`: The value of the sensor
+
+You can check the section [Running the application](#running-the-application) for more details on how to generate the csv files.
+
## Installation
1. Clone the repository
2. Install make and docker
- For Ubuntu and Debian
```bash
-sudo apt install make
+$ sudo apt install make
```
- For Fedora and CentOS
```bash
-sudo yum install make
+$ sudo yum install make
```
For Docker installation, please refer to the [official documentation](https://docs.docker.com/engine/install/)
@@ -35,17 +83,133 @@ Please make sure you don't change the name of the variables as they are used in
### Running the application
-1. Run the following command to build the docker image
+1. First run the `generate_csv_demo` command to generate the test csv files. You can change the following parameters in the `.env` file:
+- `GEN_NUM_SENSORS`
+- `GEN_NUM_RECORDS`
+- `GEN_START_DATE`
+- `GEN_RECORD_INTERVAL`
+- `GEN_TIMEZONE`
+
+```bash
+$ make generate_csv_demo
+```
+
+2. Run the following command to build the docker image
```bash
$ make build
```
-2. Run the following command to start the docker compose stack
+
+3. Run the following command to start the docker compose stack. This will start the postgres database, rabbitmq, the producer and the consumer containers. You can change the following parameters in the `.env` file (the compose excerpt below shows how the replica count is applied):
+- `CONSUMER_REPLICAS`
```bash
-$ make up / make up_d
+$ make up
```
-The `up_d` command will run the container in detached mode.
-3. Run the following command to stop the docker compose stack
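+
+For reference, the replica count is applied through compose's replicated deploy mode. The following is an abridged excerpt of the consumer service from `docker-compose.yml` (image, environment and volume settings omitted):
+```yaml
+  records_consumer:
+    # image, environment, volumes, etc. omitted for brevity
+    deploy:
+      mode: replicated
+      replicas: ${CONSUMER_REPLICAS}
+```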
+4. Run the following command to stop the docker compose stack
```bash
$ make down
```
+
+### End to end test
+While the unit tests are run as part of the build process, you can run the end to end test by following these steps:
+
+1. Make sure the `TARGET_FILE_DIR` variable in the `.env` file is set to `./records_test`, the folder used by the end to end test (the repository's `.env` already sets this).
+
+2. Run the make command `generate_csv_end_to_end_test`. It will generate data for 10 sensors with 5 records each. The data will be generated in the `records_test` folder.
+```bash
+$ make generate_csv_end_to_end_test
+```
+
+3. Run the following command to build the docker image
+```bash
+$ make build
+```
+
+4. Run the following command to start the docker compose stack. This will start the postgres database, rabbitmq, the producer and the consumer containers. You can change the following parameters in the `.env` file:
+- `CONSUMER_REPLICAS`
+
+```bash
+$ make up
+```
+
+5. Query the database to check that the data has been written, and compare the result with the expected records below:
+```sql
+SELECT *
+  FROM records
+  ORDER BY sensor_id ASC, record_time ASC
+```
+
+|record_time|sensor_id|value|
+|-----------|---------|-----|
+|2021-01-01 00:00:00.000 +0800|17fc695a_4|0.9100387476052705|
+|2021-01-01 00:00:01.000 +0800|17fc695a_4|0.9470819312177097|
+|2021-01-01 00:00:02.000 +0800|17fc695a_4|0.9646317173285254|
+|2021-01-01 00:00:03.000 +0800|17fc695a_4|0.5588283283219546|
+|2021-01-01 00:00:04.000 +0800|17fc695a_4|0.10032294940781161|
+|2021-01-01 00:00:00.000 +0800|23b8c1e9_1|0.17833466762332717|
+|2021-01-01 00:00:01.000 +0800|23b8c1e9_1|0.5828395773770179|
+|2021-01-01 00:00:02.000 +0800|23b8c1e9_1|0.6709222475097419|
+|2021-01-01 00:00:03.000 +0800|23b8c1e9_1|0.08392094150600504|
+|2021-01-01 00:00:04.000 +0800|23b8c1e9_1|0.519270757199653|
+|2021-01-01 00:00:00.000 +0800|47378190_8|0.8730491223149253|
+|2021-01-01 00:00:01.000 +0800|47378190_8|0.9269235181749119|
+|2021-01-01 00:00:02.000 +0800|47378190_8|0.7912797041193453|
+|2021-01-01 00:00:03.000 +0800|47378190_8|0.7901636441724763|
+|2021-01-01 00:00:04.000 +0800|47378190_8|0.7886736978911509|
+|2021-01-01 00:00:00.000 +0800|6b65a6a4_7|0.10293554590959142|
+|2021-01-01 00:00:01.000 +0800|6b65a6a4_7|0.2888706613682428|
+|2021-01-01 00:00:02.000 +0800|6b65a6a4_7|0.4279942939571587|
+|2021-01-01 00:00:03.000 +0800|6b65a6a4_7|0.23512685053378612|
+|2021-01-01 00:00:04.000 +0800|6b65a6a4_7|0.5272935984703412|
+|2021-01-01 00:00:00.000 +0800|972a8469_3|0.7642357069109641|
+|2021-01-01 00:00:01.000 +0800|972a8469_3|0.5701299072914774|
+|2021-01-01 00:00:02.000 +0800|972a8469_3|0.17473379247794074|
+|2021-01-01 00:00:03.000 +0800|972a8469_3|0.12464021515158785|
+|2021-01-01 00:00:04.000 +0800|972a8469_3|0.5390567336729636|
+|2021-01-01 00:00:00.000 +0800|9a1de644_5|0.3758090093767995|
+|2021-01-01 00:00:01.000 +0800|9a1de644_5|0.33553000407688316|
+|2021-01-01 00:00:02.000 +0800|9a1de644_5|0.9667728274172214|
+|2021-01-01 00:00:03.000 +0800|9a1de644_5|0.9549845776369301|
+|2021-01-01 00:00:04.000 +0800|9a1de644_5|0.7740952070735415|
+|2021-01-01 00:00:00.000 +0800|b74d0fb1_6|0.3213794858378719|
+|2021-01-01 00:00:01.000 +0800|b74d0fb1_6|0.5947556423536645|
+|2021-01-01 00:00:02.000 +0800|b74d0fb1_6|0.8872919823927438|
+|2021-01-01 00:00:03.000 +0800|b74d0fb1_6|0.28297514015876457|
+|2021-01-01 00:00:04.000 +0800|b74d0fb1_6|0.6590113969392454|
+|2021-01-01 00:00:00.000 +0800|bd9c66b3_2|0.36466072100083013|
+|2021-01-01 00:00:01.000 +0800|bd9c66b3_2|0.8408935901254108|
+|2021-01-01 00:00:02.000 +0800|bd9c66b3_2|0.8945802964470245|
+|2021-01-01 00:00:03.000 +0800|bd9c66b3_2|0.027150264273096747|
+|2021-01-01 00:00:04.000 +0800|bd9c66b3_2|0.9236042897439161|
+|2021-01-01 00:00:00.000 +0800|bdd640fb_0|0.0746765216767864|
+|2021-01-01 00:00:01.000 +0800|bdd640fb_0|0.8404332126798344|
+|2021-01-01 00:00:02.000 +0800|bdd640fb_0|0.31870553433981874|
+|2021-01-01 00:00:03.000 +0800|bdd640fb_0|0.825033074919654|
+|2021-01-01 00:00:04.000 +0800|bdd640fb_0|0.7161990766355211|
+|2021-01-01 00:00:00.000 +0800|c241330b_9|0.6940489142492581|
+|2021-01-01 00:00:01.000 +0800|c241330b_9|0.7748088833830469|
+|2021-01-01 00:00:02.000 +0800|c241330b_9|0.85280342321841|
+|2021-01-01 00:00:03.000 +0800|c241330b_9|0.32443698906841056|
+|2021-01-01 00:00:04.000 +0800|c241330b_9|0.4457555011219805|
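+
+As an additional sanity check, you can also verify the total row count: with 10 sensors and 5 records each, the query below should return 50.
+```sql
+SELECT COUNT(*) FROM records;
+```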
+
+
+## Unit tests
+The unit tests are run as part of the CI pipeline. You can run the unit tests locally by following these steps:
+
+1. Run the make `setup_test_env` command to set up the test environment
+```bash
+$ make setup_test_env
+```
+
+2. Install the following pip packages:
+```bash
+$ pip install -r producer/requirements-dev.txt
+$ pip install -r consumer/requirements-dev.txt
+```
+
+3. Run the following command to run the unit tests
+```bash
+$ make test
+```
+
+This will run both the producer and consumer unit tests. The coverage report will be generated in the `.coverage` file.
diff --git a/consumer/dockerfile b/consumer/dockerfile
index 0642ced..1721b01 100644
--- a/consumer/dockerfile
+++ b/consumer/dockerfile
@@ -1,40 +1,16 @@
ARG AMAZON_LINUX_VERSION_TAG
-FROM amazonlinux:${AMAZON_LINUX_VERSION_TAG} as build
+FROM amazonlinux:${AMAZON_LINUX_VERSION_TAG}
RUN yum install -y \
    python3.11 \
-    python3.11-pip \
    python3.11-devel \
-    shadow-utils
+    python3.11-pip
-RUN adduser app
-ENV HOME=/home/app
-WORKDIR ${HOME}
-
-RUN chown -R app:app /home/app
-
-USER app
+WORKDIR /app
COPY requirements.txt .
-RUN python3.11 -m pip install --user --no-warn-script-location -r requirements.txt - -FROM amazonlinux:2023.2.20231026.0 as runtime - -RUN yum install -y \ - python3.11 \ - python3.11-pip \ - shadow-utils - -RUN adduser app -WORKDIR /home/app - -RUN chown -R app:app /home/app - -USER app -ENV HOME=/home/app -WORKDIR ${HOME} - -COPY --from=build /home/app/.local /home/app/.local +RUN python3.11 -m pip install -r requirements.txt COPY src ./src/ + CMD python3.11 -m src.deployments.script.main diff --git a/consumer/src/adapters/fetch_filenames_stream/rabbitmq.py b/consumer/src/adapters/fetch_filenames_stream/rabbitmq.py index bb27e75..0a1e5a0 100644 --- a/consumer/src/adapters/fetch_filenames_stream/rabbitmq.py +++ b/consumer/src/adapters/fetch_filenames_stream/rabbitmq.py @@ -21,6 +21,7 @@ def __init__( credentials_service: Callable[[], tuple[str, str]], queue: str = "filenames", polling_timeout: int = 10, + socket_timeout: int = 86400, ) -> None: self._host = host self._port = port @@ -30,6 +31,7 @@ def __init__( self._channel: Optional[BlockingChannel] = None self._polling_timeout = polling_timeout self._last_poll_time: Optional[datetime] = None + self._socket_timeout = socket_timeout @overload def ack(self, message_receipt: int) -> bool: @@ -104,6 +106,7 @@ def _get_amqp_conn(self) -> Iterator[Connection]: host=self._host, port=self._port, credentials=credentials, + socket_timeout=self._socket_timeout, ) self._conn = pika.BlockingConnection(conn_parameters) yield self._conn diff --git a/consumer/src/adapters/file_parse_iot_records/csv.py b/consumer/src/adapters/file_parse_iot_records/csv.py index a09c10b..a7a204a 100644 --- a/consumer/src/adapters/file_parse_iot_records/csv.py +++ b/consumer/src/adapters/file_parse_iot_records/csv.py @@ -14,12 +14,10 @@ class CSVParseIOTRecordsClient(FileParseIOTRecordsClient): def __init__( self, - recognized_datetime_formats: Sequence[str], delimiter: str = ",", file_extension: str = ".csv", ) -> None: self._delimiter = delimiter - self._recognized_datetime_formats = recognized_datetime_formats self._file_extension = file_extension @overload @@ -63,12 +61,10 @@ def parse_stream(self, filename: str) -> Iterator[IOTRecord]: logging.exception(e) def _parse_datetime(self, datetime_str: str) -> Optional[datetime]: - for datetime_format in self._recognized_datetime_formats: - try: - return datetime.strptime(datetime_str, datetime_format) - except ValueError: - pass - return None + try: + return datetime.fromisoformat(datetime_str) + except ValueError: + return None def _parse_value(self, value_str: str) -> Optional[Decimal]: try: @@ -102,6 +98,7 @@ def _parse_single(self, filename: str) -> Optional[list[IOTRecord]]: self._basic_file_check(filename) with open(filename) as csvfile: reader = csv.reader(csvfile, delimiter=self._delimiter) + next(reader) # skip header return list(self._parse_iter(reader)) except Exception as e: logging.exception(e) diff --git a/consumer/src/deployments/script/config.py b/consumer/src/deployments/script/config.py index 0ed6ae1..93840c3 100644 --- a/consumer/src/deployments/script/config.py +++ b/consumer/src/deployments/script/config.py @@ -19,6 +19,7 @@ class RabbitMQConfig: PASSWORD = os.getenv("RABBITMQ_PASSWORD", "guest") QUEUE = os.getenv("RABBITMQ_QUEUE_NAME", "filenames") POLLING_TIMEOUT = int(os.getenv("RABBITMQ_POLLING_TIMEOUT", 10)) + SOCKET_TIMEOUT = int(os.getenv("RABBITMQ_SOCKET_TIMEOUT", 86400)) class PostgresConfig: @@ -31,8 +32,5 @@ class PostgresConfig: class CSVParserConfig: - RECOGNIZED_DATETIME_FORMATS = 
os.getenv( - "CSV_PARSER_RECOGNIZED_DATETIME_FORMATS", "" - ).split(",") DELIMITER = os.getenv("CSV_PARSER_DELIMITER", ",") FILE_EXTENSION = os.getenv("CSV_PARSER_FILE_EXTENSION", ".csv") diff --git a/consumer/src/deployments/script/main.py b/consumer/src/deployments/script/main.py index 57cd029..a80ee6c 100644 --- a/consumer/src/deployments/script/main.py +++ b/consumer/src/deployments/script/main.py @@ -30,10 +30,10 @@ def main() -> None: credentials_service=lambda: (RabbitMQConfig.USERNAME, RabbitMQConfig.PASSWORD), queue=RabbitMQConfig.QUEUE, polling_timeout=RabbitMQConfig.POLLING_TIMEOUT, + socket_timeout=RabbitMQConfig.SOCKET_TIMEOUT, ) file_parse_iot_records_client = CSVParseIOTRecordsClient( - recognized_datetime_formats=CSVParserConfig.RECOGNIZED_DATETIME_FORMATS, delimiter=CSVParserConfig.DELIMITER, file_extension=CSVParserConfig.FILE_EXTENSION, ) @@ -47,6 +47,8 @@ def main() -> None: ) try: + logging.info("Starting to fetch filenames...") + for filename, receipt in fetch_filenames_stream_client.fetch_stream(): logging.info(f"Upserting {filename}...") iot_records_buffer: list[IOTRecord] = [] @@ -57,6 +59,8 @@ def main() -> None: if len(iot_records_buffer) < PostgresConfig.BATCH_UPSERT_SIZE: continue + logging.info(f"Upserting {len(iot_records_buffer)} records...") + _upsert_iot_records_buffer( iot_records_buffer, upsert_iot_records_client ) diff --git a/consumer/src/deployments/script/setup_logging.py b/consumer/src/deployments/script/setup_logging.py index dcae074..639d52b 100644 --- a/consumer/src/deployments/script/setup_logging.py +++ b/consumer/src/deployments/script/setup_logging.py @@ -5,9 +5,11 @@ def setup_logging() -> None: - LOG_LEVEL_INT = getattr(logging, LoggingConfig.LOG_LEVEL.upper(), None) + pathlib.Path(LoggingConfig.LOG_DIR).absolute().mkdir(parents=True, exist_ok=True) - pathlib.Path(LoggingConfig.LOG_DIR).mkdir(parents=True, exist_ok=True) + (pathlib.Path(LoggingConfig.LOG_DIR).absolute() / "info.log").touch() + (pathlib.Path(LoggingConfig.LOG_DIR).absolute() / "warning.log").touch() + (pathlib.Path(LoggingConfig.LOG_DIR).absolute() / "error.log").touch() handlers: list[logging.Handler] = [] @@ -20,50 +22,47 @@ def setup_logging() -> None: stream_handler.setLevel(LoggingConfig.LOG_LEVEL) handlers.append(stream_handler) - if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.INFO: - info_handler = TimedRotatingFileHandler( - filename=f"{LoggingConfig.LOG_DIR}/info.log", - when=LoggingConfig.LOG_ROTATION, - interval=1, - backupCount=LoggingConfig.LOG_RETENTION, - ) - info_handler.setFormatter( - logging.Formatter( - LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT - ) + info_handler = TimedRotatingFileHandler( + filename=f"{LoggingConfig.LOG_DIR}/info.log", + when=LoggingConfig.LOG_ROTATION, + interval=1, + backupCount=LoggingConfig.LOG_RETENTION, + ) + info_handler.setFormatter( + logging.Formatter( + LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT ) - info_handler.setLevel(logging.INFO) - handlers.append(info_handler) + ) + info_handler.setLevel(logging.INFO) + handlers.append(info_handler) - if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.WARNING: - warning_handler = TimedRotatingFileHandler( - filename=f"{LoggingConfig.LOG_DIR}/warning.log", - when=LoggingConfig.LOG_ROTATION, - interval=1, - backupCount=LoggingConfig.LOG_RETENTION, - ) - warning_handler.setFormatter( - logging.Formatter( - LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT - ) + warning_handler = TimedRotatingFileHandler( + 
filename=f"{LoggingConfig.LOG_DIR}/warning.log", + when=LoggingConfig.LOG_ROTATION, + interval=1, + backupCount=LoggingConfig.LOG_RETENTION, + ) + warning_handler.setFormatter( + logging.Formatter( + LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT ) - warning_handler.setLevel(logging.WARNING) - handlers.append(warning_handler) + ) + warning_handler.setLevel(logging.WARNING) + handlers.append(warning_handler) - if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.ERROR: - error_handler = TimedRotatingFileHandler( - filename=f"{LoggingConfig.LOG_DIR}/error.log", - when=LoggingConfig.LOG_ROTATION, - interval=1, - backupCount=LoggingConfig.LOG_RETENTION, - ) - error_handler.setFormatter( - logging.Formatter( - LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT - ) + error_handler = TimedRotatingFileHandler( + filename=f"{LoggingConfig.LOG_DIR}/error.log", + when=LoggingConfig.LOG_ROTATION, + interval=1, + backupCount=LoggingConfig.LOG_RETENTION, + ) + error_handler.setFormatter( + logging.Formatter( + LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT ) - error_handler.setLevel(logging.ERROR) - handlers.append(error_handler) + ) + error_handler.setLevel(logging.ERROR) + handlers.append(error_handler) root_logger = logging.getLogger() root_logger.setLevel(LoggingConfig.LOG_LEVEL) diff --git a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/conftest.py b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/conftest.py index c964d6c..0a61683 100644 --- a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/conftest.py +++ b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/conftest.py @@ -82,7 +82,6 @@ def random_invalid_value_ndjson_file(setup_tempdir: Path) -> Path: @pytest.fixture(scope="function") def csv_parse_iot_records_client() -> CSVParseIOTRecordsClient: return CSVParseIOTRecordsClient( - recognized_datetime_formats=CSVParserConfig.RECOGNIZED_DATETIME_FORMATS, delimiter=CSVParserConfig.DELIMITER, file_extension=CSVParserConfig.FILE_EXTENSION, ) diff --git a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_parse.py b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_parse.py index a926c33..e6c284a 100644 --- a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_parse.py +++ b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/test_failed_parse.py @@ -41,6 +41,8 @@ def test_parse_single_datetime_failed_ignore_row( iot_records = csv_parse_iot_records_client.parse( random_invalid_datetime_csv_file ) + + print(iot_records) assert len(iot_records) == 0 assert "Unrecognized datetime format:" in caplog.text assert "Unrecognized value format:" not in caplog.text diff --git a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/utils.py b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/utils.py index aaec3f4..cc86435 100644 --- a/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/utils.py +++ b/consumer/tests/test_adapters/test_file_parse_iot_records/test_csv/utils.py @@ -22,35 +22,27 @@ def random_valid_format_rows() -> list[tuple[str, ...]]: random_datetime = datetime.now(tz=ZoneInfo(random_timezone)) - random_time_delta random_sensor_id = "".join(random.choices(string.ascii_letters, k=10)) random_value = Decimal(random.random() * 100) - rows.append((random_datetime.isoformat(), random_sensor_id, str(random_value))) + rows.append( + { + 
"record_time": random_datetime.isoformat(timespec="milliseconds"), + "sensor_id": random_sensor_id, + "value": str(random_value), + } + ) return rows def random_invalid_datetime_rows() -> list[tuple[str, ...]]: rows = [] - all_datetime_formats = [ - "%Y-%m-%dT%H:%M:%S%z", - "%Y-%m-%dT%H:%M%z", - "%Y-%m-%d %H:%M:%S%z", - "%Y-%m-%d %H:%M%z", - ] for _ in range(10): - random_timezone = random.choice(list(zoneinfo.available_timezones())) - random_time_delta = timedelta( - hours=random.randint(0, 24), - minutes=random.randint(0, 60), - seconds=random.randint(0, 60), - ) - random_datetime = datetime.now(tz=ZoneInfo(random_timezone)) - random_time_delta random_sensor_id = "".join(random.choices(string.ascii_letters, k=10)) random_value = Decimal(random.random() * 100) - random_datetime_format = random.choice(all_datetime_formats) rows.append( - ( - random_datetime.strftime(random_datetime_format), - random_sensor_id, - str(random_value), - ) + { + "record_time": "".join(random.choices(string.ascii_letters, k=10)), + "sensor_id": random_sensor_id, + "value": str(random_value), + } ) return rows @@ -68,40 +60,26 @@ def random_invalid_value_rows() -> list[tuple[str, ...]]: random_sensor_id = "".join(random.choices(string.ascii_letters, k=10)) random_value = "".join(random.choices(string.ascii_letters, k=10)) rows.append( - ( - random_datetime.isoformat(), - random_sensor_id, - random_value, - ) + { + "record_time": random_datetime.isoformat(timespec="milliseconds"), + "sensor_id": random_sensor_id, + "value": random_value, + } ) return rows def random_invalid_datetime_and_value_rows() -> list[tuple[str, ...]]: rows = [] - all_datetime_formats = [ - "%Y-%m-%dT%H:%M:%S.%f%z", - "%Y-%m-%dT%H:%M%z", - "%Y-%m-%d %H:%M:%S%z", - "%Y-%m-%d %H:%M%z", - ] for _ in range(10): - random_timezone = random.choice(list(zoneinfo.available_timezones())) - random_time_delta = timedelta( - hours=random.randint(0, 24), - minutes=random.randint(0, 60), - seconds=random.randint(0, 60), - ) - random_datetime = datetime.now(tz=ZoneInfo(random_timezone)) - random_time_delta random_sensor_id = "".join(random.choices(string.ascii_letters, k=10)) random_value = "".join(random.choices(string.ascii_letters, k=10)) - random_datetime_format = random.choice(all_datetime_formats) rows.append( - ( - random_datetime.strftime(random_datetime_format), - random_sensor_id, - str(random_value), - ) + { + "record_time": "".join(random.choices(string.ascii_letters, k=10)), + "sensor_id": random_sensor_id, + "value": random_value, + } ) return rows diff --git a/consumer/tests/test_deployments/test_script/test_main/test_main_read_file_resilience.py b/consumer/tests/test_deployments/test_script/test_main/test_main_read_file_resilience.py index 6883813..5c20651 100644 --- a/consumer/tests/test_deployments/test_script/test_main/test_main_read_file_resilience.py +++ b/consumer/tests/test_deployments/test_script/test_main/test_main_read_file_resilience.py @@ -55,6 +55,7 @@ def mock_open(*args, **kwargs): with open(random_csv_file, "r") as f: reader = csv.reader(f) + next(reader) for row in reader: record_time, sensor_id, value = row diff --git a/consumer/tests/test_deployments/test_script/test_main/test_main_successful.py b/consumer/tests/test_deployments/test_script/test_main/test_main_successful.py index e140c9b..6c5f6fa 100644 --- a/consumer/tests/test_deployments/test_script/test_main/test_main_successful.py +++ b/consumer/tests/test_deployments/test_script/test_main/test_main_successful.py @@ -42,6 +42,7 @@ def 
test_main_flow_single_no_failed_files( with open(random_csv_file, "r") as f: reader = csv.reader(f) + next(reader) for row in reader: record_time, sensor_id, value = row @@ -111,6 +112,7 @@ def test_main_flow_batch_no_failed_files( for random_csv_file in random_csv_files: with open(random_csv_file, "r") as f: reader = csv.reader(f) + next(reader) for row in reader: record_time, sensor_id, value = row @@ -178,6 +180,7 @@ def test_main_flow_single_in_batch_no_failed_files( with open(random_csv_file, "r") as f: reader = csv.reader(f) + next(reader) for row in reader: record_time, sensor_id, value = row @@ -247,6 +250,7 @@ def test_main_flow_batch_in_batch_no_failed_files( for random_csv_file in random_csv_files: with open(random_csv_file, "r") as f: reader = csv.reader(f) + next(reader) for row in reader: record_time, sensor_id, value = row diff --git a/consumer/tests/test_deployments/test_script/test_main/test_main_upsert_record_resilience.py b/consumer/tests/test_deployments/test_script/test_main/test_main_upsert_record_resilience.py index 5fab317..20bd46d 100644 --- a/consumer/tests/test_deployments/test_script/test_main/test_main_upsert_record_resilience.py +++ b/consumer/tests/test_deployments/test_script/test_main/test_main_upsert_record_resilience.py @@ -58,6 +58,7 @@ def mock_upsert(*args, **kwargs): with open(random_csv_file, "r") as f: reader = csv.reader(f) + next(reader) for row in reader: record_time, sensor_id, value = row @@ -136,6 +137,7 @@ def mock_upsert(self, records) -> list[bool]: with open(random_csv_file, "r") as f: reader = csv.reader(f) + next(reader) for row in reader: record_time, sensor_id, value = row diff --git a/consumer/tests/test_deployments/test_script/test_main/utils.py b/consumer/tests/test_deployments/test_script/test_main/utils.py index 56fce36..6609da2 100644 --- a/consumer/tests/test_deployments/test_script/test_main/utils.py +++ b/consumer/tests/test_deployments/test_script/test_main/utils.py @@ -22,35 +22,27 @@ def random_valid_format_rows() -> list[tuple[str, ...]]: random_datetime = datetime.now(tz=ZoneInfo(random_timezone)) - random_time_delta random_sensor_id = "".join(random.choices(string.ascii_letters, k=10)) random_value = Decimal(random.random() * 100) - rows.append((random_datetime.isoformat(), random_sensor_id, str(random_value))) + rows.append( + { + "record_time": random_datetime.isoformat(timespec="milliseconds"), + "sensor_id": random_sensor_id, + "value": str(random_value), + } + ) return rows def random_invalid_datetime_rows() -> list[tuple[str, ...]]: rows = [] - all_datetime_formats = [ - "%Y-%m-%dT%H:%M:%S%z", - "%Y-%m-%dT%H:%M%z", - "%Y-%m-%d %H:%M:%S%z", - "%Y-%m-%d %H:%M%z", - ] for _ in range(10): - random_timezone = random.choice(list(zoneinfo.available_timezones())) - random_time_delta = timedelta( - hours=random.randint(0, 24), - minutes=random.randint(0, 60), - seconds=random.randint(0, 60), - ) - random_datetime = datetime.now(tz=ZoneInfo(random_timezone)) - random_time_delta random_sensor_id = "".join(random.choices(string.ascii_letters, k=10)) random_value = Decimal(random.random() * 100) - random_datetime_format = random.choice(all_datetime_formats) rows.append( - ( - random_datetime.strftime(random_datetime_format), - random_sensor_id, - str(random_value), - ) + { + "record_time": "".join(random.choices(string.ascii_letters, k=10)), + "sensor_id": random_sensor_id, + "value": str(random_value), + } ) return rows @@ -68,40 +60,26 @@ def random_invalid_value_rows() -> list[tuple[str, ...]]: random_sensor_id = 
"".join(random.choices(string.ascii_letters, k=10)) random_value = "".join(random.choices(string.ascii_letters, k=10)) rows.append( - ( - random_datetime.isoformat(), - random_sensor_id, - random_value, - ) + { + "record_time": random_datetime.isoformat(timespec="milliseconds"), + "sensor_id": random_sensor_id, + "value": random_value, + } ) return rows def random_invalid_datetime_and_value_rows() -> list[tuple[str, ...]]: rows = [] - all_datetime_formats = [ - "%Y-%m-%dT%H:%M:%S%z", - "%Y-%m-%dT%H:%M%z", - "%Y-%m-%d %H:%M:%S%z", - "%Y-%m-%d %H:%M%z", - ] for _ in range(10): - random_timezone = random.choice(list(zoneinfo.available_timezones())) - random_time_delta = timedelta( - hours=random.randint(0, 24), - minutes=random.randint(0, 60), - seconds=random.randint(0, 60), - ) - random_datetime = datetime.now(tz=ZoneInfo(random_timezone)) - random_time_delta random_sensor_id = "".join(random.choices(string.ascii_letters, k=10)) random_value = "".join(random.choices(string.ascii_letters, k=10)) - random_datetime_format = random.choice(all_datetime_formats) rows.append( - ( - random_datetime.strftime(random_datetime_format), - random_sensor_id, - str(random_value), - ) + { + "record_time": "".join(random.choices(string.ascii_letters, k=10)), + "sensor_id": random_sensor_id, + "value": random_value, + } ) return rows @@ -110,7 +88,10 @@ def random_csv_file(base_dir: Path, rows: list[tuple[str, ...]]) -> str: filename = "".join(random.choices(string.ascii_letters, k=10)) + ".csv" filepath = base_dir.joinpath(filename) with open(filepath, "w") as csvfile: - writer = csv.writer(csvfile, delimiter=",") + writer = csv.DictWriter( + csvfile, delimiter=",", fieldnames=["record_time", "sensor_id", "value"] + ) + writer.writeheader() writer.writerows(rows) return str(filepath) @@ -119,7 +100,10 @@ def random_tsv_file(base_dir: Path, rows: list[tuple[str, ...]]) -> str: filename = "".join(random.choices(string.ascii_letters, k=10)) + ".tsv" filepath = base_dir.joinpath(filename) with open(filepath, "w") as csvfile: - writer = csv.writer(csvfile, delimiter="\t") + writer = csv.DictWriter( + csvfile, delimiter="\t", fieldnames=["record_time", "sensor_id", "value"] + ) + writer.writeheader() writer.writerows(rows) return str(filepath) diff --git a/database/dockerfile b/database/dockerfile index 021ced9..6b528c8 100644 --- a/database/dockerfile +++ b/database/dockerfile @@ -1,4 +1,3 @@ ARG POSTGRES_VERSION_TAG FROM docker.io/postgres:${POSTGRES_VERSION_TAG} -USER postgres COPY ./assets/create_records_table.sql /docker-entrypoint-initdb.d/init.sql diff --git a/docker-compose.test.yml b/docker-compose.test.yml index 46275ec..0d6d511 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -12,16 +12,33 @@ services: POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} POSTGRES_USER: ${POSTGRES_USERNAME} POSTGRES_DB: ${POSTGRES_DATABASE} + TZ: ${TZ} ports: - ${POSTGRES_PORT}:5432 + volumes: + - ${POSTGRES_VOLUME_DIR}:/var/lib/postgresql/data restart: always + healthcheck: + test: pg_isready -U ${POSTGRES_USERNAME} + interval: 15s + timeout: 10s + retries: 5 + records_rabbitmq: image: rabbitmq:${RABBITMQ_VERSION_TAG} container_name: records_rabbitmq environment: RABBITMQ_DEFAULT_USER: ${RABBITMQ_USERNAME} RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD} + TZ: ${TZ} ports: - ${RABBITMQ_WEBAPP_PORT}:15672 - ${RABBITMQ_PORT}:5672 + volumes: + - ${RABBITMQ_VOLUME_DIR}:/var/lib/rabbitmq restart: always + healthcheck: + test: rabbitmq-diagnostics -q ping + interval: 30s + timeout: 30s + retries: 3 diff --git 
a/docker-compose.yml b/docker-compose.yml index 4ca485c..f6c2dfe 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,15 +12,17 @@ services: POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} POSTGRES_USER: ${POSTGRES_USERNAME} POSTGRES_DB: ${POSTGRES_DATABASE} + TZ: ${TZ} ports: - ${POSTGRES_PORT}:5432 + volumes: + - ${POSTGRES_VOLUME_DIR}:/var/lib/postgresql/data restart: always healthcheck: - test: ["CMD", "pg_isready", "-U", "${POSTGRES_USERNAME}"] + test: pg_isready -U ${POSTGRES_USERNAME} interval: 15s timeout: 10s retries: 5 - start_period: 10s records_rabbitmq: image: rabbitmq:${RABBITMQ_VERSION_TAG} @@ -28,16 +30,18 @@ services: environment: RABBITMQ_DEFAULT_USER: ${RABBITMQ_USERNAME} RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD} + TZ: ${TZ} ports: - ${RABBITMQ_WEBAPP_PORT}:15672 - ${RABBITMQ_PORT}:5672 + volumes: + - ${RABBITMQ_VOLUME_DIR}:/var/lib/rabbitmq restart: always healthcheck: - test: ["CMD", "rabbitmqctl", "status"] - interval: 15s - timeout: 10s - retries: 5 - start_period: 10s + test: rabbitmq-diagnostics -q ping + interval: 30s + timeout: 30s + retries: 3 records_producer: image: records_producer:latest @@ -56,15 +60,16 @@ services: LOG_DIR: ${PRODUCER_LOG_DIR} LOG_RETENTION: ${PRODUCER_LOG_RETENTION} LOG_ROTATION: ${PRODUCER_LOG_ROTATION} - RABBITMQ_HOST: localhost + RABBITMQ_HOST: records_rabbitmq RABBITMQ_PORT: 5672 RABBITMQ_USERNAME: ${RABBITMQ_USERNAME} RABBITMQ_PASSWORD: ${RABBITMQ_PASSWORD} RABBITMQ_QUEUE_NAME: ${RABBITMQ_QUEUE_NAME} - network_mode: host + RABBITMQ_SOCKET_TIMEOUT: ${RABBITMQ_SOCKET_TIMEOUT} + TZ: ${TZ} volumes: - - ./${TARGET_FILE_DIR}:/home/app/${TARGET_FILE_DIR}:ro - # - ./${PRODUCER_LOG_DIR}:/home/app/${PRODUCER_LOG_DIR} + - ${TARGET_FILE_DIR}:/app/${TARGET_FILE_DIR}:ro + - ${PRODUCER_LOG_DIR}:/app/${PRODUCER_LOG_DIR} depends_on: records_postgres: condition: service_healthy @@ -85,25 +90,25 @@ services: LOG_DIR: ${CONSUMER_LOG_DIR} LOG_RETENTION: ${CONSUMER_LOG_RETENTION} LOG_ROTATION: ${CONSUMER_LOG_ROTATION} - RABBITMQ_HOST: localhost + RABBITMQ_HOST: records_rabbitmq RABBITMQ_PORT: 5672 RABBITMQ_USERNAME: ${RABBITMQ_USERNAME} RABBITMQ_PASSWORD: ${RABBITMQ_PASSWORD} RABBITMQ_QUEUE_NAME: ${RABBITMQ_QUEUE_NAME} RABBITMQ_POLLING_TIMEOUT: ${RABBITMQ_POLLING_TIMEOUT} - POSTGRES_HOST: localhost + RABBITMQ_SOCKET_TIMEOUT: ${RABBITMQ_SOCKET_TIMEOUT} + POSTGRES_HOST: records_postgres POSTGRES_PORT: 5432 POSTGRES_USERNAME: ${POSTGRES_USERNAME} POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} POSTGRES_DATABASE: ${POSTGRES_DATABASE} POSTGRES_BATCH_UPSERT_SIZE: ${POSTGRES_BATCH_UPSERT_SIZE} - CSV_PARSER_RECOGNIZED_DATETIME_FORMATS: ${CSV_PARSER_RECOGNIZED_DATETIME_FORMATS} CSV_PARSER_DELIMITER: ${CSV_PARSER_DELIMITER} CSV_PARSER_FILE_EXTENSION: ${CSV_PARSER_FILE_EXTENSION} - network_mode: host + TZ: ${TZ} volumes: - - ./${TARGET_FILE_DIR}:/home/app/${TARGET_FILE_DIR}:ro - # - ./${CONSUMER_LOG_DIR}:/home/app/src/${CONSUMER_LOG_DIR} + - ${TARGET_FILE_DIR}:/app/${TARGET_FILE_DIR}:ro + - ${CONSUMER_LOG_DIR}:/app/${CONSUMER_LOG_DIR} deploy: mode: replicated replicas: ${CONSUMER_REPLICAS} diff --git a/producer/dockerfile b/producer/dockerfile index 7cdf707..1721b01 100644 --- a/producer/dockerfile +++ b/producer/dockerfile @@ -1,41 +1,16 @@ ARG AMAZON_LINUX_VERSION_TAG -FROM amazonlinux:${AMAZON_LINUX_VERSION_TAG} as build +FROM amazonlinux:${AMAZON_LINUX_VERSION_TAG} RUN yum install -y \ python3.11 \ python3.11-devel \ - python3.11-pip \ - shadow-utils + python3.11-pip -RUN adduser app -ENV HOME=/home/app -WORKDIR ${HOME} - -RUN chown -R app:app /home/app - -USER app 
+WORKDIR /app COPY requirements.txt . -RUN python3.11 -m pip install --user --no-warn-script-location -r requirements.txt - -FROM amazonlinux:2023.2.20231026.0 as runtime - -RUN yum install -y \ - python3.11 \ - python3.11-pip \ - shadow-utils - -RUN adduser app -WORKDIR /home/app +RUN python3.11 -m pip install -r requirements.txt COPY src ./src/ -RUN chown -R app:app /home/app - -USER app -ENV HOME=/home/app -WORKDIR ${HOME} - -COPY --from=build /home/app/.local /home/app/.local - CMD python3.11 -m src.deployments.script.main diff --git a/producer/src/adapters/publish_filenames/rabbitmq.py b/producer/src/adapters/publish_filenames/rabbitmq.py index a192e21..8f1346c 100644 --- a/producer/src/adapters/publish_filenames/rabbitmq.py +++ b/producer/src/adapters/publish_filenames/rabbitmq.py @@ -16,12 +16,14 @@ def __init__( port: int, credentials_service: Callable[[], tuple[str, str]], queue: str = "filenames", + socket_timeout: int = 86400, ) -> None: self._host = host self._port = port self._credentials_service = credentials_service self._queue = queue self._conn: Optional[Connection] = None + self._socket_timeout = socket_timeout @overload def publish(self, filename: str) -> bool: @@ -49,6 +51,7 @@ def _get_amqp_conn(self) -> Iterator[pika.BaseConnection]: host=self._host, port=self._port, credentials=credentials, + socket_timeout=self._socket_timeout, ) self._conn = pika.BlockingConnection(conn_parameters) yield self._conn diff --git a/producer/src/deployments/script/config.py b/producer/src/deployments/script/config.py index 3fdd49b..c98681d 100644 --- a/producer/src/deployments/script/config.py +++ b/producer/src/deployments/script/config.py @@ -23,3 +23,4 @@ class RabbitMQConfig: USERNAME = os.getenv("RABBITMQ_USERNAME", "guest") PASSWORD = os.getenv("RABBITMQ_PASSWORD", "guest") QUEUE = os.getenv("RABBITMQ_QUEUE_NAME", "filenames") + SOCKET_TIMEOUT = int(os.getenv("RABBITMQ_SOCKET_TIMEOUT", 86400)) diff --git a/producer/src/deployments/script/main.py b/producer/src/deployments/script/main.py index 10bea4e..ccf4e08 100644 --- a/producer/src/deployments/script/main.py +++ b/producer/src/deployments/script/main.py @@ -23,6 +23,7 @@ def main() -> None: port=RabbitMQConfig.PORT, credentials_service=lambda: (RabbitMQConfig.USERNAME, RabbitMQConfig.PASSWORD), queue=RabbitMQConfig.QUEUE, + socket_timeout=RabbitMQConfig.SOCKET_TIMEOUT, ) successes_map = {} diff --git a/producer/src/deployments/script/setup_logging.py b/producer/src/deployments/script/setup_logging.py index e41bc91..57025e8 100644 --- a/producer/src/deployments/script/setup_logging.py +++ b/producer/src/deployments/script/setup_logging.py @@ -5,9 +5,11 @@ def setup_logging() -> None: - LOG_LEVEL_INT = getattr(logging, LoggingConfig.LOG_LEVEL.upper(), None) + pathlib.Path(LoggingConfig.LOG_DIR).absolute().mkdir(parents=True, exist_ok=True) - pathlib.Path(LoggingConfig.LOG_DIR).mkdir(parents=True, exist_ok=True) + (pathlib.Path(LoggingConfig.LOG_DIR).absolute() / "info.log").touch() + (pathlib.Path(LoggingConfig.LOG_DIR).absolute() / "warning.log").touch() + (pathlib.Path(LoggingConfig.LOG_DIR).absolute() / "error.log").touch() handlers: list[logging.Handler] = [] @@ -20,9 +22,8 @@ def setup_logging() -> None: stream_handler.setLevel(LoggingConfig.LOG_LEVEL) handlers.append(stream_handler) - # if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.INFO: info_handler = TimedRotatingFileHandler( - filename=f"{LoggingConfig.LOG_DIR}/info.log", + filename=str(pathlib.Path(LoggingConfig.LOG_DIR).absolute() / "info.log"), 
when=LoggingConfig.LOG_ROTATION, interval=1, backupCount=LoggingConfig.LOG_RETENTION, @@ -35,9 +36,8 @@ def setup_logging() -> None: info_handler.setLevel(logging.INFO) handlers.append(info_handler) - # if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.WARNING: warning_handler = TimedRotatingFileHandler( - filename=f"{LoggingConfig.LOG_DIR}/warning.log", + filename=str(pathlib.Path(LoggingConfig.LOG_DIR).absolute() / "warning.log"), when=LoggingConfig.LOG_ROTATION, interval=1, backupCount=LoggingConfig.LOG_RETENTION, @@ -50,9 +50,8 @@ def setup_logging() -> None: warning_handler.setLevel(logging.WARNING) handlers.append(warning_handler) - # if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.ERROR: error_handler = TimedRotatingFileHandler( - filename=f"{LoggingConfig.LOG_DIR}/error.log", + filename=str(pathlib.Path(LoggingConfig.LOG_DIR).absolute() / "error.log"), when=LoggingConfig.LOG_ROTATION, interval=1, backupCount=LoggingConfig.LOG_RETENTION, diff --git a/test_generator.py b/test_generator.py index 2d54770..4ad8944 100644 --- a/test_generator.py +++ b/test_generator.py @@ -4,13 +4,17 @@ import random from zoneinfo import ZoneInfo from pathlib import Path -from uuid import uuid4 +import uuid import logging -from tqdm.auto import tqdm from concurrent.futures import ThreadPoolExecutor, as_completed from io import StringIO import os +rng = random.Random() +rng.seed(42) + +uuid.uuid4 = lambda: uuid.UUID(int=rng.getrandbits(128)) + logging.basicConfig(level=logging.INFO) @@ -83,7 +87,7 @@ def generate_data( writer.writerows( [ { - "record_time": date.isoformat(), + "record_time": date.isoformat(timespec="milliseconds"), "sensor_id": sensor_id, "value": random_value, } @@ -109,23 +113,22 @@ def main( base_dir = Path(dir) base_dir.mkdir(exist_ok=True) futures = [] - with tqdm(total=num_sensors) as pbar: - with ThreadPoolExecutor(max_workers=os.cpu_count() * 2) as executor: - for i in range(num_sensors): - sensor_id = f"{uuid4().hex[:8]}_{i}" - futures.append( - executor.submit( - generate_data, - sensor_id, - num_records, - record_interval, - start_date, - base_dir, - ) + with ThreadPoolExecutor(max_workers=os.cpu_count() * 2) as executor: + for i in range(num_sensors): + sensor_id = f"{uuid.uuid4().hex[:8]}_{i}" + futures.append( + executor.submit( + generate_data, + sensor_id, + num_records, + record_interval, + start_date, + base_dir, ) + ) - for _ in as_completed(futures): - pbar.update(1) + for _ in as_completed(futures): + pass logging.info("Done")