Feature Store#

FeatureStore#

hsfs.feature_store.FeatureStore(
    featurestore_id,
    featurestore_name,
    created,
    project_name,
    project_id,
    offline_featurestore_name,
    hive_endpoint,
    online_enabled,
    num_feature_groups=None,
    num_training_datasets=None,
    num_storage_connectors=None,
    num_feature_views=None,
    online_featurestore_name=None,
    mysql_server_endpoint=None,
    online_featurestore_size=None,
)

Retrieval#

get_feature_store#

Connection.get_feature_store(name=None)

Get a reference to a feature store to perform operations on.

If no name is given, the project's default feature store is returned. To get a shared feature store, the name of the project that owns it is required.

How to get feature store instance

import hsfs
conn = hsfs.connection()
fs = conn.get_feature_store()

# or

import hopsworks
project = hopsworks.login()
fs = project.get_feature_store()

Arguments

  • name str: The name of the feature store, defaults to None.

Returns

FeatureStore. A feature store handle object to perform operations on.


Properties#

hive_endpoint#

Hive endpoint for the offline feature store.


id#

Id of the feature store.


mysql_server_endpoint#

MySQL server endpoint for the online feature store.


name#

Name of the feature store.


offline_featurestore_name#

Name of the offline feature store database.


online_enabled#

Indicator whether online feature store is enabled.


online_featurestore_name#

Name of the online feature store database.


project_id#

Id of the project in which the feature store is located.


project_name#

Name of the project in which the feature store is located.


Methods#

create_external_feature_group#

FeatureStore.create_external_feature_group(
    name,
    storage_connector,
    query=None,
    data_format=None,
    path="",
    options={},
    version=None,
    description="",
    primary_key=[],
    features=[],
    statistics_config=None,
    event_time=None,
    expectation_suite=None,
    online_enabled=False,
    topic_name=None,
)

Create an external feature group metadata object.

Example

# connect to the Feature Store
fs = ...

external_fg = fs.create_external_feature_group(
                    name="sales",
                    version=1,
                    description="Physical shop sales features",
                    query=query,
                    storage_connector=connector,
                    primary_key=['ss_store_sk'],
                    event_time='sale_date'
                    )

Lazy

This method is lazy and does not persist any metadata in the feature store on its own. To persist the feature group metadata in the feature store, call the save() method.

You can enable online storage for external feature groups. However, the sync from the external storage to the Hopsworks online storage needs to be done manually:

external_fg = fs.create_external_feature_group(
            name="sales",
            version=1,
            description="Physical shop sales features",
            query=query,
            storage_connector=connector,
            primary_key=['ss_store_sk'],
            event_time='sale_date',
            online_enabled=True
            )
external_fg.save()

# read from external storage and filter data to sync to online
df = external_fg.read().filter(external_fg.customer_status == "active")

# insert to online storage
external_fg.insert(df)

Arguments

  • name str: Name of the external feature group to create.
  • storage_connector hsfs.StorageConnector: The storage connector to use to establish connectivity with the data source.
  • query Optional[str]: A string containing a SQL query valid for the target data source. The query will be used to pull data from the data source when the feature group is used.
  • data_format Optional[str]: If the external feature group refers to a directory with data, the data format to use when reading it.
  • path Optional[str]: The location within the scope of the storage connector from which to read the data for the external feature group.
  • options Optional[Dict[str, str]]: Additional options to be used by the engine when reading data from the specified storage connector. For example, {"header": True} when reading CSV files with column names in the first row.
  • version Optional[int]: Version of the external feature group to create, defaults to None and will create the feature group with incremented version from the last version in the feature store.
  • description Optional[str]: A string describing the contents of the external feature group to improve discoverability for Data Scientists, defaults to empty string "".
  • primary_key Optional[List[str]]: A list of feature names to be used as primary key for the feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list [], and the feature group won't have any primary key.
  • features Optional[List[hsfs.feature.Feature]]: Optionally, define the schema of the external feature group manually as a list of Feature objects. Defaults to empty list [] and will use the schema information of the DataFrame resulting from executing the provided query against the data source.
  • statistics_config Optional[Union[hsfs.StatisticsConfig, bool, dict]]: A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this external feature group, "correlations" to turn on feature correlation computation, "histograms" to compute feature value frequencies and "exact_uniqueness" to compute uniqueness, distinctness and entropy. The values should be booleans indicating the setting. To fully turn off statistics computation pass statistics_config=False. Defaults to None and will compute only descriptive statistics.
  • event_time Optional[str]: Optionally, provide the name of the feature containing the event time for the features in this feature group. If event_time is set the feature group can be used for point-in-time joins. Defaults to None.

Event time data type restriction

The supported data types for the event time column are: timestamp, date and bigint.

  • expectation_suite Optional[Union[hsfs.expectation_suite.ExpectationSuite, great_expectations.core.expectation_suite.ExpectationSuite]]: Optionally, attach an expectation suite to the feature group against which DataFrames are validated upon insertion. Defaults to None.
  • online_enabled Optional[bool]: Define whether it should be possible to sync the feature group to the online feature store for low latency access, defaults to False.
  • topic_name Optional[str]: Optionally, define the name of the topic used for data ingestion. If left undefined, it defaults to the project topic.

Returns

ExternalFeatureGroup. The external feature group metadata object.
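
The dictionary form of statistics_config described in the arguments could be assembled as in the following minimal sketch. The helper name build_statistics_config is hypothetical and not part of hsfs; only the four keys are taken from the argument description.

```python
def build_statistics_config(enabled=True, correlations=False,
                            histograms=False, exact_uniqueness=False):
    """Return a statistics_config dictionary with boolean settings.

    Hypothetical helper, not part of hsfs. The keys are the ones listed
    in the statistics_config argument description.
    """
    return {
        "enabled": bool(enabled),
        "correlations": bool(correlations),
        "histograms": bool(histograms),
        "exact_uniqueness": bool(exact_uniqueness),
    }


# Descriptive statistics plus histograms, nothing else.
stats_config = build_statistics_config(histograms=True)
print(stats_config)
```

The resulting dictionary would then be passed as statistics_config=stats_config. Passing statistics_config=False instead turns statistics computation off entirely.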


create_feature_group#

FeatureStore.create_feature_group(
    name,
    version=None,
    description="",
    online_enabled=False,
    time_travel_format="HUDI",
    partition_key=[],
    primary_key=[],
    hudi_precombine_key=None,
    features=[],
    statistics_config=None,
    event_time=None,
    stream=False,
    expectation_suite=None,
    parents=[],
    topic_name=None,
)

Create a feature group metadata object.

Example

# connect to the Feature Store
fs = ...

fg = fs.create_feature_group(
        name='air_quality',
        description='Air Quality characteristics of each day',
        version=1,
        primary_key=['city','date'],
        online_enabled=True,
        event_time='date'
    )

Lazy

This method is lazy and does not persist any metadata or feature data in the feature store on its own. To persist the feature group and save the feature data along with the metadata in the feature store, call the save() method with a DataFrame.

Arguments

  • name str: Name of the feature group to create.
  • version Optional[int]: Version of the feature group to create, defaults to None and will create the feature group with incremented version from the last version in the feature store.
  • description Optional[str]: A string describing the contents of the feature group to improve discoverability for Data Scientists, defaults to empty string "".
  • online_enabled Optional[bool]: Define whether the feature group should be made available also in the online feature store for low latency access, defaults to False.
  • time_travel_format Optional[str]: Format used for time travel, defaults to "HUDI".
  • partition_key Optional[List[str]]: A list of feature names to be used as partition key when writing the feature data to the offline storage, defaults to empty list [].
  • primary_key Optional[List[str]]: A list of feature names to be used as primary key for the feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list [], and the feature group won't have any primary key.
  • hudi_precombine_key Optional[str]: A feature name to be used as a precombine key for the "HUDI" feature group. Defaults to None. If feature group has time travel format "HUDI" and hudi precombine key was not specified then the first primary key of the feature group will be used as hudi precombine key.
  • features Optional[List[hsfs.feature.Feature]]: Optionally, define the schema of the feature group manually as a list of Feature objects. Defaults to empty list [] and will use the schema information of the DataFrame provided in the save method.
  • statistics_config Optional[Union[hsfs.StatisticsConfig, bool, dict]]: A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this feature group, "correlations" to turn on feature correlation computation, "histograms" to compute feature value frequencies and "exact_uniqueness" to compute uniqueness, distinctness and entropy. The values should be booleans indicating the setting. To fully turn off statistics computation pass statistics_config=False. Defaults to None and will compute only descriptive statistics.
  • event_time Optional[str]: Optionally, provide the name of the feature containing the event time for the features in this feature group. If event_time is set the feature group can be used for point-in-time joins. Defaults to None.

Event time data type restriction

The supported data types for the event time column are: timestamp, date and bigint.

  • stream Optional[bool]: Optionally, define whether the feature group should support real-time stream writing capabilities. Stream-enabled feature groups have a single unified API for writing streaming features transparently to both the online and offline store. Defaults to False.
  • expectation_suite Optional[Union[hsfs.expectation_suite.ExpectationSuite, great_expectations.core.expectation_suite.ExpectationSuite]]: Optionally, attach an expectation suite to the feature group against which DataFrames are validated upon insertion. Defaults to None.
  • parents Optional[List[hsfs.feature_group.FeatureGroup]]: Optionally, define the parents of this feature group as the origin where the data is coming from.
  • topic_name Optional[str]: Optionally, define the name of the topic used for data ingestion. If left undefined, it defaults to the project topic.

Returns

FeatureGroup. The feature group metadata object.
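
Because the method is lazy, creating and persisting are two separate steps. The sketch below wraps both in one function; the function name create_and_save_feature_group is hypothetical, and fs and df are assumed to be a FeatureStore handle and a DataFrame supplied by the caller.

```python
def create_and_save_feature_group(fs, df):
    """Create the feature group metadata object, then persist it.

    `fs` is a FeatureStore handle and `df` a DataFrame with the feature
    data; both are supplied by the caller. The function name is hypothetical.
    """
    # Lazy: nothing is written to the feature store at this point.
    fg = fs.create_feature_group(
        name="air_quality",
        version=1,
        description="Air Quality characteristics of each day",
        primary_key=["city", "date"],
        event_time="date",
        online_enabled=True,
    )
    # Persists the metadata together with the feature data.
    fg.save(df)
    return fg
```

Until save() is called, the returned object exists only on the client side.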


create_feature_view#

FeatureStore.create_feature_view(
    name, query, version=None, description="", labels=[], transformation_functions={}
)

Create a feature view metadata object and save it to Hopsworks.

Example

# connect to the Feature Store
fs = ...

# get the feature group instances
fg1 = fs.get_or_create_feature_group(...)
fg2 = fs.get_or_create_feature_group(...)

# construct the query
query = fg1.select_all().join(fg2.select_all())

# get the transformation functions
standard_scaler = fs.get_transformation_function(name='standard_scaler')

# construct dictionary of "feature - transformation function" pairs
# (df is assumed to be a DataFrame holding the features to transform)
transformation_functions = {col_name: standard_scaler for col_name in df.columns}

feature_view = fs.create_feature_view(
    name='air_quality_fv',
    version=1,
    transformation_functions=transformation_functions,
    query=query
)

Example

# get feature store instance
fs = ...

# define query object
query = ...

# define dictionary with column names and transformation functions pairs
mapping_transformers = ...

# create feature view
feature_view = fs.create_feature_view(
    name='feature_view_name',
    version=1,
    transformation_functions=mapping_transformers,
    query=query
)

Warning

The as_of argument of the Query is ignored because feature views do not support time travel queries.

Arguments

  • name str: Name of the feature view to create.
  • query hsfs.constructor.query.Query: Feature store Query.
  • version Optional[int]: Version of the feature view to create, defaults to None and will create the feature view with incremented version from the last version in the feature store.
  • description Optional[str]: A string describing the contents of the feature view to improve discoverability for Data Scientists, defaults to empty string "".
  • labels Optional[List[str]]: A list of feature names constituting the prediction label/feature of the feature view. When replaying a Query during model inference, the label features can be omitted from the feature vector retrieval. Defaults to [], no label.
  • transformation_functions Optional[Dict[str, hsfs.transformation_function.TransformationFunction]]: A dictionary mapping feature names to the transformation functions that should be applied to them before writing out the vector and at inference time. Defaults to {}, no transformations.

Returns:

FeatureView: The feature view metadata object.
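
The transformation_functions argument is a plain dictionary keyed by feature name. As a minimal sketch, assuming the same transformation function is applied to several features, it could be built as follows. The helper map_transformations and the feature names pm25 and pm10 are hypothetical, and a placeholder object stands in for the transformation function so that the snippet runs without a feature store connection.

```python
def map_transformations(feature_names, transformation_function):
    """Map every feature name to the same transformation function."""
    return {name: transformation_function for name in feature_names}


# Placeholder standing in for the object returned by
# fs.get_transformation_function(name='standard_scaler').
standard_scaler = object()

# Hypothetical feature names used only for illustration.
transformation_functions = map_transformations(["pm25", "pm10"], standard_scaler)
print(sorted(transformation_functions))
```

The dictionary is then passed to create_feature_view through the transformation_functions argument.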


create_on_demand_feature_group#

FeatureStore.create_on_demand_feature_group(
    name,
    storage_connector,
    query=None,
    data_format=None,
    path="",
    options={},
    version=None,
    description="",
    primary_key=[],
    features=[],
    statistics_config=None,
    event_time=None,
    expectation_suite=None,
    topic_name=None,
)

Create an external feature group metadata object.

Deprecated

create_on_demand_feature_group method is deprecated. Use the create_external_feature_group method instead.

Lazy

This method is lazy and does not persist any metadata in the feature store on its own. To persist the feature group metadata in the feature store, call the save() method.

Arguments

  • name str: Name of the external feature group to create.
  • storage_connector hsfs.StorageConnector: The storage connector to use to establish connectivity with the data source.
  • query Optional[str]: A string containing a SQL query valid for the target data source. The query will be used to pull data from the data source when the feature group is used.
  • data_format Optional[str]: If the external feature group refers to a directory with data, the data format to use when reading it.
  • path Optional[str]: The location within the scope of the storage connector from which to read the data for the external feature group.
  • options Optional[Dict[str, str]]: Additional options to be used by the engine when reading data from the specified storage connector. For example, {"header": True} when reading CSV files with column names in the first row.
  • version Optional[int]: Version of the external feature group to create, defaults to None and will create the feature group with incremented version from the last version in the feature store.
  • description Optional[str]: A string describing the contents of the external feature group to improve discoverability for Data Scientists, defaults to empty string "".
  • primary_key Optional[List[str]]: A list of feature names to be used as primary key for the feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list [], and the feature group won't have any primary key.
  • features Optional[List[hsfs.feature.Feature]]: Optionally, define the schema of the external feature group manually as a list of Feature objects. Defaults to empty list [] and will use the schema information of the DataFrame resulting from executing the provided query against the data source.
  • statistics_config Optional[Union[hsfs.StatisticsConfig, bool, dict]]: A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this external feature group, "correlations" to turn on feature correlation computation, "histograms" to compute feature value frequencies and "exact_uniqueness" to compute uniqueness, distinctness and entropy. The values should be booleans indicating the setting. To fully turn off statistics computation pass statistics_config=False. Defaults to None and will compute only descriptive statistics.
  • event_time Optional[str]: Optionally, provide the name of the feature containing the event time for the features in this feature group. If event_time is set the feature group can be used for point-in-time joins. Defaults to None.

Event time data type restriction

The supported data types for the event time column are: timestamp, date and bigint.

  • expectation_suite Optional[Union[hsfs.expectation_suite.ExpectationSuite, great_expectations.core.expectation_suite.ExpectationSuite]]: Optionally, attach an expectation suite to the feature group against which DataFrames are validated upon insertion. Defaults to None.
  • topic_name Optional[str]: Optionally, define the name of the topic used for data ingestion. If left undefined, it defaults to the project topic.

Returns

ExternalFeatureGroup. The external feature group metadata object.


create_training_dataset#

FeatureStore.create_training_dataset(
    name,
    version=None,
    description="",
    data_format="tfrecords",
    coalesce=False,
    storage_connector=None,
    splits={},
    location="",
    seed=None,
    statistics_config=None,
    label=[],
    transformation_functions={},
    train_split=None,
)

Create a training dataset metadata object.

Deprecated

TrainingDataset is deprecated, use FeatureView instead. From version 3.0, training datasets created with this API are no longer visible in the API.

Lazy

This method is lazy and does not persist any metadata or feature data in the feature store on its own. To materialize the training dataset and save the feature data along with the metadata in the feature store, call the save() method with a DataFrame or Query.

Data Formats

The feature store currently supports the following data formats for training datasets:

  1. tfrecord
  2. csv
  3. tsv
  4. parquet
  5. avro
  6. orc

The petastorm, hdf5 and npy file formats are currently not supported.

Arguments

  • name str: Name of the training dataset to create.
  • version Optional[int]: Version of the training dataset to create, defaults to None and will create the training dataset with incremented version from the last version in the feature store.
  • description Optional[str]: A string describing the contents of the training dataset to improve discoverability for Data Scientists, defaults to empty string "".
  • data_format Optional[str]: The data format used to save the training dataset, defaults to "tfrecords"-format.
  • coalesce Optional[bool]: If true the training dataset data will be coalesced into a single partition before writing. The resulting training dataset will be a single file per split. Default False.
  • storage_connector Optional[hsfs.StorageConnector]: Storage connector defining the sink location for the training dataset, defaults to None, and materializes training dataset on HopsFS.
  • splits Optional[Dict[str, float]]: A dictionary defining training dataset splits to be created. Keys in the dictionary define the name of the split as str, values represent percentage of samples in the split as float. Currently, only random splits are supported. Defaults to empty dict {}, creating only a single training dataset without splits.
  • location Optional[str]: Path to complement the sink storage connector with, e.g. if the storage connector points to an S3 bucket, this path can be used to define a sub-directory inside the bucket to place the training dataset. Defaults to "", saving the training dataset at the root defined by the storage connector.
  • seed Optional[int]: Optionally, define a seed to create the random splits with, in order to guarantee reproducibility, defaults to None.
  • statistics_config Optional[Union[hsfs.StatisticsConfig, bool, dict]]: A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this training dataset, "correlations" to turn on feature correlation computation and "histograms" to compute feature value frequencies. The values should be booleans indicating the setting. To fully turn off statistics computation pass statistics_config=False. Defaults to None and will compute only descriptive statistics.
  • label Optional[List[str]]: A list of feature names constituting the prediction label/feature of the training dataset. When replaying a Query during model inference, the label features can be omitted from the feature vector retrieval. Defaults to [], no label.
  • transformation_functions Optional[Dict[str, hsfs.transformation_function.TransformationFunction]]: A dictionary mapping feature names to the transformation functions that should be applied to them before writing out the training data and at inference time. Defaults to {}, no transformations.
  • train_split Optional[str]: If splits is set, provide the name of the split that is going to be used for training. The statistics of this split will be used for transformation functions if necessary. Defaults to None.

Returns:

TrainingDataset: The training dataset metadata object.
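
The relationship between splits and train_split can be illustrated with a small client-side check. This is only a sketch: the helper check_splits is hypothetical, and the rules it enforces (fractions in (0, 1] that sum to 1) are assumptions made here rather than documented hsfs behaviour. The one constraint taken from the argument descriptions is that train_split names one of the splits.

```python
import math


def check_splits(splits, train_split=None):
    """Sanity-check a splits dictionary before passing it on.

    Hypothetical helper. Assumes fractions lie in (0, 1] and sum to 1;
    that is a convention of this sketch, not a documented hsfs rule.
    """
    for name, fraction in splits.items():
        if not 0.0 < fraction <= 1.0:
            raise ValueError(f"split {name!r} has invalid fraction {fraction}")
    if splits and not math.isclose(sum(splits.values()), 1.0):
        raise ValueError("split fractions do not sum to 1")
    if train_split is not None and train_split not in splits:
        raise ValueError(f"train_split {train_split!r} is not a key of splits")
    return splits


splits = check_splits({"train": 0.7, "test": 0.3}, train_split="train")
print(splits)
```

The validated dictionary and the split name would then be passed as splits=splits and train_split="train".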


create_transformation_function#

FeatureStore.create_transformation_function(transformation_function, output_type, version=None)

Create a transformation function metadata object.

Example

# define function
def plus_one(value):
    return value + 1

# create transformation function
plus_one_meta = fs.create_transformation_function(
        transformation_function=plus_one,
        output_type=int,
        version=1
    )

# persist transformation function in backend
plus_one_meta.save()

Lazy

This method is lazy and does not persist the transformation function in the feature store on its own. To persist the transformation function, call the save() method of the transformation function metadata object.

Arguments

  • transformation_function callable: callable object.
  • output_type Union[str, bytes, int, numpy.int8, numpy.int16, numpy.int32, numpy.int64, float, numpy.float64, datetime.datetime, numpy.datetime64, datetime.date, bool]: python or numpy output type that will be inferred as pyspark.sql.types type.

Returns:

TransformationFunction: The TransformationFunction metadata object.
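
Since output_type has to match what the callable actually returns, it can be worth checking the function locally before registering it. The following is a minimal sketch of such a check; the helper returns_declared_type is hypothetical and not part of hsfs.

```python
def plus_one(value):
    return value + 1


def returns_declared_type(func, sample, output_type):
    """Check that func(sample) is an instance of the declared output type."""
    return isinstance(func(sample), output_type)


# An int input yields an int, matching output_type=int.
print(returns_declared_type(plus_one, 1, int))    # True
# A float input yields a float, which would not match output_type=int.
print(returns_declared_type(plus_one, 1.5, int))  # False
```

A mismatch found this way suggests either casting inside the function or declaring a different output_type.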


from_response_json#

FeatureStore.from_response_json(json_dict)

get_external_feature_group#

FeatureStore.get_external_feature_group(name, version=None)

Get an external feature group entity from the feature store.

Getting an external feature group from the Feature Store means getting its metadata handle so you can subsequently read the data into a Spark or Pandas DataFrame or use the Query-API to perform joins between feature groups.

Example

# connect to the Feature Store
fs = ...

external_fg = fs.get_external_feature_group("external_fg_test")

Arguments

  • name str: Name of the external feature group to get.
  • version Optional[int]: Version of the external feature group to retrieve, defaults to None and will return version 1.

Returns

ExternalFeatureGroup: The external feature group metadata object.

Raises

  • hsfs.client.exceptions.RestAPIError: If unable to retrieve feature group from the feature store.

get_external_feature_groups#

FeatureStore.get_external_feature_groups(name)

Get a list of all versions of an external feature group entity from the feature store.

Getting an external feature group from the Feature Store means getting its metadata handle so you can subsequently read the data into a Spark or Pandas DataFrame or use the Query-API to perform joins between feature groups.

Example

# connect to the Feature Store
fs = ...

external_fgs_list = fs.get_external_feature_groups("external_fg_test")

Arguments

  • name str: Name of the external feature group to get.

Returns

ExternalFeatureGroup: List of external feature group metadata objects.

Raises

  • hsfs.client.exceptions.RestAPIError: If unable to retrieve feature group from the feature store.

get_feature_group#

FeatureStore.get_feature_group(name, version=None)

Get a feature group entity from the feature store.

Getting a feature group from the Feature Store means getting its metadata handle so you can subsequently read the data into a Spark or Pandas DataFrame or use the Query-API to perform joins between feature groups.

Example

# connect to the Feature Store
fs = ...

fg = fs.get_feature_group(
        name="electricity_prices",
        version=1,
    )

Arguments

  • name str: Name of the feature group to get.
  • version Optional[int]: Version of the feature group to retrieve, defaults to None and will return version 1.

Returns

FeatureGroup: The feature group metadata object.

Raises

  • hsfs.client.exceptions.RestAPIError: If unable to retrieve feature group from the feature store.

get_feature_groups#

FeatureStore.get_feature_groups(name)

Get a list of all versions of a feature group entity from the feature store.

Getting a feature group from the Feature Store means getting its metadata handle so you can subsequently read the data into a Spark or Pandas DataFrame or use the Query-API to perform joins between feature groups.

Example

# connect to the Feature Store
fs = ...

fgs_list = fs.get_feature_groups(
        name="electricity_prices"
    )

Arguments

  • name str: Name of the feature group to get.

Returns

FeatureGroup: List of feature group metadata objects.

Raises

  • hsfs.client.exceptions.RestAPIError: If unable to retrieve feature group from the feature store.
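
Because the method returns every version of the feature group, callers often need to pick one. The sketch below selects the highest version; the helper latest_version is hypothetical, and it assumes each returned object exposes an integer version attribute. Simple stand-in objects are used so the snippet runs without a feature store connection.

```python
from types import SimpleNamespace


def latest_version(feature_groups):
    """Return the object with the highest version, or None if the list is empty.

    Assumes every object exposes an integer `version` attribute.
    """
    return max(feature_groups, key=lambda fg: fg.version, default=None)


# Stand-ins for the list returned by fs.get_feature_groups(name=...).
fgs_list = [
    SimpleNamespace(version=1),
    SimpleNamespace(version=3),
    SimpleNamespace(version=2),
]
print(latest_version(fgs_list).version)  # 3
```

With a real handle, the same helper would be applied to the result of fs.get_feature_groups(name="electricity_prices").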

get_feature_view#

FeatureStore.get_feature_view(name, version=None)

Get a feature view entity from the feature store.

Getting a feature view from the Feature Store means getting its metadata.

Example

# get feature store instance
fs = ...

# get feature view instance
feature_view = fs.get_feature_view(
    name='feature_view_name',
    version=1
)

Arguments

  • name str: Name of the feature view to get.
  • version Optional[int]: Version of the feature view to retrieve, defaults to None and will return version 1.

Returns

FeatureView: The feature view metadata object.

Raises

  • hsfs.client.exceptions.RestAPIError: If unable to retrieve feature view from the feature store.

get_feature_views#

FeatureStore.get_feature_views(name)

Get a list of all versions of a feature view entity from the feature store.

Getting a feature view from the Feature Store means getting its metadata.

Example

# get feature store instance
fs = ...

# get a list of all versions of a feature view
feature_views = fs.get_feature_views(
    name='feature_view_name'
)

Arguments

  • name str: Name of the feature view to get.

Returns

FeatureView: List of feature view metadata objects.

Raises

  • hsfs.client.exceptions.RestAPIError: If unable to retrieve feature view from the feature store.

get_on_demand_feature_group#

FeatureStore.get_on_demand_feature_group(name, version=None)

Get an external feature group entity from the feature store.

Deprecated

get_on_demand_feature_group method is deprecated. Use the get_external_feature_group method instead.

Getting an external feature group from the Feature Store means getting its metadata handle so you can subsequently read the data into a Spark or Pandas DataFrame or use the Query-API to perform joins between feature groups.

Arguments

  • name str: Name of the external feature group to get.
  • version Optional[int]: Version of the external feature group to retrieve, defaults to None and will return version 1.

Returns

ExternalFeatureGroup: The external feature group metadata object.

Raises

  • hsfs.client.exceptions.RestAPIError: If unable to retrieve feature group from the feature store.

get_on_demand_feature_groups#

FeatureStore.get_on_demand_feature_groups(name)

Get a list of all versions of an external feature group entity from the feature store.

Deprecated

get_on_demand_feature_groups method is deprecated. Use the get_external_feature_groups method instead.

Getting an external feature group from the Feature Store means getting its metadata handle so you can subsequently read the data into a Spark or Pandas DataFrame or use the Query-API to perform joins between feature groups.

Arguments

  • name str: Name of the external feature group to get.

Returns

ExternalFeatureGroup: List of external feature group metadata objects.

Raises

  • hsfs.client.exceptions.RestAPIError: If unable to retrieve feature group from the feature store.

get_online_storage_connector#

FeatureStore.get_online_storage_connector()

Get the storage connector for the Online Feature Store of the respective project's feature store.

The returned storage connector depends on the project that you are connected to.

Example

# connect to the Feature Store
fs = ...

online_storage_connector = fs.get_online_storage_connector()

Returns

StorageConnector. JDBC storage connector to the Online Feature Store.


get_or_create_feature_group#

FeatureStore.get_or_create_feature_group(
    name,
    version,
    description="",
    online_enabled=False,
    time_travel_format="HUDI",
    partition_key=[],
    primary_key=[],
    hudi_precombine_key=None,
    features=[],
    statistics_config=None,
    expectation_suite=None,
    event_time=None,
    stream=False,
    parents=[],
    topic_name=None,
)

Get the feature group metadata object, or create a new one if it doesn't exist. This method does not update the metadata of an existing feature group.

Example

# connect to the Feature Store
fs = ...

fg = fs.get_or_create_feature_group(
        name="electricity_prices",
        version=1,
        description="Electricity prices from NORD POOL",
        primary_key=["day", "area"],
        online_enabled=True,
        event_time="timestamp",
        )

Lazy

This method is lazy and does not persist any metadata or feature data in the feature store on its own. To persist the feature group and save the feature data along with the metadata in the feature store, call the insert() method with a DataFrame.

Arguments

  • name str: Name of the feature group to create.
  • version int: Version of the feature group to retrieve or create.
  • description Optional[str]: A string describing the contents of the feature group to improve discoverability for Data Scientists, defaults to empty string "".
  • online_enabled Optional[bool]: Define whether the feature group should be made available also in the online feature store for low latency access, defaults to False.
  • time_travel_format Optional[str]: Format used for time travel, defaults to "HUDI".
  • partition_key Optional[List[str]]: A list of feature names to be used as partition key when writing the feature data to the offline storage, defaults to empty list [].
  • primary_key Optional[List[str]]: A list of feature names to be used as primary key for the feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list [], and the feature group won't have any primary key.
  • hudi_precombine_key Optional[str]: A feature name to be used as a precombine key for the "HUDI" feature group. Defaults to None. If feature group has time travel format "HUDI" and hudi precombine key was not specified then the first primary key of the feature group will be used as hudi precombine key.
  • features Optional[List[hsfs.feature.Feature]]: Optionally, define the schema of the feature group manually as a list of Feature objects. Defaults to empty list [] and will use the schema information of the DataFrame provided in the save method.
  • statistics_config Optional[Union[hsfs.StatisticsConfig, bool, dict]]: A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this feature group, "correlations" to turn on feature correlation computation, "histograms" to compute feature value frequencies and "exact_uniqueness" to compute uniqueness, distinctness and entropy. The values should be booleans indicating the setting. To fully turn off statistics computation pass statistics_config=False. Defaults to None and will compute only descriptive statistics.
  • expectation_suite Optional[Union[hsfs.expectation_suite.ExpectationSuite, great_expectations.core.expectation_suite.ExpectationSuite]]: Optionally, attach an expectation suite to the feature group which dataframes should be validated against upon insertion. Defaults to None.
  • event_time Optional[str]: Optionally, provide the name of the feature containing the event time for the features in this feature group. If event_time is set the feature group can be used for point-in-time joins. Defaults to None.

    Event time data type restriction

    The supported data types for the event time column are: timestamp, date and bigint.

  • stream Optional[bool]: Optionally, define whether the feature group should support real-time stream writing capabilities. Stream-enabled feature groups have a single unified API for writing streaming features transparently to both the online and offline store.
  • parents Optional[List[hsfs.feature_group.FeatureGroup]]: Optionally, define the parents of this feature group as the origin where the data is coming from.
  • topic_name Optional[str]: Optionally, define the name of the topic used for data ingestion. If left undefined, it defaults to the project topic.

Returns

FeatureGroup. The feature group metadata object.
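
The get-or-create behaviour and the documented precombine key default can be pictured with a small in-memory sketch. Nothing here is hsfs code: the registry dict and both helper functions are hypothetical stand-ins, assuming only the rules stated above (an existing object is returned unchanged, and a "HUDI" feature group without an explicit precombine key falls back to its first primary key).

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical in-memory stand-in for the feature store's metadata registry.
registry: Dict[Tuple[str, int], dict] = {}


def resolve_precombine_key(
    time_travel_format: str,
    primary_key: List[str],
    hudi_precombine_key: Optional[str] = None,
) -> Optional[str]:
    """Mirror the documented default: first primary key for "HUDI" groups."""
    if hudi_precombine_key is not None:
        return hudi_precombine_key
    if time_travel_format == "HUDI" and primary_key:
        return primary_key[0]
    return None


def get_or_create(name: str, version: int, **metadata) -> dict:
    """Return the stored metadata if present; otherwise register new metadata."""
    key = (name, version)
    if key not in registry:
        registry[key] = {"name": name, "version": version, **metadata}
    # An existing entry is returned as-is; the new keyword arguments are ignored.
    return registry[key]


first = get_or_create("electricity_prices", 1, description="Electricity prices")
second = get_or_create("electricity_prices", 1, description="A different description")
precombine = resolve_precombine_key("HUDI", ["day", "area"])
```

In this sketch the second call returns the object created by the first call, so the original description is kept. That is the sense in which the method "doesn't update" existing metadata.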


[source]

get_or_create_feature_view#

FeatureStore.get_or_create_feature_view(
    name, query, version, description="", labels=[], transformation_functions={}
)

Get feature view metadata object or create a new one if it doesn't exist. This method doesn't update existing feature view metadata object.

Example

# connect to the Feature Store
fs = ...

feature_view = fs.get_or_create_feature_view(
    name='bitcoin_feature_view',
    version=1,
    transformation_functions=transformation_functions,
    query=query
)

Arguments

  • name str: Name of the feature view to create.
  • query hsfs.constructor.query.Query: Feature store Query.
  • version int: Version of the feature view to create.
  • description Optional[str]: A string describing the contents of the feature view to improve discoverability for Data Scientists, defaults to empty string "".
  • labels Optional[List[str]]: A list of feature names constituting the prediction label/feature of the feature view. When replaying a Query during model inference, the label features can be omitted from the feature vector retrieval. Defaults to [], no label.
  • transformation_functions Optional[Dict[str, hsfs.transformation_function.TransformationFunction]]: A dictionary mapping feature names to the transformation functions that should be applied to them before writing out the vector and at inference time. Defaults to {}, no transformations.

Returns:

FeatureView: The feature view metadata object.
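
As a rough illustration of how the labels and transformation_functions arguments shape a feature vector, the sketch below uses plain Python callables keyed by feature name. It is not hsfs code: to_feature_vector and the sample row are hypothetical, and it assumes only what the argument descriptions state (label features can be omitted at inference, and each transformation is applied to the feature it is keyed by).

```python
from typing import Callable, Dict, List

# Hypothetical stand-ins: plain callables keyed by feature name.
transformations: Dict[str, Callable[[float], float]] = {
    "price": lambda value: value + 1,
}
labels: List[str] = ["target"]

# One joined row as it might come out of a query (illustrative values).
row: Dict[str, float] = {"price": 10.0, "volume": 3.0, "target": 1.0}


def to_feature_vector(
    row: Dict[str, float],
    transformations: Dict[str, Callable[[float], float]],
    labels: List[str],
) -> Dict[str, float]:
    """Drop label features and apply each feature's transformation, if any."""
    return {
        name: transformations[name](value) if name in transformations else value
        for name, value in row.items()
        if name not in labels
    }


vector = to_feature_vector(row, transformations, labels)
```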


[source]

get_or_create_spine_group#

FeatureStore.get_or_create_spine_group(
    name, version=None, description="", primary_key=[], event_time=None, features=[], dataframe=None
)

Create a spine group metadata object.

Instead of using a feature group to save a label/prediction target, you can use a spine together with a dataframe containing the labels. A spine is essentially a metadata object similar to a feature group, but its data is not materialized in the feature store. It only contains the metadata needed to perform point-in-time correct joins, such as the relevant event time column and primary key columns.

Example

# connect to the Feature Store
fs = ...

import pandas as pd

# placeholder: in practice this holds the primary key, event time and label columns
spine_df = pd.DataFrame()

spine_group = fs.get_or_create_spine_group(
    name="sales",
    version=1,
    description="Physical shop sales features",
    primary_key=['ss_store_sk'],
    event_time='sale_date',
    dataframe=spine_df
)

Note that you can inspect the dataframe in the spine group, or replace the dataframe:

spine_group.dataframe.show()

spine_group.dataframe = new_df

The spine can then be used to construct queries, with one restriction:

Note

Spines can only be used on the left side of a feature join, as this is the base set of entities for which features are to be fetched and the left side of the join determines the event timestamps to compare against.

If you want to use the query for a feature view that will be used for online serving, select only the label or target feature from the spine. The label is not required for the online lookup, so selecting only the label from the left feature group means that no spine has to be provided for online serving.

These queries can then be used to create feature views. Since the dataframe contained in the spine is not materialized, every time you use a feature view created with a spine to read data, you will have to provide a dataframe with the same structure again.

For example, to generate training data:

X_train, X_test, y_train, y_test = feature_view_spine.train_test_split(0.2, spine=training_data_entities)

Or to get batches of fresh data for batch scoring:

feature_view_spine.get_batch_data(spine=scoring_entities_df).show()

Here you have the chance to pass a different set of entities to generate the training dataset.

Sometimes it is handy to create a feature view with a regular feature group containing the label, and then use a spine at serving time, for example to fetch features only for a small set of primary key values. To do this, you can pass the spine group instead of a dataframe. Just make sure it contains the needed primary key, event time and label columns.

feature_view.get_batch_data(spine=spine_group)
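
The point-in-time join that a spine drives can be illustrated without a feature store. The sketch below is plain Python, not the hsfs join implementation: the data, the integer timestamps and point_in_time_lookup are hypothetical, and it assumes the usual point-in-time rule that each spine row receives the latest feature value whose event time is at or before the spine row's event time.

```python
from bisect import bisect_right
from typing import Dict, List, Optional, Tuple

# Feature rows per primary key, as (event_time, value) pairs sorted by event_time.
# Integer timestamps keep the illustration simple.
feature_rows: Dict[str, List[Tuple[int, float]]] = {
    "store_1": [(1, 10.0), (5, 12.5), (9, 14.0)],
    "store_2": [(3, 7.0)],
}

# Spine: the left side of the join, one (primary_key, event_time) pair per entity.
spine: List[Tuple[str, int]] = [("store_1", 6), ("store_2", 2), ("store_1", 9)]


def point_in_time_lookup(key: str, as_of: int) -> Optional[float]:
    """Return the latest feature value whose event time is <= as_of, if any."""
    rows = feature_rows.get(key, [])
    event_times = [event_time for event_time, _ in rows]
    idx = bisect_right(event_times, as_of)
    return rows[idx - 1][1] if idx > 0 else None


# The spine's event times decide which feature values are visible to each row.
joined = [(key, as_of, point_in_time_lookup(key, as_of)) for key, as_of in spine]
```

The spine row for store_2 at time 2 gets no value because the only feature row for that key was recorded later. This is why the left side of the join determines the event timestamps to compare against.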

Arguments

  • name str: Name of the spine group to create.
  • version Optional[int]: Version of the spine group to retrieve, defaults to None and will create the spine group with incremented version from the last version in the feature store.
  • description Optional[str]: A string describing the contents of the spine group to improve discoverability for Data Scientists, defaults to empty string "".
  • primary_key Optional[List[str]]: A list of feature names to be used as primary key for the spine group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list [], and the spine group won't have any primary key.
  • event_time Optional[str]: Optionally, provide the name of the feature containing the event time for the features in this spine group. If event_time is set the spine group can be used for point-in-time joins. Defaults to None.
  • features Optional[List[hsfs.feature.Feature]]: Optionally, define the schema of the spine group manually as a list of Feature objects. Defaults to empty list [] and will use the schema information of the provided dataframe.

Event time data type restriction

The supported data types for the event time column are: timestamp, date and bigint.

  • dataframe Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list]]]: DataFrame, RDD, Ndarray, list. Spine dataframe with primary key, event time and label column to use for point in time join when fetching features.

Returns

SpineGroup. The spine group metadata object.


[source]

get_storage_connector#

FeatureStore.get_storage_connector(name)

Get a previously created storage connector from the feature store.

Storage connectors encapsulate all information needed for the execution engine to read from and write to a specific storage. This storage can be S3, a JDBC-compliant database or the distributed filesystem HopsFS.

If you want to connect to the online feature store, see the get_online_storage_connector method to get the JDBC connector for the Online Feature Store.

Example

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("demo_fs_meb10000_Training_Datasets")

Arguments

  • name str: Name of the storage connector to retrieve.

Returns

StorageConnector. Storage connector object.


[source]

get_training_dataset#

FeatureStore.get_training_dataset(name, version=None)

Get a training dataset entity from the feature store.

Deprecated

TrainingDataset is deprecated, use FeatureView instead. You can still retrieve old training datasets using this method, but after upgrading the old training datasets will also be available under a Feature View with the same name and version.

It is recommended to use this method only for old training datasets that have been created directly from Dataframes and not with Query objects.

Getting a training dataset from the Feature Store means getting its metadata handle so you can subsequently read the data into a Spark or Pandas DataFrame.

Arguments

  • name str: Name of the training dataset to get.
  • version Optional[int]: Version of the training dataset to retrieve, defaults to None and will return version 1.

Returns

TrainingDataset: The training dataset metadata object.

Raises

  • hsfs.client.exceptions.RestAPIError: If unable to retrieve the training dataset from the feature store.

[source]

get_training_datasets#

FeatureStore.get_training_datasets(name)

Get a list of all versions of a training dataset entity from the feature store.

Deprecated

TrainingDataset is deprecated, use FeatureView instead.

Getting a training dataset from the Feature Store means getting its metadata handle so you can subsequently read the data into a Spark or Pandas DataFrame.

Arguments

  • name str: Name of the training dataset to get.

Returns

List[TrainingDataset]: List of training dataset metadata objects.

Raises

  • hsfs.client.exceptions.RestAPIError: If unable to retrieve the training datasets from the feature store.

[source]

get_transformation_function#

FeatureStore.get_transformation_function(name, version=None)

Get transformation function metadata object.

Get transformation function by name. This will default to version 1.

# get feature store instance
fs = ...

# get transformation function metadata object
plus_one_fn = fs.get_transformation_function(name="plus_one")

Get built-in transformation function min max scaler

# get feature store instance
fs = ...

# get transformation function metadata object
min_max_scaler_fn = fs.get_transformation_function(name="min_max_scaler")

Get transformation function by name and version

# get feature store instance
fs = ...

# get transformation function metadata object
min_max_scaler = fs.get_transformation_function(name="min_max_scaler", version=2)

In a feature view, you can define transformation functions as a dict, where each key is a feature name and each value is an online transformation function instance. The transformation functions are then applied when you read training data, get batch data, or get feature vector(s).

Attach transformation functions to the feature view

# get feature store instance
fs = ...

# define query object
query = ...

# get transformation function metadata object
min_max_scaler = fs.get_transformation_function(name="min_max_scaler", version=1)

# attach transformation functions
feature_view = fs.create_feature_view(
    name='feature_view_name',
    query=query,
    labels=["target_column"],
    transformation_functions={
        "column_to_transform": min_max_scaler
    }
)

Built-in transformation functions are attached in the same way. The only difference is that the necessary statistics for the specific function are computed in the background, for example the min and max values for min_max_scaler, or the mean and standard deviation for standard_scaler.

Attach built-in transformation functions to the feature view

# get feature store instance
fs = ...

# define query object
query = ...

# retrieve transformation functions
min_max_scaler = fs.get_transformation_function(name="min_max_scaler")
standard_scaler = fs.get_transformation_function(name="standard_scaler")
robust_scaler = fs.get_transformation_function(name="robust_scaler")
label_encoder = fs.get_transformation_function(name="label_encoder")

# attach built-in transformation functions while creating feature view
feature_view = fs.create_feature_view(
    name='transactions_view',
    query=query,
    labels=["fraud_label"],
    transformation_functions={
        "category_column": label_encoder,
        "weight": robust_scaler,
        "age": min_max_scaler,
        "salary": standard_scaler
    }
)
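
To make concrete what "computing the necessary statistics" means for a scaler, here is a minimal pure-Python sketch of min-max scaling. It illustrates the technique only and is not the hsfs implementation: fit_min_max and min_max_scale are hypothetical names, the sample values are made up, and mapping a constant feature to 0.0 is an assumption.

```python
from typing import List, Tuple


def fit_min_max(values: List[float]) -> Tuple[float, float]:
    """Compute the statistics a min-max scaler needs from the training data."""
    return min(values), max(values)


def min_max_scale(value: float, min_value: float, max_value: float) -> float:
    """Scale a value relative to previously computed min and max statistics."""
    if max_value == min_value:
        return 0.0  # assumption: a constant feature maps to 0.0
    return (value - min_value) / (max_value - min_value)


training_ages = [20.0, 30.0, 60.0]
age_min, age_max = fit_min_max(training_ages)

# The same statistics are reused for training data and for a value seen at inference time.
scaled_training = [min_max_scale(v, age_min, age_max) for v in training_ages]
scaled_at_inference = min_max_scale(40.0, age_min, age_max)
```

The point of the sketch is that the statistics are computed once from the training data and then reused unchanged whenever the transformation is applied later.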

Arguments

  • name str: Name of the transformation function.
  • version Optional[int]: Version of the transformation function. If not provided, all functions that match the provided name will be retrieved.

Returns:

TransformationFunction: The TransformationFunction metadata object.


[source]

get_transformation_functions#

FeatureStore.get_transformation_functions()

Get all transformation functions metadata objects.

Get all transformation functions

# get feature store instance
fs = ...

# get all transformation functions
list_transformation_fns = fs.get_transformation_functions()

Returns:

List[TransformationFunction]. List of transformation function instances.


[source]

sql#

FeatureStore.sql(query, dataframe_type="default", online=False, read_options={})

Execute a SQL command on the offline or online feature store database.

Example

# connect to the Feature Store
fs = ...

# construct the query and show head rows
query_res_head = fs.sql("SELECT * FROM `fg_1`").head()

Arguments

  • query str: The SQL query to execute.
  • dataframe_type Optional[str]: The type of the returned dataframe. Defaults to "default", which maps to a Spark dataframe for the Spark engine and a Pandas dataframe for the Hive engine.
  • online Optional[bool]: Set to true to execute the query against the online feature store. Defaults to False.
  • read_options Optional[dict]: Additional options as key/value pairs to pass to the execution engine. For the Spark engine: dictionary of read options for Spark. For the Python engine:
    • key "hive_config" to pass a dictionary of hive or tez configurations. For example: {"hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}}

    If running queries on the online feature store, users can provide an entry {'external': True}, which instructs the library to use the host parameter in hsfs.connection() to establish the connection to the online feature store. If not set, or set to False, the online feature store storage connector is used, which relies on the private IP. Defaults to {}.

Returns

DataFrame: DataFrame depending on the chosen type.
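
The two kinds of read_options described above can be written out as plain dictionaries. This is only a sketch of the argument shapes: the variable names are hypothetical, the Hive/Tez values are copied from the example in the argument description, and the commented calls assume an existing feature store handle fs.

```python
# Options for the Python engine: forward Hive/Tez settings to the query.
hive_read_options = {
    "hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}
}

# Options for an online query: connect via the `host` given to
# hsfs.connection() instead of the private IP of the storage connector.
external_read_options = {"external": True}

# With a feature store handle `fs`, these would be passed as, for example:
#   fs.sql("SELECT * FROM `fg_1`", read_options=hive_read_options)
#   fs.sql("SELECT * FROM `fg_1`", online=True, read_options=external_read_options)
```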