Skip to content

Commit

Permalink
feat(chalice): cache autocomplete-top-10 responses
Browse files Browse the repository at this point in the history
feat(DB): support Spot login
  • Loading branch information
tahayk committed Aug 7, 2024
1 parent e940d9a commit 4bea4a6
Show file tree
Hide file tree
Showing 13 changed files with 214 additions and 4 deletions.
2 changes: 2 additions & 0 deletions api/chalicelib/core/autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from chalicelib.utils import helper
from chalicelib.utils import pg_client
from chalicelib.utils.event_filter_definition import Event
from chalicelib.utils.or_cache import CachedResponse

logger = logging.getLogger(__name__)
TABLE = "public.autocomplete"
Expand Down Expand Up @@ -375,6 +376,7 @@ def is_top_supported(event_type):
return TYPE_TO_COLUMN.get(event_type, False)


@CachedResponse(table="or_cache.autocomplete_top_values", ttl=5 * 60)
def get_top_values(project_id, event_type, event_key=None):
with pg_client.PostgresClient() as cur:
if schemas.FilterType.has_value(event_type):
Expand Down
1 change: 1 addition & 0 deletions api/chalicelib/utils/or_cache/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .or_cache import CachedResponse
83 changes: 83 additions & 0 deletions api/chalicelib/utils/or_cache/or_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import functools
import inspect
import json
import logging
from chalicelib.utils import pg_client
import time
from fastapi.encoders import jsonable_encoder

logger = logging.getLogger(__name__)


class CachedResponse:
    """Decorator that caches a function's JSON-serializable result in a
    PostgreSQL table.

    The wrapped function's parameters are mapped, by name, onto columns of
    ``table``; the payload is stored in a jsonb ``result`` column together
    with the observed ``execution_time``. A missing, empty, or stale row
    (older than ``ttl`` seconds) triggers re-computation and an upsert.
    """

    def __init__(self, table, ttl):
        # table: fully-qualified cache table, e.g. "or_cache.autocomplete_top_values".
        # ttl: staleness threshold in seconds; ttl <= 0 means rows never expire
        #      (see the %(ttl)s>0 guard in __get).
        self.table = table
        self.ttl = ttl

    def __call__(self, func):
        # Map positional index -> declared parameter name so positional args
        # can be resolved to cache-key columns in wrapper().
        self.param_names = {i: param for i, param in enumerate(inspect.signature(func).parameters)}

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Resolve every declared parameter to a value (None when absent);
            # these become the cache-key column values.
            values = dict()
            for i, param in self.param_names.items():
                if i < len(args):
                    values[param] = args[i]
                elif param in kwargs:
                    values[param] = kwargs[param]
                else:
                    values[param] = None
            result = self.__get(values)
            if result is None or result["expired"] \
                    or result["result"] is None or len(result["result"]) == 0:
                # Cache miss / stale / empty payload: recompute and store,
                # recording how long the underlying function took.
                now = time.time()
                result = func(*args, **kwargs)
                now = time.time() - now
                if result is not None and len(result) > 0:
                    self.__add(values, result, now)
                    # NOTE(review): assumes func returns a non-empty list of
                    # dicts (result[0] must be a mutable mapping) — confirm
                    # against get_top_values' return shape.
                    result[0]["cached"] = False
            else:
                # Bug fix: iterate values.items() — the original used
                # enumerate(values), which paired a numeric index with the
                # parameter *name* instead of name with value.
                logger.info(f"using cached response for "
                            f"{func.__name__}({','.join([f'{key}={val}' for key, val in values.items()])})")
                result = result["result"]
                result[0]["cached"] = True

            return result

        return wrapper

    def __get(self, values):
        """Fetch the cached row matching `values`; returns a dict with
        `result` (jsonb payload) and `expired` (bool), or None when absent."""
        with pg_client.PostgresClient() as cur:
            sub_constraints = []
            for key, value in values.items():
                # NULL keys need "IS NULL" — "=NULL" never matches in SQL.
                if value is not None:
                    sub_constraints.append(f"{key}=%({key})s")
                else:
                    sub_constraints.append(f"{key} IS NULL")
            query = f"""SELECT result,
                               (%(ttl)s>0
                                   AND EXTRACT(EPOCH FROM (timezone('utc'::text, now()) - created_at - INTERVAL %(interval)s)) > 0) AS expired
                        FROM {self.table}
                        WHERE {" AND ".join(sub_constraints)}"""
            query = cur.mogrify(query, {**values, 'ttl': self.ttl, 'interval': f'{self.ttl} seconds'})
            logger.debug("------")
            logger.debug(query)
            logger.debug("------")
            cur.execute(query)
            result = cur.fetchone()
            return result

    def __add(self, values, result, execution_time):
        """Upsert `result` for the cache key `values`, refreshing created_at.

        NOTE(review): the table's UNIQUE constraint treats NULLs as distinct,
        so ON CONFLICT will not match existing rows whose key columns are
        NULL — such keys can accumulate duplicate rows; verify against the
        UNIQUE NULLS NOT DISTINCT TODO in init_schema.sql.
        """
        with pg_client.PostgresClient() as cur:
            query = f"""INSERT INTO {self.table} ({",".join(values.keys())},result,execution_time)
                        VALUES ({",".join([f"%({param})s" for param in values.keys()])},%(result)s,%(execution_time)s)
                        ON CONFLICT ({",".join(values.keys())}) DO UPDATE SET result=%(result)s,
                                                                             execution_time=%(execution_time)s,
                                                                             created_at=timezone('utc'::text, now());"""
            query = cur.mogrify(query, {**values,
                                        "result": json.dumps(jsonable_encoder(result)),
                                        "execution_time": execution_time})
            logger.debug("------")
            logger.debug(query)
            logger.debug("------")
            cur.execute(query)
