public class SparkEngine extends EngineBase
Fields inherited from class EngineBase: LOGGER, storageConnectorApi

| Modifier and Type | Method and Description |
|---|---|
| String | addFile(String filePath) | 
| org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | castColumnType(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset,
              List<TrainingDatasetFeature> features) | 
| void | closeSparkSession() | 
| String | constructCheckpointPath(FeatureGroupBase featureGroup,
                       String queryName,
                       String queryPrefix) | 
| org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | convertToDefaultDataframe(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset) | 
| <S> S | createEmptyDataFrame(S datasetGeneric) | 
| org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | encodeComplexFeatures(FeatureGroupBase featureGroupBase,
                     org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset)Encodes all complex type features to binary using their avro type as schema. | 
| String | getCertKey() | 
| static SparkEngine | getInstance() | 
| Map<String,String> | getKafkaConfig(FeatureGroupBase featureGroup,
              Map<String,String> writeOptions) | 
| String | getKeyStorePath() | 
| Map<String,String> | getReadOptions(Map<String,String> providedOptions,
              DataFormat dataFormat) | 
| String | getTrustStorePath() | 
| Map<String,String> | getWriteOptions(Map<String,String> providedOptions,
               DataFormat dataFormat) | 
| protected String | makeQueryName(String queryName,
             FeatureGroupBase featureGroup) | 
| org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | objectToDataset(Object obj) | 
| List<Feature> | parseFeatureGroupSchema(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset,
                       TimeTravelFormat timeTravelFormat) | 
| String | profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df) | 
| String | profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df,
       boolean correlation,
       boolean histogram) | 
| String | profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df,
       List<String> restrictToColumns) | 
| String | profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df,
       List<String> restrictToColumns,
       Boolean correlation,
       Boolean histogram) | 
| String | profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df,
       List<String> restrictToColumns,
       Boolean correlation,
       Boolean histogram,
       Boolean exactUniqueness) | 
| org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | read(StorageConnector storageConnector,
    String dataFormat,
    Map<String,String> readOptions,
    String location) | 
| org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | readStream(StorageConnector storageConnector,
          String dataFormat,
          String messageFormat,
          String schema,
          Map<String,String> options,
          boolean includeMetadata) | 
| void | registerHudiTemporaryTable(FeatureGroupAlias featureGroupAlias,
                          Map<String,String> readOptions) | 
| org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | registerOnDemandTemporaryTable(ExternalFeatureGroup onDemandFeatureGroup,
                              String alias) | 
| org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | sanitizeFeatureNames(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset) | 
| static void | setInstance(SparkEngine sparkEngine) | 
| void | setupConnectorHadoopConf(StorageConnector storageConnector) | 
| static String | sparkPath(String path) | 
| org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>[] | splitDataset(TrainingDataset trainingDataset,
            Query query,
            Map<String,String> readOptions) | 
| static List<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>> | splitLabels(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset,
           List<String> labels) | 
| org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | sql(String query) | 
| void | streamToHudiTable(StreamFeatureGroup streamFeatureGroup,
                 Map<String,String> writeOptions) | 
| void | validateSparkConfiguration() | 
| org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>[] | write(TrainingDataset trainingDataset,
     Query query,
     Map<String,String> queryReadOptions,
     Map<String,String> writeOptions,
     org.apache.spark.sql.SaveMode saveMode)Setup Spark to write the data on the File System. | 
| void | writeEmptyDataframe(FeatureGroupBase featureGroup) | 
| void | writeOfflineDataframe(FeatureGroupBase featureGroup,
                     org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset,
                     HudiOperationType operation,
                     Map<String,String> writeOptions,
                     Integer validationId) | 
| void | writeOnlineDataframe(FeatureGroupBase featureGroupBase,
                    org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset,
                    String onlineTopicName,
                    Map<String,String> writeOptions)Writes feature group dataframe to kafka for online-fs ingestion. | 
| <S> org.apache.spark.sql.streaming.StreamingQuery | writeStreamDataframe(FeatureGroupBase featureGroupBase,
                    org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset,
                    String queryName,
                    String outputMode,
                    boolean awaitTermination,
                    Long timeout,
                    String checkpointLocation,
                    Map<String,String> writeOptions) | 
public static SparkEngine getInstance()
public static void setInstance(SparkEngine sparkEngine)
public void validateSparkConfiguration()
                                throws FeatureStoreException
Throws: FeatureStoreException

public String getTrustStorePath()
public String getKeyStorePath()
public String getCertKey()
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> sql(String query)
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> registerOnDemandTemporaryTable(ExternalFeatureGroup onDemandFeatureGroup, String alias) throws FeatureStoreException, IOException
Throws: FeatureStoreException, IOException

