Skip to content

Commit

Permalink
Implement bigquery auth using pydata_google_auth
Browse files Browse the repository at this point in the history
Also allow project_id and dataset_id to be optional  Closes #1583

Author: Phillip Cloud <[email protected]>

Closes #1728 from cpcloud/gh-1583 and squashes the following commits:

bd27a9a [Phillip Cloud] Add credentials cache
5b9622d [Phillip Cloud] Add credentials cache
5c4ddd3 [Phillip Cloud] Hack around missing python3.7 in conda-forge google-auth-oauthlib
dabf43c [Phillip Cloud] Fix annotation
873c79b [Phillip Cloud] Implement bigquery auth using pydata_google_auth and allow project_id and dataset_id to be optional
8e6102a [Phillip Cloud] Add test for an operation that requires a dataset
4672311 [Phillip Cloud] Checkpoint
  • Loading branch information
cpcloud committed Mar 17, 2019
1 parent 95d12ae commit eb3c98c
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 38 deletions.
3 changes: 2 additions & 1 deletion ci/Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ RUN conda config --add channels conda-forge \
&& conda install -q -y \
--file /requirements-dev.yml \
python=$PYTHON \
&& conda clean --all -y \
&& pip install pydata-google-auth

COPY . /ibis
WORKDIR /ibis
Expand Down
64 changes: 48 additions & 16 deletions ibis/bigquery/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
"""BigQuery public API"""
"""BigQuery public API."""

from typing import Optional

import google.auth.credentials
import pydata_google_auth

import google.cloud.bigquery  # noqa: F401 fail early if bigquery is missing
import ibis.common as com

from ibis.config import options # noqa: F401
Expand All @@ -22,42 +27,69 @@


def compile(expr, params=None):
    """Compile an expression for BigQuery.

    Parameters
    ----------
    expr : ibis.expr.types.Expr
        The expression to compile.
    params : dict, optional
        Mapping of scalar parameter expressions to values.

    Returns
    -------
    compiled : str

    See Also
    --------
    ibis.expr.types.Expr.compile
    """
    # Imported here to avoid a circular import between the api and
    # compiler modules.
    from ibis.bigquery.compiler import to_sql

    return to_sql(expr, dialect.make_context(params=params))


def verify(expr, params=None):
    """Check if an expression can be compiled using BigQuery.

    Parameters
    ----------
    expr : ibis.expr.types.Expr
        The expression to check.
    params : dict, optional
        Mapping of scalar parameter expressions to values.

    Returns
    -------
    bool
        True if `expr` can be translated to BigQuery SQL, False otherwise.
    """
    try:
        compile(expr, params=params)
        return True
    except com.TranslationError:
        return False


# OAuth scope and client identifiers used by the pydata_google_auth
# browser/device flow when no explicit credentials are supplied.
SCOPES = ["https://www.googleapis.com/auth/bigquery"]
CLIENT_ID = (
    "546535678771-gvffde27nd83kfl6qbrnletqvkdmsese.apps.googleusercontent.com"
)
CLIENT_SECRET = "iU5ohAF2qcqrujegE3hQ1cPt"


def connect(
    project_id: Optional[str] = None,
    dataset_id: Optional[str] = None,
    credentials: Optional[google.auth.credentials.Credentials] = None,
) -> BigQueryClient:
    """Create a BigQueryClient for use with Ibis.

    Parameters
    ----------
    project_id : str
        A BigQuery project id.
    dataset_id : str
        A dataset id that lives inside of the project indicated by
        `project_id`.
    credentials : google.auth.credentials.Credentials

    Returns
    -------
    BigQueryClient
    """
    if credentials is None:
        # Cache credentials on disk so users are not prompted to
        # reauthenticate on every connect() call.
        credentials_cache = pydata_google_auth.cache.ReadWriteCredentialsCache(
            filename="ibis.json"
        )
        credentials, default_project_id = pydata_google_auth.default(
            SCOPES,
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET,
            credentials_cache=credentials_cache,
        )
        # Only fall back to the project discovered during authentication;
        # do not clobber an explicitly passed project_id.
        if project_id is None:
            project_id = default_project_id

    return BigQueryClient(
        project_id, dataset_id=dataset_id, credentials=credentials
    )
