Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement bigquery auth using pydata_google_auth #1728

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ci/Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ RUN conda config --add channels conda-forge \
&& conda install -q -y \
--file /requirements-dev.yml \
python=$PYTHON \
&& conda clean --all -y
&& conda clean --all -y \
&& pip install pydata-google-auth

COPY . /ibis
WORKDIR /ibis
Expand Down
64 changes: 48 additions & 16 deletions ibis/bigquery/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
"""BigQuery public API"""
"""BigQuery public API."""

from typing import Optional

import google.cloud.bigquery # noqa: F401, fail early if bigquery is missing
import google.auth.credentials
import pydata_google_auth

import google.cloud.bigquery # noqa: F401 fail early if bigquery is missing
import ibis.common as com

from ibis.config import options # noqa: F401
Expand All @@ -22,42 +27,69 @@


def compile(expr, params=None):
"""
Force compilation of expression as though it were an expression depending
on BigQuery. Note you can also call expr.compile()
"""Compile an expression for BigQuery.

Returns
-------
compiled : string
compiled : str

See Also
--------
ibis.expr.types.Expr.compile

"""
from ibis.bigquery.compiler import to_sql
return to_sql(expr, dialect.make_context(params=params))


def verify(expr, params=None):
"""
Determine if expression can be successfully translated to execute on
BigQuery
"""
"""Check if an expression can be compiled using BigQuery."""
try:
compile(expr, params=params)
return True
except com.TranslationError:
return False


def connect(project_id, dataset_id, credentials=None):
"""Create a BigQueryClient for use with Ibis
SCOPES = ["https://www.googleapis.com/auth/bigquery"]
CLIENT_ID = (
"546535678771-gvffde27nd83kfl6qbrnletqvkdmsese.apps.googleusercontent.com"
)
CLIENT_SECRET = "iU5ohAF2qcqrujegE3hQ1cPt"


def connect(
project_id: Optional[str] = None,
dataset_id: Optional[str] = None,
credentials: Optional[google.auth.credentials.Credentials] = None,
) -> BigQueryClient:
"""Create a BigQueryClient for use with Ibis.

Parameters
----------
project_id: str
dataset_id: str
credentials : google.auth.credentials.Credentials, optional, default None
project_id : str
A BigQuery project id.
dataset_id : str
A dataset id that lives inside of the project indicated by
`project_id`.
credentials : google.auth.credentials.Credentials

Returns
-------
BigQueryClient

"""
if credentials is None:
credentials_cache = pydata_google_auth.cache.ReadWriteCredentialsCache(
filename="ibis.json"
)
credentials, project_id = pydata_google_auth.default(
SCOPES,
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
credentials_cache=credentials_cache,
)
cpcloud marked this conversation as resolved.
Show resolved Hide resolved

return BigQueryClient(project_id, dataset_id, credentials=credentials)
return BigQueryClient(
project_id, dataset_id=dataset_id, credentials=credentials
)
88 changes: 68 additions & 20 deletions ibis/bigquery/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""BigQuery ibis client implementation."""

import datetime

from collections import OrderedDict
from pkg_resources import parse_version
from typing import Optional, Tuple

import regex as re

Expand Down Expand Up @@ -51,6 +54,7 @@

@dt.dtype.register(bq.schema.SchemaField)
def bigquery_field_to_ibis_dtype(field):
"""Convert BigQuery `field` to an ibis type."""
typ = field.field_type
if typ == 'RECORD':
fields = field.fields
Expand All @@ -68,6 +72,7 @@ def bigquery_field_to_ibis_dtype(field):

@sch.infer.register(bq.table.Table)
def bigquery_schema(table):
"""Infer the schema of a BigQuery `table` object."""
fields = OrderedDict((el.name, dt.dtype(el)) for el in table.schema)
partition_info = table._properties.get('timePartitioning', None)

Expand All @@ -81,38 +86,57 @@ def bigquery_schema(table):


class BigQueryCursor:
"""Cursor to allow the BigQuery client to reuse machinery in ibis/client.py
"""BigQuery cursor.

This allows the BigQuery client to reuse machinery in
:file:`ibis/client.py`.

"""

def __init__(self, query):
"""Construct a BigQueryCursor with query `query`."""
self.query = query

def fetchall(self):
"""Fetch all rows."""
result = self.query.result()
return [row.values() for row in result]

@property
def columns(self):
"""Return the columns of the result set."""
result = self.query.result()
return [field.name for field in result.schema]

@property
def description(self):
"""Get the fields of the result set's schema."""
result = self.query.result()
return [field for field in result.schema]

def __enter__(self):
# For compatibility when constructed from Query.execute()
"""No-op for compatibility.

See Also
--------
ibis.client.Query.execute

"""
return self

def __exit__(self, exc_type, exc_value, traceback):
pass
"""No-op for compatibility.

See Also
--------
ibis.client.Query.execute

"""


