Skip to content

Commit

Permalink
refactor(crons): changed assist_events_aggregates_cron to have only 1 execution every hour
Browse files Browse the repository at this point in the history

refactor(crons): optimized assist_events_aggregates_cron to use only 1 DB cursor for successive queries
  • Loading branch information
tahayk committed Dec 13, 2023
1 parent 1559598 commit d3dd2d0
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 11 deletions.
2 changes: 1 addition & 1 deletion ee/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async def lifespan(app: FastAPI):
await events_queue.init()
app.schedule.start()

for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs + traces.cron_jobs + ee_crons.ee_cron_jobs:
for job in core_crons.cron_jobs + core_dynamic_crons.cron_jobs + traces.cron_jobs + ee_crons.cron_jobs:
app.schedule.add_job(id=job["func"].__name__, **job)

ap_logger.info(">Scheduled jobs:")
Expand Down
12 changes: 6 additions & 6 deletions ee/api/chalicelib/core/assist_stats.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import logging
from datetime import datetime

from fastapi import HTTPException

from chalicelib.utils import pg_client, helper
from chalicelib.utils.TimeUTC import TimeUTC
from schemas import AssistStatsSessionsRequest, AssistStatsSessionsResponse, AssistStatsTopMembersResponse

logger = logging.getLogger(__name__)
event_type_mapping = {
"sessionsAssisted": "assist",
"assistDuration": "assist",
Expand All @@ -17,12 +18,12 @@
def insert_aggregated_data():
try:
logging.info("Assist Stats: Inserting aggregated data")
end_timestamp = int(datetime.timestamp(datetime.now())) * 1000
end_timestamp = TimeUTC.now()
start_timestamp = __last_run_end_timestamp_from_aggregates()

if start_timestamp is None: # first run
logging.info("Assist Stats: First run, inserting data for last 7 days")
start_timestamp = end_timestamp - (7 * 24 * 60 * 60 * 1000)
start_timestamp = end_timestamp - TimeUTC.MS_WEEK

offset = 0
chunk_size = 1000
Expand Down Expand Up @@ -103,9 +104,8 @@ def __last_run_end_timestamp_from_aggregates():
result = cur.fetchone()
last_run_time = result['last_run_time'] if result else None

if last_run_time is None: # first run handle all data
sql = "SELECT MIN(timestamp) as last_timestamp FROM assist_events;"
with pg_client.PostgresClient() as cur:
if last_run_time is None: # first run handle all data
sql = "SELECT MIN(timestamp) as last_timestamp FROM assist_events;"
cur.execute(sql)
result = cur.fetchone()
last_run_time = result['last_timestamp'] if result else None
Expand Down
16 changes: 12 additions & 4 deletions ee/api/crons/ee_crons.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from chalicelib.utils import events_queue
from chalicelib.core import assist_stats

from decouple import config


async def pg_events_queue() -> None:
    """Cron entry point: force-flush the buffered events in the global PG queue."""
    queue = events_queue.global_queue
    queue.force_flush()
Expand All @@ -12,8 +14,14 @@ async def assist_events_aggregates_cron() -> None:
assist_stats.insert_aggregated_data()


ee_cron_jobs = [
{"func": pg_events_queue, "trigger": IntervalTrigger(minutes=5), "misfire_grace_time": 20, "max_instances": 1},
{"func": assist_events_aggregates_cron,
"trigger": IntervalTrigger(hours=1, start_date="2023-04-01 0:0:0", jitter=10), }
# SINGLE_CRONS are crons that will be run by the dedicated crons-service; they
# are singleton crons, i.e. each job executes once per interval regardless of
# how many API instances are deployed.
SINGLE_CRONS = [{"func": assist_events_aggregates_cron,
                 "trigger": IntervalTrigger(hours=1, start_date="2023-04-01 0:0:0", jitter=10)}]

# cron_jobs is the list of crons to run in the main API service, so each job
# runs as many times as there are API instances.
cron_jobs = [
    {"func": pg_events_queue, "trigger": IntervalTrigger(minutes=5), "misfire_grace_time": 20, "max_instances": 1}
]

# When LOCAL_CRONS is enabled, the singleton crons are scheduled in this API
# process as well — presumably for local/dev setups that have no separate
# crons-service; TODO confirm against deployment docs.
if config("LOCAL_CRONS", default=False, cast=bool):
    cron_jobs += SINGLE_CRONS

0 comments on commit d3dd2d0

Please sign in to comment.