hsfs.feature_group #
[source] ExternalFeatureGroup #
Bases: FeatureGroupBase
A feature group that references data stored outside Hopsworks.
[source] description property writable #
description: str | None
Description of the feature group, as it appears in the UI.
[source] feature_store_name property #
feature_store_name: str | None
Name of the feature store in which the feature group is located.
[source] save #
save() -> None
Persist the metadata for this external feature group.
Without calling this method, your feature group will only exist in your Python kernel, not in Hopsworks.
query = "SELECT * FROM sales"
fg = feature_store.create_external_feature_group(name="sales",
version=1,
description="Physical shop sales features",
data_source=ds,
primary_key=['ss_store_sk'],
event_time='sale_date'
)
fg.save()
[source] insert #
insert(
features: pd.DataFrame
| TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| np.ndarray
| list[list],
write_options: dict[str, Any] | None = None,
validation_options: dict[str, Any] | None = None,
wait: bool = False,
) -> tuple[
None,
great_expectations.core.ExpectationSuiteValidationResult
| None,
]
Insert the dataframe feature values ONLY in the online feature store.
External Feature Groups contain metadata about feature data in an external storage system. External storage systems are usually offline, meaning feature values cannot be retrieved in real-time. In order to use the feature values for real-time use cases, you can insert them into the Hopsworks Online Feature Store via this method.
The Online Feature Store keeps a single entry per primary key value, meaning that providing a new value for a given primary key will overwrite the existing value. No record of the previous value is kept.
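The single-entry-per-primary-key behaviour can be pictured with a plain dictionary keyed by primary key. This is a conceptual sketch only, not the actual storage engine; the `upsert` helper and the row shapes are hypothetical.

```python
# Conceptual sketch (not the actual storage engine) of the online store's
# single-entry-per-primary-key semantics: a new value for an existing
# primary key overwrites the old one, and no history is kept.
online_store = {}

def upsert(rows, primary_key):
    for row in rows:
        online_store[row[primary_key]] = row  # previous value is lost

upsert([{"ss_store_sk": 1, "sales": 100}], "ss_store_sk")
upsert([{"ss_store_sk": 1, "sales": 250}], "ss_store_sk")
# only the latest value per key remains
```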
Example
# connect to the Feature Store
fs = ...
# get the External Feature Group instance
fg = fs.get_feature_group(name="external_sales_records", version=1)
# get the feature values, e.g reading from csv files in a S3 bucket
feature_values = ...
# insert the feature values in the online feature store
fg.insert(feature_values)
Note
Data Validation via Great Expectation is supported if you have attached an expectation suite to your External Feature Group. However, as opposed to regular Feature Groups, this can lead to discrepancies between the data in the external storage system and the online feature store.
| PARAMETER | DESCRIPTION |
|---|---|
features | Features to be saved. TYPE: |
write_options | Additional write options as key-value pairs. When using the
|
validation_options | Additional validation options as key-value pairs.
|
wait | Wait for job and online ingestion to finish before returning. Shortcut for write_options TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
tuple[None, great_expectations.core.ExpectationSuiteValidationResult | None] | The validation report if validation is enabled. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | e.g., failure to create the feature group, a dataframe schema that does not match the existing feature group schema, etc. |
hsfs.client.exceptions.DataValidationException | If data validation fails and the expectation suite |
[source] read #
read(
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
online: bool = False,
read_options: dict[str, Any] | None = None,
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| pl.DataFrame
| np.ndarray
)
Get the feature group as a DataFrame.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
df = fg.read()
Engine Support
Spark only
Reading an External Feature Group directly into a Pandas DataFrame using the Python/Pandas engine is not supported. However, you can use the Query API to create Feature Views/Training Data containing External Feature Groups.
| PARAMETER | DESCRIPTION |
|---|---|
dataframe_type | The type of the returned dataframe. By default, maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine. TYPE: |
online | If TYPE: |
read_options | Additional options as key/value pairs to pass to the spark engine. |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | pl.DataFrame | np.ndarray | One of the listed dataframe types, depending on the requested dataframe_type and the engine in use. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
hopsworks.client.exceptions.FeatureStoreException | If trying to read an external feature group directly with the Python engine. |
[source] show #
Show the first n rows of the feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# make a query and show top 5 rows
fg.select(['date','weekly_sales','is_holiday']).show(5)
| PARAMETER | DESCRIPTION |
|---|---|
n | Number of rows to show. TYPE: |
online | If TYPE: |
[source] find_neighbors #
find_neighbors(
embedding: list[int | float],
col: str | None = None,
k: int | None = 10,
filter: Filter | Logic | None = None,
options: dict | None = None,
) -> list[tuple[float, list[Any]]]
Finds the nearest neighbors for a given embedding in the vector database.
If filter is specified, or if the embedding feature is stored in the default project index, the number of results returned may be less than k. Try using a larger value of k and extract the top k items from the results if needed.
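Why a filter can leave fewer than k results is easiest to see with a brute-force sketch over in-memory rows. The helper below is hypothetical (not the hsfs API) and uses plain Euclidean distance for illustration.

```python
import math

def find_neighbors_sketch(embedding, rows, k=10, filter_fn=None):
    # Brute-force nearest-neighbour search (Euclidean distance).
    # The filter shrinks the candidate set *before* the top-k cut,
    # so fewer than k results may come back.
    candidates = [r for r in rows if filter_fn is None or filter_fn(r)]
    nearest = sorted(candidates, key=lambda r: math.dist(embedding, r["vector"]))[:k]
    return [(math.dist(embedding, r["vector"]), r) for r in nearest]

rows = [{"id": i, "vector": [float(i), 0.0]} for i in range(5)]
# only one row passes the filter, so len(result) < k
few = find_neighbors_sketch([0.0, 0.0], rows, k=3, filter_fn=lambda r: r["id"] > 3)
```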
| PARAMETER | DESCRIPTION |
|---|---|
embedding | The target embedding for which neighbors are to be found. |
col | The column name used to compute similarity score. Required only if there are multiple embeddings. TYPE: |
k | The number of nearest neighbors to retrieve. TYPE: |
filter | A filter expression to restrict the search space. TYPE: |
options | The options used for the request to the vector database. The keys are attribute values of the TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[float, list[Any]]] | A list of tuples representing the nearest neighbors. |
list[tuple[float, list[Any]]] | Each tuple contains: |
Example
embedding_index = EmbeddingIndex()
embedding_index.add_embedding(name="user_vector", dimension=3)
fg = fs.create_feature_group(
name='air_quality',
embedding_index = embedding_index,
version=1,
primary_key=['id1'],
online_enabled=True,
)
fg.insert(data)
fg.find_neighbors(
[0.1, 0.2, 0.3],
k=5,
)
# apply filter
fg.find_neighbors(
[0.1, 0.2, 0.3],
k=5,
filter=(fg.id1 > 10) & (fg.id1 < 30)
)
[source] FeatureGroup #
Bases: FeatureGroupBase
[source] description property writable #
description: str | None
Description of the feature group contents.
[source] time_travel_format property writable #
time_travel_format: str | None
Setting of the feature group time travel format.
[source] hudi_precombine_key property writable #
hudi_precombine_key: str | None
Feature name that is the hudi precombine key.
[source] feature_store_name property #
feature_store_name: str | None
Name of the feature store in which the feature group is located.
[source] stream property writable #
stream: bool
Whether to enable real time stream writing capabilities.
[source] parents property writable #
parents: list[explicit_provenance.Links]
Parent feature groups as origin of the data in the current feature group.
This is part of explicit provenance.
[source] materialization_job property #
materialization_job: Job | None
Get the Job object reference for the materialization job for this Feature Group.
[source] statistics property #
statistics: Statistics
Get the latest computed statistics for the whole feature group.
[source] transformation_functions property writable #
transformation_functions: list[TransformationFunction]
Get transformation functions.
[source] offline_backfill_every_hr property writable #
On Feature Group creation, used to set a scheduled run of the materialization job.
[source] sink_enabled property #
sink_enabled: bool
Get whether sink is enabled for this feature group.
[source] sink_job property #
sink_job: job.Job | None
Return the sink job created for this feature group, if any.
[source] sink_job_conf property writable #
sink_job_conf: SinkJobConfiguration
Sink job configuration object defining the settings for sink job of the feature group.
[source] read #
read(
wallclock_time: str
| int
| datetime
| date
| None = None,
online: bool = False,
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
read_options: dict | None = None,
) -> (
pd.DataFrame
| np.ndarray
| list[list[Any]]
| TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pl.DataFrame
)
Read the feature group into a dataframe.
Reads the feature group by default from the offline storage as Spark DataFrame on Hopsworks and Databricks, and as Pandas dataframe on AWS Sagemaker and pure Python environments.
Set online to True to read from the online storage, or change dataframe_type to read as a different format.
Reading feature group as of latest state
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.read()
Reading feature group as of specific point in time:
fg = fs.get_or_create_feature_group(...)
fg.read("2020-10-20 07:34:11")
| PARAMETER | DESCRIPTION |
|---|---|
wallclock_time | If specified, retrieves feature group as of specific point in time. If not specified, returns as of most recent time. Strings should be formatted in one of the following formats |
online | If TYPE: |
dataframe_type | The type of the returned dataframe. By default, maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine. TYPE: |
read_options | Additional options as key/value pairs to pass to the execution engine. For spark engine: Dictionary of read options for Spark. For python engine: - key TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
pd.DataFrame | np.ndarray | list[list[Any]] | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pl.DataFrame | One of the listed dataframe types, depending on the requested dataframe_type and the engine in use. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | No data is available for feature group with this commit date, if time travel enabled. |
[source] read_changes #
read_changes(
start_wallclock_time: str | int | datetime | date,
end_wallclock_time: str | int | datetime | date,
read_options: dict | None = None,
) -> (
pd.DataFrame
| np.ndarray
| list[list[Any]]
| TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pl.DataFrame
)
Reads updates of this feature group that occurred between specified points in time.
Deprecated
read_changes method is deprecated. Use as_of(end_wallclock_time, exclude_until=start_wallclock_time).read(read_options=read_options) instead.
Pyspark/Spark Only
Apache HUDI exclusively supports Time Travel and Incremental Query via Spark Context.
Warning
This function only works for feature groups with time_travel_format='HUDI'.
| PARAMETER | DESCRIPTION |
|---|---|
start_wallclock_time | Start time of the time travel query. Strings should be formatted in one of the following formats |
end_wallclock_time | End time of the time travel query. Strings should be formatted in one of the following formats |
read_options | Additional options as key/value pairs to pass to the execution engine. For spark engine, it is a dictionary of read options for Spark. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
pd.DataFrame | np.ndarray | list[list[Any]] | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pl.DataFrame | The spark dataframe containing the incremental changes of feature data. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | No data is available for feature group with this commit date. |
hopsworks.client.exceptions.FeatureStoreException | If the feature group does not have |
[source] find_neighbors #
find_neighbors(
embedding: list[int | float],
col: str | None = None,
k: int | None = 10,
filter: Filter | Logic | None = None,
options: dict | None = None,
) -> list[tuple[float, list[Any]]]
Finds the nearest neighbors for a given embedding in the vector database.
If filter is specified, or if the embedding feature is stored in the default project index, the number of results returned may be less than k. Try using a larger value of k and extract the top k items from the results if needed.
| PARAMETER | DESCRIPTION |
|---|---|
embedding | The target embedding for which neighbors are to be found. |
col | The column name used to compute similarity score. Required only if there are multiple embeddings. TYPE: |
k | The number of nearest neighbors to retrieve. TYPE: |
filter | A filter expression to restrict the search space. TYPE: |
options | The options used for the request to the vector database. The keys are attribute values of the TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[float, list[Any]]] | A list of tuples representing the nearest neighbors. |
list[tuple[float, list[Any]]] | Each tuple contains: |
Example
embedding_index = EmbeddingIndex()
embedding_index.add_embedding(name="user_vector", dimension=3)
fg = fs.create_feature_group(
name='air_quality',
embedding_index = embedding_index,
version=1,
primary_key=['id1'],
online_enabled=True,
)
fg.insert(data)
fg.find_neighbors(
[0.1, 0.2, 0.3],
k=5,
)
# apply filter
fg.find_neighbors(
[0.1, 0.2, 0.3],
k=5,
filter=(fg.id1 > 10) & (fg.id1 < 30)
)
[source] show #
Show the first n rows of the feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# make a query and show top 5 rows
fg.select(['date','weekly_sales','is_holiday']).show(5)
| PARAMETER | DESCRIPTION |
|---|---|
n | Number of rows to show. TYPE: |
online | If TYPE: |
[source] save #
save(
features: pd.DataFrame
| pl.DataFrame
| TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| np.ndarray
| list[feature.Feature] = None,
write_options: dict[str, Any] | None = None,
validation_options: dict[str, Any] | None = None,
wait: bool = False,
) -> tuple[
Job | None,
great_expectations.core.ExpectationSuiteValidationResult
| None,
]
Persist the metadata and materialize the feature group to the feature store.
Changed in 3.3.0
insert and save methods are now async by default in non-spark clients. To achieve the old behaviour, set wait argument to True.
Calling save creates the metadata for the feature group in the feature store. If a Pandas DataFrame, Polars DataFrame, RDD, or ndarray is provided, the data is written to the online/offline feature store as specified. By default, this writes the feature group to the offline storage and, if online_enabled is set for the feature group, also to the online feature store. The features dataframe can be a Spark DataFrame or RDD, a Pandas DataFrame, a two-dimensional Numpy array, or a two-dimensional Python nested list.
| PARAMETER | DESCRIPTION |
|---|---|
features | Features to be saved. This argument is optional if the feature list is provided in the TYPE: |
write_options | Additional write options as key-value pairs. When using the
|
validation_options | Additional validation options as key-value pairs.
|
wait | Wait for job and online ingestion to finish before returning. Shortcut for write_options TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
tuple[Job | None, great_expectations.core.ExpectationSuiteValidationResult | None] | When using the |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
[source] insert #
insert(
features: pd.DataFrame
| pl.DataFrame
| TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| np.ndarray
| list[list],
overwrite: bool = False,
operation: Literal["insert", "upsert"] = "upsert",
storage: str | None = None,
write_options: dict[str, Any] | None = None,
validation_options: dict[str, Any] | None = None,
wait: bool = False,
transformation_context: dict[str, Any] = None,
transform: bool = True,
) -> tuple[Job | None, ValidationReport | None]
Persist the metadata and materialize the feature group to the feature store, or insert data from a dataframe into the existing feature group.
Incrementally insert data into a feature group, or overwrite all data contained in the feature group. By default, the data is inserted into the offline storage, as well as the online storage if the feature group is online_enabled=True.
The features dataframe can be a Spark DataFrame or RDD, a Pandas DataFrame, a Polars DataFrame, a two-dimensional Numpy array, or a two-dimensional Python nested list. If statistics are enabled, they are recomputed for the entire feature group. If the feature group's time travel format is HUDI, the operation argument can be either insert or upsert.
If the feature group doesn't exist, the insert method will create the necessary metadata the first time it is invoked and write the specified features dataframe as a feature group to the online/offline feature store.
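The difference between the two operation types can be sketched with an in-memory table keyed by primary key. This is a hypothetical model for illustration, not Hudi itself.

```python
# In-memory sketch of the two operation types: "upsert" updates rows
# matching the primary key and appends the rest, while "insert" appends
# unconditionally (duplicates are possible).
def apply_operation(table, rows, operation="upsert", primary_key="unix"):
    if operation == "upsert":
        index = {row[primary_key]: i for i, row in enumerate(table)}
        for row in rows:
            if row[primary_key] in index:
                table[index[row[primary_key]]] = row  # update existing row
            else:
                table.append(row)
    else:  # "insert": plain append
        table.extend(rows)
    return table

table = []
apply_operation(table, [{"unix": 1, "price": 10}])
apply_operation(table, [{"unix": 1, "price": 20}])            # upsert: still one row
apply_operation(table, [{"unix": 1, "price": 30}], "insert")  # insert: appends
```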
Changed in 3.3.0
insert and save methods are now async by default in non-spark clients. To achieve the old behaviour, set wait argument to True.
Upsert new feature data with time travel format HUDI
# connect to the Feature Store
fs = ...
fg = fs.get_or_create_feature_group(
name='bitcoin_price',
description='Bitcoin price aggregated for days',
version=1,
primary_key=['unix'],
online_enabled=True,
event_time='unix'
)
fg.insert(df_bitcoin_processed)
Async insert
# connect to the Feature Store
fs = ...
fg1 = fs.get_or_create_feature_group(
name='feature_group_name1',
description='Description of the first FG',
version=1,
primary_key=['unix'],
online_enabled=True,
event_time='unix'
)
# async insertion in order not to wait for the job to finish
fg1.insert(df_for_fg1, write_options={"wait_for_job" : False})
fg2 = fs.get_or_create_feature_group(
name='feature_group_name2',
description='Description of the second FG',
version=1,
primary_key=['unix'],
online_enabled=True,
event_time='unix'
)
fg2.insert(df_for_fg2)
| PARAMETER | DESCRIPTION |
|---|---|
features | Features to be saved. TYPE: |
overwrite | Drop all data in the feature group before inserting new data. This does not affect metadata. TYPE: |
operation | Apache Hudi operation type TYPE: |
storage | Overwrite default behaviour, write to offline storage only with TYPE: |
write_options | Additional write options as key-value pairs. When using the
|
validation_options | Additional validation options as key-value pairs.
|
wait | Wait for job and online ingestion to finish before returning. Shortcut for write_options TYPE: |
transformation_context | A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. The |
transform | When set to TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
Job | The job information if python engine is used. TYPE: |
ValidationReport | The validation report if validation is enabled. TYPE: |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | e.g., failure to create the feature group, a dataframe schema that does not match the existing feature group schema, etc. |
hsfs.client.exceptions.DataValidationException | If data validation fails and the expectation suite |
[source] multi_part_insert #
multi_part_insert(
features: pd.DataFrame
| pl.DataFrame
| TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| np.ndarray
| list[list]
| None = None,
overwrite: bool = False,
operation: Literal["insert", "upsert"] = "upsert",
storage: str | None = None,
write_options: dict[str, Any] | None = None,
validation_options: dict[str, Any] | None = None,
transformation_context: dict[str, Any] = None,
transform: bool = True,
) -> (
tuple[Job | None, ValidationReport | None]
| feature_group_writer.FeatureGroupWriter
)
Get a FeatureGroupWriter for optimized multi part inserts, or call this method directly to start manual multi part optimized inserts.
In use cases where very small batches (1 to 1000 rows per Dataframe) need to be written to the feature store repeatedly, the standard feature_group.insert() method may be inefficient, as it performs some background actions to update the metadata of the feature group object first.
For these cases, the feature group provides the multi_part_insert API, which is optimized for writing many small Dataframes one after another.
There are two ways to use this API:
Python Context Manager
Using the Python with syntax you can acquire a FeatureGroupWriter object that implements the same multi_part_insert API.
feature_group = fs.get_or_create_feature_group("fg_name", version=1)
with feature_group.multi_part_insert() as writer:
# run inserts in a loop:
while loop:
small_batch_df = ...
writer.insert(small_batch_df)
The writer batches the small Dataframes and transmits them to Hopsworks efficiently. When exiting the context, the feature group writer exits only once all the rows have been transmitted.
Multi part insert with manual context management
Instead of letting Python handle the entering and exiting of the multi part insert context, you can start and finalize the context manually.
feature_group = fs.get_or_create_feature_group("fg_name", version=1)
while loop:
small_batch_df = ...
feature_group.multi_part_insert(small_batch_df)
# IMPORTANT: finalize the multi part insert to make sure all rows
# have been transmitted
feature_group.finalize_multi_part_insert()
Note that the first call to multi_part_insert initiates the context; be sure to finalize it. The finalize_multi_part_insert is a blocking call that returns once all rows have been transmitted.
Once you are done with the multi part insert, it is good practice to start the materialization job in order to write the data to the offline storage:
feature_group.materialization_job.run(await_termination=True)
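The batching idea behind the writer can be sketched with a minimal context manager: buffer small batches, transmit everything when the context exits. This is illustrative only; the real FeatureGroupWriter streams rows to Hopsworks, and the `transmit` callback here is a stand-in.

```python
# Minimal sketch of the multi part insert pattern: accumulate small
# batches, flush them all when the context is finalized.
class BatchingWriter:
    def __init__(self, transmit):
        self._buffer = []
        self._transmit = transmit  # stand-in for transmitting to Hopsworks

    def insert(self, rows):
        self._buffer.extend(rows)

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self._transmit(self._buffer)  # finalize: flush all buffered rows
        self._buffer = []
        return False

sent = []
with BatchingWriter(sent.extend) as writer:
    for small_batch in ([1, 2], [3]):
        writer.insert(small_batch)
```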
| PARAMETER | DESCRIPTION |
|---|---|
features | Features to be saved. TYPE: |
overwrite | Drop all data in the feature group before inserting new data. This does not affect metadata. TYPE: |
operation | Apache Hudi operation type TYPE: |
storage | Overwrite default behaviour, write to offline storage only with TYPE: |
write_options | Additional write options as key-value pairs. When using the
|
validation_options | Additional validation options as key-value pairs.
|
transformation_context | A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. The |
transform | When set to TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
tuple[Job | None, ValidationReport | None] | feature_group_writer.FeatureGroupWriter | One of: the job and validation report tuple when called with a dataframe, or a FeatureGroupWriter when called without one. |
[source] finalize_multi_part_insert #
finalize_multi_part_insert() -> None
Finalizes and exits the multi part insert context opened by multi_part_insert in a blocking fashion once all rows have been transmitted.
Multi part insert with manual context management
Instead of letting Python handle the entering and exiting of the multi part insert context, you can start and finalize the context manually.
feature_group = fs.get_or_create_feature_group("fg_name", version=1)
while loop:
small_batch_df = ...
feature_group.multi_part_insert(small_batch_df)
# IMPORTANT: finalize the multi part insert to make sure all rows
# have been transmitted
feature_group.finalize_multi_part_insert()
Note that the first call to multi_part_insert initiates the context; be sure to finalize it. The finalize_multi_part_insert is a blocking call that returns once all rows have been transmitted.
[source] insert_stream #
insert_stream(
features: TypeVar("pyspark.sql.DataFrame"),
query_name: str | None = None,
output_mode: Literal[
"append", "complete", "update"
] = "append",
await_termination: bool = False,
timeout: int | None = None,
checkpoint_dir: str | None = None,
write_options: dict[str, Any] | None = None,
transformation_context: dict[str, Any] = None,
transform: bool = True,
) -> TypeVar("StreamingQuery")
Ingest a Spark Structured Streaming Dataframe to the online feature store.
This method creates a long running Spark Streaming Query; you can control the termination of the query through the arguments.
It is possible to stop the returned query with .stop() and check its status with .isActive.
To get a list of all active queries, use:
sqm = spark.streams
# get the list of active streaming queries
[q.name for q in sqm.active]
Engine Support
Spark only
Stream ingestion using Pandas/Python as engine is currently not supported. Python/Pandas has no notion of streaming.
Data Validation Support
insert_stream does not perform any data validation using Great Expectations, even when an expectation suite is attached.
| PARAMETER | DESCRIPTION |
|---|---|
features | Features in Streaming Dataframe to be saved. TYPE: |
query_name | It is possible to optionally specify a name for the query to make it easier to recognise in the Spark UI. TYPE: |
output_mode | Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
TYPE: |
await_termination | Waits for the termination of this query, either by query.stop() or by an exception. If the query has terminated with an exception, then the exception will be thrown. If timeout is set, it returns whether the query has terminated or not within the timeout seconds. TYPE: |
timeout | Only relevant in combination with TYPE: |
checkpoint_dir | Checkpoint directory location. This will be used as a reference from where to resume the streaming job. If TYPE: |
write_options | Additional write options for Spark as key-value pairs. |
transformation_context | A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. The |
transform | When set to TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('StreamingQuery') | Spark Structured Streaming Query object. |
[source] commit_details #
commit_details(
wallclock_time: str
| int
| datetime
| date
| None = None,
limit: int | None = None,
) -> dict[str, dict[str, str]]
Retrieves the commit timeline for this feature group.
This method can only be used on time travel enabled feature groups.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
commit_details = fg.commit_details()
| PARAMETER | DESCRIPTION |
|---|---|
wallclock_time | Commit details as of specific point in time. Strings should be formatted in one of the following formats |
limit | Number of commits to retrieve. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
dict[str, dict[str, str]] | Dictionary object of commit metadata timeline, where Key is commit id and value is |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
hopsworks.client.exceptions.FeatureStoreException | If the feature group does not have |
[source] commit_delete_record #
commit_delete_record(
delete_df: TypeVar("pyspark.sql.DataFrame"),
write_options: dict[Any, Any] | None = None,
) -> None
Drops records present in the provided DataFrame and commits the change as an update to this feature group.
This method can only be used on feature groups stored as HUDI or DELTA.
| PARAMETER | DESCRIPTION |
|---|---|
delete_df | DataFrame containing records to be deleted. TYPE: |
write_options | User provided write options. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
[source] delta_vacuum #
delta_vacuum(retention_hours: int = None) -> None
Vacuum files that are no longer referenced by a Delta table and are older than the retention threshold.
This method can only be used on feature groups stored as DELTA.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
commit_details = fg.delta_vacuum(retention_hours = 168)
| PARAMETER | DESCRIPTION |
|---|---|
retention_hours | User provided retention period. The default retention threshold for the files is 7 days. TYPE: |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
[source] as_of #
as_of(
wallclock_time: str
| int
| datetime
| date
| None = None,
exclude_until: str
| int
| datetime
| date
| None = None,
) -> query.Query
Get Query object to retrieve all features of the group at a point in the past.
Pyspark/Spark Only
Apache HUDI exclusively supports Time Travel and Incremental Query via Spark Context
This method selects all features in the feature group and returns a Query object at the specified point in time. Optionally, commits before a specified point in time can be excluded from the query. The Query can then either be read into a Dataframe or used further to perform joins or construct a training dataset.
Reading features at a specific point in time
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# get data at a specific point in time and show it
fg.as_of("2020-10-20 07:34:11").read().show()
Reading commits incrementally between specified points in time
fg.as_of("2020-10-20 07:34:11", exclude_until="2020-10-19 07:34:11").read().show()
The first parameter is inclusive while the latter is exclusive. That means, in order to query a single commit, you need to query that commit time and exclude everything just before the commit.
Reading only the changes from a single commit
fg.as_of("2020-10-20 07:31:38", exclude_until="2020-10-20 07:31:37").read().show()
When no wallclock_time is given, the latest state of features is returned. Optionally, commits before a specified point in time can still be excluded.
Reading the latest state of features, excluding commits before a specified point in time
fg.as_of(None, exclude_until="2020-10-20 07:31:38").read().show()
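The window semantics above (wallclock_time inclusive, exclude_until exclusive) can be sketched with plain comparable commit times. The helper below is hypothetical and only models the filtering rule, not Hudi's timeline.

```python
# Sketch of the as_of window rule: keep commits c with
# exclude_until < c <= wallclock_time; a None bound is open-ended.
def commits_in_window(commits, wallclock_time=None, exclude_until=None):
    return [
        c for c in commits
        if (wallclock_time is None or c <= wallclock_time)
        and (exclude_until is None or c > exclude_until)
    ]
```

Querying a single commit then means setting wallclock_time to that commit and exclude_until to just before it, matching the example above.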
Note that the interval will be applied to all joins in the query. If you want to query different intervals for different feature groups in the query, you have to apply them in a nested fashion:
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg1 = fs.get_or_create_feature_group(...)
fg2 = fs.get_or_create_feature_group(...)
fg1.select_all().as_of("2020-10-20", exclude_until="2020-10-19")
.join(fg2.select_all().as_of("2020-10-20", exclude_until="2020-10-19"))
If instead you apply another as_of selection after the join, all joined feature groups will be queried with this interval:
Example
fg1.select_all().as_of("2020-10-20", exclude_until="2020-10-19") # as_of is not applied
.join(fg2.select_all().as_of("2020-10-20", exclude_until="2020-10-15")) # as_of is not applied
.as_of("2020-10-20", exclude_until="2020-10-19")
Warning
This function only works for feature groups with time_travel_format='HUDI'.
Warning
Excluding commits via exclude_until is only possible within the range of the Hudi active timeline. By default, Hudi keeps the last 20 to 30 commits in the active timeline. If you need to keep a longer active timeline, you can overwrite the options: hoodie.keep.min.commits and hoodie.keep.max.commits when calling the insert() method.
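The retention options named in the warning can be passed as ordinary write options. The option keys come from the warning above; the values and the commented-out insert call are illustrative only.

```python
# Extend the Hudi active timeline by keeping more commits.
# Keys are from the warning above; values here are illustrative.
hudi_options = {
    "hoodie.keep.min.commits": "100",
    "hoodie.keep.max.commits": "110",
}
# fg.insert(df, write_options=hudi_options)  # hypothetical call
```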
| PARAMETER | DESCRIPTION |
|---|---|
wallclock_time | Read data as of this point in time. Strings should be formatted in one of the following formats |
exclude_until | Exclude commits until this point in time. Strings should be formatted in one of the following formats |
| RETURNS | DESCRIPTION |
|---|---|
query.Query | The query object with the applied time travel condition. |
[source] get_statistics_by_commit_window #
get_statistics_by_commit_window(
from_commit_time: str
| int
| datetime
| date
| None = None,
to_commit_time: str
| int
| datetime
| date
| None = None,
feature_names: list[str] | None = None,
) -> Statistics | list[Statistics] | None
Returns the statistics computed on a specific commit window for this feature group.
If time travel is not enabled, it raises an exception.
If from_commit_time is None, the commit window starts from the first commit. If to_commit_time is None, the commit window ends at the last commit.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg_statistics = fg.get_statistics_by_commit_window(from_commit_time=None, to_commit_time=None)
| PARAMETER | DESCRIPTION |
|---|---|
to_commit_time | Date and time of the last commit of the window. Defaults to |
from_commit_time | Date and time of the first commit of the window. Defaults to |
feature_names | List of feature names of which statistics are retrieved. |
| RETURNS | DESCRIPTION |
|---|---|
Statistics | list[Statistics] | None | Statistics object or |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
[source] compute_statistics #
Recompute the statistics for the feature group and save them to the feature store.
Statistics are only computed for data in the offline storage of the feature group.
| PARAMETER | DESCRIPTION |
|---|---|
wallclock_time | If specified will recompute statistics on feature group as of specific point in time. If not specified then will compute statistics as of most recent time of this feature group. Strings should be formatted in one of the following formats |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |