FlinkCluster API

Handle

Project.get_flink_cluster_api()

Get the flink cluster API handle for the project.

Returns

FlinkClusterApi: The flink cluster API handle.


Set up the cluster

setup_cluster

FlinkClusterApi.setup_cluster(name, config=None)

Create a new flink job representing a flink cluster, or update an existing one.

import hopsworks

project = hopsworks.login()

flink_cluster_api = project.get_flink_cluster_api()

flink_config = flink_cluster_api.get_configuration()

flink_config['appName'] = "myFlinkCluster"

flink_cluster = flink_cluster_api.setup_cluster(name="myFlinkCluster", config=flink_config)

Arguments

  • name str: Name of the cluster.
  • config: Configuration of the cluster, for example the one returned by get_configuration() with your changes applied. Defaults to None.

Returns

FlinkCluster: The FlinkCluster object representing the cluster

Raises

  • RestAPIError: If unable to get the flink cluster object
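The example above edits the configuration returned by get_configuration() in place and repeats the cluster name in both appName and the name argument. The following is a minimal sketch, assuming the configuration behaves like a plain dict (as the item assignment in the example suggests), that copies the defaults, applies overrides, and keeps appName in sync with the cluster name as the example does. The helper build_cluster_config and any override keys are hypothetical, not part of the hopsworks API.

```python
import copy
from typing import Any, Dict


def build_cluster_config(
    defaults: Dict[str, Any], name: str, **overrides: Any
) -> Dict[str, Any]:
    """Return a new cluster config based on `defaults` (hypothetical helper).

    Assumes the configuration is a plain dict. The defaults are deep-copied so
    the original dict is left untouched, `overrides` are applied on top, and
    'appName' is set to `name` so it matches the name given to setup_cluster().
    """
    # Refuse a conflicting appName instead of silently overwriting it.
    if overrides.get("appName", name) != name:
        raise ValueError("conflicting appName override")
    config = copy.deepcopy(defaults)
    config.update(overrides)
    config["appName"] = name
    return config


# Usage with hopsworks (not executed here):
# flink_config = build_cluster_config(flink_cluster_api.get_configuration(), "myFlinkCluster")
# flink_cluster = flink_cluster_api.setup_cluster(name="myFlinkCluster", config=flink_config)
```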

Get the cluster

get_cluster

FlinkClusterApi.get_cluster(name)

Get the job corresponding to the flink cluster.

import hopsworks

project = hopsworks.login()

flink_cluster_api = project.get_flink_cluster_api()

flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

Arguments

  • name str: Name of the cluster.

Returns

FlinkCluster: The FlinkCluster object representing the cluster

Raises

  • RestAPIError: If unable to get the flink cluster object

Start the cluster

start

FlinkCluster.start(await_time=1800)

Start the flink cluster and wait until it reaches RUNNING state.

import hopsworks

project = hopsworks.login()

flink_cluster_api = project.get_flink_cluster_api()

flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

flink_cluster.start()

Arguments

  • await_time: how long, in seconds, to wait for the cluster to reach RUNNING state. Defaults to 1800 seconds to account for auto-scaling mechanisms.

Raises

  • RestAPIError: If unable to start the flink cluster.

Submit job to cluster

submit_job

FlinkCluster.submit_job(jar_id, main_class, job_arguments=None)

Submit a job using a specific jar file that has been uploaded to the flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# define the entry point and arguments of the job
main_class = "com.example.Main"
job_arguments = "-arg1 arg1 -arg2 arg2"

# upload jar file to this cluster
jar_file_path = "./flink-example.jar"
flink_cluster.upload_jar(jar_file_path)

# get jar file metadata (and select the 1st one for demo purposes)
jar_metadata = flink_cluster.get_jars()[0]
jar_id = jar_metadata["id"]

# submit the job and keep the returned job id
job_id = flink_cluster.submit_job(jar_id, main_class, job_arguments=job_arguments)

Arguments

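  • jar_id: id of the jar file, as found in the metadata returned by get_jars().
  • main_class: fully qualified name of the main class in the jar file, e.g. "com.example.Main".
  • job_arguments: job arguments as a single string (if any). Defaults to None.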

Returns

str: job id.

Raises

  • RestAPIError: If unable to submit the job.
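In the example above, job_arguments is passed as one string of space-separated flags. If the arguments start out in a dict, a helper such as the hypothetical format_job_arguments below can produce a string in the same "-name value" layout. This is a sketch that assumes the job's main class parses whitespace-separated tokens; since this page does not document any quoting rule for the parameter, the helper rejects empty values and values containing whitespace instead of guessing how they would be split.

```python
from typing import Mapping


def format_job_arguments(args: Mapping[str, object]) -> str:
    """Build a "-name value -name value" string (hypothetical helper).

    Follows the layout of the documented example ("-arg1 arg1 -arg2 arg2").
    Empty values and anything containing whitespace are rejected, because it
    is not documented how the string is split into separate arguments.
    """
    parts = []
    for key, value in args.items():
        text = str(value)
        if not key or not text or any(ch.isspace() for ch in key + text):
            raise ValueError(f"unsupported argument {key!r}={text!r}")
        parts.extend([f"-{key}", text])
    return " ".join(parts)


# Usage with hopsworks (not executed here):
# job_arguments = format_job_arguments({"arg1": "arg1", "arg2": "arg2"})
# flink_cluster.submit_job(jar_id, main_class, job_arguments=job_arguments)
```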

Properties

config

Configuration for the cluster


creation_time

Date of creation for the cluster


creator

Creator of the cluster


id

Id of the cluster


name

Name of the cluster


state

State of the cluster


Methods

get_jars

FlinkCluster.get_jars()

Get the jars already uploaded to the flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get jar files from this cluster
flink_cluster.get_jars()

Returns

List[Dict]: A list of dicts with the metadata of each jar.

Raises

  • RestAPIError: If unable to get jars from the flink cluster.
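The submit_job example takes get_jars()[0] for demo purposes, which is not guaranteed to be the jar you intend to run. The following sketch makes the choice explicit, assuming each metadata dict mirrors the Apache Flink REST API /jars listing and carries "id", "name" (the file name) and "uploaded" (upload time in epoch milliseconds) keys. Only "id" is confirmed by this page, so check the keys your deployment actually returns. The helper latest_jar_id is hypothetical.

```python
from typing import Any, Dict, List


def latest_jar_id(jars: List[Dict[str, Any]], file_name: str) -> str:
    """Return the id of the most recently uploaded jar named `file_name`.

    Assumes each dict has 'id', 'name' and 'uploaded' keys, as in the
    Apache Flink REST API /jars listing (an assumption, not confirmed here).
    """
    matches = [jar for jar in jars if jar.get("name") == file_name]
    if not matches:
        raise LookupError(f"no uploaded jar named {file_name!r}")
    # If the same file was uploaded more than once, pick the newest upload.
    newest = max(matches, key=lambda jar: jar.get("uploaded", 0))
    return newest["id"]


# Usage with hopsworks (not executed here):
# jar_id = latest_jar_id(flink_cluster.get_jars(), "flink-example.jar")
```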

get_job

FlinkCluster.get_job(job_id)

Get a specific job from the flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get a specific job from this cluster
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.get_job(job_id)

Arguments

  • job_id: id of the job within this cluster

Returns

Dict: Dict with flink job id and status of the job.

Raises

  • RestAPIError: If unable to get the job from the cluster

get_jobs

FlinkCluster.get_jobs()

Get jobs from the flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get jobs from this flink cluster
flink_cluster.get_jobs()

Returns

List[Dict]: A list of dicts, each with the flink job id and status of the job.

Raises

  • RestAPIError: If unable to get the jobs from the cluster
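get_jobs returns one dict per job with the flink job id and status. Assuming those keys are named "id" and "status" (as in the Apache Flink REST API /jobs listing; this page does not name them, so verify against your deployment), the following sketch groups job ids by status, which makes it easy to pick out, for example, the jobs that are still RUNNING. The helper jobs_by_status is hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List


def jobs_by_status(jobs: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Group job ids by their status (hypothetical helper).

    Assumes each dict has 'id' and 'status' keys.
    """
    grouped: Dict[str, List[str]] = defaultdict(list)
    for job in jobs:
        grouped[job["status"]].append(job["id"])
    return dict(grouped)


# Usage with hopsworks (not executed here):
# running = jobs_by_status(flink_cluster.get_jobs()).get("RUNNING", [])
```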

get_url

FlinkCluster.get_url()

job_state

FlinkCluster.job_state(job_id)

Get the state of a job submitted to the flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get the state of a job in this flink cluster
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.job_state(job_id)

Arguments

  • job_id: id of the job within this flink cluster

Returns

str: status of the job. Possible states: "INITIALIZING", "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING".

Raises

  • RestAPIError: If unable to get the job state from the flink cluster.
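job_state returns a single snapshot of the job's status, so blocking until a job ends means polling it. The following sketch treats FINISHED, FAILED, CANCELED and SUSPENDED as end states (in Apache Flink the first three are globally terminal and SUSPENDED is locally terminal). The helper wait_for_job_end and its timeout and interval defaults are hypothetical; it accepts any callable that maps a job id to a state string, so flink_cluster.job_state can be passed in directly.

```python
import time
from typing import Callable

# States after which the job is not expected to make further progress.
END_STATES = frozenset({"FINISHED", "FAILED", "CANCELED", "SUSPENDED"})


def wait_for_job_end(
    get_state: Callable[[str], str],
    job_id: str,
    timeout: float = 600.0,
    interval: float = 5.0,
) -> str:
    """Poll get_state(job_id) until it returns an end state or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_state(job_id)
        if state in END_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} still {state} after {timeout} s")
        time.sleep(interval)


# Usage with hopsworks (not executed here):
# final_state = wait_for_job_end(flink_cluster.job_state, job_id)
```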

stop

FlinkCluster.stop()

Stop this cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()

flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

flink_cluster.stop()

Raises

  • RestAPIError: If unable to stop the flink cluster.
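Before calling stop(), you may want to stop any jobs that are still running so they are ended explicitly rather than together with the cluster. This page does not say whether stopping the cluster already stops its jobs, so treat the following as an optional, defensive step. It is a sketch that only relies on get_jobs() and stop_job() and assumes each job dict has "id" and "status" keys (an assumption; the key names are not documented here). The helper stop_running_jobs is hypothetical.

```python
from typing import Any, List


def stop_running_jobs(cluster: Any) -> List[str]:
    """Stop every job whose status is RUNNING and return the stopped ids.

    `cluster` is expected to provide get_jobs() and stop_job(job_id), as
    FlinkCluster does; the 'id' and 'status' keys are assumptions.
    """
    stopped = []
    for job in cluster.get_jobs():
        if job.get("status") == "RUNNING":
            cluster.stop_job(job["id"])
            stopped.append(job["id"])
    return stopped


# Usage with hopsworks (not executed here):
# stop_running_jobs(flink_cluster)
# flink_cluster.stop()
```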

stop_job

FlinkCluster.stop_job(job_id)

Stop a specific job in the flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# stop the job
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.stop_job(job_id)

Arguments

  • job_id: id of the job within this flink cluster.

Raises

  • RestAPIError: If unable to stop the job

upload_jar

FlinkCluster.upload_jar(jar_file)

Upload a jar file to the flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# upload jar file to this cluster
jar_file_path = "./flink-example.jar"
flink_cluster.upload_jar(jar_file_path)

Arguments

  • jar_file: path to the jar file.

Raises

  • RestAPIError: If unable to upload jar file
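This page documents no return value for upload_jar, which is why the submit_job example picks the first entry of get_jars(). The following sketch finds the id of the jar that was just uploaded by recording the jar ids before the upload and comparing them with the ids afterwards. It relies only on the "id" key used in the submit_job example and assumes no other client uploads a jar to the same cluster at the same time. The helper upload_and_get_jar_id is hypothetical.

```python
from typing import Any


def upload_and_get_jar_id(cluster: Any, jar_file: str) -> str:
    """Upload `jar_file` and return the id of the newly added jar.

    `cluster` must provide get_jars() and upload_jar(path), as FlinkCluster
    does. Assumes nobody else uploads a jar to the cluster concurrently.
    """
    ids_before = {jar["id"] for jar in cluster.get_jars()}
    cluster.upload_jar(jar_file)
    new_ids = [jar["id"] for jar in cluster.get_jars() if jar["id"] not in ids_before]
    if len(new_ids) != 1:
        raise RuntimeError(f"expected exactly one new jar, found {len(new_ids)}")
    return new_ids[0]


# Usage with hopsworks (not executed here):
# jar_id = upload_and_get_jar_id(flink_cluster, "./flink-example.jar")
# flink_cluster.submit_job(jar_id, "com.example.Main")
```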