Jobs API#
Handle#
get_job_api#
Project.get_job_api()
Get the job API for the project.
Returns
JobApi
: The JobApi handle
Configuration#
get_configuration#
JobApi.get_configuration(type)
Get configuration for the specific job type.
Arguments
- type
str
: Type of the job. Currently, supported types include: SPARK, PYSPARK, PYTHON, DOCKER, FLINK.
Returns
dict
: Default job configuration
Raises
RestAPIError
: If unable to get the job configuration
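The returned defaults are a plain Python dict that you customize before passing it to `create_job` (as the example under Creation shows). A minimal sketch of that pattern using a stand-in empty dict, since fetching the real defaults requires a live project connection; only the `appPath` key is taken from this page's `create_job` example:

```python
# Stand-in for: config = job_api.get_configuration("PYSPARK")
# (fetching the real defaults requires a live project connection)
config = {}

# Override the fields your job needs before submitting it with create_job;
# "appPath" is the key used in this page's create_job example.
config["appPath"] = "/Resources/my_app.py"
```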
JobConfiguration#
hopsworks_common.core.job_configuration.JobConfiguration(
driver_memory=2048,
driver_cores=1,
executor_memory=4096,
executor_cores=1,
executor_instances=1,
dynamic_allocation=True,
dynamic_min_executors=1,
dynamic_max_executors=2,
environment_name="spark-feature-pipeline",
**kwargs
)
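The defaults above are Spark resource settings (memory values presumably in MB). A minimal sketch, not using the library itself, that mirrors these defaults in a plain dict and sanity-checks the dynamic-allocation bounds before submission:

```python
# Defaults mirrored from the JobConfiguration signature above; this plain
# dict is illustrative only, not the library's internal representation.
SPARK_DEFAULTS = {
    "driver_memory": 2048,
    "driver_cores": 1,
    "executor_memory": 4096,
    "executor_cores": 1,
    "executor_instances": 1,
    "dynamic_allocation": True,
    "dynamic_min_executors": 1,
    "dynamic_max_executors": 2,
    "environment_name": "spark-feature-pipeline",
}

def check_executor_bounds(config):
    """Reject configurations whose dynamic-allocation bounds are inverted."""
    if config["dynamic_allocation"]:
        if config["dynamic_min_executors"] > config["dynamic_max_executors"]:
            raise ValueError("dynamic_min_executors exceeds dynamic_max_executors")
    return config

check_executor_bounds(SPARK_DEFAULTS)
```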
Creation#
create_job#
JobApi.create_job(name, config)
Create a new job or update an existing one.
import hopsworks
project = hopsworks.login()
job_api = project.get_job_api()
spark_config = job_api.get_configuration("PYSPARK")
spark_config['appPath'] = "/Resources/my_app.py"
job = job_api.create_job("my_spark_job", spark_config)
- name
str
: Name of the job.
- config
dict
: Configuration of the job.
Returns
Job
: The Job object
Raises
RestAPIError
: If unable to create the job
Retrieval#
get_job#
JobApi.get_job(name)
Get a job.
Arguments
- name
str
: Name of the job.
Returns
Job
: The Job object
Raises
RestAPIError
: If unable to get the job
get_jobs#
JobApi.get_jobs()
Get all jobs.
Returns
List[Job]
: List of Job objects
Raises
RestAPIError
: If unable to get the jobs
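Since `get_jobs` returns a flat list, a small helper can index the jobs by their `name` property (documented under Properties below). A minimal duck-typed sketch; any object exposing a `name` attribute works:

```python
def jobs_by_name(jobs):
    """Index Job objects by their name for quick lookup.

    `jobs` is the list returned by JobApi.get_jobs(); only the documented
    `name` property is accessed, so any object exposing it works here.
    """
    return {job.name: job for job in jobs}

# With a live project this would be:
# lookup = jobs_by_name(job_api.get_jobs())
```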
Properties#
config#
Configuration for the job
creation_time#
Date of creation for the job
creator#
Creator of the job
executions#
href#
id#
Id of the job
job_schedule#
Return the Job schedule
job_type#
Type of the job
name#
Name of the job
Methods#
delete#
Job.delete()
Delete the job
Potentially dangerous operation
This operation deletes the job and all executions.
Raises
RestAPIError
: If unable to delete the job
get_executions#
Job.get_executions()
Retrieves all executions for the job ordered by submission time.
Returns
List[Execution]
: List of Execution objects
Raises
RestAPIError
: If unable to retrieve the executions
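Because the list is ordered by submission time, simple aggregate checks are easy to write on top of it. A minimal sketch computing the fraction of successful executions; it relies only on the `success` flag shown in the `Job.run` example on this page, accessed via duck typing:

```python
def success_rate(executions):
    """Fraction of executions whose `success` flag is True.

    `executions` is the list from Job.get_executions(); `success` is the
    flag shown in the Job.run example on this page.
    """
    if not executions:
        return 0.0
    return sum(1 for e in executions if e.success) / len(executions)
```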
get_final_state#
Job.get_final_state()
Get the final state of the job.
Returns
final_state
: Final state of the job, which can be one of the following: UNDEFINED, FINISHED, FAILED, KILLED, FRAMEWORK_FAILURE, APP_MASTER_START_FAILED, INITIALIZATION_FAILED. UNDEFINED indicates that the job is still running.
get_state#
Job.get_state()
Get the state of the job.
Returns
state
: Current state of the job, which can be one of the following: INITIALIZING, INITIALIZATION_FAILED, FINISHED, RUNNING, ACCEPTED, FAILED, KILLED, NEW, NEW_SAVING, SUBMITTED, AGGREGATING_LOGS, FRAMEWORK_FAILURE, STARTING_APP_MASTER, APP_MASTER_START_FAILED, GENERATING_SECURITY_MATERIAL, CONVERTING_NOTEBOOK. If no executions are found for the job, a warning is raised and UNDEFINED is returned.
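`get_final_state` combines naturally into a polling loop: keep polling until the job leaves the running state. A minimal duck-typed sketch; the terminal-state names come from the `get_final_state` list above, and the job object only needs that one documented method. The `sleep` parameter is injectable purely so the loop can be exercised without real waiting:

```python
import time

# Terminal states from the get_final_state list above; UNDEFINED means the
# job is still running.
FINAL_STATES = {
    "FINISHED", "FAILED", "KILLED", "FRAMEWORK_FAILURE",
    "APP_MASTER_START_FAILED", "INITIALIZATION_FAILED",
}

def wait_for_final_state(job, poll_seconds=5, sleep=time.sleep):
    """Poll job.get_final_state() until it reports a terminal state.

    `job` only needs the documented get_final_state() method, so any
    object exposing it works here.
    """
    while True:
        state = job.get_final_state()
        if state in FINAL_STATES:
            return state
        sleep(poll_seconds)
```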
get_url#
Job.get_url()
pause_schedule#
Job.pause_schedule()
Pause the scheduled execution of the job
resume_schedule#
Job.resume_schedule()
Resume the scheduled execution of the job
run#
Job.run(args=None, await_termination=True)
Run the job.
Run the job, by default awaiting its completion, with the option of passing runtime arguments.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instances
fg = fs.get_or_create_feature_group(...)
# insert into the feature group
job, _ = fg.insert(df, write_options={"start_offline_materialization": False})
# run job
execution = job.run()
# True if job executed successfully
print(execution.success)
# Download logs
out_log_path, err_log_path = execution.download_logs()
Arguments
- args
str
: Optional runtime arguments for the job.
- await_termination
bool
: Identifies if the client should wait for the job to complete, defaults to True.
Returns
Execution
: The execution object for the submitted run.
save#
Job.save()
Save the job.
This function should be called after changing a property such as the job configuration to save it persistently.
job.config['appPath'] = "Resources/my_app.py"
job.save()
Returns
Job
: The updated job object.
schedule#
Job.schedule(cron_expression, start_time=None, end_time=None)
Schedule the execution of the job.
If a schedule for this job already exists, the method updates it.
import datetime

# Schedule the job
job.schedule(
    cron_expression="0 */5 * ? * * *",
    start_time=datetime.datetime.now(tz=datetime.timezone.utc)
)
# Retrieve the next execution time
print(job.job_schedule.next_execution_date_time)
Arguments
- cron_expression
str
: The Quartz cron expression.
- start_time
datetime.datetime | None
: The schedule start time in UTC. If None, the current time is used. The start_time can be a value in the past.
- end_time
datetime.datetime | None
: The schedule end time in UTC. If None, the schedule will continue running indefinitely. The end_time can be a value in the past.
Returns
JobSchedule
: The schedule of the job
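The `start_time` and `end_time` arguments expect timezone-aware UTC datetimes. A minimal stdlib sketch building a hypothetical one-week schedule window; the cron expression in the comment is the one from the example above:

```python
import datetime

# Timezone-aware UTC window: start now, end one week later. Both arguments
# may also be omitted, as the schedule docs above describe.
start = datetime.datetime.now(tz=datetime.timezone.utc)
end = start + datetime.timedelta(weeks=1)

# With a live job object this would be:
# job.schedule(cron_expression="0 */5 * ? * * *", start_time=start, end_time=end)
```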
unschedule#
Job.unschedule()
Unschedule the execution of the job