FeatureGroup#

[source]

FeatureGroup#

hsfs.feature_group.FeatureGroup(
    name,
    version,
    featurestore_id,
    description="",
    partition_key=None,
    primary_key=None,
    hudi_precombine_key=None,
    featurestore_name=None,
    created=None,
    creator=None,
    id=None,
    features=None,
    location=None,
    jobs=None,
    online_enabled=False,
    time_travel_format=None,
    statistics_config=None,
)

Creation#

[source]

create_feature_group#

FeatureStore.create_feature_group(
    name,
    version=None,
    description="",
    online_enabled=False,
    time_travel_format="HUDI",
    partition_key=[],
    primary_key=[],
    hudi_precombine_key=None,
    features=[],
    statistics_config=None,
)

Create a feature group metadata object.

Lazy

This method is lazy and does not persist any metadata or feature data in the feature store on its own. To persist the feature group and save feature data along the metadata in the feature store, call the save() method with a DataFrame.

Arguments

  • name str: Name of the feature group to create.
  • version Optional[int]: Version of the feature group to create, defaults to None, which creates the feature group with the version incremented from the latest version in the feature store.
  • description Optional[str]: A string describing the contents of the feature group to improve discoverability for Data Scientists, defaults to empty string "".
  • online_enabled Optional[bool]: Define whether the feature group should be made available also in the online feature store for low latency access, defaults to False.
  • time_travel_format Optional[str]: Format used for time travel, defaults to "HUDI".
  • partition_key Optional[List[str]]: A list of feature names to be used as partition key when writing the feature data to the offline storage, defaults to empty list [].
  • primary_key Optional[List[str]]: A list of feature names to be used as primary key for the feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list [], and the first column of the DataFrame will be used as primary key.
  • hudi_precombine_key Optional[str]: A feature name to be used as a precombine key for the "HUDI" feature group. Defaults to None. If feature group has time travel format "HUDI" and hudi precombine key was not specified then the first primary key of the feature group will be used as hudi precombine key.
  • features Optional[List[hsfs.feature.Feature]]: Optionally, define the schema of the feature group manually as a list of Feature objects. Defaults to empty list [] and will use the schema information of the DataFrame provided in the save method.
  • statistics_config Optional[Union[hsfs.StatisticsConfig, bool, dict]]: A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this feature group, "correlations" to turn on feature correlation computation and "histograms" to compute feature value frequencies. The values should be booleans indicating the setting. To fully turn off statistics computation pass statistics_config=False. Defaults to None and will compute only descriptive statistics.

Returns

FeatureGroup. The feature group metadata object.
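A minimal sketch of the typical flow. The names ("sales_fg", "store_id", "event_date") are hypothetical, and since create_feature_group needs a live FeatureStore handle, the calls are wrapped in a function rather than executed:

```python
def create_sales_fg(fs, sales_df):
    """Sketch: create a feature group and persist it with a DataFrame.

    `fs` is a FeatureStore handle; all names are placeholders.
    """
    fg = fs.create_feature_group(
        "sales_fg",
        version=1,
        description="Weekly sales aggregates per store",
        primary_key=["store_id"],
        partition_key=["event_date"],
        time_travel_format="HUDI",
        # dict form of statistics_config; booleans toggle each computation
        statistics_config={"enabled": True, "correlations": False, "histograms": False},
    )
    # create_feature_group is lazy -- save() persists metadata and data
    return fg.save(sales_df)
```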


Retrieval#

[source]

get_feature_group#

FeatureStore.get_feature_group(name, version=None)

Get a feature group entity from the feature store.

Getting a feature group from the Feature Store means getting its metadata handle so you can subsequently read the data into a Spark or Pandas DataFrame or use the Query-API to perform joins between feature groups.

Arguments

  • name str: Name of the feature group to get.
  • version Optional[int]: Version of the feature group to retrieve, defaults to None, which returns version 1.

Returns

FeatureGroup: The feature group metadata object.

Raises

  • RestAPIError: If unable to retrieve feature group from the feature store.
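A retrieval sketch under the same assumptions (placeholder names, live connection required, hence the function wrapper):

```python
def load_sales_fg(fs):
    """Sketch: fetch a feature group handle and read its data.

    `fs` is a FeatureStore handle; "sales_fg" is a placeholder name.
    """
    # version defaults to 1 when omitted; pin it explicitly in production code
    fg = fs.get_feature_group("sales_fg", version=1)
    # the handle is metadata only; read() materializes a DataFrame
    return fg.read()
```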

Properties#

[source]

created#

Timestamp when the feature group was created.


[source]

creator#

Username of the creator.


[source]

description#

Description of the feature group contents.


[source]

feature_store_id#


[source]

feature_store_name#

Name of the feature store in which the feature group is located.


[source]

features#

Schema information.


[source]

hudi_precombine_key#

Feature name that is the hudi precombine key.


[source]

id#

Feature group id.


[source]

location#


[source]

name#

Name of the feature group.


[source]

online_enabled#

Whether the feature group is available in online storage.


[source]

partition_key#

List of features building the partition key.


[source]

primary_key#

List of features building the primary key.


[source]

statistics#

Get the latest computed statistics for the feature group.


[source]

statistics_config#

Statistics configuration object defining the settings for statistics computation of the feature group.


[source]

time_travel_format#

Time travel format of the feature group.


[source]

version#

Version number of the feature group.


Methods#

[source]

add_tag#

FeatureGroup.add_tag(name, value=None)

Attach a name/value tag to a feature group.

A tag can consist of a name only or a name/value pair. Tag names are unique identifiers.

Arguments

  • name str: Name of the tag to be added.
  • value Optional[str]: Value of the tag to be added, defaults to None.

Raises

RestAPIError.


[source]

append_features#

FeatureGroup.append_features(features)

Append features to the schema of the feature group.

It is only possible to append features to a feature group. Removing features is considered a breaking change.

Arguments

  • features: Feature or list. A feature object or list thereof to append to the schema of the feature group.

Returns

FeatureGroup. The updated feature group object.
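A hedged sketch of appending a single feature. The feature name and type are hypothetical, and hsfs is imported inside the function so the sketch stays importable without it:

```python
def add_discount_feature(fg):
    """Sketch: append one feature to an existing feature group's schema."""
    from hsfs.feature import Feature

    # append-only schema change; removing features would be a breaking change
    return fg.append_features(Feature("discount_rate", type="double"))
```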


[source]

commit_delete_record#

FeatureGroup.commit_delete_record(delete_df, write_options={})

Drop records present in the provided DataFrame and commit the deletion as an update to this feature group.

Arguments

  • delete_df pyspark.sql.DataFrame: DataFrame containing the records to be deleted.
  • write_options Optional[Dict[Any, Any]]: User provided write options. Defaults to {}.

Raises

RestAPIError.


[source]

commit_details#

FeatureGroup.commit_details(limit=None)

Retrieve the commit timeline for this feature group.

Arguments

  • limit Optional[int]: Number of commits to retrieve. Defaults to None.

Returns

Dict[str, Dict[str, str]]. A dictionary mapping each commit id to a Dict[str, str] of commit metadata: the commit date and the numbers of rows updated, inserted, and deleted.

Raises

RestAPIError.
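For example, picking the most recent commit from the timeline might look like this (a sketch; it assumes commit ids sort chronologically, which holds when they encode timestamps):

```python
def latest_commit_id(fg):
    """Sketch: return the newest commit id from the commit timeline."""
    # commit_details maps commit id -> {commit date, rows inserted/updated/deleted}
    details = fg.commit_details(limit=10)
    return max(details) if details else None
```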


[source]

compute_statistics#

FeatureGroup.compute_statistics()

Recompute the statistics for the feature group and save them to the feature store. Statistics are only computed for data in the offline storage of the feature group.

Returns

Statistics. The statistics metadata object.

Raises

RestAPIError. Unable to persist the statistics.


[source]

delete#

FeatureGroup.delete()

Drop the entire feature group along with its feature data.

Potentially dangerous operation

This operation drops all metadata associated with this version of the feature group and all the feature data in offline and online storage associated with it.

Raises

RestAPIError.


[source]

delete_tag#

FeatureGroup.delete_tag(name)

Delete a tag from a feature group.

Tag names are unique identifiers.

Arguments

  • name str: Name of the tag to be removed.

Raises

RestAPIError.


[source]

filter#

FeatureGroup.filter(f)

Apply filter to the feature group.

Selects all features and returns the resulting Query with the applied filter.

from hsfs.feature import Feature

fg.filter(Feature("weekly_sales") > 1000)

If you are planning to join the filtered feature group later on with another feature group, make sure to select the filtered feature explicitly from the respective feature group:

fg.filter(fg.feature1 == 1).show(10)

Composite filters require parentheses:

fg.filter((fg.feature1 == 1) | (fg.feature2 >= 2))

Arguments

  • f Union[hsfs.constructor.filter.Filter, hsfs.constructor.filter.Logic]: Filter object.

Returns

Query. The query object with the applied filter.


[source]

from_response_json#

FeatureGroup.from_response_json(json_dict)

[source]

get_feature#

FeatureGroup.get_feature(name)

Retrieve a Feature object from the schema of the feature group.

There are several ways to access features of a feature group:

fg.feature1
fg["feature1"]
fg.get_feature("feature1")

Note

Attribute access to features works only for non-reserved names. For example, features named id or name will not be accessible via fg.name; instead, fg.name returns the name of the feature group itself. Fall back on the get_feature method in that case.

Arguments

  • name str: Name of the feature to retrieve.

Returns

Feature. The feature object.


[source]

get_statistics#

FeatureGroup.get_statistics(commit_time=None)

Returns the statistics for this feature group at a specific time.

If commit_time is None, the most recent statistics are returned.

Arguments

  • commit_time Optional[str]: Commit time in the format "YYYYMMDDhhmmss", defaults to None.

Returns

Statistics. Statistics object.

Raises

RestAPIError.
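Since commit_time is a plain "YYYYMMDDhhmmss" string, a small helper (not part of hsfs) can build it from a datetime:

```python
from datetime import datetime

def to_commit_time(dt):
    """Format a datetime as the YYYYMMDDhhmmss string expected by get_statistics."""
    return dt.strftime("%Y%m%d%H%M%S")

print(to_commit_time(datetime(2020, 10, 20, 7, 34, 11)))  # 20201020073411
```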


[source]

get_tag#

FeatureGroup.get_tag(name=None)

Get the tags of a feature group.

Tag names are unique identifiers. Returns all tags if no tag name is specified.

Arguments

  • name Optional[str]: Name of the tag to get, defaults to None.

Returns

list[Tag]. List of tags as name/value pairs.

Raises

RestAPIError.


[source]

insert#

FeatureGroup.insert(
    features, overwrite=False, operation="upsert", storage=None, write_options={}
)

Insert data from a dataframe into the feature group.

Incrementally insert data into a feature group or overwrite all data contained in the feature group. By default, the data is inserted into the offline storage and, if the feature group is online_enabled=True, into the online storage as well. To insert only into the online storage set storage="online"; conversely, set storage="offline" to write only to the offline storage.

The features dataframe can be a Spark DataFrame or RDD, a Pandas DataFrame, or a two-dimensional Numpy array or a two-dimensional Python nested list.

If statistics are enabled, statistics are recomputed for the entire feature group.

If the feature group's time travel format is "HUDI", the operation argument can be either "insert" or "upsert".

Upsert new feature data with time travel format HUDI:

fs = conn.get_feature_store()
fg = fs.get_feature_group("example_feature_group", 1)
upsert_df = ...
fg.insert(upsert_df)

Arguments

  • features Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list]]: DataFrame, RDD, Ndarray, list. Features to be saved.
  • overwrite Optional[bool]: Drop all data in the feature group before inserting new data. This does not affect metadata, defaults to False.
  • operation Optional[str]: Apache Hudi operation type "insert" or "upsert". Defaults to "upsert".
  • storage Optional[str]: Overwrite default behaviour, write to offline storage only with "offline" or online only with "online", defaults to None.
  • write_options Optional[Dict[Any, Any]]: Additional write options for Spark as key-value pairs, defaults to {}.

Returns

FeatureGroup. Updated feature group metadata object.
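An online-only refresh might look like this (a sketch; placeholder names and a live feature group handle are assumed, hence the function wrapper):

```python
def refresh_online(fg, df):
    """Sketch: upsert a DataFrame into the online store only."""
    # write only to the online store, e.g. to refresh low-latency serving data
    return fg.insert(df, operation="upsert", storage="online")
```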


[source]

json#

FeatureGroup.json()

[source]

read#

FeatureGroup.read(wallclock_time=None, online=False, dataframe_type="default", read_options={})

Read the feature group into a dataframe.

Reads the feature group by default from the offline storage as Spark DataFrame on Hopsworks and Databricks, and as Pandas dataframe on AWS Sagemaker and pure Python environments.

Set online to True to read from the online storage, or change dataframe_type to read as a different format.

Read feature group as of latest state:

fs = connection.get_feature_store()
fg = fs.get_feature_group("example_feature_group", 1)
fg.read()

Read feature group as of specific point in time:

fs = connection.get_feature_store()
fg = fs.get_feature_group("example_feature_group", 1)
fg.read("2020-10-20 07:34:11")

Arguments

  • wallclock_time Optional[str]: Date string in the format "YYYYMMDD" or "YYYYMMDDhhmmss". If specified, the feature group is retrieved as of this point in time; otherwise the most recent state is returned. Defaults to None.
  • online Optional[bool]: bool, optional. If True read from online feature store, defaults to False.
  • dataframe_type Optional[str]: str, optional. Possible values are "default", "spark", "pandas", "numpy" or "python", defaults to "default".
  • read_options Optional[dict]: Additional read options as key/value pairs, defaults to {}.

Returns

The dataframe containing the feature data, as one of:

  • pyspark.sql.DataFrame: A Spark DataFrame.
  • pandas.DataFrame: A Pandas DataFrame.
  • numpy.ndarray: A two-dimensional Numpy array.
  • list: A two-dimensional Python list.

Raises

RestAPIError. If time travel is enabled and no data is available for the feature group at the given commit date.


[source]

read_changes#

FeatureGroup.read_changes(start_wallclock_time, end_wallclock_time, read_options={})

Read the updates to this feature group that occurred between the specified points in time.

This function only works on feature groups with the "HUDI" time travel format.

Reading commits incrementally between specified points in time:

fs = connection.get_feature_store()
fg = fs.get_feature_group("example_feature_group", 1)
fg.read_changes("2020-10-20 07:31:38", "2020-10-20 07:34:11").show()

Arguments

  • start_wallclock_time str: Date string in the format of "YYYYMMDD" or "YYYYMMDDhhmmss".
  • end_wallclock_time str: Date string in the format of "YYYYMMDD" or "YYYYMMDDhhmmss".
  • read_options Optional[dict]: User provided read options. Defaults to {}.

Returns

DataFrame. The spark dataframe containing the incremental changes of feature data.

Raises

RestAPIError. If no data is available for the feature group between the given commit dates.


[source]

save#

FeatureGroup.save(features, write_options={})

Persist the metadata and materialize the feature group to the feature store.

Calling save creates the metadata for the feature group in the feature store and writes the specified features dataframe as feature group to the online/offline feature store as specified.

By default, this writes the feature group to the offline storage, and if online_enabled for the feature group, also to the online feature store.

The features dataframe can be a Spark DataFrame or RDD, a Pandas DataFrame, or a two-dimensional Numpy array or a two-dimensional Python nested list.

Arguments

  • features Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list]]: Query, DataFrame, RDD, Ndarray, list. Features to be saved.
  • write_options Optional[Dict[Any, Any]]: Additional write options for Spark as key-value pairs, defaults to {}.

Returns

FeatureGroup. Returns the persisted FeatureGroup metadata object.

Raises

RestAPIError. Unable to create feature group.


[source]

select#

FeatureGroup.select(features=[])

Select a subset of features of the feature group and return a query object.

The query can be used to construct joins of feature groups or create a training dataset with a subset of features of the feature group.

Arguments

  • features List[Union[str, hsfs.feature.Feature]]: list, optional. A list of Feature objects or feature names as strings to be selected, defaults to [].

Returns

Query: A query object with the selected features of the feature group.
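Combined with filter and the Query join API, select supports building derived queries. A sketch with hypothetical feature names (the primary key is used as the joining key when none is specified, as noted above):

```python
def build_training_query(sales_fg, store_fg):
    """Sketch: select, join, and filter across two feature group handles."""
    # select a subset from one group and join with another group's query
    query = sales_fg.select(["store_id", "weekly_sales"]).join(
        store_fg.select_except(["store_id"])
    )
    # filter on a feature retrieved via get_feature
    return query.filter(sales_fg.get_feature("weekly_sales") > 1000)
```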


[source]

select_all#

FeatureGroup.select_all()

Select all features in the feature group and return a query object.

The query can be used to construct joins of feature groups or create a training dataset immediately.

Returns

Query. A query object with all features of the feature group.


[source]

select_except#

FeatureGroup.select_except(features=[])

Select all features of the feature group except a few and return a query object.

The query can be used to construct joins of feature groups or create a training dataset with a subset of features of the feature group.

Arguments

  • features List[Union[str, hsfs.feature.Feature]]: list, optional. A list of Feature objects or feature names as strings to be selected, defaults to [], selecting all features.

Returns

Query: A query object with the selected features of the feature group.


[source]

show#

FeatureGroup.show(n, online=False)

Show the first n rows of the feature group.

Arguments

  • n int: int. Number of rows to show.
  • online Optional[bool]: bool, optional. If True read from online feature store, defaults to False.

[source]

to_dict#

FeatureGroup.to_dict()

[source]

update_description#

FeatureGroup.update_description(description)

Update the description of the feature group.

Arguments

  • description str: str. New description string.

Returns

FeatureGroup. The updated feature group object.


[source]

update_from_response_json#

FeatureGroup.update_from_response_json(json_dict)

[source]

update_statistics_config#

FeatureGroup.update_statistics_config()

Update the statistics configuration of the feature group.

Change the statistics_config object and persist the changes by calling this method.

Returns

FeatureGroup. The updated metadata object of the feature group.

Raises

RestAPIError.
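The intended pattern is to mutate the statistics_config object and then persist it. A sketch, assuming the config exposes attributes matching the documented keys ("enabled", "correlations", "histograms"):

```python
def disable_histograms(fg):
    """Sketch: turn off histogram and correlation computation, then persist."""
    # change the in-memory configuration first ...
    fg.statistics_config.histograms = False
    fg.statistics_config.correlations = False
    # ... then persist the change to the feature store
    return fg.update_statistics_config()
```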