Feature View#
FeatureView#
hsfs.feature_view.FeatureView(
name,
query,
featurestore_id,
id=None,
version=None,
description="",
labels=[],
inference_helper_columns=[],
training_helper_columns=[],
transformation_functions={},
featurestore_name=None,
serving_keys=None,
**kwargs
)
Creation#
create_feature_view#
FeatureStore.create_feature_view(
name,
query,
version=None,
description="",
labels=[],
inference_helper_columns=[],
training_helper_columns=[],
transformation_functions={},
)
Create a feature view metadata object and save it to Hopsworks.
Example
# connect to the Feature Store
fs = ...
# get the feature group instances
fg1 = fs.get_or_create_feature_group(...)
fg2 = fs.get_or_create_feature_group(...)
# construct the query
query = fg1.select_all().join(fg2.select_all())
# get the transformation functions
standard_scaler = fs.get_transformation_function(name='standard_scaler')
# construct dictionary of "feature - transformation function" pairs
transformation_functions = {col_name: standard_scaler for col_name in df.columns}
feature_view = fs.create_feature_view(
name='air_quality_fv',
version=1,
transformation_functions=transformation_functions,
query=query
)
Example
# get feature store instance
fs = ...
# define query object
query = ...
# define dictionary with column names and transformation functions pairs
mapping_transformers = ...
# create feature view
feature_view = fs.create_feature_view(
name='feature_view_name',
version=1,
transformation_functions=mapping_transformers,
query=query
)
Warning
The as_of argument in the Query will be ignored because feature views do not support time travel queries.
Arguments
- name (str): Name of the feature view to create.
- query (hsfs.constructor.query.Query): Feature store Query.
- version (Optional[int]): Version of the feature view to create. Defaults to None, in which case the feature view is created with the version incremented from the last version in the feature store.
- description (Optional[str]): A string describing the contents of the feature view to improve discoverability for data scientists. Defaults to the empty string "".
- labels (Optional[List[str]]): A list of feature names constituting the prediction label/feature of the feature view. When replaying a Query during model inference, the label features can be omitted from the feature vector retrieval. Defaults to [], no label.
- inference_helper_columns (Optional[List[str]]): A list of feature names that are not used in training the model itself but can be used during batch or online inference for extra information. Inference helper column name(s) must be part of the Query object. If inference helper column name(s) belong to a feature group that is part of a Join with a prefix defined, then this prefix needs to be prepended to the original column name when defining the inference_helper_columns list. When replaying a Query during model inference, the inference helper columns can optionally be omitted during batch inference (get_batch_data) and will be omitted during online inference (get_feature_vector(s)). To get inference helper column(s) during online inference, use the get_inference_helper(s) method. Defaults to [], no helper columns.
- training_helper_columns (Optional[List[str]]): A list of feature names that are not part of the model schema itself but can be used during training as a helper for extra information. Training helper column name(s) must be part of the Query object. If training helper column name(s) belong to a feature group that is part of a Join with a prefix defined, then this prefix needs to be prepended to the original column name when defining the training_helper_columns list. When replaying a Query during model inference, the training helper columns will be omitted during both batch and online inference. Training helper columns can optionally be fetched with training data. For more details, see the documentation for the feature view's get training data methods. Defaults to [], no training helper columns.
- transformation_functions (Optional[Dict[str, hsfs.transformation_function.TransformationFunction]]): A dictionary mapping feature names to the transformation functions that should be applied to them before writing out the vector and at inference time. Defaults to {}, no transformations.
Returns
FeatureView: The feature view metadata object.
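The prefix rule for helper columns can be sketched in plain Python. The helper `with_join_prefix`, the column names and the prefix value are hypothetical and not part of hsfs. The sketch only shows how the names passed as inference_helper_columns or training_helper_columns would be derived when the join defines a prefix.

```python
from typing import List, Optional


def with_join_prefix(columns: List[str], prefix: Optional[str] = None) -> List[str]:
    """Return helper column names as they would be listed for the feature view.

    Hypothetical helper: if the feature group is joined with a prefix,
    the prefix is prepended to each original column name.
    """
    if not prefix:
        return list(columns)
    return [f"{prefix}{name}" for name in columns]


# Column from a feature group joined with the (assumed) prefix "fg2_"
inference_helpers = with_join_prefix(["expiry_date"], prefix="fg2_")
# Column from a feature group joined without a prefix
training_helpers = with_join_prefix(["city"])
print(inference_helpers, training_helpers)
```

The resulting lists are what would be passed to create_feature_view, provided the named columns are part of the Query.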
Retrieval#
get_feature_view#
FeatureStore.get_feature_view(name, version=None)
Get a feature view entity from the feature store.
Getting a feature view from the Feature Store means getting its metadata.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(
name='feature_view_name',
version=1
)
Arguments
- name (str): Name of the feature view to get.
- version (int): Version of the feature view to retrieve. Defaults to None, in which case version 1 is returned.
Returns
FeatureView: The feature view metadata object.
Raises
hsfs.client.exceptions.RestAPIError: If unable to retrieve the feature view from the feature store.
get_feature_views#
FeatureStore.get_feature_views(name)
Get a list of all versions of a feature view entity from the feature store.
Getting a feature view from the Feature Store means getting its metadata.
Example
# get feature store instance
fs = ...
# get a list of all versions of a feature view
feature_views = fs.get_feature_views(
name='feature_view_name'
)
Arguments
- name: Name of the feature view to get.
Returns
FeatureView: List of feature view metadata objects.
Raises
hsfs.client.exceptions.RestAPIError: If unable to retrieve the feature view from the feature store.
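Because get_feature_views returns one metadata object per version, a common follow-up is to pick the newest one. The sketch below uses stand-in objects instead of a live feature store; `latest_version` is a hypothetical helper, and the only attribute it relies on is the version property documented under Properties.

```python
from types import SimpleNamespace


def latest_version(feature_views):
    """Return the entry with the highest version, or None for an empty list."""
    return max(feature_views, key=lambda fv: fv.version, default=None)


# Stand-in objects; in practice the list would come from
# fs.get_feature_views(name='feature_view_name')
feature_views = [
    SimpleNamespace(name="feature_view_name", version=1),
    SimpleNamespace(name="feature_view_name", version=3),
    SimpleNamespace(name="feature_view_name", version=2),
]
newest = latest_version(feature_views)
print(newest.version)
```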
Properties#
description#
Description of the feature view.
feature_store_name#
Name of the feature store in which the feature view is located.
features#
Feature view schema. (alias)
featurestore_id#
Feature store id.
id#
Feature view id.
inference_helper_columns#
The inference helper columns of the feature view.
Can be a composite of multiple features.
labels#
The labels/prediction feature of the feature view.
Can be a composite of multiple features.
name#
Name of the feature view.
primary_keys#
Set of primary key names that are required as keys in the input dict object for the get_feature_vector(s) method.
When there are duplicated primary key names and no prefix is defined in the query, a prefix is generated and prepended to the primary key name in the format "fgId_{feature_group_id}_{join_index}", where join_index is the order of the join.
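As an illustration of the generated prefix format, the sketch below builds the prefix string from a feature group id and a join index. `generated_prefix` is a hypothetical helper and the ids and indices are made-up values. Whether join_index starts at 0 or 1 is not stated here, so both are passed in explicitly.

```python
def generated_prefix(feature_group_id: int, join_index: int) -> str:
    """Build a prefix in the documented format fgId_{feature_group_id}_{join_index}."""
    return f"fgId_{feature_group_id}_{join_index}"


# (feature_group_id, join_index) pairs; all values are illustrative only
joins = [(25, 0), (31, 1)]
prefixes = [generated_prefix(fg_id, index) for fg_id, index in joins]
print(prefixes)
```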
query#
Query of the feature view.
schema#
Feature view schema.
serving_keys#
All primary keys of the feature groups included in the query.
training_helper_columns#
The training helper columns of the feature view.
Can be a composite of multiple features.
transformation_functions#
Get transformation functions.
version#
Version number of the feature view.
Methods#
add_tag#
FeatureView.add_tag(name, value)
Attach a tag to a feature view.
A tag consists of a name and value pair. Tag names are unique identifiers across the whole cluster. The value of a tag can be any valid JSON: primitives, arrays or JSON objects.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# attach a tag to a feature view
feature_view.add_tag(name="tag_schema", value={"key": "value"})
Arguments
- name (str): Name of the tag to be added.
- value: Value of the tag to be added.
Raises
hsfs.client.exceptions.RestAPIError: In case the backend fails to add the tag.
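Since a tag value has to be valid JSON, it can help to check a candidate value locally before attaching it. The helper below is a hypothetical sketch built on the standard json module, not part of hsfs. It shows that a dict or list passes, while a Python set does not.

```python
import json


def is_valid_tag_value(value) -> bool:
    """Return True if the value can be serialized as strict JSON."""
    try:
        # allow_nan=False rejects NaN/Infinity, which are not valid JSON
        json.dumps(value, allow_nan=False)
    except (TypeError, ValueError):
        return False
    return True


print(is_valid_tag_value({"key": "value"}))  # dict: a JSON object
print(is_valid_tag_value({"key", "value"}))  # set: not JSON serializable
```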
add_training_dataset_tag#
FeatureView.add_training_dataset_tag(training_dataset_version, name, value)
Attach a tag to a training dataset.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# attach a tag to a training dataset
feature_view.add_training_dataset_tag(
training_dataset_version=1,
name="tag_schema",
value={"key": "value"}
)
Arguments
- training_dataset_version (int): Training dataset version.
- name (str): Name of the tag to be added.
- value: Value of the tag to be added.
Raises
hsfs.client.exceptions.RestAPIError: In case the backend fails to add the tag.
clean#
FeatureView.clean(feature_store_id, feature_view_name, feature_view_version)
Delete the feature view and all associated metadata and training data. This can also delete a corrupted feature view that cannot be retrieved, for example because of a corrupted query.
Example
# delete a feature view and all associated metadata
from hsfs.feature_view import FeatureView
FeatureView.clean(
feature_store_id=1,
feature_view_name='feature_view_name',
feature_view_version=1
)
Potentially dangerous operation
This operation drops all metadata associated with this version of the feature view, along with the related training datasets and materialized data in HopsFS.
Arguments
- feature_store_id (int): Id of the feature store.
- feature_view_name (str): Name of the feature view.
- feature_view_version (str): Version of the feature view.
Raises
hsfs.client.exceptions.RestAPIError.
create_feature_monitoring#
FeatureView.create_feature_monitoring(
name,
feature_name,
description=None,
start_date_time=None,
end_date_time=None,
cron_expression="0 0 12 ? * * *",
)
Enable feature monitoring to compare statistics on snapshots of feature data over time.
Experimental
Public API is subject to change; this feature is not suitable for production use cases.
Example
# fetch feature view
fv = fs.get_feature_view(name="my_feature_view", version=1)
# enable feature monitoring
my_config = fv.create_feature_monitoring(
name="my_monitoring_config",
feature_name="my_feature",
description="my monitoring config description",
cron_expression="0 0 12 ? * * *",
).with_detection_window(
# Data inserted in the last day
time_offset="1d",
window_length="1d",
).with_reference_window(
# compare to a given value
specific_value=0.5,
).compare_on(
metric="mean",
threshold=0.5,
).save()
Arguments
- name (str): Name of the feature monitoring configuration. The name must be unique for all configurations attached to the feature view.
- feature_name (str): Name of the feature to monitor.
- description (Optional[str]): Description of the feature monitoring configuration.
- start_date_time (Optional[Union[str, int, datetime.date, datetime.datetime, pandas._libs.tslibs.timestamps.Timestamp]]): Start date and time from which to start computing statistics.
- end_date_time (Optional[Union[str, int, datetime.date, datetime.datetime, pandas._libs.tslibs.timestamps.Timestamp]]): End date and time at which to stop computing statistics.
- cron_expression (Optional[str]): Cron expression to use to schedule the job. The cron expression must be in UTC and follow the Quartz specification. Defaults to "0 0 12 ? * * *", every day at 12pm UTC.
Raises
hsfs.client.exceptions.FeatureStoreException.
Return
FeatureMonitoringConfig: Configuration with minimal information about the feature monitoring. Additional information is required before feature monitoring is enabled.
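The default schedule is easier to read once the Quartz fields are labeled. Quartz cron expressions have six mandatory fields (seconds, minutes, hours, day of month, month, day of week) and an optional seventh field for the year. The helper below is a hypothetical sketch that only splits and names the fields. It does not validate their values and is not part of hsfs.

```python
QUARTZ_FIELDS = (
    "seconds",
    "minutes",
    "hours",
    "day_of_month",
    "month",
    "day_of_week",
    "year",  # optional in Quartz
)


def split_quartz_cron(expression: str) -> dict:
    """Map each whitespace-separated field of a Quartz cron expression to its name."""
    parts = expression.split()
    if len(parts) not in (6, 7):
        raise ValueError(f"expected 6 or 7 fields, got {len(parts)}")
    return dict(zip(QUARTZ_FIELDS, parts))


fields = split_quartz_cron("0 0 12 ? * * *")
print(fields)
```

For the default expression, hours is 12 and seconds and minutes are 0, which matches the description "every day at 12pm UTC".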
create_statistics_monitoring#
FeatureView.create_statistics_monitoring(
name,
feature_name=None,
description=None,
start_date_time=None,
end_date_time=None,
cron_expression="0 0 12 ? * * *",
)
Run a job to compute statistics on a snapshot of feature data on a schedule.
Experimental
Public API is subject to change; this feature is not suitable for production use cases.
Example
# fetch feature view
fv = fs.get_feature_view(name="my_feature_view", version=1)
# enable statistics monitoring
my_config = fv.create_statistics_monitoring(
name="my_config",
start_date_time="2021-01-01 00:00:00",
description="my description",
cron_expression="0 0 12 ? * * *",
).with_detection_window(
# Statistics computed on 10% of the last week of data
time_offset="1w",
row_percentage=0.1,
).save()
Arguments
- name (str): Name of the feature monitoring configuration. The name must be unique for all configurations attached to the feature view.
- feature_name (Optional[str]): Name of the feature to monitor. If not specified, statistics will be computed for all features.
- description (Optional[str]): Description of the feature monitoring configuration.
- start_date_time (Optional[Union[str, int, datetime.date, datetime.datetime, pandas._libs.tslibs.timestamps.Timestamp]]): Start date and time from which to start computing statistics.
- end_date_time (Optional[Union[str, int, datetime.date, datetime.datetime, pandas._libs.tslibs.timestamps.Timestamp]]): End date and time at which to stop computing statistics.
- cron_expression (Optional[str]): Cron expression to use to schedule the job. The cron expression must be in UTC and follow the Quartz specification. Defaults to "0 0 12 ? * * *", every day at 12pm UTC.
Raises
hsfs.client.exceptions.FeatureStoreException.
Return
FeatureMonitoringConfig: Configuration with minimal information about the feature monitoring. Additional information is required before feature monitoring is enabled.
create_train_test_split#
FeatureView.create_train_test_split(
test_size=None,
train_start="",
train_end="",
test_start="",
test_end="",
storage_connector=None,
location="",
description="",
extra_filter=None,
data_format="parquet",
coalesce=False,
seed=None,
statistics_config=None,
write_options={},
spine=None,
primary_keys=False,
event_time=False,
training_helper_columns=False,
)
Create the metadata for a training dataset and save the corresponding training data into location.
The training data is split into train and test sets at random or according to time ranges.
The training data can be retrieved by calling feature_view.get_train_test_split.
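The time-range variant can be pictured with a small, self-contained sketch. It assumes the semantics given in the Arguments section of this method: start times are inclusive and end times are exclusive. The row layout and the `split_by_time` helper are hypothetical. This is a conceptual illustration, not how Hopsworks executes the split.

```python
from datetime import datetime


def in_range(event_time, start, end):
    """Inclusive start, exclusive end."""
    return start <= event_time < end


def split_by_time(rows, train_start, train_end, test_start, test_end):
    """Assign rows to train/test according to their event time."""
    train = [r for r in rows if in_range(r["event_time"], train_start, train_end)]
    test = [r for r in rows if in_range(r["event_time"], test_start, test_end)]
    return train, test


rows = [
    {"id": 1, "event_time": datetime(2022, 1, 1)},
    {"id": 2, "event_time": datetime(2022, 6, 6, 12)},
    {"id": 3, "event_time": datetime(2022, 6, 7)},
    {"id": 4, "event_time": datetime(2022, 12, 25, 23, 59, 59)},
]
train, test = split_by_time(
    rows,
    train_start=datetime(2022, 1, 1),
    train_end=datetime(2022, 6, 7),
    test_start=datetime(2022, 6, 7),
    test_end=datetime(2022, 12, 26),
)
print([r["id"] for r in train], [r["id"] for r in test])
```

Using the same timestamp as train_end and test_start leaves no gap between the splits, because a row exactly at that timestamp falls only into the test set.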
Create random splits
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# create a train-test split dataset
version, job = feature_view.create_train_test_split(
test_size=0.2,
description='Description of a dataset',
# you can have different data formats such as csv, tsv, tfrecord, parquet and others
data_format='csv'
)
Create time series splits by specifying date as string
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# set up dates
train_start = "2022-01-01 00:00:00"
train_end = "2022-06-06 23:59:59"
test_start = "2022-06-07 00:00:00"
test_end = "2022-12-25 23:59:59"
# create a train-test split dataset
version, job = feature_view.create_train_test_split(
train_start=train_start,
train_end=train_end,
test_start=test_start,
test_end=test_end,
description='Description of a dataset',
# you can have different data formats such as csv, tsv, tfrecord, parquet and others
data_format='csv'
)
Create time series splits by specifying date as datetime object
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# set up dates
from datetime import datetime
date_format = "%Y-%m-%d %H:%M:%S"
train_start = datetime.strptime("2022-01-01 00:00:00", date_format)
train_end = datetime.strptime("2022-06-06 23:59:59", date_format)
test_start = datetime.strptime("2022-06-07 00:00:00", date_format)
test_end = datetime.strptime("2022-12-25 23:59:59", date_format)
# create a train-test split dataset
version, job = feature_view.create_train_test_split(
train_start=train_start,
train_end=train_end,
test_start=test_start,
test_end=test_end,
description='Description of a dataset',
# you can have different data formats such as csv, tsv, tfrecord, parquet and others
data_format='csv'
)
Write training dataset to external storage
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get storage connector instance
external_storage_connector = fs.get_storage_connector("storage_connector_name")
# create a train-test split dataset
version, job = feature_view.create_train_test_split(
train_start=...,
train_end=...,
test_start=...,
test_end=...,
storage_connector=external_storage_connector,
description=...,
# you can have different data formats such as csv, tsv, tfrecord, parquet and others
data_format=...
)
Data Formats
The feature store currently supports the following data formats for training datasets:
- tfrecord
- csv
- tsv
- parquet
- avro
- orc
The petastorm, hdf5 and npy file formats are currently not supported.
Warning: the following code will fail because the category column contains sparse values, so the train split may not contain all the values that appear in the test split.
import pandas as pd
df = pd.DataFrame({
'category_col':['category_a','category_b','category_c','category_d'],
'numeric_col': [40,10,60,40]
})
feature_group = fs.get_or_create_feature_group(
name='feature_group_name',
version=1,
primary_key=['category_col']
)
feature_group.insert(df)
label_encoder = fs.get_transformation_function(name='label_encoder')
feature_view = fs.create_feature_view(
name='feature_view_name',
query=feature_group.select_all(),
transformation_functions={'category_col':label_encoder}
)
feature_view.create_train_test_split(
test_size=0.5
)
# Output: KeyError: 'category_c'
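The failure mode can be reproduced without a feature store. The sketch below uses a simplified stand-in for a label encoder (a plain dictionary fitted on the train split). It is not the implementation of the built-in label_encoder, and the split of values between train and test is an assumed outcome of the random split.

```python
def fit_label_mapping(values):
    """Assign an integer code to every distinct value seen during fitting."""
    return {value: index for index, value in enumerate(sorted(set(values)))}


# Assumed outcome of a 50/50 random split of the four categories
train_values = ["category_a", "category_b"]
test_values = ["category_c", "category_d"]

mapping = fit_label_mapping(train_values)

missing = None
try:
    encoded_test = [mapping[value] for value in test_values]
except KeyError as err:
    # The first test value that was never seen in the train split
    missing = err.args[0]
    print(f"unseen category: {missing}")
```

A category that only occurs in the test split has no code in the fitted mapping, which is why the lookup fails.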
Spine Groups/Dataframes
Spine groups and dataframes are currently only supported with the Spark engine and Spark dataframes.
Arguments
- test_size (Optional[float]): Size of the test set.
- train_start (Optional[Union[str, int, datetime.datetime, datetime.date]]): Start event time for the train split query, inclusive. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f. Int, i.e. Unix epoch, should be in seconds.
- train_end (Optional[Union[str, int, datetime.datetime, datetime.date]]): End event time for the train split query, exclusive. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f. Int, i.e. Unix epoch, should be in seconds.
- test_start (Optional[Union[str, int, datetime.datetime, datetime.date]]): Start event time for the test split query, inclusive. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f. Int, i.e. Unix epoch, should be in seconds.
- test_end (Optional[Union[str, int, datetime.datetime, datetime.date]]): End event time for the test split query, exclusive. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f. Int, i.e. Unix epoch, should be in seconds.
- storage_connector (Optional[hsfs.StorageConnector]): Storage connector defining the sink location for the training dataset. Defaults to None, which materializes the training dataset on HopsFS.
- location (Optional[str]): Path to complement the sink storage connector with, e.g. if the storage connector points to an S3 bucket, this path can be used to define a sub-directory inside the bucket to place the training dataset. Defaults to "", saving the training dataset at the root defined by the storage connector.
- description (Optional[str]): A string describing the contents of the training dataset to improve discoverability for data scientists. Defaults to the empty string "".
- extra_filter (Optional[Union[hsfs.constructor.filter.Filter, hsfs.constructor.filter.Logic]]): Additional filters to be attached to the training dataset. The filters will also be applied in get_batch_data.
- data_format (Optional[str]): The data format used to save the training dataset. Defaults to "parquet".
- coalesce (Optional[bool]): If true, the training dataset data will be coalesced into a single partition before writing. The resulting training dataset will be a single file per split. Defaults to False.
- seed (Optional[int]): Optionally, define a seed to create the random splits with, in order to guarantee reproducibility. Defaults to None.
- statistics_config (Optional[Union[hsfs.StatisticsConfig, bool, dict]]): A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this feature group, "correlations" to turn on feature correlation computation and "histograms" to compute feature value frequencies. The values should be booleans indicating the setting. To fully turn off statistics computation, pass statistics_config=False. Defaults to None, which computes only descriptive statistics.
- write_options (Optional[Dict[Any, Any]]): Additional options as key/value pairs to pass to the execution engine. Defaults to {}. For the Spark engine: dictionary of read options for Spark. When using the python engine, write_options can contain the following entries:
    - key use_spark and value True to materialize the training dataset with Spark instead of the Hopsworks Feature Query Service.
    - key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to compute the training dataset.
    - key wait_for_job and value True or False to configure whether or not the save call should return only after the Hopsworks Job has finished. By default it waits.
- spine (Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list], SpineGroup]]): Spine dataframe with primary key, event time and label column to use for the point-in-time join when fetching features. Defaults to None and is only required when the feature view was created with a spine group in the feature query. It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join; however, the same features as in the original feature group that is being replaced need to be available in the spine group.
- primary_keys: Whether to include primary key features or not. Defaults to False, no primary key features.
- event_time: Whether to include the event time feature or not. Defaults to False, no event time feature.
- training_helper_columns: Whether to include training helper columns or not. Training helper columns are a list of feature names in the feature view, defined during its creation, that are not part of the model schema itself but can be used during training as a helper for extra information. If training helper columns were not defined in the feature view, then training_helper_columns=True will not have any effect. Defaults to False, no training helper columns.
Returns
(td_version, Job): Tuple of training dataset version and job. When using the python engine, it returns the Hopsworks Job that was launched to create the training dataset.
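The accepted time inputs can be normalized locally before they are passed to the method, which makes malformed strings fail early. The helper below is a hypothetical sketch, not part of hsfs. It tries the five documented string formats and treats integers as Unix epoch seconds. Interpreting all values as UTC is an assumption made for the example.

```python
from datetime import datetime, timezone

# The string formats listed for train_start, train_end, test_start and test_end
ACCEPTED_FORMATS = (
    "%Y-%m-%d",
    "%Y-%m-%d %H",
    "%Y-%m-%d %H:%M",
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%d %H:%M:%S.%f",
)


def to_datetime(value):
    """Convert a documented string format or epoch seconds to a UTC datetime."""
    if isinstance(value, int):
        # Unix epoch is expected in seconds, not milliseconds
        return datetime.fromtimestamp(value, tz=timezone.utc)
    for fmt in ACCEPTED_FORMATS:
        try:
            # Assumption: naive strings are interpreted as UTC
            return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    raise ValueError(f"unsupported time format: {value!r}")


print(to_datetime("2022-01-01"))
print(to_datetime("2022-06-06 23:59:59"))
print(to_datetime(1640995200))
```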
create_train_validation_test_split#
FeatureView.create_train_validation_test_split(
validation_size=None,
test_size=None,
train_start="",
train_end="",
validation_start="",
validation_end="",
test_start="",
test_end="",
storage_connector=None,
location="",
description="",
extra_filter=None,
data_format="parquet",
coalesce=False,
seed=None,
statistics_config=None,
write_options={},
spine=None,
primary_keys=False,
event_time=False,
training_helper_columns=False,
)
Create the metadata for a training dataset and save the corresponding training data into location.
The training data is split into train, validation, and test sets at random or according to time ranges.
The training data can be retrieved by calling feature_view.get_train_validation_test_split.
Create random splits
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# create a train-validation-test split dataset
version, job = feature_view.create_train_validation_test_split(
validation_size=0.3,
test_size=0.2,
description='Description of a dataset',
data_format='csv'
)
Create time series splits by specifying date as string
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# set up dates
train_start = "2022-01-01 00:00:00"
train_end = "2022-06-01 23:59:59"
validation_start = "2022-06-02 00:00:00"
validation_end = "2022-07-01 23:59:59"
test_start = "2022-07-02 00:00:00"
test_end = "2022-08-01 23:59:59"
# create a train-validation-test split dataset
version, job = feature_view.create_train_validation_test_split(
train_start=train_start,
train_end=train_end,
validation_start=validation_start,
validation_end=validation_end,
test_start=test_start,
test_end=test_end,
description='Description of a dataset',
# you can have different data formats such as csv, tsv, tfrecord, parquet and others
data_format='csv'
)
Create time series splits by specifying date as datetime object
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# set up dates
from datetime import datetime
date_format = "%Y-%m-%d %H:%M:%S"
train_start = datetime.strptime("2022-01-01 00:00:00", date_format)
train_end = datetime.strptime("2022-06-01 23:59:59", date_format)
validation_start = datetime.strptime("2022-06-02 00:00:00", date_format)
validation_end = datetime.strptime("2022-07-01 23:59:59", date_format)
test_start = datetime.strptime("2022-07-02 00:00:00", date_format)
test_end = datetime.strptime("2022-12-25 23:59:59", date_format)
# create a train-validation-test split dataset
version, job = feature_view.create_train_validation_test_split(
train_start=train_start,
train_end=train_end,
validation_start=validation_start,
validation_end=validation_end,
test_start=test_start,
test_end=test_end,
description='Description of a dataset',
# you can have different data formats such as csv, tsv, tfrecord, parquet and others
data_format='csv'
)
Write training dataset to external storage
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get storage connector instance
external_storage_connector = fs.get_storage_connector("storage_connector_name")
# create a train-validation-test split dataset
version, job = feature_view.create_train_validation_test_split(
train_start=...,
train_end=...,
validation_start=...,
validation_end=...,
test_start=...,
test_end=...,
description=...,
storage_connector=external_storage_connector,
# you can have different data formats such as csv, tsv, tfrecord, parquet and others
data_format=...
)
Data Formats
The feature store currently supports the following data formats for training datasets:
- tfrecord
- csv
- tsv
- parquet
- avro
- orc
The petastorm, hdf5 and npy file formats are currently not supported.
Spine Groups/Dataframes
Spine groups and dataframes are currently only supported with the Spark engine and Spark dataframes.
Arguments
- validation_size (Optional[float]): Size of the validation set.
- test_size (Optional[float]): Size of the test set.
- train_start (Optional[Union[str, int, datetime.datetime, datetime.date]]): Start event time for the train split query, inclusive. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f. Int, i.e. Unix epoch, should be in seconds.
- train_end (Optional[Union[str, int, datetime.datetime, datetime.date]]): End event time for the train split query, exclusive. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f. Int, i.e. Unix epoch, should be in seconds.
- validation_start (Optional[Union[str, int, datetime.datetime, datetime.date]]): Start event time for the validation split query, inclusive. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f. Int, i.e. Unix epoch, should be in seconds.
- validation_end (Optional[Union[str, int, datetime.datetime, datetime.date]]): End event time for the validation split query, exclusive. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f. Int, i.e. Unix epoch, should be in seconds.
- test_start (Optional[Union[str, int, datetime.datetime, datetime.date]]): Start event time for the test split query, inclusive. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f. Int, i.e. Unix epoch, should be in seconds.
- test_end (Optional[Union[str, int, datetime.datetime, datetime.date]]): End event time for the test split query, exclusive. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f. Int, i.e. Unix epoch, should be in seconds.
- storage_connector (Optional[hsfs.StorageConnector]): Storage connector defining the sink location for the training dataset. Defaults to None, which materializes the training dataset on HopsFS.
- location (Optional[str]): Path to complement the sink storage connector with, e.g. if the storage connector points to an S3 bucket, this path can be used to define a sub-directory inside the bucket to place the training dataset. Defaults to "", saving the training dataset at the root defined by the storage connector.
- description (Optional[str]): A string describing the contents of the training dataset to improve discoverability for data scientists. Defaults to the empty string "".
- extra_filter (Optional[Union[hsfs.constructor.filter.Filter, hsfs.constructor.filter.Logic]]): Additional filters to be attached to the training dataset. The filters will also be applied in get_batch_data.
- data_format (Optional[str]): The data format used to save the training dataset. Defaults to "parquet".
- coalesce (Optional[bool]): If true, the training dataset data will be coalesced into a single partition before writing. The resulting training dataset will be a single file per split. Defaults to False.
- seed (Optional[int]): Optionally, define a seed to create the random splits with, in order to guarantee reproducibility. Defaults to None.
- statistics_config (Optional[Union[hsfs.StatisticsConfig, bool, dict]]): A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this feature group, "correlations" to turn on feature correlation computation and "histograms" to compute feature value frequencies. The values should be booleans indicating the setting. To fully turn off statistics computation, pass statistics_config=False. Defaults to None, which computes only descriptive statistics.
- write_options (Optional[Dict[Any, Any]]): Additional options as key/value pairs to pass to the execution engine. Defaults to {}. For the Spark engine: dictionary of read options for Spark. When using the python engine, write_options can contain the following entries:
    - key use_spark and value True to materialize the training dataset with Spark instead of the Hopsworks Feature Query Service.
    - key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to compute the training dataset.
    - key wait_for_job and value True or False to configure whether or not the save call should return only after the Hopsworks Job has finished. By default it waits.
- spine (Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list], SpineGroup]]): Spine dataframe with primary key, event time and label column to use for the point-in-time join when fetching features. Defaults to None and is only required when the feature view was created with a spine group in the feature query. It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join; however, the same features as in the original feature group that is being replaced need to be available in the spine group.
- primary_keys: Whether to include primary key features or not. Defaults to False, no primary key features.
- event_time: Whether to include the event time feature or not. Defaults to False, no event time feature.
- training_helper_columns: Whether to include training helper columns or not. Training helper columns are a list of feature names in the feature view, defined during its creation, that are not part of the model schema itself but can be used during training as a helper for extra information. If training helper columns were not defined in the feature view, then training_helper_columns=True will not have any effect. Defaults to False, no training helper columns.
Returns
(td_version, Job
): Tuple of training dataset version and job.
When using the python
engine, it returns the Hopsworks Job
that was launched to create the training dataset.
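As a minimal sketch of the write_options entries described above for the python engine, the dictionary below asks for Spark materialization and an asynchronous call. Only the use_spark and wait_for_job keys from the description are used; the values are example choices, and the spark key is left out because it needs a JobConfiguration object.

```python
# Illustrative write_options for the python engine. The keys come from the
# argument description above; the values are example choices, not defaults.
write_options = {
    "use_spark": True,      # materialize with Spark instead of the Feature Query Service
    "wait_for_job": False,  # return immediately instead of waiting for the Hopsworks Job
}

# The dictionary would then be passed as write_options=write_options.
print(sorted(write_options))
```

With wait_for_job set to False the call returns before the data is written, so the returned job has to be checked before the training dataset is read.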
create_training_data#
FeatureView.create_training_data(
start_time="",
end_time="",
storage_connector=None,
location="",
description="",
extra_filter=None,
data_format="parquet",
coalesce=False,
seed=None,
statistics_config=None,
write_options={},
spine=None,
primary_keys=False,
event_time=False,
training_helper_columns=False,
)
Create the metadata for a training dataset and save the corresponding training data into location
.
The training data can be retrieved by calling feature_view.get_training_data
.
Create training dataset
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# create a training dataset
version, job = feature_view.create_training_data(
description='Description of a dataset',
data_format='csv',
# create asynchronously instead of waiting for the job to finish
write_options={"wait_for_job": False}
)
Create training data specifying date range with dates as strings
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# set up dates
start_time = "2022-01-01 00:00:00"
end_time = "2022-06-06 23:59:59"
# create a training dataset
version, job = feature_view.create_training_data(
start_time=start_time,
end_time=end_time,
description='Description of a dataset',
# you can have different data formats such as csv, tsv, tfrecord, parquet and others
data_format='csv'
)
# To read the training data, pass the training data version returned by create_training_data:
X, y = feature_view.get_training_data(version)
Create training data specifying date range with dates as datetime objects
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# set up dates
from datetime import datetime
date_format = "%Y-%m-%d %H:%M:%S"
start_time = datetime.strptime("2022-01-01 00:00:00", date_format)
end_time = datetime.strptime("2022-06-06 23:59:59", date_format)
# create a training dataset
version, job = feature_view.create_training_data(
start_time=start_time,
end_time=end_time,
description='Description of a dataset',
# you can have different data formats such as csv, tsv, tfrecord, parquet and others
data_format='csv'
)
Write training dataset to external storage
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get storage connector instance
external_storage_connector = fs.get_storage_connector("storage_connector_name")
# create a training dataset
version, job = feature_view.create_training_data(
start_time=...,
end_time=...,
storage_connector = external_storage_connector,
description=...,
# you can have different data formats such as csv, tsv, tfrecord, parquet and others
data_format=...
)
Data Formats
The feature store currently supports the following data formats for training datasets:
- tfrecord
- csv
- tsv
- parquet
- avro
- orc
The petastorm, hdf5 and npy file formats are currently not supported.
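The format lists above can be checked on the client side before a job is started. The sketch below is a hypothetical helper, not part of hsfs; the function name and the case-insensitive normalization are assumptions made for illustration.

```python
# Formats listed as supported / unsupported in the note above.
SUPPORTED_FORMATS = {"tfrecord", "csv", "tsv", "parquet", "avro", "orc"}
UNSUPPORTED_FORMATS = {"petastorm", "hdf5", "npy"}


def check_data_format(data_format: str) -> str:
    """Hypothetical helper: return the normalized format or raise ValueError."""
    normalized = data_format.strip().lower()
    if normalized in UNSUPPORTED_FORMATS:
        raise ValueError(f"{normalized!r} is currently not supported for training datasets")
    if normalized not in SUPPORTED_FORMATS:
        raise ValueError(f"unknown data format {normalized!r}")
    return normalized
```

A call such as check_data_format("parquet") returns the value to pass as data_format, while "npy" raises before any job is launched.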
Spine Groups/Dataframes
Spine groups and dataframes are currently only supported with the Spark engine and Spark dataframes.
Arguments
- start_time
Optional[Union[str, int, datetime.datetime, datetime.date]]
: Start event time for the training dataset query, inclusive. Optional. Strings should be formatted in one of the following formats%Y-%m-%d
,%Y-%m-%d %H
,%Y-%m-%d %H:%M
,%Y-%m-%d %H:%M:%S
, or%Y-%m-%d %H:%M:%S.%f
. Integers, i.e. Unix epoch timestamps, should be in seconds. - end_time
Optional[Union[str, int, datetime.datetime, datetime.date]]
: End event time for the training dataset query, exclusive. Optional. Strings should be formatted in one of the following formats%Y-%m-%d
,%Y-%m-%d %H
,%Y-%m-%d %H:%M
,%Y-%m-%d %H:%M:%S
, or%Y-%m-%d %H:%M:%S.%f
. Integers, i.e. Unix epoch timestamps, should be in seconds. - storage_connector
Optional[hsfs.StorageConnector]
: Storage connector defining the sink location for the training dataset, defaults toNone
, and materializes training dataset on HopsFS. - location
Optional[str]
: Path to complement the sink storage connector with, e.g if the storage connector points to an S3 bucket, this path can be used to define a sub-directory inside the bucket to place the training dataset. Defaults to""
, saving the training dataset at the root defined by the storage connector. - description
Optional[str]
: A string describing the contents of the training dataset to improve discoverability for Data Scientists, defaults to empty string""
. - extra_filter
Optional[Union[hsfs.constructor.filter.Filter, hsfs.constructor.filter.Logic]]
: Additional filters to be attached to the training dataset. The filters will also be applied in get_batch_data
. - data_format
Optional[str]
: The data format used to save the training dataset, defaults to"parquet"
-format. - coalesce
Optional[bool]
: If True, the training dataset will be coalesced into a single partition before writing, resulting in a single file per split. Defaults to False. - seed
Optional[int]
: Optionally, define a seed to create the random splits with, in order to guarantee reproducibility, defaults to None
. - statistics_config
Optional[Union[hsfs.StatisticsConfig, bool, dict]]
: A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this feature group, "correlations" to turn on feature correlation computation and "histograms" to compute feature value frequencies. The values should be booleans indicating the setting. To fully turn off statistics computation pass statistics_config=False
. Defaults to None
and will compute only descriptive statistics. - write_options
Optional[Dict[Any, Any]]
: Additional options as key/value pairs to pass to the execution engine. For spark engine: Dictionary of read options for Spark. When using thepython
engine, write_options can contain the following entries:- key
use_spark
and valueTrue
to materialize training dataset with Spark instead of Hopsworks Feature Query Service. - key
spark
and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to compute the training dataset. - key
wait_for_job
and valueTrue
orFalse
to configure whether the save call should return only after the Hopsworks Job has finished. By default it waits. Defaults to {}
.
- key
- spine
Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list], SpineGroup]]
: Spine dataframe with primary key, event time and label column to use for point in time join when fetching features. Defaults toNone
and is only required when feature view was created with spine group in the feature query. It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join, however, the same features as in the original feature group that is being replaced need to be available in the spine group. - primary_keys: whether to include primary key features or not. Defaults to
False
, no primary key features. - event_time: whether to include event time feature or not. Defaults to
False
, no event time feature. - training_helper_columns: whether to include training helper columns or not. Training helper columns are a
list of feature names in the feature view, defined during its creation, that are not the part of the
model schema itself but can be used during training as a helper for extra information.
If training helper columns were not defined in the feature view then
training_helper_columns=True
will not have any effect. Defaults toFalse
, no training helper columns.
Returns
(td_version, Job
): Tuple of training dataset version and job.
When using the python
engine, it returns the Hopsworks Job
that was launched to create the training dataset.
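The string formats accepted for start_time and end_time can be tried locally before a job is submitted. The following is a minimal sketch using only the standard library; parse_event_time is a hypothetical helper and does not claim to reproduce how hsfs parses these values.

```python
from datetime import datetime

# String formats listed for start_time / end_time, most specific first.
TIME_FORMATS = [
    "%Y-%m-%d %H:%M:%S.%f",
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%d %H:%M",
    "%Y-%m-%d %H",
    "%Y-%m-%d",
]


def parse_event_time(value: str) -> datetime:
    """Hypothetical helper: parse a string in one of the documented formats."""
    for fmt in TIME_FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue  # try the next, less specific format
    raise ValueError(f"{value!r} does not match any documented format")
```

Strings that pass this check, or the resulting datetime objects, can be supplied as start_time and end_time.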
delete#
FeatureView.delete()
Delete current feature view, all associated metadata and training data.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# delete a feature view
feature_view.delete()
Potentially dangerous operation
This operation drops all metadata associated with this version of the feature view and related training dataset and materialized data in HopsFS.
Raises
hsfs.client.exceptions.RestAPIError
.
delete_all_training_datasets#
FeatureView.delete_all_training_datasets()
Delete all training datasets. This will delete both metadata and training data.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# delete all training datasets
feature_view.delete_all_training_datasets()
Raises
hsfs.client.exceptions.RestAPIError
in case the backend fails to delete the training datasets.
delete_tag#
FeatureView.delete_tag(name)
Delete a tag attached to a feature view.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# delete a tag
feature_view.delete_tag('name_of_tag')
Arguments
- name
str
: Name of the tag to be removed.
Raises
hsfs.client.exceptions.RestAPIError
in case the backend fails to delete the tag.
delete_training_dataset#
FeatureView.delete_training_dataset(training_dataset_version)
Delete a training dataset. This will delete both metadata and training data.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# delete a training dataset
feature_view.delete_training_dataset(
training_dataset_version=1
)
Arguments
- training_dataset_version
int
: Version of the training dataset to be removed.
Raises
hsfs.client.exceptions.RestAPIError
in case the backend fails to delete the training dataset.
delete_training_dataset_tag#
FeatureView.delete_training_dataset_tag(training_dataset_version, name)
Delete a tag attached to a training dataset.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# delete training dataset tag
feature_view.delete_training_dataset_tag(
training_dataset_version=1,
name='name_of_tag'
)
Arguments
- training_dataset_version
int
: Version of the training dataset. - name
str
: Name of the tag to be removed.
Raises
hsfs.client.exceptions.RestAPIError
in case the backend fails to delete the tag.
find_neighbors#
FeatureView.find_neighbors(
embedding, feature=None, k=10, filter=None, min_score=0, external=None
)
Finds the nearest neighbors for a given embedding in the vector database.
Arguments
- embedding
List[Union[int, float]]
: The target embedding for which neighbors are to be found. - feature
Optional[hsfs.feature.Feature]
: The feature used to compute similarity score. Required only if there are multiple embeddings (optional). - k
Optional[int]
: The number of nearest neighbors to retrieve (default is 10). - filter
Optional[Union[hsfs.constructor.filter.Filter, hsfs.constructor.filter.Logic]]
: A filter expression to restrict the search space (optional). - min_score
Optional[float]
: The minimum similarity score for neighbors to be considered (default is 0).
Returns
A list of feature values
Example
from hsfs import embedding
embedding_index = embedding.EmbeddingIndex()
embedding_index.add_embedding(name="user_vector", dimension=3)
fg = fs.create_feature_group(
name='air_quality',
embedding_index=embedding_index,
version=1,
primary_key=['id1'],
online_enabled=True,
)
fg.insert(data)
fv = fs.create_feature_view("air_quality", fg.select_all())
fv.find_neighbors(
[0.1, 0.2, 0.3],
k=5,
)
# apply filter
fv.find_neighbors(
[0.1, 0.2, 0.3],
k=5,
feature=fg.user_vector, # optional
filter=(fg.id1 > 10) & (fg.id1 < 30)
)
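To make the roles of k and min_score concrete, here is a brute-force sketch in plain Python. It is not how the vector database works internally: the similarity metric is not stated on this page, so cosine similarity is an assumption, as are the function names and the inclusive comparison against min_score.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def brute_force_neighbors(embedding, rows, k=10, min_score=0.0):
    """Hypothetical stand-in: score every row, drop low scores, keep the top k."""
    scored = [(cosine_similarity(embedding, vec), key) for key, vec in rows.items()]
    kept = [pair for pair in scored if pair[0] >= min_score]
    kept.sort(key=lambda pair: pair[0], reverse=True)
    return kept[:k]


rows = {
    "a": [0.1, 0.2, 0.3],     # same direction as the query
    "b": [0.3, 0.2, 0.1],
    "c": [-0.1, -0.2, -0.3],  # opposite direction, negative score
}
result = brute_force_neighbors([0.1, 0.2, 0.3], rows, k=2, min_score=0)
```

Row "c" is removed by min_score before the top k rows are taken, so result contains "a" followed by "b".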
from_response_json#
FeatureView.from_response_json(json_dict)
get_batch_data#
FeatureView.get_batch_data(
start_time=None,
end_time=None,
read_options=None,
spine=None,
primary_keys=False,
event_time=False,
inference_helper_columns=False,
)
Get a batch of data from an event time interval from the offline feature store.
Batch data for the last 24 hours
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# set up dates
import datetime
start_date = (datetime.datetime.now() - datetime.timedelta(hours=24))
end_date = (datetime.datetime.now())
# get a batch of data
df = feature_view.get_batch_data(
start_time=start_date,
end_time=end_date
)
Spine Groups/Dataframes
Spine groups and dataframes are currently only supported with the Spark engine and Spark dataframes.
Arguments
- start_time
Optional[Union[str, int, datetime.datetime, datetime.date]]
: Start event time for the batch query, inclusive. Optional. Strings should be formatted in one of the following formats%Y-%m-%d
,%Y-%m-%d %H
,%Y-%m-%d %H:%M
,%Y-%m-%d %H:%M:%S
, or%Y-%m-%d %H:%M:%S.%f
. Integers, i.e. Unix epoch timestamps, should be in seconds. - end_time
Optional[Union[str, int, datetime.datetime, datetime.date]]
: End event time for the batch query, exclusive. Optional. Strings should be formatted in one of the following formats%Y-%m-%d
,%Y-%m-%d %H
,%Y-%m-%d %H:%M
,%Y-%m-%d %H:%M:%S
, or%Y-%m-%d %H:%M:%S.%f
. Integers, i.e. Unix epoch timestamps, should be in seconds. - read_options: User provided read options.
Dictionary of read options for python engine:
- key
"use_hive"
and valueTrue
to read batch data with Hive instead of Hopsworks Feature Query Service. Defaults to{}
. - key
"arrow_flight_config"
to pass a dictionary of arrow flight configurations. For example:{"arrow_flight_config": {"timeout": 900}}
- key
- spine
Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list], SpineGroup]]
: Spine dataframe with primary key, event time and label column to use for point in time join when fetching features. Defaults toNone
and is only required when feature view was created with spine group in the feature query. It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join, however, the same features as in the original feature group that is being replaced need to be available in the spine group. - primary_keys: whether to include primary key features or not. Defaults to
False
, no primary key features. - event_time: whether to include event time feature or not. Defaults to
False
, no event time feature. - inference_helper_columns: whether to include inference helper columns or not.
Inference helper columns are a list of feature names in the feature view, defined during its creation,
that may not be used in training the model itself but can be used during batch or online inference
for extra information. If inference helper columns were not defined in the feature view then
inference_helper_columns=True
will not have any effect. Defaults to False
, no helper columns.
Returns
DataFrame
: A dataframe
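The start_time bound is inclusive and the end_time bound is exclusive, which gives a half-open interval. The sketch below shows that rule on plain datetime values; in_batch_window is a hypothetical helper, and treating None as "no bound" is an assumption of the sketch.

```python
from datetime import datetime


def in_batch_window(event_time, start_time=None, end_time=None):
    """Hypothetical helper: True if start_time <= event_time < end_time."""
    if start_time is not None and event_time < start_time:
        return False
    if end_time is not None and event_time >= end_time:
        return False
    return True


start = datetime(2022, 1, 1)
end = datetime(2022, 1, 2)
events = [datetime(2021, 12, 31, 23, 59), start, datetime(2022, 1, 1, 12), end]

# Keeps the event exactly at start, drops the event exactly at end.
selected = [e for e in events if in_batch_window(e, start, end)]
```

Because the end bound is exclusive, consecutive windows that share a boundary do not return the same row twice.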
get_batch_query#
FeatureView.get_batch_query(start_time=None, end_time=None)
Get a query string of the batch query.
Batch query for the last 24 hours
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# set up dates
import datetime
start_date = (datetime.datetime.now() - datetime.timedelta(hours=24))
end_date = (datetime.datetime.now())
# get a query string of batch query
query_str = feature_view.get_batch_query(
start_time=start_date,
end_time=end_date
)
# print query string
print(query_str)
Arguments
- start_time
Optional[Union[str, int, datetime.datetime, datetime.date]]
: Start event time for the batch query, inclusive. Optional. Strings should be formatted in one of the following formats%Y-%m-%d
,%Y-%m-%d %H
,%Y-%m-%d %H:%M
,%Y-%m-%d %H:%M:%S
, or%Y-%m-%d %H:%M:%S.%f
. Integers, i.e. Unix epoch timestamps, should be in seconds. - end_time
Optional[Union[str, int, datetime.datetime, datetime.date]]
: End event time for the batch query, exclusive. Optional. Strings should be formatted in one of the following formats%Y-%m-%d
,%Y-%m-%d %H
,%Y-%m-%d %H:%M
,%Y-%m-%d %H:%M:%S
, or%Y-%m-%d %H:%M:%S.%f
. Integers, i.e. Unix epoch timestamps, should be in seconds.
Returns
str
: batch query
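Integer values for start_time and end_time are expected in seconds, not milliseconds. A minimal sketch of the conversion from a datetime follows; to_epoch_seconds is a hypothetical helper, and interpreting naive datetimes as UTC is an assumption made only for this example.

```python
from datetime import datetime, timezone


def to_epoch_seconds(moment: datetime) -> int:
    """Hypothetical helper: convert a datetime to whole Unix epoch seconds."""
    if moment.tzinfo is None:
        # Assumption for this sketch: naive datetimes are interpreted as UTC.
        moment = moment.replace(tzinfo=timezone.utc)
    return int(moment.timestamp())


start_time = to_epoch_seconds(datetime(2022, 1, 1))
end_time = to_epoch_seconds(datetime(2022, 1, 2))
```

Values produced by tools that report milliseconds need to be divided by 1000 before they are passed in.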
get_feature_monitoring_configs#
FeatureView.get_feature_monitoring_configs(name=None, feature_name=None, config_id=None)
Fetch feature monitoring configs attached to the feature view. If no arguments are provided, the method returns all feature monitoring configs attached to the feature view, meaning all feature monitoring configs that are attached to a feature in the feature view. If you wish to fetch a single config, provide its name. If you wish to fetch all configs attached to a particular feature, provide the feature name.
Example
# fetch your feature view
fv = fs.get_feature_view(name="my_feature_view", version=1)
# fetch all feature monitoring configs attached to the feature view
fm_configs = fv.get_feature_monitoring_configs()
# fetch a single feature monitoring config by name
fm_config = fv.get_feature_monitoring_configs(name="my_config")
# fetch all feature monitoring configs attached to a particular feature
fm_configs = fv.get_feature_monitoring_configs(feature_name="my_feature")
# fetch a single feature monitoring config with a particular id
fm_config = fv.get_feature_monitoring_configs(config_id=1)
Arguments
- name
Optional[str]
: If provided fetch only the feature monitoring config with the given name. Defaults to None. - feature_name
Optional[str]
: If provided, fetch only configs attached to a particular feature. Defaults to None. - config_id
Optional[int]
: If provided, fetch only the feature monitoring config with the given id. Defaults to None.
Raises
hsfs.client.exceptions.RestAPIError
.
hsfs.client.exceptions.FeatureStoreException
.
- ValueError: if both name and feature_name are provided.
- TypeError: if name or feature_name are not string or None.
Return
Union[FeatureMonitoringConfig
, List[FeatureMonitoringConfig
], None]
A list of feature monitoring configs. If name is provided,
returns either a single config or None if not found.
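The ValueError and TypeError cases listed above can be mirrored by a small pre-check in calling code. This is a hypothetical helper written for illustration; the order of the checks and the error messages are assumptions, not the library's implementation.

```python
def validate_config_lookup(name=None, feature_name=None):
    """Hypothetical helper mirroring the documented ValueError / TypeError cases."""
    for label, value in (("name", name), ("feature_name", feature_name)):
        if value is not None and not isinstance(value, str):
            raise TypeError(f"{label} must be a string or None")
    if name is not None and feature_name is not None:
        raise ValueError("provide either name or feature_name, not both")


# Valid combinations pass silently.
validate_config_lookup()
validate_config_lookup(name="my_config")
validate_config_lookup(feature_name="my_feature")
```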
get_feature_monitoring_history#
FeatureView.get_feature_monitoring_history(
config_name=None, config_id=None, start_time=None, end_time=None, with_statistics=True
)
Fetch feature monitoring history for a given feature monitoring config.
Example
from datetime import datetime, timedelta
# fetch your feature view
fv = fs.get_feature_view(name="my_feature_view", version=1)
# fetch feature monitoring history for a given feature monitoring config
fm_history = fv.get_feature_monitoring_history(
config_name="my_config",
start_time="2020-01-01",
)
# or use the config id
fm_history = fv.get_feature_monitoring_history(
config_id=1,
start_time=datetime.now() - timedelta(weeks=2),
end_time=datetime.now() - timedelta(weeks=1),
with_statistics=False,
)
Arguments
- config_name
Optional[str]
: The name of the feature monitoring config to fetch history for. Defaults to None. - config_id
Optional[int]
: The id of the feature monitoring config to fetch history for. Defaults to None. - start_time: The start date of the feature monitoring history to fetch. Defaults to None.
- end_time: The end date of the feature monitoring history to fetch. Defaults to None.
- with_statistics
Optional[bool]
: Whether to include statistics in the feature monitoring history. Defaults to True. If False, only metadata about the monitoring will be fetched.
Raises
hsfs.client.exceptions.RestAPIError
.
hsfs.client.exceptions.FeatureStoreException
.
- ValueError: if both config_name and config_id are provided.
- TypeError: if config_name or config_id are not respectively string, int or None.
Return
List[FeatureMonitoringResult
]
A list of feature monitoring results containing the monitoring metadata
as well as the computed statistics for the detection and reference window
if requested.
get_feature_vector#
FeatureView.get_feature_vector(
entry,
passed_features=None,
external=None,
return_type="list",
allow_missing=False,
force_rest_client=False,
force_sql_client=False,
)
Returns assembled feature vector from online feature store.
Call feature_view.init_serving
before this method if the following configurations are needed.
1. The training dataset version of the transformation statistics
2. Additional configurations of online serving engine (e.g init_online_store_rest_client=True
to use Online Store REST Client instead of SQL connector)
Missing primary key entries
If the provided primary key entry
can't be found in one or more of the feature groups
used by this feature view the call to this method will raise an exception.
Alternatively, setting allow_missing
to True
returns a feature vector with missing values.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get assembled serving vector as a python list
feature_view.get_feature_vector(
entry = {"pk1": 1, "pk2": 2}
)
# get assembled serving vector as a pandas dataframe
feature_view.get_feature_vector(
entry = {"pk1": 1, "pk2": 2},
return_type = "pandas"
)
# get assembled serving vector as a numpy array
feature_view.get_feature_vector(
entry = {"pk1": 1, "pk2": 2},
return_type = "numpy"
)
Get feature vector with user-supplied features
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# the application provides a feature value 'app_attr'
app_attr = ...
# get a feature vector
feature_view.get_feature_vector(
entry = {"pk1": 1, "pk2": 2},
passed_features = { "app_feature" : app_attr }
)
Arguments
- entry
Dict[str, Any]
: Dictionary of feature group primary keys and values provided by the serving application. The set of required primary keys is feature_view.primary_keys.
If the required primary keys are not provided, it will look for the name of the primary key in the feature group in the entry. - passed_features
Optional[Dict[str, Any]]
: dictionary of feature values provided by the application at runtime. They can replace features values fetched from the feature store as well as providing feature values which are not available in the feature store. - external
Optional[bool]
: boolean, optional. If set to True, the connection to the online feature store is established using the same host as for thehost
parameter in thehsfs.connection()
method. If set to False, the online feature store storage connector is used which relies on the private IP. Defaults to True if connection to Hopsworks is established from external environment (e.g AWS Sagemaker or Google Colab), otherwise to False. - return_type
Optional[str]
:"list"
,"pandas"
or"numpy"
. Defaults to"list"
. - allow_missing
Optional[bool]
: Setting toTrue
returns feature vectors with missing values. - force_rest_client
bool
: bool, optional. If set to True, the Online Store REST Client will be used to retrieve the feature vector. Defaults to False. - force_sql_client
bool
: bool, optional. If set to True, the SQL connector will be used to retrieve the feature vector. Defaults to False.
Returns
list
, pd.DataFrame
or np.ndarray
if return type
is set to "list"
, "pandas"
or "numpy"
respectively. Defaults to list
.
Returned list
, pd.DataFrame
or np.ndarray
contains feature values related to provided primary keys,
ordered according to the positions of these features in the feature view query.
Raises
hsfs.client.exceptions.RestAPIError
. If using the Online Store REST Client, and the response status code is not 200.
- 400: Requested Metadata does not exist or the request is malformed.
- 401: Access denied. API key does not give access to the feature store (e.g feature store not shared with user),
or authorization header (x-api-key) is not properly set.
- 500: Internal server error.
ValueError
.
- A force_*
parameter is set to True
and the corresponding client is not initialised.
- Both force_rest_client
and force_sql_client
are set to True
.
- The return_type
is not one of "list"
, "pandas"
or "numpy"
.
- Training Dataset version is not set and the feature view is not initialised.
- Serving keys do not match the provided entry dictionary
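The interaction between stored values, passed_features and allow_missing can be pictured with a small stand-in. The sketch below is a simplification under stated assumptions: assemble_vector, the feature names and the use of ValueError are hypothetical, and the real method reads from the online feature store rather than from a dictionary.

```python
def assemble_vector(feature_order, stored, passed_features=None, allow_missing=False):
    """Hypothetical stand-in: merge stored and passed features into an ordered list."""
    merged = dict(stored)
    merged.update(passed_features or {})  # passed values replace or extend stored ones
    missing = [name for name in feature_order if name not in merged]
    if missing and not allow_missing:
        raise ValueError(f"missing feature values: {missing}")
    # Order follows the feature positions, with None for anything still missing.
    return [merged.get(name) for name in feature_order]


feature_order = ["temperature", "humidity", "app_feature"]
stored = {"temperature": 21.5, "humidity": 0.4}
vector = assemble_vector(feature_order, stored, passed_features={"app_feature": 7})
```

Here app_feature is not stored anywhere and is supplied by the application, while a passed value for humidity would replace the stored one.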
get_feature_vectors#
FeatureView.get_feature_vectors(
entry,
passed_features=None,
external=None,
return_type="list",
allow_missing=False,
force_rest_client=False,
force_sql_client=False,
)
Returns assembled feature vectors in batches from online feature store.
Call feature_view.init_serving
before this method if the following configurations are needed.
1. The training dataset version of the transformation statistics
2. Additional configurations of online serving engine (e.g init_online_store_rest_client=True
to use Online Store REST Client instead of SQL connector)
Missing primary key entries
If any of the provided primary key elements in entry
can't be found in any
of the feature groups, no feature vector for that primary key value will be
returned.
If it can be found in at least one but not all feature groups used by
this feature view the call to this method will raise an exception.
Alternatively, setting allow_missing
to True
returns feature vectors with missing values.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get assembled serving vectors as a python list of lists
feature_view.get_feature_vectors(
entry = [
{"pk1": 1, "pk2": 2},
{"pk1": 3, "pk2": 4},
{"pk1": 5, "pk2": 6}
]
)
# get assembled serving vectors as a pandas dataframe
feature_view.get_feature_vectors(
entry = [
{"pk1": 1, "pk2": 2},
{"pk1": 3, "pk2": 4},
{"pk1": 5, "pk2": 6}
],
return_type = "pandas"
)
# get assembled serving vectors as a numpy array
feature_view.get_feature_vectors(
entry = [
{"pk1": 1, "pk2": 2},
{"pk1": 3, "pk2": 4},
{"pk1": 5, "pk2": 6}
],
return_type = "numpy"
)
Arguments
- entry
List[Dict[str, Any]]
: A list of dictionaries of feature group primary keys and values provided by the serving application. The set of required primary keys is feature_view.primary_keys.
If the required primary keys are not provided, it will look for the name of the primary key in the feature group in the entry. - passed_features
Optional[List[Dict[str, Any]]]
: a list of dictionary of feature values provided by the application at runtime. They can replace features values fetched from the feature store as well as providing feature values which are not available in the feature store. - external
Optional[bool]
: boolean, optional. If set to True, the connection to the online feature store is established using the same host as for thehost
parameter in thehsfs.connection()
method. If set to False, the online feature store storage connector is used which relies on the private IP. Defaults to True if connection to Hopsworks is established from external environment (e.g AWS Sagemaker or Google Colab), otherwise to False. - return_type
Optional[str]
:"list"
,"pandas"
or"numpy"
. Defaults to"list"
. - allow_missing
Optional[bool]
: Setting toTrue
returns feature vectors with missing values.
Returns
List[list]
, pd.DataFrame
or np.ndarray
if return type
is set to "list"
, "pandas"
or "numpy"
respectively. Defaults to List[list]
.
Returned List[list]
, pd.DataFrame
or np.ndarray
contains feature values related to provided primary
keys, ordered according to the positions of these features in the feature view query.
Raises
hsfs.client.exceptions.RestAPIError
. If using the Online Store REST client, and the response status code is not 200.
- 400: Requested Metadata does not exist or the request is malformed.
- 401: Access denied. API key does not give access to the feature store (e.g feature store not shared with user),
or authorization header (x-api-key) is not properly set.
- 500: Internal server error.
ValueError
.
- A force_*
parameter is set to True
and the corresponding client is not initialised.
- Both force_rest_client
and force_sql_client
are set to True
.
- The return_type
is not one of "list"
, "pandas"
or "numpy"
.
- Training Dataset version is not set and the feature view is not initialised.
- Serving keys do not match the provided entry dictionary
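The missing primary key rules for batch lookups can be sketched with dictionaries standing in for feature groups. Everything here is hypothetical: lookup_batch, the data layout and the KeyError are illustration choices, and skipping keys that are unknown everywhere even when allow_missing is True is an assumption of the sketch.

```python
def lookup_batch(entries, feature_groups, allow_missing=False):
    """Hypothetical stand-in for the documented missing-key behaviour."""
    vectors = []
    for entry in entries:
        key = tuple(sorted(entry.items()))
        hits = [fg.get(key) for fg in feature_groups]
        found = [h for h in hits if h is not None]
        if not found:
            continue  # key unknown everywhere: no vector is returned for it
        if len(found) < len(hits) and not allow_missing:
            raise KeyError(f"{entry} is missing in some feature groups")
        # Flatten, padding groups without a hit with a single None placeholder.
        vectors.append([v for h in hits for v in (h if h is not None else [None])])
    return vectors


fg1 = {(("pk1", 1),): [10.0], (("pk1", 2),): [20.0]}
fg2 = {(("pk1", 1),): [0.5]}

# pk1=3 exists nowhere and is skipped; pk1=2 is only in fg1 and is padded.
padded = lookup_batch([{"pk1": 1}, {"pk1": 2}, {"pk1": 3}], [fg1, fg2], allow_missing=True)
```

The same call without allow_missing=True raises for pk1=2, because that key is present in one feature group but not in the other.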
get_inference_helper#
FeatureView.get_inference_helper(entry, external=None, return_type="pandas")
Returns assembled inference helper column vectors from online feature store.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get assembled inference helper column vector
feature_view.get_inference_helper(
entry = {"pk1": 1, "pk2": 2}
)
Arguments
- entry
Dict[str, Any]
: Dictionary of feature group primary keys and values provided by the serving application. The set of required primary keys is feature_view.primary_keys.
- external
Optional[bool]
: boolean, optional. If set to True, the connection to the online feature store is established using the same host as for thehost
parameter in thehsfs.connection()
method. If set to False, the online feature store storage connector is used which relies on the private IP. Defaults to True if connection to Hopsworks is established from external environment (e.g AWS Sagemaker or Google Colab), otherwise to False. - return_type
Optional[str]
:"pandas"
or"dict"
. Defaults to"pandas"
.
Returns
pd.DataFrame
or dict
. Defaults to pd.DataFrame
.
Raises
Exception
. When primary key entry cannot be found in one or more of the feature groups used by this
feature view.
get_inference_helpers#
FeatureView.get_inference_helpers(entry, external=None, return_type="pandas")
Returns assembled inference helper column vectors in batches from online feature store.
Missing primary key entries
If any of the provided primary key elements in entry
can't be found in any
of the feature groups, no inference helper column vectors for that primary key value will be
returned.
If it can be found in at least one but not all feature groups used by
this feature view the call to this method will raise an exception.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get assembled inference helper column vectors
feature_view.get_inference_helpers(
entry = [
{"pk1": 1, "pk2": 2},
{"pk1": 3, "pk2": 4},
{"pk1": 5, "pk2": 6}
]
)
Arguments
- entry
List[Dict[str, Any]]
: A list of dictionaries of feature group primary keys and values provided by the serving application. The set of required primary keys is feature_view.primary_keys.
- external
Optional[bool]
: boolean, optional. If set to True, the connection to the online feature store is established using the same host as for thehost
parameter in thehsfs.connection()
method. If set to False, the online feature store storage connector is used which relies on the private IP. Defaults to True if connection to Hopsworks is established from external environment (e.g AWS Sagemaker or Google Colab), otherwise to False. - return_type
Optional[str]
:"pandas"
or"dict"
. Defaults to "pandas"
.
Returns
pd.DataFrame
or List[dict]
. Defaults to pd.DataFrame
.
Returned pd.DataFrame
or List[dict]
contains feature values related to provided primary
keys, ordered according to the positions of these features in the feature view query.
Raises
Exception
. When primary key entry cannot be found in one or more of the feature groups used by this
feature view.
get_parent_feature_groups#
FeatureView.get_parent_feature_groups()
Get the parents of this feature view, based on explicit provenance. Parents are feature groups or external feature groups. These feature groups can be accessible, deleted or inaccessible. For deleted and inaccessible feature groups, only minimal information is returned.
Returns
ProvenanceLinks
: Object containing the section of provenance graph requested.
get_tag#
FeatureView.get_tag(name)
Get the value of a tag attached to a feature view.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get a tag of a feature view
value = feature_view.get_tag('tag_name')
Arguments
- name
str
: Name of the tag to get.
Returns
tag value
Raises
hsfs.client.exceptions.RestAPIError
in case the backend fails to retrieve the tag.
get_tags#
FeatureView.get_tags()
Returns all tags attached to a feature view.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get tags
list_tags = feature_view.get_tags()
Returns
Dict[str, obj]
of tags.
Raises
hsfs.client.exceptions.RestAPIError
in case the backend fails to retrieve the tags.
get_train_test_split#
FeatureView.get_train_test_split(
training_dataset_version,
read_options=None,
primary_keys=False,
event_time=False,
training_helper_columns=False,
)
Get training data created by feature_view.create_train_test_split
or feature_view.train_test_split
.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get training data
X_train, X_test, y_train, y_test = feature_view.get_train_test_split(training_dataset_version=1)
Arguments
- training_dataset_version: training dataset version
- read_options
Optional[Dict[Any, Any]]
: Additional options as key/value pairs to pass to the execution engine. For spark engine: Dictionary of read options for Spark. For python engine:
  - key "use_hive" and value True to read training dataset with the Hopsworks API instead of Hopsworks Feature Query Service.
  - key "arrow_flight_config" to pass a dictionary of arrow flight configurations. For example: {"arrow_flight_config": {"timeout": 900}}
  - key "hive_config" to pass a dictionary of hive or tez configurations. For example: {"hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}}
Defaults to {}.
- primary_keys: whether to include primary key features or not. Defaults to
False
, no primary key features. - event_time: whether to include event time feature or not. Defaults to
False
, no event time feature. - training_helper_columns: whether to include training helper columns or not.
Training helper columns are a list of feature names in the feature view, defined during its creation,
that are not the part of the model schema itself but can be used during training as a helper for
extra information. If training helper columns were not defined in the feature view or during
materializing training dataset in the file system then
training_helper_columns=True
will not have any effect. Defaults toFalse
, no training helper columns.
Returns
(X_train, X_test, y_train, y_test): Tuple of dataframe of features and labels
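The `read_options` keys described above for the python engine can be assembled as a plain dictionary. This is a minimal sketch: `build_read_options` is a hypothetical helper that only builds a dict from the documented keys and never calls hsfs.

```python
from typing import Any, Dict, Optional


def build_read_options(
    use_hive: bool = False,
    arrow_flight_timeout: Optional[int] = None,
    hive_config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Assemble a read_options dictionary from the documented keys.

    Hypothetical helper: it only builds a plain dict and never calls hsfs.
    """
    options: Dict[str, Any] = {}
    if use_hive:
        # Read with the Hopsworks API instead of the Feature Query Service.
        options["use_hive"] = True
    if arrow_flight_timeout is not None:
        options["arrow_flight_config"] = {"timeout": arrow_flight_timeout}
    if hive_config:
        options["hive_config"] = dict(hive_config)
    return options


read_options = build_read_options(arrow_flight_timeout=900)
# Usage, assuming `feature_view` was obtained as in the example above:
# X_train, X_test, y_train, y_test = feature_view.get_train_test_split(
#     training_dataset_version=1, read_options=read_options
# )
```

Leaving every argument at its default yields `{}`, which matches the documented default for `read_options`.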
get_train_validation_test_split#
FeatureView.get_train_validation_test_split(
training_dataset_version,
read_options=None,
primary_keys=False,
event_time=False,
training_helper_columns=False,
)
Get training data created by feature_view.create_train_validation_test_split
or feature_view.train_validation_test_split
.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get training data
X_train, X_val, X_test, y_train, y_val, y_test = feature_view.get_train_validation_test_split(training_dataset_version=1)
Arguments
- training_dataset_version: training dataset version
- read_options
Optional[Dict[Any, Any]]
: Additional options as key/value pairs to pass to the execution engine. For spark engine: Dictionary of read options for Spark. For python engine:
  - key "use_hive" and value True to read training dataset with the Hopsworks API instead of Hopsworks Feature Query Service.
  - key "arrow_flight_config" to pass a dictionary of arrow flight configurations. For example: {"arrow_flight_config": {"timeout": 900}}
  - key "hive_config" to pass a dictionary of hive or tez configurations. For example: {"hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}}
Defaults to {}.
- primary_keys: whether to include primary key features or not. Defaults to
False
, no primary key features. - event_time: whether to include event time feature or not. Defaults to
False
, no event time feature. - training_helper_columns: whether to include training helper columns or not.
Training helper columns are a list of feature names in the feature view, defined during its creation,
that are not the part of the model schema itself but can be used during training as a helper for
extra information. If training helper columns were not defined in the feature view or during
materializing training dataset in the file system then
training_helper_columns=True
will not have any effect. Defaults toFalse
, no training helper columns.
Returns
(X_train, X_val, X_test, y_train, y_val, y_test): Tuple of dataframe of features and labels
get_training_data#
FeatureView.get_training_data(
training_dataset_version,
read_options=None,
primary_keys=False,
event_time=False,
training_helper_columns=False,
)
Get training data created by feature_view.create_training_data
or feature_view.training_data
.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get training data
features_df, labels_df = feature_view.get_training_data(training_dataset_version=1)
External Storage Support
Training data that was written to external storage using a Storage Connector other than S3 cannot currently be read using HSFS APIs with Python as the engine; use the storage's native client instead.
Arguments
- training_dataset_version: training dataset version
- read_options
Optional[Dict[Any, Any]]
: Additional options as key/value pairs to pass to the execution engine. For spark engine: Dictionary of read options for Spark. For python engine:
  - key "use_hive" and value True to read training dataset with the Hopsworks API instead of Hopsworks Feature Query Service.
  - key "arrow_flight_config" to pass a dictionary of arrow flight configurations. For example: {"arrow_flight_config": {"timeout": 900}}
  - key "hive_config" to pass a dictionary of hive or tez configurations. For example: {"hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}}
Defaults to {}.
- primary_keys: whether to include primary key features or not. Defaults to
False
, no primary key features. - event_time: whether to include event time feature or not. Defaults to
False
, no event time feature. - training_helper_columns: whether to include training helper columns or not.
Training helper columns are a list of feature names in the feature view, defined during its creation,
that are not the part of the model schema itself but can be used during training as a helper for
extra information. If training helper columns were not defined in the feature view or during
materializing training dataset in the file system then
training_helper_columns=True
will not have any effect. Defaults toFalse
, no training helper columns.
Returns
(X, y): Tuple of dataframe of features and labels
get_training_dataset_statistics#
FeatureView.get_training_dataset_statistics(
training_dataset_version, before_transformation=False, feature_names=None
)
Get statistics of a training dataset.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get training dataset statistics
statistics = feature_view.get_training_dataset_statistics(training_dataset_version=1)
Arguments
- training_dataset_version: Training dataset version
- before_transformation: Whether the statistics were computed before transformation functions or not.
- feature_names
Optional[List[str]]
: List of feature names of which statistics are retrieved.
Returns
Statistics
get_training_dataset_tag#
FeatureView.get_training_dataset_tag(training_dataset_version, name)
Get the value of a tag attached to a training dataset.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get a training dataset tag
tag_str = feature_view.get_training_dataset_tag(
training_dataset_version=1,
name="tag_schema"
)
Arguments
- training_dataset_version
int
: training dataset version - name
str
: Name of the tag to get.
Returns
tag value
Raises
hsfs.client.exceptions.RestAPIError
in case the backend fails to retrieve the tag.
get_training_dataset_tags#
FeatureView.get_training_dataset_tags(training_dataset_version)
Returns all tags attached to a training dataset.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get all tags of a training dataset
list_tags = feature_view.get_training_dataset_tags(
training_dataset_version=1
)
Returns
Dict[str, obj]
of tags.
Raises
hsfs.client.exceptions.RestAPIError
in case the backend fails to retrieve the tags.
get_training_datasets#
FeatureView.get_training_datasets()
Returns the metadata of all training datasets created with this feature view.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get all training dataset metadata
list_tds_meta = feature_view.get_training_datasets()
Returns
List[TrainingDatasetBase]
List of training datasets metadata.
Raises
hsfs.client.exceptions.RestAPIError
in case the backend fails to retrieve the training datasets metadata.
init_batch_scoring#
FeatureView.init_batch_scoring(training_dataset_version=None)
Initialise feature view to retrieve feature vector from offline feature store.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# initialise feature view to retrieve feature vector from offline feature store
feature_view.init_batch_scoring(training_dataset_version=1)
# get batch data
batch_data = feature_view.get_batch_data(...)
Arguments
- training_dataset_version
Optional[int]
: int, optional. Defaults to None. Transformation statistics are fetched from training dataset and applied to the feature vector.
init_serving#
FeatureView.init_serving(
training_dataset_version=None,
external=None,
options=None,
init_online_store_sql_client=None,
init_online_store_rest_client=False,
)
Initialise feature view to retrieve feature vector from online and offline feature store.
The Online Feature Store now supports feature vector retrieval using either the SQL connector
or a REST HTTP client. Defaults to SQL connector to match the previous behaviour. To use
the REST client, set init_online_store_rest_client
to True
.
Both get_feature_vector
and get_feature_vectors
methods will default to using the initialised
client. If both are initialised, the SQL client will be used by default. You can override this behaviour
on a per-call basis using the methods' kwargs, or set the default via the set_default_online_client
method.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# initialise feature view to retrieve a feature vector
feature_view.init_serving(training_dataset_version=1)
Initialising the Online Store REST Client to retrieve feature vectors from the online feature store, with additional configuration options.
Example
# initialise feature view to retrieve a feature vector using the RonDB REST http client
feature_view.init_serving(
training_dataset_version=1,
init_online_store_rest_client=True,
)
You can reset the Online Store REST Client connection to fix configuration options. In particular, if you have
called get_feature_vector
or get_feature_vectors
without first initialising the client, it results in a default configuration
being set for the rest client. This will reset the client and apply the new configuration options.
Example
# reset the RonDB REST http client connection
feature_view.init_serving(
training_dataset_version=1,
init_online_store_rest_client=True,
options={"reset_online_store_rest_client": True, "config_online_store_rest_client": {"host": "new_host", "timeout": 1000}},
)
Note that both the SQL connector and the REST client can be initialised at the same time. This is useful if you want to fallback on one connector if the other fails.
Example
# initialise feature view to retrieve a feature vector using both the SQL connector and the RonDB REST http client
feature_view.init_serving(
training_dataset_version=1,
init_online_store_sql_client=True,
init_online_store_rest_client=True,
)
# When initialising both clients, the SQL connector will be used by default. Change the default client using `set_default_online_client`.
feature_view.set_default_online_client("rest")
Arguments
- training_dataset_version
Optional[int]
: int, optional. Defaults to 1 for online feature store. Transformation statistics are fetched from training dataset and applied to the feature vector. - external
Optional[bool]
: boolean, optional. If set to True, the connection to the online feature store is established using the same host as for thehost
parameter in thehsfs.connection()
method. If set to False, the online feature store storage connector is used which relies on the private IP. Defaults to True if connection to Hopsworks is established from external environment (e.g AWS Sagemaker or Google Colab), otherwise to False. - init_online_store_sql_client
Optional[bool]
: bool, optional. If set to True, initialise the SQL client to retrieve feature vector(s) from the online feature store. Defaults to True if init_online_store_rest_client is False, otherwise False. - init_online_store_rest_client
bool
: bool, optional. If set to True, initialise the Online Store REST Client to retrieve feature vector(s) from the online feature store. Defaults to False, meaning the sql client will be initialised. - options
Optional[dict]
: Additional options as key/value pairs for configuring online serving engine.
  - key: kwargs of SqlAlchemy engine creation (See: https://docs.sqlalchemy.org/en/20/core/engines.html#sqlalchemy.create_engine). For example: {"pool_size": 10}
  - key: "config_online_store_rest_client" - dict, optional. Optional configuration options to override defaults for the Online Store REST Client.
    - key: "api_key" - str. The API key to use for the Online Store REST Client. THIS IS REQUIRED FOR INTERNAL CLIENTS.
    - key: "host" - str, optional. The host of the Online Store REST Client.
    - key: "port" - int, optional. The port of the Online Store REST Client.
    - key: "verify_certs" - bool, optional. If set to True, the Online Store REST Client will verify the server's certificate.
    - key: "ca_chain" - str, optional. The path to the CA chain file.
    - key: "use_ssl" - bool, optional. If set to True, the Online Store REST Client will use SSL.
    - key: "timeout" - int, optional. The timeout of the Online Store REST Client.
    - key: "server_api_version" - str, optional. The version of the RonDB Server FeatureStore API.
    - key: "http_authorization" - str, optional. The HTTP authorization header to use for the Online Store REST Client.
  - key: "reset_online_store_rest_client" - bool, optional. If set to True, the Online Store REST Client will be reset. Provide "config_online_store_rest_client" to override defaults.
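The interaction between the two `init_online_store_*_client` defaults can be easier to see as code. This is a minimal sketch of the documented defaults only: `resolve_client_flags` is a hypothetical function, not the hsfs implementation.

```python
from typing import Optional, Tuple


def resolve_client_flags(
    init_online_store_sql_client: Optional[bool] = None,
    init_online_store_rest_client: bool = False,
) -> Tuple[bool, bool]:
    """Return (init_sql, init_rest) following the documented defaults."""
    if init_online_store_sql_client is None:
        # SQL is initialised by default only when REST is not requested.
        init_online_store_sql_client = not init_online_store_rest_client
    return init_online_store_sql_client, init_online_store_rest_client
```

Under these rules, requesting only the REST client leaves the SQL client uninitialised, while passing both flags as `True` initialises both.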
json#
FeatureView.json()
purge_all_training_data#
FeatureView.purge_all_training_data()
Delete all training datasets (data only).
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# purge all training data
feature_view.purge_all_training_data()
Raises
hsfs.client.exceptions.RestAPIError
in case the backend fails to delete the training datasets.
purge_training_data#
FeatureView.purge_training_data(training_dataset_version)
Delete a training dataset (data only).
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# purge training data
feature_view.purge_training_data(training_dataset_version=1)
Arguments
- training_dataset_version
int
: Version of the training dataset to be removed.
Raises
hsfs.client.exceptions.RestAPIError
in case the backend fails to delete the training dataset.
recreate_training_dataset#
FeatureView.recreate_training_dataset(
training_dataset_version, statistics_config=None, write_options=None, spine=None
)
Recreate a training dataset.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# recreate a training dataset that has been deleted
feature_view.recreate_training_dataset(training_dataset_version=1)
Info
If materialised training data has been deleted, use recreate_training_dataset()
to
recreate it.
Spine Groups/Dataframes
Spine groups and dataframes are currently only supported with the Spark engine and Spark dataframes.
Arguments
- training_dataset_version
int
: training dataset version - statistics_config
Optional[Union[hsfs.StatisticsConfig, bool, dict]]
: A configuration object, or a dictionary with keys "enabled
" to generally enable descriptive statistics computation for this feature group,"correlations
" to turn on feature correlation computation and"histograms"
to compute feature value frequencies. The values should be booleans indicating the setting. To fully turn off statistics computation passstatistics_config=False
. Defaults toNone
and will compute only descriptive statistics. - write_options
Optional[Dict[Any, Any]]
: Additional options as key/value pairs to pass to the execution engine. For spark engine: Dictionary of read options for Spark. When using the python engine, write_options can contain the following entries:
  - key use_spark and value True to materialize training dataset with Spark instead of Hopsworks Feature Query Service.
  - key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to compute the training dataset.
  - key wait_for_job and value True or False to configure whether or not the save call should return only after the Hopsworks Job has finished. By default it waits.
Defaults to {}.
- spine
Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list], SpineGroup]]
: Spine dataframe with primary key, event time and label column to use for point in time join when fetching features. Defaults toNone
and is only required when feature view was created with spine group in the feature query. It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join, however, the same features as in the original feature group that is being replaced need to be available in the spine group.
Returns
Job
: When using the python
engine, it returns the Hopsworks Job
that was launched to create the training dataset.
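The dictionary forms of `statistics_config` and `write_options` described above might look as follows. This is a sketch under stated assumptions: `check_statistics_config` is a hypothetical local check that is not part of hsfs, and the commented call assumes a `feature_view` obtained as in the example above.

```python
from typing import Any, Dict

# Compute descriptive statistics and histograms, but skip correlations.
statistics_config: Dict[str, bool] = {
    "enabled": True,
    "correlations": False,
    "histograms": True,
}

# Python-engine write option: return without waiting for the job to finish.
write_options: Dict[str, Any] = {"wait_for_job": False}

ALLOWED_STATISTICS_KEYS = {"enabled", "correlations", "histograms"}


def check_statistics_config(config: Dict[str, bool]) -> None:
    """Hypothetical local check of the documented keys; not part of hsfs."""
    unknown = set(config) - ALLOWED_STATISTICS_KEYS
    if unknown:
        raise ValueError(f"Unknown statistics_config keys: {sorted(unknown)}")
    for key, value in config.items():
        if not isinstance(value, bool):
            raise TypeError(f"{key!r} must be a boolean, got {type(value).__name__}")


check_statistics_config(statistics_config)
# job = feature_view.recreate_training_dataset(
#     training_dataset_version=1,
#     statistics_config=statistics_config,
#     write_options=write_options,
# )
```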
set_default_online_client#
FeatureView.set_default_online_client(client)
Set the default client to either 'sql' or 'rest' to retrieve feature vectors from the online feature store.
If only one client is initialised when calling init_serving
, this client will be used by default.
If both clients are initialised, the SQL client will be used by default. This method allows you to
specify the default client. You can override this behaviour on a per-call basis using the methods kwargs.
Arguments
- client
str
: str. The default online client to be used for the feature view. The default online client can be set to "rest" or "sql".
Raises
ValueError
. - If vector server is not initialised via init_serving
- If setting default to a client not initialised. Use init_serving
with either init_online_store_sql_client
or init_online_store_rest_client
to initialise the client.
- If client is not "rest" or "sql".
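The default-client rules and the three `ValueError` cases above can be summarised in a toy model. This is only an illustration of the documented behaviour: `OnlineClientChoice` is a hypothetical class and does not reflect how hsfs implements it.

```python
from typing import Optional, Set


class OnlineClientChoice:
    """Toy model of the documented default-client rules (not hsfs code)."""

    VALID = ("sql", "rest")

    def __init__(self) -> None:
        self._initialised: Set[str] = set()
        self._default: Optional[str] = None

    def init_serving(self, sql: bool = True, rest: bool = False) -> None:
        self._initialised = {name for name, on in (("sql", sql), ("rest", rest)) if on}
        # SQL wins when both are initialised; otherwise use the only one available.
        if "sql" in self._initialised:
            self._default = "sql"
        elif "rest" in self._initialised:
            self._default = "rest"
        else:
            self._default = None

    def set_default_online_client(self, client: str) -> None:
        if not self._initialised:
            raise ValueError("Call init_serving before setting a default client.")
        if client not in self.VALID:
            raise ValueError('client must be "rest" or "sql".')
        if client not in self._initialised:
            raise ValueError(f"The {client} client has not been initialised.")
        self._default = client

    @property
    def default(self) -> Optional[str]:
        return self._default


choice = OnlineClientChoice()
choice.init_serving(sql=True, rest=True)  # both initialised, so SQL is the default
choice.set_default_online_client("rest")  # switch the default to REST
```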
to_dict#
FeatureView.to_dict()
train_test_split#
FeatureView.train_test_split(
test_size=None,
train_start="",
train_end="",
test_start="",
test_end="",
description="",
extra_filter=None,
statistics_config=None,
read_options=None,
spine=None,
primary_keys=False,
event_time=False,
training_helper_columns=False,
)
Create the metadata for a training dataset and get the corresponding training data from the offline feature store.
This returns the training data in memory and does not materialise data in storage.
The training data is split into train and test set at random or according to time ranges.
The training data can be recreated by calling feature_view.get_train_test_split
with the metadata created.
Create random train/test splits
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get training data
X_train, X_test, y_train, y_test = feature_view.train_test_split(
test_size=0.2
)
Create time-series train/test splits
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# set up dates
train_start = "2022-05-01 00:00:00"
train_end = "2022-06-04 23:59:59"
test_start = "2022-07-01 00:00:00"
test_end = "2022-08-04 23:59:59"
# you can also pass dates as datetime objects
# get training data
X_train, X_test, y_train, y_test = feature_view.train_test_split(
train_start=train_start,
train_end=train_end,
test_start=test_start,
test_end=test_end,
description='Description of a dataset'
)
Spine Groups/Dataframes
Spine groups and dataframes are currently only supported with the Spark engine and Spark dataframes.
Arguments
- test_size
Optional[float]
: size of test set. Should be between 0 and 1. - train_start
Optional[Union[str, int, datetime.datetime, datetime.date]]
: Start event time for the train split query, inclusive. Strings should be formatted in one of the following formats%Y-%m-%d
,%Y-%m-%d %H
,%Y-%m-%d %H:%M
,%Y-%m-%d %H:%M:%S
, or%Y-%m-%d %H:%M:%S.%f
. - train_end
Optional[Union[str, int, datetime.datetime, datetime.date]]
: End event time for the train split query, exclusive. Strings should be formatted in one of the following formats%Y-%m-%d
,%Y-%m-%d %H
,%Y-%m-%d %H:%M
,%Y-%m-%d %H:%M:%S
, or%Y-%m-%d %H:%M:%S.%f
. Int, i.e Unix Epoch should be in seconds. - test_start
Optional[Union[str, int, datetime.datetime, datetime.date]]
: Start event time for the test split query, inclusive. Strings should be formatted in one of the following formats%Y-%m-%d
,%Y-%m-%d %H
,%Y-%m-%d %H:%M
,%Y-%m-%d %H:%M:%S
, or%Y-%m-%d %H:%M:%S.%f
. Int, i.e Unix Epoch should be in seconds. - test_end
Optional[Union[str, int, datetime.datetime, datetime.date]]
: End event time for the test split query, exclusive. Strings should be formatted in one of the following formats%Y-%m-%d
,%Y-%m-%d %H
,%Y-%m-%d %H:%M
,%Y-%m-%d %H:%M:%S
, or%Y-%m-%d %H:%M:%S.%f
. Int, i.e Unix Epoch should be in seconds. - description
Optional[str]
: A string describing the contents of the training dataset to improve discoverability for Data Scientists, defaults to empty string""
. - extra_filter
Optional[Union[hsfs.constructor.filter.Filter, hsfs.constructor.filter.Logic]]
: Additional filters to be attached to the training dataset. The filters will be also applied inget_batch_data
. - statistics_config
Optional[Union[hsfs.StatisticsConfig, bool, dict]]
: A configuration object, or a dictionary with keys "enabled
" to generally enable descriptive statistics computation for this feature group,"correlations
" to turn on feature correlation computation and"histograms"
to compute feature value frequencies. The values should be booleans indicating the setting. To fully turn off statistics computation passstatistics_config=False
. Defaults toNone
and will compute only descriptive statistics. - read_options
Optional[Dict[Any, Any]]
: Additional options as key/value pairs to pass to the execution engine. For spark engine: Dictionary of read options for Spark. When using the python engine, read_options can contain the following entries:
  - key "use_hive" and value True to create in-memory training dataset with Hive instead of Hopsworks Feature Query Service.
  - key "arrow_flight_config" to pass a dictionary of arrow flight configurations. For example: {"arrow_flight_config": {"timeout": 900}}
  - key "hive_config" to pass a dictionary of hive or tez configurations. For example: {"hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}}
  - key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to compute the training dataset.
Defaults to {}.
- spine
Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list], SpineGroup]]
: Spine dataframe with primary key, event time and label column to use for point in time join when fetching features. Defaults toNone
and is only required when feature view was created with spine group in the feature query. It is possible to directly pass a spine group instead of a dataframe to overwrite the left side of the feature join, however, the same features as in the original feature group that is being replaced need to be available in the spine group. - primary_keys: whether to include primary key features or not. Defaults to
False
, no primary key features. - event_time: whether to include event time feature or not. Defaults to
False
, no event time feature. - training_helper_columns: whether to include training helper columns or not.
Training helper columns are a list of feature names in the feature view, defined during its creation,
that are not the part of the model schema itself but can be used during training as a helper for
extra information. If training helper columns were not defined in the feature view
then
training_helper_columns=True
will not have any effect. Defaults toFalse
, no training helper columns.
Returns
(X_train, X_test, y_train, y_test): Tuple of dataframe of features and labels
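The accepted string formats for the `*_start` and `*_end` arguments can be checked locally before calling the API. This is a minimal sketch using only the Python standard library: `parse_event_time` and `to_epoch_seconds` are hypothetical helpers, and interpreting the string as UTC when converting to Unix epoch seconds is an assumption of this sketch, not something the text above states.

```python
from datetime import datetime, timezone

# The string formats listed in the argument descriptions above.
ACCEPTED_FORMATS = (
    "%Y-%m-%d",
    "%Y-%m-%d %H",
    "%Y-%m-%d %H:%M",
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%d %H:%M:%S.%f",
)


def parse_event_time(value: str) -> datetime:
    """Parse a string using the first accepted format that matches."""
    for fmt in ACCEPTED_FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError(f"Unsupported event time format: {value!r}")


def to_epoch_seconds(value: str) -> int:
    """Convert to Unix epoch seconds, treating the string as UTC (an assumption)."""
    parsed = parse_event_time(value)
    return int(parsed.replace(tzinfo=timezone.utc).timestamp())


train_start = parse_event_time("2022-05-01 00:00:00")
train_start_epoch = to_epoch_seconds("2022-05-01 00:00:00")
```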
train_validation_test_split#
FeatureView.train_validation_test_split(
validation_size=None,
test_size=None,
train_start="",
train_end="",
validation_start="",
validation_end="",
test_start="",
test_end="",
description="",
extra_filter=None,
statistics_config=None,
read_options=None,
spine=None,
primary_keys=False,
event_time=False,
training_helper_columns=False,
)
Create the metadata for a training dataset and get the corresponding training data from the offline feature store.
This returns the training data in memory and does not materialise data in storage.
The training data is split into train, validation, and test set at random or according to time ranges.
The training data can be recreated by calling feature_view.get_train_validation_test_split
with the metadata created.
Example
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get training data
X_train, X_val, X_test, y_train, y_val, y_test = feature_view.train_validation_test_split(
validation_size=0.3,
test_size=0.2
)
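With a random split, the rows not assigned to the validation or test set make up the train set, so the two sizes together must leave room for it. The sketch below computes the remaining train fraction. `train_fraction` is a hypothetical helper, and treating the bounds 0 and 1 as exclusive is an assumption of this sketch.

```python
def train_fraction(validation_size: float, test_size: float) -> float:
    """Return the share of rows left for the train split."""
    for name, size in (("validation_size", validation_size), ("test_size", test_size)):
        # Assumption: both bounds are exclusive.
        if not 0 < size < 1:
            raise ValueError(f"{name} should be between 0 and 1")
    remainder = 1.0 - validation_size - test_size
    if remainder <= 0:
        raise ValueError("validation_size + test_size must leave room for a train split")
    # Round away floating-point noise such as 0.49999999999999994.
    return round(remainder, 10)


share = train_fraction(validation_size=0.3, test_size=0.2)
```

For the sizes used in the example above, half of the rows remain for training.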
Time Series split
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# set up dates
start_time_train = '2017-01-01 00:00:01'
end_time_train = '2018-02-01 23:59:59'
start_time_val = '2018-02-02 23:59:59'
end_time_val = '2019-02-01 23:59:59'
start_time_test = '2019-02-02 23:59:59'
end_time_test = '2020-02-01 23:59:59'
# you can also pass dates as datetime objects
# get training data
X_train, X_val, X_test, y_train, y_val, y_test = feature_view.train_validation_test_split(
train_start=start_time_train,
train_end=end_time_train,
validation_start=start_time_val,
validation_end=end_time_val,
test_start=start_time_test,
test_end=end_time_test
)
Spine Groups/Dataframes
Spine groups and dataframes are currently only supported with the Spark engine and Spark dataframes.
Arguments
- validation_size
Optional[float]
: size of validation set. Should be between 0 and 1. - test_size
Optional[float]
: size of test set. Should be between 0 and 1. - train_start
Optional[Union[str, int, datetime.datetime, datetime.date]]
: Start event time for the train split query, inclusive. Strings should be formatted in one of the following formats%Y-%m-%d
,%Y-%m-%d %H
,%Y-%m-%d %H:%M
,%Y-%m-%d %H:%M:%S
, or%Y-%m-%d %H:%M:%S.%f
. Int, i.e Unix Epoch should be in seconds. - train_end
Optional[Union[str, int, datetime.datetime, datetime.date]]
: End event time for the train split query, exclusive. Strings should be formatted in one of the following formats%Y-%m-%d
,%Y-%m-%d %H
,%Y-%m-%d %H:%M
,%Y-%m-%d %H:%M:%S
, or%Y-%m-%d %H:%M:%S.%f
. Int, i.e Unix Epoch should be in seconds. - validation_start
Optional[Union[str, int, datetime.datetime, datetime.date]]
: Start event time for the validation split query, inclusive. Strings should be formatted in one of the following formats%Y-%m-%d
,%Y-%m-%d %H
,%Y-%m-%d %H:%M
,%Y-%m-%d %H:%M:%S
, or%Y-%m-%d %H:%M:%S.%f
. Int, i.e Unix Epoch should be in seconds. - validation_end
Optional[Union[str, int, datetime.datetime, datetime.date]]
: End event time for the validation split query, exclusive. Strings should be formatted in one of the following formats%Y-%m-%d
,%Y-%m-%d %H
,%Y-%m-%d %H:%M
,%Y-%m-%d %H:%M:%S
, or%Y-%m-%d %H:%M:%S.%f
. Int, i.e Unix Epoch should be in seconds. - test_start
Optional[Union[str, int, datetime.datetime, datetime.date]]
: Start event time for the test split query, inclusive. Strings should be formatted in one of the following formats%Y-%m-%d
,%Y-%m-%d %H
,%Y-%m-%d %H:%M
,%Y-%m-%d %H:%M:%S
, or%Y-%m-%d %H:%M:%S.%f
. Int, i.e Unix Epoch should be in seconds. - test_end
Optional[Union[str, int, datetime.datetime, datetime.date]]
: End event time for the test split query, exclusive. Strings should be formatted in one of the following formats%Y-%m-%d
,%Y-%m-%d %H
,%Y-%m-%d %H:%M
,%Y-%m-%d %H:%M:%S
, or%Y-%m-%d %H:%M:%S.%f
. Int, i.e Unix Epoch should be in seconds. - description
Optional[str]
: A string describing the contents of the training dataset to improve discoverability for Data Scientists, defaults to empty string""
. - extra_filter
Optional[Union[hsfs.constructor.filter.Filter, hsfs.constructor.filter.Logic]]
: Additional filters to be attached to the training dataset. The filters will be also applied inget_batch_data
. - statistics_config
Optional[Union[hsfs.StatisticsConfig, bool, dict]]
: A configuration object, or a dictionary with keys "enabled
" to generally enable descriptive statistics computation for this feature group,"correlations
" to turn on feature correlation computation and"histograms"
to compute feature value frequencies. The values should be booleans indicating the setting. To fully turn off statistics computation passstatistics_config=False
. Defaults toNone
and will compute only descriptive statistics. - read_options
Optional[Dict[Any, Any]]
: Additional options as key/value pairs to pass to the execution engine. For spark engine: Dictionary of read options for Spark. When using the python engine, read_options can contain the following entries:
  - key "use_hive" and value True to create in-memory training dataset with Hive instead of Hopsworks Feature Query Service.
  - key "arrow_flight_config" to pass a dictionary of arrow flight configurations. For example: {"arrow_flight_config": {"timeout": 900}}
  - key "hive_config" to pass a dictionary of hive or tez configurations. For example: {"hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}}
  - key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to compute the training dataset.
Defaults to {}.
- spine
Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list], SpineGroup]]
: Spine dataframe with primary key, event time and label column to use for the point-in-time join when fetching features. Defaults to None and is only required when the feature view was created with a spine group in the feature query. It is possible to pass a spine group directly instead of a dataframe to overwrite the left side of the feature join; however, the same features as in the original feature group that is being replaced need to be available in the spine group.
- primary_keys: whether to include primary key features or not. Defaults to False, no primary key features.
- event_time: whether to include the event time feature or not. Defaults to False, no event time feature.
- training_helper_columns: whether to include training helper columns or not. Training helper columns are a list of feature names in the feature view, defined during its creation, that are not part of the model schema itself but can be used during training as a helper for extra information. If training helper columns were not defined in the feature view, then training_helper_columns=True will not have any effect. Defaults to False, no training helper columns.
Returns
(X_train, X_val, X_test, y_train, y_val, y_test): Tuple of dataframes of features and labels.
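As a minimal sketch of the read_options entries described above for the python engine, the dictionary could be assembled as shown below. The helper name build_read_options and its parameters are hypothetical and not part of hsfs; only the keys and example values are taken from the argument description.

```python
from typing import Any, Dict, Optional


def build_read_options(
    use_hive: bool = False,
    arrow_flight_timeout: Optional[int] = None,
    hive_config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Assemble a read_options dictionary (hypothetical helper, not part of hsfs)."""
    read_options: Dict[str, Any] = {}
    if use_hive:
        # Read through Hive instead of the Hopsworks Feature Query Service.
        read_options["use_hive"] = True
    if arrow_flight_timeout is not None:
        # Arrow Flight settings are nested under their own key.
        read_options["arrow_flight_config"] = {"timeout": arrow_flight_timeout}
    if hive_config:
        # Copy so later changes to the caller's dict do not leak in.
        read_options["hive_config"] = dict(hive_config)
    return read_options


read_options = build_read_options(arrow_flight_timeout=900)
print(read_options)

# The result is then passed as the read_options argument of the
# feature view method, for example:
# feature_view.train_validation_test_split(..., read_options=read_options)
```

Leaving all parameters at their defaults yields an empty dictionary, which matches the documented default of {}.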
training_data#
FeatureView.training_data(
start_time=None,
end_time=None,
description="",
extra_filter=None,
statistics_config=None,
read_options=None,
spine=None,
primary_keys=False,
event_time=False,
training_helper_columns=False,
)
Create the metadata for a training dataset and get the corresponding training data from the offline feature store.
This returns the training data in memory and does not materialise data in storage.
The training data can be recreated by calling feature_view.get_training_data with the metadata created.
Create training data
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# get training data
features_df, labels_df = feature_view.training_data(
description='Description of a dataset',
)
Create training data for a time range
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
# set up the start and end event times
start_time = "2022-05-01 00:00:00"
end_time = "2022-06-04 23:59:59"
# you can also pass dates as datetime objects
# get training data
features_df, labels_df = feature_view.training_data(
start_time=start_time,
end_time=end_time,
description='Description of a dataset'
)
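The example above passes the time range as strings. The same bounds can also be built as datetime objects or Unix epoch seconds. The sketch below uses only the standard library to check a string against the accepted formats and to convert a datetime to epoch seconds. The helper names parse_event_time and to_epoch_seconds are hypothetical, and treating naive datetimes as UTC is an assumption made for this illustration, not documented hsfs behaviour.

```python
from datetime import datetime, timezone

# String formats listed in the argument description, most specific first.
ACCEPTED_FORMATS = [
    "%Y-%m-%d %H:%M:%S.%f",
    "%Y-%m-%d %H:%M:%S",
    "%Y-%m-%d %H:%M",
    "%Y-%m-%d %H",
    "%Y-%m-%d",
]


def parse_event_time(value: str) -> datetime:
    """Parse a string in one of the accepted formats (hypothetical helper)."""
    for fmt in ACCEPTED_FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError(f"{value!r} does not match any accepted format")


def to_epoch_seconds(value: datetime) -> int:
    """Convert a datetime to Unix epoch seconds (hypothetical helper)."""
    if value.tzinfo is None:
        # Assumption for this sketch: naive datetimes are treated as UTC.
        value = value.replace(tzinfo=timezone.utc)
    return int(value.timestamp())


start_time = parse_event_time("2022-05-01 00:00:00")
end_time = parse_event_time("2022-06-04 23:59:59")
print(start_time, to_epoch_seconds(start_time))
print(end_time, to_epoch_seconds(end_time))
```

Either the datetime objects or the integer seconds can then be passed as start_time and end_time.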
Spine Groups/Dataframes
Spine groups and dataframes are currently only supported with the Spark engine and Spark dataframes.
Arguments
- start_time
Optional[Union[str, int, datetime.datetime, datetime.date]]
: Start event time for the training dataset query, inclusive. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f. Integers, i.e. Unix epoch timestamps, should be in seconds.
- end_time
Optional[Union[str, int, datetime.datetime, datetime.date]]
: End event time for the training dataset query, exclusive. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f. Integers, i.e. Unix epoch timestamps, should be in seconds.
- description
Optional[str]
: A string describing the contents of the training dataset to improve discoverability for Data Scientists, defaults to empty string "".
- extra_filter
Optional[Union[hsfs.constructor.filter.Filter, hsfs.constructor.filter.Logic]]
: Additional filters to be attached to the training dataset. The filters will also be applied in get_batch_data.
- statistics_config
Optional[Union[hsfs.StatisticsConfig, bool, dict]]
: A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this training dataset, "correlations" to turn on feature correlation computation, and "histograms" to compute feature value frequencies. The values should be booleans indicating the setting. To fully turn off statistics computation, pass statistics_config=False. Defaults to None and will compute only descriptive statistics.
- read_options
Optional[Dict[Any, Any]]
: Additional options as key/value pairs to pass to the execution engine. For the Spark engine: dictionary of read options for Spark. When using the python engine, read_options can contain the following entries:
  - key "use_hive" and value True to create the in-memory training dataset with Hive instead of the Hopsworks Feature Query Service.
  - key "arrow_flight_config" to pass a dictionary of Arrow Flight configurations. For example: {"arrow_flight_config": {"timeout": 900}}
  - key "hive_config" to pass a dictionary of Hive or Tez configurations. For example: {"hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}}
  - key "spark" and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to compute the training dataset.
  Defaults to {}.
- spine
Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list], SpineGroup]]
: Spine dataframe with primary key, event time and label column to use for the point-in-time join when fetching features. Defaults to None and is only required when the feature view was created with a spine group in the feature query. It is possible to pass a spine group directly instead of a dataframe to overwrite the left side of the feature join; however, the same features as in the original feature group that is being replaced need to be available in the spine group.
- primary_keys: whether to include primary key features or not. Defaults to False, no primary key features.
- event_time: whether to include the event time feature or not. Defaults to False, no event time feature.
- training_helper_columns: whether to include training helper columns or not. Training helper columns are a list of feature names in the feature view, defined during its creation, that are not part of the model schema itself but can be used during training as a helper for extra information. If training helper columns were not defined in the feature view, then training_helper_columns=True will not have any effect. Defaults to False, no training helper columns.
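The dictionary form of statistics_config described above can be sketched as follows. The helper make_statistics_config is a hypothetical name used for illustration and is not part of hsfs; the three keys and the requirement that values are booleans come from the argument description.

```python
from typing import Dict


def make_statistics_config(
    enabled: bool = True,
    correlations: bool = False,
    histograms: bool = False,
) -> Dict[str, bool]:
    """Build a statistics_config dictionary (hypothetical helper, not part of hsfs)."""
    config = {
        "enabled": enabled,            # descriptive statistics
        "correlations": correlations,  # feature correlation computation
        "histograms": histograms,      # feature value frequencies
    }
    for key, value in config.items():
        # The description states that the values should be booleans.
        if not isinstance(value, bool):
            raise TypeError(f"{key} must be a bool, got {type(value).__name__}")
    return config


statistics_config = make_statistics_config(correlations=True)
print(statistics_config)

# It would then be passed along with the other arguments, for example:
# feature_view.training_data(description=..., statistics_config=statistics_config)
```

To switch statistics off entirely, the description says to pass statistics_config=False rather than a dictionary.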
Returns
(X, y): Tuple of dataframes of features and labels. If there are no labels, y is None.
update#
FeatureView.update()
Update the description of the feature view.
Update the feature view with a new description.
# get feature store instance
fs = ...
# get feature view instance
feature_view = fs.get_feature_view(...)
feature_view.description = "new description"
feature_view.update()
# Description is updated in the metadata. Below should return "new description".
fs.get_feature_view("feature_view_name", 1).description
Returns
FeatureView
Updated feature view.
Raises
hsfs.client.exceptions.RestAPIError.
update_from_response_json#
FeatureView.update_from_response_json(json_dict)