Getting Started
Note: you may get an error when installing hopsworks on Colab, and it is safe to ignore it.
!pip install -U hopsworks --quiet
💽 Loading the Data
The data you will use comes from three different CSV files:
- credit_cards.csv: information such as the expiration date and provider.
- transactions.csv: events containing information about when a credit card was used, such as a timestamp, location, and the amount spent. A boolean fraud_label variable (True/False) tells us whether a transaction was fraudulent or not.
- profiles.csv: credit card user information such as birthdate and city of residence.
In a production system, these CSV files would originate from separate data sources or tables, and probably separate data pipelines. All three files have a common credit card number column cc_num, which you will use later to join features together from the different datasets.
Now, you can go ahead and load the data.
import pandas as pd
import numpy as np
window_len = "4h"  # Window length used for the rolling aggregations computed later on.
url = "https://repo.hops.works/master/hopsworks-tutorials/data/card_fraud_data/"
credit_cards_df = pd.read_csv(url + "credit_cards.csv")
credit_cards_df.head(3)
profiles_df = pd.read_csv(url + "profiles.csv", parse_dates=["birthdate"])
profiles_df.head(3)
trans_df = pd.read_csv(url + "transactions.csv", parse_dates=["datetime"])
trans_df.head(3)
🛠️ Feature Engineering
Fraudulent transactions can differ from regular ones in many ways. Typical red flags include, for instance, a large transaction volume or frequency within the span of a few hours. It could also be that elderly people in particular are targeted by fraudsters. To facilitate model learning, we will create additional features based on these patterns. In particular, we will create two types of features:
- Features that aggregate data from different data sources. This could for instance be the age of a customer at the time of a transaction, which combines the birthdate feature from profiles.csv with the datetime feature from transactions.csv.
- Features that aggregate data from multiple time steps. An example of this could be the transaction frequency of a credit card in the span of a few hours, which is computed using a window function.
Now you are ready to start with the first category.
# Compute age at transaction.
age_df = trans_df.merge(profiles_df, on="cc_num", how="left")
trans_df["age_at_transaction"] = (age_df["datetime"] - age_df["birthdate"]) / np.timedelta64(1, "Y")
# Compute days until card expires.
card_expiry_df = trans_df.merge(credit_cards_df, on="cc_num", how="left")
card_expiry_df["expires"] = pd.to_datetime(card_expiry_df["expires"], format="%m/%y")
trans_df["days_until_card_expires"] = (card_expiry_df["expires"] - card_expiry_df["datetime"]) / np.timedelta64(1, "D")
trans_df[["age_at_transaction", "days_until_card_expires"]].head()
The next step is to create features from aggregations that are computed per credit card over multiple time steps.
You start by computing a feature that captures the physical distance between consecutive transactions, which we will call loc_delta. Here, you will use the Haversine distance to quantify the distance between two longitude and latitude coordinates.
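For reference, the haversine formula gives the central angle between two points from their latitudes and longitudes:

$$\theta = 2 \arcsin\!\left(\sqrt{\sin^2\!\frac{\varphi_2-\varphi_1}{2} + \cos\varphi_1 \cos\varphi_2 \,\sin^2\!\frac{\lambda_2-\lambda_1}{2}}\right)$$

The implementation below returns this angle directly (in radians, on the unit sphere) and does not multiply by the Earth's radius; since loc_delta is only used as a relative feature, the absolute scale does not matter.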
from math import radians
# Do some simple preprocessing.
trans_df.sort_values("datetime", inplace=True)
trans_df[["longitude", "latitude"]] = trans_df[["longitude", "latitude"]].applymap(radians)
def haversine(long, lat):
"""Compute Haversine distance between each consecutive coordinate in (long, lat)."""
long_shifted = long.shift()
lat_shifted = lat.shift()
long_diff = long_shifted - long
lat_diff = lat_shifted - lat
a = np.sin(lat_diff/2.0)**2
b = np.cos(lat) * np.cos(lat_shifted) * np.sin(long_diff/2.0)**2
c = 2*np.arcsin(np.sqrt(a + b))
return c
trans_df["loc_delta"] = trans_df.groupby("cc_num")\
.apply(lambda x : haversine(x["longitude"], x["latitude"]))\
.reset_index(level=0, drop=True)\
.fillna(0)
Next you will compute windowed aggregates. Here you will use 4-hour windows, but feel free to experiment with different window lengths by changing the value of window_len, which was set earlier when you loaded the data.
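To build intuition for what a time-based rolling window computes, here is a tiny, self-contained illustration (not part of the pipeline; the values are made up):
# Toy example: a 4-hour rolling mean over a time column.
toy = pd.DataFrame({
    "datetime": pd.to_datetime(["2022-01-01 00:00", "2022-01-01 01:00", "2022-01-01 05:00"]),
    "amount": [10.0, 30.0, 100.0],
})
# Each row aggregates over the preceding 4 hours (window open on the left, closed on the right by default),
# so the third row includes only itself.
print(toy.rolling("4h", on="datetime")["amount"].mean())
Now compute the rolling aggregates per credit card: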
cc_group = trans_df[["cc_num", "amount", "datetime"]].groupby("cc_num").rolling(window_len, on="datetime")
# Moving average of transaction volume.
df_4h_mavg = pd.DataFrame(cc_group.mean())
df_4h_mavg.columns = ["trans_volume_mavg", "datetime"]
df_4h_mavg = df_4h_mavg.reset_index(level=["cc_num"])
df_4h_mavg = df_4h_mavg.drop(columns=["cc_num", "datetime"])
df_4h_mavg = df_4h_mavg.sort_index()
# Moving standard deviation of transaction volume.
df_4h_std = pd.DataFrame(cc_group.std())
df_4h_std.columns = ["trans_volume_mstd", "datetime"]
df_4h_std = df_4h_std.reset_index(level=["cc_num"])
df_4h_std = df_4h_std.drop(columns=["cc_num", "datetime"])
df_4h_std = df_4h_std.fillna(0)
df_4h_std = df_4h_std.sort_index()
window_aggs_df = df_4h_std.merge(df_4h_mavg,left_index=True, right_index=True)
# Moving count of transactions (frequency).
df_4h_count = pd.DataFrame(cc_group.count())
df_4h_count.columns = ["trans_freq", "datetime"]
df_4h_count = df_4h_count.reset_index(level=["cc_num"])
df_4h_count = df_4h_count.drop(columns=["cc_num", "datetime"])
df_4h_count = df_4h_count.sort_index()
window_aggs_df = window_aggs_df.merge(df_4h_count,left_index=True, right_index=True)
# Moving average of location difference between consecutive transactions.
cc_group = trans_df[["cc_num", "loc_delta", "datetime"]].groupby("cc_num").rolling(window_len, on="datetime").mean()
df_4h_loc_delta_mavg = pd.DataFrame(cc_group)
df_4h_loc_delta_mavg.columns = ["loc_delta_mavg", "datetime"]
df_4h_loc_delta_mavg = df_4h_loc_delta_mavg.reset_index(level=["cc_num"])
df_4h_loc_delta_mavg = df_4h_loc_delta_mavg.drop(columns=["cc_num", "datetime"])
df_4h_loc_delta_mavg = df_4h_loc_delta_mavg.sort_index()
window_aggs_df = window_aggs_df.merge(df_4h_loc_delta_mavg,left_index=True, right_index=True)
window_aggs_df = window_aggs_df.merge(trans_df[["cc_num", "datetime"]].sort_index(),left_index=True, right_index=True)
window_aggs_df.tail()
🪄 Creating Feature Groups
A feature group can be seen as a collection of conceptually related features that are computed together at the same cadence. In your case, you will create a feature group for the transaction data and a feature group for the windowed aggregations on the transaction data. Both will have cc_num
as primary key, which will allow you to join them together when you create training data later in this tutorial.
Feature groups provide a namespace for features, so two features are allowed to have the same name as long as they belong to different feature groups. For instance, in a real-life setting we would likely want to experiment with different window lengths. In that case, we can create feature groups with identical schema for each window length.
Before you can create a feature group, you need to connect to your feature store.
import hopsworks
project = hopsworks.login()
fs = project.get_feature_store()
To create a feature group, you need to give it a name and specify a primary key. It is also good practice to provide a description of its contents and a version number; if no version is specified, it defaults to 1 for a new feature group.
trans_fg = fs.get_or_create_feature_group(
    name="transactions",
    version=1,
    description="Transaction data",
    primary_key=['cc_num'],
    event_time='datetime'
)
A full list of arguments can be found in the documentation.
At this point, we have only specified some metadata for the feature group. It does not store any data or even have a schema defined for the data. To make the feature group persistent we populate it with its associated data.
If you have previously inserted this data into trans_fg, you can skip this step.
trans_fg.insert(trans_df)
You can move on and do the same thing for the feature group with your windowed aggregations.
window_aggs_fg = fs.get_or_create_feature_group(
    name=f"transactions_{window_len}_aggs",
    version=1,
    description=f"Aggregate transaction data over {window_len} windows.",
    primary_key=['cc_num'],
    event_time='datetime'
)
window_aggs_fg.insert(window_aggs_df)
Click on the hyperlink printed in the cell output above to inspect your feature group in the UI.
🔪 Feature Selection
We start by selecting all the features we want to include for model training/inference.
Hopsworks provides a simple DSL (domain specific language) for joining together features from different feature groups. You use the select()/select_all()/select_except()
feature group methods to select features (from that feature group), and the join()
method to join together features from a different feature group.
# Select features for training data.
ds_query = trans_fg.select(["fraud_label", "category", "amount", "age_at_transaction", "days_until_card_expires", "loc_delta"])\
    .join(window_aggs_fg.select_except(["cc_num"]), on="cc_num")
# Uncomment if you want to inspect the data at this point.
#ds_query.show(5)
What if features being joined together have the same name (but come from different feature groups)?
The problem of name clashes when joining features together can easily happen. For example, recall that you computed the features in transactions_4h_aggs using 4-hour aggregates. If you had created another feature group for 12-hour aggregates, you may have designed an identical schema with the same feature names (just for 12-hours, not 4-hours). If you join features together with identical names from different feature groups, you should pass a prefix argument (e.g., prefix='4hr'
) in the join operation to give the features unique names in the join object. See the documentation for more details.
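For illustration, here is a sketch that ties this back to the earlier point about keeping an identical schema per window length. It assumes a hypothetical DataFrame window_aggs_df_12h, computed exactly like window_aggs_df but with window_len = "12h"; neither that DataFrame nor the 12-hour feature group exists in this tutorial, so the code is left commented out.
# Hypothetical sketch: a 12-hour feature group with the same schema, plus a prefixed join.
# window_aggs_12h_fg = fs.get_or_create_feature_group(
#     name="transactions_12h_aggs",
#     version=1,
#     description="Aggregate transaction data over 12h windows.",
#     primary_key=['cc_num'],
#     event_time='datetime'
# )
# window_aggs_12h_fg.insert(window_aggs_df_12h)
#
# ds_query_multi = trans_fg.select(["fraud_label", "category", "amount", "age_at_transaction", "days_until_card_expires", "loc_delta"])\
#     .join(window_aggs_fg.select_except(["cc_num"]), on="cc_num", prefix="4h_")\
#     .join(window_aggs_12h_fg.select_except(["cc_num"]), on="cc_num", prefix="12h_")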
🤖 Transformation Functions
You will preprocess your data using min-max scaling on numerical features and label encoding on categorical features. To do this, you define a one-to-one mapping between each feature and a transformation function. This ensures that transformation functions such as min-max scaling are fitted only on the training set (and not the validation/test set), so that no information leaks into the test set during training. The transformation functions are also needed in model inference pipelines, where precomputed features are retrieved from the feature store and the same transformation functions (with the same state/parameters computed on the training set) are applied to them at serving time.
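For reference, min-max scaling maps a feature value x into the [0, 1] range using the minimum and maximum observed on the training split only, while the label encoder maps each distinct category to an integer:

$$x' = \frac{x - x_{\min}^{\text{train}}}{x_{\max}^{\text{train}} - x_{\min}^{\text{train}}}$$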
# Load the transformation functions.
min_max_scaler = fs.get_transformation_function(name="min_max_scaler")
label_encoder = fs.get_transformation_function(name="label_encoder")
# Map features to transformation functions.
transformation_functions = {
"category": label_encoder,
"amount": min_max_scaler,
"trans_volume_mavg": min_max_scaler,
"trans_volume_mstd": min_max_scaler,
"trans_freq": min_max_scaler,
"loc_delta": min_max_scaler,
"loc_delta_mavg": min_max_scaler,
"age_at_transaction": min_max_scaler,
"days_until_card_expires": min_max_scaler,
}
⚙️ Feature View Creation
The Feature View is the collection of features (from feature groups) and transformation functions used to train models and serve precomputed features to deployed models.
The Feature View includes all of the features defined in the query object you created earlier. It can additionally include filters, one or more columns identified as the target(s) (or label) and the set of transformation functions and the features they are applied to.
You create a Feature View with fs.create_feature_view().
feature_view = fs.create_feature_view(
name='transactions_view',
query=ds_query,
version=1,
labels=["fraud_label"],
transformation_functions=transformation_functions
)
You can retrieve a reference to an existing feature view with fs.get_feature_view('transactions_view').
feature_view = fs.get_feature_view('transactions_view', version=1)
🏋️ Training Dataset Creation
In Hopsworks, training data is the data from a set of features that is used to train a model. The FeatureView provides the set of features for the training data, but you can apply filters to select a subset of data from the data available for those features. For example, you could select training data for a given time range, or data only for users based on a particular geographic region.
Training data can be read as in-memory Pandas DataFrames or as (materialized) files (.csv, .tfrecord, etc) on disk. Training data is immutable and versioned. This makes training data also reproducible - you can delete the training data files and reproduce the exact same training data at a later point in time.
Training data may be read with user-defined splits, such as:
- Training set - the subset of training data used to train a model.
- Validation set - the subset of training data used to evaluate hyperparameters when training a model.
- Test set - the holdout subset of training data used to evaluate a model.
You use a FeatureView object to create training data. There are different methods for reading training data as Pandas DataFrames or for creating training data as files, and you can provide time filters specifying the start_time and end_time for the training data.
For example, this shows you how to create training data as files:
from datetime import datetime
date_format = "%Y-%m-%d %H:%M:%S"
start_time = int(float(datetime.strptime("2022-01-01 00:00:01", date_format).timestamp()) * 1000)
end_time = int(float(datetime.strptime("2022-02-28 23:59:59", date_format).timestamp()) * 1000)
td_version, td_job = feature_view.create_training_data(
description = 'transactions_dataset_jan_feb',
data_format = 'csv',
write_options = {'wait_for_job': True},
coalesce = True,
start_time = start_time,
end_time = end_time,
)
You can then read back the materialized training data using the version returned above:
X, y = feature_view.get_training_data(td_version)
If you want to read the train/test sets as Pandas DataFrames, use the call below. It does not save the training data as files, but it does record in the feature store that a new version of training data was created for the feature view.
🧬 Read train/test splits from a feature view
X_train, y_train, X_test, y_test = feature_view.train_test_split(0.2)
X_train.head(5)
y_train.head(5)
Let's check the distribution of our target label.
y_train.value_counts(normalize=True)
Notice that the distribution is extremely skewed, which is natural considering that fraudulent transactions make up a tiny part of all transactions. Thus you should somehow address the class imbalance. There are many approaches for this, such as weighting the loss function, over- or undersampling, creating synthetic data, or modifying the decision threshold. In this example, you'll use the simplest method which is to just supply a class weight parameter to our learning algorithm. The class weight will affect how much importance is attached to each class, which in your case means that higher importance will be placed on positive (fraudulent) samples.
🧬 Train Model
Next you'll train a model. Here, you give the positive (fraud) class a weight of 0.9 and the negative class a weight of 0.1, so fraudulent samples carry nine times the weight of non-fraudulent ones.
from sklearn.linear_model import LogisticRegression
pos_class_weight = 0.9
clf = LogisticRegression(class_weight={0: 1.0 - pos_class_weight, 1: pos_class_weight}, solver='liblinear')
clf.fit(X_train, y_train.values.ravel())
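As an aside, instead of hand-picking the weights you could let scikit-learn derive them from the class frequencies in the training data; a minimal sketch (the evaluation below uses the manually weighted clf trained above):
# Sketch: weights inversely proportional to class frequencies in y_train ("balanced" mode).
clf_balanced = LogisticRegression(class_weight="balanced", solver="liblinear")
# clf_balanced.fit(X_train, y_train.values.ravel())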
Let's see how well it performs on your test data.
from sklearn.metrics import classification_report
import pprint
preds = clf.predict(X_test)
report_dict = classification_report(y_test, preds, output_dict=True)
pprint.pprint(report_dict, width=2)
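Because the data is so imbalanced, accuracy alone can be misleading. As mentioned in the class-imbalance discussion above, you could also inspect the confusion matrix and tune the decision threshold; an optional sketch (the 0.3 threshold is purely illustrative):
from sklearn.metrics import confusion_matrix

# Rows: true class, columns: predicted class.
print(confusion_matrix(y_test, preds))

# Lower the default 0.5 decision threshold to catch more fraud at the cost of more false positives.
proba = clf.predict_proba(X_test)[:, 1]
preds_lower_threshold = (proba >= 0.3).astype(int)
print(confusion_matrix(y_test, preds_lower_threshold))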
Pickle the model and write it to a local folder.
import joblib
joblib.dump(clf, 'model.pkl')
mr = project.get_model_registry()
📝 Export Model
Next you will export the model and attach additional information like the signature of the inputs/predictions, a concrete input example and evaluation metrics. In the last line, you upload the pickled model file to the Model Registry.
from hsml.schema import Schema
from hsml.model_schema import ModelSchema
input_schema = Schema(X_test)
output_schema = Schema(y_test)
fraud_model = mr.sklearn.create_model("fraud",
metrics={'accuracy': report_dict['accuracy']},
input_example=X_test.head(1),
model_schema=ModelSchema(input_schema=input_schema, output_schema=output_schema))
fraud_model.save('model.pkl')
🤖 Create Deployment
The next step is to deploy the model on KServe, which Hopsworks uses to serve real-time inference requests.
Note: scikit-learn deployments without KServe require a predictor script. For details, please refer to the Hopsworks documentation.
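You do not need one here since you deploy on KServe, but for orientation, such a predictor script is typically a small Python class that loads the model and implements a predict method. A minimal sketch, assuming the Predict class convention and the ARTIFACT_FILES_PATH environment variable described in the Hopsworks serving documentation (check the docs for your version before using it):
# predictor_script.py -- sketch only; see the Hopsworks serving docs for the exact template.
import os
import joblib

class Predict(object):
    def __init__(self):
        # ARTIFACT_FILES_PATH is assumed to point at the downloaded model artifact directory.
        self.model = joblib.load(os.environ["ARTIFACT_FILES_PATH"] + "/model.pkl")

    def predict(self, inputs):
        # Return predictions for a batch of feature vectors.
        return self.model.predict(inputs).tolist()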
deployment = fraud_model.deploy()
deployment.start()
You can retrieve a reference to the model with mr.get_model(..) and a reference to the deployment through the model serving API with ms.get_deployment(..):
ms = project.get_model_serving()
deployment = ms.get_deployment("fraud")
fraud_model = mr.get_model("fraud", version=1)
🚀 Test your Model with an Inference Request
Finally you can start making predictions with your model!
Send inference requests to the deployed model as follows:
deployment.predict({"instances":[fraud_model.input_example]})
Try out your Model Interactively
You will build a user interface with Gradio that lets you enter a credit card category and amount and see whether the transaction is flagged as suspected fraud.
!pip install gradio --quiet
!pip install typing-extensions==4.3.0
import gradio as gr
import numpy as np
def greet(credit_card_example):
cc_data = credit_card_example.iloc[0].astype("float").to_numpy()
# Add missing feature values to the feature vector. Here we hard-code the values,
# but if you enable the Online Feature Store, you could retrieve them with the following commented out code
# entry = { "cc_num" : credit_card_example[0]}
# passed_features = {"category": credit_card_example[0], "amount" : credit_card_example[1]}
# feature_vector = feature_view.get_feature_vector(entry, passed_features)
cc_data = np.append(cc_data, [fraud_model.input_example[2:9]])
list_cc = cc_data.tolist()
data = {
"instances": [list_cc]
}
res = deployment.predict(data)
res = res["predictions"][0]
if res == 0 :
return "Not Suspected of Fraud"
return "Suspected of Fraud"
credit_card_example = gr.Dataframe(
headers=["category", "amount"],
value=[fraud_model.input_example[0:2]]
)
demo = gr.Interface(greet,
credit_card_example,
"text",
title="Live Credit Card Fraud Detector",
description="Enter credit card transaction details.",
allow_flagging="never"
)
demo.launch(share=True)
🥳 Next Steps
Congratulations, you've now completed the quickstart example for Managed Hopsworks.
Check out our other tutorials on ➡ https://github.com/logicalclocks/hopsworks-tutorials
Or documentation at ➡ https://docs.hopsworks.ai