public class StreamFeatureGroup extends FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
Modifier and Type | Field and Description |
---|---|
protected FeatureGroupEngine | featureGroupEngine |

Fields inherited from class FeatureGroupBase: created, creator, deltaStreamerJobConf, description, eventTime, expectationsNames, featureGroupEngineBase, features, featureStore, hudiPrecombineKey, id, location, LOGGER, name, onlineEnabled, onlineTopicName, partitionKeys, primaryKeys, statisticColumns, statisticsConfig, subject, timeTravelFormat, type, utils, version
Constructor and Description |
---|
StreamFeatureGroup() |
StreamFeatureGroup(FeatureStore featureStore, int id) |
StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName, String eventTime) |
StreamFeatureGroup(Integer id, String description, List<Feature> features) |
Modifier and Type | Method and Description |
---|---|
void | appendFeatures(Feature features) - Append a single feature to the schema of the stream feature group. |
void | appendFeatures(List<Feature> features) - Append features to the schema of the stream feature group. |
Query | asOf(String wallclockTime) - Get Query object to retrieve all features of the group at a point in the past. |
Query | asOf(String wallclockTime, String excludeUntil) - Get Query object to retrieve all features of the group at a point in the past. |
void | commitDeleteRecord(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData) - Drops records present in the provided DataFrame and commits it as an update to this stream feature group. |
void | commitDeleteRecord(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Map<String,String> writeOptions) - Drops records present in the provided DataFrame and commits it as an update to this stream feature group. |
Map<Long,Map<String,String>> | commitDetails() - Retrieves commit timeline for this stream feature group. |
Map<Long,Map<String,String>> | commitDetails(Integer limit) - Retrieves commit timeline for this stream feature group. |
Map<Long,Map<String,String>> | commitDetails(String wallclockTime) - Return commit details as of specific point in time. |
Map<Long,Map<String,String>> | commitDetails(String wallclockTime, Integer limit) - Return commit details as of specific point in time. |
Statistics | computeStatistics() - Recompute the statistics for the stream feature group and save them to the feature store. |
Statistics | computeStatistics(String wallclockTime) - Recompute the statistics for the feature group and save them to the feature store. |
Statistics | getStatistics() - Get the last statistics commit for the feature group. |
void | insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData) - Incrementally insert data to a stream feature group or overwrite all data contained in the feature group. |
void | insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, boolean overwrite) - Incrementally insert data to a stream feature group or overwrite all data contained in the feature group. |
void | insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, boolean overwrite, Map<String,String> writeOptions) - Incrementally insert data to a stream feature group or overwrite all data contained in the feature group. |
void | insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, boolean overwrite, Map<String,String> writeOptions, JobConfiguration jobConfiguration) - Incrementally insert data to a stream feature group or overwrite all data contained in the feature group. |
void | insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, HudiOperationType operation) |
void | insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, JobConfiguration jobConfiguration) - Incrementally insert data to a stream feature group or overwrite all data contained in the feature group. |
void | insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Map<String,String> writeOptions) - Incrementally insert data to a stream feature group or overwrite all data contained in the feature group. |
void | insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Storage storage) |
void | insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Storage storage, boolean overwrite) |
void | insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Storage storage, boolean overwrite, HudiOperationType operation, Map<String,String> writeOptions) - Incrementally insert data to a stream feature group or overwrite all data contained in the feature group. |
org.apache.spark.sql.streaming.StreamingQuery | insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData) - Ingest a Spark Structured Streaming Dataframe to the online feature store. |
org.apache.spark.sql.streaming.StreamingQuery | insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Map<String,String> writeOptions) - Ingest a Spark Structured Streaming Dataframe to the online feature store. |
org.apache.spark.sql.streaming.StreamingQuery | insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName) - Ingest a Spark Structured Streaming Dataframe to the online feature store. |
org.apache.spark.sql.streaming.StreamingQuery | insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, Map<String,String> writeOptions) - Ingest a Spark Structured Streaming Dataframe to the online feature store. |
org.apache.spark.sql.streaming.StreamingQuery | insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode) - Ingest a Spark Structured Streaming Dataframe to the online feature store. |
org.apache.spark.sql.streaming.StreamingQuery | insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout) - Ingest a Spark Structured Streaming Dataframe to the online feature store. |
org.apache.spark.sql.streaming.StreamingQuery | insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation) - Ingest a Spark Structured Streaming Dataframe to the online feature store. |
org.apache.spark.sql.streaming.StreamingQuery | insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation, Map<String,String> writeOptions) - Ingest a Spark Structured Streaming Dataframe to the online feature store. |
org.apache.spark.sql.streaming.StreamingQuery | insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation, Map<String,String> writeOptions, JobConfiguration jobConfiguration) - Ingest a Spark Structured Streaming Dataframe to the online feature store. |
org.apache.spark.sql.streaming.StreamingQuery | insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, boolean awaitTermination, String checkpointLocation) |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | read() - Reads the feature group from the offline storage as Spark DataFrame. |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | read(boolean online) - Reads the stream feature group from the offline or online storage as Spark DataFrame. |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | read(boolean online, Map<String,String> readOptions) - Reads the stream feature group from the offline or online storage as Spark DataFrame. |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | read(Map<String,String> readOptions) - Reads the stream feature group from the offline storage as Spark DataFrame. |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | read(String wallclockTime) - Reads the stream feature group into a dataframe at a specific point in time. |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | read(String wallclockTime, Map<String,String> readOptions) - Reads the stream feature group into a dataframe at a specific point in time. |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | readChanges(String wallclockStartTime, String wallclockEndTime) - Deprecated. Use `asOf(wallclockEndTime, wallclockStartTime).read()` instead. |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | readChanges(String wallclockStartTime, String wallclockEndTime, Map<String,String> readOptions) - Deprecated. |
void | save(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Map<String,String> writeOptions) - Deprecated. |
void | save(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Map<String,String> writeOptions, JobConfiguration jobConfiguration) - Deprecated. |
Query | select(List<String> features) - Select a subset of features of the feature group and return a query object. |
Query | selectAll() - Select all features of the feature group and return a query object. |
Query | selectExcept(List<String> features) - Select all features including primary key and event time feature of the feature group except provided `features` and return a query object. |
Query | selectExceptFeatures(List<Feature> features) - Select all features including primary key and event time feature of the feature group except provided `features` and return a query object. |
Query | selectFeatures(List<Feature> features) - Select a subset of features of the feature group and return a query object. |
void | show(int numRows) - Show the first `n` rows of the feature group. |
void | show(int numRows, boolean online) - Show the first `n` rows of the feature group. |
void | updateFeatures(Feature feature) - Update the metadata of a feature. |
void | updateFeatures(List<Feature> features) - Update the metadata of multiple features. |
Methods inherited from class FeatureGroupBase: addTag, delete, deleteTag, getAvroSchema, getComplexFeatures, getDeserializedAvroSchema, getDeserializedEncodedAvroSchema, getEncodedAvroSchema, getFeature, getFeatureAvroSchema, getPrimaryKeys, getSubject, getTag, getTags, unloadSubject, updateDescription, updateFeatureDescription, updateStatisticsConfig
protected FeatureGroupEngine featureGroupEngine
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName, String eventTime)
public StreamFeatureGroup()
public StreamFeatureGroup(Integer id, String description, List<Feature> features)
public StreamFeatureGroup(FeatureStore featureStore, int id)
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> read() throws FeatureStoreException, IOException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// read feature group
fg.read();
read
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
- In case it cannot run read query on storage and/or no commit information was found
for this feature group.
IOException
- Generic IO exception.

public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> read(boolean online) throws FeatureStoreException, IOException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// read feature group data from online storage
fg.read(true);
// read feature group data from offline storage
fg.read(false);
read
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
online
- Set `online` to `true` to read from the online storage.
FeatureStoreException
- In case it cannot run read query on storage and/or no commit information was found
for this feature group.
IOException
- Generic IO exception.

public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> read(Map<String,String> readOptions) throws FeatureStoreException, IOException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// define additional read options (this example applies to HUDI enabled FGs)
Map<String, String> readOptions = new HashMap<String, String>() {{
put("hoodie.datasource.read.end.instanttime", "20230401211015")
}};
// read feature group data
fg.read(readOptions);
read
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
readOptions
- Additional read options as key/value pairs.
FeatureStoreException
- In case it cannot run read query on storage and/or no commit information was found
for this feature group.
IOException
- Generic IO exception.

public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> read(boolean online, Map<String,String> readOptions) throws FeatureStoreException, IOException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// define additional read options
Map<String, String> readOptions = new HashMap<String, String>() {{
put("hoodie.datasource.read.end.instanttime", "20230401211015")
}};
// read feature group data from offline storage
fg.read(false, readOptions);
read
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
online
- Set `online` to `true` to read from the online storage.
readOptions
- Additional read options as key/value pairs.
FeatureStoreException
- In case it cannot run read query on storage and/or no commit information was found
for this feature group.
IOException
- Generic IO exception.

public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> read(String wallclockTime) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// read feature group data as of specific point in time (Hudi commit timestamp).
fg.read("20230205210923")
read
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
wallclockTime
- Read data as of this point in time. Datetime string. The String should be formatted in one of
the following formats: `yyyyMMdd`, `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`.
FeatureStoreException
- In case it's unable to identify the format of the provided wallclockTime.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse the provided wallclockTime to a date type.

public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> read(String wallclockTime, Map<String,String> readOptions) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// define additional read options
Map<String, String> readOptions = new HashMap<String, String>() {{
put("hoodie.datasource.read.end.instanttime", "20230401211015")
}};
// read stream feature group data as of specific point in time (Hudi commit timestamp).
fg.read("20230205210923", readOptions)
read
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
wallclockTime
- Datetime string. The String should be formatted in one of the
following formats: `yyyyMMdd`, `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`.
readOptions
- Additional read options as key-value pairs.
FeatureStoreException
- In case it's unable to identify the format of the provided wallclockTime.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse the provided wallclockTime to a date type.

public void show(int numRows) throws FeatureStoreException, IOException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// show top 5 lines of feature group data.
fg.show(5);
show
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
numRows
- Number of rows to show.
FeatureStoreException
- In case it cannot run read query on storage and/or no commit information was found
for this feature group.
IOException
- Generic IO exception.

public void show(int numRows, boolean online) throws FeatureStoreException, IOException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// show top 5 lines of feature data from online storage.
fg.show(5, true);
show
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
numRows
- Number of rows to show.
online
- If `true`, read from the online feature store.
FeatureStoreException
- In case it cannot run read query on storage and/or no commit information was found
for this feature group.
IOException
- Generic IO exception.

@Deprecated public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> readChanges(String wallclockStartTime, String wallclockEndTime) throws FeatureStoreException, IOException, ParseException
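A minimal migration sketch, assuming the `fg` handle from the examples above (the Hudi commit timestamps are illustrative):
// instead of the deprecated fg.readChanges("20230204073411", "20230205210923"), use:
fg.asOf("20230205210923", "20230204073411").read();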
wallclockStartTime
- Start date.
wallclockEndTime
- End date.
FeatureStoreException
- FeatureStoreException
IOException
- IOException
ParseException
- ParseException

@Deprecated public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> readChanges(String wallclockStartTime, String wallclockEndTime, Map<String,String> readOptions) throws FeatureStoreException, IOException, ParseException
public Query asOf(String wallclockTime) throws FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// get query object to retrieve stream feature group feature data as of
// specific point in time (Hudi commit timestamp).
fg.asOf("20230205210923")
asOf
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
wallclockTime
- Read data as of this point in time. Datetime string. The String should be formatted in one of
the following formats: `yyyyMMdd`, `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`.
FeatureStoreException
- In case it's unable to identify the format of the provided wallclockTime.
ParseException
- In case it's unable to parse the provided wallclockTime to a date type.

public Query asOf(String wallclockTime, String excludeUntil) throws FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// get query object to retrieve feature group feature data as of specific point in time "20230205210923"
// but exclude commits until "20230204073411" (Hudi commit timestamp).
fg.asOf("20230205210923", "20230204073411")
asOf
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
wallclockTime
- Read data as of this point in time. Datetime string. The String should be formatted in one of
the following formats: `yyyyMMdd`, `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`.
excludeUntil
- Exclude commits until this point in time. Datetime string. The String should be formatted in
one of the following formats: `yyyyMMdd`, `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`.
FeatureStoreException
- In case it's unable to identify the format of the provided wallclockTime.
ParseException
- In case it's unable to parse the provided wallclockTime to a date type.

@Deprecated public void save(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Map<String,String> writeOptions) throws FeatureStoreException, IOException, ParseException
@Deprecated public void save(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Map<String,String> writeOptions, JobConfiguration jobConfiguration) throws FeatureStoreException, IOException, ParseException
public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// insert feature data
fg.insert(featureData);
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame or RDD. Features to be saved.
IOException
- Generic IO exception.
FeatureStoreException
- If client is not connected to Hopsworks; cannot run read query on storage and/or
can't reconcile HUDI schema.
ParseException
- In case it's unable to parse HUDI commit date string to date type.

public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Map<String,String> writeOptions) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// define additional write options
Map<String, String> writeOptions = = new HashMap<String, String>() {{
put("hoodie.bulkinsert.shuffle.parallelism", "5");
put("hoodie.insert.shuffle.parallelism", "5");
put("hoodie.upsert.shuffle.parallelism", "5");}
};
// insert feature data
fg.insert(featureData, writeOptions);
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame or RDD. Features to be saved.
writeOptions
- Additional write options as key-value pairs.
IOException
- Generic IO exception.
FeatureStoreException
- If client is not connected to Hopsworks; cannot run read query on storage and/or
can't reconcile HUDI schema.
ParseException
- In case it's unable to parse HUDI commit date string to date type.

public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Storage storage) throws IOException, FeatureStoreException, ParseException
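A minimal usage sketch, assuming the `fg` handle and `featureData` DataFrame from the examples above:
// insert feature data into the offline storage only
fg.insert(featureData, Storage.OFFLINE);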
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
IOException
FeatureStoreException
ParseException
public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, boolean overwrite) throws IOException, FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// insert feature data and drop all data in the stream feature group before inserting new data
fg.insert(featureData, true);
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame or RDD. Features to be saved.
overwrite
- Drop all data in the feature group before inserting new data. This does not affect metadata.
IOException
- Generic IO exception.
FeatureStoreException
- If client is not connected to Hopsworks; cannot run read query on storage and/or
can't reconcile HUDI schema.
ParseException
- In case it's unable to parse HUDI commit date string to date type.

public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Storage storage, boolean overwrite) throws IOException, FeatureStoreException, ParseException
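A minimal usage sketch, assuming the `fg` handle and `featureData` DataFrame from the examples above:
// drop all data in the offline storage before inserting the new feature data
fg.insert(featureData, Storage.OFFLINE, true);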
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
IOException
FeatureStoreException
ParseException
public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, boolean overwrite, Map<String,String> writeOptions) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// define additional write options
Map<String, String> writeOptions = = new HashMap<String, String>() {{
put("hoodie.bulkinsert.shuffle.parallelism", "5");
put("hoodie.insert.shuffle.parallelism", "5");
put("hoodie.upsert.shuffle.parallelism", "5");}
};
// insert feature data and drop all data in the stream feature group before inserting new data
fg.insert(featureData, true, writeOptions);
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame or RDD. Features to be saved.
overwrite
- Drop all data in the feature group before inserting new data. This does not affect metadata.
writeOptions
- Additional write options as key-value pairs.
IOException
- Generic IO exception.
FeatureStoreException
- If client is not connected to Hopsworks; cannot run read query on storage and/or
can't reconcile HUDI schema.
ParseException
- In case it's unable to parse HUDI commit date string to date type.

public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, HudiOperationType operation) throws FeatureStoreException, IOException, ParseException
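A minimal usage sketch, assuming the `fg` handle and `featureData` DataFrame from the examples above (the commit operation types documented below are INSERT and UPSERT):
// upsert feature data with an explicit Hudi commit operation type
fg.insert(featureData, HudiOperationType.UPSERT);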
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
IOException
ParseException
public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Storage storage, boolean overwrite, HudiOperationType operation, Map<String,String> writeOptions) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// define additional write options (this example applies to HUDI enabled FGs)
Map<String, String> writeOptions = new HashMap<String, String>() {{
  put("hoodie.bulkinsert.shuffle.parallelism", "5");
  put("hoodie.insert.shuffle.parallelism", "5");
  put("hoodie.upsert.shuffle.parallelism", "5");
}};
// insert feature data in offline only with additional write options and drop all previous data before new
// data is inserted
fg.insert(featureData, Storage.OFFLINE, true, HudiOperationType.INSERT, writeOptions);
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame or RDD. Features to be saved.
overwrite
- Drop all data in the feature group before inserting new data. This does not affect metadata.
operation
- Commit operation type, INSERT or UPSERT.
writeOptions
- Additional write options as key-value pairs.
IOException
- Generic IO exception.
FeatureStoreException
- If client is not connected to Hopsworks; cannot run read query on storage and/or
can't reconcile HUDI schema.
ParseException
- In case it's unable to parse HUDI commit date string to date type.

public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, JobConfiguration jobConfiguration) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// Define job configuration.
JobConfiguration jobConfiguration = new JobConfiguration();
jobConfiguration.setDynamicAllocationEnabled(true);
jobConfiguration.setAmMemory(2048);
// insert feature data
fg.insert(featureData, jobConfiguration);
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame or RDD. Features to be saved.
jobConfiguration
- Configure the Hopsworks Job used to write data into the stream feature group.
IOException
- Generic IO exception.
FeatureStoreException
- If client is not connected to Hopsworks; cannot run read query on storage and/or
can't reconcile HUDI schema.
ParseException
- In case it's unable to parse HUDI commit date string to date type.

public void insert(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, boolean overwrite, Map<String,String> writeOptions, JobConfiguration jobConfiguration) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// define additional write options
Map<String, String> writeOptions = = new HashMap<String, String>() {{
put("hoodie.bulkinsert.shuffle.parallelism", "5");
put("hoodie.insert.shuffle.parallelism", "5");
put("hoodie.upsert.shuffle.parallelism", "5");}
};
// Define job configuration.
JobConfiguration jobConfiguration = new JobConfiguration();
jobConfiguration.setDynamicAllocationEnabled(true);
jobConfiguration.setAmMemory(2048);
// insert feature data
fg.insert(featureData, false, writeOptions, jobConfiguration);
insert
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame or RDD. Features to be saved.
overwrite
- Drop all data in the feature group before inserting new data. This does not affect metadata.
writeOptions
- Additional write options as key-value pairs.
jobConfiguration
- Configure the Hopsworks Job used to write data into the stream feature group.
IOException
- Generic IO exception.
FeatureStoreException
- If client is not connected to Hopsworks; cannot run read query on storage and/or
can't reconcile HUDI schema.
ParseException
- In case it's unable to parse HUDI commit date string to date type.

public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData)
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// insert feature data
fg.insertStream(featureData);
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Features in Streaming Dataframe to be saved.

public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName)
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// insert feature data
String queryName = "electricity_prices_streaming_query";
fg.insertStream(featureData, queryName);
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Features in Streaming Dataframe to be saved.
queryName
- Specify a name for the query to make it easier to recognise in the Spark UI.

public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Map<String,String> writeOptions)
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// define additional write options
Map<String, String> writeOptions = = new HashMap<String, String>() {{
put("hoodie.bulkinsert.shuffle.parallelism", "5");
put("hoodie.insert.shuffle.parallelism", "5");
put("hoodie.upsert.shuffle.parallelism", "5");}
};
// insert feature data
fg.insertStream(featureData, writeOptions);
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Features in Streaming Dataframe to be saved.
writeOptions
- Additional write options as key-value pairs.

public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, Map<String,String> writeOptions)
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// define additional write options
Map<String, String> writeOptions = = new HashMap<String, String>() {{
put("hoodie.bulkinsert.shuffle.parallelism", "5");
put("hoodie.insert.shuffle.parallelism", "5");
put("hoodie.upsert.shuffle.parallelism", "5");}
};
// insert feature data
String queryName = "electricity_prices_streaming_query";
fg.insertStream(featureData, queryName, writeOptions);
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Features in Streaming Dataframe to be saved.
queryName
- Specify a name for the query to make it easier to recognise in the Spark UI.
writeOptions
- Additional write options as key-value pairs.

public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode)
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// insert feature data
String queryName = "electricity_prices_streaming_query";
String outputMode = "append";
fg.insertStream(featureData, queryName, outputMode);
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Features in Streaming Dataframe to be saved.
queryName
- Specify a name for the query to make it easier to recognise in the Spark UI.
outputMode
- Specifies how data of a streaming DataFrame/Dataset is
written to a streaming sink. (1) `"append"`: Only the new rows in the
streaming DataFrame/Dataset will be written to the sink. (2)
`"complete"`: All the rows in the streaming DataFrame/Dataset will be
written to the sink every time there is some update. (3) `"update"`:
only the rows that were updated in the streaming DataFrame/Dataset will
be written to the sink every time there are some updates.
If the query doesn't contain aggregations, it will be equivalent to
append mode. Default behaviour is `"append"`.

public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, String checkpointLocation)
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// insert feature data
String queryName = "electricity_prices_streaming_query";
String outputMode = "append";
String checkpointLocation = "path_to_checkpoint_dir";
fg.insertStream(featureData, queryName, outputMode, checkpointLocation);
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Features in Streaming Dataframe to be saved.
queryName
- Specify a name for the query to make it easier to recognise in the Spark UI.
outputMode
- Specifies how data of a streaming DataFrame/Dataset is
written to a streaming sink. (1) `"append"`: Only the new rows in the
streaming DataFrame/Dataset will be written to the sink. (2)
`"complete"`: All the rows in the streaming DataFrame/Dataset will be
written to the sink every time there is some update. (3) `"update"`:
only the rows that were updated in the streaming DataFrame/Dataset will
be written to the sink every time there are some updates.
If the query doesn't contain aggregations, it will be equivalent to
append mode.
checkpointLocation
- Checkpoint directory location. This will be used as a reference
from where to resume the streaming job.

public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout)
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// insert feature data
String queryName = "electricity_prices_streaming_query";
String outputMode = "append";
fg.insertStream(featureData, queryName, outputMode, true, 1000L);
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Features in Streaming Dataframe to be saved.
queryName
- Specify a name for the query to make it easier to recognise in the Spark UI.
outputMode
- Specifies how data of a streaming DataFrame/Dataset is
written to a streaming sink. (1) `"append"`: Only the new rows in the
streaming DataFrame/Dataset will be written to the sink. (2)
`"complete"`: All the rows in the streaming DataFrame/Dataset will be
written to the sink every time there is some update. (3) `"update"`:
only the rows that were updated in the streaming DataFrame/Dataset will
be written to the sink every time there are some updates.
If the query doesn't contain aggregations, it will be equivalent to
append mode.
awaitTermination
- Waits for the termination of this query, either by
query.stop() or by an exception. If the query has terminated with an
exception, then the exception will be thrown. If timeout is set, it
returns whether the query has terminated or not within the timeout
seconds.
timeout
- Only relevant in combination with `awaitTermination=true`.

public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation)
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// insert feature data
String queryName = "electricity_prices_streaming_query";
String outputMode = "append";
String checkpointLocation = "path_to_checkpoint_dir";
fg.insertStream(featureData, queryName, outputMode, true, 1000L, checkpointLocation);
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Features in Streaming Dataframe to be saved.
queryName
- Specify a name for the query to make it easier to recognise in the Spark UI.
outputMode
- Specifies how data of a streaming DataFrame/Dataset is
written to a streaming sink. (1) `"append"`: Only the new rows in the
streaming DataFrame/Dataset will be written to the sink. (2)
`"complete"`: All the rows in the streaming DataFrame/Dataset will be
written to the sink every time there is some update. (3) `"update"`:
only the rows that were updated in the streaming DataFrame/Dataset will
be written to the sink every time there are some updates.
If the query doesn't contain aggregations, it will be equivalent to
append mode.
awaitTermination
- Waits for the termination of this query, either by
query.stop() or by an exception. If the query has terminated with an
exception, then the exception will be thrown. If timeout is set, it
returns whether the query has terminated or not within the timeout
seconds.
timeout
- Only relevant in combination with `awaitTermination=true`.
checkpointLocation
- Checkpoint directory location. This will be used as a reference
from where to resume the streaming job.

public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation, Map<String,String> writeOptions)
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// define additional write options
Map<String, String> writeOptions = = new HashMap<String, String>() {{
put("hoodie.bulkinsert.shuffle.parallelism", "5");
put("hoodie.insert.shuffle.parallelism", "5");
put("hoodie.upsert.shuffle.parallelism", "5");}
};
// insert feature data
String queryName = "electricity_prices_streaming_query";
String outputMode = "append";
String checkpointLocation = "path_to_checkpoint_dir";
fg.insertStream(featureData, queryName, outputMode, true, 1000L, checkpointLocation,
writeOptions);
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Features in Streaming Dataframe to be saved.
queryName
- Specify a name for the query to make it easier to recognise in the Spark UI.
outputMode
- Specifies how data of a streaming DataFrame/Dataset is
written to a streaming sink. (1) `"append"`: Only the new rows in the
streaming DataFrame/Dataset will be written to the sink. (2)
`"complete"`: All the rows in the streaming DataFrame/Dataset will be
written to the sink every time there is some update. (3) `"update"`:
only the rows that were updated in the streaming DataFrame/Dataset will
be written to the sink every time there are some updates.
If the query doesn't contain aggregations, it will be equivalent to
append mode.
awaitTermination
- Waits for the termination of this query, either by
query.stop() or by an exception. If the query has terminated with an
exception, then the exception will be thrown. If timeout is set, it
returns whether the query has terminated or not within the timeout
seconds.
timeout
- Only relevant in combination with `awaitTermination=true`.
checkpointLocation
- Checkpoint directory location. This will be used as a reference
from where to resume the streaming job.
writeOptions
- Additional write options as key-value pairs.

public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, boolean awaitTermination, String checkpointLocation)
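A minimal usage sketch, assuming the `fg` handle and streaming `featureData` from the examples above:
String queryName = "electricity_prices_streaming_query";
String outputMode = "append";
String checkpointLocation = "path_to_checkpoint_dir";
// block until the streaming query terminates, resuming from the given checkpoint
fg.insertStream(featureData, queryName, outputMode, true, checkpointLocation);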
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
public org.apache.spark.sql.streaming.StreamingQuery insertStream(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation, Map<String,String> writeOptions, JobConfiguration jobConfiguration)
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// define additional write options
Map<String, String> writeOptions = = new HashMap<String, String>() {{
put("hoodie.bulkinsert.shuffle.parallelism", "5");
put("hoodie.insert.shuffle.parallelism", "5");
put("hoodie.upsert.shuffle.parallelism", "5");}
};
// Define job configuration.
JobConfiguration jobConfiguration = new JobConfiguration();
jobConfiguration.setDynamicAllocationEnabled(true);
jobConfiguration.setAmMemory(2048);
String queryName = "electricity_prices_streaming_query";
String outputMode = "append";
String checkpointLocation = "path_to_checkpoint_dir";
// insert feature data
fg.insertStream(featureData, queryName, outputMode, true, 1000L, checkpointLocation,
writeOptions, jobConfiguration);
insertStream
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Features in Streaming Dataframe to be saved.
queryName
- Specify a name for the query to make it easier to recognise in the Spark UI.
outputMode
- Specifies how data of a streaming DataFrame/Dataset is
written to a streaming sink. (1) `"append"`: Only the new rows in the
streaming DataFrame/Dataset will be written to the sink. (2)
`"complete"`: All the rows in the streaming DataFrame/Dataset will be
written to the sink every time there is some update. (3) `"update"`:
only the rows that were updated in the streaming DataFrame/Dataset will
be written to the sink every time there are some updates.
If the query doesn't contain aggregations, it will be equivalent to
append mode.
awaitTermination
- Waits for the termination of this query, either by
query.stop() or by an exception. If the query has terminated with an
exception, then the exception will be thrown. If timeout is set, it
returns whether the query has terminated or not within the timeout
seconds.
timeout
- Only relevant in combination with `awaitTermination=true`.
checkpointLocation
- Checkpoint directory location. This will be used as a reference
from where to resume the streaming job.
writeOptions
- Additional write options as key-value pairs.
jobConfiguration
- Configure the Hopsworks Job used to write data into the stream feature group.

public Query selectFeatures(List<Feature> features)
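A minimal usage sketch, assuming the `fg` handle from the examples above (the feature names are illustrative):
// select a subset of features using Feature metadata objects
Query query = fg.selectFeatures(Arrays.asList(fg.getFeature("price"), fg.getFeature("hour")));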
selectFeatures
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
features
- List of Feature metadata objects.

public Query select(List<String> features)
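A minimal usage sketch (the feature names are illustrative):
// select a subset of features by name
Query query = fg.select(Arrays.asList("price", "hour"));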
select
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
features
- List of Feature names.

public Query selectAll()
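For example, to build a query over every feature of the group:
Query query = fg.selectAll();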
selectAll
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
public Query selectExceptFeatures(List<Feature> features)
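A minimal usage sketch (the excluded feature is illustrative):
// select all features except the given Feature metadata objects
Query query = fg.selectExceptFeatures(Arrays.asList(fg.getFeature("raw_payload")));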
selectExceptFeatures
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
features
- List of Feature metadata objects.

public Query selectExcept(List<String> features)
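A minimal usage sketch (the excluded feature name is illustrative):
// select all features except the given names
Query query = fg.selectExcept(Arrays.asList("raw_payload"));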
selectExcept
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
features
- List of Feature names.

public void commitDeleteRecord(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// drop records of feature data and commit
fg.commitDeleteRecord(featureData);
commitDeleteRecord
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame or RDD. Feature data to be deleted.
FeatureStoreException
- If Client is not connected to Hopsworks and/or no commit information was found for
this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse HUDI commit date string to date type.

public void commitDeleteRecord(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> featureData, Map<String,String> writeOptions) throws FeatureStoreException, IOException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// define additional write options
Map<String, String> writeOptions = = new HashMap<String, String>() {{
put("hoodie.bulkinsert.shuffle.parallelism", "5");
put("hoodie.insert.shuffle.parallelism", "5");
put("hoodie.upsert.shuffle.parallelism", "5");}
};
// drop records of feature data and commit
fg.commitDeleteRecord(featureData, writeOptions);
commitDeleteRecord
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
featureData
- Spark DataFrame or RDD. Feature data to be deleted.
writeOptions
- Additional write options as key-value pairs.
FeatureStoreException
- If Client is not connected to Hopsworks and/or no commit information was found for
this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse HUDI commit date string to date type.

public Map<Long,Map<String,String>> commitDetails() throws IOException, FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// get commit timeline
fg.commitDetails();
commitDetails
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
- If Client is not connected to Hopsworks and/or no commit information was found for
this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse HUDI commit date string to date type.

public Map<Long,Map<String,String>> commitDetails(Integer limit) throws IOException, FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// get latest 10 commit details
fg.commitDetails(10);
commitDetails
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
limit
- Number of commits to return.
FeatureStoreException
- If Client is not connected to Hopsworks and/or no commit information was found for
this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse HUDI commit date string to date type.

public Map<Long,Map<String,String>> commitDetails(String wallclockTime) throws IOException, FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// get commit details as of 20230206
fg.commitDetails("20230206");
commitDetails
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
wallclockTime
- Datetime string. The String should be formatted in one of the
following formats: `yyyyMMdd`, `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`.
FeatureStoreException
- If Client is not connected to Hopsworks and/or no commit information was found for
this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse HUDI commit date string to date type.

public Map<Long,Map<String,String>> commitDetails(String wallclockTime, Integer limit) throws IOException, FeatureStoreException, ParseException
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("electricity_prices", 1);
// get top 10 commit details as of 20230206
fg.commitDetails("20230206", 10);
commitDetails
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
wallclockTime
- Datetime string. The String should be formatted in one of the
following formats: `yyyyMMdd`, `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`.
limit
- Number of commits to return.
FeatureStoreException
- If Client is not connected to Hopsworks and/or no commit information was found for
this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse HUDI commit date string to date type.

public void updateFeatures(List<Feature> features) throws FeatureStoreException, IOException, ParseException
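A minimal sketch, assuming the `fg` handle from the examples above and a `setDescription` setter on Feature (the feature name and description are illustrative):
// update the description of an existing feature
Feature feature = fg.getFeature("price");
feature.setDescription("Hourly electricity price");
fg.updateFeatures(Collections.singletonList(feature));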
updateFeatures
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
features
- List of Feature metadata objects.
FeatureStoreException
- If Client is not connected to Hopsworks, unable to identify date format and/or
no commit information was found for this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse date string to date type.

public void updateFeatures(Feature feature) throws FeatureStoreException, IOException, ParseException
updateFeatures
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
feature
- Feature metadata object.
FeatureStoreException
- If Client is not connected to Hopsworks, unable to identify date format and/or
no commit information was found for this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse date string to date type.

public void appendFeatures(List<Feature> features) throws FeatureStoreException, IOException, ParseException
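A minimal sketch, assuming a `Feature(name, type)` constructor (the feature name and type are illustrative):
// append a new feature to the schema of the stream feature group
Feature newFeature = new Feature("day_ahead_price", "double");
fg.appendFeatures(Collections.singletonList(newFeature));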
appendFeatures
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
features
- List of Feature metadata objects.
FeatureStoreException
- If Client is not connected to Hopsworks, unable to identify date format and/or
no commit information was found for this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse date string to date type.

public void appendFeatures(Feature features) throws FeatureStoreException, IOException, ParseException
appendFeatures
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
features
- Feature metadata object.
FeatureStoreException
- If Client is not connected to Hopsworks, unable to identify date format and/or
no commit information was found for this feature group.
IOException
- Generic IO exception.
ParseException
- In case it's unable to parse date string to date type.

public Statistics computeStatistics() throws FeatureStoreException, IOException
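For example, to recompute and persist statistics for the group (assuming the `fg` handle from the examples above):
Statistics statistics = fg.computeStatistics();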
computeStatistics
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
- If Client is not connected to Hopsworks.
IOException
- Generic IO exception.

public Statistics computeStatistics(String wallclockTime) throws FeatureStoreException, IOException, ParseException
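A minimal sketch (the Hudi commit timestamp is illustrative):
// recompute statistics as of a past point in time
Statistics statistics = fg.computeStatistics("20230205210923");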
computeStatistics
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
wallclockTime
- Datetime string. The String should be formatted in one of the
following formats: `yyyyMMdd`, `yyyyMMddHH`, `yyyyMMddHHmm`, or `yyyyMMddHHmmss`.
FeatureStoreException
IOException
ParseException
public Statistics getStatistics() throws FeatureStoreException, IOException
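For example, to fetch the most recent statistics commit (assuming the `fg` handle from the examples above):
Statistics statistics = fg.getStatistics();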
getStatistics
in class FeatureGroupBase<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>>
FeatureStoreException
- FeatureStoreException
IOException
- IOException