Skip to content

Query vs DataFrame#

HSFS provides a DataFrame API to ingest data into the Hopsworks Feature Store. You can also retrieve feature data in a DataFrame, that can either be used directly to train models or materialized to file(s) for later use to train models.

The idea of the Feature Store is to have pre-computed features available for both training and serving models. The key functionality required to generate training datasets from reusable features are: feature selection, joins, filters and point in time queries. To enable this functionality, we are introducing a new expressive Query abstraction with HSFS that provides these operations and guarantees reproducible creation of training datasets from features in the Feature Store.

The new joining functionality is heavily inspired by the APIs used by Pandas to merge DataFrames. The APIs allow you to specify which features to select from which feature group, how to join them and which features to use in join conditions.

# create a query
feature_join = rain_fg.select_all() \
                        .join(temperature_fg.select_all(), on=["date", "location_id"]) \
                        .join(location_fg.select_all())

td = fs.create_training_dataset("rain_dataset",
                        version=1,
                        label=weekly_rain,
                        data_format=tfrecords)

# materialize query in the specified file format
td.save(feature_join)

# use materialized training dataset for training, possibly in a different environment
td = fs.get_training_dataset(rain_dataset, version=1)

# get TFRecordDataset to use in a TensorFlow model
dataset = td.tf_data().tf_record_dataset(batch_size=32, num_epochs=100)

# reproduce query for online feature store and drop label for inference
jdbc_querystring = td.get_query(online=True, with_label=False)
# create a query
val featureJoin = (rainFg.selectAll()
                        .join(temperatureFg.selectAll(), on=Seq("date", "location_id"))
                        .join(locationFg.selectAll()))

val td = (fs.createTrainingDataset()
                        .name("rain_dataset")
                        .version(1)
                        .label(weekly_rain)
                        .dataFormat(tfrecords)
                        .build())

# materialize query in the specified file format
td.save(featureJoin)

# use materialized training dataset for training, possibly in a different environment
val td = fs.getTrainingDataset(rain_dataset, 1)

# reproduce query for online feature store and drop label for inference
val jdbcQuerystring = td.getQuery(true, false)

If a data scientist wants to modify a new feature that is not available in the Feature Store, she can write code to compute the new feature (using existing features or external data) and ingest the new feature values into the Feature Store. If the new feature is based solely on existing feature values in the Feature Store, we call it a derived feature. The same HSFS APIs can be used to compute derived features as well as features using external data sources.

The Query Abstraction#

Most operations performed on FeatureGroup metadata objects will return a Query with the applied operation.

Examples#

Selecting features from a feature group is a lazy operation, returning a query with the selected features only:

rain_fg = fs.get_feature_group("rain_fg")

# Returns Query
feature_join = rain_fg.select(["location_id", "weekly_rainfall"])
val rainFg = fs.getFeatureGroup("rain_fg")

# Returns Query
val featureJoin = rainFg.select(Seq("location_id", "weekly_rainfall"))

Join#

Similarly joins return queries. The simplest join, is one of two feature groups without specifying a join key or type. By default Hopsworks will use the maximal matching subset of the primary key of the two feature groups as joining key, if not specified otherwise.

# Returns Query
feature_join = rain_fg.join(temperature_fg)
# Returns Query
val featureJoin = rainFg.join(temperatureFg)

More complex joins are possible by selecting subsets of features from the joines feature groups and by specifying a join key and type. Possible join types are "inner", "left" or "right". Furthermore, it is possible to specify different features for the join key of the left and right feature group. The join key lists should contain the name of the features to join on.

feature_join = rain_fg.select_all() \
                        .join(temperature_fg.select_all(), on=["date", "location_id"]) \
                        .join(location_fg.select_all(), left_on=["location_id"], right_on=["id"], how="left")
val featureJoin = (rainFg.selectAll()
                        .join(temperatureFg.selectAll(), Seq("date", "location_id"))
                        .join(locationFg.selectAll(), Seq("location_id"), Seq("id"), "left"))

Nested Joins

The API currently does not support nested joins. That is joins of joins. You can fall back to Spark DataFrames to cover these cases. However, if you have to use joins of joins, most likely there is potential to optimise your feature group structure.

Filter#

In the same way as joins, applying filters to feature groups creates a query with the applied filter.

Filters are constructed with Python Operators ==, >=, <=, !=, >, < and using the Bitwise Operators & and | to construct conjunctions. For the Scala part of the API, equivalent methods are available in the Feature and Filter classes.

filtered_rain = rain_fg.filter(rain_fg.location_id == 10)
val filteredRain = rainFg.filter(rainFg.getFeature("location_id").eq(10))

Filters are fully compatible with joins:

