public class SparkEngine extends Object
Modifier and Type | Method and Description |
---|---|
String |
addFile(String filePath) |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
castColumnType(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset,
List<TrainingDatasetFeature> features) |
String |
checkpointDirPath(String queryName,
String onlineTopicName) |
String |
constructCheckpointPath(FeatureGroupBase featureGroup,
String queryName,
String queryPrefix) |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
convertToDefaultDataframe(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset) |
<S> S |
createEmptyDataFrame(S datasetGeneric) |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
encodeComplexFeatures(FeatureGroupBase featureGroupBase,
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset)
Encodes all complex type features to binary using their avro type as schema.
|
String |
getCertKey() |
static SparkEngine |
getInstance() |
Map<String,String> |
getKafkaConfig(FeatureGroupBase featureGroup,
Map<String,String> writeOptions) |
String |
getKeyStorePath() |
Map<String,String> |
getReadOptions(Map<String,String> providedOptions,
DataFormat dataFormat) |
String |
getTrustStorePath() |
Map<String,String> |
getWriteOptions(Map<String,String> providedOptions,
DataFormat dataFormat) |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
objectToDataset(Object obj) |
List<Feature> |
parseFeatureGroupSchema(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset,
TimeTravelFormat timeTravelFormat) |
String |
profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df) |
String |
profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df,
boolean correlation,
boolean histogram) |
String |
profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df,
List<String> restrictToColumns) |
String |
profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df,
List<String> restrictToColumns,
Boolean correlation,
Boolean histogram) |
String |
profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df,
List<String> restrictToColumns,
Boolean correlation,
Boolean histogram,
Boolean exactUniqueness) |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
read(StorageConnector storageConnector,
String dataFormat,
Map<String,String> readOptions,
String location) |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
readStream(StorageConnector storageConnector,
String dataFormat,
String messageFormat,
String schema,
Map<String,String> options,
boolean includeMetadata) |
void |
registerHudiTemporaryTable(FeatureGroupAlias featureGroupAlias,
Map<String,String> readOptions) |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
registerOnDemandTemporaryTable(ExternalFeatureGroup onDemandFeatureGroup,
String alias) |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
sanitizeFeatureNames(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset) |
static void |
setInstance(SparkEngine sparkEngine) |
void |
setupConnectorHadoopConf(StorageConnector storageConnector) |
static String |
sparkPath(String path) |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>[] |
splitDataset(TrainingDataset trainingDataset,
Query query,
Map<String,String> readOptions) |
static List<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>> |
splitLabels(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset,
List<String> labels) |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> |
sql(String query) |
void |
streamToHudiTable(StreamFeatureGroup streamFeatureGroup,
Map<String,String> writeOptions) |
void |
validateSparkConfiguration() |
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>[] |
write(TrainingDataset trainingDataset,
Query query,
Map<String,String> queryReadOptions,
Map<String,String> writeOptions,
org.apache.spark.sql.SaveMode saveMode)
Setup Spark to write the data on the File System.
|
void |
writeEmptyDataframe(FeatureGroupBase featureGroup) |
void |
writeOfflineDataframe(FeatureGroupBase featureGroup,
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset,
HudiOperationType operation,
Map<String,String> writeOptions,
Integer validationId) |
void |
writeOnlineDataframe(FeatureGroupBase featureGroupBase,
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset,
String onlineTopicName,
Map<String,String> writeOptions)
Writes feature group dataframe to kafka for online-fs ingestion.
|
<S> org.apache.spark.sql.streaming.StreamingQuery |
writeStreamDataframe(FeatureGroupBase featureGroupBase,
org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset,
String queryName,
String outputMode,
boolean awaitTermination,
Long timeout,
String checkpointLocation,
Map<String,String> writeOptions) |
public static SparkEngine getInstance()
public static void setInstance(SparkEngine sparkEngine)
public void validateSparkConfiguration() throws FeatureStoreException
FeatureStoreException
public String getTrustStorePath()
public String getKeyStorePath()
public String getCertKey()
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> sql(String query)
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> registerOnDemandTemporaryTable(ExternalFeatureGroup onDemandFeatureGroup, String alias) throws FeatureStoreException, IOException
FeatureStoreException
IOException
public static List<org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>> splitLabels(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset, List<String> labels)
public void registerHudiTemporaryTable(FeatureGroupAlias featureGroupAlias, Map<String,String> readOptions) throws FeatureStoreException
FeatureStoreException
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>[] write(TrainingDataset trainingDataset, Query query, Map<String,String> queryReadOptions, Map<String,String> writeOptions, org.apache.spark.sql.SaveMode saveMode) throws FeatureStoreException, IOException
Parameters:
trainingDataset - Training Dataset metadata object
query - Query Object
queryReadOptions - Additional read options as key-value pairs, defaults to empty Map
writeOptions - Additional write options as key-value pairs, defaults to empty Map
saveMode - org.apache.spark.sql.SaveMode: Append, Overwrite, ErrorIfExists, Ignore
Throws:
FeatureStoreException - If Client is not connected to Hopsworks
IOException - Generic IO exception.
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row>[] splitDataset(TrainingDataset trainingDataset, Query query, Map<String,String> readOptions) throws FeatureStoreException, IOException
FeatureStoreException
IOException
public Map<String,String> getWriteOptions(Map<String,String> providedOptions, DataFormat dataFormat)
public Map<String,String> getReadOptions(Map<String,String> providedOptions, DataFormat dataFormat)
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> read(StorageConnector storageConnector, String dataFormat, Map<String,String> readOptions, String location) throws FeatureStoreException, IOException
FeatureStoreException
IOException
public void writeOnlineDataframe(FeatureGroupBase featureGroupBase, org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset, String onlineTopicName, Map<String,String> writeOptions) throws FeatureStoreException, IOException
Parameters:
featureGroupBase -
dataset -
writeOptions -
Throws:
FeatureStoreException
IOException
public <S> org.apache.spark.sql.streaming.StreamingQuery writeStreamDataframe(FeatureGroupBase featureGroupBase, org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset, String queryName, String outputMode, boolean awaitTermination, Long timeout, String checkpointLocation, Map<String,String> writeOptions) throws FeatureStoreException, IOException, org.apache.spark.sql.streaming.StreamingQueryException, TimeoutException
FeatureStoreException
IOException
org.apache.spark.sql.streaming.StreamingQueryException
TimeoutException
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> encodeComplexFeatures(FeatureGroupBase featureGroupBase, org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset) throws FeatureStoreException, IOException
Parameters:
featureGroupBase - FeatureGroupBase Feature Group hsfs metadata object
dataset - Spark DataFrame or RDD.
Throws:
FeatureStoreException - If Client is not connected to Hopsworks
IOException - Generic IO exception.
public void writeEmptyDataframe(FeatureGroupBase featureGroup) throws IOException, FeatureStoreException, ParseException
public void writeOfflineDataframe(FeatureGroupBase featureGroup, org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset, HudiOperationType operation, Map<String,String> writeOptions, Integer validationId) throws IOException, FeatureStoreException, ParseException
public String profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df, List<String> restrictToColumns, Boolean correlation, Boolean histogram, Boolean exactUniqueness)
public String profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df, List<String> restrictToColumns, Boolean correlation, Boolean histogram)
public String profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df, List<String> restrictToColumns)
public String profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df, boolean correlation, boolean histogram)
public String profile(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df)
public void setupConnectorHadoopConf(StorageConnector storageConnector) throws IOException
IOException
public void streamToHudiTable(StreamFeatureGroup streamFeatureGroup, Map<String,String> writeOptions) throws Exception
Exception
public List<Feature> parseFeatureGroupSchema(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset, TimeTravelFormat timeTravelFormat) throws FeatureStoreException
FeatureStoreException
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> sanitizeFeatureNames(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset)
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> convertToDefaultDataframe(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset)
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> castColumnType(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataset, List<TrainingDatasetFeature> features) throws FeatureStoreException
FeatureStoreException
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> readStream(StorageConnector storageConnector, String dataFormat, String messageFormat, String schema, Map<String,String> options, boolean includeMetadata) throws FeatureStoreException, IOException
FeatureStoreException
IOException
public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> objectToDataset(Object obj)
public <S> S createEmptyDataFrame(S datasetGeneric)
public String constructCheckpointPath(FeatureGroupBase featureGroup, String queryName, String queryPrefix) throws FeatureStoreException, IOException
FeatureStoreException
IOException
public Map<String,String> getKafkaConfig(FeatureGroupBase featureGroup, Map<String,String> writeOptions) throws FeatureStoreException, IOException
FeatureStoreException
IOException
public String checkpointDirPath(String queryName, String onlineTopicName) throws FeatureStoreException
FeatureStoreException
Copyright © 2023. All rights reserved.