Storage Connector#
Retrieval#
get_storage_connector#
FeatureStore.get_storage_connector(name)
Get a previously created storage connector from the feature store.
Storage connectors encapsulate all the information the execution engine needs to read from and write to a specific storage. This storage can be S3, a JDBC-compliant database, or the distributed filesystem HopsFS.
If you want to connect to the online feature store, see the
get_online_storage_connector
method to get the JDBC connector for the Online
Feature Store.
Getting a Storage Connector
sc = fs.get_storage_connector("demo_fs_meb10000_Training_Datasets")
td = fs.create_training_dataset(..., storage_connector=sc, ...)
Arguments
- name str: Name of the storage connector to retrieve.
Returns
StorageConnector: Storage connector object.
get_online_storage_connector#
FeatureStore.get_online_storage_connector()
Get the storage connector for the respective project's Online Feature Store.
The returned storage connector depends on the project that you are connected to.
Returns
StorageConnector: JDBC storage connector to the Online Feature Store.
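For illustration, a minimal sketch assuming fs is the feature store handle from the example above:
online_sc = fs.get_online_storage_connector()
# the online connector is a JDBC connector, so e.g. its connection string can be inspected
print(online_sc.connection_string)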
HopsFS#
Properties#
description#
User provided description of the storage connector.
id#
Id of the storage connector uniquely identifying it in the Feature store.
name#
Name of the storage connector.
Methods#
read#
HopsFSConnector.read(query=None, data_format=None, options={}, path=None)
Reads a query or a path into a dataframe using the storage connector.
Note that paths are only supported for object stores like S3, HopsFS and ADLS, while queries are meant for JDBC sources and databases like Redshift and Snowflake.
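For illustration only, a minimal sketch of reading a path on HopsFS; the data format and path below are placeholders:
sc = fs.get_storage_connector("demo_fs_meb10000_Training_Datasets")
df = sc.read(data_format="csv", path="Resources/my_data.csv")  # placeholder format and path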
refetch#
HopsFSConnector.refetch()
Refetch storage connector.
spark_options#
HopsFSConnector.spark_options()
Return prepared options to be passed to Spark, based on the additional arguments.
to_dict#
HopsFSConnector.to_dict()
update_from_response_json#
HopsFSConnector.update_from_response_json(json_dict)
JDBC#
Properties#
arguments#
Additional JDBC arguments. When running hsfs with PySpark/Spark in Hopsworks, the driver is automatically provided on the classpath, but you need to set the driver argument to com.mysql.cj.jdbc.Driver when creating the storage connector.
connection_string#
JDBC connection string.
description#
User provided description of the storage connector.
id#
Id of the storage connector uniquely identifying it in the Feature store.
name#
Name of the storage connector.
Methods#
read#
JdbcConnector.read(query, data_format=None, options={}, path=None)
Reads a query into a dataframe using the storage connector.
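For illustration only, a minimal sketch of reading a query; the connector name and query below are placeholders:
jdbc_sc = fs.get_storage_connector("mysql_conn")  # placeholder connector name
df = jdbc_sc.read(query="SELECT * FROM customers LIMIT 10")  # placeholder query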
refetch#
JdbcConnector.refetch()
Refetch storage connector.
spark_options#
JdbcConnector.spark_options()
Return prepared options to be passed to Spark, based on the additional arguments.
to_dict#
JdbcConnector.to_dict()
update_from_response_json#
JdbcConnector.update_from_response_json(json_dict)
S3#
Properties#
access_key#
Access key.
bucket#
Return the bucket for S3 connectors.
description#
User provided description of the storage connector.
iam_role#
IAM role.
id#
Id of the storage connector uniquely identifying it in the Feature store.
name#
Name of the storage connector.
path#
If the connector refers to a path (e.g. S3), return the path of the connector.
secret_key#
Secret key.
server_encryption_algorithm#
Encryption algorithm if server-side S3 bucket encryption is enabled.
server_encryption_key#
Encryption key if server-side S3 bucket encryption is enabled.
session_token#
Session token.
Methods#
prepare_spark#
S3Connector.prepare_spark(path=None)
Prepare Spark to use this Storage Connector.
conn.prepare_spark()
spark.read.format("json").load("s3a://[bucket]/path")
# or
spark.read.format("json").load(conn.prepare_spark("s3a://[bucket]/path"))
Arguments
- path Optional[str]: Path to prepare for reading from cloud storage. Defaults to None.
read#
S3Connector.read(query=None, data_format=None, options={}, path=None)
Reads a query or a path into a dataframe using the storage connector.
Note that paths are only supported for object stores like S3, HopsFS and ADLS, while queries are meant for JDBC sources and databases like Redshift and Snowflake.
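For illustration only, a minimal sketch of reading a path from an S3 bucket; the connector name, format, and path below are placeholders:
s3_sc = fs.get_storage_connector("s3_conn")  # placeholder connector name
df = s3_sc.read(data_format="parquet", path="s3://my-bucket/sales/2021/")  # placeholder path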
refetch#
S3Connector.refetch()
Refetch storage connector.
spark_options#
S3Connector.spark_options()
Return prepared options to be passed to Spark, based on the additional arguments.
to_dict#
S3Connector.to_dict()
update_from_response_json#
S3Connector.update_from_response_json(json_dict)
Redshift#
Properties#
arguments#
Additional JDBC, Redshift, or Snowflake arguments.
auto_create#
Whether to automatically create the database user for the Redshift cluster if it does not exist.
cluster_identifier#
Cluster identifier of the Redshift cluster.
database_driver#
Database (JDBC) driver for the Redshift cluster.
database_endpoint#
Database endpoint of the Redshift cluster.
database_group#
Database user group for the Redshift cluster.
database_name#
Database name on the Redshift cluster.
database_password#
Database password for the Redshift cluster.
database_port#
Database port of the Redshift cluster.
database_user_name#
Database username for the Redshift cluster.
description#
User provided description of the storage connector.
expiration#
Cluster temporary credential expiration time.
iam_role#
IAM role.
id#
Id of the storage connector uniquely identifying it in the Feature store.
name#
Name of the storage connector.
table_name#
Table name for the Redshift cluster.
Methods#
read#
RedshiftConnector.read(query, data_format=None, options={}, path=None)
Reads a query into a dataframe using the storage connector.
refetch#
RedshiftConnector.refetch()
Refetch storage connector in order to retrieve updated temporary credentials.
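Because the temporary credentials expire, a common pattern is to refetch the connector before reading; the connector name and query below are placeholders:
redshift_sc = fs.get_storage_connector("redshift_conn")  # placeholder connector name
redshift_sc.refetch()  # refresh the temporary credentials
df = redshift_sc.read(query="SELECT * FROM telemetry LIMIT 100")  # placeholder query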
spark_options#
RedshiftConnector.spark_options()
Return prepared options to be passed to Spark, based on the additional arguments.
to_dict#
RedshiftConnector.to_dict()
update_from_response_json#
RedshiftConnector.update_from_response_json(json_dict)
Azure Data Lake Storage#
Properties#
account_name#
Account name of the ADLS storage connector.
application_id#
Application ID of the ADLS storage connector.
container_name#
Container name of the ADLS storage connector.
description#
User provided description of the storage connector.
directory_id#
Directory ID of the ADLS storage connector.
generation#
Generation of the ADLS storage connector.
id#
Id of the storage connector uniquely identifying it in the Feature store.
name#
Name of the storage connector.
path#
If the connector refers to a path (e.g. ADLS), return the path of the connector.
service_credential#
Service credential of the ADLS storage connector.
Methods#
prepare_spark#
AdlsConnector.prepare_spark(path=None)
Prepare Spark to use this Storage Connector.
conn.prepare_spark()
spark.read.format("json").load("abfss://[container-name]@[account_name].dfs.core.windows.net/[path]")
# or
spark.read.format("json").load(conn.prepare_spark("abfss://[container-name]@[account_name].dfs.core.windows.net/[path]"))
Arguments
- path Optional[str]: Path to prepare for reading from cloud storage. Defaults to None.
read#
AdlsConnector.read(query=None, data_format=None, options={}, path=None)
Reads a query or a path into a dataframe using the storage connector.
Note that paths are only supported for object stores like S3, HopsFS and ADLS, while queries are meant for JDBC sources and databases like Redshift and Snowflake.
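For illustration only, a minimal sketch of reading a path with an ADLS connector; the connector name, format, and path below are placeholders:
adls_sc = fs.get_storage_connector("adls_conn")  # placeholder connector name
df = adls_sc.read(data_format="csv", path="abfss://container@account.dfs.core.windows.net/data/")  # placeholder path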
refetch#
AdlsConnector.refetch()
Refetch storage connector.
spark_options#
AdlsConnector.spark_options()
Return prepared options to be passed to Spark, based on the additional arguments.
to_dict#
AdlsConnector.to_dict()
update_from_response_json#
AdlsConnector.update_from_response_json(json_dict)
Snowflake#
Properties#
account#
Account of the Snowflake storage connector.
application#
Application of the Snowflake storage connector.
database#
Database of the Snowflake storage connector.
description#
User provided description of the storage connector.
id#
Id of the storage connector uniquely identifying it in the Feature store.
name#
Name of the storage connector.
options#
Additional options for the Snowflake storage connector.
password#
Password of the Snowflake storage connector.
role#
Role of the Snowflake storage connector.
schema#
Schema of the Snowflake storage connector.
table#
Table of the Snowflake storage connector.
token#
OAuth token of the Snowflake storage connector.
url#
URL of the Snowflake storage connector.
user#
User of the Snowflake storage connector.
warehouse#
Warehouse of the Snowflake storage connector.
Methods#
read#
SnowflakeConnector.read(query, data_format=None, options={}, path=None)
Reads a query into a dataframe using the storage connector.
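For illustration only, a minimal sketch reusing the snowflake_conn connector from the example below; the query is a placeholder:
sf_sc = fs.get_storage_connector("snowflake_conn")
df = sf_sc.read(query="SELECT * FROM ORDERS LIMIT 100")  # placeholder query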
refetch#
SnowflakeConnector.refetch()
Refetch storage connector.
snowflake_connector_options#
SnowflakeConnector.snowflake_connector_options()
In order to use the snowflake.connector Python library, this method prepares a Python dictionary with the arguments you need to connect to a Snowflake database.
import snowflake.connector
sc = fs.get_storage_connector("snowflake_conn")
ctx = snowflake.connector.connect(**sc.snowflake_connector_options())
spark_options#
SnowflakeConnector.spark_options()
Return prepared options to be passed to Spark, based on the additional arguments.
to_dict#
SnowflakeConnector.to_dict()
update_from_response_json#
SnowflakeConnector.update_from_response_json(json_dict)
Google Cloud Storage#
This storage connector provides integration with Google Cloud Storage (GCS).
Once you create a connector in the feature store, you can read data from a GCS bucket into a Spark dataframe by calling the read API.
Authentication to GCP is handled by uploading the JSON keyfile for the service account to the Hopsworks project. For more information on service accounts and creating a keyfile in GCP, read the Google Cloud documentation.
The connector also supports Google's optional Customer-Supplied Encryption Key encryption method.
The encryption details are stored as secrets in the feature store to keep them secure.
Read more about encryption in the Google documentation.
The storage connector uses the Google gcs-connector-hadoop behind the scenes. For more information, check out the Google Cloud Storage Connector for Spark and Hadoop.
Properties#
algorithm#
Encryption algorithm.
bucket#
GCS bucket.
description#
User provided description of the storage connector.
encryption_key#
Encryption key.
encryption_key_hash#
Encryption key hash.
id#
Id of the storage connector uniquely identifying it in the Feature store.
key_path#
JSON keyfile for the service account.
name#
Name of the storage connector.
path#
The path of the connector, prefixed with the gs:// file system scheme.
Methods#
prepare_spark#
GcsConnector.prepare_spark(path=None)
Prepare Spark to use this Storage Connector.
conn.prepare_spark()
spark.read.format("json").load("gs://bucket/path")
# or
spark.read.format("json").load(conn.prepare_spark("gs://bucket/path"))
Arguments
- path Optional[str]: Path to prepare for reading from Google Cloud Storage. Defaults to None.
read#
GcsConnector.read(query=None, data_format=None, options={}, path=None)
Reads a GCS path into a dataframe using the storage connector.
conn.read(data_format='spark_formats', path='gs://BUCKET/DATA')
Arguments
- data_format Optional[str]: Spark data format. Defaults to None.
- options dict: Spark options. Defaults to None.
- path Optional[str]: GCS path. Defaults to None.
Raises
ValueError: Malformed arguments.
Returns
Dataframe: A Spark dataframe.
refetch#
GcsConnector.refetch()
Refetch storage connector.
spark_options#
GcsConnector.spark_options()
Return prepared options to be passed to Spark, based on the additional arguments.
to_dict#
GcsConnector.to_dict()
update_from_response_json#
GcsConnector.update_from_response_json(json_dict)
BigQuery#
The BigQuery storage connector provides integration with Google Cloud BigQuery.
You can use it to run BigQuery queries and load the results into a Spark dataframe by calling the read API.
Authentication to GCP is handled by uploading the JSON keyfile for the service account to the Hopsworks project. For more information on service accounts and creating a keyfile in GCP, read the Google Cloud documentation.
The storage connector uses the Google spark-bigquery-connector behind the scenes.
To read more about the Spark connector, such as the Spark options or usage, check the Apache Spark SQL connector for Google BigQuery.
Properties#
arguments#
Additional Spark options.
dataset#
BigQuery dataset (the dataset containing the table).
description#
User provided description of the storage connector.
id#
Id of the storage connector uniquely identifying it in the Feature store.
key_path#
JSON keyfile for the service account.
materialization_dataset#
BigQuery materialization dataset (the dataset where the materialized view is created when reading via a query).
name#
Name of the storage connector.
parent_project#
BigQuery parent project (the Google Cloud project ID of the table to bill for the export).
query_project#
BigQuery project (the Google Cloud project ID of the table).
query_table#
BigQuery table name.
Methods#
read#
BigQueryConnector.read(query=None, data_format=None, options={}, path=None)
Reads results from BigQuery into a Spark dataframe using the storage connector.
Reading from BigQuery is done by specifying either the BigQuery table or a BigQuery query. For example, to read from a BigQuery table, set the BigQuery project, dataset and table on the storage connector and read directly from the corresponding path.
conn.read()
Alternatively, to read the results of a BigQuery query, set the Materialization Dataset on the storage connector and pass your SQL to the query argument.
conn.read(query='SQL')
The query argument takes priority at runtime if the table options were also set on the storage connector. This allows you to run both a query and a table read with the same connector, assuming all fields were set.
You can also set the path argument to a BigQuery table path to read at runtime, if the table options were not set when the connector was created.
conn.read(path='project.dataset.table')
Arguments
- query Optional[str]: BigQuery query. Defaults to None.
- data_format Optional[str]: Spark data format. Defaults to None.
- options dict: Spark options. Defaults to None.
- path Optional[str]: BigQuery table path. Defaults to None.
Raises
ValueError: Malformed arguments.
Returns
Dataframe: A Spark dataframe.
refetch#
BigQueryConnector.refetch()
Refetch storage connector.
spark_options#
BigQueryConnector.spark_options()
Return Spark options to be set for the BigQuery Spark connector.
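For illustration only, a sketch of passing these options to the Spark BigQuery source directly; the connector name and table path are placeholders, the spark-bigquery-connector is assumed to be available, and in practice read handles this for you:
bq_sc = fs.get_storage_connector("bigquery_conn")  # placeholder connector name
df = spark.read.format("bigquery").options(**bq_sc.spark_options()).load("project.dataset.table")  # placeholder table path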
to_dict#
BigQueryConnector.to_dict()
update_from_response_json#
BigQueryConnector.update_from_response_json(json_dict)