public class StreamFeatureGroup extends FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Modifier and Type | Field and Description |
---|---|
protected FeatureGroupEngine |
featureGroupEngine |
created, creator, deltaStreamerJobConf, description, eventTime, expectationsNames, featureGroupEngineBase, features, featureStore, hudiPrecombineKey, id, location, LOGGER, name, onlineEnabled, onlineTopicName, partitionKeys, primaryKeys, statisticColumns, statisticsConfig, subject, timeTravelFormat, type, utils, version
Constructor and Description |
---|
StreamFeatureGroup() |
StreamFeatureGroup(FeatureStore featureStore,
int id) |
StreamFeatureGroup(FeatureStore featureStore,
@NonNull String name,
Integer version,
String description,
List<String> primaryKeys,
List<String> partitionKeys,
String hudiPrecombineKey,
boolean onlineEnabled,
List<Feature> features,
StatisticsConfig statisticsConfig,
String onlineTopicName,
String eventTime) |
StreamFeatureGroup(Integer id,
String description,
List<Feature> features) |
Modifier and Type | Method and Description |
---|---|
void |
appendFeatures(Feature feature)
Append a single feature to the schema of the feature group.
|
void |
appendFeatures(List<Feature> feature)
Append features to the schema of the feature group.
|
QueryBase |
asOf(String wallclockTime) |
QueryBase |
asOf(String wallclockTime,
String excludeUntil) |
void |
commitDeleteRecord(org.apache.flink.streaming.api.datastream.DataStream<?> featureData) |
void |
commitDeleteRecord(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
Map<String,String> writeOptions) |
Map<Long,Map<String,String>> |
commitDetails() |
Map<Long,Map<String,String>> |
commitDetails(Integer integer) |
Map<Long,Map<String,String>> |
commitDetails(String limit) |
Map<Long,Map<String,String>> |
commitDetails(String wallclockTime,
Integer limit) |
Statistics |
computeStatistics()
Recompute the statistics for the feature group and save them to the feature store.
|
Statistics |
computeStatistics(String wallclockTime) |
Statistics |
getStatistics()
Get the last statistics commit for the feature group.
|
void |
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData) |
void |
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
boolean overwrite) |
void |
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
boolean online,
Map<String,String> writeOptions) |
void |
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
boolean online,
Map<String,String> writeOptions,
JobConfiguration jobConfiguration) |
void |
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
HudiOperationType hudiOperationType) |
void |
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
JobConfiguration jobConfiguration) |
void |
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
Map<String,String> writeOptions) |
void |
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
Storage storage) |
void |
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
Storage storage,
boolean overwrite) |
void |
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
Storage storage,
boolean online,
HudiOperationType hudiOperationType,
Map<String,String> writeOptions) |
org.apache.flink.streaming.api.datastream.DataStreamSink<?> |
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData)
Ingest feature data into the online feature store using the Flink DataStream API.
|
org.apache.flink.streaming.api.datastream.DataStreamSink<?> |
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
Map<String,String> writeOptions) |
Object |
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
String queryName) |
Object |
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
String queryName,
Map<String,String> writeOptions) |
Object |
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
String queryName,
String outputMode) |
Object |
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
String queryName,
String outputMode,
boolean awaitTermination,
Long timeout) |
Object |
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
String queryName,
String outputMode,
boolean awaitTermination,
Long timeout,
String checkpointLocation) |
Object |
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
String queryName,
String outputMode,
boolean awaitTermination,
Long timeout,
String checkpointLocation,
Map<String,String> writeOptions) |
Object |
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
String queryName,
String outputMode,
boolean awaitTermination,
Long timeout,
String checkpointLocation,
Map<String,String> writeOptions,
JobConfiguration jobConfiguration) |
Object |
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
String queryName,
String outputMode,
boolean awaitTermination,
String checkpointLocation) |
Object |
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
String queryName,
String outputMode,
String checkpointLocation) |
org.apache.flink.streaming.api.datastream.DataStream<?> |
read() |
org.apache.flink.streaming.api.datastream.DataStream<?> |
read(boolean online) |
org.apache.flink.streaming.api.datastream.DataStream<?> |
read(boolean online,
Map<String,String> readOptions) |
org.apache.flink.streaming.api.datastream.DataStream<?> |
read(Map<String,String> readOptions) |
org.apache.flink.streaming.api.datastream.DataStream<?> |
read(String wallclockTime) |
org.apache.flink.streaming.api.datastream.DataStream<?> |
read(String wallclockTime,
Map<String,String> readOptions) |
QueryBase |
select(List<String> features) |
QueryBase |
selectAll() |
QueryBase |
selectExcept(List<String> features) |
QueryBase |
selectExceptFeatures(List<Feature> features) |
QueryBase |
selectFeatures(List<Feature> features) |
void |
show(int numRows) |
void |
show(int numRows,
boolean online) |
void |
updateFeatures(Feature feature)
Update the metadata of a single feature.
|
void |
updateFeatures(List<Feature> feature)
Update the metadata of multiple features.
|
addTag, delete, deleteTag, getAvroSchema, getComplexFeatures, getDeserializedAvroSchema, getDeserializedEncodedAvroSchema, getEncodedAvroSchema, getFeature, getFeatureAvroSchema, getPrimaryKeys, getSubject, getTag, getTags, unloadSubject, updateDescription, updateFeatureDescription, updateStatisticsConfig
protected FeatureGroupEngine featureGroupEngine
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName, String eventTime)
public StreamFeatureGroup()
public StreamFeatureGroup(Integer id, String description, List<Feature> features)
public StreamFeatureGroup(FeatureStore featureStore, int id)
public org.apache.flink.streaming.api.datastream.DataStream<?> read() throws FeatureStoreException, IOException
read
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
public org.apache.flink.streaming.api.datastream.DataStream<?> read(boolean online) throws FeatureStoreException, IOException
read
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
public org.apache.flink.streaming.api.datastream.DataStream<?> read(Map<String,String> readOptions) throws FeatureStoreException, IOException
read
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
public org.apache.flink.streaming.api.datastream.DataStream<?> read(boolean online, Map<String,String> readOptions) throws FeatureStoreException, IOException
read
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
public org.apache.flink.streaming.api.datastream.DataStream<?> read(String wallclockTime) throws FeatureStoreException, IOException, ParseException
read
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
ParseException
public org.apache.flink.streaming.api.datastream.DataStream<?> read(String wallclockTime, Map<String,String> readOptions) throws FeatureStoreException, IOException, ParseException
read
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
ParseException
public QueryBase asOf(String wallclockTime) throws FeatureStoreException, ParseException
asOf
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
ParseException
public QueryBase asOf(String wallclockTime, String excludeUntil) throws FeatureStoreException, ParseException
asOf
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
ParseException
public void show(int numRows) throws FeatureStoreException, IOException
show
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
public void show(int numRows, boolean online) throws FeatureStoreException, IOException
show
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
public void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData) throws IOException, FeatureStoreException, ParseException
insert
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
IOException
FeatureStoreException
ParseException
public void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, Map<String,String> writeOptions) throws FeatureStoreException, IOException, ParseException
insert
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
ParseException
public void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, Storage storage) throws IOException, FeatureStoreException, ParseException
insert
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
IOException
FeatureStoreException
ParseException
public void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, boolean overwrite) throws IOException, FeatureStoreException, ParseException
insert
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
IOException
FeatureStoreException
ParseException
public void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, Storage storage, boolean overwrite) throws IOException, FeatureStoreException, ParseException
insert
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
IOException
FeatureStoreException
ParseException
public void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, boolean online, Map<String,String> writeOptions) throws FeatureStoreException, IOException, ParseException
insert
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
ParseException
public void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, HudiOperationType hudiOperationType) throws FeatureStoreException, IOException, ParseException
insert
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
ParseException
public void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, Storage storage, boolean online, HudiOperationType hudiOperationType, Map<String,String> writeOptions) throws FeatureStoreException, IOException, ParseException
insert
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
ParseException
public void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, JobConfiguration jobConfiguration) throws FeatureStoreException, IOException, ParseException
insert
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
ParseException
public void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, boolean online, Map<String,String> writeOptions, JobConfiguration jobConfiguration) throws FeatureStoreException, IOException, ParseException
insert
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
ParseException
public void commitDeleteRecord(org.apache.flink.streaming.api.datastream.DataStream<?> featureData) throws FeatureStoreException, IOException, ParseException
commitDeleteRecord
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
ParseException
public void commitDeleteRecord(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, Map<String,String> writeOptions) throws FeatureStoreException, IOException, ParseException
commitDeleteRecord
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
ParseException
public Map<Long,Map<String,String>> commitDetails() throws IOException, FeatureStoreException, ParseException
commitDetails
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
IOException
FeatureStoreException
ParseException
public Map<Long,Map<String,String>> commitDetails(Integer integer) throws IOException, FeatureStoreException, ParseException
commitDetails
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
IOException
FeatureStoreException
ParseException
public Map<Long,Map<String,String>> commitDetails(String limit) throws IOException, FeatureStoreException, ParseException
commitDetails
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
IOException
FeatureStoreException
ParseException
public Map<Long,Map<String,String>> commitDetails(String wallclockTime, Integer limit) throws IOException, FeatureStoreException, ParseException
commitDetails
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
IOException
FeatureStoreException
ParseException
public QueryBase selectFeatures(List<Feature> features)
selectFeatures
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
public QueryBase select(List<String> features)
select
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
public QueryBase selectAll()
selectAll
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
public QueryBase selectExceptFeatures(List<Feature> features)
selectExceptFeatures
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
public QueryBase selectExcept(List<String> features)
selectExcept
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
public org.apache.flink.streaming.api.datastream.DataStreamSink<?> insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData) throws Exception
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("card_transactions", 1);
// read stream from the source and aggregate stream
DataStream<TransactionAgg> aggregationStream =
env.fromSource(transactionSource, customWatermark, "Transaction Kafka Source")
.keyBy(r -> r.getCcNum())
.window(SlidingEventTimeWindows.of(Time.minutes(windowLength), Time.minutes(1)))
.aggregate(new TransactionCountAggregate());
// insert streaming feature data
fg.insertStream(aggregationStream);
insertStream
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
featureData
- Features in Streaming Dataframe to be saved.
Exception
public org.apache.flink.streaming.api.datastream.DataStreamSink<?> insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, Map<String,String> writeOptions) throws Exception
insertStream
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Exception
public Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName) throws Exception
insertStream
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Exception
public Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName, Map<String,String> writeOptions) throws Exception
insertStream
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Exception
public Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName, String outputMode) throws Exception
insertStream
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Exception
public Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName, String outputMode, String checkpointLocation) throws Exception
insertStream
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Exception
public Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout) throws Exception
insertStream
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Exception
public Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation) throws Exception
insertStream
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Exception
public Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation, Map<String,String> writeOptions) throws Exception
insertStream
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Exception
public Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName, String outputMode, boolean awaitTermination, String checkpointLocation) throws Exception
insertStream
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Exception
public Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation, Map<String,String> writeOptions, JobConfiguration jobConfiguration) throws Exception
insertStream
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Exception
public void updateFeatures(Feature feature) throws FeatureStoreException, IOException, ParseException
FeatureGroupBase
updateFeatures
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
feature
- Feature metadata object
FeatureStoreException
- FeatureStoreException
IOException
- IOException
ParseException
- ParseException
public void updateFeatures(List<Feature> feature) throws FeatureStoreException, IOException, ParseException
FeatureGroupBase
updateFeatures
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
feature
- List of Feature metadata objects
FeatureStoreException
- FeatureStoreException
IOException
- IOException
ParseException
- ParseException
public void appendFeatures(List<Feature> feature) throws FeatureStoreException, IOException, ParseException
FeatureGroupBase
appendFeatures
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
feature
- List of Feature metadata objects
FeatureStoreException
- FeatureStoreException
IOException
- IOException
ParseException
- ParseException
public void appendFeatures(Feature feature) throws FeatureStoreException, IOException, ParseException
FeatureGroupBase
appendFeatures
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
feature
- Feature metadata object
FeatureStoreException
- FeatureStoreException
IOException
- IOException
ParseException
- ParseException
public Statistics computeStatistics() throws FeatureStoreException, IOException, ParseException
FeatureGroupBase
computeStatistics
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
- FeatureStoreException
IOException
- IOException
ParseException
public Statistics computeStatistics(String wallclockTime) throws FeatureStoreException, IOException, ParseException
computeStatistics
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
IOException
ParseException
public Statistics getStatistics() throws FeatureStoreException, IOException
FeatureGroupBase
getStatistics
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
FeatureStoreException
- FeatureStoreException
IOException
- IOException

Copyright © 2023. All rights reserved.