Skip to content

Commit eeacce7

Browse files
lyowangGerrrr
authored and committed
Added bigquery support (#16)
* Added BigQuery support * Update bigquery.py
1 parent dc88a38 commit eeacce7

12 files changed

+733
-4
lines changed

examples/bigquery/README.md

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
## Schema
2+
3+
See [schema.sql](schema.sql) for the example schema.
4+
5+
## Usage
6+
7+
Define BigQuery connection details via environment variables:
8+
9+
```bash
10+
export BIGQUERY_PROJECT_ID=...
11+
export BIGQUERY_DATASET=...
12+
export BIGQUERY_VAULT_SECRET=...
13+
```
14+
or in `hunter.yaml`.
15+
16+
Also configure the credentials. See [config_credentials.sh](config_credentials.sh) for an example.
17+
18+
The following command shows results for a single test `aggregate_mem` and updates the database with newly found change points:
19+
20+
```bash
21+
$ BRANCH=trunk HUNTER_CONFIG=hunter.yaml hunter analyze aggregate_mem --update-bigquery
22+
```
+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Configure the GCP BigQuery key.
# This script is meant to be sourced: it exports BIGQUERY_CREDENTIALS for
# later use by hunter (see examples/bigquery/README.md).
touch bigquery_credentials.json
# Restrict permissions BEFORE the secret is written into the file, so the
# credentials are never readable by other users (previously the chmod ran
# only after the vault read, leaving a world-readable window).
chmod 600 bigquery_credentials.json
export BIGQUERY_CREDENTIALS=$(readlink -f bigquery_credentials.json)
echo "Loading ${BIGQUERY_CREDENTIALS} to export analysis summaries to BigQuery/Metabase."
# ie: export BIGQUERY_VAULT_SECRET=v1/ci/kv/gcp/flink_sql_bigquery
vault kv get -field=json "${BIGQUERY_VAULT_SECRET}" > "${BIGQUERY_CREDENTIALS}"
# You may also copy your credential json directly to the bigquery_credentials.json for this to work.

examples/bigquery/hunter.yaml

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# External systems connectors configuration:
# Values are expanded from environment variables (see README.md).
bigquery:
  project_id: ${BIGQUERY_PROJECT_ID}
  dataset: ${BIGQUERY_DATASET}
  credentials: ${BIGQUERY_CREDENTIALS}   # path to a service-account JSON key file

# Templates define common bits shared between test definitions:
templates:
  common:
    type: bigquery
    time_column: commit_ts
    attributes: [experiment_id, config_id, commit]
    # required for --update-bigquery to work
    # {metric}, {experiment_id}, {config_id} are substituted by hunter via
    # str.format with values from the analyzed series.
    # NOTE(review): the %s placeholders are not substituted by the
    # .format-based code in hunter/bigquery.py (it passes named kwargs such
    # as forward_change_percent) — confirm this statement actually executes.
    update_statement: |
      UPDATE ${BIGQUERY_PROJECT_ID}.${BIGQUERY_DATASET}.results
      SET {metric}_rel_forward_change=%s,
      {metric}_rel_backward_change=%s,
      {metric}_p_value=%s
      WHERE experiment_id = '{experiment_id}' AND config_id = {config_id}
    # Metrics to analyze; direction 1 means higher-is-better, -1 the opposite.
    metrics:
      process_cumulative_rate_mean:
        direction: 1
        scale: 1
      process_cumulative_rate_stderr:
        direction: -1
        scale: 1
      process_cumulative_rate_diff:
        direction: -1
        scale: 1

# Define your tests here:
tests:
  aggregate_mem:
    inherit: [ common ] # avoids repeating metrics definitions and postgres-related config
    # Query returning one row per run, ordered by time (time_column above).
    query: |
      SELECT e.commit,
             e.commit_ts,
             r.process_cumulative_rate_mean,
             r.process_cumulative_rate_stderr,
             r.process_cumulative_rate_diff,
             r.experiment_id,
             r.config_id
      FROM ${BIGQUERY_PROJECT_ID}.${BIGQUERY_DATASET}.results r
      INNER JOIN ${BIGQUERY_PROJECT_ID}.${BIGQUERY_DATASET}.configs c ON r.config_id = c.id
      INNER JOIN ${BIGQUERY_PROJECT_ID}.${BIGQUERY_DATASET}.experiments e ON r.experiment_id = e.id
      WHERE e.exclude_from_analysis = false AND
            e.branch = 'test-branch' AND
            e.username = 'ci' AND
            c.store = 'test-store' AND
            c.cache = true AND
            c.benchmark = 'tpcds' AND
            c.instance_type = 'test-instance'
      ORDER BY e.commit_ts ASC;

examples/bigquery/schema.sql

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
-- Example BigQuery schema for the hunter BigQuery connector.
-- BigQuery does not enforce key constraints, hence NOT ENFORCED on every
-- PRIMARY KEY / REFERENCES clause (they serve as documentation/metadata only).

-- One row per benchmark configuration under test.
CREATE TABLE IF NOT EXISTS configs (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    benchmark STRING NOT NULL,
    scenario STRING NOT NULL,
    store STRING NOT NULL,
    instance_type STRING NOT NULL,
    cache BOOLEAN NOT NULL
);

-- One row per benchmark run (a commit tested at a point in time).
CREATE TABLE IF NOT EXISTS experiments (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    ts TIMESTAMP NOT NULL,
    branch STRING NOT NULL,
    commit STRING NOT NULL,
    commit_ts TIMESTAMP NOT NULL,
    username STRING NOT NULL,
    details_url STRING NOT NULL,
    exclude_from_analysis BOOLEAN DEFAULT false NOT NULL,
    exclude_reason STRING
);

-- Measured metrics per (experiment, config) pair; the *_rel_*_change and
-- *_p_value columns are written back by `hunter analyze --update-bigquery`.
-- NOTE(review): REFERENCES uses the hardcoded dataset "flink_sql" while
-- hunter.yaml uses ${BIGQUERY_DATASET} — confirm they match your deployment.
CREATE TABLE IF NOT EXISTS results (
    experiment_id BIGINT NOT NULL REFERENCES flink_sql.experiments(id) NOT ENFORCED,
    config_id BIGINT NOT NULL REFERENCES flink_sql.configs(id) NOT ENFORCED,

    process_cumulative_rate_mean BIGINT NOT NULL,
    process_cumulative_rate_stderr BIGINT NOT NULL,
    process_cumulative_rate_diff BIGINT NOT NULL,

    process_cumulative_rate_mean_rel_forward_change FLOAT64,
    process_cumulative_rate_mean_rel_backward_change FLOAT64,
    process_cumulative_rate_mean_p_value DECIMAL,

    process_cumulative_rate_stderr_rel_forward_change FLOAT64,
    process_cumulative_rate_stderr_rel_backward_change FLOAT64,
    process_cumulative_rate_stderr_p_value DECIMAL,

    process_cumulative_rate_diff_rel_forward_change FLOAT64,
    process_cumulative_rate_diff_rel_backward_change FLOAT64,
    process_cumulative_rate_diff_p_value DECIMAL,

    PRIMARY KEY (experiment_id, config_id) NOT ENFORCED
);

hunter/bigquery.py

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
from dataclasses import dataclass
2+
from datetime import datetime
3+
from typing import Dict
4+
5+
from google.cloud import bigquery
6+
from google.oauth2 import service_account
7+
8+
from hunter.analysis import ChangePoint
9+
from hunter.test_config import BigQueryTestConfig
10+
11+
12+
@dataclass
class BigQueryConfig:
    """Connection settings for the BigQuery connector (loaded from hunter's config)."""

    # GCP project id (e.g. expanded from ${BIGQUERY_PROJECT_ID})
    project_id: str
    # BigQuery dataset holding the results tables
    dataset: str
    # Path to a service-account credentials JSON file
    credentials: str
17+
18+
19+
@dataclass
class BigQueryError(Exception):
    """Error raised by the BigQuery connector, carrying a human-readable message."""

    message: str
22+
23+
24+
class BigQuery:
    """Thin wrapper around the google-cloud-bigquery client.

    The underlying client is created lazily on first access of ``client``,
    so constructing a ``BigQuery`` object does not require valid credentials.
    """

    def __init__(self, config: BigQueryConfig):
        # Instance state is initialized here rather than as class-level
        # attributes (the original declared `__client = None` on the class,
        # which risks shared state being read through the class object).
        self.__client = None
        self.__config = config

    @property
    def client(self) -> bigquery.Client:
        """Return the BigQuery client, creating it on first access."""
        if self.__client is None:
            credentials = service_account.Credentials.from_service_account_file(
                self.__config.credentials,
                scopes=["https://www.googleapis.com/auth/cloud-platform"],
            )
            # NOTE(review): the project comes from the service-account file,
            # not from self.__config.project_id — confirm this is intended.
            self.__client = bigquery.Client(credentials=credentials, project=credentials.project_id)
        return self.__client

    def fetch_data(self, query: str):
        """Run *query* and return ``(column_names, row_iterator)``.

        The returned row iterator can be consumed only once.
        """
        query_job = self.client.query(query)  # API request
        results = query_job.result()  # blocks until the query finishes
        columns = [field.name for field in results.schema]
        return (columns, results)

    def insert_change_point(
        self,
        test: BigQueryTestConfig,
        metric_name: str,
        attributes: Dict,
        change_point: ChangePoint,
    ):
        """Persist a detected change point by executing the test's UPDATE statement.

        The statement template (``test.update_stmt``) is filled via str.format
        with the metric name, change percentages, p-value, the series
        attributes, and the change-point timestamp keyed by the test's
        time column.
        """
        # NOTE: datetime.utcfromtimestamp is deprecated since Python 3.12;
        # the aware equivalent would change the rendered value, so it is kept.
        kwargs = {**attributes, **{test.time_column: datetime.utcfromtimestamp(change_point.time)}}
        update_stmt = test.update_stmt.format(
            metric=metric_name,
            forward_change_percent=change_point.forward_change_percent(),
            backward_change_percent=change_point.backward_change_percent(),
            p_value=change_point.stats.pvalue,
            **kwargs,
        )
        query_job = self.client.query(update_stmt)

        # Wait for the DML statement to finish before reporting the result.
        query_job.result()

        # Output the number of rows affected
        print("Affected rows: {}".format(query_job.num_dml_affected_rows))

hunter/config.py

+11
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from expandvars import expandvars
77
from ruamel.yaml import YAML
88

9+
from hunter.bigquery import BigQueryConfig
910
from hunter.grafana import GrafanaConfig
1011
from hunter.graphite import GraphiteConfig
1112
from hunter.postgres import PostgresConfig
@@ -22,6 +23,7 @@ class Config:
2223
test_groups: Dict[str, List[TestConfig]]
2324
slack: SlackConfig
2425
postgres: PostgresConfig
26+
bigquery: BigQueryConfig
2527

2628

2729
@dataclass
@@ -133,6 +135,14 @@ def load_config_from(config_file: Path) -> Config:
133135
database=config["postgres"]["database"],
134136
)
135137

138+
bigquery_config = None
139+
if config.get("bigquery") is not None:
140+
bigquery_config = BigQueryConfig(
141+
project_id=config["bigquery"]["project_id"],
142+
dataset=config["bigquery"]["dataset"],
143+
credentials=config["bigquery"]["credentials"],
144+
)
145+
136146
templates = load_templates(config)
137147
tests = load_tests(config, templates)
138148
groups = load_test_groups(config, tests)
@@ -142,6 +152,7 @@ def load_config_from(config_file: Path) -> Config:
142152
grafana=grafana_config,
143153
slack=slack_config,
144154
postgres=postgres_config,
155+
bigquery=bigquery_config,
145156
tests=tests,
146157
test_groups=groups,
147158
)

0 commit comments

Comments
 (0)