88 changes: 68 additions & 20 deletions ibis/bigquery/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""BigQuery ibis client implementation."""

import datetime

from collections import OrderedDict
from pkg_resources import parse_version
from typing import Optional, Tuple

import regex as re

Expand Down Expand Up @@ -51,6 +54,7 @@

@dt.dtype.register(bq.schema.SchemaField)
def bigquery_field_to_ibis_dtype(field):
"""Convert BigQuery `field` to an ibis type."""
typ = field.field_type
if typ == 'RECORD':
fields = field.fields
Expand All @@ -68,6 +72,7 @@ def bigquery_field_to_ibis_dtype(field):

@sch.infer.register(bq.table.Table)
def bigquery_schema(table):
"""Infer the schema of a BigQuery `table` object."""
fields = OrderedDict((el.name, dt.dtype(el)) for el in table.schema)
partition_info = table._properties.get('timePartitioning', None)

Expand All @@ -81,38 +86,57 @@ def bigquery_schema(table):


class BigQueryCursor:
    """BigQuery cursor.

    This allows the BigQuery client to reuse machinery in
    :file:`ibis/client.py`.
    """

    def __init__(self, query):
        """Construct a BigQueryCursor with query `query`."""
        self.query = query

    def fetchall(self):
        """Fetch all rows, each as a tuple of values."""
        result = self.query.result()
        return [row.values() for row in result]

    @property
    def columns(self):
        """Return the names of the columns of the result set."""
        result = self.query.result()
        return [field.name for field in result.schema]

    @property
    def description(self):
        """Get the fields of the result set's schema."""
        result = self.query.result()
        return list(result.schema)

    def __enter__(self):
        """No-op for compatibility.

        See Also
        --------
        ibis.client.Query.execute
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """No-op for compatibility.

        See Also
        --------
        ibis.client.Query.execute
        """


def _find_scalar_parameter(expr):
""":func:`~ibis.expr.lineage.traverse` function to find all
:class:`~ibis.expr.types.ScalarParameter` instances and yield the operation
and the parent expresssion's resolved name.
"""Find all :class:`~ibis.expr.types.ScalarParameter` instances.
Parameters
----------
Expand All @@ -121,6 +145,8 @@ def _find_scalar_parameter(expr):
Returns
-------
Tuple[bool, object]
The operation and the parent expression's resolved name.
"""
op = expr.op()

Expand Down Expand Up @@ -163,7 +189,7 @@ def execute(self):


class BigQueryDatabase(Database):
    """A BigQuery dataset."""


bigquery_param = Dispatcher('bigquery_param')
Expand Down Expand Up @@ -277,21 +303,22 @@ def rename_partitioned_column(table_expr, bq_table):
return table_expr.relabel({NATIVE_PARTITION_COL: col})


def parse_project_and_dataset(
    project: str,
    dataset: Optional[str] = None,
) -> Tuple[str, str, Optional[str]]:
    """Compute the billing project, data project, and dataset if available.

    This function figures out the project id under which queries will run
    versus the project where the data live, as well as what dataset to use.

    Parameters
    ----------
    project : str
        A project name
    dataset : Optional[str]
        A ``<project>.<dataset>`` string or just a dataset name

    Returns
    -------
    data_project, billing_project, dataset : str, str, Optional[str]

    Examples
    --------
    >>> data_project, billing_project, dataset = parse_project_and_dataset(
    ...     'ibis-gbq', 'my_dataset'
    ... )
    >>> data_project
    'ibis-gbq'
    >>> billing_project
    'ibis-gbq'
    >>> dataset
    'my_dataset'
    >>> data_project, billing_project, dataset = parse_project_and_dataset(
    ...     'ibis-gbq'
    ... )
    >>> data_project
    'ibis-gbq'
    >>> print(dataset)
    None
    """
    try:
        # A missing dataset (None) raises AttributeError; a dataset string
        # without an embedded project raises ValueError from unpacking.
        data_project, dataset = dataset.split('.')
    except (ValueError, AttributeError):
        billing_project = data_project = project
    else:
        billing_project = project

    return data_project, billing_project, dataset


