hopsworks.job #
[source] Job #
For backwards compatibility, hopsworks.job.Job is still available as hsfs.core.job.Job. The use of this alias is discouraged, as it will be deprecated.
Returned by
-
JobApi.create_job -
JobApi.get_job -
JobApi.get_jobs -
FeatureGroup.sink_job -
Job.save -
FeatureGroup.materialization_job -
FeatureGroup.insert -
FeatureGroup.multi_part_insert -
FeatureGroup.save -
FeatureView.create_train_test_split -
FeatureView.create_train_validation_test_split -
FeatureView.create_training_data -
FeatureView.log -
FeatureView.materialize_log -
FeatureView.recreate_training_dataset
[source] run #
Run the job.
Run the job, by default awaiting its completion, with the option of passing runtime arguments.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instances
fg = fs.get_or_create_feature_group(...)
# insert into the feature group
job, _ = fg.insert(df, write_options={"start_offline_materialization": False})
# run job
execution = job.run()
# True if job executed successfully
print(execution.success)
# Download logs
out_log_path, err_log_path = execution.download_logs()
| PARAMETER | DESCRIPTION |
|---|---|
args | Optional runtime arguments for the job. TYPE: str DEFAULT: None |
await_termination | Identifies if the client should wait for the job to complete. TYPE: bool DEFAULT: True |
| RETURNS | DESCRIPTION |
|---|---|
Execution | The execution object for the launched job run. |
[source] get_state #
get_state() -> Literal[
"UNDEFINED",
"INITIALIZING",
"INITIALIZATION_FAILED",
"FINISHED",
"RUNNING",
"ACCEPTED",
"FAILED",
"KILLED",
"NEW",
"NEW_SAVING",
"SUBMITTED",
"AGGREGATING_LOGS",
"FRAMEWORK_FAILURE",
"STARTING_APP_MASTER",
"APP_MASTER_START_FAILED",
"GENERATING_SECURITY_MATERIAL",
"CONVERTING_NOTEBOOK",
]
Get the state of the job.
| RETURNS | DESCRIPTION |
|---|---|
Literal['UNDEFINED', 'INITIALIZING', 'INITIALIZATION_FAILED', 'FINISHED', 'RUNNING', 'ACCEPTED', 'FAILED', 'KILLED', 'NEW', 'NEW_SAVING', 'SUBMITTED', 'AGGREGATING_LOGS', 'FRAMEWORK_FAILURE', 'STARTING_APP_MASTER', 'APP_MASTER_START_FAILED', 'GENERATING_SECURITY_MATERIAL', 'CONVERTING_NOTEBOOK'] | The current state of the job. |
Literal['UNDEFINED', 'INITIALIZING', 'INITIALIZATION_FAILED', 'FINISHED', 'RUNNING', 'ACCEPTED', 'FAILED', 'KILLED', 'NEW', 'NEW_SAVING', 'SUBMITTED', 'AGGREGATING_LOGS', 'FRAMEWORK_FAILURE', 'STARTING_APP_MASTER', 'APP_MASTER_START_FAILED', 'GENERATING_SECURITY_MATERIAL', 'CONVERTING_NOTEBOOK'] | If no executions are found for the job, a warning is raised and it returns `UNDEFINED`. |
[source] get_final_state #
get_final_state() -> Literal[
"UNDEFINED",
"FINISHED",
"FAILED",
"KILLED",
"FRAMEWORK_FAILURE",
"APP_MASTER_START_FAILED",
"INITIALIZATION_FAILED",
]
Get the final state of the job.
| RETURNS | DESCRIPTION |
|---|---|
Literal['UNDEFINED', 'FINISHED', 'FAILED', 'KILLED', 'FRAMEWORK_FAILURE', 'APP_MASTER_START_FAILED', 'INITIALIZATION_FAILED'] | The final state of the job. |
Literal['UNDEFINED', 'FINISHED', 'FAILED', 'KILLED', 'FRAMEWORK_FAILURE', 'APP_MASTER_START_FAILED', 'INITIALIZATION_FAILED'] | If no executions are found for the job, or the last execution has not yet reached a final state, `UNDEFINED` is returned. |
[source] get_executions #
get_executions()
Retrieves all executions for the job ordered by submission time.
| RETURNS | DESCRIPTION |
|---|---|
List[Execution] | All executions for the job, ordered by submission time. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
[source] save #
save() -> Job
Save the job.
This function should be called after changing a property such as the job configuration to save it persistently.
job.config['appPath'] = "Resources/my_app.py"
job.save()
| RETURNS | DESCRIPTION |
|---|---|
Job | The updated job object. |
[source] delete #
delete()
Delete the job.
Potentially dangerous operation
This operation deletes the job and all executions.
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
[source] schedule #
schedule(
cron_expression: str,
start_time: datetime = None,
end_time: datetime = None,
) -> JobSchedule
Schedule the execution of the job.
If a schedule for this job already exists, the method updates it.
# Schedule the job
job.schedule(
cron_expression="0 */5 * ? * * *",
start_time=datetime.datetime.now(tz=timezone.utc)
)
# Retrieve the next execution time
print(job.job_schedule.next_execution_date_time)
| PARAMETER | DESCRIPTION |
|---|---|
cron_expression | The quartz cron expression. TYPE: str |
start_time | The schedule start time in UTC. If `None`, the schedule becomes active immediately. TYPE: datetime DEFAULT: None |
end_time | The schedule end time in UTC. If `None`, the schedule runs indefinitely. TYPE: datetime DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
JobSchedule | The schedule of the job. |
[source] create_alert #
create_alert(
receiver: str,
status: Literal[
"long_running", "failed", "finished", "killed"
],
severity: Literal["critical", "warning", "info"],
) -> alert.JobAlert
Create an alert for the job.
# Create alert for the job
job.create_alert(
receiver="email",
status="failed",
severity="critical"
)
| PARAMETER | DESCRIPTION |
|---|---|
receiver | The receiver of the alert. TYPE: str |
status | The status of the alert. TYPE: Literal['long_running', 'failed', 'finished', 'killed'] |
severity | The severity of the alert. TYPE: Literal['critical', 'warning', 'info'] |
| RETURNS | DESCRIPTION |
|---|---|
alert.JobAlert | The created JobAlert object. |
| RAISES | DESCRIPTION |
|---|---|
ValueError | If `status` or `severity` is not one of the allowed values. |
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |