hopsworks.flink_cluster #
[source] FlinkCluster #
[source] start #
start(await_time=1800)
Start the flink cluster and wait until it reaches RUNNING state.
import hopsworks
project = hopsworks.login()
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
flink_cluster.start()
| PARAMETER | DESCRIPTION |
|---|---|
await_time | Time to wait, in seconds, for the cluster to reach the RUNNING state; defaults to 1800 seconds to account for auto-scale mechanisms. DEFAULT: 1800 |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
[source] get_jobs #
Get jobs from the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# get jobs from this flink cluster
flink_cluster.get_jobs()
| RETURNS | DESCRIPTION |
|---|---|
list[dict] | The array of dictionaries with flink job id and status of the job. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
[source] get_job #
get_job(job_id) -> dict
Get specific job from the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# get a specific job from this cluster
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.get_job(job_id)
| PARAMETER | DESCRIPTION |
|---|---|
job_id | ID of the job within this cluster. |
| RETURNS | DESCRIPTION |
|---|---|
dict | A dictionary with flink job id and status of the job. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
[source] stop_job #
stop_job(job_id)
Stop specific job in the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# stop the job
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.stop_job(job_id)
| PARAMETER | DESCRIPTION |
|---|---|
job_id | ID of the job within this flink cluster. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
[source] get_jars #
Get already uploaded jars from the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# get jar files from this cluster
flink_cluster.get_jars()
| RETURNS | DESCRIPTION |
|---|---|
list[dict] | The array of dictionaries with jar metadata. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
[source] upload_jar #
upload_jar(jar_file)
Upload jar file to the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# upload jar file to this cluster
jar_file_path = "./flink-example.jar"
flink_cluster.upload_jar(jar_file_path)
| PARAMETER | DESCRIPTION |
|---|---|
jar_file | Path to the jar file. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
[source] submit_job #
submit_job(jar_id, main_class, job_arguments=None) -> str
Submit job using the specific jar file uploaded to the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# upload jar file to this cluster
main_class = "com.example.Main"
job_arguments = "-arg1 arg1 -arg2 arg2"
jar_file_path = "./flink-example.jar"
flink_cluster.upload_jar(jar_file_path)
# get jar file metadata (and select the 1st one for demo purposes)
jar_metadata = flink_cluster.get_jars()[0]
jar_id = jar_metadata["id"]
flink_cluster.submit_job(jar_id, main_class, job_arguments=job_arguments)
| PARAMETER | DESCRIPTION |
|---|---|
jar_id | ID of the jar file. |
main_class | Path to the main class of the jar file, e.g. `com.example.Main`. |
job_arguments | Job arguments, if any. DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
str | Job ID. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
[source] job_state #
job_state(
job_id,
) -> Literal[
"INITIALIZING",
"CREATED",
"RUNNING",
"FAILING",
"FAILED",
"CANCELLING",
"CANCELED",
"FINISHED",
"RESTARTING",
"SUSPENDED",
"RECONCILING",
]
Get the state of the job submitted to the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# get the state of a job in this flink cluster
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.job_state(job_id)
| PARAMETER | DESCRIPTION |
|---|---|
job_id | ID of the job within this flink cluster. |
| RETURNS | DESCRIPTION |
|---|---|
Literal['INITIALIZING', 'CREATED', 'RUNNING', 'FAILING', 'FAILED', 'CANCELLING', 'CANCELED', 'FINISHED', 'RESTARTING', 'SUSPENDED', 'RECONCILING'] | Status of the job. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
[source] stop #
stop()
Stop this cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
flink_cluster.stop()
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |