diff --git a/.env_file b/.env_file
index 6295e4946..660e81211 100644
--- a/.env_file
+++ b/.env_file
@@ -20,4 +20,5 @@ CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1
 CLICKHOUSE_PASSWORD=pass
 SUPERSET_USERNAME=superset
 SUPERSET_PASSWORD=superset
-SUPERSET_METADATA_PORT=5433
\ No newline at end of file
+SUPERSET_METADATA_PORT=5433
+ENVIRONMENT=test
\ No newline at end of file
diff --git a/dff/stats/utils.py b/dff/stats/utils.py
index c4b3c472a..e0247ddfd 100644
--- a/dff/stats/utils.py
+++ b/dff/stats/utils.py
@@ -109,6 +109,8 @@ def get_superset_session(args: Namespace, base_url: str = "http://localhost:8088

     :return: Authorized session - authorization headers tuple.
     """
+    username = args.username
+    password = args.password
     healthcheck_url = parse.urljoin(base_url, "/healthcheck")
     login_url = parse.urljoin(base_url, "/api/v1/security/login")
     csrf_url = parse.urljoin(base_url, "/api/v1/security/csrf_token/")
@@ -121,7 +123,7 @@ def get_superset_session(args: Namespace, base_url: str = "http://localhost:8088
     access_request = session.post(
         login_url,
         headers={"Content-Type": "application/json", "Accept": "*/*"},
-        data=json.dumps({"username": args.username, "password": args.password, "refresh": True, "provider": "db"}),
+        data=json.dumps({"username": username, "password": password, "refresh": True, "provider": "db"}),
     )
     access_token = access_request.json()["access_token"]
     # get csrf_token
diff --git a/dff/utils/docker/dockerfile_stats b/dff/utils/docker/dockerfile_stats
index 9c40071ea..3a24a69df 100644
--- a/dff/utils/docker/dockerfile_stats
+++ b/dff/utils/docker/dockerfile_stats
@@ -1,6 +1,7 @@
 FROM apache/superset:2.1.0rc1
 USER root
 RUN cd /app && pip install .[clickhouse]
+COPY health_stats.sh /app/docker/
 COPY entrypoint_stats.sh /app/docker/
 COPY --chown=superset superset_config_docker.py /app/pythonpath/
 ENV SUPERSET_CONFIG_PATH /app/pythonpath/superset_config_docker.py
diff --git a/dff/utils/docker/entrypoint_stats.sh b/dff/utils/docker/entrypoint_stats.sh
index cac946f7d..262676796 100644
--- a/dff/utils/docker/entrypoint_stats.sh
+++ b/dff/utils/docker/entrypoint_stats.sh
@@ -2,7 +2,7 @@ export SERVER_THREADS_AMOUNT=8
 set -m
 nohup /bin/bash /usr/bin/run-server.sh &
-sleep 5
+/bin/bash /app/docker/health_stats.sh http://localhost:8088/health
 superset fab create-admin --firstname superset --lastname admin --username $SUPERSET_USERNAME --email admin@admin.com --password $SUPERSET_PASSWORD
 superset db upgrade
 superset init
diff --git a/dff/utils/docker/health_stats.sh b/dff/utils/docker/health_stats.sh
new file mode 100644
index 000000000..d90c5066f
--- /dev/null
+++ b/dff/utils/docker/health_stats.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+if [[ $# = 0 ]] ; then printf "Specify healthcheck url;\n"; exit 1; fi;
+for itr in {1..10}
+do
+    healthcheck=$(curl -X GET "${1}" | grep "OK")
+    healthcheck=$?
+    if [ "$healthcheck" -ne 0 ] ; then
+        echo "Healthcheck failed. Sleeping for 5 secs."
+        sleep 5
+        echo 'Iteration' $itr
+        if [ $itr == 10 ]; then
+            echo 'Healthcheck suite unsuccessful.'
+        fi
+    else
+        echo "Healthcheck suite successful."
+        break
+        exit 0
+    fi
+done
\ No newline at end of file
diff --git a/dff/utils/docker/superset_config_docker.py b/dff/utils/docker/superset_config_docker.py
index ca138927f..ddd662e6c 100644
--- a/dff/utils/docker/superset_config_docker.py
+++ b/dff/utils/docker/superset_config_docker.py
@@ -24,9 +24,10 @@
 #
 import os

-SQLALCHEMY_DATABASE_URI = "postgresql+psycopg2://{0}:{1}@dashboard-metadata:{2}/{3}".format(
-    os.getenv("POSTGRES_USERNAME"),
-    os.getenv("POSTGRES_PASSWORD"),
-    os.getenv("SUPERSET_METADATA_PORT"),
-    os.getenv("POSTGRES_DB"),
-)
+if os.getenv("ENVIRONMENT") == "prod":
+    SQLALCHEMY_DATABASE_URI = "postgresql+psycopg2://{0}:{1}@dashboard-metadata:{2}/{3}".format(
+        os.getenv("POSTGRES_USERNAME"),
+        os.getenv("POSTGRES_PASSWORD"),
+        os.getenv("SUPERSET_METADATA_PORT"),
+        os.getenv("POSTGRES_DB"),
+    )
diff --git a/makefile b/makefile
index ca568c1b2..d80a4d8b2 100644
--- a/makefile
+++ b/makefile
@@ -55,6 +55,7 @@ docker_up:

 wait_db: docker_up
 	while ! docker-compose exec psql pg_isready; do sleep 1; done > /dev/null
+	while ! docker-compose exec dashboard /bin/bash -c "curl localhost:8088/health | grep OK"; do sleep 1; done > /dev/null
 	while ! docker-compose exec mysql bash -c 'mysql -u $$MYSQL_USERNAME -p$$MYSQL_PASSWORD -e "select 1;"'; do sleep 1; done &> /dev/null
 .PHONY: wait_db
diff --git a/tests/stats/chart_data.py b/tests/stats/chart_data.py
new file mode 100644
index 000000000..deab85959
--- /dev/null
+++ b/tests/stats/chart_data.py
@@ -0,0 +1,134 @@
+# %%
+import random
+import asyncio
+from tqdm import tqdm
+from dff.script import Context, Message, RESPONSE, TRANSITIONS
+from dff.script import conditions as cnd
+from dff.pipeline import Pipeline, ACTOR, Service
+from dff.stats import OtelInstrumentor, default_extractors
+
+# %%
+# instrumentation code
+dff_instrumentor = OtelInstrumentor.from_url("grpc://localhost:4317", insecure=True)
+dff_instrumentor.instrument()
+
+
+def numbered_flow_factory(number: int):
+    return {
+        f"node_{str(n)}": {
+            RESPONSE: Message(text=f"node_{str(number)}_{str(n)}"),
+            TRANSITIONS: {f"node_{str(n+1)}": cnd.true()} if n != 4 else {("root", "fallback"): cnd.true()},
+        }
+        for n in range(5)
+    }
+
+
+numbered_script = {
+    "root": {
+        "start": {
+            RESPONSE: Message(text="Hi"),
+            TRANSITIONS: {
+                lambda ctx, pipeline: (f"flow_{random.choice(range(1, 11))}", "node_1", 1): cnd.true(),
+            },
+        },
+        "fallback": {RESPONSE: Message(text="Oops")},
+    },
+    **{f"flow_{str(n)}": numbered_flow_factory(n) for n in range(1, 11)},
+}
+
+transitions_script = {
+    "root": {
+        "start": {
+            RESPONSE: Message(text="Hi"),
+            TRANSITIONS: {
+                ("flow_1", "node"): cnd.true(),
+            },
+        },
+        "fallback": {RESPONSE: Message(text="Oops")},
+    },
+    **{
+        f"flow_{str(num)}": {
+            "node": {
+                RESPONSE: Message(text="Message."),
+                TRANSITIONS: {(f"flow_{str(num+1)}", "node"): cnd.true()}
+                if num != 100
+                else {("root", "fallback"): cnd.true()},
+            }
+        }
+        for num in range(1, 101)
+    },
+}
+
+
+transition_test_pipeline = Pipeline.from_dict(
+    {
+        "script": transitions_script,
+        "start_label": ("root", "start"),
+        "fallback_label": ("root", "fallback"),
+        "components": [
+            Service(
+                handler=ACTOR,
+                after_handler=[
+                    default_extractors.get_current_label,
+                ],
+            ),
+        ],
+    }
+)
+
+numbered_test_pipeline = Pipeline.from_dict(
+    {
+        "script": numbered_script,
+        "start_label": ("root", "start"),
+        "fallback_label": ("root", "fallback"),
+        "components": [
+            Service(
+                handler=ACTOR,
+                after_handler=[
+                    default_extractors.get_current_label,
+                ],
+            ),
+        ],
+    }
+)
+
+
+# %%
+async def worker(pipeline: Pipeline, queue: asyncio.Queue):
+    """
+    Worker function for dispatching one client message.
+    It sends a fixed greeting message to the pipeline
+    and simulates pauses in between messages by calling the sleep function.
+
+    The context is taken from the shared queue and put back afterwards,
+    so that consecutive runs continue the same dialog.
+
+    :param queue: Queue for sharing context variables.
+    """
+    ctx: Context = await queue.get()
+    in_message = Message(text="Hi")
+    await asyncio.sleep(random.random() * 3)
+    ctx = await pipeline._run_pipeline(in_message, ctx.id)
+    await asyncio.sleep(random.random() * 3)
+    await queue.put(ctx)
+
+
+# %%
+# main loop
+async def loop(pipeline: Pipeline, n_iterations: int = 10, n_workers: int = 10):
+    """
+    The main loop that runs one or more worker coroutines in parallel.
+
+    :param n_iterations: Total number of coroutine runs.
+    :param n_workers: Number of parallelized coroutine runs.
+    """
+    ctxs = asyncio.Queue()
+    parallel_iterations = n_iterations // n_workers
+    for _ in range(n_workers):
+        await ctxs.put(Context())
+    for _ in tqdm(range(parallel_iterations)):
+        await asyncio.gather(*(worker(pipeline, ctxs) for _ in range(n_workers)))
+
+
+if __name__ == "__main__":
+    asyncio.run(loop(numbered_test_pipeline))
diff --git a/tests/stats/test_charts.py b/tests/stats/test_charts.py
new file mode 100644
index 000000000..b431bbb60
--- /dev/null
+++ b/tests/stats/test_charts.py
@@ -0,0 +1,136 @@
+import os
+import random
+import json
+import asyncio
+from argparse import Namespace
+from urllib import parse
+import pytest
+
+try:
+    from requests import Session
+    import omegaconf  # noqa: F401
+    import tqdm  # noqa: F401
+    from dff.stats.utils import get_superset_session
+    from dff.stats.cli import DEFAULT_SUPERSET_URL
+    from aiochclient import ChClient
+    from httpx import AsyncClient
+except ImportError:
+    pytest.skip(reason="There are dependencies missing.", allow_module_level=True)
+
+from tests.stats.chart_data import numbered_test_pipeline, transition_test_pipeline, loop
+from tests.context_storages.test_dbs import ping_localhost
+from tests.test_utils import get_path_from_tests_to_current_dir
+
+random.seed(42)
+dot_path_to_addon = get_path_from_tests_to_current_dir(__file__)
+
+SUPERSET_ACTIVE = ping_localhost(8088)
+COLLECTOR_AVAILABLE = ping_localhost(4317)
+CLICKHOUSE_AVAILABLE = ping_localhost(8123)
+CLICKHOUSE_USER = os.getenv("CLICKHOUSE_USER")
+CLICKHOUSE_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD")
+CLICKHOUSE_DB = os.getenv("CLICKHOUSE_DB")
+SUPERSET_USERNAME = os.getenv("SUPERSET_USERNAME")
+SUPERSET_PASSWORD = os.getenv("SUPERSET_PASSWORD")
+
+
+async def transitions_data_test(session: Session, headers: dict, base_url=DEFAULT_SUPERSET_URL):
+    charts_url = parse.urljoin(DEFAULT_SUPERSET_URL, "/api/v1/chart")
+
+    result = session.get(charts_url, headers=headers)
+    result.raise_for_status()
+    result_json = result.json()
+
+    target_chart_id = [item for item in result_json["result"] if item["slice_name"] == "Transition counts"][0]["id"]
+    target_url = parse.urljoin(DEFAULT_SUPERSET_URL, f"api/v1/chart/{target_chart_id}/data/")
+    result_status = 404
+    attempts = 0
+    while result_status != 200 and attempts < 10:
+        attempts += 1
+        data_result = session.get(target_url, headers=headers)
+        result_status = data_result.status_code
+        await asyncio.sleep(1)
+
+    data_result_json = data_result.json()
+    data = data_result_json["result"][0]["data"]
+    assert len(data) > 0
+    assert "COUNT_DISTINCT(context_id)" in data[0]
+    assert data[0]["COUNT_DISTINCT(context_id)"] == 10
+    session.close()
+
+
+async def numbered_data_test(session: Session, headers: dict, base_url=DEFAULT_SUPERSET_URL):
+    charts_url = parse.urljoin(DEFAULT_SUPERSET_URL, "/api/v1/chart")
+
+    result = session.get(charts_url, headers=headers)
+    result.raise_for_status()
+    result_json = result.json()
+
+    target_chart_id = [item for item in result_json["result"] if item["slice_name"] == "Table"][0]["id"]
+    target_url = parse.urljoin(DEFAULT_SUPERSET_URL, f"api/v1/chart/{target_chart_id}/data/")
+    result_status = 404
+    attempts = 0
+    while result_status != 200 and attempts < 10:
+        attempts += 1
+        data_result = session.get(target_url, headers=headers)
+        result_status = data_result.status_code
+        await asyncio.sleep(2)
+
+    data_result_json = data_result.json()
+    grouped_dict = dict()
+    data = data_result_json["result"][0]["data"]
+    assert len(data) > 0
+    for item in data:
+        if item["context_id"] not in grouped_dict:
+            grouped_dict[item["context_id"]] = [item]
+        else:
+            grouped_dict[item["context_id"]].append(item)
+    unique_flows = list(map(lambda x: set(map(lambda y: json.loads(y["data"])["flow"], x)), grouped_dict.values()))
+    assert all(map(lambda x: len(x) == 1, unique_flows))
+    session.close()
+
+
+@pytest.mark.skipif(not SUPERSET_ACTIVE, reason="Superset server not active")
+@pytest.mark.skipif(not CLICKHOUSE_AVAILABLE, reason="Clickhouse unavailable.")
+@pytest.mark.skipif(not COLLECTOR_AVAILABLE, reason="OTLP collector unavailable.")
+@pytest.mark.skipif(
+    not all([CLICKHOUSE_USER, CLICKHOUSE_PASSWORD, CLICKHOUSE_DB]), reason="Clickhouse credentials missing"
+)
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ["pipeline", "func"],
+    [
+        (numbered_test_pipeline, numbered_data_test),
+        (transition_test_pipeline, transitions_data_test),
+    ],
+)
+@pytest.mark.docker
+async def test_charts(pipeline, func, otlp_log_exp_provider, otlp_trace_exp_provider):
+    _, tracer_provider = otlp_trace_exp_provider
+    _, logger_provider = otlp_log_exp_provider
+
+    table = "otel_logs"
+    http_client = AsyncClient()
+    ch_client = ChClient(http_client, user=CLICKHOUSE_USER, password=CLICKHOUSE_PASSWORD, database=CLICKHOUSE_DB)
+    await ch_client.execute(f"TRUNCATE {table}")
+    await loop(pipeline=pipeline)  # run with a test-specific pipeline
+    tracer_provider.force_flush()
+    logger_provider.force_flush()
+    num_records = 0
+
+    attempts = 0
+    while num_records < 10 and attempts < 10:
+        attempts += 1
+        await asyncio.sleep(2)
+        num_records = await ch_client.fetchval(f"SELECT COUNT(*) FROM {table}")
+
+    os.system(
+        f"dff.stats tutorials/stats/example_config.yaml \
+        -U {SUPERSET_USERNAME} \
+        -P {SUPERSET_PASSWORD} \
+        -dP {CLICKHOUSE_PASSWORD}"
+    )
+    session, headers = get_superset_session(
+        Namespace(**{"username": SUPERSET_USERNAME, "password": SUPERSET_PASSWORD}), DEFAULT_SUPERSET_URL
+    )
+    await func(session, headers)  # run a test-specific check function with the same signature
diff --git a/tests/stats/test_main.py b/tests/stats/test_main.py
index c6539343e..c188a00d5 100644
--- a/tests/stats/test_main.py
+++ b/tests/stats/test_main.py
@@ -52,7 +52,6 @@ def dashboard_display_test(args: Namespace, session, headers, base_url: str):
     dashboard_res = session.get(dashboard_url, headers=headers)
     assert dashboard_res.status_code == 200
     dashboard_json = dashboard_res.json()
-    print(dashboard_json["result"]["charts"])
     assert sorted(dashboard_json["result"]["charts"]) == [
         "Current topic [time series bar chart]",
         "Current topic slot [bar chart]",
@@ -77,8 +76,6 @@ def dashboard_display_test(args: Namespace, session, headers, base_url: str):
     datasets_result = session.get(datasets_url, headers=headers)
     datasets_json = datasets_result.json()
     assert datasets_json["count"] == 3
-    assert datasets_json["ids"] == [1, 2, 3]
-    assert [item["id"] for item in datasets_json["result"]] == [1, 2, 3]
     assert sorted([item["table_name"] for item in datasets_json["result"]]) == [
         "dff_final_nodes",
         "dff_node_stats",
@@ -87,7 +84,6 @@ def dashboard_display_test(args: Namespace, session, headers, base_url: str):
     charts_result = session.get(charts_url, headers=headers)
     charts_json = charts_result.json()
     assert charts_json["count"] == 17
-    assert sorted(charts_json["ids"]) == list(range(1, 18))
     session.close()
diff --git a/tests/stats/test_tutorials.py b/tests/stats/test_tutorials.py
index efa1b1748..6182e6f72 100644
--- a/tests/stats/test_tutorials.py
+++ b/tests/stats/test_tutorials.py
@@ -12,6 +12,7 @@
     from aiochclient import ChClient
     from httpx import AsyncClient
     from dff import stats  # noqa: F401
+    from utils.stats.utils import cleanup_clickhouse
 except ImportError:
     pytest.skip(allow_module_level=True, reason="There are dependencies missing.")

@@ -49,7 +50,7 @@ async def test_tutorials_ch(tutorial_module_name: str, expected_logs, otlp_log_e
     ch_client = ChClient(http_client, user=CLICKHOUSE_USER, password=CLICKHOUSE_PASSWORD, database=CLICKHOUSE_DB)

     try:
-        await ch_client.execute(f"TRUNCATE {table}")
+        await cleanup_clickhouse(table, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD, CLICKHOUSE_DB)
         pipeline = module.pipeline
         module.dff_instrumentor.uninstrument()
         module.dff_instrumentor.instrument(logger_provider=logger_provider, tracer_provider=tracer_provider)
diff --git a/utils/stats/sample_data_provider.py b/tutorials/stats/3_sample_data_provider.py
similarity index 88%
rename from utils/stats/sample_data_provider.py
rename to tutorials/stats/3_sample_data_provider.py
index d778c922a..b46bd289c 100644
--- a/utils/stats/sample_data_provider.py
+++ b/tutorials/stats/3_sample_data_provider.py
@@ -1,12 +1,15 @@
-#!/usr/bin/env python
 # %% [markdown]
 """
+# 3. Sample data provider
+
 This script demonstrates various instrumentation capabilities.
 It also provides data for the dashboard emulating simultaneous queries
 to the service by multiple users.
""" +# %pip install dff[stats] + # %% import random import asyncio @@ -17,22 +20,32 @@ default_extractors, OtelInstrumentor, ) -from dff.utils.testing.toy_script import MULTIFLOW_SCRIPT, MULTIFLOW_REQUEST_OPTIONS +from dff.utils.testing.toy_script import ( + MULTIFLOW_SCRIPT, + MULTIFLOW_REQUEST_OPTIONS, +) # %% # instrumentation code -dff_instrumentor = OtelInstrumentor.from_url("grpc://localhost:4317", insecure=True) +dff_instrumentor = OtelInstrumentor.from_url( + "grpc://localhost:4317", insecure=True +) dff_instrumentor.instrument() def slot_processor_1(ctx: Context): - ctx.misc["slots"] = {**ctx.misc.get("slots", {}), "rating": random.randint(1, 10)} + ctx.misc["slots"] = { + **ctx.misc.get("slots", {}), + "rating": random.randint(1, 10), + } def slot_processor_2(ctx: Context): ctx.misc["slots"] = { **ctx.misc.get("slots", {}), - "current_topic": random.choice(["films", "games", "books", "smalltalk"]), + "current_topic": random.choice( + ["films", "games", "books", "smalltalk"] + ), } diff --git a/utils/stats/utils.py b/utils/stats/utils.py new file mode 100644 index 000000000..2d5cb2ddf --- /dev/null +++ b/utils/stats/utils.py @@ -0,0 +1,38 @@ +import os +import subprocess +from aiochclient import ChClient +from httpx import AsyncClient + + +async def cleanup_clickhouse(table: str, user: str, password: str, database: str): + http_client = AsyncClient() + ch_client = ChClient(http_client, user=user, password=password, database=database) + await ch_client.execute(f"TRUNCATE {table}") + + +def restart_pk(): + id_reset_cmd = """sh -c "psql --user={} --password -p {} --db=test -c \' + ALTER SEQUENCE {}_id_seq RESTART WITH 1; + ALTER SEQUENCE {}_id_seq RESTART WITH 1; + ALTER SEQUENCE {}_id_seq RESTART WITH 1; + ALTER SEQUENCE {}_id_seq RESTART WITH 1; + \'" + """ + formatted_id_reset = id_reset_cmd.format( + os.getenv("POSTGRES_USERNAME"), + os.getenv("SUPERSET_METADATA_PORT"), + "dashboards", + "slices", + "tables", + "dbs", + ) + command = ["docker-compose", "exec", "dashboard-metadata", formatted_id_reset] + _, error = subprocess.Popen( + command, + shell=True, + universal_newlines=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ).communicate(os.getenv("POSGTRES_PASSWORD")) + assert len(error) == 0