public class StreamFeatureGroup extends FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
| Modifier and Type | Field and Description | 
|---|---|
protected FeatureGroupEngine | 
featureGroupEngine  | 
created, creator, deltaStreamerJobConf, description, eventTime, expectationsNames, featureGroupEngineBase, features, featureStore, hudiPrecombineKey, id, location, LOGGER, name, onlineEnabled, onlineTopicName, partitionKeys, primaryKeys, statisticColumns, statisticsConfig, subject, timeTravelFormat, type, utils, version
| Constructor and Description | 
|---|
StreamFeatureGroup()  | 
StreamFeatureGroup(FeatureStore featureStore,
                  int id)  | 
StreamFeatureGroup(FeatureStore featureStore,
                  @NonNull String name,
                  Integer version,
                  String description,
                  List<String> primaryKeys,
                  List<String> partitionKeys,
                  String hudiPrecombineKey,
                  boolean onlineEnabled,
                  List<Feature> features,
                  StatisticsConfig statisticsConfig,
                  String onlineTopicName,
                  String eventTime)  | 
StreamFeatureGroup(Integer id,
                  String description,
                  List<Feature> features)  | 
| Modifier and Type | Method and Description | 
|---|---|
void | 
appendFeatures(Feature feature)
Append a single feature to the schema of the feature group. 
 | 
void | 
appendFeatures(List<Feature> feature)
Append features to the schema of the feature group. 
 | 
QueryBase | 
asOf(String wallclockTime)  | 
QueryBase | 
asOf(String wallclockTime,
    String excludeUntil)  | 
void | 
commitDeleteRecord(org.apache.flink.streaming.api.datastream.DataStream<?> featureData)  | 
void | 
commitDeleteRecord(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
                  Map<String,String> writeOptions)  | 
Map<Long,Map<String,String>> | 
commitDetails()  | 
Map<Long,Map<String,String>> | 
commitDetails(Integer integer)  | 
Map<Long,Map<String,String>> | 
commitDetails(String limit)  | 
Map<Long,Map<String,String>> | 
commitDetails(String wallclockTime,
             Integer limit)  | 
Statistics | 
computeStatistics()
Recompute the statistics for the feature group and save them to the feature store. 
 | 
Statistics | 
computeStatistics(String wallclockTime)  | 
Statistics | 
getStatistics()
Get the last statistics commit for the feature group. 
 | 
void | 
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData)  | 
void | 
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
      boolean overwrite)  | 
void | 
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
      boolean online,
      Map<String,String> writeOptions)  | 
void | 
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
      boolean online,
      Map<String,String> writeOptions,
      JobConfiguration jobConfiguration)  | 
void | 
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
      HudiOperationType hudiOperationType)  | 
void | 
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
      JobConfiguration jobConfiguration)  | 
void | 
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
      Map<String,String> writeOptions)  | 
void | 
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
      Storage storage)  | 
void | 
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
      Storage storage,
      boolean overwrite)  | 
void | 
insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
      Storage storage,
      boolean online,
      HudiOperationType hudiOperationType,
      Map<String,String> writeOptions)  | 
org.apache.flink.streaming.api.datastream.DataStreamSink<?> | 
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData)
Ingest feature data into the online feature store using the Flink DataStream API. 
 | 
org.apache.flink.streaming.api.datastream.DataStreamSink<?> | 
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
            Map<String,String> writeOptions)  | 
Object | 
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
            String queryName)  | 
Object | 
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
            String queryName,
            Map<String,String> writeOptions)  | 
Object | 
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
            String queryName,
            String outputMode)  | 
Object | 
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
            String queryName,
            String outputMode,
            boolean awaitTermination,
            Long timeout)  | 
Object | 
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
            String queryName,
            String outputMode,
            boolean awaitTermination,
            Long timeout,
            String checkpointLocation)  | 
Object | 
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
            String queryName,
            String outputMode,
            boolean awaitTermination,
            Long timeout,
            String checkpointLocation,
            Map<String,String> writeOptions)  | 
Object | 
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
            String queryName,
            String outputMode,
            boolean awaitTermination,
            Long timeout,
            String checkpointLocation,
            Map<String,String> writeOptions,
            JobConfiguration jobConfiguration)  | 
Object | 
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
            String queryName,
            String outputMode,
            boolean awaitTermination,
            String checkpointLocation)  | 
Object | 
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
            String queryName,
            String outputMode,
            String checkpointLocation)  | 
org.apache.flink.streaming.api.datastream.DataStream<?> | 
read()  | 
org.apache.flink.streaming.api.datastream.DataStream<?> | 
read(boolean online)  | 
org.apache.flink.streaming.api.datastream.DataStream<?> | 
read(boolean online,
    Map<String,String> readOptions)  | 
org.apache.flink.streaming.api.datastream.DataStream<?> | 
read(Map<String,String> readOptions)  | 
org.apache.flink.streaming.api.datastream.DataStream<?> | 
read(String wallclockTime)  | 
org.apache.flink.streaming.api.datastream.DataStream<?> | 
read(String wallclockTime,
    Map<String,String> readOptions)  | 
