diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fa64357..51cdcc9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -77,3 +77,23 @@ jobs: - name: Test run: | tox run -e coverage + + integration-tests: + runs-on: ubuntu-latest + steps: + - name: Repository checkout + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: 3.13 + + - name: Install dependencies + run: | + pip install --upgrade pip tox + + - name: Test + run: | + tox run -e integration-tests + diff --git a/integration_tests/__init__.py b/integration_tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/integration_tests/conftest.py b/integration_tests/conftest.py new file mode 100644 index 0000000..71e2f1a --- /dev/null +++ b/integration_tests/conftest.py @@ -0,0 +1,140 @@ +from collections.abc import Iterator +import typing as t + +import pytest +from pytest_docker.plugin import ( + Services, + get_cleanup_command, + get_setup_command, +) +import yaml + +from .fixtures.databases import DATABASE_SERVERS, DatabaseServer +from .fixtures.exporter import ( + ConfigWriter, + Exporter, + exporter, + write_config, +) + +__all__ = [ + "ConfigWriter", + "DatabaseServer", + "Exporter", + "exporter", + "write_config", +] + + +def pytest_addoption(parser: pytest.Parser) -> None: + parser.addoption( + "--databases", + help="DB engine to run tests on", + nargs="+", + choices=list(DATABASE_SERVERS), + default=list(DATABASE_SERVERS), + ) + + +def pytest_generate_tests(metafunc: pytest.Metafunc) -> None: + metafunc.parametrize( + "db_server_name", + metafunc.config.getoption("--databases"), + ) + + +@pytest.fixture(scope="session") +def selected_db_servers( + request: pytest.FixtureRequest, + tmp_path_factory: pytest.TempPathFactory, + unused_tcp_port_factory: t.Callable[[], int], + docker_compose_project_name: str, +) -> Iterator[dict[str, DatabaseServer]]: + """Map server names to helper class to 
interact with them.""" + yield { + name: DATABASE_SERVERS[name]( + docker_compose_project_name, + unused_tcp_port_factory(), + tmp_path_factory.mktemp(f"db-{name}"), + ) + for name in request.config.getoption("--databases") + } + + +@pytest.fixture(scope="session") +def selected_db_servers_services( + selected_db_servers: dict[str, DatabaseServer], +) -> Iterator[dict[str, dict[str, t.Any]]]: + """Configuration stanzas for docker-compose services.""" + services = {} + for server in selected_db_servers.values(): + if config := server.docker_config(): + services[server.name] = config + + yield services + + +@pytest.fixture(autouse=True) +def skip_if_not_selected_db_server( + request: pytest.FixtureRequest, + db_server_name: str, +) -> None: + """Skip test if markers exclude the current database server.""" + if marker := request.node.get_closest_marker("database_only"): + if db_server_name not in marker.args: + pytest.skip("Database server excluded") + if marker := request.node.get_closest_marker("database_exclude"): + if db_server_name in marker.args: + pytest.skip("Database server excluded") + + +@pytest.fixture(scope="session") +def docker_setup( + selected_db_servers_services: dict[str, dict[str, t.Any]], +) -> list[str] | str: + if selected_db_servers_services: + return t.cast(list[str], get_setup_command()) + + # don't run docker + return "" + + +@pytest.fixture(scope="session") +def docker_cleanup( + selected_db_servers_services: dict[str, dict[str, t.Any]], +) -> list[str] | str: + if selected_db_servers_services: + return t.cast(list[str], get_cleanup_command()) + + # don't run docker + return "" + + +@pytest.fixture(scope="session") +def docker_compose_file( + tmp_path_factory: pytest.TempPathFactory, + selected_db_servers_services: dict[str, dict[str, t.Any]], +) -> Iterator[str]: + """Path to docker-compose.yaml config file.""" + config_path = ( + tmp_path_factory.mktemp("docker-compose") / "docker-compose.yml" + ) + + config = {"services": 
selected_db_servers_services} + with config_path.open("w") as fd: + yaml.dump(config, fd) + yield str(config_path) + + +@pytest.fixture(autouse=True) +def db_server( + docker_services: Services, + selected_db_servers: dict[str, DatabaseServer], + db_server_name: str, +) -> Iterator[DatabaseServer]: + server = selected_db_servers[db_server_name] + docker_services.wait_until_responsive( + check=server.check_ready, timeout=10.0, pause=0.5 + ) + yield server + server.drop_tables() diff --git a/integration_tests/db_test.py b/integration_tests/db_test.py new file mode 100644 index 0000000..34c24d4 --- /dev/null +++ b/integration_tests/db_test.py @@ -0,0 +1,70 @@ +from .conftest import DatabaseServer, Exporter + + +async def test_basic(db_server: DatabaseServer, exporter: Exporter) -> None: + db_server.make_table("test", ["m"], ["l"]) + db_server.insert_values("test", [(1, "foo"), (2, "bar")]) + await exporter.run( + { + "databases": { + "db": {"dsn": db_server.dsn}, + }, + "metrics": { + "m": { + "type": "gauge", + "labels": ["l"], + }, + }, + "queries": { + "q": { + "databases": ["db"], + "metrics": ["m"], + "sql": "SELECT m, l FROM test", + }, + }, + } + ) + metrics = await exporter.get_metrics() + assert metrics["m"] == {("db", "foo"): 1.0, ("db", "bar"): 2.0} + + +async def test_multiple_metrics( + db_server: DatabaseServer, exporter: Exporter +) -> None: + db_server.make_table("test", ["m1", "m2"], ["l1", "l2", "l3"]) + db_server.insert_values( + "test", [(10, 20, "a", "b", "c"), (100, 200, "x", "y", "z")] + ) + await exporter.run( + { + "databases": { + "db": {"dsn": db_server.dsn}, + }, + "metrics": { + "m1": { + "type": "gauge", + "labels": ["l1", "l2"], + }, + "m2": { + "type": "gauge", + "labels": ["l1", "l3"], + }, + }, + "queries": { + "q": { + "databases": ["db"], + "metrics": ["m1", "m2"], + "sql": "SELECT m1, m2, l1, l2, l3 FROM test", + }, + }, + } + ) + metrics = await exporter.get_metrics() + assert metrics["m1"] == { + ("db", "a", "b"): 10.0, + ("db", 
"x", "y"): 100.0, + } + assert metrics["m2"] == { + ("db", "a", "c"): 20.0, + ("db", "x", "z"): 200.0, + } diff --git a/integration_tests/fixtures/__init__.py b/integration_tests/fixtures/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/integration_tests/fixtures/databases.py b/integration_tests/fixtures/databases.py new file mode 100644 index 0000000..43dfca8 --- /dev/null +++ b/integration_tests/fixtures/databases.py @@ -0,0 +1,133 @@ +from abc import ABC, abstractmethod +from functools import cached_property +from pathlib import Path +import random +import string +import typing as t + +import sqlalchemy as sa + + +def random_password(length: int = 10) -> str: + return "".join(random.choice(string.hexdigits) for _ in range(length)) + + +class DatabaseServer(ABC): + name: str + image: str | None = None + port: int = 0 + + def __init__( + self, container_prefix: str, public_port: int, workdir: Path + ) -> None: + self.container_prefix = container_prefix + self.public_port = public_port + self.workdir = workdir + self._password = random_password() + self._metadata = sa.MetaData() + + def docker_config(self) -> dict[str, t.Any]: + if self.image is None: + return {} + return { + "container_name": f"{self.container_prefix}-{self.name}", + "image": self.image, + "ports": [f"{self.public_port}:{self.port}"], + } + + @property + @abstractmethod + def dsn(self) -> str: + """The database connection string.""" + + def check_ready(self) -> bool: + """Check if the database accepts queries.""" + try: + self.execute("SELECT 1") + except sa.exc.OperationalError: + return False + + return True + + def execute( + self, + statement: str, + params: dict[str, t.Any] | list[dict[str, t.Any]] | None = None, + ) -> None: + with self._engine.connect() as conn: + with conn.begin(): + conn.execute(sa.text(statement), params) + + def make_table( + self, + table_name: str, + metrics: t.Sequence[str], + labels: t.Sequence[str] = (), + ) -> None: + """Add a table to the 
database for specified metrics.""" + sa.Table( + table_name, + self._metadata, + *(sa.Column(name, sa.Integer) for name in metrics), + *(sa.Column(name, sa.Text) for name in labels), + ) + self._metadata.create_all(self._engine) + + def drop_tables(self) -> None: + """Drop created tables.""" + self._metadata.drop_all(self._engine) + self._metadata = sa.MetaData() + + def insert_values( + self, table_name: str, values: list[tuple[str | int, ...]] + ) -> None: + table = self._metadata.tables[table_name] + columns = [column.name for column in table.columns] + with self._engine.connect() as conn: + with conn.begin(): + conn.execute( + table.insert(), [dict(zip(columns, v)) for v in values] + ) + + @cached_property + def _engine(self) -> sa.Engine: + return sa.create_engine(self.dsn) + + +class SQLite(DatabaseServer): + name = "sqlite" + + @property + def dsn(self) -> str: + db = self.workdir / "query-exporter.db" + return f"sqlite:///{db.absolute()}" + + +class PostgreSQL(DatabaseServer): + name = "postgresql" + image = "postgres" + port = 5432 + + _database = "query_exporter" + + def docker_config(self) -> dict[str, t.Any]: + return super().docker_config() | { + "environment": { + "POSTGRES_PASSWORD": self._password, + "POSTGRES_DB": self._database, + }, + "volumes": [ + { + "type": "tmpfs", + "target": "/var/lib/postgresql/data", + }, + ], + "command": "-F", + } + + @property + def dsn(self) -> str: + return f"postgresql+psycopg2://postgres:{self._password}@localhost:{self.public_port}" + + +DATABASE_SERVERS = {server.name: server for server in (SQLite, PostgreSQL)} diff --git a/integration_tests/fixtures/exporter.py b/integration_tests/fixtures/exporter.py new file mode 100644 index 0000000..1adab38 --- /dev/null +++ b/integration_tests/fixtures/exporter.py @@ -0,0 +1,104 @@ +import asyncio +from asyncio.subprocess import PIPE +from collections.abc import AsyncIterator, Iterator +from pathlib import Path +import typing as t + +from aiohttp import 
ClientConnectionError, ClientSession +from prometheus_client.parser import text_string_to_metric_families +import pytest +import yaml + +ConfigWriter = t.Callable[[t.Any], Path] + + +@pytest.fixture +def write_config( + tmp_path_factory: pytest.TempPathFactory, +) -> Iterator[ConfigWriter]: + """Write exporter configuration.""" + + def write(data: t.Any) -> Path: + path = tmp_path_factory.mktemp("query-exporter") / "config.yaml" + path.write_text(yaml.dump(data), "utf-8") + return path + + yield write + + +class Exporter: + """Wrapper to run the exporter process.""" + + _process: asyncio.subprocess.Process | None + + def __init__(self, port: int, write_config: ConfigWriter) -> None: + self.port = port + self.url = f"http://localhost:{self.port}" + + self._write_config = write_config + self._process = None + + async def run(self, config: dict[str, t.Any]) -> None: + """Start the exporter with the specified config.""" + config_file = self._write_config(config) + self._process = await asyncio.create_subprocess_exec( + "query-exporter", + "--port", + str(self.port), + "--config", + str(config_file), + "--log-level", + "debug", + stdout=PIPE, + stderr=PIPE, + ) + tries = 5 + for n in range(tries): + try: + await self._get("/") + except ClientConnectionError: + if n < tries: + await asyncio.sleep(0.5) + else: + raise + else: + return + + async def stop(self) -> None: + """Stop the exporter.""" + if self._process is None: + return + self._process.terminate() + await self._process.wait() + self._process = None + + async def get_metrics(self) -> dict[str, dict[tuple[str, ...], float]]: + """Return parsed metrics.""" + payload = await self._get("/metrics") + + metrics: dict[str, dict[tuple[str, ...], float]] = {} + for family in text_string_to_metric_families(payload): + for sample in family.samples: + labels = tuple( + sample.labels[label] for label in sorted(sample.labels) + ) + metrics.setdefault(family.name, {})[labels] = sample.value + + return metrics + + async def 
_get(self, path: str) -> str: + async with ClientSession() as client: + async with client.get( + self.url + path, raise_for_status=True + ) as response: + return await response.text() + + +@pytest.fixture +async def exporter( + unused_tcp_port_factory: t.Callable[[], int], write_config: ConfigWriter +) -> AsyncIterator[Exporter]: + """Interface to run the exporter.""" + exp = Exporter(unused_tcp_port_factory(), write_config) + yield exp + await exp.stop() diff --git a/pyproject.toml b/pyproject.toml index aff491c..896acf1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,12 @@ dependencies = [ "structlog", "toolrack>=4", ] +optional-dependencies.integration-tests = [ + "psycopg2-binary", + "pytest", + "pytest-asyncio", + "pytest-docker", +] optional-dependencies.testing = [ "pytest", "pytest-asyncio", @@ -84,6 +90,13 @@ lint.isort.force-sort-within-sections = true [tool.pytest.ini_options] asyncio_mode = "auto" asyncio_default_fixture_loop_scope = "function" +markers = [ + "database_only: only run test on specified database(s)", + "database_exclude: don't run test on specified database(s)", +] +testpaths = [ + "tests", +] [tool.coverage.report] fail_under = 100.0 diff --git a/tox.ini b/tox.ini index 2cdb1a6..b9d454e 100644 --- a/tox.ini +++ b/tox.ini @@ -52,6 +52,13 @@ deps = commands = {envbindir}/query-exporter {posargs} +[testenv:integration-tests] +deps = + -r requirements.txt + .[integration-tests] +commands = + pytest integration_tests {posargs} + [testenv:update-dependencies] deps = pip-tools @@ -60,5 +67,6 @@ commands = [base] lint_files = + integration_tests \ query_exporter \ tests