Are u working as an HR ? struggling to predict whether the employees in your team will continue working or they’re consider leaving the organisation, No worries ! you don’t wanna be a astrologer to predict this, by using the power of Data Science, we can predict it accurately. Let us begin our wonderful journey of employee Attrition rate with a simple, yet powerful MLOps tool, called ZenML and streamlit. Let’s start our journey.
In this article, we will learn,
This article was published as a part of the Data Science Blogathon.
Problem Statement: Predict whether an employee will leave an organisation or not based on several factors like Age, income, Performance etc.,
Solution: Build a Logistic Regression model to predict the attrition rate of an Employee
Dataset: IBM HR Analytics Employee Attrition & Performance
[Source]: https://www.kaggle.com/datasets/pavansubhasht/ibm-hr-analytics-attrition-datasetBefore seeing the implementation of our project, let us see why we are using ZenML here first.
ZenML is a simple and powerful MLOps orchestration tool used to create ML pipelines, cache pipeline steps and save computational resources. ZenML also offers integration with multiple ML tools, making it one of the best tool to create ML pipelines.We can track our model steps,evaluation metrics, we can see our pipelines visually in Dashboards and many more.
In this project, we will implement a traditional pipeline, which uses ZenML, and we will be integrating mlflow with zenml, for experiment tracking.We will also implement a continuous deployment pipeline using MLflow integration with ZenML, which will ingest and clean the data, train the model and redeploys the model, when the prediction meets some minimum evaluation criteria.With this pipeline, we can make sure, if any new model performs better than the previous model’s threshold prediction value,then the MLFlow deployment server will be updated with the new model instead of the old model.
#create a virtual environment
python3 -m venv venv
#Activate your virtual environmnent in your project folder
source venv/bin/activate
All the basic ZenML Commands with its functionalities are given below:
#Install zenml
pip install zenml
#to Launch zenml server and dashboard locally
pip install "zenml[server]"
#to see the zenml Version:
zenml version
#To initiate a new repository
zenml init
#to run the dashboard locally:
zenml up
#to know the status of our zenml Pipelines
zenml show
These commands are necessary to know to work with ZenML.
We are using mlflow as the experiment tracker, to track our model,artifacts, hyperparameter values. We are registering the stack component, experiment tracker, model-deployer here:
#Integrating mlflow with ZenML
zenml integration install mlflow -y
#Register the experiment tracker
zenml experiment-tracker register mlflow_tracker_employee --flavor=mlflow
#Registering the model deployer
zenml model-deployer register mlflow_employee --flavor=mlflow
#Registering the stack
zenml stack register mlflow_stack_employee -a default -o default -d mlflow_employee -e mlflow_tracker_employee --set
employee-attrition-prediction/ # Project directory
├── data/
│ └── HR-Employee-Attrition.csv # Dataset file
│
├── pipelines/
│ ├── deployment_pipeline.py # Deployment pipeline
│ ├── training_pipeline.py # Training pipeline
│ └── utils.py
│
├── src/ # Source code
│ ├── data_cleaning.py # Data cleaning and preprocessing
│ ├── evaluation.py # Model evaluation
│ └── model_dev.py # Model development
│
├── steps/ # code files for ZenML steps
│ ├── ingest_data.py # Ingestion of data
│ ├── clean_data.py # Data cleaning and preprocessing
│ ├── model_train.py # Train the model
│ ├── evaluation.py # Model evaluation
│ └── config.py
│
├── streamlit_app.py # Streamlit web application
│
├── run_deployment.py # Code for running deployment and prediction pipeline
├── run_pipeline.py # Code for running training pipeline
│
├── requirements.txt # List of project required packages
├── README.md # Project documentation
└── .zen/ # ZenML directory (created automatically after ZenML initialization)
We first ingest the data from the HR-Employee-Attrition-Rate dataset from the data folder.
import pandas as pd
from zenml import step
class IngestData:
def get_data(self) -> pd.DataFrame:
df = pd.read_csv("./data/HR-Employee-Attrition.csv")
return df
@step
def ingest_data() -> pd.DataFrame:
ingest_data = IngestData()
df = ingest_data.get_data()
return df
@step is a decorator, used to make the function ingest_data() as a step of the pipeline.
#Understand the data
df.info()
# See how the data looks
df.describe()
# Check the sample data
df.head()
#Check the null values
df.isnull.sum()
#Check the percentage of people who stayed and left the company:
df['Attrition'].value_counts()
df_left = df[df['Attrition'] == "Yes"]
df_stayed = df[df['Attrition'] == "No"]
left_percentage=df_left.shape[0]*100/df.shape[0]
stayed_percentage=df_stayed.shape[0]*100/df.shape[0]
print(f"The percentage of people who left the company are:{left_percentage}")
print(f"The percentage of people who stayed the company are:{stayed_percentage}")
#Analyse the differences in features between people who stayed and people who left the company
df_left.describe()
df_stayed.describe()
import pandas as pd
class DataPreProcessStrategy(DataStrategy):
def __init__(self, encoder=None):
self.encoder = encoder
"""This class is used to preprocess the given dataset"""
def handle_data(self, data: pd.DataFrame) -> pd.DataFrame:
try:
print("Column Names Before Preprocessing:", data.columns) # Add this line
data = data.drop(["EmployeeCount", "EmployeeNumber", "StandardHours"], axis=1)
if 'Attrition' in data.columns:
print("Attrition column found in data.")
else:
print("Attrition column not found in data.")
data["Attrition"] = data["Attrition"].apply(lambda x: 1 if x == "Yes" else 0)
data["Over18"] = data["Over18"].apply(lambda x: 1 if x == "Yes" else 0)
data["OverTime"] = data["OverTime"].apply(lambda x: 1 if x == "Yes" else 0)
# Extract categorical variables
cat = data[['BusinessTravel', 'Department', 'EducationField', 'Gender', 'JobRole', 'MaritalStatus']]
# Perform one-hot encoding on categorical variables
onehot = OneHotEncoder()
cat_encoded = onehot.fit_transform(cat).toarray()
# Convert cat_encoded to DataFrame
cat_df = pd.DataFrame(cat_encoded)
# Extract numerical variables
numerical = data[['Age', 'Attrition', 'DailyRate', 'DistanceFromHome', 'Education', 'EnvironmentSatisfaction', 'HourlyRate', 'JobInvolvement', 'JobLevel', 'JobSatisfaction', 'MonthlyIncome', 'MonthlyRate', 'NumCompaniesWorked', 'Over18', 'OverTime', 'PercentSalaryHike', 'PerformanceRating', 'RelationshipSatisfaction', 'StockOptionLevel', 'TotalWorkingYears', 'TrainingTimesLastYear', 'WorkLifeBalance', 'YearsAtCompany', 'YearsInCurrentRole', 'YearsSinceLastPromotion', 'YearsWithCurrManager']]
# Concatenate X_cat_df and X_numerical
data = pd.concat([cat_df, numerical], axis=1)
print("Column Names After Preprocessing:", data.columns) # Add this line
print("Preprocessed Data:")
print(data.head())
return data
except Exception as e:
logging.error(f"Error in preprocessing the data: {e}")
raise e
The data looks like this, after all data cleaning and processing completed: You can see in the image finally, the data consists of only numerical data after encoding completed.
We will then split the training and testing datasets in the ratio of 80:20.
from sklearn.model_selection import train_test_split
class DataDivideStrategy(DataStrategy):
def handle_data(self, data: pd.DataFrame) -> Union[pd.DataFrame, pd.Series]:
try:
# Check if 'Attrition' is present in the data
if 'Attrition' in data.columns:
X = data.drop(['Attrition'], axis=1)
Y = data['Attrition']
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)
return X_train, X_test, Y_train, Y_test
else:
raise ValueError("'Attrition' column not found in data.")
except Exception as e:
logging.error(f"Error in data handling: {str(e)}")
raise e
Since, its’s a Classification problem, we are using Logistic Regression here, we can also use Random forest Classifer, Gradient boosting etc., classification algorithms.
from zenml import pipeline
@training_pipeline
def training_pipeline(data_path: str):
df = ingest_data(data_path)
X_train, X_test, y_train, y_test = clean_and_split_data(df)
model = define_model() # Define your machine learning model
trained_model = train_model(model, X_train, y_train)
evaluation_metrics = evaluate_model(trained_model, X_test, y_test)
Here, @training_pipeline decorator is used to defind the function training_pipeline() as a pipeline in ZenML.
For binary classification problems, we use evaluation metrics such as: accuracy, precision, F1 score, ROC-AUC curve etc., We import classification_report from scikit-learn library, to calculate the evaluation metrics and to give us the classification report.
Code:
import logging
import numpy as np
from sklearn.metrics import classification_report
class ClassificationReport:
@staticmethod
def calculate_scores(y_true: np.ndarray, y_pred: np.ndarray):
try:
logging.info("Calculate Classification Report")
report = classification_report(y_true, y_pred, output_dict=True)
logging.info(f"Classification Report:\n{report}")
return report
except Exception as e:
logging.error(f"Error in calculating Classification Report: {e}")
raise e
Classification Report:
To see the dashboard of the training_pipeline, we need to run the run_pipelilne.py,
run_pipelilne.py,
from zenml import pipeline
from pipelines.training_pipeline import train_pipeline
from zenml.client import Client
import pandas as pd
if __name__ == "__main__":
uri = Client().active_stack.experiment_tracker.get_tracking_uri()
print(uri)
train_pipeline(data_path="./data/HR-Employee-Attrition.csv")
which will return the tracking dashboard URL, looks like this,
“Dashboard URL: http://127.0.0.1:8237/workspaces/default/pipelines/6e7941f4-cf74-4e30-b3e3-ff4428b823d2/runs/2274fc18-aba1-4536-aaee-9d2ed7d26323/dag“
You can click the URL and view your amazing training pipeline in zenml dashboard. Here, the whole pipeline image is split into different image parts to see,it more clearly in detail.
Overall the training_pipeline looks like this in the dashboard, given below:
class DeploymentTriggerConfig(BaseParameters):
min_accuracy: float = 0.5
In this class DeploymentTriggerConfig, we set a minimum accuracy parameter, which specifies what our minimum model accuracy should be.
@step(enable_cache=False)
def deployment_trigger(
accuracy: float,
config: DeploymentTriggerConfig,
):
return accuracy > config.min_accuracy
Here, this deployment_trigger() function is used to deploy the model, only when it exceeds the minimum accuracy. We will cover about why we have used caching here in the next section.
@pipeline(enable_cache=False, settings={"docker":docker_settings})
def continuous_deployment_pipeline(
data_path: str,
#data_path="C:/Users/user/Desktop/machine learning/Project/zenml Pipeline/Customer_Satisfaction_project/data/olist_customers_dataset.csv",
min_accuracy:float=0.0,
workers: int=1,
timeout: int=DEFAULT_SERVICE_START_STOP_TIMEOUT,
):
df=ingest_data()
# Clean the data and split into training/test sets
X_train,X_test,Y_train,Y_test=clean_df(df)
model=train_model(X_train,X_test,Y_train,Y_test)
evaluation_metrics=evaluate_model(model,X_test,Y_test)
deployment_decision=deployment_trigger(evaluation_metrics)
mlflow_model_deployer_step(
model=model,
deploy_decision=deployment_decision,
workers=workers,
timeout=timeout,
)
Here, in this continuous_deployment_pipeline(), we will ingest the data, clean the data, train our model, evaluate it, and deploy our model only if it passes the deployment_trigger() condition, so that we can make sure the new model we are going to deploy, will execute only if it’s prediction accuracy exceeds the previous model’s prediction accuracy,which is the threshold value. This is how the continous_deployment_pipeline() works.
Caching refers to storing the output of the previous executed steps in the pipeline. The outputs are stored in the Artifact store. We use caching in the pipeline parameter, to mention that, there is no change in the outputs in the previous runs and current running step, so zenML will reuse the previous run output itself. Enabling caching will speed up the pipeline running process and saves our computational resources. But sometimes, in situations where, we need to run pipelines, where there will be dynamic change in the input, parameters, output like our continuous_deployment_pipeline(), then turning off the caching is well and good. So, we have written enable_cache=False here.
We use inference pipeline to make predictions on the new data, based on the deployed model. Let’s see how we used this pipeline in our project.
inference_pipeline()
@pipeline(enable_cache=False,settings={"docker":docker_settings})
def inference_pipeline(pipeline_name: str, pipeline_step_name:str):
data=dynamic_importer()
#print("Data Shape for Inference:", data.shape) # Print the shape of data for inference
service=prediction_service_loader(
pipeline_name=pipeline_name,
pipeline_step_name=pipeline_step_name,
running=False,
)
prediction=predictor(service=service,data=data)
return prediction
Here, the inference_pipeline(), works in the following order:
Let us see about each of these functions below:
@step(enable_cache=False)
def dynamic_importer()->str:
data=get_data_for_test()
return data
Here, it calls the get_data_for_test() in the utils.py, which will loads the new data, do data processing and returns the data.
@step(enable_cache=False)
def prediction_service_loader(
pipeline_name: str,
pipeline_step_name: str,
running:bool=True,
model_name: str="model",
)->MLFlowDeploymentService:
mlflow_model_deployer_component=MLFlowModelDeployer.get_active_model_deployer()
existing_services=mlflow_model_deployer_component.find_model_server(
pipeline_name=pipeline_name,
pipeline_step_name=pipeline_step_name,
model_name=model_name,
running=running,
)
if not existing_services:
raise RuntimeError(
f"No MLFlow deployment service found for pipeline {pipeline_name},step {pipeline_step_name} and model{model_name} and pipeline for the model {model_name} is currently running"
)
Here, in this prediction_service_loader (), we load the deployment service with respect to the deployed model based on the parameters. A deployment service is a runtime environment, where our deployed model, is ready to accept inference requests to make predictions on the new data. The line existing_services=mlflow_model_deployer_component.find_model_server(), searches for any existing deployment service available based on the given parameters like pipeline name and pipeline step name, if there is no existing services available, then it means the deployment pipeline is not executed yet, or there is an issue with the deployment pipeline, so it thows an Runtime Error.
@step
def predictor(
service: MLFlowDeploymentService,
data: str,
) -> np.ndarray:
"""Run an inference request against a prediction service"""
service.start(timeout=21) # should be a NOP if already started
data = json.loads(data)
data.pop("columns")
data.pop("index")
columns_for_df = [
0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,"Age","DailyRate","DistanceFromHome","Education","EnvironmentSatisfaction","HourlyRate","JobInvolvement","JobLevel","JobSatisfaction","MonthlyIncome","MonthlyRate","NumCompaniesWorked","Over18","OverTime","PercentSalaryHike","PerformanceRating","RelationshipSatisfaction","StockOptionLevel","TotalWorkingYears","TrainingTimesLastYear","WorkLifeBalance","YearsAtCompany","YearsInCurrentRole","YearsSinceLastPromotion","YearsWithCurrManager",
]
df = pd.DataFrame(data["data"], columns=columns_for_df)
json_list = json.loads(json.dumps(list(df.T.to_dict().values())))
data = np.array(json_list)
prediction = service.predict(data)
return prediction
After, having the deployed model and the new data, we can use the predictor(), to make the predictions.
To visually, see the continuous deployment and inference pipeline, we need to run the run_deployment.py, where the configurations, to deploy and predict will be defined.
@click.option(
"--config",
type=click.Choice([DEPLOY, PREDICT, DEPLOY_AND_PREDICT]),
default=DEPLOY_AND_PREDICT,
help="Optionally you can choose to only run the deployment "
"pipeline to train and deploy a model (`deploy`), or to "
"only run a prediction against the deployed model "
"(`predict`). By default both will be run "
"(`deploy_and_predict`).",
)
Here, we can either run the continuous deployment pipeline or the inference pipeline, by following these commands,
#The continuous deployment pipeline
python run_deployment.py
#To see the inference Pipeline(that is to deploy and predict)
python run_deployment.py --config predict
After executing, the commands, you can see the zenML dashboard URL,like this
Dashboard URL: http://127.0.0.1:8237/workspaces/default/pipelines/b437cf1a-971c-4a23-a3b6-c296c1cdf8ca/runs/58826e07-6139-453d-88f9-b3c771bb6695/dag
Enjoy your pipeline visualisations in the dashboard:
The continuous deployment pipeline,(from ingestion of data to mlflow_model_deployer_step looks like),
Streamlit is an amazing open-source, python based framework, used to create UI’s, we can use streamlit to build web apps quickly, without knowing backend or frontend development. First, we need to install streamlit in our PC.
The commands to install and run streamlit server in our local system are,
#install streamlit in our local PC
pip install streamlit
#to run the streamlit local web server
streamlit run streamlit_app.py
Code:
import json
import numpy as np
import pandas as pd
import streamlit as st
from PIL import Image
from pipelines.deployment_pipeline import prediction_service_loader
from run_deployment import main
# Define a global variable to keep track of the service status
service_started = False
def start_service():
global service_started
service = prediction_service_loader(
pipeline_name="continuous_deployment_pipeline",
pipeline_step_name="mlflow_model_deployer_step",
running=False,
)
service.start(timeout=21) # Start the service
service_started = True
return service
def stop_service(service):
global service_started
service.stop() # Stop the service
service_started = False
def main():
st.title("Employee Attrition Prediction")
age = st.sidebar.slider("Age", 18, 65, 30)
monthly_income = st.sidebar.slider("Monthly Income", 0, 20000, 5000)
total_working_years = st.sidebar.slider("Total Working Years", 0, 40, 10)
years_in_current_role = st.sidebar.slider("Years in Current Role", 0, 20, 5)
years_since_last_promotion = st.sidebar.slider("Years Since Last Promotion", 0, 15, 2)
if st.button("Predict"):
global service_started
if not service_started:
service = start_service()
input_data = {
"Age": [age],
"MonthlyIncome": [monthly_income],
"TotalWorkingYears": [total_working_years],
"YearsInCurrentRole": [years_in_current_role],
"YearsSinceLastPromotion": [years_since_last_promotion],
}
df = pd.DataFrame(input_data)
json_list = json.loads(json.dumps(list(df.T.to_dict().values())))
data = np.array(json_list)
pred = service.predict(data)
st.success(
"Predicted Employee Attrition Probability (0 - 1): {:.2f}".format(
pred[0]
)
)
# Stop the service after prediction
if service_started:
stop_service(service)
if __name__ == "__main__":
main()
Here, we have created a streamlit web app, named “Employee Attrition Prediction“, in which users can provide the inputs such as Age, monthly income etc., to make the prediction, when the user clicks the “Predict” button, the input data is sent to the deployed model, the prediction is made and displayed for the user. This, is how our streamlit_app works. When, we run the streamlit_app.py file, we will get the network URL like this,
By clicking the network URL, we can see the amazing Streamlit UI, used to make predictions.
You can view all your stacks, components used, number of pipelines ran in the ZenML Dashboard making your MLOps journey easy.
ZenML Dashboard:
Stacks:
Components:
Number of Pipelines:
Number of runs:
We have successfully built an End-to-End Employee Attrition Rate prediciton MLOps project. We have ingested the data, cleaned it, trained the model, evaluate the model, trigger the deployment, deploy the model, predict the model by getting the new data, search for existing model services, if present, then predict the data, get the user inputs from the Streamlit web app and make predictions, while will help the HR department to take data driven decisions.
GitHub Code: https://github.com/VishalKumar-S/Employee-attrition-rate-MLOps-Project
A. Yes, ZenML is a free open-source MLOps tool, but to use the ZenML cloud, to use the zenml cloud servers with additional support from their team, it costs additionally.
A. Unlike Streamlit, to use FastAPI/ Flask / Shiny, it requires strong knowledge in HTML/CSS to create interactive UI’s. Whereas, in Streamlit, we do not need front-end knowledge to use it.
A. While ZenML provides a framework to manage and orchestrate ML pipelines, by integrating with mlflow we can track our ML experiments, it’s artefacts, parameters, and log metrics. So, we can get more info about the execution of steps.
A. The company should make retention strategies to prevent skilled employees who are at high risk of leaving, by making salary adjustments, creating engaging programs for them, training programs for their career and personal growth, and ensuring a good work environment which improves both employee’s career growth and company’s growth.
The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.