-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Co-authored-by: lyowang <[email protected]>
- Loading branch information
Showing
12 changed files
with
782 additions
and
64 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
## Schema | ||
|
||
See [schema.sql](schema.sql) for the example schema. | ||
|
||
## Usage | ||
|
||
Define BigQuery connection details via environment variables: | ||
|
||
```bash | ||
export BIGQUERY_PROJECT_ID=... | ||
export BIGQUERY_DATASET=... | ||
export BIGQUERY_VAULT_SECRET=... | ||
``` | ||
or in `hunter.yaml`. | ||
|
||
Also configure the credentials. See [config_credentials.sh](config_credentials.sh) for an example. | ||
|
||
The following command shows results for a single test `aggregate_mem` and updates the database with newly found change points: | ||
|
||
```bash | ||
$ BRANCH=trunk HUNTER_CONFIG=hunter.yaml hunter analyze aggregate_mem --update-bigquery | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
# Configure the GCP BigQuery key. | ||
touch bigquery_credentials.json | ||
export BIGQUERY_CREDENTIALS=$(readlink -f bigquery_credentials.json) | ||
echo "Loading ${BIGQUERY_CREDENTIALS} to export analysis summaries to BigQuery/Metabase." | ||
# ie: export BIGQUERY_VAULT_SECRET=v1/ci/kv/gcp/flink_sql_bigquery | ||
vault kv get -field=json "${BIGQUERY_VAULT_SECRET}" > "${BIGQUERY_CREDENTIALS}" | ||
# You may also copy your credential json directly to the bigquery_credentials.json for this to work. | ||
chmod 600 "${BIGQUERY_CREDENTIALS}" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
# External systems connectors configuration: | ||
bigquery: | ||
project_id: ${BIGQUERY_PROJECT_ID} | ||
dataset: ${BIGQUERY_DATASET} | ||
credentials: ${BIGQUERY_CREDENTIALS} | ||
|
||
# Templates define common bits shared between test definitions: | ||
templates: | ||
common: | ||
type: bigquery | ||
time_column: commit_ts | ||
attributes: [experiment_id, config_id, commit] | ||
# required for --update-bigquery to work | ||
update_statement: | | ||
UPDATE ${BIGQUERY_PROJECT_ID}.${BIGQUERY_DATASET}.results | ||
SET {metric}_rel_forward_change=%s, | ||
{metric}_rel_backward_change=%s, | ||
{metric}_p_value=%s | ||
WHERE experiment_id = '{experiment_id}' AND config_id = {config_id} | ||
metrics: | ||
process_cumulative_rate_mean: | ||
direction: 1 | ||
scale: 1 | ||
process_cumulative_rate_stderr: | ||
direction: -1 | ||
scale: 1 | ||
process_cumulative_rate_diff: | ||
direction: -1 | ||
scale: 1 | ||
|
||
# Define your tests here: | ||
tests: | ||
aggregate_mem: | ||
inherit: [ common ] # avoids repeating metrics definitions and postgres-related config | ||
query: | | ||
SELECT e.commit, | ||
e.commit_ts, | ||
r.process_cumulative_rate_mean, | ||
r.process_cumulative_rate_stderr, | ||
r.process_cumulative_rate_diff, | ||
r.experiment_id, | ||
r.config_id | ||
FROM ${BIGQUERY_PROJECT_ID}.${BIGQUERY_DATASET}.results r | ||
INNER JOIN ${BIGQUERY_PROJECT_ID}.${BIGQUERY_DATASET}.configs c ON r.config_id = c.id | ||
INNER JOIN ${BIGQUERY_PROJECT_ID}.${BIGQUERY_DATASET}.experiments e ON r.experiment_id = e.id | ||
WHERE e.exclude_from_analysis = false AND | ||
e.branch = 'test-branch' AND | ||
e.username = 'ci' AND | ||
c.store = 'test-store' AND | ||
c.cache = true AND | ||
c.benchmark = 'tpcds' AND | ||
c.instance_type = 'test-instance' | ||
ORDER BY e.commit_ts ASC; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
CREATE TABLE IF NOT EXISTS configs ( | ||
id BIGINT PRIMARY KEY NOT ENFORCED, | ||
benchmark STRING NOT NULL, | ||
scenario STRING NOT NULL, | ||
store STRING NOT NULL, | ||
instance_type STRING NOT NULL, | ||
cache BOOLEAN NOT NULL | ||
); | ||
|
||
CREATE TABLE IF NOT EXISTS experiments ( | ||
id BIGINT PRIMARY KEY NOT ENFORCED, | ||
ts TIMESTAMP NOT NULL, | ||
branch STRING NOT NULL, | ||
commit STRING NOT NULL, | ||
commit_ts TIMESTAMP NOT NULL, | ||
username STRING NOT NULL, | ||
details_url STRING NOT NULL, | ||
exclude_from_analysis BOOLEAN DEFAULT false NOT NULL, | ||
exclude_reason STRING | ||
); | ||
|
||
CREATE TABLE IF NOT EXISTS results ( | ||
experiment_id BIGINT NOT NULL REFERENCES flink_sql.experiments(id) NOT ENFORCED, | ||
config_id BIGINT NOT NULL REFERENCES flink_sql.configs(id) NOT ENFORCED, | ||
|
||
process_cumulative_rate_mean BIGINT NOT NULL, | ||
process_cumulative_rate_stderr BIGINT NOT NULL, | ||
process_cumulative_rate_diff BIGINT NOT NULL, | ||
|
||
process_cumulative_rate_mean_rel_forward_change FLOAT64, | ||
process_cumulative_rate_mean_rel_backward_change FLOAT64, | ||
process_cumulative_rate_mean_p_value DECIMAL, | ||
|
||
process_cumulative_rate_stderr_rel_forward_change FLOAT64, | ||
process_cumulative_rate_stderr_rel_backward_change FLOAT64, | ||
process_cumulative_rate_stderr_p_value DECIMAL, | ||
|
||
process_cumulative_rate_diff_rel_forward_change FLOAT64, | ||
process_cumulative_rate_diff_rel_backward_change FLOAT64, | ||
process_cumulative_rate_diff_p_value DECIMAL, | ||
|
||
PRIMARY KEY (experiment_id, config_id) NOT ENFORCED | ||
); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
from dataclasses import dataclass | ||
from datetime import datetime | ||
from typing import Dict | ||
|
||
from google.cloud import bigquery | ||
from google.oauth2 import service_account | ||
|
||
from hunter.analysis import ChangePoint | ||
from hunter.test_config import BigQueryTestConfig | ||
|
||
|
||
@dataclass | ||
class BigQueryConfig: | ||
project_id: str | ||
dataset: str | ||
credentials: str | ||
|
||
|
||
@dataclass | ||
class BigQueryError(Exception): | ||
message: str | ||
|
||
|
||
class BigQuery: | ||
__client = None | ||
__config = None | ||
|
||
def __init__(self, config: BigQueryConfig): | ||
self.__config = config | ||
|
||
@property | ||
def client(self) -> bigquery.Client: | ||
if self.__client is None: | ||
credentials = service_account.Credentials.from_service_account_file( | ||
self.__config.credentials, | ||
scopes=["https://www.googleapis.com/auth/cloud-platform"], | ||
) | ||
self.__client = bigquery.Client(credentials=credentials, project=credentials.project_id) | ||
return self.__client | ||
|
||
def fetch_data(self, query: str): | ||
query_job = self.client.query(query) # API request | ||
results = query_job.result() | ||
columns = [field.name for field in results.schema] | ||
return (columns, results) | ||
|
||
def insert_change_point( | ||
self, | ||
test: BigQueryTestConfig, | ||
metric_name: str, | ||
attributes: Dict, | ||
change_point: ChangePoint, | ||
): | ||
kwargs = {**attributes, **{test.time_column: datetime.utcfromtimestamp(change_point.time)}} | ||
update_stmt = test.update_stmt.format( | ||
metric=metric_name, | ||
forward_change_percent=change_point.forward_change_percent(), | ||
backward_change_percent=change_point.backward_change_percent(), | ||
p_value=change_point.stats.pvalue, | ||
**kwargs | ||
) | ||
query_job = self.client.query(update_stmt) | ||
|
||
# Wait for the query to finish | ||
query_job.result() | ||
|
||
# Output the number of rows affected | ||
print("Affected rows: {}".format(query_job.num_dml_affected_rows)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.