Jobs API#

Handle#

[source]

get_job_api#

Project.get_job_api()

Get the job API for the project.

Returns

JobApi: The Job API handle


Configuration#

[source]

get_configuration#

JobApi.get_configuration(type)

Get the default configuration for a specific job type.

Arguments

  • type str: Type of the job. Currently supported types: SPARK, PYSPARK, PYTHON, DOCKER, FLINK.

Returns

dict: Default job configuration

Raises

  • RestAPIError: If unable to get the job configuration
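
As a minimal sketch, the job type can be checked locally against the types listed above before calling get_configuration. The helper name get_default_config and the SUPPORTED_JOB_TYPES constant are illustrative and not part of the Hopsworks API; job_api is assumed to be the handle returned by project.get_job_api().

```python
# Job types listed in the documentation above; the backend may accept others.
SUPPORTED_JOB_TYPES = frozenset({"SPARK", "PYSPARK", "PYTHON", "DOCKER", "FLINK"})


def get_default_config(job_api, job_type):
    """Fetch the default configuration dict for a documented job type.

    Hypothetical helper: validates the type locally before calling
    JobApi.get_configuration, so a typo fails fast with a clear message.
    """
    normalized = job_type.strip().upper()
    if normalized not in SUPPORTED_JOB_TYPES:
        raise ValueError(
            f"Unsupported job type {job_type!r}; "
            f"expected one of {sorted(SUPPORTED_JOB_TYPES)}"
        )
    return job_api.get_configuration(normalized)
```

With a real handle, get_default_config(job_api, "pyspark") would return the same dict as job_api.get_configuration("PYSPARK").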

[source]

JobConfiguration#

hopsworks_common.core.job_configuration.JobConfiguration(
    driver_memory=2048,
    driver_cores=1,
    executor_memory=4096,
    executor_cores=1,
    executor_instances=1,
    dynamic_allocation=True,
    dynamic_min_executors=1,
    dynamic_max_executors=2,
    environment_name="spark-feature-pipeline",
    **kwargs
)

Creation#

[source]

create_job#

JobApi.create_job(name, config)

Create a new job or update an existing one.

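import hopsworks

# Log in and get the Job API handle for the project
project = hopsworks.login()
job_api = project.get_job_api()

# Start from the default PySpark configuration and set the application path
spark_config = job_api.get_configuration("PYSPARK")
spark_config['appPath'] = "/Resources/my_app.py"

# Create the job, or update it if a job with this name already exists
job = job_api.create_job("my_spark_job", spark_config)

Arguments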

  • name str: Name of the job.
  • config dict: Configuration of the job.

Returns

Job: The Job object

Raises

  • RestAPIError: If unable to create the job

Retrieval#

[source]

get_job#

JobApi.get_job(name)

Get a job.

Arguments

  • name str: Name of the job.

Returns

Job: The Job object

Raises

  • RestAPIError: If unable to get the job

[source]

get_jobs#

JobApi.get_jobs()

Get all jobs.

Returns

List[Job]: List of Job objects

Raises

  • RestAPIError: If unable to get the jobs
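
Because get_jobs returns a plain list, a lookup by name can be built from the name property documented below. This is only a sketch; the helper jobs_by_name is hypothetical and not part of the Hopsworks API.

```python
def jobs_by_name(job_api):
    """Return a dict mapping each job's name to its Job object.

    Hypothetical helper: job_api is assumed to be the handle returned by
    project.get_job_api(), and each Job is assumed to expose `name`.
    """
    return {job.name: job for job in job_api.get_jobs()}
```

For a single known name, calling job_api.get_job(name) directly is simpler; the dict is useful when several jobs are inspected in one pass.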

Properties#

[source]

config#

Configuration for the job


[source]

creation_time#

Date of creation for the job


[source]

creator#

Creator of the job


[source]

executions#


[source]

href#


[source]

id#

Id of the job


[source]

job_schedule#

Return the Job schedule


[source]

job_type#

Type of the job


[source]

name#

Name of the job


Methods#

[source]

delete#

Job.delete()

Delete the job

Potentially dangerous operation

This operation deletes the job and all executions.

Raises

RestAPIError.


[source]

get_executions#

Job.get_executions()

Retrieves all executions for the job ordered by submission time.

Returns

List[Execution]

Raises

RestAPIError in case the backend fails to retrieve executions.
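
A short sketch of summarizing the returned executions, assuming each Execution exposes the success attribute used in the run example on this page. The helper name count_successful_executions is hypothetical.

```python
def count_successful_executions(job):
    """Return (successful, total) counts for a job's executions.

    Assumption: Execution.success is truthy only for runs that finished
    successfully; runs that failed or are still in progress count as
    not successful here.
    """
    executions = job.get_executions()
    successful = sum(1 for execution in executions if execution.success)
    return successful, len(executions)
```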


[source]

get_final_state#

Job.get_final_state()

Get the final state of the job.

Returns

final_state. Final state of the job, which can be one of the following: UNDEFINED, FINISHED, FAILED, KILLED, FRAMEWORK_FAILURE, APP_MASTER_START_FAILED, INITIALIZATION_FAILED. UNDEFINED indicates that the job is still running.
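
The final states listed above can be grouped into three outcomes. The sketch below assumes the state is returned as a plain string; the helper describe_final_state and the outcome labels are illustrative, not part of the Hopsworks API.

```python
# Final states from the list above that indicate the job did not succeed.
UNSUCCESSFUL_FINAL_STATES = frozenset({
    "FAILED",
    "KILLED",
    "FRAMEWORK_FAILURE",
    "APP_MASTER_START_FAILED",
    "INITIALIZATION_FAILED",
})


def describe_final_state(final_state):
    """Map a final state string to 'running', 'succeeded' or 'unsuccessful'."""
    if final_state == "UNDEFINED":
        # Per the documentation above, UNDEFINED means the job is still running.
        return "running"
    if final_state == "FINISHED":
        return "succeeded"
    if final_state in UNSUCCESSFUL_FINAL_STATES:
        return "unsuccessful"
    raise ValueError(f"Unknown final state: {final_state!r}")
```

A call such as describe_final_state(job.get_final_state()) would then give a coarse status for logging or alerting.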


[source]

get_state#

Job.get_state()

Get the state of the job.

Returns

state. Current state of the job, which can be one of the following: INITIALIZING, INITIALIZATION_FAILED, FINISHED, RUNNING, ACCEPTED, FAILED, KILLED, NEW, NEW_SAVING, SUBMITTED, AGGREGATING_LOGS, FRAMEWORK_FAILURE, STARTING_APP_MASTER, APP_MASTER_START_FAILED, GENERATING_SECURITY_MATERIAL, CONVERTING_NOTEBOOK. If no executions are found for the job, a warning is raised and it returns UNDEFINED.
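
When a job is started without waiting for it, get_state can be polled until the job stops. The following is a sketch under assumptions: the set of terminal states is taken from the final states documented for get_final_state, states are assumed to be plain strings, and wait_for_terminal_state is a hypothetical helper.

```python
import time

# Assumed terminal states, taken from the final states documented above.
TERMINAL_STATES = frozenset({
    "FINISHED",
    "FAILED",
    "KILLED",
    "FRAMEWORK_FAILURE",
    "APP_MASTER_START_FAILED",
    "INITIALIZATION_FAILED",
})


def wait_for_terminal_state(job, poll_seconds=5.0, timeout_seconds=600.0,
                            sleep=time.sleep, clock=time.monotonic):
    """Poll job.get_state() until a terminal state is seen or the timeout expires.

    `sleep` and `clock` are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout_seconds
    while True:
        state = job.get_state()
        if state in TERMINAL_STATES:
            return state
        if clock() >= deadline:
            raise TimeoutError(f"Job still in state {state!r} after {timeout_seconds}s")
        sleep(poll_seconds)
```

Note that a job with no executions reports UNDEFINED, which this loop treats as non-terminal, so it would run until the timeout.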


[source]

get_url#

Job.get_url()

[source]

pause_schedule#

Job.pause_schedule()

Pauses the schedule of a Job execution


[source]

resume_schedule#

Job.resume_schedule()

Resumes the schedule of a Job execution


[source]

run#

Job.run(args=None, await_termination=True)

Run the job.

Run the job, by default awaiting its completion, with the option of passing runtime arguments.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instances
fg = fs.get_or_create_feature_group(...)

# insert into the feature group
job, _ = fg.insert(df, write_options={"start_offline_materialization": False})

# run job
execution = job.run()

# True if job executed successfully
print(execution.success)

# Download logs
out_log_path, err_log_path = execution.download_logs()

Arguments

  • args str: Optional runtime arguments for the job.
  • await_termination bool: Whether the client should wait for the job to complete. Defaults to True.

Returns

Execution. The execution object for the submitted run.
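
Since args is a single string, a list of runtime arguments has to be joined before it is passed to run. The sketch below uses shell-style quoting from the standard library; how the job itself parses that string is an assumption, and run_with_args is a hypothetical helper.

```python
import shlex


def run_with_args(job, arguments, await_termination=True):
    """Join a list of arguments into one string and start the job.

    Assumption: the job's entry point splits its arguments shell-style,
    so values containing spaces are quoted with shlex.
    """
    args = shlex.join(str(argument) for argument in arguments) if arguments else None
    return job.run(args=args, await_termination=await_termination)
```

Passing await_termination=False returns without waiting for the job to complete, so the state has to be checked separately afterwards.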


[source]

save#

Job.save()

Save the job.

This function should be called after changing a property such as the job configuration to save it persistently.

job.config['appPath'] = "Resources/my_app.py"
job.save()

Returns

Job. The updated job object.


[source]

schedule#

Job.schedule(cron_expression, start_time=None, end_time=None)

Schedule the execution of the job.

If a schedule for this job already exists, the method updates it.

import datetime

# Schedule the job
job.schedule(
    cron_expression="0 */5 * ? * * *",
    start_time=datetime.datetime.now(tz=datetime.timezone.utc)
)

# Retrieve the next execution time
print(job.job_schedule.next_execution_date_time)

Arguments

  • cron_expression str: The Quartz cron expression.
  • start_time datetime.datetime | None: The schedule start time in UTC. If None, the current time is used. The start_time can be a value in the past.
  • end_time datetime.datetime | None: The schedule end time in UTC. If None, the schedule will continue running indefinitely. The end_time can be a value in the past.

Returns

JobSchedule. The schedule of the job
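
Quartz cron expressions differ from five-field Unix cron: they have six required fields (seconds, minutes, hours, day of month, month, day of week) plus an optional year. The sketch below only labels the fields of an expression to make it easier to read; it does not validate the values, and split_quartz_cron is a hypothetical helper, not part of the Hopsworks API.

```python
# Field order of a Quartz cron expression; the trailing year field is optional.
QUARTZ_FIELDS = (
    "seconds",
    "minutes",
    "hours",
    "day_of_month",
    "month",
    "day_of_week",
    "year",
)


def split_quartz_cron(expression):
    """Return a dict mapping Quartz field names to the parts of the expression."""
    parts = expression.split()
    if len(parts) not in (6, 7):
        raise ValueError(
            f"Expected 6 or 7 fields in a Quartz cron expression, got {len(parts)}"
        )
    # zip stops at the shorter sequence, so a 6-field expression has no 'year' key.
    return dict(zip(QUARTZ_FIELDS, parts))
```

Read this way, the expression "0 */5 * ? * * *" used in the example fires at second 0 of every fifth minute.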


[source]

unschedule#

Job.unschedule()

Unschedule the execution of a Job