QueryBase | 
select(List<String> features)  | 
QueryBase | 
selectAll()  | 
QueryBase | 
selectExcept(List<String> features)  | 
QueryBase | 
selectExceptFeatures(List<Feature> features)  | 
QueryBase | 
selectFeatures(List<Feature> features)  | 
void | 
show(int numRows)  | 
void | 
show(int numRows,
    boolean online)  | 
void | 
updateFeatures(Feature feature)
Update the metadata of a single feature. 
 | 
void | 
updateFeatures(List<Feature> feature)
Update the metadata of multiple features. 
 | 
addTag, delete, deleteTag, getAvroSchema, getComplexFeatures, getDeserializedAvroSchema, getDeserializedEncodedAvroSchema, getEncodedAvroSchema, getFeature, getFeatureAvroSchema, getPrimaryKeys, getSubject, getTag, getTags, unloadSubject, updateDescription, updateFeatureDescription, updateStatisticsConfig
protected FeatureGroupEngine featureGroupEngine
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName, String eventTime)
public StreamFeatureGroup()
public StreamFeatureGroup(Integer id, String description, List<Feature> features)
public StreamFeatureGroup(FeatureStore featureStore, int id)
public org.apache.flink.streaming.api.datastream.DataStream<?> read()
                                                             throws FeatureStoreException,
                                                                    IOException
read in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionpublic org.apache.flink.streaming.api.datastream.DataStream<?> read(boolean online)
                                                             throws FeatureStoreException,
                                                                    IOException
read in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionpublic org.apache.flink.streaming.api.datastream.DataStream<?> read(Map<String,String> readOptions) throws FeatureStoreException, IOException
read in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionpublic org.apache.flink.streaming.api.datastream.DataStream<?> read(boolean online,
                                                                    Map<String,String> readOptions)
                                                             throws FeatureStoreException,
                                                                    IOException
read in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionpublic org.apache.flink.streaming.api.datastream.DataStream<?> read(String wallclockTime) throws FeatureStoreException, IOException, ParseException
read in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionParseExceptionpublic org.apache.flink.streaming.api.datastream.DataStream<?> read(String wallclockTime, Map<String,String> readOptions) throws FeatureStoreException, IOException, ParseException
read in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionParseExceptionpublic QueryBase asOf(String wallclockTime) throws FeatureStoreException, ParseException
asOf in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionParseExceptionpublic QueryBase asOf(String wallclockTime, String excludeUntil) throws FeatureStoreException, ParseException
asOf in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionParseExceptionpublic void show(int numRows)
          throws FeatureStoreException,
                 IOException
show in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionpublic void show(int numRows,
                 boolean online)
          throws FeatureStoreException,
                 IOException
show in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionpublic void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData)
            throws IOException,
                   FeatureStoreException,
                   ParseException
insert in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>IOExceptionFeatureStoreExceptionParseExceptionpublic void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
                   Map<String,String> writeOptions)
            throws FeatureStoreException,
                   IOException,
                   ParseException
insert in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionParseExceptionpublic void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
                   Storage storage)
            throws IOException,
                   FeatureStoreException,
                   ParseException
insert in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>IOExceptionFeatureStoreExceptionParseExceptionpublic void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
                   boolean overwrite)
            throws IOException,
                   FeatureStoreException,
                   ParseException
insert in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>IOExceptionFeatureStoreExceptionParseExceptionpublic void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
                   Storage storage,
                   boolean overwrite)
            throws IOException,
                   FeatureStoreException,
                   ParseException
insert in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>IOExceptionFeatureStoreExceptionParseExceptionpublic void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
                   boolean online,
                   Map<String,String> writeOptions)
            throws FeatureStoreException,
                   IOException,
                   ParseException
insert in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionParseExceptionpublic void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
                   HudiOperationType hudiOperationType)
            throws FeatureStoreException,
                   IOException,
                   ParseException
insert in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionParseExceptionpublic void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
                   Storage storage,
                   boolean online,
                   HudiOperationType hudiOperationType,
                   Map<String,String> writeOptions)
            throws FeatureStoreException,
                   IOException,
                   ParseException
insert in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionParseExceptionpublic void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
                   JobConfiguration jobConfiguration)
            throws FeatureStoreException,
                   IOException,
                   ParseException
insert in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionParseExceptionpublic void insert(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
                   boolean online,
                   Map<String,String> writeOptions,
                   JobConfiguration jobConfiguration)
            throws FeatureStoreException,
                   IOException,
                   ParseException
insert in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionParseExceptionpublic void commitDeleteRecord(org.apache.flink.streaming.api.datastream.DataStream<?> featureData)
                        throws FeatureStoreException,
                               IOException,
                               ParseException
commitDeleteRecord in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionParseExceptionpublic void commitDeleteRecord(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
                               Map<String,String> writeOptions)
                        throws FeatureStoreException,
                               IOException,
                               ParseException
