hsfs.storage_connector #
StorageConnector #
Bases: ABC
id property #
id: int | None
Id of the storage connector uniquely identifying it in the Feature store.
type property #
type: str | None
Type of the connector as a string, e.g. "HOPSFS", "S3", "ADLS", "REDSHIFT", "JDBC" or "SNOWFLAKE".
connector_options #
get_data #
get_data(
data_source: ds.DataSource, use_cached=True
) -> DataSourceData
Retrieve the data from the data source.
Example
# connect to the Feature Store
fs = ...
sc = fs.get_storage_connector("conn_name")
tables = sc.get_tables("database_name")
data = sc.get_data(tables[0])
| PARAMETER | DESCRIPTION |
|---|---|
data_source | The data source to retrieve data from. TYPE: DataSource |
use_cached | Whether to use cached data if available. Only supported for CRM and REST connectors. TYPE: bool DEFAULT: True |
| RETURNS | DESCRIPTION |
|---|---|
DataSourceData | An object containing the data retrieved from the data source. |
get_databases #
get_feature_groups #
get_feature_groups() -> list[FeatureGroup]
Get the feature groups using this storage connector, based on explicit provenance.
Only the accessible feature groups are returned. For more items use the base method, see get_feature_groups_provenance.
| RETURNS | DESCRIPTION |
|---|---|
list[FeatureGroup] | List of feature groups. |
get_feature_groups_provenance #
get_feature_groups_provenance() -> Links | None
Get the generated feature groups using this storage connector, based on explicit provenance.
These feature groups can be accessible or inaccessible.
Explicit provenance does not track deleted generated feature group links, so deleted will always be empty. For inaccessible feature groups, only minimal information is returned.
| RETURNS | DESCRIPTION |
|---|---|
Links | None | The feature groups generated using this storage connector or |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | In case the backend encounters an issue. |
get_metadata #
get_metadata(data_source: ds.DataSource) -> dict
Retrieve metadata information about the data source.
Example
# connect to the Feature Store
fs = ...
sc = fs.get_storage_connector("conn_name")
tables = sc.get_tables("database_name")
metadata = sc.get_metadata(tables[0])
| PARAMETER | DESCRIPTION |
|---|---|
data_source | The data source to retrieve metadata from. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
dict | A dictionary containing metadata about the data source. |
get_tables #
get_tables(
database: str | None = None,
) -> list[ds.DataSource]
Retrieve the list of tables from the specified database.
Example
# connect to the Feature Store
fs = ...
sc = fs.get_storage_connector("conn_name")
tables = sc.get_tables("database_name")
| PARAMETER | DESCRIPTION |
|---|---|
database | The name of the database to list tables from. If not provided, the default database is used. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
list[ds.DataSource] | A list of DataSource objects representing the tables. |
get_training_datasets #
get_training_datasets() -> list[TrainingDataset]
Get the training datasets using this storage connector, based on explicit provenance.
Only the accessible training datasets are returned. For more items use the base method, get_training_datasets_provenance.
| RETURNS | DESCRIPTION |
|---|---|
list[TrainingDataset] | List of training datasets. |
get_training_datasets_provenance #
get_training_datasets_provenance() -> Links | None
Get the generated training datasets using this storage connector, based on explicit provenance.
These training datasets can be accessible or inaccessible. Explicit provenance does not track deleted generated training dataset links, so deleted will always be empty. For inaccessible training datasets, only minimal information is returned.
| RETURNS | DESCRIPTION |
|---|---|
Links | None | The training datasets generated using this storage connector or |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | In case the backend encounters an issue. |
infer_metadata #
infer_metadata(
data_source: ds.DataSource,
preview_data: DataSourceData | None = None,
) -> InferredMetadata
Use platform intelligence to infer feature metadata for a data source table.
Calls the same backend used by the "Infer metadata" button in the UI when creating an external feature group: an LLM proposes per-column renames, Hopsworks types, descriptions, and a suggested primary key and event time.
Example
fs = ...
sc = fs.get_storage_connector("conn_name")
tables = sc.get_tables("database_name")
inferred = sc.infer_metadata(tables[0])
| PARAMETER | DESCRIPTION |
|---|---|
data_source | The data source (typically a table returned by TYPE: |
preview_data | Pre-fetched preview data to skip a server round-trip; if TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
InferredMetadata | An object containing the suggested feature renames, types, descriptions, primary key, and event time. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.PlatformIntelligenceException | If platform intelligence is not enabled on the cluster, or the LLM call fails. |
prepare_spark #
Prepare Spark to use this Storage Connector.
| PARAMETER | DESCRIPTION |
|---|---|
path | Path to prepare for reading from cloud storage. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
str | None | The path to be used for reading from Spark, which may be different from the input path if the connector has a base path configured. |
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads a query or a path into a dataframe using the storage connector.
Note, paths are only supported for object stores like S3, HopsFS and ADLS, while queries are meant for JDBC or databases like Redshift and Snowflake.
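As a rough illustration of the note above, the sketch below picks the keyword arguments for read depending on the kind of connector. The helper build_read_kwargs and the two sets of type strings are assumptions made for this example and are not part of hsfs.

```python
# Hypothetical helper, not part of hsfs: it only illustrates which arguments
# of read() apply to which kind of connector.
OBJECT_STORES = {"S3", "HOPSFS", "ADLS"}         # read by path (assumed type strings)
QUERY_BASED = {"JDBC", "REDSHIFT", "SNOWFLAKE"}  # read by SQL query (assumed type strings)


def build_read_kwargs(connector_type, target, data_format=None):
    """Return keyword arguments for read() given a connector type string."""
    kind = connector_type.upper()
    if kind in OBJECT_STORES:
        # Object stores take a path plus the file format to parse.
        return {"path": target, "data_format": data_format}
    if kind in QUERY_BASED:
        # Databases take a SQL query; path and data_format do not apply.
        return {"query": target}
    raise ValueError(f"Unsupported connector type in this sketch: {connector_type}")


s3_kwargs = build_read_kwargs("S3", "raw/events", data_format="parquet")
jdbc_kwargs = build_read_kwargs("JDBC", "SELECT * FROM events")
```

With a real connector, the resulting dictionary could be unpacked into the call, for example sc.read(**s3_kwargs).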
| PARAMETER | DESCRIPTION |
|---|---|
query | By default, the storage connector will read the table configured together with the connector, if any. It's possible to overwrite this by passing a SQL query here. TYPE: |
data_format | When reading from object stores such as S3, HopsFS and ADLS, specify the file format to be read, e.g., TYPE: |
options | Any additional key/value options to be passed to the connector. |
path | Path to be read from within the bucket of the storage connector. Not relevant for JDBC or database based connectors such as Snowflake, JDBC or Redshift. TYPE: |
dataframe_type | The type of the returned dataframe. Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame | The read dataframe. |
save #
save() -> StorageConnector
Persist this storage connector to the feature store.
Example
import hopsworks
import hsfs.storage_connector
project = hopsworks.login()
fs = project.get_feature_store()
sc = hsfs.storage_connector.S3Connector(
id=None,
name="my_s3_connector",
featurestore_id=fs.id,
bucket="my-bucket",
region="eu-north-1",
)
sc.save()
| RETURNS | DESCRIPTION |
|---|---|
StorageConnector | The saved storage connector with its assigned id. |
spark_options abstractmethod #
update #
update() -> StorageConnector
Update this storage connector in the feature store.
Example
import hopsworks
project = hopsworks.login()
fs = project.get_feature_store()
sc = fs.get_data_source("my_s3_connector").storage_connector
sc._bucket = "new-bucket"
sc.update()
| RETURNS | DESCRIPTION |
|---|---|
StorageConnector | The updated storage connector. |
AdlsConnector #
Bases: StorageConnector
path property #
path: str | None
If the connector refers to a path (e.g. ADLS) - return the path of the connector.
service_credential property #
service_credential: str | None
Service credential of the ADLS storage connector.
prepare_spark #
Prepare Spark to use this Storage Connector.
conn.prepare_spark()
spark.read.format("json").load("abfss://[container-name]@[account_name].dfs.core.windows.net/[path]")
# or
spark.read.format("json").load(conn.prepare_spark("abfss://[container-name]@[account_name].dfs.core.windows.net/[path]"))
| PARAMETER | DESCRIPTION |
|---|---|
path | Path to prepare for reading from cloud storage. TYPE: |
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads a path into a dataframe using the storage connector.
| PARAMETER | DESCRIPTION |
|---|---|
query | Not relevant for ADLS connectors. TYPE: |
data_format | The file format of the files to be read, e.g. TYPE: |
options | Any additional key/value options to be passed to the ADLS connector. |
path | Path within the bucket to be read. For example, path= TYPE: |
dataframe_type | str, optional. The type of the returned dataframe. Possible values are TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame |
|
BigQueryConnector #
Bases: StorageConnector
The BigQuery storage connector provides integration to Google Cloud BigQuery.
You can use it to run BigQuery queries on your GCP cluster and load the results into a Spark dataframe by calling the read API.
Authentication to GCP is handled by uploading the JSON keyfile for a service account to the Hopsworks Project. For more information on service accounts and creating a keyfile in GCP, read the Google Cloud documentation.
The storage connector uses the Google spark-bigquery-connector behind the scenes. To read more about the spark connector, like the spark options or usage, check Apache Spark SQL connector for Google BigQuery.
materialization_dataset property #
materialization_dataset: str | None
BigQuery materialization dataset (The dataset where the materialized view is going to be created, used in case of query).
parent_project property #
parent_project: str | None
BigQuery parent project (Google Cloud Project ID of the table to bill for the export).
query_project property #
query_project: str | None
BigQuery project (The Google Cloud Project ID of the table).
connector_options #
Return options to be passed to an external BigQuery connector library.
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads results from BigQuery into a spark dataframe using the storage connector.
Reading from BigQuery is done by specifying either a BigQuery table or a BigQuery query. For example, to read from a BigQuery table, set the BigQuery project, dataset and table on the storage connector and read directly from the corresponding path.
conn.read()
Alternatively, to read the results of a BigQuery query, set the Materialization Dataset on the storage connector, and pass your SQL to the query argument.
conn.read(query='SQL')
The query argument takes priority at runtime if the table options were also set on the storage connector. This allows you to read from either a query or a table with the same connector, assuming all fields were set. You can also set the path argument to a BigQuery table path to read at runtime, if the table options were not set when creating the connector.
conn.read(path='project.dataset.table')
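The precedence described above can be pictured with a small sketch. The function resolve_bigquery_source is hypothetical and not part of hsfs; in particular, the ordering between a runtime path and a table configured on the connector is an assumption of this example, since the text only states that the query takes priority.

```python
# Hypothetical helper, not part of hsfs: mirrors the precedence described above.
def resolve_bigquery_source(query=None, path=None, configured_table=None):
    """Return a (kind, value) pair naming what a read would load."""
    if query is not None:
        # A query wins even when table options are set on the connector.
        return ("query", query)
    if path is not None:
        # Assumption: a runtime path is consulted before the configured table.
        return ("table", path)
    if configured_table is not None:
        return ("table", configured_table)
    # Nothing to read from: treated as malformed arguments in this sketch.
    raise ValueError("Provide a query, a path, or configure a table on the connector.")


source = resolve_bigquery_source(query="SELECT 1", configured_table="project.dataset.table")
```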
| PARAMETER | DESCRIPTION |
|---|---|
query | BigQuery query. TYPE: |
data_format | Spark data format. TYPE: |
options | Spark options. |
path | BigQuery table path. TYPE: |
dataframe_type | str, optional. The type of the returned dataframe. Possible values are TYPE: |
| RAISES | DESCRIPTION |
|---|---|
ValueError | Malformed arguments. |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame | A Spark dataframe. |
GcsConnector #
Bases: StorageConnector
This storage connector provides integration to Google Cloud Storage (GCS).
Once you create a connector in the Feature Store, you can read data from a GCS bucket into a Spark dataframe by calling the read API.
Authentication to GCP is handled by uploading the JSON keyfile for a service account to the Hopsworks Project. For more information on service accounts and creating a keyfile in GCP, read the Google Cloud documentation.
The connector also supports Google's optional Customer Supplied Encryption Key encryption method. The encryption details are stored as Secrets in the Feature Store to keep them secure. Read more about encryption in the Google documentation.
The storage connector uses the Google gcs-connector-hadoop behind the scenes. For more information, check out Google Cloud Storage Connector for Spark and Hadoop.
prepare_spark #
Prepare Spark to use this Storage Connector.
conn.prepare_spark()
spark.read.format("json").load("gs://bucket/path")
# or
spark.read.format("json").load(conn.prepare_spark("gs://bucket/path"))
| PARAMETER | DESCRIPTION |
|---|---|
path | Path to prepare for reading from Google cloud storage. TYPE: |
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads GCS path into a dataframe using the storage connector.
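To read directly from the default bucket, you can omit the path argument:
conn.read(data_format='spark_formats')
To read an object from the default bucket, provide the object path without the gs:// scheme:
conn.read(data_format='spark_formats', path='Path/object')
To read from a full GCS URI, pass it as the path:
conn.read(data_format='spark_formats', path='gs://BUCKET/DATA')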
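The three path forms shown in the calls above (no path, an object path, and a full gs:// URI) can be sketched as a single normalization step. The function resolve_gcs_path is hypothetical and not part of hsfs; it assumes that an object path without a scheme is resolved against the connector's default bucket.

```python
# Hypothetical helper, not part of hsfs.
def resolve_gcs_path(default_bucket, path=None):
    """Return the full gs:// URI that a read would target in this sketch."""
    if path is None:
        # No path: read from the root of the default bucket.
        return f"gs://{default_bucket}"
    if path.startswith("gs://"):
        # Full URI: used as-is, even if it names another bucket.
        return path
    # Object path: assumed to be relative to the default bucket.
    return f"gs://{default_bucket}/{path.lstrip('/')}"


target = resolve_gcs_path("my-bucket", "Path/object")
```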
| PARAMETER | DESCRIPTION |
|---|---|
query | Not relevant for GCS connectors. TYPE: |
data_format | Spark data format. TYPE: |
options | Spark options. |
path | GCS path. TYPE: |
dataframe_type | str, optional. The type of the returned dataframe. Possible values are TYPE: |
| RAISES | DESCRIPTION |
|---|---|
ValueError | Malformed arguments. |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame | A Spark dataframe. |
HopsFSConnector #
Bases: StorageConnector
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads a path into a dataframe using the HopsFS storage connector.
| PARAMETER | DESCRIPTION |
|---|---|
query | Not used for HopsFS. Kept for interface consistency. TYPE: |
data_format | The file format to be read, e.g., TYPE: |
options | Any additional key/value options to be passed to the connector. |
path | Path to be read within HopsFS. If the connector has a base path configured, relative paths will be resolved against it. Absolute TYPE: |
dataframe_type | The type of the returned dataframe. Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame | The read dataframe. |
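The path parameter above says that relative paths are resolved against the connector's base path. A minimal sketch of that idea follows, using POSIX path rules from the standard library. The function resolve_hopsfs_path is hypothetical, and treating absolute paths as-is is an assumption of this example rather than documented behavior.

```python
import posixpath


# Hypothetical helper, not part of hsfs.
def resolve_hopsfs_path(base_path, path=None):
    """Resolve `path` against `base_path` using POSIX path rules."""
    if not path:
        # No path given: fall back to the base path itself.
        return base_path
    # posixpath.join discards base_path when `path` is absolute (assumed behavior).
    return posixpath.normpath(posixpath.join(base_path, path))


resolved = resolve_hopsfs_path("/Projects/demo/Resources", "data/file.csv")
```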
JdbcConnector #
Bases: StorageConnector
arguments property #
Additional JDBC arguments.
When running hsfs with PySpark/Spark in Hopsworks, the driver is automatically provided in the classpath but you need to set the driver argument to com.mysql.cj.jdbc.Driver when creating the Storage Connector.
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads a query into a dataframe using the storage connector.
| PARAMETER | DESCRIPTION |
|---|---|
query | A SQL query to be read. TYPE: |
data_format | Not relevant for JDBC based connectors. TYPE: |
options | Any additional key/value options to be passed to the JDBC connector. |
path | Not relevant for JDBC based connectors. TYPE: |
dataframe_type | str, optional. The type of the returned dataframe. Possible values are TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame |
|
KafkaConnector #
Bases: StorageConnector
ssl_endpoint_identification_algorithm property #
ssl_endpoint_identification_algorithm: str | None
SSL endpoint identification algorithm, which the Kafka client uses to validate the broker hostname against its certificate.
confluent_options #
Return prepared options to be passed to confluent_kafka, based on the provided Apache Spark configuration.
Right now only producer values with Importance >= medium are implemented.
See https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html.
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any] | A dictionary containing the configuration options for confluent_kafka. |
create_pem_files #
Create PEM (Privacy Enhanced Mail) files for Kafka SSL authentication.
This method writes the necessary PEM files for SSL authentication with Kafka, using the provided keystore and truststore locations and passwords. The generated file paths are stored as the following instance variables:
- self.ca_chain_path: Path to the generated CA chain PEM file.
- self.client_cert_path: Path to the generated client certificate PEM file.
- self.client_key_path: Path to the generated client key PEM file.
These files are used for configuring secure Kafka connections (e.g., with Spark or confluent_kafka). The method is idempotent and will only create the files once per connector instance.
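The "create once per connector instance" behavior can be illustrated with a small stand-in class. PemFileCache is hypothetical and is not the hsfs implementation; it writes a single placeholder file to a temporary location and only demonstrates the idempotency pattern.

```python
import os
import tempfile


class PemFileCache:
    """Hypothetical stand-in showing the create-once pattern."""

    def __init__(self):
        self.ca_chain_path = None
        self.created = 0  # counts real writes, for illustration only

    def create_pem_files(self, ca_chain_pem):
        # Idempotent: skip the write if the file was already produced.
        if self.ca_chain_path is not None:
            return self.ca_chain_path
        fd, path = tempfile.mkstemp(suffix=".pem")
        with os.fdopen(fd, "w") as handle:
            handle.write(ca_chain_pem)
        self.ca_chain_path = path
        self.created += 1
        return path


cache = PemFileCache()
first = cache.create_pem_files("placeholder PEM content\n")
second = cache.create_pem_files("ignored on the second call\n")
```

Both calls return the same path, and the second call leaves the file from the first call untouched.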
| PARAMETER | DESCRIPTION |
|---|---|
kafka_options | A dictionary containing the Kafka configuration options, including keystore and truststore locations and passwords. |
kafka_options #
Return prepared options to be passed to kafka, based on the additional arguments.
See https://kafka.apache.org/documentation/.
| PARAMETER | DESCRIPTION |
|---|---|
distribute | Whether to distribute the SSL certificates to the cluster nodes. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any] | A dictionary containing the configuration options for kafka. |
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
) -> None
Failure
This operation is not supported. Use read_stream instead to read a Kafka stream into a streaming Spark Dataframe.
| RAISES | DESCRIPTION |
|---|---|
NotImplementedError | Always, since this operation is not supported. |
read_stream #
read_stream(
topic: str,
topic_pattern: bool = False,
message_format: str = "avro",
schema: str | None = None,
options: dict[str, Any] | None = None,
include_metadata: bool = False,
) -> TypeVar("pyspark.sql.DataFrame") | TypeVar(
"pyspark.sql.streaming.StreamingQuery"
)
Reads a Kafka stream from a topic or multiple topics into a Dataframe.
Engine Support
Spark only
Reading from data streams using Pandas/Python as engine is currently not supported. Python/Pandas has no notion of streaming.
| PARAMETER | DESCRIPTION |
|---|---|
topic | Name or pattern of the topic(s) to subscribe to. TYPE: |
topic_pattern | Flag to indicate if TYPE: |
message_format | The format of the messages to use for decoding. Can be TYPE: |
schema | Optional schema, to use for decoding, can be an Avro schema string for TYPE: |
options | Additional options as key/value string pairs to be passed to Spark. Defaults to |
include_metadata | Indicate whether to return additional metadata fields from messages in the stream. Otherwise, only the decoded value fields are returned. TYPE: |
| RAISES | DESCRIPTION |
|---|---|
ValueError | Malformed arguments. |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.sql.streaming.StreamingQuery') | A Spark streaming dataframe. |
spark_options #
Return prepared options to be passed to Spark, based on the additional arguments.
This is done by adding the 'kafka.' prefix to each key in kafka_options.
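A minimal sketch of the prefixing step, assuming the Kafka options are available as a plain dictionary. The function name to_spark_options and the sample option values are illustrative only.

```python
def to_spark_options(kafka_options):
    """Prefix every Kafka option key with 'kafka.' for the Spark Kafka source."""
    return {f"kafka.{key}": value for key, value in kafka_options.items()}


# Sample values for illustration; real options come from the connector.
kafka_options = {
    "bootstrap.servers": "broker-1:9092",
    "security.protocol": "SSL",
}
spark_options = to_spark_options(kafka_options)
```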
MongoDBConnector #
Bases: StorageConnector
MongoDB storage connector backed by the official mongo-spark-connector and pymongo.
Use this connector to register an external feature group whose data lives in a MongoDB collection. The connection_string is a MongoDB URI without embedded credentials; the username and password are kept in the Hopsworks secret store and spliced in at read time.
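To make the idea of splicing credentials into a credential-free URI concrete, here is a sketch built on the standard library only. The function splice_credentials is hypothetical and does not reflect how hsfs does this internally; percent-encoding the username and password with quote_plus follows common practice for MongoDB URIs.

```python
from urllib.parse import quote_plus, urlsplit, urlunsplit


# Hypothetical helper, not part of hsfs.
def splice_credentials(connection_string, username, password):
    """Insert percent-encoded credentials into a credential-free MongoDB URI."""
    parts = urlsplit(connection_string)
    if "@" in parts.netloc:
        raise ValueError("connection string already embeds credentials")
    # Reserved characters such as '@' and '/' must be escaped in the userinfo part.
    userinfo = f"{quote_plus(username)}:{quote_plus(password)}"
    return urlunsplit(parts._replace(netloc=f"{userinfo}@{parts.netloc}"))


uri = splice_credentials(
    "mongodb://db.example.com:27017/shop?tls=true", "app", "p@ss/word"
)
```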
auth_mechanism property #
auth_mechanism: str | None
MongoDB authMechanism URI parameter (e.g. SCRAM-SHA-256).
collection property #
collection: str | None
Default collection name used when none is provided at read time.
connection_string property #
connection_string: str | None
MongoDB connection URI (mongodb:// or mongodb+srv://) without embedded credentials.
password property #
password: str | None
Database password resolved from the Hopsworks secret store.
connector_options #
Return arguments suitable for an external pymongo client.
from pymongo import MongoClient
sc = fs.get_storage_connector("mongo_conn")
client = MongoClient(**sc.connector_options())
Forwards any persisted self.options whose key looks like a MongoClient constructor kwarg (lowercase letters, digits, and underscores) so operator-set tuning knobs — maxPoolSize, serverSelectionTimeoutMS, tlsAllowInvalidCertificates, etc. — reach the driver. Keys that look like URI parameters (camelCase, already embedded in connection_uri) and anything non-string are dropped to avoid duplicate-config errors from pymongo.
prepare_spark #
Ensure the Spark session is wired with the mongo-spark-connector classpath.
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Read a collection from MongoDB into a dataframe.
| PARAMETER | DESCRIPTION |
|---|---|
query | Not used for MongoDB. TYPE: |
data_format | Not used for MongoDB. TYPE: |
options | Extra key/value options merged into the Spark reader configuration. |
path | Not used for MongoDB. TYPE: |
dataframe_type | Type of the returned dataframe. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame |
|
RedshiftConnector #
Bases: StorageConnector
cluster_identifier property #
cluster_identifier: str | None
Cluster identifier of the Redshift cluster.
database_user_name property #
database_user_name: str | None
Database username for the Redshift cluster.
connector_options #
Return options to be passed to an external Redshift connector library.
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads a table or query into a dataframe using the storage connector.
| PARAMETER | DESCRIPTION |
|---|---|
query | By default, the storage connector will read the table configured together with the connector, if any. It's possible to overwrite this by passing a SQL query here. TYPE: |
data_format | Not relevant for JDBC based connectors such as Redshift. TYPE: |
options | Any additional key/value options to be passed to the JDBC connector. |
path | Not relevant for JDBC based connectors such as Redshift. TYPE: |
dataframe_type | str, optional. The type of the returned dataframe. Possible values are TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame |
|
refetch #
refetch() -> None
Refetch storage connector in order to retrieve updated temporary credentials.
S3Connector #
Bases: StorageConnector
arguments property #
Additional spark options for the S3 connector, passed as a dictionary.
These are set using the Spark Options field in the UI when creating the connector. Example: {"fs.s3a.endpoint": "s3.eu-west-1.amazonaws.com", "fs.s3a.path.style.access": "true"}.
path property #
path: str | None
If the connector refers to a path (e.g. S3) - return the path of the connector.
server_encryption_algorithm property #
server_encryption_algorithm: str | None
Encryption algorithm if server-side S3 bucket encryption is enabled.
server_encryption_key property #
server_encryption_key: str | None
Encryption key if server-side S3 bucket encryption is enabled.
connector_options #
Return options to be passed to an external S3 connector library.
prepare_spark #
Prepare Spark to use this Storage Connector.
conn.prepare_spark()
spark.read.format("json").load("s3a://[bucket]/path")
# or
spark.read.format("json").load(conn.prepare_spark("s3a://[bucket]/path"))
| PARAMETER | DESCRIPTION |
|---|---|
path | Path to prepare for reading from cloud storage. TYPE: |
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads a query or a path into a dataframe using the storage connector.
Note, paths are only supported for object stores like S3, HopsFS and ADLS, while queries are meant for JDBC or databases like Redshift and Snowflake.
| PARAMETER | DESCRIPTION |
|---|---|
query | Not relevant for S3 connectors. TYPE: |
data_format | The file format of the files to be read, e.g. TYPE: |
options | Any additional key/value options to be passed to the S3 connector. |
path | Path within the bucket to be read. TYPE: |
dataframe_type | The type of the returned dataframe. Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame |
|
SapHanaConnector #
Bases: StorageConnector
SAP HANA storage connector backed by the SAP JDBC driver.
Use this connector to register an external feature group whose data lives in SAP HANA, and to ingest data from HANA via the dlt-based ingestion job.
application property #
application: str | None
Optional SAP HANA application name surfaced for session tracing.
connector_options #
Return arguments suitable for an external Python HANA driver such as hdbcli.
from hdbcli import dbapi
sc = fs.get_storage_connector("hana_conn")
conn = dbapi.connect(**sc.connector_options())
prepare_spark #
Prepare the Spark session with the SAP HANA driver classpath when needed.
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Read a table or query from SAP HANA into a dataframe.
| PARAMETER | DESCRIPTION |
|---|---|
query | SQL query to read; overrides any configured table. TYPE: |
data_format | Not used for SAP HANA. TYPE: |
options | Extra key/value options passed to the Spark JDBC reader. |
path | Not used for SAP HANA. TYPE: |
dataframe_type | Type of the returned dataframe. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame |
|
SnowflakeConnector #
Bases: StorageConnector
options property #
Additional options for the Snowflake storage connector.
private_key property #
private_key: str | None
Path to the private key file for key pair authentication.
connector_options #
Prepare a Python dictionary with the needed arguments for you to connect to a Snowflake database.
It is useful for the snowflake.connector Python library.
import snowflake.connector
sc = fs.get_storage_connector("snowflake_conn")
ctx = snowflake.connector.connect(**sc.connector_options())
| RETURNS | DESCRIPTION |
|---|---|
dict[str, Any] | None | A dictionary with the needed arguments for you to connect to a Snowflake database. |
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads a table or query into a dataframe using the storage connector.
| PARAMETER | DESCRIPTION |
|---|---|
query | By default, the storage connector will read the table configured together with the connector, if any. It's possible to overwrite this by passing a SQL query here. TYPE: |
data_format | Not relevant for Snowflake connectors. TYPE: |
options | Any additional key/value options to be passed to the engine. |
path | Not relevant for Snowflake connectors. TYPE: |
dataframe_type | str, optional. The type of the returned dataframe. Possible values are TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame |
|
UnityCatalogConnector #
Bases: StorageConnector
Databricks Unity Catalog storage connector.
Reads Delta-formatted tables governed by Unity Catalog via the Arrow Flight query service. Direct Spark reads are not supported in this release; use the Arrow Flight path instead.
access_token property #
access_token: str | None
Databricks personal access token, decrypted from the Hopsworks secret store on retrieval.
account_host property #
account_host: str | None
Databricks account-console host, only set when oauth_endpoint is "ACCOUNT".
account_id property #
account_id: str | None
Databricks account ID, only set when oauth_endpoint is "ACCOUNT".
arguments property #
Additional Unity Catalog connection arguments passed through to the Arrow Flight server.
auth_method property #
auth_method: str
Authentication method for the Databricks workspace, either "PAT" or "OAUTH_M2M".
Defaults to "PAT" for connectors created before OAuth support landed.
aws_region property #
aws_region: str | None
Optional explicit AWS region for the managed storage backing this Unity Catalog.
When unset, the Arrow Flight read path guesses the region from the STS session-token Databricks returns with temporary table credentials.
client_id property #
client_id: str | None
Databricks service principal client ID, only set when auth_method is "OAUTH_M2M".
client_secret property #
client_secret: str | None
Databricks service principal client secret.
Write-only on the backend: this property is only populated when the caller has just constructed the connector locally with a secret in hand. Server responses never carry it; use has_client_secret to test whether a secret is on file.
default_catalog property #
default_catalog: str | None
Optional default Unity Catalog catalog to use when no catalog is explicitly specified.
has_access_token property #
has_access_token: bool
True iff a personal access token is on file for this connector.
The server never returns the access token itself on read; this boolean lets callers tell whether one exists without exposing the secret.
has_client_secret property #
has_client_secret: bool
True iff a client secret is on file for this connector.
The server never returns the client secret itself on read; this boolean lets callers tell whether one exists without exposing the secret.
oauth_endpoint property #
oauth_endpoint: str | None
OAuth token endpoint flavour, either "WORKSPACE" or "ACCOUNT".
Only set when auth_method is "OAUTH_M2M".
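The relationships between the authentication properties above can be summarized in a small consistency check. The function validate_uc_auth is hypothetical and not part of hsfs; treating client_id as required for "OAUTH_M2M" is an assumption of this sketch, since the properties only state when each value is set.

```python
# Hypothetical validator, not part of hsfs.
def validate_uc_auth(auth_method="PAT", oauth_endpoint=None, client_id=None,
                     account_id=None, account_host=None):
    """Return a list of problems; an empty list means the values are consistent."""
    problems = []
    if auth_method not in ("PAT", "OAUTH_M2M"):
        return [f"unknown auth_method: {auth_method}"]
    if auth_method == "PAT":
        if oauth_endpoint is not None or client_id is not None:
            problems.append("oauth_endpoint and client_id only apply to OAUTH_M2M")
    else:
        if client_id is None:
            # Assumption: a service principal client ID is needed for OAuth.
            problems.append("OAUTH_M2M requires client_id")
        if oauth_endpoint not in ("WORKSPACE", "ACCOUNT"):
            problems.append("oauth_endpoint must be WORKSPACE or ACCOUNT")
    if oauth_endpoint != "ACCOUNT" and (account_id is not None or account_host is not None):
        problems.append("account_id and account_host only apply to the ACCOUNT endpoint")
    return problems


pat_problems = validate_uc_auth()
```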
connector_options #
Return UC connector options shaped for external library use.
read #
Read a Unity Catalog table as a Spark DataFrame.
Default behavior: if the SparkSession is connected to a Databricks cluster, dispatch to native UC access (spark.read.table()), which is faster, auth'd by the cluster identity, and skips the Hopsworks round-trip. Otherwise resolves vended S3 credentials via Hopsworks and reads the Delta path directly.
Set force_vended=True to skip detection and always use the vended path, useful if the cluster identity lacks the grants the connector's SP has.
spark_options_for #
spark_options_for(
catalog: str, schema: str, table: str
) -> UnityCatalogSparkOptions
Resolve Spark read options for a Unity Catalog table.
Mirrors the Python / Arrow Flight architecture: Hopsworks vends the Databricks bearer; the SDK calls Databricks directly for vended S3 temp-credentials, then builds the per-bucket S3A keys + Delta path.
v1 supports AWS-backed Delta tables only. Non-AWS storage (Azure / GCP) raises here. Credentials live ~1 h; call this close to the action that triggers the read.
| PARAMETER | DESCRIPTION |
|---|---|
catalog | UC catalog name (e.g. TYPE: |
schema | UC schema name within the catalog (e.g. TYPE: |
table | UC table name within the schema (e.g. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
UnityCatalogSparkOptions | A typed UnityCatalogSparkOptions object carrying the Delta path, per-bucket S3A keys, and credential expiry metadata. |
UnityCatalogSparkOptions #
Typed Spark read options vended for a Unity Catalog table.
Returned by UnityCatalogConnector.spark_options_for. Carries short-lived AWS credentials (in storage_options) plus the Delta path the table is stored at.
Use apply_to to wire the S3A credentials onto a SparkSession's Hadoop config, then spark.read.format(opts.format).load(opts.path). Or use read which does both in one call.
Credentials live ~1 h. Spark is lazy, so a long delay between spark_options_for() and the first action that actually reads from S3 can outlive the credentials. Mitigation: call close to the action, or use read().
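One way to guard against the expiry problem described above is to record when the options were fetched and check the remaining lifetime before triggering a Spark action. The helper credentials_still_valid and its 300-second safety margin are assumptions made for this sketch, not part of hsfs.

```python
import time


# Hypothetical helper, not part of hsfs.
def credentials_still_valid(fetched_at, expires_in_seconds, safety_margin=300, now=None):
    """Return True if the credentials outlive `safety_margin` seconds from now.

    `fetched_at` and `now` are readings of the same clock, e.g. time.monotonic().
    """
    current = time.monotonic() if now is None else now
    remaining = expires_in_seconds - (current - fetched_at)
    return remaining > safety_margin


# Ten minutes after fetching one-hour credentials there is plenty of time left.
ok = credentials_still_valid(fetched_at=0, expires_in_seconds=3600, now=600)
```

If the check fails, calling spark_options_for again shortly before the read yields fresh credentials.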
expires_in_seconds property #
expires_in_seconds: int
Seconds remaining at the moment the server built this response.
storage_options property #
Per-bucket S3A Hadoop config keys + values.
apply_to #
apply_to(spark: Any) -> None
Apply the per-bucket S3A keys to spark's Hadoop config.
Per-bucket scope (fs.s3a.bucket.<bucket>.*) means adjacent reads to other buckets in the same SparkSession are unaffected. Subsequent reads of the same bucket will pick up the most-recently-applied credentials — matches AWS STS rotation semantics.
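To show what bucket-scoped settings look like, the sketch below builds a dictionary of per-bucket S3A keys for temporary AWS credentials. The function per_bucket_s3a_options is hypothetical, and the exact set of keys that apply_to writes is not stated here; the key names follow the Hadoop S3A per-bucket configuration pattern.

```python
# Hypothetical helper, not part of hsfs.
def per_bucket_s3a_options(bucket, access_key, secret_key, session_token):
    """Build bucket-scoped S3A settings for temporary AWS credentials."""
    prefix = f"fs.s3a.bucket.{bucket}."
    return {
        # Session credentials need the temporary-credentials provider.
        prefix + "aws.credentials.provider":
            "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
        prefix + "access.key": access_key,
        prefix + "secret.key": secret_key,
        prefix + "session.token": session_token,
    }


# Placeholder values for illustration only.
options = per_bucket_s3a_options("lake", "ACCESS", "SECRET", "TOKEN")
```

Because every key carries the bucket name, setting these values leaves the configuration of other buckets in the same session unchanged.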
| PARAMETER | DESCRIPTION |
|---|---|
spark | A TYPE: |