Whether you’re a fresher or an experienced professional in the data industry, did you know that ML models can experience up to a 20% performance drop in their first year? Monitoring these models is crucial, yet it poses challenges such as data drift, concept drift, and data quality issues. ML Monitoring aids in early detection of model performance dips, data quality issues, and drift problems as new data streams in. This prevents failures in the ML pipeline and alerts the team to resolve the issue. Evidently.ai, a powerful open-source tool, simplifies ML Monitoring by providing pre-built reports and test suites to track data quality, data drift, and model performance. In this beginner’s guide to ML Monitoring with Evidently.ai, you’ll learn effective methods to monitor ML models in production, including monitoring setup, metrics, integrating Evidently.ai into ML lifecycles and workflows, and more.
This article was published as a part of the Data Science Blogathon.
ML Monitoring and Observability are essential components of maintaining the health and performance of AI systems. Let’s delve into their significance and how they contribute to the overall effectiveness of AI models.
We need ML Monitoring to do certain things:
It’s a combination of different layers:
Note: The metric chosen to monitor an ML model may not remain the best metric over time; continuous re-assessment is needed.
ML Observability is a superset of ML Monitoring. ML Monitoring covers only finding the issues and calculating the metrics, whereas observability covers understanding overall system behavior, specifically finding the actual root cause of the issues that occurred.
Both monitoring and observability help us find the issue, and its root cause, analyze it, retrain the model, and document the quality metrics, for various team members to understand and resolve the issues.
Let us look at these below:
The ML Monitoring setup depends on the complexity of the deployment procedure we follow, the stability of the environment, the feedback schedule, and the business impact if the model goes down.
We can choose automated model retraining as part of deployment, but the decision to set up an automated retraining schedule depends on many factors, such as cost, company rules and regulations, and the use case.
Suppose in production we have several models, and each model uses different features belonging to a variety of structures (both structured and unstructured). It then becomes difficult to compute data drift and other metrics directly. Instead, we can create a reference dataset that captures the expected trends, along with some varied values, and compare the properties of each new batch of data against it to check whether there are significant differences.
The reference dataset serves as a baseline for distribution drift detection. We can choose one or multiple reference datasets, for example one for evaluating the model and another for data drift evaluation, depending on the use case. We can also recreate the reference dataset on a schedule, daily, weekly, or monthly, using automated functions; this is known as the moving window strategy. Choosing the right reference dataset is therefore important.
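As a rough sketch of the moving window strategy, here is how one could split a pandas DataFrame into reference and current windows; the 'date' column name and the seven-day window are illustrative assumptions:
import pandas as pd

def rolling_reference(df: pd.DataFrame, current_end: pd.Timestamp, window_days: int = 7):
    # Use the window_days days before the current batch as the reference dataset
    ref_start = current_end - pd.Timedelta(days=2 * window_days)
    ref_end = current_end - pd.Timedelta(days=window_days)
    reference = df[(df['date'] >= ref_start) & (df['date'] < ref_end)]
    current = df[(df['date'] >= ref_end) & (df['date'] < current_end)]
    return reference, current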
Instead of relying only on standard statistical metrics such as accuracy, precision, recall, and F1 score, we can create custom metrics that bring more value to our specific use case. Business KPIs are a good starting point for defining such user-defined metrics.
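For illustration only, a user-defined metric can be computed directly from a predictions DataFrame; the 'revenue_if_converted' column and the 0.5 threshold below are hypothetical:
def expected_revenue_captured(df) -> float:
    # Share of potential revenue covered by the cases the model flags as positive
    flagged = df[df['prediction'] >= 0.5]
    return flagged['revenue_if_converted'].sum() / df['revenue_if_converted'].sum()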
ML Monitoring needs to collect data and performance metrics at different stages. This involves:
To evaluate model quality, we should not use only standard metrics like precision and recall; we should also use custom metrics, which requires deep knowledge of the business. Standard ML Monitoring is not always enough: feedback or ground truth is often delayed, so we use past performance to estimate quality, which does not guarantee future results, especially in a volatile environment where the target variable changes frequently. Also, different segments of categories need different metrics, and aggregate metrics alone are not always enough. To tackle this, we should do early monitoring.
Here, the below command is used to install evidently:
pip install evidently
Then, we will import all the necessary libraries.
#import necessary libraries
import numpy as np
import pandas as pd
from sklearn import ensemble
from sklearn import datasets
from evidently.report import Report
from evidently.metric_preset import ClassificationPreset, RegressionPreset
from evidently.metrics import *
We will create two datasets: a reference dataset and a current dataset. The reference dataset is the training data, and the current dataset is the new batch of data. We will then compare these two datasets with Evidently to evaluate the metrics.
Note: For Evidently to display the metrics, the datasets need a feature named ‘target’ for the target variable and a feature named ‘prediction’ for the value predicted by the model.
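If your columns are named differently, Evidently’s ColumnMapping lets you point the report at the right columns; a minimal sketch (the column names here are assumptions):
from evidently import ColumnMapping

column_mapping = ColumnMapping(
    target='actual_value',        # your target column
    prediction='model_output',    # your prediction column
)
# Pass it when running a report:
# report.run(reference_data=ref, current_data=cur, column_mapping=column_mapping)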
First, we will see a regression example. Here, we will create a simulated predicted value feature in both datasets, by adding some noise to the target feature values.
# Import the necessary libraries and modules
from sklearn import datasets
import pandas as pd
import numpy as np
# Load the diabetes dataset from sklearn
data = datasets.load_diabetes()
# Create a DataFrame from the dataset's features
diabetes = pd.DataFrame(data.data, columns=data.feature_names)
# Add the actual target values to the DataFrame
diabetes['target'] = data.target
# Add a 'prediction' column to simulate model predictions by adding noise to the target
diabetes['prediction'] = diabetes['target'].values + np.random.normal(0, 3, diabetes.shape[0])
# Inspect the resulting columns
diabetes.columns
# Create reference and current datasets for comparison
# These datasets are samples of the main dataset and are used for model evaluation
diabetes_ref = diabetes.sample(n=50, replace=False)
diabetes_cur = diabetes.sample(n=50, replace=False)
Now, let us generate the Evidently metrics:
# Create a Report instance for regression with a set of predefined metrics
regression_performance_report = Report(metrics=[
    RegressionPreset(),  # Preset with a predefined set of regression metrics
])
# Run the report on the reference and current datasets
regression_performance_report.run(reference_data=diabetes_ref.sort_index(), current_data=diabetes_cur.sort_index())
# Display the report in 'inline' mode
regression_performance_report.show(mode='inline')
Output:
Next, we will see a classification code example with predefined metrics, and with specific metrics alone.
from sklearn.ensemble import RandomForestClassifier
# Load the Iris dataset
data = datasets.load_iris()
iris = pd.DataFrame(data.data, columns=data.feature_names)
iris['target'] = data.target
# Create a binary classification problem
positive_class = 1
iris['target'] = (iris['target'] == positive_class).astype(int)
# Split the dataset into reference and current data
iris_ref = iris.sample(n=50, replace=False)
iris_curr = iris.sample(n=50, replace=False)
# Create a RandomForestClassifier
model = RandomForestClassifier()
model.fit(iris_ref[data.feature_names], iris_ref['target'])
# Generate predictions for reference and current data
iris_ref['prediction'] = model.predict_proba(iris_ref[data.feature_names])[:, 1]
iris_curr['prediction'] = model.predict_proba(iris_curr[data.feature_names])[:, 1]
#Classification preset containing various metrics and visualizations
class_report= Report(metrics=[ClassificationPreset(probas_threshold=0.5),])
class_report.run(reference_data=iris_ref,current_data=iris_curr)
class_report.show(mode='inline')
Output:
We will now build a report with custom metrics.
#Classification report containing various custom metrics and visualizations
classification_report = Report(metrics=[
    ClassificationQualityMetric(),
    ClassificationClassBalance(),
    ClassificationConfusionMatrix(),
    ClassificationClassSeparationPlot(),
    ClassificationProbDistribution(),
    ClassificationRocCurve(),
    ClassificationPRCurve(),
    ClassificationPRTable(),
])
classification_report.run(reference_data=iris_ref, current_data=iris_curr)
classification_report.show(mode='inline')
Output:
Similarly, we can see the visualizations of other metrics in the report as well.
We can view and export the data and model metrics in four ways: display them inline in a notebook, save them as an HTML file, export them as JSON, or export them as a Python dictionary.
Below are code snippets for saving and exporting the metrics:
# Save the classification report to an HTML file
classification_report.save_html("classification_report.html")
# Export the classification report as a JSON string
classification_report_json = classification_report.json()
# Export the classification report as a dictionary
classification_report_dict = classification_report.as_dict()
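The dictionary export is convenient inside pipelines; the exact nesting depends on the metrics included in the report, but each entry in the 'metrics' list matches a metric passed to the Report, for example:
result = classification_report.as_dict()
# The first entry corresponds to the first metric passed to the Report
first_metric_result = result['metrics'][0]['result']
print(first_metric_result)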
When we receive data from numerous sources, there is a high chance of facing data quality issues. Let us look at them more closely below.
First, we should start with data profiling, where we analyze descriptive statistics of our data such as the mean, median, and so on, as sketched below.
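A quick profiling sketch with pandas, assuming the dataset is already loaded as a DataFrame df:
# Basic data profile with pandas
print(df.describe())        # mean, std, quartiles for numerical columns
print(df.isnull().sum())    # missing values per column
print(df.dtypes)            # column types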
There are two different ways of implementing these checks in Evidently (test suites and reports); let us see both of them.
Note: We should always be careful in choosing the reference dataset, since Evidently’s default test conditions are derived from it.
Click here to access the datasets.
pip install evidently
Import necessary libraries.
import pandas as pd
import numpy as np
from sklearn import datasets
from sklearn import ensemble
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataQualityPreset
from evidently.metrics import *
from evidently.test_suite import TestSuite
from evidently.test_preset import DataQualityTestPreset, DataStabilityTestPreset
from evidently.tests import *
# Import the necessary libraries and modules
from sklearn import datasets
import pandas as pd
import numpy as np
# Load the flight delays dataset
df=pd.read_csv("/content/drive/MyDrive/DelayedFlights.csv")
#Choose the range for reference and current dataset.
month_range = df['Month']>=6
ref_data=df[~month_range]
curr_data=df[month_range]
First, we will execute a test suite for data quality.
#Create a test suite with the Data Quality preset.
test_suite = TestSuite(tests=[DataQualityTestPreset(),])
test_suite.run(reference_data=ref_data, current_data=curr_data)
test_suite.show(mode='inline')
We can also execute custom tests instead of the default ones, for example:
#column-level tests
data_quality_column_tests = TestSuite(tests=[
    TestColumnValueMean(column_name='ArrDelay'),
])
data_quality_column_tests.run(reference_data=ref_data, current_data=curr_data)
data_quality_column_tests.show(mode='inline')
Output:
We can generate the Data Quality report as below:
#Command to create the Data Quality report.
data_quality_report = Report(metrics=[
    DataQualityPreset(),
])
data_quality_report.run(reference_data=ref_data, current_data=curr_data)
data_quality_report.show(mode='inline')
Output:
To show only specific metrics in the report, we can use:
#dataset-level metrics
data_quality_dataset_report = Report(metrics=[
    DatasetSummaryMetric(),
    DatasetMissingValuesMetric(),
    DatasetCorrelationsMetric(),
])
data_quality_dataset_report.run(reference_data=ref_data, current_data=curr_data)
data_quality_dataset_report.show(mode='inline')
Output:
Data drift refers to a change in the distribution of the input data over time, while target (prediction) drift refers to a change in the distribution of the model’s outputs. Both provide valuable insight into the quality and performance of the model, and monitoring them allows early detection of potential issues, enabling proactive measures to maintain model accuracy and effectiveness.
There are two possible cases to consider with data drift:
Note: We must be careful in setting alerts for data drifts, considering the above factors.
Tip: Data quality checks always come first, before a data drift check, because many issues present in our data can already be detected there.
We can detect data drift using statistical tests, distance-based metrics, and rule-based checks.
Among statistical tests, there are parametric and non-parametric tests.
Parametric tests are used when we know the parameter values, which is only feasible for highly interpretable features and datasets with very few features.
For large and non-sensitive datasets, it is advisable to go with non-parametric tests.
For example, if we only have the current batch of data and want to detect data drift, non-parametric tests make more sense than parametric ones.
We typically use these statistical tests for smaller datasets (size < 1000), as they are more sensitive.
The drift score is calculated from the p-value.
Examples: K-S test (for numerical features), chi-squared test (for categorical features), proportion difference test for independent samples based on Z-score (for binary categorical features).
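As a standalone sketch of a p-value-based drift check outside Evidently, using scipy’s two-sample K-S test (the 0.05 threshold is a common but assumed choice):
from scipy import stats

def ks_drift_detected(reference_col, current_col, alpha: float = 0.05) -> bool:
    # Two-sample Kolmogorov-Smirnov test on a numerical feature
    statistic, p_value = stats.ks_2samp(reference_col, current_col)
    return p_value < alpha  # a low p-value suggests the distributions differ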
Distance-based tests are used when the dataset size is very large.
They are also used for non-sensitive datasets, and they are easier to interpret than statistical tests, since non-technical stakeholders can understand a distance value better than a p-value.
The drift score is calculated as a distance, divergence, or similar measure.
Examples: Wasserstein distance (for numerical features), Population Stability Index and Jensen-Shannon divergence (for categorical features), etc.
There are also rule-based checks, which are custom and user-defined, for example to detect what changes appear when new categorical values are added to the dataset.
For large datasets, we can use sampling (picking representative observations) or bucketing/aggregation over all observations.
For continuous data or non-batch models, we can create time-interval windows (e.g. day, week, or month) to form separate reference and current datasets, as sketched below.
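A minimal sketch of such time-interval windows, assuming a datetime column named 'timestamp' (hypothetical for this dataset):
import pandas as pd

def weekly_windows(df: pd.DataFrame, timestamp_col: str = 'timestamp'):
    # Group rows into calendar-week windows; each week can act as a "current" dataset
    return {week: group for week, group in df.groupby(pd.Grouper(key=timestamp_col, freq='W'))}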
We can also add custom metrics for our specific needs. We do not need a reference dataset if the chosen tests do not depend on one, i.e. when the test conditions are set by us instead of being derived from the reference dataset.
custom_performance_suite = TestSuite(tests=[
    #TestColumnsType(),
    #TestShareOfDriftedColumns(lt=0.5),
    TestShareOfMissingValues(eq=0),
    TestPrecisionScore(gt=0.5),
    TestRecallScore(gt=0.3),
    TestAccuracyScore(gte=0.75),
])
custom_performance_suite.run(reference_data=processed_reference, current_data=processed_prod_simulation[:batch_size])
custom_performance_suite.show(mode='inline')
data_drift_share_report = Report(metrics=[
    DatasetDriftMetric(),
])
# Run the report on the reference and current datasets
data_drift_share_report.run(reference_data=diabetes_ref.sort_index(), current_data=diabetes_cur.sort_index())
# Display the report in 'inline' mode
data_drift_share_report.show(mode='inline')
Output:
To generate a data drift report for specific features, you can follow the below code snippet:
data_drift_column_report = Report(metrics=[
    ColumnDriftMetric(column_name='ArrDelay'),
    ColumnDriftMetric(column_name='ArrDelay', stattest='psi'),
])
data_drift_column_report.run(reference_data=ref_data, current_data=curr_data)
data_drift_column_report.show(mode='inline')
To continue with further steps in the pipeline only when all the tests have passed, we can check the test suite summary:
data_drift_suite.as_dict()['summary']['all_passed'] == True
data_drift_suite.as_dict()['summary']['by_status']['SUCCESS'] > 40
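A minimal sketch of gating the next pipeline stage on these conditions; run_downstream_training is a placeholder for whatever comes next:
summary = data_drift_suite.as_dict()['summary']
if summary['all_passed']:
    run_downstream_training()  # placeholder for the next pipeline stage
else:
    raise RuntimeError("Data drift tests failed; halting the pipeline")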
5) If we do not have the target variable, we can use the NoTargetPerformanceTestPreset in Evidently.
no_target_performance_suite = TestSuite(tests=[NoTargetPerformanceTestPreset()])
#For demo purposes, we can split the dataset into batches of the same size and run the test suite on different batches, to find whether model performance declines across batches
no_target_performance_suite.run(reference_data=processed_data_reference, current_data=processed_data_prod_simulation[2*batch_size:3*batch_size])
no_target_performance_suite.show(mode='inline')
Let us perform Data drift and model quality checks in a Prefect pipeline
import pandas as pd
from datetime import datetime, timedelta
from sklearn import datasets
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
from scipy import stats
import numpy as np
from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset, DataQualityTestPreset, DataStabilityTestPreset
@task(name="Load Data", retries =3, retry_delay_seconds=5)
def load_data():
df=pd.read_csv("DelayedFlights.csv")
ref_data=df[1:500000]
curr_data=df[500000:700000]
return df,ref_data, curr_data
@task(name= "Data Preprocessing", retries = 3, retry_delay_seconds = 5)
def data_processing(df):
numerical_columns = [
'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime','CRSArrTime',
'FlightNum', 'CRSElapsedTime', 'AirTime', 'DepDelay',
'Distance', 'TaxiIn', 'TaxiOut', 'CarrierDelay', 'WeatherDelay', 'NASDelay',
'SecurityDelay', 'LateAircraftDelay']
df=df.drop(['Unnamed: 0','Year','CancellationCode','TailNum','Diverted','Cancelled','ArrTime','ActualElapsedTime'],axis=1)
delay_colns=['CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']
# Impute missing values with the 0 for these columns
df[delay_colns]=df[delay_colns].fillna(0)
# Impute missing values with the median for these columns
columns_to_impute = ['AirTime', 'ArrDelay', 'TaxiIn','CRSElapsedTime']
df[columns_to_impute]=df[columns_to_impute].fillna(df[columns_to_impute].median())
df=pd.get_dummies(df,columns=['UniqueCarrier', 'Origin', 'Dest'], drop_first=True)
z_threshold=3
z_scores=np.abs(stats.zscore(df[numerical_columns]))
outliers=np.where(z_scores>z_threshold)
df_no_outliers=df[(z_scores<=z_threshold).all(axis=1)]
return df_no_outliers
@task(name="Data Drift Test Report", retries=3, retry_delay_seconds=5)
def data_drift(df):
data_drift_suite = TestSuite(tests=[DataDriftTestPreset()])
reference=df[1:500000]
current=df[500000:700000]
data_drift_suite.run(reference_data=reference, current_data=current)
if not data_drift_suite.as_dict()['summary']['all_passed']:
data_drift_suite.save_html("Reports/data_drift_suite.html")
@flow(task_runner= SequentialTaskRunner)
def flow():
df, ref_data, curr_data =load_data()
data_quality(ref_data, curr_data)
processed_df=data_processing(df)
data_drift(processed_df)
flow()
We can log data drift test results to MLflow as mentioned below:
requirements.txt:
jupyter>=1.0.0
mlflow
evidently>=0.4.7
pandas>=1.3.5
numpy>=1.19.5
scikit-learn>=0.24.0
requests
pyarrow
psycopg
psycopg_binary
Execute the below commands:
pip install -r requirements.txt
mlflow ui --backend-store-uri sqlite:///mlflow.db
import mlflow
import pandas as pd
from datetime import datetime, timedelta
from sklearn import datasets
from scipy import stats
import numpy as np
from evidently.test_suite import TestSuite
from evidently.test_preset import DataDriftTestPreset, DataQualityTestPreset, DataStabilityTestPreset
# Reuse the data loading and preprocessing steps from the Prefect pipeline
def load_data():
    df = pd.read_csv("DelayedFlights.csv")
    ref_data = df[1:500000]
    curr_data = df[500000:700000]
    return df, ref_data, curr_data

def data_processing(df):
    numerical_columns = [
        'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'CRSArrTime',
        'FlightNum', 'CRSElapsedTime', 'AirTime', 'DepDelay',
        'Distance', 'TaxiIn', 'TaxiOut', 'CarrierDelay', 'WeatherDelay', 'NASDelay',
        'SecurityDelay', 'LateAircraftDelay']
    df = df.drop(['Unnamed: 0', 'Year', 'CancellationCode', 'TailNum', 'Diverted', 'Cancelled', 'ArrTime', 'ActualElapsedTime'], axis=1)
    delay_colns = ['CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']
    # Impute missing values with 0 for these columns
    df[delay_colns] = df[delay_colns].fillna(0)
    # Impute missing values with the median for these columns
    columns_to_impute = ['AirTime', 'ArrDelay', 'TaxiIn', 'CRSElapsedTime']
    df[columns_to_impute] = df[columns_to_impute].fillna(df[columns_to_impute].median())
    df = pd.get_dummies(df, columns=['UniqueCarrier', 'Origin'], drop_first=True)
    # Remove outliers based on z-scores of the numerical columns
    z_threshold = 3
    z_scores = np.abs(stats.zscore(df[numerical_columns]))
    df_no_outliers = df[(z_scores <= z_threshold).all(axis=1)]
    return df_no_outliers

# Set MLflow tracking URI and experiment
mlflow.set_tracking_uri("sqlite:///mlflow.db")
mlflow.set_experiment("Drift Test Suite")
batch_size = 200000

# Iterate through batches, logging one MLflow run per batch
for batch_id in range(3):
    with mlflow.start_run() as run:
        df, ref_data, curr_data = load_data()
        processed_df = data_processing(df)
        data_drift_suite = TestSuite(tests=[DataDriftTestPreset()])
        reference = df[1:500000]
        current = df[500000:]
        data_drift_suite.run(reference_data=reference,
                             current_data=current[(batch_id * batch_size):(batch_id + 1) * batch_size])
        if not data_drift_suite.as_dict()['summary']['all_passed']:
            data_drift_suite.save_html("Reports/data_drift_suite.html")
            mlflow.log_artifact("Reports/data_drift_suite.html")
        mlflow.log_param("Successful tests", data_drift_suite.as_dict()['summary']['success_tests'])
        mlflow.log_param("Failed tests", data_drift_suite.as_dict()['summary']['failed_tests'])
        print(run.info)
Output:
Dashboards allow us to visualize and monitor metrics over time. Let’s examine what panels and metrics we can add to a batch monitoring dashboard. We can add many elements: a data profile, target drift, data quality over time, an accuracy plot, prediction drift, data quality checks to analyze dataset issues, model performance change over time, and feature importances, to detect issues early and take the necessary measures.
Here, we will see how to build a monitoring dashboard using Evidently, including panels, test suites, and reports to visualize data and model metrics over time. We will also see how to integrate Evidently with Grafana and create batch monitoring dashboards, and online monitoring service dashboards.
Batch Monitoring Dashboard:
Below is the code, to create a batch monitoring dashboard.
# Importing necessary modules from Evidently
import datetime
import pandas as pd
from evidently.report import Report
from evidently.metrics import ColumnDriftMetric, DatasetDriftMetric
from evidently.test_suite import TestSuite
from evidently.test_preset import DataQualityTestPreset
from evidently.ui.dashboards import CounterAgg, DashboardPanelCounter, DashboardPanelPlot, PanelValue, PlotType, ReportFilter, DashboardPanelTestSuite, TestFilter, TestSuitePanelType
from evidently.renderers.html_widgets import WidgetSize
from evidently.metric_preset import DataQualityPreset, TargetDriftPreset
from evidently.ui.workspace import Workspace, WorkspaceBase

# Loading the dataset
df = pd.read_csv("DelayedFlights.csv")

# Defining reference data and production simulation data
reference_data = df[5:7]
prod_simulation_data = df[7:]
batch_size = 2

# Defining workspace and project details
WORKSPACE = "Guide"
YOUR_PROJECT_NAME = "Analytics Vidhya Guide"
YOUR_PROJECT_DESCRIPTION = "Learn how to create Evidently Dashboards"

# Function to create a data quality test suite for batch i
def create_data_quality_test_suite(i: int):
    suite = TestSuite(
        tests=[
            DataQualityTestPreset(),
        ],
        timestamp=datetime.datetime.now() + datetime.timedelta(days=i),
        tags=[]
    )
    suite.run(reference_data=reference_data, current_data=prod_simulation_data[i * batch_size : (i + 1) * batch_size])
    return suite

# Function to create a data quality report for batch i
def create_data_quality_report(i: int):
    report = Report(
        metrics=[
            DataQualityPreset(), ColumnDriftMetric(column_name="ArrDelay"),
        ],
        timestamp=datetime.datetime.now() + datetime.timedelta(days=i),
    )
    report.run(reference_data=reference_data, current_data=prod_simulation_data[i * batch_size : (i + 1) * batch_size])
    return report

# Function to create the project and its dashboard panels
def create_project(workspace: WorkspaceBase):
    project = workspace.create_project(YOUR_PROJECT_NAME)
    project.description = YOUR_PROJECT_DESCRIPTION
    # Adding a counter panel to the dashboard
    project.dashboard.add_panel(
        DashboardPanelCounter(
            filter=ReportFilter(metadata_values={}, tag_values=[]),
            agg=CounterAgg.NONE,
            title="Delayed Flights Dataset",
        )
    )
    # Adding a target drift plot
    project.dashboard.add_panel(
        DashboardPanelPlot(
            title="Target Drift",
            filter=ReportFilter(metadata_values={}, tag_values=[]),
            values=[
                PanelValue(
                    metric_id="ColumnDriftMetric",
                    metric_args={"column_name.name": "ArrDelay"},
                    field_path=ColumnDriftMetric.fields.drift_score,
                    legend="target: ArrDelay",
                ),
            ],
            plot_type=PlotType.LINE,
            size=WidgetSize.HALF,
        )
    )
    # Adding test suite panels to the dashboard
    project.dashboard.add_panel(
        DashboardPanelTestSuite(
            title="All tests: aggregated",
            filter=ReportFilter(metadata_values={}, tag_values=[], include_test_suites=True),
            size=WidgetSize.HALF,
            time_agg="1M",
        )
    )
    project.dashboard.add_panel(
        DashboardPanelTestSuite(
            title="All tests: detailed",
            filter=ReportFilter(metadata_values={}, tag_values=[], include_test_suites=True),
            size=WidgetSize.HALF,
            panel_type=TestSuitePanelType.DETAILED,
            time_agg="1D",
        )
    )
    # Saving the project
    project.save()
    return project

# Function to create the demo project and populate it with reports and test suites
def create_demo_project(workspace: str):
    ws = Workspace.create(workspace)
    project = create_project(ws)
    # Adding reports and test suites to the workspace
    for i in range(0, 2):
        report = create_data_quality_report(i=i)
        ws.add_report(project.id, report)
        suite = create_data_quality_test_suite(i=i)
        ws.add_report(project.id, suite)

# Main function
if __name__ == "__main__":
    create_demo_project(WORKSPACE)
Output:
Here, we simulate receiving metrics, reports, and test suite data from the ML service by sending data to the Collector. The Collector fetches the data, which is then utilized for visualization on the Dashboard. This process is configured to trigger every 5 seconds. Let us see the code below:
import datetime
import os.path
import time
import pandas as pd
from requests.exceptions import RequestException
from sklearn import datasets

# Importing modules from the evidently package
from evidently.collector.client import CollectorClient
from evidently.collector.config import CollectorConfig, IntervalTrigger, ReportConfig
from evidently.test_suite import TestSuite
from evidently.test_preset import DataQualityTestPreset
from evidently.ui.dashboards import DashboardPanelTestSuite
from evidently.ui.dashboards import ReportFilter
from evidently.ui.dashboards import TestFilter
from evidently.ui.dashboards import TestSuitePanelType
from evidently.renderers.html_widgets import WidgetSize
from evidently.ui.workspace import Workspace

# Setting up constants
COLLECTOR_ID = "default"
COLLECTOR_TEST_ID = "default_test"
PROJECT_NAME = "Online monitoring as a service"
WORKSPACE_PATH = "Analytics Vidhya Evidently Guide"

# Creating a client for the collector service
client = CollectorClient("http://localhost:8001")

# Loading data
df = pd.read_csv("DelayedFlights.csv")
ref_data = df[:5000]
batch_size = 200
curr_data = df[5000:7000]

# Function to create a test suite and wrap it as a report config
def test_suite():
    suite = TestSuite(tests=[DataQualityTestPreset()], tags=[])
    suite.run(reference_data=ref_data, current_data=curr_data)
    return ReportConfig.from_test_suite(suite)

# Function to set up the workspace and dashboard panels
def workspace_setup():
    ws = Workspace.create(WORKSPACE_PATH)
    project = ws.create_project(PROJECT_NAME)
    project.dashboard.add_panel(
        DashboardPanelTestSuite(
            title="Data Drift Tests",
            filter=ReportFilter(metadata_values={}, tag_values=[], include_test_suites=True),
            size=WidgetSize.HALF
        )
    )
    project.dashboard.add_panel(
        DashboardPanelTestSuite(
            title="Data Drift Tests",
            filter=ReportFilter(metadata_values={}, tag_values=[], include_test_suites=True),
            size=WidgetSize.HALF,
            panel_type=TestSuitePanelType.DETAILED
        )
    )
    project.save()

# Function to set up the collector config
def setup_config():
    ws = Workspace.create(WORKSPACE_PATH)
    project = ws.search_project(PROJECT_NAME)[0]
    test_conf = CollectorConfig(trigger=IntervalTrigger(interval=5),
                                report_config=test_suite(), project_id=str(project.id))
    client.create_collector(COLLECTOR_TEST_ID, test_conf)
    client.set_reference(COLLECTOR_TEST_ID, ref_data)

# Function to send batches of current data to the collector
def send_data():
    print("Start sending data")
    for i in range(2):
        try:
            data = curr_data[i * batch_size : (i + 1) * batch_size]
            client.send_data(COLLECTOR_TEST_ID, data)
            print("sent")
        except RequestException as e:
            print(f"collector service is not available: {e.__class__.__name__}")
        time.sleep(1)

# Main function
def main():
    workspace_setup()
    setup_config()
    send_data()

# Running the main function
if __name__ == '__main__':
    main()
Output:
We can integrate Evidently with a Grafana dashboard, using a PostgreSQL database to store the metric results.
Below is our Docker Compose file, which contains all the necessary services:
version: '3.7'

volumes:
  grafana_data: {}

networks:
  front-tier:
  back-tier:

services:
  db:
    image: postgres
    restart: always
    environment:
      POSTGRES_PASSWORD: example
    ports:
      - "5432:5432"
    networks:
      - back-tier

  adminer:
    image: adminer
    restart: always
    ports:
      - "8080:8080"
    networks:
      - back-tier
      - front-tier

  grafana:
    image: grafana/grafana:8.5.21
    user: "472"
    ports:
      - "3000:3000"
    volumes:
      - ./config/grafana_datasources.yaml:/etc/grafana/provisioning/datasources/datasource.yaml:ro
      - ./config/grafana_dashboards.yaml:/etc/grafana/provisioning/dashboards/dashboards.yaml:ro
      - ./dashboards:/opt/grafana/dashboards
    networks:
      - back-tier
      - front-tier
    restart: always
import datetime
import time
import logging
import psycopg
import pandas as pd
from evidently.metric_preset import DataQualityPreset
from sklearn import datasets
from evidently.test_preset import DataQualityTestPreset
from evidently.report import Report
from evidently.metrics import ColumnDriftMetric, DatasetDriftMetric

# Configure logging settings
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s]: %(message)s")

# Define SQL statement to create table for storing drift metrics
create_table_statement = """
drop table if exists drift_metrics;
create table drift_metrics(
    timestamp timestamp,
    target_drift float,
    share_drifted_columns float
)
"""

# Read dataset
df = pd.read_csv("/home/vishal/mlflow_Evidently/DelayedFlights.csv")

# Define reference and production simulation data
reference_data = df[5000:5500]
prod_simulation_data = df[7000:]
mini_batch_size = 50

# Function to prepare the database for storing drift metrics
def prep_db():
    # Connect to PostgreSQL and create the database if it doesn't exist
    with psycopg.connect("host=localhost port=5432 user=postgres password=example", autocommit=True) as conn:
        res = conn.execute("SELECT 1 FROM pg_database WHERE datname='test'")
        if len(res.fetchall()) == 0:
            conn.execute("create database test;")
    # Connect to the 'test' database and create the table for drift metrics
    with psycopg.connect("host=localhost port=5432 dbname=test user=postgres password=example") as conn:
        conn.execute(create_table_statement)

# Function to calculate drift metrics and store them in PostgreSQL
def calculate_metrics_postgresql(curr, i):
    # Initialize a report with dataset-level and target-column drift metrics
    report = Report(metrics=[
        DatasetDriftMetric(),
        ColumnDriftMetric(column_name='ArrDelay'),
    ])
    # Run the report on reference data and the current mini-batch
    report.run(reference_data=reference_data, current_data=prod_simulation_data[i*mini_batch_size : (i+1)*mini_batch_size])
    result = report.as_dict()
    # Extract drift metrics from the report results
    target_drift = result['metrics'][1]['result']['drift_score']
    share_drifted_columns = result['metrics'][0]['result']['share_of_drifted_columns']
    # Insert metrics into the 'drift_metrics' table
    curr.execute(
        "insert into drift_metrics(timestamp, target_drift, share_drifted_columns) values (%s, %s, %s)",
        (datetime.datetime.now(), target_drift, share_drifted_columns)
    )

# Function to perform batch monitoring and backfill drift metrics into PostgreSQL
def batch_monitoring_backfill():
    # Prepare the database
    prep_db()
    # Connect to the 'test' database and iterate over mini-batches of data
    with psycopg.connect("host=localhost port=5432 dbname=test user=postgres password=example", autocommit=True) as conn:
        for i in range(50):
            with conn.cursor() as curr:
                # Calculate and store drift metrics for each mini-batch
                calculate_metrics_postgresql(curr, i)
            # Log progress and wait before processing the next mini-batch
            logging.info("data sent")
            time.sleep(3)

# Entry point of the script
if __name__ == '__main__':
    batch_monitoring_backfill()
To bring up the services and run the script, execute:
docker-compose up --build
python grafana.py
Output:
In this guide, we have learned how to create default and custom test suites, presets, and metrics for data quality, data drift, target drift, and model performance. We also learned how to integrate tools like Airflow, MLflow, and Prefect with Evidently, and how to create Evidently dashboards for effective monitoring. This guide should have given you enough knowledge about ML monitoring and observability in a production environment to apply it in your upcoming projects.
A. ZenML acts as an MLOps orchestration platform, in which we can integrate all our MLOps stack components, helping us in tracking experiments.
A. Neptune.ai is a centralized experiment-tracking platform that helps us track all our data and model artifacts, code, reports, visualizations, etc.
A. For effective ML Monitoring, it is advised to utilize data quality tests on raw datasets, while conducting other tests and reports on the clean, processed dataset.
A. No, model re-training is not automated, and it should be the last option considered. There is a high chance that the batch dataset may be broken, and its size may not be sufficient to retrain the model. The decision to retrain is therefore left to the data scientists and ML engineers, in collaboration with domain experts, after failure alerts are received.
The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.