import warnings
from datetime import datetime
from typing import Dict, List, Optional, Union
from google.protobuf.json_format import MessageToJson
from typeguard import typechecked
from feast.base_feature_view import BaseFeatureView
from feast.feature_logging import LoggingConfig
from feast.feature_view import FeatureView
from feast.feature_view_projection import FeatureViewProjection
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureService as FeatureServiceProto,
)
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureServiceMeta as FeatureServiceMetaProto,
)
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureServiceSpec as FeatureServiceSpecProto,
)
from feast.usage import log_exceptions
@typechecked
class FeatureService:
    """
    A feature service defines a logical group of features from one or more feature views.
    This group of features can be retrieved together during training or serving.

    Attributes:
        name: The unique name of the feature service.
        feature_view_projections: A list containing feature views and feature view
            projections, representing the features in the feature service.
        description: A human-readable description.
        tags: A dictionary of key-value pairs to store arbitrary metadata.
        owner: The owner of the feature service, typically the email of the primary
            maintainer.
        created_timestamp: The time when the feature service was created.
        last_updated_timestamp: The time when the feature service was last updated.
        logging_config: Optional logging configuration attached to the feature service.
    """

    name: str
    _features: List[Union[FeatureView, OnDemandFeatureView]]
    feature_view_projections: List[FeatureViewProjection]
    description: str
    tags: Dict[str, str]
    owner: str
    created_timestamp: Optional[datetime] = None
    last_updated_timestamp: Optional[datetime] = None
    logging_config: Optional[LoggingConfig] = None

    @log_exceptions
    def __init__(
        self,
        *args,
        name: Optional[str] = None,
        features: Optional[List[Union[FeatureView, OnDemandFeatureView]]] = None,
        tags: Optional[Dict[str, str]] = None,
        description: str = "",
        owner: str = "",
        logging_config: Optional[LoggingConfig] = None,
    ):
        """
        Creates a FeatureService object.

        Args:
            *args: Deprecated positional form; at most ``name`` followed by
                ``features``. When given, they override the keyword values.
            name: The name of the feature service. Must be non-empty.
            features: The feature views whose projections make up the service.
                Defaults to an empty list.
            tags: Key-value metadata. Defaults to an empty dict.
            description: A human-readable description.
            owner: The owner of the feature service.
            logging_config: Optional logging configuration.

        Raises:
            ValueError: If more than two positional arguments are passed, or if
                no name is provided.
        """
        positional_attributes = ["name", "features"]

        _name = name
        _features = features

        if args:
            warnings.warn(
                (
                    "Feature service parameters should be specified as a keyword argument instead of a positional arg. "
                    "Feast 0.24+ will not support positional arguments to construct feature service"
                ),
                DeprecationWarning,
            )
            if len(args) > len(positional_attributes):
                raise ValueError(
                    f"Only {', '.join(positional_attributes)} are allowed as positional args when defining "
                    f"feature service, for backwards compatibility."
                )
            # Positional values take precedence over their keyword counterparts.
            if len(args) >= 1:
                _name = args[0]
            if len(args) >= 2:
                _features = args[1]

        if not _name:
            raise ValueError("Feature service name needs to be specified")

        if not _features:
            # Technically, legal to create feature service with no feature views before.
            _features = []

        self.name = _name
        self._features = _features
        self.feature_view_projections = []
        self.description = description
        self.tags = tags or {}
        self.owner = owner
        self.created_timestamp = None
        self.last_updated_timestamp = None
        self.logging_config = logging_config

        # Entries that are not BaseFeatureView instances are skipped here;
        # infer_features() is where they are rejected with a ValueError.
        for feature_grouping in self._features:
            if isinstance(feature_grouping, BaseFeatureView):
                self.feature_view_projections.append(feature_grouping.projection)

    def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None):
        """
        Fills in the projection features of this service's feature views from
        the given, already-inferred feature views.

        For every feature view of this service whose name is a key of
        ``fvs_to_update``, the projection's features are replaced by the features
        of the mapped feature view, restricted to the projection's
        ``desired_features`` when that is non-empty.

        Args:
            fvs_to_update: Mapping from feature view name to the feature view
                whose features should be applied. When empty or None, nothing is
                updated (types are still validated).

        Raises:
            ValueError: If an entry of the service's features is not a
                BaseFeatureView.
            AssertionError: If a projection desires features that the mapped
                feature view does not have.
        """
        for feature_grouping in self._features:
            if not isinstance(feature_grouping, BaseFeatureView):
                raise ValueError(
                    f"The feature service {self.name} has been provided with an invalid type "
                    f'{type(feature_grouping)} as part of the "features" argument.)'
                )

            # For feature services that depend on an unspecified feature view, apply inferred schema
            if not fvs_to_update or feature_grouping.name not in fvs_to_update:
                continue

            inferred_features = fvs_to_update[feature_grouping.name].features
            projection = feature_grouping.projection

            if projection.desired_features:
                desired_features = set(projection.desired_features)
                actual_features = {f.name for f in inferred_features}
                assert desired_features.issubset(actual_features), (
                    f"Feature view {feature_grouping.name} is missing desired features: "
                    f"{sorted(desired_features - actual_features)}"
                )
                # Build the projection's feature list from scratch so that only
                # the desired features remain, in the feature view's order.
                projection.features = [
                    f for f in inferred_features if f.name in desired_features
                ]
            else:
                projection.features = inferred_features

    def __repr__(self):
        items = (f"{k} = {v}" for k, v in self.__dict__.items())
        return f"<{self.__class__.__name__}({', '.join(items)})>"

    def __str__(self):
        return str(MessageToJson(self.to_proto()))

    def __hash__(self):
        # Hash on the name only; objects that compare equal always share a name.
        return hash(self.name)

    def __eq__(self, other):
        """
        Compares name, description, tags, owner and the (order-insensitive)
        feature view projections.

        Raises:
            TypeError: If ``other`` is not a FeatureService.
        """
        if not isinstance(other, FeatureService):
            raise TypeError(
                "Comparisons should only involve FeatureService class objects."
            )

        if (
            self.name != other.name
            or self.description != other.description
            or self.tags != other.tags
            or self.owner != other.owner
        ):
            return False

        # Sort both sides so that projection order does not affect equality.
        if sorted(self.feature_view_projections) != sorted(
            other.feature_view_projections
        ):
            return False

        return True

    @classmethod
    def from_proto(cls, feature_service_proto: FeatureServiceProto):
        """
        Converts a FeatureServiceProto to a FeatureService object.

        Args:
            feature_service_proto: A protobuf representation of a FeatureService.

        Returns:
            A FeatureService built from the proto's spec, with timestamps taken
            from the proto's meta when they are set.
        """
        fs = cls(
            name=feature_service_proto.spec.name,
            features=[],
            tags=dict(feature_service_proto.spec.tags),
            description=feature_service_proto.spec.description,
            owner=feature_service_proto.spec.owner,
            logging_config=LoggingConfig.from_proto(
                feature_service_proto.spec.logging_config
            ),
        )
        # The proto stores projections rather than feature views, so they are
        # attached directly instead of going through the constructor.
        fs.feature_view_projections.extend(
            [
                FeatureViewProjection.from_proto(projection)
                for projection in feature_service_proto.spec.features
            ]
        )

        if feature_service_proto.meta.HasField("created_timestamp"):
            fs.created_timestamp = (
                feature_service_proto.meta.created_timestamp.ToDatetime()
            )
        if feature_service_proto.meta.HasField("last_updated_timestamp"):
            fs.last_updated_timestamp = (
                feature_service_proto.meta.last_updated_timestamp.ToDatetime()
            )

        return fs

    def to_proto(self) -> FeatureServiceProto:
        """
        Converts a feature service to its protobuf representation.

        Returns:
            A FeatureServiceProto protobuf.
        """
        meta = FeatureServiceMetaProto()
        # Timestamps are only written when they have been set.
        if self.created_timestamp:
            meta.created_timestamp.FromDatetime(self.created_timestamp)
        if self.last_updated_timestamp:
            meta.last_updated_timestamp.FromDatetime(self.last_updated_timestamp)

        spec = FeatureServiceSpecProto(
            name=self.name,
            features=[
                projection.to_proto() for projection in self.feature_view_projections
            ],
            tags=self.tags,
            description=self.description,
            owner=self.owner,
            logging_config=self.logging_config.to_proto()
            if self.logging_config
            else None,
        )

        return FeatureServiceProto(spec=spec, meta=meta)

    def validate(self):
        """Validation hook; currently performs no checks."""
        pass