
hsfs.storage_connector #

[source] AdlsConnector #

Bases: StorageConnector

[source] generation property #

generation: str | None

Generation of the ADLS storage connector.

[source] directory_id property #

directory_id: str | None

Directory ID of the ADLS storage connector.

[source] application_id property #

application_id: str | None

Application ID of the ADLS storage connector.

[source] account_name property #

account_name: str | None

Account name of the ADLS storage connector.

[source] container_name property #

container_name: str | None

Container name of the ADLS storage connector.

[source] service_credential property #

service_credential: str | None

Service credential of the ADLS storage connector.

[source] path property #

path: str | None

The path the connector refers to, if any (as is the case for ADLS).

[source] spark_options #

spark_options() -> dict[str, Any]

Return prepared options to be passed to Spark, based on the additional arguments.

[source] prepare_spark #

prepare_spark(path: str | None = None) -> str | None

Prepare Spark to use this Storage Connector.

conn.prepare_spark()

spark.read.format("json").load("abfss://[container-name]@[account_name].dfs.core.windows.net/[path]")

# or
spark.read.format("json").load(conn.prepare_spark("abfss://[container-name]@[account_name].dfs.core.windows.net/[path]"))
PARAMETER DESCRIPTION
path

Path to prepare for reading from cloud storage. Defaults to None.

TYPE: str | None DEFAULT: None

[source] read #

read(
    query: str | None = None,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str = "",
    dataframe_type: str = "default",
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | np.ndarray
    | pl.DataFrame
)

Reads a path into a dataframe using the storage connector.

PARAMETER DESCRIPTION
query

Not relevant for ADLS connectors.

TYPE: str | None DEFAULT: None

data_format

The file format of the files to be read, e.g. csv, parquet.

TYPE: str | None DEFAULT: None

options

Any additional key/value options to be passed to the ADLS connector.

TYPE: dict[str, Any] | None DEFAULT: None

path

Path within the container to be read. The path is appended to the container specified on the connector, so the read uses the URI 'abfss://[container-name]@[account_name].dfs.core.windows.net/[path]'.

TYPE: str DEFAULT: ''

dataframe_type

The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to a Spark dataframe for the Spark engine and a Pandas dataframe for the Python engine.

TYPE: str DEFAULT: 'default'

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame

DataFrame.
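
The URI shape quoted in the path parameter description can be sketched as a small helper. This is a minimal illustration, not the library's implementation: `adls_uri` is a hypothetical name, and the stripping of leading slashes is an assumption made here to avoid a doubled separator.

```python
def adls_uri(container_name: str, account_name: str, path: str = "") -> str:
    """Build an abfss:// URI from the connector fields (hypothetical helper)."""
    base = f"abfss://{container_name}@{account_name}.dfs.core.windows.net"
    # Assumption: a leading slash on the path is dropped so that the
    # container root and the path are joined by exactly one separator.
    relative = path.lstrip("/")
    return f"{base}/{relative}" if relative else base


# "raw" and "myaccount" are placeholder values, not real resources.
print(adls_uri("raw", "myaccount", "events/data.parquet"))
```

An empty path yields the container root, which matches the default of `path=""` in the `read` signature.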

[source] BigQueryConnector #

Bases: StorageConnector

The BigQuery storage connector provides integration to Google Cloud BigQuery.

You can use it to run BigQuery queries on your GCP cluster and load the results into a Spark dataframe by calling the read API.

Authentication to GCP is handled by uploading the JSON keyfile for the service account to the Hopsworks Project. For more information on service accounts and creating a keyfile in GCP, read the Google Cloud documentation.

The storage connector uses the Google spark-bigquery-connector behind the scenes. To read more about the spark connector, like the spark options or usage, check Apache Spark SQL connector for Google BigQuery.

[source] key_path property #

key_path: str | None

JSON keyfile for service account.

[source] parent_project property #

parent_project: str | None

BigQuery parent project (Google Cloud Project ID of the table to bill for the export).

[source] dataset property #

dataset: str | None

BigQuery dataset (The dataset containing the table).

[source] query_table property #

query_table: str | None

BigQuery table name.

[source] query_project property #

query_project: str | None

BigQuery project (The Google Cloud Project ID of the table).

[source] materialization_dataset property #

materialization_dataset: str | None

BigQuery materialization dataset (The dataset where the materialized view is going to be created, used in case of query).

[source] arguments property #

arguments: dict[str, Any]

Additional spark options.

[source] connector_options #

connector_options() -> dict[str, Any]

Return options to be passed to an external BigQuery connector library.

[source] spark_options #

spark_options() -> dict[str, Any]

Return spark options to be set for BigQuery spark connector.

[source] read #

read(
    query: str | None = None,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str | None = None,
    dataframe_type: str = "default",
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | np.ndarray
    | pl.DataFrame
)

Reads results from BigQuery into a spark dataframe using the storage connector.

Reading from BigQuery is done by specifying either a BigQuery table or a BigQuery query. For example, to read from a BigQuery table, set the BigQuery project, dataset and table on the storage connector and read directly from the corresponding path.

conn.read()

Or, to read the results of a BigQuery query, set the Materialization Dataset on the storage connector and pass your SQL to the query argument.

conn.read(query='SQL')

If the table options were also set on the storage connector, the query argument takes priority at runtime. This lets you read from either a query or a table with the same connector, assuming all fields were set. If the table options were not set when the connector was created, you can also set the path argument to a BigQuery table path at runtime.

conn.read(path='project.dataset.table')

PARAMETER DESCRIPTION
query

BigQuery query. Defaults to None.

TYPE: str | None DEFAULT: None

data_format

Spark data format. Defaults to None.

TYPE: str | None DEFAULT: None

options

Spark options. Defaults to None.

TYPE: dict[str, Any] | None DEFAULT: None

path

BigQuery table path. Defaults to None.

TYPE: str | None DEFAULT: None

dataframe_type

The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to a Spark dataframe for the Spark engine and a Pandas dataframe for the Python engine.

TYPE: str DEFAULT: 'default'

RAISES DESCRIPTION
ValueError

Malformed arguments.

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame

Dataframe: A Spark dataframe.
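
The precedence between query, configured table and runtime path described for this method can be sketched as follows. This is an illustrative helper under stated assumptions, not the hsfs implementation: `resolve_bigquery_source` and `configured_table` are hypothetical names, and placing the configured table ahead of a runtime path is an assumption drawn from the wording that path is used when table options were not set.

```python
from typing import Optional, Tuple


def resolve_bigquery_source(
    query: Optional[str] = None,
    path: Optional[str] = None,
    configured_table: Optional[str] = None,
) -> Tuple[str, str]:
    """Decide what to read from BigQuery (hypothetical helper)."""
    if query:
        # A query takes priority even when table options are set.
        return ("query", query)
    if configured_table:
        # Table set on the connector, e.g. project.dataset.table.
        return ("table", configured_table)
    if path:
        # Assumption: the runtime path is used only when no table
        # was configured on the connector.
        return ("table", path)
    raise ValueError("Provide a query, a path, or table options on the connector.")


print(resolve_bigquery_source(query="SELECT 1", configured_table="p.d.t"))
```

Raising `ValueError` when nothing is specified mirrors the "Malformed arguments" entry in the RAISES table.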

[source] GcsConnector #

Bases: StorageConnector

This storage connector provides integration to Google Cloud Storage (GCS).

Once you create a connector in FeatureStore, you can read data from a GCS bucket into a Spark dataframe by calling the read API.

Authentication to GCP is handled by uploading the JSON keyfile for the service account to the Hopsworks Project. For more information on service accounts and creating a keyfile in GCP, read the Google Cloud documentation.

The connector also supports the optional encryption method Customer Supplied Encryption Key by Google. The encryption details are stored as Secrets in the FeatureStore to keep them secure. Read more about encryption in the Google documentation.

The storage connector uses the Google gcs-connector-hadoop behind the scenes. For more information, check out Google Cloud Storage Connector for Spark and Hadoop.

[source] key_path property #

key_path: str | None

JSON keyfile for service account.

[source] algorithm property #

algorithm: str | None

Encryption Algorithm.

[source] encryption_key property #

encryption_key: str | None

Encryption Key.

[source] encryption_key_hash property #

encryption_key_hash: str | None

Encryption Key Hash.

[source] path property #

path: str | None

The path of the connector, prefixed with the gs:// file system scheme.

[source] bucket property #

bucket: str | None

GCS Bucket.

[source] spark_options #

spark_options() -> dict[str, Any]

Return prepared options to be passed to Spark, based on the additional arguments.

[source] read #

read(
    query: str | None = None,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str = "",
    dataframe_type: str = "default",
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | np.ndarray
    | pl.DataFrame
)

Reads GCS path into a dataframe using the storage connector.

To read directly from the default bucket, you can omit the path argument:

conn.read(data_format='spark_formats')

Or, to read objects from the default bucket, provide the object path without the gsUtil URI scheme. For example, the following reads from the path gs://bucket_on_connector/Path/object:

conn.read(data_format='spark_formats', path='Path/object')

Or, to read with a full gsUtil URI path:

conn.read(data_format='spark_formats', path='gs://BUCKET/DATA')

PARAMETER DESCRIPTION
query

Not relevant for GCS connectors.

TYPE: str | None DEFAULT: None

data_format

Spark data format. Defaults to None.

TYPE: str | None DEFAULT: None

options

Spark options. Defaults to None.

TYPE: dict[str, Any] | None DEFAULT: None

path

GCS path. Defaults to an empty string.

TYPE: str DEFAULT: ''

dataframe_type

The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to a Spark dataframe for the Spark engine and a Pandas dataframe for the Python engine.

TYPE: str DEFAULT: 'default'

RAISES DESCRIPTION
ValueError

Malformed arguments.

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame

Dataframe: A Spark dataframe.
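
The three ways of addressing data described for this method (no path, a path relative to the default bucket, or a full gs:// URI) can be sketched as a single resolution step. This is a minimal illustration, not the connector's actual code: `resolve_gcs_path` is a hypothetical name.

```python
def resolve_gcs_path(bucket: str, path: str = "") -> str:
    """Turn a read() path argument into a full gs:// URI (hypothetical helper)."""
    if path.startswith("gs://"):
        # A full URI is used as given and may point at another bucket.
        return path
    base = f"gs://{bucket}"
    # A relative object path is resolved against the connector's bucket.
    relative = path.lstrip("/")
    return f"{base}/{relative}" if relative else base


print(resolve_gcs_path("bucket_on_connector", "Path/object"))
```

With the bucket name used in the description, `Path/object` resolves to the same URI the text quotes, `gs://bucket_on_connector/Path/object`.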

[source] prepare_spark #

prepare_spark(path: str | None = None) -> str | None

Prepare Spark to use this Storage Connector.

conn.prepare_spark()
spark.read.format("json").load("gs://bucket/path")
# or
spark.read.format("json").load(conn.prepare_spark("gs://bucket/path"))
PARAMETER DESCRIPTION
path

Path to prepare for reading from Google cloud storage. Defaults to None.

TYPE: str | None DEFAULT: None

[source] HopsFSConnector #

Bases: StorageConnector

[source] spark_options #

spark_options() -> dict[str, Any]

Return prepared options to be passed to Spark, based on the additional arguments.

[source] JdbcConnector #

Bases: StorageConnector

[source] connection_string property #

connection_string: str | None

JDBC connection string.

[source] arguments property #

arguments: dict[str, Any] | None

Additional JDBC arguments.

When running hsfs with PySpark/Spark in Hopsworks, the driver is automatically provided in the classpath but you need to set the driver argument to com.mysql.cj.jdbc.Driver when creating the Storage Connector.
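
As a rough sketch of how the driver argument ends up next to the connection string, the helper below assembles a dictionary using the option names of Spark's JDBC data source (`url`, `driver`, `query`). How hsfs builds its options internally is not shown on this page, so `jdbc_reader_options` is a hypothetical helper and the connection string is a placeholder.

```python
from typing import Any, Dict, Optional


def jdbc_reader_options(
    connection_string: str,
    query: str,
    arguments: Optional[Dict[str, Any]] = None,
) -> Dict[str, str]:
    """Merge connector arguments with url and query (hypothetical helper)."""
    # Additional arguments, such as the driver class, come first.
    options = {str(key): str(value) for key, value in (arguments or {}).items()}
    # Assumption: url and query are set last so they cannot be overridden.
    options["url"] = connection_string
    options["query"] = query
    return options


opts = jdbc_reader_options(
    "jdbc:mysql://host:3306/db",  # placeholder connection string
    "SELECT 1",
    {"driver": "com.mysql.cj.jdbc.Driver"},
)
print(opts)
```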

[source] spark_options #

spark_options() -> dict[str, Any]

Return prepared options to be passed to Spark, based on the additional arguments.

[source] read #

read(
    query: str,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str | None = None,
    dataframe_type: str = "default",
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | np.ndarray
    | pl.DataFrame
)

Reads a query into a dataframe using the storage connector.

PARAMETER DESCRIPTION
query

A SQL query to be read.

TYPE: str

data_format

Not relevant for JDBC based connectors.

TYPE: str | None DEFAULT: None

options

Any additional key/value options to be passed to the JDBC connector.

TYPE: dict[str, Any] | None DEFAULT: None

path

Not relevant for JDBC based connectors.

TYPE: str | None DEFAULT: None

dataframe_type

The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to a Spark dataframe for the Spark engine and a Pandas dataframe for the Python engine.

TYPE: str DEFAULT: 'default'

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame

DataFrame.

[source] KafkaConnector #

Bases: StorageConnector

[source] bootstrap_servers property #

bootstrap_servers: list[str] | None

Bootstrap servers string.

[source] security_protocol property #

security_protocol: str | None

Security protocol.

[source] ssl_truststore_location property #

ssl_truststore_location: str | None

SSL truststore location.

[source] ssl_keystore_location property #

ssl_keystore_location: str | None

SSL keystore location.

[source] ssl_endpoint_identification_algorithm property #

ssl_endpoint_identification_algorithm: str | None

SSL endpoint identification algorithm.

[source] options property #

options: dict[str, Any]

Additional Kafka options.

[source] create_pem_files #

create_pem_files(kafka_options: dict[str, Any]) -> None

Create PEM (Privacy Enhanced Mail) files for Kafka SSL authentication.

This method writes the necessary PEM files for SSL authentication with Kafka, using the provided keystore and truststore locations and passwords. The generated file paths are stored as the following instance variables:

- self.ca_chain_path: Path to the generated CA chain PEM file.
- self.client_cert_path: Path to the generated client certificate PEM file.
- self.client_key_path: Path to the generated client key PEM file.

These files are used for configuring secure Kafka connections (e.g., with Spark or confluent_kafka). The method is idempotent and will only create the files once per connector instance.

[source] kafka_options #

kafka_options(distribute=True) -> dict[str, Any]

Return prepared options to be passed to Kafka, based on the additional arguments.

See https://kafka.apache.org/documentation/.

[source] confluent_options #

confluent_options() -> dict[str, Any]

Return prepared options to be passed to confluent_kafka, based on the provided Apache Spark configuration.

Right now only producer values with Importance >= medium are implemented.

See https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html.

[source] spark_options #

spark_options() -> dict[str, Any]

Return prepared options to be passed to Spark, based on the additional arguments.

This is done by adding the 'kafka.' prefix to each key in kafka_options.

See https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations.
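
The prefixing step can be sketched in a few lines. This is a minimal illustration of the idea, not the method's actual body: `to_spark_kafka_options` is a hypothetical name and the sample option values are placeholders.

```python
from typing import Any, Dict


def to_spark_kafka_options(kafka_options: Dict[str, Any]) -> Dict[str, Any]:
    """Prefix every Kafka option key with 'kafka.' (hypothetical helper)."""
    return {f"kafka.{key}": value for key, value in kafka_options.items()}


# Placeholder values for illustration only.
sample = {"bootstrap.servers": "broker:9092", "security.protocol": "SSL"}
print(to_spark_kafka_options(sample))
```

Spark's Kafka source forwards options carrying this prefix to the underlying Kafka client, which is why plain Kafka keys need it.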

[source] read #

read(
    query: str | None = None,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str | None = None,
    dataframe_type: str = "default",
) -> None

NOT SUPPORTED.

[source] read_stream #

read_stream(
    topic: str,
    topic_pattern: bool = False,
    message_format: str = "avro",
    schema: str | None = None,
    options: dict[str, Any] | None = None,
    include_metadata: bool = False,
) -> TypeVar("pyspark.sql.DataFrame") | TypeVar(
    "pyspark.sql.streaming.StreamingQuery"
)

Reads a Kafka stream from a topic or multiple topics into a Dataframe.

Engine Support

Spark only

Reading from data streams using Pandas/Python as engine is currently not supported. Python/Pandas has no notion of streaming.

PARAMETER DESCRIPTION
topic

Name or pattern of the topic(s) to subscribe to.

TYPE: str

topic_pattern

Flag to indicate if topic string is a pattern. Defaults to False.

TYPE: bool DEFAULT: False

message_format

The format of the messages to use for decoding. Can be "avro" or "json". Defaults to "avro".

TYPE: str DEFAULT: 'avro'

schema

Optional schema to use for decoding. For the "avro" message format, this is an Avro schema string. For the "json" message format, it is a Spark StructType schema or a DDL-formatted string. Defaults to None.

TYPE: str | None DEFAULT: None

options

Additional options as key/value string pairs to be passed to Spark. Defaults to None.

TYPE: dict[str, Any] | None DEFAULT: None

include_metadata

Indicate whether to return additional metadata fields from messages in the stream. Otherwise, only the decoded value fields are returned. Defaults to False.

TYPE: bool DEFAULT: False

RAISES DESCRIPTION
ValueError

Malformed arguments.

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.sql.streaming.StreamingQuery')

StreamingDataframe: A Spark streaming dataframe.
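
The topic and topic_pattern parameters correspond naturally to two options of Spark's Kafka source, `subscribe` (a topic name or comma-separated list) and `subscribePattern` (a regular expression). The sketch below shows that choice; whether read_stream performs exactly this mapping internally is an assumption, and `subscription_option` is a hypothetical helper.

```python
from typing import Dict


def subscription_option(topic: str, topic_pattern: bool = False) -> Dict[str, str]:
    """Pick the Spark Kafka subscription option (hypothetical helper)."""
    if not topic:
        raise ValueError("topic must be a non-empty string")
    # subscribePattern treats the value as a regex over topic names.
    key = "subscribePattern" if topic_pattern else "subscribe"
    return {key: topic}


print(subscription_option("events"))
print(subscription_option("events_.*", topic_pattern=True))
```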

[source] RedshiftConnector #

Bases: StorageConnector

[source] cluster_identifier property #

cluster_identifier: str | None

Cluster identifier for redshift cluster.

[source] database_driver property #

database_driver: str | None

Database driver for redshift cluster.

[source] database_endpoint property #

database_endpoint: str | None

Database endpoint for redshift cluster.

[source] database_name property #

database_name: str | None

Database name for redshift cluster.

[source] database_port property #

database_port: int | str | None

Database port for redshift cluster.

[source] table_name property #

table_name: str | None

Table name for redshift cluster.

[source] database_user_name property #

database_user_name: str | None

Database username for redshift cluster.

[source] auto_create property #

auto_create: bool | None

Whether the database user is created automatically if it does not exist.

[source] database_group property #

database_group: str | None

Database group for redshift cluster.

[source] database_password property #

database_password: str | None

Database password for redshift cluster.

[source] iam_role property #

iam_role: Any | None

IAM role.

[source] expiration property #

expiration: int | str | None

Cluster temporary credential expiration time.

[source] arguments property #

arguments: str | None

Additional JDBC, Redshift, or Snowflake arguments.

[source] connector_options #

connector_options() -> dict[str, Any]

Return options to be passed to an external Redshift connector library.

[source] spark_options #

spark_options() -> dict[str, Any]

Return prepared options to be passed to Spark, based on the additional arguments.

[source] read #

read(
    query: str | None = None,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str | None = None,
    dataframe_type: str = "default",
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | np.ndarray
    | pl.DataFrame
)

Reads a table or query into a dataframe using the storage connector.

PARAMETER DESCRIPTION
query

By default, the storage connector reads the table configured together with the connector, if any. You can override this by passing a SQL query here. Defaults to None.

TYPE: str | None DEFAULT: None

data_format

Not relevant for JDBC based connectors such as Redshift.

TYPE: str | None DEFAULT: None

options

Any additional key/value options to be passed to the JDBC connector.

TYPE: dict[str, Any] | None DEFAULT: None

path

Not relevant for JDBC based connectors such as Redshift.

TYPE: str | None DEFAULT: None

dataframe_type

The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to a Spark dataframe for the Spark engine and a Pandas dataframe for the Python engine.

TYPE: str DEFAULT: 'default'

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame

DataFrame.

[source] refetch #

refetch() -> None

Refetch storage connector in order to retrieve updated temporary credentials.

[source] S3Connector #

Bases: StorageConnector

[source] access_key property #

access_key: str | None

Access key.

[source] secret_key property #

secret_key: str | None

Secret key.

[source] server_encryption_algorithm property #

server_encryption_algorithm: str | None

Encryption algorithm if server-side S3 bucket encryption is enabled.

[source] server_encryption_key property #

server_encryption_key: str | None

Encryption key if server-side S3 bucket encryption is enabled.

[source] bucket property #

bucket: str | None

Return the bucket for S3 connectors.

[source] region property #

region: str | None

Return the region for S3 connectors.

[source] session_token property #

session_token: str | None

Session token.

[source] iam_role property #

iam_role: str | None

IAM role.

[source] path property #

path: str | None

The path the connector refers to, if any (as is the case for S3).

[source] arguments property #

arguments: dict[str, Any] | None

Additional spark options for the S3 connector, passed as a dictionary.

These are set using the Spark Options field in the UI when creating the connector. Example: {"fs.s3a.endpoint": "s3.eu-west-1.amazonaws.com", "fs.s3a.path.style.access": "true"}.

[source] spark_options #

spark_options() -> dict[str, str]

Return prepared options to be passed to Spark, based on the additional arguments.

[source] prepare_spark #

prepare_spark(path: str | None = None) -> str | None

Prepare Spark to use this Storage Connector.

conn.prepare_spark()

spark.read.format("json").load("s3a://[bucket]/path")

# or
spark.read.format("json").load(conn.prepare_spark("s3a://[bucket]/path"))
PARAMETER DESCRIPTION
path

Path to prepare for reading from cloud storage.

TYPE: str | None DEFAULT: None

[source] connector_options #

connector_options() -> dict[str, Any]

Return options to be passed to an external S3 connector library.

[source] read #

read(
    query: str | None = None,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str = "",
    dataframe_type: Literal[
        "default",
        "spark",
        "pandas",
        "polars",
        "numpy",
        "python",
    ] = "default",
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | np.ndarray
    | pl.DataFrame
)

Reads a query or a path into a dataframe using the storage connector.

Note, paths are only supported for object stores like S3, HopsFS and ADLS, while queries are meant for JDBC or databases like Redshift and Snowflake.

PARAMETER DESCRIPTION
query

Not relevant for S3 connectors.

TYPE: str | None DEFAULT: None

data_format

The file format of the files to be read, e.g. csv, parquet.

TYPE: str | None DEFAULT: None

options

Any additional key/value options to be passed to the S3 connector.

TYPE: dict[str, Any] | None DEFAULT: None

path

Path within the bucket to be read.

TYPE: str DEFAULT: ''

dataframe_type

The type of the returned dataframe. Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine.

TYPE: Literal['default', 'spark', 'pandas', 'polars', 'numpy', 'python'] DEFAULT: 'default'

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame

DataFrame.
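
The rule for the "default" dataframe type can be sketched as a small resolver. This is an illustration of the documented mapping only: `resolve_dataframe_type` is a hypothetical helper, and the engine labels "spark" and "python" are assumed names for the two engines mentioned in the description.

```python
ALLOWED_TYPES = {"default", "spark", "pandas", "polars", "numpy", "python"}


def resolve_dataframe_type(dataframe_type: str = "default", engine: str = "spark") -> str:
    """Map 'default' to the engine's native dataframe type (hypothetical helper)."""
    if dataframe_type not in ALLOWED_TYPES:
        raise ValueError(f"Unsupported dataframe_type: {dataframe_type!r}")
    if dataframe_type == "default":
        # Spark engine -> Spark dataframe, Python engine -> Pandas dataframe.
        return "spark" if engine == "spark" else "pandas"
    return dataframe_type


print(resolve_dataframe_type("default", engine="python"))
```

An explicit value such as "polars" is passed through unchanged, so only the default depends on the engine.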

[source] SnowflakeConnector #

Bases: StorageConnector

[source] url property #

url: str | None

URL of the Snowflake storage connector.

[source] warehouse property #

warehouse: str | None

Warehouse of the Snowflake storage connector.

[source] database property #

database: str | None

Database of the Snowflake storage connector.

[source] user property #

user: Any | None

User of the Snowflake storage connector.

[source] password property #

password: str | None

Password of the Snowflake storage connector.

[source] token property #

token: str | None

OAuth token of the Snowflake storage connector.

[source] schema property #

schema: str | None

Schema of the Snowflake storage connector.

[source] table property #

table: str | None

Table of the Snowflake storage connector.

[source] role property #

role: Any | None

Role of the Snowflake storage connector.

[source] account property #

account: str | None

Account of the Snowflake storage connector.

[source] application property #

application: Any

Application of the Snowflake storage connector.

[source] options property #

options: dict[str, Any] | None

Additional options for the Snowflake storage connector.

[source] private_key property #

private_key: str | None

Path to the private key file for key pair authentication.

[source] passphrase property #

passphrase: str | None

Passphrase for the private key file.

[source] snowflake_connector_options #

snowflake_connector_options() -> dict[str, Any] | None

Alias for connector_options.

[source] connector_options #

connector_options() -> dict[str, Any] | None

Prepare a Python dictionary with the needed arguments for you to connect to a Snowflake database.

It is useful for the snowflake.connector Python library.

import snowflake.connector

sc = fs.get_storage_connector("snowflake_conn")
ctx = snowflake.connector.connect(**sc.connector_options())

[source] spark_options #

spark_options() -> dict[str, Any]

Return prepared options to be passed to Spark, based on the additional arguments.

[source] read #

read(
    query: str | None = None,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str | None = None,
    dataframe_type: str = "default",
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | np.ndarray
    | pl.DataFrame
)

Reads a table or query into a dataframe using the storage connector.

PARAMETER DESCRIPTION
query

By default, the storage connector reads the table configured together with the connector, if any. You can override this by passing a SQL query here. Defaults to None.

TYPE: str | None DEFAULT: None

data_format

Not relevant for Snowflake connectors.

TYPE: str | None DEFAULT: None

options

Any additional key/value options to be passed to the engine.

TYPE: dict[str, Any] | None DEFAULT: None

path

Not relevant for Snowflake connectors.

TYPE: str | None DEFAULT: None

dataframe_type

The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to a Spark dataframe for the Spark engine and a Pandas dataframe for the Python engine.

TYPE: str DEFAULT: 'default'

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame

DataFrame.