FeatureGroup#
FeatureGroup#
hsfs.feature_group.FeatureGroup(
name,
version,
featurestore_id,
description="",
partition_key=None,
primary_key=None,
hudi_precombine_key=None,
featurestore_name=None,
created=None,
creator=None,
id=None,
features=None,
location=None,
online_enabled=False,
time_travel_format=None,
statistics_config=None,
online_topic_name=None,
event_time=None,
stream=False,
expectation_suite=None,
)
Creation#
create_feature_group#
FeatureStore.create_feature_group(
name,
version=None,
description="",
online_enabled=False,
time_travel_format="HUDI",
partition_key=[],
primary_key=[],
hudi_precombine_key=None,
features=[],
statistics_config=None,
event_time=None,
stream=False,
expectation_suite=None,
)
Create a feature group metadata object.
Lazy
This method is lazy and does not persist any metadata or feature data in the
feature store on its own. To persist the feature group and save feature data
along the metadata in the feature store, call the save()
method with a
DataFrame.
Arguments
- name
str
: Name of the feature group to create. - version
Optional[int]
: Version of the feature group to retrieve, defaults toNone
and will create the feature group with incremented version from the last version in the feature store. - description
Optional[str]
: A string describing the contents of the feature group to improve discoverability for Data Scientists, defaults to empty string""
. - online_enabled
Optional[bool]
: Define whether the feature group should be made available also in the online feature store for low latency access, defaults toFalse
. - time_travel_format
Optional[str]
: Format used for time travel, defaults to"HUDI"
. - partition_key
Optional[List[str]]
: A list of feature names to be used as partition key when writing the feature data to the offline storage, defaults to empty list[]
. - primary_key
Optional[List[str]]
: A list of feature names to be used as primary key for the feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list[]
, and the feature group won't have any primary key. - hudi_precombine_key
Optional[str]
: A feature name to be used as a precombine key for the"HUDI"
feature group. Defaults toNone
. If feature group has time travel format"HUDI"
and hudi precombine key was not specified then the first primary key of the feature group will be used as hudi precombine key. - features
Optional[List[hsfs.feature.Feature]]
: Optionally, define the schema of the feature group manually as a list ofFeature
objects. Defaults to empty list[]
and will use the schema information of the DataFrame provided in thesave
method. - statistics_config
Optional[Union[hsfs.StatisticsConfig, bool, dict]]
: A configuration object, or a dictionary with keys "enabled
" to generally enable descriptive statistics computation for this feature group,"correlations
" to turn on feature correlation computation,"histograms"
to compute feature value frequencies and"exact_uniqueness"
to compute uniqueness, distinctness and entropy. The values should be booleans indicating the setting. To fully turn off statistics computation passstatistics_config=False
. Defaults toNone
and will compute only descriptive statistics. - event_time
Optional[str]
: Optionally, provide the name of the feature containing the event time for the features in this feature group. If event_time is set the feature group can be used for point-in-time joins. Defaults toNone
. - stream
Optional[bool]
: Optionally, Define whether the feature group should support real time stream writing capabilities. Stream enabled Feature Groups have unified single API for writing streaming features transparently to both online and offline store. - expectation_suite
Optional[Union[hsfs.expectation_suite.ExpectationSuite, great_expectations.core.expectation_suite.ExpectationSuite]]
: Optionally, attach an expectation suite to the feature group which dataframes should be validated against upon insertion. Defaults toNone
.
Returns
FeatureGroup
. The feature group metadata object.
Retrieval#
get_feature_group#
FeatureStore.get_feature_group(name, version=None)
Get a feature group entity from the feature store.
Getting a feature group from the Feature Store means getting its metadata handle
so you can subsequently read the data into a Spark or Pandas DataFrame or use
the Query
-API to perform joins between feature groups.
Arguments
- name
str
: Name of the feature group to get. - version
Optional[int]
: Version of the feature group to retrieve, defaults toNone
and will return theversion=1
.
Returns
FeatureGroup
: The feature group metadata object.
Raises
RestAPIError
: If unable to retrieve feature group from the feature store.
Properties#
avro_schema#
Avro schema representation of the feature group.
created#
Timestamp when the feature group was created.
creator#
Username of the creator.
description#
Description of the feature group contents.
event_time#
Event time feature in the feature group.
expectation_suite#
Expectation Suite configuration object defining the settings for data validation of the feature group.
feature_store_id#
feature_store_name#
Name of the feature store in which the feature group is located.
features#
Schema information.
hudi_precombine_key#
Feature name that is the hudi precombine key.
id#
Feature group id.
location#
name#
Name of the feature group.
online_enabled#
Setting if the feature group is available in online storage.
partition_key#
List of features building the partition key.
primary_key#
List of features building the primary key.
statistics#
Get the latest computed statistics for the feature group.
statistics_config#
Statistics configuration object defining the settings for statistics computation of the feature group.
stream#
Whether to enable real time stream writing capabilities.
time_travel_format#
Setting of the feature group time travel format.
version#
Version number of the feature group.
Methods#
add_tag#
FeatureGroup.add_tag(name, value)
Attach a tag to a feature group.
A tag consists of a
Arguments
- name
str
: Name of the tag to be added. - value: Value of the tag to be added.
Raises
RestAPIError
in case the backend fails to add the tag.
append_features#
FeatureGroup.append_features(features)
Append features to the schema of the feature group.
Safe append
This method appends the features to the feature group description safely. In case of failure your local metadata object will contain the correct schema.
It is only possible to append features to a feature group. Removing features is considered a breaking change.
Arguments
- features
Union[hsfs.feature.Feature, List[hsfs.feature.Feature]]
: Feature or list. A feature object or list thereof to append to the schema of the feature group.
Returns
FeatureGroup
. The updated feature group object.
as_of#
FeatureGroup.as_of(wallclock_time)
Get Query object to retrieve all features of the group at a point in the past.
This method selects all features in the feature group and returns a Query object at the specified point in time. This can then either be read into a Dataframe or used further to perform joins or construct a training dataset.
Arguments
- wallclock_time: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be formatted in one of the
following formats
%Y%m%d
,%Y%m%d%H
,%Y%m%d%H%M
, or%Y%m%d%H%M%S
.
Returns
Query
. The query object with the applied time travel condition.
commit_delete_record#
FeatureGroup.commit_delete_record(delete_df, write_options={})
Drops records present in the provided DataFrame and commits it as update to this Feature group. This method can only be used on time travel enabled feature groups
Arguments
- delete_df
pyspark.sql.DataFrame
: dataFrame containing records to be deleted. - write_options
Optional[Dict[Any, Any]]
: User provided write options. Defaults to{}
.
Raises
RestAPIError
.
commit_details#
FeatureGroup.commit_details(wallclock_time=None, limit=None)
Retrieves commit timeline for this feature group. This method can only be used on time travel enabled feature groups
Arguments
- wallclock_time
Optional[str]
: Commit details as of specific point in time. Defaults toNone
. - limit
Optional[int]
: Number of commits to retrieve. Defaults toNone
. datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be formatted in one of the following formats%Y%m%d
,%Y%m%d%H
,%Y%m%d%H%M
,%Y%m%d%H%M%S
, or%Y%m%d%H%M%S%f
.
Returns
Dict[str, Dict[str, str]]
. Dictionary object of commit metadata timeline, where Key is commit id and value
is Dict[str, str]
with key value pairs of date committed on, number of rows updated, inserted and deleted.
Raises
RestAPIError
.
FeatureStoreException
. If the feature group does not have HUDI
time travel format
compute_statistics#
FeatureGroup.compute_statistics(wallclock_time=None)
Recompute the statistics for the feature group and save them to the feature store.
Statistics are only computed for data in the offline storage of the feature group.
Arguments
- wallclock_time
Optional[Union[str, int, datetime.datetime, datetime.date]]
: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be formatted in one of the following formats%Y%m%d
,%Y%m%d%H
,%Y%m%d%H%M
,%Y%m%d%H%M%S
, or%Y%m%d%H%M%S%f
. If specified will recompute statistics on feature group as of specific point in time. If not specified then will compute statistics as of most recent time of this feature group. Defaults toNone
.
Returns
Statistics
. The statistics metadata object.
Raises
RestAPIError
. Unable to persist the statistics.
delete#
FeatureGroup.delete()
Drop the entire feature group along with its feature data.
Potentially dangerous operation
This operation drops all metadata associated with this version of the feature group and all the feature data in offline and online storage associated with it.
Raises
RestAPIError
.
delete_expectation_suite#
FeatureGroup.delete_expectation_suite()
Delete the expectation suite attached to the featuregroup.
Raises
RestAPIException
.
delete_tag#
FeatureGroup.delete_tag(name)
Delete a tag attached to a feature group.
Arguments
- name
str
: Name of the tag to be removed.
Raises
RestAPIError
in case the backend fails to delete the tag.
filter#
FeatureGroup.filter(f)
Apply filter to the feature group.
Selects all features and returns the resulting Query
with the applied filter.
from hsfs.feature import Feature
fg.filter(Feature("weekly_sales") > 1000)
If you are planning to join the filtered feature group later on with another feature group, make sure to select the filtered feature explicitly from the respective feature group:
fg.filter(fg.feature1 == 1).show(10)
Composite filters require parenthesis:
fg.filter((fg.feature1 == 1) | (fg.feature2 >= 2))
Arguments
- f
Union[hsfs.constructor.filter.Filter, hsfs.constructor.filter.Logic]
: Filter object.
Returns
Query
. The query object with the applied filter.
from_response_json#
FeatureGroup.from_response_json(json_dict)
get_all_validation_reports#
FeatureGroup.get_all_validation_reports(ge_type=True)
Return the latest validation report attached to the feature group if it exists.
Arguments
- ge_type
bool
: IfTrue
returns a native Great Expectation type, Hopsworks custom type otherwise. Conversion can be performed via theto_ge_type()
method on hopsworks type. Defaults toTrue
.
Returns
ValidationReport
. The latest validation report attached to the feature group.
Raises
RestAPIException
.
get_complex_features#
FeatureGroup.get_complex_features()
Returns the names of all features with a complex data type in this feature group.
get_expectation_suite#
FeatureGroup.get_expectation_suite(ge_type=True)
Return the expectation suite attached to the feature group if it exists.
Arguments
- ge_type
bool
: IfTrue
returns a native Great Expectation type, Hopsworks custom type otherwise. Conversion can be performed via theto_ge_type()
method on hopsworks type. Defaults toTrue
.
Returns
ExpectationSuite
. The expectation suite attached to the feature group.
Raises
RestAPIException
.
get_feature#
FeatureGroup.get_feature(name)
Retrieve a Feature
object from the schema of the feature group.
There are several ways to access features of a feature group:
fg.feature1
fg["feature1"]
fg.get_feature("feature1")
Note
Attribute access to features works only for non-reserved names. For example
features named id
or name
will not be accessible via fg.name
, instead
this will return the name of the feature group itself. Fall back on using
the get_feature
method.
Args: name (str): [description]
Returns: [type]: [description]
get_latest_validation_report#
FeatureGroup.get_latest_validation_report(ge_type=True)
Return the latest validation report attached to the feature group if it exists.
Arguments
- ge_type
bool
: IfTrue
returns a native Great Expectation type, Hopsworks custom type otherwise. Conversion can be performed via theto_ge_type()
method on hopsworks type. Defaults toTrue
.
Returns
ValidationReport
. The latest validation report attached to the feature group.
Raises
RestAPIException
.
get_statistics#
FeatureGroup.get_statistics(commit_time=None)
Returns the statistics for this feature group at a specific time.
If commit_time
is None
, the most recent statistics are returned.
Arguments
- commit_time
Optional[Union[str, int, datetime.datetime, datetime.date]]
: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be formatted in one of the following formats%Y%m%d
,%Y%m%d%H
,%Y%m%d%H%M
,%Y%m%d%H%M%S
, or%Y%m%d%H%M%S%f
. Defaults toNone
. Defaults toNone
.
Returns
Statistics
. Statistics object.
Raises
RestAPIError
.
get_tag#
FeatureGroup.get_tag(name)
Get the tags of a feature group.
Arguments
- name
str
: Name of the tag to get.
Returns
tag value
Raises
RestAPIError
in case the backend fails to retrieve the tag.
get_tags#
FeatureGroup.get_tags()
Retrieves all tags attached to a feature group.
Returns
Dict[str, obj]
of tags.
Raises
RestAPIError
in case the backend fails to retrieve the tags.
insert#
FeatureGroup.insert(
features,
overwrite=False,
operation="upsert",
storage=None,
write_options={},
validation_options={},
)
Persist the metadata and materialize the feature group to the feature store or insert data from a dataframe into the existing feature group.
Incrementally insert data to a feature group or overwrite all data contained in the feature group. By
default, the data is inserted into the offline storag as well as the online storage if the feature group is
online_enabled=True
. To insert only into the online storage, set storage="online"
, or oppositely
storage="offline"
.
The features
dataframe can be a Spark DataFrame or RDD, a Pandas DataFrame,
or a two-dimensional Numpy array or a two-dimensional Python nested list.
If statistics are enabled, statistics are recomputed for the entire feature
group.
If feature group's time travel format is HUDI
then operation
argument can be
either insert
or upsert
.
If feature group doesn't exists the insert method will create the necessary metadata the first time it is
invoked and writes the specified features
dataframe as feature group to the online/offline feature store.
Upsert new feature data with time travel format HUDI
:
fs = conn.get_feature_store();
fg = fs.get_feature_group("example_feature_group", 1)
upsert_df = ...
fg.insert(upsert_df)
Arguments
- features
Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list]]
: DataFrame, RDD, Ndarray, list. Features to be saved. - overwrite
Optional[bool]
: Drop all data in the feature group before inserting new data. This does not affect metadata, defaults to False. - operation
Optional[str]
: Apache Hudi operation type"insert"
or"upsert"
. Defaults to"upsert"
. - storage
Optional[str]
: Overwrite default behaviour, write to offline storage only with"offline"
or online only with"online"
, defaults toNone
. - write_options
Optional[Dict[Any, Any]]
: Additional write options as key-value pairs, defaults to{}
. When using thepython
engine, write_options can contain the following entries:- key
spark
and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to write data into the feature group. - key
wait_for_job
and valueTrue
orFalse
to configure whether or not to the insert call should return only after the Hopsworks Job has finished. By default it waits. - key
start_offline_backfill
and valueTrue
orFalse
to configure whether or not to start the backfill job to write data to the offline storage. By default the backfill job gets started immediately. - key
internal_kafka
and valueTrue
orFalse
in case you established connectivity from you Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults toFalse
and will use external listeners when connecting from outside of Hopsworks.
- key
- validation_options
Optional[Dict[Any, Any]]
: Additional validation options as key-value pairs, defaults to{}
.- key
run_validation
boolean value, set toFalse
to skip validation temporarily on ingestion. - key
save_report
boolean value, set toFalse
to skip upload of the validation report to Hopsworks. - key
ge_validate_kwargs
a dictionary containing kwargs for the validate method of Great Expectations.
- key
Returns
FeatureGroup
. Updated feature group metadata object.
insert_stream#
FeatureGroup.insert_stream(
features,
query_name=None,
output_mode="append",
await_termination=False,
timeout=None,
checkpoint_dir=None,
write_options={},
)
Ingest a Spark Structured Streaming Dataframe to the online feature store.
This method creates a long running Spark Streaming Query, you can control the termination of the query through the arguments.
It is possible to stop the returned query with the .stop()
and check its
status with .isActive
.
To get a list of all active queries, use:
sqm = spark.streams
# get the list of active streaming queries
[q.name for q in sqm.active]
Engine Support
Spark only
Stream ingestion using Pandas/Python as engine is currently not supported. Python/Pandas has no notion of streaming.
Data Validation Support
insert_stream
does not perform any data validation using Great Expectations
even when a expectation suite is attached.
Arguments
- features
pyspark.sql.DataFrame
: Features in Streaming Dataframe to be saved. - query_name
Optional[str]
: It is possible to optionally specify a name for the query to make it easier to recognise in the Spark UI. Defaults toNone
. - output_mode
Optional[str]
: Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. (1)"append"
: Only the new rows in the streaming DataFrame/Dataset will be written to the sink. (2)"complete"
: All the rows in the streaming DataFrame/Dataset will be written to the sink every time there is some update. (3)"update"
: only the rows that were updated in the streaming DataFrame/Dataset will be written to the sink every time there are some updates. If the query doesn’t contain aggregations, it will be equivalent to append mode. Defaults to"append"
. - await_termination
Optional[bool]
: Waits for the termination of this query, either by query.stop() or by an exception. If the query has terminated with an exception, then the exception will be thrown. If timeout is set, it returns whether the query has terminated or not within the timeout seconds. Defaults toFalse
. - timeout
Optional[int]
: Only relevant in combination withawait_termination=True
. Defaults toNone
. - checkpoint_dir
Optional[str]
: Checkpoint directory location. This will be used to as a reference to from where to resume the streaming job. IfNone
then hsfs will construct as "insert_stream_" + online_topic_name. Defaults toNone
. write_options: Additional write options for Spark as key-value pairs. Defaults to{}
.
Returns
StreamingQuery
: Spark Structured Streaming Query object.
json#
FeatureGroup.json()
read#
FeatureGroup.read(wallclock_time=None, online=False, dataframe_type="default", read_options={})
Read the feature group into a dataframe.
Reads the feature group by default from the offline storage as Spark DataFrame on Hopsworks and Databricks, and as Pandas dataframe on AWS Sagemaker and pure Python environments.
Set online
to True
to read from the online storage, or change
dataframe_type
to read as a different format.
Read feature group as of latest state:
fs = connection.get_feature_store();
fg = fs.get_feature_group("example_feature_group", 1)
fg.read()
Read feature group as of specific point in time:
fs = connection.get_feature_store();
fg = fs.get_feature_group("example_feature_group", 1)
fg.read("2020-10-20 07:34:11")
Arguments
- wallclock_time
Optional[Union[str, int, datetime.datetime, datetime.date]]
: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be formatted in one of the following formats%Y%m%d
,%Y%m%d%H
,%Y%m%d%H%M
,%Y%m%d%H%M%S
, or%Y%m%d%H%M%S%f
. If Specified will retrieve feature group as of specific point in time. If not specified will return as of most recent time. Defaults toNone
. - online
Optional[bool]
: bool, optional. IfTrue
read from online feature store, defaults toFalse
. - dataframe_type
Optional[str]
: str, optional. Possible values are"default"
,"spark"
,"pandas"
,"numpy"
or"python"
, defaults to"default"
. - read_options
Optional[dict]
: Additional read options as key/value pairs, defaults to{}
.
Returns
DataFrame
: The spark dataframe containing the feature data.
pyspark.DataFrame
. A Spark DataFrame.
pandas.DataFrame
. A Pandas DataFrame.
numpy.ndarray
. A two-dimensional Numpy array.
list
. A two-dimensional Python list.
Raises
RestAPIError
. No data is available for feature group with this commit date, If time travel enabled.
read_changes#
FeatureGroup.read_changes(start_wallclock_time, end_wallclock_time, read_options={})
Reads updates of this feature that occurred between specified points in time.
This function only works on feature groups with HUDI
time travel format.
Reading commits incrementally between specified points in time:
fs = connection.get_feature_store();
fg = fs.get_feature_group("example_feature_group", 1)
fg.read_changes("2020-10-20 07:31:38", "2020-10-20 07:34:11").show()
Arguments
- start_wallclock_time
Union[str, int, datetime.datetime, datetime.date]
: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be formatted in one of the following formats%Y%m%d
,%Y%m%d%H
,%Y%m%d%H%M
,%Y%m%d%H%M%S
, or%Y%m%d%H%M%S%f
. - end_wallclock_time
Union[str, int, datetime.datetime, datetime.date]
: datatime.datetime, datetime.date, unix timestamp in seconds (int), or string. The String should be formatted in one of the following formats%Y%m%d
,%Y%m%d%H
,%Y%m%d%H%M
,%Y%m%d%H%M%S
, or%Y%m%d%H%M%S%f
. - read_options
Optional[dict]
: User provided read options. Defaults to{}
.
Returns
DataFrame
. The spark dataframe containing the incremental changes of
feature data.
Raises
RestAPIError
. No data is available for feature group with this commit date.
FeatureStoreException
. If the feature group does not have HUDI
time travel format
save#
FeatureGroup.save(features, write_options={}, validation_options={})
Persist the metadata and materialize the feature group to the feature store.
Deprecated
savemethod is deprecated. Use the
insert` method instead.
Calling save
creates the metadata for the feature group in the feature store
and writes the specified features
dataframe as feature group to the
online/offline feature store as specified.
By default, this writes the feature group to the offline storage, and if
online_enabled
for the feature group, also to the online feature store.
The features
dataframe can be a Spark DataFrame or RDD, a Pandas DataFrame,
or a two-dimensional Numpy array or a two-dimensional Python nested list.
Arguments
- features
Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list]]
: Query, DataFrame, RDD, Ndarray, list. Features to be saved. - write_options
Optional[Dict[Any, Any]]
: Additional write options as key-value pairs, defaults to{}
. When using thepython
engine, write_options can contain the following entries:- key
spark
and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to write data into the feature group. - key
wait_for_job
and valueTrue
orFalse
to configure whether or not to the save call should return only after the Hopsworks Job has finished. By default it waits. - key
start_offline_backfill
and valueTrue
orFalse
to configure whether or not to start the backfill job to write data to the offline storage. By default the backfill job gets started immediately. - key
internal_kafka
and valueTrue
orFalse
in case you established connectivity from you Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults toFalse
and will use external listeners when connecting from outside of Hopsworks.
- key
- validation_options
Optional[Dict[Any, Any]]
: Additional validation options as key-value pairs, defaults to{}
.- key
run_validation
boolean value, set toFalse
to skip validation temporarily on ingestion. - key
save_report
boolean value, set toFalse
to skip upload of the validation report to Hopsworks. - key
ge_validate_kwargs
a dictionary containing kwargs for the validate method of Great Expectations.
- key
Returns
Job
: When using the python
engine, it returns the Hopsworks Job
that was launched to ingest the feature group data.
Raises
RestAPIError
. Unable to create feature group.
save_expectation_suite#
FeatureGroup.save_expectation_suite(
expectation_suite, run_validation=True, validation_ingestion_policy="ALWAYS"
)
Attach an expectation suite to a feature group and saves it for future use. If an expectation suite is already attached, it is replaced. Note that the provided expectation suite is modified inplace to include expectationId fields.
Arguments
- expectation_suite
Union[hsfs.expectation_suite.ExpectationSuite, great_expectations.core.expectation_suite.ExpectationSuite]
: The expectation suite to attach to the featuregroup. - run_validation: Set whether the expectation_suite will run on ingestion
- validation_ingestion_policy: Set the policy for ingestion to the featuregroup.
- "STRICT" only allows DataFrame passing validation to be inserted into featuregroup.
- "ALWAYS" always insert the DataFrame to the featuregroup, irrespective of overall validation result.
Raises
RestAPIException
.
save_validation_report#
FeatureGroup.save_validation_report(validation_report, ge_type=True)
Save validation report to hopsworks platform along previous reports of the same featuregroup.
Arguments
- validation_report
Union[dict, hsfs.validation_report.ValidationReport, great_expectations.core.expectation_validation_result.ExpectationSuiteValidationResult]
: The validation report to attach to the featuregroup. - ge_type
bool
: IfTrue
returns a native Great Expectation type, Hopsworks custom type otherwise. Conversion can be performed via theto_ge_type()
method on hopsworks type. Defaults toTrue
.
Raises
RestAPIException
.
select#
FeatureGroup.select(features=[])
Select a subset of features of the feature group and return a query object.
The query can be used to construct joins of feature groups or create a training dataset with a subset of features of the feature group.
Arguments
- features
List[Union[str, hsfs.feature.Feature]]
: list, optional. A list ofFeature
objects or feature names as strings to be selected, defaults to [].
Returns
Query
: A query object with the selected features of the feature group.
select_all#
FeatureGroup.select_all()
Select all features in the feature group and return a query object.
The query can be used to construct joins of feature groups or create a training dataset immediately.
Returns
Query
. A query object with all features of the feature group.
select_except#
FeatureGroup.select_except(features=[])
Select all features of the feature group except a few and return a query object.
The query can be used to construct joins of feature groups or create a training dataset with a subset of features of the feature group.
Arguments
- features
List[Union[str, hsfs.feature.Feature]]
: list, optional. A list ofFeature
objects or feature names as strings to be excluded from the selection. Defaults to [], selecting all features.
Returns
Query
: A query object with the selected features of the feature group.
show#
FeatureGroup.show(n, online=False)
Show the first n
rows of the feature group.
Arguments
- n
int
: int. Number of rows to show. - online
Optional[bool]
: bool, optional. IfTrue
read from online feature store, defaults toFalse
.
to_dict#
FeatureGroup.to_dict()
update_description#
FeatureGroup.update_description(description)
Update the description of the feature group.
Safe update
This method updates the feature group description safely. In case of failure your local metadata object will keep the old description.
Arguments
- description
str
: New description string.
Returns
FeatureGroup
. The updated feature group object.
update_feature_description#
FeatureGroup.update_feature_description(feature_name, description)
Update the description of a single feature in this feature group.
Safe update
This method updates the feature description safely. In case of failure your local metadata object will keep the old description.
Arguments
- feature_name
str
: Name of the feature to be updated. - description
str
: New description string.
Returns
FeatureGroup
. The updated feature group object.
update_features#
FeatureGroup.update_features(features)
Update metadata of features in this feature group.
Currently it's only supported to update the description of a feature.
Unsafe update
Note that if you use an existing Feature
object of the schema in the
feature group metadata object, this might leave your metadata object in a
corrupted state if the update fails.
Arguments
- features
Union[hsfs.feature.Feature, List[hsfs.feature.Feature]]
:Feature
or list of features. A feature object or list thereof to be updated.
Returns
FeatureGroup
. The updated feature group object.
update_from_response_json#
FeatureGroup.update_from_response_json(json_dict)
update_statistics_config#
FeatureGroup.update_statistics_config()
Update the statistics configuration of the feature group.
Change the statistics_config
object and persist the changes by calling
this method.
Returns
FeatureGroup
. The updated metadata object of the feature group.
Raises
RestAPIError
.
validate#
FeatureGroup.validate(dataframe=None, save_report=False, validation_options={})
Run validation based on the attached expectations.
Runs any expectation attached with Deequ. But also runs attached Great Expectation Suites.
Arguments
- dataframe
Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame]]
: The PySpark dataframe to run the data validation expectations against. - expectation_suite: Optionally provide an Expectation Suite to override the
one that is possibly attached to the feature group. This is useful for
testing new Expectation suites. When an extra suite is provided, the results
will never be persisted. Defaults to
None
. - validation_options
Optional[Dict[Any, Any]]
: Additional validation options as key-value pairs, defaults to{}
.- key
run_validation
boolean value, set toFalse
to skip validation temporarily on ingestion. - key
save_report
boolean value, set toFalse
to skip upload of the validation report to Hopsworks. - key
ge_validate_kwargs
a dictionary containing kwargs for the validate method of Great Expectations.
- key
Returns
FeatureGroupValidation
, ValidationReport
. The feature group validation metadata object,
as well as the Validation Report produced by Great Expectations.