From d3dd2d0961474383b22ada9b41556b534d8c73fe Mon Sep 17 00:00:00 2001
From: Taha Yassine Kraiem
Date: Wed, 13 Dec 2023 18:05:28 +0100
Subject: [PATCH] refactor(crons): changed assist_events_aggregates_cron to
 have only 1 execution every hour

refactor(crons): optimized assist_events_aggregates_cron to use only 1 DB cursor for successive queries
---
 ee/api/app.py                          |  2 +-
 ee/api/chalicelib/core/assist_stats.py | 12 ++++++------
 ee/api/crons/ee_crons.py               | 16 ++++++++++++----
 3 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/ee/api/app.py b/ee/api/app.py
index 1cbc89cf49..383927f259 100644
--- a/ee/api/app.py
+++ b/ee/api/app.py
@@ -52,7 +52,7 @@ async def lifespan(app: FastAPI):
     await events_queue.init()
 
     app.schedule.start()
-    for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs + traces.cron_jobs + ee_crons.ee_cron_jobs:
+    for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs + traces.cron_jobs + ee_crons.cron_jobs:
         app.schedule.add_job(id=job["func"].__name__, **job)
 
     ap_logger.info(">Scheduled jobs:")
diff --git a/ee/api/chalicelib/core/assist_stats.py b/ee/api/chalicelib/core/assist_stats.py
index 1f837ac58f..1195dc8847 100644
--- a/ee/api/chalicelib/core/assist_stats.py
+++ b/ee/api/chalicelib/core/assist_stats.py
@@ -1,11 +1,12 @@
 import logging
-from datetime import datetime
 
 from fastapi import HTTPException
 
 from chalicelib.utils import pg_client, helper
+from chalicelib.utils.TimeUTC import TimeUTC
 from schemas import AssistStatsSessionsRequest, AssistStatsSessionsResponse, AssistStatsTopMembersResponse
 
+logger = logging.getLogger(__name__)
 event_type_mapping = {
     "sessionsAssisted": "assist",
     "assistDuration": "assist",
@@ -17,12 +18,12 @@
 def insert_aggregated_data():
     try:
         logging.info("Assist Stats: Inserting aggregated data")
-        end_timestamp = int(datetime.timestamp(datetime.now())) * 1000
+        end_timestamp = TimeUTC.now()
         start_timestamp = __last_run_end_timestamp_from_aggregates()
 
         if start_timestamp is None:  # first run
             logging.info("Assist Stats: First run, inserting data for last 7 days")
-            start_timestamp = end_timestamp - (7 * 24 * 60 * 60 * 1000)
+            start_timestamp = end_timestamp - TimeUTC.MS_WEEK
 
         offset = 0
         chunk_size = 1000
@@ -103,9 +104,8 @@ def __last_run_end_timestamp_from_aggregates():
         result = cur.fetchone()
         last_run_time = result['last_run_time'] if result else None
 
-    if last_run_time is None:  # first run handle all data
-        sql = "SELECT MIN(timestamp) as last_timestamp FROM assist_events;"
-        with pg_client.PostgresClient() as cur:
+        if last_run_time is None:  # first run handle all data
+            sql = "SELECT MIN(timestamp) as last_timestamp FROM assist_events;"
             cur.execute(sql)
             result = cur.fetchone()
             last_run_time = result['last_timestamp'] if result else None
diff --git a/ee/api/crons/ee_crons.py b/ee/api/crons/ee_crons.py
index 9a4d165e42..cbf3b6925b 100644
--- a/ee/api/crons/ee_crons.py
+++ b/ee/api/crons/ee_crons.py
@@ -3,6 +3,8 @@
 from chalicelib.utils import events_queue
 from chalicelib.core import assist_stats
 
+from decouple import config
+
 
 async def pg_events_queue() -> None:
     events_queue.global_queue.force_flush()
@@ -12,8 +14,14 @@ async def assist_events_aggregates_cron() -> None:
     assist_stats.insert_aggregated_data()
 
 
-ee_cron_jobs = [
-    {"func": pg_events_queue, "trigger": IntervalTrigger(minutes=5), "misfire_grace_time": 20, "max_instances": 1},
-    {"func": assist_events_aggregates_cron,
-     "trigger": IntervalTrigger(hours=1, start_date="2023-04-01 0:0:0", jitter=10), }
+# SINGLE_CRONS are crons that will be run by the crons-service; they are singleton crons
+SINGLE_CRONS = [{"func": assist_events_aggregates_cron,
+                 "trigger": IntervalTrigger(hours=1, start_date="2023-04-01 0:0:0", jitter=10)}]
+
+# cron_jobs is the list of crons to run in the main API service (so there will be as many runs as there are API instances)
+cron_jobs = [
+    {"func": pg_events_queue, "trigger": IntervalTrigger(minutes=5), "misfire_grace_time": 20, "max_instances": 1}
 ]
+
+if config("LOCAL_CRONS", default=False, cast=bool):
+    cron_jobs += SINGLE_CRONS
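
For reference, a minimal self-contained sketch of the "one DB cursor for successive
queries" pattern that this commit applies inside __last_run_end_timestamp_from_aggregates.
It uses sqlite3 purely as a stand-in for the project's pg_client.PostgresClient, and the
table/column names (assist_events_aggregates, timestamp_end) are illustrative assumptions
rather than the real schema.

    import sqlite3

    def last_run_end_timestamp(conn: sqlite3.Connection):
        # One cursor serves both queries; the fallback query only runs on the very first execution.
        cur = conn.cursor()
        cur.execute("SELECT MAX(timestamp_end) FROM assist_events_aggregates")
        row = cur.fetchone()
        last_run_time = row[0] if row else None

        if last_run_time is None:  # first run: fall back to the earliest raw event
            # Reuse the same cursor instead of opening a second client, as the old code did.
            cur.execute("SELECT MIN(timestamp) FROM assist_events")
            row = cur.fetchone()
            last_run_time = row[0] if row else None
        return last_run_time

In the patch itself the same effect comes from nesting the first-run fallback inside the
already-open pg_client.PostgresClient() block, so only one cursor is created per run.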