Complete Guide to Effortless ML Monitoring with Evidently.ai

Vishal Kumar. S Last Updated : 19 Mar, 2024

Introduction

Whether you are a fresher or an experienced professional in the data industry, you should know that ML models can experience up to a 20% performance drop in their first year. Monitoring these models is crucial, yet it is complicated by data changes, concept drift, and data quality issues. ML monitoring helps detect dips in model performance, data quality issues, and drift early, as new data streams in. This prevents failures in the ML pipeline and alerts the team so that it can resolve the issue. Evidently.ai, a powerful open-source tool, simplifies ML monitoring by providing pre-built reports and test suites to track data quality, data drift, and model performance. In this beginner's guide to ML monitoring with Evidently.ai, you will learn effective methods to monitor ML models in production, including the monitoring setup, the metrics, and how to integrate Evidently.ai into ML lifecycles and workflows.

Learning Objectives

  • Apply statistical tests to detect data quality issues like missing values, outliers, and data drift.
  • Track model performance over time by monitoring metrics like accuracy, precision, and recall using Evidently’s predefined reports and test suites.
  • Create a monitoring dashboard with plots like target drift, accuracy trend, and data quality checks using Evidently’s UI and visualization library.
  • Integrate Evidently at different stages of the ML pipeline – data preprocessing, model evaluation, and production monitoring – to track metrics.
  • Log model evaluation and drift metrics to tools like MLflow and Prefect for a complete view of model health.
  • Build custom test suites tailored to your specific data and use case by modifying the test parameters.

This article was published as a part of the Data Science Blogathon.

Understanding ML Monitoring and Observability in AI Systems

ML Monitoring and Observability are essential components of maintaining the health and performance of AI systems. Let’s delve into their significance and how they contribute to the overall effectiveness of AI models.

ML Monitoring

We need ML monitoring in the following situations:

  • To track the behavior of candidate models, that is, models that generate output but are not yet deployed to production.
  • To compare two or more candidate models (A/B tests).
  • To track the performance of the production model.

ML monitoring is not only about the model; it is about the overall health of the software system.

It’s a combination of different layers:

  • Service layer: here we check memory usage and overall latency.
  • Data and model health layer: here we check for data drift, data leakage, schema changes, and so on. We should also monitor the KPIs (Key Performance Indicators) of the business, such as customer satisfaction, financial performance, employee productivity, and sales growth.

Note: The metric we choose to monitor the ML model may not remain the best one all the time, so continuous re-assessment is needed.

ML Observability

ML observability is a superset of ML monitoring. Monitoring covers only detecting issues and computing metrics, whereas observability covers understanding the overall system behavior and, specifically, finding the actual root cause of the issues.

Together, monitoring and observability help us find an issue and its root cause, analyze it, retrain the model if needed, and document the quality metrics so that various team members can understand and resolve the issue.

Key Considerations for ML Monitoring

  • Create an ML monitoring setup that fits the specific use case.
  • Choose a model retraining strategy that fits the use case.
  • Choose a reference dataset to compare against the batch dataset.
  • Create custom, user-defined metrics for monitoring.

Let us look at each of these below.

The ML monitoring setup depends on the scale and complexity of the deployment procedures we follow, the stability of the environment, the feedback schedules, and the seriousness of the business impact if the model goes down.

We can choose automated model retraining in the deployment. However, the decision to set up an automated retraining schedule depends on many factors, such as cost, the rules and regulations of the company, and the use case.

Reference Dataset in ML Monitoring

Suppose we have several models in production, each using different features with a variety of structures (both structured and unstructured). In that case, it is difficult to detect data drift and compute other metrics directly. Instead, we can create a reference dataset that contains all the expected trends as well as some varied values, and compare the properties of each new batch of data with it to find out whether there are any significant differences.

The reference dataset serves as a baseline for detecting distribution drift. We can use one reference dataset or several, for example one for evaluating the model and another for evaluating data drift, depending on the use case. We can also re-create the reference dataset daily, weekly, or monthly using automated functions, which is known as a moving window strategy. It is therefore important to choose the right reference dataset.
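As a rough illustration of the moving window strategy, the sketch below picks the reference data as the last 30 days before the current batch arrives. The row schema (a list of dicts with a 'date' key), the function name, and the 30-day window are assumptions made only for this example.

```python
from datetime import date, timedelta

def moving_window_reference(rows, today, window_days=30):
    """Return the rows whose 'date' falls in the `window_days` before `today`.

    `rows` is a list of dicts with a 'date' key (hypothetical schema).
    """
    start = today - timedelta(days=window_days)
    return [row for row in rows if start <= row["date"] < today]

# Toy data: one observation per day for 60 days (illustrative only)
first_day = date(2024, 1, 1)
rows = [{"date": first_day + timedelta(days=i), "value": i} for i in range(60)]

today = date(2024, 3, 1)  # the day the current batch arrives
reference = moving_window_reference(rows, today, window_days=30)
print(len(reference), reference[0]["date"], reference[-1]["date"])
```

Re-running this selection on a schedule keeps the reference close to recent behavior, at the cost of slowly absorbing gradual drift into the baseline.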

Custom Metrics in ML Monitoring

Instead of relying only on standard statistical evaluation metrics such as accuracy, precision, recall, and F1 score, we can create custom metrics that bring more value to our specific use case. The business KPIs are a good guide for choosing these user-defined metrics.
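To make this concrete, here is a minimal sketch of one possible custom metric: the average business cost per prediction, where a missed positive costs more than a false alarm. The cost values and the function name are hypothetical and would come from the business in practice.

```python
def expected_cost(y_true, y_pred, fp_cost=1.0, fn_cost=5.0):
    """Average business cost per prediction for a binary classifier.

    fp_cost and fn_cost are illustrative assumptions, not recommended values.
    """
    total = 0.0
    for actual, predicted in zip(y_true, y_pred):
        if predicted == 1 and actual == 0:
            total += fp_cost  # false positive
        elif predicted == 0 and actual == 1:
            total += fn_cost  # false negative
    return total / len(y_true)

y_true = [1, 0, 1, 1, 0, 0, 1, 0]
y_pred = [1, 0, 0, 1, 1, 0, 1, 0]
print(expected_cost(y_true, y_pred))
```

Two models with the same accuracy can differ a lot on such a metric, which is why it can be more useful to monitor than accuracy alone.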

ML Monitoring Architecture

ML Monitoring needs to collect data and performance metrics at different stages. This involves:

Backend Monitoring

  • Data pipelines: Automated scripts that analyze the model predictions, data quality, and drift, and the results are stored in a database.
  • Batch monitoring: Scheduled jobs that run model evaluations and log metrics to a database.
  • Real-time monitoring: Metrics are sent from live ML models to a monitoring service for tracking.

Frontend Monitoring

  • Alerts: Get notifications when metric values are below thresholds without even the need for a dashboard.
  • Reports: Static reports for one-time sharing.
  • Dashboards: Live dashboards to interactively visualize model and data metrics over time.
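
As a rough illustration of threshold-based alerts, the sketch below compares the latest metric values with minimum acceptable values and collects alert messages. The metric names, threshold values, and function name are assumptions for this example; in a real setup the messages would be sent to email, chat, or an incident tool.

```python
def check_alerts(metrics, thresholds):
    """Return alert messages for metrics that are missing or below their minimum."""
    alerts = []
    for name, minimum in thresholds.items():
        value = metrics.get(name)
        if value is None:
            alerts.append(f"{name}: metric missing")
        elif value < minimum:
            alerts.append(f"{name}: {value:.2f} is below {minimum:.2f}")
    return alerts

# Hypothetical values from the latest monitoring run
latest_metrics = {"accuracy": 0.71, "recall": 0.42}
minimums = {"accuracy": 0.75, "recall": 0.30, "precision": 0.50}

alerts = check_alerts(latest_metrics, minimums)
for message in alerts:
    print("ALERT:", message)
```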

ML Monitoring metrics: Model Quality, Data Quality, Data Drift

Evaluation of ML Model Quality

To evaluate model quality, we should use not only standard metrics such as precision and recall but also custom metrics, which requires deep knowledge of the business. Standard ML monitoring is not always enough, because the feedback (ground truth) is often delayed. We then have to rely on past performance, which does not guarantee future results, especially in a volatile environment where the target variable changes frequently. In addition, different segments may need different metrics, so aggregate metrics alone are not always enough. To tackle this, we should do early monitoring.
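The point about segments can be shown with a small sketch: the overall accuracy looks acceptable, while one segment performs much worse. The record layout (dicts with 'segment', 'target', and 'prediction' keys) and the segment names are assumptions for this example.

```python
from collections import defaultdict

def accuracy_by_segment(records):
    """Compute accuracy separately for each value of the 'segment' key."""
    correct = defaultdict(int)
    total = defaultdict(int)
    for record in records:
        total[record["segment"]] += 1
        if record["target"] == record["prediction"]:
            correct[record["segment"]] += 1
    return {segment: correct[segment] / total[segment] for segment in total}

# Toy data with two hypothetical customer segments
records = [
    {"segment": "new", "target": 1, "prediction": 0},
    {"segment": "new", "target": 0, "prediction": 1},
    {"segment": "new", "target": 1, "prediction": 1},
    {"segment": "new", "target": 0, "prediction": 0},
    {"segment": "returning", "target": 1, "prediction": 1},
    {"segment": "returning", "target": 0, "prediction": 0},
    {"segment": "returning", "target": 1, "prediction": 1},
    {"segment": "returning", "target": 0, "prediction": 0},
]

overall = sum(r["target"] == r["prediction"] for r in records) / len(records)
per_segment = accuracy_by_segment(records)
print(overall, per_segment)
```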

The command below installs Evidently:

pip install evidently

Next, we import all the necessary libraries.

#import necessary libraries
import numpy as np
import pandas as pd
from sklearn import ensemble
from sklearn import datasets
from evidently.report import Report
from evidently.metric_preset import ClassificationPreset, RegressionPreset
from evidently.metrics import *

We will create two datasets: a reference dataset and a current dataset. The reference is the training dataset, and the current is the batch dataset. We will then compare the two with Evidently to evaluate the metrics.

Note: To display the metrics, Evidently by default expects the following columns in the datasets: a column named ‘target’ for the target variable and a column named ‘prediction’ for the value predicted by the model.

First, we will look at a regression example. Here, we create a simulated ‘prediction’ column in both datasets by adding some noise to the target values.

# Import the necessary libraries and modules
from sklearn import datasets
import pandas as pd
import numpy as np

# Load the diabetes dataset from sklearn
data = datasets.load_diabetes()

# Create a DataFrame from the dataset's features
diabetes = pd.DataFrame(data.data, columns=data.feature_names)

# Add the actual target values to the DataFrame
diabetes['target'] = data.target

# Add a 'prediction' column to simulate model predictions
diabetes['prediction'] = diabetes['target'].values + np.random.normal(0, 3, diabetes.shape[0])

# Create reference and current datasets for comparison
# These datasets are random samples of the main dataset and are used for model evaluation
diabetes_ref = diabetes.sample(n=50, replace=False)
diabetes_cur = diabetes.sample(n=50, replace=False)

Now we can generate the Evidently report:

# Create a Report instance for regression with a set of predefined metrics
regression_performance_report = Report(metrics=[
    RegressionPreset(),
    # Preset is used for predefined set of regression metrics
])

# Run the report on the reference and current datasets
regression_performance_report.run(reference_data=diabetes_ref.sort_index(), current_data=diabetes_cur.sort_index())

# Display the report in 'inline' mode
regression_performance_report.show(mode='inline')

Classification Metrics:

Next, we will see a classification example, first with the predefined preset and then with specific metrics only.

from sklearn.ensemble import RandomForestClassifier

# Load the Iris dataset
data = datasets.load_iris()
iris = pd.DataFrame(data.data, columns=data.feature_names)
iris['target'] = data.target

# Create a binary classification problem
positive_class = 1
iris['target'] = (iris['target'] == positive_class).astype(int)

# Split the dataset into reference and current data
iris_ref = iris.sample(n=50, replace=False)
iris_curr = iris.sample(n=50, replace=False)

# Create a RandomForestClassifier
model = RandomForestClassifier()
model.fit(iris_ref[data.feature_names], iris_ref['target'])

# Generate predictions for reference and current data
iris_ref['prediction'] = model.predict_proba(iris_ref[data.feature_names])[:, 1]
iris_curr['prediction'] = model.predict_proba(iris_curr[data.feature_names])[:, 1]

# Classification preset containing various metrics and visualizations
class_report = Report(metrics=[ClassificationPreset(probas_threshold=0.5)])
class_report.run(reference_data=iris_ref, current_data=iris_curr)
class_report.show(mode='inline')

We will now build a report from individually selected metrics instead of the preset.

# Classification report built from individually selected metrics

classification_report = Report(metrics=[
    ClassificationQualityMetric(),
    ClassificationClassBalance(),
    ClassificationConfusionMatrix(),
    ClassificationClassSeparationPlot(),
    ClassificationProbDistribution(),
    ClassificationRocCurve(),
    ClassificationPRCurve(),
    ClassificationPRTable(),
])

# Run and display the report that was just defined
classification_report.run(reference_data=iris_ref, current_data=iris_curr)
classification_report.show(mode='inline')

Similarly, we can see the visualizations of other metrics in the report as well.

We can save the data and model metrics in four ways:

  • As a .json file: to save and view the metrics in a structured format.
  • As JPEG images: to save each metric as an image for sharing.
  • As a Python dictionary: to use the metrics in other functions in the code.
  • As an .html file: to share the metrics with other team members.

The code snippets below save the metrics:

# Save the classification report to an HTML file
classification_report.save_html("classification_report.html")

# Export the classification report as a JSON string
classification_report_json = classification_report.json()

# Export the classification report as a dictionary
classification_report_dict = classification_report.as_dict()

Evaluation of Data Quality

When we receive data from numerous sources, we are likely to face data quality issues. Common causes of data quality issues in production include:

  • Choosing the wrong source for fetching the data.
  • Using third-party sources for new features or data integration, which can change the data schema.
  • A broken upstream model.

Data Quality Metrics Analysis

First, we should start with data profiling, where we analyze descriptive statistics of our data such as the mean and median.

There are two ways of implementing it:

  • Without reference data
    • Even without a reference dataset, we can check the quality of a new batch of data by setting manual thresholds and sending alerts when the batch has more duplicate columns or rows, missing values, or correlated features than the thresholds allow.
  • With reference data
    • With reference data, it is even easier to compare and send alerts when there is a significant difference in statistical distributions, metrics, schema, features, and so on between the reference and current datasets.

Note: Evidently derives its default test conditions from the reference dataset, so we should always choose it carefully.
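
For the case without reference data, the sketch below shows what manual thresholds can look like in plain Python, independent of Evidently. The batch layout (a list of dicts), the helper names, and the threshold values are assumptions chosen only for this example.

```python
def share_of_missing(rows, column):
    """Fraction of rows where `column` is None or absent."""
    missing = sum(1 for row in rows if row.get(column) is None)
    return missing / len(rows)

def count_duplicate_rows(rows):
    """Number of rows that exactly repeat an earlier row."""
    seen = set()
    duplicates = 0
    for row in rows:
        key = tuple(sorted(row.items()))
        if key in seen:
            duplicates += 1
        seen.add(key)
    return duplicates

# Toy batch with one missing ArrDelay value and one duplicated row
batch = [
    {"ArrDelay": 12, "Distance": 300},
    {"ArrDelay": None, "Distance": 450},
    {"ArrDelay": 12, "Distance": 300},
    {"ArrDelay": 7, "Distance": None},
]

MAX_MISSING_SHARE = 0.10   # hypothetical threshold
MAX_DUPLICATE_ROWS = 0     # hypothetical threshold

problems = []
if share_of_missing(batch, "ArrDelay") > MAX_MISSING_SHARE:
    problems.append("ArrDelay: too many missing values")
if count_duplicate_rows(batch) > MAX_DUPLICATE_ROWS:
    problems.append("batch contains duplicate rows")
print(problems)
```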

The examples below use the DelayedFlights.csv dataset.

pip install evidently

Import necessary libraries.

import pandas as pd
import numpy as np

from sklearn import datasets
from sklearn import ensemble

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataQualityPreset
from evidently.metrics import *
from evidently.test_suite import TestSuite
from evidently.test_preset import DataQualityTestPreset, DataStabilityTestPreset
from evidently.tests import *

# Load the delayed flights dataset from a CSV file
df = pd.read_csv("/content/drive/MyDrive/DelayedFlights.csv")

# Choose the ranges for the reference and current datasets:
# months before June form the reference, months from June onward form the current data
month_range = df['Month'] >= 6
ref_data = df[~month_range]
curr_data = df[month_range]

We will first run a test suite for data quality:

# Create and run a test suite with the data quality preset
test_suite = TestSuite(tests=[DataQualityTestPreset()])
test_suite.run(reference_data=ref_data, current_data=curr_data)
test_suite.show(mode='inline')

We can also run custom tests instead of the default ones, for example:

#column-level tests
data_quality_column_tests = TestSuite(tests=[
    TestColumnValueMean(column_name='ArrDelay'),
])

data_quality_column_tests.run(reference_data=ref_data, current_data=curr_data)
data_quality_column_tests.show(mode='inline')

Data Quality Report

We can generate the data quality report as below:

# Create the data quality report with the data quality preset
data_quality_report = Report(metrics=[
    DataQualityPreset(),
])

data_quality_report.run(reference_data=ref_data, current_data=curr_data)
data_quality_report.show(mode='inline')

To show only specific metrics in the report, we can use:

#dataset-level metrics
data_quality_dataset_report = Report(metrics=[
    DatasetSummaryMetric(),
    DatasetMissingValuesMetric(),
    DatasetCorrelationsMetric(),

])

data_quality_dataset_report.run(reference_data=ref_data, current_data=curr_data)
data_quality_dataset_report.show(mode='inline')

Evaluation of Data Drift

Data drift refers to a change in the distribution of the input features over time. Closely related are target drift and prediction drift, which refer to changes in the distribution of the target variable and of the model's prediction outputs. Tracking these changes can provide valuable insights into the quality and performance of the model. Additionally, monitoring data distribution drift allows for early detection of potential issues, enabling proactive measures to maintain model accuracy and effectiveness.

There are two possible cases to consider with data drift:

  • Our model is trained on a lot of weak features. In this case, even if some features drift, the performance of the model will not be affected to a great extent. Here, we can do a multivariate analysis of the data drift to reach a decision.
  • Our model is trained on only a few important features. In this case, it is important to consider data drift. Here, we can do a univariate analysis of the drift, or we can combine a few features and track the share (%) of drifting features, or track drift only for the top features, depending on the use case.

Note: We must be careful when setting alerts for data drift, considering the above factors.

Tip: Data quality checks should always come first, before a data drift check, because they detect many of the issues present in our data.
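
As a minimal sketch of the two decision strategies mentioned above, the function below flags dataset drift either when the share of drifted features reaches a limit or when any of the top features has drifted. The per-feature drift flags are assumed to come from an earlier drift test; the function names and the 0.5 limit are assumptions for illustration.

```python
def share_of_drifted(drift_flags):
    """Share of features whose drift flag is True."""
    return sum(drift_flags.values()) / len(drift_flags)

def dataset_drift_detected(drift_flags, max_share=0.5, top_features=()):
    """Flag drift if any top feature drifted or the drifted share reaches max_share."""
    if any(drift_flags.get(name, False) for name in top_features):
        return True
    return share_of_drifted(drift_flags) >= max_share

# Hypothetical per-feature results from a univariate drift check
flags = {"DepDelay": True, "Distance": False, "TaxiOut": False, "AirTime": False}

print(share_of_drifted(flags))
print(dataset_drift_detected(flags))
print(dataset_drift_detected(flags, top_features=("DepDelay",)))
```

With many weak features, the share-based rule avoids alerts on a single drifting column; with a few important features, listing them in top_features makes the check stricter.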

Important Considerations in Data Drift

  • Always give preference to prediction drift over feature drift.
  • Data drift is useful as an early signal of whether the model will degrade when feedback is delayed in the production environment.

Data Drift Detection Methods

We can detect data drift using the following methods.

Statistical Tests

Statistical tests are either parametric or non-parametric.

Parametric tests are used when we know the underlying distribution and its parameters, which is only possible for very interpretable features and for datasets with very few features.

For large and non-sensitive datasets, it is advisable to use non-parametric tests.

For example, if we have only the current batch dataset and want to detect data drift, it makes more sense to use non-parametric tests than parametric ones.

We typically use statistical tests for smaller datasets (fewer than 1,000 observations), because these tests become overly sensitive on large datasets.

The drift score is calculated from the p-value.

Examples: the K-S test (for numerical features), the chi-squared test (for categorical features), and the proportion difference test for independent samples based on the Z-score (for binary categorical features).
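
To show how a p-value becomes a drift decision, here is a minimal sketch of the proportion difference test for a binary feature, written with the standard library only. The counts and the 0.05 significance level are assumptions for this example, and this is not Evidently's internal implementation.

```python
import math

def proportion_z_test(successes_ref, n_ref, successes_cur, n_cur):
    """Two-sided z-test for the difference between two independent proportions.

    Assumes the pooled proportion is strictly between 0 and 1.
    """
    p_ref = successes_ref / n_ref
    p_cur = successes_cur / n_cur
    pooled = (successes_ref + successes_cur) / (n_ref + n_cur)
    std_err = math.sqrt(pooled * (1 - pooled) * (1 / n_ref + 1 / n_cur))
    z = (p_cur - p_ref) / std_err
    # Standard normal CDF expressed through the error function
    cdf = 0.5 * (1 + math.erf(abs(z) / math.sqrt(2)))
    p_value = 2 * (1 - cdf)
    return z, p_value

# Hypothetical counts: share of a binary flag in reference vs. current data
z, p_value = proportion_z_test(200, 1000, 260, 1000)
drift_detected = p_value < 0.05  # illustrative significance level
print(round(z, 3), p_value, drift_detected)
```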

Distance Based Tests

These tests are used when the dataset is very large.

They suit non-sensitive datasets and are easier to interpret than statistical tests, since non-technical people can understand drift expressed as a distance better than as a p-value.

The drift score is calculated with distance, divergence, or similar measures.

Examples: Wasserstein distance (for numerical features), Population Stability Index, and Jensen-Shannon divergence (for categorical features).
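
As an illustration of a distance-based score, the sketch below computes the Population Stability Index for a numerical feature using equal-width bins taken from the reference range. The number of bins, the small eps used for empty bins, and the toy data are assumptions; libraries may bin the data differently.

```python
import math

def population_stability_index(reference, current, bins=10, eps=1e-6):
    """PSI between two numeric samples, with equal-width bins over the reference range."""
    low, high = min(reference), max(reference)
    width = (high - low) / bins
    if width == 0:
        width = 1.0  # all reference values are identical

    def bin_shares(values):
        counts = [0] * bins
        for value in values:
            index = int((value - low) / width)
            index = min(max(index, 0), bins - 1)  # clamp out-of-range values
            counts[index] += 1
        # eps avoids log(0) and division by zero for empty bins
        return [max(count / len(values), eps) for count in counts]

    ref_shares = bin_shares(reference)
    cur_shares = bin_shares(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_shares, cur_shares))

reference = list(range(100))             # toy reference sample
shifted = [v + 30 for v in reference]    # toy current sample with a clear shift

psi_same = population_stability_index(reference, reference)
psi_shifted = population_stability_index(reference, shifted)
print(psi_same, psi_shifted)
```

A score of zero means the binned distributions are identical, and the score grows as the current batch moves away from the reference.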

Rule Based Tests

Rule-based checks are custom, user-defined checks, for example to detect whether new categorical values have appeared in the dataset.
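
A rule of this kind can be very small. The sketch below lists the categories that appear in the current batch but not in the reference data; the carrier codes are toy values and the function name is an assumption.

```python
def new_categories(reference_values, current_values):
    """Return the categories seen in the current batch but not in the reference."""
    return sorted(set(current_values) - set(reference_values))

# Toy categorical column, for illustration only
reference_carriers = ["AA", "DL", "UA", "AA", "WN"]
current_carriers = ["AA", "DL", "B6", "UA", "NK"]

unseen = new_categories(reference_carriers, current_carriers)
if unseen:
    print("New categories found:", unseen)
```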

For large datasets, we can use sampling (picking representative observations) or bucketing/aggregation over all observations.

For continuous data or non-batch models, we can create time-interval windows (e.g., day, week, or month intervals) to form separate reference and current datasets.

Custom Metrics

We can also add custom metrics for our specific needs. We do not need a reference dataset if the tests we choose do not depend on it and the threshold values are decided by us instead of being derived from the reference dataset.

# Note: processed_reference, processed_prod_simulation, and batch_size are assumed to be
# defined already (preprocessed data containing 'target' and 'prediction' columns)
custom_performance_suite = TestSuite(tests=[
    #TestColumnsType(),
    #TestShareOfDriftedColumns(lt=0.5),
    TestShareOfMissingValues(eq=0),
    TestPrecisionScore(gt=0.5),
    TestRecallScore(gt=0.3),
    TestAccuracyScore(gte=0.75),
])

custom_performance_suite.run(reference_data=processed_reference, current_data=processed_prod_simulation[:batch_size])
custom_performance_suite.show(mode='inline')

Things To Consider When Data Drift is Detected

  • It is not always necessary to retrain the model when data drift is found.
  • If data drift is detected, the first step is to investigate data quality and external factors that may influence it, such as seasonal spikes or natural calamities.
  • If there are no external factors, check the data processing steps and consult domain experts to identify the potential reason behind the drift.
  • Even if we want to retrain the model, the new data may not be sufficient, and the drift may have arisen from data corruption. So we should always be cautious about choosing retraining as the response.
  • If data drift is found without prediction drift, we need not worry about the data drift.
  • If data drift is detected along with prediction drift and the model's results remain good, the model is robust enough to handle the drift. However, if the results get worse, it is advisable to consider retraining the model.
  • It is good practice to check whether past data drift alerts were correct or false positives, if we have access to historical data.

The report below shows the share of drifted columns in the dataset:

data_drift_share_report = Report(metrics=[
    DatasetDriftMetric()
])

# Run the report on the reference and current datasets
data_drift_share_report.run(reference_data=diabetes_ref.sort_index(), current_data=diabetes_cur.sort_index())

# Display the report in 'inline' mode
data_drift_share_report.show(mode='inline')

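To generate a data drift report for specific features, you can use the code snippet below:

data_drift_column_report = Report(metrics=[
    ColumnDriftMetric(column_name='ArrDelay'),
    ColumnDriftMetric(column_name='ArrDelay', stattest='psi')
])

# Run the report on the flight data prepared earlier and display it
data_drift_column_report.run(reference_data=ref_data, current_data=curr_data)
data_drift_column_report.show(mode='inline')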

Tips and Suggestions

  • Do not include the class or target variable in the dataset when generating a data drift report.
  • Use customized test suites based on your specific use case; use the preset test suites only in the initial phases.
  • Use the data stability and data quality test suites to evaluate the raw batch dataset.
  • To automate data and model checks across all stages of the ML pipeline, store the test results in a dictionary and move on to the next stage only when the values pass the threshold condition.

For example, the conditions below can be used to continue to the next steps in the pipeline only when all the tests have passed:

data_drift_suite.as_dict()['summary']['all_passed'] == True
data_drift_suite.as_dict()['summary']['by_status']['SUCCESS'] > 40
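
The gating idea can be wrapped in a small helper. The sketch below works on a plain dictionary shaped like the summary entry used above; the function name, the example dictionaries, and the minimum of 40 successful tests are assumptions for illustration.

```python
def can_continue(summary, min_success=0):
    """Decide whether the pipeline may proceed, given a test-suite summary dict."""
    all_passed = summary.get("all_passed", False)
    successes = summary.get("by_status", {}).get("SUCCESS", 0)
    return bool(all_passed) and successes > min_success

# Hypothetical summaries with the same keys as the snippet above
passing = {"all_passed": True, "by_status": {"SUCCESS": 45}}
failing = {"all_passed": False, "by_status": {"SUCCESS": 41, "FAIL": 4}}

for name, summary in [("passing", passing), ("failing", failing)]:
    if can_continue(summary, min_success=40):
        print(name, "-> continue to the next stage")
    else:
        print(name, "-> stop the pipeline and investigate")
```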

Finally, if we do not have the target variable, we can use the NoTargetPerformanceTestPreset test preset in Evidently.ai:

from evidently.test_preset import NoTargetPerformanceTestPreset

no_target_performance_suite = TestSuite(tests=[NoTargetPerformanceTestPreset()])

# For demo purposes, we can split the data into batches of the same size and run the
# test suite on different batches to see whether the model performance is declining
no_target_performance_suite.run(reference_data=processed_data_reference, current_data=processed_data_prod_simulation[2*batch_size:3*batch_size])
no_target_performance_suite.show(mode='inline')

Integrate Evidently in a Prefect Pipeline

Let us perform data drift and data quality checks in a Prefect pipeline.

Step 1: Import Necessary Packages

import pandas as pd
from datetime import datetime, timedelta
from sklearn import datasets
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
from scipy import stats
import numpy as np
from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset, DataQualityTestPreset, DataStabilityTestPreset

Step 2: Load Data

@task(name="Load Data", retries =3, retry_delay_seconds=5)
def load_data():
    df=pd.read_csv("DelayedFlights.csv")
    ref_data=df[:500000]
    curr_data=df[500000:700000]
    return df,ref_data, curr_data

Step 3: Data Preprocessing

@task(name= "Data Preprocessing", retries = 3, retry_delay_seconds = 5)
def data_processing(df):
    numerical_columns = [
        'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime','CRSArrTime',
        'FlightNum', 'CRSElapsedTime', 'AirTime', 'DepDelay',
        'Distance', 'TaxiIn', 'TaxiOut', 'CarrierDelay', 'WeatherDelay', 'NASDelay',
        'SecurityDelay', 'LateAircraftDelay']
    df=df.drop(['Unnamed: 0','Year','CancellationCode','TailNum','Diverted','Cancelled','ArrTime','ActualElapsedTime'],axis=1)
    delay_colns=['CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']

    # Impute missing values with the 0 for these columns
    df[delay_colns]=df[delay_colns].fillna(0)

    # Impute missing values with the median for these columns
    columns_to_impute = ['AirTime', 'ArrDelay', 'TaxiIn','CRSElapsedTime']
    df[columns_to_impute]=df[columns_to_impute].fillna(df[columns_to_impute].median())


    # One-hot encode the categorical columns
    df=pd.get_dummies(df,columns=['UniqueCarrier', 'Origin', 'Dest'], drop_first=True)

    # Keep only rows whose numerical values lie within 3 standard deviations of the mean
    z_threshold=3
    z_scores=np.abs(stats.zscore(df[numerical_columns]))
    df_no_outliers=df[(z_scores<=z_threshold).all(axis=1)]
    return df_no_outliers

Step 4: Data Drift Test Report

@task(name="Data Drift Test Report", retries=3, retry_delay_seconds=5)
def data_drift(df):
    data_drift_suite = TestSuite(tests=[DataDriftTestPreset()])
    reference=df[:500000]
    current=df[500000:700000]
    data_drift_suite.run(reference_data=reference, current_data=current)
    # Save the HTML report only when at least one drift test fails
    if not data_drift_suite.as_dict()['summary']['all_passed']:
        data_drift_suite.save_html("Reports/data_drift_suite.html")

Step 5: Define The Flow

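# Assumed minimal data quality task (mirrors data_drift), since the flow below calls it
@task(name="Data Quality Test Report", retries=3, retry_delay_seconds=5)
def data_quality(ref_data, curr_data):
    data_quality_suite = TestSuite(tests=[DataQualityTestPreset()])
    data_quality_suite.run(reference_data=ref_data, current_data=curr_data)
    if not data_quality_suite.as_dict()['summary']['all_passed']:
        data_quality_suite.save_html("Reports/data_quality_suite.html")

@flow(task_runner=SequentialTaskRunner())
def monitoring_flow():
    df, ref_data, curr_data = load_data()
    data_quality(ref_data, curr_data)
    processed_df = data_processing(df)
    data_drift(processed_df)

Step 6: Execute The Flow

monitoring_flow()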

Integrate Evidently with MLflow

We can log data drift test results to MLflow as shown below:

Step 1: Install All the Necessary Packages

requirements.txt:

jupyter>=1.0.0
mlflow
evidently>=0.4.7
pandas>=1.3.5
numpy>=1.19.5
scikit-learn>=0.24.0
requests
pyarrow
psycopg
psycopg_binary

Execute the commands below:

pip install -r requirements.txt

mlflow ui --backend-store-uri sqlite:///mlflow.db

Then import the necessary libraries:

import mlflow
import pandas as pd
from datetime import datetime, timedelta
from sklearn import datasets
from scipy import stats
import numpy as np
from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset, DataQualityTestPreset, DataStabilityTestPreset

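Step 2: Define Functions to Load and Preprocess the Data

# Load the data from a CSV file and split it into reference and current data
def load_data():
    df=pd.read_csv("DelayedFlights.csv")
    ref_data=df[:500000]
    curr_data=df[500000:700000]
    return df,ref_data, curr_data

# Preprocess the data: drop unused columns, impute missing values, encode categories, remove outliers
def data_processing(df):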
    numerical_columns = [
        'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime','CRSArrTime',
        'FlightNum', 'CRSElapsedTime', 'AirTime', 'DepDelay',
        'Distance', 'TaxiIn', 'TaxiOut', 'CarrierDelay', 'WeatherDelay', 'NASDelay',
        'SecurityDelay', 'LateAircraftDelay']
    df=df.drop(['Unnamed: 0','Year','CancellationCode','TailNum','Diverted','Cancelled','ArrTime','ActualElapsedTime'],axis=1)
    delay_colns=['CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']

    # Impute missing values with the 0 for these columns
    df[delay_colns]=df[delay_colns].fillna(0)

    # Impute missing values with the median for these columns
    columns_to_impute = ['AirTime', 'ArrDelay', 'TaxiIn','CRSElapsedTime']
    df[columns_to_impute]=df[columns_to_impute].fillna(df[columns_to_impute].median())


    # One-hot encode the categorical columns
    df=pd.get_dummies(df,columns=['UniqueCarrier', 'Origin'], drop_first=True)

    # Keep only rows whose numerical values lie within 3 standard deviations of the mean
    z_threshold=3
    z_scores=np.abs(stats.zscore(df[numerical_columns]))
    df_no_outliers=df[(z_scores<=z_threshold).all(axis=1)]
    return df_no_outliers

Step 3: Set MLflow Tracking URI and Experiment

# Set MLflow tracking URI and experiment
mlflow.set_tracking_uri("sqlite:///mlflow.db")
mlflow.set_experiment("Drift Test Suite")

Step 4: Define Batch Size for Data Processing

batch_size=200000

Step 5: Iterate through batches

import os

# Load and preprocess the data once, then split it into reference and current data
df, ref_data, curr_data = load_data()
processed_df = data_processing(df)
reference = processed_df[:500000]
current = processed_df[500000:]

# Make sure the folder for the HTML reports exists
os.makedirs("Reports", exist_ok=True)

for batch_id in range(3):
    with mlflow.start_run() as run:
        data_drift_suite = TestSuite(tests=[DataDriftTestPreset()])
        batch = current[batch_id*batch_size:(batch_id+1)*batch_size]
        data_drift_suite.run(reference_data=reference, current_data=batch)

        # Always save the report so that it can be logged as an artifact
        data_drift_suite.save_html("Reports/data_drift_suite.html")

        summary = data_drift_suite.as_dict()['summary']
        mlflow.log_param("Successful tests", summary['success_tests'])
        mlflow.log_param("Failure tests", summary['failed_tests'])

        mlflow.log_artifact("Reports/data_drift_suite.html")
        print(run.info)

ML Monitoring Dashboard

Dashboards allow us to visualize and monitor metrics over time. Let’s examine which panels and metrics we can add to a batch monitoring dashboard. We can add many elements, such as the data profile, target drift, data quality over time, an accuracy plot, prediction drift, and data quality checks. These help us analyze dataset issues, track changes in model performance over time, and follow the features that are important for the model, so that we can detect issues early and take the necessary measures.

Deployment of a Live ML Monitoring Dashboard

Here, we will see how to build a monitoring dashboard using Evidently, including panels, test suites, and reports to visualize data and model metrics over time. We will also see how to integrate Evidently with Grafana and create batch monitoring dashboards, and online monitoring service dashboards.

Batch Monitoring Dashboard:

The code below creates a batch monitoring dashboard.

Step 1: Import All Necessary Libraries

# Importing the standard libraries used in the steps below
import datetime
import pandas as pd

# Importing necessary modules from Evidently
from evidently.report import Report
from evidently.metrics import ColumnDriftMetric, DatasetDriftMetric
from evidently.test_suite import TestSuite
from evidently.test_preset import DataQualityTestPreset
from evidently.ui.dashboards import CounterAgg, DashboardPanelCounter, DashboardPanelPlot, PanelValue, PlotType, ReportFilter, DashboardPanelTestSuite, TestFilter, TestSuitePanelType
from evidently.renderers.html_widgets import WidgetSize
from evidently.metric_preset import DataQualityPreset, TargetDriftPreset
from evidently.ui.workspace import Workspace, WorkspaceBase

Step 2: Load the Dataset

# Loading the dataset
df=pd.read_csv("DelayedFlights.csv")

Step 3: Define Reference Data and Production Simulation Data

# Defining reference data and production simulation data
reference_data = df[5:7]
prod_simulation_data = df[7:]
batch_size = 2

Step 4: Define Workspace and Project Details

# Defining workspace and project details
WORKSPACE = "Guide"
YOUR_PROJECT_NAME = "Analytics Vidhya Guide"
YOUR_PROJECT_DESCRIPTION = "Learn how to create Evidently Dashboards"

Step 5: Create Data Quality Test Suite

# Function to create data quality test suite
def create_data_quality_test_suite(i: int):
    suite = TestSuite(
        tests=[
            DataQualityTestPreset(),
        ],
        timestamp=datetime.datetime.now() + datetime.timedelta(days=i),
        tags = []
    )

    suite.run(reference_data=reference_data, current_data=prod_simulation_data[i * batch_size : (i + 1) * batch_size])
    return suite

Step 6: Create a Data Quality Report

# Function to create data quality report
def create_data_quality_report(i: int):
    report = Report(
        metrics=[
            DataQualityPreset(), ColumnDriftMetric(column_name="ArrDelay"),
        ],
        timestamp=datetime.datetime.now() + datetime.timedelta(days=i),
    )

    report.run(reference_data=reference_data, current_data=prod_simulation_data[i * batch_size : (i + 1) * batch_size])
    return report 

Step 7: Create a Project

# Function to create project
def create_project(workspace: WorkspaceBase):
    project = workspace.create_project(YOUR_PROJECT_NAME)
    project.description = YOUR_PROJECT_DESCRIPTION

    # Adding panels to the dashboard
    project.dashboard.add_panel(
        DashboardPanelCounter(
            filter=ReportFilter(metadata_values={}, tag_values=[]),
            agg=CounterAgg.NONE,
            title="Delayed Flights Dataset",
        )
    )

    project.dashboard.add_panel(
        DashboardPanelPlot(
            title="Target Drift",
            filter=ReportFilter(metadata_values={}, tag_values=[]),
            values=[
                PanelValue(
                    metric_id="ColumnDriftMetric",
                    metric_args={"column_name.name": "ArrDelay"},
                    field_path=ColumnDriftMetric.fields.drift_score,
                    legend="target: ArrDelay",
                ),
            ],
            plot_type=PlotType.LINE,
            size=WidgetSize.HALF
        )
    )

    # Adding test suites to the dashboard
    project.dashboard.add_panel(
        DashboardPanelTestSuite(
            title="All tests: aggregated",
            filter=ReportFilter(metadata_values={}, tag_values=[], include_test_suites=True),
            size=WidgetSize.HALF,
            time_agg="1M",
        )
    )

    project.dashboard.add_panel(
        DashboardPanelTestSuite(
            title="All tests: detailed",
            filter=ReportFilter(metadata_values={}, tag_values=[], include_test_suites=True),
            size=WidgetSize.HALF,
            panel_type=TestSuitePanelType.DETAILED,
            time_agg="1D",
        )
    )

    # Saving the project
    project.save()
    return project

Step 8: Create a Workspace and Add Reports to the Workspace

# Function to create demo project
def create_demo_project(workspace: str):
    ws = Workspace.create(workspace)
    project = create_project(ws)

    # Adding reports to the workspace
    for i in range(0, 2):
        report = create_data_quality_report(i=i)
        ws.add_report(project.id, report)
        suite = create_data_quality_test_suite(i=i)
        ws.add_report(project.id, suite)     

Step 9: Call the Main Function

# Main function
if __name__ == "__main__":
    create_demo_project(WORKSPACE)

Output:

Evidently.ai

Online Monitoring Dashboard from ML as a Service:

Here, we simulate an ML service that sends data to the Evidently Collector. The Collector gathers the incoming data, runs the configured test suite on it, and saves the results so they can be visualized on the Dashboard. The check is configured to trigger every 5 seconds. Let us see the code below:

Step 1: Import all Necessary Libraries

import datetime
import os.path
import time
import pandas as pd

from requests.exceptions import RequestException
from sklearn import datasets

# Importing modules from evidently package
from evidently.collector.client import CollectorClient
from evidently.collector.config import CollectorConfig, IntervalTrigger, ReportConfig

from evidently.test_suite import TestSuite
from evidently.test_preset import DataQualityTestPreset

from evidently.ui.dashboards import DashboardPanelTestSuite
from evidently.ui.dashboards import ReportFilter
from evidently.ui.dashboards import TestFilter
from evidently.ui.dashboards import TestSuitePanelType
from evidently.renderers.html_widgets import WidgetSize
from evidently.ui.workspace import Workspace

Step 2: Set up Constants

# Setting up constants
COLLECTOR_ID = "default"
COLLECTOR_TEST_ID = "default_test"

PROJECT_NAME = "Online monitoring as a service"
WORKSACE_PATH = "Analytics Vidhya Evidently Guide"

Step 3: Create a Client

# Creating a client
client = CollectorClient("http://localhost:8001")

Step 4: Load the Data 

# Loading data
df = pd.read_csv("DelayedFlights.csv")
ref_data = df[:5000]
batch_size = 200
curr_data = df[5000:7000]

Step 5: Create a Test Suite

# Function to create a test suite
def test_suite():
    suite= TestSuite(tests=[DataQualityTestPreset()],tags=[])
    suite.run(reference_data=ref_data, current_data=curr_data)
    return ReportConfig.from_test_suite(suite)  

Step 6: Setup Workspace

# Function to setup workspace
def workspace_setup():
    ws = Workspace.create(WORKSACE_PATH)
    project = ws.create_project(PROJECT_NAME)
    # Aggregated view of the data quality test results
    project.dashboard.add_panel(
        DashboardPanelTestSuite(
            title="Data Quality Tests",
            filter=ReportFilter(metadata_values={}, tag_values=[], include_test_suites=True),
            size=WidgetSize.HALF
        )
    )
    # Detailed, per-test view of the same results
    project.dashboard.add_panel(
        DashboardPanelTestSuite(
            title="Data Quality Tests: detailed",
            filter=ReportFilter(metadata_values={}, tag_values=[], include_test_suites=True),
            size=WidgetSize.HALF,
            panel_type=TestSuitePanelType.DETAILED
        )
    )
    project.save()

Step 7: Setup Configurations

# Function to setup config
def setup_config():
    ws = Workspace.create(WORKSACE_PATH)
    project = ws.search_project(PROJECT_NAME)[0]

    test_conf = CollectorConfig(trigger=IntervalTrigger(interval=5),
                                report_config=test_suite(), project_id=str(project.id))

    client.create_collector(COLLECTOR_TEST_ID, test_conf)
    client.set_reference(COLLECTOR_TEST_ID, ref_data)  
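
To make the role of the interval trigger more concrete, here is a small simulation of the idea: incoming rows are buffered, and a snapshot is produced only once the interval has elapsed and data is waiting. This is a sketch of the concept, not Evidently's implementation; the class IntervalBuffer, its methods, and the sample rows are all hypothetical.

```python
from typing import Callable, List


class IntervalBuffer:
    """Hypothetical stand-in for a collector: buffers rows, flushes on an interval."""

    def __init__(self, interval: float, on_flush: Callable[[List[dict]], None]):
        self.interval = interval
        self.on_flush = on_flush
        self.buffer: List[dict] = []
        self.last_flush = 0.0

    def send(self, rows: List[dict]) -> None:
        """Called by the ML service: only stores the rows."""
        self.buffer.extend(rows)

    def tick(self, now: float) -> bool:
        """Flush the buffer if the interval has elapsed and data is waiting."""
        if now - self.last_flush >= self.interval and self.buffer:
            self.on_flush(self.buffer)
            self.buffer = []
            self.last_flush = now
            return True
        return False


snapshots = []
collector = IntervalBuffer(interval=5, on_flush=lambda rows: snapshots.append(len(rows)))

collector.send([{"ArrDelay": 12}, {"ArrDelay": 40}])
collector.tick(now=2)   # too early, nothing is flushed
collector.send([{"ArrDelay": 7}])
collector.tick(now=5)   # interval elapsed, three rows become one snapshot
print(snapshots)
```

The clock is passed in explicitly so that the behaviour can be checked without waiting in real time.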

Step 8: Send Data

# Function to send data
def send_data():
    print("Start sending data")
    for i in range(2):
        try:
            data = curr_data[i * batch_size : (i + 1) * batch_size]
            client.send_data(COLLECTOR_TEST_ID, data)
            print("sent")
        except RequestException as e:
            print(f"collector service is not available: {e.__class__.__name__}")
        time.sleep(1)     
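
The loop above reports a failed request and moves on to the next batch. If losing a batch is not acceptable, one option is to retry with an increasing delay. The sketch below illustrates that idea with a hypothetical send callable standing in for client.send_data; it catches the built-in ConnectionError only to stay self-contained, whereas the real code would catch RequestException as above.

```python
import time
from typing import Callable


def send_with_retry(send: Callable[[], None], attempts: int = 3,
                    base_delay: float = 1.0,
                    sleep: Callable[[float], None] = time.sleep) -> bool:
    """Call send(); on ConnectionError wait and retry, doubling the delay each time."""
    for attempt in range(attempts):
        try:
            send()
            return True
        except ConnectionError as exc:
            print(f"attempt {attempt + 1} failed: {exc.__class__.__name__}")
            if attempt < attempts - 1:
                sleep(base_delay * 2 ** attempt)
    return False


# Fake sender that fails twice, then succeeds (stands in for client.send_data).
calls = {"count": 0}


def flaky_send() -> None:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("collector not reachable")


delays = []
ok = send_with_retry(flaky_send, sleep=delays.append)
print(ok, delays)
```

Passing sleep as a parameter keeps the example fast to run: here the delays are recorded in a list instead of actually waiting.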

Step 9: Define the Main Function

# Main function
def main():
    workspace_setup()
    setup_config()
    send_data()

Step 10: Run the Main Function:


# Running the main function
if __name__ =='__main__':
    main()

Output:

Evidently.ai

Integrate Evidently with Grafana Dashboard

We can integrate Evidently with a Grafana dashboard by using a PostgreSQL database to store the metric results, which Grafana then reads.

Our Docker Compose file defines the services we need: PostgreSQL, Adminer, and Grafana.

version: '3.7'

volumes:
    grafana_data: {}

networks:
  front-tier:
  back-tier:

services:
  db:
    image: postgres
    restart: always
    environment:
      POSTGRES_PASSWORD: example
    ports:
      - "5432:5432"
    networks:
      - back-tier

  adminer:
    image: adminer
    restart: always
    ports:
      - "8080:8080"
    networks:
      - back-tier
      - front-tier

  grafana:
    image: grafana/grafana:8.5.21
    user: "472"
    ports:
      - "3000:3000"
    volumes:
      - ./config/grafana_datasources.yaml:/etc/grafana/provisioning/datasources/datasource.yaml:ro
      - ./config/grafana_dashboards.yaml:/etc/grafana/provisioning/dashboards/dashboards.yaml:ro
      - ./dashboards:/opt/grafana/dashboards
    networks:
      - back-tier
      - front-tier
    restart: always  

Step 1: Import Necessary Libraries

import datetime
import time
import logging
import psycopg
import pandas as pd
from evidently.metric_preset import DataQualityPreset
from sklearn import datasets
from evidently.test_preset import DataQualityTestPreset
from evidently.report import Report
from evidently.metrics import ColumnDriftMetric, DatasetDriftMetric

Step 2: Configure Logging Settings

# Configure logging settings
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s]: %(message)s")

Step 3: Define SQL Statement to Create a Table for Storing Drift Metrics

# Define SQL statement to create table for storing drift metrics
create_table_statement = """
drop table if exists drift_metrics;
create table drift_metrics(
    timestamp timestamp,
    target_drift float,
    share_drifted_columns float
);
"""

Step 4: Read Dataset


# Read dataset
df=pd.read_csv("/home/vishal/mlflow_Evidently/DelayedFlights.csv")

Step 5: Define Reference and Production Simulation Data

# Define reference and production simulation data
reference_data = df[5000:5500]
prod_simulation_data = df[7000:]
mini_batch_size = 50

Step 6: Prepare Database for Storing Drift Metrics

# Function to prepare database for storing drift metrics
def prep_db():
    # Connect to PostgreSQL and create database if it doesn't exist
    with psycopg.connect("host=localhost port=5432 user=postgres password=example", autocommit=True) as conn:
        res = conn.execute("SELECT 1 FROM pg_database WHERE datname='test'")
        if len(res.fetchall()) == 0:
            conn.execute("create database test;")
        # Connect to the 'test' database and create table for drift metrics
        with psycopg.connect("host=localhost port=5432 dbname=test user=postgres password=example") as conn:
            conn.execute(create_table_statement)          

Step 7: Calculate Drift Metrics and Store them in PostgreSQL


# Function to calculate drift metrics and store them in PostgreSQL
def calulate_metrics_postgresql(curr, i):
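    # Initialize report with dataset-level drift (index 0) and target-column drift (index 1),
    # matching the positions read from the result below
    report = Report(metrics=[
        DatasetDriftMetric(),
        ColumnDriftMetric(column_name="ArrDelay"),
    ])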

    # Run the report on reference and current data
    report.run(reference_data=reference_data, current_data=prod_simulation_data[i*mini_batch_size : (i+1)*mini_batch_size])
    result = report.as_dict()

    # Extract drift metrics from the report results
    target_drift = result['metrics'][1]['result']['drift_score']
    share_drifted_columns = result['metrics'][0]['result']['share_of_drifted_columns']

    # Insert metrics into the 'drift_metrics' table
    curr.execute(
        "insert into drift_metrics(timestamp, target_drift, share_drifted_columns) values (%s, %s, %s)",
        (datetime.datetime.now(), target_drift, share_drifted_columns)
    )
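
The function above reads metric results by list position, which breaks silently if the order of metrics in the report changes. A more defensive option is to look results up by metric name. The sketch below runs on a hand-written dictionary with made-up numbers; it assumes that each entry of the as_dict() output carries a "metric" name next to its "result", and that the report contains a DatasetDriftMetric and a ColumnDriftMetric. The helper find_metric_result is hypothetical.

```python
def find_metric_result(report_dict: dict, metric_name: str) -> dict:
    """Return the 'result' dict of the first metric whose name matches."""
    for entry in report_dict["metrics"]:
        if entry["metric"] == metric_name:
            return entry["result"]
    raise KeyError(f"{metric_name} not found in report")


# Hand-written sample; the layout is an assumption and the numbers are made up.
sample = {
    "metrics": [
        {"metric": "DatasetDriftMetric", "result": {"share_of_drifted_columns": 0.25}},
        {"metric": "ColumnDriftMetric", "result": {"column_name": "ArrDelay", "drift_score": 0.03}},
    ]
}

share_drifted_columns = find_metric_result(sample, "DatasetDriftMetric")["share_of_drifted_columns"]
target_drift = find_metric_result(sample, "ColumnDriftMetric")["drift_score"]
print(target_drift, share_drifted_columns)
```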

Step 8: Perform Batch Monitoring and Backfill Drift Metrics into PostgreSQL

# Function to perform batch monitoring and backfill drift metrics into PostgreSQL
def batch_monitoring_backfill():
    # Prepare the database
    prep_db()
    # Connect to the 'test' database and iterate over mini-batches of data
    with psycopg.connect("host=localhost port=5432 dbname=test user=postgres password=example", autocommit=True) as conn:
        for i in range(50):
            with conn.cursor() as curr:
                # Calculate and store drift metrics for each mini-batch
                calulate_metrics_postgresql(curr, i)
            # Log progress and wait before processing the next mini-batch
            logging.info("data sent")
            time.sleep(3)
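
Once rows accumulate in drift_metrics, a dashboard panel only needs a time-ordered query over that table. The following sketch shows the insert-then-query round trip using Python's built-in sqlite3 as a stand-in for PostgreSQL, so it runs without a database server. The values are made up, and the query is only an example of what a panel might issue.

```python
import datetime
import sqlite3

# In-memory SQLite stands in for the PostgreSQL 'test' database.
conn = sqlite3.connect(":memory:")
conn.execute(
    "create table drift_metrics("
    "timestamp text, target_drift real, share_drifted_columns real)"
)

# Made-up (target_drift, share_drifted_columns) pairs, one per mini-batch.
samples = [(0.01, 0.0), (0.02, 0.1), (0.03, 0.1)]
start = datetime.datetime(2024, 3, 19, 12, 0, 0)
rows = [
    ((start + datetime.timedelta(seconds=3 * i)).isoformat(), drift, share)
    for i, (drift, share) in enumerate(samples)
]

# SQLite uses ? placeholders; psycopg uses %s as in the function above.
conn.executemany(
    "insert into drift_metrics(timestamp, target_drift, share_drifted_columns) values (?, ?, ?)",
    rows,
)

# The kind of time-ordered query a dashboard panel might issue.
latest = conn.execute(
    "select timestamp, target_drift from drift_metrics order by timestamp desc limit 1"
).fetchone()
count = conn.execute("select count(*) from drift_metrics").fetchone()[0]
print(count, latest)
conn.close()
```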

Step 9: Execute the Project

# Entry point of the script
if __name__ == '__main__':
    batch_monitoring_backfill()

To start the containers defined in the Docker Compose file, then run the monitoring script from a second terminal:

docker compose up --build
python grafana.py

Output:

Grafana
Evidently.ai

Key Takeaways

  • A well-chosen reference dataset is crucial for effective ML Monitoring.
  • For long-term use, build custom test suites tailored to your data instead of relying only on the default test suites.
  • Evidently can be used at any stage of the ML pipeline: data preprocessing, cleaning, model training, evaluation, and the production environment.
  • Logging is even more important than monitoring, because the logged data is what makes it possible to detect and diagnose issues.
  • Data drift does not necessarily mean the model has degraded, for example when the drifting features have little influence on the predictions.

Conclusion

In this guide, we learned how to create default and custom test suites, presets, and metrics for data quality, data drift, target drift, and model performance drift. We also learned how to integrate Evidently with tools like Airflow, MLflow, and Prefect, and how to create Evidently dashboards for effective monitoring. This guide should give you enough knowledge of ML Monitoring and observability in production environments to apply them in your upcoming projects.

Frequently Asked Questions

Q1. What is the need for ZenML here?

A. ZenML is an MLOps orchestration platform in which we can integrate all our MLOps stack components, which also helps us track experiments.

Q2. What is the need to integrate neptune.ai here?

A. Neptune.ai is a centralized experiment-tracking platform that helps us track all our data and model artifacts, code, reports, visualizations, and so on.

Q3. Which type of Evidently checks should we use, and when?

A. For effective ML Monitoring, it is advisable to run data quality tests on the raw datasets and to run the other tests and reports on the clean, processed dataset.

Q4. Is model retraining automated in our CI/CD pipeline?

A. No, model retraining is not automated, and it should be the last option considered. There is a high chance that the incoming batch is broken or too small to retrain the model on, so the decision to retrain is left to the data scientists and ML engineers, in collaboration with domain experts, after failure alerts have been received.

The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.

Hey, I'm Vishal Kumar. I'm interested in AI and Data Science, and I'm currently learning about them. I feel that writing blogs will improve my skills and help me share my knowledge. Let's build a strong data science community.