commitDeleteRecord in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionParseExceptionpublic Map<Long,Map<String,String>> commitDetails() throws IOException, FeatureStoreException, ParseException
commitDetails in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>IOExceptionFeatureStoreExceptionParseExceptionpublic Map<Long,Map<String,String>> commitDetails(Integer integer) throws IOException, FeatureStoreException, ParseException
commitDetails in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>IOExceptionFeatureStoreExceptionParseExceptionpublic Map<Long,Map<String,String>> commitDetails(String limit) throws IOException, FeatureStoreException, ParseException
commitDetails in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>IOExceptionFeatureStoreExceptionParseExceptionpublic Map<Long,Map<String,String>> commitDetails(String wallclockTime, Integer limit) throws IOException, FeatureStoreException, ParseException
commitDetails in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>IOExceptionFeatureStoreExceptionParseExceptionpublic QueryBase selectFeatures(List<Feature> features)
selectFeatures in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>public QueryBase select(List<String> features)
select in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>public QueryBase selectAll()
selectAll in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>public QueryBase selectExceptFeatures(List<Feature> features)
selectExceptFeatures in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>public QueryBase selectExcept(List<String> features)
selectExcept in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>public org.apache.flink.streaming.api.datastream.DataStreamSink<?> insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData)
                                                                         throws Exception
 
        // get feature store handle
        FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
        // get feature group handle
        StreamFeatureGroup fg = fs.getStreamFeatureGroup("card_transactions", 1);
        // read stream from the source and aggregate stream
        DataStream<TransactionAgg> aggregationStream =
          env.fromSource(transactionSource, customWatermark, "Transaction Kafka Source")
          .keyBy(r -> r.getCcNum())
          .window(SlidingEventTimeWindows.of(Time.minutes(windowLength), Time.minutes(1)))
          .aggregate(new TransactionCountAggregate());
        // insert streaming feature data
        fg.insertStream(aggregationStream);
 
insertStream in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Parameters:
featureData - Features in Streaming Dataframe to be saved.
Throws:
Exception
public org.apache.flink.streaming.api.datastream.DataStreamSink<?> insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
                                                                                Map<String,String> writeOptions)
                                                                         throws Exception
insertStream in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>Exceptionpublic Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName) throws Exception
insertStream in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>Exceptionpublic Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName, Map<String,String> writeOptions) throws Exception
insertStream in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>Exceptionpublic Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName, String outputMode) throws Exception
insertStream in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>Exceptionpublic Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName, String outputMode, String checkpointLocation) throws Exception
insertStream in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>Exceptionpublic Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout) throws Exception
insertStream in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>Exceptionpublic Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation) throws Exception
insertStream in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>Exceptionpublic Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation, Map<String,String> writeOptions) throws Exception
insertStream in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>Exceptionpublic Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName, String outputMode, boolean awaitTermination, String checkpointLocation) throws Exception
insertStream in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>Exceptionpublic Object insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation, Map<String,String> writeOptions, JobConfiguration jobConfiguration) throws Exception
insertStream in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>Exceptionpublic void updateFeatures(Feature feature) throws FeatureStoreException, IOException, ParseException
Description copied from class: FeatureGroupBase
updateFeatures in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Parameters:
feature - Feature metadata object
Throws:
FeatureStoreException - FeatureStoreException
IOException - IOException
ParseException - ParseException
public void updateFeatures(List<Feature> feature) throws FeatureStoreException, IOException, ParseException
Description copied from class: FeatureGroupBase
updateFeatures in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Parameters:
feature - List of Feature metadata objects
Throws:
FeatureStoreException - FeatureStoreException
IOException - IOException
ParseException - ParseException
public void appendFeatures(List<Feature> feature) throws FeatureStoreException, IOException, ParseException
Description copied from class: FeatureGroupBase
appendFeatures in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Parameters:
feature - List of Feature metadata objects
Throws:
FeatureStoreException - FeatureStoreException
IOException - IOException
ParseException - ParseException
public void appendFeatures(Feature feature) throws FeatureStoreException, IOException, ParseException
Description copied from class: FeatureGroupBase
appendFeatures in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Parameters:
feature - Feature metadata object
Throws:
FeatureStoreException - FeatureStoreException
IOException - IOException
ParseException - ParseException
public Statistics computeStatistics() throws FeatureStoreException, IOException, ParseException
Description copied from class: FeatureGroupBase
computeStatistics in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Throws:
FeatureStoreException - FeatureStoreException
IOException - IOException
ParseException
public Statistics computeStatistics(String wallclockTime) throws FeatureStoreException, IOException, ParseException
computeStatistics in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>FeatureStoreExceptionIOExceptionParseExceptionpublic Statistics getStatistics() throws FeatureStoreException, IOException
Description copied from class: FeatureGroupBase
getStatistics in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Throws:
FeatureStoreException - FeatureStoreException
IOException - IOException
Copyright © 2023. All rights reserved.