Storage Connector#

Retrieval#

[source]

get_storage_connector#

FeatureStore.get_storage_connector(name)

Get a previously created storage connector from the feature store.

Storage connectors encapsulate all information needed for the execution engine to read and write to specific storage. This storage can be S3, a JDBC-compliant database, or the distributed file system HopsFS.

If you want to connect to the online feature store, use the get_online_storage_connector method to get the JDBC connector for the Online Feature Store.

Getting a Storage Connector

sc = fs.get_storage_connector("demo_fs_meb10000_Training_Datasets")

td = fs.create_training_dataset(..., storage_connector=sc, ...)

Arguments

  • name str: Name of the storage connector to retrieve.

Returns

StorageConnector. Storage connector object.


[source]

get_online_storage_connector#

FeatureStore.get_online_storage_connector()

Get the storage connector for the Online Feature Store of the respective project's feature store.

The returned storage connector depends on the project that you are connected to.

Returns

StorageConnector. JDBC storage connector to the Online Feature Store.
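
For example, the online connector can be retrieved and its JDBC options inspected (a minimal sketch, assuming fs is the feature store handle; the exact option keys depend on your Hopsworks project):

online_sc = fs.get_online_storage_connector()

print(online_sc.connection_string)
print(online_sc.spark_options())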


HopsFS#

Properties#

[source]

description#

User provided description of the storage connector.


[source]

id#

Id of the storage connector uniquely identifying it in the Feature store.


[source]

name#

Name of the storage connector.


Methods#

[source]

read#

HopsFSConnector.read(query=None, data_format=None, options={}, path=None)

Reads a query or a path into a dataframe using the storage connector.

Note that paths are only supported for object stores like S3, HopsFS, and ADLS, while queries are meant for JDBC connectors and databases like Redshift and Snowflake.
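
For example, a HopsFS connector could be used to read a CSV file into a dataframe (a minimal sketch; the connector name, format, and path are hypothetical):

sc = fs.get_storage_connector("my_hopsfs_connector")

df = sc.read(data_format="csv", path="Resources/my_data.csv")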


[source]

refetch#

HopsFSConnector.refetch()

Refetch storage connector.


[source]

spark_options#

HopsFSConnector.spark_options()

Return prepared options to be passed to Spark, based on the additional arguments.


[source]

to_dict#

HopsFSConnector.to_dict()

[source]

update_from_response_json#

HopsFSConnector.update_from_response_json(json_dict)

JDBC#

Properties#

[source]

arguments#

Additional JDBC arguments. When running hsfs with PySpark/Spark in Hopsworks, the driver is automatically provided on the classpath, but you need to set the driver argument to com.mysql.cj.jdbc.Driver when creating the storage connector.


[source]

connection_string#

JDBC connection string.


[source]

description#

User provided description of the storage connector.


[source]

id#

Id of the storage connector uniquely identifying it in the Feature store.


[source]

name#

Name of the storage connector.


Methods#

[source]

read#

JdbcConnector.read(query, data_format=None, options={}, path=None)

Reads a query into a dataframe using the storage connector.
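
For example, running a SQL query against the database behind the connector (a minimal sketch; the connector name and query are hypothetical):

sc = fs.get_storage_connector("my_jdbc_connector")

df = sc.read("SELECT * FROM my_table")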


[source]

refetch#

JdbcConnector.refetch()

Refetch storage connector.


[source]

spark_options#

JdbcConnector.spark_options()

Return prepared options to be passed to Spark, based on the additional arguments.
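
A minimal sketch of passing the prepared options to Spark's JDBC reader, assuming spark is an active SparkSession; the table name is hypothetical and the exact option keys depend on the connector:

options = sc.spark_options()

df = spark.read.format("jdbc").options(**options).option("dbtable", "my_table").load()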


[source]

to_dict#

JdbcConnector.to_dict()

[source]

update_from_response_json#

JdbcConnector.update_from_response_json(json_dict)

S3#

Properties#

[source]

access_key#

Access key.


[source]

bucket#

Return the bucket for S3 connectors.


[source]

description#

User provided description of the storage connector.


[source]

iam_role#

IAM role.


[source]

id#

Id of the storage connector uniquely identifying it in the Feature store.


[source]

name#

Name of the storage connector.


[source]

path#

If the connector refers to a path (e.g. S3), return the path of the connector.


[source]

secret_key#

Secret key.


[source]

server_encryption_algorithm#

Encryption algorithm if server-side S3 bucket encryption is enabled.


[source]

server_encryption_key#

Encryption key if server-side S3 bucket encryption is enabled.


[source]

session_token#

Session token.


Methods#

[source]

prepare_spark#

S3Connector.prepare_spark(path=None)

Prepare Spark to use this Storage Connector.

conn.prepare_spark()

spark.read.format("json").load("s3a://[bucket]/path")

# or
spark.read.format("json").load(conn.prepare_spark("s3a://[bucket]/path"))

Arguments

  • path Optional[str]: Path to prepare for reading from cloud storage. Defaults to None.

[source]

read#

S3Connector.read(query=None, data_format=None, options={}, path=None)

Reads a query or a path into a dataframe using the storage connector.

Note that paths are only supported for object stores like S3, HopsFS, and ADLS, while queries are meant for JDBC connectors and databases like Redshift and Snowflake.
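
For example, reading Parquet data through an S3 connector (a minimal sketch; the connector name and path are hypothetical, and depending on your setup the path may be a full s3a:// URI or relative to the connector's bucket):

sc = fs.get_storage_connector("my_s3_connector")

df = sc.read(data_format="parquet", path="s3a://my-bucket/prefix/")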


[source]

refetch#

S3Connector.refetch()

Refetch storage connector.


[source]

spark_options#

S3Connector.spark_options()

Return prepared options to be passed to Spark, based on the additional arguments.


[source]

to_dict#

S3Connector.to_dict()

[source]

update_from_response_json#

S3Connector.update_from_response_json(json_dict)

Redshift#

Properties#

[source]

arguments#

Additional JDBC, Redshift, or Snowflake arguments.


[source]

auto_create#

Whether to automatically create the database user for the Redshift cluster.


[source]

cluster_identifier#

Cluster identifier for the Redshift cluster.


[source]

database_driver#

Database driver for the Redshift cluster.


[source]

database_endpoint#

Database endpoint for the Redshift cluster.


[source]

database_group#

Database group for the Redshift cluster.


[source]

database_name#

Database name for the Redshift cluster.


[source]

database_password#

Database password for the Redshift cluster.


[source]

database_port#

Database port for the Redshift cluster.


[source]

database_user_name#

Database username for the Redshift cluster.


[source]

description#

User provided description of the storage connector.


[source]

expiration#

Cluster temporary credential expiration time.


[source]

iam_role#

IAM role.


[source]

id#

Id of the storage connector uniquely identifying it in the Feature store.


[source]

name#

Name of the storage connector.


[source]

table_name#

Table name for the Redshift cluster.


Methods#

[source]

read#

RedshiftConnector.read(query, data_format=None, options={}, path=None)

Reads a query into a dataframe using the storage connector.


[source]

refetch#

RedshiftConnector.refetch()

Refetch storage connector in order to retrieve updated temporary credentials.
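
A minimal sketch of refreshing the temporary credentials before re-reading, assuming the connector was retrieved earlier as sc (the query is hypothetical):

sc.refetch()

df = sc.read("SELECT * FROM my_table")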


[source]

spark_options#

RedshiftConnector.spark_options()

Return prepared options to be passed to Spark, based on the additional arguments.


[source]

to_dict#

RedshiftConnector.to_dict()

[source]

update_from_response_json#

RedshiftConnector.update_from_response_json(json_dict)

Azure Data Lake Storage#

Properties#

[source]

account_name#

Account name of the ADLS storage connector.


[source]

application_id#

Application ID of the ADLS storage connector.


[source]

container_name#

Container name of the ADLS storage connector.


[source]

description#

User provided description of the storage connector.


[source]

directory_id#

Directory ID of the ADLS storage connector.


[source]

generation#

Generation of the ADLS storage connector.


[source]

id#

Id of the storage connector uniquely identifying it in the Feature store.


[source]

name#

Name of the storage connector.


[source]

path#

If the connector refers to a path (e.g. ADLS), return the path of the connector.


[source]

service_credential#

Service credential of the ADLS storage connector.


Methods#

[source]

prepare_spark#

AdlsConnector.prepare_spark(path=None)

Prepare Spark to use this Storage Connector.

conn.prepare_spark()

spark.read.format("json").load("abfss://[container-name]@[account_name].dfs.core.windows.net/[path]")

# or
spark.read.format("json").load(conn.prepare_spark("abfss://[container-name]@[account_name].dfs.core.windows.net/[path]"))

Arguments

  • path Optional[str]: Path to prepare for reading from cloud storage. Defaults to None.

[source]

read#

AdlsConnector.read(query=None, data_format=None, options={}, path=None)

Reads a query or a path into a dataframe using the storage connector.

Note that paths are only supported for object stores like S3, HopsFS, and ADLS, while queries are meant for JDBC connectors and databases like Redshift and Snowflake.


[source]

refetch#

AdlsConnector.refetch()

Refetch storage connector.


[source]

spark_options#

AdlsConnector.spark_options()

Return prepared options to be passed to Spark, based on the additional arguments.


[source]

to_dict#

AdlsConnector.to_dict()

[source]

update_from_response_json#

AdlsConnector.update_from_response_json(json_dict)

Snowflake#

Properties#

[source]

account#

Account of the Snowflake storage connector.


[source]

application#

Application of the Snowflake storage connector.


[source]

database#

Database of the Snowflake storage connector.


[source]

description#

User provided description of the storage connector.


[source]

id#

Id of the storage connector uniquely identifying it in the Feature store.


[source]

name#

Name of the storage connector.


[source]

options#

Additional options for the Snowflake storage connector.


[source]

password#

Password of the Snowflake storage connector.


[source]

role#

Role of the Snowflake storage connector.


[source]

schema#

Schema of the Snowflake storage connector.


[source]

table#

Table of the Snowflake storage connector.


[source]

token#

OAuth token of the Snowflake storage connector.


[source]

url#

URL of the Snowflake storage connector.


[source]

user#

User of the Snowflake storage connector.


[source]

warehouse#

Warehouse of the Snowflake storage connector.


Methods#

[source]

read#

SnowflakeConnector.read(query, data_format=None, options={}, path=None)

Reads a query into a dataframe using the storage connector.


[source]

refetch#

SnowflakeConnector.refetch()

Refetch storage connector.


[source]

snowflake_connector_options#

SnowflakeConnector.snowflake_connector_options()

This method prepares a Python dictionary with the arguments needed to connect to a Snowflake database using the snowflake.connector Python library.

import snowflake.connector

sc = fs.get_storage_connector("snowflake_conn")
ctx = snowflake.connector.connect(**sc.snowflake_connector_options())
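
As a follow-up, assuming the connection above succeeded, a cursor can be used to run a query (the table name is hypothetical):

cur = ctx.cursor()
cur.execute("SELECT * FROM my_table LIMIT 10")
rows = cur.fetchall()
ctx.close()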

[source]

spark_options#

SnowflakeConnector.spark_options()

Return prepared options to be passed to Spark, based on the additional arguments.
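
A minimal sketch of passing the prepared options to the Snowflake Spark connector, assuming the spark-snowflake connector is on the Spark classpath and sc is a retrieved Snowflake connector (the table name is hypothetical):

options = sc.spark_options()

df = spark.read.format("net.snowflake.spark.snowflake").options(**options).option("dbtable", "MY_TABLE").load()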


[source]

to_dict#

SnowflakeConnector.to_dict()

[source]

update_from_response_json#

SnowflakeConnector.update_from_response_json(json_dict)