Feature Store#
FeatureStore#
hsfs.feature_store.FeatureStore(
featurestore_id,
featurestore_name,
created,
project_name,
project_id,
offline_featurestore_name,
hive_endpoint,
online_enabled,
num_feature_groups=None,
num_training_datasets=None,
num_storage_connectors=None,
num_feature_views=None,
online_featurestore_name=None,
mysql_server_endpoint=None,
online_featurestore_size=None,
)
Retrieval#
get_feature_store#
Connection.get_feature_store(name=None)
Get a reference to a feature store to perform operations on.
If no name is given, the project's default feature store is returned. To get a shared feature store, the project name of that feature store is required.
How to get feature store instance
import hsfs
conn = hsfs.connection()
fs = conn.get_feature_store()
# or
import hopsworks
project = hopsworks.login()
fs = project.get_feature_store()
Arguments
- name (str): The name of the feature store, defaults to None.
Returns
FeatureStore
. A feature store handle object to perform operations on.
Properties#
hive_endpoint#
Hive endpoint for the offline feature store.
id#
Id of the feature store.
mysql_server_endpoint#
MySQL server endpoint for the online feature store.
name#
Name of the feature store.
offline_featurestore_name#
Name of the offline feature store database.
online_enabled#
Indicator whether online feature store is enabled.
online_featurestore_name#
Name of the online feature store database.
project_id#
Id of the project in which the feature store is located.
project_name#
Name of the project in which the feature store is located.
Methods#
create_external_feature_group#
FeatureStore.create_external_feature_group(
name,
storage_connector,
query=None,
data_format=None,
path="",
options={},
version=None,
description="",
primary_key=[],
features=[],
statistics_config=None,
event_time=None,
expectation_suite=None,
online_enabled=False,
topic_name=None,
)
Create an external feature group metadata object.
Example
# connect to the Feature Store
fs = ...
external_fg = fs.create_external_feature_group(
name="sales",
version=1,
description="Physical shop sales features",
query=query,
storage_connector=connector,
primary_key=['ss_store_sk'],
event_time='sale_date'
)
Lazy
This method is lazy and does not persist any metadata in the
feature store on its own. To persist the feature group metadata in the feature store,
call the save()
method.
You can enable online storage for external feature groups; however, the sync from the external storage to the Hopsworks online storage must be done manually:
external_fg = fs.create_external_feature_group(
name="sales",
version=1,
description="Physical shop sales features",
query=query,
storage_connector=connector,
primary_key=['ss_store_sk'],
event_time='sale_date',
online_enabled=True
)
external_fg.save()
# read from external storage and filter data to sync to online
df = external_fg.read().filter(external_fg.customer_status == "active")
# insert to online storage
external_fg.insert(df)
Arguments
- name (str): Name of the external feature group to create.
- storage_connector (hsfs.StorageConnector): The storage connector to use to establish connectivity with the data source.
- query (Optional[str]): A string containing a SQL query valid for the target data source. The query will be used to pull data from the data source when the feature group is used.
- data_format (Optional[str]): If the external feature group refers to a directory with data, the data format to use when reading it.
- path (Optional[str]): The location within the scope of the storage connector from which to read the data for the external feature group.
- options (Optional[Dict[str, str]]): Additional options to be used by the engine when reading data from the specified storage connector. For example, {"header": True} when reading CSV files with column names in the first row.
- version (Optional[int]): Version of the external feature group to create, defaults to None and will create the feature group with an incremented version from the last version in the feature store.
- description (Optional[str]): A string describing the contents of the external feature group to improve discoverability for Data Scientists, defaults to empty string "".
- primary_key (Optional[List[str]]): A list of feature names to be used as primary key for the feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list [], and the feature group won't have any primary key.
- features (Optional[List[hsfs.feature.Feature]]): Optionally, define the schema of the external feature group manually as a list of Feature objects. Defaults to empty list [] and will use the schema information of the DataFrame resulting from executing the provided query against the data source.
- statistics_config (Optional[Union[hsfs.StatisticsConfig, bool, dict]]): A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this external feature group, "correlations" to turn on feature correlation computation, "histograms" to compute feature value frequencies and "exact_uniqueness" to compute uniqueness, distinctness and entropy. The values should be booleans indicating the setting. To fully turn off statistics computation pass statistics_config=False. Defaults to None and will compute only descriptive statistics.
- event_time (Optional[str]): Optionally, provide the name of the feature containing the event time for the features in this feature group. If event_time is set the feature group can be used for point-in-time joins. Defaults to None.
Event time data type restriction
The supported data types for the event time column are: timestamp, date and bigint.
- expectation_suite (Optional[Union[hsfs.expectation_suite.ExpectationSuite, great_expectations.core.expectation_suite.ExpectationSuite]]): Optionally, attach an expectation suite to the feature group which dataframes should be validated against upon insertion. Defaults to None.
- online_enabled (Optional[bool]): Define whether it should be possible to sync the feature group to the online feature store for low latency access, defaults to False.
- topic_name (Optional[str]): Optionally, define the name of the topic used for data ingestion. If left undefined, it defaults to using the project topic.
Returns
ExternalFeatureGroup
. The external feature group metadata object.
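The dictionary form of statistics_config described in the arguments can be sketched as follows. This is only an illustration: build_statistics_config is a hypothetical helper, not part of hsfs, and it does nothing more than assemble the four documented keys and check that their values are booleans.

```python
# Hypothetical helper (not part of hsfs): assembles a dictionary with the
# four keys documented for the statistics_config argument.
def build_statistics_config(enabled=True, correlations=False,
                            histograms=False, exact_uniqueness=False):
    config = {
        "enabled": enabled,
        "correlations": correlations,
        "histograms": histograms,
        "exact_uniqueness": exact_uniqueness,
    }
    # The documentation states that the values should be booleans.
    for key, value in config.items():
        if not isinstance(value, bool):
            raise TypeError(f"{key!r} must be a bool, got {type(value).__name__}")
    return config


# Descriptive statistics plus histograms, everything else left off.
stats = build_statistics_config(histograms=True)
print(stats)
```

A dictionary like stats would then be passed as statistics_config=stats when creating the feature group, while statistics_config=False turns statistics computation off entirely.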
create_feature_group#
FeatureStore.create_feature_group(
name,
version=None,
description="",
online_enabled=False,
time_travel_format="HUDI",
partition_key=[],
primary_key=[],
hudi_precombine_key=None,
features=[],
statistics_config=None,
event_time=None,
stream=False,
expectation_suite=None,
parents=[],
topic_name=None,
)
Create a feature group metadata object.
Example
# connect to the Feature Store
fs = ...
fg = fs.create_feature_group(
name='air_quality',
description='Air Quality characteristics of each day',
version=1,
primary_key=['city','date'],
online_enabled=True,
event_time='date'
)
Lazy
This method is lazy and does not persist any metadata or feature data in the
feature store on its own. To persist the feature group and save feature data
along the metadata in the feature store, call the save()
method with a
DataFrame.
Arguments
- name (str): Name of the feature group to create.
- version (Optional[int]): Version of the feature group to create, defaults to None and will create the feature group with an incremented version from the last version in the feature store.
- description (Optional[str]): A string describing the contents of the feature group to improve discoverability for Data Scientists, defaults to empty string "".
- online_enabled (Optional[bool]): Define whether the feature group should be made available also in the online feature store for low latency access, defaults to False.
- time_travel_format (Optional[str]): Format used for time travel, defaults to "HUDI".
- partition_key (Optional[List[str]]): A list of feature names to be used as partition key when writing the feature data to the offline storage, defaults to empty list [].
- primary_key (Optional[List[str]]): A list of feature names to be used as primary key for the feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list [], and the feature group won't have any primary key.
- hudi_precombine_key (Optional[str]): A feature name to be used as a precombine key for the "HUDI" feature group. Defaults to None. If the feature group has time travel format "HUDI" and no hudi precombine key was specified, the first primary key of the feature group will be used as hudi precombine key.
- features (Optional[List[hsfs.feature.Feature]]): Optionally, define the schema of the feature group manually as a list of Feature objects. Defaults to empty list [] and will use the schema information of the DataFrame provided in the save method.
- statistics_config (Optional[Union[hsfs.StatisticsConfig, bool, dict]]): A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this feature group, "correlations" to turn on feature correlation computation, "histograms" to compute feature value frequencies and "exact_uniqueness" to compute uniqueness, distinctness and entropy. The values should be booleans indicating the setting. To fully turn off statistics computation pass statistics_config=False. Defaults to None and will compute only descriptive statistics.
- event_time (Optional[str]): Optionally, provide the name of the feature containing the event time for the features in this feature group. If event_time is set the feature group can be used for point-in-time joins. Defaults to None.
Event time data type restriction
The supported data types for the event time column are: timestamp, date and bigint.
- stream: Optionally, define whether the feature group should support real time stream writing capabilities. Stream enabled Feature Groups have a unified single API for writing streaming features transparently to both online and offline store.
- expectation_suite (Optional[Union[hsfs.expectation_suite.ExpectationSuite, great_expectations.core.expectation_suite.ExpectationSuite]]): Optionally, attach an expectation suite to the feature group which dataframes should be validated against upon insertion. Defaults to None.
- parents (Optional[List[hsfs.feature_group.FeatureGroup]]): Optionally, define the parents of this feature group as the origin where the data is coming from.
- topic_name (Optional[str]): Optionally, define the name of the topic used for data ingestion. If left undefined, it defaults to using the project topic.
Returns
FeatureGroup
. The feature group metadata object.
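The fallback rule for hudi_precombine_key described in the arguments can be written down as a few lines of plain Python. This is a sketch of the documented rule for a feature group whose time travel format is "HUDI", not the library's implementation. resolve_precombine_key is a hypothetical name, and returning None for an empty primary key is an assumption made here for illustration.

```python
# Hypothetical helper (not part of hsfs): mirrors the documented fallback
# for a feature group whose time travel format is "HUDI".
def resolve_precombine_key(primary_key, hudi_precombine_key=None):
    # An explicitly specified precombine key always wins.
    if hudi_precombine_key is not None:
        return hudi_precombine_key
    # Otherwise the first primary key is used.
    # Assumption: with an empty primary key there is nothing to fall back to.
    return primary_key[0] if primary_key else None


print(resolve_precombine_key(["city", "date"]))
print(resolve_precombine_key(["city", "date"], hudi_precombine_key="date"))
```

With primary_key=['city', 'date'] and no explicit key, the rule selects 'city'. Passing hudi_precombine_key='date' overrides it.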
create_feature_view#
FeatureStore.create_feature_view(
name, query, version=None, description="", labels=[], transformation_functions={}
)
Create a feature view metadata object and save it to Hopsworks.
Example
# connect to the Feature Store
fs = ...
# get the feature group instances
fg1 = fs.get_or_create_feature_group(...)
fg2 = fs.get_or_create_feature_group(...)
# construct the query
query = fg1.select_all().join(fg2.select_all())
# get the transformation functions
standard_scaler = fs.get_transformation_function(name='standard_scaler')
# construct dictionary of "feature - transformation function" pairs
# (df is assumed to be a DataFrame whose columns are the features to scale)
transformation_functions = {col_name: standard_scaler for col_name in df.columns}
feature_view = fs.create_feature_view(
name='air_quality_fv',
version=1,
transformation_functions=transformation_functions,
query=query
)
Example
# get feature store instance
fs = ...
# define query object
query = ...
# define dictionary with column names and transformation functions pairs
mapping_transformers = ...
# create feature view
feature_view = fs.create_feature_view(
name='feature_view_name',
version=1,
transformation_functions=mapping_transformers,
query=query
)
Warning
The as_of argument in the Query will be ignored because feature views do not support time travel queries.
Arguments
- name (str): Name of the feature view to create.
- query (hsfs.constructor.query.Query): Feature store Query.
- version (Optional[int]): Version of the feature view to create, defaults to None and will create the feature view with an incremented version from the last version in the feature store.
- description (Optional[str]): A string describing the contents of the feature view to improve discoverability for Data Scientists, defaults to empty string "".
- labels (Optional[List[str]]): A list of feature names constituting the prediction label/feature of the feature view. When replaying a Query during model inference, the label features can be omitted from the feature vector retrieval. Defaults to [], no label.
- transformation_functions (Optional[Dict[str, hsfs.transformation_function.TransformationFunction]]): A dictionary mapping feature names to the transformation functions that should be applied to them before writing out the vector and at inference time. Defaults to {}, no transformations.
Returns:
FeatureView
: The feature view metadata object.
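The shape of the transformation_functions argument, a dictionary keyed by feature name, can be illustrated without a feature store connection. In the sketch below plain Python callables stand in for hsfs TransformationFunction objects, and apply_transformations is a hypothetical helper that shows what "applied to the features they are mapped to" means for a single feature vector. It does not reflect how hsfs applies transformations internally.

```python
# Plain callables used as stand-ins for TransformationFunction objects.
def plus_one(value):
    return value + 1


def to_upper(value):
    return value.upper()


# Keys are feature names, values are the functions to apply to them.
transformation_functions = {"temperature": plus_one, "city": to_upper}


# Hypothetical helper: applies each mapped function to its feature and
# leaves features without a mapping untouched.
def apply_transformations(row, functions):
    return {
        name: functions[name](value) if name in functions else value
        for name, value in row.items()
    }


row = {"city": "oslo", "temperature": 3, "humidity": 0.8}
transformed = apply_transformations(row, transformation_functions)
print(transformed)
```

Features that do not appear in the dictionary, such as humidity here, pass through unchanged.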
create_on_demand_feature_group#
FeatureStore.create_on_demand_feature_group(
name,
storage_connector,
query=None,
data_format=None,
path="",
options={},
version=None,
description="",
primary_key=[],
features=[],
statistics_config=None,
event_time=None,
expectation_suite=None,
topic_name=None,
)
Create an external feature group metadata object.
Deprecated
The create_on_demand_feature_group method is deprecated. Use the create_external_feature_group method instead.
Lazy
This method is lazy and does not persist any metadata in the
feature store on its own. To persist the feature group metadata in the feature store,
call the save()
method.
Arguments
- name (str): Name of the external feature group to create.
- storage_connector (hsfs.StorageConnector): The storage connector to use to establish connectivity with the data source.
- query (Optional[str]): A string containing a SQL query valid for the target data source. The query will be used to pull data from the data source when the feature group is used.
- data_format (Optional[str]): If the external feature group refers to a directory with data, the data format to use when reading it.
- path (Optional[str]): The location within the scope of the storage connector from which to read the data for the external feature group.
- options (Optional[Dict[str, str]]): Additional options to be used by the engine when reading data from the specified storage connector. For example, {"header": True} when reading CSV files with column names in the first row.
- version (Optional[int]): Version of the external feature group to create, defaults to None and will create the feature group with an incremented version from the last version in the feature store.
- description (Optional[str]): A string describing the contents of the external feature group to improve discoverability for Data Scientists, defaults to empty string "".
- primary_key (Optional[List[str]]): A list of feature names to be used as primary key for the feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list [], and the feature group won't have any primary key.
- features (Optional[List[hsfs.feature.Feature]]): Optionally, define the schema of the external feature group manually as a list of Feature objects. Defaults to empty list [] and will use the schema information of the DataFrame resulting from executing the provided query against the data source.
- statistics_config (Optional[Union[hsfs.StatisticsConfig, bool, dict]]): A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this external feature group, "correlations" to turn on feature correlation computation, "histograms" to compute feature value frequencies and "exact_uniqueness" to compute uniqueness, distinctness and entropy. The values should be booleans indicating the setting. To fully turn off statistics computation pass statistics_config=False. Defaults to None and will compute only descriptive statistics.
- event_time (Optional[str]): Optionally, provide the name of the feature containing the event time for the features in this feature group. If event_time is set the feature group can be used for point-in-time joins. Defaults to None.
Event time data type restriction
The supported data types for the event time column are: timestamp, date and bigint.
- topic_name (Optional[str]): Optionally, define the name of the topic used for data ingestion. If left undefined, it defaults to using the project topic.
- expectation_suite: Optionally, attach an expectation suite to the feature group which dataframes should be validated against upon insertion. Defaults to None.
Returns
ExternalFeatureGroup
. The external feature group metadata object.
create_training_dataset#
FeatureStore.create_training_dataset(
name,
version=None,
description="",
data_format="tfrecords",
coalesce=False,
storage_connector=None,
splits={},
location="",
seed=None,
statistics_config=None,
label=[],
transformation_functions={},
train_split=None,
)
Create a training dataset metadata object.
Deprecated
TrainingDataset is deprecated, use FeatureView instead. From version 3.0, training datasets created with this API are no longer visible in the API.
Lazy
This method is lazy and does not persist any metadata or feature data in the
feature store on its own. To materialize the training dataset and save
feature data along the metadata in the feature store, call the save()
method with a DataFrame
or Query
.
Data Formats
The feature store currently supports the following data formats for training datasets:
- tfrecord
- csv
- tsv
- parquet
- avro
- orc
The petastorm, hdf5 and npy file formats are currently not supported.
Arguments
- name (str): Name of the training dataset to create.
- version (Optional[int]): Version of the training dataset to create, defaults to None and will create the training dataset with an incremented version from the last version in the feature store.
- description (Optional[str]): A string describing the contents of the training dataset to improve discoverability for Data Scientists, defaults to empty string "".
- data_format (Optional[str]): The data format used to save the training dataset, defaults to "tfrecords" format.
- coalesce (Optional[bool]): If true, the training dataset data will be coalesced into a single partition before writing. The resulting training dataset will be a single file per split. Defaults to False.
- storage_connector (Optional[hsfs.StorageConnector]): Storage connector defining the sink location for the training dataset, defaults to None, and materializes the training dataset on HopsFS.
- splits (Optional[Dict[str, float]]): A dictionary defining training dataset splits to be created. Keys in the dictionary define the name of the split as str, values represent percentage of samples in the split as float. Currently, only random splits are supported. Defaults to empty dict {}, creating only a single training dataset without splits.
- location (Optional[str]): Path to complement the sink storage connector with, e.g. if the storage connector points to an S3 bucket, this path can be used to define a sub-directory inside the bucket to place the training dataset. Defaults to "", saving the training dataset at the root defined by the storage connector.
- seed (Optional[int]): Optionally, define a seed to create the random splits with, in order to guarantee reproducibility, defaults to None.
- statistics_config (Optional[Union[hsfs.StatisticsConfig, bool, dict]]): A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this training dataset, "correlations" to turn on feature correlation computation and "histograms" to compute feature value frequencies. The values should be booleans indicating the setting. To fully turn off statistics computation pass statistics_config=False. Defaults to None and will compute only descriptive statistics.
- label (Optional[List[str]]): A list of feature names constituting the prediction label/feature of the training dataset. When replaying a Query during model inference, the label features can be omitted from the feature vector retrieval. Defaults to [], no label.
- transformation_functions (Optional[Dict[str, hsfs.transformation_function.TransformationFunction]]): A dictionary mapping feature names to the transformation functions that should be applied to them before writing out the training data and at inference time. Defaults to {}, no transformations.
- train_split (Optional[str]): If splits is set, provide the name of the split that is going to be used for training. The statistics of this split will be used for transformation functions if necessary. Defaults to None.
Returns:
TrainingDataset
: The training dataset metadata object.
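To make the interplay of splits and seed more concrete, the sketch below assigns rows to named splits at random using only the Python standard library. It is an illustration of why a fixed seed makes random splits reproducible, not a description of how the feature store materializes splits. assign_splits is a hypothetical helper, and because each row is drawn independently the realized proportions only approximate the requested ones.

```python
import random


# Hypothetical helper (not part of hsfs): assigns each of n_rows rows to one
# of the named splits, using the split fractions as sampling weights.
def assign_splits(n_rows, splits, seed=None):
    rng = random.Random(seed)  # a dedicated generator, so the seed is local
    names = list(splits)
    weights = [splits[name] for name in names]
    return rng.choices(names, weights=weights, k=n_rows)


splits = {"train": 0.7, "validation": 0.1, "test": 0.2}

# The same seed yields the same assignment on every run.
first = assign_splits(1000, splits, seed=42)
second = assign_splits(1000, splits, seed=42)
print(first == second)
```

Leaving seed at None gives a different assignment on each run, which is why a seed is needed when the splits must be reproducible.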
create_transformation_function#
FeatureStore.create_transformation_function(transformation_function, output_type, version=None)
Create a transformation function metadata object.
Example
# define function
def plus_one(value):
    return value + 1
# create transformation function
plus_one_meta = fs.create_transformation_function(
transformation_function=plus_one,
output_type=int,
version=1
)
# persist transformation function in backend
plus_one_meta.save()
Lazy
This method is lazy and does not persist the transformation function in the
feature store on its own. To persist the transformation function, call the save()
method of the transformation function metadata object.
Arguments
- transformation_function (callable): Callable object.
- output_type (Union[str, bytes, int, numpy.int8, numpy.int16, numpy.int32, numpy.int64, float, numpy.float64, datetime.datetime, numpy.datetime64, datetime.date, bool]): Python or numpy output type that will be inferred as a pyspark.sql.types type.
Returns:
TransformationFunction
: The TransformationFunction metadata object.
from_response_json#
FeatureStore.from_response_json(json_dict)
get_external_feature_group#
FeatureStore.get_external_feature_group(name, version=None)
Get an external feature group entity from the feature store.
Getting an external feature group from the Feature Store means getting its
metadata handle so you can subsequently read the data into a Spark or
Pandas DataFrame or use the Query-API to perform joins between feature groups.
Example
# connect to the Feature Store
fs = ...
external_fg = fs.get_external_feature_group("external_fg_test")
Arguments
- name (str): Name of the external feature group to get.
- version (Optional[int]): Version of the external feature group to retrieve, defaults to None and will return version=1.
Returns
ExternalFeatureGroup
: The external feature group metadata object.
Raises
hsfs.client.exceptions.RestAPIError
: If unable to retrieve feature group from the feature store.
get_external_feature_groups#
FeatureStore.get_external_feature_groups(name)
Get a list of all versions of an external feature group entity from the feature store.
Getting an external feature group from the Feature Store means getting its
metadata handle so you can subsequently read the data into a Spark or
Pandas DataFrame or use the Query-API to perform joins between feature groups.
Example
# connect to the Feature Store
fs = ...
external_fgs_list = fs.get_external_feature_groups("external_fg_test")
Arguments
- name (str): Name of the external feature group to get.
Returns
ExternalFeatureGroup
: List of external feature group metadata objects.
Raises
hsfs.client.exceptions.RestAPIError
: If unable to retrieve feature group from the feature store.
get_feature_group#
FeatureStore.get_feature_group(name, version=None)
Get a feature group entity from the feature store.
Getting a feature group from the Feature Store means getting its metadata handle
so you can subsequently read the data into a Spark or Pandas DataFrame or use
the Query-API to perform joins between feature groups.
Example
# connect to the Feature Store
fs = ...
fg = fs.get_feature_group(
name="electricity_prices",
version=1,
)
Arguments
- name (str): Name of the feature group to get.
- version (Optional[int]): Version of the feature group to retrieve, defaults to None and will return version=1.
Returns
FeatureGroup
: The feature group metadata object.
Raises
hsfs.client.exceptions.RestAPIError
: If unable to retrieve feature group from the feature store.
get_feature_groups#
FeatureStore.get_feature_groups(name)
Get a list of all versions of a feature group entity from the feature store.
Getting a feature group from the Feature Store means getting its metadata handle
so you can subsequently read the data into a Spark or Pandas DataFrame or use
the Query-API to perform joins between feature groups.
Example
# connect to the Feature Store
fs = ...
fgs_list = fs.get_feature_groups(
name="electricity_prices"
)
Arguments
- name (str): Name of the feature group to get.
Returns
FeatureGroup
: List of feature group metadata objects.
Raises
hsfs.client.exceptions.RestAPIError
: If unable to retrieve feature group from the feature store.
get_feature_view#
FeatureStore.get_feature_view(name, version=None)
Get a feature view entity from the feature store.
Getting a feature view from the Feature Store means getting its metadata.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(
name='feature_view_name',
version=1
)
Arguments
- name (str): Name of the feature view to get.
- version (Optional[int]): Version of the feature view to retrieve, defaults to None and will return version=1.
Returns
FeatureView
: The feature view metadata object.
Raises
hsfs.client.exceptions.RestAPIError
: If unable to retrieve feature view from the feature store.
get_feature_views#
FeatureStore.get_feature_views(name)
Get a list of all versions of a feature view entity from the feature store.
Getting a feature view from the Feature Store means getting its metadata.
Example
# get feature store instance
fs = ...
# get a list of all versions of a feature view
feature_views = fs.get_feature_views(
name='feature_view_name'
)
Arguments
- name: Name of the feature view to get.
Returns
FeatureView
: List of feature view metadata objects.
Raises
hsfs.client.exceptions.RestAPIError
: If unable to retrieve feature view from the feature store.
get_on_demand_feature_group#
FeatureStore.get_on_demand_feature_group(name, version=None)
Get an external feature group entity from the feature store.
Deprecated
The get_on_demand_feature_group method is deprecated. Use the get_external_feature_group method instead.
Getting an external feature group from the Feature Store means getting its
metadata handle so you can subsequently read the data into a Spark or
Pandas DataFrame or use the Query-API to perform joins between feature groups.
Arguments
- name (str): Name of the external feature group to get.
- version (Optional[int]): Version of the external feature group to retrieve, defaults to None and will return version=1.
Returns
ExternalFeatureGroup
: The external feature group metadata object.
Raises
hsfs.client.exceptions.RestAPIError
: If unable to retrieve feature group from the feature store.
get_on_demand_feature_groups#
FeatureStore.get_on_demand_feature_groups(name)
Get a list of all versions of an external feature group entity from the feature store.
Deprecated
The get_on_demand_feature_groups method is deprecated. Use the get_external_feature_groups method instead.
Getting an external feature group from the Feature Store means getting its
metadata handle so you can subsequently read the data into a Spark or
Pandas DataFrame or use the Query-API to perform joins between feature groups.
Arguments
- name (str): Name of the external feature group to get.
Returns
ExternalFeatureGroup
: List of external feature group metadata objects.
Raises
hsfs.client.exceptions.RestAPIError
: If unable to retrieve feature group from the feature store.
get_online_storage_connector#
FeatureStore.get_online_storage_connector()
Get the storage connector for the Online Feature Store of the respective project's feature store.
The returned storage connector depends on the project that you are connected to.
Example
# connect to the Feature Store
fs = ...
online_storage_connector = fs.get_online_storage_connector()
Returns
StorageConnector
. JDBC storage connector to the Online Feature Store.
get_or_create_feature_group#
FeatureStore.get_or_create_feature_group(
name,
version,
description="",
online_enabled=False,
time_travel_format="HUDI",
partition_key=[],
primary_key=[],
hudi_precombine_key=None,
features=[],
statistics_config=None,
expectation_suite=None,
event_time=None,
stream=False,
parents=[],
topic_name=None,
)
Get feature group metadata object or create a new one if it doesn't exist. This method doesn't update existing feature group metadata object.
Example
# connect to the Feature Store
fs = ...
fg = fs.get_or_create_feature_group(
name="electricity_prices",
version=1,
description="Electricity prices from NORD POOL",
primary_key=["day", "area"],
online_enabled=True,
event_time="timestamp",
)
Lazy
This method is lazy and does not persist any metadata or feature data in the
feature store on its own. To persist the feature group and save feature data
along the metadata in the feature store, call the insert()
method with a
DataFrame.
Arguments
- name
str
: Name of the feature group to create. - version
int
: Version of the feature group to retrieve or create. - description
Optional[str]
: A string describing the contents of the feature group to improve discoverability for Data Scientists, defaults to empty string""
. - online_enabled
Optional[bool]
: Define whether the feature group should be made available also in the online feature store for low latency access, defaults toFalse
. - time_travel_format
Optional[str]
: Format used for time travel, defaults to"HUDI"
. - partition_key
Optional[List[str]]
: A list of feature names to be used as partition key when writing the feature data to the offline storage, defaults to empty list[]
. - primary_key
Optional[List[str]]
: A list of feature names to be used as primary key for the feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list[]
, and the feature group won't have any primary key. - hudi_precombine_key
Optional[str]
: A feature name to be used as a precombine key for the"HUDI"
feature group. Defaults toNone
. If feature group has time travel format"HUDI"
and hudi precombine key was not specified then the first primary key of the feature group will be used as hudi precombine key. - features
Optional[List[hsfs.feature.Feature]]
: Optionally, define the schema of the feature group manually as a list ofFeature
objects. Defaults to empty list[]
and will use the schema information of the DataFrame provided in thesave
method. - statistics_config
Optional[Union[hsfs.StatisticsConfig, bool, dict]]
: A configuration object, or a dictionary with keys "enabled
" to generally enable descriptive statistics computation for this feature group,"correlations
" to turn on feature correlation computation,"histograms"
to compute feature value frequencies and"exact_uniqueness"
to compute uniqueness, distinctness and entropy. The values should be booleans indicating the setting. To fully turn off statistics computation passstatistics_config=False
. Defaults toNone
and will compute only descriptive statistics. - expectation_suite
Optional[Union[hsfs.expectation_suite.ExpectationSuite, great_expectations.core.expectation_suite.ExpectationSuite]]
: Optionally, attach an expectation suite to the feature group against which dataframes should be validated upon insertion. Defaults to None
. - event_time
Optional[str]
: Optionally, provide the name of the feature containing the event time for the features in this feature group. If event_time is set, the feature group can be used for point-in-time joins. Defaults to None.
Event time data type restriction
The supported data types for the event time column are: timestamp, date and bigint.
- stream: Optionally, define whether the feature group should support real-time stream writing capabilities. Stream-enabled feature groups have a single unified API for writing streaming features transparently to both the online and offline store.
- parents
Optional[List[hsfs.feature_group.FeatureGroup]]
: Optionally, define the parents of this feature group as the origin where the data is coming from. - topic_name
Optional[str]
: Optionally, define the name of the topic used for data ingestion. If left undefined, it defaults to the project topic.
Returns
FeatureGroup
. The feature group metadata object.
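As a minimal sketch of the dictionary form of statistics_config described in the arguments above, the snippet below builds a configuration from the four documented keys. The helper name make_statistics_config is hypothetical and not part of hsfs; only the dictionary keys and the requirement that values are booleans come from the argument description.

```python
def make_statistics_config(enabled=True, correlations=False,
                           histograms=False, exact_uniqueness=False):
    """Build a statistics_config dictionary (hypothetical helper, not part of hsfs)."""
    config = {
        "enabled": enabled,                    # descriptive statistics on/off
        "correlations": correlations,          # feature correlation computation
        "histograms": histograms,              # feature value frequencies
        "exact_uniqueness": exact_uniqueness,  # uniqueness, distinctness, entropy
    }
    # The argument description says the values should be booleans.
    for key, value in config.items():
        if not isinstance(value, bool):
            raise TypeError(f"{key} must be a bool, got {type(value).__name__}")
    return config


stats_config = make_statistics_config(correlations=True, histograms=True)
```

The resulting dictionary would be passed as statistics_config=stats_config when creating the feature group; passing statistics_config=False instead turns statistics computation off entirely.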
get_or_create_feature_view#
FeatureStore.get_or_create_feature_view(
name, query, version, description="", labels=[], transformation_functions={}
)
Get a feature view metadata object, or create a new one if it doesn't exist. This method does not update an existing feature view metadata object.
Example
# connect to the Feature Store
fs = ...
feature_view = fs.get_or_create_feature_view(
name='bitcoin_feature_view',
version=1,
transformation_functions=transformation_functions,
query=query
)
Arguments
- name
str
: Name of the feature view to create. - query
hsfs.constructor.query.Query
: Feature store Query
. - version
int
: Version of the feature view to create. - description
Optional[str]
: A string describing the contents of the feature view to improve discoverability for Data Scientists, defaults to empty string ""
. - labels
Optional[List[str]]
: A list of feature names constituting the prediction label/feature of the feature view. When replaying a Query during model inference, the label features can be omitted from the feature vector retrieval. Defaults to []
, no label. - transformation_functions
Optional[Dict[str, hsfs.transformation_function.TransformationFunction]]
: A dictionary mapping feature names to the transformation functions that should be applied to them before writing out the vector and at inference time. Defaults to {}
, no transformations.
Returns:
FeatureView
: The feature view metadata object.
get_or_create_spine_group#
FeatureStore.get_or_create_spine_group(
name, version=None, description="", primary_key=[], event_time=None, features=[], dataframe=None
)
Create a spine group metadata object.
Instead of using a feature group to save a label/prediction target, you can use a spine together with a dataframe containing the labels. A spine is essentially a metadata object similar to a feature group; however, its data is not materialized in the feature store. It only contains the needed metadata, such as the relevant event time column and primary key columns, to perform point-in-time correct joins.
Example
import pandas as pd
# connect to the Feature Store
fs = ...
# spine dataframe with primary key, event time and label columns
spine_df = pd.DataFrame()
spine_group = fs.get_or_create_spine_group(
name="sales",
version=1,
description="Physical shop sales features",
primary_key=['ss_store_sk'],
event_time='sale_date',
dataframe=spine_df
)
Note that you can inspect the dataframe in the spine group, or replace the dataframe:
spine_group.dataframe.show()
spine_group.dataframe = new_df
The spine can then be used to construct queries, with one restriction:
Note
Spines can only be used on the left side of a feature join, as this is the base set of entities for which features are to be fetched and the left side of the join determines the event timestamps to compare against.
If the query is intended for a feature view that will be used for online serving, select only the label or target feature from the spine. The label is not required for the online lookup, so selecting only the label from the left side of the join means that no spine has to be provided for online serving.
These queries can then be used to create feature views. Since the dataframe contained in the spine is not being materialized, every time you use a feature view created with spine to read data you will have to provide a dataframe with the same structure again.
For example, to generate training data:
X_train, X_test, y_train, y_test = feature_view_spine.train_test_split(0.2, spine=training_data_entities)
Or to get batches of fresh data for batch scoring:
feature_view_spine.get_batch_data(spine=scoring_entities_df).show()
Here you have the chance to pass a different set of entities to generate the training dataset.
Sometimes it is handy to create a feature view with a regular feature group containing the label, and then use a spine at serving time, for example to fetch features only for a small set of primary key values. To do this, you can pass the spine group instead of a dataframe. Just make sure it contains the needed primary key, event time and label columns.
feature_view.get_batch_data(spine=spine_group)
Arguments
- name
str
: Name of the spine group to create. - version
Optional[int]
: Version of the spine group to retrieve, defaults to None and will create the spine group with incremented version from the last version in the feature store. - description
Optional[str]
: A string describing the contents of the spine group to improve discoverability for Data Scientists, defaults to empty string "". - primary_key
Optional[List[str]]
: A list of feature names to be used as primary key for the spine group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list [], and the spine group won't have any primary key. - event_time
Optional[str]
: Optionally, provide the name of the feature containing the event time for the features in this spine group. If event_time is set, the spine group can be used for point-in-time joins. Defaults to None
. - features
Optional[List[hsfs.feature.Feature]]
: Optionally, define the schema of the spine group manually as a list of Feature objects. Defaults to empty list [] and will use the schema information of the provided dataframe.
Event time data type restriction
The supported data types for the event time column are: timestamp, date and bigint.
- dataframe
Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list]]]
: DataFrame, RDD, Ndarray, list. Spine dataframe with primary key, event time and label column to use for point in time join when fetching features.
Returns
SpineGroup
. The spine group metadata object.
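To make the role of the spine in a point-in-time correct join more concrete, here is a small pure-Python sketch. It assumes the usual convention that each spine row is matched with the latest feature row that has the same primary key and an event time not later than the spine row's event time; the exact boundary handling in hsfs is not stated in this section, and the function point_in_time_join and the tuple layout are hypothetical.

```python
from bisect import bisect_right


def point_in_time_join(spine_rows, feature_rows):
    """Join spine rows (key, event_time, label) with feature rows
    (key, event_time, value), never looking past the spine event time."""
    # Group feature rows by primary key, ordered by event time.
    by_key = {}
    for key, ts, value in sorted(feature_rows, key=lambda r: (r[0], r[1])):
        times, values = by_key.setdefault(key, ([], []))
        times.append(ts)
        values.append(value)

    joined = []
    for key, ts, label in spine_rows:
        times, values = by_key.get(key, ([], []))
        # Number of feature rows for this key with event time <= ts.
        idx = bisect_right(times, ts)
        feature = values[idx - 1] if idx > 0 else None
        joined.append((key, ts, label, feature))
    return joined


# The spine is the left side: it fixes the entities and the timestamps.
spine = [(1, 10, 0), (1, 25, 1), (2, 5, 0)]
features = [(1, 8, 100.0), (1, 20, 150.0), (1, 30, 999.0), (2, 7, 50.0)]
result = point_in_time_join(spine, features)
```

The feature row with event time 30 is never used, because no spine row for key 1 has an event time that late, and the spine row for key 2 gets no feature value because its only feature row lies in the future.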
get_storage_connector#
FeatureStore.get_storage_connector(name)
Get a previously created storage connector from the feature store.
Storage connectors encapsulate all information needed for the execution engine to read and write to specific storage. This storage can be S3, a JDBC compliant database or the distributed filesystem HOPSFS.
If you want to connect to the online feature store, see the
get_online_storage_connector
method to get the JDBC connector for the Online
Feature Store.
Example
# connect to the Feature Store
fs = ...
sc = fs.get_storage_connector("demo_fs_meb10000_Training_Datasets")
Arguments
- name
str
: Name of the storage connector to retrieve.
Returns
StorageConnector
. Storage connector object.
get_training_dataset#
FeatureStore.get_training_dataset(name, version=None)
Get a training dataset entity from the feature store.
Deprecated
TrainingDataset
is deprecated, use FeatureView
instead. You can still retrieve old
training datasets using this method, but after upgrading the old training datasets will
also be available under a Feature View with the same name and version.
It is recommended to use this method only for old training datasets that have been created directly from Dataframes and not with Query objects.
Getting a training dataset from the Feature Store means getting its metadata handle so you can subsequently read the data into a Spark or Pandas DataFrame.
Arguments
- name
str
: Name of the training dataset to get. - version
Optional[int]
: Version of the training dataset to retrieve, defaults to None and will return version=1.
Returns
TrainingDataset
: The training dataset metadata object.
Raises
hsfs.client.exceptions.RestAPIError
: If unable to retrieve the training dataset from the feature store.
get_training_datasets#
FeatureStore.get_training_datasets(name)
Get a list of all versions of a training dataset entity from the feature store.
Deprecated
TrainingDataset
is deprecated, use FeatureView
instead.
Getting a training dataset from the Feature Store means getting its metadata handle so you can subsequently read the data into a Spark or Pandas DataFrame.
Arguments
- name
str
: Name of the training dataset to get.
Returns
TrainingDataset
: List of training dataset metadata objects.
Raises
hsfs.client.exceptions.RestAPIError
: If unable to retrieve the training datasets from the feature store.
get_transformation_function#
FeatureStore.get_transformation_function(name, version=None)
Get transformation function metadata object.
Get transformation function by name. This will default to version 1.
# get feature store instance
fs = ...
# get transformation function metadata object
plus_one_fn = fs.get_transformation_function(name="plus_one")
Get built-in transformation function min max scaler
# get feature store instance
fs = ...
# get transformation function metadata object
min_max_scaler_fn = fs.get_transformation_function(name="min_max_scaler")
Get transformation function by name and version
# get feature store instance
fs = ...
# get transformation function metadata object
min_max_scaler = fs.get_transformation_function(name="min_max_scaler", version=2)
In a feature view, transformation functions can be defined as a dict, where each key is a feature name and each value is an online transformation function instance. The transformation functions are then applied when you read training data, get batch data, or get feature vector(s).
Attach transformation functions to the feature view
# get feature store instance
fs = ...
# define query object
query = ...
# get transformation function metadata object
min_max_scaler = fs.get_transformation_function(name="min_max_scaler", version=1)
# attach transformation functions
feature_view = fs.create_feature_view(
name='feature_view_name',
query=query,
labels=["target_column"],
transformation_functions={
"column_to_transform": min_max_scaler
}
)
Built-in transformation functions are attached in the same way.
The only difference is that the statistics required by the specific function are computed in the background, for example min and max values for min_max_scaler, or mean and standard deviation for standard_scaler.
Attach built-in transformation functions to the feature view
# get feature store instance
fs = ...
# define query object
query = ...
# retrieve transformation functions
min_max_scaler = fs.get_transformation_function(name="min_max_scaler")
standard_scaler = fs.get_transformation_function(name="standard_scaler")
robust_scaler = fs.get_transformation_function(name="robust_scaler")
label_encoder = fs.get_transformation_function(name="label_encoder")
# attach built-in transformation functions while creating feature view
feature_view = fs.create_feature_view(
name='transactions_view',
query=query,
labels=["fraud_label"],
transformation_functions = {
"category_column": label_encoder,
"weight": robust_scaler,
"age": min_max_scaler,
"salary": standard_scaler
}
)
Arguments
- name
str
: Name of the transformation function. - version
Optional[int]
: Version of the transformation function. Optional; if not provided, all functions that match the provided name will be retrieved.
Returns:
TransformationFunction
: The TransformationFunction metadata object.
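The statistics computed in the background for built-in functions can be pictured with a plain-Python min-max scaler. This sketch uses the standard min-max formula and is not taken from the hsfs implementation; the function names fit_min_max and min_max_scale, the sample values, and the handling of constant columns are assumptions for illustration.

```python
def fit_min_max(values):
    """Compute the statistics a min-max scaler needs from training values."""
    return {"min": min(values), "max": max(values)}


def min_max_scale(value, stats):
    """Scale a value with previously computed min/max statistics."""
    span = stats["max"] - stats["min"]
    if span == 0:
        return 0.0  # assumption: a constant column maps to 0.0
    return (value - stats["min"]) / span


# Statistics are derived once from the training data...
training_ages = [20, 30, 40, 60]
age_stats = fit_min_max(training_ages)
scaled_training = [min_max_scale(v, age_stats) for v in training_ages]

# ...and reused unchanged when a single value arrives at inference time.
scaled_at_inference = min_max_scale(50, age_stats)
```

Reusing the training statistics at inference time is what keeps the transformation consistent between training data and feature vectors.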
get_transformation_functions#
FeatureStore.get_transformation_functions()
Get all transformation functions metadata objects.
Get all transformation functions
# get feature store instance
fs = ...
# get all transformation functions
list_transformation_fns = fs.get_transformation_functions()
Returns:
List[TransformationFunction]
. List of transformation function instances.
sql#
FeatureStore.sql(query, dataframe_type="default", online=False, read_options={})
Execute a SQL command on the offline or online feature store database.
Example
# connect to the Feature Store
fs = ...
# construct the query and show head rows
query_res_head = fs.sql("SELECT * FROM `fg_1`").head()
Arguments
- query
str
: The SQL query to execute. - dataframe_type
Optional[str]
: The type of the returned dataframe. Defaults to "default", which maps to a Spark dataframe for the Spark engine and a Pandas dataframe for the Hive engine. - online
Optional[bool]
: Set to true to execute the query against the online feature store. Defaults to False. - read_options
Optional[dict]
: Additional options as key/value pairs to pass to the execution engine. For the Spark engine: dictionary of read options for Spark. For the Python engine:
- key "hive_config" to pass a dictionary of Hive or Tez configurations. For example: {"hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}}
If running queries on the online feature store, users can provide an entry {'external': True}, which instructs the library to use the host parameter in hsfs.connection() to establish the connection to the online feature store. If not set, or set to False, the online feature store storage connector is used, which relies on the private IP. Defaults to {}.
Returns
DataFrame
: DataFrame depending on the chosen type.
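The two kinds of read_options entries described above can be assembled as ordinary dictionaries. The helper build_read_options below is hypothetical and not part of hsfs; the keys "hive_config" and "external" and the example Hive/Tez settings are the ones given in the argument description.

```python
def build_read_options(hive_config=None, external=False):
    """Assemble a read_options dict (hypothetical helper, not part of hsfs)."""
    options = {}
    if hive_config:
        # Hive or Tez settings for offline queries on the Python engine.
        options["hive_config"] = dict(hive_config)
    if external:
        # Use the host given to hsfs.connection() for the online store.
        options["external"] = True
    return options


offline_options = build_read_options(
    hive_config={"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}
)
online_options = build_read_options(external=True)

# These would then be passed to the documented method, for example:
#   fs.sql("SELECT * FROM `fg_1`", read_options=offline_options)
#   fs.sql("SELECT * FROM `fg_1`", online=True, read_options=online_options)
```

Leaving both arguments unset yields an empty dictionary, which matches the documented default of {}.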