feast.infra.offline_stores package
Subpackages
- feast.infra.offline_stores.contrib package
- Subpackages
- feast.infra.offline_stores.contrib.athena_offline_store package
- feast.infra.offline_stores.contrib.mssql_offline_store package
- feast.infra.offline_stores.contrib.postgres_offline_store package
- feast.infra.offline_stores.contrib.spark_offline_store package
- feast.infra.offline_stores.contrib.trino_offline_store package
- Subpackages
- Submodules
- feast.infra.offline_stores.contrib.trino_offline_store.trino module
- feast.infra.offline_stores.contrib.trino_offline_store.trino_queries module
- feast.infra.offline_stores.contrib.trino_offline_store.trino_source module
- feast.infra.offline_stores.contrib.trino_offline_store.trino_type_map module
- Module contents
- Submodules
- feast.infra.offline_stores.contrib.athena_repo_configuration module
- feast.infra.offline_stores.contrib.mssql_repo_configuration module
- feast.infra.offline_stores.contrib.postgres_repo_configuration module
- feast.infra.offline_stores.contrib.spark_repo_configuration module
- feast.infra.offline_stores.contrib.trino_repo_configuration module
- Module contents
- Subpackages
Submodules
feast.infra.offline_stores.bigquery module
- class feast.infra.offline_stores.bigquery.BigQueryOfflineStore[source]
Bases:
OfflineStore
- static get_historical_features(config: RepoConfig, feature_views: List[FeatureView], feature_refs: List[str], entity_df: DataFrame | str, registry: BaseRegistry, project: str, full_feature_names: bool = False) RetrievalJob [source]
Retrieves the point-in-time correct historical feature values for the specified entity rows.
- Parameters:
config – The config for the current feature store.
feature_views – A list containing all feature views that are referenced in the entity rows.
feature_refs – The features to be retrieved.
entity_df – A collection of rows containing all entity columns on which features need to be joined, as well as the timestamp column used for point-in-time joins. Either a pandas dataframe can be provided or a SQL query.
registry – The registry for the current feature store.
project – Feast project to which the feature views belong.
full_feature_names – If True, feature names will be prefixed with the corresponding feature view name, changing them from the format “feature” to “feature_view__feature” (e.g. “daily_transactions” changes to “customer_fv__daily_transactions”).
- Returns:
A RetrievalJob that can be executed to get the features.
- static offline_write_batch(config: RepoConfig, feature_view: FeatureView, table: Table, progress: Callable[[int], Any] | None)[source]
Writes the specified arrow table to the data source underlying the specified feature view.
- Parameters:
config – The config for the current feature store.
feature_view – The feature view whose batch source should be written.
table – The arrow table to write.
progress – Function to be called once a portion of the data has been written, used to show progress.
- static pull_all_from_table_or_query(config: RepoConfig, data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, start_date: datetime, end_date: datetime) RetrievalJob [source]
Extracts all the entity rows (i.e. the combination of join key columns, feature columns, and timestamp columns) from the specified data source that lie within the specified time range.
All of the column names should refer to columns that exist in the data source. In particular, any mapping of column names must have already happened.
- Parameters:
config – The config for the current feature store.
data_source – The data source from which the entity rows will be extracted.
join_key_columns – The columns of the join keys.
feature_name_columns – The columns of the features.
timestamp_field – The timestamp column.
start_date – The start of the time range.
end_date – The end of the time range.
- Returns:
A RetrievalJob that can be executed to get the entity rows.
- static pull_latest_from_table_or_query(config: RepoConfig, data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: str | None, start_date: datetime, end_date: datetime) RetrievalJob [source]
Extracts the latest entity rows (i.e. the combination of join key columns, feature columns, and timestamp columns) from the specified data source that lie within the specified time range.
All of the column names should refer to columns that exist in the data source. In particular, any mapping of column names must have already happened.
- Parameters:
config – The config for the current feature store.
data_source – The data source from which the entity rows will be extracted.
join_key_columns – The columns of the join keys.
feature_name_columns – The columns of the features.
timestamp_field – The timestamp column, used to determine which rows are the most recent.
created_timestamp_column – The column indicating when the row was created, used to break ties.
start_date – The start of the time range.
end_date – The end of the time range.
- Returns:
A RetrievalJob that can be executed to get the entity rows.
- static write_logged_features(config: RepoConfig, data: Table | Path, source: LoggingSource, logging_config: LoggingConfig, registry: BaseRegistry)[source]
Writes logged features to a specified destination in the offline store.
If the specified destination exists, data will be appended; otherwise, the destination will be created and data will be added. Thus this function can be called repeatedly with the same destination to flush logs in chunks.
- Parameters:
config – The config for the current feature store.
data – An arrow table or a path to parquet directory that contains the logs to write.
source – The logging source that provides a schema and some additional metadata.
logging_config – A LoggingConfig object that determines where the logs will be written.
registry – The registry for the current feature store.
- class feast.infra.offline_stores.bigquery.BigQueryOfflineStoreConfig(*, type: typing_extensions.Literal[bigquery] = 'bigquery', dataset: StrictStr = 'feast', project_id: StrictStr | None = None, billing_project_id: StrictStr | None = None, location: StrictStr | None = None, gcs_staging_location: str | None = None)[source]
Bases:
FeastConfigBaseModel
Offline store config for GCP BigQuery
- dataset: StrictStr
(optional) BigQuery Dataset name for temporary tables
- gcs_staging_location: str | None
(optional) GCS location used for offloading BigQuery results as parquet files.
- location: StrictStr | None
(optional) GCP location name used for the BigQuery offline store. Examples of location names include
US
,EU
,us-central1
,us-west4
. If a location is not specified, the location defaults to theUS
multi-regional location. For more information on BigQuery data locations see: https://cloud.google.com/bigquery/docs/locations
- type: typing_extensions.Literal[bigquery]
Offline store type selector
- class feast.infra.offline_stores.bigquery.BigQueryRetrievalJob(query: str | Callable[[], AbstractContextManager[str]], client: Client, config: RepoConfig, full_feature_names: bool, on_demand_feature_views: List[OnDemandFeatureView] | None = None, metadata: RetrievalMetadata | None = None)[source]
Bases:
RetrievalJob
- property full_feature_names: bool
Returns True if full feature names should be applied to the results of the query.
- property metadata: RetrievalMetadata | None
Returns metadata about the retrieval job.
- property on_demand_feature_views: List[OnDemandFeatureView]
Returns a list containing all the on demand feature views to be handled.
- persist(storage: SavedDatasetStorage, allow_overwrite: bool | None = False, timeout: int | None = None)[source]
Synchronously executes the underlying query and persists the result in the same offline store at the specified destination.
- Parameters:
storage – The saved dataset storage object specifying where the result should be persisted.
allow_overwrite – If True, a pre-existing location (e.g. table or file) can be overwritten. Currently not all individual offline store implementations make use of this parameter.
- supports_remote_storage_export() bool [source]
Returns True if the RetrievalJob supports to_remote_storage.
- to_bigquery(job_config: QueryJobConfig | None = None, timeout: int | None = 1800, retry_cadence: int | None = 10) str [source]
Synchronously executes the underlying query and exports the result to a BigQuery table. The underlying BigQuery job runs for a limited amount of time (the default is 30 minutes).
- Parameters:
job_config (optional) – A bigquery.QueryJobConfig to specify options like the destination table, dry run, etc.
timeout (optional) – The time limit of the BigQuery job in seconds. Defaults to 30 minutes.
retry_cadence (optional) – The number of seconds for setting how long the job should checked for completion.
- Returns:
Returns the destination table name or None if job_config.dry_run is True.
- to_remote_storage() List[str] [source]
Synchronously executes the underlying query and exports the results to remote storage (e.g. S3 or GCS).
Implementations of this method should export the results as multiple parquet files, each file sized appropriately depending on how much data is being returned by the retrieval job.
- Returns:
A list of parquet file paths in remote storage.
- feast.infra.offline_stores.bigquery.arrow_schema_to_bq_schema(arrow_schema: Schema) List[SchemaField] [source]
- feast.infra.offline_stores.bigquery.block_until_done(client: Client, bq_job: QueryJob | LoadJob, timeout: int = 1800, retry_cadence: float = 1)[source]
Waits for bq_job to finish running, up to a maximum amount of time specified by the timeout parameter (defaulting to 30 minutes).
- Parameters:
client – A bigquery.client.Client to monitor the bq_job.
bq_job – The bigquery.job.QueryJob that blocks until done runnning.
timeout – An optional number of seconds for setting the time limit of the job.
retry_cadence – An optional number of seconds for setting how long the job should checked for completion.
- Raises:
BigQueryJobStillRunning exception if the function has blocked longer than 30 minutes. –
BigQueryJobCancelled exception to signify when that the job has been cancelled (i.e. from timeout or KeyboardInterrupt). –
feast.infra.offline_stores.bigquery_source module
- class feast.infra.offline_stores.bigquery_source.BigQueryLoggingDestination(*, table_ref)[source]
Bases:
LoggingDestination
- classmethod from_proto(config_proto: LoggingConfig) LoggingDestination [source]
- to_data_source() DataSource [source]
Convert this object into a data source to read logs from an offline store.
- class feast.infra.offline_stores.bigquery_source.BigQueryOptions(table: str | None, query: str | None)[source]
Bases:
object
Configuration options for a BigQuery data source.
- classmethod from_proto(bigquery_options_proto: BigQueryOptions)[source]
Creates a BigQueryOptions from a protobuf representation of a BigQuery option
- Parameters:
bigquery_options_proto – A protobuf representation of a DataSource
- Returns:
Returns a BigQueryOptions object based on the bigquery_options protobuf
- class feast.infra.offline_stores.bigquery_source.BigQuerySource(*, name: str | None = None, timestamp_field: str | None = None, table: str | None = None, created_timestamp_column: str | None = '', field_mapping: Dict[str, str] | None = None, query: str | None = None, description: str | None = '', tags: Dict[str, str] | None = None, owner: str | None = '')[source]
Bases:
DataSource
- static from_proto(data_source: DataSource)[source]
Converts data source config in protobuf spec to a DataSource class object.
- Parameters:
data_source – A protobuf representation of a DataSource.
- Returns:
A DataSource class object.
- Raises:
ValueError – The type of DataSource could not be identified.
- get_table_column_names_and_types(config: RepoConfig) Iterable[Tuple[str, str]] [source]
Returns the list of column names and raw column types.
- Parameters:
config – Configuration object used to configure a feature store.
- get_table_query_string() str [source]
Returns a string that can directly be used to reference this table in SQL
- property query
- static source_datatype_to_feast_value_type() Callable[[str], ValueType] [source]
Returns the callable method that returns Feast type given the raw column type.
- property table
- validate(config: RepoConfig)[source]
Validates the underlying data source.
- Parameters:
config – Configuration object used to configure a feature store.
- class feast.infra.offline_stores.bigquery_source.SavedDatasetBigQueryStorage(table: str)[source]
Bases:
SavedDatasetStorage
- bigquery_options: BigQueryOptions
- static from_proto(storage_proto: SavedDatasetStorage) SavedDatasetStorage [source]
- to_data_source() DataSource [source]
feast.infra.offline_stores.file module
- class feast.infra.offline_stores.file.FileOfflineStore[source]
Bases:
OfflineStore
- static get_historical_features(config: RepoConfig, feature_views: List[FeatureView], feature_refs: List[str], entity_df: DataFrame | str, registry: BaseRegistry, project: str, full_feature_names: bool = False) RetrievalJob [source]
Retrieves the point-in-time correct historical feature values for the specified entity rows.
- Parameters:
config – The config for the current feature store.
feature_views – A list containing all feature views that are referenced in the entity rows.
feature_refs – The features to be retrieved.
entity_df – A collection of rows containing all entity columns on which features need to be joined, as well as the timestamp column used for point-in-time joins. Either a pandas dataframe can be provided or a SQL query.
registry – The registry for the current feature store.
project – Feast project to which the feature views belong.
full_feature_names – If True, feature names will be prefixed with the corresponding feature view name, changing them from the format “feature” to “feature_view__feature” (e.g. “daily_transactions” changes to “customer_fv__daily_transactions”).
- Returns:
A RetrievalJob that can be executed to get the features.
- static offline_write_batch(config: RepoConfig, feature_view: FeatureView, table: Table, progress: Callable[[int], Any] | None)[source]
Writes the specified arrow table to the data source underlying the specified feature view.
- Parameters:
config – The config for the current feature store.
feature_view – The feature view whose batch source should be written.
table – The arrow table to write.
progress – Function to be called once a portion of the data has been written, used to show progress.
- static pull_all_from_table_or_query(config: RepoConfig, data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, start_date: datetime, end_date: datetime) RetrievalJob [source]
Extracts all the entity rows (i.e. the combination of join key columns, feature columns, and timestamp columns) from the specified data source that lie within the specified time range.
All of the column names should refer to columns that exist in the data source. In particular, any mapping of column names must have already happened.
- Parameters:
config – The config for the current feature store.
data_source – The data source from which the entity rows will be extracted.
join_key_columns – The columns of the join keys.
feature_name_columns – The columns of the features.
timestamp_field – The timestamp column.
start_date – The start of the time range.
end_date – The end of the time range.
- Returns:
A RetrievalJob that can be executed to get the entity rows.
- static pull_latest_from_table_or_query(config: RepoConfig, data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: str | None, start_date: datetime, end_date: datetime) RetrievalJob [source]
Extracts the latest entity rows (i.e. the combination of join key columns, feature columns, and timestamp columns) from the specified data source that lie within the specified time range.
All of the column names should refer to columns that exist in the data source. In particular, any mapping of column names must have already happened.
- Parameters:
config – The config for the current feature store.
data_source – The data source from which the entity rows will be extracted.
join_key_columns – The columns of the join keys.
feature_name_columns – The columns of the features.
timestamp_field – The timestamp column, used to determine which rows are the most recent.
created_timestamp_column – The column indicating when the row was created, used to break ties.
start_date – The start of the time range.
end_date – The end of the time range.
- Returns:
A RetrievalJob that can be executed to get the entity rows.
- static write_logged_features(config: RepoConfig, data: Table | Path, source: LoggingSource, logging_config: LoggingConfig, registry: BaseRegistry)[source]
Writes logged features to a specified destination in the offline store.
If the specified destination exists, data will be appended; otherwise, the destination will be created and data will be added. Thus this function can be called repeatedly with the same destination to flush logs in chunks.
- Parameters:
config – The config for the current feature store.
data – An arrow table or a path to parquet directory that contains the logs to write.
source – The logging source that provides a schema and some additional metadata.
logging_config – A LoggingConfig object that determines where the logs will be written.
registry – The registry for the current feature store.
- class feast.infra.offline_stores.file.FileOfflineStoreConfig(*, type: typing_extensions.Literal[file] = 'file')[source]
Bases:
FeastConfigBaseModel
Offline store config for local (file-based) store
- type: typing_extensions.Literal[file]
Offline store type selector
- class feast.infra.offline_stores.file.FileRetrievalJob(evaluation_function: Callable, full_feature_names: bool, on_demand_feature_views: List[OnDemandFeatureView] | None = None, metadata: RetrievalMetadata | None = None)[source]
Bases:
RetrievalJob
- property full_feature_names: bool
Returns True if full feature names should be applied to the results of the query.
- property metadata: RetrievalMetadata | None
Returns metadata about the retrieval job.
- property on_demand_feature_views: List[OnDemandFeatureView]
Returns a list containing all the on demand feature views to be handled.
- persist(storage: SavedDatasetStorage, allow_overwrite: bool | None = False, timeout: int | None = None)[source]
Synchronously executes the underlying query and persists the result in the same offline store at the specified destination.
- Parameters:
storage – The saved dataset storage object specifying where the result should be persisted.
allow_overwrite – If True, a pre-existing location (e.g. table or file) can be overwritten. Currently not all individual offline store implementations make use of this parameter.
feast.infra.offline_stores.file_source module
- class feast.infra.offline_stores.file_source.FileLoggingDestination(*, path: str, s3_endpoint_override='', partition_by: List[str] | None = None)[source]
Bases:
LoggingDestination
- classmethod from_proto(config_proto: LoggingConfig) LoggingDestination [source]
- to_data_source() DataSource [source]
Convert this object into a data source to read logs from an offline store.
- class feast.infra.offline_stores.file_source.FileOptions(uri: str, file_format: FileFormat | None, s3_endpoint_override: str | None)[source]
Bases:
object
Configuration options for a file data source.
- file_format
File source format, e.g. parquet.
- Type:
feast.data_format.FileFormat | None
- file_format: FileFormat | None
- classmethod from_proto(file_options_proto: FileOptions)[source]
Creates a FileOptions from a protobuf representation of a file option
- Parameters:
file_options_proto – a protobuf representation of a datasource
- Returns:
Returns a FileOptions object based on the file_options protobuf
- class feast.infra.offline_stores.file_source.FileSource(*, path: str, name: str | None = '', event_timestamp_column: str | None = '', file_format: FileFormat | None = None, created_timestamp_column: str | None = '', field_mapping: Dict[str, str] | None = None, s3_endpoint_override: str | None = None, description: str | None = '', tags: Dict[str, str] | None = None, owner: str | None = '', timestamp_field: str | None = '')[source]
Bases:
DataSource
- static create_filesystem_and_path(path: str, s3_endpoint_override: str) Tuple[FileSystem | None, str] [source]
- property file_format: FileFormat | None
Returns the file format of this file data source.
- static from_proto(data_source: DataSource)[source]
Converts data source config in protobuf spec to a DataSource class object.
- Parameters:
data_source – A protobuf representation of a DataSource.
- Returns:
A DataSource class object.
- Raises:
ValueError – The type of DataSource could not be identified.
- get_table_column_names_and_types(config: RepoConfig) Iterable[Tuple[str, str]] [source]
Returns the list of column names and raw column types.
- Parameters:
config – Configuration object used to configure a feature store.
- get_table_query_string() str [source]
Returns a string that can directly be used to reference this table in SQL.
- property s3_endpoint_override: str | None
Returns the s3 endpoint override of this file data source.
- static source_datatype_to_feast_value_type() Callable[[str], ValueType] [source]
Returns the callable method that returns Feast type given the raw column type.
- validate(config: RepoConfig)[source]
Validates the underlying data source.
- Parameters:
config – Configuration object used to configure a feature store.
- class feast.infra.offline_stores.file_source.SavedDatasetFileStorage(path: str, file_format: ~feast.data_format.FileFormat = <feast.data_format.ParquetFormat object>, s3_endpoint_override: str | None = None)[source]
Bases:
SavedDatasetStorage
- file_options: FileOptions
- static from_data_source(data_source: DataSource) SavedDatasetStorage [source]
- static from_proto(storage_proto: SavedDatasetStorage) SavedDatasetStorage [source]
- to_data_source() DataSource [source]
feast.infra.offline_stores.offline_store module
- class feast.infra.offline_stores.offline_store.OfflineStore[source]
Bases:
ABC
An offline store defines the interface that Feast uses to interact with the storage and compute system that handles offline features.
Each offline store implementation is designed to work only with the corresponding data source. For example, the SnowflakeOfflineStore can handle SnowflakeSources but not FileSources.
- abstract static get_historical_features(config: RepoConfig, feature_views: List[FeatureView], feature_refs: List[str], entity_df: DataFrame | str, registry: BaseRegistry, project: str, full_feature_names: bool = False) RetrievalJob [source]
Retrieves the point-in-time correct historical feature values for the specified entity rows.
- Parameters:
config – The config for the current feature store.
feature_views – A list containing all feature views that are referenced in the entity rows.
feature_refs – The features to be retrieved.
entity_df – A collection of rows containing all entity columns on which features need to be joined, as well as the timestamp column used for point-in-time joins. Either a pandas dataframe can be provided or a SQL query.
registry – The registry for the current feature store.
project – Feast project to which the feature views belong.
full_feature_names – If True, feature names will be prefixed with the corresponding feature view name, changing them from the format “feature” to “feature_view__feature” (e.g. “daily_transactions” changes to “customer_fv__daily_transactions”).
- Returns:
A RetrievalJob that can be executed to get the features.
- static offline_write_batch(config: RepoConfig, feature_view: FeatureView, table: Table, progress: Callable[[int], Any] | None)[source]
Writes the specified arrow table to the data source underlying the specified feature view.
- Parameters:
config – The config for the current feature store.
feature_view – The feature view whose batch source should be written.
table – The arrow table to write.
progress – Function to be called once a portion of the data has been written, used to show progress.
- abstract static pull_all_from_table_or_query(config: RepoConfig, data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, start_date: datetime, end_date: datetime) RetrievalJob [source]
Extracts all the entity rows (i.e. the combination of join key columns, feature columns, and timestamp columns) from the specified data source that lie within the specified time range.
All of the column names should refer to columns that exist in the data source. In particular, any mapping of column names must have already happened.
- Parameters:
config – The config for the current feature store.
data_source – The data source from which the entity rows will be extracted.
join_key_columns – The columns of the join keys.
feature_name_columns – The columns of the features.
timestamp_field – The timestamp column.
start_date – The start of the time range.
end_date – The end of the time range.
- Returns:
A RetrievalJob that can be executed to get the entity rows.
- abstract static pull_latest_from_table_or_query(config: RepoConfig, data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: str | None, start_date: datetime, end_date: datetime) RetrievalJob [source]
Extracts the latest entity rows (i.e. the combination of join key columns, feature columns, and timestamp columns) from the specified data source that lie within the specified time range.
All of the column names should refer to columns that exist in the data source. In particular, any mapping of column names must have already happened.
- Parameters:
config – The config for the current feature store.
data_source – The data source from which the entity rows will be extracted.
join_key_columns – The columns of the join keys.
feature_name_columns – The columns of the features.
timestamp_field – The timestamp column, used to determine which rows are the most recent.
created_timestamp_column – The column indicating when the row was created, used to break ties.
start_date – The start of the time range.
end_date – The end of the time range.
- Returns:
A RetrievalJob that can be executed to get the entity rows.
- static write_logged_features(config: RepoConfig, data: Table | Path, source: LoggingSource, logging_config: LoggingConfig, registry: BaseRegistry)[source]
Writes logged features to a specified destination in the offline store.
If the specified destination exists, data will be appended; otherwise, the destination will be created and data will be added. Thus this function can be called repeatedly with the same destination to flush logs in chunks.
- Parameters:
config – The config for the current feature store.
data – An arrow table or a path to parquet directory that contains the logs to write.
source – The logging source that provides a schema and some additional metadata.
logging_config – A LoggingConfig object that determines where the logs will be written.
registry – The registry for the current feature store.
- class feast.infra.offline_stores.offline_store.RetrievalJob[source]
Bases:
ABC
A RetrievalJob manages the execution of a query to retrieve data from the offline store.
- abstract property full_feature_names: bool
Returns True if full feature names should be applied to the results of the query.
- abstract property metadata: RetrievalMetadata | None
Returns metadata about the retrieval job.
- abstract property on_demand_feature_views: List[OnDemandFeatureView]
Returns a list containing all the on demand feature views to be handled.
- abstract persist(storage: SavedDatasetStorage, allow_overwrite: bool = False, timeout: int | None = None)[source]
Synchronously executes the underlying query and persists the result in the same offline store at the specified destination.
- Parameters:
storage – The saved dataset storage object specifying where the result should be persisted.
allow_overwrite – If True, a pre-existing location (e.g. table or file) can be overwritten. Currently not all individual offline store implementations make use of this parameter.
- supports_remote_storage_export() bool [source]
Returns True if the RetrievalJob supports to_remote_storage.
- to_arrow(validation_reference: ValidationReference | None = None, timeout: int | None = None) Table [source]
Synchronously executes the underlying query and returns the result as an arrow table.
On demand transformations will be executed. If a validation reference is provided, the dataframe will be validated.
- Parameters:
validation_reference (optional) – The validation to apply against the retrieved dataframe.
timeout (optional) – The query timeout if applicable.
- to_df(validation_reference: ValidationReference | None = None, timeout: int | None = None) DataFrame [source]
Synchronously executes the underlying query and returns the result as a pandas dataframe.
On demand transformations will be executed. If a validation reference is provided, the dataframe will be validated.
- Parameters:
validation_reference (optional) – The validation to apply against the retrieved dataframe.
timeout (optional) – The query timeout if applicable.
- to_remote_storage() List[str] [source]
Synchronously executes the underlying query and exports the results to remote storage (e.g. S3 or GCS).
Implementations of this method should export the results as multiple parquet files, each file sized appropriately depending on how much data is being returned by the retrieval job.
- Returns:
A list of parquet file paths in remote storage.
feast.infra.offline_stores.offline_utils module
- class feast.infra.offline_stores.offline_utils.FeatureViewQueryContext(name: str, ttl: int, entities: List[str], features: List[str], field_mapping: Dict[str, str], timestamp_field: str, created_timestamp_column: str | None, table_subquery: str, entity_selections: List[str], min_event_timestamp: str | None, max_event_timestamp: str, date_partition_column: str | None)[source]
Bases:
object
Context object used to template a BigQuery and Redshift point-in-time SQL query
- feast.infra.offline_stores.offline_utils.assert_expected_columns_in_entity_df(entity_schema: Dict[str, dtype], join_keys: Set[str], entity_df_event_timestamp_col: str)[source]
- feast.infra.offline_stores.offline_utils.build_point_in_time_query(feature_view_query_contexts: List[FeatureViewQueryContext], left_table_query_string: str, entity_df_event_timestamp_col: str, entity_df_columns: KeysView[str], query_template: str, full_feature_names: bool = False) str [source]
Build point-in-time query between each feature view table and the entity dataframe for Bigquery and Redshift
- feast.infra.offline_stores.offline_utils.get_entity_df_timestamp_bounds(entity_df: DataFrame, event_timestamp_col: str) Tuple[Timestamp, Timestamp] [source]
- feast.infra.offline_stores.offline_utils.get_expected_join_keys(project: str, feature_views: List[FeatureView], registry: BaseRegistry) Set[str] [source]
- feast.infra.offline_stores.offline_utils.get_feature_view_query_context(feature_refs: List[str], feature_views: List[FeatureView], registry: BaseRegistry, project: str, entity_df_timestamp_range: Tuple[datetime, datetime]) List[FeatureViewQueryContext] [source]
Build a query context containing all information required to template a BigQuery and Redshift point-in-time SQL query
- feast.infra.offline_stores.offline_utils.get_offline_store_from_config(offline_store_config: Any) OfflineStore [source]
Creates an offline store corresponding to the given offline store config.
- feast.infra.offline_stores.offline_utils.get_pyarrow_schema_from_batch_source(config: RepoConfig, batch_source: DataSource, timestamp_unit: str = 'us') Tuple[Schema, List[str]] [source]
Returns the pyarrow schema and column names for the given batch source.
feast.infra.offline_stores.redshift module
- class feast.infra.offline_stores.redshift.RedshiftOfflineStore[source]
Bases:
OfflineStore
- static get_historical_features(config: RepoConfig, feature_views: List[FeatureView], feature_refs: List[str], entity_df: DataFrame | str, registry: BaseRegistry, project: str, full_feature_names: bool = False) RetrievalJob [source]
Retrieves the point-in-time correct historical feature values for the specified entity rows.
- Parameters:
config – The config for the current feature store.
feature_views – A list containing all feature views that are referenced in the entity rows.
feature_refs – The features to be retrieved.
entity_df – A collection of rows containing all entity columns on which features need to be joined, as well as the timestamp column used for point-in-time joins. Either a pandas dataframe can be provided or a SQL query.
registry – The registry for the current feature store.
project – Feast project to which the feature views belong.
full_feature_names – If True, feature names will be prefixed with the corresponding feature view name, changing them from the format “feature” to “feature_view__feature” (e.g. “daily_transactions” changes to “customer_fv__daily_transactions”).
- Returns:
A RetrievalJob that can be executed to get the features.
- static offline_write_batch(config: RepoConfig, feature_view: FeatureView, table: Table, progress: Callable[[int], Any] | None)[source]
Writes the specified arrow table to the data source underlying the specified feature view.
- Parameters:
config – The config for the current feature store.
feature_view – The feature view whose batch source should be written.
table – The arrow table to write.
progress – Function to be called once a portion of the data has been written, used to show progress.
- static pull_all_from_table_or_query(config: RepoConfig, data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, start_date: datetime, end_date: datetime) RetrievalJob [source]
Extracts all the entity rows (i.e. the combination of join key columns, feature columns, and timestamp columns) from the specified data source that lie within the specified time range.
All of the column names should refer to columns that exist in the data source. In particular, any mapping of column names must have already happened.
- Parameters:
config – The config for the current feature store.
data_source – The data source from which the entity rows will be extracted.
join_key_columns – The columns of the join keys.
feature_name_columns – The columns of the features.
timestamp_field – The timestamp column.
start_date – The start of the time range.
end_date – The end of the time range.
- Returns:
A RetrievalJob that can be executed to get the entity rows.
- static pull_latest_from_table_or_query(config: RepoConfig, data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: str | None, start_date: datetime, end_date: datetime) RetrievalJob [source]
Extracts the latest entity rows (i.e. the combination of join key columns, feature columns, and timestamp columns) from the specified data source that lie within the specified time range.
All of the column names should refer to columns that exist in the data source. In particular, any mapping of column names must have already happened.
- Parameters:
config – The config for the current feature store.
data_source – The data source from which the entity rows will be extracted.
join_key_columns – The columns of the join keys.
feature_name_columns – The columns of the features.
timestamp_field – The timestamp column, used to determine which rows are the most recent.
created_timestamp_column – The column indicating when the row was created, used to break ties.
start_date – The start of the time range.
end_date – The end of the time range.
- Returns:
A RetrievalJob that can be executed to get the entity rows.
- static write_logged_features(config: RepoConfig, data: Table | Path, source: LoggingSource, logging_config: LoggingConfig, registry: BaseRegistry)[source]
Writes logged features to a specified destination in the offline store.
If the specified destination exists, data will be appended; otherwise, the destination will be created and data will be added. Thus this function can be called repeatedly with the same destination to flush logs in chunks.
- Parameters:
config – The config for the current feature store.
data – An arrow table or a path to parquet directory that contains the logs to write.
source – The logging source that provides a schema and some additional metadata.
logging_config – A LoggingConfig object that determines where the logs will be written.
registry – The registry for the current feature store.
- class feast.infra.offline_stores.redshift.RedshiftOfflineStoreConfig(*, type: typing_extensions.Literal[redshift] = 'redshift', cluster_id: StrictStr | None = None, user: StrictStr | None = None, workgroup: StrictStr | None = None, region: StrictStr, database: StrictStr, s3_staging_location: StrictStr, iam_role: StrictStr)[source]
Bases:
FeastConfigBaseModel
Offline store config for AWS Redshift
- database: StrictStr
Redshift database name
- iam_role: StrictStr
IAM Role for Redshift, granting it access to S3
- region: StrictStr
Redshift cluster’s AWS region
- classmethod require_cluster_and_user_or_workgroup(values)[source]
Provisioned Redshift clusters: Require cluster_id and user, ignore workgroup Serverless Redshift: Require workgroup, ignore cluster_id and user
- s3_staging_location: StrictStr
S3 path for importing & exporting data to Redshift
- type: typing_extensions.Literal[redshift]
Offline store type selector
- class feast.infra.offline_stores.redshift.RedshiftRetrievalJob(query: str | Callable[[], AbstractContextManager[str]], redshift_client, s3_resource, config: RepoConfig, full_feature_names: bool, on_demand_feature_views: List[OnDemandFeatureView] | None = None, metadata: RetrievalMetadata | None = None)[source]
Bases:
RetrievalJob
- property full_feature_names: bool
Returns True if full feature names should be applied to the results of the query.
- property metadata: RetrievalMetadata | None
Returns metadata about the retrieval job.
- property on_demand_feature_views: List[OnDemandFeatureView]
Returns a list containing all the on demand feature views to be handled.
- persist(storage: SavedDatasetStorage, allow_overwrite: bool | None = False, timeout: int | None = None)[source]
Synchronously executes the underlying query and persists the result in the same offline store at the specified destination.
- Parameters:
storage – The saved dataset storage object specifying where the result should be persisted.
allow_overwrite – If True, a pre-existing location (e.g. table or file) can be overwritten. Currently not all individual offline store implementations make use of this parameter.
- supports_remote_storage_export() bool [source]
Returns True if the RetrievalJob supports to_remote_storage.
- to_remote_storage() List[str] [source]
Synchronously executes the underlying query and exports the results to remote storage (e.g. S3 or GCS).
Implementations of this method should export the results as multiple parquet files, each file sized appropriately depending on how much data is being returned by the retrieval job.
- Returns:
A list of parquet file paths in remote storage.
feast.infra.offline_stores.redshift_source module
- class feast.infra.offline_stores.redshift_source.RedshiftLoggingDestination(*, table_name: str)[source]
Bases:
LoggingDestination
- classmethod from_proto(config_proto: LoggingConfig) LoggingDestination [source]
- to_data_source() DataSource [source]
Convert this object into a data source to read logs from an offline store.
- class feast.infra.offline_stores.redshift_source.RedshiftOptions(table: str | None, schema: str | None, query: str | None, database: str | None)[source]
Bases:
object
Configuration options for a Redshift data source.
- classmethod from_proto(redshift_options_proto: RedshiftOptions)[source]
Creates a RedshiftOptions from a protobuf representation of a Redshift option.
- Parameters:
redshift_options_proto – A protobuf representation of a DataSource
- Returns:
A RedshiftOptions object based on the redshift_options protobuf.
- class feast.infra.offline_stores.redshift_source.RedshiftSource(*, name: str | None = None, timestamp_field: str | None = '', table: str | None = None, schema: str | None = None, created_timestamp_column: str | None = '', field_mapping: Dict[str, str] | None = None, query: str | None = None, description: str | None = '', tags: Dict[str, str] | None = None, owner: str | None = '', database: str | None = '')[source]
Bases:
DataSource
- property database
Returns the Redshift database of this Redshift source.
- static from_proto(data_source: DataSource)[source]
Creates a RedshiftSource from a protobuf representation of a RedshiftSource.
- Parameters:
data_source – A protobuf representation of a RedshiftSource
- Returns:
A RedshiftSource object based on the data_source protobuf.
- get_table_column_names_and_types(config: RepoConfig) Iterable[Tuple[str, str]] [source]
Returns a mapping of column names to types for this Redshift source.
- Parameters:
config – A RepoConfig describing the feature repo
- get_table_query_string() str [source]
Returns a string that can directly be used to reference this table in SQL.
- property query
Returns the Redshift query of this Redshift source.
- property schema
Returns the schema of this Redshift source.
- static source_datatype_to_feast_value_type() Callable[[str], ValueType] [source]
Returns the callable method that returns Feast type given the raw column type.
- property table
Returns the table of this Redshift source.
- to_proto() DataSource [source]
Converts a RedshiftSource object to its protobuf representation.
- Returns:
A DataSourceProto object.
- validate(config: RepoConfig)[source]
Validates the underlying data source.
- Parameters:
config – Configuration object used to configure a feature store.
- class feast.infra.offline_stores.redshift_source.SavedDatasetRedshiftStorage(table_ref: str)[source]
Bases:
SavedDatasetStorage
- static from_proto(storage_proto: SavedDatasetStorage) SavedDatasetStorage [source]
- redshift_options: RedshiftOptions
- to_data_source() DataSource [source]
feast.infra.offline_stores.snowflake module
- class feast.infra.offline_stores.snowflake.SnowflakeOfflineStore[source]
Bases:
OfflineStore
- static get_historical_features(config: RepoConfig, feature_views: List[FeatureView], feature_refs: List[str], entity_df: DataFrame | str, registry: BaseRegistry, project: str, full_feature_names: bool = False) RetrievalJob [source]
Retrieves the point-in-time correct historical feature values for the specified entity rows.
- Parameters:
config – The config for the current feature store.
feature_views – A list containing all feature views that are referenced in the entity rows.
feature_refs – The features to be retrieved.
entity_df – A collection of rows containing all entity columns on which features need to be joined, as well as the timestamp column used for point-in-time joins. Either a pandas dataframe can be provided or a SQL query.
registry – The registry for the current feature store.
project – Feast project to which the feature views belong.
full_feature_names – If True, feature names will be prefixed with the corresponding feature view name, changing them from the format “feature” to “feature_view__feature” (e.g. “daily_transactions” changes to “customer_fv__daily_transactions”).
- Returns:
A RetrievalJob that can be executed to get the features.
- static offline_write_batch(config: RepoConfig, feature_view: FeatureView, table: Table, progress: Callable[[int], Any] | None)[source]
Writes the specified arrow table to the data source underlying the specified feature view.
- Parameters:
config – The config for the current feature store.
feature_view – The feature view whose batch source should be written.
table – The arrow table to write.
progress – Function to be called once a portion of the data has been written, used to show progress.
- static pull_all_from_table_or_query(config: RepoConfig, data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, start_date: datetime, end_date: datetime) RetrievalJob [source]
Extracts all the entity rows (i.e. the combination of join key columns, feature columns, and timestamp columns) from the specified data source that lie within the specified time range.
All of the column names should refer to columns that exist in the data source. In particular, any mapping of column names must have already happened.
- Parameters:
config – The config for the current feature store.
data_source – The data source from which the entity rows will be extracted.
join_key_columns – The columns of the join keys.
feature_name_columns – The columns of the features.
timestamp_field – The timestamp column.
start_date – The start of the time range.
end_date – The end of the time range.
- Returns:
A RetrievalJob that can be executed to get the entity rows.
- static pull_latest_from_table_or_query(config: RepoConfig, data_source: DataSource, join_key_columns: List[str], feature_name_columns: List[str], timestamp_field: str, created_timestamp_column: str | None, start_date: datetime, end_date: datetime) RetrievalJob [source]
Extracts the latest entity rows (i.e. the combination of join key columns, feature columns, and timestamp columns) from the specified data source that lie within the specified time range.
All of the column names should refer to columns that exist in the data source. In particular, any mapping of column names must have already happened.
- Parameters:
config – The config for the current feature store.
data_source – The data source from which the entity rows will be extracted.
join_key_columns – The columns of the join keys.
feature_name_columns – The columns of the features.
timestamp_field – The timestamp column, used to determine which rows are the most recent.
created_timestamp_column – The column indicating when the row was created, used to break ties.
start_date – The start of the time range.
end_date – The end of the time range.
- Returns:
A RetrievalJob that can be executed to get the entity rows.
- static write_logged_features(config: RepoConfig, data: Table | Path, source: LoggingSource, logging_config: LoggingConfig, registry: BaseRegistry)[source]
Writes logged features to a specified destination in the offline store.
If the specified destination exists, data will be appended; otherwise, the destination will be created and data will be added. Thus this function can be called repeatedly with the same destination to flush logs in chunks.
- Parameters:
config – The config for the current feature store.
data – An arrow table or a path to parquet directory that contains the logs to write.
source – The logging source that provides a schema and some additional metadata.
logging_config – A LoggingConfig object that determines where the logs will be written.
registry – The registry for the current feature store.
- class feast.infra.offline_stores.snowflake.SnowflakeOfflineStoreConfig(*, type: typing_extensions.Literal[snowflake.offline] = 'snowflake.offline', config_path: str | None = '/home/docs/.snowsql/config', account: str | None = None, user: str | None = None, password: str | None = None, role: str | None = None, warehouse: str | None = None, authenticator: str | None = None, database: StrictStr, schema: str | None = 'PUBLIC', storage_integration_name: str | None = None, blob_export_location: str | None = None, convert_timestamp_columns: bool | None = None)[source]
Bases:
FeastConfigBaseModel
Offline store config for Snowflake
- blob_export_location: str | None
Location (in S3, Google storage or Azure storage) where data is offloaded
- convert_timestamp_columns: bool | None
Convert timestamp columns on export to a Parquet-supported format
- database: StrictStr
Snowflake database name
- type: typing_extensions.Literal[snowflake.offline]
Offline store type selector
- class feast.infra.offline_stores.snowflake.SnowflakeRetrievalJob(query: str | Callable[[], AbstractContextManager[str]], snowflake_conn: SnowflakeConnection, config: RepoConfig, full_feature_names: bool, on_demand_feature_views: List[OnDemandFeatureView] | None = None, metadata: RetrievalMetadata | None = None)[source]
Bases:
RetrievalJob
- property full_feature_names: bool
Returns True if full feature names should be applied to the results of the query.
- property metadata: RetrievalMetadata | None
Returns metadata about the retrieval job.
- property on_demand_feature_views: List[OnDemandFeatureView]
Returns a list containing all the on demand feature views to be handled.
- persist(storage: SavedDatasetStorage, allow_overwrite: bool = False, timeout: int | None = None)[source]
Synchronously executes the underlying query and persists the result in the same offline store at the specified destination.
- Parameters:
storage – The saved dataset storage object specifying where the result should be persisted.
allow_overwrite – If True, a pre-existing location (e.g. table or file) can be overwritten. Currently not all individual offline store implementations make use of this parameter.
- supports_remote_storage_export() bool [source]
Returns True if the RetrievalJob supports to_remote_storage.
- to_remote_storage() List[str] [source]
Synchronously executes the underlying query and exports the results to remote storage (e.g. S3 or GCS).
Implementations of this method should export the results as multiple parquet files, each file sized appropriately depending on how much data is being returned by the retrieval job.
- Returns:
A list of parquet file paths in remote storage.
- to_snowflake(table_name: str, allow_overwrite: bool = False, temporary: bool = False) None [source]
Save dataset as a new Snowflake table
feast.infra.offline_stores.snowflake_source module
- class feast.infra.offline_stores.snowflake_source.SavedDatasetSnowflakeStorage(table_ref: str)[source]
Bases:
SavedDatasetStorage
- static from_proto(storage_proto: SavedDatasetStorage) SavedDatasetStorage [source]
- snowflake_options: SnowflakeOptions
- to_data_source() DataSource [source]
- class feast.infra.offline_stores.snowflake_source.SnowflakeLoggingDestination(*, table_name: str)[source]
Bases:
LoggingDestination
- classmethod from_proto(config_proto: LoggingConfig) LoggingDestination [source]
- to_data_source() DataSource [source]
Convert this object into a data source to read logs from an offline store.
- class feast.infra.offline_stores.snowflake_source.SnowflakeOptions(database: str | None, schema: str | None, table: str | None, query: str | None)[source]
Bases:
object
Configuration options for a Snowflake data source.
- classmethod from_proto(snowflake_options_proto: SnowflakeOptions)[source]
Creates a SnowflakeOptions from a protobuf representation of a snowflake option.
- Parameters:
snowflake_options_proto – A protobuf representation of a DataSource
- Returns:
A SnowflakeOptions object based on the snowflake_options protobuf.
- class feast.infra.offline_stores.snowflake_source.SnowflakeSource(*, name: str | None = None, timestamp_field: str | None = '', database: str | None = None, warehouse: str | None = None, schema: str | None = None, table: str | None = None, query: str | None = None, created_timestamp_column: str | None = '', field_mapping: Dict[str, str] | None = None, description: str | None = '', tags: Dict[str, str] | None = None, owner: str | None = '')[source]
Bases:
DataSource
- property database
Returns the database of this snowflake source.
- static from_proto(data_source: DataSource)[source]
Creates a SnowflakeSource from a protobuf representation of a SnowflakeSource.
- Parameters:
data_source – A protobuf representation of a SnowflakeSource
- Returns:
A SnowflakeSource object based on the data_source protobuf.
- get_table_column_names_and_types(config: RepoConfig) Iterable[Tuple[str, str]] [source]
Returns a mapping of column names to types for this snowflake source.
- Parameters:
config – A RepoConfig describing the feature repo
- get_table_query_string() str [source]
Returns a string that can directly be used to reference this table in SQL.
- property query
Returns the snowflake options of this snowflake source.
- property schema
Returns the schema of this snowflake source.
- static source_datatype_to_feast_value_type() Callable[[str], ValueType] [source]
Returns the callable method that returns Feast type given the raw column type.
- property table
Returns the table of this snowflake source.
- to_proto() DataSource [source]
Converts a SnowflakeSource object to its protobuf representation.
- Returns:
A DataSourceProto object.
- validate(config: RepoConfig)[source]
Validates the underlying data source.
- Parameters:
config – Configuration object used to configure a feature store.