def _find_scalar_parameter(expr):
""":func:`~ibis.expr.lineage.traverse` function to find all
:class:`~ibis.expr.types.ScalarParameter` instances and yield the operation
and the parent expresssion's resolved name.
"""Find all :class:`~ibis.expr.types.ScalarParameter` instances.

Parameters
----------
Expand All @@ -121,6 +145,8 @@ def _find_scalar_parameter(expr):
Returns
-------
Tuple[bool, object]
The operation and the parent expresssion's resolved name.

"""
op = expr.op()

Expand Down Expand Up @@ -163,7 +189,7 @@ def execute(self):


class BigQueryDatabase(Database):
pass
"""A BigQuery dataset."""


bigquery_param = Dispatcher('bigquery_param')
Expand Down Expand Up @@ -277,21 +303,22 @@ def rename_partitioned_column(table_expr, bq_table):
return table_expr.relabel({NATIVE_PARTITION_COL: col})


def parse_project_and_dataset(project, dataset):
"""Figure out the project id under which queries will run versus the
project of where the data live as well as what dataset to use.
def parse_project_and_dataset(
project: str,
dataset: Optional[str] = None,
) -> Tuple[str, str, Optional[str]]:
"""Compute the billing project, data project, and dataset if available.

This function figure out the project id under which queries will run versus
the project of where the data live as well as what dataset to use.

Parameters
----------
project : str
A project name
dataset : str
dataset : Optional[str]
A ``<project>.<dataset>`` string or just a dataset name

Returns
-------
data_project, billing_project, dataset : str, str, str

Examples
--------
>>> data_project, billing_project, dataset = parse_project_and_dataset(
Expand All @@ -314,32 +341,44 @@ def parse_project_and_dataset(project, dataset):
'ibis-gbq'
>>> dataset
'my_dataset'
>>> data_project, billing_project, dataset = parse_project_and_dataset(
... 'ibis-gbq'
... )
>>> data_project
'ibis-gbq'
>>> print(dataset)
None

"""
try:
data_project, dataset = dataset.split('.')
except ValueError:
except (ValueError, AttributeError):
billing_project = data_project = project
else:
billing_project = project

return data_project, billing_project, dataset


class BigQueryClient(SQLClient):
"""An ibis BigQuery client implementation."""

query_class = BigQueryQuery
database_class = BigQueryDatabase
table_class = BigQueryTable
dialect = comp.BigQueryDialect

def __init__(self, project_id, dataset_id, credentials=None):
"""
def __init__(self, project_id, dataset_id=None, credentials=None):
"""Construct a BigQueryClient.

Parameters
----------
project_id : str
A project name
dataset_id : str
dataset_id : Optional[str]
A ``<project_id>.<dataset_id>`` string or just a dataset name
credentials : google.auth.credentials.Credentials, optional
credentials : google.auth.credentials.Credentials

"""
(self.data_project,
self.billing_project,
Expand All @@ -348,6 +387,8 @@ def __init__(self, project_id, dataset_id, credentials=None):
credentials=credentials)

def _parse_project_and_dataset(self, dataset):
if not dataset and not self.dataset:
raise ValueError("Unable to determine BigQuery dataset.")
project, _, dataset = parse_project_and_dataset(
self.billing_project,
dataset or '{}.{}'.format(self.data_project, self.dataset),
Expand Down Expand Up @@ -381,10 +422,11 @@ def _execute_query(self, dml):

def _fully_qualified_name(self, name, database):
project, dataset = self._parse_project_and_dataset(database)
return '{}.{}.{}'.format(project, dataset, name)
return "{}.{}.{}".format(project, dataset, name)

def _get_table_schema(self, qualified_name):
dataset, table = qualified_name.rsplit('.', 1)
assert dataset is not None, "dataset is None"
return self.get_schema(table, database=dataset)

def _get_schema_using_query(self, limited_query):
Expand Down Expand Up @@ -413,6 +455,12 @@ def _execute(self, stmt, results=True, query_parameters=None):
return BigQueryCursor(query)

def database(self, name=None):
if name is None and self.dataset is None:
raise ValueError(
"Unable to determine BigQuery dataset. Call "
"client.database('my_dataset') or set_database('my_dataset') "
"to assign your client a dataset."
)
return self.database_class(name or self.dataset, self)

@property
Expand Down
6 changes: 6 additions & 0 deletions ibis/bigquery/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,3 +707,9 @@ def test_approx_median(alltypes):
expr = m.approx_median()
result = expr.execute()
assert result == expected


def test_client_without_dataset(project_id):
con = ibis.bigquery.connect(project_id)
with pytest.raises(ValueError, match="Unable to determine BigQuery"):
con.list_tables()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
kerberos_requires = ['requests-kerberos']
visualization_requires = ['graphviz']
clickhouse_requires = ['clickhouse-driver>=0.0.8', 'clickhouse-cityhash']
bigquery_requires = ['google-cloud-bigquery>=1.0.0']
bigquery_requires = ['google-cloud-bigquery>=1.0.0', 'pydata-google-auth']
hdf5_requires = ['tables>=3.0.0']
parquet_requires = ['pyarrow>=0.6.0']

Expand Down