
hopsworks.core.trino_api

TrinoApi

API for connecting to Trino from within Hopsworks.

This class provides methods to connect to Trino using either the native Trino DB-API interface or a SQLAlchemy engine. Authentication is handled automatically using the Hopsworks project credentials stored in the secrets storage.

The connection configuration adapts based on whether you're connecting from within the Hopsworks cluster or externally through the load balancer.

Example
import hopsworks
from sqlalchemy import text

project = hopsworks.login()
trino_api = project.get_trino_api()  # Get an instance of TrinoApi from the project context
conn = trino_api.connect(catalog="iceberg", schema="my_db")  # Get a DBAPI connection
cursor = conn.cursor()
cursor.execute("SELECT * FROM my_table")
rows = cursor.fetchall()
for row in rows:
    print(row)
conn.close()  # Release the connection when done

# Or using SQLAlchemy
engine = trino_api.create_engine(catalog="iceberg", schema="my_db")
with engine.connect() as connection:
    result = connection.execute(text("SELECT * FROM my_table"))
    for row in result:
        print(row)

get_host

get_host() -> str

Retrieve the Trino host based on client location.

Returns the external load balancer domain if connecting from outside the cluster, otherwise returns the internal service discovery hostname.

RETURNS DESCRIPTION
str

The Trino hostname.

RAISES DESCRIPTION
hopsworks_common.client.exceptions.TrinoException

If the service discovery domain is not configured (applies only to clients running inside the cluster).
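The host selection described above can be sketched as a small pure function. This is an illustration only: `resolve_trino_host`, its parameters, and the local `TrinoException` stand-in are hypothetical. Because this page does not say how Hopsworks builds the internal hostname from the service discovery domain, the sketch takes that hostname as an input.

```python
from typing import Optional


class TrinoException(Exception):
    """Hypothetical local stand-in for hopsworks_common.client.exceptions.TrinoException."""


def resolve_trino_host(
    is_external_client: bool,
    load_balancer_domain: str,
    service_discovery_hostname: Optional[str],
) -> str:
    """Pick a Trino host the way get_host() is described (hypothetical sketch)."""
    if is_external_client:
        # Clients outside the cluster go through the external load balancer.
        return load_balancer_domain
    if not service_discovery_hostname:
        # Internal clients need service discovery to be configured.
        raise TrinoException("Service discovery domain is not configured.")
    return service_discovery_hostname
```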

get_port

get_port() -> int

Get the Trino port number.

RETURNS DESCRIPTION
int

The port number for connecting to Trino.

get_basic_auth

get_basic_auth() -> tuple[str, str]

Get a tuple containing the username and password for the current project user.

RETURNS DESCRIPTION
tuple[str, str]

A tuple containing the username and the password for the current project user.

RAISES DESCRIPTION
hopsworks_common.client.exceptions.TrinoException

If the credentials cannot be retrieved from the secrets storage, or if the username of the current project user cannot be determined.
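The pair returned by `get_basic_auth()` is a username and password. The method name suggests they are used for HTTP Basic authentication, which is the scheme implemented by `trino.auth.BasicAuthentication` in the `trino` package. Assuming that, the sketch below shows how such a pair is encoded into an `Authorization` header as defined in RFC 7617. The helper `basic_auth_header` is hypothetical. You do not need it to use `connect()` or `create_engine()`, because both handle authentication for you.

```python
import base64


def basic_auth_header(username: str, password: str) -> str:
    """Encode a (username, password) pair as an HTTP Basic Authorization value (RFC 7617)."""
    if ":" in username:
        # RFC 7617 forbids a colon in the user-id, since ":" separates it from the password.
        raise ValueError("username must not contain ':'")
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return f"Basic {token}"
```

With Hopsworks, the input would come from `username, password = trino_api.get_basic_auth()`. The test vector below is the example from RFC 7617 itself. Treat the resulting header like the password, and do not log or print it.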

connect

connect(
    source: str = DEFAULT_SOURCE,
    catalog: str = DEFAULT_CATALOG,
    schema: str = DEFAULT_SCHEMA,
    session_properties: dict | None = None,
    http_headers: dict | None = None,
    max_attempts: int = DEFAULT_MAX_ATTEMPTS,
    request_timeout: int = DEFAULT_REQUEST_TIMEOUT,
    isolation_level: IsolationLevel = AUTOCOMMIT,
    verify: bool | str = True,
    http_session: Any = None,
    client_tags: list[str] | None = None,
    legacy_primitive_types: bool = False,
    legacy_prepared_statements: bool | None = None,
    roles: dict | None = None,
    timezone: str | None = None,
    encoding: str | list[str] | None = None,
) -> Connection

Connect to Trino using the native DBAPI interface.

PARAMETER DESCRIPTION
source

Source identifier for Trino queries.

TYPE: str DEFAULT: DEFAULT_SOURCE

catalog

Trino catalog to connect to.

TYPE: str DEFAULT: DEFAULT_CATALOG

schema

Database schema within the catalog.

TYPE: str DEFAULT: DEFAULT_SCHEMA

session_properties

Dictionary of Trino session properties.

TYPE: dict | None DEFAULT: None

http_headers

Additional HTTP headers for the connection.

TYPE: dict | None DEFAULT: None

max_attempts

Maximum number of retry attempts for failed requests.

TYPE: int DEFAULT: DEFAULT_MAX_ATTEMPTS

request_timeout

Timeout in seconds for each HTTP request.

TYPE: int DEFAULT: DEFAULT_REQUEST_TIMEOUT

isolation_level

Transaction isolation level.

TYPE: IsolationLevel DEFAULT: AUTOCOMMIT

verify

Whether to verify SSL certificates. To verify against a custom certificate or CA bundle, pass its path instead, e.g. verify="/path/to/cert.crt".

TYPE: bool | str DEFAULT: True

http_session

Custom HTTP session for connection pooling.

TYPE: Any DEFAULT: None

client_tags

Tags to identify the client in Trino query logs.

TYPE: list[str] | None DEFAULT: None

legacy_primitive_types

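Whether to use legacy primitive type handling. If True, query results are returned as received from Trino instead of being converted to Python types (for example, DECIMAL values are not converted to decimal.Decimal).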

TYPE: bool DEFAULT: False

legacy_prepared_statements

Whether to use legacy prepared statement handling.

TYPE: bool | None DEFAULT: None

roles

Dictionary mapping catalog names to role names.

TYPE: dict | None DEFAULT: None

timezone

Timezone for the session.

TYPE: str | None DEFAULT: None

encoding

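Result encoding(s) to request from Trino's spooling protocol, in order of preference (e.g. "json+zstd"). This is not a character encoding.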

TYPE: str | list[str] | None DEFAULT: None
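To illustrate the optional parameters, the hypothetical helper below opens a connection with a session property, client tags, explicit retry and timeout settings, and an optional custom CA bundle. It only forwards keyword arguments to `connect()`, so it accepts any object that has that method. `query_max_run_time` is a standard Trino session property. The values 3 and 60 are arbitrary examples, not recommended defaults.

```python
from typing import Any, List, Optional


def open_tagged_connection(
    trino_api: Any,
    tags: List[str],
    max_run_time: str = "10m",
    ca_bundle: Optional[str] = None,
) -> Any:
    """Hypothetical helper: open a DB-API connection with some optional connect() parameters set."""
    return trino_api.connect(
        catalog="iceberg",
        schema="my_db",
        # Trino session properties are passed as a name -> value mapping.
        session_properties={"query_max_run_time": max_run_time},
        # Tags identify this client in Trino query logs.
        client_tags=tags,
        # Bound the retries for failed requests and the time each HTTP request may take.
        max_attempts=3,
        request_timeout=60,
        # True uses the default certificate store; a path verifies against that bundle.
        verify=ca_bundle if ca_bundle else True,
    )
```

In Hopsworks you would call it as `open_tagged_connection(project.get_trino_api(), ["nightly-report"])`, where the tag name is only an example.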

RETURNS DESCRIPTION
Connection

A connection object implementing the Python DB API 2.0 specification.

RAISES DESCRIPTION
hopsworks_common.client.exceptions.TrinoException

If the service discovery domain is not configured.

hopsworks.client.exceptions.RestAPIError

If credentials cannot be retrieved.

Example
import hopsworks

project = hopsworks.login()
trino_api = project.get_trino_api()
conn = trino_api.connect(catalog="iceberg", schema="my_db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM my_table")
rows = cursor.fetchall()
for row in rows:
    print(row)
conn.close()  # Release the connection when done
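The returned connection follows the Python DB API 2.0 (PEP 249), so generic cursor patterns apply. Two useful ones are streaming large results with `fetchmany()` instead of `fetchall()` and always closing the cursor. The sketch below uses the standard-library `sqlite3` module as a stand-in, so it runs without a Trino server. `iter_batches` is a hypothetical helper. It assumes the Trino cursor accepts `?` placeholders, which is the `qmark` parameter style used by the `trino` package.

```python
import sqlite3
from contextlib import closing
from typing import Any, Iterator, List, Sequence


def iter_batches(
    conn: Any, query: str, params: Sequence[Any] = (), batch_size: int = 1000
) -> Iterator[List[Any]]:
    """Yield query results in batches from any PEP 249 connection (hypothetical helper)."""
    # closing() guarantees cursor.close() runs, even if the consumer stops early.
    with closing(conn.cursor()) as cursor:
        cursor.execute(query, params)
        while True:
            batch = cursor.fetchmany(batch_size)
            if not batch:
                break
            yield batch


# Stand-in for the Trino connection: an in-memory SQLite database.
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE my_table (id INTEGER)")
demo.executemany("INSERT INTO my_table VALUES (?)", [(i,) for i in range(1, 6)])
batches = list(
    iter_batches(demo, "SELECT id FROM my_table WHERE id > ? ORDER BY id", (1,), batch_size=2)
)
print(batches)  # [[(2,), (3,)], [(4,), (5,)]]
```

Against Trino, pass the connection returned by `trino_api.connect(...)` instead of `demo`. Rows from the Trino cursor may be lists rather than tuples.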

create_engine

create_engine(
    source: str = DEFAULT_SQLALCHEMY_SOURCE,
    catalog: str = DEFAULT_CATALOG,
    schema: str = DEFAULT_SCHEMA,
    session_properties: dict | None = None,
    http_headers: dict | None = None,
    max_attempts: int = DEFAULT_MAX_ATTEMPTS,
    request_timeout: int = DEFAULT_REQUEST_TIMEOUT,
    isolation_level: IsolationLevel = AUTOCOMMIT,
    verify: bool | str = True,
    http_session: Any = None,
    client_tags: list[str] | None = None,
    legacy_primitive_types: bool = False,
    legacy_prepared_statements: bool | None = None,
    roles: dict | None = None,
    timezone: str | None = None,
    encoding: str | list[str] | None = None,
) -> Engine

Create a SQLAlchemy engine for Trino.

PARAMETER DESCRIPTION
source

Source identifier for Trino queries.

TYPE: str DEFAULT: DEFAULT_SQLALCHEMY_SOURCE

catalog

Trino catalog to connect to.

TYPE: str DEFAULT: DEFAULT_CATALOG

schema

Database schema within the catalog.

TYPE: str DEFAULT: DEFAULT_SCHEMA

session_properties

Dictionary of Trino session properties.

TYPE: dict | None DEFAULT: None

http_headers

Additional HTTP headers for the connection.

TYPE: dict | None DEFAULT: None

max_attempts

Maximum number of retry attempts for failed requests.

TYPE: int DEFAULT: DEFAULT_MAX_ATTEMPTS

request_timeout

Timeout in seconds for each HTTP request.

TYPE: int DEFAULT: DEFAULT_REQUEST_TIMEOUT

isolation_level

Transaction isolation level.

TYPE: IsolationLevel DEFAULT: AUTOCOMMIT

verify

Whether to verify SSL certificates. To verify against a custom certificate or CA bundle, pass its path instead, e.g. verify="/path/to/cert.crt".

TYPE: bool | str DEFAULT: True

http_session

Custom HTTP session for connection pooling.

TYPE: Any DEFAULT: None

client_tags

Tags to identify the client in Trino query logs.

TYPE: list[str] | None DEFAULT: None

legacy_primitive_types

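Whether to use legacy primitive type handling. If True, query results are returned as received from Trino instead of being converted to Python types (for example, DECIMAL values are not converted to decimal.Decimal).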

TYPE: bool DEFAULT: False

legacy_prepared_statements

Whether to use legacy prepared statement handling.

TYPE: bool | None DEFAULT: None

roles

Dictionary mapping catalog names to role names.

TYPE: dict | None DEFAULT: None

timezone

Timezone for the session.

TYPE: str | None DEFAULT: None

encoding

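Result encoding(s) to request from Trino's spooling protocol, in order of preference (e.g. "json+zstd"). This is not a character encoding.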

TYPE: str | list[str] | None DEFAULT: None

RETURNS DESCRIPTION
Engine

A SQLAlchemy engine for executing queries against Trino.

RAISES DESCRIPTION
hopsworks_common.client.exceptions.TrinoException

If the service discovery domain is not configured.

hopsworks.client.exceptions.RestAPIError

If credentials cannot be retrieved.

Example
import hopsworks
from sqlalchemy import text

project = hopsworks.login()
trino_api = project.get_trino_api()
engine = trino_api.create_engine(catalog="iceberg", schema="my_db")
with engine.connect() as connection:
    result = connection.execute(text("SELECT * FROM my_table"))
    for row in result:
        print(row)