ExternalFeatureGroup#

[source]

ExternalFeatureGroup#

hsfs.feature_group.ExternalFeatureGroup(
    storage_connector,
    query=None,
    data_format=None,
    path=None,
    options={},
    name=None,
    version=None,
    description=None,
    primary_key=None,
    featurestore_id=None,
    featurestore_name=None,
    created=None,
    creator=None,
    id=None,
    features=None,
    location=None,
    statistics_config=None,
    event_time=None,
    expectation_suite=None,
    online_enabled=False,
    href=None,
    online_topic_name=None,
    topic_name=None,
    notification_topic_name=None,
    spine=False,
    deprecated=False,
    **kwargs
)

Creation#

[source]

create_external_feature_group#

FeatureStore.create_external_feature_group(
    name,
    storage_connector,
    query=None,
    data_format=None,
    path="",
    options={},
    version=None,
    description="",
    primary_key=[],
    features=[],
    statistics_config=None,
    event_time=None,
    expectation_suite=None,
    online_enabled=False,
    topic_name=None,
    notification_topic_name=None,
)

Create an external feature group metadata object.

Example

# connect to the Feature Store
fs = ...

external_fg = fs.create_external_feature_group(
                    name="sales",
                    version=1,
                    description="Physical shop sales features",
                    query=query,
                    storage_connector=connector,
                    primary_key=['ss_store_sk'],
                    event_time='sale_date'
                    )

Lazy

This method is lazy and does not persist any metadata in the feature store on its own. To persist the feature group metadata in the feature store, call the save() method.

You can enable online storage for external feature groups. However, the sync from the external storage to the Hopsworks online storage needs to be done manually:

external_fg = fs.create_external_feature_group(
            name="sales",
            version=1,
            description="Physical shop sales features",
            query=query,
            storage_connector=connector,
            primary_key=['ss_store_sk'],
            event_time='sale_date',
            online_enabled=True
            )
external_fg.save()

# filter the data in external storage and read the rows to sync to online
df = external_fg.filter(external_fg.customer_status == "active").read()

# insert to online storage
external_fg.insert(df)

Arguments

  • name str: Name of the external feature group to create.
  • storage_connector hsfs.StorageConnector: The storage connector to use to establish connectivity with the data source.
  • query Optional[str]: A string containing a SQL query valid for the target data source. The query will be used to pull data from the data source when the feature group is used.
  • data_format Optional[str]: If the external feature group refers to a directory with data, the data format to use when reading it.
  • path Optional[str]: The location, within the scope of the storage connector, from which to read the data for the external feature group.
  • options Optional[Dict[str, str]]: Additional options to be used by the engine when reading data from the specified storage connector. For example, {"header": True} when reading CSV files with column names in the first row.
  • version Optional[int]: Version of the external feature group to create, defaults to None and will create the feature group with a version incremented from the last version in the feature store.
  • description Optional[str]: A string describing the contents of the external feature group to improve discoverability for Data Scientists, defaults to empty string "".
  • primary_key Optional[List[str]]: A list of feature names to be used as primary key for the feature group. This primary key can be a composite key of multiple features and will be used as joining key, if not specified otherwise. Defaults to empty list [], and the feature group won't have any primary key.
  • features Optional[List[hsfs.feature.Feature]]: Optionally, define the schema of the external feature group manually as a list of Feature objects. Defaults to empty list [] and will use the schema information of the DataFrame resulting from executing the provided query against the data source.
  • statistics_config Optional[Union[hsfs.StatisticsConfig, bool, dict]]: A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this external feature group, "correlations" to turn on feature correlation computation, "histograms" to compute feature value frequencies and "exact_uniqueness" to compute uniqueness, distinctness and entropy. The values should be booleans indicating the setting. To fully turn off statistics computation pass statistics_config=False. Defaults to None and will compute only descriptive statistics.
  • event_time Optional[str]: Optionally, provide the name of the feature containing the event time for the features in this feature group. If event_time is set the feature group can be used for point-in-time joins. Defaults to None.

    Event time data type restriction

    The supported data types for the event time column are: timestamp, date and bigint.

  • expectation_suite: Optionally, attach an expectation suite to the feature group which dataframes should be validated against upon insertion. Defaults to None.

  • online_enabled Optional[bool]: Define whether it should be possible to sync the feature group to the online feature store for low latency access, defaults to False.
  • topic_name Optional[str]: Optionally, define the name of the topic used for data ingestion. If left undefined it defaults to using project topic.
  • notification_topic_name Optional[str]: Optionally, define the name of the topic used for sending notifications when entries are inserted or updated on the online feature store. If left undefined no notifications are sent.
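
The dictionary form of statistics_config and the options argument can be sketched as plain Python values. The keys are the ones listed in the arguments above; the chosen values, the variable names and the commented call shape (including the "csv" data format and the "sales/" path) are illustrative assumptions, and nothing here contacts a feature store.

```python
# Illustrative values only; the keys come from the argument descriptions above.
statistics_config = {
    "enabled": True,            # compute descriptive statistics
    "correlations": False,      # skip feature correlation computation
    "histograms": True,         # compute feature value frequencies
    "exact_uniqueness": False,  # skip uniqueness, distinctness and entropy
}

# Reader options for CSV files whose first row holds the column names.
options = {"header": True}

# Every statistics setting is expected to be a boolean.
assert all(isinstance(value, bool) for value in statistics_config.values())

# Hypothetical call shape, assuming `fs` and `connector` already exist:
# fs.create_external_feature_group(
#     name="sales",
#     storage_connector=connector,
#     data_format="csv",
#     path="sales/",
#     options=options,
#     statistics_config=statistics_config,
# )
```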

Returns

ExternalFeatureGroup. The external feature group metadata object.


Retrieval#

[source]

get_external_feature_group#

FeatureStore.get_external_feature_group(name, version=None)

Get an external feature group entity from the feature store.

Getting an external feature group from the Feature Store means getting its metadata handle so you can subsequently read the data into a Spark or Pandas DataFrame or use the Query-API to perform joins between feature groups.

Example

# connect to the Feature Store
fs = ...

external_fg = fs.get_external_feature_group("external_fg_test")

Arguments

  • name str: Name of the external feature group to get.
  • version int: Version of the external feature group to retrieve, defaults to None, in which case version 1 is returned.

Returns

ExternalFeatureGroup: The external feature group metadata object.

Raises

  • hsfs.client.exceptions.RestAPIError: If unable to retrieve feature group from the feature store.

Properties#

[source]

avro_schema#

Avro schema representation of the feature group.


[source]

created#


[source]

creator#


[source]

data_format#


[source]

deprecated#

Setting if the feature group is deprecated.


[source]

description#


[source]

embedding_index#


[source]

event_time#

Event time feature in the feature group.


[source]

expectation_suite#

Expectation Suite configuration object defining the settings for data validation of the feature group.


[source]

feature_store#


[source]

feature_store_id#


[source]

feature_store_name#

Name of the feature store in which the feature group is located.


[source]

features#

Feature Group schema (alias)


[source]

id#


[source]

location#


[source]

name#

Name of the feature group.


[source]

notification_topic_name#

The topic used for feature group notifications.


[source]

online_enabled#

Setting if the feature group is available in online storage.


[source]

options#


[source]

path#


[source]

primary_key#

List of features building the primary key.


[source]

query#


[source]

schema#

Feature Group schema


[source]

statistics#

Get the latest computed statistics for the whole feature group.

Raises

hsfs.client.exceptions.FeatureStoreException.


[source]

statistics_config#

Statistics configuration object defining the settings for statistics computation of the feature group.

Raises

hsfs.client.exceptions.FeatureStoreException.


[source]

storage_connector#


[source]

subject#

Subject of the feature group.


[source]

topic_name#

The topic used for feature group data ingestion.


[source]

version#

Version number of the feature group.


Methods#

[source]

add_tag#

ExternalFeatureGroup.add_tag(name, value)

Attach a tag to a feature group.

A tag consists of a name/value pair. Tag names are unique identifiers across the whole cluster. The value of a tag can be any valid JSON: primitives, arrays or JSON objects.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.add_tag(name="example_tag", value="42")

Arguments

  • name str: Name of the tag to be added.
  • value: Value of the tag to be added.

Raises

hsfs.client.exceptions.RestAPIError in case the backend fails to add the tag.


[source]

append_features#

ExternalFeatureGroup.append_features(features)

Append features to the schema of the feature group.

Example

# connect to the Feature Store
fs = ...

# define features to be inserted in the feature group
features = [
    Feature(name="id",type="int",online_type="int"),
    Feature(name="name",type="string",online_type="varchar(20)")
]

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.append_features(features)

Safe append

This method appends the features to the feature group description safely. In case of failure your local metadata object will contain the correct schema.

It is only possible to append features to a feature group. Removing features is considered a breaking change.

Arguments

  • features Union[hsfs.feature.Feature, List[hsfs.feature.Feature]]: Feature or list. A feature object or list thereof to append to the schema of the feature group.

Returns

FeatureGroup. The updated feature group object.


[source]

check_deprecated#

ExternalFeatureGroup.check_deprecated()

[source]

compute_statistics#

ExternalFeatureGroup.compute_statistics()

Recompute the statistics for the feature group and save them to the feature store. Statistics are only computed for data in the offline storage of the feature group.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

statistics_metadata = fg.compute_statistics()

Returns

Statistics. The statistics metadata object.

Raises

  • hsfs.client.exceptions.RestAPIError: Unable to persist the statistics.
  • hsfs.client.exceptions.FeatureStoreException.


[source]

create_feature_monitoring#

ExternalFeatureGroup.create_feature_monitoring(
    name,
    feature_name,
    description=None,
    start_date_time=None,
    end_date_time=None,
    cron_expression="0 0 12 ? * * *",
)

Enable feature monitoring to compare statistics on snapshots of feature data over time.

Experimental

Public API is subject to change, this feature is not suitable for production use-cases.

Example

# fetch feature group
fg = fs.get_feature_group(name="my_feature_group", version=1)
# enable feature monitoring
my_config = fg.create_feature_monitoring(
    name="my_monitoring_config",
    feature_name="my_feature",
    description="my monitoring config description",
    cron_expression="0 0 12 ? * * *",
).with_detection_window(
    # Data inserted in the last day
    time_offset="1d",
    window_length="1d",
).with_reference_window(
    # Data inserted last week on the same day
    time_offset="1w1d",
    window_length="1d",
).compare_on(
    metric="mean",
    threshold=0.5,
).save()

Arguments

  • name str: Name of the feature monitoring configuration. name must be unique for all configurations attached to the feature group.
  • feature_name str: Name of the feature to monitor.
  • description Optional[str]: Description of the feature monitoring configuration.
  • start_date_time Optional[Union[str, int, datetime.date, datetime.datetime, pandas._libs.tslibs.timestamps.Timestamp]]: Start date and time from which to start computing statistics.
  • end_date_time Optional[Union[str, int, datetime.date, datetime.datetime, pandas._libs.tslibs.timestamps.Timestamp]]: End date and time at which to stop computing statistics.
  • cron_expression Optional[str]: Cron expression to use to schedule the job. The cron expression must be in UTC and follow the Quartz specification. Default is '0 0 12 ? * * *', every day at 12pm UTC.
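
The default expression in the signature, "0 0 12 ? * * *", has seven whitespace-separated fields. A minimal sketch that labels them, assuming the standard Quartz field order (seconds, minutes, hours, day of month, month, day of week, optional year). label_quartz_fields is a hypothetical helper for illustration and is not part of hsfs.

```python
# Standard Quartz field order; the seventh field (year) is optional.
QUARTZ_FIELDS = (
    "seconds",
    "minutes",
    "hours",
    "day_of_month",
    "month",
    "day_of_week",
    "year",
)


def label_quartz_fields(expression: str) -> dict:
    """Map each field of a Quartz cron expression to its name."""
    parts = expression.split()
    if len(parts) not in (6, 7):
        raise ValueError(f"expected 6 or 7 fields, got {len(parts)}")
    # zip stops at the shorter sequence, so "year" is omitted for 6 fields.
    return dict(zip(QUARTZ_FIELDS, parts))


fields = label_quartz_fields("0 0 12 ? * * *")
print(fields)
```

Read this way, the default fires at second 0, minute 0, hour 12, with no specific day of month ("?") and every month, day of week and year.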

Raises

hsfs.client.exceptions.FeatureStoreException.

Return

FeatureMonitoringConfig Configuration with minimal information about the feature monitoring. Additional information is required before feature monitoring is enabled.


[source]

create_statistics_monitoring#

ExternalFeatureGroup.create_statistics_monitoring(
    name,
    feature_name=None,
    description=None,
    start_date_time=None,
    end_date_time=None,
    cron_expression="0 0 12 ? * * *",
)

Run a job to compute statistics on snapshots of feature data on a schedule.

Experimental

Public API is subject to change, this feature is not suitable for production use-cases.

Example

# fetch feature group
fg = fs.get_feature_group(name="my_feature_group", version=1)
# enable statistics monitoring
my_config = fg.create_statistics_monitoring(
    name="my_config",
    start_date_time="2021-01-01 00:00:00",
    description="my description",
    cron_expression="0 0 12 ? * * *",
).with_detection_window(
    # Statistics computed on 10% of the last week of data
    time_offset="1w",
    row_percentage=0.1,
).save()

Arguments

  • name str: Name of the feature monitoring configuration. name must be unique for all configurations attached to the feature group.
  • feature_name Optional[str]: Name of the feature to monitor. If not specified, statistics will be computed for all features.
  • description Optional[str]: Description of the feature monitoring configuration.
  • start_date_time Optional[Union[str, int, datetime.date, datetime.datetime, pandas._libs.tslibs.timestamps.Timestamp]]: Start date and time from which to start computing statistics.
  • end_date_time Optional[Union[str, int, datetime.date, datetime.datetime, pandas._libs.tslibs.timestamps.Timestamp]]: End date and time at which to stop computing statistics.
  • cron_expression Optional[str]: Cron expression to use to schedule the job. The cron expression must be in UTC and follow the Quartz specification. Default is '0 0 12 ? * * *', every day at 12pm UTC.

Raises

hsfs.client.exceptions.FeatureStoreException.

Return

FeatureMonitoringConfig Configuration with minimal information about the feature monitoring. Additional information is required before feature monitoring is enabled.


[source]

delete#

ExternalFeatureGroup.delete()

Drop the entire feature group along with its feature data.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(
        name='bitcoin_price',
        version=1
        )

# delete the feature group
fg.delete()

Potentially dangerous operation

This operation drops all metadata associated with this version of the feature group and all the feature data in offline and online storage associated with it.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

delete_expectation_suite#

ExternalFeatureGroup.delete_expectation_suite()

Delete the expectation suite attached to the Feature Group.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.delete_expectation_suite()

Raises

hsfs.client.exceptions.RestAPIError.


[source]

delete_tag#

ExternalFeatureGroup.delete_tag(name)

Delete a tag attached to a feature group.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.delete_tag("example_tag")

Arguments

  • name str: Name of the tag to be removed.

Raises

hsfs.client.exceptions.RestAPIError in case the backend fails to delete the tag.


[source]

filter#

ExternalFeatureGroup.filter(f)

Apply filter to the feature group.

Selects all features and returns the resulting Query with the applied filter.

Example

from hsfs.feature import Feature

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.filter(Feature("weekly_sales") > 1000)

If you are planning to join the filtered feature group later on with another feature group, make sure to select the filtered feature explicitly from the respective feature group:

Example

fg.filter(fg.feature1 == 1).show(10)

Composite filters require parentheses:

Example

fg.filter((fg.feature1 == 1) | (fg.feature2 >= 2))
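
The parentheses matter because of Python operator precedence: | binds more tightly than comparison operators such as == and >=. A minimal sketch with plain integers standing in for features (no hsfs objects are involved, and the variable names are illustrative):

```python
feature1, feature2 = 1, 2

# Without parentheses Python parses this as the chained comparison
# feature1 == (1 | feature2) >= 2, which is 1 == 3 and 3 >= 2.
without_parens = feature1 == 1 | feature2 >= 2

# With parentheses each comparison is evaluated first, then combined.
with_parens = (feature1 == 1) | (feature2 >= 2)

print(without_parens, with_parens)
```

The same grouping applies to & when combining conditions.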

Arguments

  • f Union[hsfs.constructor.filter.Filter, hsfs.constructor.filter.Logic]: Filter object.

Returns

Query. The query object with the applied filter.


[source]

from_response_json#

ExternalFeatureGroup.from_response_json(json_dict)

[source]

get_all_statistics#

ExternalFeatureGroup.get_all_statistics(computation_time=None, feature_names=None)

Returns all the statistics metadata computed before a specific time for the current feature group.

If computation_time is None, all the statistics metadata are returned.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg_statistics = fg.get_all_statistics(computation_time=None)

Arguments

  • computation_time Optional[Union[str, int, float, datetime.datetime, datetime.date]]: Date and time when statistics were computed. Defaults to None. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.
  • feature_names Optional[List[str]]: List of feature names of which statistics are retrieved.

Returns

Statistics. Statistics object.

Raises

  • hsfs.client.exceptions.RestAPIError.
  • hsfs.client.exceptions.FeatureStoreException.


[source]

get_all_validation_reports#

ExternalFeatureGroup.get_all_validation_reports(ge_type=True)

Return all validation reports attached to the feature group, if any exist.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

val_reports = fg.get_all_validation_reports()

Arguments

  • ge_type bool: If True, returns a native Great Expectations type, otherwise a Hopsworks custom type. Conversion can be performed via the to_ge_type() method on the Hopsworks type. Defaults to True.

Returns

Union[List[ValidationReport], ValidationReport]. All validation reports attached to the feature group.

Raises

hsfs.client.exceptions.RestAPIError. hsfs.client.exceptions.FeatureStoreException.


[source]

get_complex_features#

ExternalFeatureGroup.get_complex_features()

Returns the names of all features with a complex data type in this feature group.

Example

complex_dtype_features = fg.get_complex_features()

[source]

get_expectation_suite#

ExternalFeatureGroup.get_expectation_suite(ge_type=True)

Return the expectation suite attached to the feature group if it exists.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

exp_suite = fg.get_expectation_suite()

Arguments

  • ge_type bool: If True, returns a native Great Expectations type, otherwise a Hopsworks custom type. Conversion can be performed via the to_ge_type() method on the Hopsworks type. Defaults to True.

Returns

ExpectationSuite. The expectation suite attached to the feature group.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

get_feature#

ExternalFeatureGroup.get_feature(name)

Retrieve a Feature object from the schema of the feature group.

There are several ways to access features of a feature group:

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

# get Feature instance
fg.feature1
fg["feature1"]
fg.get_feature("feature1")

Note

Attribute access to features works only for non-reserved names. For example, a feature named name is not accessible via fg.name, because that returns the name of the feature group itself; the same applies to a feature named id. In these cases, fall back on the get_feature method.
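
One way this kind of shadowing arises in Python is that __getattr__ is only consulted when normal attribute lookup fails. The sketch below uses a hypothetical stand-in class to show the effect; it is an assumption about the mechanism for illustration, not the hsfs implementation.

```python
class FeatureGroupSketch:
    """Hypothetical stand-in for a feature group, not the hsfs class."""

    def __init__(self, name, feature_names):
        self.name = name  # the group's own name, a regular attribute
        self._features = {f: f"Feature({f!r})" for f in feature_names}

    def get_feature(self, feature_name):
        # Explicit lookup always reaches the feature, whatever its name.
        return self._features[feature_name]

    def __getitem__(self, feature_name):
        return self.get_feature(feature_name)

    def __getattr__(self, attr):
        # Only called when normal attribute lookup has already failed.
        try:
            return self.__dict__["_features"][attr]
        except KeyError:
            raise AttributeError(attr) from None


fg = FeatureGroupSketch("sales", ["name", "weekly_sales"])
print(fg.weekly_sales)         # resolved through __getattr__
print(fg.name)                 # the group's name, not the feature
print(fg.get_feature("name"))  # explicit lookup reaches the feature
```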

Arguments

  • name str: The name of the feature to retrieve.

Returns

Feature. The feature object.

Raises

hsfs.client.exceptions.FeatureStoreException.


[source]

get_feature_monitoring_configs#

ExternalFeatureGroup.get_feature_monitoring_configs(
    name=None, feature_name=None, config_id=None
)

Fetch all feature monitoring configs attached to the feature group, or fetch by name or feature name only. If no arguments are provided, the method returns all feature monitoring configs attached to the feature group, meaning all feature monitoring configs that are attached to a feature in the feature group. If you wish to fetch a single config, provide its name. If you wish to fetch all configs attached to a particular feature, provide the feature name.

Example

# fetch your feature group
fg = fs.get_feature_group(name="my_feature_group", version=1)
# fetch all feature monitoring configs attached to the feature group
fm_configs = fg.get_feature_monitoring_configs()
# fetch a single feature monitoring config by name
fm_config = fg.get_feature_monitoring_configs(name="my_config")
# fetch all feature monitoring configs attached to a particular feature
fm_configs = fg.get_feature_monitoring_configs(feature_name="my_feature")
# fetch a single feature monitoring config with a given id
fm_config = fg.get_feature_monitoring_configs(config_id=1)

Arguments

  • name Optional[str]: If provided fetch only the feature monitoring config with the given name. Defaults to None.
  • feature_name Optional[str]: If provided, fetch only configs attached to a particular feature. Defaults to None.
  • config_id Optional[int]: If provided, fetch only the feature monitoring config with the given id. Defaults to None.

Raises

  • hsfs.client.exceptions.RestAPIError.
  • hsfs.client.exceptions.FeatureStoreException.
  • ValueError: if both name and feature_name are provided.
  • TypeError: if name or feature_name are not string or None.

Return

Union[FeatureMonitoringConfig, List[FeatureMonitoringConfig], None] A list of feature monitoring configs. If name provided, returns either a single config or None if not found.


[source]

get_feature_monitoring_history#

ExternalFeatureGroup.get_feature_monitoring_history(
    config_name=None, config_id=None, start_time=None, end_time=None, with_statistics=True
)

Fetch feature monitoring history for a given feature monitoring config.

Example

from datetime import datetime, timedelta

# fetch your feature group
fg = fs.get_feature_group(name="my_feature_group", version=1)
# fetch feature monitoring history for a given feature monitoring config
fm_history = fg.get_feature_monitoring_history(
    config_name="my_config",
    start_time="2020-01-01",
)
# fetch feature monitoring history for a given feature monitoring config id
fm_history = fg.get_feature_monitoring_history(
    config_id=1,
    start_time=datetime.now() - timedelta(weeks=2),
    end_time=datetime.now() - timedelta(weeks=1),
    with_statistics=False,
)

Arguments

  • config_name Optional[str]: The name of the feature monitoring config to fetch history for. Defaults to None.
  • config_id Optional[int]: The id of the feature monitoring config to fetch history for. Defaults to None.
  • start_time Optional[Union[str, int, datetime.datetime, datetime.date]]: The start date of the feature monitoring history to fetch. Defaults to None.
  • end_time Optional[Union[str, int, datetime.datetime, datetime.date]]: The end date of the feature monitoring history to fetch. Defaults to None.
  • with_statistics Optional[bool]: Whether to include statistics in the feature monitoring history. Defaults to True. If False, only metadata about the monitoring will be fetched.

Raises

  • hsfs.client.exceptions.RestAPIError.
  • hsfs.client.exceptions.FeatureStoreException.
  • ValueError: if both config_name and config_id are provided.
  • TypeError: if config_name or config_id are not respectively string, int or None.

Return

List[FeatureMonitoringResult] A list of feature monitoring results containing the monitoring metadata as well as the computed statistics for the detection and reference window if requested.


[source]

get_fg_name#

ExternalFeatureGroup.get_fg_name()

[source]

get_generated_feature_groups#

ExternalFeatureGroup.get_generated_feature_groups()

Get the generated feature groups using this feature group, based on explicit provenance. These feature groups can be accessible or inaccessible. Explicit provenance does not track deleted generated feature group links, so deleted will always be empty. For inaccessible feature groups, only minimal information is returned.

Returns

ProvenanceLinks: Object containing the section of provenance graph requested.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

get_generated_feature_views#

ExternalFeatureGroup.get_generated_feature_views()

Get the generated feature views using this feature group, based on explicit provenance. These feature views can be accessible or inaccessible. Explicit provenance does not track deleted generated feature view links, so deleted will always be empty. For inaccessible feature views, only minimal information is returned.

Returns

ProvenanceLinks: Object containing the section of provenance graph requested.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

get_latest_validation_report#

ExternalFeatureGroup.get_latest_validation_report(ge_type=True)

Return the latest validation report attached to the Feature Group if it exists.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

latest_val_report = fg.get_latest_validation_report()

Arguments

  • ge_type bool: If True, returns a native Great Expectations type, otherwise a Hopsworks custom type. Conversion can be performed via the to_ge_type() method on the Hopsworks type. Defaults to True.

Returns

ValidationReport. The latest validation report attached to the Feature Group.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

get_parent_feature_groups#

ExternalFeatureGroup.get_parent_feature_groups()

Get the parents of this feature group, based on explicit provenance. Parents are feature groups or external feature groups. These feature groups can be accessible, deleted or inaccessible. For deleted and inaccessible feature groups, only minimal information is returned.

Returns

ProvenanceLinks: Object containing the section of provenance graph requested.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

get_statistics#

ExternalFeatureGroup.get_statistics(computation_time=None, feature_names=None)

Returns the statistics computed at a specific time for the current feature group.

If computation_time is None, the most recent statistics are returned.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg_statistics = fg.get_statistics(computation_time=None)

Arguments

  • computation_time Optional[Union[str, int, float, datetime.datetime, datetime.date]]: Date and time when statistics were computed. Defaults to None. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.
  • feature_names Optional[List[str]]: List of feature names of which statistics are retrieved.
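
The string formats listed for computation_time map directly onto datetime.strptime patterns. A minimal sketch that tries each listed format in turn; parse_computation_time is a hypothetical helper for checking a string locally and does not claim to reproduce how hsfs parses the value internally.

```python
from datetime import datetime

# The formats listed in the computation_time description above.
FORMATS = (
    "%Y-%m-%d",
    "%Y-%m-%d %H",
    "%Y-%m-%d %H:%M",
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%d %H:%M:%S.%f",
)


def parse_computation_time(value: str) -> datetime:
    """Return the datetime for the first listed format that matches."""
    for fmt in FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue  # try the next, more specific format
    raise ValueError(f"unsupported computation_time format: {value!r}")


print(parse_computation_time("2022-01-01"))
print(parse_computation_time("2022-01-01 10:30"))
```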

Returns

Statistics. Statistics object.

Raises

  • hsfs.client.exceptions.RestAPIError.
  • hsfs.client.exceptions.FeatureStoreException.


[source]

get_tag#

ExternalFeatureGroup.get_tag(name)

Get the value of a tag attached to a feature group.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg_tag_value = fg.get_tag("example_tag")

Arguments

  • name str: Name of the tag to get.

Returns

tag value

Raises

hsfs.client.exceptions.RestAPIError in case the backend fails to retrieve the tag.


[source]

get_tags#

ExternalFeatureGroup.get_tags()

Retrieves all tags attached to a feature group.

Returns

Dict[str, obj] of tags.

Raises

hsfs.client.exceptions.RestAPIError in case the backend fails to retrieve the tags.


[source]

get_validation_history#

ExternalFeatureGroup.get_validation_history(
    expectation_id, start_validation_time=None, end_validation_time=None, filter_by=[], ge_type=True
)

Fetch validation history of an Expectation specified by its id.

Example

import datetime

validation_history = fg.get_validation_history(
    expectation_id=1,
    filter_by=["REJECTED", "UNKNOWN"],
    start_validation_time="2022-01-01 00:00:00",
    end_validation_time=datetime.datetime.now(),
    ge_type=False
)

Arguments

  • expectation_id int: id of the Expectation for which to fetch the validation history
  • filter_by List[str]: list of ingestion_result categories to keep. Options are "INGESTED", "REJECTED", "FG_DATA", "EXPERIMENT", "UNKNOWN".
  • start_validation_time Optional[Union[str, int, datetime.datetime, datetime.date]]: fetch only validation results at or after the provided time (inclusive). Supported formats include timestamps (int), datetime, date or a string formatted to be datutils parsable. See the example above.
  • end_validation_time Optional[Union[str, int, datetime.datetime, datetime.date]]: fetch only validation results at or before the provided time (inclusive). Supported formats include timestamps (int), datetime, date or a string formatted to be datutils parsable. See the example above.

Raises

hsfs.client.exceptions.RestAPIError.

Return

Union[List[ValidationResult], List[ExpectationValidationResult]] A list of validation results connected to the expectation_id.


[source]

insert#

ExternalFeatureGroup.insert(
    features, write_options={}, validation_options={}, save_code=True, wait=False
)

Insert the dataframe feature values ONLY in the online feature store.

External Feature Groups contain metadata about feature data in an external storage system. External storage systems are usually offline, meaning feature values cannot be retrieved in real time. In order to use the feature values for real-time use cases, you can insert them into the Hopsworks Online Feature Store via this method.

The Online Feature Store has a single entry per primary key value, meaning that providing a new value for a given primary key will overwrite the existing value. No record of the previous value is kept.
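
The single-entry-per-primary-key behaviour can be pictured with a plain dictionary keyed by the primary key value. This is only a conceptual sketch: online_store and insert_rows are hypothetical stand-ins, and the column names are borrowed from the examples on this page.

```python
# Hypothetical stand-in for the online store: primary key value -> latest row.
online_store = {}


def insert_rows(rows, primary_key="ss_store_sk"):
    """Write rows keyed by primary key; a later row replaces an earlier one."""
    for row in rows:
        online_store[row[primary_key]] = row


# First batch creates two entries.
insert_rows([
    {"ss_store_sk": 1, "sales": 100},
    {"ss_store_sk": 2, "sales": 50},
])

# Second batch reuses primary key 1, so the earlier row is overwritten.
insert_rows([{"ss_store_sk": 1, "sales": 120}])

print(online_store[1]["sales"], len(online_store))
```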

Example

# connect to the Feature Store
fs = ...

# get the External Feature Group instance
fg = fs.get_feature_group(name="external_sales_records", version=1)

# get the feature values, e.g. by reading from csv files in an S3 bucket
feature_values = ...

# insert the feature values in the online feature store
fg.insert(feature_values)

Note

Data validation via Great Expectations is supported if you have attached an expectation suite to your External Feature Group. However, as opposed to regular Feature Groups, this can lead to discrepancies between the data in the external storage system and the online feature store.

Arguments

  • features Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list]]: DataFrame, RDD, Ndarray, list. Features to be saved.
  • write_options Optional[Dict[str, Any]]: Additional write options as key-value pairs, defaults to {}. When using the python engine, write_options can contain the following entries:
    • key kafka_producer_config and value an object of type properties used to configure the Kafka client. To optimize for throughput over high-latency connections, consider changing the producer properties.
    • key internal_kafka and value True or False in case you established connectivity from your Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults to False and will use external listeners when connecting from outside of Hopsworks.
  • validation_options Optional[Dict[str, Any]]: Additional validation options as key-value pairs, defaults to {}.
    • key run_validation boolean value, set to False to skip validation temporarily on ingestion.
    • key save_report boolean value, set to False to skip upload of the validation report to Hopsworks.
    • key ge_validate_kwargs a dictionary containing kwargs for the validate method of Great Expectations.
    • key fetch_expectation_suite a boolean value, by default True, to control whether the expectation suite of the feature group should be fetched before every insert.
  • save_code Optional[bool]: When running HSFS on Hopsworks or Databricks, HSFS can save the code/notebook used to create the feature group or used to insert data to it. When calling the insert method repeatedly with small batches of data, this can slow down the writes. Use this option to turn off saving code. Defaults to True.
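
The option keys listed above could be combined as in the following sketch. The keys come from the argument list; the Kafka producer properties `linger.ms` and `batch.size` and their values are assumptions (check which properties your Kafka client accepts), and `insert_batches` is a hypothetical helper, not part of HSFS.

```python
# Option dictionaries built from the keys documented above.
write_options = {
    # Assumed producer properties, chosen only to illustrate the shape of the value.
    "kafka_producer_config": {"linger.ms": 50, "batch.size": 262144},
    # False is the documented default: connect through the external listeners.
    "internal_kafka": False,
}

validation_options = {
    "run_validation": True,            # set to False to skip validation for an insert
    "save_report": False,              # do not upload the validation report
    "fetch_expectation_suite": False,  # do not fetch the suite before every insert
}


def insert_batches(fg, batches):
    """Insert several small batches without saving code on every call.

    `fg` is assumed to be an ExternalFeatureGroup and `batches` an iterable
    of dataframes accepted by `fg.insert`.
    """
    results = []
    for batch in batches:
        results.append(
            fg.insert(
                batch,
                write_options=write_options,
                validation_options=validation_options,
                save_code=False,
            )
        )
    return results
```

Passing `save_code=False` follows the note on the `save_code` argument: saving the notebook on each call can slow down repeated small writes.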

Returns

Tuple(Job, ValidationReport) The validation report if validation is enabled.

Raises

  • hsfs.client.exceptions.RestAPIError. For example, if creating the feature group fails or the dataframe schema does not match the existing feature group schema.
  • hsfs.client.exceptions.DataValidationException. If data validation fails and the expectation suite validation_ingestion_policy is set to STRICT. Data is NOT ingested.


[source]

json#

ExternalFeatureGroup.json()

[source]

read#

ExternalFeatureGroup.read(dataframe_type="default", online=False)

Get the feature group as a DataFrame.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

df = fg.read()

Engine Support

Spark only

Reading an External Feature Group directly into a Pandas DataFrame using Python/Pandas as the engine is not supported. However, you can use the Query API to create Feature Views/Training Data containing External Feature Groups.

Arguments

  • dataframe_type Optional[str]: str, optional. Possible values are "default", "spark", "pandas", "numpy" or "python", defaults to "default".
  • online Optional[bool]: bool, optional. If True read from online feature store, defaults to False.

Returns

DataFrame: The Spark dataframe containing the feature data.

  • pyspark.DataFrame. A Spark DataFrame.
  • pandas.DataFrame. A Pandas DataFrame.
  • numpy.ndarray. A two-dimensional Numpy array.
  • list. A two-dimensional Python list.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

save#

ExternalFeatureGroup.save()

Persist the metadata for this external feature group.

Without calling this method, your feature group will only exist in your Python Kernel, but not in Hopsworks.

query = "SELECT * FROM sales"

fg = feature_store.create_external_feature_group(name="sales",
    version=1,
    description="Physical shop sales features",
    query=query,
    storage_connector=connector,
    primary_key=['ss_store_sk'],
    event_time='sale_date'
)

fg.save()


[source]

save_expectation_suite#

ExternalFeatureGroup.save_expectation_suite(
    expectation_suite, run_validation=True, validation_ingestion_policy="ALWAYS", overwrite=False
)

Attach an expectation suite to a feature group and save it for future use. If an expectation suite is already attached, it is replaced. Note that the provided expectation suite is modified in place to include expectationId fields.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.save_expectation_suite(expectation_suite, run_validation=True)

Arguments

  • expectation_suite Union[hsfs.expectation_suite.ExpectationSuite, great_expectations.core.expectation_suite.ExpectationSuite]: The expectation suite to attach to the Feature Group.
  • overwrite bool: If an Expectation Suite is already attached, overwrite it. The new suite will have its own validation history, but former reports are preserved.
  • run_validation bool: Set whether the expectation_suite will run on ingestion.
  • validation_ingestion_policy str: Set the policy for ingestion to the Feature Group.
    • "STRICT" only allows a DataFrame that passes validation to be inserted into the Feature Group.
    • "ALWAYS" always inserts the DataFrame into the Feature Group, irrespective of the overall validation result.
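
The two ingestion policies can be summarised as a small decision function. This is a sketch of the documented behaviour only; `should_ingest` is a hypothetical name and does not exist in HSFS.

```python
def should_ingest(validation_success: bool, policy: str = "ALWAYS") -> bool:
    """Return True if a DataFrame would be inserted under the given policy."""
    policy = policy.upper()
    if policy == "ALWAYS":
        # Data is inserted irrespective of the overall validation result.
        return True
    if policy == "STRICT":
        # Data is inserted only when validation passes.
        return validation_success
    raise ValueError(f"Unsupported validation_ingestion_policy: {policy!r}")


# Outcome for a DataFrame that fails validation under each policy.
failed_under_always = should_ingest(False, "ALWAYS")
failed_under_strict = should_ingest(False, "STRICT")
```

With "ALWAYS", validation acts as monitoring; with "STRICT", it acts as a gate in front of the Feature Group.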

Raises

hsfs.client.exceptions.RestAPIError.


[source]

save_validation_report#

ExternalFeatureGroup.save_validation_report(
    validation_report, ingestion_result="UNKNOWN", ge_type=True
)

Save the validation report to the Hopsworks platform alongside previous reports of the same Feature Group.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(..., expectation_suite=expectation_suite)

validation_report = great_expectations.from_pandas(
    my_experimental_features_df,
    fg.get_expectation_suite()).validate()

fg.save_validation_report(validation_report, ingestion_result="EXPERIMENT")

Arguments

  • validation_report Union[dict, hsfs.validation_report.ValidationReport, great_expectations.core.expectation_validation_result.ExpectationSuiteValidationResult]: The validation report to attach to the Feature Group.
  • ingestion_result str: Specify the fate of the associated data, defaults to "UNKNOWN". Supported options are "UNKNOWN", "INGESTED", "REJECTED", "EXPERIMENT", "FG_DATA". Use "INGESTED" or "REJECTED" for validation of DataFrames to be inserted in the Feature Group. Use "EXPERIMENT" for testing and development and "FG_DATA" when validating data already in the Feature Group.
  • ge_type bool: If True, returns a native Great Expectations type, otherwise a Hopsworks custom type. Conversion can be performed via the to_ge_type() method on the Hopsworks type. Defaults to True.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

select#

ExternalFeatureGroup.select(features)

Select a subset of features of the feature group and return a query object.

The query can be used to construct joins of feature groups or create a feature view with a subset of features of the feature group.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
from hsfs.feature import Feature
fg = fs.create_feature_group(
        "fg",
        features=[
                Feature("id", type="string"),
                Feature("ts", type="bigint"),
                Feature("f1", type="date"),
                Feature("f2", type="double")
                ],
        primary_key=["id"],
        event_time="ts")

# construct query
query = fg.select(["id", "f1"])
query.features
# [Feature('id', ...), Feature('f1', ...)]

Arguments

  • features List[Union[str, hsfs.feature.Feature]]: A list of Feature objects or feature names as strings to be selected.

Returns

Query: A query object with the selected features of the feature group.


[source]

select_all#

ExternalFeatureGroup.select_all(include_primary_key=True, include_event_time=True)

Select all features in the feature group and return a query object.

The query can be used to construct joins of feature groups or create a feature view.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instances
fg1 = fs.get_or_create_feature_group(...)
fg2 = fs.get_or_create_feature_group(...)

# construct the query
query = fg1.select_all().join(fg2.select_all())

# show first 5 rows
query.show(5)


# select all features, excluding primary key and event time
from hsfs.feature import Feature
fg = fs.create_feature_group(
        "fg",
        features=[
                Feature("id", type="string"),
                Feature("ts", type="bigint"),
                Feature("f1", type="date"),
                Feature("f2", type="double")
                ],
        primary_key=["id"],
        event_time="ts")

query = fg.select_all()
query.features
# [Feature('id', ...), Feature('ts', ...), Feature('f1', ...), Feature('f2', ...)]

query = fg.select_all(include_primary_key=False, include_event_time=False)
query.features
# [Feature('f1', ...), Feature('f2', ...)]

Arguments

  • include_primary_key Optional[bool]: If True, include the primary key of the feature group in the feature list. Defaults to True.
  • include_event_time Optional[bool]: If True, include the event time of the feature group in the feature list. Defaults to True.

Returns

Query. A query object with all features of the feature group.


[source]

select_except#

ExternalFeatureGroup.select_except(features=[])

Select all features of the feature group, including the primary key and event time features, except the provided features, and return a query object.

The query can be used to construct joins of feature groups or create a feature view with a subset of features of the feature group.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
from hsfs.feature import Feature
fg = fs.create_feature_group(
        "fg",
        features=[
                Feature("id", type="string"),
                Feature("ts", type="bigint"),
                Feature("f1", type="date"),
                Feature("f2", type="double")
                ],
        primary_key=["id"],
        event_time="ts")

# construct query
query = fg.select_except(["ts", "f1"])
query.features
# [Feature('id', ...), Feature('f2', ...)]

Arguments

  • features Optional[List[Union[str, hsfs.feature.Feature]]]: A list of Feature objects or feature names as strings to be excluded from the selection. Defaults to [], selecting all features.

Returns

Query: A query object with the selected features of the feature group.
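
The exclusion rule can be checked on feature names alone. The sketch below works on plain strings rather than a real feature group; `select_except_names` is a hypothetical helper that mirrors the documented behaviour, not the HSFS implementation.

```python
def select_except_names(schema, excluded=()):
    """Return the feature names in `schema` that are not listed in `excluded`."""
    excluded = set(excluded)
    # Order of the schema is preserved; primary key and event time are kept
    # unless they are explicitly excluded.
    return [name for name in schema if name not in excluded]


# Schema from the example above: id (primary key), ts (event time), f1, f2.
schema = ["id", "ts", "f1", "f2"]

remaining = select_except_names(schema, ["ts", "f1"])
everything = select_except_names(schema)  # empty exclusion list selects all features
```

Here `remaining` holds `id` and `f2`, and `everything` equals the full schema, matching the default of `features=[]`.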


[source]

show#

ExternalFeatureGroup.show(n)

Show the first n rows of the feature group.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.show(5)

[source]

to_dict#

ExternalFeatureGroup.to_dict()

[source]

update_deprecated#

ExternalFeatureGroup.update_deprecated(deprecate=True)

Deprecate the feature group.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.update_deprecated(deprecate=True)

Safe update

This method updates the feature group safely. In case of failure your local metadata object will be kept unchanged.

Arguments

  • deprecate bool: Boolean value identifying if the feature group should be deprecated. Defaults to True.

Returns

FeatureGroup. The updated feature group object.


[source]

update_description#

ExternalFeatureGroup.update_description(description)

Update the description of the feature group.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.update_description(description="Much better description.")

Safe update

This method updates the feature group description safely. In case of failure your local metadata object will keep the old description.

Arguments

  • description str: New description string.

Returns

FeatureGroup. The updated feature group object.


[source]

update_feature_description#

ExternalFeatureGroup.update_feature_description(feature_name, description)

Update the description of a single feature in this feature group.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.update_feature_description(feature_name="min_temp",
                              description="Much better feature description.")

Safe update

This method updates the feature description safely. In case of failure your local metadata object will keep the old description.

Arguments

  • feature_name str: Name of the feature to be updated.
  • description str: New description string.

Returns

FeatureGroup. The updated feature group object.


[source]

update_features#

ExternalFeatureGroup.update_features(features)

Update metadata of features in this feature group.

Currently, only updating the description of a feature is supported.

Unsafe update

Note that if you use an existing Feature object of the schema in the feature group metadata object, this might leave your metadata object in a corrupted state if the update fails.
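
One way to reduce the risk described above is to change a copy of the feature instead of the object held in the local schema. The sketch below uses `FeatureStub`, a hypothetical stand-in for `hsfs.feature.Feature`, so that it runs without a Hopsworks connection; whether copying a real `Feature` object behaves the same way is an assumption.

```python
import copy
from dataclasses import dataclass


@dataclass
class FeatureStub:
    """Hypothetical stand-in for hsfs.feature.Feature, for illustration only."""

    name: str
    type: str
    description: str = ""


# Local schema as it might be held by the feature group metadata object.
schema = [FeatureStub("min_temp", "double", "old description")]

# Modify a copy, so the object inside the local schema stays untouched
# if the update request fails.
updated = copy.copy(schema[0])
updated.description = "Minimum daily temperature."

# With a real feature group, this is where fg.update_features(updated) would be called.
```

Constructing a fresh feature object with the new description is another option that avoids touching the existing schema.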

Arguments

  • features Union[hsfs.feature.Feature, List[hsfs.feature.Feature]]: Feature or list of features. A feature object or list thereof to be updated.

Returns

FeatureGroup. The updated feature group object.


[source]

update_from_response_json#

ExternalFeatureGroup.update_from_response_json(json_dict)

[source]

update_notification_topic_name#

ExternalFeatureGroup.update_notification_topic_name(notification_topic_name)

Update the notification topic name of the feature group.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.update_notification_topic_name(notification_topic_name="notification_topic_name")

Safe update

This method updates the feature group notification topic name safely. In case of failure your local metadata object will keep the old notification topic name.

Arguments

  • notification_topic_name str: Name of the topic used for sending notifications when entries are inserted or updated on the online feature store. If set to None no notifications are sent.

Returns

FeatureGroup. The updated feature group object.


[source]

update_statistics_config#

ExternalFeatureGroup.update_statistics_config()

Update the statistics configuration of the feature group.

Change the statistics_config object and persist the changes by calling this method.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.update_statistics_config()
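
The example above only shows the final call. A fuller sketch of the "change, then persist" flow follows; it assumes that `fg.statistics_config` exposes a boolean `histograms` attribute, and `enable_histograms` is a hypothetical helper.

```python
def enable_histograms(fg):
    """Turn on histogram statistics locally, then persist the change.

    `fg` is assumed to be a feature group whose `statistics_config` object
    has a `histograms` attribute.
    """
    # Step 1: change the local statistics_config object.
    fg.statistics_config.histograms = True
    # Step 2: persist the change; the updated metadata object is returned.
    return fg.update_statistics_config()
```

Calling `enable_histograms(fg)` on a feature group obtained from the feature store would apply both steps in order.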

Returns

FeatureGroup. The updated metadata object of the feature group.

Raises

hsfs.client.exceptions.RestAPIError. hsfs.client.exceptions.FeatureStoreException.


[source]

validate#

ExternalFeatureGroup.validate(
    dataframe=None,
    expectation_suite=None,
    save_report=False,
    validation_options={},
    ingestion_result="UNKNOWN",
    ge_type=True,
)

Run validation based on the attached expectations.

Runs any expectations attached with Deequ, and also runs attached Great Expectations suites.

Example

# connect to the Feature Store
fs = ...

# get feature group instance
fg = fs.get_or_create_feature_group(...)

ge_report = fg.validate(df, save_report=False)

Arguments

  • dataframe Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame]]: The dataframe to run the data validation expectations against.
  • expectation_suite Optional[hsfs.expectation_suite.ExpectationSuite]: Optionally provide an Expectation Suite to override the one that is possibly attached to the feature group. This is useful for testing new Expectation suites. When an extra suite is provided, the results will never be persisted. Defaults to None.
  • validation_options Optional[Dict[Any, Any]]: Additional validation options as key-value pairs, defaults to {}.
    • key run_validation boolean value, set to False to skip validation temporarily on ingestion.
    • key ge_validate_kwargs a dictionary containing kwargs for the validate method of Great Expectations.
  • ingestion_result str: Specify the fate of the associated data, defaults to "UNKNOWN". Supported options are "UNKNOWN", "INGESTED", "REJECTED", "EXPERIMENT", "FG_DATA". Use "INGESTED" or "REJECTED" for validation of DataFrames to be inserted in the Feature Group. Use "EXPERIMENT" for testing and development and "FG_DATA" when validating data already in the Feature Group.
  • save_report Optional[bool]: Whether to save the report to the backend. This is only possible if the Expectation suite is initialised and attached to the Feature Group. Defaults to False.
  • ge_type bool: Whether to return a Great Expectations object or Hopsworks' own abstraction. Defaults to True.

Returns

A Validation Report produced by Great Expectations.
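
A common pattern is to validate a dataframe first and insert it only if the report is successful. The sketch below assumes that the report returned with `ge_type=True` exposes a boolean `success` attribute, as Great Expectations validation results do; `validate_then_insert` is a hypothetical helper, not part of HSFS.

```python
def validate_then_insert(fg, df):
    """Validate `df` against the attached suite and insert it only on success.

    Returns True if the data was inserted, False otherwise.
    """
    report = fg.validate(df, save_report=False, ge_type=True)
    if not report.success:
        # Validation failed: leave the online feature store untouched.
        return False
    # Validation already ran, so skip it during ingestion.
    fg.insert(df, validation_options={"run_validation": False})
    return True
```

For an External Feature Group, the insert step writes to the online feature store only, as described for the insert method.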