6 changes: 3 additions & 3 deletions api/routers/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ def events_search(projectId: int, q: Optional[str] = None,
context: schemas.CurrentContext = Depends(OR_context)):
if type and (not q or len(q) == 0) \
and (autocomplete.is_top_supported(type)):
# TODO: check if type is a valid value for autocomplete
return autocomplete.get_top_values(project_id=projectId, event_type=type, event_key=key)
elif (not q or len(q) == 0):
# return autocomplete.get_top_values(project_id=projectId, event_type=type, event_key=key)
return autocomplete.get_top_values(projectId, type, event_key=key)
elif not q or len(q) == 0:
return {"data": []}

if live:
Expand Down
1 change: 1 addition & 0 deletions ee/api/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,4 @@ Pipfile.lock
/NOTES.md
/chalicelib/core/db_request_handler.py
/routers/subs/spot.py
/chalicelib/utils/or_cache/
2 changes: 2 additions & 0 deletions ee/api/chalicelib/core/autocomplete_exp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from chalicelib.utils import ch_client
from chalicelib.utils import helper, exp_ch_helper
from chalicelib.utils.event_filter_definition import Event
from chalicelib.utils.or_cache import CachedResponse

TABLE = "experimental.autocomplete"

Expand Down Expand Up @@ -294,6 +295,7 @@ def is_top_supported(event_type):
return TYPE_TO_COLUMN.get(event_type, False)


@CachedResponse(table="or_cache.autocomplete_top_values", ttl=5 * 60)
def get_top_values(project_id, event_type, event_key=None):
with ch_client.ClickHouseClient() as cur:
if schemas.FilterType.has_value(event_type):
Expand Down
3 changes: 2 additions & 1 deletion ee/api/clean-dev.sh
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,5 @@ rm -rf ./orpy.py
rm -rf ./chalicelib/core/usability_testing/
rm -rf ./chalicelib/core/db_request_handler.py
rm -rf ./chalicelib/core/db_request_handler.py
rm -rf ./routers/subs/spot.py
rm -rf ./routers/subs/spot.py
rm -rf ./chalicelib/utils/or_cache
12 changes: 12 additions & 0 deletions ee/scripts/schema/db/init_dbs/postgresql/1.20.0/1.20.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ ALTER TABLE IF EXISTS public.users
ADD COLUMN IF NOT EXISTS spot_jwt_refresh_jti integer NULL DEFAULT NULL,
ADD COLUMN IF NOT EXISTS spot_jwt_refresh_iat timestamp without time zone NULL DEFAULT NULL;

