diff --git a/README.md b/README.md
index 981d89f..dbd99ee 100644
--- a/README.md
+++ b/README.md
@@ -58,9 +58,7 @@ Environment variables are interpolated before interpreting the configuration file.
 ### Defining tests
 
 All test configurations are defined in the main configuration file.
-Currently, there are two types of tests supported: tests that publish
-their results to a CSV file, and tests that publish their results
-to a Graphite database.
+Hunter can import results from a CSV file, [Graphite](https://graphiteapp.org/), and [PostgreSQL](https://www.postgresql.org/).
 
 Tests are defined in the `tests` section.
 
@@ -142,6 +140,61 @@ $ curl -X POST "http://graphite_address/events/" \
 Posting those events is not mandatory, but when they are available, Hunter is able to
 filter data by commit or version using `--since-commit` or `--since-version` selectors.
 
+#### Importing results from PostgreSQL
+
+To import data from PostgreSQL, the Hunter configuration must contain the database connection details:
+
+```yaml
+# External systems connectors configuration:
+postgres:
+  hostname: ...
+  port: ...
+  username: ...
+  password: ...
+  database: ...
+```
+
+Test configurations must contain a query that selects the experiment data, the name of the time column, and the metrics to analyze:
+
+```yaml
+tests:
+  aggregate_mem:
+    type: postgres
+    time_column: commit_ts
+    attributes: [experiment_id, config_id, commit]
+    metrics:
+      process_cumulative_rate_mean:
+        direction: 1
+        scale: 1
+      process_cumulative_rate_stderr:
+        direction: -1
+        scale: 1
+      process_cumulative_rate_diff:
+        direction: -1
+        scale: 1
+    query: |
+      SELECT e.commit,
+             e.commit_ts,
+             r.process_cumulative_rate_mean,
+             r.process_cumulative_rate_stderr,
+             r.process_cumulative_rate_diff,
+             r.experiment_id,
+             r.config_id
+      FROM results r
+      INNER JOIN configs c ON r.config_id = c.id
+      INNER JOIN experiments e ON r.experiment_id = e.id
+      WHERE e.exclude_from_analysis = false AND
+            e.branch = 'trunk' AND
+            e.username = 'ci' AND
+            c.store = 'MEM' AND
+            c.cache = true AND
+            c.benchmark = 'aggregate' AND
+            c.instance_type = 'ec2i3.large'
+      ORDER BY e.commit_ts ASC;
+```
+
+For more details, see the examples in [examples/psql](examples/psql).
+
 #### Avoiding test definition duplication
 You may find that your test definitions are very similar to each other,
 e.g. they all have the same metrics. Instead of copy-pasting the definitions
diff --git a/examples/psql/README.md b/examples/psql/README.md
new file mode 100644
index 0000000..f1333db
--- /dev/null
+++ b/examples/psql/README.md
@@ -0,0 +1,22 @@
+## Schema
+
+See [schema.sql](schema.sql) for the example schema.
+
+## Usage
+
+Define the PostgreSQL connection details via environment variables:
+
+```bash
+export POSTGRES_HOSTNAME=...
+export POSTGRES_PORT=...
+export POSTGRES_USERNAME=...
+export POSTGRES_PASSWORD=...
+export POSTGRES_DATABASE=...
+```
+
+or set them directly in `hunter.yaml`.
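+
+To try the example against a scratch database, you can create the tables up front by feeding [schema.sql](schema.sql) to `psql` (a sketch; it assumes the variables exported above point at an existing, empty database):
+
+```bash
+$ PGPASSWORD=$POSTGRES_PASSWORD psql -h "$POSTGRES_HOSTNAME" -p "$POSTGRES_PORT" \
+    -U "$POSTGRES_USERNAME" -d "$POSTGRES_DATABASE" -f schema.sql
+```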
+
+The following command shows results for a single test `aggregate_mem` and updates the database with newly found change points:
+
+```bash
+$ BRANCH=trunk HUNTER_CONFIG=hunter.yaml hunter analyze aggregate_mem --update-postgres
+```
diff --git a/examples/psql/hunter.yaml b/examples/psql/hunter.yaml
new file mode 100644
index 0000000..664dca2
--- /dev/null
+++ b/examples/psql/hunter.yaml
@@ -0,0 +1,55 @@
+# External systems connectors configuration:
+postgres:
+  hostname: ${POSTGRES_HOSTNAME}
+  port: ${POSTGRES_PORT}
+  username: ${POSTGRES_USERNAME}
+  password: ${POSTGRES_PASSWORD}
+  database: ${POSTGRES_DATABASE}
+
+# Templates define common bits shared between test definitions:
+templates:
+  common:
+    type: postgres
+    time_column: commit_ts
+    attributes: [experiment_id, config_id, commit]
+    # required for --update-postgres to work
+    update_statement: |
+      UPDATE results
+      SET {metric}_rel_forward_change=%s,
+          {metric}_rel_backward_change=%s,
+          {metric}_p_value=%s
+      WHERE experiment_id = '{experiment_id}' AND config_id = {config_id}
+    metrics:
+      process_cumulative_rate_mean:
+        direction: 1
+        scale: 1
+      process_cumulative_rate_stderr:
+        direction: -1
+        scale: 1
+      process_cumulative_rate_diff:
+        direction: -1
+        scale: 1
+
+# Define your tests here:
+tests:
+  aggregate_mem:
+    inherit: [ common ]  # avoids repeating metrics definitions and postgres-related config
+    query: |
+      SELECT e.commit,
+             e.commit_ts,
+             r.process_cumulative_rate_mean,
+             r.process_cumulative_rate_stderr,
+             r.process_cumulative_rate_diff,
+             r.experiment_id,
+             r.config_id
+      FROM results r
+      INNER JOIN configs c ON r.config_id = c.id
+      INNER JOIN experiments e ON r.experiment_id = e.id
+      WHERE e.exclude_from_analysis = false AND
+            e.branch = '${BRANCH}' AND
+            e.username = 'ci' AND
+            c.store = 'MEM' AND
+            c.cache = true AND
+            c.benchmark = 'aggregate' AND
+            c.instance_type = 'ec2i3.large'
+      ORDER BY e.commit_ts ASC;
\ No newline at end of file
diff --git a/examples/psql/schema.sql b/examples/psql/schema.sql
new file mode 100644
index 0000000..de825a9
--- /dev/null
+++ b/examples/psql/schema.sql
@@ -0,0 +1,48 @@
+CREATE TABLE IF NOT EXISTS configs (
+    id SERIAL PRIMARY KEY,
+    benchmark TEXT NOT NULL,
+    scenario TEXT NOT NULL,
+    store TEXT NOT NULL,
+    instance_type TEXT NOT NULL,
+    cache BOOLEAN NOT NULL,
+    UNIQUE(benchmark,
+           scenario,
+           store,
+           cache,
+           instance_type)
+);
+
+CREATE TABLE IF NOT EXISTS experiments (
+    id TEXT PRIMARY KEY,
+    ts TIMESTAMPTZ NOT NULL,
+    branch TEXT NOT NULL,
+    commit TEXT NOT NULL,
+    commit_ts TIMESTAMPTZ NOT NULL,
+    username TEXT NOT NULL,
+    details_url TEXT NOT NULL,
+    exclude_from_analysis BOOLEAN DEFAULT false NOT NULL,
+    exclude_reason TEXT
+);
+
+CREATE TABLE IF NOT EXISTS results (
+    experiment_id TEXT NOT NULL REFERENCES experiments(id),
+    config_id INTEGER NOT NULL REFERENCES configs(id),
+
+    process_cumulative_rate_mean BIGINT NOT NULL,
+    process_cumulative_rate_stderr BIGINT NOT NULL,
+    process_cumulative_rate_diff BIGINT NOT NULL,
+
+    process_cumulative_rate_mean_rel_forward_change DOUBLE PRECISION,
+    process_cumulative_rate_mean_rel_backward_change DOUBLE PRECISION,
+    process_cumulative_rate_mean_p_value DECIMAL,
+
+    process_cumulative_rate_stderr_rel_forward_change DOUBLE PRECISION,
+    process_cumulative_rate_stderr_rel_backward_change DOUBLE PRECISION,
+    process_cumulative_rate_stderr_p_value DECIMAL,
+
+    process_cumulative_rate_diff_rel_forward_change DOUBLE PRECISION,
+    process_cumulative_rate_diff_rel_backward_change DOUBLE PRECISION,
+    process_cumulative_rate_diff_p_value DECIMAL,
+
+    PRIMARY KEY (experiment_id, config_id)
+);
\ No newline at end of file
diff --git a/hunter/config.py b/hunter/config.py
index 323acde..e6638f2 100644
--- a/hunter/config.py
+++ b/hunter/config.py
@@ -8,6 +8,7 @@ from hunter.grafana import GrafanaConfig
 from hunter.graphite import GraphiteConfig
+from hunter.postgres import PostgresConfig
 from hunter.slack import SlackConfig
 from hunter.test_config import TestConfig, create_test_config
 from hunter.util import merge_dict_list
@@ -20,6 +21,7 @@ class Config:
     tests: Dict[str, TestConfig]
     test_groups: Dict[str, List[TestConfig]]
     slack: SlackConfig
+    postgres: PostgresConfig


 @dataclass
@@ -110,6 +112,27 @@ def load_config_from(config_file: Path) -> Config:
             bot_token=config["slack"]["token"],
         )

+    postgres_config = None
+    if config.get("postgres") is not None:
+        for key in ("hostname", "port", "username", "password", "database"):
+            if not config["postgres"].get(key):
+                raise ValueError(f"postgres.{key} is not set")
+
+        postgres_config = PostgresConfig(
+            hostname=config["postgres"]["hostname"],
+            port=config["postgres"]["port"],
+            username=config["postgres"]["username"],
+            password=config["postgres"]["password"],
+            database=config["postgres"]["database"],
+        )
+
     templates = load_templates(config)
     tests = load_tests(config, templates)
     groups = load_test_groups(config, tests)
@@ -118,6 +141,7 @@ def load_config_from(config_file: Path) -> Config:
         graphite=graphite_config,
         grafana=grafana_config,
         slack=slack_config,
+        postgres=postgres_config,
         tests=tests,
         test_groups=groups,
     )
diff --git a/hunter/importer.py b/hunter/importer.py
index f11392c..b02eb46 100644
--- a/hunter/importer.py
+++ b/hunter/importer.py
@@ -9,12 +9,15 @@
 from hunter.config import Config
 from hunter.data_selector import DataSelector
 from hunter.graphite import DataPoint, Graphite, GraphiteError
+from hunter.postgres import Postgres
 from hunter.series import Metric, Series
 from hunter.test_config import (
     CsvMetric,
     CsvTestConfig,
     GraphiteTestConfig,
     HistoStatTestConfig,
+    PostgresMetric,
+    PostgresTestConfig,
     TestConfig,
 )
 from hunter.util import (
@@ -431,17 +434,122 @@ def fetch_all_metric_names(self, test: HistoStatTestConfig) -> List[str]:
         return metric_names


+class PostgresImporter(Importer):
+    __postgres: Postgres
+
+    def __init__(self, postgres: Postgres):
+        self.__postgres = postgres
+
+    @staticmethod
+    def __selected_metrics(
+        defined_metrics: Dict[str, PostgresMetric], selected_metrics: Optional[List[str]]
+    ) -> Dict[str, PostgresMetric]:
+        if selected_metrics is not None:
+            return {name: defined_metrics[name] for name in selected_metrics}
+        else:
+            return defined_metrics
+
+    def fetch_data(self, test_conf: TestConfig, selector: DataSelector = DataSelector()) -> Series:
+        if not isinstance(test_conf, PostgresTestConfig):
+            raise ValueError("Expected PostgresTestConfig")
+
+        if selector.branch:
+            raise ValueError("Postgres tests don't support branching yet")
+
+        since_time = selector.since_time
+        until_time = selector.until_time
+        if since_time.timestamp() > until_time.timestamp():
+            raise DataImportError(
+                f"Invalid time range: ["
+                f"{format_timestamp(int(since_time.timestamp()))}, "
+                f"{format_timestamp(int(until_time.timestamp()))}]"
+            )
+        metrics = self.__selected_metrics(test_conf.metrics, selector.metrics)
+
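+        # Run the configured query once; the time-range and last-n-points
+        # filtering below is applied client-side to the fetched rows: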
+        columns, rows = self.__postgres.fetch_data(test_conf.query)
+
+        # Decide which columns to fetch into which components of the result:
+        try:
+            time_index: int = columns.index(test_conf.time_column)
+            attr_indexes: List[int] = [columns.index(c) for c in test_conf.attributes]
+            metric_names = [m.name for m in metrics.values()]
+            metric_columns = [m.column for m in metrics.values()]
+            metric_indexes: List[int] = [columns.index(c) for c in metric_columns]
+        except ValueError as err:
+            raise DataImportError(f"Column not found: {err.args[0]}")
+
+        time: List[float] = []
+        data: Dict[str, List[float]] = {}
+        for n in metric_names:
+            data[n] = []
+        attributes: Dict[str, List[str]] = {}
+        for i in attr_indexes:
+            attributes[columns[i]] = []
+
+        for row in rows:
+            ts: datetime = row[time_index]
+            if since_time is not None and ts < since_time:
+                continue
+            if until_time is not None and ts >= until_time:
+                continue
+            time.append(ts.timestamp())
+
+            # Read metric values. Note we can still fail on conversion to float,
+            # because the user is free to override the column selection and thus
+            # they may select a column that contains non-numeric data:
+            for (name, i) in zip(metric_names, metric_indexes):
+                try:
+                    data[name].append(float(row[i]))
+                except ValueError as err:
+                    raise DataImportError(
+                        "Could not convert value in column " + columns[i] + ": " + err.args[0]
+                    )
+
+            # Attributes are just copied as-is, with no conversion:
+            for i in attr_indexes:
+                attributes[columns[i]].append(row[i])
+
+        # Convert metrics to series.Metrics
+        metrics = {m.name: Metric(m.direction, m.scale) for m in metrics.values()}
+
+        # Leave only the last n points:
+        time = time[-selector.last_n_points :]
+        data = {k: v[-selector.last_n_points :] for k, v in data.items()}
+        attributes = {k: v[-selector.last_n_points :] for k, v in attributes.items()}
+
+        return Series(
+            test_conf.name,
+            branch=None,
+            time=time,
+            metrics=metrics,
+            data=data,
+            attributes=attributes,
+        )
+
+    def fetch_all_metric_names(self, test_conf: PostgresTestConfig) -> List[str]:
+        return list(test_conf.metrics.keys())
+
+
 class Importers:
     __config: Config
     __csv_importer: Optional[CsvImporter]
     __graphite_importer: Optional[GraphiteImporter]
     __histostat_importer: Optional[HistoStatImporter]
+    __postgres_importer: Optional[PostgresImporter]

     def __init__(self, config: Config):
         self.__config = config
         self.__csv_importer = None
         self.__graphite_importer = None
         self.__histostat_importer = None
+        self.__postgres_importer = None

     def csv_importer(self) -> CsvImporter:
         if self.__csv_importer is None:
@@ -458,6 +566,11 @@ def histostat_importer(self) -> HistoStatImporter:
             self.__histostat_importer = HistoStatImporter()
         return self.__histostat_importer

+    def postgres_importer(self) -> PostgresImporter:
+        if self.__postgres_importer is None:
+            self.__postgres_importer = PostgresImporter(Postgres(self.__config.postgres))
+        return self.__postgres_importer
+
     def get(self, test: TestConfig) -> Importer:
         if isinstance(test, CsvTestConfig):
             return self.csv_importer()
@@ -465,5 +578,7 @@ def get(self, test: TestConfig) -> Importer:
             return self.graphite_importer()
         elif isinstance(test, HistoStatTestConfig):
             return self.histostat_importer()
+        elif isinstance(test, PostgresTestConfig):
+            return self.postgres_importer()
         else:
             raise ValueError(f"Unsupported test type {type(test)}")
diff --git a/hunter/main.py b/hunter/main.py
index 70e6da4..eb167ee 100644
--- a/hunter/main.py
+++ b/hunter/main.py
@@ -16,10 +16,16 @@
 from hunter.grafana import Annotation, Grafana, GrafanaError
 from hunter.graphite import GraphiteError
 from hunter.importer import DataImportError, Importers
+from hunter.postgres import Postgres, PostgresError
 from hunter.report import Report, ReportType
 from hunter.series import AnalysisOptions, AnalyzedSeries, compare
 from hunter.slack import NotificationError, SlackNotifier
-from hunter.test_config import GraphiteTestConfig, TestConfig, TestConfigError
+from hunter.test_config import (
+    GraphiteTestConfig,
+    PostgresTestConfig,
+    TestConfig,
+    TestConfigError,
+)
 from hunter.util import DateFormatError, interpolate, parse_datetime


@@ -33,12 +39,14 @@ class Hunter:
     __importers: Importers
     __grafana: Optional[Grafana]
     __slack: Optional[SlackNotifier]
+    __postgres: Optional[Postgres]

     def __init__(self, conf: Config):
         self.__conf = conf
         self.__importers = Importers(conf)
         self.__grafana = None
         self.__slack = self.__maybe_create_slack_notifier()
+        self.__postgres = None

     def list_tests(self, group_names: Optional[List[str]]):
         if group_names is not None:
@@ -203,6 +211,18 @@ def remove_grafana_annotations(self, test: Optional[TestConfig], force: bool):
         logging.info(f"Removing {len(annotations)} annotations...")
         grafana.delete_annotations(*(a.id for a in annotations))

+    def __get_postgres(self) -> Postgres:
+        if self.__postgres is None:
+            self.__postgres = Postgres(self.__conf.postgres)
+        return self.__postgres
+
+    def update_postgres(self, test: PostgresTestConfig, series: AnalyzedSeries):
+        postgres = self.__get_postgres()
+        for metric_name, change_points in series.change_points.items():
+            for cp in change_points:
+                attributes = series.attributes_at(cp.index)
+                postgres.insert_change_point(test, metric_name, attributes, cp)
+
     def regressions(
         self, test: TestConfig, selector: DataSelector, options: AnalysisOptions
     ) -> bool:
@@ -498,6 +518,11 @@ def script_main(conf: Config, args: List[str] = None):
         help="Update Grafana dashboards with appropriate annotations of change points",
         action="store_true",
     )
+    analyze_parser.add_argument(
+        "--update-postgres",
+        help="Update the PostgreSQL database with the change points found",
+        action="store_true",
+    )
     analyze_parser.add_argument(
         "--notify-slack",
         help="Send notification containing a summary of change points to given Slack channels",
@@ -556,6 +581,7 @@ def script_main(conf: Config, args: List[str] = None):

     if args.command == "analyze":
         update_grafana_flag = args.update_grafana
+        update_postgres_flag = args.update_postgres
         slack_notification_channels = args.notify_slack
         slack_cph_since = parse_datetime(args.cph_report_since)
         data_selector = data_selector_from_args(args)
@@ -572,6 +598,10 @@ def script_main(conf: Config, args: List[str] = None):
                     if not isinstance(test, GraphiteTestConfig):
                         raise GrafanaError("Not a Graphite test")
                     hunter.update_grafana_annotations(test, analyzed_series)
+                if update_postgres_flag:
+                    if not isinstance(test, PostgresTestConfig):
+                        raise PostgresError("Not a Postgres test")
+                    hunter.update_postgres(test, analyzed_series)
                 if slack_notification_channels:
                     tests_analyzed_series[test.name] = analyzed_series
             except DataImportError as err:
@@ -580,6 +610,10 @@ def script_main(conf: Config, args: List[str] = None):
                 logging.error(
                     f"Failed to update grafana dashboards for {test.name}: {err.message}"
                 )
+            except PostgresError as err:
+                logging.error(
+                    f"Failed to update postgres database for {test.name}: {err.message}"
+                )
         if slack_notification_channels:
             hunter.notify_slack(
                 tests_analyzed_series,
diff --git a/hunter/postgres.py b/hunter/postgres.py
new file mode 100644
index 0000000..1b08e91
--- /dev/null
+++ b/hunter/postgres.py
@@ -0,0 +1,67 @@
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Dict, Optional
+
+import psycopg2
+
+from hunter.analysis import ChangePoint
+from hunter.test_config import PostgresTestConfig
+
+
+@dataclass
+class PostgresConfig:
+    hostname: str
+    port: int
+    username: str
+    password: str
+    database: str
+
+
+@dataclass
+class PostgresError(Exception):
+    message: str
+
+
+class Postgres:
+    __conn: Optional[psycopg2.extensions.connection] = None
+    __config: PostgresConfig
+
+    def __init__(self, config: PostgresConfig):
+        self.__config = config
+
+    def __get_conn(self) -> psycopg2.extensions.connection:
+        if self.__conn is None:
+            self.__conn = psycopg2.connect(
+                host=self.__config.hostname,
+                port=self.__config.port,
+                user=self.__config.username,
+                password=self.__config.password,
+                database=self.__config.database,
+            )
+        return self.__conn
+
+    def fetch_data(self, query: str):
+        cursor = self.__get_conn().cursor()
+        cursor.execute(query)
+        columns = [c.name for c in cursor.description]
+        return (columns, cursor.fetchall())
+
+    def insert_change_point(
+        self,
+        test: PostgresTestConfig,
+        metric_name: str,
+        attributes: Dict,
+        change_point: ChangePoint,
+    ):
+        cursor = self.__get_conn().cursor()
+        kwargs = {**attributes, test.time_column: datetime.utcfromtimestamp(change_point.time)}
+        update_stmt = test.update_stmt.format(metric=metric_name, **kwargs)
+        cursor.execute(
+            update_stmt,
+            (
+                change_point.forward_change_percent(),
+                change_point.backward_change_percent(),
+                change_point.stats.pvalue,
+            ),
+        )
+        self.__get_conn().commit()
diff --git a/hunter/series.py b/hunter/series.py
index 30bc2a2..e5ae2a0 100644
--- a/hunter/series.py
+++ b/hunter/series.py
@@ -71,7 +71,7 @@ class ChangePointGroup:
     """A group of change points on multiple metrics, at the same time"""

     index: int
-    time: int
+    time: float
     prev_time: int
     attributes: Dict[str, str]
     prev_attributes: Dict[str, str]
diff --git a/hunter/test_config.py b/hunter/test_config.py
index 46a721b..97e9c31 100644
--- a/hunter/test_config.py
+++ b/hunter/test_config.py
@@ -121,12 +121,48 @@ def fully_qualified_metric_names(self):
         return HistoStatImporter().fetch_all_metric_names(self)


+@dataclass
+class PostgresMetric:
+    name: str
+    direction: int
+    scale: float
+    column: str
+
+
+@dataclass
+class PostgresTestConfig(TestConfig):
+    query: str
+    update_stmt: str
+    time_column: str
+    attributes: List[str]
+    metrics: Dict[str, PostgresMetric]
+
+    def __init__(
+        self,
+        name: str,
+        query: str,
+        update_stmt: str = "",
+        time_column: str = "time",
+        metrics: List[PostgresMetric] = None,
+        attributes: List[str] = None,
+    ):
+        self.name = name
+        self.query = query
+        self.time_column = time_column
+        self.metrics = {m.name: m for m in metrics} if metrics else {}
+        self.attributes = attributes if attributes else []
+        self.update_stmt = update_stmt
+
+    def fully_qualified_metric_names(self) -> List[str]:
+        return list(self.metrics.keys())
+
+
 def create_test_config(name: str, config: Dict) -> TestConfig:
     """
     Loads properties of a test from a dictionary read from hunter's config file
     This dictionary must have the `type` property to determine the type of the test.
     Other properties depend on the type.
-    Currently supported test types are `fallout`, `graphite` and `csv`.
+    Currently supported test types are `csv`, `graphite`, `histostat`, and `postgres`.
""" test_type = config.get("type") if test_type == "csv": @@ -135,6 +171,8 @@ def create_test_config(name: str, config: Dict) -> TestConfig: return create_graphite_test_config(name, config) elif test_type == "histostat": return create_histostat_test_config(name, config) + elif test_type == "postgres": + return create_postgres_test_config(name, config) elif test_type is None: raise TestConfigError(f"Test type not set for test {name}") else: @@ -226,3 +264,33 @@ def create_histostat_test_config(name: str, test_info: Dict) -> HistoStatTestCon f"Configuration referenced histostat file which does not exist: {file}" ) return HistoStatTestConfig(name, file) + + +def create_postgres_test_config(test_name: str, test_info: Dict) -> PostgresTestConfig: + try: + time_column = test_info.get("time_column", "time") + attributes = test_info.get("attributes", []) + metrics_info = test_info.get("metrics") + query = test_info["query"] + update_stmt = test_info.get("update_statement", "") + + metrics = [] + if isinstance(metrics_info, List): + for name in metrics_info: + metrics.append(CsvMetric(name, 1, 1.0)) + elif isinstance(metrics_info, Dict): + for (metric_name, metric_conf) in metrics_info.items(): + metrics.append( + PostgresMetric( + name=metric_name, + column=metric_conf.get("column", metric_name), + direction=int(metric_conf.get("direction", "1")), + scale=float(metric_conf.get("scale", "1")), + ) + ) + else: + raise TestConfigError(f"Metrics of the test {test_name} must be a list or dictionary") + + return PostgresTestConfig(test_name, query, update_stmt, time_column, metrics, attributes) + except KeyError as e: + raise TestConfigError(f"Configuration key not found in test {test_name}: {e.args[0]}") diff --git a/poetry.lock b/poetry.lock index 15b3499..e868cc9 100644 --- a/poetry.lock +++ b/poetry.lock @@ -128,6 +128,7 @@ calendars = ["hijri-converter", "convertdate"] fasttext = ["fasttext"] langdetect = ["langdetect"] + [[package]] name = "decorator" version = "5.1.1" @@ -293,8 +294,16 @@ optional = false python-versions = ">=3.8" [package.extras] -dev = ["pre-commit", "tox"] -testing = ["pytest", "pytest-benchmark"] +testing = ["pytest-benchmark", "pytest"] +dev = ["tox", "pre-commit"] + +[[package]] +name = "psycopg2-binary" +version = "2.9.3" +description = "psycopg2 - Python-PostgreSQL Database Adapter" +category = "main" +optional = false +python-versions = ">=3.6" [[package]] name = "py" @@ -523,7 +532,6 @@ python-versions = ">=3.7" [[package]] name = "tox" version = "3.28.0" -description = "tox is a generic virtualenv management and test command line tool" category = "dev" optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7" @@ -665,6 +673,7 @@ packaging = [] pathspec = [] platformdirs = [] pluggy = [] +psycopg2-binary = [] py = [] pycodestyle = [] pyflakes = [] diff --git a/pyproject.toml b/pyproject.toml index b06f99d..8cbc7b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ expandvars = "^0.6.5" numpy = "1.24" python = ">=3.8,<3.13" python-dateutil = "^2.8.1" +psycopg2-binary = "^2.9.3" signal-processing-algorithms = "^1.3.2" "ruamel.yaml" = "=0.17.21" requests = "^2.25.1" @@ -18,6 +19,7 @@ tabulate = "^0.8.7" validators = "^0.18.2" slack-sdk = "^3.4.2" + [tool.poetry.dev-dependencies] pytest = "^6.2.2" pytz = "2021.1" diff --git a/tests/importer_test.py b/tests/importer_test.py index ddfebf8..600f06a 100644 --- a/tests/importer_test.py +++ b/tests/importer_test.py @@ -4,8 +4,14 @@ from hunter.csv_options import CsvOptions from 
 from hunter.graphite import DataSelector
-from hunter.importer import CsvImporter, HistoStatImporter
-from hunter.test_config import CsvMetric, CsvTestConfig, HistoStatTestConfig
+from hunter.importer import CsvImporter, HistoStatImporter, PostgresImporter
+from hunter.test_config import (
+    CsvMetric,
+    CsvTestConfig,
+    HistoStatTestConfig,
+    PostgresMetric,
+    PostgresTestConfig,
+)

 SAMPLE_CSV = "tests/resources/sample.csv"

@@ -116,3 +122,79 @@ def test_import_histostat_last_n_points():
     series = importer.fetch_data(test, selector=selector)
     assert len(series.time) == 2
     assert len(series.data["initialize.result-success.count"]) == 2
+
+
+class MockPostgres:
+    def fetch_data(self, query: str):
+        return (
+            ["time", "metric1", "metric2", "commit"],
+            [
+                (datetime(2022, 7, 1, 15, 11, tzinfo=pytz.UTC), 2, 3, "aaabbb"),
+                (datetime(2022, 7, 2, 16, 22, tzinfo=pytz.UTC), 5, 6, "cccddd"),
+                (datetime(2022, 7, 3, 17, 13, tzinfo=pytz.UTC), 2, 3, "aaaccc"),
+                (datetime(2022, 7, 4, 18, 24, tzinfo=pytz.UTC), 5, 6, "ccc123"),
+                (datetime(2022, 7, 5, 19, 15, tzinfo=pytz.UTC), 2, 3, "aaa493"),
+                (datetime(2022, 7, 6, 20, 26, tzinfo=pytz.UTC), 5, 6, "cccfgl"),
+                (datetime(2022, 7, 7, 21, 17, tzinfo=pytz.UTC), 2, 3, "aaalll"),
+                (datetime(2022, 7, 8, 22, 28, tzinfo=pytz.UTC), 5, 6, "cccccc"),
+                (datetime(2022, 7, 9, 23, 19, tzinfo=pytz.UTC), 2, 3, "aadddd"),
+                (datetime(2022, 7, 10, 9, 29, tzinfo=pytz.UTC), 5, 6, "cciiii"),
+            ],
+        )
+
+
+def test_import_postgres():
+    test = PostgresTestConfig(
+        name="test",
+        query="SELECT * FROM sample;",
+        time_column="time",
+        metrics=[PostgresMetric("m1", 1, 1.0, "metric1"), PostgresMetric("m2", 1, 5.0, "metric2")],
+        attributes=["commit"],
+    )
+    importer = PostgresImporter(MockPostgres())
+    series = importer.fetch_data(test_conf=test, selector=data_selector())
+    assert len(series.data.keys()) == 2
+    assert len(series.time) == 10
+    assert len(series.data["m1"]) == 10
+    assert len(series.data["m2"]) == 10
+    assert len(series.attributes["commit"]) == 10
+    assert series.metrics["m2"].scale == 5.0
+
+
+def test_import_postgres_with_time_filter():
+    test = PostgresTestConfig(
+        name="test",
+        query="SELECT * FROM sample;",
+        time_column="time",
+        metrics=[PostgresMetric("m1", 1, 1.0, "metric1"), PostgresMetric("m2", 1, 5.0, "metric2")],
+        attributes=["commit"],
+    )
+
+    importer = PostgresImporter(MockPostgres())
+    selector = DataSelector()
+    tz = pytz.timezone("Etc/GMT+1")
+    selector.since_time = datetime(2022, 7, 8, 0, 0, 0, tzinfo=tz)
+    selector.until_time = datetime(2022, 7, 10, 0, 0, 0, tzinfo=tz)
+    series = importer.fetch_data(test, selector=selector)
+    assert len(series.data.keys()) == 2
+    assert len(series.time) == 2
+    assert len(series.data["m1"]) == 2
+    assert len(series.data["m2"]) == 2
+
+
+def test_import_postgres_last_n_points():
+    test = PostgresTestConfig(
+        name="test",
+        query="SELECT * FROM sample;",
+        time_column="time",
+        metrics=[PostgresMetric("m1", 1, 1.0, "metric1"), PostgresMetric("m2", 1, 5.0, "metric2")],
+        attributes=["commit"],
+    )
+
+    importer = PostgresImporter(MockPostgres())
+    selector = data_selector()
+    selector.last_n_points = 5
+    series = importer.fetch_data(test, selector=selector)
+    assert len(series.time) == 5
+    assert len(series.data["m2"]) == 5
+    assert len(series.attributes["commit"]) == 5
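+
+
+def test_import_postgres_metric_column_mapping():
+    # A small additional check (sketch): metric names declared in the test config
+    # are mapped to their source columns via PostgresMetric.column, so a metric
+    # may be renamed without changing the underlying query.
+    test = PostgresTestConfig(
+        name="test",
+        query="SELECT * FROM sample;",
+        time_column="time",
+        metrics=[PostgresMetric("renamed", 1, 1.0, "metric1")],
+        attributes=["commit"],
+    )
+    importer = PostgresImporter(MockPostgres())
+    series = importer.fetch_data(test_conf=test, selector=data_selector())
+    assert test.fully_qualified_metric_names() == ["renamed"]
+    assert len(series.data["renamed"]) == 10
+    assert series.data["renamed"][0] == 2.0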