
hsfs.feature_group #

[source] ExternalFeatureGroup #

Bases: FeatureGroupBase

A feature group that references data stored outside Hopsworks.

[source] id property #

id: int | None

ID of the feature group, set by the backend.

[source] description property writable #

description: str | None

Description of the feature group, as it appears in the UI.

[source] creator property #

creator: user.User | None

User who created the feature group.

[source] feature_store_name property #

feature_store_name: str | None

Name of the feature store in which the feature group is located.

[source] save #

save() -> None

Persist the metadata for this external feature group.

Without calling this method, the feature group will only exist in your Python kernel, not in Hopsworks.

query = "SELECT * FROM sales"

fg = feature_store.create_external_feature_group(name="sales",
    version=1,
    description="Physical shop sales features",
    data_source=ds,
    primary_key=['ss_store_sk'],
    event_time='sale_date'
)

fg.save()

[source] insert #

insert(
    features: pd.DataFrame
    | TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | np.ndarray
    | list[list],
    write_options: dict[str, Any] | None = None,
    validation_options: dict[str, Any] | None = None,
    wait: bool = False,
) -> tuple[
    None,
    great_expectations.core.ExpectationSuiteValidationResult
    | None,
]

Insert the dataframe feature values ONLY in the online feature store.

External Feature Groups contain metadata about feature data in an external storage system. External storage systems are usually offline, meaning feature values cannot be retrieved in real time. To use the feature values for real-time use cases, you can insert them into the Hopsworks Online Feature Store via this method.

The Online Feature Store keeps a single entry per primary key value, meaning that providing a new value for a given primary key overwrites the existing value. No record of the previous value is kept.

Example
# connect to the Feature Store
fs = ...

# get the External Feature Group instance
fg = fs.get_feature_group(name="external_sales_records", version=1)

# get the feature values, e.g reading from csv files in a S3 bucket
feature_values = ...

# insert the feature values in the online feature store
fg.insert(feature_values)
Note

Data Validation via Great Expectations is supported if you have attached an expectation suite to your External Feature Group. However, as opposed to regular Feature Groups, this can lead to discrepancies between the data in the external storage system and the online feature store.

PARAMETER DESCRIPTION
features

Features to be saved.

TYPE: pd.DataFrame | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | np.ndarray | list[list]

write_options

Additional write options as key-value pairs.

When using the python engine, write_options can contain the following entries:

  • key wait_for_job and value True or False to configure whether the insert call should return only after the Hopsworks Job has finished. By default it waits.
  • key wait_for_online_ingestion and value True or False to configure whether the insert call should return only after the Hopsworks online ingestion has finished. By default it does not wait.
  • key online_ingestion_options and value a dict to configure waiting on online ingestion. Applied when the wait_for_online_ingestion write option is True or the wait parameter is True. Supported keys are timeout (seconds to wait, default 60, set to 0 for indefinite) and period (polling interval in seconds, default 1). See the sketch after this parameter entry.
  • key kafka_producer_config and value an object of type properties used to configure the Kafka client. To optimize for throughput over a high-latency connection, consider changing the producer properties.
  • key internal_kafka and value True or False in case you established connectivity from your Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults to False and will use external listeners when connecting from outside of Hopsworks.

TYPE: dict[str, Any] | None DEFAULT: None
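A minimal sketch of controlling online-ingestion waiting through write_options, assuming fg is the External Feature Group and feature_values the dataframe from the example above:

# wait for online ingestion, polling every 2 seconds, giving up after 120 seconds
fg.insert(
    feature_values,
    write_options={
        "wait_for_online_ingestion": True,
        "online_ingestion_options": {"timeout": 120, "period": 2},
    },
)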

validation_options

Additional validation options as key-value pairs.

  • key run_validation boolean value, set to False to skip validation temporarily on ingestion.
  • key save_report boolean value, set to False to skip upload of the validation report to Hopsworks.
  • key ge_validate_kwargs a dictionary containing kwargs for the validate method of Great Expectations.
  • key fetch_expectation_suite a boolean value, by default True, to control whether the expectation suite of the feature group should be fetched before every insert.

TYPE: dict[str, Any] | None DEFAULT: None

wait

Wait for job and online ingestion to finish before returning. Shortcut for write_options {"wait_for_job": True, "wait_for_online_ingestion": True}.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
tuple[None, great_expectations.core.ExpectationSuiteValidationResult | None]

The validation report if validation is enabled.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

e.g., fail to create feature group, dataframe schema does not match existing feature group schema, etc.

hsfs.client.exceptions.DataValidationException

If data validation fails and the expectation suite validation_ingestion_policy is set to STRICT. Data is NOT ingested.

[source] read #

read(
    dataframe_type: Literal[
        "default",
        "spark",
        "pandas",
        "polars",
        "numpy",
        "python",
    ] = "default",
    online: bool = False,
    read_options: dict[str, Any] | None = None,
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | pl.DataFrame
    | np.ndarray
)

Get the feature group as a DataFrame.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

df = fg.read()
Engine Support

Spark only

Reading an External Feature Group directly into a Pandas DataFrame using Python/Pandas as engine is not supported. However, you can use the Query API to create Feature Views/Training Data containing External Feature Groups.

PARAMETER DESCRIPTION
dataframe_type

The type of the returned dataframe. By default, maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine.

TYPE: Literal['default', 'spark', 'pandas', 'polars', 'numpy', 'python'] DEFAULT: 'default'

online

If True read from online feature store.

TYPE: bool DEFAULT: False

read_options

Additional options as key/value pairs to pass to the spark engine.

TYPE: dict[str, Any] | None DEFAULT: None

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | pl.DataFrame | np.ndarray

One of:

  • pyspark.DataFrame: A Spark DataFrame.
  • pandas.DataFrame: A Pandas DataFrame.
  • polars.DataFrame: A Polars DataFrame.
  • numpy.ndarray: A two-dimensional Numpy array.
  • list: A two-dimensional Python list.
RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

hopsworks.client.exceptions.FeatureStoreException

If trying to read an external feature group directly into a dataframe with the Python engine.

[source] show #

show(n: int, online: bool = False) -> list[list[Any]]

Show the first n rows of the feature group.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

# make a query and show top 5 rows
fg.select(['date','weekly_sales','is_holiday']).show(5)
PARAMETER DESCRIPTION
n

Number of rows to show.

TYPE: int

online

If True read from online feature store.

TYPE: bool DEFAULT: False

[source] find_neighbors #

find_neighbors(
    embedding: list[int | float],
    col: str | None = None,
    k: int | None = 10,
    filter: Filter | Logic | None = None,
    options: dict | None = None,
) -> list[tuple[float, list[Any]]]

Finds the nearest neighbors for a given embedding in the vector database.

If filter is specified, or if the embedding feature is stored in the default project index, the number of results returned may be less than k. Try using a larger value of k and extract the top k items from the results if needed.

PARAMETER DESCRIPTION
embedding

The target embedding for which neighbors are to be found.

TYPE: list[int | float]

col

The column name used to compute similarity score. Required only if there are multiple embeddings.

TYPE: str | None DEFAULT: None

k

The number of nearest neighbors to retrieve.

TYPE: int | None DEFAULT: 10

filter

A filter expression to restrict the search space.

TYPE: Filter | Logic | None DEFAULT: None

options

The options used for the request to the vector database. The keys are attribute values of the hsfs.core.opensearch.OpensearchRequestOption class.

TYPE: dict | None DEFAULT: None

RETURNS DESCRIPTION
list[tuple[float, list[Any]]]

A list of tuples representing the nearest neighbors.

list[tuple[float, list[Any]]]

Each tuple contains the similarity score and a list of feature values.

Example
embedding_index = EmbeddingIndex()
embedding_index.add_embedding(name="user_vector", dimension=3)
fg = fs.create_feature_group(
    name='air_quality',
    embedding_index=embedding_index,
    version=1,
    primary_key=['id1'],
    online_enabled=True,
)
fg.insert(data)
fg.find_neighbors(
    [0.1, 0.2, 0.3],
    k=5,
)

# apply filter
fg.find_neighbors(
    [0.1, 0.2, 0.3],
    k=5,
    filter=(fg.id1 > 10) & (fg.id1 < 30)
)

[source] FeatureGroup #

Bases: FeatureGroupBase

A feature group with data managed by the Hopsworks Feature Store.

[source] id property #

id: int | None

Feature group id.

[source] description property writable #

description: str | None

Description of the feature group contents.

[source] time_travel_format property writable #

time_travel_format: str | None

Setting of the feature group time travel format.

[source] partition_key property writable #

partition_key: list[str]

List of features building the partition key.

[source] hudi_precombine_key property writable #

hudi_precombine_key: str | None

Feature name that is the hudi precombine key.

[source] feature_store_name property #

feature_store_name: str | None

Name of the feature store in which the feature group is located.

[source] creator property #

creator: user.User | None

User who created the feature group.

[source] created property #

created: str | None

Timestamp when the feature group was created.

[source] stream property writable #

stream: bool

Whether to enable real time stream writing capabilities.

[source] parents property writable #

Parent feature groups as origin of the data in the current feature group.

This is part of explicit provenance.

[source] materialization_job property #

materialization_job: Job | None

Get the Job object reference for the materialization job for this Feature Group.

[source] statistics property #

statistics: Statistics

Get the latest computed statistics for the whole feature group.

[source] transformation_functions property writable #

transformation_functions: list[TransformationFunction]

Get transformation functions.

[source] offline_backfill_every_hr property writable #

offline_backfill_every_hr: int | str | None

On Feature Group creation, used to set a schedule for recurring runs of the materialization job.

[source] sink_enabled property #

sink_enabled: bool

Get whether sink is enabled for this feature group.

[source] sink_job property #

sink_job: job.Job | None

Return the sink job created for this feature group, if any.

[source] sink_job_conf property writable #

sink_job_conf: SinkJobConfiguration

Sink job configuration object defining the settings for the sink job of the feature group.

[source] read #

read(
    wallclock_time: str
    | int
    | datetime
    | date
    | None = None,
    online: bool = False,
    dataframe_type: Literal[
        "default",
        "spark",
        "pandas",
        "polars",
        "numpy",
        "python",
    ] = "default",
    read_options: dict | None = None,
) -> (
    pd.DataFrame
    | np.ndarray
    | list[list[Any]]
    | TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pl.DataFrame
)

Read the feature group into a dataframe.

Reads the feature group by default from the offline storage, as a Spark DataFrame on Hopsworks and Databricks, and as a Pandas DataFrame on AWS SageMaker and in pure Python environments.

Set online to True to read from the online storage, or change dataframe_type to read as a different format.

Reading feature group as of latest state
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.read()
Reading feature group as of specific point in time:
fg = fs.get_or_create_feature_group(...)
fg.read("2020-10-20 07:34:11")
PARAMETER DESCRIPTION
wallclock_time

If specified, retrieves feature group as of specific point in time. If not specified, returns as of most recent time. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.

TYPE: str | int | datetime | date | None DEFAULT: None

online

If True, read from online feature store.

TYPE: bool DEFAULT: False

dataframe_type

The type of the returned dataframe. By default, maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine.

TYPE: Literal['default', 'spark', 'pandas', 'polars', 'numpy', 'python'] DEFAULT: 'default'

read_options

Additional options as key/value pairs to pass to the execution engine.

For spark engine: Dictionary of read options for Spark.

For python engine (see the sketch below):

  • key "arrow_flight_config" to pass a dictionary of arrow flight configurations. For example: {"arrow_flight_config": {"timeout": 900}}.
  • key "pandas_types" and value True to retrieve columns as Pandas nullable types rather than numpy/object(string) types (experimental).

TYPE: dict | None DEFAULT: None
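A minimal sketch of passing these Python-engine options, assuming fg is a retrieved feature group:

# raise the Arrow Flight timeout and request Pandas nullable dtypes
df = fg.read(
    read_options={
        "arrow_flight_config": {"timeout": 900},
        "pandas_types": True,
    }
)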

RETURNS DESCRIPTION
pd.DataFrame | np.ndarray | list[list[Any]] | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pl.DataFrame

One of the following:

  • pyspark.DataFrame: A Spark DataFrame.
  • pandas.DataFrame: A Pandas DataFrame.
  • polars.DataFrame: A Polars DataFrame.
  • numpy.ndarray: A two-dimensional Numpy array.
  • list: A two-dimensional Python list.
RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If no data is available for the feature group at the given commit date (when time travel is enabled).

[source] read_changes #

read_changes(
    start_wallclock_time: str | int | datetime | date,
    end_wallclock_time: str | int | datetime | date,
    read_options: dict | None = None,
) -> (
    pd.DataFrame
    | np.ndarray
    | list[list[Any]]
    | TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pl.DataFrame
)

Reads updates of this feature group that occurred between specified points in time.

Deprecated

read_changes method is deprecated. Use as_of(end_wallclock_time, exclude_until=start_wallclock_time).read(read_options=read_options) instead.
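A minimal sketch of the suggested replacement, with illustrative timestamps:

# incremental changes between two commits via the Query API
df = fg.as_of(
    "2020-10-20 07:34:11",                # end_wallclock_time
    exclude_until="2020-10-19 07:34:11",  # start_wallclock_time
).read()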

Pyspark/Spark Only

Apache HUDI exclusively supports Time Travel and Incremental Query via Spark Context.

Warning

This function only works for feature groups with time_travel_format='HUDI'.

PARAMETER DESCRIPTION
start_wallclock_time

Start time of the time travel query. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.

TYPE: str | int | datetime | date

end_wallclock_time

End time of the time travel query. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.

TYPE: str | int | datetime | date

read_options

Additional options as key/value pairs to pass to the execution engine. For spark engine, it is a dictionary of read options for Spark.

TYPE: dict | None DEFAULT: None

RETURNS DESCRIPTION
pd.DataFrame | np.ndarray | list[list[Any]] | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pl.DataFrame

The spark dataframe containing the incremental changes of feature data.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If no data is available for the feature group at the given commit date.

hopsworks.client.exceptions.FeatureStoreException

If the feature group does not have HUDI time travel format.

[source] find_neighbors #

find_neighbors(
    embedding: list[int | float],
    col: str | None = None,
    k: int | None = 10,
    filter: Filter | Logic | None = None,
    options: dict | None = None,
) -> list[tuple[float, list[Any]]]

Finds the nearest neighbors for a given embedding in the vector database.

If filter is specified, or if the embedding feature is stored in the default project index, the number of results returned may be less than k. Try using a larger value of k and extract the top k items from the results if needed.

PARAMETER DESCRIPTION
embedding

The target embedding for which neighbors are to be found.

TYPE: list[int | float]

col

The column name used to compute similarity score. Required only if there are multiple embeddings.

TYPE: str | None DEFAULT: None

k

The number of nearest neighbors to retrieve.

TYPE: int | None DEFAULT: 10

filter

A filter expression to restrict the search space.

TYPE: Filter | Logic | None DEFAULT: None

options

The options used for the request to the vector database. The keys are attribute values of the hsfs.core.opensearch.OpensearchRequestOption class.

TYPE: dict | None DEFAULT: None

RETURNS DESCRIPTION
list[tuple[float, list[Any]]]

A list of tuples representing the nearest neighbors.

list[tuple[float, list[Any]]]

Each tuple contains the similarity score and a list of feature values.

Example
embedding_index = EmbeddingIndex()
embedding_index.add_embedding(name="user_vector", dimension=3)
fg = fs.create_feature_group(
    name='air_quality',
    embedding_index=embedding_index,
    version=1,
    primary_key=['id1'],
    online_enabled=True,
)
fg.insert(data)
fg.find_neighbors(
    [0.1, 0.2, 0.3],
    k=5,
)

# apply filter
fg.find_neighbors(
    [0.1, 0.2, 0.3],
    k=5,
    filter=(fg.id1 > 10) & (fg.id1 < 30)
)

[source] show #

show(n: int, online: bool = False) -> list[list[Any]]

Show the first n rows of the feature group.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

# make a query and show top 5 rows
fg.select(['date','weekly_sales','is_holiday']).show(5)
PARAMETER DESCRIPTION
n

Number of rows to show.

TYPE: int

online

If True read from online feature store.

TYPE: bool DEFAULT: False

[source] save #

save(
    features: pd.DataFrame
    | pl.DataFrame
    | TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | np.ndarray
    | list[feature.Feature] = None,
    write_options: dict[str, Any] | None = None,
    validation_options: dict[str, Any] | None = None,
    wait: bool = False,
) -> tuple[
    Job | None,
    great_expectations.core.ExpectationSuiteValidationResult
    | None,
]

Persist the metadata and materialize the feature group to the feature store.

Changed in 3.3.0

insert and save methods are now async by default in non-Spark clients. To achieve the old behaviour, set the wait argument to True.

Calling save creates the metadata for the feature group in the feature store. If a Pandas DataFrame, Polars DataFrame, RDD or Numpy ndarray is provided, the data is written to the online/offline feature store as specified. By default, this writes the feature group to the offline storage and, if the feature group is online_enabled, also to the online feature store. The features dataframe can be a Spark DataFrame or RDD, a Pandas or Polars DataFrame, a two-dimensional Numpy array, or a two-dimensional Python nested list.
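A minimal sketch of creating a feature group and persisting it together with data; the names and the dataframe df are illustrative:

fg = fs.create_feature_group(
    name="sales",
    version=1,
    primary_key=["store_id"],
    online_enabled=True,
)

# persist the metadata and write df to the feature store
job, validation_report = fg.save(df, wait=True)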

PARAMETER DESCRIPTION
features

Features to be saved. This argument is optional if the feature list is provided in the create_feature_group or in the get_or_create_feature_group method invocation.

TYPE: pd.DataFrame | pl.DataFrame | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | np.ndarray | list[feature.Feature] DEFAULT: None

write_options

Additional write options as key-value pairs.

When using the python engine, write_options can contain the following entries:

  • key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to write data into the feature group.
  • key wait_for_job and value True or False to configure whether the save call should return only after the Hopsworks Job has finished. By default it does not wait.
  • key wait_for_online_ingestion and value True or False to configure whether the save call should return only after the Hopsworks online ingestion has finished. By default it does not wait.
  • key start_offline_backfill and value True or False to configure whether to start the materialization job to write data to the offline storage. start_offline_backfill is deprecated. Use start_offline_materialization instead.
  • key start_offline_materialization and value True or False to configure whether to start the materialization job to write data to the offline storage. By default the materialization job is started immediately.
  • key kafka_producer_config and value an object of type properties used to configure the Kafka client. To optimize for throughput over a high-latency connection, consider changing the producer properties.
  • key internal_kafka and value True or False in case you established connectivity from your Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults to False and will use external listeners when connecting from outside of Hopsworks.
  • key delta.enableChangeDataFeed set to a string value of true or false to enable or disable CDF operations on the feature group Delta table. Set to true by default on Feature Group creation.

TYPE: dict[str, Any] | None DEFAULT: None

validation_options

Additional validation options as key-value pairs.

  • key run_validation boolean value, set to False to skip validation temporarily on ingestion.
  • key save_report boolean value, set to False to skip upload of the validation report to Hopsworks.
  • key ge_validate_kwargs a dictionary containing kwargs for the validate method of Great Expectations.
  • key schema_validation boolean value, set to True to validate the schema.

TYPE: dict[str, Any] | None DEFAULT: None

wait

Wait for job and online ingestion to finish before returning. Shortcut for write_options {"wait_for_job": True, "wait_for_online_ingestion": True}.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
tuple[Job | None, great_expectations.core.ExpectationSuiteValidationResult | None]

When using the python engine, it returns the Hopsworks Job that was launched to ingest the feature group data.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

[source] insert #

insert(
    features: pd.DataFrame
    | pl.DataFrame
    | TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | np.ndarray
    | list[list],
    overwrite: bool = False,
    operation: Literal["insert", "upsert"] = "upsert",
    storage: str | None = None,
    write_options: dict[str, Any] | None = None,
    validation_options: dict[str, Any] | None = None,
    wait: bool = False,
    transformation_context: dict[str, Any] = None,
    transform: bool = True,
) -> tuple[Job | None, ValidationReport | None]

Persist the metadata and materialize the feature group to the feature store or insert data from a dataframe into the existing feature group.

Incrementally insert data to a feature group, or overwrite all data contained in the feature group. By default, the data is inserted into the offline storage, and also into the online storage if the feature group is online_enabled=True.

The features dataframe can be a Spark DataFrame or RDD, a Pandas DataFrame, a Polars DataFrame, a two-dimensional Numpy array, or a two-dimensional Python nested list. If statistics are enabled, statistics are recomputed for the entire feature group. If the feature group's time travel format is HUDI, the operation argument can be either insert or upsert.

If the feature group doesn't exist, the insert method creates the necessary metadata the first time it is invoked and writes the provided features dataframe to the online/offline feature store.

Changed in 3.3.0

insert and save methods are now async by default in non-Spark clients. To achieve the old behaviour, set the wait argument to True.

Upsert new feature data with time travel format HUDI
# connect to the Feature Store
fs = ...

fg = fs.get_or_create_feature_group(
    name='bitcoin_price',
    description='Bitcoin price aggregated for days',
    version=1,
    primary_key=['unix'],
    online_enabled=True,
    event_time='unix'
)

fg.insert(df_bitcoin_processed)
Async insert
# connect to the Feature Store
fs = ...

fg1 = fs.get_or_create_feature_group(
    name='feature_group_name1',
    description='Description of the first FG',
    version=1,
    primary_key=['unix'],
    online_enabled=True,
    event_time='unix'
)
# async insertion in order not to wait for the job to finish
fg1.insert(df_for_fg1, write_options={"wait_for_job": False})

fg2 = fs.get_or_create_feature_group(
    name='feature_group_name2',
    description='Description of the second FG',
    version=1,
    primary_key=['unix'],
    online_enabled=True,
    event_time='unix'
)
fg2.insert(df_for_fg2)
PARAMETER DESCRIPTION
features

Features to be saved.

TYPE: pd.DataFrame | pl.DataFrame | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | np.ndarray | list[list]

overwrite

Drop all data in the feature group before inserting new data. This does not affect metadata.

TYPE: bool DEFAULT: False

operation

Apache Hudi operation type "insert" or "upsert".

TYPE: Literal['insert', 'upsert'] DEFAULT: 'upsert'

storage

Overwrite default behaviour, write to offline storage only with "offline" or online only with "online". If the streaming APIs are enabled, specifying the storage option is not supported.

TYPE: str | None DEFAULT: None

write_options

Additional write options as key-value pairs.

When using the python engine, write_options can contain the following entries:

  • key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to write data into the feature group.
  • key wait_for_job and value True or False to configure whether the insert call should return only after the Hopsworks Job has finished. By default it waits.
  • key wait_for_online_ingestion and value True or False to configure whether the insert call should return only after the Hopsworks online ingestion has finished. By default it does not wait.
  • key online_ingestion_options and value a dict to configure waiting on online ingestion. Applied when the wait_for_online_ingestion write option is True or the wait parameter is True. Supported keys are timeout (seconds to wait, default 60, set to 0 for indefinite) and period (polling interval in seconds, default 1).
  • key start_offline_backfill and value True or False to configure whether to start the materialization job to write data to the offline storage. start_offline_backfill is deprecated. Use start_offline_materialization instead.
  • key start_offline_materialization and value True or False to configure whether to start the materialization job to write data to the offline storage. By default the materialization job is started immediately. See the sketch after this parameter entry.
  • key kafka_producer_config and value an object of type properties used to configure the Kafka client. To optimize for throughput over a high-latency connection, consider changing the producer properties.
  • key internal_kafka and value True or False in case you established connectivity from your Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults to False and will use external listeners when connecting from outside of Hopsworks.
  • key delta.enableChangeDataFeed set to a string value of true or false to enable or disable CDF operations on the feature group Delta table. Set to true by default on Feature Group creation.

TYPE: dict[str, Any] | None DEFAULT: None
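A minimal sketch combining some of these write options, assuming fg and df already exist:

# write to the online store but defer the offline materialization job,
# e.g. when batching several inserts before a single backfill
fg.insert(
    df,
    write_options={"start_offline_materialization": False},
)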

validation_options

Additional validation options as key-value pairs.

  • key run_validation boolean value, set to False to skip validation temporarily on ingestion.
  • key save_report boolean value, set to False to skip upload of the validation report to Hopsworks.
  • key ge_validate_kwargs a dictionary containing kwargs for the validate method of Great Expectations.
  • key fetch_expectation_suite a boolean value, by default True, to control whether the expectation suite of the feature group should be fetched before every insert.
  • key schema_validation boolean value, set to True to validate the schema.

TYPE: dict[str, Any] | None DEFAULT: None

wait

Wait for job and online ingestion to finish before returning. Shortcut for write_options {"wait_for_job": True, "wait_for_online_ingestion": True}.

TYPE: bool DEFAULT: False

transformation_context

A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. The context variable must be explicitly defined as parameters in the transformation function for these to be accessible during execution.

TYPE: dict[str, Any] DEFAULT: None

transform

When set to False, the dataframe is inserted without applying any on-demand transformations. In this case, all required on-demand features must already exist in the provided dataframe.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Job

The job information if python engine is used.

TYPE: Job | None

ValidationReport

The validation report if validation is enabled.

TYPE: ValidationReport | None

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

e.g., fail to create feature group, dataframe schema does not match existing feature group schema, etc.

hsfs.client.exceptions.DataValidationException

If data validation fails and the expectation suite validation_ingestion_policy is set to STRICT. Data is NOT ingested.

[source] multi_part_insert #

multi_part_insert(
    features: pd.DataFrame
    | pl.DataFrame
    | TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | np.ndarray
    | list[list]
    | None = None,
    overwrite: bool = False,
    operation: Literal["insert", "upsert"] = "upsert",
    storage: str | None = None,
    write_options: dict[str, Any] | None = None,
    validation_options: dict[str, Any] | None = None,
    transformation_context: dict[str, Any] = None,
    transform: bool = True,
) -> (
    tuple[Job | None, ValidationReport | None]
    | feature_group_writer.FeatureGroupWriter
)

Get a FeatureGroupWriter for optimized multi part inserts, or call this method directly to start manual multi part optimized inserts.

In use cases where very small batches (1 to 1000 rows) per Dataframe need to be written to the feature store repeatedly, it might be inefficient to use the standard feature_group.insert() method, as it performs some background actions to update the metadata of the feature group object first.

For these cases, the feature group provides the multi_part_insert API, which is optimized for writing many small Dataframes one after another.

There are two ways to use this API:

Python Context Manager

Using the Python with syntax you can acquire a FeatureGroupWriter object that implements the same multi_part_insert API.

feature_group = fs.get_or_create_feature_group("fg_name", version=1)

with feature_group.multi_part_insert() as writer:
    # run inserts in a loop:
    while loop:
        small_batch_df = ...
        writer.insert(small_batch_df)

The writer batches the small Dataframes and transmits them to Hopsworks efficiently. When exiting the context, the feature group writer exits only once all the rows have been transmitted.

Multi part insert with manual context management

Instead of letting Python handle the entering and exiting of the multi part insert context, you can start and finalize the context manually.

feature_group = fs.get_or_create_feature_group("fg_name", version=1)

while loop:
    small_batch_df = ...
    feature_group.multi_part_insert(small_batch_df)

# IMPORTANT: finalize the multi part insert to make sure all rows
# have been transmitted
feature_group.finalize_multi_part_insert()

Note that the first call to multi_part_insert initiates the context, so be sure to finalize it. finalize_multi_part_insert is a blocking call that returns once all rows have been transmitted.

Once you are done with the multi part insert, it is good practice to start the materialization job in order to write the data to the offline storage:

feature_group.materialization_job.run(await_termination=True)
PARAMETER DESCRIPTION
features

Features to be saved.

TYPE: pd.DataFrame | pl.DataFrame | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | np.ndarray | list[list] | None DEFAULT: None

overwrite

Drop all data in the feature group before inserting new data. This does not affect metadata.

TYPE: bool DEFAULT: False

operation

Apache Hudi operation type "insert" or "upsert".

TYPE: Literal['insert', 'upsert'] DEFAULT: 'upsert'

storage

Overwrite default behaviour, write to offline storage only with "offline" or online only with "online".

TYPE: str | None DEFAULT: None

write_options

Additional write options as key-value pairs.

When using the python engine, write_options can contain the following entries:

  • key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to write data into the feature group.
  • key wait_for_job and value True or False to configure whether the insert call should return only after the Hopsworks Job has finished. By default it waits.
  • key start_offline_backfill and value True or False to configure whether to start the materialization job to write data to the offline storage. start_offline_backfill is deprecated. Use start_offline_materialization instead.
  • key start_offline_materialization and value True or False to configure whether to start the materialization job to write data to the offline storage. By default the materialization job does not get started automatically for multi part inserts.
  • key kafka_producer_config and value an object of type properties used to configure the Kafka client. To optimize for throughput over a high-latency connection, consider changing the producer properties.
  • key internal_kafka and value True or False in case you established connectivity from your Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults to False and will use external listeners when connecting from outside of Hopsworks.

TYPE: dict[str, Any] | None DEFAULT: None

validation_options

Additional validation options as key-value pairs.

  • key run_validation boolean value, set to False to skip validation temporarily on ingestion.
  • key save_report boolean value, set to False to skip upload of the validation report to Hopsworks.
  • key ge_validate_kwargs a dictionary containing kwargs for the validate method of Great Expectations.
  • key fetch_expectation_suite a boolean value, by default False for multi part inserts, to control whether the expectation suite of the feature group should be fetched before every insert.

TYPE: dict[str, Any] | None DEFAULT: None

transformation_context

A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. The context variable must be explicitly defined as parameters in the transformation function for these to be accessible during execution.

TYPE: dict[str, Any] DEFAULT: None

transform

When set to False, the dataframe is inserted without applying any on-demand transformations. In this case, all required on-demand features must already exist in the provided dataframe.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
tuple[Job | None, ValidationReport | None] | feature_group_writer.FeatureGroupWriter

One of:

  • A tuple with the job information if the python engine is used and the validation report if validation is enabled, or
  • a FeatureGroupWriter when used as a context manager with the Python with statement.

[source] finalize_multi_part_insert #

finalize_multi_part_insert() -> None

Finalizes and exits the multi part insert context opened by multi_part_insert in a blocking fashion once all rows have been transmitted.

Multi part insert with manual context management

Instead of letting Python handle the entering and exiting of the multi part insert context, you can start and finalize the context manually.

feature_group = fs.get_or_create_feature_group("fg_name", version=1)

while loop:
    small_batch_df = ...
    feature_group.multi_part_insert(small_batch_df)

# IMPORTANT: finalize the multi part insert to make sure all rows
# have been transmitted
feature_group.finalize_multi_part_insert()

Note that the first call to multi_part_insert initiates the context, so be sure to finalize it. finalize_multi_part_insert is a blocking call that returns once all rows have been transmitted.

[source] insert_stream #

insert_stream(
    features: TypeVar("pyspark.sql.DataFrame"),
    query_name: str | None = None,
    output_mode: Literal[
        "append", "complete", "update"
    ] = "append",
    await_termination: bool = False,
    timeout: int | None = None,
    checkpoint_dir: str | None = None,
    write_options: dict[str, Any] | None = None,
    transformation_context: dict[str, Any] = None,
    transform: bool = True,
) -> TypeVar("StreamingQuery")

Ingest a Spark Structured Streaming Dataframe to the online feature store.

This method creates a long-running Spark Structured Streaming query; you can control its termination through the arguments.

You can stop the returned query with .stop() and check its status with .isActive.

To get a list of all active queries, use:

sqm = spark.streams

# get the list of active streaming queries
[q.name for q in sqm.active]
Engine Support

Spark only

Stream ingestion using Pandas/Python as engine is currently not supported. Python/Pandas has no notion of streaming.

Data Validation Support

insert_stream does not perform any data validation using Great Expectations, even when an expectation suite is attached.

PARAMETER DESCRIPTION
features

Features in Streaming Dataframe to be saved.

TYPE: TypeVar('pyspark.sql.DataFrame')

query_name

Optionally specify a name for the query to make it easier to recognise in the Spark UI.

TYPE: str | None DEFAULT: None

output_mode

Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.

  • "append": Only the new rows in the streaming DataFrame/Dataset will be written to the sink.
  • "complete": All the rows in the streaming DataFrame/Dataset will be written to the sink every time there is some update.
  • "update": Only the rows that were updated in the streaming DataFrame/Dataset will be written to the sink every time there are some updates. If the query doesn't contain aggregations, it will be equivalent to append mode.

TYPE: Literal['append', 'complete', 'update'] DEFAULT: 'append'

await_termination

Waits for the termination of this query, either by query.stop() or by an exception. If the query has terminated with an exception, then the exception will be thrown. If timeout is set, it returns whether the query has terminated or not within the timeout seconds.

TYPE: bool DEFAULT: False

timeout

Only relevant in combination with await_termination=True.

TYPE: int | None DEFAULT: None

checkpoint_dir

Checkpoint directory location. This will be used as a reference from where to resume the streaming job. If None, hsfs will construct it as "insert_stream_" + online_topic_name.

TYPE: str | None DEFAULT: None

write_options

Additional write options for Spark as key-value pairs.

TYPE: dict[str, Any] | None DEFAULT: None

transformation_context

A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. The context variable must be explicitly defined as parameters in the transformation function for these to be accessible during execution.

TYPE: dict[str, Any] DEFAULT: None

transform

When set to False, the dataframe is inserted without applying any on-demand transformations. In this case, all required on-demand features must already exist in the provided dataframe.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
TypeVar('StreamingQuery')

Spark Structured Streaming Query object.
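A minimal sketch of starting and later stopping a streaming ingestion, assuming streaming_df is a Spark Structured Streaming Dataframe:

# start the long-running ingestion query without blocking
query = fg.insert_stream(
    streaming_df,
    query_name="fg_ingestion_query",  # illustrative name
    output_mode="append",
)

# ... later: check the status and stop the query
if query.isActive:
    query.stop()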

[source] commit_details #

commit_details(
    wallclock_time: str
    | int
    | datetime
    | date
    | None = None,
    limit: int | None = None,
) -> dict[str, dict[str, str]]

Retrieve the commit timeline for this feature group.

This method can only be used on time-travel-enabled feature groups.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

commit_details = fg.commit_details()
PARAMETER DESCRIPTION
wallclock_time

Commit details as of specific point in time. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.

TYPE: str | int | datetime | date | None DEFAULT: None

limit

Number of commits to retrieve.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, dict[str, str]]

Dictionary of the commit metadata timeline, where the key is the commit id and the value is a Dict[str, str] with key-value pairs for the commit date and the number of rows updated, inserted, and deleted.
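A rough illustration of the returned shape; the commit id and key names below are illustrative, not the exact backend keys:

# {
#     "1603178051000": {
#         "committedOn": "20201020073411",
#         "rowsUpdated": "10",
#         "rowsInserted": "100",
#         "rowsDeleted": "0",
#     },
# }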

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

hopsworks.client.exceptions.FeatureStoreException

If the feature group does not have HUDI time travel format.

[source] commit_delete_record #

commit_delete_record(
    delete_df: TypeVar("pyspark.sql.DataFrame"),
    write_options: dict[Any, Any] | None = None,
) -> None

Drops records present in the provided DataFrame and commits the deletion as an update to this feature group.

This method can only be used on feature groups stored as HUDI or DELTA.
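A minimal sketch, assuming a Spark engine and an existing feature group fg; the filter condition is illustrative:

# select the records to delete from the current state of the feature group
delete_df = fg.read().filter("sale_date < '2020-01-01'")

# drop those records and record the deletion as a new commit
fg.commit_delete_record(delete_df)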

PARAMETER DESCRIPTION
delete_df

dataFrame containing records to be deleted.

TYPE: TypeVar('pyspark.sql.DataFrame')

write_options

User provided write options.

TYPE: dict[Any, Any] | None DEFAULT: None

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

[source] delta_vacuum #

delta_vacuum(retention_hours: int = None) -> None

Vacuum files that are no longer referenced by a Delta table and are older than the retention threshold.

This method can only be used on feature groups stored as DELTA.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.delta_vacuum(retention_hours=168)
PARAMETER DESCRIPTION
retention_hours

User provided retention period. The default retention threshold for the files is 7 days.

TYPE: int DEFAULT: None

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

[source] as_of #

as_of(
    wallclock_time: str
    | int
    | datetime
    | date
    | None = None,
    exclude_until: str
    | int
    | datetime
    | date
    | None = None,
) -> query.Query

Get Query object to retrieve all features of the group at a point in the past.

Pyspark/Spark Only

Apache HUDI exclusively supports Time Travel and Incremental Query via Spark Context.

This method selects all features in the feature group and returns a Query object at the specified point in time. Optionally, commits before a specified point in time can be excluded from the query. The Query can then either be read into a Dataframe or used further to perform joins or construct a training dataset.

Reading features at a specific point in time
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

# get data at a specific point in time and show it
fg.as_of("2020-10-20 07:34:11").read().show()
Reading commits incrementally between specified points in time
fg.as_of("2020-10-20 07:34:11", exclude_until="2020-10-19 07:34:11").read().show()

The first parameter is inclusive while the latter is exclusive. That means, in order to query a single commit, you need to query that commit time and exclude everything just before the commit.

Reading only the changes from a single commit
fg.as_of("2020-10-20 07:31:38", exclude_until="2020-10-20 07:31:37").read().show()

When no wallclock_time is given, the latest state of features is returned. Optionally, commits before a specified point in time can still be excluded.

Reading the latest state of features, excluding commits before a specified point in time
fg.as_of(None, exclude_until="2020-10-20 07:31:38").read().show()

Note that the interval will be applied to all joins in the query. If you want to query different intervals for different feature groups in the query, you have to apply them in a nested fashion:

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg1 = fs.get_or_create_feature_group(...)
fg2 = fs.get_or_create_feature_group(...)

fg1.select_all().as_of("2020-10-20", exclude_until="2020-10-19")
    .join(fg2.select_all().as_of("2020-10-20", exclude_until="2020-10-19"))

If instead you apply another as_of selection after the join, all joined feature groups will be queried with this interval:

Example
fg1.select_all().as_of("2020-10-20", exclude_until="2020-10-19")  # as_of is not applied
    .join(fg2.select_all().as_of("2020-10-20", exclude_until="2020-10-15"))  # as_of is not applied
    .as_of("2020-10-20", exclude_until="2020-10-19")
Warning

This function only works for feature groups with time_travel_format='HUDI'.

Warning

Excluding commits via exclude_until is only possible within the range of the Hudi active timeline. By default, Hudi keeps the last 20 to 30 commits in the active timeline. If you need to keep a longer active timeline, you can overwrite the options: hoodie.keep.min.commits and hoodie.keep.max.commits when calling the insert() method.
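For example, a hedged sketch of passing these Hudi options through write_options when inserting; the values are illustrative:

# keep a longer Hudi active timeline so older commits stay queryable
fg.insert(
    df,
    write_options={
        "hoodie.keep.min.commits": "100",
        "hoodie.keep.max.commits": "110",
    },
)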

PARAMETER DESCRIPTION
wallclock_time

Read data as of this point in time. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, or %Y-%m-%d %H:%M:%S.

TYPE: str | int | datetime | date | None DEFAULT: None

exclude_until

Exclude commits until this point in time. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, or %Y-%m-%d %H:%M:%S.

TYPE: str | int | datetime | date | None DEFAULT: None

RETURNS DESCRIPTION
query.Query

The query object with the applied time travel condition.

[source] get_statistics_by_commit_window #

get_statistics_by_commit_window(
    from_commit_time: str
    | int
    | datetime
    | date
    | None = None,
    to_commit_time: str
    | int
    | datetime
    | date
    | None = None,
    feature_names: list[str] | None = None,
) -> Statistics | list[Statistics] | None

Returns the statistics computed on a specific commit window for this feature group.

If time travel is not enabled, it raises an exception.

If from_commit_time is None, the commit window starts from the first commit. If to_commit_time is None, the commit window ends at the last commit.

Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg_statistics = fg.get_statistics_by_commit_window(from_commit_time=None, to_commit_time=None)
PARAMETER DESCRIPTION
from_commit_time

Date and time of the first commit of the window. Defaults to None. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.

TYPE: str | int | datetime | date | None DEFAULT: None

to_commit_time

Date and time of the last commit of the window. Defaults to None. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.

TYPE: str | int | datetime | date | None DEFAULT: None

feature_names

List of feature names of which statistics are retrieved.

TYPE: list[str] | None DEFAULT: None

RETURNS DESCRIPTION
Statistics | list[Statistics] | None

Statistics object or None if it does not exist.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

[source] compute_statistics #

compute_statistics(
    wallclock_time: str
    | int
    | datetime
    | date
    | None = None,
) -> None

Recompute the statistics for the feature group and save them to the feature store.

Statistics are only computed for data in the offline storage of the feature group.
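A minimal sketch; the timestamp is illustrative:

# recompute statistics as of a past point in time
fg.compute_statistics(wallclock_time="2020-10-20 07:34:11")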

PARAMETER DESCRIPTION
wallclock_time

If specified, recomputes statistics for the feature group as of the given point in time. If not specified, computes statistics as of the most recent time of this feature group. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.

TYPE: str | int | datetime | date | None DEFAULT: None

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

[source] SpineGroup #

Bases: FeatureGroupBase

[source] dataframe property writable #

dataframe: (
    pd.DataFrame | TypeVar("pyspark.sql.DataFrame") | None
)

Spine dataframe with the primary key, event time and label columns to use for the point-in-time join when fetching features.