CREATE SCHEMA IF NOT EXISTS or_cache;
-- Backing table for the CachedResponse decorator: one row per
-- (project_id, event_type, event_key) cache key, payload in `result`.
CREATE TABLE IF NOT EXISTS or_cache.autocomplete_top_values
(
project_id integer NOT NULL REFERENCES public.projects (project_id) ON DELETE CASCADE,
event_type text NOT NULL,
event_key text NULL,
result jsonb NULL,
execution_time integer NULL,
created_at timestamp DEFAULT timezone('utc'::text, now()) NOT NULL,
-- NOTE(review): plain UNIQUE treats NULLs as distinct, so rows with
-- event_key IS NULL are not deduplicated and ON CONFLICT upserts miss them.
UNIQUE (project_id, event_type, event_key)
);

COMMIT;

\elif :is_next
Expand Down
13 changes: 13 additions & 0 deletions ee/scripts/schema/db/init_dbs/postgresql/init_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1305,4 +1305,17 @@ CREATE TABLE public.projects_conditions
filters jsonb NOT NULL DEFAULT '[]'::jsonb
);

-- Backing table for the CachedResponse decorator: one row per
-- (project_id, event_type, event_key) cache key, payload in `result`.
CREATE TABLE or_cache.autocomplete_top_values
(
project_id integer NOT NULL REFERENCES public.projects (project_id) ON DELETE CASCADE,
event_type text NOT NULL,
event_key text NULL,
result jsonb NULL,
execution_time integer NULL,
created_at timestamp DEFAULT timezone('utc'::text, now()) NOT NULL,
UNIQUE (project_id, event_type, event_key)
-- TODO: use `UNIQUE NULLS NOT DISTINCT (project_id, event_type, event_key)`
-- when PG upgrade is validated by devops team
-- (plain UNIQUE treats NULL event_key values as distinct, so they duplicate)
);

COMMIT;
34 changes: 34 additions & 0 deletions ee/scripts/schema/db/rollback_dbs/postgresql/1.20.0/1.20.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
\set previous_version 'v1.20.0-ee'
\set next_version 'v1.19.0-ee'
-- Compare the installed schema version against the expected one; \gset
-- stores the booleans into psql variables used by the \if guards below.
SELECT openreplay_version() AS current_version,
openreplay_version() = :'previous_version' AS valid_previous,
openreplay_version() = :'next_version' AS is_next
\gset

\if :valid_previous
\echo valid previous DB version :'previous_version', starting DB downgrade to :'next_version'
BEGIN;
-- Rewrite openreplay_version() so the DB now reports the downgraded version.
SELECT format($fn_def$
CREATE OR REPLACE FUNCTION openreplay_version()
RETURNS text AS
$$
SELECT '%1$s'
$$ LANGUAGE sql IMMUTABLE;
$fn_def$, :'next_version')
\gexec

--
-- Undo the Spot-login JWT columns added in v1.20.0-ee.
ALTER TABLE IF EXISTS public.users
DROP COLUMN IF EXISTS spot_jwt_iat,
DROP COLUMN IF EXISTS spot_jwt_refresh_jti,
DROP COLUMN IF EXISTS spot_jwt_refresh_iat;

-- Remove the response-cache schema (drops or_cache.autocomplete_top_values too).
DROP SCHEMA or_cache CASCADE;

COMMIT;

\elif :is_next
\echo new version detected :'next_version', nothing to do
\else
\warn skipping DB downgrade of :'next_version', expected previous version :'previous_version', found :'current_version'
\endif
12 changes: 12 additions & 0 deletions scripts/schema/db/init_dbs/postgresql/1.20.0/1.20.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ ALTER TABLE IF EXISTS public.users
ADD COLUMN IF NOT EXISTS spot_jwt_refresh_jti integer NULL DEFAULT NULL,
ADD COLUMN IF NOT EXISTS spot_jwt_refresh_iat timestamp without time zone NULL DEFAULT NULL;

