Skip to content

hopsworks.flink_cluster #

Id of the cluster.

Name of the cluster.

Date of creation for the cluster.

Configuration for the cluster.

Creator of the cluster.

State of the cluster.

start(await_time=1800)

Start the flink cluster and wait until it reaches RUNNING state.

import hopsworks

project = hopsworks.login()

flink_cluster_api = project.get_flink_cluster_api()

flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

flink_cluster.start()
PARAMETER DESCRIPTION
await_time

Time in seconds to wait for the cluster to reach the RUNNING state. Defaults to 1800 seconds to account for auto-scale mechanisms.

DEFAULT: 1800

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_jobs() -> list[dict]

Get jobs from the flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get jobs from this flink cluster
flink_cluster.get_jobs()
RETURNS DESCRIPTION
list[dict]

A list of dictionaries, each containing the flink job id and the status of the job.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_job(job_id) -> dict

Get a specific job from the flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get a specific job from this cluster
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.get_job(job_id)
PARAMETER DESCRIPTION
job_id

ID of the job within this cluster.

RETURNS DESCRIPTION
dict

A dictionary with flink job id and status of the job.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

stop_job(job_id)

Stop a specific job in the flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# stop the job
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.stop_job(job_id)
PARAMETER DESCRIPTION
job_id

ID of the job within this flink cluster.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_jars() -> list[dict]

Get already uploaded jars from the flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get jar files from this cluster
flink_cluster.get_jars()
RETURNS DESCRIPTION
list[dict]

A list of dictionaries, each containing the metadata of an uploaded jar.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

upload_jar(jar_file)

Upload a jar file to the flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# upload jar file to this cluster
jar_file_path = "./flink-example.jar"
flink_cluster.upload_jar(jar_file_path)
PARAMETER DESCRIPTION
jar_file

Path to the jar file.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

submit_job(jar_id, main_class, job_arguments=None) -> str

Submit a job using a specific jar file that was uploaded to the flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# upload jar file to this cluster
main_class = "com.example.Main"
job_arguments = "-arg1 arg1 -arg2 arg2"
jar_file_path = "./flink-example.jar"
flink_cluster.upload_jar(jar_file_path)

# get jar file metadata (and select the 1st one for demo purposes)
jar_metadata = flink_cluster.get_jars()[0]
jar_id = jar_metadata["id"]
flink_cluster.submit_job(jar_id, main_class, job_arguments=job_arguments)
PARAMETER DESCRIPTION
jar_id

ID of the jar file.

main_class

Fully qualified name of the main class in the jar file, e.g. "com.example.Main".

job_arguments

Job arguments, if any.

DEFAULT: None

RETURNS DESCRIPTION
str

Job ID.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

job_state(
    job_id,
) -> Literal[
    "INITIALIZING",
    "CREATED",
    "RUNNING",
    "FAILING",
    "FAILED",
    "CANCELLING",
    "CANCELED",
    "FINISHED",
    "RESTARTING",
    "SUSPENDED",
    "RECONCILING",
]

Get the state of a job submitted to the flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get the state of a job in this flink cluster
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.job_state(job_id)
PARAMETER DESCRIPTION
job_id

ID of the job within this flink cluster.

RETURNS DESCRIPTION
Literal['INITIALIZING', 'CREATED', 'RUNNING', 'FAILING', 'FAILED', 'CANCELLING', 'CANCELED', 'FINISHED', 'RESTARTING', 'SUSPENDED', 'RECONCILING']

Status of the job.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

stop()

Stop this cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()

flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

flink_cluster.stop()
RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_url()

Get the URL of the flink cluster in Hopsworks.