Updated the test and readme
alex-au-922 committed Dec 3, 2023
1 parent 60a4a6f commit dbe4a10
Showing 27 changed files with 403 additions and 268 deletions.
20 changes: 12 additions & 8 deletions .env
@@ -1,40 +1,44 @@
POSTGRES_VERSION_TAG=15.3-alpine3.17
TZ=Asia/Hong_Kong

POSTGRES_VERSION_TAG=13
POSTGRES_PORT=5432
POSTGRES_USERNAME=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_DATABASE=records
POSTGRES_BATCH_UPSERT_SIZE=1000
POSTGRES_BATCH_UPSERT_SIZE=5000
POSTGRES_VOLUME_DIR=./postgres-data

RABBITMQ_VERSION_TAG=3.12.10-management
RABBITMQ_USERNAME=rabbitmq
RABBITMQ_PASSWORD=rabbitmq
RABBITMQ_PORT=5672
RABBITMQ_WEBAPP_PORT=15672
RABBITMQ_POLLING_TIMEOUT=60
RABBITMQ_SOCKET_TIMEOUT=86400
RABBITMQ_VOLUME_DIR=./rabbitmq-data

RABBITMQ_QUEUE_NAME=filenames

AMAZON_LINUX_VERSION_TAG=2023.2.20231113.0

TARGET_FILE_DIR=./records
TARGET_FILE_DIR=./records_test
TARGET_FILE_EXTENSION=.csv

PRODUCER_LOG_LEVEL=INFO
PRODUCER_LOG_FORMAT="[%(asctime)s | %(levelname)s | %(name)s] {%(filename)s:%(lineno)d} >> %(message)s"
PRODUCER_LOG_FORMAT="[%(asctime)s | %(levelname)s] {%(filename)s:%(lineno)d} >> %(message)s"
PRODUCER_LOG_DATE_FORMAT="%Y-%m-%d %H:%M:%S"
PRODUCER_LOG_DIR=./logs/producer
PRODUCER_LOG_RETENTION=7
PRODUCER_LOG_ROTATION=midnight

CONSUMER_LOG_LEVEL=INFO
CONSUMER_LOG_FORMAT="[%(asctime)s | %(levelname)s | %(name)s] {%(filename)s:%(lineno)d} >> %(message)s"
CONSUMER_LOG_FORMAT="[%(asctime)s | %(levelname)s] {%(filename)s:%(lineno)d} >> %(message)s"
CONSUMER_LOG_DATE_FORMAT="%Y-%m-%d %H:%M:%S"
CONSUMER_LOG_DIR=./logs/producer
CONSUMER_LOG_DIR=./logs/consumer
CONSUMER_LOG_RETENTION=7
CONSUMER_LOG_ROTATION=midnight
CONSUMER_REPLICAS=2
CONSUMER_REPLICAS=16

CSV_PARSER_RECOGNIZED_DATETIME_FORMATS="%Y-%m-%dT%H:%M:%S%z"
CSV_PARSER_DELIMITER=","
CSV_PARSER_FILE_EXTENSION=.csv

7 changes: 4 additions & 3 deletions .github/workflows/test.yml
@@ -19,7 +19,7 @@ jobs:
rabbitmq-username: ${{ steps.load-dotenv.outputs.RABBITMQ_USERNAME }}
rabbitmq-password: ${{ steps.load-dotenv.outputs.RABBITMQ_PASSWORD }}
rabbitmq-queue-name: ${{ steps.load-dotenv.outputs.RABBITMQ_QUEUE_NAME }}
csv-parser-recognized-datetime-formats: ${{ steps.load-dotenv.outputs.CSV_PARSER_RECOGNIZED_DATETIME_FORMATS }}
rabbitmq-socket-timeout: ${{ steps.load-dotenv.outputs.RABBITMQ_SOCKET_TIMEOUT }}
csv-parser-delimiter: ${{ steps.load-dotenv.outputs.CSV_PARSER_DELIMITER }}
csv-parser-file-extension: ${{ steps.load-dotenv.outputs.CSV_PARSER_FILE_EXTENSION }}
steps:
@@ -43,7 +43,7 @@ jobs:
echo "RABBITMQ_USERNAME=$RABBITMQ_USERNAME" >> $GITHUB_OUTPUT
echo "RABBITMQ_PASSWORD=$RABBITMQ_PASSWORD" >> $GITHUB_OUTPUT
echo "RABBITMQ_QUEUE_NAME=$RABBITMQ_QUEUE_NAME" >> $GITHUB_OUTPUT
echo "CSV_PARSER_RECOGNIZED_DATETIME_FORMATS=$CSV_PARSER_RECOGNIZED_DATETIME_FORMATS" >> $GITHUB_OUTPUT
echo "RABBITMQ_SOCKET_TIMEOUT=$RABBITMQ_SOCKET_TIMEOUT" >> $GITHUB_OUTPUT
echo "CSV_PARSER_DELIMITER=$CSV_PARSER_DELIMITER" >> $GITHUB_OUTPUT
echo "CSV_PARSER_FILE_EXTENSION=$CSV_PARSER_FILE_EXTENSION" >> $GITHUB_OUTPUT
test-producer:
@@ -101,6 +101,7 @@ jobs:
RABBITMQ_USERNAME: ${{ needs.load-dotenv.outputs.rabbitmq-username }}
RABBITMQ_PASSWORD: ${{ needs.load-dotenv.outputs.rabbitmq-password }}
RABBITMQ_QUEUE_NAME: ${{ needs.load-dotenv.outputs.rabbitmq-queue-name }}
RABBITMQ_SOCKET_TIMEOUT: ${{ needs.load-dotenv.outputs.rabbitmq-socket-timeout }}
TARGET_FILE_DIR: ${{ needs.load-dotenv.outputs.target-file-dir }}
TARGET_FILE_EXTENSION: ${{ needs.load-dotenv.outputs.target-file-extension }}
- name: Output coverage file
@@ -181,7 +182,7 @@ jobs:
RABBITMQ_USERNAME: ${{ needs.load-dotenv.outputs.rabbitmq-username }}
RABBITMQ_PASSWORD: ${{ needs.load-dotenv.outputs.rabbitmq-password }}
RABBITMQ_QUEUE_NAME: ${{ needs.load-dotenv.outputs.rabbitmq-queue-name }}
CSV_PARSER_RECOGNIZED_DATETIME_FORMATS: ${{ needs.load-dotenv.outputs.csv-parser-recognized-datetime-formats }}
RABBITMQ_SOCKET_TIMEOUT: ${{ needs.load-dotenv.outputs.rabbitmq-socket-timeout }}
CSV_PARSER_DELIMITER: ${{ needs.load-dotenv.outputs.csv-parser-delimiter }}
CSV_PARSER_FILE_EXTENSION: ${{ needs.load-dotenv.outputs.csv-parser-file-extension }}
- name: Output coverage file
5 changes: 5 additions & 0 deletions .gitignore
@@ -2,3 +2,8 @@
__pycache__
.mypy_cache
records
records_test
logs
postgres-data
postgres-logs
rabbitmq-data
17 changes: 14 additions & 3 deletions Makefile
@@ -7,7 +7,7 @@ build:
docker compose pull --ignore-buildable
docker compose build
up:
docker compose up -d --wait && docker compose logs -f --tail 100 records_producer records_consumer
docker compose up -d && docker compose logs -f --tail 100 records_producer records_consumer
logs:
docker compose logs -f --tail 100 records_producer records_consumer
down:
@@ -16,6 +16,8 @@ stats:
docker stats --format "table {{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}\t{{.NetIO}}\t{{.BlockIO}}\t{{.PIDs}}"
setup_test_env:
docker compose -f docker-compose.test.yml up -d
teardown_test_env:
docker compose -f docker-compose.test.yml down
test_producer:
export POSTGRES_HOST=$(POSTGRES_HOST) && \
export POSTGRES_PORT=$(POSTGRES_PORT) && \
@@ -27,7 +29,7 @@ test_producer:
export RABBITMQ_USERNAME=$(RABBITMQ_USERNAME) && \
export RABBITMQ_PASSWORD=$(RABBITMQ_PASSWORD) && \
export QUEUE_NAME=$(QUEUE_NAME) && \
COVERAGE_FILE=.coverage_producer coverage run -m pytest -vx producer/tests
COVERAGE_FILE=.coverage_producer coverage run -m pytest -vx --last-failed producer/tests
test_consumer:
export POSTGRES_HOST=$(POSTGRES_HOST) && \
export POSTGRES_PORT=$(POSTGRES_PORT) && \
@@ -47,11 +49,20 @@ coverage_report:
coverage report -m --omit="*/tests/*"
test: test_producer test_consumer coverage_report

generate_csv:
generate_csv_demo:
python test_generator.py \
--num-sensors $(GEN_NUM_SENSORS) \
--num-records $(GEN_NUM_RECORDS) \
--record-interval $(GEN_RECORD_INTERVAL) \
--start-date $(GEN_START_DATE) \
--timezone $(GEN_TIMEZONE) \
--dir $(TARGET_FILE_DIR)

generate_csv_end_to_end_test:
python test_generator.py \
--num-sensors 10 \
--num-records 5 \
--record-interval 1 \
--start-date 2021-01-01 \
--timezone Asia/Shanghai \
--dir records_test
180 changes: 172 additions & 8 deletions README.md
@@ -1,21 +1,69 @@
# producer_consumer_csv
# Producer Consumer CSV

![Build Status](https://github.com/github/docs/actions/workflows/test.yml/badge.svg)
![Code Coverage](./coverage.svg)

## Description
This is a simple producer-consumer application that reads CSV files and writes the data to a database. The application is written in Python.

## Architecture
The application is composed of the following components:
- Producer (reads the CSV files and sends the data to the message queue)
- Consumer (reads the data from the message queue and writes it to the database)
- Persistent Message Queue (RabbitMQ)
- Database (Postgres)

All components run in Docker containers. The producer and consumer run in separate containers, scaled with Docker Compose, to simulate a real-world scenario where the producer and consumer run on different servers.

For performance, you can scale the number of consumer containers by changing the `CONSUMER_REPLICAS` variable in the .env file. The default value is 16.
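
The producer and consumer modules themselves are not part of this diff. As a minimal sketch of the producer role described above, the snippet below reads rows from the CSV files in `TARGET_FILE_DIR` and publishes them to a durable RabbitMQ queue as persistent messages. It assumes the `pika` client and the variable names from `.env`; the real implementation may well publish file names (the queue is named `filenames`) or batches instead of single rows:

```python
import csv
import json
import os
from pathlib import Path

import pika  # assumption: RabbitMQ is accessed via the pika client


def publish_csv(file_path: str) -> None:
    """Read one CSV file and publish each row as a persistent JSON message."""
    credentials = pika.PlainCredentials(
        os.environ.get("RABBITMQ_USERNAME", "rabbitmq"),
        os.environ.get("RABBITMQ_PASSWORD", "rabbitmq"),
    )
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host=os.environ.get("RABBITMQ_HOST", "localhost"),  # RABBITMQ_HOST is assumed, not in .env
            port=int(os.environ.get("RABBITMQ_PORT", 5672)),
            credentials=credentials,
        )
    )
    channel = connection.channel()

    # Durable queue + persistent messages, matching the queue settings described below.
    queue_name = os.environ.get("RABBITMQ_QUEUE_NAME", "filenames")
    channel.queue_declare(queue=queue_name, durable=True)

    with open(file_path, newline="") as f:
        for row in csv.DictReader(f):
            channel.basic_publish(
                exchange="",
                routing_key=queue_name,
                body=json.dumps(row),
                properties=pika.BasicProperties(delivery_mode=2),  # persistent
            )
    connection.close()


if __name__ == "__main__":
    target_dir = Path(os.environ.get("TARGET_FILE_DIR", "./records"))
    for csv_file in sorted(target_dir.glob("*.csv")):
        publish_csv(str(csv_file))
```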

### Requirements
- Python 3.11
- Docker
- Make

### Database Schema
The initialization script is located in the `database/assets` folder. It creates the following table:

#### records
|column|type|description|
|------|----|-----------|
|record_time|timestamp with time zone|The time when the record was generated|
|sensor_id|text|The id of the sensor|
|value|double precision|The value of the sensor|

### Database Indexes
|index|columns|
|-----|-------|
|PK|sensor_id, record_time|
|BRIN|record_time|
|HASH|sensor_id|

A BRIN index is used for the `record_time` column because records arrive in roughly sequential order (time-series data). A HASH index is used for the `sensor_id` column because it is usually queried with equality predicates rather than range predicates, and a HASH index is more efficient than a BTREE index for equality lookups.
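
The initialization script itself is not included in this commit, so the following is only a plausible sketch of the DDL it might contain, based on the schema and indexes above (index names are illustrative). It is shown as a small `psycopg2` script, since the application is written in Python:

```python
import os

import psycopg2  # assumption: Postgres is accessed via psycopg2

DDL = """
CREATE TABLE IF NOT EXISTS records (
    record_time TIMESTAMP WITH TIME ZONE NOT NULL,
    sensor_id   TEXT NOT NULL,
    value       DOUBLE PRECISION,
    PRIMARY KEY (sensor_id, record_time)
);

-- BRIN suits append-only, time-ordered data; HASH suits equality lookups on sensor_id.
CREATE INDEX IF NOT EXISTS records_record_time_brin ON records USING BRIN (record_time);
CREATE INDEX IF NOT EXISTS records_sensor_id_hash   ON records USING HASH (sensor_id);
"""

if __name__ == "__main__":
    with psycopg2.connect(
        host=os.environ.get("POSTGRES_HOST", "localhost"),
        port=int(os.environ.get("POSTGRES_PORT", 5432)),
        user=os.environ.get("POSTGRES_USERNAME", "postgres"),
        password=os.environ.get("POSTGRES_PASSWORD", "postgres"),
        dbname=os.environ.get("POSTGRES_DATABASE", "records"),
    ) as conn:
        with conn.cursor() as cur:
            cur.execute(DDL)
```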

### Queue
The queue is implemented with RabbitMQ. The queue is declared as durable and messages are published as persistent, so queued messages survive a broker restart.

However, due to the complexity of the application, this project prefers the `get` operation over the `consume` operation, which effectively simulates a short-polling queue.
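
As a rough illustration of such a short-polling consumer (not the repository's actual consumer code), the loop below uses `pika`'s `basic_get`, collects rows up to a configurable batch size in the spirit of `POSTGRES_BATCH_UPSERT_SIZE`, upserts them into the `records` table, and acknowledges messages only after the database commit. It assumes each message body is one CSV row encoded as JSON, which is an assumption rather than something this diff shows:

```python
import json
import os
import time

import pika
import psycopg2
from psycopg2.extras import execute_values

# Upsert keyed on the (sensor_id, record_time) primary key described above.
UPSERT_SQL = """
INSERT INTO records (record_time, sensor_id, value)
VALUES %s
ON CONFLICT (sensor_id, record_time) DO UPDATE SET value = EXCLUDED.value
"""


def poll_queue(channel, conn, batch_size=1000, idle_sleep=1.0):
    """Short-poll with basic_get, upsert in batches, ack only after commit."""
    queue_name = os.environ.get("RABBITMQ_QUEUE_NAME", "filenames")
    batch, tags = [], []
    while True:
        method, _properties, body = channel.basic_get(queue=queue_name, auto_ack=False)
        if method is None:       # queue empty right now -> short-polling pause
            time.sleep(idle_sleep)
            continue
        row = json.loads(body)   # assumption: one CSV row per message, as JSON
        batch.append((row["record_time"], row["sensor_id"], float(row["value"])))
        tags.append(method.delivery_tag)
        if len(batch) >= batch_size:
            with conn.cursor() as cur:
                execute_values(cur, UPSERT_SQL, batch)
            conn.commit()
            # Ack after the commit: a crash in between only causes redelivery,
            # and the upsert makes redelivered rows harmless.
            for tag in tags:
                channel.basic_ack(delivery_tag=tag)
            batch, tags = [], []
        # A real consumer would also flush a partial batch after an idle period.
```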

## Test Data
The test data is generated using the `generate_csv_demo` command. The command will generate a csv file with the following columns:
- `record_time`: The time when the record was generated
- `sensor_id`: The id of the sensor
- `value`: The value of the sensor

You can check the section [Running the application](#running-the-application) for more details on how to generate the csv file.
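
`test_generator.py` is not included in this diff; the sketch below only illustrates how files with these columns could be produced (the function name and per-sensor file layout are assumptions). The sensor ids mimic the `<8 hex chars>_<index>` pattern that appears in the end-to-end test output further down:

```python
import csv
import random
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo


def generate_csv_files(num_sensors, num_records, record_interval_s,
                       start_date, timezone, out_dir):
    """Write one CSV per sensor with record_time, sensor_id and value columns."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    start = datetime.fromisoformat(start_date).replace(tzinfo=ZoneInfo(timezone))
    for i in range(num_sensors):
        sensor_id = f"{uuid.uuid4().hex[:8]}_{i}"
        with open(out / f"{sensor_id}.csv", "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["record_time", "sensor_id", "value"])
            for n in range(num_records):
                ts = start + timedelta(seconds=n * record_interval_s)
                writer.writerow([ts.isoformat(), sensor_id, random.random()])


if __name__ == "__main__":
    # Mirrors `make generate_csv_end_to_end_test`: 10 sensors, 5 records, 1 s apart.
    generate_csv_files(10, 5, 1, "2021-01-01", "Asia/Shanghai", "records_test")
```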

## Installation
1. Clone the repository
2. Install make and docker
- For Ubuntu and Debian
```bash
sudo apt install make
$ sudo apt install make
```
- For Fedora and CentOS
```bash
sudo yum install make
$ sudo yum install make
```

For Docker installation, please refer to the [official documentation](https://docs.docker.com/engine/install/)
@@ -35,17 +83,133 @@ Please make sure you don't change the name of the variables as they are used in

### Running the application

1. Run the following command to build the docker image
1. First run the `generate_csv_demo` command to generate a csv file. You can change the following parameters in the .env:
- `GEN_NUM_SENSORS`
- `GEN_NUM_RECORDS`
- `GEN_START_DATE`
- `GEN_RECORD_INTERVAL`
- `GEN_TIMEZONE`

```bash
$ make generate_csv_demo
```

2. Run the following command to build the docker image
```bash
$ make build
```
2. Run the following command to start the docker compose stack

3. Run the following command to start the docker compose stack. This will start the Postgres database, RabbitMQ, the producer, and the consumer replicas. You can change the following parameter in the .env:
- `CONSUMER_REPLICAS`
```bash
$ make up / make up_d
$ make up
```
The `up_d` command will run the container in detached mode.

3. Run the following command to stop the docker compose stack
4. Run the following command to stop the docker compose stack
```bash
$ make down
```

### End to end test
While the unit tests are run as part of the build process, you can run the end-to-end test with the following steps:

1. Run the make command `generate_csv_end_to_end_test`. It will generate data for 10 sensors, with 5 records each, in the `records_test` folder.
```bash
$ make generate_csv_end_to_end_test
```

2. Run the following command to build the docker image
```bash
$ make build
```

3. Run the following command to start the docker compose stack. This will start the Postgres database, RabbitMQ, the producer, and the consumer replicas. You can change the following parameter in the .env:
- `CONSUMER_REPLICAS`

```bash
$ make up
```

4. Query the database to check that the data has been written, and compare the result with the expected records below:
```sql
SELECT *
FROM records
ORDER BY sensor_id ASC, record_time ASC
```

|record_time|sensor_id|value|
|-----------|---------|-----|
|2021-01-01 00:00:00.000 +0800|17fc695a_4|0.9100387476052705|
|2021-01-01 00:00:01.000 +0800|17fc695a_4|0.9470819312177097|
|2021-01-01 00:00:02.000 +0800|17fc695a_4|0.9646317173285254|
|2021-01-01 00:00:03.000 +0800|17fc695a_4|0.5588283283219546|
|2021-01-01 00:00:04.000 +0800|17fc695a_4|0.10032294940781161|
|2021-01-01 00:00:00.000 +0800|23b8c1e9_1|0.17833466762332717|
|2021-01-01 00:00:01.000 +0800|23b8c1e9_1|0.5828395773770179|
|2021-01-01 00:00:02.000 +0800|23b8c1e9_1|0.6709222475097419|
|2021-01-01 00:00:03.000 +0800|23b8c1e9_1|0.08392094150600504|
|2021-01-01 00:00:04.000 +0800|23b8c1e9_1|0.519270757199653|
|2021-01-01 00:00:00.000 +0800|47378190_8|0.8730491223149253|
|2021-01-01 00:00:01.000 +0800|47378190_8|0.9269235181749119|
|2021-01-01 00:00:02.000 +0800|47378190_8|0.7912797041193453|
|2021-01-01 00:00:03.000 +0800|47378190_8|0.7901636441724763|
|2021-01-01 00:00:04.000 +0800|47378190_8|0.7886736978911509|
|2021-01-01 00:00:00.000 +0800|6b65a6a4_7|0.10293554590959142|
|2021-01-01 00:00:01.000 +0800|6b65a6a4_7|0.2888706613682428|
|2021-01-01 00:00:02.000 +0800|6b65a6a4_7|0.4279942939571587|
|2021-01-01 00:00:03.000 +0800|6b65a6a4_7|0.23512685053378612|
|2021-01-01 00:00:04.000 +0800|6b65a6a4_7|0.5272935984703412|
|2021-01-01 00:00:00.000 +0800|972a8469_3|0.7642357069109641|
|2021-01-01 00:00:01.000 +0800|972a8469_3|0.5701299072914774|
|2021-01-01 00:00:02.000 +0800|972a8469_3|0.17473379247794074|
|2021-01-01 00:00:03.000 +0800|972a8469_3|0.12464021515158785|
|2021-01-01 00:00:04.000 +0800|972a8469_3|0.5390567336729636|
|2021-01-01 00:00:00.000 +0800|9a1de644_5|0.3758090093767995|
|2021-01-01 00:00:01.000 +0800|9a1de644_5|0.33553000407688316|
|2021-01-01 00:00:02.000 +0800|9a1de644_5|0.9667728274172214|
|2021-01-01 00:00:03.000 +0800|9a1de644_5|0.9549845776369301|
|2021-01-01 00:00:04.000 +0800|9a1de644_5|0.7740952070735415|
|2021-01-01 00:00:00.000 +0800|b74d0fb1_6|0.3213794858378719|
|2021-01-01 00:00:01.000 +0800|b74d0fb1_6|0.5947556423536645|
|2021-01-01 00:00:02.000 +0800|b74d0fb1_6|0.8872919823927438|
|2021-01-01 00:00:03.000 +0800|b74d0fb1_6|0.28297514015876457|
|2021-01-01 00:00:04.000 +0800|b74d0fb1_6|0.6590113969392454|
|2021-01-01 00:00:00.000 +0800|bd9c66b3_2|0.36466072100083013|
|2021-01-01 00:00:01.000 +0800|bd9c66b3_2|0.8408935901254108|
|2021-01-01 00:00:02.000 +0800|bd9c66b3_2|0.8945802964470245|
|2021-01-01 00:00:03.000 +0800|bd9c66b3_2|0.027150264273096747|
|2021-01-01 00:00:04.000 +0800|bd9c66b3_2|0.9236042897439161|
|2021-01-01 00:00:00.000 +0800|bdd640fb_0|0.0746765216767864|
|2021-01-01 00:00:01.000 +0800|bdd640fb_0|0.8404332126798344|
|2021-01-01 00:00:02.000 +0800|bdd640fb_0|0.31870553433981874|
|2021-01-01 00:00:03.000 +0800|bdd640fb_0|0.825033074919654|
|2021-01-01 00:00:04.000 +0800|bdd640fb_0|0.7161990766355211|
|2021-01-01 00:00:00.000 +0800|c241330b_9|0.6940489142492581|
|2021-01-01 00:00:01.000 +0800|c241330b_9|0.7748088833830469|
|2021-01-01 00:00:02.000 +0800|c241330b_9|0.85280342321841|
|2021-01-01 00:00:03.000 +0800|c241330b_9|0.32443698906841056|
|2021-01-01 00:00:04.000 +0800|c241330b_9|0.4457555011219805|


## Unit tests
The unit tests are run as part of the CI pipeline. You can run the unit tests locally by running the following steps:

1. Run the make `setup_test_env` command to set up the test environment
```bash
$ make setup_test_env
```

2. Install the following pip packages:
```bash
$ pip install -r producer/requirements-dev.txt
$ pip install -r consumer/requirements-dev.txt
```

3. Run the following command to run the unit tests
```bash
$ make test
```

The test target runs both the producer and consumer unit tests. The coverage report will be generated in the `.coverage` file.