Skip to content

hsfs.hopsworks_udf #

HopsworksUdf #

Meta data for user defined functions.

Stores meta data required to execute the user defined function in both spark and python engine. The class generates uses the metadata to dynamically generate user defined functions based on the engine it is executed in.

PARAMETER DESCRIPTION
func

The transformation function object or the source code of the transformation function.

TYPE: Callable | str

return_types

A python type or a list of python types that denotes the data types of the columns output from the transformation functions.

TYPE: list[type] | type | list[str] | str

name

Name of the transformation function.

TYPE: str | None DEFAULT: None

transformation_features

A list of objects of TransformationFeature that maps the feature used for transformation to their corresponding statistics argument names if any.

TYPE: list[TransformationFeature] | None DEFAULT: None

transformation_function_argument_names

The argument names of the transformation function.

TYPE: list[str] | None DEFAULT: None

dropped_argument_names

The arguments to be dropped from the finial DataFrame after the transformation functions are applied.

TYPE: list[str] | None DEFAULT: None

dropped_feature_names

The feature name corresponding to the arguments names that are dropped.

TYPE: list[str] | None DEFAULT: None

feature_name_prefix

Prefixes if any used in the feature view.

TYPE: str | None DEFAULT: None

output_column_names

The names of the output columns returned from the transformation function.

TYPE: str | None DEFAULT: None

generate_output_col_names

Generate default output column names for the transformation function.

TYPE: bool DEFAULT: True

return_types property #

return_types: list[str]

Get the output types of the UDF.

function_name property #

function_name: str

Get the function name of the UDF.

statistics_required property #

statistics_required: bool

Get if statistics for any feature is required by the UDF.

transformation_statistics property writable #

transformation_statistics: TransformationStatistics | None

Feature statistics required for the defined UDF.

output_column_names property writable #

output_column_names: list[str]

Output columns names of the transformation function.

transformation_features property #

transformation_features: list[str]

List of feature names to be used in the User Defined Function.

unprefixed_transformation_features property #

unprefixed_transformation_features: list[str]

List of feature name used in the transformation function without the feature name prefix.

feature_name_prefix property #

feature_name_prefix: str | None

The feature name prefix that needs to be added to the feature names.

statistics_features property #

statistics_features: list[str]

List of feature names that require statistics.

dropped_features property writable #

dropped_features: list[str]

List of features that will be dropped after the UDF is applied.

transformation_context property writable #

transformation_context: dict[str, Any]

Dictionary that contains the context variables required for the UDF.

These context variables passed to the UDF during execution.

alias #

alias(*args: str)

Set the names of the transformed features output by the UDF.

PARAMETER DESCRIPTION
args

Name of the output features after transformation. Can be passed as individual string arguments or as a list of strings. The number of output feature names provided must match the number of features returned by the transformation function.

TYPE: str DEFAULT: ()

executor #

executor(
    statistics: TransformationStatistics
    | list[FeatureDescriptiveStatistics]
    | dict[str, dict[str, Any]] = None,
    context: dict[str, Any] = None,
    online: bool = False,
) -> Any

Create an executable transformation with optional statistics and context for unit testing.

This method returns a callable object that can execute the UDF with the specified configuration. It is designed for unit testing transformation functions locally.

The executor allows you to: - Inject mock statistics for testing model-dependent transformations - Provide transformation context for testing transformation functions using - Switch between online (single-value) and offline (batch) execution modes

Testing UDF with pandas execution mode

@udf(return_type=float, mode="pandas")
def add_one(value):
    return value + 1

# Create executor and test
executor = add_one.executor()
result = executor.execute(pd.Series([1.0, 2.0, 3.0]))
assert result.tolist() == [2.0, 3.0, 4.0]

Testing UDF with python execution mode

@udf(return_type=float, mode="python")
def add_one(value):
    return value + 1

# Create executor and test
executor = add_one.executor()
result = executor.execute(1.0)
assert result == 2.0

Testing UDF with default execution mode

# In the default execution mode, Hopsworks executes the transformation function as pandas UDF for batch processing and as python function for online processing to get optimal.
# Hence, the function should should be able to handle both online and offline execution modes and unit-test musts be written for both these use-cases.
# In the offline mode, Hopsworks would pass a pandas Series to the function.
# In the online mode, Hopsworks would pass a single value to the function.

@udf(return_type=float)
def double_value(value):
    return value * 2

# Offline mode (batch processing with pandas Series)
offline_executor = double_value.executor(online=False)
batch_result = offline_executor.execute(pd.Series([1.0, 2.0, 3.0]))

# Online mode (single value processing)
online_executor = double_value.executor(online=True)
single_result = online_executor.execute(5.0)
assert single_result == 10.0

Unit test with mocked statistics

from hsfs.transformation_statistics import TransformationStatistics

@udf(return_type=float)
def normalize(value, statistics=TransformationStatistics("value")):
    return (value - statistics.value.mean) / statistics.value.std_dev

# Test with mock statistics
executor = normalize.executor(statistics={"value": {"mean": 100.0, "std_dev": 25.0}})
result = executor.execute(pd.Series([100.0, 125.0, 150.0]))
assert result.tolist() == [0.0, 1.0, 2.0]

Unit test with transformation context