class BigQueryClient(SQLClient):
"""An ibis BigQuery client implementation."""

query_class = BigQueryQuery
database_class = BigQueryDatabase
table_class = BigQueryTable
dialect = comp.BigQueryDialect

def __init__(self, project_id, dataset_id=None, credentials=None):
"""Construct a BigQueryClient.
Parameters
----------
project_id : str
A project name
dataset_id : Optional[str]
    A ``<project_id>.<dataset_id>`` string or just a dataset name
credentials : google.auth.credentials.Credentials
"""
(self.data_project,
self.billing_project,
Expand All @@ -348,6 +387,8 @@ def __init__(self, project_id, dataset_id, credentials=None):
credentials=credentials)

def _parse_project_and_dataset(self, dataset):
if not dataset and not self.dataset:
raise ValueError("Unable to determine BigQuery dataset.")
project, _, dataset = parse_project_and_dataset(
self.billing_project,
dataset or '{}.{}'.format(self.data_project, self.dataset),
Expand Down Expand Up @@ -381,10 +422,11 @@ def _execute_query(self, dml):

def _fully_qualified_name(self, name, database):
    """Return the ``project.dataset.name`` identifier for `name`."""
    project, dataset = self._parse_project_and_dataset(database)
    return "{}.{}.{}".format(project, dataset, name)

def _get_table_schema(self, qualified_name):
    """Return the ibis schema of the table named by `qualified_name`.

    `qualified_name` must contain at least one ``.`` separating the
    dataset from the table name.
    """
    dataset, table = qualified_name.rsplit('.', 1)
    # NOTE(review): rsplit with maxsplit=1 yields two non-None strings or
    # raises ValueError when there is no '.', so this assert is a sanity
    # check only (and is stripped under `python -O`).
    assert dataset is not None, "dataset is None"
    return self.get_schema(table, database=dataset)

def _get_schema_using_query(self, limited_query):
Expand Down Expand Up @@ -413,6 +455,12 @@ def _execute(self, stmt, results=True, query_parameters=None):
return BigQueryCursor(query)

def database(self, name=None):
    """Return a database (dataset) object for `name`.

    Falls back to the client's default dataset when `name` is None and
    raises ValueError when neither is available.
    """
    if name is None and self.dataset is None:
        raise ValueError(
            "Unable to determine BigQuery dataset. Call "
            "client.database('my_dataset') or set_database('my_dataset') "
            "to assign your client a dataset."
        )
    return self.database_class(name or self.dataset, self)

@property
Expand Down
6 changes: 6 additions & 0 deletions ibis/bigquery/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,3 +707,9 @@ def test_approx_median(alltypes):
expr = m.approx_median()
result = expr.execute()
assert result == expected


def test_client_without_dataset(project_id):
    """Connecting without a dataset succeeds, but operations that need a
    dataset (like list_tables) must raise a clear error."""
    con = ibis.bigquery.connect(project_id)
    with pytest.raises(ValueError, match="Unable to determine BigQuery"):
        con.list_tables()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
# Optional extras for the various ibis backends.
kerberos_requires = ['requests-kerberos']
visualization_requires = ['graphviz']
clickhouse_requires = ['clickhouse-driver>=0.0.8', 'clickhouse-cityhash']
# pydata-google-auth handles the OAuth flow for BigQuery connections.
bigquery_requires = ['google-cloud-bigquery>=1.0.0', 'pydata-google-auth']
hdf5_requires = ['tables>=3.0.0']
parquet_requires = ['pyarrow>=0.6.0']

Expand Down

0 comments on commit eb3c98c

Please sign in to comment.