hopsworks.core.trino_dbapi #
connect #
connect(
source: str = DEFAULT_SOURCE,
catalog: str = constants.DEFAULT_CATALOG,
schema: str = constants.DEFAULT_SCHEMA,
session_properties: dict | None = None,
http_headers: dict | None = None,
max_attempts: int = constants.DEFAULT_MAX_ATTEMPTS,
request_timeout: int = constants.DEFAULT_REQUEST_TIMEOUT,
isolation_level: IsolationLevel = IsolationLevel.AUTOCOMMIT,
verify: bool | str = True,
http_session: Any = None,
client_tags: list[str] | None = None,
legacy_primitive_types: bool = False,
legacy_prepared_statements: bool | None = None,
roles: dict | None = None,
timezone: str | None = None,
encoding: str | list[str] | None = None,
) -> Connection
Connect to Trino using the native DBAPI interface.
Use this when you want to work with cursors and the native Python DB API. For SQLAlchemy integration, use create_engine() instead.
Hopsworks automatically handles authentication using project credentials from the secrets storage. The connection is configured to use HTTPS with self-signed certificates.
| PARAMETER | DESCRIPTION |
|---|---|
source | Source identifier for Trino queries. TYPE: |
catalog | Trino catalog to connect to. TYPE: |
schema | Database schema within the catalog. TYPE: |
session_properties | Dictionary of Trino session properties. TYPE: |
http_headers | Additional HTTP headers for the connection. TYPE: |
max_attempts | Maximum number of retry attempts for failed requests. TYPE: |
request_timeout | Timeout in seconds for each HTTP request. TYPE: |
isolation_level | Transaction isolation level. TYPE: |
verify | Whether to verify SSL certificates. Set verify="/path/to/cert.crt" if you want to verify the ssl cert (default: True). |
http_session | Custom HTTP session for connection pooling. TYPE: |
client_tags | Tags to identify the client in Trino query logs. |
legacy_primitive_types | Whether to use legacy primitive type handling. TYPE: |
legacy_prepared_statements | Whether to use legacy prepared statement handling. TYPE: |
roles | Dictionary mapping catalog names to role names. TYPE: |
timezone | Timezone for the session. TYPE: |
encoding | Character encoding for the connection. |
| RETURNS | DESCRIPTION |
|---|---|
Connection | A connection object implementing the Python DB API 2.0 specification. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks_common.client.exceptions.TrinoException | If the service discovery domain is not configured. |
hopsworks.client.exceptions.RestAPIError | If credentials cannot be retrieved. |
Example
import hopsworks
from hopsworks.core.trino_dbapi import connect
project = hopsworks.login()
conn = connect(catalog="iceberg", schema="my_db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM my_table")
rows = cursor.fetchall()
for row in rows:
print(row)
create_engine #
create_engine(
source: str = DEFAULT_SQLALCHEMY_SOURCE,
catalog: str = constants.DEFAULT_CATALOG,
schema: str = constants.DEFAULT_SCHEMA,
session_properties: dict | None = None,
http_headers: dict | None = None,
max_attempts: int = constants.DEFAULT_MAX_ATTEMPTS,
request_timeout: int = constants.DEFAULT_REQUEST_TIMEOUT,
isolation_level: IsolationLevel = IsolationLevel.AUTOCOMMIT,
verify: bool | str = True,
http_session: Any = None,
client_tags: list[str] | None = None,
legacy_primitive_types: bool = False,
legacy_prepared_statements: bool | None = None,
roles: dict | None = None,
timezone: str | None = None,
encoding: str | list[str] | None = None,
) -> Engine
Create a SQLAlchemy engine for Trino.
Use this when you want to work with SQLAlchemy for database operations. For the native Python DB API, use connect() instead.
Hopsworks automatically handles authentication using project credentials from the secrets storage. The connection is configured to use HTTPS with self-signed certificates.
| PARAMETER | DESCRIPTION |
|---|---|
source | Source identifier for Trino queries. TYPE: |
catalog | Trino catalog to connect to. TYPE: |
schema | Database schema within the catalog. TYPE: |
session_properties | Dictionary of Trino session properties. TYPE: |
http_headers | Additional HTTP headers for the connection. TYPE: |
max_attempts | Maximum number of retry attempts for failed requests. TYPE: |
request_timeout | Timeout in seconds for each HTTP request. TYPE: |
isolation_level | Transaction isolation level. TYPE: |
verify | Whether to verify SSL certificates. Set verify="/path/to/cert.crt" if you want to verify the ssl cert (default: True). |
http_session | Custom HTTP session for connection pooling. TYPE: |
client_tags | Tags to identify the client in Trino query logs. |
legacy_primitive_types | Whether to use legacy primitive type handling. TYPE: |
legacy_prepared_statements | Whether to use legacy prepared statement handling. TYPE: |
roles | Dictionary mapping catalog names to role names. TYPE: |
timezone | Timezone for the session. TYPE: |
encoding | Character encoding for the connection. |
| RETURNS | DESCRIPTION |
|---|---|
Engine | An Engine object implementing the SQLAlchemy interface. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks_common.client.exceptions.TrinoException | If the service discovery domain is not configured. |
hopsworks.client.exceptions.RestAPIError | If credentials cannot be retrieved. |
Example
import hopsworks
from hopsworks.core.trino_dbapi import create_engine
from sqlalchemy import text
project = hopsworks.login()
engine = create_engine(catalog="iceberg", schema="my_db")
with engine.connect() as connection:
cursor = connection.execute(text("SELECT * FROM tiny.nation")).cursor
rows = cursor.fetchall()
for row in rows:
print(row)