
Query#

Query objects are created only by HSFS API calls on Feature Group objects; users never construct a Query directly through the class constructor. For this reason, we do not provide the full documentation of the class here.

Methods#

[source]

append_feature#

Query.append_feature(feature)

Append a feature to the query.

Arguments

  • feature str | hsfs.feature.Feature: Name of the feature, or the Feature object, to append to the query.

[source]

as_of#

Query.as_of(wallclock_time=None, exclude_until=None)

Perform time travel on the given Query.

Pyspark/Spark Only

Apache Hudi supports time travel and incremental queries only through a Spark context.

This method returns a new Query object at the specified point in time. Optionally, commits before a specified point in time can be excluded from the query. The Query can then either be read into a Dataframe or used further to perform joins or construct a training dataset.

Reading features at a specific point in time:

fs = connection.get_feature_store()
query = fs.get_feature_group("example_feature_group", 1).select_all()
query.as_of("2020-10-20 07:34:11").read().show()

Reading commits incrementally between specified points in time:

fs = connection.get_feature_store()
query = fs.get_feature_group("example_feature_group", 1).select_all()
query.as_of("2020-10-20 07:34:11", exclude_until="2020-10-19 07:34:11").read().show()

The first parameter is inclusive while the latter is exclusive. That means, in order to query a single commit, you need to query that commit time and exclude everything just before the commit.

Reading only the changes from a single commit:

fs = connection.get_feature_store()
query = fs.get_feature_group("example_feature_group", 1).select_all()
query.as_of("2020-10-20 07:31:38", exclude_until="2020-10-20 07:31:37").read().show()
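The inclusive/exclusive rule can be illustrated without a feature store. The sketch below is a stand-in that filters a plain list of commit timestamps; `select_commits` is a hypothetical helper, not part of HSFS or Hudi, and it assumes that a commit stamped exactly at `exclude_until` is left out.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def select_commits(commit_times, wallclock_time=None, exclude_until=None):
    """Return commit times in the half-open interval (exclude_until, wallclock_time].

    Illustrative only: mimics the inclusive/exclusive rule on a plain list
    of timestamp strings and does not call HSFS or Hudi.
    """
    upper = datetime.strptime(wallclock_time, FMT) if wallclock_time else None
    lower = datetime.strptime(exclude_until, FMT) if exclude_until else None
    selected = []
    for raw in commit_times:
        ts = datetime.strptime(raw, FMT)
        if upper is not None and ts > upper:
            continue  # committed after the requested point in time
        if lower is not None and ts <= lower:
            continue  # committed at or before the exclusion boundary (assumed)
        selected.append(raw)
    return selected


commits = ["2020-10-19 07:34:11", "2020-10-20 07:31:38", "2020-10-20 07:34:11"]

# Query the commit time and exclude everything just before it.
single = select_commits(commits, "2020-10-20 07:31:38", "2020-10-20 07:31:37")
print(single)
```

With these sample timestamps, only the commit at 07:31:38 survives, which mirrors the single-commit pattern described above.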

When no wallclock_time is given, the latest state of features is returned. Optionally, commits before a specified point in time can still be excluded.

Reading the latest state of features, excluding commits before a specified point in time:

fs = connection.get_feature_store()
query = fs.get_feature_group("example_feature_group", 1).select_all()
query.as_of(None, exclude_until="2020-10-20 07:31:38").read().show()

Note that the interval will be applied to all joins in the query. If you want to query different intervals for different feature groups in the query, you have to apply them in a nested fashion:

(
    query1.as_of(..., ...)
    .join(query2.as_of(..., ...))
)

If instead you apply another as_of selection after the join, all joined feature groups will be queried with this interval:

(
    query1.as_of(..., ...)  # overridden by the final as_of
    .join(query2.as_of(..., ...))  # overridden by the final as_of
    .as_of(..., ...)
)

Warning

This function only works for queries on feature groups with time_travel_format='HUDI'.

Warning

Excluding commits via exclude_until is only possible within the range of the Hudi active timeline. By default, Hudi keeps the last 20 to 30 commits in the active timeline. If you need a longer active timeline, you can override the options hoodie.keep.min.commits and hoodie.keep.max.commits when calling the insert() method.
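As a rough sketch of how those two options might be assembled, the helper below builds an options dictionary. `hudi_timeline_options` is a hypothetical name, the values 100 and 110 are arbitrary, and passing the dictionary through `write_options` on `insert()` is shown only as a commented-out assumption because it needs a live feature group.

```python
def hudi_timeline_options(min_commits, max_commits):
    """Hypothetical helper: build options that lengthen the Hudi active timeline."""
    # Hudi expects the maximum to be strictly larger than the minimum.
    if min_commits >= max_commits:
        raise ValueError("hoodie.keep.max.commits must exceed hoodie.keep.min.commits")
    return {
        "hoodie.keep.min.commits": str(min_commits),
        "hoodie.keep.max.commits": str(max_commits),
    }


write_options = hudi_timeline_options(100, 110)
print(write_options)

# Assumed usage, requires an existing feature group `fg` and DataFrame `df`:
# fg.insert(df, write_options=write_options)
```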

Arguments

  • wallclock_time str | int | datetime.datetime | datetime.date | None: Read data as of this point in time. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, or %Y-%m-%d %H:%M:%S.
  • exclude_until str | int | datetime.datetime | datetime.date | None: Exclude commits until this point in time. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, or %Y-%m-%d %H:%M:%S.
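To check a timestamp string against the four listed formats before passing it to as_of, something like the following could be used. `parse_time_string` is a hypothetical helper written for illustration; it is not the parser HSFS uses internally.

```python
from datetime import datetime

# The four string formats listed for wallclock_time and exclude_until.
ACCEPTED_FORMATS = (
    "%Y-%m-%d",
    "%Y-%m-%d %H",
    "%Y-%m-%d %H:%M",
    "%Y-%m-%d %H:%M:%S",
)


def parse_time_string(value):
    """Hypothetical helper: parse a string with the first format that matches."""
    for fmt in ACCEPTED_FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue  # try the next, more specific format
    raise ValueError(f"{value!r} matches none of {ACCEPTED_FORMATS}")


print(parse_time_string("2020-10-20 07:34"))
```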

Returns

Query. The query object with the applied time travel condition.


[source]

filter#

Query.filter(f)

Apply filter to the feature group.

Selects all features and returns the resulting Query with the applied filter.

Example

from hsfs.feature import Feature

query.filter(Feature("weekly_sales") > 1000)
query.filter(Feature("name").like("max%"))

If you are planning to join the filtered feature group later on with another feature group, make sure to select the filtered feature explicitly from the respective feature group:

query.filter(fg.feature1 == 1).show(10)

Composite filters require parentheses and symbols for the logical operators (e.g. &, |, ...):

query.filter((fg.feature1 == 1) | (fg.feature2 >= 2))
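The parentheses matter because Python's | and & bind more tightly than comparison operators such as == and >=. The sketch below shows this with two stand-in classes, `Col` and `Cond`, which are hypothetical and only imitate how comparisons can build condition objects; they are not the HSFS Feature, Filter, or Logic classes.

```python
class Cond:
    """Stand-in for a filter condition; not the HSFS Filter/Logic classes."""

    def __init__(self, text):
        self.text = text

    def __or__(self, other):
        return Cond(f"({self.text} OR {other.text})")

    def __and__(self, other):
        return Cond(f"({self.text} AND {other.text})")


class Col:
    """Stand-in for a feature whose comparisons build conditions."""

    def __init__(self, name):
        self.name = name

    def __eq__(self, value):
        return Cond(f"{self.name} = {value}")

    def __ge__(self, value):
        return Cond(f"{self.name} >= {value}")


feature1, feature2 = Col("feature1"), Col("feature2")

# Parenthesized: each comparison is evaluated first, then combined with |.
combined = (feature1 == 1) | (feature2 >= 2)
print(combined.text)

# Unparenthesized: Python parses this as feature1 == (1 | feature2) >= 2,
# so it first evaluates 1 | feature2, which this stand-in does not support.
try:
    feature1 == 1 | feature2 >= 2
    error = None
except TypeError as exc:
    error = exc
print(type(error).__name__)
```

In this stand-in the unparenthesized form fails with a TypeError; the exact behavior with real HSFS objects may differ, but the expression would still not be grouped the way it reads.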

Filters are fully compatible with joins

fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
fg3 = fs.get_feature_group("...")

query = (
    fg1.select_all()
    .join(fg2.select_all(), on=["date", "location_id"])
    .join(fg3.select_all(), left_on=["location_id"], right_on=["id"], join_type="left")
    .filter((fg1.location_id == 10) | (fg1.location_id == 20))
)

Filters can be applied at any point of the query

fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
fg3 = fs.get_feature_group("...")

query = (
    fg1.select_all()
    .join(fg2.select_all().filter(fg2.avg_temp >= 22), on=["date", "location_id"])
    .join(fg3.select_all(), left_on=["location_id"], right_on=["id"], join_type="left")
    .filter(fg1.location_id == 10)
)

Arguments

  • f hsfs.constructor.filter.Filter | hsfs.constructor.filter.Logic: Filter object.

Returns

Query. The query object with the applied filter.


[source]

from_response_json#

Query.from_response_json(json_dict)

[source]

get_feature#

Query.get_feature(feature_name)

Get a feature by name.

Arguments

  • feature_name str: Name of the feature to get.

Returns

Feature. Feature object.


[source]

is_cache_feature_group_only#

Query.is_cache_feature_group_only()

Check whether the query contains only cached feature groups.


[source]

is_time_travel#

Query.is_time_travel()

Check whether the query contains time travel.


[source]

join#

Query.join(sub_query, on=None, left_on=None, right_on=None, join_type="inner", prefix=None)

Join Query with another Query.

If no join keys are specified, Hopsworks will use the maximal matching subset of the primary keys of the feature groups you are joining. Only single-level joins are supported; nested joins are not.
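One way to read "maximal matching subset of the primary keys" is: every primary key name that appears in both feature groups. The sketch below implements that reading on plain lists of names. `default_join_keys` is a hypothetical helper, and the actual resolution performed by Hopsworks may differ.

```python
def default_join_keys(left_primary_key, right_primary_key):
    """Hypothetical helper: primary key names shared by both sides.

    Keeps the order of the left feature group's primary key.
    """
    right = set(right_primary_key)
    return [key for key in left_primary_key if key in right]


keys = default_join_keys(["date", "location_id"], ["location_id", "date", "sensor_id"])
print(keys)
```

If the two primary keys share no names, this sketch returns an empty list, in which case explicit on or left_on/right_on arguments would be needed.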

Join two feature groups

fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")

query = fg1.select_all().join(fg2.select_all())

More complex join

fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
fg3 = fs.get_feature_group("...")

query = (
    fg1.select_all()
    .join(fg2.select_all(), on=["date", "location_id"])
    .join(fg3.select_all(), left_on=["location_id"], right_on=["id"], join_type="left")
)

Arguments

  • sub_query hsfs.constructor.query.Query: Right-hand side query to join.
  • on List[str] | None: List of feature names to join on if they are available in both feature groups. Defaults to [].
  • left_on List[str] | None: List of feature names to join on from the left feature group of the join. Defaults to [].
  • right_on List[str] | None: List of feature names to join on from the right feature group of the join. Defaults to [].
  • join_type str | None: Type of join to perform, can be "inner", "outer", "left" or "right". Defaults to "inner".
  • prefix str | None: User provided prefix to avoid feature name clash. Prefix is applied to the right feature group of the query. Defaults to None.

Returns

Query: A new Query object representing the join.


[source]

pull_changes#

Query.pull_changes(wallclock_start_time, wallclock_end_time)

Deprecated

The pull_changes method is deprecated. Use as_of(wallclock_end_time, exclude_until=wallclock_start_time) instead.


[source]

read#

Query.read(online=False, dataframe_type="default", read_options=None)

Read the specified query into a DataFrame.

It is possible to specify the storage (online/offline) to read from and the type of the output DataFrame (Spark, Pandas, NumPy, Python lists).

External Feature Group Engine Support

Spark only

Reading a Query that contains an External Feature Group directly into a Pandas DataFrame is not supported when using Python/Pandas as the engine. You can, however, use the Query API to create Feature Views and Training Data containing External Feature Groups.

Arguments

  • online bool: Read from online storage. Defaults to False.
  • dataframe_type str: DataFrame type to return. Defaults to "default".
  • read_options Dict[str, Any] | None: Dictionary of read options. Defaults to {}. With the Spark engine, these are read options for Spark. With the Python engine only, the following keys are supported:
    • key "use_hive" and value True to read the query with Hive instead of the Hopsworks Feature Query Service.
    • key "arrow_flight_config" to pass a dictionary of Arrow Flight configurations. For example: {"arrow_flight_config": {"timeout": 900}}
    • key "hive_config" to pass a dictionary of Hive or Tez configurations. For example: {"hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}}
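The Python-engine keys above can be assembled as ordinary dictionaries. The following is a minimal sketch that reuses the example values from the list; combining "use_hive" with "hive_config" in one dictionary is an assumption, and the read() call is left commented out because it needs an existing Query object.

```python
# Arrow Flight configuration with the example timeout from the list above.
arrow_options = {"arrow_flight_config": {"timeout": 900}}

# Read with Hive instead, passing the example Hive/Tez settings along
# (combining both keys in one dictionary is an assumption).
hive_options = {
    "use_hive": True,
    "hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"},
}

print(sorted(hive_options))

# Assumed usage, requires an existing Query object named `query`:
# df = query.read(read_options=arrow_options)
```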

Returns

DataFrame: DataFrame depending on the chosen type.


[source]

show#

Query.show(n, online=False)

Show the first N rows of the Query.

Show the first 10 rows

fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")

query = fg1.select_all().join(fg2.select_all())

query.show(10)

Arguments

  • n int: Number of rows to show.
  • online bool: Show from online storage. Defaults to False.

[source]

to_string#

Query.to_string(online=False, arrow_flight=False)

Example

fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")

query = fg1.select_all().join(fg2.select_all())

query.to_string()

Properties#

[source]

featuregroups#

List of feature groups used in the query


[source]

features#

List of all features in the query


[source]

filters#

All filters used in the query


[source]

joins#

List of joins in the query


[source]

left_feature_group_end_time#

End time of time travel for the left feature group.


[source]

left_feature_group_start_time#

Start time of time travel for the left feature group.