FlinkCluster API#
Handle#
get_flink_cluster_api#
Project.get_flink_cluster_api()
Get the flink cluster API for the project.
Returns
FlinkClusterApi
: The Flink Cluster Api handle
Setup the cluster#
setup_cluster#
FlinkClusterApi.setup_cluster(name, config=None)
Create a new flink job representing a flink cluster, or update an existing one.
import hopsworks
project = hopsworks.login()
flink_cluster_api = project.get_flink_cluster_api()
flink_config = flink_cluster_api.get_configuration()
flink_config['appName'] = "myFlinkCluster"
flink_cluster = flink_cluster_api.setup_cluster(name="myFlinkCluster", config=flink_config)
- name
str
: Name of the cluster. - config: Configuration of the cluster.
Returns
FlinkCluster
: The FlinkCluster object representing the cluster
Raises
RestAPIError
: If unable to get the flink cluster object
Get the cluster#
get_cluster#
FlinkClusterApi.get_cluster(name)
Get the job corresponding to the flink cluster.
import hopsworks
project = hopsworks.login()
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
Arguments
- name
str
: Name of the cluster.
Returns
FlinkCluster
: The FlinkCluster object representing the cluster
Raises
RestAPIError
: If unable to get the flink cluster object
Start the cluster#
start#
FlinkCluster.start(await_time=1800)
Start the flink cluster and wait until it reaches RUNNING state.
import hopsworks
project = hopsworks.login()
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
flink_cluster.start()
- await_time: defaults to 1800 seconds to account for auto-scale mechanisms.
Raises
RestAPIError
: If unable to start the flink cluster.
Submit job to cluster#
submit_job#
FlinkCluster.submit_job(jar_id, main_class, job_arguments=None)
Submit job using the specific jar file uploaded to the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# upload jar file to this cluster
main_class = "com.example.Main"
job_arguments = "-arg1 arg1 -arg2 arg2"
jar_file_path = "./flink-example.jar"
flink_cluster.upload_jar(jar_file_path)
#get jar file metadata (and select the 1st one for demo purposes)
jar_metadata = flink_cluster.get_jars()[0]
jar_id = jar_metadata["id"]
flink_cluster.submit_job(jar_id, main_class, job_arguments=job_arguments)
Arguments
- jar_id: id if the jar file
- main_class: path to the main class of the jar file
- job_arguments: Job arguments (if any), defaults to none.
Returns
str
: job id.
Raises
RestAPIError
: If unable to submit the job.
Properties#
config#
Configuration for the cluster
creation_time#
Date of creation for the cluster
creator#
Creator of the cluster
id#
Id of the cluster
name#
Name of the cluster
state#
State of the cluster
Methods#
get_jars#
FlinkCluster.get_jars()
Get already uploaded jars from the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# get jar files from this cluster
flink_cluster.get_jars()
Returns
List[Dict]
: The array of dicts with jar metadata.
Raises
RestAPIError
: If unable to get jars from the flink cluster.
get_job#
FlinkCluster.get_job(job_id)
Get specific job from the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# get jobs from this cluster
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.get_job(job_id)
Arguments
- job_id: id of the job within this cluster
Returns
Dict
: Dict with flink job id and status of the job.
Raises
RestAPIError
: If unable to get the job from the cluster
get_jobs#
FlinkCluster.get_jobs()
Get jobs from the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# get jobs from this flink cluster
flink_cluster.get_jobs()
Returns
List[Dict]
: The array of dicts with flink job id and status of the job.
Raises
RestAPIError
: If unable to get the jobs from the cluster
get_url#
FlinkCluster.get_url()
job_state#
FlinkCluster.job_state(job_id)
Gets state of the job submitted to the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# get jobs from this flink cluster
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.job_state(job_id)
Arguments
- job_id: id of the job within this flink cluster
Returns
str
: status of the job. Possible states: "INITIALIZING", "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING".
Raises
RestAPIError
: If unable to get the job state from the flink cluster.
start#
FlinkCluster.start(await_time=1800)
Start the flink cluster and wait until it reaches RUNNING state.
import hopsworks
project = hopsworks.login()
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
flink_cluster.start()
- await_time: defaults to 1800 seconds to account for auto-scale mechanisms.
Raises
RestAPIError
: If unable to start the flink cluster.
stop#
FlinkCluster.stop()
Stop this cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
flink_cluster.stop()
Raises
RestAPIError
: If unable to stop the flink cluster.
stop_job#
FlinkCluster.stop_job(job_id)
Stop specific job in the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# stop the job
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.stop_job(job_id)
Arguments
- job_id: id of the job within this flink cluster.
Raises
RestAPIError
: If unable to stop the job
submit_job#
FlinkCluster.submit_job(jar_id, main_class, job_arguments=None)
Submit job using the specific jar file uploaded to the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# upload jar file to this cluster
main_class = "com.example.Main"
job_arguments = "-arg1 arg1 -arg2 arg2"
jar_file_path = "./flink-example.jar"
flink_cluster.upload_jar(jar_file_path)
#get jar file metadata (and select the 1st one for demo purposes)
jar_metadata = flink_cluster.get_jars()[0]
jar_id = jar_metadata["id"]
flink_cluster.submit_job(jar_id, main_class, job_arguments=job_arguments)
Arguments
- jar_id: id if the jar file
- main_class: path to the main class of the jar file
- job_arguments: Job arguments (if any), defaults to none.
Returns
str
: job id.
Raises
RestAPIError
: If unable to submit the job.
upload_jar#
FlinkCluster.upload_jar(jar_file)
Upload jar file to the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# upload jar file to this cluster
jar_file_path = "./flink-example.jar"
flink_cluster.upload_jar(jar_file_path)
Arguments
- jar_file: path to the jar file.
Raises
RestAPIError
: If unable to upload jar file