Validation Report
ValidationReport
hsfs.validation_report.ValidationReport(
    success,
    results,
    meta,
    statistics,
    evaluation_parameters=None,
    id=None,
    full_report_path=None,
    featurestore_id=None,
    featuregroup_id=None,
    href=None,
    expand=None,
    items=None,
    count=None,
    type=None,
    validation_time=None,
    ingestion_result="UNKNOWN",
    **kwargs
)
Metadata object representing a validation report generated by Great Expectations in the Feature Store.
Creation
validate
FeatureGroup.validate(
    dataframe=None,
    expectation_suite=None,
    save_report=False,
    validation_options={},
    ingestion_result="UNKNOWN",
    ge_type=True,
)
Run validation based on the attached expectations.
Runs any expectations attached with Deequ, as well as any attached Great Expectations suites.
Example
# connect to the Feature Store
fs = ...
# get feature group instance
fg = fs.get_or_create_feature_group(...)
ge_report = fg.validate(df, save_report=False)
Arguments
- dataframe
Optional[Union[pandas.DataFrame, pyspark.sql.DataFrame]]
: The dataframe to run the data validation expectations against.
- expectation_suite
Optional[hsfs.expectation_suite.ExpectationSuite]
: Optionally provide an Expectation Suite to override the one that may be attached to the feature group. This is useful for testing new Expectation Suites. When an extra suite is provided, the results are never persisted. Defaults to None.
- validation_options
Optional[Dict[Any, Any]]
: Additional validation options as key-value pairs, defaults to {}.
  - key run_validation: boolean value, set to False to skip validation temporarily on ingestion.
  - key ge_validate_kwargs: a dictionary containing kwargs for the validate method of Great Expectations.
- ingestion_result
str
: Specify the fate of the associated data, defaults to "UNKNOWN". Supported options are "UNKNOWN", "INGESTED", "REJECTED", "EXPERIMENT", "FG_DATA". Use "INGESTED" or "REJECTED" for validation of DataFrames to be inserted in the Feature Group. Use "EXPERIMENT" for testing and development and "FG_DATA" when validating data already in the Feature Group.
- save_report
Optional[bool]
: Whether to save the report to the backend. This is only possible if the Expectation Suite is initialised and attached to the Feature Group. Defaults to False.
- ge_type
bool
: Whether to return a Great Expectations object or Hopsworks' own abstraction. Defaults to True.
Returns
A validation report produced by Great Expectations, or a Hopsworks ValidationReport if ge_type=False.
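The arguments above can be assembled programmatically. The sketch below is a hypothetical helper, build_validate_kwargs, that is not part of hsfs: it checks ingestion_result against the supported values listed above and builds a fresh validation_options dict on every call, which sidesteps the shared mutable default ({}) in the signature. The returned dict is meant to be unpacked into fg.validate(df, **kwargs).

```python
# Hypothetical helper, not part of hsfs: it builds keyword arguments for
# FeatureGroup.validate() from the options documented above.
from typing import Any, Dict, Optional

# Values listed for the ingestion_result argument in this reference.
SUPPORTED_INGESTION_RESULTS = frozenset(
    {"UNKNOWN", "INGESTED", "REJECTED", "EXPERIMENT", "FG_DATA"}
)


def build_validate_kwargs(
    ingestion_result: str = "UNKNOWN",
    save_report: bool = False,
    run_validation: bool = True,
    ge_validate_kwargs: Optional[Dict[str, Any]] = None,
    ge_type: bool = True,
) -> Dict[str, Any]:
    """Return kwargs meant to be unpacked into fg.validate(df, **kwargs)."""
    if ingestion_result not in SUPPORTED_INGESTION_RESULTS:
        raise ValueError(f"unsupported ingestion_result: {ingestion_result!r}")
    # A fresh dict per call avoids sharing a mutable default between calls.
    validation_options: Dict[str, Any] = {}
    if not run_validation:
        validation_options["run_validation"] = False
    if ge_validate_kwargs:
        # Forwarded to the validate method of Great Expectations.
        validation_options["ge_validate_kwargs"] = dict(ge_validate_kwargs)
    return {
        "ingestion_result": ingestion_result,
        "save_report": save_report,
        "validation_options": validation_options,
        "ge_type": ge_type,
    }
```

For example, fg.validate(df, **build_validate_kwargs("EXPERIMENT")) would validate a development dataframe without saving the report, assuming fg and df exist as in the example above.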
insert
FeatureGroup.insert(
    features,
    overwrite=False,
    operation="upsert",
    storage=None,
    write_options={},
    validation_options={},
    save_code=True,
    wait=False,
)
Persist the metadata and materialize the feature group to the feature store or insert data from a dataframe into the existing feature group.
Incrementally insert data into a feature group or overwrite all data contained in the feature group. By default, the data is inserted into the offline storage as well as the online storage if the feature group is online_enabled=True.
The features dataframe can be a Spark DataFrame or RDD, a Pandas DataFrame, a two-dimensional Numpy array or a two-dimensional Python nested list.
If statistics are enabled, statistics are recomputed for the entire feature group.
If the feature group's time travel format is HUDI, then the operation argument can be either insert or upsert.
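To illustrate the difference between the two operations, the following sketch models a feature group as an in-memory list of rows keyed by the primary key. It is a deliberate simplification, not how Apache Hudi works internally: upsert replaces the stored row when its key already exists, while insert, as modelled here, appends without checking keys (Hudi's insert skips the index lookup, so duplicates are possible). The function name apply_write is hypothetical.

```python
# Simplified model for illustration only; Apache Hudi's real write path
# (indexing, file sizing, commits) is far more involved.
from typing import Dict, List, Tuple

Row = Dict[str, object]


def apply_write(
    table: List[Row], batch: List[Row], primary_key: List[str], operation: str = "upsert"
) -> List[Row]:
    """Return the table contents after writing batch with the given operation."""
    if operation not in ("insert", "upsert"):
        raise ValueError("operation must be 'insert' or 'upsert'")
    if operation == "insert":
        # insert: append without looking up existing keys, so duplicates may appear.
        return table + batch

    def key(row: Row) -> Tuple[object, ...]:
        return tuple(row[k] for k in primary_key)

    # upsert: index existing rows by key, then let incoming rows replace them.
    merged: Dict[Tuple[object, ...], Row] = {key(r): r for r in table}
    for row in batch:
        merged[key(row)] = row
    return list(merged.values())
```

With primary_key=['unix'] as in the examples below, upserting a batch that repeats an existing unix value overwrites that row, whereas the modelled insert keeps both copies.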
If the feature group doesn't exist, the insert method creates the necessary metadata the first time it is invoked and writes the specified features dataframe as a feature group to the online/offline feature store.
Changed in 3.3.0
The insert and save methods are now async by default in non-Spark clients. To restore the old behaviour, set the wait argument to True.
Upsert new feature data with time travel format HUDI
# connect to the Feature Store
fs = ...
fg = fs.get_or_create_feature_group(
name='bitcoin_price',
description='Bitcoin price aggregated for days',
version=1,
primary_key=['unix'],
online_enabled=True,
event_time='unix'
)
fg.insert(df_bitcoin_processed)
Async insert
# connect to the Feature Store
fs = ...
fg1 = fs.get_or_create_feature_group(
name='feature_group_name1',
description='Description of the first FG',
version=1,
primary_key=['unix'],
online_enabled=True,
event_time='unix'
)
# asynchronous insertion: do not wait for the job to finish
fg1.insert(df_for_fg1, write_options={"wait_for_job": False})
fg2 = fs.get_or_create_feature_group(
name='feature_group_name2',
description='Description of the second FG',
version=1,
primary_key=['unix'],
online_enabled=True,
event_time='unix'
)
fg2.insert(df_for_fg2)
Arguments
- features
Union[pandas.DataFrame, pyspark.sql.DataFrame, pyspark.RDD, numpy.ndarray, List[list]]
: DataFrame, RDD, Ndarray, list. Features to be saved.
- overwrite
Optional[bool]
: Drop all data in the feature group before inserting new data. This does not affect metadata, defaults to False.
- operation
Optional[str]
: Apache Hudi operation type "insert" or "upsert". Defaults to "upsert".
- storage
Optional[str]
: Overwrite the default behaviour: write to offline storage only with "offline" or to online storage only with "online", defaults to None. If the streaming APIs are enabled, specifying the storage option is not supported.
- write_options
Optional[Dict[str, Any]]
: Additional write options as key-value pairs, defaults to {}. When using the python engine, write_options can contain the following entries:
  - key spark and value an object of type hsfs.core.job_configuration.JobConfiguration, to configure the Hopsworks Job used to write data into the feature group.
  - key wait_for_job and value True or False, to configure whether the insert call should return only after the Hopsworks Job has finished. By default it waits.
  - key start_offline_backfill and value True or False, to configure whether to start the materialization job to write data to the offline storage. start_offline_backfill is deprecated; use start_offline_materialization instead.
  - key start_offline_materialization and value True or False, to configure whether to start the materialization job to write data to the offline storage. By default the materialization job is started immediately.
  - key kafka_producer_config and value an object of type properties used to configure the Kafka client. To optimize throughput over high-latency connections, consider changing the producer properties.
  - key internal_kafka and value True or False, in case you established connectivity from your Python environment to the internal advertised listeners of the Hopsworks Kafka cluster. Defaults to False, which uses the external listeners when connecting from outside of Hopsworks.
- validation_options
Optional[Dict[str, Any]]
: Additional validation options as key-value pairs, defaults to {}.
  - key run_validation: boolean value, set to False to skip validation temporarily on ingestion.
  - key save_report: boolean value, set to False to skip upload of the validation report to Hopsworks.
  - key ge_validate_kwargs: a dictionary containing kwargs for the validate method of Great Expectations.
  - key fetch_expectation_suite: a boolean value, by default True, to control whether the expectation suite of the feature group should be fetched before every insert.
- save_code
Optional[bool]
: When running HSFS on Hopsworks or Databricks, HSFS can save the code/notebook used to create the feature group or used to insert data into it. When calling the insert method repeatedly with small batches of data, this can slow down the writes. Use this option to turn off saving code. Defaults to True.
- wait
bool
: Wait for the job to finish before returning, defaults to False. Shortcut for the wait_for_job entry of write_options.
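A minimal sketch of building write_options for the python engine, using only keys documented above. build_write_options is a hypothetical helper, not an hsfs function: it drops unset options so the client's defaults apply, and it migrates the deprecated start_offline_backfill key to start_offline_materialization with a DeprecationWarning. The spark key is left out so the sketch runs without hsfs installed; how the client interprets each key is governed by the descriptions above, not by this sketch.

```python
# Hypothetical helper, not an hsfs function: it only builds a plain dict
# from the write_options keys documented above.
import warnings
from typing import Any, Dict, Optional


def build_write_options(
    wait_for_job: Optional[bool] = None,
    start_offline_materialization: Optional[bool] = None,
    kafka_producer_config: Optional[Dict[str, Any]] = None,
    internal_kafka: Optional[bool] = None,
    **legacy: Any,
) -> Dict[str, Any]:
    """Return write_options containing only the options that were set."""
    backfill = legacy.pop("start_offline_backfill", None)
    if backfill is not None:
        # The deprecated key maps onto its documented replacement.
        warnings.warn(
            "start_offline_backfill is deprecated, use start_offline_materialization",
            DeprecationWarning,
            stacklevel=2,
        )
        if start_offline_materialization is None:
            start_offline_materialization = bool(backfill)
    if legacy:
        raise TypeError(f"unknown write options: {sorted(legacy)}")
    candidates = {
        "wait_for_job": wait_for_job,
        "start_offline_materialization": start_offline_materialization,
        "kafka_producer_config": kafka_producer_config,
        "internal_kafka": internal_kafka,
    }
    # Omit unset entries so the client's own defaults apply.
    return {name: value for name, value in candidates.items() if value is not None}
```

For example, fg.insert(df, write_options=build_write_options(wait_for_job=True, start_offline_materialization=False)) would, per the descriptions above, return only after the job finishes and not start offline materialization immediately.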
Returns
(Job, ValidationReport) A tuple with the job information if the python engine is used, and the validation report if validation is enabled.
Raises
hsfs.client.exceptions.RestAPIError: e.g. failure to create the feature group, or the dataframe schema does not match the existing feature group schema.
hsfs.client.exceptions.DataValidationException: if data validation fails and the expectation suite's validation_ingestion_policy is set to STRICT. The data is NOT ingested.
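The interplay between validation success and the ingestion policy can be modelled as below. This is a conceptual sketch of the documented behaviour, not hsfs source code: STRICT comes from this reference, while ALWAYS is assumed to be the other validation_ingestion_policy value, under which data is ingested regardless of the validation outcome. DataValidationError is a local stand-in for hsfs.client.exceptions.DataValidationException so the sketch runs without hsfs installed.

```python
# Conceptual model of the documented behaviour; not hsfs source code.
# "ALWAYS" is an assumed policy value; "STRICT" is documented above.


class DataValidationError(Exception):
    """Local stand-in for hsfs.client.exceptions.DataValidationException."""


def decide_ingestion(success: bool, policy: str) -> str:
    """Return the ingestion_result a validation run would lead to."""
    policy = policy.upper()
    if policy not in ("ALWAYS", "STRICT"):
        raise ValueError(f"unknown validation_ingestion_policy: {policy!r}")
    # Under ALWAYS the data is written even if validation failed.
    if success or policy == "ALWAYS":
        return "INGESTED"
    # Under STRICT a failed validation rejects the data.
    return "REJECTED"


def ingest_or_raise(success: bool, policy: str) -> str:
    """Mimic insert(): raise instead of ingesting rejected data."""
    result = decide_ingestion(success, policy)
    if result == "REJECTED":
        raise DataValidationError("validation failed under STRICT policy; data NOT ingested")
    return result
```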
Retrieval
get_latest_validation_report
FeatureGroup.get_latest_validation_report(ge_type=True)
Return the latest validation report attached to the Feature Group if it exists.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
latest_val_report = fg.get_latest_validation_report()
Arguments
- ge_type
bool
: If True, returns a native Great Expectations type; otherwise returns a Hopsworks custom type. A Hopsworks type can be converted with its to_ge_type() method. Defaults to True.
Returns
ValidationReport. The latest validation report attached to the Feature Group.
Raises
hsfs.client.exceptions.RestAPIError.
get_all_validation_reports
FeatureGroup.get_all_validation_reports(ge_type=True)
Return all validation reports attached to the feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
val_reports = fg.get_all_validation_reports()
Arguments
- ge_type
bool
: If True, returns a native Great Expectations type; otherwise returns a Hopsworks custom type. A Hopsworks type can be converted with its to_ge_type() method. Defaults to True.
Returns
Union[List[ValidationReport], ValidationReport]. All validation reports attached to the feature group.
Raises
hsfs.client.exceptions.RestAPIError.
hsfs.client.exceptions.FeatureStoreException.
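Because get_all_validation_reports is documented to return either a list or a single report, callers may want to normalise the result before iterating. The sketch below uses hypothetical helper names (as_report_list, failed_reports) and relies only on each report exposing a success attribute, which holds for the Hopsworks ValidationReport (see the success property) and, we assume, for the native Great Expectations result returned when ge_type=True.

```python
# Hypothetical helpers, not part of hsfs. Each report only needs a
# `success` attribute.
from typing import Any, List


def as_report_list(reports: Any) -> List[Any]:
    """Normalise None, a single report, or a list of reports to a list."""
    if reports is None:
        return []
    if isinstance(reports, (list, tuple)):
        return list(reports)
    return [reports]


def failed_reports(reports: Any) -> List[Any]:
    """Return the reports whose validation step did not succeed."""
    return [r for r in as_report_list(reports) if not getattr(r, "success", False)]
```

For example, failed_reports(fg.get_all_validation_reports()) would list the reports whose validation step did not succeed.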
Properties
evaluation_parameters
Evaluation parameters field of the validation report, which stores the kwargs of the validation.
id
Id of the validation report, set by the backend.
ingestion_result
Overall success of the validation run combined with the ingestion validation policy, indicating whether the dataframe was ingested or rejected.
meta
Meta field of the validation report to store additional information.
results
List of expectation results obtained after validation.
statistics
Statistics field of the validation report, which stores overall statistics about the validation result, e.g. the number of failing/successful expectations.
success
Overall success of the validation step.
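The properties above can be combined into a short summary. The sketch assumes the report is available as a plain dict shaped like a Great Expectations validation result (top-level success, statistics with an evaluated_expectations count, and results entries carrying success and expectation_config.expectation_type); whether ValidationReport.to_dict() produces exactly this shape is an assumption to verify. summarize_report is a hypothetical helper.

```python
# Hypothetical helper, not part of hsfs. Assumes a dict shaped like a
# Great Expectations validation result.
from typing import Any, Dict, List


def summarize_report(report: Dict[str, Any]) -> Dict[str, Any]:
    """Return overall success, evaluated count, and failing expectation types."""
    results: List[Dict[str, Any]] = report.get("results", [])
    # Collect the expectation type of every result that did not succeed.
    failing = [
        r.get("expectation_config", {}).get("expectation_type", "<unknown>")
        for r in results
        if not r.get("success", False)
    ]
    stats = report.get("statistics", {})
    return {
        "success": bool(report.get("success", False)),
        # Fall back to counting results if statistics are missing.
        "evaluated": stats.get("evaluated_expectations", len(results)),
        "failing_expectations": failing,
    }
```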
Methods
from_response_json
ValidationReport.from_response_json(json_dict)
json
ValidationReport.json()
to_dict
ValidationReport.to_dict()
to_ge_type
ValidationReport.to_ge_type()
to_json_dict
ValidationReport.to_json_dict()