In today’s digital world, people are increasingly shifting from cash towards online transactions and digital payments because of their convenience. With the increase in transactions, there is also an increase in fraud. A fraudulent transaction can take many forms, but it typically involves requesting money using a false identity or false information, and it poses a significant problem for individuals and financial institutions. In this project, we will use a credit card dataset to design an MLOps pipeline with the Airflow tool that monitors live transactions and predicts whether they are genuine or fraudulent.
The fraud transaction dataset contains data from different sources, with columns such as transaction time, name, amount, gender, category, etc. The fraud transaction estimation model is a machine learning model developed to flag false transactions: it is trained on a large set of valid and fraudulent transactions and then used to predict whether new transactions are fraudulent.
Fraud transaction analysis is the process of analyzing past transaction data. The aim of the analysis is to find irregularities in the data and uncover patterns in the dataset. Fraud transaction analysis plays a crucial role in business, helping protect customers and reduce financial loss. There are different types of fraud transaction analysis, such as rule-based analysis and anomaly detection; a small sketch contrasting the two follows below.
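To make the difference concrete, here is a minimal sketch (not part of the original project; the amt column name and the thresholds are illustrative assumptions): a rule-based check flags transactions above a fixed amount, while a simple anomaly check flags amounts that sit far from the historical mean.

import pandas as pd

# Toy transactions; 'amt' mirrors the amount column of the dataset used later
transactions = pd.DataFrame({"amt": [12.5, 48.0, 9.99, 2300.0, 75.2, 15000.0]})

# Rule-based analysis: flag any transaction above a fixed business threshold
AMOUNT_THRESHOLD = 1000.0  # hypothetical rule
transactions["rule_flag"] = transactions["amt"] > AMOUNT_THRESHOLD

# Anomaly detection (simplified): flag amounts more than 2 standard deviations from the mean
z_scores = (transactions["amt"] - transactions["amt"].mean()) / transactions["amt"].std()
transactions["anomaly_flag"] = z_scores.abs() > 2

print(transactions)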
Detecting fraudulent transactions is essential for businesses and financial institutions to protect customers against fraud and safeguard their money. Below are some crucial reasons for detecting fraudulent transactions.
Data collection and preprocessing are important parts of developing a fraud detection model. Once the data is collected, several steps need to be performed on the dataset.
Looking at raw numbers alone may not reveal the relationships among them. We will use Python libraries to plot graphs and charts to get insights from the dataset.
Techniques used to visualize the fraud transaction dataset.
The fraud detection MLOps model has multiple use cases across different industries. Below are some of its applications:
Building a fraud detection model poses several challenges, for various reasons:
Best practices to follow while creating a fraud detection model are discussed below:
With increasing digitalization and internet adoption, more and more people will use digital payment methods and online booking facilities, and advances in technology will keep making payment tools easier and faster. It therefore becomes essential to develop tools that prevent fraud and increase customer trust in a company and its services. Businesses often look for reliable, accessible, cost-effective solutions, and technology can play a crucial role here. Building tools and services around financial products can help a business provide a wide range of services to its customers. Personalized financial products can also be offered, building more trust and improving the relationship between customers and businesses.
Let us perform a fundamental Data analysis using Python implementation on a dataset from Kaggle. To download the dataset, click here.
The fraud detection dataset contains over 1 million records on which the model will be trained. Below are the dataset details:
Column | Description |
---|---|
trans_date_trans_time | Transaction DateTime |
cc_num | Credit Card Number of Customer |
merchant | Merchant Name |
category | Category of Merchant |
amt | Amount of Transaction |
first | First Name of Credit Card Holder |
last | Last Name of Credit Card Holder |
gender | Gender of Credit Card Holder |
street | Street Address of Credit Card Holder |
city | City of Credit Card Holder |
state | State of Credit Card Holder |
zip | Zip of Credit Card Holder |
lat | Latitude Location of Credit Card Holder |
long | Longitude Location of Credit Card Holder |
city_pop | Credit Card Holder’s City Population |
job | Job of Credit Card Holder |
dob | Date of Birth of Credit Card Holder |
trans_num | Transaction Number |
unix_time | UNIX Time of transaction |
merch_lat | Latitude Location of Merchant |
merch_long | Longitude Location of Merchant |
is_fraud | Fraud Flag <— Target Class |
import random
import calendar
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.stats import norm, skew, ttest_ind
import warnings
warnings.filterwarnings('ignore')
#Read the data from the train and test files and convert them to dataframes
df=pd.read_csv(r'C:\Decodr\G3\B\FRAUD_DETECTION_IN_IMBALANCED_DATA\data2\Train.csv')
T_df=pd.read_csv(r'C:\Decodr\G3\B\FRAUD_DETECTION_IN_IMBALANCED_DATA\data2\Test.csv')
#Dataframe shape
df.shape,T_df.shape
((1048575, 22), (555719, 22))
#Checking train and test dataframe info
df.info(),T_df.info()
#Checking null value in train and test dataframe
df.isna().sum(),T_df.isna().sum()
OUTPUT
#Derive a categorical fraud flag ('T'/'F') from is_fraud, used to filter the plots below
df['is_fraud_cat'] = df['is_fraud'].map({1: "T", 0: "F"})

#Fraud on the basis of category
sns.countplot(data=df[df['is_fraud_cat'] == "T"], x='category')
plt.xticks(rotation=45)
plt.show()
OUTPUT
Insight
Most frauds occurred in categories of shopping_net and grocery_pos
#Fraud on the basis of gender
sns.countplot(data=df[df['is_fraud_cat']=="T"],x='gender')
plt.show()
OUTPUT
Insight
Although slightly more fraud cases involved female customers, the numbers are almost the same for males and females.
#Fraud on the basis of state
fig, ax = plt.subplots(figsize=(120,60))
plt.rcParams.update({'font.size': 60})
sns.countplot(data=df[df['is_fraud_cat']=="T"],x='state')
plt.xticks(rotation=45)
for p, label in zip(ax.patches, df["state"].value_counts().index):
    ax.annotate(label, (p.get_x(), p.get_height()+0.15))
plt.title("Number of Credit Card Frauds by State")
plt.show()
OUTPUT
Insight
The states OH, TX, and LA report the highest number of credit card frauds.
#Fraud on the basis of City
def randomcolor():
    r = random.random()
    b = random.random()
    g = random.random()
    rgb = [r, g, b]
    return rgb

plt.rcParams.update({'font.size': 20})
(df[df['is_fraud_cat']=="T"]["city"].value_counts(sort=True, ascending=False)
    .head(10).plot(kind="bar", color=randomcolor()))
plt.title("Number of Credit Card Frauds by City")
plt.show()
OUTPUT
Insight
Dallas, Houston, and Birmingham report the most frauds city-wise.
#Fraud on the basis of Job
(df[df['is_fraud_cat']=="T"]["job"].value_counts(sort=True, ascending=False)
    .head(10).plot(kind="bar", color=randomcolor()))
plt.title("Number of Credit Card Frauds by Job")
plt.show()
OUTPUT
Insight
Most frauds occurred among quantity surveyors, followed by naval architects and materials engineers.
#Fraud vs Non Fraud
plt.figure(figsize=(8,5))
ax = sns.countplot(x="is_fraud", data=df,color=randomcolor())
for p in ax.patches:
    ax.annotate('{:.1f}'.format(p.get_height()), (p.get_x()+0.25, p.get_height()+0.01))
plt.show()
OUTPUT
Insight
Only around 6006 entries represent fraud transactions out of nearly 1 million entries; hence, we are looking at an imbalanced dataset.
# Work on a copy of the training dataframe loaded above
data = df.copy()
data['trans_date_trans_time'] = pd.to_datetime(data['trans_date_trans_time'],
                                                format='%d-%m-%Y %H:%M')
data['trans_date']=data['trans_date_trans_time'].dt.strftime('%Y-%m-%d')
data['trans_date']=pd.to_datetime(data['trans_date'])
data['dob']=pd.to_datetime(data['dob'],format='%d-%m-%Y')
data["age"] = data["trans_date"]-data["dob"]
data["age"] = data["age"].astype('int64')
data['trans_month'] = pd.DatetimeIndex(data['trans_date']).month
data['trans_year'] = pd.DatetimeIndex(data['trans_date']).year
data['Month_name'] = data['trans_month'].apply(lambda x: calendar.month_abbr[x])
data['latitudinal_distance'] = abs(round(data['merch_lat']-data['lat'],3))
data['longitudinal_distance'] = abs(round(data['merch_long']-data['long'],3))
data.gender=data.gender.apply(lambda x: 1 if x=="M" else 0)
data = data.drop(['cc_num','merchant','first','last','street','zip','trans_num',
'unix_time','trans_date_trans_time','city','lat','long','job','dob','merch_lat',
'merch_long','trans_date','state','Month_name'],axis=1)
data =pd.get_dummies(data,columns=['category'],drop_first=True)
#Performing Undersampling
normal = data[data['is_fraud']==0]
fraud = data[data['is_fraud']==1]
normal_sample=normal.sample(n=len(fraud),random_state=42)
new_data = pd.concat([normal_sample,fraud],ignore_index=True)
In the steps above, I read the data files locally for visualization, but for the implementation we will use cloud services, specifically MS Azure. I will show you how I integrated MS Azure with the Airflow tool for data ingestion and model building. In MS Azure, first create a Storage account and then a container inside it; the data file is stored in this container. We will build an Airflow pipeline that fetches the data from the container and places it at the required location. After this, we will build an end-to-end model and then deploy it on Streamlit Cloud, where it can be accessed publicly.
To create a storage account, you must create an Azure account. Follow the below steps:
Airflow is an open-source workflow management platform that helps build and monitor the model. It uses the Directed Acyclic Graph (DAG) to define the workflow. Airflow offers several advantages, as described below:
In the real world, building a model is not enough; we have to deploy the model into production and monitor its performance over time and how it interacts with real-world data. We can build an end-to-end machine learning pipeline and also monitor it using Airflow. In Airflow, we can create a workflow and set the dependencies in which the tasks will be executed. The workflow status can also be checked in Airflow, whether it completed successfully, failed, was restarted, etc. After the workflow is executed, its logs can be monitored in Airflow. This way, we can track our production-ready model. I highly suggest you refer to the Airflow documentation for more details. A minimal example of defining a DAG and its task dependencies is sketched below.
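As a hedged illustration of how a DAG and its task dependencies are declared in Airflow (the DAG name, tasks, and schedule here are placeholders, not the project's actual DAG):

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract():
    print("extract data")

def train():
    print("train model")

with DAG(
    dag_id="example_fraud_pipeline",   # placeholder name
    start_date=datetime(2023, 1, 1),
    schedule=None,                     # run only when triggered manually
    catchup=False,
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    train_task = PythonOperator(task_id="train", python_callable=train)

    # The >> operator sets the dependency: extract runs before train
    extract_task >> train_task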
The workflow consists of the following steps:
Having seen the different Airflow operators above, let's now move on to the coding part.
data_upload_operator
from azure.storage.blob import BlobServiceClient
from config.constant import (storage_account_key, storage_account_name,
                             connection_string, container_name, file_path_up, file_name)

def uploadToBlobStorage():
    try:
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=file_name)
        with open(file_path_up, "rb") as data:
            blob_client.upload_blob(data)
        print("Upload " + file_name + " from local to container " + container_name)
    except Exception as e:
        print(f"An error occurred: {str(e)}")

uploadToBlobStorage()
Above, we have defined the uploadToBlobStorage() method, which connects to the MS Azure storage account, reads the file from local storage, and uploads it to the cloud.
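The upload and download operators import their settings from a config/constant.py module that is not shown in the article. Below is a minimal sketch of what it is assumed to contain; every value is a placeholder you must replace with your own Azure Storage details.

# config/constant.py -- assumed contents (placeholders only)
storage_account_name = "<your-storage-account-name>"
storage_account_key = "<your-storage-account-key>"
connection_string = (
    "DefaultEndpointsProtocol=https;"
    f"AccountName={storage_account_name};"
    f"AccountKey={storage_account_key};"
    "EndpointSuffix=core.windows.net"
)
container_name = "<your-container-name>"

# Upload settings
file_name = "Train.csv"                                # blob name to create
file_path_up = "data/raw/Train.csv"                    # local file to upload

# Download settings
blob_name = "Train.csv"                                # blob to fetch
file_path_down = "data/processed/ingested_data.csv"    # local destination (read by the preprocessing operator)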
data_download_operator
from azure.storage.blob import BlobServiceClient
from config.constant import (storage_account_key, storage_account_name,
                             connection_string, container_name, blob_name, file_path_down)

def downloadFromBlobStorage():
    try:
        # Initialize a BlobServiceClient using the connection string
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        # Get a BlobClient for the target blob
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
        # Download the blob to a local file
        with open(file_path_down, "wb") as data:
            data.write(blob_client.download_blob().readall())
        print(f"Downloaded {blob_name} from {container_name} to {file_path_down}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")

downloadFromBlobStorage()
Here, the downloadFromBlobStorage() method is defined. It will connect with the storage account and download the file. Then, the file will be stored on the local path.
data_preprocessing_operator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import pandas as pd
import calendar

class DataPreprocessingOperator(BaseOperator):
    @apply_defaults
    def __init__(self, preprocessed_data, *args, **kwargs):
        super(DataPreprocessingOperator, self).__init__(*args, **kwargs)
        self.preprocessed_data = preprocessed_data

    def execute(self, context):
        try:
            # Perform data preprocessing logic here
            # For example, you can clean, transform, or engineer
            # features in the ingested data
            data = pd.read_csv('data/processed/ingested_data.csv')
            data['trans_date_trans_time'] = pd.to_datetime(
                data['trans_date_trans_time'], format='%d-%m-%Y %H:%M')
            data['trans_date'] = data['trans_date_trans_time'].dt.strftime('%Y-%m-%d')
            data['trans_date'] = pd.to_datetime(data['trans_date'])
            data['dob'] = pd.to_datetime(data['dob'], format='%d-%m-%Y')
            data["age"] = data["trans_date"] - data["dob"]
            data["age"] = data["age"].astype('int64')
            data['trans_month'] = pd.DatetimeIndex(data['trans_date']).month
            data['trans_year'] = pd.DatetimeIndex(data['trans_date']).year
            data['Month_name'] = data['trans_month'].apply(lambda x: calendar.month_abbr[x])
            data['latitudinal_distance'] = abs(round(data['merch_lat'] - data['lat'], 3))
            data['longitudinal_distance'] = abs(round(data['merch_long'] - data['long'], 3))
            data.gender = data.gender.apply(lambda x: 1 if x == "M" else 0)
            data = data.drop(['cc_num', 'merchant', 'first', 'last', 'street', 'zip',
                              'trans_num', 'unix_time', 'trans_date_trans_time', 'city',
                              'lat', 'long', 'job', 'dob', 'merch_lat', 'merch_long',
                              'trans_date', 'state', 'Month_name'], axis=1)
            data = pd.get_dummies(data, columns=['category'], drop_first=True)

            # Performing Undersampling
            normal = data[data['is_fraud'] == 0]
            fraud = data[data['is_fraud'] == 1]
            normal_sample = normal.sample(n=len(fraud), random_state=42)
            new_data = pd.concat([normal_sample, fraud], ignore_index=True)

            # Performing Oversampling
            # normal = data[data['is_fraud']==0]
            # fraud = data[data['is_fraud']==1]
            # fraud_sample = fraud.sample(n=len(normal), replace=True, random_state=42)
            # new_data = pd.concat([normal, fraud_sample], ignore_index=True)

            # Save the preprocessed data to the output file (e.g., a CSV file)
            new_data.to_csv(self.preprocessed_data, index=False)
        except Exception as e:
            self.log.error(f'Data preprocessing failed: {str(e)}')
            raise e
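The training and evaluation operators below read separate X_train/y_train and X_test/y_test CSV files, and the evaluation operator pulls from a data_split_task, but the split step itself is not reproduced in this article. A minimal sketch of such an operator, with assumed file paths and an 80/20 stratified split, could look like this:

from airflow.models import BaseOperator
from sklearn.model_selection import train_test_split
import pandas as pd

class DataSplitOperator(BaseOperator):
    """Assumed operator that splits the preprocessed data into train/test CSV files."""

    def __init__(self, preprocessed_data, X_train_file, X_test_file,
                 y_train_file, y_test_file, *args, **kwargs):
        super(DataSplitOperator, self).__init__(*args, **kwargs)
        self.preprocessed_data = preprocessed_data
        self.X_train_file = X_train_file
        self.X_test_file = X_test_file
        self.y_train_file = y_train_file
        self.y_test_file = y_test_file

    def execute(self, context):
        data = pd.read_csv(self.preprocessed_data)
        X = data.drop(["is_fraud"], axis=1)
        y = data["is_fraud"]
        # Stratify so the train and test sets keep the same fraud ratio
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y)
        X_train.to_csv(self.X_train_file, index=False)
        X_test.to_csv(self.X_test_file, index=False)
        y_train.to_csv(self.y_train_file, index=False)
        y_test.to_csv(self.y_test_file, index=False)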
model_training_operator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib

class ModelTrainingRFCOperator(BaseOperator):
    """
    Custom Apache Airflow operator to train a machine learning model and
    save it to a file.
    """
    def __init__(self, X_train_file, y_train_file, model_file, *args, **kwargs):
        """
        Initialize the operator.

        :param X_train_file: File path to the features of the training set (X_train).
        :param y_train_file: File path to the labels of the training set (y_train).
        :param model_file: File path to save the trained model.
        """
        super(ModelTrainingRFCOperator, self).__init__(*args, **kwargs)
        self.X_train_file = X_train_file
        self.y_train_file = y_train_file
        self.model_file = model_file

    def execute(self, context):
        self.log.info(f'Training a machine learning model using data from '
                      f'{self.X_train_file, self.y_train_file}')
        try:
            X_train = pd.read_csv(self.X_train_file)
            y_train = pd.read_csv(self.y_train_file)
            print(X_train.shape)
            print(y_train.shape)
            # Initialize and train your machine learning model
            # (replace with your model class)
            RFC = RandomForestClassifier(n_estimators=100, random_state=0)
            RFC.fit(X_train, y_train)
            # Save the trained model to the provided model_file
            joblib.dump(RFC, self.model_file)
        except Exception as e:
            self.log.error(f'Model training failed: {str(e)}')
            raise e
After preprocessing and data splitting, the next step is to train the model. In code, we have used RandomForestClassifier for model training.
model_evaluation_operator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import pandas as pd
from sklearn.metrics import accuracy_score, classification_report
import joblib

class ModelEvaluationRFCOperator(BaseOperator):
    """
    Custom Apache Airflow operator to evaluate a machine learning model and
    save evaluation results to a file.
    """
    @apply_defaults
    def __init__(self, X_test_file, y_test_file, model_file, output_file,
                 *args, **kwargs):
        """
        Initialize the operator.

        :param X_test_file: File path to the features of the testing set (X_test).
        :param y_test_file: File path to the labels of the testing set (y_test).
        :param model_file: File path to load the trained model.
        :param output_file: File path to save the evaluation results.
        """
        super(ModelEvaluationRFCOperator, self).__init__(*args, **kwargs)
        self.X_test_file = X_test_file
        self.y_test_file = y_test_file
        self.model_file = model_file
        self.output_file = output_file

    def execute(self, context):
        """
        Execute the operator to evaluate a machine learning model and
        save evaluation results to a file.
        """
        self.log.info(f'Evaluating the machine learning model using data from '
                      f'{self.X_test_file, self.y_test_file}')
        # Retrieve the test data from the previous task using XCom
        test_data = context['ti'].xcom_pull(task_ids='data_split_task', key='test_data')
        try:
            # Load the testing data and trained model from the provided files
            X_test = pd.read_csv(self.X_test_file)
            y_test = pd.read_csv(self.y_test_file)
            model = joblib.load(self.model_file)
            # Make predictions using the trained model
            y_pred = model.predict(X_test)
            # Calculate and print evaluation metrics
            accuracy = accuracy_score(y_test, y_pred)
            classification_rep = classification_report(
                y_test, y_pred, target_names=['class_0', 'class_1'])  # Customize labels as needed
            # Save evaluation results to the specified output file
            with open(self.output_file, 'w') as f:
                f.write(f"Accuracy: {accuracy}\n\nClassification Report:\n{classification_rep}")
        except Exception as e:
            self.log.error(f'Model evaluation failed: {str(e)}')
            raise e
After model training, we evaluated the model and prepared the classification report. Here, we are checking model accuracy, precision, recall, and F1-score.
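For reference, the individual scores summarized in that report can also be computed directly with scikit-learn; a small illustration on synthetic labels (not the project's actual output):

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Synthetic example labels, just to show how each metric is derived
y_true = [0, 0, 1, 1, 1, 0, 1, 0]
y_pred = [0, 0, 1, 0, 1, 0, 1, 1]

print("Accuracy :", accuracy_score(y_true, y_pred))   # fraction of all predictions that are correct
print("Precision:", precision_score(y_true, y_pred))  # of predicted frauds, how many are real
print("Recall   :", recall_score(y_true, y_pred))     # of real frauds, how many were caught
print("F1-score :", f1_score(y_true, y_pred))         # harmonic mean of precision and recall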
model_prediction_operator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import pandas as pd
from sklearn.metrics import accuracy_score, classification_report
import joblib
import calendar

class ModelPredictionOperator(BaseOperator):
    """
    Custom Apache Airflow operator to evaluate a machine learning model and
    save evaluation results to a file.
    """
    @apply_defaults
    def __init__(self, input_file, model_file, output_file, *args, **kwargs):
        """
        Initialize the operator.

        :param input_file: File path to the new data to run predictions on.
        :param model_file: File path to load the trained model.
        :param output_file: File path to save the evaluation results.
        """
        super(ModelPredictionOperator, self).__init__(*args, **kwargs)
        self.input_file = input_file
        self.model_file = model_file
        self.output_file = output_file

    def execute(self, context):
        """
        Execute the operator to evaluate a machine learning model and
        save evaluation results to a file.
        """
        self.log.info(f'Evaluating the machine learning model using data from {self.input_file}')
        try:
            # Load the testing data and trained model from the provided files
            new_data = pd.read_csv('data/raw/Test.csv')
            new_data['trans_date_trans_time'] = pd.to_datetime(
                new_data['trans_date_trans_time'], format='%d-%m-%Y %H:%M')
            new_data['trans_date'] = new_data['trans_date_trans_time'].dt.strftime('%Y-%m-%d')
            new_data['trans_date'] = pd.to_datetime(new_data['trans_date'])
            new_data['dob'] = pd.to_datetime(new_data['dob'], format='%d-%m-%Y')
            new_data["age"] = new_data["trans_date"] - new_data["dob"]
            new_data["age"] = new_data["age"].astype('int64')
            new_data['trans_month'] = pd.DatetimeIndex(new_data['trans_date']).month
            new_data['trans_year'] = pd.DatetimeIndex(new_data['trans_date']).year
            new_data['Month_name'] = new_data['trans_month'].apply(lambda x: calendar.month_abbr[x])
            new_data['latitudinal_distance'] = abs(round(new_data['merch_lat'] - new_data['lat'], 3))
            new_data['longitudinal_distance'] = abs(round(new_data['merch_long'] - new_data['long'], 3))
            new_data.gender = new_data.gender.apply(lambda x: 1 if x == "M" else 0)
            new_data = new_data.drop(['cc_num', 'merchant', 'first', 'last', 'street',
                                      'zip', 'trans_num', 'unix_time', 'trans_date_trans_time',
                                      'city', 'lat', 'long', 'job', 'dob', 'merch_lat',
                                      'merch_long', 'trans_date', 'state', 'Month_name'], axis=1)
            new_data = pd.get_dummies(new_data, columns=['category'], drop_first=True)
            X_new = new_data.drop(["is_fraud"], axis=1)
            y_new = new_data["is_fraud"]
            model = joblib.load(self.model_file)
            # Make predictions using the trained model
            y_pred_new = model.predict(X_new)
            print('y_new', y_new)
            print('y_pred_new', y_pred_new)
            # Calculate and print evaluation metrics
            accuracy = accuracy_score(y_new, y_pred_new)
            classification_rep = classification_report(
                y_new, y_pred_new, target_names=['class_0', 'class_1'])  # Customize labels as needed
            # Save evaluation results to the specified output file
            with open(self.output_file, 'w') as f:
                f.write(f"Accuracy: {accuracy}\n\nClassification Report:\n{classification_rep}")
        except Exception as e:
            self.log.error(f'Model evaluation failed: {str(e)}')
            raise e
In the prediction operator, we are testing the model on a new dataset, i.e., a test data file. After the prediction, we are preparing a classification report.
Create a virtual environment using Python or Anaconda.
#Command to create virtual environment
python3 -m venv <virtual_environment_name>
You need to install some Python packages in your environment using the below command.
cd airflow-projects/fraud-prediction
pip install -r requirements.txt
Before running the workflow, you must install Airflow and set up its database.
#Installing airflow
pip install 'apache-airflow==2.7.1' \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.1/constraints-3.8.txt"
#Setting home path
export AIRFLOW_HOME=/c/Users/[YourUsername]/airflow
#Initialize the database:
airflow db init
#Create an Airflow User
airflow users create --username admin --password admin --firstname admin \
  --lastname admin --role Admin --email [email protected]
#Check the created user
airflow users list
#Run the Webserver
airflow webserver

#Run the scheduler
airflow scheduler
#If the default port 8080 is in use, change the port by typing:
airflow webserver --port <port number>
We can log in to the Airflow web portal using the username and password created above.
Above, we have created the different Airflow operators, which can be run via an Airflow DAG and triggered with a single click; a sketch of how the operators might be wired into a DAG is shown below.
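The DAG definition itself is not reproduced in this article; below is a hedged sketch of how the custom operators might be wired together. The DAG id, module paths, task ids, and file paths are assumptions (the DataSplitOperator is the one sketched earlier), so adjust them to your project layout.

from datetime import datetime
from airflow import DAG

# Module paths below are assumptions about how the operator files are organised
from operators.data_preprocessing_operator import DataPreprocessingOperator
from operators.data_split_operator import DataSplitOperator            # sketched earlier
from operators.model_training_operator import ModelTrainingRFCOperator
from operators.model_evaluation_operator import ModelEvaluationRFCOperator

with DAG(
    dag_id="fraud_detection_pipeline",   # assumed DAG id
    start_date=datetime(2023, 1, 1),
    schedule=None,                       # trigger manually from the UI
    catchup=False,
) as dag:

    # A data-ingestion task (download from Azure Blob Storage) would typically
    # come first; it is omitted here for brevity.
    preprocess = DataPreprocessingOperator(
        task_id="data_preprocessing_task",
        preprocessed_data="data/processed/preprocessed_data.csv",
    )

    split = DataSplitOperator(
        task_id="data_split_task",
        preprocessed_data="data/processed/preprocessed_data.csv",
        X_train_file="data/processed/X_train.csv",
        X_test_file="data/processed/X_test.csv",
        y_train_file="data/processed/y_train.csv",
        y_test_file="data/processed/y_test.csv",
    )

    train = ModelTrainingRFCOperator(
        task_id="model_training_task",
        X_train_file="data/processed/X_train.csv",
        y_train_file="data/processed/y_train.csv",
        model_file="models/rfc_model.joblib",
    )

    evaluate = ModelEvaluationRFCOperator(
        task_id="model_evaluation_task",
        X_test_file="data/processed/X_test.csv",
        y_test_file="data/processed/y_test.csv",
        model_file="models/rfc_model.joblib",
        output_file="reports/evaluation.txt",
    )

    # Set the execution order of the pipeline
    preprocess >> split >> train >> evaluate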
A workflow passes through different statuses before it completes successfully or fails. These are shown below:
Below are the different operators we discussed above. We can also monitor the workflow status in real time while it executes.
We can monitor the log of the workflow-triggered DAG in Airflow. Below is the sample.
After obtaining the best model, we deployed it using Streamlit. To run the Streamlit app on your local system, use the below command:
# command to run the streamlit app locally
streamlit run streamlit_app.py
The cloud version of the app can also be accessed publicly at the below URL.
https://fraud-prediction-mlops-d8rcgc2prmv9xapx5ahhhn.streamlit.app/
For end-to-end complete ML implementation code, please click here.
We have experimented with multiple algorithms and compared the performance of each model. The results are as follows:
Models | Accuracy | Precision Non-Fraud | Precision Fraud | Recall Non-Fraud | Recall Fraud | F1-Score Non-Fraud | F1-Score Fraud |
---|---|---|---|---|---|---|---|
AdaBoostClassifier | 91.51% | 91% | 92% | 93% | 90% | 92% | 91% |
DecisionTreeClassifier | 95.51% | 96% | 95% | 95% | 95% | 96% | 95% |
GradientBoostingClassifier | 95.09% | 95% | 95% | 95% | 95% | 95% | 95% |
RandomForestClassifier | 95.96% | 96% | 96% | 97% | 95% | 96% | 96% |
From the above results on this highly imbalanced dataset (balanced with undersampling before training), we can see that all four tree-based and ensemble models performed very well, each with an accuracy of more than 90%. The Random Forest and Decision Tree classifiers have almost the same accuracy, with Random Forest being slightly better than the Decision Tree. A loop that produces such a comparison is sketched below.
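A comparison like the one above can be produced with a simple loop over the candidate models; the sketch below assumes the train/test CSV files written by the split step and uses illustrative model settings, so it is not the exact code behind the table.

import pandas as pd
from sklearn.ensemble import AdaBoostClassifier, GradientBoostingClassifier, RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, classification_report

# Assumed file paths from the data split step
X_train = pd.read_csv("data/processed/X_train.csv")
X_test = pd.read_csv("data/processed/X_test.csv")
y_train = pd.read_csv("data/processed/y_train.csv").values.ravel()
y_test = pd.read_csv("data/processed/y_test.csv").values.ravel()

models = {
    "AdaBoostClassifier": AdaBoostClassifier(random_state=0),
    "DecisionTreeClassifier": DecisionTreeClassifier(random_state=0),
    "GradientBoostingClassifier": GradientBoostingClassifier(random_state=0),
    "RandomForestClassifier": RandomForestClassifier(n_estimators=100, random_state=0),
}

for name, model in models.items():
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    print(name, "accuracy:", accuracy_score(y_test, y_pred))
    print(classification_report(y_test, y_pred, target_names=["class_0", "class_1"]))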
Below is a live demo application of this project built using Streamlit. It takes some input features for the transaction and predicts whether it is genuine or fraudulent using our trained model; a minimal sketch of such an app follows.
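The Streamlit code lives in the linked repository; as a hedged illustration only, a minimal app of this kind might look like the sketch below. The input fields, feature names, and model path are assumptions rather than the exact deployed app, and a real app must supply every feature the model was trained on.

import joblib
import pandas as pd
import streamlit as st

st.title("Credit Card Fraud Prediction")

# Assumed path to the model trained by the Airflow pipeline
model = joblib.load("models/rfc_model.joblib")

# A few illustrative inputs; the deployed app exposes all engineered features
amt = st.number_input("Transaction amount", min_value=0.0, value=50.0)
gender = st.selectbox("Gender", ["M", "F"])
age = st.number_input("Age of card holder (years)", min_value=18, value=35)
city_pop = st.number_input("City population", min_value=0, value=100000)

if st.button("Predict"):
    features = pd.DataFrame([{
        "amt": amt,
        "gender": 1 if gender == "M" else 0,
        "age": age,
        "city_pop": city_pop,
        # ...the remaining engineered / one-hot encoded features must be added here
    }])
    prediction = model.predict(features)[0]
    st.write("Fraudulent transaction" if prediction == 1 else "Genuine transaction")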
Today’s world is digital, and technology has become a part of our lives. Online services keep growing, and anything from books to smartphones and laptops can be purchased online. So, preventing fraud by implementing a fraud detection model has become essential for every company, and machine learning can play an essential role for both businesses and customers.
A. The fraud detection dataset contains many columns that help determine whether a transaction is valid or fraudulent. These features include amount, area, age, transaction type, gender, etc.
A. The purpose of the fraud detection model is to determine whether a transaction is fraudulent or not. This helps businesses prevent fraud, gain customer trust, and increase the company’s profit.
A. The fraud detection model is a machine learning model trained on more than one million records of past customer transaction data. It helps predict in real time whether a transaction is valid or fraudulent.
A. Fraud detection accuracy depends on factors such as the type and quality of the data. If the model is trained on more features, it tends to predict fraud more accurately.
A. Businesses and financial institutions can use such technology or tools to prevent fraud and increase profit. This offers them a competitive advantage over other companies, helping them attract more customers. Businesses can also build such financial tools to provide better services to their customers.
The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.