@udf(return_type=float)
def apply_discount(price, context):
    return price * (1 - context["discount_rate"])

executor = apply_discount.executor(context={"discount_rate": 0.1})
result = executor.execute(pd.Series([100.0, 200.0]))
assert result.tolist() == [90.0, 180.0]

Testing online vs offline execution modes

# For transformation functions using the default execution mode `default`.
# The function should should be able to handle both online and offline execution modes.
# In the offline mode, Hopsworks would pass a pandas Series to the function.
# In the online mode, Hopsworks would pass a single value to the function.
@udf(return_type=float, mode="default")
def double_value(value):
    return value * 2

# Offline mode (batch processing with pandas Series)
offline_executor = double_value.executor(online=False)
batch_result = offline_executor.execute(pd.Series([1.0, 2.0, 3.0]))

# Online mode (single value processing)
online_executor = double_value.executor(online=True)
single_result = online_executor.execute(5.0)
assert single_result == 10.0
PARAMETER DESCRIPTION
statistics

Statistics for model-dependent transformations. Can be provided as:

  • TransformationStatistics: Pre-built statistics object
  • dict[str, dict[str, Any]]: Dictionary mapping feature names to their statistics (e.g., {"amount": {"mean": 100.0, "std_dev": 25.0}})
  • list[FeatureDescriptiveStatistics]: List of statistics objects from Hopsworks

TYPE: TransformationStatistics | list[FeatureDescriptiveStatistics] | dict[str, dict[str, Any]] DEFAULT: None

context

A dictionary mapping variable names to values that provide contextual information to the transformation function at runtime. The keys must match parameter names defined in the UDF.

TYPE: dict[str, Any] DEFAULT: None

online

Whether to execute in online mode (single values) or offline mode (batch/vectorized). Only applicable when the UDF uses mode="default".

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Any

A callable object with an execute(*args) method to run the transformation.

execute #

execute(
    *args: Any,
) -> (
    pd.Series
    | pd.DataFrame
    | int
    | float
    | str
    | bool
    | datetime
    | time
    | date
    | tuple[
        int | float | str | bool | datetime | time | date,
        ...,
    ]
)

Execute the UDF directly with the provided arguments.

This is a convenience method for quick testing of simple UDFs that don't require statistics or transformation context. It executes the UDF in offline mode (batch processing).

Quick UDF testing

@udf(return_type=float)
def add_one(value):
    return value + 1

# Direct execution for simple tests
result = add_one.execute(pd.Series([1.0, 2.0, 3.0]))
assert result.tolist() == [2.0, 3.0, 4.0]

Note

For UDFs that require statistics or transformation context or need to be executed in online mode, use executor() instead:

result = my_udf.executor(statistics=stats, context=ctx).execute(data)

PARAMETER DESCRIPTION
*args

Input arguments matching the UDF's parameter signature. For batch processing, pass pandas Series or DataFrames.

TYPE: Any DEFAULT: ()

RETURNS DESCRIPTION
pd.Series | pd.DataFrame | int | float | str | bool | datetime | time | date | tuple[int | float | str | bool | datetime | time | date, ...]

The transformed values.

json #

json() -> str

Convert class into its json serialized form.

RETURNS DESCRIPTION
str

JSON serialized object.

from_response_json classmethod #

from_response_json(
    json_dict: dict[str, Any],
) -> HopsworksUdf

Function that constructs the class object from its json serialization.

PARAMETER DESCRIPTION
json_dict

JSON serialized dictionary for the class.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
HopsworksUdf

JSON deserialized class object.

TransformationFeature dataclass #

Mapping of feature names to their corresponding statistics argument names in the code.

The statistic_argument_name for a feature name would be None if the feature does not need statistics.

PARAMETER DESCRIPTION
feature_name

Name of the feature.

TYPE: str

statistic_argument_name

Name of the statistics argument in the code for the feature specified in the feature name.

TYPE: str | None

udf #

udf(
    return_type: list[type] | type,
    drop: str | list[str] | None = None,
    mode: Literal[
        "default", "python", "pandas"
    ] = "default",
) -> HopsworksUdf

Create an User Defined Function that can be and used within the Hopsworks Feature Store to create transformation functions.

Hopsworks UDF's are user defined functions that executes as 'pandas_udf' when executing in spark engine and as pandas functions in the python engine. The pandas udf/pandas functions gets as inputs pandas Series's and can provide as output a pandas Series or a pandas DataFrame. A Hopsworks udf is defined using the hopsworks_udf decorator. The outputs of the defined UDF must be mentioned in the decorator as a list of python types.

Example
from hopsworks import udf

@udf(float)
def add_one(data1):
    return data1 + 1
PARAMETER DESCRIPTION
return_type

The output types of the defined UDF.

TYPE: list[type] | type

drop

The features to be dropped after application of transformation functions.

TYPE: str | list[str] | None DEFAULT: None

mode

The exection mode of the UDF.

TYPE: Literal['default', 'python', 'pandas'] DEFAULT: 'default'

RETURNS DESCRIPTION
HopsworksUdf

The metadata object for hopsworks UDF's.

RAISES DESCRIPTION
hopsworks.client.exceptions.FeatureStoreException

If unable to create UDF.