Skip to content

Commit

Permalink
Improve exception handling, logging, and validation (#1477)
Browse files Browse the repository at this point in the history
* Add logging to apply method

Signed-off-by: Willem Pienaar <[email protected]>

* Linting changes

Signed-off-by: Willem Pienaar <[email protected]>

* Add error message when missing authentication for GCP provider

Signed-off-by: Willem Pienaar <[email protected]>

* Add proper error message for missing feature views in historical retrieval

Signed-off-by: Willem Pienaar <[email protected]>

* Add proper error message for credential error for bigquery client

Signed-off-by: Willem Pienaar <[email protected]>

* Add proper gcp project missing error for bigquery client

Signed-off-by: Willem Pienaar <[email protected]>

* Ensure all apply commands initialize a feature repository

Signed-off-by: Willem Pienaar <[email protected]>

* Ensure FeatureStore object can be run from outside a repository

Signed-off-by: Willem Pienaar <[email protected]>

* Fix linting

Signed-off-by: Willem Pienaar <[email protected]>

* Fix missing sqlite database when using relative paths

Signed-off-by: Willem Pienaar <[email protected]>

* Small tweaks based on PR feedback

Signed-off-by: Willem Pienaar <[email protected]>
  • Loading branch information
woop authored Apr 20, 2021
1 parent b8ef24e commit 9ec90f4
Show file tree
Hide file tree
Showing 12 changed files with 230 additions and 68 deletions.
18 changes: 6 additions & 12 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import pkg_resources
import yaml

from feast.errors import FeastObjectNotFoundException
from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError
from feast.feature_store import FeatureStore
from feast.repo_config import load_repo_config
from feast.repo_operations import (
Expand Down Expand Up @@ -156,8 +156,10 @@ def apply_total_command():
"""
cli_check_repo(Path.cwd())
repo_config = load_repo_config(Path.cwd())

apply_total(repo_config, Path.cwd())
try:
apply_total(repo_config, Path.cwd())
except FeastProviderLoginError as e:
print(str(e))


@cli.command("teardown")
Expand All @@ -179,7 +181,7 @@ def registry_dump_command():
cli_check_repo(Path.cwd())
repo_config = load_repo_config(Path.cwd())

registry_dump(repo_config)
registry_dump(repo_config, repo_path=Path.cwd())


@cli.command("materialize")
Expand Down Expand Up @@ -244,14 +246,6 @@ def init_command(project_directory, minimal: bool, template: str):
"""Create a new Feast repository"""
if not project_directory:
project_directory = generate_project_name()
if template and minimal:
from colorama import Fore, Style

click.echo(
f"Please select either a {Style.BRIGHT + Fore.GREEN}template{Style.RESET_ALL} or "
f"{Style.BRIGHT + Fore.GREEN}minimal{Style.RESET_ALL}, not both"
)
exit(1)

if minimal:
template = "minimal"
Expand Down
27 changes: 21 additions & 6 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,30 @@ class FeastObjectNotFoundException(Exception):


class EntityNotFoundException(FeastObjectNotFoundException):
def __init__(self, project, name):
super().__init__(f"Entity {name} does not exist in project {project}")
def __init__(self, name, project=None):
if project:
super().__init__(f"Entity {name} does not exist in project {project}")
else:
super().__init__(f"Entity {name} does not exist")


class FeatureViewNotFoundException(FeastObjectNotFoundException):
def __init__(self, project, name):
super().__init__(f"Feature view {name} does not exist in project {project}")
def __init__(self, name, project=None):
if project:
super().__init__(f"Feature view {name} does not exist in project {project}")
else:
super().__init__(f"Feature view {name} does not exist")


class FeatureTableNotFoundException(FeastObjectNotFoundException):
def __init__(self, project, name):
super().__init__(f"Feature table {name} does not exist in project {project}")
def __init__(self, name, project=None):
if project:
super().__init__(
f"Feature table {name} does not exist in project {project}"
)
else:
super().__init__(f"Feature table {name} does not exist")


class FeastProviderLoginError(Exception):
"""Error class that indicates a user has not authenticated with their provider."""
43 changes: 30 additions & 13 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
from collections import OrderedDict, defaultdict
from datetime import datetime, timedelta
from pathlib import Path
Expand All @@ -21,6 +23,7 @@

from feast import utils
from feast.entity import Entity
from feast.errors import FeastProviderLoginError, FeatureViewNotFoundException
from feast.feature_view import FeatureView
from feast.infra.provider import Provider, RetrievalJob, get_provider
from feast.online_response import OnlineResponse, _infer_online_entity_rows
Expand All @@ -41,7 +44,7 @@ class FeatureStore:
"""

config: RepoConfig
repo_path: Optional[str]
repo_path: Path
_registry: Registry

def __init__(
Expand All @@ -53,19 +56,21 @@ def __init__(
repo_path: Path to a `feature_store.yaml` used to configure the feature store
config (RepoConfig): Configuration object used to configure the feature store
"""
self.repo_path = repo_path
if repo_path is not None and config is not None:
raise ValueError("You cannot specify both repo_path and config")
if config is not None:
self.repo_path = Path(os.getcwd())
self.config = config
elif repo_path is not None:
self.repo_path = Path(repo_path)
self.config = load_repo_config(Path(repo_path))
else:
raise ValueError("Please specify one of repo_path or config")

registry_config = self.config.get_registry_config()
self._registry = Registry(
registry_path=registry_config.path,
repo_path=self.repo_path,
cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds),
)
self._tele = Telemetry()
Expand All @@ -80,7 +85,8 @@ def project(self) -> str:
return self.config.project

def _get_provider(self) -> Provider:
return get_provider(self.config)
# TODO: Bake self.repo_path into self.config so that we dont only have one interface to paths
return get_provider(self.config, self.repo_path)

def refresh_registry(self):
"""Fetches and caches a copy of the feature registry in memory.
Expand All @@ -101,6 +107,7 @@ def refresh_registry(self):
registry_config = self.config.get_registry_config()
self._registry = Registry(
registry_path=registry_config.path,
repo_path=self.repo_path,
cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds),
)
self._registry.refresh()
Expand Down Expand Up @@ -271,16 +278,26 @@ def get_historical_features(
all_feature_views = self._registry.list_feature_views(
project=self.config.project
)
feature_views = _get_requested_feature_views(feature_refs, all_feature_views)
try:
feature_views = _get_requested_feature_views(
feature_refs, all_feature_views
)
except FeatureViewNotFoundException as e:
sys.exit(e)

provider = self._get_provider()
job = provider.get_historical_features(
self.config,
feature_views,
feature_refs,
entity_df,
self._registry,
self.project,
)
try:
job = provider.get_historical_features(
self.config,
feature_views,
feature_refs,
entity_df,
self._registry,
self.project,
)
except FeastProviderLoginError as e:
sys.exit(e)

return job

def materialize_incremental(
Expand Down Expand Up @@ -529,7 +546,7 @@ def _group_refs(
for ref in feature_refs:
view_name, feat_name = ref.split(":")
if view_name not in view_index:
raise ValueError(f"Could not find feature view from reference {ref}")
raise FeatureViewNotFoundException(view_name)
views_features[view_name].append(feat_name)

result = []
Expand Down
16 changes: 12 additions & 4 deletions sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import mmh3
import pandas
import pyarrow
from google.auth.exceptions import DefaultCredentialsError

from feast import FeatureTable, utils
from feast.errors import FeastProviderLoginError
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.offline_stores.helpers import get_offline_store_from_sources
Expand Down Expand Up @@ -37,10 +39,16 @@ def __init__(self, config: RepoConfig):
def _initialize_client(self):
from google.cloud import datastore

if self._gcp_project_id is not None:
return datastore.Client(self._gcp_project_id)
else:
return datastore.Client()
try:
if self._gcp_project_id is not None:
return datastore.Client(self._gcp_project_id)
else:
return datastore.Client()
except DefaultCredentialsError as e:
raise FeastProviderLoginError(
str(e)
+ '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your local Google Cloud account'
)

def update_infra(
self,
Expand Down
10 changes: 7 additions & 3 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@


class LocalProvider(Provider):
_db_path: str
_db_path: Path

def __init__(self, config: RepoConfig):
def __init__(self, config: RepoConfig, repo_path: Path):

assert config is not None
assert config.online_store is not None
local_online_store_config = config.online_store
assert isinstance(local_online_store_config, SqliteOnlineStoreConfig)
self._db_path = local_online_store_config.path
local_path = Path(local_online_store_config.path)
if local_path.is_absolute():
self._db_path = local_path
else:
self._db_path = repo_path.joinpath(local_path)

def _get_conn(self):
Path(self._db_path).parent.mkdir(exist_ok=True)
Expand Down
46 changes: 36 additions & 10 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

import pandas
import pyarrow
from google.auth.exceptions import DefaultCredentialsError
from google.cloud import bigquery
from jinja2 import BaseLoader, Environment

from feast.data_source import BigQuerySource, DataSource
from feast.errors import FeastProviderLoginError
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.provider import (
Expand Down Expand Up @@ -59,9 +61,7 @@ def pull_latest_from_table_or_query(

@staticmethod
def _pull_query(query: str) -> pyarrow.Table:
from google.cloud import bigquery

client = bigquery.Client()
client = _get_bigquery_client()
query_job = client.query(query)
return query_job.to_arrow()

Expand All @@ -76,14 +76,18 @@ def get_historical_features(
) -> RetrievalJob:
# TODO: Add entity_df validation in order to fail before interacting with BigQuery

client = _get_bigquery_client()

if type(entity_df) is str:
entity_df_sql_table = f"({entity_df})"
elif isinstance(entity_df, pandas.DataFrame):
if "event_timestamp" not in entity_df.columns:
raise ValueError(
"Please provide an entity_df with a column named event_timestamp representing the time of events."
)
table_id = _upload_entity_df_into_bigquery(config.project, entity_df)
table_id = _upload_entity_df_into_bigquery(
config.project, entity_df, client
)
entity_df_sql_table = f"`{table_id}`"
else:
raise ValueError(
Expand All @@ -104,18 +108,19 @@ def get_historical_features(
max_timestamp=datetime.now() + timedelta(days=1),
left_table_query_string=entity_df_sql_table,
)
job = BigQueryRetrievalJob(query=query)

job = BigQueryRetrievalJob(query=query, client=client)
return job


class BigQueryRetrievalJob(RetrievalJob):
def __init__(self, query):
def __init__(self, query, client):
self.query = query
self.client = client

def to_df(self):
# TODO: Ideally only start this job when the user runs "get_historical_features", not when they run to_df()
client = bigquery.Client()
df = client.query(self.query).to_dataframe(create_bqstorage_client=True)
df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True)
return df


Expand All @@ -135,9 +140,8 @@ class FeatureViewQueryContext:
entity_selections: List[str]


def _upload_entity_df_into_bigquery(project, entity_df) -> str:
def _upload_entity_df_into_bigquery(project, entity_df, client) -> str:
"""Uploads a Pandas entity dataframe into a BigQuery table and returns a reference to the resulting table"""
client = bigquery.Client()

# First create the BigQuery dataset if it doesn't exist
dataset = bigquery.Dataset(f"{client.project}.feast_{project}")
Expand Down Expand Up @@ -244,6 +248,28 @@ def build_point_in_time_query(
return query


def _get_bigquery_client():
try:
from google.cloud import bigquery

client = bigquery.Client()
except DefaultCredentialsError as e:
raise FeastProviderLoginError(
str(e)
+ '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your '
"local Google Cloud account"
)
except EnvironmentError as e:
raise FeastProviderLoginError(
"GCP error: "
+ str(e)
+ "\nIt may be necessary to set a default GCP project by running "
'"gcloud config set project your-project"'
)

return client


# TODO: Optimizations
# * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly
# * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe
Expand Down
5 changes: 3 additions & 2 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import abc
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import pandas
Expand Down Expand Up @@ -123,15 +124,15 @@ def online_read(
...


def get_provider(config: RepoConfig) -> Provider:
def get_provider(config: RepoConfig, repo_path: Path) -> Provider:
if config.provider == "gcp":
from feast.infra.gcp import GcpProvider

return GcpProvider(config)
elif config.provider == "local":
from feast.infra.local import LocalProvider

return LocalProvider(config)
return LocalProvider(config, repo_path)
else:
raise ValueError(config)

Expand Down
Loading

0 comments on commit 9ec90f4

Please sign in to comment.