hsfs.hopsworks_udf #
HopsworksUdf #
Metadata for user-defined functions.
Stores the metadata required to execute the user-defined function in both the Spark and Python engines. The class uses this metadata to dynamically generate user-defined functions based on the engine it is executed in.
| PARAMETER | DESCRIPTION |
|---|---|
| func | The transformation function object or the source code of the transformation function. |
| return_types | A Python type or a list of Python types that denotes the data types of the columns output by the transformation function. |
| name | Name of the transformation function. |
| transformation_features | A list of `TransformationFeature` objects. |
| transformation_function_argument_names | The argument names of the transformation function. |
| dropped_argument_names | The arguments to be dropped from the final DataFrame after the transformation functions are applied. |
| dropped_feature_names | The feature names corresponding to the argument names that are dropped. |
| feature_name_prefix | Prefixes, if any, used in the feature view. |
| output_column_names | The names of the output columns returned from the transformation function. |
| generate_output_col_names | Generate default output column names for the transformation function. |
statistics_required property #
statistics_required: bool
Whether statistics for any feature are required by the UDF.
transformation_statistics property writable #
transformation_statistics: TransformationStatistics | None
Feature statistics required for the defined UDF.
output_column_names property writable #
Output column names of the transformation function.
transformation_features property #
List of feature names to be used in the User Defined Function.
unprefixed_transformation_features property #
List of feature names used in the transformation function, without the feature name prefix.
feature_name_prefix property #
feature_name_prefix: str | None
The feature name prefix that needs to be added to the feature names.
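As a rough illustration of how prefixed and unprefixed feature names relate, the sketch below strips a prefix from a list of names. `strip_prefix` and the sample names are hypothetical and not part of hsfs; the library's own handling may differ.

```python
from typing import List, Optional


def strip_prefix(feature_names: List[str], prefix: Optional[str]) -> List[str]:
    """Return the names with `prefix` removed where present (hypothetical helper)."""
    if not prefix:
        # No prefix configured: the names are already unprefixed.
        return list(feature_names)
    return [
        name[len(prefix):] if name.startswith(prefix) else name
        for name in feature_names
    ]


prefixed = ["fg1_amount", "fg1_age"]
unprefixed = strip_prefix(prefixed, "fg1_")
```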
statistics_features property #
List of feature names that require statistics.
dropped_features property writable #
List of features that will be dropped after the UDF is applied.
transformation_context property writable #
Dictionary that contains the context variables required for the UDF.
These context variables are passed to the UDF during execution.
alias #
alias(*args: str)
Set the names of the transformed features output by the UDF.
| PARAMETER | DESCRIPTION |
|---|---|
| `*args` | Names of the output features after transformation. Can be passed as individual string arguments or as a list of strings. The number of output feature names provided must match the number of features returned by the transformation function. |
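The two accepted calling styles and the count rule can be sketched in plain Python. This is only an illustration of the rule described above, not the hsfs implementation; `collect_alias_names` and its `expected_count` parameter are hypothetical.

```python
from typing import List, Sequence, Union


def collect_alias_names(
    *args: Union[str, Sequence[str]], expected_count: int
) -> List[str]:
    """Flatten alias arguments and check their count (hypothetical helper)."""
    names: List[str] = []
    for arg in args:
        if isinstance(arg, str):
            names.append(arg)  # individual string argument
        else:
            names.extend(arg)  # list of strings
    if len(names) != expected_count:
        raise ValueError(
            f"Expected {expected_count} output names, got {len(names)}: {names}"
        )
    return names


# Both calling styles yield the same list of names.
as_strings = collect_alias_names("min_x", "max_x", expected_count=2)
as_list = collect_alias_names(["min_x", "max_x"], expected_count=2)
```

Passing a single name when two outputs are expected raises `ValueError` in this sketch.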
executor #
executor(
    statistics: TransformationStatistics
    | list[FeatureDescriptiveStatistics]
    | dict[str, dict[str, Any]] = None,
    context: dict[str, Any] = None,
    online: bool = False,
) -> Any
Create an executable transformation with optional statistics and context for unit testing.
This method returns a callable object that can execute the UDF with the specified configuration. It is designed for unit testing transformation functions locally.
The executor allows you to:
- Inject mock statistics for testing model-dependent transformations
- Provide transformation context for testing transformation functions that use it
- Switch between online (single-value) and offline (batch) execution modes
Testing UDF with pandas execution mode
import pandas as pd
from hopsworks import udf

@udf(return_type=float, mode="pandas")
def add_one(value):
    return value + 1
# Create executor and test
executor = add_one.executor()
result = executor.execute(pd.Series([1.0, 2.0, 3.0]))
assert result.tolist() == [2.0, 3.0, 4.0]
Testing UDF with python execution mode
@udf(return_type=float, mode="python")
def add_one(value):
    return value + 1
# Create executor and test
executor = add_one.executor()
result = executor.execute(1.0)
assert result == 2.0
Testing UDF with default execution mode
# In the default execution mode, Hopsworks executes the transformation function as a pandas UDF for batch processing and as a Python function for online processing to get optimal performance.
# Hence, the function should be able to handle both online and offline execution modes, and unit tests must be written for both use cases.
# In the offline mode, Hopsworks would pass a pandas Series to the function.
# In the online mode, Hopsworks would pass a single value to the function.
@udf(return_type=float)
def double_value(value):
    return value * 2
# Offline mode (batch processing with pandas Series)
offline_executor = double_value.executor(online=False)
batch_result = offline_executor.execute(pd.Series([1.0, 2.0, 3.0]))
# Online mode (single value processing)
online_executor = double_value.executor(online=True)
single_result = online_executor.execute(5.0)
assert single_result == 10.0
Unit test with mocked statistics
from hsfs.transformation_statistics import TransformationStatistics
@udf(return_type=float)
def normalize(value, statistics=TransformationStatistics("value")):
    return (value - statistics.value.mean) / statistics.value.std_dev
# Test with mock statistics
executor = normalize.executor(statistics={"value": {"mean": 100.0, "std_dev": 25.0}})
result = executor.execute(pd.Series([100.0, 125.0, 150.0]))
assert result.tolist() == [0.0, 1.0, 2.0]
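The expected values in this test follow from the z-score formula, `(value - mean) / std_dev`. A plain-Python check of the arithmetic, independent of Hopsworks and pandas and using the same mock statistics:

```python
# Mock statistics used in the test above.
mean, std_dev = 100.0, 25.0
values = [100.0, 125.0, 150.0]

# (100 - 100) / 25 = 0, (125 - 100) / 25 = 1, (150 - 100) / 25 = 2
z_scores = [(v - mean) / std_dev for v in values]
```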
Unit test with transformation context
@udf(return_type=float)
def apply_discount(price, context):
    return price * (1 - context["discount_rate"])
executor = apply_discount.executor(context={"discount_rate": 0.1})
result = executor.execute(pd.Series([100.0, 200.0]))
assert result.tolist() == [90.0, 180.0]
Testing online vs offline execution modes
# For transformation functions using the default execution mode (`default`),
# the function should be able to handle both online and offline execution modes.
# In the offline mode, Hopsworks would pass a pandas Series to the function.
# In the online mode, Hopsworks would pass a single value to the function.
@udf(return_type=float, mode="default")
def double_value(value):
    return value * 2
# Offline mode (batch processing with pandas Series)
offline_executor = double_value.executor(online=False)
batch_result = offline_executor.execute(pd.Series([1.0, 2.0, 3.0]))
# Online mode (single value processing)
online_executor = double_value.executor(online=True)
single_result = online_executor.execute(5.0)
assert single_result == 10.0
| PARAMETER | DESCRIPTION |
|---|---|
| statistics | Statistics for model-dependent transformations. Can be provided as a `TransformationStatistics` object, a list of `FeatureDescriptiveStatistics` objects, or a dictionary mapping feature names to dictionaries of statistics. |
| context | A dictionary mapping variable names to values that provide contextual information to the transformation function at runtime. The keys must match parameter names defined in the UDF. |
| online | Whether to execute in online mode (single values) or offline mode (batch/vectorized). Only applicable when the UDF uses the `default` execution mode. |
| RETURNS | DESCRIPTION |
|---|---|
| Any | A callable object with an `execute` method. |
execute #
execute(
    *args: Any,
) -> (
    pd.Series
    | pd.DataFrame
    | int
    | float
    | str
    | bool
    | datetime
    | time
    | date
    | tuple[
        int | float | str | bool | datetime | time | date,
        ...,
    ]
)
Execute the UDF directly with the provided arguments.
This is a convenience method for quick testing of simple UDFs that don't require statistics or transformation context. It executes the UDF in offline mode (batch processing).
Quick UDF testing
@udf(return_type=float)
def add_one(value):
    return value + 1
# Direct execution for simple tests
result = add_one.execute(pd.Series([1.0, 2.0, 3.0]))
assert result.tolist() == [2.0, 3.0, 4.0]
Note
For UDFs that require statistics or transformation context, or that need to be executed in online mode, use executor() instead:
result = my_udf.executor(statistics=stats, context=ctx).execute(data)
| PARAMETER | DESCRIPTION |
|---|---|
| `*args` | Input arguments matching the UDF's parameter signature. For batch processing, pass pandas Series or DataFrames. |
| RETURNS | DESCRIPTION |
|---|---|
| `pd.Series \| pd.DataFrame \| int \| float \| str \| bool \| datetime \| time \| date \| tuple[int \| float \| str \| bool \| datetime \| time \| date, ...]` | The transformed values. |
json #
json() -> str
Convert the class into its JSON-serialized form.
| RETURNS | DESCRIPTION |
|---|---|
| str | JSON serialized object. |
from_response_json classmethod #
from_response_json(
    json_dict: dict[str, Any],
) -> HopsworksUdf
Construct the class object from its JSON serialization.
| PARAMETER | DESCRIPTION |
|---|---|
| json_dict | JSON serialized dictionary for the class. |
| RETURNS | DESCRIPTION |
|---|---|
| HopsworksUdf | JSON deserialized class object. |
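The `json()` / `from_response_json()` pair forms a serialization round trip. A minimal sketch of that pattern, using a stand-in class rather than `HopsworksUdf` itself: `ToyUdfMetadata` and its two fields are hypothetical, and the real serialized form contains whatever fields the library defines.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, List


@dataclass
class ToyUdfMetadata:
    """Stand-in metadata object; the fields are illustrative only."""

    name: str
    output_column_names: List[str]

    def json(self) -> str:
        # Serialize the dataclass fields to a JSON string.
        return json.dumps(asdict(self))

    @classmethod
    def from_response_json(cls, json_dict: Dict[str, Any]) -> "ToyUdfMetadata":
        # Rebuild the object from an already-parsed JSON dictionary.
        return cls(**json_dict)


original = ToyUdfMetadata(name="add_one", output_column_names=["add_one_value"])
restored = ToyUdfMetadata.from_response_json(json.loads(original.json()))
```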
TransformationFeature dataclass #
Mapping of feature names to their corresponding statistics argument names in the code.
The statistic_argument_name for a feature name would be None if the feature does not need statistics.
| PARAMETER | DESCRIPTION |
|---|---|
| feature_name | Name of the feature. |
| statistic_argument_name | Name of the statistics argument in the code for the feature specified in the feature name. |
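The shape of this mapping can be sketched with a plain dataclass. `TransformationFeatureSketch` is a hypothetical stand-in that mirrors the two documented fields; it is not the hsfs class, and the feature names are made up.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TransformationFeatureSketch:
    """Stand-in mirroring the two documented fields (not the hsfs class)."""

    feature_name: str
    # None means the feature does not need statistics.
    statistic_argument_name: Optional[str] = None


features = [
    TransformationFeatureSketch("amount", "amount"),
    TransformationFeatureSketch("age"),
]

# Features that require statistics are those with a statistics argument name.
needs_statistics = [
    f.feature_name for f in features if f.statistic_argument_name is not None
]
```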
udf #
udf(
    return_type: list[type] | type,
    drop: str | list[str] | None = None,
    mode: Literal[
        "default", "python", "pandas"
    ] = "default",
) -> HopsworksUdf
Create a user-defined function (UDF) that can be used within the Hopsworks Feature Store to create transformation functions.
Hopsworks UDFs are user-defined functions that execute as `pandas_udf` in the Spark engine and as pandas functions in the Python engine. The pandas UDFs and pandas functions receive pandas Series as inputs and can output a pandas Series or a pandas DataFrame. A Hopsworks UDF is defined using the `udf` decorator. The output types of the defined UDF must be specified in the decorator as a Python type or a list of Python types.
Example
from hopsworks import udf
@udf(float)
def add_one(data1):
    return data1 + 1
| PARAMETER | DESCRIPTION |
|---|---|
| return_type | The output types of the defined UDF. |
| drop | The features to be dropped after application of transformation functions. |
| mode | The execution mode of the UDF. |
| RETURNS | DESCRIPTION |
|---|---|
| HopsworksUdf | The metadata object for the Hopsworks UDF. |
| RAISES | DESCRIPTION |
|---|---|
| hopsworks.client.exceptions.FeatureStoreException | If unable to create UDF. |
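To show how a parameterized decorator of this shape can capture metadata about the wrapped function, here is a toy sketch in plain Python. `toy_udf` and `ToyUdf` are hypothetical stand-ins and do not reproduce how Hopsworks builds a `HopsworksUdf`; they only mirror the documented `return_type`, `drop`, and `mode` parameters.

```python
import inspect
from typing import Callable, List, Optional, Union


class ToyUdf:
    """Toy metadata holder (not the HopsworksUdf class)."""

    def __init__(self, func: Callable, return_types: List[type],
                 dropped: List[str], mode: str) -> None:
        self.func = func
        self.return_types = return_types
        self.dropped_argument_names = dropped
        self.mode = mode
        # Record the argument names from the function signature.
        self.argument_names = list(inspect.signature(func).parameters)


def toy_udf(return_type: Union[type, List[type]],
            drop: Optional[Union[str, List[str]]] = None,
            mode: str = "default") -> Callable[[Callable], ToyUdf]:
    # Normalize single values to lists so downstream code sees one shape.
    return_types = return_type if isinstance(return_type, list) else [return_type]
    dropped = [drop] if isinstance(drop, str) else list(drop or [])

    def decorator(func: Callable) -> ToyUdf:
        return ToyUdf(func, return_types, dropped, mode)

    return decorator


@toy_udf(float, drop="data1")
def add_one(data1):
    return data1 + 1
```

In this sketch the decorated name `add_one` is a metadata object, and the original function remains available as `add_one.func`.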