Validation Report#

[source]

ValidationReport#

hsfs.validation_report.ValidationReport(
    success,
    results,
    meta,
    statistics,
    evaluation_parameters=None,
    id=None,
    full_report_path=None,
    featurestore_id=None,
    featuregroup_id=None,
    href=None,
    expand=None,
    items=None,
    count=None,
    type=None,
    validation_time=None,
    ingestion_result="UNKNOWN",
)

Metadata object representing a validation report generated by Great Expectations in the Feature Store.


Creation#

[source]

validate#

FeatureGroup.validate(
    dataframe=None,
    expectation_suite=None,
    save_report=False,
    validation_options={},
    ingestion_result="UNKNOWN",
    ge_type=True,
)

Run validation based on the attached expectations.

Runs any expectations attached with Deequ, as well as any attached Great Expectations suites.

Example

# connect to the Feature Store
fs = ...

# get feature group instance
fg = fs.get_or_create_feature_group(...)

ge_report = fg.validate(df, save_report=False)

Arguments

  • dataframe Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame]]: The dataframe to run the data validation expectations against.
  • expectation_suite Optional[hsfs.expectation_suite.ExpectationSuite]: Optionally provide an Expectation Suite to override the one attached to the feature group, if any. This is useful for testing new Expectation Suites. When an extra suite is provided, the results will never be persisted. Defaults to None.
  • validation_options Optional[Dict[Any, Any]]: Additional validation options as key-value pairs, defaults to {}.
    • key run_validation boolean value, set to False to skip validation temporarily on ingestion.
    • key ge_validate_kwargs a dictionary containing kwargs for the validate method of Great Expectations.
  • ingestion_result str: Specify the fate of the associated data, defaults to "UNKNOWN". Supported options are "UNKNOWN", "INGESTED", "REJECTED", "EXPERIMENT", "FG_DATA". Use "INGESTED" or "REJECTED" for validation of DataFrames to be inserted in the Feature Group. Use "EXPERIMENT" for testing and development and "FG_DATA" when validating data already in the Feature Group.
  • save_report Optional[bool]: Whether to save the report to the backend. This is only possible if the Expectation suite is initialised and attached to the Feature Group. Defaults to False.
  • ge_type bool: Whether to return a Great Expectations object or Hopsworks' own abstraction. Defaults to True.

Returns

A Validation Report produced by Great Expectations.
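The supported ingestion_result values listed above can be checked before the call is made. The following is a minimal sketch, assuming fg is a feature group with an attached expectation suite and df is a DataFrame. The helper name validate_experiment and the constant SUPPORTED_INGESTION_RESULTS are hypothetical and not part of hsfs; only the validate arguments documented above are used.

```python
# Values the documentation lists as supported for ingestion_result.
SUPPORTED_INGESTION_RESULTS = {
    "UNKNOWN",
    "INGESTED",
    "REJECTED",
    "EXPERIMENT",
    "FG_DATA",
}


def validate_experiment(fg, df, ingestion_result="EXPERIMENT"):
    """Hypothetical helper: validate df without saving the report."""
    if ingestion_result not in SUPPORTED_INGESTION_RESULTS:
        raise ValueError(f"Unsupported ingestion_result: {ingestion_result!r}")
    return fg.validate(
        dataframe=df,
        save_report=False,  # do not upload the report while experimenting
        ingestion_result=ingestion_result,
        ge_type=False,  # ask for the Hopsworks abstraction instead of the GE object
    )
```

Failing early on an unsupported value avoids a round trip to the backend for what is only a typo.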


[source]

insert#

FeatureGroup.insert(
    features,
    overwrite=False,
    operation="upsert",
    storage=None,
    write_options={},
    validation_options={},
    save_code=True,
    wait=False,
)

Persist the metadata and materialize the feature group to the feature store or insert data from a dataframe into the existing feature group.

Incrementally insert data to a feature group or overwrite all data contained in the feature group. By default, the data is inserted into the offline storage as well as the online storage if the feature group is online_enabled=True. To insert only into the online or offline storage set storage="online" or storage="offline" respectively.

The features dataframe can be a Spark DataFrame or RDD, a Pandas DataFrame, a two-dimensional Numpy array, or a two-dimensional Python nested list. If statistics are enabled, statistics are recomputed for the entire feature group. If the feature group's time travel format is HUDI, then the operation argument can be either insert or upsert.

If the feature group doesn't exist, the insert method creates the necessary metadata the first time it is invoked and writes the specified features dataframe as a feature group to the online/offline feature store.

Changed in 3.3.0

The insert and save methods are now async by default in non-Spark clients. To achieve the old behaviour, set the wait argument to True.

Upsert new feature data with time travel format HUDI

# connect to the Feature Store
fs = ...

fg = fs.get_or_create_feature_group(
    name='bitcoin_price',
    description='Bitcoin price aggregated for days',
    version=1,
    primary_key=['unix'],
    online_enabled=True,
    event_time='unix'
)

fg.insert(df_bitcoin_processed)

Async insert

# connect to the Feature Store
fs = ...

fg1 = fs.get_or_create_feature_group(
    name='feature_group_name1',
    description='Description of the first FG',
    version=1,
    primary_key=['unix'],
    online_enabled=True,
    event_time='unix'
)
# async insertion: return immediately instead of waiting for the job to finish
fg1.insert(df_for_fg1, write_options={"wait_for_job" : False})

fg2 = fs.get_or_create_feature_group(
    name='feature_group_name2',
    description='Description of the second FG',
    version=1,
    primary_key=['unix'],
    online_enabled=True,
    event_time='unix'
)
fg2.insert(df_for_fg2)

Arguments

  • features Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list]]: DataFrame, RDD, Ndarray, list. Features to be saved.
  • overwrite Optional[bool]: Drop all data in the feature group before inserting new data. This does not affect metadata, defaults to False.
  • operation Optional[str]: Apache Hudi operation type "insert" or "upsert". Defaults to "upsert".
  • storage Optional[str]: Overwrite default behaviour, write to offline storage only with "offline" or online only with "online", defaults to None.
  • write_options Optional[Dict[str, Any]]: Additional write options as key-value pairs, defaults to {}. When using the python engine, write_options can contain the following entries:
    • key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to write data into the feature group.
    • key wait_for_job and value True or False to configure whether or not the insert call should return only after the Hopsworks Job has finished. By default it waits.
    • key start_offline_backfill and value True or False to configure whether or not to start the backfill job to write data to the offline storage. By default the backfill job gets started immediately.
    • key internal_kafka and value True or False in case you established connectivity from your Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults to False and will use external listeners when connecting from outside of Hopsworks.
  • validation_options Optional[Dict[str, Any]]: Additional validation options as key-value pairs, defaults to {}.
    • key run_validation boolean value, set to False to skip validation temporarily on ingestion.
    • key save_report boolean value, set to False to skip upload of the validation report to Hopsworks.
    • key ge_validate_kwargs a dictionary containing kwargs for the validate method of Great Expectations.
    • key fetch_expectation_suite a boolean value, by default True, to control whether the expectation suite of the feature group should be fetched before every insert.
  • save_code Optional[bool]: When running HSFS on Hopsworks or Databricks, HSFS can save the code/notebook used to create the feature group or used to insert data to it. When calling the insert method repeatedly with small batches of data, this can slow down the writes. Use this option to turn off saving code. Defaults to True.
  • wait bool: Wait for the job to finish before returning, defaults to False. Shortcut for setting the wait_for_job key in write_options.

Returns

(Job, ValidationReport) A tuple with job information if python engine is used and the validation report if validation is enabled.
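The returned tuple and the option dictionaries described above can be combined as in the following sketch. It assumes fg is an existing feature group and df a compatible DataFrame; the helper name insert_and_wait is hypothetical. It also assumes that an element of the tuple is None when it does not apply (no job outside the python engine, no report when validation is disabled), which the text above implies but does not state.

```python
def insert_and_wait(fg, df):
    """Hypothetical helper: insert df, wait for the job, and report validation status."""
    job, report = fg.insert(
        df,
        # Block until the Hopsworks Job has finished.
        write_options={"wait_for_job": True},
        # Run validation on ingestion and upload the resulting report.
        validation_options={"run_validation": True, "save_report": True},
    )
    # Assumption: report is None when validation did not run.
    validation_passed = None if report is None else bool(report.success)
    return job, report, validation_passed
```

Returning the validation status separately lets the caller distinguish "validation failed" (False) from "validation did not run" (None).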


Fetch Validation Reports#

[source]

get_latest_validation_report#

FeatureGroup.get_latest_validation_report(ge_type=True)

Return the latest validation report attached to the Feature Group if it exists.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

latest_val_report = fg.get_latest_validation_report()

Arguments

  • ge_type bool: If True, returns a native Great Expectations type; otherwise returns the Hopsworks custom type. Conversion can be performed via the to_ge_type() method on the Hopsworks type. Defaults to True.

Returns

ValidationReport. The latest validation report attached to the Feature Group.

Raises

hsfs.client.exceptions.RestAPIError.


[source]

get_all_validation_reports#

FeatureGroup.get_all_validation_reports(ge_type=True)

Return all validation reports attached to the feature group.

Example

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

val_reports = fg.get_all_validation_reports()

Arguments

  • ge_type bool: If True, returns a native Great Expectations type; otherwise returns the Hopsworks custom type. Conversion can be performed via the to_ge_type() method on the Hopsworks type. Defaults to True.

Returns

Union[List[ValidationReport], ValidationReport]. All validation reports attached to the feature group.

Raises

hsfs.client.exceptions.RestAPIError. hsfs.client.exceptions.FeatureStoreException.
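Because the documented return type is either a list of reports or a single report, callers may want to normalise the result before iterating. The following is a minimal sketch, assuming fg is a feature group with saved reports; the helper name count_failed_reports is hypothetical, and treating None as "no reports" is an assumption rather than documented behaviour.

```python
def count_failed_reports(fg):
    """Hypothetical helper: return (failed, total) over all saved validation reports."""
    reports = fg.get_all_validation_reports(ge_type=False)
    # Normalise the Union[List[ValidationReport], ValidationReport] return value.
    if reports is None:  # assumption: nothing has been saved yet
        reports = []
    elif not isinstance(reports, list):
        reports = [reports]
    failed = sum(1 for report in reports if not report.success)
    return failed, len(reports)
```

Requesting ge_type=False keeps the Hopsworks type, so the properties documented on this page are available on each report.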


Properties#

[source]

evaluation_parameters#

Evaluation parameters field of the validation report, which stores kwargs of the validation.


[source]

id#

Id of the validation report, set by backend.


[source]

ingestion_result#

Overall success of the validation run combined with the ingestion validation policy, indicating whether the dataframe was ingested or rejected.


[source]

meta#

Meta field of the validation report, used to store additional information.


[source]

results#

List of expectation results obtained after validation.


[source]

statistics#

Statistics field of the validation report, which stores overall statistics about the validation result, e.g. the number of failing/successful expectations.


[source]

success#

Overall success of the validation step.
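The properties above can be gathered into a compact summary for logging. This is a minimal sketch, assuming report is a Hopsworks ValidationReport (fetched with ge_type=False) and that results is a list, as described above. The helper name summarize_report is hypothetical, and the statistics field is passed through unchanged because its exact keys are not specified here.

```python
def summarize_report(report):
    """Hypothetical helper: collect the documented properties into a plain dict."""
    return {
        "id": report.id,
        "success": report.success,
        "ingestion_result": report.ingestion_result,
        "n_results": len(report.results),  # number of expectation results
        "statistics": report.statistics,  # passed through as-is
    }
```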


Methods#

[source]

from_response_json#

ValidationReport.from_response_json(json_dict)

[source]

json#

ValidationReport.json()

[source]

to_dict#

ValidationReport.to_dict()

[source]

to_ge_type#

ValidationReport.to_ge_type()

[source]

to_json_dict#

ValidationReport.to_json_dict()