
On-Demand (External) Feature Groups

On-demand (External) Feature Groups are Feature Groups whose data is stored in an external storage system (e.g. a Data Warehouse, S3, ADLS). From an API perspective, on-demand feature groups can be used in the same way as regular feature groups. Users can pick features from on-demand feature groups to create training datasets. On-demand feature groups can also be used as a data source to create derived features, that is, features to which additional feature engineering is applied.

On-demand feature groups rely on Storage Connectors to identify the location of the data and to authenticate with the external storage. When the on-demand feature group is defined on top of an external database capable of running SQL statements (i.e. when using the JDBC, Redshift or Snowflake connectors), the on-demand feature group needs to be defined as a SQL statement. The SQL statement can contain feature engineering transformations: when the on-demand feature group is read, the SQL statement is pushed down to the storage system for execution.

Define a SQL based on-demand feature group

# `fs` is the feature store handle, e.g. fs = hsfs.connection().get_feature_store()
# Retrieve the storage connector defined before
redshift_conn = fs.get_storage_connector("telco_redshift_cluster")
telco_on_dmd = fs.create_on_demand_feature_group(name="telco_redshift",
                                                 version=1,
                                                 query="select * from telco",
                                                 description="On-demand feature group for telecom customer data",
                                                 storage_connector=redshift_conn,
                                                 statistics_config=True)
# create_on_demand_feature_group() is lazy: save() persists the metadata
telco_on_dmd.save()

Scala (connecting from Hopsworks)

val redshiftConn = fs.getRedshiftConnector("telco_redshift_cluster")
val telcoOnDmd = (fs.createOnDemandFeatureGroup()
            .name("telco_redshift_scala")
            .version(2)
            .query("select * from telco")
            .description("On-demand feature group for telecom customer data")
            .storageConnector(redshiftConn)
            .statisticsEnabled(true)
            .build())
telcoOnDmd.save()
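Because the SQL statement is pushed down to the external database, feature engineering can be expressed directly in the query. The sketch below wraps the Python call shown above in a function; the aggregation query, the column names (`customer_id`, `monthly_charges`, `tenure`) and the feature group name `telco_redshift_agg` are hypothetical, and `fs` is assumed to be an hsfs feature store handle. The query must be valid SQL for the target database.

```python
# Hypothetical aggregation pushed down to Redshift; table and column names are illustrative.
TELCO_AGG_QUERY = """
SELECT customer_id,
       AVG(monthly_charges) AS avg_monthly_charges,
       MAX(tenure)          AS max_tenure
FROM telco
GROUP BY customer_id
"""


def create_aggregated_telco_fg(fs, connector_name="telco_redshift_cluster"):
    """Register an on-demand feature group whose SQL runs inside the external database."""
    conn = fs.get_storage_connector(connector_name)
    fg = fs.create_on_demand_feature_group(
        name="telco_redshift_agg",  # hypothetical name
        version=1,
        query=TELCO_AGG_QUERY,
        description="Per-customer aggregates computed in Redshift",
        primary_key=["customer_id"],
        storage_connector=conn,
        statistics_config=True,
    )
    fg.save()  # the create call is lazy; save() persists the metadata
    return fg
```

Since the aggregation is executed by Redshift, only the per-customer result is transferred when the feature group is read.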

When defining an on-demand feature group on top of an object store or external file system (i.e. when using the S3 or the ADLS connector), the underlying data is required to have a schema. The underlying data can be stored in ORC, Parquet, Delta, Hudi or Avro format, and the schema of the feature group is extracted from the files' metadata.

Define an on-demand feature group on top of an S3 bucket

# Retrieve the storage connector defined before
s3_conn = fs.get_storage_connector("telco_s3_bucket")
telco_on_dmd = fs.create_on_demand_feature_group(name="telco_s3",
                                                 version=1,
                                                 data_format="parquet",
                                                 description="On-demand feature group for telecom customer data",
                                                 storage_connector=s3_conn,
                                                 statistics_config=True)
telco_on_dmd.save()

Scala (connecting from Hopsworks)

val s3Conn = fs.getS3Connector("telco_s3_bucket")
val telcoOnDmd = (fs.createOnDemandFeatureGroup()
            .name("telco_s3")
            .version(1)
            .dataFormat(OnDemandDataFormat.PARQUET)
            .description("On-demand feature group for telecom customer data")
            .storageConnector(s3Conn)
            .statisticsEnabled(true)
            .build())
telcoOnDmd.save()
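When the files live in a sub-directory of the bucket, the `path` argument of create_on_demand_feature_group() scopes the read to that location. A minimal sketch, assuming the Parquet files sit under a hypothetical `telco/2021/` prefix of the bucket behind the `telco_s3_bucket` connector, and that `fs` is an hsfs feature store handle; the feature group name is also hypothetical.

```python
def create_telco_s3_fg(fs, path="telco/2021/"):
    """Register the Parquet files under `path` (hypothetical prefix) as an on-demand feature group."""
    s3_conn = fs.get_storage_connector("telco_s3_bucket")
    fg = fs.create_on_demand_feature_group(
        name="telco_s3_2021",  # hypothetical name
        version=1,
        data_format="parquet",  # schema is taken from the Parquet file metadata
        path=path,  # location relative to the storage connector
        description="On-demand feature group for 2021 telecom customer data",
        storage_connector=s3_conn,
        statistics_config=True,
    )
    fg.save()  # persist the metadata; the create call alone is lazy
    return fg
```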

Use cases

There are two use cases in which a user can benefit from on-demand feature groups:

  • Existing feature engineering pipelines: users who have recently migrated to the Hopsworks Feature Store may already have feature engineering pipelines in production. They can register the output of these pipelines as on-demand feature groups in Hopsworks and immediately use their features to build training datasets, without modifying the existing pipelines to write to the Hopsworks Feature Store.

  • Data Ingestion: on-demand feature groups can be used as a data source. The benefit of using on-demand feature groups to ingest data from external sources is that the Hopsworks Feature Store keeps track of where the data is located and how to authenticate with the external storage system. In addition, the Hopsworks Feature Store also tracks the schema of the underlying data and makes sure that, if the underlying schema changes, the ingestion pipeline fails with a clear error.

Limitations

The Hopsworks Feature Store does not support time-travel capabilities for on-demand feature groups. Moreover, as the data resides on external systems, on-demand feature groups cannot be made available online for low-latency serving. To make data from an on-demand feature group available online, users need to define an online-enabled feature group and have a job that periodically reads data from the on-demand feature group and writes it to the online feature group.
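One run of such a job can be sketched as a single function. It assumes a Spark engine (read() is not available for on-demand feature groups from a Python engine, see the Python support note), that the online-enabled feature group already exists and is retrieved with fs.get_feature_group(), and that it accepts the DataFrame through the insert() method of regular feature groups. The feature group names are hypothetical, and scheduling is left to the job orchestrator in use.

```python
def sync_on_demand_to_online(fs, source_name="telco_redshift", target_name="telco_online",
                             source_version=1, target_version=1):
    """Copy the current contents of an on-demand feature group into an online-enabled one.

    Assumes `fs` is an hsfs feature store handle running on a Spark engine.
    """
    source = fs.get_on_demand_feature_group(source_name, version=source_version)
    target = fs.get_feature_group(target_name, version=target_version)  # hypothetical online FG
    df = source.read()  # executes the query / file read against the external system
    target.insert(df)   # writes the rows to the online-enabled feature group
    return df
```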

Python support

Currently the HSFS library does not support calling the read() or show() methods on on-demand feature groups from a Python engine. Likewise, it is not possible to call the read() or show() methods on queries containing on-demand feature groups. Nevertheless, on-demand feature groups can be used from a Python engine to create training datasets.

Creation

create_on_demand_feature_group

FeatureStore.create_on_demand_feature_group(
    name,
    storage_connector,
    query=None,
    data_format=None,
    path="",
    options={},
    version=None,
    description="",
    primary_key=[],
    features=[],
    statistics_config=None,
    event_time=None,
    validation_type="NONE",
    expectations=[],
)

Create an on-demand feature group metadata object.

Lazy

This method is lazy and does not persist any metadata in the feature store on its own. To persist the feature group metadata in the feature store, call the save() method.

Arguments

  • name str: Name of the on-demand feature group to create.
  • query Optional[str]: A string containing a SQL query valid for the target data source. The query will be used to pull data from the data source when the feature group is used.
  • data_format Optional[str]: If the on-demand feature group refers to a directory with data, the data format to use when reading it.
  • path Optional[str]: The location within the scope of the storage connector from which to read the data for the on-demand feature group.
  • storage_connector hsfs.StorageConnector: The storage connector to use to establish connectivity with the data source.
  • version Optional[int]: Version of the on-demand feature group to create, defaults to None and will create the feature group with incremented version from the last version in the feature store.
  • description Optional[str]: A string describing the contents of the on-demand feature group to improve discoverability for Data Scientists, defaults to empty string "".
  • primary_key Optional[List[str]]: A list of feature names to be used as primary key for the feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list [], and the feature group won't have any primary key.
  • features Optional[List[hsfs.feature.Feature]]: Optionally, define the schema of the on-demand feature group manually as a list of Feature objects. Defaults to empty list [] and will use the schema information of the DataFrame resulting from executing the provided query against the data source.
  • statistics_config Optional[Union[hsfs.StatisticsConfig, bool, dict]]: A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this on-demand feature group, "correlations" to turn on feature correlation computation, "histograms" to compute feature value frequencies and "exact_uniqueness" to compute uniqueness, distinctness and entropy. The values should be booleans indicating the setting. To fully turn off statistics computation pass statistics_config=False. Defaults to None and will compute only descriptive statistics.
  • event_time Optional[str]: Optionally, provide the name of the feature containing the event time for the features in this feature group. If event_time is set the feature group can be used for point-in-time joins. Defaults to None.
  • validation_type Optional[str]: Optionally, set the validation type to one of "NONE", "STRICT", "WARNING", "ALL". Determines the mode in which data validation is applied on ingested or already existing feature group data.
  • expectations Optional[List[hsfs.expectation.Expectation]]: Optionally, a list of expectations to be attached to the feature group. The expectations list contains Expectation metadata objects which can be retrieved with the get_expectation() and get_expectations() functions.

Returns

OnDemandFeatureGroup. The on-demand feature group metadata object.
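Putting several of these arguments together, the sketch below builds a statistics_config dictionary with the keys listed above and registers an on-demand feature group with a primary key and an event time, so that it can take part in point-in-time joins. The column names `customer_id` and `event_ts`, the query and the feature group name are hypothetical; `fs` and `conn` are assumed to be an hsfs feature store handle and a storage connector.

```python
def make_statistics_config(correlations=False, histograms=False, exact_uniqueness=False):
    """Dictionary form of statistics_config; descriptive statistics stay enabled."""
    return {
        "enabled": True,
        "correlations": correlations,
        "histograms": histograms,
        "exact_uniqueness": exact_uniqueness,
    }


def create_events_fg(fs, conn):
    fg = fs.create_on_demand_feature_group(
        name="telco_events",  # hypothetical name
        version=1,
        query="select customer_id, event_ts, monthly_charges from telco_events",
        storage_connector=conn,
        primary_key=["customer_id"],  # default join key
        event_time="event_ts",        # enables point-in-time joins
        statistics_config=make_statistics_config(histograms=True),
        validation_type="NONE",       # the default: no data validation
    )
    fg.save()  # nothing is persisted until save() is called
    return fg
```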


Retrieval

get_on_demand_feature_group

FeatureStore.get_on_demand_feature_group(name, version=None)

Get an on-demand feature group entity from the feature store.

Getting an on-demand feature group from the Feature Store means getting its metadata handle so you can subsequently read the data into a Spark or Pandas DataFrame or use the Query-API to perform joins between feature groups.

Arguments

  • name str: Name of the on-demand feature group to get.
  • version Optional[int]: Version of the on-demand feature group to retrieve, defaults to None and will return the version=1.

Returns

OnDemandFeatureGroup: The on-demand feature group metadata object.

Raises

  • RestAPIError: If unable to retrieve feature group from the feature store.
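A typical use of the retrieved handle is to select a subset of its features and join it with another feature group through the Query API. A minimal sketch, assuming `fs` is an hsfs feature store handle and that a regular feature group named `customer_info` (hypothetical) shares the primary key used for the join; the feature names are hypothetical as well.

```python
def build_training_query(fs):
    telco = fs.get_on_demand_feature_group("telco_redshift", version=1)
    customers = fs.get_feature_group("customer_info", version=1)  # hypothetical feature group
    # select() returns a Query; without explicit keys, join() uses the primary keys
    return telco.select(["customer_id", "monthly_charges"]).join(customers.select_all())
```

The resulting query can then be used to create a training dataset.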

Properties

created

creator

data_format

description

event_time

Event time feature in the feature group.

expectations_names

The names of expectations attached to this feature group.

features

id

location

name

options

path

primary_key

List of features building the primary key.

query

statistics

Get the latest computed statistics for the feature group.

statistics_config

Statistics configuration object defining the settings for statistics computation of the feature group.

storage_connector

validation_type

Validation type, one of "STRICT", "WARNING", "ALL", "NONE".

version


Methods

add_tag

OnDemandFeatureGroup.add_tag(name, value)

Attach a tag to a feature group.

A tag consists of a name/value pair. Tag names are unique identifiers across the whole cluster. The value of a tag can be any valid JSON: primitives, arrays or JSON objects.

Arguments

  • name str: Name of the tag to be added.
  • value: Value of the tag to be added.

Raises

RestAPIError in case the backend fails to add the tag.
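Because the tag value must be valid JSON, it can help to check serializability locally before calling the backend. A small sketch using the standard json module; `fg` is assumed to be any feature group handle, and the helper name and example tag are hypothetical.

```python
import json


def add_json_tag(fg, name, value):
    """Attach `value` as tag `name` after checking that it serializes to JSON."""
    try:
        json.dumps(value)  # raises TypeError for e.g. sets or arbitrary objects
    except (TypeError, ValueError) as exc:
        raise ValueError(f"tag {name!r} is not valid JSON: {exc}") from exc
    fg.add_tag(name, value)
    return value


# Usage: add_json_tag(fg, "owner", {"team": "churn", "tier": 1})
```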


append_features

OnDemandFeatureGroup.append_features(features)

Append features to the schema of the feature group.

Safe append

This method appends the features to the feature group schema safely. In case of failure, your local metadata object will contain the correct schema.

It is only possible to append features to a feature group. Removing features is considered a breaking change.

Arguments

  • features Union[hsfs.feature.Feature, List[hsfs.feature.Feature]]: Feature or list. A feature object or list thereof to append to the schema of the feature group.

Returns

FeatureGroup. The updated feature group object.


attach_expectation

OnDemandFeatureGroup.attach_expectation(expectation)

Attach a feature group expectation. If feature group validation is not already enabled, it will be enabled and set to the stricter setting.

Arguments

  • expectation: The expectation metadata object to attach.

Returns

Expectation. The expectation metadata object.


compute_statistics

OnDemandFeatureGroup.compute_statistics()

Recompute the statistics for the feature group and save them to the feature store. Statistics are only computed for data in the offline storage of the feature group.

Returns

Statistics. The statistics metadata object.

Raises

RestAPIError. Unable to persist the statistics.


delete

OnDemandFeatureGroup.delete()

Drop the entire feature group along with its feature data.

Potentially dangerous operation

This operation drops all metadata associated with this version of the feature group and all the feature data in offline and online storage associated with it.

Raises

RestAPIError.


delete_tag

OnDemandFeatureGroup.delete_tag(name)

Delete a tag attached to a feature group.

Arguments

  • name str: Name of the tag to be removed.

Raises

RestAPIError in case the backend fails to delete the tag.


detach_expectation

OnDemandFeatureGroup.detach_expectation(expectation)

Remove an expectation from a feature group.

Arguments

  • expectation: The expectation metadata object to remove.

Returns

Expectation. The expectation metadata object.


filter

OnDemandFeatureGroup.filter(f)

Apply filter to the feature group.

Selects all features and returns the resulting Query with the applied filter.

from hsfs.feature import Feature

fg.filter(Feature("weekly_sales") > 1000)

If you are planning to join the filtered feature group later on with another feature group, make sure to select the filtered feature explicitly from the respective feature group:

fg.filter(fg.feature1 == 1).show(10)

Composite filters require parentheses:

fg.filter((fg.feature1 == 1) | (fg.feature2 >= 2))

Arguments

  • f Union[hsfs.constructor.filter.Filter, hsfs.constructor.filter.Logic]: Filter object.

Returns

Query. The query object with the applied filter.


get_expectation

OnDemandFeatureGroup.get_expectation(name)

Get attached expectation by name for this feature group. Name is unique across a feature store.

Arguments

  • name str: The expectation name.

Returns

Expectation. The expectation metadata object.


get_expectations

OnDemandFeatureGroup.get_expectations()

Get all feature group expectations.

Returns

List[Expectation]. A list of expectation metadata objects.


get_feature

OnDemandFeatureGroup.get_feature(name)

Retrieve a Feature object from the schema of the feature group.

There are several ways to access features of a feature group:

fg.feature1
fg["feature1"]
fg.get_feature("feature1")

Note

Attribute access to features works only for non-reserved names. For example features named id or name will not be accessible via fg.name, instead this will return the name of the feature group itself. Fall back on using the get_feature method.

Arguments

  • name str: Name of the feature to retrieve.

Returns

Feature. The feature object.


get_statistics

OnDemandFeatureGroup.get_statistics(commit_time=None)

Returns the statistics for this feature group at a specific time.

If commit_time is None, the most recent statistics are returned.

Arguments

  • commit_time Optional[str]: Commit time in the format YYYYMMDDhhmmss, defaults to None.

Returns

Statistics. Statistics object.

Raises

RestAPIError.
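commit_time is passed as a string in the format YYYYMMDDhhmmss. A minimal helper for producing it from a datetime using only the standard library; the helper name is hypothetical and `fg` in the usage comment is assumed to be an on-demand feature group handle.

```python
from datetime import datetime


def to_commit_time(ts: datetime) -> str:
    """Format a datetime as the YYYYMMDDhhmmss string expected by get_statistics()."""
    return ts.strftime("%Y%m%d%H%M%S")


# Usage:
# stats = fg.get_statistics(commit_time=to_commit_time(datetime(2021, 5, 4, 13, 7, 9)))
```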


get_tag

OnDemandFeatureGroup.get_tag(name)

Get a tag attached to a feature group by name.

Arguments

  • name str: Name of the tag to get.

Returns

The value of the tag.

Raises

RestAPIError in case the backend fails to retrieve the tag.


get_tags

OnDemandFeatureGroup.get_tags()

Retrieves all tags attached to a feature group.

Returns

Dict[str, obj] of tags.

Raises

RestAPIError in case the backend fails to retrieve the tags.


get_validations

OnDemandFeatureGroup.get_validations(validation_time=None, commit_time=None)

Get feature group data validation results based on the attached expectations.

Arguments

  • validation_time: The data validation time, when the data validation started.
  • commit_time: The commit time of a time travel enabled feature group.

Returns

FeatureGroupValidation. The feature group validation metadata object.


read

OnDemandFeatureGroup.read(dataframe_type="default")

Get the feature group as a DataFrame.


save

OnDemandFeatureGroup.save()

Persist the on-demand feature group metadata in the feature store.

select

OnDemandFeatureGroup.select(features=[])

Select a subset of features of the feature group and return a query object.

The query can be used to construct joins of feature groups or create a training dataset with a subset of features of the feature group.

Arguments

  • features List[Union[str, hsfs.feature.Feature]]: list, optional. A list of Feature objects or feature names as strings to be selected, defaults to [].

Returns

Query: A query object with the selected features of the feature group.


select_all

OnDemandFeatureGroup.select_all()

Select all features in the feature group and return a query object.

The query can be used to construct joins of feature groups or create a training dataset immediately.

Returns

Query. A query object with all features of the feature group.


select_except

OnDemandFeatureGroup.select_except(features=[])

Select all features of the feature group except a few and return a query object.

The query can be used to construct joins of feature groups or create a training dataset with a subset of features of the feature group.

Arguments

  • features List[Union[str, hsfs.feature.Feature]]: list, optional. A list of Feature objects or feature names as strings to be excluded from the selection. Defaults to [], selecting all features.

Returns

Query: A query object with the selected features of the feature group.


show

OnDemandFeatureGroup.show(n)

Show the first n rows of the feature group.


update_description

OnDemandFeatureGroup.update_description(description)

Update the description of the feature group.

Safe update

This method updates the feature group description safely. In case of failure your local metadata object will keep the old description.

Arguments

  • description str: New description string.

Returns

FeatureGroup. The updated feature group object.


update_feature_description

OnDemandFeatureGroup.update_feature_description(feature_name, description)

Update the description of a single feature in this feature group.

Safe update

This method updates the feature description safely. In case of failure your local metadata object will keep the old description.

Arguments

  • feature_name str: Name of the feature to be updated.
  • description str: New description string.

Returns

FeatureGroup. The updated feature group object.


update_features

OnDemandFeatureGroup.update_features(features)

Update metadata of features in this feature group.

Currently, only the description of a feature can be updated.

Unsafe update

Note that if you use an existing Feature object of the schema in the feature group metadata object, this might leave your metadata object in a corrupted state if the update fails.

Arguments

  • features Union[hsfs.feature.Feature, List[hsfs.feature.Feature]]: Feature or list of features. A feature object or list thereof to be updated.

Returns

FeatureGroup. The updated feature group object.


update_statistics_config

OnDemandFeatureGroup.update_statistics_config()

Update the statistics configuration of the feature group.

Change the statistics_config object and persist the changes by calling this method.

Returns

FeatureGroup. The updated metadata object of the feature group.

Raises

RestAPIError.
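The change-then-persist pattern can be sketched as follows. It assumes that the statistics_config property of the feature group handle can be assigned the same dictionary form accepted by create_on_demand_feature_group(); whether the property is assignable depends on the HSFS version in use, so treat this as an assumption. The helper name is hypothetical.

```python
def enable_correlations(fg):
    """Turn on correlation and histogram computation, then persist the new config."""
    # Assumption: statistics_config accepts the dict form used at creation time.
    fg.statistics_config = {
        "enabled": True,
        "correlations": True,
        "histograms": True,
        "exact_uniqueness": False,
    }
    return fg.update_statistics_config()  # persists the change in the feature store
```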


validate

OnDemandFeatureGroup.validate()

Run validation based on the attached expectations.

Returns

FeatureGroupValidation. The feature group validation metadata object.