CREATE SCHEMA IF NOT EXISTS or_cache;
-- Backing table for the CachedResponse decorator: one row per
-- (project_id, event_type, event_key) cache key, payload in `result`.
CREATE TABLE IF NOT EXISTS or_cache.autocomplete_top_values
(
project_id integer NOT NULL REFERENCES public.projects (project_id) ON DELETE CASCADE,
event_type text NOT NULL,
event_key text NULL,
result jsonb NULL,
execution_time integer NULL,
created_at timestamp DEFAULT timezone('utc'::text, now()) NOT NULL,
-- NOTE(review): plain UNIQUE treats NULLs as distinct, so rows with
-- event_key IS NULL are not deduplicated and ON CONFLICT upserts miss them.
UNIQUE (project_id, event_type, event_key)
);

COMMIT;

\elif :is_next
Expand Down
15 changes: 15 additions & 0 deletions scripts/schema/db/init_dbs/postgresql/init_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ BEGIN;
CREATE SCHEMA IF NOT EXISTS events_common;
CREATE SCHEMA IF NOT EXISTS events;
CREATE SCHEMA IF NOT EXISTS events_ios;
CREATE SCHEMA IF NOT EXISTS or_cache;
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE EXTENSION IF NOT EXISTS pgcrypto;

Expand Down Expand Up @@ -1190,4 +1191,18 @@ CREATE TABLE public.projects_conditions
filters jsonb NOT NULL DEFAULT '[]'::jsonb
);

-- Backing table for the CachedResponse decorator: one row per
-- (project_id, event_type, event_key) cache key, payload in `result`.
CREATE TABLE or_cache.autocomplete_top_values
(
project_id integer NOT NULL REFERENCES public.projects (project_id) ON DELETE CASCADE,
event_type text NOT NULL,
event_key text NULL,
result jsonb NULL,
execution_time integer NULL,
created_at timestamp DEFAULT timezone('utc'::text, now()) NOT NULL,
UNIQUE (project_id, event_type, event_key)
-- TODO: use `UNIQUE NULLS NOT DISTINCT (project_id, event_type, event_key)`
-- when PG upgrade is validated by devops team
-- (plain UNIQUE treats NULL event_key values as distinct, so they duplicate)
);


COMMIT;
34 changes: 34 additions & 0 deletions scripts/schema/db/rollback_dbs/postgresql/1.20.0/1.20.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
\set previous_version 'v1.20.0'
\set next_version 'v1.19.0'
-- Compare the installed schema version against the expected one; \gset
-- stores the booleans into psql variables used by the \if guards below.
SELECT openreplay_version() AS current_version,
openreplay_version() = :'previous_version' AS valid_previous,
openreplay_version() = :'next_version' AS is_next
\gset

\if :valid_previous
\echo valid previous DB version :'previous_version', starting DB downgrade to :'next_version'
BEGIN;
-- Rewrite openreplay_version() so the DB now reports the downgraded version.
SELECT format($fn_def$
CREATE OR REPLACE FUNCTION openreplay_version()
RETURNS text AS
$$
SELECT '%1$s'
$$ LANGUAGE sql IMMUTABLE;
$fn_def$, :'next_version')
\gexec

--
-- Undo the Spot-login JWT columns added in v1.20.0.
ALTER TABLE IF EXISTS public.users
DROP COLUMN IF EXISTS spot_jwt_iat,
DROP COLUMN IF EXISTS spot_jwt_refresh_jti,
DROP COLUMN IF EXISTS spot_jwt_refresh_iat;

-- Remove the response-cache schema (drops or_cache.autocomplete_top_values too).
DROP SCHEMA or_cache CASCADE;

COMMIT;

\elif :is_next
\echo new version detected :'next_version', nothing to do
\else
\warn skipping DB downgrade of :'next_version', expected previous version :'previous_version', found :'current_version'
\endif

0 comments on commit 4bea4a6

Please sign in to comment.