Query
Query objects are generated only by HSFS APIs called on Feature Group objects. Users should never construct a Query object with the class constructor, so the full documentation of the class is not provided here.
Methods
append_feature
Query.append_feature(feature)
Append a feature to the query.
Arguments
- feature (Union[str, Feature]): Name of the feature, or the Feature object itself, to append to the query.
as_of
Query.as_of(wallclock_time=None, exclude_until=None)
Perform time travel on the given Query.
This method returns a new Query object at the specified point in time. Optionally, commits before a specified point in time can be excluded from the query. The Query can then either be read into a DataFrame or used further to perform joins or construct a training dataset.
Reading features at a specific point in time:
fs = connection.get_feature_store()
query = fs.get_feature_group("example_feature_group", 1).select_all()
query.as_of("2020-10-20 07:34:11").read().show()
Reading commits incrementally between specified points in time:
fs = connection.get_feature_store()
query = fs.get_feature_group("example_feature_group", 1).select_all()
query.as_of("2020-10-20 07:34:11", exclude_until="2020-10-19 07:34:11").read().show()
The first parameter (wallclock_time) is inclusive, while the second (exclude_until) is exclusive. This means that to query a single commit, you pass that commit's time as wallclock_time and a point in time just before the commit as exclude_until.
Reading only the changes from a single commit:
fs = connection.get_feature_store()
query = fs.get_feature_group("example_feature_group", 1).select_all()
query.as_of("2020-10-20 07:31:38", exclude_until="2020-10-20 07:31:37").read().show()
When no wallclock_time is given, the latest state of features is returned. Optionally, commits before a specified point in time can still be excluded.
Reading the latest state of features, excluding commits before a specified point in time:
fs = connection.get_feature_store()
query = fs.get_feature_group("example_feature_group", 1).select_all()
query.as_of(None, exclude_until="2020-10-20 07:31:38").read().show()
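The interval rules above (wallclock_time inclusive, exclude_until exclusive, a missing wallclock_time meaning "up to the latest commit") can be modeled on a plain list of commit timestamps. This is a minimal sketch of which commits fall inside the interval, not the HSFS or Hudi implementation; the helper select_commits and the sample commit times are hypothetical.

```python
from datetime import datetime
from typing import List, Optional


def select_commits(
    commits: List[datetime],
    wallclock_time: Optional[datetime] = None,
    exclude_until: Optional[datetime] = None,
) -> List[datetime]:
    """Hypothetical model of as_of(): keep commits in (exclude_until, wallclock_time]."""
    selected = []
    for commit in commits:
        # wallclock_time is inclusive; None means "up to the latest commit".
        if wallclock_time is not None and commit > wallclock_time:
            continue
        # exclude_until is exclusive: commits at or before it are dropped.
        if exclude_until is not None and commit <= exclude_until:
            continue
        selected.append(commit)
    return selected


# Hypothetical commit times on the Hudi timeline.
commits = [
    datetime(2020, 10, 20, 7, 31, 37),
    datetime(2020, 10, 20, 7, 31, 38),
    datetime(2020, 10, 20, 7, 34, 11),
]

# Only the single commit at 07:31:38.
print(select_commits(commits, datetime(2020, 10, 20, 7, 31, 38),
                     exclude_until=datetime(2020, 10, 20, 7, 31, 37)))

# Latest state, excluding every commit up to and including 07:31:38.
print(select_commits(commits, None, exclude_until=datetime(2020, 10, 20, 7, 31, 38)))
```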
Note that the interval will be applied to all joins in the query. If you want to query different intervals for different feature groups in the query, you have to apply them in a nested fashion:
(query1.as_of(..., ...)
    .join(query2.as_of(..., ...)))
If instead you apply another as_of selection after the join, all joined feature groups will be queried with this interval:
(query1.as_of(..., ...)  # overridden by the outer as_of
    .join(query2.as_of(..., ...))  # overridden by the outer as_of
    .as_of(..., ...))
Warning
This function only works for queries on feature groups with time_travel_format='HUDI'.
Warning
Excluding commits via exclude_until is only possible within the range of the Hudi active timeline. By default, Hudi keeps the last 20 to 30 commits in the active timeline. If you need a longer active timeline, you can override the options hoodie.keep.min.commits and hoodie.keep.max.commits when calling the insert() method.
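As a hedged sketch, the two Hudi retention options can be collected in a dictionary before inserting. Hudi requires hoodie.keep.max.commits to be greater than hoodie.keep.min.commits, so the hypothetical helper hudi_timeline_options checks that. Passing the dictionary through the write_options argument of insert() is an assumption about the Spark engine; confirm it against the insert() documentation of your HSFS version.

```python
from typing import Dict


def hudi_timeline_options(min_commits: int, max_commits: int) -> Dict[str, str]:
    """Hypothetical helper: build Hudi options for a longer active timeline."""
    # Hudi rejects configurations where max is not greater than min.
    if max_commits <= min_commits:
        raise ValueError(
            "hoodie.keep.max.commits must be greater than hoodie.keep.min.commits"
        )
    # Use string values, as Spark data source options are strings.
    return {
        "hoodie.keep.min.commits": str(min_commits),
        "hoodie.keep.max.commits": str(max_commits),
    }


options = hudi_timeline_options(min_commits=100, max_commits=120)
# Assumed usage with the Spark engine: fg.insert(df, write_options=options)
print(options)
```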
Arguments
- wallclock_time (Optional[Union[str, int, datetime.datetime, datetime.date]]): Read data as of this point in time. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, or %Y-%m-%d %H:%M:%S. Defaults to None.
- exclude_until (Optional[Union[str, int, datetime.datetime, datetime.date]]): Exclude commits until this point in time. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, or %Y-%m-%d %H:%M:%S. Defaults to None.
Returns
Query: The query object with the applied time travel condition.
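The accepted string formats correspond directly to datetime.strptime patterns. A minimal sketch of how such a string could be parsed by trying each documented format in turn; the helper parse_wallclock is hypothetical, and HSFS's own conversion (including how int values are interpreted) may differ.

```python
from datetime import datetime

# The four string formats documented for wallclock_time and exclude_until.
FORMATS = ("%Y-%m-%d", "%Y-%m-%d %H", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S")


def parse_wallclock(value: str) -> datetime:
    """Hypothetical parser: return the first documented format that matches."""
    for fmt in FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue  # not this format, try the next one
    raise ValueError(f"unsupported time format: {value!r}")


print(parse_wallclock("2020-10-20"))           # 2020-10-20 00:00:00
print(parse_wallclock("2020-10-20 07:34:11"))  # 2020-10-20 07:34:11
```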
filter
Query.filter(f)
Apply a filter to the feature group.
Selects all features and returns the resulting Query with the applied filter.
Example
from hsfs.feature import Feature
query.filter(Feature("weekly_sales") > 1000)
query.filter(Feature("name").like("max%"))
If you are planning to join the filtered feature group later on with another feature group, make sure to select the filtered feature explicitly from the respective feature group:
query.filter(fg.feature1 == 1).show(10)
Composite filters require parentheses:
query.filter((fg.feature1 == 1) | (fg.feature2 >= 2))
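The parentheses are needed because of Python operator precedence: | binds more tightly than comparison operators such as == and >=. The toy classes below (ToyFeature and Condition are hypothetical stand-ins, not the hsfs Feature and Filter classes) show that the parenthesized form builds a combined condition, while the unparenthesized form is parsed as the chained comparison feature1 == (1 | feature2) >= 2 and fails.

```python
class Condition:
    """Hypothetical stand-in for a filter expression."""

    def __init__(self, text: str) -> None:
        self.text = text

    def __or__(self, other: "Condition") -> "Condition":
        # Combine two conditions with a logical OR.
        return Condition(f"({self.text}) | ({other.text})")


class ToyFeature:
    """Hypothetical stand-in for a feature column."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __eq__(self, value):  # comparisons build conditions instead of booleans
        return Condition(f"{self.name} == {value!r}")

    def __ge__(self, value):
        return Condition(f"{self.name} >= {value!r}")


f1, f2 = ToyFeature("feature1"), ToyFeature("feature2")

# With parentheses, each comparison is built first and then combined with |.
ok = (f1 == 1) | (f2 >= 2)
print(ok.text)  # (feature1 == 1) | (feature2 >= 2)

# Without parentheses, 1 | f2 is evaluated first and raises TypeError.
try:
    f1 == 1 | f2 >= 2
except TypeError as err:
    print("TypeError:", err)
```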
Filters are fully compatible with joins:
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
fg3 = fs.get_feature_group("...")
query = (fg1.select_all()
    .join(fg2.select_all(), on=["date", "location_id"])
    .join(fg3.select_all(), left_on=["location_id"], right_on=["id"], join_type="left")
    .filter((fg1.location_id == 10) | (fg1.location_id == 20)))
Filters can be applied at any point of the query:
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
fg3 = fs.get_feature_group("...")
query = (fg1.select_all()
    .join(fg2.select_all().filter(fg2.avg_temp >= 22), on=["date", "location_id"])
    .join(fg3.select_all(), left_on=["location_id"], right_on=["id"], join_type="left")
    .filter(fg1.location_id == 10))
Arguments
- f (Union[hsfs.constructor.filter.Filter, hsfs.constructor.filter.Logic]): Filter object.
Returns
Query: The query object with the applied filter.
from_response_json
Query.from_response_json(json_dict)
get_feature
Query.get_feature(feature_name)
Get a feature by name.
Arguments
- feature_name (str): Name of the feature to get.
Returns
Feature: Feature object.
is_cache_feature_group_only
Query.is_cache_feature_group_only()
Check whether the query contains only cached feature groups.
is_time_travel
Query.is_time_travel()
Check whether the query contains time travel.
join
Query.join(sub_query, on=[], left_on=[], right_on=[], join_type="inner", prefix=None)
Join Query with another Query.
If no join keys are specified, Hopsworks will use the maximal matching subset of the primary keys of the feature groups you are joining. Only single-level joins are supported; nested joins are not.
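One plausible reading of "maximal matching subset of the primary keys" is the set of primary-key names the two feature groups have in common. The sketch below computes that intersection in the left feature group's key order; the helper default_join_keys is hypothetical, and the error raised when nothing matches is an assumption, not documented Hopsworks behavior.

```python
from typing import List


def default_join_keys(left_primary_keys: List[str], right_primary_keys: List[str]) -> List[str]:
    """Hypothetical: primary-key names shared by both feature groups, in left order."""
    right = set(right_primary_keys)
    shared = [key for key in left_primary_keys if key in right]
    if not shared:
        # Assumption: with no shared primary keys, explicit join keys are needed.
        raise ValueError("no matching primary keys; pass on= or left_on=/right_on=")
    return shared


print(default_join_keys(["date", "location_id"], ["location_id", "date", "store_id"]))
# ['date', 'location_id']
```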
Join two feature groups:
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
query = fg1.select_all().join(fg2.select_all())
More complex join:
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
fg3 = fs.get_feature_group("...")
query = (fg1.select_all()
    .join(fg2.select_all(), on=["date", "location_id"])
    .join(fg3.select_all(), left_on=["location_id"], right_on=["id"], join_type="left"))
Arguments
- sub_query (hsfs.constructor.query.Query): Right-hand side query to join.
- on (Optional[List[str]]): List of feature names to join on if they are available in both feature groups. Defaults to [].
- left_on (Optional[List[str]]): List of feature names to join on from the left feature group of the join. Defaults to [].
- right_on (Optional[List[str]]): List of feature names to join on from the right feature group of the join. Defaults to [].
- join_type (Optional[str]): Type of join to perform: "inner", "outer", "left" or "right". Defaults to "inner".
- prefix (Optional[str]): User-provided prefix to avoid feature name clashes. The prefix is applied to the right feature group of the query. Defaults to None.
Returns
Query: A new Query object representing the join.
pull_changes
Query.pull_changes(wallclock_start_time, wallclock_end_time)
Deprecated
The pull_changes method is deprecated. Use as_of(wallclock_end_time, exclude_until=wallclock_start_time) instead.
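Migrating a call only requires mapping the arguments: the end time becomes the inclusive wallclock_time and the start time becomes the exclusive exclude_until. The shim pull_changes_via_as_of and the RecordingQuery stub below are hypothetical, used only to make the mapping visible without a feature store.

```python
from typing import Any, Tuple


def pull_changes_via_as_of(query: Any, wallclock_start_time: Any, wallclock_end_time: Any) -> Any:
    """Hypothetical shim: express a deprecated pull_changes() call with as_of()."""
    # End time -> inclusive wallclock_time; start time -> exclusive exclude_until.
    return query.as_of(wallclock_end_time, exclude_until=wallclock_start_time)


class RecordingQuery:
    """Hypothetical stand-in for a Query that returns the as_of() arguments."""

    def as_of(self, wallclock_time: Any = None, exclude_until: Any = None) -> Tuple[Any, Any]:
        return (wallclock_time, exclude_until)


result = pull_changes_via_as_of(RecordingQuery(), "2020-10-19 07:34:11", "2020-10-20 07:34:11")
print(result)  # ('2020-10-20 07:34:11', '2020-10-19 07:34:11')
```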
read
Query.read(online=False, dataframe_type="default", read_options={})
Read the specified query into a DataFrame.
It is possible to specify the storage (online/offline) to read from and the type of the output DataFrame (Spark, Pandas, Numpy, Python Lists).
External Feature Group Engine Support
Spark only
Reading a Query that contains an External Feature Group directly into a Pandas DataFrame with Python/Pandas as the engine is not supported. However, you can use the Query API to create Feature Views or Training Data containing External Feature Groups.
Arguments
- online (Optional[bool]): Read from online storage. Defaults to False.
- dataframe_type (Optional[str]): DataFrame type to return. Defaults to "default".
- read_options (Optional[dict]): Dictionary of read options for Spark in the Spark engine. For the Python engine only:
  - key "use_hive" and value True to read the query with Hive instead of the ArrowFlight Server.
  - key "arrow_flight_config" to pass a dictionary of Arrow Flight configurations. For example: {"arrow_flight_config": {"timeout": 900}}
  - key "hive_config" to pass a dictionary of Hive or Tez configurations. For example: {"hive_config": {"hive.tez.cpu.vcores": 2, "tez.grouping.split-count": "3"}}
  Defaults to {}.
Returns
DataFrame: DataFrame depending on the chosen type.
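For the Python engine, the documented read_options keys can be assembled into a single dictionary. The helper python_engine_read_options below is hypothetical and only builds the dictionary shape shown in the argument list; it does not validate which combinations HSFS accepts.

```python
from typing import Any, Dict, Optional


def python_engine_read_options(
    use_hive: bool = False,
    arrow_flight_config: Optional[Dict[str, Any]] = None,
    hive_config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper assembling the documented Python-engine read options."""
    options: Dict[str, Any] = {}
    if use_hive:
        # Read the query with Hive instead of the ArrowFlight Server.
        options["use_hive"] = True
    if arrow_flight_config:
        # Extra Arrow Flight settings, e.g. a timeout.
        options["arrow_flight_config"] = arrow_flight_config
    if hive_config:
        # Extra Hive or Tez settings.
        options["hive_config"] = hive_config
    return options


opts = python_engine_read_options(arrow_flight_config={"timeout": 900})
# Usage with an existing Query object: query.read(read_options=opts)
print(opts)  # {'arrow_flight_config': {'timeout': 900}}
```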
show
Query.show(n, online=False)
Show the first n rows of the Query.
Show the first 10 rows:
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
query = fg1.select_all().join(fg2.select_all())
query.show(10)
Arguments
- n (int): Number of rows to show.
- online (Optional[bool]): Show from online storage. Defaults to False.
to_string
Query.to_string(online=False, arrow_flight=False)
Example
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
query = fg1.select_all().join(fg2.select_all())
query.to_string()
Properties
featuregroups
List of feature groups used in the query.
features
List of all features in the query.
filters
All filters used in the query.
joins
List of joins in the query.
left_feature_group_end_time
End time of time travel for the left feature group.
left_feature_group_start_time
Start time of time travel for the left feature group.