hsfs.transformation_function #

TransformationFunction #

NOT_FOUND_ERROR_CODE class-attribute instance-attribute #

NOT_FOUND_ERROR_CODE = 270160

DTO class for transformation functions.

hopsworks_udf property #

hopsworks_udf: HopsworksUdf

Metadata class for the user-defined transformation function.

id property writable #

id: id

Transformation function id.

output_column_names property #

output_column_names: list[str]

Names of the output columns generated by the transformation function.

transformation_statistics property writable #

transformation_statistics: TransformationStatistics | None

Feature statistics required for the defined UDF.

transformation_type property writable #

transformation_type: TransformationType

Type of the transformation: either model-dependent or on-demand.

version property writable #

version: int

Version of the transformation function.

alias #

alias(*args: str)

Set the names of the transformed features output by the transformation function.

PARAMETER DESCRIPTION
args

The names of the transformed features. The number of names provided must match the number of output features of the transformation function.

TYPE: str DEFAULT: ()
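
The count rule above can be illustrated with a minimal, library-independent sketch. check_alias_count is a hypothetical helper written for this illustration. It is not part of hsfs and only mirrors the documented requirement that the number of names equals the number of output features.

```python
def check_alias_count(names, n_outputs):
    """Hypothetical helper (not hsfs code): enforce one alias per output feature."""
    if len(names) != n_outputs:
        raise ValueError(
            f"expected {n_outputs} alias name(s), got {len(names)}"
        )
    return list(names)


# A two-output transformation needs exactly two names.
accepted = check_alias_count(("min_amount", "max_amount"), n_outputs=2)

# Supplying a single name for two outputs is rejected.
try:
    check_alias_count(("only_one",), n_outputs=2)
    rejected = False
except ValueError:
    rejected = True
```

With a real transformation function, a mismatch of this kind would presumably surface when alias() is called, so it is worth checking the names against the declared return types first.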

delete #

delete() -> None

Delete transformation function from backend.

Example
# import hopsworks udf decorator
from hopsworks import udf

# define function
@udf(int)
def plus_one(value):
    return value + 1

# create transformation function
plus_one_meta = fs.create_transformation_function(
    transformation_function=plus_one,
    version=1
)
# persist transformation function in backend
plus_one_meta.save()

# retrieve transformation function
plus_one_fn = fs.get_transformation_function(name="plus_one")

# delete transformation function from backend
plus_one_fn.delete()

execute #

execute(*args: Any) -> Any

Execute the transformation function directly with the provided arguments.

This is a convenience method for quick testing of simple transformations that don't require statistics or transformation context. It executes in offline mode (batch processing).

Quick transformation testing

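import pandas as pd
from hopsworks import udf
from hsfs.transformation_function import TransformationFunction, TransformationType

@udf(return_type=float)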
def add_one(value):
    return value + 1

tf = TransformationFunction(
    featurestore_id=1,
    hopsworks_udf=add_one,
    transformation_type=TransformationType.ON_DEMAND
)

# Direct execution for simple tests
result = tf.execute(pd.Series([1.0, 2.0, 3.0]))
assert result.tolist() == [2.0, 3.0, 4.0]

Note

For transformations that require statistics or a transformation context, or that must run in online mode, use executor() instead:

result = tf.executor(statistics=stats, context=ctx).execute(data)

PARAMETER DESCRIPTION
*args

Input arguments matching the transformation function's parameter signature. For batch processing, pass pandas Series or DataFrames.

TYPE: Any DEFAULT: ()

RETURNS DESCRIPTION
Any

The transformed values:

  • pd.Series - Single output Pandas UDFs.
  • pd.DataFrame - Multi-output Pandas UDFs.
  • int | float | str | bool | datetime | time | date - Single output Python UDFs.
  • tuple[int | float | str | bool | datetime | time | date] - Multi-output Python UDFs.

executor #

executor(
    statistics: TransformationStatistics
    | list[FeatureDescriptiveStatistics]
    | dict[str, dict[str, Any]] = None,
    context: dict[str, Any] = None,
    online: bool = False,
) -> Any

Create an executable transformation with optional statistics and context for unit testing.

This method returns a callable object that can execute the transformation function with the specified configuration. It is designed for unit testing transformation functions locally.

The executor allows you to:

  • Inject mock statistics for testing model-dependent transformations
  • Provide transformation context for testing transformation functions using context variables
  • Switch between online (single-value) and offline (batch) execution modes
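
The configure-then-execute pattern described above can be sketched in plain Python. SketchExecutor is a hypothetical stand-in written for illustration, not the object that executor() returns. It only shows how a context can be bound once and then reused across calls to execute.

```python
class SketchExecutor:
    """Hypothetical stand-in (not hsfs code): binds a context to a function."""

    def __init__(self, fn, context=None):
        self._fn = fn
        self._context = context or {}

    def execute(self, *args):
        # Forward the positional inputs and inject the bound context.
        return self._fn(*args, context=self._context)


def apply_discount(price, context):
    return price * (1 - context["discount_rate"])


# Bind the context once, then run the function on a single value.
sketch = SketchExecutor(apply_discount, context={"discount_rate": 0.1})
discounted = sketch.execute(100.0)
```

Keeping configuration separate from execution lets one test build several executors over the same function, for example one per context or per execution mode.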

Testing transformation with pandas execution mode

@udf(return_type=float, mode="pandas")
def add_one(value):
    return value + 1

tf = TransformationFunction(
    featurestore_id=1,
    hopsworks_udf=add_one,
    transformation_type=TransformationType.ON_DEMAND
)

# Create executor and test
executor = tf.executor()
result = executor.execute(pd.Series([1.0, 2.0, 3.0]))
assert result.tolist() == [2.0, 3.0, 4.0]

Testing transformation with python execution mode

@udf(return_type=float, mode="python")
def add_one(value):
    return value + 1

tf = TransformationFunction(
    featurestore_id=1,
    hopsworks_udf=add_one,
    transformation_type=TransformationType.ON_DEMAND
)

# Create executor and test
executor = tf.executor()
result = executor.execute(1.0)
assert result == 2.0

Testing transformation with default execution mode

# In the default execution mode, Hopsworks executes the transformation function as a pandas UDF for batch processing and as a Python function for online processing to get optimal performance.
# Hence, the function should be able to handle both online and offline execution modes, and unit tests must be written for both use cases.
# In the offline mode, Hopsworks would pass a pandas Series to the function.
# In the online mode, Hopsworks would pass a single value to the function.

@udf(return_type=float)
def double_value(value):
    return value * 2

tf = TransformationFunction(
    featurestore_id=1,
    hopsworks_udf=double_value,
    transformation_type=TransformationType.ON_DEMAND
)

# Offline mode (batch processing with pandas Series)
offline_executor = tf.executor(online=False)
batch_result = offline_executor.execute(pd.Series([1.0, 2.0, 3.0]))

# Online mode (single value processing)
online_executor = tf.executor(online=True)
single_result = online_executor.execute(5.0)
assert single_result == 10.0

Unit test with mocked statistics

from hsfs.transformation_statistics import TransformationStatistics

@udf(return_type=float)
def normalize(value, statistics=TransformationStatistics("value")):
    return (value - statistics.value.mean) / statistics.value.std_dev

tf = TransformationFunction(
    featurestore_id=1,
    hopsworks_udf=normalize,
    transformation_type=TransformationType.MODEL_DEPENDENT
)

# Test with mock statistics
executor = tf.executor(statistics={"value": {"mean": 100.0, "std_dev": 25.0}})
result = executor.execute(pd.Series([100.0, 125.0, 150.0]))
assert result.tolist() == [0.0, 1.0, 2.0]

Unit test with transformation context

@udf(return_type=float)
def apply_discount(price, context):
    return price * (1 - context["discount_rate"])

tf = TransformationFunction(
    featurestore_id=1,
    hopsworks_udf=apply_discount,
    transformation_type=TransformationType.ON_DEMAND
)

executor = tf.executor(context={"discount_rate": 0.1})
result = executor.execute(pd.Series([100.0, 200.0]))
assert result.tolist() == [90.0, 180.0]

Testing online vs offline execution modes

# For transformation functions using the default execution mode (`default`),
# the function should be able to handle both online and offline execution modes.
# In the offline mode, Hopsworks would pass a pandas Series to the function.
# In the online mode, Hopsworks would pass a single value to the function.
@udf(return_type=float, mode="default")
def double_value(value):
    return value * 2

tf = TransformationFunction(
    featurestore_id=1,
    hopsworks_udf=double_value,
    transformation_type=TransformationType.ON_DEMAND
)

# Offline mode (batch processing with pandas Series)
offline_executor = tf.executor(online=False)
batch_result = offline_executor.execute(pd.Series([1.0, 2.0, 3.0]))

# Online mode (single value processing)
online_executor = tf.executor(online=True)
single_result = online_executor.execute(5.0)
assert single_result == 10.0

PARAMETER DESCRIPTION
statistics

Statistics for model-dependent transformations. Can be provided as:

  • TransformationStatistics: Pre-built statistics object
  • dict[str, dict[str, Any]]: Dictionary mapping feature names to their statistics (e.g., {"amount": {"mean": 100.0, "std_dev": 25.0}})
  • list[FeatureDescriptiveStatistics]: List of statistics objects from Hopsworks

TYPE: TransformationStatistics | list[FeatureDescriptiveStatistics] | dict[str, dict[str, Any]] DEFAULT: None

context

A dictionary mapping variable names to values that provide contextual information to the transformation function at runtime. The keys must match parameter names defined in the transformation function.

TYPE: dict[str, Any] DEFAULT: None

online

Whether to execute in online mode (single values) or offline mode (batch/vectorized). Only applicable when the transformation uses mode="default".

TYPE: bool DEFAULT: False
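
The dictionary form of statistics maps each feature name to a dictionary of its statistics. As a rough, library-independent sketch of how such a dictionary lines up with attribute access like statistics.value.mean, the hypothetical helper to_namespace below wraps the nested dictionaries in SimpleNamespace objects. This is an assumption made for illustration and does not describe how hsfs converts the dictionary internally.

```python
from types import SimpleNamespace


def to_namespace(stats_by_feature):
    """Hypothetical helper (not hsfs code): expose nested dict keys as attributes."""
    return SimpleNamespace(
        **{
            feature: SimpleNamespace(**values)
            for feature, values in stats_by_feature.items()
        }
    )


def normalize(value, statistics):
    # Plain-Python version of the normalize example, without the udf decorator.
    return (value - statistics.value.mean) / statistics.value.std_dev


mock_stats = to_namespace({"value": {"mean": 100.0, "std_dev": 25.0}})
normalized = [normalize(v, mock_stats) for v in (100.0, 125.0, 150.0)]
```

The outer keys must be the feature names that the function reads from its statistics argument; a missing key in this sketch raises AttributeError.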

RETURNS DESCRIPTION
Any

A callable object with an execute(*args) method to run the transformation. Calling execute returns:

  • pd.Series - Single output Pandas UDFs.
  • pd.DataFrame - Multi-output Pandas UDFs.
  • int | float | str | bool | datetime | time | date - Single output Python UDFs.
  • tuple[int | float | str | bool | datetime | time | date] - Multi-output Python UDFs.

save #

save() -> None

Save a transformation function into the backend.

Example
# import hopsworks udf decorator
from hopsworks import udf

# define function
@udf(int)
def plus_one(value):
    return value + 1

# create transformation function
plus_one_meta = fs.create_transformation_function(
    transformation_function=plus_one,
    version=1
)

# persist transformation function in backend
plus_one_meta.save()

TransformationType #

Bases: Enum

Class that stores the possible types of transformation functions.
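
As a rough sketch of how such an enum is typically consumed, the following defines a stand-in with the two members used in the examples on this page. The class name SketchTransformationType and the string values are assumptions made for illustration; the real enum may use different values or define additional members.

```python
from enum import Enum


class SketchTransformationType(Enum):
    """Stand-in enum; member values are assumed, not taken from hsfs."""

    MODEL_DEPENDENT = "model_dependent"
    ON_DEMAND = "on_demand"


def needs_statistics(kind):
    # The executor documentation ties statistics to model-dependent transformations.
    return kind is SketchTransformationType.MODEL_DEPENDENT
```

Comparing members with `is` avoids depending on their underlying values, which are only assumed here.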