hopsworks.core.trino_api #
TrinoApi #
API for connecting to Trino from within Hopsworks.
This class provides methods to establish connections to Trino using either the native Trino DBAPI or SQLAlchemy engine. Authentication is handled automatically using Hopsworks project credentials stored in the secrets storage.
The connection configuration adapts based on whether you're connecting from within the Hopsworks cluster or externally through the load balancer.
Example
import hopsworks
from sqlalchemy import text
project = hopsworks.login()
trino_api = project.get_trino_api() # Get an instance of TrinoApi from the project context
conn = trino_api.connect(catalog="iceberg", schema="my_db") # Get a DBAPI connection
cursor = conn.cursor()
cursor.execute("SELECT * FROM my_table")
rows = cursor.fetchall()
for row in rows:
    print(row)
# Or using SQLAlchemy
engine = trino_api.create_engine(catalog="iceberg", schema="my_db")
with engine.connect() as connection:
    result = connection.execute(text("SELECT * FROM my_table"))
    for row in result:
        print(row)
Returned by
get_host #
get_host() -> str
Retrieve the Trino host based on client location.
Returns the external load balancer domain if connecting from outside the cluster, otherwise returns the internal service discovery hostname.
| RETURNS | DESCRIPTION |
|---|---|
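| str | The Trino hostname: the external load balancer domain for external clients, or the internal service discovery hostname otherwise. |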
| RAISES | DESCRIPTION |
|---|---|
| hopsworks_common.client.exceptions.TrinoException | If the service discovery domain is not configured for internal clients. |
get_port #
get_port() -> int
Get the Trino port number.
| RETURNS | DESCRIPTION |
|---|---|
| int | The port number for connecting to Trino. |
get_basic_auth #
get_basic_auth() -> tuple[str, str]
Get a tuple containing the username and password for the current project user.
| RETURNS | DESCRIPTION |
|---|---|
| tuple[str, str] | A tuple containing the username and the password for the current project user. |
| RAISES | DESCRIPTION |
|---|---|
| hopsworks_common.client.exceptions.TrinoException | If credentials cannot be retrieved from the secrets storage, or if the client cannot determine the username for the current project user. |
connect #
connect(
source: str = DEFAULT_SOURCE,
catalog: str = DEFAULT_CATALOG,
schema: str = DEFAULT_SCHEMA,
session_properties: dict | None = None,
http_headers: dict | None = None,
max_attempts: int = DEFAULT_MAX_ATTEMPTS,
request_timeout: int = DEFAULT_REQUEST_TIMEOUT,
isolation_level: IsolationLevel = AUTOCOMMIT,
verify: bool | str = True,
http_session: Any = None,
client_tags: list[str] | None = None,
legacy_primitive_types: bool = False,
legacy_prepared_statements: bool | None = None,
roles: dict | None = None,
timezone: str | None = None,
encoding: str | list[str] | None = None,
) -> Connection
Connect to Trino using the native DBAPI interface.
| PARAMETER | DESCRIPTION |
|---|---|
| source | Source identifier for Trino queries. TYPE: `str` |
| catalog | Trino catalog to connect to. TYPE: `str` |
| schema | Database schema within the catalog. TYPE: `str` |
| session_properties | Dictionary of Trino session properties. TYPE: `dict \| None` |
| http_headers | Additional HTTP headers for the connection. TYPE: `dict \| None` |
| max_attempts | Maximum number of retry attempts for failed requests. TYPE: `int` |
| request_timeout | Timeout in seconds for each HTTP request. TYPE: `int` |
| isolation_level | Transaction isolation level. TYPE: `IsolationLevel` |
| verify | Whether to verify SSL certificates (default: True). Set `verify="/path/to/cert.crt"` to verify the SSL certificate using that certificate file. TYPE: `bool \| str` |
| http_session | Custom HTTP session for connection pooling. TYPE: `Any` |
| client_tags | Tags to identify the client in Trino query logs. TYPE: `list[str] \| None` |
| legacy_primitive_types | Whether to use legacy primitive type handling. TYPE: `bool` |
| legacy_prepared_statements | Whether to use legacy prepared statement handling. TYPE: `bool \| None` |
| roles | Dictionary mapping catalog names to role names. TYPE: `dict \| None` |
| timezone | Timezone for the session. TYPE: `str \| None` |
| encoding | Character encoding for the connection. TYPE: `str \| list[str] \| None` |
| RETURNS | DESCRIPTION |
|---|---|
| Connection | A connection object implementing the Python DB API 2.0 specification. |
| RAISES | DESCRIPTION |
|---|---|
| hopsworks_common.client.exceptions.TrinoException | If the service discovery domain is not configured. |
| hopsworks.client.exceptions.RestAPIError | If credentials cannot be retrieved. |
Example
import hopsworks
project = hopsworks.login()
trino_api = project.get_trino_api()
conn = trino_api.connect(catalog="iceberg", schema="my_db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM my_table")
rows = cursor.fetchall()
for row in rows:
    print(row)
create_engine #
create_engine(
source: str = DEFAULT_SQLALCHEMY_SOURCE,
catalog: str = DEFAULT_CATALOG,
schema: str = DEFAULT_SCHEMA,
session_properties: dict | None = None,
http_headers: dict | None = None,
max_attempts: int = DEFAULT_MAX_ATTEMPTS,
request_timeout: int = DEFAULT_REQUEST_TIMEOUT,
isolation_level: IsolationLevel = AUTOCOMMIT,
verify: bool | str = True,
http_session: Any = None,
client_tags: list[str] | None = None,
legacy_primitive_types: bool = False,
legacy_prepared_statements: bool | None = None,
roles: dict | None = None,
timezone: str | None = None,
encoding: str | list[str] | None = None,
) -> Engine
Create a SQLAlchemy engine for Trino.
| PARAMETER | DESCRIPTION |
|---|---|
| source | Source identifier for Trino queries. TYPE: `str` |
| catalog | Trino catalog to connect to. TYPE: `str` |
| schema | Database schema within the catalog. TYPE: `str` |
| session_properties | Dictionary of Trino session properties. TYPE: `dict \| None` |
| http_headers | Additional HTTP headers for the connection. TYPE: `dict \| None` |
| max_attempts | Maximum number of retry attempts for failed requests. TYPE: `int` |
| request_timeout | Timeout in seconds for each HTTP request. TYPE: `int` |
| isolation_level | Transaction isolation level. TYPE: `IsolationLevel` |
| verify | Whether to verify SSL certificates (default: True). Set `verify="/path/to/cert.crt"` to verify the SSL certificate using that certificate file. TYPE: `bool \| str` |
| http_session | Custom HTTP session for connection pooling. TYPE: `Any` |
| client_tags | Tags to identify the client in Trino query logs. TYPE: `list[str] \| None` |
| legacy_primitive_types | Whether to use legacy primitive type handling. TYPE: `bool` |
| legacy_prepared_statements | Whether to use legacy prepared statement handling. TYPE: `bool \| None` |
| roles | Dictionary mapping catalog names to role names. TYPE: `dict \| None` |
| timezone | Timezone for the session. TYPE: `str \| None` |
| encoding | Character encoding for the connection. TYPE: `str \| list[str] \| None` |
| RETURNS | DESCRIPTION |
|---|---|
| Engine | A SQLAlchemy engine for executing queries against Trino. |
| RAISES | DESCRIPTION |
|---|---|
| hopsworks_common.client.exceptions.TrinoException | If the service discovery domain is not configured. |
| hopsworks.client.exceptions.RestAPIError | If credentials cannot be retrieved. |
Example
import hopsworks
from sqlalchemy import text
project = hopsworks.login()
trino_api = project.get_trino_api()
engine = trino_api.create_engine(catalog="iceberg", schema="my_db")
with engine.connect() as connection:
    result = connection.execute(text("SELECT * FROM my_table"))
    for row in result:
        print(row)