public class FeatureGroup extends FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
Modifier and Type | Field and Description |
---|---|
protected StatisticsEngine |
statisticsEngine |
created, creator, deltaStreamerJobConf, description, eventTime, expectationsNames, featureGroupEngineBase, features, featureStore, hudiPrecombineKey, id, location, LOGGER, name, onlineEnabled, onlineTopicName, partitionKeys, primaryKeys, statisticColumns, statisticsConfig, subject, timeTravelFormat, type, utils, version
Constructor and Description |
---|
FeatureGroup() |
FeatureGroup(FeatureStore featureStore,
int id) |
FeatureGroup(FeatureStore featureStore,
Integer id) |
FeatureGroup(FeatureStore featureStore,
@NonNull String name,
Integer version,
String description,
List<String> primaryKeys,
List<String> partitionKeys,
String hudiPrecombineKey,
boolean onlineEnabled,
TimeTravelFormat timeTravelFormat,
List<Feature> features,
StatisticsConfig statisticsConfig,
String onlineTopicName,
String eventTime) |
FeatureGroup(Integer id,
String description,
List<Feature> features) |
Modifier and Type | Method and Description |
---|---|
void |
appendFeatures(Feature features)
Append a single feature to the schema of the feature group.
|
void |
appendFeatures(List<Feature> features)
Append features to the schema of the feature group.
|
Query |
asOf(String wallclockTime)
Get Query object to retrieve all features of the group at a point in the past.
|
Query |
asOf(String wallclockTime,
String excludeUntil)
Get Query object to retrieve all features of the group at a point in the past.
|
void |
commitDeleteRecord(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData)
Drops the records present in the provided DataFrame and commits the deletion as an update to this feature group.
|
void |
commitDeleteRecord(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
Map<String,String> writeOptions)
Drops the records present in the provided DataFrame and commits the deletion as an update to this feature group.
|
Map<Long,Map<String,String>> |
commitDetails()
Retrieves commit timeline for this feature group.
|
Map<Long,Map<String,String>> |
commitDetails(Integer limit)
Retrieves commit timeline for this feature group.
|
Map<Long,Map<String,String>> |
commitDetails(String wallclockTime)
Return commit details as of specific point in time.
|
Map<Long,Map<String,String>> |
commitDetails(String wallclockTime,
Integer limit)
Return commit details as of specific point in time.
|
Statistics |
computeStatistics()
Recompute the statistics for the feature group and save them to the feature store.
|
Statistics |
computeStatistics(String wallclockTime)
Recompute the statistics for the feature group and save them to the feature store.
|
Statistics |
getStatistics()
Get the last statistics commit for the feature group.
|
void |
insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData)
Incrementally insert data to a feature group or overwrite all data contained in the feature group.
|
void |
insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
boolean overwrite)
Incrementally insert data to a feature group or overwrite all data contained in the feature group.
|
void |
insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
boolean overwrite,
Map<String,String> writeOptions)
Incrementally insert data to a feature group or overwrite all data contained in the feature group.
|
void |
insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
boolean overwrite,
Map<String,String> writeOptions,
JobConfiguration jobConfiguration) |
void |
insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
HudiOperationType operation)
Incrementally insert data to a feature group or overwrite all data contained in the feature group.
|
void |
insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
JobConfiguration jobConfiguration) |
void |
insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
Map<String,String> writeOptions)
Incrementally insert data to a feature group or overwrite all data contained in the feature group.
|
void |
insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
Storage storage)
Incrementally insert data to a feature group or overwrite all data contained in the feature group.
|
void |
insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
Storage storage,
boolean overwrite)
Incrementally insert data to a feature group or overwrite all data contained in the feature group.
|
void |
insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
Storage storage,
boolean overwrite,
HudiOperationType operation,
Map<String,String> writeOptions)
Incrementally insert data to a feature group or overwrite all data contained in the feature group.
|
org.apache.spark.sql.streaming.StreamingQuery |
insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData)
Deprecated.
The insertStream method is deprecated for FeatureGroups. Full insertStream capability is available for StreamFeatureGroups.
|
org.apache.spark.sql.streaming.StreamingQuery |
insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
Map<String,String> writeOptions)
Deprecated.
|
org.apache.spark.sql.streaming.StreamingQuery |
insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
String queryName)
Deprecated.
|
org.apache.spark.sql.streaming.StreamingQuery |
insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
String queryName,
Map<String,String> writeOptions) |
org.apache.spark.sql.streaming.StreamingQuery |
insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
String queryName,
String outputMode)
Deprecated.
|
org.apache.spark.sql.streaming.StreamingQuery |
insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
String queryName,
String outputMode,
boolean awaitTermination,
Long timeout)
Deprecated.
The insertStream method is deprecated for FeatureGroups. Full insertStream capability is available for StreamFeatureGroups.
|
org.apache.spark.sql.streaming.StreamingQuery |
insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
String queryName,
String outputMode,
boolean awaitTermination,
Long timeout,
String checkpointLocation)
Deprecated.
|
org.apache.spark.sql.streaming.StreamingQuery |
insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
String queryName,
String outputMode,
boolean awaitTermination,
Long timeout,
String checkpointLocation,
Map<String,String> writeOptions)
Deprecated.
|
Object |
insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
String queryName,
String outputMode,
boolean awaitTermination,
Long timeout,
String checkpointLocation,
Map<String,String> writeOptions,
JobConfiguration jobConfiguration) |
org.apache.spark.sql.streaming.StreamingQuery |
insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
String queryName,
String outputMode,
boolean awaitTermination,
String checkpointLocation)
Deprecated.
The insertStream method is deprecated for FeatureGroups. Full insertStream capability is available for StreamFeatureGroups.
|
org.apache.spark.sql.streaming.StreamingQuery |
insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
String queryName,
String outputMode,
String checkpointLocation)
Deprecated.
|
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
read()
Reads the feature group from the offline storage as Spark DataFrame.
|
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
read(boolean online)
Reads the feature group from the offline or online storage as Spark DataFrame.
|
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
read(boolean online,
Map<String,String> readOptions)
Reads the feature group from the offline or online storage as Spark DataFrame.
|
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
read(Map<String,String> readOptions)
Reads the feature group from the offline storage as Spark DataFrame.
|
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
read(String wallclockTime)
Reads Feature group into a dataframe at a specific point in time.
|
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
read(String wallclockTime,
Map<String,String> readOptions)
Reads Feature group into a dataframe at a specific point in time.
|
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
readChanges(String wallclockStartTime,
String wallclockEndTime)
Deprecated.
|
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
readChanges(String wallclockStartTime,
String wallclockEndTime,
Map<String,String> readOptions)
Deprecated.
|
void |
save(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData)
Deprecated.
|
void |
save(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData,
Map<String,String> writeOptions)
Deprecated.
|
Query |
select(List<String> features)
Select a subset of features of the feature group and return a query object.
|
Query |
selectAll()
Select all features of the feature group and return a query object.
|
Query |
selectExcept(List<String> features)
Select all features of the feature group, including the primary key and event time features, except the provided `features`,
and return a query object.
|
Query |
selectExceptFeatures(List<Feature> features)
Select all features of the feature group, including the primary key and event time features, except the provided `features`,
and return a query object.
|
Query |
selectFeatures(List<Feature> features)
Select a subset of features of the feature group and return a query object.
|
void |
show(int numRows)
Show the first `n` rows of the feature group.
|
void |
show(int numRows,
boolean online)
Show the first `n` rows of the feature group.
|
void |
updateFeatures(Feature feature)
Update the metadata of a feature.
|
void |
updateFeatures(List<Feature> features)
Update the metadata of multiple features.
|
addTag, delete, deleteTag, getAvroSchema, getComplexFeatures, getDeserializedAvroSchema, getDeserializedEncodedAvroSchema, getEncodedAvroSchema, getFeature, getFeatureAvroSchema, getPrimaryKeys, getSubject, getTag, getTags, unloadSubject, updateDescription, updateFeatureDescription, updateStatisticsConfig
protected StatisticsEngine statisticsEngine
public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName, String eventTime)
public FeatureGroup()
public FeatureGroup(FeatureStore featureStore, Integer id)
public FeatureGroup(FeatureStore featureStore, int id)
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> read() throws FeatureStoreException, IOException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// read feature group
fg.read()
read
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
- In case it cannot run read query on storage and/or no commit information was found for this feature group.
IOException
- Generic IO exception.
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> read(boolean online) throws FeatureStoreException, IOException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// read feature group data from online storage
fg.read(true)
// read feature group data from offline storage
fg.read(false)
read
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
online
- Set `online` to `true` to read from the online storage.
FeatureStoreException
- In case it cannot run read query on storage and/or no commit information was found for this feature group.
IOException
- Generic IO exception.
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> read(Map<String,String> readOptions) throws FeatureStoreException, IOException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// define additional read options (this example applies to HUDI enabled FGs)
Map<String, String> readOptions = new HashMap<String, String>() {{
put("hoodie.datasource.read.end.instanttime", "20230401211015");
}};
// read feature group data
fg.read(readOptions)
read
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
readOptions
- Additional read options as key/value pairs.
FeatureStoreException
- In case it cannot run read query on storage and/or no commit information was found for this feature group.
IOException
- Generic IO exception.
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> read(boolean online, Map<String,String> readOptions) throws FeatureStoreException, IOException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// define additional read options (this example applies to HUDI enabled FGs)
Map<String, String> readOptions = new HashMap<String, String>() {{
put("hoodie.datasource.read.end.instanttime", "20230401211015");
}};
// read feature group data from online storage
fg.read(true, readOptions)
// read feature group data from offline storage
fg.read(false, readOptions)
read
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
online
- Set `online` to `true` to read from the online storage.
readOptions
- Additional read options as key/value pairs.
FeatureStoreException
- In case it cannot run read query on storage and/or no commit information was found for this feature group.
IOException
- Generic IO exception.
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> read(String wallclockTime) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// read feature group data as of specific point in time (Hudi commit timestamp).
fg.read("20230205210923")
read
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
wallclockTime
- Read data as of this point in time. Datetime string. The String should be formatted in one of the following formats: `yyyyMMdd`, `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`.
FeatureStoreException
- In case it's unable to identify the format of the provided wallclockTime.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse the provided wallclockTime to date type.
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> read(String wallclockTime, Map<String,String> readOptions) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// define additional read options (this example applies to HUDI enabled FGs)
Map<String, String> readOptions = new HashMap<String, String>() {{
put("hoodie.datasource.read.end.instanttime", "20230401211015");
}};
// read feature group data as of specific point in time (Hudi commit timestamp).
fg.read("20230205210923", readOptions)
read
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
wallclockTime
- Datetime string. The String should be formatted in one of the following formats: `yyyyMMdd`, `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`.
readOptions
- Additional read options as key/value pairs.
FeatureStoreException
- In case it's unable to identify the format of the provided wallclockTime.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse the provided wallclockTime to date type.
@Deprecated public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> readChanges(String wallclockStartTime, String wallclockEndTime) throws FeatureStoreException, IOException, ParseException
@Deprecated public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> readChanges(String wallclockStartTime, String wallclockEndTime, Map<String,String> readOptions) throws FeatureStoreException, IOException, ParseException
public Query asOf(String wallclockTime) throws FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// get query object to retrieve feature group feature data as of
// specific point in time (Hudi commit timestamp).
fg.asOf("20230205210923")
asOf
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
wallclockTime
- Read data as of this point in time. Datetime string. The String should be formatted in one of the following formats: `yyyyMMdd`, `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`.
FeatureStoreException
- In case it's unable to identify the format of the provided wallclockTime.
ParseException
- In case it's unable to parse the provided wallclockTime to date type.
public Query asOf(String wallclockTime, String excludeUntil) throws FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// get query object to retrieve feature group feature data as of specific point in time "20230205210923"
// but exclude commits until "20230204073411" (Hudi commit timestamp).
fg.asOf("20230205210923", "20230204073411")
asOf
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
wallclockTime
- Read data as of this point in time. Datetime string. The String should be formatted in one of the following formats: `yyyyMMdd`, `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`.
excludeUntil
- Exclude commits until this point in time. Datetime string. The String should be formatted in one of the following formats: `yyyyMMdd`, `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`.
FeatureStoreException
- In case it's unable to identify the format of the provided wallclockTime.
ParseException
- In case it's unable to parse the provided wallclockTime to date type.
public void show(int numRows) throws FeatureStoreException, IOException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// show top 5 lines of feature group data.
fg.show(5);
show
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
numRows
- Number of rows to show.
FeatureStoreException
- In case it cannot run read query on storage and/or no commit information was found for this feature group.
IOException
- Generic IO exception.
public void show(int numRows, boolean online) throws FeatureStoreException, IOException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// show top 5 lines of feature data from online storage.
fg.show(5, true);
show
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
numRows
- Number of rows to show.
online
- If `true`, read from the online feature store.
FeatureStoreException
- In case it cannot run read query on storage and/or no commit information was found for this feature group.
IOException
- Generic IO exception.
@Deprecated public void save(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData) throws FeatureStoreException, IOException, ParseException
@Deprecated public void save(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Map<String,String> writeOptions) throws FeatureStoreException, IOException, ParseException
public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData) throws IOException, FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
//insert feature data
fg.insert(featureData);
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame, RDD. Features to be saved.
IOException
- Generic IO exception.
FeatureStoreException
- If client is not connected to Hopsworks; cannot run read query on storage and/or can't reconcile HUDI schema.
ParseException
- In case it's unable to parse HUDI commit date string to date type.
public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Map<String,String> writeOptions) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// Define additional write options (this example applies to HUDI enabled FGs)
Map<String, String> writeOptions = new HashMap<String, String>() {{
put("hoodie.bulkinsert.shuffle.parallelism", "5");
put("hoodie.insert.shuffle.parallelism", "5");
put("hoodie.upsert.shuffle.parallelism", "5");
}};
// insert feature data
fg.insert(featureData, writeOptions);
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame, RDD. Features to be saved.
writeOptions
- Additional write options as key/value pairs.
IOException
- Generic IO exception.
FeatureStoreException
- If client is not connected to Hopsworks; cannot run read query on storage and/or can't reconcile HUDI schema.
ParseException
- In case it's unable to parse HUDI commit date string to date type.
public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Storage storage) throws IOException, FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// insert feature data in offline only
fg.insert(featureData, Storage.OFFLINE);
// Or insert feature data in online only
fg.insert(featureData, Storage.ONLINE);
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame, RDD. Features to be saved.
storage
- Overwrite default behaviour; write to offline storage only with `Storage.OFFLINE` or online only with `Storage.ONLINE`.
IOException
- Generic IO exception.
FeatureStoreException
- If client is not connected to Hopsworks; cannot run read query on storage and/or can't reconcile HUDI schema.
ParseException
- In case it's unable to parse HUDI commit date string to date type.
public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, boolean overwrite) throws IOException, FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// insert feature data and drop all data in the feature group before inserting new data
fg.insert(featureData, true);
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame, RDD. Features to be saved.
overwrite
- Drop all data in the feature group before inserting new data. This does not affect metadata.
IOException
- Generic IO exception.
FeatureStoreException
- If client is not connected to Hopsworks; cannot run read query on storage and/or can't reconcile HUDI schema.
ParseException
- In case it's unable to parse HUDI commit date string to date type.
public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Storage storage, boolean overwrite) throws IOException, FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// insert feature data in offline only and drop all data in the feature group before inserting new data
fg.insert(featureData, Storage.OFFLINE, true);
// Or insert feature data in online only and drop all data in the feature group before inserting new data
fg.insert(featureData, Storage.ONLINE, true);
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame, RDD. Features to be saved.
storage
- Overwrite default behaviour; write to offline storage only with `Storage.OFFLINE` or online only with `Storage.ONLINE`.
overwrite
- Drop all data in the feature group before inserting new data. This does not affect metadata.
IOException
- Generic IO exception.
FeatureStoreException
- If client is not connected to Hopsworks; cannot run read query on storage and/or can't reconcile HUDI schema.
ParseException
- In case it's unable to parse HUDI commit date string to date type.
public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, boolean overwrite, Map<String,String> writeOptions) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// Define additional write options (this example applies to HUDI enabled FGs)
Map<String, String> writeOptions = new HashMap<String, String>() {{
put("hoodie.bulkinsert.shuffle.parallelism", "5");
put("hoodie.insert.shuffle.parallelism", "5");
put("hoodie.upsert.shuffle.parallelism", "5");
}};
// insert feature data and drop all data in the feature group before inserting new data
fg.insert(featureData, true, writeOptions);
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame, RDD. Features to be saved.
overwrite
- Drop all data in the feature group before inserting new data. This does not affect metadata.
writeOptions
- Additional write options as key/value pairs.
IOException
- Generic IO exception.
FeatureStoreException
- If client is not connected to Hopsworks; cannot run read query on storage and/or can't reconcile HUDI schema.
ParseException
- In case it's unable to parse HUDI commit date string to date type.
public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, HudiOperationType operation) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// insert feature data
fg.insert(featureData, HudiOperationType.INSERT);
// upsert feature data
fg.insert(featureData, HudiOperationType.UPSERT);
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame, RDD. Features to be saved.
operation
- Commit operation type, `INSERT` or `UPSERT`.
IOException
- Generic IO exception.
FeatureStoreException
- If client is not connected to Hopsworks; cannot run read query on storage and/or can't reconcile HUDI schema.
ParseException
- In case it's unable to parse HUDI commit date string to date type.
public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Storage storage, boolean overwrite, HudiOperationType operation, Map<String,String> writeOptions) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// Define additional write options (this example applies to HUDI enabled FGs)
Map<String, String> writeOptions = new HashMap<String, String>() {{
put("hoodie.bulkinsert.shuffle.parallelism", "5");
put("hoodie.insert.shuffle.parallelism", "5");
put("hoodie.upsert.shuffle.parallelism", "5");
}};
// insert feature data in offline only with additional write options and drop all previous data before new
// data is inserted
fg.insert(featureData, Storage.OFFLINE, true, HudiOperationType.INSERT, writeOptions);
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame, RDD. Features to be saved.
storage
- Overwrite default behaviour; write to offline storage only with `Storage.OFFLINE` or online only with `Storage.ONLINE`.
overwrite
- Drop all data in the feature group before inserting new data. This does not affect metadata.
operation
- Commit operation type, `INSERT` or `UPSERT`.
writeOptions
- Additional write options as key/value pairs.
IOException
- Generic IO exception.
FeatureStoreException
- If client is not connected to Hopsworks; cannot run read query on storage and/or can't reconcile HUDI schema.
ParseException
- In case it's unable to parse HUDI commit date string to date type.
public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, JobConfiguration jobConfiguration) throws FeatureStoreException, IOException, ParseException
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
IOException
ParseException
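This overload has no example in the original text. A minimal sketch in the style of the other examples, assuming an established Hopsworks connection and that `JobConfiguration` can be built with its no-arg constructor (its resource settings are not shown here; consult the `JobConfiguration` class for the actual options):

```java
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// build a configuration for the ingestion job (no-arg constructor assumed)
JobConfiguration jobConfiguration = new JobConfiguration();
// insert feature data, running the materialization with the provided job configuration
fg.insert(featureData, jobConfiguration);
```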
public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, boolean overwrite, Map<String,String> writeOptions, JobConfiguration jobConfiguration) throws FeatureStoreException, IOException, ParseException
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
IOException
ParseException
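This overload has no example in the original text. A minimal sketch in the style of the other examples, assuming an established Hopsworks connection and that `JobConfiguration` can be built with its no-arg constructor (a hypothetical default; consult the `JobConfiguration` class for the actual options):

```java
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// define additional write options (this example applies to HUDI enabled FGs)
Map<String, String> writeOptions = new HashMap<String, String>() {{
put("hoodie.upsert.shuffle.parallelism", "5");
}};
// build a configuration for the ingestion job (no-arg constructor assumed)
JobConfiguration jobConfiguration = new JobConfiguration();
// overwrite the feature group, passing write options and the job configuration
fg.insert(featureData, true, writeOptions, jobConfiguration);
```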
@Deprecated public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData) throws org.apache.spark.sql.streaming.StreamingQueryException, IOException, FeatureStoreException, TimeoutException, ParseException
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame containing feature data.
org.apache.spark.sql.streaming.StreamingQueryException
- StreamingQueryException
IOException
- IOException
FeatureStoreException
- FeatureStoreException
TimeoutException
- TimeoutException
ParseException
- ParseException
@Deprecated public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName) throws org.apache.spark.sql.streaming.StreamingQueryException, IOException, FeatureStoreException, TimeoutException, ParseException
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
org.apache.spark.sql.streaming.StreamingQueryException
IOException
FeatureStoreException
TimeoutException
ParseException
@Deprecated public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Map<String,String> writeOptions) throws FeatureStoreException, IOException, org.apache.spark.sql.streaming.StreamingQueryException, TimeoutException, ParseException
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
IOException
org.apache.spark.sql.streaming.StreamingQueryException
TimeoutException
ParseException
public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, Map<String,String> writeOptions) throws FeatureStoreException, IOException, org.apache.spark.sql.streaming.StreamingQueryException, TimeoutException, ParseException
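A usage sketch for this non-deprecated streaming overload; `featureData` is assumed to be a streaming `Dataset<Row>` and `writeOptions` a previously built options map:

```java
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// start a streaming insert under a named query
org.apache.spark.sql.streaming.StreamingQuery query =
    fg.insertStream(featureData, "electricity_prices_query", writeOptions);
```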
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
IOException
org.apache.spark.sql.streaming.StreamingQueryException
TimeoutException
ParseException
@Deprecated public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode) throws org.apache.spark.sql.streaming.StreamingQueryException, IOException, FeatureStoreException, TimeoutException, ParseException
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
org.apache.spark.sql.streaming.StreamingQueryException
IOException
FeatureStoreException
TimeoutException
ParseException
@Deprecated public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, String checkpointLocation) throws FeatureStoreException, IOException, org.apache.spark.sql.streaming.StreamingQueryException, TimeoutException, ParseException
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
IOException
org.apache.spark.sql.streaming.StreamingQueryException
TimeoutException
ParseException
@Deprecated public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout) throws org.apache.spark.sql.streaming.StreamingQueryException, IOException, FeatureStoreException, TimeoutException, ParseException
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark dataframe containing feature data
queryName
- name of spark StreamingQuery
outputMode
- outputMode
awaitTermination
- whether or not to wait for query termination
timeout
- timeout
org.apache.spark.sql.streaming.StreamingQueryException
- StreamingQueryException
IOException
- IOException
FeatureStoreException
- FeatureStoreException
TimeoutException
- TimeoutException
ParseException
- ParseException
@Deprecated public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation) throws FeatureStoreException, IOException, org.apache.spark.sql.streaming.StreamingQueryException, TimeoutException, ParseException
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
IOException
org.apache.spark.sql.streaming.StreamingQueryException
TimeoutException
ParseException
@Deprecated public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, boolean awaitTermination, String checkpointLocation) throws org.apache.spark.sql.streaming.StreamingQueryException, IOException, FeatureStoreException, TimeoutException, ParseException
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark dataframe containing feature data
queryName
- name of spark StreamingQuery
outputMode
- outputMode
awaitTermination
- whether or not to wait for query termination
checkpointLocation
- path to checkpoint location directory
org.apache.spark.sql.streaming.StreamingQueryException
- StreamingQueryException
IOException
- IOException
FeatureStoreException
- FeatureStoreException
TimeoutException
- TimeoutException
ParseException
- ParseException
@Deprecated public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation, Map<String,String> writeOptions) throws FeatureStoreException, IOException, org.apache.spark.sql.streaming.StreamingQueryException, TimeoutException, ParseException
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
IOException
org.apache.spark.sql.streaming.StreamingQueryException
TimeoutException
ParseException
public Object insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation, Map<String,String> writeOptions, JobConfiguration jobConfiguration) throws FeatureStoreException, IOException, org.apache.spark.sql.streaming.StreamingQueryException, TimeoutException, ParseException
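A sketch of the fully parameterized, non-deprecated variant; the query name, timeout and checkpoint path are illustrative, and `featureData`, `writeOptions` and `jobConfiguration` are assumed to exist in scope:

```java
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// start the stream in append mode, wait up to one hour for termination,
// and checkpoint to a custom location
Object query = fg.insertStream(featureData, "electricity_prices_query", "append",
    true, 3600000L, "/Projects/my_project/Resources/checkpoint", writeOptions, jobConfiguration);
```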
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
IOException
org.apache.spark.sql.streaming.StreamingQueryException
TimeoutException
ParseException
public void commitDeleteRecord(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// Drops records of feature data and commit
fg.commitDeleteRecord(featureData);
commitDeleteRecord
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame, RDD. Feature data to be deleted.
FeatureStoreException
- If Client is not connected to Hopsworks and/or no commit information was found for
this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse HUDI commit date string to date type.
public void commitDeleteRecord(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Map<String,String> writeOptions) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// Define additional write options (this example applies to HUDI enabled FGs)
Map<String, String> writeOptions = new HashMap<String, String>() {{
put("hoodie.bulkinsert.shuffle.parallelism", "5");
put("hoodie.insert.shuffle.parallelism", "5");
put("hoodie.upsert.shuffle.parallelism", "5");}
};
// Drops records of feature data and commit
fg.commitDeleteRecord(featureData, writeOptions);
commitDeleteRecord
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame, RDD. Feature data to be deleted.
writeOptions
- Additional write options as key-value pairs.
FeatureStoreException
- If Client is not connected to Hopsworks and/or no commit information was found for
this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse HUDI commit date string to date type.
public Map<Long,Map<String,String>> commitDetails() throws IOException, FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// get commit timeline.
fg.commitDetails();
commitDetails
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
- If Client is not connected to Hopsworks and/or no commit information was found for
this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse HUDI commit date string to date type.
public Map<Long,Map<String,String>> commitDetails(Integer limit) throws IOException, FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// get latest 10 commit details.
fg.commitDetails(10);
commitDetails
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
limit
- number of commits to return.
FeatureStoreException
- If Client is not connected to Hopsworks and/or no commit information was found for
this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse HUDI commit date string to date type.
public Map<Long,Map<String,String>> commitDetails(String wallclockTime) throws IOException, FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// get commit details as of 20230206
fg.commitDetails("20230206");
commitDetails
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
wallclockTime
- Datetime string. The String should be formatted in one of the
following formats `yyyyMMdd`, `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`.
FeatureStoreException
- If Client is not connected to Hopsworks, unable to identify the format of the
provided wallclockTime and/or no commit information was found for
this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse HUDI commit date string to date type.
public Map<Long,Map<String,String>> commitDetails(String wallclockTime, Integer limit) throws IOException, FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// get top 10 commit details as of 20230206
fg.commitDetails("20230206", 10);
commitDetails
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
wallclockTime
- Datetime string. The String should be formatted in one of the
following formats `yyyyMMdd`, `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`.
limit
- number of commits to return.
FeatureStoreException
- If Client is not connected to Hopsworks, unable to identify the format of the
provided wallclockTime and/or no commit information was found for
this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse HUDI commit date string to date type.
public Query selectFeatures(List<Feature> features)
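A usage sketch; it assumes a `getFeature` accessor returning the `Feature` metadata object for a given name, and the feature names are illustrative:

```java
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// select a subset of features by their metadata objects
Query query = fg.selectFeatures(Arrays.asList(fg.getFeature("price"), fg.getFeature("date")));
```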
selectFeatures
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
features
- List of Feature metadata objects.
public Query select(List<String> features)
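A usage sketch (feature names are illustrative):

```java
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// select features by name
Query query = fg.select(Arrays.asList("price", "date"));
```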
select
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
features
- List of Feature names.
public Query selectAll()
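A usage sketch, following the other examples on this page:

```java
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// build a query over all features of the feature group
Query query = fg.selectAll();
```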
selectAll
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
public Query selectExceptFeatures(List<Feature> features)
selectExceptFeatures
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
features
- List of Feature metadata objects.
public Query selectExcept(List<String> features)
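A usage sketch (the excluded feature name is illustrative):

```java
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// select all features except the primary key
Query query = fg.selectExcept(Arrays.asList("id"));
```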
selectExcept
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
features
- List of Feature names.
public void updateFeatures(List<Feature> features) throws FeatureStoreException, IOException, ParseException
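A hedged usage sketch; the modified `Feature` objects (`updatedFeatures`) are assumed to have been retrieved and edited beforehand:

```java
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// persist metadata changes (e.g. updated descriptions) for existing features
fg.updateFeatures(updatedFeatures);
```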
updateFeatures
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
features
- List of Feature metadata objects.
FeatureStoreException
- If Client is not connected to Hopsworks, unable to identify date format and/or
no commit information was found for this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse date string to date type.
public void updateFeatures(Feature feature) throws FeatureStoreException, IOException, ParseException
updateFeatures
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
feature
- Feature metadata object.
FeatureStoreException
- If Client is not connected to Hopsworks, unable to identify date format and/or
no commit information was found for this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse date string to date type.
public void appendFeatures(List<Feature> features) throws FeatureStoreException, IOException, ParseException
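A hedged usage sketch; the new `Feature` objects (`newFeatures`) are assumed to have been constructed beforehand:

```java
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// append the new features to the schema of the feature group
fg.appendFeatures(newFeatures);
```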
appendFeatures
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
features
- List of Feature metadata objects.
FeatureStoreException
- If Client is not connected to Hopsworks, unable to identify date format and/or
no commit information was found for this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse date string to date type.
public void appendFeatures(Feature features) throws FeatureStoreException, IOException, ParseException
appendFeatures
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
features
- Feature metadata object to append to the schema.
FeatureStoreException
- If Client is not connected to Hopsworks, unable to identify date format and/or
no commit information was found for this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse date string to date type.
public Statistics computeStatistics() throws FeatureStoreException, IOException
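A usage sketch, following the other examples on this page:

```java
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// compute statistics for the current state of the feature group
Statistics statistics = fg.computeStatistics();
```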
computeStatistics
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
- If Client is not connected to Hopsworks.
IOException
- Generic IO exception.
public Statistics computeStatistics(String wallclockTime) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// compute statistics as of 20230206
fg.computeStatistics("20230206");
computeStatistics
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
wallclockTime
- Datetime string. The String should be formatted in one of the
following formats `yyyyMMdd`, `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`.
FeatureStoreException
- In case Client is not connected to Hopsworks, unable to identify the format of the
provided wallclockTime and/or no commit information was found for
this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse HUDI and/or statistics commit date string to date type.
public Statistics getStatistics() throws FeatureStoreException, IOException
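A usage sketch, following the other examples on this page:

```java
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
FeatureGroup fg = fs.getFeatureGroup("electricity_prices", 1);
// retrieve the most recently computed statistics of the feature group
Statistics statistics = fg.getStatistics();
```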
getStatistics
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
- In case Client is not connected to Hopsworks and/or no statistics were found for
this feature group.
IOException
- Generic IO exception.
Copyright © 2023. All rights reserved.