hopsworks.job #

Job #

For backwards compatibility, hopsworks.job.Job is still available under the alias hsfs.core.job.Job. Using this alias is discouraged, as it will be deprecated.

id property #

ID of the job.

name property #

Name of the job.

creation_time property #

Date of creation for the job.

job_type property #

Type of the job.

creator property #

Creator of the job.

job_schedule property #

Schedule of the job.

executions property #

List of executions for the job.

href property #

URL of the job in the Hopsworks UI. Use get_url instead.

config property #

Configuration for the job.

run #

run(
    args: str | None = None,
    await_termination: bool = True,
    *,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    logical_date: datetime | None = None,
    env_vars: dict[str, str] | None = None,
) -> Execution

Run the job.

Run the job, by default awaiting its completion, with the option of passing runtime arguments.

Example (batch job):

# connect to the Feature Store
fs = ...

# get the Feature Group instances
fg = fs.get_or_create_feature_group(...)

# insert into the feature group
job, _ = fg.insert(df, write_options={"start_offline_materialization": False})

# run job
execution = job.run()

# True if job executed successfully
print(execution.success)

# Download logs
out_log_path, err_log_path = execution.download_logs()

Example (Python App / Streamlit):

import hopsworks

project = hopsworks.login()
job_api = project.get_job_api()

# Get default Python App configuration
config = job_api.get_configuration("PYTHON_APP")
config["appPath"] = "Resources/my_streamlit_app.py"

# Create the job
job = job_api.create_job("my_app", config)

# Run - waits until the app is ready, then prints the App UI URL
execution = job.run()

# Access the Streamlit UI URL programmatically
print(execution.app_url)

# Stop the app
execution.stop()

# Delete the job
job.delete()

PARAMETER DESCRIPTION
args

Optional runtime arguments for the job.

TYPE: str | None DEFAULT: None

await_termination

Whether the client should wait for the job to complete. Ignored for Python App jobs, which wait for the RUNNING state instead.

TYPE: bool DEFAULT: True

start_time

Optional. If set, injects HOPS_START_TIME (and HOPS_END_TIME if end_time is also set) as env vars on this one-shot execution. Useful for manual backfills without creating a schedule. Overrides any scheduler-computed value for this run.

TYPE: datetime | None DEFAULT: None

end_time

Optional. Paired with start_time; sets HOPS_END_TIME and also serves as the run's data_interval_end for reconciliation purposes.

TYPE: datetime | None DEFAULT: None

logical_date

Optional. If set, overrides the run's logical date (data interval start). Usually inferred from start_time / the schedule.

TYPE: datetime | None DEFAULT: None

env_vars

Optional dict of arbitrary env vars to inject into this execution. Values take precedence over anything with the same name from the job config or the scheduler.

TYPE: dict[str, str] | None DEFAULT: None

RETURNS DESCRIPTION
Execution

The execution object for the submitted run.

RAISES DESCRIPTION
hopsworks.client.exceptions.JobExecutionException

If await_termination is True and the job finished with a failure status.
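As a sketch of a one-shot manual backfill using the parameters above (the helper backfill_window and the BACKFILL env var name are illustrative, not part of the API; the run call assumes an existing job):

```python
from datetime import datetime, timedelta, timezone

def backfill_window(day: datetime) -> tuple[datetime, datetime]:
    """Compute a one-day [start, end) window for a manual backfill run."""
    start = day.replace(hour=0, minute=0, second=0, microsecond=0)
    return start, start + timedelta(days=1)

start, end = backfill_window(datetime(2024, 1, 1, 12, tzinfo=timezone.utc))

# With an existing job, e.g. job = project.get_job_api().get_job("my_job"),
# a one-shot run would inject the window as HOPS_START_TIME / HOPS_END_TIME:
# execution = job.run(start_time=start, end_time=end,
#                     env_vars={"BACKFILL": "1"})
```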

get_state #

get_state() -> Literal[
    "UNDEFINED",
    "INITIALIZING",
    "INITIALIZATION_FAILED",
    "FINISHED",
    "RUNNING",
    "ACCEPTED",
    "FAILED",
    "KILLED",
    "NEW",
    "NEW_SAVING",
    "SUBMITTED",
    "AGGREGATING_LOGS",
    "FRAMEWORK_FAILURE",
    "STARTING_APP_MASTER",
    "APP_MASTER_START_FAILED",
    "GENERATING_SECURITY_MATERIAL",
    "CONVERTING_NOTEBOOK",
]

Get the state of the job.

RETURNS DESCRIPTION
Literal['UNDEFINED', 'INITIALIZING', 'INITIALIZATION_FAILED', 'FINISHED', 'RUNNING', 'ACCEPTED', 'FAILED', 'KILLED', 'NEW', 'NEW_SAVING', 'SUBMITTED', 'AGGREGATING_LOGS', 'FRAMEWORK_FAILURE', 'STARTING_APP_MASTER', 'APP_MASTER_START_FAILED', 'GENERATING_SECURITY_MATERIAL', 'CONVERTING_NOTEBOOK']

The current state of the job.

If no executions are found for the job, a warning is raised and it returns UNDEFINED.
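Because get_state can return transient states, a caller that wants to block until completion might poll it. A minimal sketch (the terminal-state set is taken from get_final_state's literals; job is assumed to be any object exposing get_state):

```python
import time

# Terminal states, taken from get_final_state's return literals.
TERMINAL_STATES = {
    "FINISHED", "FAILED", "KILLED",
    "FRAMEWORK_FAILURE", "APP_MASTER_START_FAILED", "INITIALIZATION_FAILED",
}

def wait_for_job(job, poll_seconds: float = 10.0, timeout: float = 3600.0) -> str:
    """Poll job.get_state() until a terminal state or the timeout is reached."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = job.get_state()
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)
    raise TimeoutError(f"Job did not reach a terminal state within {timeout}s")
```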

get_final_state #

get_final_state() -> Literal[
    "UNDEFINED",
    "FINISHED",
    "FAILED",
    "KILLED",
    "FRAMEWORK_FAILURE",
    "APP_MASTER_START_FAILED",
    "INITIALIZATION_FAILED",
]

Get the final state of the job.

RETURNS DESCRIPTION
Literal['UNDEFINED', 'FINISHED', 'FAILED', 'KILLED', 'FRAMEWORK_FAILURE', 'APP_MASTER_START_FAILED', 'INITIALIZATION_FAILED']

The final state of the job.

UNDEFINED indicates that the job is still running.

get_executions #

get_executions() -> list[Execution]

Retrieve all executions for the job, ordered by submission time.

RETURNS DESCRIPTION
list[Execution]

List of Execution objects.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
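A common use is locating the most recent failed run. A sketch, assuming each Execution exposes final_status and submission_time attributes (the SimpleNamespace objects below are illustrative stand-ins, not real Execution instances):

```python
from types import SimpleNamespace

def latest_failed(executions):
    """Return the most recently submitted execution that failed, or None.

    Assumes each item exposes `final_status` and `submission_time`.
    """
    failed = [e for e in executions if e.final_status == "FAILED"]
    return max(failed, key=lambda e: e.submission_time, default=None)

# Illustrative stand-ins for Execution objects:
runs = [
    SimpleNamespace(final_status="FINISHED", submission_time=1),
    SimpleNamespace(final_status="FAILED", submission_time=2),
    SimpleNamespace(final_status="FAILED", submission_time=3),
]
print(latest_failed(runs).submission_time)  # 3
```

With a real job, the list would come from job.get_executions() instead of the stand-ins.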

save #

save() -> Job

Save the job.

This function should be called after changing a property such as the job configuration to save it persistently.

job.config['appPath'] = "Resources/my_app.py"
job.save()
RETURNS DESCRIPTION
Job

The updated job object.

delete #

delete()

Delete the job.

Potentially dangerous operation

This operation deletes the job and all executions.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

schedule #

schedule(
    cron_expression: str,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    *,
    catchup: bool = False,
    max_active_runs: int = 1,
    start_time_offset_seconds: int | None = None,
    end_time_offset_seconds: int | None = None,
    skip_to_date: datetime | None = None,
    max_catchup_runs: int | None = None,
) -> JobSchedule

Schedule the execution of the job.

If a schedule for this job already exists, the method updates it.

# Defaults (None, None): HOPS_START_TIME = last execution time (= previous cron fire),
# HOPS_END_TIME = current cron fire. Works on any cron — no per-schedule tuning needed.
job.schedule(
    cron_expression="0 0 * ? * * *",
    start_time=datetime.now(tz=timezone.utc),
)

# Fixed 2-hour window ending at the cron fire (e.g. 08:00 → 10:00 at 10:00):
job.schedule(
    cron_expression="0 0 * ? * * *",
    start_time_offset_seconds=-2 * 3600,   # HOPS_START_TIME = fire - 2h
    end_time_offset_seconds=0,             # HOPS_END_TIME   = fire
    catchup=True,              # replay all missed intervals on recovery
    max_active_runs=2,         # allow at most 2 concurrent runs
)
PARAMETER DESCRIPTION
cron_expression

The Quartz cron expression.

TYPE: str

start_time

The schedule start time in UTC. If None, the current time is used. Can be in the past.

TYPE: datetime | None DEFAULT: None

end_time

The schedule end time in UTC. If None, runs indefinitely. Can be in the past.

TYPE: datetime | None DEFAULT: None

catchup

If True and the scheduler missed fires (outage, etc.), create one execution per missed interval on recovery. If False (default), only create the most recent.

TYPE: bool DEFAULT: False

max_active_runs

Upper bound on concurrent executions for this job. Default 1.

TYPE: int DEFAULT: 1

start_time_offset_seconds

Controls HOPS_START_TIME. Two modes:

  • None (default) — use the previous cron fire (last execution time). Adapts to any cron naturally.
  • int — HOPS_START_TIME = cron_fire + seconds. Negative values look backwards from the fire; positive values look forward.

TYPE: int | None DEFAULT: None

end_time_offset_seconds

Controls HOPS_END_TIME. Two modes:

  • None (default) — use the cron fire time (HOPS_END_TIME = cron_fire).
  • int — HOPS_END_TIME = cron_fire + seconds.

TYPE: int | None DEFAULT: None

skip_to_date

If set, reconciliation skips every missed interval strictly before this date.

TYPE: datetime | None DEFAULT: None

max_catchup_runs

Upper bound on missed intervals created during reconciliation (keeps most recent).

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
JobSchedule

The schedule of the job.
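The offset rules above can be mirrored with plain datetime arithmetic. A sketch (window_for_fire is illustrative, not a library function):

```python
from datetime import datetime, timedelta, timezone

def window_for_fire(fire, start_offset_s=None, end_offset_s=None, previous_fire=None):
    """Mirror the HOPS_START_TIME / HOPS_END_TIME rules described above:
    None falls back to the defaults (previous cron fire for the start,
    the fire time itself for the end); an int offset is added to the fire.
    """
    start = (previous_fire if start_offset_s is None
             else fire + timedelta(seconds=start_offset_s))
    end = (fire if end_offset_s is None
           else fire + timedelta(seconds=end_offset_s))
    return start, end

fire = datetime(2024, 1, 1, 10, tzinfo=timezone.utc)

# Fixed 2-hour window ending at the fire: 08:00 -> 10:00
start, end = window_for_fire(fire, start_offset_s=-2 * 3600, end_offset_s=0)

# Defaults: previous fire -> current fire
prev = fire - timedelta(hours=1)
d_start, d_end = window_for_fire(fire, previous_fire=prev)
```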

unschedule #

unschedule()

Unschedule the execution of the job.

resume_schedule #

resume_schedule()

Resume the schedule of the job.

pause_schedule #

pause_schedule()

Pause the schedule of the job.

get_alerts #

get_alerts() -> list[alert.JobAlert]

Get all alerts for the job.

RETURNS DESCRIPTION
list[alert.JobAlert]

List of JobAlert objects.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_alert #

get_alert(alert_id: int) -> alert.JobAlert

Get an alert for the job by ID.

PARAMETER DESCRIPTION
alert_id

ID of the alert.

TYPE: int

RETURNS DESCRIPTION
alert.JobAlert

The JobAlert object.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

create_alert #

create_alert(
    receiver: str,
    status: Literal[
        "long_running", "failed", "finished", "killed"
    ],
    severity: Literal["critical", "warning", "info"],
) -> alert.JobAlert

Create an alert for the job.

# Create alert for the job
job.create_alert(
    receiver="email",
    status="failed",
    severity="critical"
)
PARAMETER DESCRIPTION
receiver

The receiver of the alert.

TYPE: str

status

The status of the alert.

TYPE: Literal['long_running', 'failed', 'finished', 'killed']

severity

The severity of the alert.

TYPE: Literal['critical', 'warning', 'info']

RETURNS DESCRIPTION
alert.JobAlert

The created JobAlert object.

RAISES DESCRIPTION
ValueError

If status or severity is not valid.

hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_url #

get_url()

Get the URL of the job in the Hopsworks UI.