Query vs DataFrame#

HSFS provides a DataFrame API to ingest data into the Hopsworks Feature Store. You can also retrieve feature data as a DataFrame, which can either be used directly to train models or be materialized to file(s) for later use in model training.
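
For orientation, here is a minimal sketch of that round trip, assuming an existing feature store handle fs and a recent HSFS client (get_or_create_feature_group, insert and read are the calls used); the feature group and column names mirror the examples further down this page:

import pandas as pd

# feature values computed elsewhere, held in a plain DataFrame
rain_df = pd.DataFrame({
    "location_id": [10, 20],
    "date": ["2023-01-01", "2023-01-01"],
    "weekly_rainfall": [12.5, 3.2],
})

# ingest the DataFrame into a feature group
rain_fg = fs.get_or_create_feature_group(
    name="rain_fg",
    version=1,
    primary_key=["location_id", "date"],
)
rain_fg.insert(rain_df)

# retrieve the feature data back as a DataFrame
rain_df_back = rain_fg.select_all().read()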

The idea of the Feature Store is to have pre-computed features available for both training and serving models. The key capabilities required to generate training datasets from reusable features are feature selection, joins, filters, and point-in-time queries. The Query object enables you to select features from different feature groups and join them together for use in a feature view.

The joining functionality is heavily inspired by the APIs used by Pandas to merge DataFrames. The APIs let you specify which features to select from which feature group, how to join them, and which features to use in join conditions.

# create a query
feature_join = rain_fg.select_all() \
    .join(temperature_fg.select_all(), on=["date", "location_id"]) \
    .join(location_fg.select_all())

# save the query to feature view
feature_view = fs.create_feature_view(
    name='rain_dataset',
    query=feature_join
)

# retrieve the query back from the feature view
feature_view = fs.get_feature_view("rain_dataset", version=1)
query = feature_view.query
// create a query
val featureJoin = (rainFg.selectAll()
    .join(temperatureFg.selectAll(), Seq("date", "location_id"))
    .join(locationFg.selectAll()))

// save the query to a feature view
val featureView = fs.createFeatureView()
    .name("rain_dataset")
    .query(featureJoin)
    .build()

// retrieve the query back from the feature view
val featureView = fs.getFeatureView("rain_dataset", 1)
val query = featureView.getQuery()

If a data scientist wants to use a new feature that is not yet available in the feature store, she can write code to compute the new feature (using existing features or external data) and ingest the new feature values into the feature store. If the new feature is based solely on existing feature values in the Feature Store, we call it a derived feature. The same HSFS APIs can be used to compute derived features as well as features based on external data sources.
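
As a rough sketch of computing and ingesting a derived feature, again assuming a feature store handle fs; the feature group monthly_rainfall_fg and the feature monthly_rainfall are illustrative names, not part of the examples on this page:

# read existing feature values from the feature store
rain_fg = fs.get_feature_group("rain_fg", version=1)
rain_df = rain_fg.read()

# derive a new feature purely from existing feature values
rain_df["monthly_rainfall"] = rain_df["weekly_rainfall"] * 4

# ingest the derived feature into its own feature group
derived_fg = fs.get_or_create_feature_group(
    name="monthly_rainfall_fg",
    version=1,
    primary_key=["location_id", "date"],
)
derived_fg.insert(rain_df[["location_id", "date", "monthly_rainfall"]])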

The Query Abstraction#

Most operations performed on FeatureGroup metadata objects will return a Query with the applied operation.

Examples#

Selecting features from a feature group is a lazy operation, returning a query with the selected features only:

rain_fg = fs.get_feature_group("rain_fg")

# Returns Query
feature_join = rain_fg.select(["location_id", "weekly_rainfall"])
val rainFg = fs.getFeatureGroup("rain_fg")

// Returns Query
val featureJoin = rainFg.select(Seq("location_id", "weekly_rainfall"))

Join#

Similarly, joins return query objects. The simplest join is one where we join all of the features from two different feature groups without specifying a join key; HSFS will infer the join key as a common primary key between the two feature groups. Unless specified otherwise, Hopsworks uses the maximal matching subset of the two feature groups' primary keys as the joining key(s).

# Returns Query
feature_join = rain_fg.join(temperature_fg)
// Returns Query
val featureJoin = rainFg.join(temperatureFg)

More complex joins are possible by selecting subsets of features from the joined feature groups and by specifying a join key and type. Possible join types are "inner", "left" or "right". Furthermore, it is possible to specify different features for the join key of the left and right feature group. The join key lists should contain the names of the features to join on.

feature_join = rain_fg.select_all() \
    .join(temperature_fg.select_all(), on=["date", "location_id"]) \
    .join(location_fg.select_all(), left_on=["location_id"], right_on=["id"], join_type="left")
val featureJoin = (rainFg.selectAll()
    .join(temperatureFg.selectAll(), Seq("date", "location_id"))
    .join(locationFg.selectAll(), Seq("location_id"), Seq("id"), "left"))

Nested Joins

The API currently does not support nested joins, that is, joins of joins. You can fall back to Spark DataFrames to cover these cases, as sketched below. However, if you have to use joins of joins, most likely there is potential to optimise your feature group structure.
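
A rough sketch of that fallback, assuming the queries are executed from a Spark application so that read() returns Spark DataFrames; wind_speed_fg is the feature group used later on this page, and the join columns are illustrative:

# read each (already joined) query into its own Spark DataFrame
left_df = rain_fg.select_all() \
    .join(temperature_fg.select_all(), on=["date", "location_id"]) \
    .read()

right_df = location_fg.select_all() \
    .join(wind_speed_fg.select_all()) \
    .read()

# express the "join of joins" directly with the Spark DataFrame API
nested_df = left_df.join(right_df, left_df["location_id"] == right_df["id"], "left")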

Filter#

In the same way as joins, applying filters to feature groups creates a query with the applied filter.

Filters are constructed with the Python operators ==, >=, <=, !=, >, and <, and additionally with the methods isin and like. The bitwise operators & and | are used to combine filter expressions (logical AND and OR). For the Scala part of the API, equivalent methods are available in the Feature and Filter classes.

filtered_rain = rain_fg.filter(rain_fg.location_id == 10)
val filteredRain = rainFg.filter(rainFg.getFeature("location_id").eq(10))
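
The isin and like methods mentioned above follow the same pattern; a short sketch, where station_name is a hypothetical feature used only for illustration:

# isin: keep rows whose location_id matches any value in the list
nearby_rain = rain_fg.filter(rain_fg.location_id.isin([10, 20, 30]))

# like: keep rows whose station_name matches a SQL LIKE pattern
coastal_rain = rain_fg.filter(rain_fg.station_name.like("%coast%"))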

Filters are fully compatible with joins:

feature_join = rain_fg.select_all() \
    .join(temperature_fg.select_all(), on=["date", "location_id"]) \
    .join(location_fg.select_all(), left_on=["location_id"], right_on=["id"], join_type="left") \
    .filter((rain_fg.location_id == 10) | (rain_fg.location_id == 20))
val featureJoin = (rainFg.selectAll()
    .join(temperatureFg.selectAll(), Seq("date", "location_id"))
    .join(locationFg.selectAll(), Seq("location_id"), Seq("id"), "left")
    .filter(rainFg.getFeature("location_id").eq(10).or(rainFg.getFeature("location_id").eq(20))))

The filters can be applied at any point of the query:

feature_join = rain_fg.select_all() \
    .join(temperature_fg.select_all().filter(temperature_fg.avg_temp >= 22), on=["date", "location_id"]) \
    .join(location_fg.select_all(), left_on=["location_id"], right_on=["id"], join_type="left") \
    .filter(rain_fg.location_id == 10)
val featureJoin = (rainFg.selectAll()
    .join(temperatureFg.selectAll().filter(temperatureFg.getFeature("avg_temp").ge(22)), Seq("date", "location_id"))
    .join(locationFg.selectAll(), Seq("location_id"), Seq("id"), "left")
    .filter(rainFg.getFeature("location_id").eq(10)))

Joins and/or Filters on feature view query#

The query retrieved from a feature view can be extended with new joins and/or new filters. However, this operation does not update the metadata or persist the updated query of the feature view itself. The extended query can instead be used to create a new feature view (see the sketch after the examples below).

fs = ...
wind_speed_fg = fs.get_feature_group(name="wind_speed_fg", version=1)
rain_fg = fs.get_feature_group(name="rain_fg", version=1)
feature_view = fs.get_feature_view("rain_dataset", version=1)
feature_view.query \
    .join(wind_speed_fg.select_all()) \
    .filter(rain_fg.location_id == 54)
val fs = ...
val windSpeedFg = fs.getFeatureGroup("wind_speed_fg", 1)
val rainFg = fs.getFeatureGroup("rain_fg", 1)
val featureView = fs.getFeatureView("rain_dataset", 1)
featureView.getQuery()
    .join(windSpeedFg.selectAll())
    .filter(rainFg.getFeature("location_id").eq(54))
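
To persist the extended logic, the resulting query can be passed to create_feature_view, as in the sketch below; it assumes a freshly fetched feature view (see the warning that follows about state accumulation), and rain_wind_dataset is an illustrative name:

feature_view = fs.get_feature_view("rain_dataset", version=1)

# build the extended query
extended_query = feature_view.query \
    .join(wind_speed_fg.select_all()) \
    .filter(rain_fg.location_id == 54)

# save the extended query as a separate feature view
rain_wind_fv = fs.create_feature_view(
    name="rain_wind_dataset",
    query=extended_query,
)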

Warning

Every join/filter operation applied to an existing feature view query instance will update its state and accumulate. To successfully apply new join/filter logic it is recommended to refresh the query instance by re-fetching the feature view:

fs = ...
wind_speed_fg = fs.get_feature_group(name="wind_speed_fg", version=1)
solar_irradiance_fg = fs.get_feature_group(name="solar_irradiance_fg", version=1)
rain_fg = fs.get_feature_group(name="rain_fg", version=1)

# fetch new feature view and its query instance
feature_view = fs.get_feature_view("rain_dataset", version=1)

# apply join/filter logic based on location and wind speed
feature_view.query.join(wind_speed_fg.select_all()) \
    .filter(rain_fg.location_id == 54)

# to apply new logic independent of location and wind speed from above 
# re-fetch new feature view and its query instance
feature_view = fs.get_feature_view("rain_dataset", version=1)

# apply new join/filter logic based on solar irradiance
feature_view.query.join(solar_irradiance_fg.select_all()) \
    .filter(solar_irradiance_fg.location_id == 28)
val fs = ...
val windSpeedFg = fs.getFeatureGroup("wind_speed_fg", 1)
val solarIrradianceFg = fs.getFeatureGroup("solar_irradiance_fg", 1)
val rainFg = fs.getFeatureGroup("rain_fg", 1)

// fetch new feature view and its query instance
val featureView = fs.getFeatureView("rain_dataset", 1)

// apply join/filter logic based on location and wind speed
featureView.getQuery.join(windSpeedFg.selectAll())
    .filter(rainFg.getFeature("location_id").eq(54))

// to apply new logic independent of location and wind speed from above 
// re-fetch new feature view and its query instance
val featureView = fs.getFeatureView("rain_dataset", 1)

// apply new join/filter logic based on solar irradiance
featureView.getQuery.join(solarIrradianceFg.selectAll())
    .filter(solarIrradianceFg.getFeature("location_id").eq(28))