from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
import pandas
from tqdm import tqdm
from feast.entity import Entity
from feast.feature_table import FeatureTable
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.infra.offline_stores.offline_utils import get_offline_store_from_config
from feast.infra.online_stores.helpers import get_online_store_from_config
from feast.infra.provider import (
Provider,
_convert_arrow_to_proto,
_get_column_names,
_run_field_mapping,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import RepoConfig
[docs]class PassthroughProvider(Provider):
"""
The Passthrough provider delegates all operations to the underlying online and offline stores.
"""
def __init__(self, config: RepoConfig):
super().__init__(config)
self.repo_config = config
self.offline_store = get_offline_store_from_config(config.offline_store)
self.online_store = get_online_store_from_config(config.online_store)
[docs] def update_infra(
self,
project: str,
tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
):
self.online_store.update(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)
[docs] def teardown_infra(
self,
project: str,
tables: Sequence[Union[FeatureTable, FeatureView]],
entities: Sequence[Entity],
) -> None:
self.online_store.teardown(self.repo_config, tables, entities)
[docs] def online_write_batch(
self,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
self.online_store.online_write_batch(config, table, data, progress)
[docs] def online_read(
self,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
entity_keys: List[EntityKeyProto],
requested_features: List[str] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
result = self.online_store.online_read(config, table, entity_keys)
return result
def materialize_single_feature_view(
self,
config: RepoConfig,
feature_view: FeatureView,
start_date: datetime,
end_date: datetime,
registry: Registry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
entities = []
for entity_name in feature_view.entities:
entities.append(registry.get_entity(entity_name, project))
(
join_key_columns,
feature_name_columns,
event_timestamp_column,
created_timestamp_column,
) = _get_column_names(feature_view, entities)
offline_job = self.offline_store.pull_latest_from_table_or_query(
config=config,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
event_timestamp_column=event_timestamp_column,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
)
table = offline_job.to_arrow()
if feature_view.batch_source.field_mapping is not None:
table = _run_field_mapping(table, feature_view.batch_source.field_mapping)
join_keys = [entity.join_key for entity in entities]
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
with tqdm_builder(len(rows_to_write)) as pbar:
self.online_write_batch(
self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x)
)
def get_historical_features(
self,
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pandas.DataFrame, str],
registry: Registry,
project: str,
full_feature_names: bool,
) -> RetrievalJob:
job = self.offline_store.get_historical_features(
config=config,
feature_views=feature_views,
feature_refs=feature_refs,
entity_df=entity_df,
registry=registry,
project=project,
full_feature_names=full_feature_names,
)
return job