public class StreamFeatureGroup extends FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Modifier and Type | Field and Description |
---|---|
protected FeatureGroupEngine |
featureGroupEngine |
created, creator, deltaStreamerJobConf, deprecated, description, eventTime, expectationsNames, featureGroupEngineBase, features, featureStore, hudiPrecombineKey, id, location, LOGGER, name, notificationTopicName, onlineEnabled, onlineTopicName, partitionKeys, path, primaryKeys, statisticColumns, statisticsConfig, storageConnector, subject, timeTravelFormat, topicName, type, utils, version
Constructor and Description |
---|
StreamFeatureGroup() |
StreamFeatureGroup(FeatureStore featureStore,
int id) |
StreamFeatureGroup(FeatureStore featureStore,
@NonNull String name,
Integer version,
String description,
List<String> primaryKeys,
List<String> partitionKeys,
String hudiPrecombineKey,
boolean onlineEnabled,
List<Feature> features,
TimeTravelFormat timeTravelFormat,
StatisticsConfig statisticsConfig,
String onlineTopicName,
String topicName,
String notificationTopicName,
String eventTime,
StorageConnector storageConnector,
String path) |
StreamFeatureGroup(Integer id,
String description,
List<Feature> features) |
Modifier and Type | Method and Description |
---|---|
org.apache.flink.streaming.api.datastream.DataStreamSink<?> |
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData)
Ingests feature data into the online feature store using the Flink DataStream API.
|
org.apache.flink.streaming.api.datastream.DataStreamSink<?> |
insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData,
Map<String,String> writeOptions) |
void |
save()
Save the feature group metadata on Hopsworks.
|
void |
save(Map<String,String> writeOptions,
JobConfiguration materializationJobConfiguration)
Save the feature group metadata on Hopsworks.
|
addTag, checkDeprecated, delete, deleteTag, getAvroSchema, getComplexFeatures, getDeserializedAvroSchema, getDeserializedEncodedAvroSchema, getEncodedAvroSchema, getFeature, getFeatureAvroSchema, getPrimaryKeys, getSubject, getTag, getTags, setDeprecated, unloadSubject, updateDeprecated, updateDeprecated, updateDescription, updateFeatureDescription, updateNotificationTopicName, updateStatisticsConfig
protected FeatureGroupEngine featureGroupEngine
public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description, List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey, boolean onlineEnabled, List<Feature> features, TimeTravelFormat timeTravelFormat, StatisticsConfig statisticsConfig, String onlineTopicName, String topicName, String notificationTopicName, String eventTime, StorageConnector storageConnector, String path)
public StreamFeatureGroup()
public StreamFeatureGroup(Integer id, String description, List<Feature> features)
public StreamFeatureGroup(FeatureStore featureStore, int id)
public void save() throws FeatureStoreException, IOException
FeatureStoreException
IOException
public void save(Map<String,String> writeOptions, JobConfiguration materializationJobConfiguration) throws FeatureStoreException, IOException
writeOptions
- Options to provide to the materialization job
materializationJobConfiguration
- Resource configuration for the materialization job
FeatureStoreException
IOException
public org.apache.flink.streaming.api.datastream.DataStreamSink<?> insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData) throws Exception
// get feature store handle
FeatureStore fs = HopsworksConnection.builder().build().getFeatureStore();
// get feature group handle
StreamFeatureGroup fg = fs.getStreamFeatureGroup("card_transactions", 1);
// read stream from the source and aggregate stream
DataStream<TransactionAgg> aggregationStream =
env.fromSource(transactionSource, customWatermark, "Transaction Kafka Source")
.keyBy(r -> r.getCcNum())
.window(SlidingEventTimeWindows.of(Time.minutes(windowLength), Time.minutes(1)))
.aggregate(new TransactionCountAggregate());
// insert streaming feature data
fg.insertStream(aggregationStream);
insertStream
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
featureData
- Features in Streaming Dataframe to be saved.
Exception
public org.apache.flink.streaming.api.datastream.DataStreamSink<?> insertStream(org.apache.flink.streaming.api.datastream.DataStream<?> featureData, Map<String,String> writeOptions) throws Exception
insertStream
in class FeatureGroupBase<org.apache.flink.streaming.api.datastream.DataStream<?>>
Exception
Copyright © 2025. All rights reserved.