Part 01: Load, Engineer & Connect¶
Note: you may get an error when installing hopsworks on Colab, and it is safe to ignore it.
This is the first part of the quick start series of tutorials about the Hopsworks Feature Store. In this first module, we will work with data related to credit card transactions. The objective of this tutorial is to demonstrate how to work with the Hopsworks Feature Store for batch data, with the goal of training and deploying a model that can predict fraudulent transactions.
🗒️ This notebook is divided into 3 sections:¶
- Load the data and perform feature engineering,
- Connect to the Hopsworks feature store,
- Create feature groups and upload them to the feature store.
First of all we will load the data and do some feature engineering on it.
!pip install -U hopsworks --quiet
💽 Loading the Data ¶
The data we will use comes from three different CSV files:
- credit_cards.csv: credit card information such as expiration date and provider.
- transactions.csv: transaction information such as timestamp, location, and the amount. Importantly, the binary fraud_label variable tells us whether a transaction was fraudulent or not.
- profiles.csv: credit card user information such as birthdate and city of residence.
We can conceptualize these CSV files as originating from separate data sources.
All three files have a credit card number column cc_num
in common, which we can use for joins.
Let's go ahead and load the data.
import pandas as pd
credit_cards_df = pd.read_csv("https://repo.hops.works/master/hopsworks-tutorials/data/card_fraud_data/credit_cards.csv")
credit_cards_df.head(3)
profiles_df = pd.read_csv("https://repo.hops.works/master/hopsworks-tutorials/data/card_fraud_data/profiles.csv", parse_dates=["birthdate"])
profiles_df.head(3)
trans_df = pd.read_csv("https://repo.hops.works/master/hopsworks-tutorials/data/card_fraud_data/transactions.csv", parse_dates=["datetime"])
trans_df.head(3)
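As an optional sanity check (not part of the original tutorial), we can verify that the shared cc_num key is present in all three DataFrames before we start joining them:
# Optional sanity check: cc_num is the join key shared by all three files.
print(len(credit_cards_df), len(profiles_df), len(trans_df))

# Card numbers present in all three DataFrames.
common_cards = (
    set(credit_cards_df["cc_num"])
    & set(profiles_df["cc_num"])
    & set(trans_df["cc_num"])
)
print(f"{len(common_cards)} credit card numbers appear in all three files")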
🛠️ Feature Engineering ¶
Fraudulent transactions can differ from regular ones in many different ways. Typical red flags would for instance be a large transaction volume/frequency in the span of a few hours. It could also be the case that elderly people in particular are targeted by fraudsters. To facilitate model learning we will create additional features based on these patterns. In particular, we will create two types of features:
- Features that aggregate data from different data sources. This could for instance be the age of a customer at the time of a transaction, which combines the birthdate feature from profiles.csv with the datetime feature from transactions.csv.
- Features that aggregate data from multiple time steps. An example of this could be the transaction frequency of a credit card in the span of a few hours, which is computed using a window function.
Let's start with the first category.
import numpy as np
# Compute age at transaction.
age_df = trans_df.merge(profiles_df, on="cc_num", how="left")
trans_df["age_at_transaction"] = (age_df["datetime"] - age_df["birthdate"]) / np.timedelta64(1, "Y")
# Compute days until card expires.
card_expiry_df = trans_df.merge(credit_cards_df, on="cc_num", how="left")
card_expiry_df["expires"] = pd.to_datetime(card_expiry_df["expires"], format="%m/%y")
trans_df["days_until_card_expires"] = (card_expiry_df["expires"] - card_expiry_df["datetime"]) / np.timedelta64(1, "D")
trans_df[["age_at_transaction", "days_until_card_expires"]].head()
Next, we create features that for each credit card aggregate data from multiple time steps.
We start by computing the distance between consecutive transactions, which we will call loc_delta.
Here we use the Haversine distance to quantify the distance between two longitude/latitude coordinate pairs.
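For reference, the Haversine formula for two points $(\varphi_1, \lambda_1)$ and $(\varphi_2, \lambda_2)$ given in radians is

$$a = \sin^2\!\left(\tfrac{\varphi_2-\varphi_1}{2}\right) + \cos\varphi_1\,\cos\varphi_2\,\sin^2\!\left(\tfrac{\lambda_2-\lambda_1}{2}\right), \qquad c = 2\arcsin\!\left(\sqrt{a}\right), \qquad d = R\,c$$

where $\varphi$ is latitude, $\lambda$ is longitude and $R$ is the Earth's radius. The helper below returns only the central angle $c$; we do not multiply by $R$, since the unscaled value is sufficient for a relative feature.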
from math import radians
# Do some simple preprocessing.
trans_df.sort_values("datetime", inplace=True)
trans_df[["longitude", "latitude"]] = trans_df[["longitude", "latitude"]].applymap(radians)
def haversine(long, lat):
"""Compute Haversine distance between each consecutive coordinate in (long, lat)."""
long_shifted = long.shift()
lat_shifted = lat.shift()
long_diff = long_shifted - long
lat_diff = lat_shifted - lat
a = np.sin(lat_diff/2.0)**2
b = np.cos(lat) * np.cos(lat_shifted) * np.sin(long_diff/2.0)**2
c = 2*np.arcsin(np.sqrt(a + b))
return c
trans_df["loc_delta"] = trans_df.groupby("cc_num")\
.apply(lambda x : haversine(x["longitude"], x["latitude"]))\
.reset_index(level=0, drop=True)\
.fillna(0)
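Since loc_delta is the central angle in radians rather than a physical distance, you can optionally scale it by the Earth's mean radius (about 6371 km) to interpret it in kilometres; this is only an illustrative check, not a feature we store:
# Optional: interpret the central angle as an approximate distance in kilometres.
EARTH_RADIUS_KM = 6371
(trans_df["loc_delta"] * EARTH_RADIUS_KM).describe()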
Next we compute windowed aggregates. Here we will use 4-hour windows, but feel free to experiment with different window lengths by setting window_len
below to a value of your choice.
window_len = "4h"
cc_group = trans_df[["cc_num", "amount", "datetime"]].groupby("cc_num").rolling(window_len, on="datetime")
# Moving average of transaction volume.
df_4h_mavg = pd.DataFrame(cc_group.mean())
df_4h_mavg.columns = ["trans_volume_mavg", "datetime"]
df_4h_mavg = df_4h_mavg.reset_index(level=["cc_num"])
df_4h_mavg = df_4h_mavg.drop(columns=["cc_num", "datetime"])
df_4h_mavg = df_4h_mavg.sort_index()
# Moving standard deviation of transaction volume.
df_4h_std = pd.DataFrame(cc_group.std())
df_4h_std.columns = ["trans_volume_mstd", "datetime"]
df_4h_std = df_4h_std.reset_index(level=["cc_num"])
df_4h_std = df_4h_std.drop(columns=["cc_num", "datetime"])
df_4h_std = df_4h_std.fillna(0)
df_4h_std = df_4h_std.sort_index()
window_aggs_df = df_4h_std.merge(df_4h_mavg,left_index=True, right_index=True)
# Transaction frequency: number of transactions within the window.
df_4h_count = pd.DataFrame(cc_group.count())
df_4h_count.columns = ["trans_freq", "datetime"]
df_4h_count = df_4h_count.reset_index(level=["cc_num"])
df_4h_count = df_4h_count.drop(columns=["cc_num", "datetime"])
df_4h_count = df_4h_count.sort_index()
window_aggs_df = window_aggs_df.merge(df_4h_count,left_index=True, right_index=True)
# Moving average of location difference between consecutive transactions.
cc_group = trans_df[["cc_num", "loc_delta", "datetime"]].groupby("cc_num").rolling(window_len, on="datetime").mean()
df_4h_loc_delta_mavg = pd.DataFrame(cc_group)
df_4h_loc_delta_mavg.columns = ["loc_delta_mavg", "datetime"]
df_4h_loc_delta_mavg = df_4h_loc_delta_mavg.reset_index(level=["cc_num"])
df_4h_loc_delta_mavg = df_4h_loc_delta_mavg.drop(columns=["cc_num", "datetime"])
df_4h_loc_delta_mavg = df_4h_loc_delta_mavg.sort_index()
window_aggs_df = window_aggs_df.merge(df_4h_loc_delta_mavg,left_index=True, right_index=True)
window_aggs_df = window_aggs_df.merge(trans_df[["cc_num", "datetime"]].sort_index(),left_index=True, right_index=True)
window_aggs_df.tail()
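As an optional sanity check (not part of the original notebook), the aggregate DataFrame should contain one row per transaction and share trans_df's index, since every step above preserved it:
# Optional: one aggregate row per transaction, with no unexpected missing values.
print(len(window_aggs_df), len(trans_df))
print(window_aggs_df.isna().sum())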
Convert datetime objects to Unix epoch in milliseconds¶
trans_df.datetime = trans_df.datetime.values.astype(np.int64) // 10 ** 6
window_aggs_df.datetime = window_aggs_df.datetime.values.astype(np.int64) // 10 ** 6
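To double-check the conversion (optional), converting the integer values back with unit="ms" should recover the original timestamps:
# Optional round-trip check: milliseconds since the Unix epoch back to timestamps.
pd.to_datetime(trans_df["datetime"].head(), unit="ms")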
🪄 Creating Feature Groups ¶
A feature group can be seen as a collection of conceptually related features. In our case, we will create a feature group for the transaction data and a feature group for the windowed aggregations on the transaction data. Both will have cc_num
as primary key, which will allow us to join them when creating a dataset in the next tutorial.
Feature groups can also be used to define a namespace for features. For instance, in a real-life setting we would likely want to experiment with different window lengths. In that case, we can create feature groups with identical schema for each window length.
Before we can create a feature group we need to connect to our feature store.
import hopsworks
project = hopsworks.login()
fs = project.get_feature_store()
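When running inside Hopsworks, login() picks up the current project automatically; on Colab or another external environment you will be prompted for an API key. Below is a hedged sketch of a non-interactive login; the host, project, and api_key_value parameters are assumed from the hopsworks client's login() signature, so verify them against the documentation for your installed version, and the values shown are placeholders:
# Non-interactive login sketch with placeholder values (verify the parameter
# names against your installed hopsworks client version before using).
# project = hopsworks.login(
#     host="my-instance.cloud.hopsworks.ai",  # hypothetical host
#     project="my_project",                   # hypothetical project name
#     api_key_value="YOUR_API_KEY",
# )
# fs = project.get_feature_store()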
To create a feature group we need to give it a name and specify a primary key. It is also good practice to provide a description of the feature group's contents and a version number; if the version is not defined, it will default to 1.
trans_fg = fs.get_or_create_feature_group(
name="transactions_fraud_batch_fg",
version=1,
description="Transaction data",
primary_key=['cc_num'],
event_time=['datetime']
)
A full list of arguments can be found in the documentation.
At this point, we have only specified some metadata for the feature group. It does not store any data or even have a schema defined for the data. To make the feature group persistent we populate it with its associated data using the insert
function.
trans_fg.insert(trans_df)
feature_descriptions = [
{"name": "tid", "description": "Transaction id"},
{"name": "datetime", "description": "Transaction time"},
{"name": "cc_num", "description": "Number of the credit card performing the transaction"},
{"name": "category", "description": "Expense category"},
{"name": "amount", "description": "Dollar amount of the transaction"},
{"name": "latitude", "description": "Transaction location latitude"},
{"name": "longitude", "description": "Transaction location longitude"},
{"name": "city", "description": "City in which the transaction was made"},
{"name": "country", "description": "Country in which the transaction was made"},
{"name": "fraud_label", "description": "Whether the transaction was fraudulent or not"},
{"name": "age_at_transaction", "description": "Age of the card holder when the transaction was made"},
{"name": "days_until_card_expires", "description": "Card validity days left when the transaction was made"},
{"name": "loc_delta", "description": "Haversine distance between this transaction location and the previous transaction location from the same card"},
]
for desc in feature_descriptions:
trans_fg.update_feature_description(desc["name"], desc["description"])
When the feature group is created, you will be prompted with a URL that links directly to it; there you can explore your newly created feature group.
We can move on and do the same thing for the feature group holding our window aggregations.
window_aggs_fg = fs.get_or_create_feature_group(
name=f"transactions_{window_len}_aggs_fraud_batch_fg",
version=1,
description=f"Aggregate transaction data over {window_len} windows.",
primary_key=['cc_num'],
event_time=['datetime']
)
window_aggs_fg.insert(window_aggs_df)
feature_descriptions = [
{"name": "datetime", "description": "Transaction time"},
{"name": "cc_num", "description": "Number of the credit card performing the transaction"},
{"name": "loc_delta_mavg", "description": "Moving average of location difference between consecutive transactions from the same card"},
{"name": "trans_freq", "description": "Moving average of transaction frequency from the same card"},
{"name": "trans_volume_mavg", "description": "Moving average of transaction volume from the same card"},
{"name": "trans_volume_mstd", "description": "Moving standard deviation of transaction volume from the same card"},
]
for desc in feature_descriptions:
window_aggs_fg.update_feature_description(desc["name"], desc["description"])
Both feature groups are now accessible and searchable in the UI.
⏭️ **Next:** Part 02 ¶
In the following notebook we will use our feature groups to create a dataset we can train a model on.