
Storage Connector#

Retrieval#

[source]

get_storage_connector#

FeatureStore.get_storage_connector(name)

Get a previously created storage connector from the feature store.

Storage connectors encapsulate all information needed for the execution engine to read from and write to a specific storage. This storage can be S3, a JDBC-compliant database, or the distributed filesystem HopsFS.

If you want to connect to the online feature store, see the get_online_storage_connector method to get the JDBC connector for the Online Feature Store.

Example

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("demo_fs_meb10000_Training_Datasets")

Arguments

  • name str: Name of the storage connector to retrieve.

Returns

StorageConnector. Storage connector object.


[source]

get_online_storage_connector#

FeatureStore.get_online_storage_connector()

Get the storage connector for the Online Feature Store of the respective project's feature store.

The returned storage connector depends on the project that you are connected to.

Example

# connect to the Feature Store
fs = ...

online_storage_connector = fs.get_online_storage_connector()

Returns

StorageConnector. JDBC storage connector to the Online Feature Store.


HopsFS#

Properties#

[source]

description#

User provided description of the storage connector.


[source]

id#

Id of the storage connector uniquely identifying it in the Feature store.


[source]

name#

Name of the storage connector.


Methods#

[source]

connector_options#

HopsFSConnector.connector_options()

Return prepared options to be passed to an external connector library. Not implemented for this connector type.


[source]

get_feature_groups#

HopsFSConnector.get_feature_groups()

Get the feature groups using this storage connector, based on explicit provenance. Only the accessible feature groups are returned. For the full set of links, use the base method get_feature_groups_provenance.

Returns

List[FeatureGroup]: List of feature groups.


[source]

get_feature_groups_provenance#

HopsFSConnector.get_feature_groups_provenance()

Get the feature groups generated using this storage connector, based on explicit provenance. These feature groups can be accessible or inaccessible. Explicit provenance does not track deleted generated feature group links, so deleted will always be empty. For inaccessible feature groups, only minimal information is returned.

Returns

ExplicitProvenance.Links: The feature groups generated using this storage connector.

Raises

hsfs.client.exceptions.RestAPIError.
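
For example, a minimal sketch of inspecting the returned provenance links, assuming the Links object exposes accessible and inaccessible collections as described above (the connector name is a placeholder):

# retrieve the connector and its provenance links
sc = fs.get_storage_connector("my_hopsfs_connector")
links = sc.get_feature_groups_provenance()

# accessible feature groups carry full metadata; inaccessible ones only minimal information
for fg in links.accessible:
    print(fg.name)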


[source]

read#

HopsFSConnector.read(
    query=None, data_format=None, options=None, path=None, dataframe_type="default"
)

Reads a query or a path into a dataframe using the storage connector.

Note that paths are only supported for object stores such as S3, HopsFS and ADLS, while queries are meant for JDBC-compliant databases such as Redshift and Snowflake.

Arguments

  • query str | None: By default, the storage connector will read the table configured together with the connector, if any. It's possible to override this by passing a SQL query here. Defaults to None.
  • data_format str | None: When reading from object stores such as S3, HopsFS and ADLS, specify the file format to be read, e.g. csv, parquet.
  • options Dict[str, Any] | None: Any additional key/value options to be passed to the connector.
  • path str | None: Path to be read from within the bucket of the storage connector. Not relevant for JDBC or database-based connectors such as Snowflake or Redshift.
  • dataframe_type str: The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to a Spark dataframe for the Spark engine and a Pandas dataframe for the Python engine.

Returns

DataFrame.
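
For example, a minimal sketch of reading a CSV file from a HopsFS path (the connector name and path are placeholders):

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("my_hopsfs_connector")

# read a CSV file stored on HopsFS into a dataframe
df = sc.read(data_format="csv", path="/Projects/my_project/Resources/data.csv")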


[source]

refetch#

HopsFSConnector.refetch()

Refetch storage connector.


[source]

spark_options#

HopsFSConnector.spark_options()

Return prepared options to be passed to Spark, based on the additional arguments.


[source]

to_dict#

HopsFSConnector.to_dict()

[source]

update_from_response_json#

HopsFSConnector.update_from_response_json(json_dict)

JDBC#

Properties#

[source]

arguments#

Additional JDBC arguments. When running hsfs with PySpark/Spark in Hopsworks, the driver is automatically provided in the classpath, but you need to set the driver argument to com.mysql.cj.jdbc.Driver when creating the storage connector.


[source]

connection_string#

JDBC connection string.


[source]

description#

User provided description of the storage connector.


[source]

id#

Id of the storage connector uniquely identifying it in the Feature store.


[source]

name#

Name of the storage connector.


Methods#

[source]

connector_options#

JdbcConnector.connector_options()

Return prepared options to be passed to an external connector library. Not implemented for this connector type.


[source]

get_feature_groups#

JdbcConnector.get_feature_groups()

Get the feature groups using this storage connector, based on explicit provenance. Only the accessible feature groups are returned. For the full set of links, use the base method get_feature_groups_provenance.

Returns

List[FeatureGroup]: List of feature groups.


[source]

get_feature_groups_provenance#

JdbcConnector.get_feature_groups_provenance()

Get the feature groups generated using this storage connector, based on explicit provenance. These feature groups can be accessible or inaccessible. Explicit provenance does not track deleted generated feature group links, so deleted will always be empty. For inaccessible feature groups, only minimal information is returned.

Returns

ExplicitProvenance.Links: The feature groups generated using this storage connector.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

read#

JdbcConnector.read(query, data_format=None, options=None, path=None, dataframe_type="default")

Reads a query into a dataframe using the storage connector.

Arguments

  • query str: A SQL query to be read.
  • data_format str | None: Not relevant for JDBC based connectors.
  • options Dict[str, Any] | None: Any additional key/value options to be passed to the JDBC connector.
  • path str | None: Not relevant for JDBC based connectors.
  • dataframe_type str: The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to a Spark dataframe for the Spark engine and a Pandas dataframe for the Python engine.

Returns

DataFrame.
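
For example, a minimal sketch of reading the result of a SQL query through the JDBC connector (the connector and table names are placeholders):

sc = fs.get_storage_connector("my_jdbc_connector")

# run a SQL query against the database configured on the connector
df = sc.read(query="SELECT * FROM my_table")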


[source]

refetch#

JdbcConnector.refetch()

Refetch storage connector.


[source]

spark_options#

JdbcConnector.spark_options()

Return prepared options to be passed to Spark, based on the additional arguments.


[source]

to_dict#

JdbcConnector.to_dict()

[source]

update_from_response_json#

JdbcConnector.update_from_response_json(json_dict)

S3#

Properties#

[source]

access_key#

Access key.


[source]

arguments#


[source]

bucket#

Return the bucket for S3 connectors.


[source]

description#

User provided description of the storage connector.


[source]

iam_role#

IAM role.


[source]

id#

Id of the storage connector uniquely identifying it in the Feature store.


[source]

name#

Name of the storage connector.


[source]

path#

If the connector refers to a path (e.g. S3), return the path of the connector.


[source]

secret_key#

Secret key.


[source]

server_encryption_algorithm#

Encryption algorithm if server-side S3 bucket encryption is enabled.


[source]

server_encryption_key#

Encryption key if server-side S3 bucket encryption is enabled.


[source]

session_token#

Session token.


Methods#

[source]

connector_options#

S3Connector.connector_options()

Return prepared options to be passed to an external connector library. Not implemented for this connector type.


[source]

get_feature_groups#

S3Connector.get_feature_groups()

Get the feature groups using this storage connector, based on explicit provenance. Only the accessible feature groups are returned. For the full set of links, use the base method get_feature_groups_provenance.

Returns

List[FeatureGroup]: List of feature groups.


[source]

get_feature_groups_provenance#

S3Connector.get_feature_groups_provenance()

Get the feature groups generated using this storage connector, based on explicit provenance. These feature groups can be accessible or inaccessible. Explicit provenance does not track deleted generated feature group links, so deleted will always be empty. For inaccessible feature groups, only minimal information is returned.

Returns

ExplicitProvenance.Links: The feature groups generated using this storage connector.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

prepare_spark#

S3Connector.prepare_spark(path=None)

Prepare Spark to use this Storage Connector.

conn.prepare_spark()

spark.read.format("json").load("s3a://[bucket]/path")

# or
spark.read.format("json").load(conn.prepare_spark("s3a://[bucket]/path"))

Arguments

  • path str | None: Path to prepare for reading from cloud storage. Defaults to None.

[source]

read#

S3Connector.read(query=None, data_format=None, options=None, path="", dataframe_type="default")

Reads a query or a path into a dataframe using the storage connector.

Note that paths are only supported for object stores such as S3, HopsFS and ADLS, while queries are meant for JDBC-compliant databases such as Redshift and Snowflake.

Arguments

  • query str | None: Not relevant for S3 connectors.
  • data_format str | None: The file format of the files to be read, e.g. csv, parquet.
  • options Dict[str, Any] | None: Any additional key/value options to be passed to the S3 connector.
  • path str: Path within the bucket to be read.
  • dataframe_type str: The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to a Spark dataframe for the Spark engine and a Pandas dataframe for the Python engine.

Returns

DataFrame.
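
For example, a minimal sketch of reading Parquet files from a prefix inside the connector's bucket (the connector name and path are placeholders):

sc = fs.get_storage_connector("my_s3_connector")

# read Parquet files located under a prefix in the configured bucket
df = sc.read(data_format="parquet", path="path/to/data")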


[source]

refetch#

S3Connector.refetch()

Refetch storage connector.


[source]

spark_options#

S3Connector.spark_options()

Return prepared options to be passed to Spark, based on the additional arguments.


[source]

to_dict#

S3Connector.to_dict()

[source]

update_from_response_json#

S3Connector.update_from_response_json(json_dict)

Redshift#

Properties#

[source]

arguments#

Additional JDBC, Redshift, or Snowflake arguments.


[source]

auto_create#

Whether to automatically create the database user for the Redshift cluster if it does not exist.


[source]

cluster_identifier#

Cluster identifier for the Redshift cluster.


[source]

database_driver#

Database driver for the Redshift cluster.


[source]

database_endpoint#

Database endpoint for the Redshift cluster.


[source]

database_group#

Database group for the Redshift cluster.


[source]

database_name#

Database name for the Redshift cluster.


[source]

database_password#

Database password for the Redshift cluster.


[source]

database_port#

Database port for the Redshift cluster.


[source]

database_user_name#

Database username for the Redshift cluster.


[source]

description#

User provided description of the storage connector.


[source]

expiration#

Cluster temporary credential expiration time.


[source]

iam_role#

IAM role.


[source]

id#

Id of the storage connector uniquely identifying it in the Feature store.


[source]

name#

Name of the storage connector.


[source]

table_name#

Table name for the Redshift cluster.


Methods#

[source]

connector_options#

RedshiftConnector.connector_options()

Return prepared options to be passed to an external connector library. Not implemented for this connector type.


[source]

get_feature_groups#

RedshiftConnector.get_feature_groups()

Get the feature groups using this storage connector, based on explicit provenance. Only the accessible feature groups are returned. For the full set of links, use the base method get_feature_groups_provenance.

Returns

List[FeatureGroup]: List of feature groups.


[source]

get_feature_groups_provenance#

RedshiftConnector.get_feature_groups_provenance()

Get the feature groups generated using this storage connector, based on explicit provenance. These feature groups can be accessible or inaccessible. Explicit provenance does not track deleted generated feature group links, so deleted will always be empty. For inaccessible feature groups, only minimal information is returned.

Returns

ExplicitProvenance.Links: The feature groups generated using this storage connector.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

read#

RedshiftConnector.read(
    query=None, data_format=None, options=None, path=None, dataframe_type="default"
)

Reads a table or query into a dataframe using the storage connector.

Arguments

  • query str | None: By default, the storage connector will read the table configured together with the connector, if any. It's possible to override this by passing a SQL query here. Defaults to None.
  • data_format str | None: Not relevant for JDBC-based connectors such as Redshift.
  • options Dict[str, Any] | None: Any additional key/value options to be passed to the JDBC connector.
  • path str | None: Not relevant for JDBC-based connectors such as Redshift.
  • dataframe_type str: The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to a Spark dataframe for the Spark engine and a Pandas dataframe for the Python engine.

Returns

DataFrame.
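
For example, a minimal sketch of reading from Redshift, either the table configured on the connector or an ad-hoc query (the connector and table names are placeholders):

sc = fs.get_storage_connector("my_redshift_connector")

# read the table configured on the connector
df = sc.read()

# or override it with a SQL query
df = sc.read(query="SELECT * FROM my_schema.my_table")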


[source]

refetch#

RedshiftConnector.refetch()

Refetch storage connector in order to retrieve updated temporary credentials.


[source]

spark_options#

RedshiftConnector.spark_options()

Return prepared options to be passed to Spark, based on the additional arguments.


[source]

to_dict#

RedshiftConnector.to_dict()

[source]

update_from_response_json#

RedshiftConnector.update_from_response_json(json_dict)

Azure Data Lake Storage#

Properties#

[source]

account_name#

Account name of the ADLS storage connector


[source]

application_id#

Application ID of the ADLS storage connector


[source]

container_name#

Container name of the ADLS storage connector


[source]

description#

User provided description of the storage connector.


[source]

directory_id#

Directory ID of the ADLS storage connector


[source]

generation#

Generation of the ADLS storage connector


[source]

id#

Id of the storage connector uniquely identifying it in the Feature store.


[source]

name#

Name of the storage connector.


[source]

path#

If the connector refers to a path (e.g. ADLS), return the path of the connector.


[source]

service_credential#

Service credential of the ADLS storage connector


Methods#

[source]

connector_options#

AdlsConnector.connector_options()

Return prepared options to be passed to an external connector library. Not implemented for this connector type.


[source]

get_feature_groups#

AdlsConnector.get_feature_groups()

Get the feature groups using this storage connector, based on explicit provenance. Only the accessible feature groups are returned. For the full set of links, use the base method get_feature_groups_provenance.

Returns

List[FeatureGroup]: List of feature groups.


[source]

get_feature_groups_provenance#

AdlsConnector.get_feature_groups_provenance()

Get the feature groups generated using this storage connector, based on explicit provenance. These feature groups can be accessible or inaccessible. Explicit provenance does not track deleted generated feature group links, so deleted will always be empty. For inaccessible feature groups, only minimal information is returned.

Returns

ExplicitProvenance.Links: The feature groups generated using this storage connector.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

prepare_spark#

AdlsConnector.prepare_spark(path=None)

Prepare Spark to use this Storage Connector.

conn.prepare_spark()

spark.read.format("json").load("abfss://[container-name]@[account_name].dfs.core.windows.net/[path]")

# or
spark.read.format("json").load(conn.prepare_spark("abfss://[container-name]@[account_name].dfs.core.windows.net/[path]"))

Arguments

  • path str | None: Path to prepare for reading from cloud storage. Defaults to None.

[source]

read#

AdlsConnector.read(
    query=None, data_format=None, options=None, path="", dataframe_type="default"
)

Reads a path into a dataframe using the storage connector.

Arguments

  • query str | None: Not relevant for ADLS connectors.
  • data_format str | None: The file format of the files to be read, e.g. csv, parquet.
  • options Dict[str, Any] | None: Any additional key/value options to be passed to the ADLS connector.
  • path str: Path within the container to be read. For example, passing path will read directly from the container specified on the connector by constructing the URI as 'abfss://[container-name]@[account_name].dfs.core.windows.net/[path]'. If no path is specified, the default container path of the connector will be used.
  • dataframe_type str: The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to a Spark dataframe for the Spark engine and a Pandas dataframe for the Python engine.

Returns

DataFrame.
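
For example, a minimal sketch of reading Parquet files from the container configured on the connector (the connector name and relative path are placeholders):

sc = fs.get_storage_connector("my_adls_connector")

# resolves to abfss://[container-name]@[account_name].dfs.core.windows.net/landing/data
df = sc.read(data_format="parquet", path="landing/data")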


[source]

refetch#

AdlsConnector.refetch()

Refetch storage connector.


[source]

spark_options#

AdlsConnector.spark_options()

Return prepared options to be passed to Spark, based on the additional arguments.


[source]

to_dict#

AdlsConnector.to_dict()

[source]

update_from_response_json#

AdlsConnector.update_from_response_json(json_dict)

Snowflake#

Properties#

[source]

account#

Account of the Snowflake storage connector


[source]

application#

Application of the Snowflake storage connector


[source]

database#

Database of the Snowflake storage connector


[source]

description#

User provided description of the storage connector.


[source]

id#

Id of the storage connector uniquely identifying it in the Feature store.


[source]

name#

Name of the storage connector.


[source]

options#

Additional options for the Snowflake storage connector


[source]

password#

Password of the Snowflake storage connector


[source]

role#

Role of the Snowflake storage connector


[source]

schema#

Schema of the Snowflake storage connector


[source]

table#

Table of the Snowflake storage connector


[source]

token#

OAuth token of the Snowflake storage connector


[source]

url#

URL of the Snowflake storage connector


[source]

user#

User of the Snowflake storage connector


[source]

warehouse#

Warehouse of the Snowflake storage connector


Methods#

[source]

connector_options#

SnowflakeConnector.connector_options()

Prepares a Python dictionary with the arguments needed to connect to a Snowflake database using the snowflake.connector Python library.

import snowflake.connector

sc = fs.get_storage_connector("snowflake_conn")
ctx = snowflake.connector.connect(**sc.connector_options())

[source]

get_feature_groups#

SnowflakeConnector.get_feature_groups()

Get the feature groups using this storage connector, based on explicit provenance. Only the accessible feature groups are returned. For the full set of links, use the base method get_feature_groups_provenance.

Returns

List[FeatureGroup]: List of feature groups.


[source]

get_feature_groups_provenance#

SnowflakeConnector.get_feature_groups_provenance()

Get the feature groups generated using this storage connector, based on explicit provenance. These feature groups can be accessible or inaccessible. Explicit provenance does not track deleted generated feature group links, so deleted will always be empty. For inaccessible feature groups, only minimal information is returned.

Returns

ExplicitProvenance.Links: The feature groups generated using this storage connector.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

read#

SnowflakeConnector.read(
    query=None, data_format=None, options=None, path=None, dataframe_type="default"
)

Reads a table or query into a dataframe using the storage connector.

Arguments

  • query str | None: By default, the storage connector will read the table configured together with the connector, if any. It's possible to override this by passing a SQL query here. Defaults to None.
  • data_format str | None: Not relevant for Snowflake connectors.
  • options Dict[str, Any] | None: Any additional key/value options to be passed to the engine.
  • path str | None: Not relevant for Snowflake connectors.
  • dataframe_type str: The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to a Spark dataframe for the Spark engine and a Pandas dataframe for the Python engine.

Returns

DataFrame.
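
For example, a minimal sketch of reading from Snowflake, either the table configured on the connector or a query (the connector and table names are placeholders):

sc = fs.get_storage_connector("my_snowflake_connector")

# read the table configured on the connector
df = sc.read()

# or pass a SQL query instead
df = sc.read(query="SELECT * FROM MY_DB.MY_SCHEMA.MY_TABLE")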


[source]

refetch#

SnowflakeConnector.refetch()

Refetch storage connector.


[source]

snowflake_connector_options#

SnowflakeConnector.snowflake_connector_options()

Alias for connector_options


[source]

spark_options#

SnowflakeConnector.spark_options()

Return prepared options to be passed to Spark, based on the additional arguments.


[source]

to_dict#

SnowflakeConnector.to_dict()

[source]

update_from_response_json#

SnowflakeConnector.update_from_response_json(json_dict)

Google Cloud Storage#

This storage connector provides integration with Google Cloud Storage (GCS). Once you create a connector in the Feature Store, you can read data from a GCS bucket into a Spark dataframe by calling the read API.

Authentication to GCP is handled by uploading the JSON keyfile for the service account to the Hopsworks project. For more information on service accounts and creating a keyfile in GCP, read the Google Cloud documentation.

The connector also supports Google's optional Customer-Supplied Encryption Key method. The encryption details are stored as secrets in the Feature Store to keep them secure. Read more about encryption in the Google documentation.

The storage connector uses the Google gcs-connector-hadoop behind the scenes. For more information, check out the Google Cloud Storage Connector for Spark and Hadoop.

Properties#

[source]

algorithm#

Encryption Algorithm


[source]

bucket#

GCS Bucket


[source]

description#

User provided description of the storage connector.


[source]

encryption_key#

Encryption Key


[source]

encryption_key_hash#

Encryption Key Hash


[source]

id#

Id of the storage connector uniquely identifying it in the Feature store.


[source]

key_path#

JSON keyfile for service account


[source]

name#

Name of the storage connector.


[source]

path#

The path of the connector, prefixed with the gs:// file system scheme.


Methods#

[source]

connector_options#

GcsConnector.connector_options()

Return prepared options to be passed to an external connector library. Not implemented for this connector type.


[source]

get_feature_groups#

GcsConnector.get_feature_groups()

Get the feature groups using this storage connector, based on explicit provenance. Only the accessible feature groups are returned. For the full set of links, use the base method get_feature_groups_provenance.

Returns

List[FeatureGroup]: List of feature groups.


[source]

get_feature_groups_provenance#

GcsConnector.get_feature_groups_provenance()

Get the feature groups generated using this storage connector, based on explicit provenance. These feature groups can be accessible or inaccessible. Explicit provenance does not track deleted generated feature group links, so deleted will always be empty. For inaccessible feature groups, only minimal information is returned.

Returns

ExplicitProvenance.Links: The feature groups generated using this storage connector.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

prepare_spark#

GcsConnector.prepare_spark(path=None)

Prepare Spark to use this Storage Connector.

conn.prepare_spark()
spark.read.format("json").load("gs://bucket/path")
# or
spark.read.format("json").load(conn.prepare_spark("gs://bucket/path"))

Arguments

  • path str | None: Path to prepare for reading from Google cloud storage. Defaults to None.

[source]

read#

GcsConnector.read(
    query=None, data_format=None, options=None, path="", dataframe_type="default"
)

Reads a GCS path into a dataframe using the storage connector.

To read directly from the default bucket, you can omit the path argument:

conn.read(data_format='spark_formats')

To read objects from the default bucket, provide the object path without the gs:// URI scheme. For example, the following will read from the path gs://bucket_on_connector/Path/object:

conn.read(data_format='spark_formats', path='Path/object')

Or, to read with a full gs:// URI path:

conn.read(data_format='spark_formats', path='gs://BUCKET/DATA')

Arguments

  • query str | None: Not relevant for GCS connectors.
  • data_format str | None: Spark data format. Defaults to None.
  • options Dict[str, Any] | None: Spark options. Defaults to None.
  • path str: GCS path. Defaults to None.
  • dataframe_type str: The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to a Spark dataframe for the Spark engine and a Pandas dataframe for the Python engine.

Raises

  • ValueError: Malformed arguments.

Returns

Dataframe: A Spark dataframe.


[source]

refetch#

GcsConnector.refetch()

Refetch storage connector.


[source]

spark_options#

GcsConnector.spark_options()

Return prepared options to be passed to Spark, based on the additional arguments.


[source]

to_dict#

GcsConnector.to_dict()

[source]

update_from_response_json#

GcsConnector.update_from_response_json(json_dict)

BigQuery#

The BigQuery storage connector provides integration with Google Cloud BigQuery. You can use it to run BigQuery queries on GCP and load the results into a Spark dataframe by calling the read API.

Authentication to GCP is handled by uploading the JSON keyfile for the service account to the Hopsworks project. For more information on service accounts and creating a keyfile in GCP, read the Google Cloud documentation.

The storage connector uses the Google spark-bigquery-connector behind the scenes. To read more about the Spark connector, such as its options and usage, check the Apache Spark SQL connector for Google BigQuery.

Properties#

[source]

arguments#

Additional spark options


[source]

dataset#

BigQuery dataset (The dataset containing the table)


[source]

description#

User provided description of the storage connector.


[source]

id#

Id of the storage connector uniquely identifying it in the Feature store.


[source]

key_path#

JSON keyfile for service account


[source]

materialization_dataset#

BigQuery materialization dataset (The dataset where the materialized view is going to be created, used in case of query)


[source]

name#

Name of the storage connector.


[source]

parent_project#

BigQuery parent project (Google Cloud Project ID of the table to bill for the export)


[source]

query_project#

BigQuery project (The Google Cloud Project ID of the table)


[source]

query_table#

BigQuery table name


Methods#

[source]

connector_options#

BigQueryConnector.connector_options()

Return options to be passed to an external BigQuery connector library


[source]

get_feature_groups#

BigQueryConnector.get_feature_groups()

Get the feature groups using this storage connector, based on explicit provenance. Only the accessible feature groups are returned. For the full set of links, use the base method get_feature_groups_provenance.

Returns

List[FeatureGroup]: List of feature groups.


[source]

get_feature_groups_provenance#

BigQueryConnector.get_feature_groups_provenance()

Get the feature groups generated using this storage connector, based on explicit provenance. These feature groups can be accessible or inaccessible. Explicit provenance does not track deleted generated feature group links, so deleted will always be empty. For inaccessible feature groups, only minimal information is returned.

Returns

ExplicitProvenance.Links: The feature groups generated using this storage connector.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

read#

BigQueryConnector.read(
    query=None, data_format=None, options=None, path=None, dataframe_type="default"
)

Reads results from BigQuery into a spark dataframe using the storage connector.

Reading from BigQuery is done by specifying either a BigQuery table or a BigQuery query. For example, to read from a BigQuery table, set the BigQuery project, dataset and table on the storage connector and read directly from the corresponding path:

conn.read()

Or, to read the results of a BigQuery query, set the materialization dataset on the storage connector and pass your SQL to the query argument:

conn.read(query='SQL')

If the table options were also set on the storage connector, a query argument passed at runtime takes priority. This allows the same connector to be used for both a query and a table, assuming all fields were set. The path argument can also be set at runtime to a BigQuery table path, if the table options were not set when the connector was created:

conn.read(path='project.dataset.table')

Arguments

  • query str | None: BigQuery query. Defaults to None.
  • data_format str | None: Spark data format. Defaults to None.
  • options Dict[str, Any] | None: Spark options. Defaults to None.
  • path str | None: BigQuery table path. Defaults to None.
  • dataframe_type str: The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to a Spark dataframe for the Spark engine and a Pandas dataframe for the Python engine.

Raises

  • ValueError: Malformed arguments.

Returns

Dataframe: A Spark dataframe.


[source]

refetch#

BigQueryConnector.refetch()

Refetch storage connector.


[source]

spark_options#

BigQueryConnector.spark_options()

Return Spark options to be set for the BigQuery Spark connector.


[source]

to_dict#

BigQueryConnector.to_dict()

[source]

update_from_response_json#

BigQueryConnector.update_from_response_json(json_dict)

Kafka#

Properties#

[source]

bootstrap_servers#

Bootstrap servers string.


[source]

description#

User provided description of the storage connector.


[source]

id#

Id of the storage connector uniquely identifying it in the Feature store.


[source]

name#

Name of the storage connector.


[source]

options#

Additional Kafka options.


[source]

security_protocol#

Security protocol for the Kafka connection.


[source]

ssl_endpoint_identification_algorithm#

SSL endpoint identification algorithm.


[source]

ssl_keystore_location#

Location of the SSL keystore.


[source]

ssl_truststore_location#

Location of the SSL truststore.


Methods#

[source]

confluent_options#

KafkaConnector.confluent_options()

Return prepared options to be passed to confluent_kafka, based on the provided Apache Spark configuration. Currently, only producer values with importance >= medium are implemented. See https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html
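
For example, a minimal sketch of building a producer with the confluent_kafka library from the prepared options (the connector name and topic are placeholders):

from confluent_kafka import Producer

sc = fs.get_storage_connector("my_kafka_connector")

# build a producer from the prepared librdkafka configuration
producer = Producer(sc.confluent_options())
producer.produce("my_topic", value=b"hello")
producer.flush()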


[source]

connector_options#

KafkaConnector.connector_options()

Return prepared options to be passed to an external connector library. Not implemented for this connector type.


[source]

get_feature_groups#

KafkaConnector.get_feature_groups()

Get the feature groups using this storage connector, based on explicit provenance. Only the accessible feature groups are returned. For the full set of links, use the base method get_feature_groups_provenance.

Returns

List[FeatureGroup]: List of feature groups.


[source]

get_feature_groups_provenance#

KafkaConnector.get_feature_groups_provenance()

Get the feature groups generated using this storage connector, based on explicit provenance. These feature groups can be accessible or inaccessible. Explicit provenance does not track deleted generated feature group links, so deleted will always be empty. For inaccessible feature groups, only minimal information is returned.

Returns

ExplicitProvenance.Links: The feature groups generated using this storage connector.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

kafka_options#

KafkaConnector.kafka_options()

Return prepared options to be passed to Kafka, based on the additional arguments. See https://kafka.apache.org/documentation/


[source]

read#

KafkaConnector.read(
    query=None, data_format=None, options=None, path=None, dataframe_type="default"
)

NOT SUPPORTED.


[source]

read_stream#

KafkaConnector.read_stream(
    topic,
    topic_pattern=False,
    message_format="avro",
    schema=None,
    options=None,
    include_metadata=False,
)

Reads a Kafka stream from a topic or multiple topics into a Dataframe.

Engine Support

Spark only

Reading from data streams using Pandas/Python as engine is currently not supported. Python/Pandas has no notion of streaming.

Arguments

  • topic str: Name or pattern of the topic(s) to subscribe to.
  • topic_pattern bool: Flag to indicate if topic string is a pattern. Defaults to False.
  • message_format str: The format of the messages to use for decoding. Can be "avro" or "json". Defaults to "avro".
  • schema str | None: Optional schema to use for decoding: an Avro schema string for the "avro" message format, or a Spark StructType schema or a DDL-formatted string for "json". Defaults to None.
  • options Dict[str, Any] | None: Additional options as key/value string pairs to be passed to Spark. Defaults to {}.
  • include_metadata bool: Indicate whether to return additional metadata fields from messages in the stream. Otherwise, only the decoded value fields are returned. Defaults to False.

Raises

  • ValueError: Malformed arguments.

Returns

StreamingDataframe: A Spark streaming dataframe.
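
For example, a minimal sketch of reading a JSON-encoded topic and writing the decoded stream to the console, Spark engine only (the connector name, topic and schema are placeholders):

sc = fs.get_storage_connector("my_kafka_connector")

# subscribe to a topic and decode JSON messages with a DDL-formatted schema
df = sc.read_stream(
    topic="my_topic",
    message_format="json",
    schema="id INT, value STRING",
)

# write the decoded stream to the console
query = df.writeStream.format("console").start()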


[source]

refetch#

KafkaConnector.refetch()

Refetch storage connector.


[source]

spark_options#

KafkaConnector.spark_options()

Return prepared options to be passed to Spark, based on the additional arguments. This is done by adding the 'kafka.' prefix to kafka_options. See https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations
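
For example, a minimal sketch of passing the prepared options directly to the Spark Kafka source, assuming an active SparkSession named spark (the connector name and topic are placeholders):

sc = fs.get_storage_connector("my_kafka_connector")

# the 'kafka.'-prefixed options configure the Spark Kafka source
df = (
    spark.readStream.format("kafka")
    .options(**sc.spark_options())
    .option("subscribe", "my_topic")
    .load()
)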


[source]

to_dict#

KafkaConnector.to_dict()

[source]

update_from_response_json#

KafkaConnector.update_from_response_json(json_dict)