feature_join = rain_fg.select_all() \
                        .join(temperature_fg.select_all(), on=["date", "location_id"]) \
                        .join(location_fg.select_all(), left_on=["location_id"], right_on=["id"], how="left") \
                        .filter((rain_fg.location_id == 10) | (rain_fg.location_id == 20))
val featureJoin = (rainFg.selectAll()
                        .join(temperatureFg.selectAll(), Seq("date", "location_id"))
                        .join(locationFg.selectAll(), Seq("location_id"), Seq("id"), "left")
                        .filter(rainFg.getFeature("location_id").eq(10).or(rainFg.getFeature("location_id").eq(20))))

The filters can be applied at any point of the query:

feature_join = rain_fg.select_all() \
                        .join(temperature_fg.select_all().filter(temperature_fg.avg_temp >= 22), on=["date", "location_id"]) \
                        .join(location_fg.select_all(), left_on=["location_id"], right_on=["id"], how="left") \
                        .filter(rain_fg.location_id == 10)
val featureJoin = (rainFg.selectAll()
                        .join(temperatureFg.selectAll().filter(temperatureFg.getFeature("avg_temp").ge(22)), Seq("date", "location_id"))
                        .join(locationFg.selectAll(), Seq("location_id"), Seq("id"), "left")
                        .filter(rainFg.getFeature("location_id").eq(10)))

Methods#

[source]

as_of#

Query.as_of(wallclock_time)

Perform time travel on the given Query.

This method returns a new Query object at the specified point in time.

Warning

The wallclock_time needs to be a time included into the Hudi active timeline. By default Hudi keeps the last 20 to 30 commits in the active timeline. If you need to keep a longer active timeline, you can overwrite the options: hoodie.keep.min.commits and hoodie.keep.max.commits when calling the insert() method.

This can then either be read into a Dataframe or used further to perform joins or construct a training dataset.

Arguments

  • wallclock_time: Datetime string. The String should be formatted in one of the following formats %Y%m%d, %Y%m%d%H, %Y%m%d%H%M, or %Y%m%d%H%M%S.

Returns

Query. The query object with the applied time travel condition.


[source]

filter#

Query.filter(f)

Apply filter to the feature group.

Selects all features and returns the resulting Query with the applied filter.

from hsfs.feature import Feature

query.filter(Feature("weekly_sales") > 1000)

If you are planning to join the filtered feature group later on with another feature group, make sure to select the filtered feature explicitly from the respective feature group:

query.filter(fg.feature1 == 1).show(10)

Composite filters require parenthesis:

query.filter((fg.feature1 == 1) | (fg.feature2 >= 2))

Arguments

  • f Union[hsfs.constructor.filter.Filter, hsfs.constructor.filter.Logic]: Filter object.

Returns

Query. The query object with the applied filter.


[source]

join#

Query.join(sub_query, on=[], left_on=[], right_on=[], join_type="inner", prefix=None)

Join Query with another Query.

If no join keys are specified, Hopsworks will use the maximal matching subset of the primary keys of the feature groups you are joining. Joins of one level are supported, no neted joins.

Arguments

  • sub_query hsfs.constructor.query.Query: Right-hand side query to join.
  • on Optional[List[str]]: List of feature names to join on if they are available in both feature groups. Defaults to [].
  • left_on Optional[List[str]]: List of feature names to join on from the left feature group of the join. Defaults to [].
  • right_on Optional[List[str]]: List of feature names to join on from the right feature group of the join. Defaults to [].
  • join_type Optional[str]: Type of join to perform, can be "inner", "outer", "left" or "right". Defaults to "inner".
  • prefix Optional[str]: User provided prefix to avoid feature name clash. Prefix is applied to the right feature group of the query. Defaults to None.

Returns

Query: A new Query object representing the join.


[source]

json#

Query.json()

[source]

pull_changes#

Query.pull_changes(wallclock_start_time, wallclock_end_time)

[source]

read#

Query.read(online=False, dataframe_type="default", read_options={})

Read the specified query into a DataFrame.

It is possible to specify the storage (online/offline) to read from and the type of the output DataFrame (Spark, Pandas, Numpy, Python Lists).

Arguments

  • online Optional[bool]: Read from online storage. Defaults to False.
  • dataframe_type Optional[str]: DataFrame type to return. Defaults to "default".
  • read_options Optional[dict]: Optional dictionary with read options for Spark. Defaults to {}.

Returns

DataFrame: DataFrame depending on the chosen type.


[source]

show#

Query.show(n, online=False)

Show the first N rows of the Query.

Arguments

  • n int: Number of rows to show.
  • online Optional[bool]: Show from online storage. Defaults to False.

[source]

to_dict#

Query.to_dict()

[source]

to_string#

Query.to_string(online=False)

Properties#

[source]

left_feature_group_end_time#


[source]

left_feature_group_start_time#