FeatureGroup#
hsfs.feature_group.FeatureGroup(
name,
version,
featurestore_id,
description="",
partition_key=None,
primary_key=None,
hudi_precombine_key=None,
featurestore_name=None,
embedding_index=None,
created=None,
creator=None,
id=None,
features=None,
location=None,
online_enabled=False,
time_travel_format=None,
statistics_config=None,
online_topic_name=None,
topic_name=None,
notification_topic_name=None,
event_time=None,
stream=False,
expectation_suite=None,
parents=None,
href=None,
delta_streamer_job_conf=None,
deprecated=False,
**kwargs
)
Creation#
create_feature_group#
FeatureStore.create_feature_group(
name,
version=None,
description="",
online_enabled=False,
time_travel_format="HUDI",
partition_key=[],
primary_key=[],
embedding_index=None,
hudi_precombine_key=None,
features=[],
statistics_config=None,
event_time=None,
stream=False,
expectation_suite=None,
parents=[],
topic_name=None,
notification_topic_name=None,
)
Create a feature group metadata object.
Example
# connect to the Feature Store
fs = ...
fg = fs.create_feature_group(
name='air_quality',
description='Air Quality characteristics of each day',
version=1,
primary_key=['city','date'],
online_enabled=True,
event_time='date'
)
Lazy
This method is lazy and does not persist any metadata or feature data in the
feature store on its own. To persist the feature group and save feature data
along the metadata in the feature store, call the save()
method with a
DataFrame.
Arguments
- name (str): Name of the feature group to create.
- version (Optional[int]): Version of the feature group to retrieve, defaults to None and will create the feature group with an incremented version from the last version in the feature store.
- description (Optional[str]): A string describing the contents of the feature group to improve discoverability for Data Scientists, defaults to empty string "".
- online_enabled (Optional[bool]): Define whether the feature group should also be made available in the online feature store for low-latency access, defaults to False.
- time_travel_format (Optional[str]): Format used for time travel, defaults to "HUDI".
- partition_key (Optional[List[str]]): A list of feature names to be used as partition key when writing the feature data to the offline storage, defaults to empty list [].
- primary_key (Optional[List[str]]): A list of feature names to be used as primary key for the feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list [], in which case the feature group won't have any primary key.
- embedding_index (Optional[hsfs.embedding.EmbeddingIndex]): EmbeddingIndex. If an embedding index is provided, a vector database is used as the online feature store. This enables similarity search using find_neighbors. Defaults to None.
- hudi_precombine_key (Optional[str]): A feature name to be used as a precombine key for the "HUDI" feature group. Defaults to None. If the feature group has time travel format "HUDI" and no hudi precombine key was specified, the first primary key of the feature group will be used as hudi precombine key.
- features (Optional[List[hsfs.feature.Feature]]): Optionally, define the schema of the feature group manually as a list of Feature objects. Defaults to empty list [] and will use the schema information of the DataFrame provided in the save method.
- statistics_config (Optional[Union[hsfs.StatisticsConfig, bool, dict]]): A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this feature group, "correlations" to turn on feature correlation computation, "histograms" to compute feature value frequencies and "exact_uniqueness" to compute uniqueness, distinctness and entropy. The values should be booleans indicating the setting. To fully turn off statistics computation pass statistics_config=False. Defaults to None and will compute only descriptive statistics.
- event_time (Optional[str]): Optionally, provide the name of the feature containing the event time for the features in this feature group. If event_time is set, the feature group can be used for point-in-time joins. Defaults to None. Event time data type restriction: the supported data types for the event time column are timestamp, date and bigint.
- stream: Optionally, define whether the feature group should support real-time stream writing capabilities. Stream-enabled feature groups have a unified single API for writing streaming features transparently to both the online and offline store.
- expectation_suite (Optional[Union[hsfs.expectation_suite.ExpectationSuite, great_expectations.core.expectation_suite.ExpectationSuite]]): Optionally, attach an expectation suite to the feature group which dataframes should be validated against upon insertion. Defaults to None.
- parents (Optional[List[hsfs.feature_group.FeatureGroup]]): Optionally, define the parents of this feature group as the origin where the data is coming from.
- topic_name (Optional[str]): Optionally, define the name of the topic used for data ingestion. If left undefined it defaults to using the project topic.
- notification_topic_name (Optional[str]): Optionally, define the name of the topic used for sending notifications when entries are inserted or updated on the online feature store. If left undefined no notifications are sent.
Returns
FeatureGroup
. The feature group metadata object.
get_or_create_feature_group#
FeatureStore.get_or_create_feature_group(
name,
version,
description="",
online_enabled=False,
time_travel_format="HUDI",
partition_key=[],
primary_key=[],
embedding_index=None,
hudi_precombine_key=None,
features=[],
statistics_config=None,
expectation_suite=None,
event_time=None,
stream=False,
parents=[],
topic_name=None,
notification_topic_name=None,
)
Get a feature group metadata object, or create a new one if it doesn't exist. This method does not update an existing feature group metadata object.
Example
# connect to the Feature Store
fs = ...
fg = fs.get_or_create_feature_group(
name="electricity_prices",
version=1,
description="Electricity prices from NORD POOL",
primary_key=["day", "area"],
online_enabled=True,
event_time="timestamp",
)
Lazy
This method is lazy and does not persist any metadata or feature data in the
feature store on its own. To persist the feature group and save feature data
along the metadata in the feature store, call the insert()
method with a
DataFrame.
Arguments
- name (str): Name of the feature group to create.
- version (int): Version of the feature group to retrieve or create.
- description (Optional[str]): A string describing the contents of the feature group to improve discoverability for Data Scientists, defaults to empty string "".
- online_enabled (Optional[bool]): Define whether the feature group should also be made available in the online feature store for low-latency access, defaults to False.
- time_travel_format (Optional[str]): Format used for time travel, defaults to "HUDI".
- partition_key (Optional[List[str]]): A list of feature names to be used as partition key when writing the feature data to the offline storage, defaults to empty list [].
- primary_key (Optional[List[str]]): A list of feature names to be used as primary key for the feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list [], in which case the feature group won't have any primary key.
- embedding_index (Optional[hsfs.embedding.EmbeddingIndex]): EmbeddingIndex. If an embedding index is provided, the vector database is used as the online feature store. This enables similarity search using find_neighbors. Defaults to None.
- hudi_precombine_key (Optional[str]): A feature name to be used as a precombine key for the "HUDI" feature group. Defaults to None. If the feature group has time travel format "HUDI" and no hudi precombine key was specified, the first primary key of the feature group will be used as hudi precombine key.
- features (Optional[List[hsfs.feature.Feature]]): Optionally, define the schema of the feature group manually as a list of Feature objects. Defaults to empty list [] and will use the schema information of the DataFrame provided in the save method.
- statistics_config (Optional[Union[hsfs.StatisticsConfig, bool, dict]]): A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this feature group, "correlations" to turn on feature correlation computation, "histograms" to compute feature value frequencies and "exact_uniqueness" to compute uniqueness, distinctness and entropy. The values should be booleans indicating the setting. To fully turn off statistics computation pass statistics_config=False. Defaults to None and will compute only descriptive statistics.
- expectation_suite (Optional[Union[hsfs.expectation_suite.ExpectationSuite, great_expectations.core.expectation_suite.ExpectationSuite]]): Optionally, attach an expectation suite to the feature group which dataframes should be validated against upon insertion. Defaults to None.
- event_time (Optional[str]): Optionally, provide the name of the feature containing the event time for the features in this feature group. If event_time is set, the feature group can be used for point-in-time joins. Defaults to None. Event time data type restriction: the supported data types for the event time column are timestamp, date and bigint.
- stream: Optionally, define whether the feature group should support real-time stream writing capabilities. Stream-enabled feature groups have a unified single API for writing streaming features transparently to both the online and offline store.
- parents (Optional[List[hsfs.feature_group.FeatureGroup]]): Optionally, define the parents of this feature group as the origin where the data is coming from.
- topic_name (Optional[str]): Optionally, define the name of the topic used for data ingestion. If left undefined it defaults to using the project topic.
- notification_topic_name (Optional[str]): Optionally, define the name of the topic used for sending notifications when entries are inserted or updated on the online feature store. If left undefined no notifications are sent.
Returns
FeatureGroup
. The feature group metadata object.
Retrieval#
get_feature_group#
FeatureStore.get_feature_group(name, version=None)
Get a feature group entity from the feature store.
Getting a feature group from the Feature Store means getting its metadata handle
so you can subsequently read the data into a Spark or Pandas DataFrame or use
the Query
-API to perform joins between feature groups.
Example
# connect to the Feature Store
fs = ...
fg = fs.get_feature_group(
name="electricity_prices",
version=1,
)
Arguments
- name (str): Name of the feature group to get.
- version (Optional[int]): Version of the feature group to retrieve, defaults to None and will return version=1.
Returns
FeatureGroup
: The feature group metadata object.
Raises
hsfs.client.exceptions.RestAPIError
: If unable to retrieve feature group from the feature store.
Properties#
avro_schema#
Avro schema representation of the feature group.
created#
Timestamp when the feature group was created.
creator#
Username of the creator.
deprecated#
Setting if the feature group is deprecated.
description#
Description of the feature group contents.
embedding_index#
event_time#
Event time feature in the feature group.
expectation_suite#
Expectation Suite configuration object defining the settings for data validation of the feature group.
feature_store#
feature_store_id#
feature_store_name#
Name of the feature store in which the feature group is located.
features#
Feature Group schema (alias)
hudi_precombine_key#
Feature name that is the hudi precombine key.
id#
Feature group id.
location#
materialization_job#
Get the Job object reference for the materialization job for this Feature Group.
name#
Name of the feature group.
notification_topic_name#
The topic used for feature group notifications.
online_enabled#
Setting if the feature group is available in online storage.
parents#
Parent feature groups as the origin of the data in the current feature group. This is part of explicit provenance.
partition_key#
List of features building the partition key.
primary_key#
List of features building the primary key.
schema#
Feature Group schema
statistics#
Get the latest computed statistics for the whole feature group.
statistics_config#
Statistics configuration object defining the settings for statistics computation of the feature group.
Raises
hsfs.client.exceptions.FeatureStoreException
.
stream#
Whether to enable real time stream writing capabilities.
subject#
Subject of the feature group.
time_travel_format#
Setting of the feature group time travel format.
topic_name#
The topic used for feature group data ingestion.
version#
Version number of the feature group.
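The properties above can be read directly from a feature group handle. A minimal sketch (the handle fg is assumed to have been retrieved as in the Retrieval section):
# get a feature group handle as shown above
fg = fs.get_feature_group(name="electricity_prices", version=1)
# inspect selected metadata properties
print(fg.name, fg.version)   # name and version of the feature group
print(fg.primary_key)        # list of features building the primary key
print(fg.event_time)         # event time feature, if configured
print(fg.online_enabled)     # whether the group is available in online storage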
Methods#
add_tag#
FeatureGroup.add_tag(name, value)
Attach a tag to a feature group.
A tag consists of a name and a value.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.add_tag(name="example_tag", value="42")
Arguments
- name (str): Name of the tag to be added.
- value: Value of the tag to be added.
Raises
hsfs.client.exceptions.RestAPIError
in case the backend fails to add the tag.
append_features#
FeatureGroup.append_features(features)
Append features to the schema of the feature group.
Example
# connect to the Feature Store
fs = ...
# define features to be inserted in the feature group
features = [
Feature(name="id",type="int",online_type="int"),
Feature(name="name",type="string",online_type="varchar(20)")
]
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.append_features(features)
Safe append
This method appends the features to the feature group description safely. In case of failure your local metadata object will contain the correct schema.
It is only possible to append features to a feature group. Removing features is considered a breaking change.
Arguments
- features (Union[hsfs.feature.Feature, List[hsfs.feature.Feature]]): A feature object or list thereof to append to the schema of the feature group.
Returns
FeatureGroup
. The updated feature group object.
as_of#
FeatureGroup.as_of(wallclock_time=None, exclude_until=None)
Get Query object to retrieve all features of the group at a point in the past.
This method selects all features in the feature group and returns a Query object at the specified point in time. Optionally, commits before a specified point in time can be excluded from the query. The Query can then either be read into a Dataframe or used further to perform joins or construct a training dataset.
Reading features at a specific point in time:
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# get data at a specific point in time and show it
fg.as_of("2020-10-20 07:34:11").read().show()
Reading commits incrementally between specified points in time:
fg.as_of("2020-10-20 07:34:11", exclude_until="2020-10-19 07:34:11").read().show()
The first parameter is inclusive while the latter is exclusive. That means, in order to query a single commit, you need to query that commit time and exclude everything just before the commit.
Reading only the changes from a single commit
fg.as_of("2020-10-20 07:31:38", exclude_until="2020-10-20 07:31:37").read().show()
When no wallclock_time is given, the latest state of features is returned. Optionally, commits before a specified point in time can still be excluded.
Reading the latest state of features, excluding commits before a specified point in time:
fg.as_of(None, exclude_until="2020-10-20 07:31:38").read().show()
Note that the interval will be applied to all joins in the query. If you want to query different intervals for different feature groups in the query, you have to apply them in a nested fashion:
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg1 = fs.get_or_create_feature_group(...)
fg2 = fs.get_or_create_feature_group(...)
fg1.select_all().as_of("2020-10-20", exclude_until="2020-10-19")
.join(fg2.select_all().as_of("2020-10-20", exclude_until="2020-10-19"))
If instead you apply another as_of
selection after the join, all
joined feature groups will be queried with this interval:
Example
fg1.select_all().as_of("2020-10-20", exclude_until="2020-10-19") # as_of is not applied
.join(fg2.select_all().as_of("2020-10-20", exclude_until="2020-10-15")) # as_of is not applied
.as_of("2020-10-20", exclude_until="2020-10-19")
Warning
This function only works for feature groups with time_travel_format='HUDI'.
Warning
Excluding commits via exclude_until is only possible within the range of the Hudi active timeline.
By default, Hudi keeps the last 20 to 30 commits in the active timeline.
If you need to keep a longer active timeline, you can overwrite the options:
hoodie.keep.min.commits
and hoodie.keep.max.commits
when calling the insert()
method.
Arguments
- wallclock_time (Optional[Union[str, int, datetime.datetime, datetime.date]]): Read data as of this point in time. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, or %Y-%m-%d %H:%M:%S.
- exclude_until (Optional[Union[str, int, datetime.datetime, datetime.date]]): Exclude commits until this point in time. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, or %Y-%m-%d %H:%M:%S.
Returns
Query
. The query object with the applied time travel condition.
check_deprecated#
FeatureGroup.check_deprecated()
commit_delete_record#
FeatureGroup.commit_delete_record(delete_df, write_options={})
Drops records present in the provided DataFrame and commits it as an update to this feature group. This method can only be used on feature groups stored as HUDI or DELTA.
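A minimal usage sketch, assuming delete_df is an existing Spark DataFrame containing the records (identified by their primary key values) that should be removed:
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# delete_df is a Spark DataFrame with the records to be deleted
fg.commit_delete_record(delete_df)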
Arguments
- delete_df (pyspark.sql.DataFrame): DataFrame containing records to be deleted.
- write_options (Optional[Dict[Any, Any]]): User-provided write options. Defaults to {}.
Raises
hsfs.client.exceptions.RestAPIError
.
commit_details#
FeatureGroup.commit_details(wallclock_time=None, limit=None)
Retrieves the commit timeline for this feature group. This method can only be used on time-travel-enabled feature groups.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
commit_details = fg.commit_details()
Arguments
- wallclock_time (Optional[Union[str, int, datetime.datetime, datetime.date]]): Commit details as of a specific point in time. Defaults to None. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.
- limit (Optional[int]): Number of commits to retrieve. Defaults to None.
Returns
Dict[str, Dict[str, str]]. Dictionary of the commit metadata timeline, where the key is the commit id and the value is a Dict[str, str] with key-value pairs for the commit date and the number of rows updated, inserted and deleted.
Raises
hsfs.client.exceptions.RestAPIError.
hsfs.client.exceptions.FeatureStoreException. If the feature group does not have the HUDI time travel format.
compute_statistics#
FeatureGroup.compute_statistics(wallclock_time=None)
Recompute the statistics for the feature group and save them to the feature store.
Statistics are only computed for data in the offline storage of the feature group.
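A minimal sketch of triggering the computation (the wallclock_time value is illustrative):
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# recompute statistics for the current state of the feature group
statistics = fg.compute_statistics()
# or recompute statistics as of a point in time in the past
statistics = fg.compute_statistics(wallclock_time="2023-01-01 00:00:00")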
Arguments
- wallclock_time (Optional[Union[str, int, datetime.datetime, datetime.date]]): If specified, recompute statistics on the feature group as of this specific point in time. If not specified, compute statistics as of the most recent time of this feature group. Defaults to None. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.
Returns
Statistics
. The statistics metadata object.
Raises
hsfs.client.exceptions.RestAPIError
. Unable to persist the statistics.
create_feature_monitoring#
FeatureGroup.create_feature_monitoring(
name,
feature_name,
description=None,
start_date_time=None,
end_date_time=None,
cron_expression="0 0 12 ? * * *",
)
Enable feature monitoring to compare statistics on snapshots of feature data over time.
Experimental
Public API is subject to change, this feature is not suitable for production use-cases.
Example
# fetch feature group
fg = fs.get_feature_group(name="my_feature_group", version=1)
# enable feature monitoring
my_config = fg.create_feature_monitoring(
name="my_monitoring_config",
feature_name="my_feature",
description="my monitoring config description",
cron_expression="0 0 12 ? * * *",
).with_detection_window(
# Data inserted in the last day
time_offset="1d",
window_length="1d",
).with_reference_window(
# Data inserted last week on the same day
time_offset="1w1d",
window_length="1d",
).compare_on(
metric="mean",
threshold=0.5,
).save()
Arguments
- name (str): Name of the feature monitoring configuration. The name must be unique for all configurations attached to the feature group.
- feature_name (str): Name of the feature to monitor.
- description (Optional[str]): Description of the feature monitoring configuration.
- start_date_time (Optional[Union[str, int, datetime.date, datetime.datetime, pandas._libs.tslibs.timestamps.Timestamp]]): Start date and time from which to start computing statistics.
- end_date_time (Optional[Union[str, int, datetime.date, datetime.datetime, pandas._libs.tslibs.timestamps.Timestamp]]): End date and time at which to stop computing statistics.
- cron_expression (Optional[str]): Cron expression to use to schedule the job. The cron expression must be in UTC and follow the Quartz specification. Default is '0 0 12 ? * * *', every day at 12pm UTC.
Raises
hsfs.client.exceptions.FeatureStoreException
.
Return
FeatureMonitoringConfig: Configuration with minimal information about the feature monitoring. Additional information is required before feature monitoring is enabled.
create_statistics_monitoring#
FeatureGroup.create_statistics_monitoring(
name,
feature_name=None,
description=None,
start_date_time=None,
end_date_time=None,
cron_expression="0 0 12 ? * * *",
)
Run a job to compute statistics on snapshot of feature data on a schedule.
Experimental
Public API is subject to change, this feature is not suitable for production use-cases.
Example
# fetch feature group
fg = fs.get_feature_group(name="my_feature_group", version=1)
# enable statistics monitoring
my_config = fg.create_statistics_monitoring(
name="my_config",
start_date_time="2021-01-01 00:00:00",
description="my description",
cron_expression="0 0 12 ? * * *",
).with_detection_window(
# Statistics computed on 10% of the last week of data
time_offset="1w",
row_percentage=0.1,
).save()
Arguments
- name (str): Name of the feature monitoring configuration. The name must be unique for all configurations attached to the feature group.
- feature_name (Optional[str]): Name of the feature to monitor. If not specified, statistics will be computed for all features.
- description (Optional[str]): Description of the feature monitoring configuration.
- start_date_time (Optional[Union[str, int, datetime.date, datetime.datetime, pandas._libs.tslibs.timestamps.Timestamp]]): Start date and time from which to start computing statistics.
- end_date_time (Optional[Union[str, int, datetime.date, datetime.datetime, pandas._libs.tslibs.timestamps.Timestamp]]): End date and time at which to stop computing statistics.
- cron_expression (Optional[str]): Cron expression to use to schedule the job. The cron expression must be in UTC and follow the Quartz specification. Default is '0 0 12 ? * * *', every day at 12pm UTC.
Raises
hsfs.client.exceptions.FeatureStoreException
.
Return
FeatureMonitoringConfig: Configuration with minimal information about the feature monitoring. Additional information is required before feature monitoring is enabled.
delete#
FeatureGroup.delete()
Drop the entire feature group along with its feature data.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(
name='bitcoin_price',
version=1
)
# delete the feature group
fg.delete()
Potentially dangerous operation
This operation drops all metadata associated with this version of the feature group and all the feature data in offline and online storage associated with it.
Raises
hsfs.client.exceptions.RestAPIError
.
delete_expectation_suite#
FeatureGroup.delete_expectation_suite()
Delete the expectation suite attached to the Feature Group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.delete_expectation_suite()
Raises
hsfs.client.exceptions.RestAPIError
.
delete_tag#
FeatureGroup.delete_tag(name)
Delete a tag attached to a feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.delete_tag("example_tag")
Arguments
- name (str): Name of the tag to be removed.
Raises
hsfs.client.exceptions.RestAPIError
in case the backend fails to delete the tag.
filter#
FeatureGroup.filter(f)
Apply filter to the feature group.
Selects all features and returns the resulting Query
with the applied filter.
Example
from hsfs.feature import Feature
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.filter(Feature("weekly_sales") > 1000)
If you are planning to join the filtered feature group later on with another feature group, make sure to select the filtered feature explicitly from the respective feature group:
Example
fg.filter(fg.feature1 == 1).show(10)
Composite filters require parenthesis:
Example
fg.filter((fg.feature1 == 1) | (fg.feature2 >= 2))
Arguments
- f (Union[hsfs.constructor.filter.Filter, hsfs.constructor.filter.Logic]): Filter object.
Returns
Query
. The query object with the applied filter.
finalize_multi_part_insert#
FeatureGroup.finalize_multi_part_insert()
Finalizes and exits the multi part insert context opened by multi_part_insert
in a blocking fashion once all rows have been transmitted.
Multi part insert with manual context management
Instead of letting Python handle the entering and exiting of the multi part insert context, you can start and finalize the context manually.
feature_group = fs.get_or_create_feature_group("fg_name", version=1)
while loop:
small_batch_df = ...
feature_group.multi_part_insert(small_batch_df)
# IMPORTANT: finalize the multi part insert to make sure all rows
# have been transmitted
feature_group.finalize_multi_part_insert()
multi_part_insert initiates the context; be sure to finalize it. finalize_multi_part_insert is a blocking call that returns once all rows have been transmitted.
find_neighbors#
FeatureGroup.find_neighbors(embedding, col=None, k=10, filter=None, min_score=0)
Finds the nearest neighbors for a given embedding in the vector database.
Arguments
- embedding (List[Union[int, float]]): The target embedding for which neighbors are to be found.
- col (Optional[str]): The column name used to compute the similarity score. Required only if there are multiple embeddings (optional).
- k (Optional[int]): The number of nearest neighbors to retrieve (default is 10).
- filter (Optional[Union[hsfs.constructor.filter.Filter, hsfs.constructor.filter.Logic]]): A filter expression to restrict the search space (optional).
- min_score (Optional[float]): The minimum similarity score for neighbors to be considered (default is 0).
Returns
A list of tuples representing the nearest neighbors. Each tuple contains the similarity score and a list of feature values.
Example
embedding_index = EmbeddingIndex()
embedding_index.add_embedding(name="user_vector", dimension=3)
fg = fs.create_feature_group(
name='air_quality',
embedding_index = embedding_index,
version=1,
primary_key=['id1'],
online_enabled=True,
)
fg.insert(data)
fg.find_neighbors(
[0.1, 0.2, 0.3],
k=5,
)
# apply filter
fg.find_neighbors(
[0.1, 0.2, 0.3],
k=5,
filter=(fg.id1 > 10) & (fg.id1 < 30)
)
from_response_json#
FeatureGroup.from_response_json(json_dict)
get_all_statistics#
FeatureGroup.get_all_statistics(computation_time=None, feature_names=None)
Returns all the statistics metadata computed before a specific time for the current feature group.
If computation_time is None, all the statistics metadata are returned.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg_statistics = fg.get_all_statistics(computation_time=None)
Arguments
- computation_time (Optional[Union[str, int, float, datetime.datetime, datetime.date]]): Date and time when statistics were computed. Defaults to None. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.
- feature_names (Optional[List[str]]): List of feature names for which statistics are retrieved.
Returns
Statistics
. Statistics object.
Raises
hsfs.client.exceptions.RestAPIError
hsfs.client.exceptions.FeatureStoreException
.
get_all_validation_reports#
FeatureGroup.get_all_validation_reports(ge_type=True)
Return all validation reports attached to the feature group, if any exist.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
val_reports = fg.get_all_validation_reports()
Arguments
- ge_type (bool): If True returns a native Great Expectations type, a Hopsworks custom type otherwise. Conversion can be performed via the to_ge_type() method on the Hopsworks type. Defaults to True.
Returns
Union[List[ValidationReport
], ValidationReport
]. All validation reports attached to the feature group.
Raises
hsfs.client.exceptions.RestAPIError
.
hsfs.client.exceptions.FeatureStoreException
.
get_complex_features#
FeatureGroup.get_complex_features()
Returns the names of all features with a complex data type in this feature group.
Example
complex_dtype_features = fg.get_complex_features()
get_expectation_suite#
FeatureGroup.get_expectation_suite(ge_type=True)
Return the expectation suite attached to the feature group if it exists.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
exp_suite = fg.get_expectation_suite()
Arguments
- ge_type (bool): If True returns a native Great Expectations type, a Hopsworks custom type otherwise. Conversion can be performed via the to_ge_type() method on the Hopsworks type. Defaults to True.
Returns
ExpectationSuite
. The expectation suite attached to the feature group.
Raises
hsfs.client.exceptions.RestAPIError
.
get_feature#
FeatureGroup.get_feature(name)
Retrieve a Feature
object from the schema of the feature group.
There are several ways to access features of a feature group:
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# get a Feature instance
fg.feature1
fg["feature1"]
fg.get_feature("feature1")
Note
Attribute access to features works only for non-reserved names. For example
features named id
or name
will not be accessible via fg.name
, instead
this will return the name of the feature group itself. Fall back on using
the get_feature
method.
Arguments
- name (str): The name of the feature to retrieve.
Returns
Feature. The feature object.
Raises
hsfs.client.exceptions.FeatureStoreException
.
get_feature_monitoring_configs#
FeatureGroup.get_feature_monitoring_configs(name=None, feature_name=None, config_id=None)
Fetch all feature monitoring configs attached to the feature group, or fetch by name or feature name only. If no arguments are provided, the method returns all feature monitoring configs attached to the feature group, meaning all feature monitoring configs that are attached to a feature in the feature group. If you wish to fetch a single config, provide its name. If you wish to fetch all configs attached to a particular feature, provide the feature name.
Example
# fetch your feature group
fg = fs.get_feature_group(name="my_feature_group", version=1)
# fetch all feature monitoring configs attached to the feature group
fm_configs = fg.get_feature_monitoring_configs()
# fetch a single feature monitoring config by name
fm_config = fg.get_feature_monitoring_configs(name="my_config")
# fetch all feature monitoring configs attached to a particular feature
fm_configs = fg.get_feature_monitoring_configs(feature_name="my_feature")
# fetch a single feature monitoring config with a given id
fm_config = fg.get_feature_monitoring_configs(config_id=1)
Arguments
- name (Optional[str]): If provided, fetch only the feature monitoring config with the given name. Defaults to None.
- feature_name (Optional[str]): If provided, fetch only configs attached to a particular feature. Defaults to None.
- config_id (Optional[int]): If provided, fetch only the feature monitoring config with the given id. Defaults to None.
Raises
hsfs.client.exceptions.RestAPIError
.
hsfs.client.exceptions.FeatureStoreException
.
- ValueError: if both name and feature_name are provided.
- TypeError: if name or feature_name are not string or None.
Return
Union[FeatureMonitoringConfig
, List[FeatureMonitoringConfig
], None]
A list of feature monitoring configs. If name provided,
returns either a single config or None if not found.
get_feature_monitoring_history#
FeatureGroup.get_feature_monitoring_history(
config_name=None, config_id=None, start_time=None, end_time=None, with_statistics=True
)
Fetch feature monitoring history for a given feature monitoring config.
Example
# fetch your feature group
fg = fs.get_feature_group(name="my_feature_group", version=1)
# fetch feature monitoring history for a given feature monitoring config
fm_history = fg.get_feature_monitoring_history(
config_name="my_config",
start_time="2020-01-01",
)
# fetch feature monitoring history for a given feature monitoring config id
fm_history = fg.get_feature_monitoring_history(
config_id=1,
start_time=datetime.now() - timedelta(weeks=2),
end_time=datetime.now() - timedelta(weeks=1),
with_statistics=False,
)
Arguments
- config_name (Optional[str]): The name of the feature monitoring config to fetch history for. Defaults to None.
- config_id (Optional[int]): The id of the feature monitoring config to fetch history for. Defaults to None.
- start_time (Optional[Union[str, int, datetime.datetime, datetime.date]]): The start date of the feature monitoring history to fetch. Defaults to None.
- end_time (Optional[Union[str, int, datetime.datetime, datetime.date]]): The end date of the feature monitoring history to fetch. Defaults to None.
- with_statistics (Optional[bool]): Whether to include statistics in the feature monitoring history. Defaults to True. If False, only metadata about the monitoring will be fetched.
Raises
hsfs.client.exceptions.RestAPIError
.
hsfs.client.exceptions.FeatureStoreException
.
- ValueError: if both config_name and config_id are provided.
- TypeError: if config_name or config_id are not respectively string, int or None.
Return
List[FeatureMonitoringResult
]
A list of feature monitoring results containing the monitoring metadata
as well as the computed statistics for the detection and reference window
if requested.
get_fg_name#
FeatureGroup.get_fg_name()
get_generated_feature_groups#
FeatureGroup.get_generated_feature_groups()
Get the feature groups generated using this feature group, based on explicit provenance. These feature groups can be accessible or inaccessible. Explicit provenance does not track deleted generated feature group links, so deleted will always be empty. For inaccessible feature groups, only minimal information is returned.
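A minimal sketch of retrieving these provenance links (the accessible attribute on the returned object is an assumption about the ProvenanceLinks API):
# get the feature groups derived from this feature group
links = fg.get_generated_feature_groups()
# inspect the accessible downstream feature groups (attribute name assumed)
print(links.accessible)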
Returns
ProvenanceLinks
: Object containing the section of provenance graph requested.
Raises
hsfs.client.exceptions.RestAPIError
.
get_generated_feature_views#
FeatureGroup.get_generated_feature_views()
Get the feature views generated using this feature group, based on explicit provenance. These feature views can be accessible or inaccessible. Explicit provenance does not track deleted generated feature view links, so deleted will always be empty. For inaccessible feature views, only minimal information is returned.
Returns
ProvenanceLinks
: Object containing the section of provenance graph requested.
Raises
hsfs.client.exceptions.RestAPIError
.
get_latest_validation_report#
FeatureGroup.get_latest_validation_report(ge_type=True)
Return the latest validation report attached to the Feature Group if it exists.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
latest_val_report = fg.get_latest_validation_report()
Arguments
- ge_type
bool
: IfTrue
returns a native Great Expectation type, Hopsworks custom type otherwise. Conversion can be performed via theto_ge_type()
method on hopsworks type. Defaults toTrue
.
Returns
ValidationReport
. The latest validation report attached to the Feature Group.
Raises
hsfs.client.exceptions.RestAPIError
.
get_parent_feature_groups#
FeatureGroup.get_parent_feature_groups()
Get the parents of this feature group, based on explicit provenance. Parents are feature groups or external feature groups. These feature groups can be accessible, deleted or inaccessible. For deleted and inaccessible feature groups, only minimal information is returned.
Returns
ProvenanceLinks
: Object containing the section of provenance graph requested.
Raises
hsfs.client.exceptions.RestAPIError
.
get_statistics#
FeatureGroup.get_statistics(computation_time=None, feature_names=None)
Returns the statistics computed at a specific time for the current feature group.
If computation_time
is None
, the most recent statistics are returned.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg_statistics = fg.get_statistics(computation_time=None)
Arguments
- computation_time (Optional[Union[str, int, float, datetime.datetime, datetime.date]]): Date and time when statistics were computed. Defaults to None. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.
- feature_names (Optional[List[str]]): List of feature names for which statistics are retrieved.
Returns
Statistics
. Statistics object.
Raises
hsfs.client.exceptions.RestAPIError
hsfs.client.exceptions.FeatureStoreException
.
get_statistics_by_commit_window#
FeatureGroup.get_statistics_by_commit_window(
from_commit_time=None, to_commit_time=None, feature_names=None
)
Returns the statistics computed on a specific commit window for this feature group. If time travel is not enabled, it raises an exception.
If from_commit_time
is None
, the commit window starts from the first commit.
If to_commit_time
is None
, the commit window ends at the last commit.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg_statistics = fg.get_statistics_by_commit_window(from_commit_time=None, to_commit_time=None)
Arguments
- to_commit_time (Optional[Union[str, int, datetime.datetime, datetime.date]]): Date and time of the last commit of the window. Defaults to None. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.
- from_commit_time (Optional[Union[str, int, datetime.datetime, datetime.date]]): Date and time of the first commit of the window. Defaults to None. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.
- feature_names (Optional[List[str]]): List of feature names for which statistics are retrieved.
Returns
Statistics
. Statistics object.
Raises
hsfs.client.exceptions.RestAPIError
.
get_tag#
FeatureGroup.get_tag(name)
Get a tag attached to a feature group by name.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg_tag_value = fg.get_tag("example_tag")
Arguments
- name (str): Name of the tag to get.
Returns
tag value
Raises
hsfs.client.exceptions.RestAPIError
in case the backend fails to retrieve the tag.
get_tags#
FeatureGroup.get_tags()
Retrieves all tags attached to a feature group.
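For example:
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
tags = fg.get_tags()
for tag_name, tag_value in tags.items():
    print(tag_name, tag_value)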
Returns
Dict[str, obj]
of tags.
Raises
hsfs.client.exceptions.RestAPIError
in case the backend fails to retrieve the tags.
get_validation_history#
FeatureGroup.get_validation_history(
expectation_id, start_validation_time=None, end_validation_time=None, filter_by=[], ge_type=True
)
Fetch validation history of an Expectation specified by its id.
Example
validation_history = fg.get_validation_history(
expectation_id=1,
filter_by=["REJECTED", "UNKNOWN"],
start_validation_time="2022-01-01 00:00:00",
end_validation_time=datetime.datetime.now(),
ge_type=False
)
Arguments
- expectation_id (int): Id of the Expectation for which to fetch the validation history.
- filter_by (List[str]): List of ingestion_result categories to keep. Options are "INGESTED", "REJECTED", "FG_DATA", "EXPERIMENT", "UNKNOWN".
- start_validation_time (Optional[Union[str, int, datetime.datetime, datetime.date]]): Fetch only validation results posterior to the provided time, inclusive. Supported formats include timestamps (int), datetime, date, or strings formatted to be dateutil-parsable. See examples above.
- end_validation_time (Optional[Union[str, int, datetime.datetime, datetime.date]]): Fetch only validation results prior to the provided time, inclusive. Supported formats include timestamps (int), datetime, date, or strings formatted to be dateutil-parsable. See examples above.
Raises
hsfs.client.exceptions.RestAPIError
.
Return
Union[List[ValidationResult], List[ExpectationValidationResult]]: A list of validation results connected to the expectation_id.
insert#
FeatureGroup.insert(
features,
overwrite=False,
operation="upsert",
storage=None,
write_options={},
validation_options={},
save_code=True,
wait=False,
)
Persist the metadata and materialize the feature group to the feature store or insert data from a dataframe into the existing feature group.
Incrementally insert data to a feature group or overwrite all data contained in the feature group. By
default, the data is inserted into the offline storage as well as the online storage if the feature group is
online_enabled=True
.
The features
dataframe can be a Spark DataFrame or RDD, a Pandas DataFrame,
or a two-dimensional Numpy array or a two-dimensional Python nested list.
If statistics are enabled, statistics are recomputed for the entire feature
group.
If the feature group's time travel format is HUDI, the operation argument can be either insert or upsert.
If the feature group doesn't exist, the insert method will create the necessary metadata the first time it is invoked and write the specified features dataframe as a feature group to the online/offline feature store.
Changed in 3.3.0
insert
and save
methods are now async by default in non-spark clients.
To achieve the old behaviour, set wait
argument to True
.
Upsert new feature data with time travel format HUDI
# connect to the Feature Store
fs = ...
fg = fs.get_or_create_feature_group(
name='bitcoin_price',
description='Bitcoin price aggregated for days',
version=1,
primary_key=['unix'],
online_enabled=True,
event_time='unix'
)
fg.insert(df_bitcoin_processed)
Async insert
# connect to the Feature Store
fs = ...
fg1 = fs.get_or_create_feature_group(
name='feature_group_name1',
description='Description of the first FG',
version=1,
primary_key=['unix'],
online_enabled=True,
event_time='unix'
)
# async insertion in order not to wait for the job to finish
fg1.insert(df_for_fg1, write_options={"wait_for_job" : False})
fg2 = fs.get_or_create_feature_group(
name='feature_group_name2',
description='Description of the second FG',
version=1,
primary_key=['unix'],
online_enabled=True,
event_time='unix'
)
fg2.insert(df_for_fg2)
Arguments
- features (Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list]]): DataFrame, RDD, Ndarray, or list. Features to be saved.
- overwrite (Optional[bool]): Drop all data in the feature group before inserting new data. This does not affect metadata. Defaults to False.
- operation (Optional[str]): Apache Hudi operation type, "insert" or "upsert". Defaults to "upsert".
- storage (Optional[str]): Overwrite the default behaviour, write to offline storage only with "offline" or online only with "online". Defaults to None. (If the streaming APIs are enabled, specifying the storage option is not supported.)
- write_options (Optional[Dict[str, Any]]): Additional write options as key-value pairs, defaults to {}. When using the python engine, write_options can contain the following entries (see the sketch after this list):
  - key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to write data into the feature group.
  - key wait_for_job and value True or False to configure whether the insert call should return only after the Hopsworks Job has finished. By default it waits.
  - key start_offline_backfill and value True or False to configure whether to start the materialization job that writes data to the offline storage. start_offline_backfill is deprecated; use start_offline_materialization instead.
  - key start_offline_materialization and value True or False to configure whether to start the materialization job that writes data to the offline storage. By default the materialization job is started immediately.
  - key kafka_producer_config and value an object of type properties used to configure the Kafka client. To optimize for throughput over a high-latency connection, consider changing the producer properties.
  - key internal_kafka and value True or False in case you established connectivity from your Python environment to the internal advertised listeners of the Hopsworks Kafka cluster. Defaults to False and will use external listeners when connecting from outside of Hopsworks.
- validation_options (Optional[Dict[str, Any]]): Additional validation options as key-value pairs, defaults to {}.
  - key run_validation, a boolean value; set to False to skip validation temporarily on ingestion.
  - key save_report, a boolean value; set to False to skip upload of the validation report to Hopsworks.
  - key ge_validate_kwargs, a dictionary containing kwargs for the validate method of Great Expectations.
  - key fetch_expectation_suite, a boolean value, by default True, to control whether the expectation suite of the feature group should be fetched before every insert.
- save_code (Optional[bool]): When running HSFS on Hopsworks or Databricks, HSFS can save the code/notebook used to create the feature group or used to insert data into it. When calling the insert method repeatedly with small batches of data, this can slow down the writes. Use this option to turn off saving code. Defaults to True.
- wait (bool): Wait for the job to finish before returning, defaults to False. Shortcut for the write_options key "wait_for_job".
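To illustrate the write_options and validation_options keys above, a minimal sketch for the python engine (the dataframe df and the chosen values are placeholders):
# insert without blocking on the Hopsworks job
job, validation_report = fg.insert(
    df,
    write_options={
        "wait_for_job": False,                  # return as soon as the data has been sent
        "start_offline_materialization": True,  # start the offline materialization job
    },
    validation_options={"run_validation": True},
)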
Returns
(Job, ValidationReport): A tuple with job information if the python engine is used and the validation report if validation is enabled.
Raises
hsfs.client.exceptions.RestAPIError. For example, failure to create the feature group, or the dataframe schema does not match the existing feature group schema.
hsfs.client.exceptions.DataValidationException. If data validation fails and the expectation suite validation_ingestion_policy is set to STRICT. Data is NOT ingested.
insert_stream#
FeatureGroup.insert_stream(
features,
query_name=None,
output_mode="append",
await_termination=False,
timeout=None,
checkpoint_dir=None,
write_options={},
)
Ingest a Spark Structured Streaming Dataframe to the online feature store.
This method creates a long-running Spark Streaming Query; you can control the termination of the query through the arguments.
It is possible to stop the returned query with the .stop()
and check its
status with .isActive
.
To get a list of all active queries, use:
sqm = spark.streams
# get the list of active streaming queries
[q.name for q in sqm.active]
Engine Support
Spark only
Stream ingestion using Pandas/Python as engine is currently not supported. Python/Pandas has no notion of streaming.
Data Validation Support
insert_stream does not perform any data validation using Great Expectations, even when an expectation suite is attached.
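A minimal sketch, assuming streaming_df is an existing Spark Structured Streaming DataFrame whose schema matches the feature group:
# start streaming ingestion into the feature group
query = fg.insert_stream(
    streaming_df,
    query_name="my_ingestion_query",  # optional, easier to find in the Spark UI
    output_mode="append",
)
# ... later, stop the long running streaming query
query.stop()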
Arguments
- features (pyspark.sql.DataFrame): Features in a Streaming DataFrame to be saved.
- query_name (Optional[str]): It is possible to optionally specify a name for the query to make it easier to recognise in the Spark UI. Defaults to None.
- output_mode (Optional[str]): Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. (1) "append": only the new rows in the streaming DataFrame/Dataset will be written to the sink. (2) "complete": all the rows in the streaming DataFrame/Dataset will be written to the sink every time there is some update. (3) "update": only the rows that were updated in the streaming DataFrame/Dataset will be written to the sink every time there are some updates. If the query doesn't contain aggregations, it will be equivalent to append mode. Defaults to "append".
- await_termination (Optional[bool]): Waits for the termination of this query, either by query.stop() or by an exception. If the query has terminated with an exception, then the exception will be thrown. If timeout is set, it returns whether the query has terminated or not within the timeout seconds. Defaults to False.
- timeout (Optional[int]): Only relevant in combination with await_termination=True. Defaults to None.
- checkpoint_dir (Optional[str]): Checkpoint directory location. This will be used as a reference from where to resume the streaming job. If None then hsfs will construct it as "insert_stream_" + online_topic_name. Defaults to None.
- write_options: Additional write options for Spark as key-value pairs. Defaults to {}.
Returns
StreamingQuery
: Spark Structured Streaming Query object.
json#
FeatureGroup.json()
Get specific Feature Group metadata in json format.
Example
fg.json()
multi_part_insert#
FeatureGroup.multi_part_insert(
features=None,
overwrite=False,
operation="upsert",
storage=None,
write_options={},
validation_options={},
)
Get FeatureGroupWriter for optimized multi part inserts or call this method to start manual multi part optimized inserts.
In use cases where very small batches (1 to 1000 rows) per DataFrame need to be written to the feature store repeatedly, it might be inefficient to use the standard feature_group.insert() method, as it performs some background actions to update the metadata of the feature group object first.
For these cases, the feature group provides the multi_part_insert
API,
which is optimized for writing many small Dataframes after another.
There are two ways to use this API:
Python Context Manager
Using the Python with
syntax you can acquire a FeatureGroupWriter
object that implements the same multi_part_insert
API.
feature_group = fs.get_or_create_feature_group("fg_name", version=1)
with feature_group.multi_part_insert() as writer:
# run inserts in a loop:
while loop:
small_batch_df = ...
writer.insert(small_batch_df)
Multi part insert with manual context management
Instead of letting Python handle the entering and exiting of the multi part insert context, you can start and finalize the context manually.
feature_group = fs.get_or_create_feature_group("fg_name", version=1)
while loop:
small_batch_df = ...
feature_group.multi_part_insert(small_batch_df)
# IMPORTANT: finalize the multi part insert to make sure all rows
# have been transmitted
feature_group.finalize_multi_part_insert()
multi_part_insert initiates the context; be sure to finalize it. finalize_multi_part_insert is a blocking call that returns once all rows have been transmitted.
Once you are done with the multi part insert, it is good practice to start the materialization job in order to write the data to the offline storage:
feature_group.materialization_job.run(await_termination=True)
Arguments
- features (Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list]]]): DataFrame, RDD, Ndarray, or list. Features to be saved.
- overwrite (Optional[bool]): Drop all data in the feature group before inserting new data. This does not affect metadata. Defaults to False.
- operation (Optional[str]): Apache Hudi operation type, "insert" or "upsert". Defaults to "upsert".
- storage (Optional[str]): Overwrite the default behaviour, write to offline storage only with "offline" or online only with "online". Defaults to None.
- write_options (Optional[Dict[str, Any]]): Additional write options as key-value pairs, defaults to {}. When using the python engine, write_options can contain the following entries:
  - key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to write data into the feature group.
  - key wait_for_job and value True or False to configure whether the insert call should return only after the Hopsworks Job has finished. By default it waits.
  - key start_offline_backfill and value True or False to configure whether to start the materialization job that writes data to the offline storage. start_offline_backfill is deprecated; use start_offline_materialization instead.
  - key start_offline_materialization and value True or False to configure whether to start the materialization job that writes data to the offline storage. By default the materialization job is not started automatically for multi part inserts.
  - key kafka_producer_config and value an object of type properties used to configure the Kafka client. To optimize for throughput over a high-latency connection, consider changing the producer properties.
  - key internal_kafka and value True or False in case you established connectivity from your Python environment to the internal advertised listeners of the Hopsworks Kafka cluster. Defaults to False and will use external listeners when connecting from outside of Hopsworks.
- validation_options (Optional[Dict[str, Any]]): Additional validation options as key-value pairs, defaults to {}.
  - key run_validation, a boolean value; set to False to skip validation temporarily on ingestion.
  - key save_report, a boolean value; set to False to skip upload of the validation report to Hopsworks.
  - key ge_validate_kwargs, a dictionary containing kwargs for the validate method of Great Expectations.
  - key fetch_expectation_suite, a boolean value, by default False for multi part inserts, to control whether the expectation suite of the feature group should be fetched before every insert.
Returns
(Job, ValidationReport): A tuple with job information if the python engine is used and the validation report if validation is enabled.
FeatureGroupWriter: When used as a context manager with the Python with statement.
read#
FeatureGroup.read(wallclock_time=None, online=False, dataframe_type="default", read_options={})
Read the feature group into a dataframe.
Reads the feature group by default from the offline storage as Spark DataFrame on Hopsworks and Databricks, and as Pandas dataframe on AWS Sagemaker and pure Python environments.
Set online
to True
to read from the online storage, or change
dataframe_type
to read as a different format.
Read feature group as of latest state:
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.read()
Read feature group as of specific point in time:
fg = fs.get_or_create_feature_group(...)
fg.read("2020-10-20 07:34:11")
Arguments
- wallclock_time `Optional[Union[str, int, datetime.datetime, datetime.date]]`: If specified will retrieve the feature group as of a specific point in time. Defaults to `None`. If not specified, will return as of the most recent time. Strings should be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`, or `%Y-%m-%d %H:%M:%S.%f`.
- online `Optional[bool]`: If `True` read from the online feature store, defaults to `False`.
- dataframe_type `Optional[str]`: Possible values are `"default"`, `"spark"`, `"pandas"`, `"numpy"` or `"python"`, defaults to `"default"`.
- read_options `Optional[dict]`: Additional options as key/value pairs to pass to the execution engine. For the Spark engine: dictionary of read options for Spark. For the Python engine (see the sketch after this list):
  - key `"use_hive"` and value `True` to read the feature group with Hive instead of the Hopsworks Feature Query Service.
  - key `"arrow_flight_config"` to pass a dictionary of Arrow Flight configurations. For example: `{"arrow_flight_config": {"timeout": 900}}`
  - key `"hive_config"` to pass a dictionary of Hive or Tez configurations. For example: `{"hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}}`
  - key `"pandas_types"` and value `True` to retrieve columns as Pandas nullable types rather than numpy/object(string) types (experimental).
  Defaults to `{}`.
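For illustration, a minimal sketch of reading with some of these options (the option values are arbitrary examples, not recommendations):
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# offline read as a Pandas DataFrame with a longer Arrow Flight timeout
df = fg.read(
    dataframe_type="pandas",
    read_options={"arrow_flight_config": {"timeout": 900}},
)
# low-latency read of the current feature values from the online store
online_df = fg.read(online=True)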
Returns
`DataFrame`: The dataframe containing the feature data; the concrete type depends on the engine and `dataframe_type`:
`pyspark.DataFrame`. A Spark DataFrame.
`pandas.DataFrame`. A Pandas DataFrame.
`numpy.ndarray`. A two-dimensional Numpy array.
`list`. A two-dimensional Python list.
Raises
`hsfs.client.exceptions.RestAPIError`. No data is available for the feature group with this commit date, if time travel is enabled.
read_changes#
FeatureGroup.read_changes(start_wallclock_time, end_wallclock_time, read_options={})
Reads updates of this feature group that occurred between specified points in time.
Deprecated
`read_changes` method is deprecated. Use
`as_of(end_wallclock_time, exclude_until=start_wallclock_time).read(read_options=read_options)`
instead.
This function only works on feature groups with HUDI
time travel format.
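A minimal sketch of the recommended replacement (the wallclock times are placeholder values):
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# incremental changes between two commits, equivalent to the deprecated read_changes
df = fg.as_of("2020-10-25 07:34:11", exclude_until="2020-10-20 07:34:11").read()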
Arguments
- start_wallclock_time `Union[str, int, datetime.datetime, datetime.date]`: Start time of the time travel query. Strings should be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`, or `%Y-%m-%d %H:%M:%S.%f`.
- end_wallclock_time `Union[str, int, datetime.datetime, datetime.date]`: End time of the time travel query. Strings should be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`, or `%Y-%m-%d %H:%M:%S.%f`.
- read_options `Optional[dict]`: Additional options as key/value pairs to pass to the execution engine. For the Spark engine: dictionary of read options for Spark. For the Python engine:
  - key `"hive_config"` to pass a dictionary of Hive or Tez configurations. For example: `{"hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}}`
  Defaults to `{}`.
Returns
DataFrame
. The spark dataframe containing the incremental changes of
feature data.
Raises
hsfs.client.exceptions.RestAPIError
. No data is available for feature group with this commit date.
`hsfs.client.exceptions.FeatureStoreException`. If the feature group does not have `HUDI` time travel format.
save#
FeatureGroup.save(features=None, write_options={}, validation_options={}, wait=False)
Persist the metadata and materialize the feature group to the feature store.
Changed in 3.3.0
insert
and save
methods are now async by default in non-spark clients.
To achieve the old behaviour, set wait
argument to True
.
Calling save
creates the metadata for the feature group in the feature store.
If a DataFrame, RDD or Ndarray is provided, the data is written to the
online/offline feature store as specified.
By default, this writes the feature group to the offline storage, and if
online_enabled
for the feature group, also to the online feature store.
The features
dataframe can be a Spark DataFrame or RDD, a Pandas DataFrame,
or a two-dimensional Numpy array or a two-dimensional Python nested list.
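A minimal usage sketch (df is a hypothetical Pandas DataFrame matching the feature group schema):
# connect to the Feature Store
fs = ...
# get the Feature Group metadata instance, e.g. created with create_feature_group
fg = fs.get_or_create_feature_group(...)
# persist the metadata and ingest df, blocking until the materialization job finishes
job = fg.save(features=df, wait=True)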
Arguments
- features `Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[hsfs.feature.Feature]]]`: DataFrame, RDD, Ndarray or a list of features. Features to be saved. This argument is optional if the feature list is provided in the create_feature_group or get_or_create_feature_group method invocation.
- write_options `Optional[Dict[Any, Any]]`: Additional write options as key-value pairs, defaults to `{}`. When using the `python` engine, write_options can contain the following entries:
  - key `spark` and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to write data into the feature group.
  - key `wait_for_job` and value `True` or `False` to configure whether the save call should return only after the Hopsworks Job has finished. By default it does not wait.
  - key `start_offline_backfill` and value `True` or `False` to configure whether or not to start the materialization job to write data to the offline storage. `start_offline_backfill` is deprecated. Use `start_offline_materialization` instead.
  - key `start_offline_materialization` and value `True` or `False` to configure whether or not to start the materialization job to write data to the offline storage. By default the materialization job is started immediately.
  - key `kafka_producer_config` and value an object of type properties used to configure the Kafka client. To optimize for throughput over a high-latency connection, consider changing the producer properties.
  - key `internal_kafka` and value `True` or `False` in case you established connectivity from your Python environment to the internal advertised listeners of the Hopsworks Kafka cluster. Defaults to `False` and will use external listeners when connecting from outside of Hopsworks.
- validation_options `Optional[Dict[Any, Any]]`: Additional validation options as key-value pairs, defaults to `{}`.
  - key `run_validation` boolean value, set to `False` to skip validation temporarily on ingestion.
  - key `save_report` boolean value, set to `False` to skip upload of the validation report to Hopsworks.
  - key `ge_validate_kwargs` a dictionary containing kwargs for the validate method of Great Expectations.
- wait `bool`: Wait for the job to finish before returning, defaults to `False`. Shortcut for the `wait_for_job` write option.
Returns
Job
: When using the python
engine, it returns the Hopsworks Job
that was launched to ingest the feature group data.
Raises
hsfs.client.exceptions.RestAPIError
. Unable to create feature group.
save_expectation_suite#
FeatureGroup.save_expectation_suite(
expectation_suite, run_validation=True, validation_ingestion_policy="ALWAYS", overwrite=False
)
Attach an expectation suite to a feature group and save it for future use. If an expectation suite is already attached, it is replaced. Note that the provided expectation suite is modified in place to include expectationId fields.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.save_expectation_suite(expectation_suite, run_validation=True)
Arguments
- expectation_suite `Union[hsfs.expectation_suite.ExpectationSuite, great_expectations.core.expectation_suite.ExpectationSuite]`: The expectation suite to attach to the Feature Group.
- overwrite `bool`: If an Expectation Suite is already attached, overwrite it. The new suite will have its own validation history, but former reports are preserved.
- run_validation `bool`: Set whether the expectation_suite will run on ingestion.
- validation_ingestion_policy `str`: Set the policy for ingestion to the Feature Group (see the sketch below).
  - "STRICT" only allows a DataFrame that passes validation to be inserted into the Feature Group.
  - "ALWAYS" always inserts the DataFrame into the Feature Group, irrespective of the overall validation result.
Raises
hsfs.client.exceptions.RestAPIError
.
save_validation_report#
FeatureGroup.save_validation_report(
validation_report, ingestion_result="UNKNOWN", ge_type=True
)
Save the validation report to the Hopsworks platform alongside previous reports of the same Feature Group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(..., expectation_suite=expectation_suite)
validation_report = great_expectations.from_pandas(
    my_experimental_features_df,
    expectation_suite=fg.get_expectation_suite()).validate()
fg.save_validation_report(validation_report, ingestion_result="EXPERIMENT")
Arguments
- validation_report
Union[dict, hsfs.validation_report.ValidationReport, great_expectations.core.expectation_validation_result.ExpectationSuiteValidationResult]
: The validation report to attach to the Feature Group. - ingestion_result
str
: Specify the fate of the associated data, defaults to "UNKNOWN". Supported options are "UNKNOWN", "INGESTED", "REJECTED", "EXPERIMENT", "FG_DATA". Use "INGESTED" or "REJECTED" for validation of DataFrames to be inserted in the Feature Group. Use "EXPERIMENT" for testing and development and "FG_DATA" when validating data already in the Feature Group. - ge_type
`bool`: If `True` returns a native Great Expectations type, a Hopsworks custom type otherwise. Conversion can be performed via the `to_ge_type()` method on the Hopsworks type. Defaults to `True`.
Raises
hsfs.client.exceptions.RestAPIError
.
select#
FeatureGroup.select(features)
Select a subset of features of the feature group and return a query object.
The query can be used to construct joins of feature groups or create a feature view with a subset of features of the feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
from hsfs.feature import Feature
fg = fs.create_feature_group(
"fg",
features=[
Feature("id", type="string"),
Feature("ts", type="bigint"),
Feature("f1", type="date"),
Feature("f2", type="double")
],
primary_key=["id"],
event_time="ts")
# construct query
query = fg.select(["id", "f1"])
query.features
# [Feature('id', ...), Feature('f1', ...)]
Arguments
- features
`List[Union[str, hsfs.feature.Feature]]`: A list of `Feature` objects or feature names as strings to be selected.
Returns
Query
: A query object with the selected features of the feature group.
select_all#
FeatureGroup.select_all(include_primary_key=True, include_event_time=True)
Select all features in the feature group and return a query object.
The query can be used to construct joins of feature groups or create a feature view.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instances
fg1 = fs.get_or_create_feature_group(...)
fg2 = fs.get_or_create_feature_group(...)
# construct the query
query = fg1.select_all().join(fg2.select_all())
# show first 5 rows
query.show(5)
# select all features exclude primary key and event time
from hsfs.feature import Feature
fg = fs.create_feature_group(
"fg",
features=[
Feature("id", type="string"),
Feature("ts", type="bigint"),
Feature("f1", type="date"),
Feature("f2", type="double")
],
primary_key=["id"],
event_time="ts")
query = fg.select_all()
query.features
# [Feature('id', ...), Feature('ts', ...), Feature('f1', ...), Feature('f2', ...)]
query = fg.select_all(include_primary_key=False, include_event_time=False)
query.features
# [Feature('f1', ...), Feature('f2', ...)]
Arguments
- include_primary_key
`Optional[bool]`: If True, include the primary key of the feature group in the feature list. Defaults to True.
- include_event_time `Optional[bool]`: If True, include the event time of the feature group in the feature list. Defaults to True.
Returns
Query
. A query object with all features of the feature group.
select_except#
FeatureGroup.select_except(features=[])
Select all features including primary key and event time feature
of the feature group except provided features
and return a query object.
The query can be used to construct joins of feature groups or create a feature view with a subset of features of the feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
from hsfs.feature import Feature
fg = fs.create_feature_group(
"fg",
features=[
Feature("id", type="string"),
Feature("ts", type="bigint"),
Feature("f1", type="date"),
Feature("f2", type="double")
],
primary_key=["id"],
event_time="ts")
# construct query
query = fg.select_except(["ts", "f1"])
query.features
# [Feature('id', ...), Feature('f2', ...)]
Arguments
- features
`Optional[List[Union[str, hsfs.feature.Feature]]]`: A list of `Feature` objects or feature names as strings to be excluded from the selection. Defaults to `[]`, selecting all features.
Returns
Query
: A query object with the selected features of the feature group.
show#
FeatureGroup.show(n, online=False)
Show the first n
rows of the feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# make a query and show top 5 rows
fg.select(['date','weekly_sales','is_holiday']).show(5)
Arguments
- n
`int`: Number of rows to show.
- online `Optional[bool]`: If `True` read from the online feature store, defaults to `False`.
to_dict#
FeatureGroup.to_dict()
Get structured info about specific Feature Group in python dictionary format.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.to_dict()
update_deprecated#
FeatureGroup.update_deprecated(deprecate=True)
Deprecate the feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.update_deprecated(deprecate=True)
Safe update
This method updates the feature group safely. In case of failure your local metadata object will be kept unchanged.
Arguments
- deprecate
bool
: Boolean value identifying if the feature group should be deprecated. Defaults to True.
Returns
FeatureGroup
. The updated feature group object.
update_description#
FeatureGroup.update_description(description)
Update the description of the feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.update_description(description="Much better description.")
Safe update
This method updates the feature group description safely. In case of failure your local metadata object will keep the old description.
Arguments
- description
str
: New description string.
Returns
FeatureGroup
. The updated feature group object.
update_feature_description#
FeatureGroup.update_feature_description(feature_name, description)
Update the description of a single feature in this feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.update_feature_description(feature_name="min_temp",
description="Much better feature description.")
Safe update
This method updates the feature description safely. In case of failure your local metadata object will keep the old description.
Arguments
- feature_name
str
: Name of the feature to be updated. - description
str
: New description string.
Returns
FeatureGroup
. The updated feature group object.
update_features#
FeatureGroup.update_features(features)
Update metadata of features in this feature group.
Currently, only updating the description of a feature is supported.
Unsafe update
Note that if you use an existing Feature
object of the schema in the
feature group metadata object, this might leave your metadata object in a
corrupted state if the update fails.
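A sketch of updating a single feature's description with a freshly constructed Feature object (the feature name, type and description are placeholders; passing description to the Feature constructor is an assumption for this example):
from hsfs.feature import Feature
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# build a new Feature object rather than mutating an object taken from fg.features
updated_feature = Feature("min_temp", type="double", description="Daily minimum temperature")
fg = fg.update_features([updated_feature])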
Arguments
- features
Union[hsfs.feature.Feature, List[hsfs.feature.Feature]]
: `Feature` or list of features. A feature object or list thereof to be updated.
Returns
FeatureGroup
. The updated feature group object.
update_from_response_json#
FeatureGroup.update_from_response_json(json_dict)
update_notification_topic_name#
FeatureGroup.update_notification_topic_name(notification_topic_name)
Update the notification topic name of the feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.update_notification_topic_name(notification_topic_name="notification_topic_name")
Safe update
This method updates the feature group notification topic name safely. In case of failure your local metadata object will keep the old notification topic name.
Arguments
- notification_topic_name
str
: Name of the topic used for sending notifications when entries are inserted or updated on the online feature store. If set to None no notifications are sent.
Returns
FeatureGroup
. The updated feature group object.
update_statistics_config#
FeatureGroup.update_statistics_config()
Update the statistics configuration of the feature group.
Change the statistics_config
object and persist the changes by calling
this method.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.update_statistics_config()
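For example, a sketch enabling correlation computation before persisting (the correlations attribute mirrors the "correlations" key of the statistics configuration and is an assumption for this example):
# change the local statistics configuration, then persist it
fg.statistics_config.correlations = True
fg = fg.update_statistics_config()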
Returns
FeatureGroup
. The updated metadata object of the feature group.
Raises
hsfs.client.exceptions.RestAPIError
.
hsfs.client.exceptions.FeatureStoreException
.
validate#
FeatureGroup.validate(
dataframe=None,
expectation_suite=None,
save_report=False,
validation_options={},
ingestion_result="UNKNOWN",
ge_type=True,
)
Run validation based on the attached expectations.
Runs any expectations attached with Deequ, as well as any attached Great Expectations suites.
Example
# connect to the Feature Store
fs = ...
# get feature group instance
fg = fs.get_or_create_feature_group(...)
ge_report = fg.validate(df, save_report=False)
Arguments
- dataframe `Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame]]`: The dataframe to run the data validation expectations against.
- expectation_suite `Optional[hsfs.expectation_suite.ExpectationSuite]`: Optionally provide an Expectation Suite to override the one that is possibly attached to the feature group. This is useful for testing new Expectation Suites. When an extra suite is provided, the results will never be persisted. Defaults to `None` (see the sketch after this list).
- validation_options `Optional[Dict[Any, Any]]`: Additional validation options as key-value pairs, defaults to `{}`.
  - key `run_validation` boolean value, set to `False` to skip validation temporarily on ingestion.
  - key `ge_validate_kwargs` a dictionary containing kwargs for the validate method of Great Expectations.
- ingestion_result `str`: Specify the fate of the associated data, defaults to "UNKNOWN". Supported options are "UNKNOWN", "INGESTED", "REJECTED", "EXPERIMENT", "FG_DATA". Use "INGESTED" or "REJECTED" for validation of DataFrames to be inserted in the Feature Group. Use "EXPERIMENT" for testing and development and "FG_DATA" when validating data already in the Feature Group.
- save_report `Optional[bool]`: Whether to save the report to the backend. This is only possible if the Expectation Suite is initialised and attached to the Feature Group. Defaults to False.
- ge_type `bool`: Whether to return a Great Expectations object or Hopsworks' own abstraction. Defaults to True.
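For illustration, a sketch of validating a candidate DataFrame against an override suite without persisting the results (candidate_df and candidate_suite are placeholders):
# a candidate expectation suite built elsewhere
candidate_suite = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# results for an override suite are never persisted, which is handy while iterating
report = fg.validate(
    candidate_df,
    expectation_suite=candidate_suite,
    save_report=False,
    ingestion_result="EXPERIMENT",
)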
Returns
A Validation Report produced by Great Expectations.