Training Dataset#
The training dataset abstraction in Hopsworks Feature Store allows users to group a set of features (potentially from multiple different feature groups) with labels for training a model to do a particular prediction task. The training dataset is a versioned and managed dataset and is stored in HopsFS as tfrecords, parquet, csv, or tsv files.
Versioning#
Training datasets can be versioned. Data scientists should use the version to signal changes that are relevant to the model, such as changes to the schema or to the feature engineering logic of the features associated with this training dataset.
Creation#
To create a training dataset, the user supplies a Pandas, NumPy or Spark dataframe with features and labels, together with metadata. Once the training dataset has been created, it is discoverable in the feature registry and users can use it to train models.
create_training_dataset#
FeatureStore.create_training_dataset(
name,
version=None,
description="",
data_format="tfrecords",
coalesce=False,
storage_connector=None,
splits={},
location="",
seed=None,
statistics_config=None,
label=[],
transformation_functions={},
train_split=None,
)
Create a training dataset metadata object.
Lazy
This method is lazy and does not persist any metadata or feature data in the feature store on its own. To materialize the training dataset and save feature data along with the metadata in the feature store, call the save() method with a DataFrame or Query.
Data Formats
The feature store currently supports the following data formats for training datasets:
- tfrecord
- csv
- tsv
- parquet
- avro
- orc
The petastorm, hdf5 and npy file formats are currently not supported.
Arguments
- name (str): Name of the training dataset to create.
- version (Optional[int]): Version of the training dataset to create, defaults to None and will create the training dataset with incremented version from the last version in the feature store.
- description (Optional[str]): A string describing the contents of the training dataset to improve discoverability for Data Scientists, defaults to empty string "".
- data_format (Optional[str]): The data format used to save the training dataset, defaults to "tfrecords".
- coalesce (Optional[bool]): If true, the training dataset data will be coalesced into a single partition before writing. The resulting training dataset will be a single file per split. Defaults to False.
- storage_connector (Optional[hsfs.StorageConnector]): Storage connector defining the sink location for the training dataset, defaults to None, and materializes the training dataset on HopsFS.
- splits (Optional[Dict[str, float]]): A dictionary defining training dataset splits to be created. Keys in the dictionary define the name of the split as str, values represent the percentage of samples in the split as float. Currently, only random splits are supported. Defaults to empty dict {}, creating only a single training dataset without splits.
- location (Optional[str]): Path to complement the sink storage connector with, e.g. if the storage connector points to an S3 bucket, this path can be used to define a sub-directory inside the bucket to place the training dataset. Defaults to "", saving the training dataset at the root defined by the storage connector.
- seed (Optional[int]): Optionally, define a seed to create the random splits with, in order to guarantee reproducibility, defaults to None.
- statistics_config (Optional[Union[hsfs.StatisticsConfig, bool, dict]]): A configuration object, or a dictionary with keys "enabled" to generally enable descriptive statistics computation for this training dataset, "correlations" to turn on feature correlation computation and "histograms" to compute feature value frequencies. The values should be booleans indicating the setting. To fully turn off statistics computation pass statistics_config=False. Defaults to None and will compute only descriptive statistics.
- label (Optional[List[str]]): A list of feature names constituting the prediction label/feature of the training dataset. When replaying a Query during model inference, the label features can be omitted from the feature vector retrieval. Defaults to [], no label.
- transformation_functions (Optional[Dict[str, hsfs.transformation_function.TransformationFunction]]): A dictionary mapping feature names to the transformation functions that should be applied to them before writing out the training data and at inference time. Defaults to {}, no transformations.
- train_split (Optional[str]): If splits is set, provide the name of the split that is going to be used for training. The statistics of this split will be used for transformation functions if necessary. Defaults to None.
Returns
TrainingDataset: The training dataset metadata object.
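The lazy create-then-save flow can be sketched as follows. This is a minimal illustration, not the canonical usage: the helper name `create_split_training_dataset`, the dataset name, the split percentages, the seed and the label name `target` are all assumptions made for the example, and `fs` is assumed to be a feature store handle such as the one returned by `connection.get_feature_store()`.

```python
def create_split_training_dataset(fs, features):
    """Create and materialize a training dataset with random train/test splits.

    `fs` is assumed to be a feature store handle and `features` a Query or
    DataFrame. Every name and value below is illustrative only.
    """
    td = fs.create_training_dataset(
        name="sample_model",
        description="Example training dataset",
        data_format="tfrecords",
        splits={"train": 0.8, "test": 0.2},  # split name -> share of samples
        seed=42,                             # makes the random splits reproducible
        label=["target"],                    # hypothetical label feature name
        train_split="train",                 # split whose statistics feed transformations
        statistics_config={
            "enabled": True,
            "correlations": False,
            "histograms": False,
        },
    )
    # create_training_dataset() is lazy: nothing is persisted until save() runs.
    td.save(features)
    return td
```

Keeping creation and materialization in one helper makes it harder to forget the `save()` call, which is the step that actually writes metadata and data.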
Retrieval#
get_training_dataset#
FeatureStore.get_training_dataset(name, version=None)
Get a training dataset entity from the feature store.
Getting a training dataset from the Feature Store means getting its metadata handle so you can subsequently read the data into a Spark or Pandas DataFrame.
Arguments
- name (str): Name of the training dataset to get.
- version (Optional[int]): Version of the training dataset to retrieve, defaults to None and will return version=1.
Returns
TrainingDataset: The training dataset metadata object.
Raises
RestAPIError: If unable to retrieve the training dataset from the feature store.
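As a rough sketch of retrieval followed by a read, the helper below fetches the metadata handle and then loads the data. The helper name `load_training_data` is hypothetical, and `fs` is assumed to be a feature store handle. The version is passed explicitly because, as described above, leaving it as `None` returns version 1 rather than the latest version.

```python
def load_training_data(fs, name, version, split=None):
    """Fetch a training dataset handle and read its data into a dataframe.

    `fs` is assumed to be a feature store handle; the helper name is
    illustrative and not part of the library.
    """
    # Pin the version explicitly instead of relying on the version=None default.
    td = fs.get_training_dataset(name, version=version)
    # `split` must be given if the training dataset was created with splits.
    return td.read(split=split)
```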
Properties#
coalesce#
If true, the training dataset data will be coalesced into a single partition before writing. The resulting training dataset will be a single file per split.
data_format#
File format of the training dataset.
description#
feature_store_id#
id#
Training dataset id.
label#
The label/prediction feature of the training dataset.
Can be a composite of multiple features.
location#
Path to the training dataset location.
name#
Name of the training dataset.
prepared_statement_engine#
JDBC connection engine to retrieve connections to online features store from.
prepared_statements#
Dict of prepared statements, keyed by the index of their position in the query that selects features from the feature groups of the training dataset.
query#
Query to generate this training dataset from online feature store.
schema#
Training dataset schema.
seed#
Seed.
serving_keys#
Set of primary key names that are used as keys in the input dict object for the get_serving_vector method.
splits#
Training dataset splits: train, test or eval, and corresponding percentages.
statistics#
Get the latest computed statistics for the training dataset.
statistics_config#
Statistics configuration object defining the settings for statistics computation of the training dataset.
storage_connector#
Storage connector.
train_split#
Set name of training dataset split that is used for training.
transformation_functions#
Set transformation functions.
version#
Version number of the training dataset.
write_options#
User provided options to write training dataset.
Methods#
add_tag#
TrainingDataset.add_tag(name, value)
Attach a tag to a training dataset.
A tag consists of a name and a value.
Arguments
- name (str): Name of the tag to be added.
- value: Value of the tag to be added.
Raises
RestAPIError: In case the backend fails to add the tag.
compute_statistics#
TrainingDataset.compute_statistics()
Recompute the statistics for the training dataset and save them to the feature store.
delete#
TrainingDataset.delete()
Delete training dataset and all associated metadata.
Drops only HopsFS data
Note that this operation drops only files which were materialized in HopsFS. If you used a Storage Connector for a cloud storage such as S3, the data will not be deleted, but you will not be able to track it anymore from the Feature Store.
Potentially dangerous operation
This operation drops all metadata associated with this version of the training dataset and the materialized data in HopsFS.
Raises
RestAPIError.
delete_tag#
TrainingDataset.delete_tag(name)
Delete a tag attached to a training dataset.
Arguments
- name (str): Name of the tag to be removed.
Raises
RestAPIError: In case the backend fails to delete the tag.
get_query#
TrainingDataset.get_query(online=True, with_label=False)
Returns the query used to generate this training dataset.
Arguments
- online (bool): Return the query for the online storage if True, else for the offline storage. Defaults to True, for online storage.
- with_label (bool): Indicator whether the query should contain features which were marked as prediction label/feature when the training dataset was created, defaults to False.
Returns
str: Query string for the chosen storage used to generate this training dataset.
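To compare what is issued against each storage, both query strings can be collected side by side. This is only a sketch: the helper name `queries_for_both_stores` is hypothetical, and `td` is assumed to be a training dataset metadata object.

```python
def queries_for_both_stores(td, with_label=False):
    """Return the generating query for the online and the offline storage.

    `td` is assumed to be a TrainingDataset metadata object; the helper
    name is illustrative.
    """
    return {
        "online": td.get_query(online=True, with_label=with_label),
        "offline": td.get_query(online=False, with_label=with_label),
    }
```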
get_serving_vector#
TrainingDataset.get_serving_vector(entry, external=False)
Returns the assembled serving vector from the online feature store.
Arguments
- entry (Dict[str, Any]): Dictionary of training dataset feature group primary key names as keys and values provided by the serving application.
- external (Optional[bool]): If set to True, the connection to the online feature store is established using the same host as for the host parameter in the hsfs.connection() method. If set to False, the online feature store storage connector is used, which relies on the private IP.
Returns
list: List of feature values related to the provided primary keys, ordered according to the positions of these features in the training dataset query.
get_serving_vectors#
TrainingDataset.get_serving_vectors(entry, external=False)
Returns the assembled serving vectors in batches from the online feature store.
Arguments
- entry (Dict[str, List[Any]]): Dict of feature group primary key names as keys and, as values, lists of primary key values provided by the serving application.
- external (Optional[bool]): If set to True, the connection to the online feature store is established using the same host as for the host parameter in the hsfs.connection() method. If set to False, the online feature store storage connector is used, which relies on the private IP.
Returns
List[list]: List of lists of feature values related to the provided primary keys, ordered according to the positions of these features in the training dataset query.
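The batch method expects one dict whose values are lists, whereas a serving application often holds one key dict per request. The sketch below converts between the two shapes and then fetches the vectors. Both helper names (`to_batch_entry`, `fetch_serving_vectors`) are hypothetical, and calling `init_prepared_statement(batch=True)` up front is a choice made for this example rather than a documented requirement.

```python
def to_batch_entry(rows):
    """Turn [{"id": 1}, {"id": 2}] into {"id": [1, 2]}.

    Every row must contain exactly the same primary key names.
    """
    if not rows:
        return {}
    keys = list(rows[0])
    batch = {key: [] for key in keys}
    for row in rows:
        if set(row) != set(keys):
            raise ValueError("all rows must use the same primary key names")
        for key in keys:
            batch[key].append(row[key])
    return batch


def fetch_serving_vectors(td, rows, external=False):
    """Fetch one serving vector per row from the online feature store.

    `td` is assumed to be a TrainingDataset metadata object.
    """
    # Prepare the batch statements explicitly before the first lookup.
    td.init_prepared_statement(batch=True, external=external)
    return td.get_serving_vectors(to_batch_entry(rows), external=external)
```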
get_statistics#
TrainingDataset.get_statistics(commit_time=None)
Returns the statistics for this training dataset at a specific time.
If commit_time is None, the most recent statistics are returned.
Arguments
- commit_time (Optional[str]): Commit time in the format YYYYMMDDhhmmss, defaults to None.
Returns
Statistics: Object with statistics information.
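The `YYYYMMDDhhmmss` string can be produced from a Python `datetime` with `strftime`. The sketch below shows one way to do that; the helper names `to_commit_time` and `statistics_at` are hypothetical, and which time zone the backend expects is not stated here, so the caller is assumed to pass a `datetime` already in the right zone.

```python
from datetime import datetime


def to_commit_time(moment):
    """Format a datetime as YYYYMMDDhhmmss, e.g. 20210309140507."""
    return moment.strftime("%Y%m%d%H%M%S")


def statistics_at(td, moment=None):
    """Return statistics as of `moment`, or the most recent ones if None.

    `td` is assumed to be a TrainingDataset metadata object.
    """
    commit_time = None if moment is None else to_commit_time(moment)
    return td.get_statistics(commit_time=commit_time)
```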
get_tag#
TrainingDataset.get_tag(name)
Get the value of a tag attached to a training dataset.
Arguments
- name: Name of the tag to get.
Returns
The tag value.
Raises
RestAPIError: In case the backend fails to retrieve the tag.
get_tags#
TrainingDataset.get_tags()
Returns all tags attached to a training dataset.
Returns
Dict[str, obj] of tags.
Raises
RestAPIError: In case the backend fails to retrieve the tags.
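The tag methods can be combined into a small round trip: attach a tag, read it back, then list everything attached. This is a sketch only: the helper name `tag_and_list` is hypothetical, `td` is assumed to be a training dataset metadata object, and the exact form in which the backend returns a stored value is not specified above, so the code makes no assumption about it.

```python
def tag_and_list(td, name, value):
    """Attach a tag, then return its stored value and all tags.

    `td` is assumed to be a TrainingDataset metadata object.
    """
    td.add_tag(name, value)
    stored = td.get_tag(name)   # value of the single tag just added
    all_tags = td.get_tags()    # every tag attached to the dataset
    return stored, all_tags
```

A tag that is no longer needed can be removed again with `td.delete_tag(name)`.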
init_prepared_statement#
TrainingDataset.init_prepared_statement(batch=None, external=False)
Initialise and cache parametrized prepared statement to retrieve feature vector from online feature store.
Arguments
- batch (Optional[bool]): If set to True, prepared statements will be initialised for retrieving serving vectors as a batch.
- external (Optional[bool]): If set to True, the connection to the online feature store is established using the same host as for the host parameter in the hsfs.connection() method. If set to False, the online feature store storage connector is used, which relies on the private IP.
insert#
TrainingDataset.insert(features, overwrite, write_options={})
Insert additional feature data into the training dataset.
This method appends data to the training dataset either from a Feature Store Query, a Spark or Pandas DataFrame, a Spark RDD, two-dimensional Python lists or Numpy ndarrays. The schemas must match for this operation.
This can also be used to overwrite all data in an existing training dataset.
Arguments
- features (Union[hsfs.constructor.query.Query, pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list]]): Feature data to be materialized.
- overwrite (bool): Whether to overwrite the entire data in the training dataset.
- write_options (Optional[Dict[Any, Any]]): Additional write options as key-value pairs, defaults to {}. When using the hive engine, write_options can contain the following entries:
  - key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to compute the training dataset.
  - key wait_for_job and value True or False to configure whether or not the insert call should return only after the Hopsworks Job has finished. By default it waits.
Returns
Job: When using the hive engine, it returns the Hopsworks Job that was launched to create the training dataset.
Raises
RestAPIError: Unable to create training dataset metadata.
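A minimal sketch of an append that controls whether the call blocks on the Hopsworks Job is shown below. The helper name `append_rows` is hypothetical, `td` is assumed to be a training dataset metadata object, and the `wait_for_job` entry only has an effect under the hive engine as described above.

```python
def append_rows(td, features, wait=True):
    """Append feature data without overwriting existing data.

    `td` is assumed to be a TrainingDataset metadata object and `features`
    a Query or dataframe whose schema matches the training dataset.
    """
    # Only honoured when using the hive engine, per the argument description.
    write_options = {"wait_for_job": wait}
    return td.insert(features, overwrite=False, write_options=write_options)
```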
read#
TrainingDataset.read(split=None, read_options={})
Read the training dataset into a dataframe.
It is also possible to read only a specific split.
Arguments
- split: Name of the split to read, defaults to None, reading the entire training dataset. If the training dataset has splits, the split parameter is mandatory.
- read_options: Additional read options as key/value pairs, defaults to {}.
Returns
DataFrame: The spark dataframe containing the feature data of the training dataset.
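Because the `split` argument becomes mandatory once a training dataset has splits, it can be convenient to read all of them in one go. The sketch below does that for a caller-supplied list of names; the helper name `read_named_splits` and the default names `"train"` and `"test"` are assumptions for the example.

```python
def read_named_splits(td, split_names=("train", "test")):
    """Read each named split into its own dataframe.

    `td` is assumed to be a TrainingDataset metadata object that was
    created with splits matching `split_names`.
    """
    return {name: td.read(split=name) for name in split_names}
```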
save#
TrainingDataset.save(features, write_options={})
Materialize the training dataset to storage.
This method materializes the training dataset either from a Feature Store Query, a Spark or Pandas DataFrame, a Spark RDD, two-dimensional Python lists or Numpy ndarrays.
From v2.5 onward, filters are saved along with the Query.
Arguments
- features (Union[hsfs.constructor.query.Query, pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list]]): Feature data to be materialized.
- write_options (Optional[Dict[Any, Any]]): Additional write options as key-value pairs, defaults to {}. When using the hive engine, write_options can contain the following entries:
  - key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to compute the training dataset.
  - key wait_for_job and value True or False to configure whether or not the save call should return only after the Hopsworks Job has finished. By default it waits.
Returns
Job: When using the hive engine, it returns the Hopsworks Job that was launched to create the training dataset.
Raises
RestAPIError: Unable to create training dataset metadata.
show#
TrainingDataset.show(n, split=None)
Show the first n rows of the training dataset.
You can specify a split from which to retrieve the rows.
Arguments
- n (int): Number of rows to show.
- split (Optional[str]): Name of the split to show, defaults to None, showing the first rows when taking all splits together.
tf_data#
TrainingDataset.tf_data(
target_name,
split=None,
feature_names=None,
var_len_features=[],
is_training=True,
cycle_length=2,
deterministic=False,
file_pattern="*.tfrecord*",
)
Returns an object with utility methods to read the training dataset as a tf.data.Dataset object and handle it for further processing.
Arguments
- target_name (str): Name of the target variable.
- split (Optional[str]): Name of the training dataset split, for example "train", "test" or "val". Defaults to None, returning the full training dataset.
- feature_names (Optional[list]): Names of training variables, defaults to None.
- var_len_features (Optional[list]): Feature names that have variable length and need to be returned as tf.io.VarLenFeature, defaults to [].
- is_training: Whether it is for training, testing or validation. Defaults to True.
- cycle_length (Optional[int]): Number of files to be read and deserialized in parallel, defaults to 2.
- deterministic (Optional[bool]): Controls the order in which the transformation produces elements. If set to False, the transformation is allowed to yield elements out of order to trade determinism for performance. Defaults to False.
- file_pattern (Optional[str]): Pattern that the files to be read must match, defaults to *.tfrecord*.
Returns
TFDataEngine: An object with utility methods to generate and handle a tf.data.Dataset object.
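As a sketch of how the returned engine might be used for a single split, the helper below requests the `"train"` split and asks for a batched dataset. The helper name `build_training_input`, the split name and the default batch size are assumptions; `td` is assumed to be a training dataset stored as tfrecords.

```python
def build_training_input(td, target_name, batch_size=32, num_epochs=1):
    """Build a batched input pipeline for the "train" split.

    `td` is assumed to be a TrainingDataset metadata object whose data was
    saved in the tfrecords format with a split named "train".
    """
    engine = td.tf_data(target_name=target_name, split="train", is_training=True)
    # process=True asks the engine for (features, label) batches ready for training.
    return engine.tf_record_dataset(
        batch_size=batch_size, num_epochs=num_epochs, process=True
    )
```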
update_statistics_config#
TrainingDataset.update_statistics_config()
Update the statistics configuration of the training dataset.
Change the statistics_config object and persist the changes by calling this method.
Returns
TrainingDataset: The updated metadata object of the training dataset.
Raises
RestAPIError.
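The change-then-persist pattern might look like the sketch below. It assumes that the object behind `statistics_config` exposes a writable `histograms` attribute mirroring the `"histograms"` key described for `create_training_dataset`; that attribute, and the helper name `enable_histograms`, are assumptions rather than confirmed API.

```python
def enable_histograms(td):
    """Turn on histogram computation and recompute statistics.

    `td` is assumed to be a TrainingDataset metadata object. The writable
    `histograms` attribute on its statistics_config is an assumption.
    """
    td.statistics_config.histograms = True
    # Persist the modified configuration in the feature store.
    updated = td.update_statistics_config()
    # Recompute so the stored statistics reflect the new configuration.
    updated.compute_statistics()
    return updated
```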
TFData engine#
tf_record_dataset#
TFDataEngine.tf_record_dataset(
batch_size=None,
num_epochs=None,
one_hot_encode_labels=False,
num_classes=None,
process=False,
serialized_ndarray_fname=[],
)
Reads tfrecord files and returns a ParallelMapDataset or PrefetchDataset object, depending on whether process is set to False or True, respectively.
If process is set to False, the returned ParallelMapDataset object can be further processed by the user, for example by applying custom transformations to features, batching, caching etc. process=True will return a PrefetchDataset object that contains tuples of feature vector and label, already batched and ready to input into model training.
Example of using tf_record_dataset:
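import hsfs

# Connect to the feature store and fetch version 3 of the training dataset.
connection = hsfs.connection()
fs = connection.get_feature_store()
td = fs.get_training_dataset("sample_model", 3)
# process=True returns a batched dataset ready for model training.
td.tf_data(target_name="id").tf_record_dataset(batch_size=1, num_epochs=1, process=True)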
Arguments
- batch_size (Optional[int]): Size of batch, defaults to None.
- num_epochs (Optional[int]): Number of epochs to train, defaults to None.
- one_hot_encode_labels (Optional[bool]): If set to True, one hot encode the labels, defaults to False.
- num_classes (Optional[int]): If one_hot_encode_labels is True, provide the number of target classes, defaults to None.
- process (Optional[bool]): If set to True, the API will optimise the tf data read operation and return the feature vector for a model with a single input, defaults to False.
- serialized_ndarray_fname (Optional[list]): Names of features that contain serialised multi dimensional arrays, defaults to [].
Returns
PrefetchDataset: If process is set to True.
ParallelMapDataset: If process is set to False.
tf_csv_dataset#
TFDataEngine.tf_csv_dataset(
batch_size=None, num_epochs=None, one_hot_encode_labels=False, num_classes=None, process=False
)
Reads csv files and returns a CsvDatasetV2 or PrefetchDataset object, depending on whether process is set to False or True, respectively.
If process is set to False, the returned CsvDatasetV2 object can be further processed by the user, for example by applying custom transformations to features, batching, caching etc. process=True will return a PrefetchDataset object that contains tuples of feature vector and label, already batched and ready to input into model training.
Example of using tf_csv_dataset:
import hsfs

connection = hsfs.connection()
fs = connection.get_feature_store()
td = fs.get_training_dataset("sample_model", 1)
# process=True returns a batched dataset ready for model training.
td.tf_data(target_name="id").tf_csv_dataset(batch_size=1, num_epochs=1, process=True)
Arguments
- batch_size (Optional[int]): Size of batch, defaults to None.
- num_epochs (Optional[int]): Number of epochs to train, defaults to None.
- one_hot_encode_labels (Optional[bool]): If set to True, one hot encode the labels, defaults to False.
- num_classes (Optional[int]): If one_hot_encode_labels is True, provide the number of target classes, defaults to None.
- process (Optional[bool]): If set to True, the API will optimise the tf data read operation and return the feature vector for a model with a single input, defaults to False.
- serialized_ndarray_fname: Names of features that contain serialised multi dimensional arrays, defaults to [].
Returns
PrefetchDataset: If process is set to True.
CsvDatasetV2: If process is set to False.