Validation Report#
ValidationReport#
hsfs.validation_report.ValidationReport(
success,
results,
meta,
statistics,
evaluation_parameters=None,
id=None,
full_report_path=None,
featurestore_id=None,
featuregroup_id=None,
validation_time=None,
ingestion_result="unknown",
**kwargs
)
Metadata object representing a validation report generated by Great Expectations in the Feature Store.
Creation#
validate#
FeatureGroup.validate(
dataframe=None,
expectation_suite=None,
save_report=False,
validation_options=None,
ingestion_result="unknown",
ge_type=True,
)
Run validation based on the attached expectations.
Runs the expectation suite attached to the feature group against the provided dataframe. Raises an error if the great_expectations package is not installed.
Example
# connect to the Feature Store
fs = ...
# get feature group instance
fg = fs.get_or_create_feature_group(...)
ge_report = fg.validate(df, save_report=False)
Arguments
- dataframe (pandas.DataFrame | hsfs.feature_group.pyspark.sql.DataFrame | None): The dataframe to run the data validation expectations against.
- expectation_suite (hsfs.expectation_suite.ExpectationSuite | None): Optionally provide an Expectation Suite to override the one that is possibly attached to the feature group. This is useful for testing new Expectation Suites. When an extra suite is provided, the results will never be persisted. Defaults to None.
- validation_options (Dict[str, Any] | None): Additional validation options as key-value pairs, defaults to {}.
  - key run_validation: boolean value, set to False to skip validation temporarily on ingestion.
  - key ge_validate_kwargs: a dictionary containing kwargs for the validate method of Great Expectations.
- ingestion_result (Literal['unknown', 'ingested', 'rejected', 'fg_data', 'experiement']): Specify the fate of the associated data, defaults to "UNKNOWN". Supported options are "UNKNOWN", "INGESTED", "REJECTED", "EXPERIMENT", "FG_DATA". Use "INGESTED" or "REJECTED" for validation of DataFrames to be inserted in the Feature Group. Use "EXPERIMENT" for testing and development and "FG_DATA" when validating data already in the Feature Group.
- save_report (bool | None): Whether to save the report to the backend. This is only possible if the Expectation Suite is initialised and attached to the Feature Group. Defaults to False.
- ge_type (bool): Whether to return a Great Expectations object or Hopsworks' own abstraction. Defaults to True if Great Expectations is installed, else False.
Returns
A Validation Report produced by Great Expectations.
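The report returned by validate can act as a gate before ingestion. The following is a minimal sketch under stated assumptions, not part of hsfs: validate_then_insert is a hypothetical helper name, and fg is assumed to be a feature group with an expectation suite attached. It requests the Hopsworks abstraction with ge_type=False, reads the success property, and inserts only when validation passed, skipping the second validation run on ingestion through the run_validation option.

```python
from typing import Any


def validate_then_insert(fg: Any, df: Any) -> bool:
    """Insert df only if it passes the attached expectation suite.

    Hypothetical helper for illustration; it is not an hsfs function.
    """
    # ge_type=False asks for the Hopsworks ValidationReport abstraction.
    report = fg.validate(df, save_report=False, ge_type=False)
    if not report.success:
        # At least one expectation failed: leave the feature group untouched.
        return False
    # The data was validated above, so skip validation on ingestion.
    fg.insert(df, validation_options={"run_validation": False})
    return True
```

The helper returns True when the dataframe was handed to insert and False when it was held back, so callers can branch on the outcome.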
insert#
FeatureGroup.insert(
features,
overwrite=False,
operation="upsert",
storage=None,
write_options=None,
validation_options=None,
wait=False,
)
Persist the metadata and materialize the feature group to the feature store or insert data from a dataframe into the existing feature group.
Incrementally insert data to a feature group or overwrite all data contained in the feature group. By default, the data is inserted into the offline storage as well as the online storage if the feature group is online_enabled=True.
The features dataframe can be a Spark DataFrame or RDD, a Pandas DataFrame, a Polars DataFrame, a two-dimensional Numpy array, or a two-dimensional Python nested list. If statistics are enabled, statistics are recomputed for the entire feature group. If the feature group's time travel format is HUDI, the operation argument can be either insert or upsert.
If the feature group doesn't exist, the insert method creates the necessary metadata the first time it is invoked and writes the specified features dataframe as a feature group to the online/offline feature store.
Changed in 3.3.0
The insert and save methods are now async by default in non-Spark clients. To achieve the old behaviour, set the wait argument to True.
Upsert new feature data with time travel format HUDI
# connect to the Feature Store
fs = ...
fg = fs.get_or_create_feature_group(
name='bitcoin_price',
description='Bitcoin price aggregated for days',
version=1,
primary_key=['unix'],
online_enabled=True,
event_time='unix'
)
fg.insert(df_bitcoin_processed)
Async insert
# connect to the Feature Store
fs = ...
fg1 = fs.get_or_create_feature_group(
name='feature_group_name1',
description='Description of the first FG',
version=1,
primary_key=['unix'],
online_enabled=True,
event_time='unix'
)
# insert asynchronously so the call does not wait for the job to finish
fg1.insert(df_for_fg1, write_options={"wait_for_job": False})
fg2 = fs.get_or_create_feature_group(
name='feature_group_name2',
description='Description of the second FG',
version=1,
primary_key=['unix'],
online_enabled=True,
event_time='unix'
)
fg2.insert(df_for_fg2)
Arguments
- features (pandas.DataFrame | polars.dataframe.frame.DataFrame | hsfs.feature_group.pyspark.sql.DataFrame | hsfs.feature_group.pyspark.RDD | numpy.ndarray | List[list]): Pandas DataFrame, Polars DataFrame, RDD, Ndarray, list. Features to be saved.
- overwrite (bool): Drop all data in the feature group before inserting new data. This does not affect metadata, defaults to False.
- operation (str | None): Apache Hudi operation type "insert" or "upsert". Defaults to "upsert".
- storage (str | None): Overwrite default behaviour, write to offline storage only with "offline" or online only with "online", defaults to None. If the streaming APIs are enabled, specifying the storage option is not supported.
- write_options (Dict[str, Any] | None): Additional write options as key-value pairs, defaults to {}. When using the python engine, write_options can contain the following entries:
  - key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to write data into the feature group.
  - key wait_for_job and value True or False to configure whether or not the insert call should return only after the Hopsworks Job has finished. By default it waits.
  - key start_offline_backfill and value True or False to configure whether or not to start the materialization job to write data to the offline storage. start_offline_backfill is deprecated. Use start_offline_materialization instead.
  - key start_offline_materialization and value True or False to configure whether or not to start the materialization job to write data to the offline storage. By default the materialization job gets started immediately.
  - key kafka_producer_config and value an object of type properties used to configure the Kafka client. To optimize for throughput over high-latency connections, consider changing the producer properties.
  - key internal_kafka and value True or False in case you established connectivity from your Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults to False and will use external listeners when connecting from outside of Hopsworks.
- validation_options (Dict[str, Any] | None): Additional validation options as key-value pairs, defaults to {}.
  - key run_validation: boolean value, set to False to skip validation temporarily on ingestion.
  - key save_report: boolean value, set to False to skip upload of the validation report to Hopsworks.
  - key ge_validate_kwargs: a dictionary containing kwargs for the validate method of Great Expectations.
  - key fetch_expectation_suite: a boolean value, by default True, to control whether the expectation suite of the feature group should be fetched before every insert.
- wait (bool): Wait for the job to finish before returning, defaults to False. Shortcut for write_options {"wait_for_job": False}.
Returns
(Job, ValidationReport). A tuple with job information if the python engine is used and the validation report if validation is enabled.
Raises
- hsfs.client.exceptions.RestAPIError: e.g. failure to create the feature group, dataframe schema does not match the existing feature group schema, etc.
- hsfs.client.exceptions.DataValidationException: If data validation fails and the expectation suite validation_ingestion_policy is set to STRICT. Data is NOT ingested.
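The return value and the wait and validation_options arguments can be combined as in the following minimal sketch. It is illustrative only: insert_and_report is a hypothetical helper name, fg is assumed to be an existing feature group, and the sketch assumes that the second tuple element is None when validation did not produce a report.

```python
from typing import Any, Optional, Tuple


def insert_and_report(fg: Any, df: Any) -> Tuple[Any, Optional[bool]]:
    """Insert df synchronously and return (job, validation success or None).

    Hypothetical helper for illustration; it is not an hsfs function.
    """
    job, report = fg.insert(
        df,
        wait=True,  # block until the job has finished
        validation_options={"run_validation": True, "save_report": True},
    )
    # Assumption: report is None when no validation report was produced.
    success = None if report is None else report.success
    return job, success
```

Returning None rather than False for a missing report keeps "validation did not run" distinguishable from "validation failed".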
Retrieval#
get_latest_validation_report#
FeatureGroup.get_latest_validation_report(ge_type=True)
Return the latest validation report attached to the Feature Group if it exists.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
latest_val_report = fg.get_latest_validation_report()
Arguments
- ge_type (bool): If True, returns a native Great Expectations type, Hopsworks custom type otherwise. Conversion can be performed via the to_ge_type() method on the Hopsworks type. Defaults to True if Great Expectations is installed, else False.
Returns
ValidationReport. The latest validation report attached to the Feature Group.
Raises
hsfs.client.exceptions.RestAPIError.
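To illustrate how the retrieved report relates to the properties documented on this page, here is a minimal sketch that condenses the latest report into a plain dictionary. latest_report_summary is a hypothetical helper name, and the sketch assumes that the call returns None when no report exists.

```python
from typing import Any, Dict, Optional


def latest_report_summary(fg: Any) -> Optional[Dict[str, Any]]:
    """Summarize the latest validation report of fg, or return None.

    Hypothetical helper for illustration; it is not an hsfs function.
    """
    # ge_type=False asks for the Hopsworks ValidationReport abstraction.
    report = fg.get_latest_validation_report(ge_type=False)
    if report is None:
        # Assumption: None signals that no report is attached yet.
        return None
    return {
        "success": report.success,
        "ingestion_result": report.ingestion_result,
        "n_results": len(report.results),
        "statistics": report.statistics,
    }
```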
get_all_validation_reports#
FeatureGroup.get_all_validation_reports(ge_type=True)
Return all validation reports attached to the feature group, if any exist.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
val_reports = fg.get_all_validation_reports()
Arguments
- ge_type (bool): If True, returns a native Great Expectations type, Hopsworks custom type otherwise. Conversion can be performed via the to_ge_type() method on the Hopsworks type. Defaults to True if Great Expectations is installed, else False.
Returns
Union[List[ValidationReport], ValidationReport]. All validation reports attached to the feature group.
Raises
- hsfs.client.exceptions.RestAPIError.
- hsfs.client.exceptions.FeatureStoreException.
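Because the documented return type is a union of a list and a single report, calling code may want to normalize it before iterating. The sketch below is one hedged way to do that and to keep only the failed runs. failed_reports is a hypothetical helper name, and the handling of None is an assumption.

```python
from typing import Any, List


def failed_reports(fg: Any) -> List[Any]:
    """Return the validation reports of fg whose success flag is False.

    Hypothetical helper for illustration; it is not an hsfs function.
    """
    reports = fg.get_all_validation_reports(ge_type=False)
    if reports is None:
        # Assumption: treat a missing result as "no reports".
        return []
    if not isinstance(reports, list):
        # The documented return type allows a single report as well.
        reports = [reports]
    return [report for report in reports if not report.success]
```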
Properties#
evaluation_parameters#
Evaluation parameters field of the validation report, which stores the kwargs of the validation.
id#
Id of the validation report, set by backend.
ingestion_result#
Outcome of the validation run combined with the validation ingestion policy, indicating whether the dataframe was ingested or rejected.
meta#
Meta field of the validation report, used to store additional information.
results#
List of expectation results obtained after validation.
statistics#
Statistics field of the validation report, which stores overall statistics about the validation result, e.g. the number of failing/successful expectations.
success#
Overall success of the validation step.
Methods#
from_response_json#
ValidationReport.from_response_json(json_dict)
json#
ValidationReport.json()
to_dict#
ValidationReport.to_dict()
to_ge_type#
ValidationReport.to_ge_type()
to_json_dict#
ValidationReport.to_json_dict()