hopsworks.job #
Job #
For backwards compatibility, hopsworks.job.Job is still available under the alias hsfs.core.job.Job. Use of this alias is discouraged, as it will be deprecated.
Returned by
- FeatureMonitoringConfig.get_job
- FeatureMonitoringConfig.run_job
- JobApi.create_job
- JobApi.get_job
- JobApi.get_jobs
- FeatureGroup.sink_job
- FeatureView.log
- FeatureView.materialize_log
- Job.save
- TrainingDataset.insert
- TrainingDataset.save
- FeatureGroup.materialization_job
- FeatureGroup.insert
- FeatureGroup.multi_part_insert
- FeatureGroup.save
- FeatureView.create_training_data
- FeatureView.create_train_test_split
- FeatureView.create_train_validation_test_split
- FeatureView.recreate_training_dataset
id property #
Id of the job.
name property #
Name of the job.
creation_time property #
Date of creation for the job.
job_type property #
Type of the job.
creator property #
Creator of the job.
job_schedule property #
Schedule of the job.
executions property #
List of executions for the job.
href property #
The URL of the job in the Hopsworks UI; use get_url instead.
config property #
Configuration for the job.
run #
run(
args: str | None = None,
await_termination: bool = True,
*,
start_time: datetime | None = None,
end_time: datetime | None = None,
logical_date: datetime | None = None,
env_vars: dict[str, str] | None = None,
) -> Execution
Run the job, by default awaiting its completion, with the option of passing runtime arguments.
Example (batch job):
# connect to the Feature Store
fs = ...
# get the Feature Group instances
fg = fs.get_or_create_feature_group(...)
# insert into the feature group
job, _ = fg.insert(df, write_options={"start_offline_materialization": False})
# run job
execution = job.run()
# True if job executed successfully
print(execution.success)
# Download logs
out_log_path, err_log_path = execution.download_logs()
Example (Python App / Streamlit):
import hopsworks
project = hopsworks.login()
job_api = project.get_job_api()
# Get default Python App configuration
config = job_api.get_configuration("PYTHON_APP")
config["appPath"] = "Resources/my_streamlit_app.py"
# Create the job
job = job_api.create_job("my_app", config)
# Run - waits until the app is ready, then prints the App UI URL
execution = job.run()
# Access the Streamlit UI URL programmatically
print(execution.app_url)
# Stop the app
execution.stop()
# Delete the job
job.delete()
| PARAMETER | DESCRIPTION |
|---|---|
args | Optional runtime arguments for the job. TYPE: str | None
await_termination | Identifies if the client should wait for the job to complete. Ignored for Python App jobs which wait for RUNNING state instead. TYPE: bool
start_time | Optional. If set, injected as HOPS_START_TIME for this execution. TYPE: datetime | None
end_time | Optional. Paired with start_time; if set, injected as HOPS_END_TIME for this execution. TYPE: datetime | None
logical_date | Optional. If set, overrides the run's logical date (data interval start). Usually inferred from the schedule. TYPE: datetime | None
env_vars | Optional dict of arbitrary env vars to inject into this execution. Values take precedence over anything with the same name from the job config or the scheduler. TYPE: dict[str, str] | None
| RETURNS | DESCRIPTION |
|---|---|
Execution | The execution object for the submitted run. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.JobExecutionException | If |
get_state #
get_state() -> Literal[
"UNDEFINED",
"INITIALIZING",
"INITIALIZATION_FAILED",
"FINISHED",
"RUNNING",
"ACCEPTED",
"FAILED",
"KILLED",
"NEW",
"NEW_SAVING",
"SUBMITTED",
"AGGREGATING_LOGS",
"FRAMEWORK_FAILURE",
"STARTING_APP_MASTER",
"APP_MASTER_START_FAILED",
"GENERATING_SECURITY_MATERIAL",
"CONVERTING_NOTEBOOK",
]
Get the state of the job.
| RETURNS | DESCRIPTION |
|---|---|
Literal['UNDEFINED', 'INITIALIZING', 'INITIALIZATION_FAILED', 'FINISHED', 'RUNNING', 'ACCEPTED', 'FAILED', 'KILLED', 'NEW', 'NEW_SAVING', 'SUBMITTED', 'AGGREGATING_LOGS', 'FRAMEWORK_FAILURE', 'STARTING_APP_MASTER', 'APP_MASTER_START_FAILED', 'GENERATING_SECURITY_MATERIAL', 'CONVERTING_NOTEBOOK'] | The current state of the job. If no executions are found for the job, a warning is raised.
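Since get_state() returns both transient and terminal values, a common pattern is to poll it until the job leaves the in-flight states. wait_for_state below is a hypothetical helper, not part of the hopsworks API; in real use you would pass job.get_state as the callable, and the split between "still running" and "terminal" states is an assumption based on the state names.

```python
import time

# States assumed to be transient (job still in flight).
RUNNING_STATES = {
    "INITIALIZING", "RUNNING", "ACCEPTED", "NEW", "NEW_SAVING",
    "SUBMITTED", "AGGREGATING_LOGS", "STARTING_APP_MASTER",
    "GENERATING_SECURITY_MATERIAL", "CONVERTING_NOTEBOOK",
}

def wait_for_state(get_state, poll_seconds=5.0, timeout_seconds=600.0):
    """Poll get_state() until the job leaves RUNNING_STATES or times out."""
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        state = get_state()
        if state not in RUNNING_STATES:
            return state
        time.sleep(poll_seconds)
    raise TimeoutError("job did not reach a terminal state in time")
```

Usage would look like `wait_for_state(job.get_state)`; for most cases `job.run(await_termination=True)` already does the waiting for you.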
get_final_state #
get_final_state() -> Literal[
"UNDEFINED",
"FINISHED",
"FAILED",
"KILLED",
"FRAMEWORK_FAILURE",
"APP_MASTER_START_FAILED",
"INITIALIZATION_FAILED",
]
Get the final state of the job.
| RETURNS | DESCRIPTION |
|---|---|
Literal['UNDEFINED', 'FINISHED', 'FAILED', 'KILLED', 'FRAMEWORK_FAILURE', 'APP_MASTER_START_FAILED', 'INITIALIZATION_FAILED'] | The final state of the job. |
get_executions #
get_executions() -> list[Execution]
Retrieve all executions for the job, ordered by submission time.
| RETURNS | DESCRIPTION |
|---|---|
list[Execution] | List of Execution objects. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
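A typical follow-up to get_executions() is to pick the most recent run. The sketch below assumes each Execution exposes a submission_time attribute (an assumption; adjust to the actual Execution API):

```python
def latest_execution(executions):
    """Return the most recently submitted execution, or None if empty."""
    if not executions:
        return None
    # Sorting defensively by submission_time rather than relying on order.
    return max(executions, key=lambda e: e.submission_time)
```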
save #
save() -> Job
Save the job.
Call this function after changing a property, such as the job configuration, to persist the change.
job.config['appPath'] = "Resources/my_app.py"
job.save()
| RETURNS | DESCRIPTION |
|---|---|
Job | The updated job object. |
delete #
delete()
Delete the job.
Potentially dangerous operation
This operation deletes the job and all executions.
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
schedule #
schedule(
cron_expression: str,
start_time: datetime = None,
end_time: datetime = None,
*,
catchup: bool = False,
max_active_runs: int = 1,
start_time_offset_seconds: int | None = None,
end_time_offset_seconds: int | None = None,
skip_to_date: datetime = None,
max_catchup_runs: int = None,
) -> JobSchedule
Schedule the execution of the job.
If a schedule for this job already exists, the method updates it.
# Defaults (None, None): HOPS_START_TIME = last execution time (= previous cron fire),
# HOPS_END_TIME = current cron fire. Works on any cron — no per-schedule tuning needed.
job.schedule(
cron_expression="0 0 * ? * * *",
start_time=datetime.now(tz=timezone.utc),
)
# Fixed 2-hour window ending at the cron fire (e.g. 08:00 → 10:00 at 10:00):
job.schedule(
cron_expression="0 0 * ? * * *",
start_time_offset_seconds=-2 * 3600, # HOPS_START_TIME = fire - 2h
end_time_offset_seconds=0, # HOPS_END_TIME = fire
catchup=True, # replay all missed intervals on recovery
max_active_runs=2, # allow at most 2 concurrent runs
)
| PARAMETER | DESCRIPTION |
|---|---|
cron_expression | The Quartz cron expression. TYPE: str
start_time | The schedule start time in UTC. TYPE: datetime | None
end_time | The schedule end time in UTC. TYPE: datetime | None
catchup | If True and the scheduler missed fires (outage, etc.), create one execution per missed interval on recovery. If False (default), only create the most recent. TYPE: bool
max_active_runs | Upper bound on concurrent executions for this job. Default 1. TYPE: int
start_time_offset_seconds | Offset in seconds, relative to the cron fire time, used to compute HOPS_START_TIME. TYPE: int | None
end_time_offset_seconds | Offset in seconds, relative to the cron fire time, used to compute HOPS_END_TIME. TYPE: int | None
skip_to_date | If set, reconciliation skips every missed interval strictly before this date. TYPE: datetime | None
max_catchup_runs | Upper bound on missed intervals created during reconciliation (keeps the most recent). TYPE: int | None
| RETURNS | DESCRIPTION |
|---|---|
JobSchedule | The schedule of the job. |
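The window arithmetic from the example above (HOPS_START_TIME = fire + start_time_offset_seconds, HOPS_END_TIME = fire + end_time_offset_seconds) can be sketched as a tiny pure function. compute_window is a hypothetical helper for illustration, not part of the hopsworks API:

```python
from datetime import datetime, timedelta, timezone

def compute_window(
    fire_time: datetime,
    start_offset_s: int,
    end_offset_s: int,
) -> tuple[datetime, datetime]:
    """Derive the (HOPS_START_TIME, HOPS_END_TIME) window from a cron fire."""
    start = fire_time + timedelta(seconds=start_offset_s)
    end = fire_time + timedelta(seconds=end_offset_s)
    return start, end

fire = datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc)
start, end = compute_window(fire, start_offset_s=-2 * 3600, end_offset_s=0)
# start == 08:00 UTC and end == 10:00 UTC, matching the fixed 2-hour
# window example above.
```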
get_alerts #
get_alerts() -> list[alert.JobAlert]
Get all alerts for the job.
| RETURNS | DESCRIPTION |
|---|---|
list[alert.JobAlert] | List of JobAlert objects. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
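A common way to post-process the result of get_alerts() is to group alerts by severity. The helper below is hypothetical and assumes each JobAlert exposes a severity attribute ("critical", "warning" or "info"); adjust to the real JobAlert API:

```python
from collections import defaultdict

def alerts_by_severity(alerts):
    """Group alert objects by their severity attribute."""
    grouped = defaultdict(list)
    for a in alerts:
        grouped[a.severity].append(a)
    return dict(grouped)
```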
get_alert #
get_alert(alert_id) -> alert.JobAlert
Get an alert for the job by ID.
| PARAMETER | DESCRIPTION |
|---|---|
alert_id | ID of the alert. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
alert.JobAlert | The JobAlert object. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
create_alert #
create_alert(
receiver: str,
status: Literal[
"long_running", "failed", "finished", "killed"
],
severity: Literal["critical", "warning", "info"],
) -> alert.JobAlert
Create an alert for the job.
# Create alert for the job
job.create_alert(
receiver="email",
status="failed",
severity="critical"
)
| PARAMETER | DESCRIPTION |
|---|---|
receiver | The receiver of the alert. TYPE: str
status | The status of the alert. TYPE: Literal['long_running', 'failed', 'finished', 'killed']
severity | The severity of the alert. TYPE: Literal['critical', 'warning', 'info']
| RETURNS | DESCRIPTION |
|---|---|
alert.JobAlert | The created JobAlert object. |
| RAISES | DESCRIPTION |
|---|---|
ValueError | If |
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |