diff --git a/.prow/config.yaml b/.prow/config.yaml index 41f95180fb..4b6e352a12 100644 --- a/.prow/config.yaml +++ b/.prow/config.yaml @@ -145,18 +145,18 @@ presubmits: postsubmits: gojek/feast: - name: publish-python-sdk - decorate: true + decorate: true spec: containers: - image: python:3 command: - sh - - -c + - -c - | .prow/scripts/publish-python-sdk.sh \ --directory-path sdk/python --repository pypi volumeMounts: - - name: pypirc + - name: pypirc mountPath: /root/.pypirc subPath: .pypirc readOnly: true @@ -170,7 +170,7 @@ postsubmits: - ^v(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(-(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(\.(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*)?(\+[0-9a-zA-Z-]+(\.[0-9a-zA-Z-]+)*)?$ - name: publish-docker-images - decorate: true + decorate: true spec: containers: - image: google/cloud-sdk:273.0.0 @@ -182,14 +182,14 @@ postsubmits: --archive-uri gs://feast-templocation-kf-feast/.m2.2019-10-24.tar \ --output-dir $PWD/ - if [ $PULL_BASE_REF == "master" ]; then - + if [ $PULL_BASE_REF == "master" ]; then + .prow/scripts/publish-docker-image.sh \ --repository gcr.io/kf-feast/feast-core \ --tag dev \ --file infra/docker/core/Dockerfile \ --google-service-account-file /etc/gcloud/service-account.json - + .prow/scripts/publish-docker-image.sh \ --repository gcr.io/kf-feast/feast-serving \ --tag dev \ @@ -203,13 +203,13 @@ postsubmits: docker push gcr.io/kf-feast/feast-serving:${PULL_BASE_SHA} else - + .prow/scripts/publish-docker-image.sh \ --repository gcr.io/kf-feast/feast-core \ --tag ${PULL_BASE_REF:1} \ --file infra/docker/core/Dockerfile \ --google-service-account-file /etc/gcloud/service-account.json - + .prow/scripts/publish-docker-image.sh \ --repository gcr.io/kf-feast/feast-serving \ --tag ${PULL_BASE_REF:1} \ @@ -244,7 +244,7 @@ postsubmits: - ^v(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(-(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(\.(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*)?(\+[0-9a-zA-Z-]+(\.[0-9a-zA-Z-]+)*)?$ - name: publish-helm-chart - decorate: true + decorate: true spec: containers: - image: google/cloud-sdk:273.0.0-slim @@ -253,7 +253,7 @@ postsubmits: - -c - | gcloud auth activate-service-account --key-file /etc/gcloud/service-account.json - + curl -s https://get.helm.sh/helm-v2.16.1-linux-amd64.tar.gz | tar -C /tmp -xz mv /tmp/linux-amd64/helm /usr/bin/helm helm init --client-only diff --git a/sdk/__init__.py b/sdk/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 3af2e12a91..646de343d5 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -15,10 +15,11 @@ import logging import os -import sys +import time from collections import OrderedDict from typing import Dict, Union from typing import List + import grpc import pandas as pd import pyarrow as pa @@ -33,11 +34,13 @@ GetFeatureSetResponse, ) from feast.core.CoreService_pb2_grpc import CoreServiceStub -from feast.exceptions import format_grpc_exception +from feast.core.FeatureSet_pb2 import FeatureSetStatus from feast.feature_set import FeatureSet, Entity from feast.job import Job +from feast.loaders.abstract_producer import get_producer from feast.loaders.file import export_dataframe_to_staging_location -from feast.loaders.ingest import ingest_table_to_kafka +from feast.loaders.ingest import KAFKA_CHUNK_PRODUCTION_TIMEOUT +from feast.loaders.ingest import get_feature_row_chunks from feast.serving.ServingService_pb2 import GetFeastServingInfoResponse from feast.serving.ServingService_pb2 import ( 
GetOnlineFeaturesRequest, @@ -257,7 +260,7 @@ def _apply_feature_set(self, feature_set: FeatureSet): print(f"No change detected or applied: {feature_set.name}") # Deep copy from the returned feature set to the local feature set - feature_set.update_from_feature_set(applied_fs) + feature_set._update_from_feature_set(applied_fs) def list_feature_sets(self) -> List[FeatureSet]: """ @@ -470,35 +473,55 @@ def get_online_features( ) # type: GetOnlineFeaturesResponse def ingest( - self, - feature_set: Union[str, FeatureSet], - source: Union[pd.DataFrame, str], - version: int = None, - force_update: bool = False, - max_workers: int = CPU_COUNT, - disable_progress_bar: bool = False, - chunk_size: int = 5000, - timeout: int = None, - ): + self, + feature_set: Union[str, FeatureSet], + source: Union[pd.DataFrame, str], + chunk_size: int = 10000, + version: int = None, + force_update: bool = False, + max_workers: int = max(CPU_COUNT - 1, 1), + disable_progress_bar: bool = False, + timeout: int = KAFKA_CHUNK_PRODUCTION_TIMEOUT + ) -> None: """ Loads feature data into Feast for a specific feature set. Args: - feature_set: Name of feature set or a feature set object - source: Either a file path or Pandas Dataframe to ingest into Feast + feature_set (typing.Union[str, FeatureSet]): + Feature set object or the string name of the feature set + (without a version). + + source (typing.Union[pd.DataFrame, str]): + Either a file path or Pandas Dataframe to ingest into Feast Files that are currently supported: - * parquet - * csv - * json - version: Feature set version - force_update: Automatically update feature set based on source data - prior to ingesting. This will also register changes to Feast - max_workers: Number of worker processes to use to encode values - disable_progress_bar: Disable printing of progress statistics - chunk_size: Maximum amount of rows to load into memory and ingest at - a time - timeout: Seconds to wait before ingestion times out + * parquet + * csv + * json + + chunk_size (int): + Amount of rows to load and ingest at a time. + + version (int): + Feature set version. + + force_update (bool): + Automatically update feature set based on source data prior to + ingesting. This will also register changes to Feast. + + max_workers (int): + Number of worker processes to use to encode values. + + disable_progress_bar (bool): + Disable printing of progress statistics. + + timeout (int): + Timeout in seconds to wait for completion. 
+ + Returns: + None: + None """ + if isinstance(feature_set, FeatureSet): name = feature_set.name if version is None: @@ -508,35 +531,69 @@ def ingest( else: raise Exception(f"Feature set name must be provided") - table = _read_table_from_source(source) + # Read table and get row count + tmp_table_name = _read_table_from_source( + source, chunk_size, max_workers + ) - # Update the feature set based on DataFrame schema - if force_update: - # Use a small as reference DataFrame to infer fields - ref_df = table.to_batches(max_chunksize=20)[0].to_pandas() + pq_file = pq.ParquetFile(tmp_table_name) - feature_set.infer_fields_from_df( - ref_df, discard_unused_fields=True, replace_existing_features=True + row_count = pq_file.metadata.num_rows + + # Update the feature set based on PyArrow table of first row group + if force_update: + feature_set.infer_fields_from_pa( + table=pq_file.read_row_group(0), + discard_unused_fields=True, + replace_existing_features=True ) self.apply(feature_set) feature_set = self.get_feature_set(name, version) - if feature_set.source.source_type == "Kafka": - ingest_table_to_kafka( - feature_set=feature_set, - table=table, - max_workers=max_workers, - disable_pbar=disable_progress_bar, - chunk_size=chunk_size, - timeout=timeout, - ) - else: - raise Exception( - f"Could not determine source type for feature set " - f'"{feature_set.name}" with source type ' - f'"{feature_set.source.source_type}"' - ) + try: + # Kafka configs + brokers = feature_set.get_kafka_source_brokers() + topic = feature_set.get_kafka_source_topic() + producer = get_producer(brokers, row_count, disable_progress_bar) + + # Loop optimization declarations + produce = producer.produce + flush = producer.flush + + # Transform and push data to Kafka + if feature_set.source.source_type == "Kafka": + for chunk in get_feature_row_chunks( + file=tmp_table_name, + row_groups=list(range(pq_file.num_row_groups)), + fs=feature_set, + max_workers=max_workers): + + # Push FeatureRow one chunk at a time to kafka + for serialized_row in chunk: + produce(topic=topic, value=serialized_row) + + # Force a flush after each chunk + flush(timeout=timeout) + + # Remove chunk from memory + del chunk + + else: + raise Exception( + f"Could not determine source type for feature set " + f'"{feature_set.name}" with source type ' + f'"{feature_set.source.source_type}"' + ) + + # Print ingestion statistics + producer.print_results() + finally: + # Remove parquet file(s) that were created earlier + print("Removing temporary file(s)...") + os.remove(tmp_table_name) + + return None def _build_feature_set_request(feature_ids: List[str]) -> List[FeatureSetRequest]: @@ -566,18 +623,38 @@ def _build_feature_set_request(feature_ids: List[str]) -> List[FeatureSetRequest return list(feature_set_request.values()) -def _read_table_from_source(source: Union[pd.DataFrame, str]) -> pa.lib.Table: +def _read_table_from_source( + source: Union[pd.DataFrame, str], + chunk_size: int, + max_workers: int +) -> str: """ Infers a data source type (path or Pandas Dataframe) and reads it in as a PyArrow Table. + The PyArrow Table that is read will be written to a parquet file with row + group size determined by the minimum of: + * (table.num_rows / max_workers) + * chunk_size + + The parquet file that is created will be passed as file path to the + multiprocessing pool workers. + Args: - source: Either a string path or Pandas Dataframe + source (Union[pd.DataFrame, str]): + Either a string path or Pandas DataFrame. 
+ + chunk_size (int): + Number of worker processes to use to encode values. + + max_workers (int): + Amount of rows to load and ingest at a time. Returns: - PyArrow table + str: Path to parquet file that was created. """ - # Pandas dataframe detected + + # Pandas DataFrame detected if isinstance(source, pd.DataFrame): table = pa.Table.from_pandas(df=source) @@ -601,4 +678,14 @@ def _read_table_from_source(source: Union[pd.DataFrame, str]) -> pa.lib.Table: # Ensure that PyArrow table is initialised assert isinstance(table, pa.lib.Table) - return table + + # Write table as parquet file with a specified row_group_size + tmp_table_name = f"{int(time.time())}.parquet" + row_group_size = min(int(table.num_rows/max_workers), chunk_size) + pq.write_table(table=table, where=tmp_table_name, + row_group_size=row_group_size) + + # Remove table from memory + del table + + return tmp_table_name diff --git a/sdk/python/feast/feature_set.py b/sdk/python/feast/feature_set.py index 893378e8fa..28381891f5 100644 --- a/sdk/python/feast/feature_set.py +++ b/sdk/python/feast/feature_set.py @@ -13,21 +13,27 @@ # limitations under the License. -import pandas as pd -from typing import List, Optional from collections import OrderedDict from typing import Dict -from feast.source import Source -from pandas.api.types import is_datetime64_ns_dtype +from typing import List, Optional + +import pandas as pd +import pyarrow as pa +from feast.core.FeatureSet_pb2 import FeatureSet as FeatureSetProto +from feast.core.FeatureSet_pb2 import FeatureSetMeta as FeatureSetMetaProto +from feast.core.FeatureSet_pb2 import FeatureSetSpec as FeatureSetSpecProto from feast.entity import Entity from feast.feature import Feature, Field -from feast.core.FeatureSet_pb2 import FeatureSetSpec as FeatureSetSpecProto -from google.protobuf.duration_pb2 import Duration +from feast.loaders import yaml as feast_yaml +from feast.source import Source +from feast.type_map import DATETIME_COLUMN +from feast.type_map import pa_to_feast_value_type from feast.type_map import python_type_to_feast_value_type -from google.protobuf.json_format import MessageToJson from google.protobuf import json_format -from feast.type_map import DATETIME_COLUMN -from feast.loaders import yaml as feast_yaml +from google.protobuf.duration_pb2 import Duration +from google.protobuf.json_format import MessageToJson +from pandas.api.types import is_datetime64_ns_dtype +from pyarrow.lib import TimestampType class FeatureSet: @@ -256,7 +262,6 @@ def infer_fields_from_df( rows_to_sample: int = 100, ): """ - Adds fields (Features or Entities) to a feature set based on the schema of a Datatframe. Only Pandas dataframes are supported. All columns are detected as features, so setting at least one entity manually is @@ -283,6 +288,7 @@ def infer_fields_from_df( must have consistent types, even values within list types must be homogeneous """ + if entities is None: entities = list() if features is None: @@ -373,7 +379,187 @@ def infer_fields_from_df( self._fields = new_fields print(output_log) - def update_from_feature_set(self, feature_set): + def infer_fields_from_pa( + self, table: pa.lib.Table, + entities: Optional[List[Entity]] = None, + features: Optional[List[Feature]] = None, + replace_existing_features: bool = False, + replace_existing_entities: bool = False, + discard_unused_fields: bool = False + ) -> None: + """ + Adds fields (Features or Entities) to a feature set based on the schema + of a PyArrow table. Only PyArrow tables are supported. 
All columns are + detected as features, so setting at least one entity manually is + advised. + + + Args: + table (pyarrow.lib.Table): + PyArrow table to read schema from. + + entities (Optional[List[Entity]]): + List of entities that will be set manually and not inferred. + These will take precedence over any existing entities or + entities found in the PyArrow table. + + features (Optional[List[Feature]]): + List of features that will be set manually and not inferred. + These will take precedence over any existing feature or features + found in the PyArrow table. + + replace_existing_features (bool): + Boolean flag. If true, will replace existing features in this + feature set with features found in dataframe. If false, will + skip conflicting features. + + replace_existing_entities (bool): + Boolean flag. If true, will replace existing entities in this + feature set with features found in dataframe. If false, will + skip conflicting entities. + + discard_unused_fields (bool): + Boolean flag. Setting this to True will discard any existing + fields that are not found in the dataset or provided by the + user. + + Returns: + None: + None + """ + if entities is None: + entities = list() + if features is None: + features = list() + + # Validate whether the datetime column exists with the right name + if DATETIME_COLUMN not in table.column_names: + raise Exception("No column 'datetime'") + + # Validate the date type for the datetime column + if not isinstance(table.column(DATETIME_COLUMN).type, TimestampType): + raise Exception( + "Column 'datetime' does not have the correct type: datetime64[ms]" + ) + + # Create dictionary of fields that will not be inferred (manually set) + provided_fields = OrderedDict() + + for field in entities + features: + if not isinstance(field, Field): + raise Exception(f"Invalid field object type provided {type(field)}") + if field.name not in provided_fields: + provided_fields[field.name] = field + else: + raise Exception(f"Duplicate field name detected {field.name}.") + + new_fields = self._fields.copy() + output_log = "" + + # Add in provided fields + for name, field in provided_fields.items(): + if name in new_fields.keys(): + upsert_message = "created" + else: + upsert_message = "updated (replacing an existing field)" + + output_log += ( + f"{type(field).__name__} {field.name}" + f"({field.dtype}) manually {upsert_message}.\n" + ) + new_fields[name] = field + + # Iterate over all of the column names and create features + for column in table.column_names: + column = column.strip() + + # Skip datetime column + if DATETIME_COLUMN in column: + continue + + # Skip user provided fields + if column in provided_fields.keys(): + continue + + # Only overwrite conflicting fields if replacement is allowed + if column in new_fields: + if ( + isinstance(self._fields[column], Feature) + and not replace_existing_features + ): + continue + + if ( + isinstance(self._fields[column], Entity) + and not replace_existing_entities + ): + continue + + # Store this fields as a feature + # TODO: (Minor) Change the parameter name from dtype to patype + new_fields[column] = Feature( + name=column, + dtype=self._infer_pa_column_type(table.column(column)) + ) + + output_log += f"{type(new_fields[column]).__name__} {new_fields[column].name} ({new_fields[column].dtype}) added from PyArrow Table.\n" + + # Discard unused fields from feature set + if discard_unused_fields: + keys_to_remove = [] + for key in new_fields.keys(): + if not (key in table.column_names or key in provided_fields.keys()): 
+ output_log += f"{type(new_fields[key]).__name__} {new_fields[key].name} ({new_fields[key].dtype}) removed because it is unused.\n" + keys_to_remove.append(key) + for key in keys_to_remove: + del new_fields[key] + + # Update feature set + self._fields = new_fields + print(output_log) + + def _infer_pd_column_type(self, column, series, rows_to_sample): + dtype = None + sample_count = 0 + + # Loop over all rows for this column to infer types + for key, value in series.iteritems(): + sample_count += 1 + # Stop sampling at the row limit + if sample_count > rows_to_sample: + continue + + # Infer the specific type for this row + current_dtype = python_type_to_feast_value_type(name=column, value=value) + + # Make sure the type is consistent for column + if dtype: + if dtype != current_dtype: + raise ValueError( + f"Type mismatch detected in column {column}. Both " + f"the types {current_dtype} and {dtype} " + f"have been found." + ) + else: + # Store dtype in field to type map if it isnt already + dtype = current_dtype + + return dtype + + def _infer_pa_column_type(self, column: pa.lib.ChunkedArray): + """ + Infers the PyArrow column type. + + :param column: Column from a PyArrow table + :type column: pa.lib.ChunkedArray + :return: + :rtype: + """ + # Validates the column to ensure that value types are consistent + column.validate() + return pa_to_feast_value_type(column) + + def _update_from_feature_set(self, feature_set): """ Deep replaces one feature set with another diff --git a/sdk/python/feast/loaders/abstract_producer.py b/sdk/python/feast/loaders/abstract_producer.py new file mode 100644 index 0000000000..884ae49984 --- /dev/null +++ b/sdk/python/feast/loaders/abstract_producer.py @@ -0,0 +1,248 @@ +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, Union + +from tqdm import tqdm + + +class AbstractProducer: + """ + Abstract class for Kafka producers + """ + + def __init__( + self, + brokers: str, + row_count: int, + disable_progress_bar: bool + ): + self.brokers = brokers + self.row_count = row_count + self.error_count = 0 + self.last_exception = "" + + # Progress bar will always display average rate + self.pbar = tqdm( + total=row_count, + unit="rows", + smoothing=0, + disable=disable_progress_bar + ) + + def produce(self, topic: str, data: str): + message = "{} should implement a produce method".format( + self.__class__.__name__) + raise NotImplementedError(message) + + def flush(self, timeout: int): + message = "{} should implement a flush method".format( + self.__class__.__name__) + raise NotImplementedError(message) + + def _inc_pbar(self, meta): + self.pbar.update(1) + + def _set_error(self, exception: str): + self.error_count += 1 + self.last_exception = exception + + def print_results(self) -> None: + """ + Print ingestion statistics. 
+ + Returns: + None: None + """ + # Refresh and close tqdm progress bar + self.pbar.refresh() + + self.pbar.close() + + print("Ingestion complete!") + + failed_message = ( + "" + if self.error_count == 0 + else f"\nFail: {self.error_count / self.row_count}" + ) + + last_exception_message = ( + "" + if self.last_exception == "" + else f"\nLast exception:\n{self.last_exception}" + ) + + print( + f"\nIngestion statistics:" + f"\nSuccess: {self.pbar.n}/{self.row_count}" + f"{failed_message}" + f"{last_exception_message}" + ) + return None + + +class ConfluentProducer(AbstractProducer): + """ + Concrete implementation of Confluent Kafka producer (confluent-kafka) + """ + + def __init__( + self, + brokers: str, + row_count: int, + disable_progress_bar: bool + ): + from confluent_kafka import Producer + self.producer = Producer({"bootstrap.servers": brokers}) + super().__init__(brokers, row_count, disable_progress_bar) + + def produce(self, topic: str, value: bytes) -> None: + """ + Generic produce that implements confluent-kafka's produce method to + push a byte encoded object into a Kafka topic. + + Args: + topic (str): Kafka topic. + value (bytes): Byte encoded object. + + Returns: + None: None. + """ + + try: + self.producer.produce( + topic, value=value, callback=self._delivery_callback) + # Serve delivery callback queue. + # NOTE: Since produce() is an asynchronous API this poll() call + # will most likely not serve the delivery callback for the + # last produce()d message. + self.producer.poll(0) + except Exception as ex: + self._set_error(str(ex)) + + return None + + def flush(self, timeout: Optional[int]): + """ + Generic flush that implements confluent-kafka's flush method. + + Args: + timeout (Optional[int]): Timeout in seconds to wait for completion. + + Returns: + int: Number of messages still in queue. + """ + return self.producer.flush(timeout=timeout) + + def _delivery_callback(self, err: str, msg) -> None: + """ + Optional per-message delivery callback (triggered by poll() or flush()) + when a message has been successfully delivered or permanently failed + delivery (after retries). + + Although the msg argument is not used, the current method signature is + required as specified in the confluent-kafka documentation. + + Args: + err (str): Error message. + msg (): Kafka message. + + Returns: + None + """ + if err: + self._set_error(err) + else: + self._inc_pbar(None) + + +class KafkaPythonProducer(AbstractProducer): + """ + Concrete implementation of Python Kafka producer (kafka-python) + """ + + def __init__( + self, + brokers: str, + row_count: int, + disable_progress_bar: bool + ): + from kafka import KafkaProducer + self.producer = KafkaProducer(bootstrap_servers=[brokers]) + super().__init__(brokers, row_count, disable_progress_bar) + + def produce(self, topic: str, value: bytes): + """ + Generic produce that implements kafka-python's send method to push a + byte encoded object into a Kafka topic. + + Args: + topic (str): Kafka topic. + value (bytes): Byte encoded object. + + Returns: + FutureRecordMetadata: resolves to RecordMetadata + + Raises: + KafkaTimeoutError: if unable to fetch topic metadata, or unable + to obtain memory buffer prior to configured max_block_ms + """ + return self.producer.send(topic, value=value).add_callback( + self._inc_pbar).add_errback(self._set_error) + + def flush(self, timeout: Optional[int]): + """ + Generic flush that implements kafka-python's flush method. + + Args: + timeout (Optional[int]): timeout in seconds to wait for completion. 
+ + Returns: + None + + Raises: + KafkaTimeoutError: failure to flush buffered records within the + provided timeout + """ + return self.producer.flush(timeout=timeout) + + +def get_producer( + brokers: str, row_count: int, disable_progress_bar: bool +) -> Union[ConfluentProducer, KafkaPythonProducer]: + """ + Simple context helper function that returns a AbstractProducer object when + invoked. + + This helper function will try to import confluent-kafka as a producer first. + + This helper function will fallback to kafka-python if it fails to import + confluent-kafka. + + Args: + brokers (str): Kafka broker information with hostname and port. + row_count (int): Number of rows in table + + Returns: + Union[ConfluentProducer, KafkaPythonProducer]: + Concrete implementation of a Kafka producer. Ig can be: + * confluent-kafka producer + * kafka-python producer + """ + try: + return ConfluentProducer(brokers, row_count, disable_progress_bar) + except ImportError as e: + print("Unable to import confluent-kafka, falling back to kafka-python") + return KafkaPythonProducer(brokers, row_count, disable_progress_bar) diff --git a/sdk/python/feast/loaders/ingest.py b/sdk/python/feast/loaders/ingest.py index 23ba2ecb3b..527ab481fe 100644 --- a/sdk/python/feast/loaders/ingest.py +++ b/sdk/python/feast/loaders/ingest.py @@ -1,18 +1,16 @@ import logging -import multiprocessing -import os -import time from functools import partial -from multiprocessing import Process, Queue, Pool -from typing import Iterable +from multiprocessing import Pool +from typing import Iterable, List + import pandas as pd -import pyarrow as pa +import pyarrow.parquet as pq +from feast.constants import DATETIME_COLUMN from feast.feature_set import FeatureSet -from feast.type_map import convert_dict_to_proto_values +from feast.type_map import pa_column_to_timestamp_proto_column, \ + pa_column_to_proto_column +from feast.types import Field_pb2 as FieldProto from feast.types.FeatureRow_pb2 import FeatureRow -from kafka import KafkaProducer -from tqdm import tqdm -from feast.constants import DATETIME_COLUMN _logger = logging.getLogger(__name__) @@ -21,221 +19,120 @@ FEAST_SERVING_URL_ENV_KEY = "FEAST_SERVING_URL" # type: str FEAST_CORE_URL_ENV_KEY = "FEAST_CORE_URL" # type: str BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS = 300 -CPU_COUNT = os.cpu_count() # type: int KAFKA_CHUNK_PRODUCTION_TIMEOUT = 120 # type: int -def _kafka_feature_row_producer( - feature_row_queue: Queue, row_count: int, brokers, topic, ctx: dict, pbar: tqdm -): +def _encode_pa_tables( + file: str, + fs: FeatureSet, + row_group_idx: int, +) -> List[bytes]: """ - Pushes Feature Rows to Kafka. Reads rows from a queue. Function will run - until total row_count is reached. + Helper function to encode a PyArrow table(s) read from parquet file(s) into + FeatureRows. - Args: - feature_row_queue: Queue containing feature rows. 
- row_count: Total row count to process - brokers: Broker to push to - topic: Topic to push to - ctx: Context dict used to communicate with primary process - pbar: Progress bar object - """ - - # Callback for failed production to Kafka - def on_error(e): - # Save last exception - ctx["last_exception"] = e - - # Increment error count - if "error_count" in ctx: - ctx["error_count"] += 1 - else: - ctx["error_count"] = 1 - - # Callback for succeeded production to Kafka - def on_success(meta): - pbar.update() - - producer = KafkaProducer(bootstrap_servers=brokers) - processed_rows = 0 - - # Loop through feature rows until all rows are processed - while processed_rows < row_count: - # Wait if queue is empty - if feature_row_queue.empty(): - time.sleep(1) - producer.flush(timeout=KAFKA_CHUNK_PRODUCTION_TIMEOUT) - else: - while not feature_row_queue.empty(): - row = feature_row_queue.get() - if row is not None: - # Push row to Kafka - producer.send(topic, row.SerializeToString()).add_callback( - on_success - ).add_errback(on_error) - processed_rows += 1 - - # Force an occasional flush - if processed_rows % 10000 == 0: - producer.flush(timeout=KAFKA_CHUNK_PRODUCTION_TIMEOUT) - del row - pbar.refresh() - - # Ensure that all rows are pushed - producer.flush(timeout=KAFKA_CHUNK_PRODUCTION_TIMEOUT) - - # Using progress bar as counter is much faster than incrementing dict - ctx["success_count"] = pbar.n - pbar.close() - - -def _encode_pa_chunks( - tbl: pa.lib.Table, - fs: FeatureSet, - max_workers: int, - df_datetime_dtype: pd.DataFrame.dtypes, - chunk_size: int = 5000, -) -> Iterable[FeatureRow]: - """ - Generator function to encode rows in PyArrow table to FeatureRows by - breaking up the table into batches. + This function accepts a list of file directory pointing to many parquet + files. All parquet files must have the same schema. - Each batch will have its rows spread accross a pool of workers to be - transformed into FeatureRow objects. + Each parquet file will be read into as a table and encoded into FeatureRows + using a pool of max_workers workers. Args: - tbl: PyArrow table to be processed. - fs: FeatureSet describing PyArrow table. - max_workers: Maximum number of workers. - df_datetime_dtype: Pandas dtype of datetime column. - chunk_size: Maximum size of each chunk when PyArrow table is batched. - - Returns: - Iterable FeatureRow object. - """ - - pool = Pool(max_workers) - - # Create a partial function with static non-iterable arguments - func = partial( - convert_dict_to_proto_values, - df_datetime_dtype=df_datetime_dtype, - feature_set=fs, - ) - - for batch in tbl.to_batches(max_chunksize=chunk_size): - m_df = batch.to_pandas() - results = pool.map_async(func, m_df.to_dict("records")) - yield from results.get() + file (str): + File directory of all the parquet file to encode. + Parquet file must have more than one row group. - pool.close() - pool.join() - return + fs (feast.feature_set.FeatureSet): + FeatureSet describing parquet files. + row_group_idx(int): + Row group index to read and encode into byte like FeatureRow + protobuf objects. -def ingest_table_to_kafka( - feature_set: FeatureSet, - table: pa.lib.Table, - max_workers: int, - chunk_size: int = 5000, - disable_pbar: bool = False, - timeout: int = None, -) -> None: + Returns: + List[bytes]: + List of byte encoded FeatureRows from the parquet file. 
""" - Ingest a PyArrow Table to a Kafka topic based for a Feature Set + pq_file = pq.ParquetFile(file) + # Read parquet file as a PyArrow table + table = pq_file.read_row_group(row_group_idx) + + # Add datetime column + datetime_col = pa_column_to_timestamp_proto_column( + table.column(DATETIME_COLUMN)) + + # Preprocess the columns by converting all its values to Proto values + proto_columns = { + field_name: pa_column_to_proto_column(field.dtype, + table.column(field_name)) + for field_name, field in fs.fields.items() + } + + feature_set = f"{fs.name}:{fs.version}" + + # List to store result + feature_rows = [] + + # Loop optimization declaration(s) + field = FieldProto.Field + proto_items = proto_columns.items() + append = feature_rows.append + + # Iterate through the rows + for row_idx in range(table.num_rows): + feature_row = FeatureRow(event_timestamp=datetime_col[row_idx], + feature_set=feature_set) + # Loop optimization declaration + ext = feature_row.fields.extend + + # Insert field from each column + for k, v in proto_items: + ext([field(name=k, value=v[row_idx])]) + + # Append FeatureRow in byte string form + append(feature_row.SerializeToString()) + + return feature_rows + + +def get_feature_row_chunks( + file: str, + row_groups: List[int], + fs: FeatureSet, + max_workers: int +) -> Iterable[List[bytes]]: + """ + Iterator function to encode a PyArrow table read from a parquet file to + FeatureRow(s). Args: - feature_set: FeatureSet describing PyArrow table. - table: PyArrow table to be processed. - max_workers: Maximum number of workers. - chunk_size: Maximum size of each chunk when PyArrow table is batched. - disable_pbar: Flag to indicate if tqdm progress bar should be disabled. - timeout: Maximum time before method times out - """ + file (str): + File directory of the parquet file. The parquet file must have more + than one row group. 
- pbar = tqdm(unit="rows", total=table.num_rows, disable=disable_pbar) - - # Use a small DataFrame to validate feature set schema - ref_df = table.to_batches(max_chunksize=100)[0].to_pandas() - df_datetime_dtype = ref_df[DATETIME_COLUMN].dtype - - # Validate feature set schema - _validate_dataframe(ref_df, feature_set) - - # Create queue through which encoding and production will coordinate - row_queue = Queue() - - # Create a context object to send and receive information across processes - ctx = multiprocessing.Manager().dict( - {"success_count": 0, "error_count": 0, "last_exception": ""} - ) - - # Create producer to push feature rows to Kafka - ingestion_process = Process( - target=_kafka_feature_row_producer, - args=( - row_queue, - table.num_rows, - feature_set.get_kafka_source_brokers(), - feature_set.get_kafka_source_topic(), - ctx, - pbar, - ), - ) - - try: - # Start ingestion process - print( - f"\n(ingest table to kafka) Ingestion started for {feature_set.name}:{feature_set.version}" - ) - ingestion_process.start() - - # Iterate over chunks in the table and return feature rows - for row in _encode_pa_chunks( - tbl=table, - fs=feature_set, - max_workers=max_workers, - chunk_size=chunk_size, - df_datetime_dtype=df_datetime_dtype, - ): - # Push rows onto a queue for the production process to pick up - row_queue.put(row) - while row_queue.qsize() > chunk_size: - time.sleep(0.1) - row_queue.put(None) - except Exception as ex: - _logger.error(f"Exception occurred: {ex}") - finally: - # Wait for the Kafka production to complete - ingestion_process.join(timeout=timeout) - failed_message = ( - "" - if ctx["error_count"] == 0 - else f"\nFail: {ctx['error_count']}/{table.num_rows}" - ) + row_groups (List[int]): + Specific row group indexes to be read and transformed in the parquet + file. - last_exception_message = ( - "" - if ctx["last_exception"] == "" - else f"\nLast exception:\n{ctx['last_exception']}" - ) - print( - f"\nIngestion statistics:" - f"\nSuccess: {ctx['success_count']}/{table.num_rows}" - f"{failed_message}" - f"{last_exception_message}" - ) + fs (feast.feature_set.FeatureSet): + FeatureSet describing parquet files. + max_workers (int): + Maximum number of workers to spawn. -def _validate_dataframe(dataframe: pd.DataFrame, feature_set: FeatureSet): + Returns: + Iterable[List[bytes]]: + Iterable list of byte encoded FeatureRow(s). """ - Validates a Pandas dataframe based on a feature set - Args: - dataframe: Pandas dataframe - feature_set: Feature Set instance - """ + pool = Pool(max_workers) + func = partial(_encode_pa_tables, file, fs) + for chunk in pool.imap_unordered(func, row_groups): + yield chunk + return + +def validate_dataframe(dataframe: pd.DataFrame, feature_set: FeatureSet): if "datetime" not in dataframe.columns: raise ValueError( f'Dataframe does not contain entity "datetime" in columns {dataframe.columns}' diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 7573276d74..ca13c2573b 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -12,12 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. 
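Reviewer note: stepping back to the loader changes above (`feast/loaders/ingest.py` and `feast/loaders/abstract_producer.py`), the sketch below shows how those pieces are intended to compose — essentially the inner loop of `Client.ingest`. The file name, broker, topic, and worker count are placeholders, and `feature_set` would normally be the registered feature set returned by `Client.get_feature_set`; this is a minimal illustration, not part of the patch.

```python
import pyarrow.parquet as pq

from feast.feature_set import FeatureSet
from feast.loaders.abstract_producer import get_producer
from feast.loaders.ingest import get_feature_row_chunks
from feast.source import KafkaSource

# Placeholder feature set; in practice this comes from Client.get_feature_set
# so that its fields and version are populated.
feature_set = FeatureSet(
    "driver-feature-set",
    source=KafkaSource(brokers="kafka:9092", topic="driver-topic"),
)

# "staged.parquet" stands in for the temporary file written by
# _read_table_from_source with the computed row_group_size.
pq_file = pq.ParquetFile("staged.parquet")

producer = get_producer(
    brokers=feature_set.get_kafka_source_brokers(),
    row_count=pq_file.metadata.num_rows,
    disable_progress_bar=False,
)
topic = feature_set.get_kafka_source_topic()

# Each chunk is the list of serialized FeatureRow protobufs for one row group.
for chunk in get_feature_row_chunks(
    file="staged.parquet",
    row_groups=list(range(pq_file.num_row_groups)),
    fs=feature_set,
    max_workers=4,
):
    for serialized_row in chunk:
        producer.produce(topic=topic, value=serialized_row)
    producer.flush(timeout=120)  # KAFKA_CHUNK_PRODUCTION_TIMEOUT default

producer.print_results()
```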
+from datetime import datetime, timezone +from typing import List + import numpy as np import pandas as pd -from datetime import datetime, timezone -from feast.value_type import ValueType +import pyarrow as pa +from feast.constants import DATETIME_COLUMN +from feast.types import ( + FeatureRow_pb2 as FeatureRowProto, + Field_pb2 as FieldProto, +) from feast.types.Value_pb2 import ( Value as ProtoValue, + ValueType as ProtoValueType, Int64List, Int32List, BoolList, @@ -26,9 +34,9 @@ StringList, FloatList, ) -from feast.types import FeatureRow_pb2 as FeatureRowProto, Field_pb2 as FieldProto +from feast.value_type import ValueType from google.protobuf.timestamp_pb2 import Timestamp -from feast.constants import DATETIME_COLUMN +from pyarrow.lib import TimestampType def python_type_to_feast_value_type( @@ -104,9 +112,9 @@ def python_type_to_feast_value_type( return ValueType[common_item_value_type.name + "_LIST"] else: raise ValueError( - f"Value type for field {name} is {value.dtype.__str__()} " - f"but recursion is not allowed. Array types can only be one " - f"level deep." + f"Value type for field {name} is {value.dtype.__str__()} but " + f"recursion is not allowed. Array types can only be one level " + f"deep." ) return type_map[value.dtype.__str__()] @@ -160,7 +168,7 @@ def convert_series_to_proto_values(row: pd.Series): def convert_dict_to_proto_values( - row: dict, df_datetime_dtype: pd.DataFrame.dtypes, feature_set + row: dict, df_datetime_dtype: pd.DataFrame.dtypes, feature_set ) -> FeatureRowProto.FeatureRow: """ Encode a dictionary describing a feature row into a FeatureRows object. @@ -211,12 +219,14 @@ def _pd_datetime_to_timestamp_proto(dtype, value) -> Timestamp: # If timestamp does not contain timezone, we assume it is of local # timezone and adjust it to UTC local_timezone = datetime.now(timezone.utc).astimezone().tzinfo - value = value.tz_localize(local_timezone).tz_convert("UTC").tz_localize(None) + value = value.tz_localize(local_timezone).tz_convert("UTC").tz_localize( + None) return Timestamp(seconds=int(value.timestamp())) if dtype.__str__() == "datetime64[ns, UTC]": return Timestamp(seconds=int(value.timestamp())) else: - return Timestamp(seconds=np.datetime64(value).astype("int64") // 1000000) + return Timestamp( + seconds=np.datetime64(value).astype("int64") // 1000000) def _type_err(item, dtype): @@ -344,3 +354,139 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue: return ProtoValue(bool_val=value) raise Exception(f"Unsupported data type: ${str(type(value))}") + +def pa_to_feast_value_attr(pa_type: object): + """ + Returns the equivalent Feast ValueType string for the given pa.lib type. + + Args: + pa_type (object): + PyArrow type. + + Returns: + str: + Feast attribute name in Feast ValueType string-ed representation. + """ + # Mapping of PyArrow type to attribute name in Feast ValueType strings + type_map = { + "timestamp[ms]": "int64_val", + "int32": "int32_val", + "int64": "int64_val", + "double": "double_val", + "float": "float_val", + "string": "string_val", + "binary": "bytes_val", + "bool": "bool_val", + "list": "int32_list_val", + "list": "int64_list_val", + "list": "double_list_val", + "list": "float_list_val", + "list": "string_list_val", + "list": "bytes_list_val", + "list": "bool_list_val", + } + + return type_map[pa_type.__str__()] + + +def pa_to_value_type(pa_type: object): + """ + Returns the equivalent Feast ValueType for the given pa.lib type. + + Args: + pa_type (object): + PyArrow type. 
+ + Returns: + feast.types.Value_pb2.ValueType: + Feast ValueType. + + """ + + # Mapping of PyArrow to attribute name in Feast ValueType + type_map = { + "timestamp[ms]": ProtoValueType.INT64, + "int32": ProtoValueType.INT32, + "int64": ProtoValueType.INT64, + "double": ProtoValueType.DOUBLE, + "float": ProtoValueType.FLOAT, + "string": ProtoValueType.STRING, + "binary": ProtoValueType.BYTES, + "bool": ProtoValueType.BOOL, + "list": ProtoValueType.INT32_LIST, + "list": ProtoValueType.INT64_LIST, + "list": ProtoValueType.DOUBLE_LIST, + "list": ProtoValueType.FLOAT_LIST, + "list": ProtoValueType.STRING_LIST, + "list": ProtoValueType.BYTES_LIST, + "list": ProtoValueType.BOOL_LIST, + } + return type_map[pa_type.__str__()] + + +def pa_to_feast_value_type( + value: object +) -> ValueType: + type_map = { + "timestamp[ms]": ValueType.INT64, + "int32": ValueType.INT32, + "int64": ValueType.INT64, + "double": ValueType.DOUBLE, + "float": ValueType.FLOAT, + "string": ValueType.STRING, + "binary": ValueType.BYTES, + "bool": ValueType.BOOL, + "list": ValueType.INT32_LIST, + "list": ValueType.INT64_LIST, + "list": ValueType.DOUBLE_LIST, + "list": ValueType.FLOAT_LIST, + "list": ValueType.STRING_LIST, + "list": ValueType.BYTES_LIST, + "list": ValueType.BOOL_LIST, + } + return type_map[value.type.__str__()] + + +def pa_column_to_timestamp_proto_column( + column: pa.lib.ChunkedArray +) -> Timestamp: + if not isinstance(column.type, TimestampType): + raise Exception("Only TimestampType columns are allowed") + + proto_column = [] + for val in column: + timestamp = Timestamp() + timestamp.FromMicroseconds( + micros=int(val.as_py().timestamp() * 1_000_000)) + proto_column.append(timestamp) + return proto_column + + +def pa_column_to_proto_column( + feast_value_type, + column: pa.lib.ChunkedArray +) -> List[ProtoValue]: + type_map = {ValueType.INT32: "int32_val", + ValueType.INT64: "int64_val", + ValueType.FLOAT: "float_val", + ValueType.DOUBLE: "double_val", + ValueType.STRING: "string_val", + ValueType.BYTES: "bytes_val", + ValueType.BOOL: "bool_val", + ValueType.BOOL_LIST: {"bool_list_val": BoolList}, + ValueType.BYTES_LIST: {"bytes_list_val": BytesList}, + ValueType.STRING_LIST: {"string_list_val": StringList}, + ValueType.FLOAT_LIST: {"float_list_val": FloatList}, + ValueType.DOUBLE_LIST: {"double_list_val": DoubleList}, + ValueType.INT32_LIST: {"int32_list_val": Int32List}, + ValueType.INT64_LIST: {"int64_list_val": Int64List}, } + + value = type_map[feast_value_type] + # Process list types + if type(value) == dict: + list_param_name = list(value.keys())[0] + return [ProtoValue( + **{list_param_name: value[list_param_name](val=x.as_py())}) + for x in column] + else: + return [ProtoValue(**{value: x.as_py()}) for x in column] diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 66cad904b0..9ac7225e80 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -44,6 +44,7 @@ "pyarrow>=0.15.1", "numpy", "google", + "confluent_kafka" ] # README file from Feast repo root directory diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py index 2243ebfd1b..f979c5a55d 100644 --- a/sdk/python/tests/test_client.py +++ b/sdk/python/tests/test_client.py @@ -381,10 +381,40 @@ def test_feature_set_ingest_success(self, dataframe, client, mocker): ) # Need to create a mock producer - with patch("feast.loaders.ingest.KafkaProducer") as mocked_queue: + with patch("feast.client.get_producer") as mocked_queue: # Ingest data into Feast client.ingest("driver-feature-set", dataframe) + 
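Reviewer note: because the producer is now obtained through `feast.client.get_producer` rather than constructed inside `feast.loaders.ingest`, the tests patch that seam, as shown above. A small, hypothetical extension of the same pattern, assuming the existing `client` and `dataframe` fixtures — the extra assertions are illustrative and not part of this change:

```python
with patch("feast.client.get_producer") as mocked_get_producer:
    client.ingest("driver-feature-set", dataframe)

    # get_producer() returns the producer used inside ingest(), so the mock's
    # return_value records the produce()/flush() traffic for assertions.
    mocked_producer = mocked_get_producer.return_value
    assert mocked_producer.produce.called
    assert mocked_producer.flush.called
```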
@pytest.mark.parametrize("dataframe,exception", [(dataframes.GOOD, TimeoutError)]) + def test_feature_set_ingest_fail_if_pending( + self, dataframe, exception, client, mocker + ): + with pytest.raises(exception): + driver_fs = FeatureSet( + "driver-feature-set", + source=KafkaSource(brokers="kafka:9092", topic="test"), + ) + driver_fs.add(Feature(name="feature_1", dtype=ValueType.FLOAT)) + driver_fs.add(Feature(name="feature_2", dtype=ValueType.STRING)) + driver_fs.add(Feature(name="feature_3", dtype=ValueType.INT64)) + driver_fs.add(Entity(name="entity_id", dtype=ValueType.INT64)) + + # Register with Feast core + client.apply(driver_fs) + driver_fs = driver_fs.to_proto() + driver_fs.meta.status = FeatureSetStatus.STATUS_PENDING + + mocker.patch.object( + client._core_service_stub, + "GetFeatureSet", + return_value=GetFeatureSetResponse(feature_set=driver_fs), + ) + + # Need to create a mock producer + with patch("feast.client.get_producer") as mocked_queue: + # Ingest data into Feast + client.ingest("driver-feature-set", dataframe, timeout=1) + @pytest.mark.parametrize( "dataframe,exception", [ @@ -445,6 +475,6 @@ def test_feature_set_types_success(self, client, dataframe, mocker): ) # Need to create a mock producer - with patch("feast.loaders.ingest.KafkaProducer") as mocked_queue: + with patch("feast.client.get_producer") as mocked_queue: # Ingest data into Feast client.ingest(all_types_fs, dataframe)
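Reviewer note: referring back to the new helpers in `sdk/python/feast/type_map.py`, a minimal sketch of how the column-level conversions are meant to be used. The column names and values are made up for illustration; the calls themselves are the functions added in this diff.

```python
import pandas as pd
import pyarrow as pa

from feast.type_map import (
    pa_column_to_proto_column,
    pa_column_to_timestamp_proto_column,
    pa_to_feast_value_type,
)

# Hypothetical two-column table: the required 'datetime' column plus one feature.
table = pa.Table.from_pandas(
    pd.DataFrame(
        {
            "datetime": pd.to_datetime(["2019-12-01", "2019-12-02"], utc=True),
            "trips_today": [5, 8],
        }
    )
)

feature_col = table.column("trips_today")
feast_type = pa_to_feast_value_type(feature_col)            # ValueType.INT64
proto_values = pa_column_to_proto_column(feast_type, feature_col)
timestamps = pa_column_to_timestamp_proto_column(table.column("datetime"))

assert proto_values[0].int64_val == 5
assert timestamps[0].ToDatetime().year == 2019
```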
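Finally, a hedged end-to-end sketch of the reworked ingestion API from the caller's point of view. The core URL, broker, topic, and column names are placeholders (not part of this diff); the requirements carried over from the change are a `datetime` column of timestamp type plus the entity and feature columns of the registered feature set.

```python
import pandas as pd

from feast.client import Client
from feast.entity import Entity
from feast.feature import Feature
from feast.feature_set import FeatureSet
from feast.source import KafkaSource
from feast.value_type import ValueType

client = Client(core_url="core:6565")  # placeholder URL

driver_fs = FeatureSet(
    "driver-feature-set",
    source=KafkaSource(brokers="kafka:9092", topic="driver-topic"),  # placeholders
)
driver_fs.add(Entity(name="driver_id", dtype=ValueType.INT64))
driver_fs.add(Feature(name="trips_today", dtype=ValueType.INT32))
client.apply(driver_fs)

df = pd.DataFrame(
    {
        "datetime": pd.to_datetime([pd.Timestamp.now(tz="UTC")] * 3),
        "driver_id": [1, 2, 3],
        "trips_today": [5, 8, 2],
    }
)

# New signature: the DataFrame is staged to a temporary parquet file, encoded
# per row group by a worker pool, and produced to Kafka one chunk at a time.
client.ingest(
    "driver-feature-set",
    df,
    chunk_size=10000,
    max_workers=2,
    force_update=True,  # infer/update the schema from the staged data first
    timeout=120,
)
```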