public static List<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>> splitLabels(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset, List<String> labels)
public void registerHudiTemporaryTable(FeatureGroupAlias featureGroupAlias, Map<String,String> readOptions) throws FeatureStoreException
Throws: FeatureStoreException

public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>[] write(TrainingDataset trainingDataset, Query query, Map<String,String> queryReadOptions, Map<String,String> writeOptions, org.apache.spark.sql.SaveMode saveMode) throws FeatureStoreException, IOException
Parameters:
trainingDataset - Training Dataset metadata object
query - Query object
queryReadOptions - Additional read options as key-value pairs, defaults to empty Map
writeOptions - Additional write options as key-value pairs, defaults to empty Map
saveMode - org.apache.spark.sql.SaveMode: Append, Overwrite, ErrorIfExists, Ignore
Throws:
FeatureStoreException - If Client is not connected to Hopsworks
IOException - Generic IO exception.

public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>[] splitDataset(TrainingDataset trainingDataset, Query query, Map<String,String> readOptions) throws FeatureStoreException, IOException
Throws: FeatureStoreException, IOException

public Map<String,String> getWriteOptions(Map<String,String> providedOptions, DataFormat dataFormat)
public Map<String,String> getReadOptions(Map<String,String> providedOptions, DataFormat dataFormat)
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> read(StorageConnector storageConnector, String dataFormat, Map<String,String> readOptions, String location) throws FeatureStoreException, IOException
Throws: FeatureStoreException, IOException

public void writeOnlineDataframe(FeatureGroupBase featureGroupBase, org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset, String onlineTopicName, Map<String,String> writeOptions) throws FeatureStoreException, IOException
Parameters:
featureGroupBase -
dataset -
writeOptions -
Throws: FeatureStoreException, IOException

public <S> org.apache.spark.sql.streaming.StreamingQuery writeStreamDataframe(FeatureGroupBase featureGroupBase, org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation, Map<String,String> writeOptions) throws FeatureStoreException, IOException, org.apache.spark.sql.streaming.StreamingQueryException, TimeoutException
Throws: FeatureStoreException, IOException, org.apache.spark.sql.streaming.StreamingQueryException, TimeoutException

public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> encodeComplexFeatures(FeatureGroupBase featureGroupBase, org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset) throws FeatureStoreException, IOException
Parameters:
featureGroupBase - FeatureGroupBase Feature Group hsfs metadata object
dataset - Spark DataFrame or RDD.
Throws:
FeatureStoreException - If Client is not connected to Hopsworks
IOException - Generic IO exception.

public void writeEmptyDataframe(FeatureGroupBase featureGroup) throws IOException, FeatureStoreException, ParseException
public void writeOfflineDataframe(FeatureGroupBase featureGroup, org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset, HudiOperationType operation, Map<String,String> writeOptions, Integer validationId) throws IOException, FeatureStoreException, ParseException
public String profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df, List<String> restrictToColumns, Boolean correlation, Boolean histogram, Boolean exactUniqueness)
public String profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df, List<String> restrictToColumns, Boolean correlation, Boolean histogram)
public String profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df, List<String> restrictToColumns)
public String profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df, boolean correlation, boolean histogram)
public String profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df)
public void setupConnectorHadoopConf(StorageConnector storageConnector) throws IOException, FeatureStoreException
Throws: IOException, FeatureStoreException

public void streamToHudiTable(StreamFeatureGroup streamFeatureGroup, Map<String,String> writeOptions) throws Exception
Throws: Exception

public List<Feature> parseFeatureGroupSchema(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset, TimeTravelFormat timeTravelFormat) throws FeatureStoreException
Throws: FeatureStoreException

public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> sanitizeFeatureNames(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset)
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> convertToDefaultDataframe(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset)
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> castColumnType(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset,
                                                                             List<TrainingDatasetFeature> features)
                                                                      throws FeatureStoreException
Throws: FeatureStoreException

public String addFile(String filePath) throws FeatureStoreException
Overrides: addFile in class EngineBase
Throws: FeatureStoreException

public Map<String,String> getKafkaConfig(FeatureGroupBase featureGroup, Map<String,String> writeOptions) throws FeatureStoreException, IOException
Overrides: getKafkaConfig in class EngineBase
Throws: FeatureStoreException, IOException

public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> readStream(StorageConnector storageConnector, String dataFormat, String messageFormat, String schema, Map<String,String> options, boolean includeMetadata) throws FeatureStoreException, IOException
Throws: FeatureStoreException, IOException

public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> objectToDataset(Object obj)
public <S> S createEmptyDataFrame(S datasetGeneric)
public String constructCheckpointPath(FeatureGroupBase featureGroup, String queryName, String queryPrefix) throws FeatureStoreException, IOException
Throws: FeatureStoreException, IOException

protected String makeQueryName(String queryName, FeatureGroupBase featureGroup)
public void closeSparkSession()
Copyright © 2025. All rights reserved.