
Commit

updated the test
alex-au-922 committed Dec 3, 2023
1 parent d3dcfdf commit 60a4a6f
Showing 11 changed files with 80 additions and 50 deletions.
2 changes: 1 addition & 1 deletion .env
@@ -34,7 +34,7 @@ CONSUMER_LOG_RETENTION=7
CONSUMER_LOG_ROTATION=midnight
CONSUMER_REPLICAS=2

-CSV_PARSER_RECOGNIZED_DATETIME_FORMATS="%Y-%m-%dT%H:%M:%S.%f%z"
+CSV_PARSER_RECOGNIZED_DATETIME_FORMATS="%Y-%m-%dT%H:%M:%S%z"
CSV_PARSER_DELIMITER=","
CSV_PARSER_FILE_EXTENSION=.csv

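The tightened format means the consumer now recognizes only whole-second ISO-8601 timestamps. A minimal sketch of the difference using Python's strptime (the timestamp strings are illustrative, not taken from the repository's data):

    from datetime import datetime

    FMT = "%Y-%m-%dT%H:%M:%S%z"  # the new recognized format: no fractional seconds

    # Whole-second timestamps parse fine.
    datetime.strptime("2023-12-03T10:15:30+0000", FMT)

    # Fractional seconds no longer match; they would need the old
    # "%Y-%m-%dT%H:%M:%S.%f%z" pattern.
    # datetime.strptime("2023-12-03T10:15:30.123456+0000", FMT)  # raises ValueError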
6 changes: 3 additions & 3 deletions Makefile
@@ -7,9 +7,9 @@ build:
        docker compose pull --ignore-buildable
        docker compose build
up:
-        docker compose up
-up_d:
-        docker compose up -d
+        docker compose up -d --wait && docker compose logs -f --tail 100 records_producer records_consumer
+logs:
+        docker compose logs -f --tail 100 records_producer records_consumer
down:
        docker compose down
stats:
3 changes: 3 additions & 0 deletions consumer/.dockerignore
@@ -0,0 +1,3 @@
+__pycache__
+.pytest_cache
+.mypy_cache
1 change: 1 addition & 0 deletions consumer/src/adapters/file_parse_iot_records/csv.py
@@ -52,6 +52,7 @@ def parse_stream(self, filename: str) -> Iterator[IOTRecord]:
            self._basic_file_check(filename)
            with open(filename) as csvfile:
                reader = csv.reader(csvfile, delimiter=self._delimiter, strict=True)
+                next(reader)  # skip header
                yield from self._parse_iter(reader)
        except OSError as e:
            logging.exception(e)
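Since parse_stream is a generator, a bare next(reader) on an empty file raises StopIteration, which PEP 479 turns into a RuntimeError inside generators. A slightly more defensive variant of the header skip (a sketch against the names visible in this hunk, not the committed code):

    reader = csv.reader(csvfile, delimiter=self._delimiter, strict=True)
    # next() with a default never raises; None signals an empty file
    if next(reader, None) is None:
        return
    yield from self._parse_iter(reader)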
@@ -80,7 +80,7 @@ def random_invalid_value_rows() -> list[tuple[str, ...]]:
def random_invalid_datetime_and_value_rows() -> list[tuple[str, ...]]:
    rows = []
    all_datetime_formats = [
-        "%Y-%m-%dT%H:%M:%S%z",
+        "%Y-%m-%dT%H:%M:%S.%f%z",
        "%Y-%m-%dT%H:%M%z",
        "%Y-%m-%d %H:%M:%S%z",
        "%Y-%m-%d %H:%M%z",
@@ -110,7 +110,10 @@ def random_csv_file(base_dir: Path, rows: list[tuple[str, ...]]) -> str:
    filename = "".join(random.choices(string.ascii_letters, k=10)) + ".csv"
    filepath = base_dir.joinpath(filename)
    with open(filepath, "w") as csvfile:
-        writer = csv.writer(csvfile, delimiter=",")
+        writer = csv.DictWriter(
+            csvfile, delimiter=",", fieldnames=["record_time", "sensor_id", "value"]
+        )
+        writer.writeheader()
        writer.writerows(rows)
    return str(filepath)

@@ -119,7 +122,10 @@ def random_tsv_file(base_dir: Path, rows: list[tuple[str, ...]]) -> str:
    filename = "".join(random.choices(string.ascii_letters, k=10)) + ".tsv"
    filepath = base_dir.joinpath(filename)
    with open(filepath, "w") as csvfile:
-        writer = csv.writer(csvfile, delimiter="\t")
+        writer = csv.DictWriter(
+            csvfile, delimiter="\t", fieldnames=["record_time", "sensor_id", "value"]
+        )
+        writer.writeheader()
        writer.writerows(rows)
    return str(filepath)

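One caveat with switching these fixtures to csv.DictWriter: its writerow/writerows methods expect mappings, and the fixture signatures declare rows as list[tuple[str, ...]], so writer.writerows(rows) on plain tuples would raise AttributeError ('tuple' object has no attribute 'keys') at write time. A sketch of converting the tuples first, assuming the three-column layout above (filepath and rows stand in for the fixture's locals):

    import csv

    fieldnames = ["record_time", "sensor_id", "value"]
    with open(filepath, "w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, delimiter=",", fieldnames=fieldnames)
        writer.writeheader()
        # DictWriter rows must be mappings, so zip each tuple with the fieldnames
        writer.writerows(dict(zip(fieldnames, row)) for row in rows)

(Opening with newline="" is the csv module's documented way to avoid stray blank lines on some platforms.)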
3 changes: 3 additions & 0 deletions database/.dockerignore
@@ -0,0 +1,3 @@
+__pycache__
+.pytest_cache
+.mypy_cache
15 changes: 12 additions & 3 deletions docker-compose.yml
@@ -56,11 +56,15 @@ services:
      LOG_DIR: ${PRODUCER_LOG_DIR}
      LOG_RETENTION: ${PRODUCER_LOG_RETENTION}
      LOG_ROTATION: ${PRODUCER_LOG_ROTATION}
-      RABBITMQ_HOST: records_rabbitmq
+      RABBITMQ_HOST: localhost
      RABBITMQ_PORT: 5672
      RABBITMQ_USERNAME: ${RABBITMQ_USERNAME}
      RABBITMQ_PASSWORD: ${RABBITMQ_PASSWORD}
      RABBITMQ_QUEUE_NAME: ${RABBITMQ_QUEUE_NAME}
+    network_mode: host
+    volumes:
+      - ./${TARGET_FILE_DIR}:/home/app/${TARGET_FILE_DIR}:ro
+      # - ./${PRODUCER_LOG_DIR}:/home/app/${PRODUCER_LOG_DIR}
    depends_on:
      records_postgres:
        condition: service_healthy
@@ -81,12 +85,13 @@ services:
      LOG_DIR: ${CONSUMER_LOG_DIR}
      LOG_RETENTION: ${CONSUMER_LOG_RETENTION}
      LOG_ROTATION: ${CONSUMER_LOG_ROTATION}
-      RABBITMQ_HOST: records_rabbitmq
+      RABBITMQ_HOST: localhost
      RABBITMQ_PORT: 5672
      RABBITMQ_USERNAME: ${RABBITMQ_USERNAME}
      RABBITMQ_PASSWORD: ${RABBITMQ_PASSWORD}
      RABBITMQ_QUEUE_NAME: ${RABBITMQ_QUEUE_NAME}
-      POSTGRES_HOST: records_postgres
+      RABBITMQ_POLLING_TIMEOUT: ${RABBITMQ_POLLING_TIMEOUT}
+      POSTGRES_HOST: localhost
      POSTGRES_PORT: 5432
      POSTGRES_USERNAME: ${POSTGRES_USERNAME}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
@@ -95,6 +100,10 @@
      CSV_PARSER_RECOGNIZED_DATETIME_FORMATS: ${CSV_PARSER_RECOGNIZED_DATETIME_FORMATS}
      CSV_PARSER_DELIMITER: ${CSV_PARSER_DELIMITER}
      CSV_PARSER_FILE_EXTENSION: ${CSV_PARSER_FILE_EXTENSION}
+    network_mode: host
+    volumes:
+      - ./${TARGET_FILE_DIR}:/home/app/${TARGET_FILE_DIR}:ro
+      # - ./${CONSUMER_LOG_DIR}:/home/app/src/${CONSUMER_LOG_DIR}
    deploy:
      mode: replicated
      replicas: ${CONSUMER_REPLICAS}
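With network_mode: host, both services share the host's network namespace, which is why the broker and database hosts change from the compose service names (records_rabbitmq, records_postgres) to localhost; published container ports also become irrelevant, since the processes bind directly on the host. As a sketch of the connection setup these variables feed, assuming a pika-based consumer (the client library is not confirmed by this commit):

    import os

    import pika  # assumed AMQP client, not confirmed by this diff

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host=os.environ["RABBITMQ_HOST"],       # "localhost" under host networking
            port=int(os.environ["RABBITMQ_PORT"]),  # 5672
            credentials=pika.PlainCredentials(
                os.environ["RABBITMQ_USERNAME"], os.environ["RABBITMQ_PASSWORD"]
            ),
        )
    )
    channel = connection.channel()
    channel.queue_declare(queue=os.environ["RABBITMQ_QUEUE_NAME"], durable=True)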
3 changes: 3 additions & 0 deletions producer/.dockerignore
@@ -0,0 +1,3 @@
+__pycache__
+.pytest_cache
+.mypy_cache
3 changes: 2 additions & 1 deletion producer/dockerfile
@@ -28,6 +28,8 @@ RUN yum install -y \
RUN adduser app
WORKDIR /home/app

+COPY src ./src/
+
RUN chown -R app:app /home/app

USER app
@@ -36,5 +38,4 @@ WORKDIR ${HOME}

COPY --from=build /home/app/.local /home/app/.local

-COPY src ./src/
CMD python3.11 -m src.deployments.script.main
4 changes: 4 additions & 0 deletions producer/src/deployments/script/main.py
@@ -27,7 +27,10 @@ def main() -> None:

    successes_map = {}
    try:
+        logging.info("Publishing filenames...")
+
        for filename in traverse_files():
+            logging.info(f"Publishing {filename}...")
            successes_map[filename] = publish_filenames_client.publish(filename)

        failed_filenames = [
@@ -40,6 +43,7 @@
        logging.exception(e)
        raise e
    finally:
+        logging.info("Closing publish filenames client...")
        publish_filenames_client.close()


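The hunk cuts off inside the failed_filenames comprehension. Purely as a hypothetical sketch of the pattern the visible lines suggest (the repository's actual check is not shown in this diff):

    failed_filenames = [
        filename for filename, success in successes_map.items() if not success
    ]
    if failed_filenames:
        # hypothetical error handling; the real exception type/message is elided
        raise RuntimeError(f"Failed to publish: {failed_filenames}")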
78 changes: 39 additions & 39 deletions producer/src/deployments/script/setup_logging.py
@@ -20,50 +20,50 @@ def setup_logging() -> None:
    stream_handler.setLevel(LoggingConfig.LOG_LEVEL)
    handlers.append(stream_handler)

-    if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.INFO:
-        info_handler = TimedRotatingFileHandler(
-            filename=f"{LoggingConfig.LOG_DIR}/info.log",
-            when=LoggingConfig.LOG_ROTATION,
-            interval=1,
-            backupCount=LoggingConfig.LOG_RETENTION,
-        )
-        info_handler.setFormatter(
-            logging.Formatter(
-                LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT
-            )
-        )
-        info_handler.setLevel(logging.INFO)
-        handlers.append(info_handler)
+    # if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.INFO:
+    info_handler = TimedRotatingFileHandler(
+        filename=f"{LoggingConfig.LOG_DIR}/info.log",
+        when=LoggingConfig.LOG_ROTATION,
+        interval=1,
+        backupCount=LoggingConfig.LOG_RETENTION,
+    )
+    info_handler.setFormatter(
+        logging.Formatter(
+            LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT
+        )
+    )
+    info_handler.setLevel(logging.INFO)
+    handlers.append(info_handler)

-    if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.WARNING:
-        warning_handler = TimedRotatingFileHandler(
-            filename=f"{LoggingConfig.LOG_DIR}/warning.log",
-            when=LoggingConfig.LOG_ROTATION,
-            interval=1,
-            backupCount=LoggingConfig.LOG_RETENTION,
-        )
-        warning_handler.setFormatter(
-            logging.Formatter(
-                LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT
-            )
-        )
-        warning_handler.setLevel(logging.WARNING)
-        handlers.append(warning_handler)
+    # if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.WARNING:
+    warning_handler = TimedRotatingFileHandler(
+        filename=f"{LoggingConfig.LOG_DIR}/warning.log",
+        when=LoggingConfig.LOG_ROTATION,
+        interval=1,
+        backupCount=LoggingConfig.LOG_RETENTION,
+    )
+    warning_handler.setFormatter(
+        logging.Formatter(
+            LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT
+        )
+    )
+    warning_handler.setLevel(logging.WARNING)
+    handlers.append(warning_handler)

-    if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.ERROR:
-        error_handler = TimedRotatingFileHandler(
-            filename=f"{LoggingConfig.LOG_DIR}/error.log",
-            when=LoggingConfig.LOG_ROTATION,
-            interval=1,
-            backupCount=LoggingConfig.LOG_RETENTION,
-        )
-        error_handler.setFormatter(
-            logging.Formatter(
-                LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT
-            )
-        )
-        error_handler.setLevel(logging.ERROR)
-        handlers.append(error_handler)
+    # if LOG_LEVEL_INT is not None and LOG_LEVEL_INT <= logging.ERROR:
+    error_handler = TimedRotatingFileHandler(
+        filename=f"{LoggingConfig.LOG_DIR}/error.log",
+        when=LoggingConfig.LOG_ROTATION,
+        interval=1,
+        backupCount=LoggingConfig.LOG_RETENTION,
+    )
+    error_handler.setFormatter(
+        logging.Formatter(
+            LoggingConfig.LOG_FORMAT, datefmt=LoggingConfig.LOG_DATE_FORMAT
+        )
+    )
+    error_handler.setLevel(logging.ERROR)
+    handlers.append(error_handler)

    root_logger = logging.getLogger()
    root_logger.setLevel(LoggingConfig.LOG_LEVEL)
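Commenting out the LOG_LEVEL_INT guards means all three rotating file handlers are now created unconditionally, so info.log, warning.log, and error.log always exist; records are still filtered twice, first by the logger's own level and then by each handler's setLevel. A small self-contained demo of that two-gate behavior (demo names only, not the repository's code):

    import logging

    logger = logging.getLogger("demo")
    logger.setLevel(logging.WARNING)   # first gate: the logger's level

    handler = logging.StreamHandler()
    handler.setLevel(logging.INFO)     # second gate: the handler's level
    logger.addHandler(handler)

    logger.info("dropped at the logger gate; never reaches the handler")
    logger.warning("passes both gates and is emitted")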
