Data is the fuel of the IT industry and of Data Science projects in today's online world. IT companies rely heavily on real-time insights derived from streaming data sources. Handling and processing streaming data is the hardest part of data analysis, because streaming data is emitted continuously and at high volume, which means the data changes every second. To handle this data we use the Confluent Platform, a self-managed, enterprise-grade distribution of Apache Kafka.
Kafka has a distributed, fault-tolerant architecture, which makes it a popular choice for managing high-throughput data streams. The data from Kafka is then collected in MongoDB in the form of collections. In this article, we are going to create an end-to-end pipeline in which data is fetched into the pipeline with the help of an API, collected in Kafka in the form of topics, and then stored in MongoDB, from where we can use it in a project or for feature engineering.
This article was published as a part of the Data Science Blogathon.
The challenge is handling the streaming data that comes from sensors: vehicles equipped with sensors produce data every second, and it is hard to handle and preprocess that data for use in a Data Science project. So, to address this problem, we create an end-to-end pipeline that handles the data and stores it.
Streaming data is data that is continuously generated by different sources, and it is usually unstructured. It refers to a continuous flow of data generated in real time or near real time. Unlike traditional batch processing, where data is first collected and then processed, streaming data is processed as it is generated. Streaming data can be IoT data, such as readings from temperature sensors and GPS trackers, or machine data generated by industrial equipment, such as telemetry from vehicles and manufacturing machinery. Platforms such as Apache Kafka are used to process this streaming data.
Apache Kafka is a platform used for building real-time data pipelines and streaming applications. The Kafka Streams API is a powerful library that allows on-the-fly processing, letting you define windowing parameters, perform joins of data within a stream, and more. Apache Kafka consists of a storage layer and a compute layer that combine efficient, real-time data ingestion, streaming data pipelines, and storage across systems.
This machine learning pipeline shows how to publish data to Confluent Kafka and process it in JSON format. Kafka data processing has two parts: the producer and the consumer. Streaming data from the different producers is published to Confluent Kafka; the consumer then deserializes the data and stores it in the database.
We process the streaming data with the help of Confluent Kafka, and the Kafka side of the pipeline is divided into two parts (a minimal producer/consumer sketch follows the flowchart below):
Start
│
├─ Kafka Consumer (Read from Kafka Topic)
│ ├─ Process Message
│ └─ Store Data in MongoDB
│
├─ Kafka Producer (Generate Sensor Data)
│ ├─ Send Data to Kafka Topic
│ └─ Repeat
│
End
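Before diving into the project code, here is a minimal, self-contained sketch of the two roles using the confluent_kafka Python client. The broker address, topic name, and group id are placeholders, not the project's actual Confluent Cloud settings, which come later from the .env file.

from confluent_kafka import Producer, Consumer

# Placeholder broker address; the real project connects to Confluent Cloud via SASL credentials.
conf = {"bootstrap.servers": "localhost:9092"}

# Producer side: publish one message to a topic.
producer = Producer(conf)
producer.produce("demo-topic", key="k1", value='{"sensor": 42}')
producer.flush()

# Consumer side: read messages back from the same topic.
consumer = Consumer({**conf, "group.id": "demo-group", "auto.offset.reset": "earliest"})
consumer.subscribe(["demo-topic"])
msg = consumer.poll(5.0)
if msg is not None and msg.error() is None:
    print(msg.key(), msg.value())
consumer.close()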
The following shows the project structure: how the folders and files are organized in the project:
flowchart/
│
├── consumer.drawio.svg
├── flow of kafka.drawio
└── producer.drawio.svg
sample_data/
│
└── aps_failure_training_set1.csv
env/
sensor_data-pipeline_kafka/
│
├── src/
│ ├── database/
│ │ ├── mongodb.py
│ │
│ ├── kafka_config/
│ │ └── __init__.py
│ │
│ │
│ ├── constant/
│ │ ├── __init__.py
│ │
│ │
│ ├── data_access/
│ │ ├── user_data.py
│ │ └── user_embedding_data.py
│ │
│ ├── entity/
│ │ ├── __init__.py
│ │ └── generic.py
│ │
│ ├── exception/
│ │ └── (exception handling)
│ │
│ ├── kafka_logger/
│ │ └── (logging configuration)
│ │
│ ├── kafka_consumer/
│ │ └── util.py
│ │
│ └── __init__.py
│
└── logs/
└── (log files)
.dockerignore
.gitignore
Dockerfile
schema.json
consumer_main.py
producer_main.py
requirements.txt
setup.py
Start
│
├── Kafka Producer ──────────────────┐
│ ├── Generate Sensor Data │
│ └── Publish Data to Kafka Topic │
│ │
│ └── Error Handling
│ │
├── Kafka Broker(s) ─────────────────┤
│ ├── Store and Replicate Data │
│ └── Handle Data Partitioning │
│ │
├── Kafka Consumer(s) ───────────────┤
│ ├── Read Data from Kafka Topic │
│ ├── Process Data │
│ └── Store Data in MongoDB │
│ │
│ └── Error Handling
│ │
├── MongoDB ────────────────────────┤
│ ├── Store Sensor Data │
│ ├── Provide Query Interface │
│ └── Ensure Data Durability │
│ │
└── End │
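The broker stage in the flowchart above stores, replicates, and partitions the topic data. On Confluent Cloud the topic can be created from the UI, but if you prefer to create it programmatically, a sketch using confluent_kafka's AdminClient looks like this; the bootstrap server, partition count, and replication factor are examples, not the project's required settings.

from confluent_kafka.admin import AdminClient, NewTopic

# Example-only settings: adjust the bootstrap server, partitions, and replication factor to your cluster.
admin = AdminClient({"bootstrap.servers": "localhost:9092"})
futures = admin.create_topics(
    [NewTopic("kafka-sensor-topic", num_partitions=3, replication_factor=3)]
)
for topic_name, future in futures.items():
    try:
        future.result()  # Raises an exception if topic creation failed.
        print(f"Created topic {topic_name}")
    except Exception as exc:
        print(f"Failed to create topic {topic_name}: {exc}")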
Sign up and then sign in to MongoDB Atlas, and save the MongoDB Atlas connection string for later use.
conda create -p venv python==3.10 -y
conda activate venv/
pip install -r requirements.txt
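The article does not reproduce requirements.txt; based on the imports used in the code below, it likely contains at least the following packages (this list is an assumption, not the project's exact file):

confluent-kafka
pymongo[srv]
certifi
pandas
python-dotenv
six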
API_KEY
API_SECRET_KEY
BOOTSTRAP_SERVER
SCHEMA_REGISTRY_API_KEY
SCHEMA_REGISTRY_API_SECRET
ENDPOINT_SCHEMA_URL
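These credentials are read by the src/kafka_config package, which is imported later as sasl_conf and schema_config but is not reproduced in the article. A minimal sketch of what it could contain, assuming python-dotenv and the variable names above:

# src/kafka_config/__init__.py (hypothetical sketch)
import os
from dotenv import load_dotenv

load_dotenv()  # Load the credentials from the .env file.


def sasl_conf() -> dict:
    # SASL_SSL settings expected by the confluent_kafka Producer/Consumer for Confluent Cloud.
    return {
        "bootstrap.servers": os.getenv("BOOTSTRAP_SERVER"),
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": os.getenv("API_KEY"),
        "sasl.password": os.getenv("API_SECRET_KEY"),
    }


def schema_config() -> dict:
    # Settings expected by SchemaRegistryClient for the Confluent Schema Registry.
    return {
        "url": os.getenv("ENDPOINT_SCHEMA_URL"),
        "basic.auth.user.info": "{}:{}".format(
            os.getenv("SCHEMA_REGISTRY_API_KEY"),
            os.getenv("SCHEMA_REGISTRY_API_SECRET"),
        ),
    }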
Update the credentials in the .env file and run the commands below to run your application (locally or in Docker).
API_KEY=asgdakhlsa
API_SECRET_KEY=dsdfsdf
BOOTSTRAP_SERVER=sdfasd
SCHEMA_REGISTRY_API_KEY=sdfsaf
SCHEMA_REGISTRY_API_SECRET=sdfasdf
ENDPOINT_SCHEMA_URL=sdafasf
MONGO_DB_URL=sdfasdfas
python producer_main.py
python consumer_main.py
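The project tree above includes a Dockerfile, but its contents are not shown in the article. A minimal sketch of what it could look like (the base image and default command are assumptions):

FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Run the consumer by default; override the command to run the producer instead.
CMD ["python", "consumer_main.py"]

With this in place, the container can be built and started with docker build -t sensor-pipeline . followed by docker run --env-file .env sensor-pipeline.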
sensor_data-pipeline_kafka/
│
├── src/
│ ├── consumer/
│ │ ├── __init__.py
│ │ └── kafka_consumer.py
│ │
│ ├── producer/
│ │ ├── __init__.py
│ │ └── kafka_producer.py
│ │
│ └── __init__.py
│
├── README.md
└── requirements.txt
mongodb.py: To connect to MongoDB Atlas through the connection string, we write the following Python script.
import pymongo
import os
import certifi

# Certificate bundle for TLS connections to MongoDB Atlas
ca = certifi.where()

db_link = "mongodb+srv://Neha:<password>@cluster0.jsogkox.mongodb.net/"


class MongodbOperation:

    def __init__(self) -> None:
        # self.client = pymongo.MongoClient(os.getenv('MONGO_DB_URL'), tlsCAFile=ca)
        self.client = pymongo.MongoClient(db_link, tlsCAFile=ca)
        self.db_name = "NehaDB"

    def insert_many(self, collection_name, records: list):
        self.client[self.db_name][collection_name].insert_many(records)

    def insert(self, collection_name, record):
        self.client[self.db_name][collection_name].insert_one(record)
Enter your own URL, copied from MongoDB Atlas, in place of the connection string above.
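To sanity-check the connection before wiring it into the consumer, you can run something like the snippet below; the collection name and records here are made-up test values.

if __name__ == "__main__":
    # Insert a few dummy documents and confirm they appear in MongoDB Atlas.
    mongo = MongodbOperation()
    mongo.insert(collection_name="sensor", record={"sensor_id": "s1", "value": 0.42})
    mongo.insert_many(collection_name="sensor",
                      records=[{"sensor_id": "s2", "value": 1.3},
                               {"sensor_id": "s3", "value": 2.7}])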
Output:
from src.kafka_producer.json_producer import product_data_using_file
from src.constant import SAMPLE_DIR
import os

if __name__ == '__main__':
    topics = os.listdir(SAMPLE_DIR)
    print(f'topics: [{topics}]')
    for topic in topics:
        sample_topic_data_dir = os.path.join(SAMPLE_DIR, topic)
        sample_file_path = os.path.join(sample_topic_data_dir, os.listdir(sample_topic_data_dir)[0])
        product_data_using_file(topic=topic, file_path=sample_file_path)
When we run python producer_main.py, the file above is executed, and it calls the file below (src/kafka_producer/json_producer.py):
import argparse
from uuid import uuid4
from src.kafka_config import sasl_conf, schema_config
from six.moves import input
from src.kafka_logger import logging
from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
import pandas as pd
from typing import List
from src.entity.generic import Generic, instance_to_dict
FILE_PATH = "C:/Users/RAJIV/Downloads/ml-data-pipeline-main/sample_data/kafka-sensor-topic.csv"
def delivery_report(err, msg):
    """
    Reports the success or failure of a message delivery.

    Args:
        err (KafkaError): The error that occurred, or None on success.
        msg (Message): The message that was produced or failed.
    """
    if err is not None:
        logging.info("Delivery failed for User record {}: {}".format(msg.key(), err))
        return
    logging.info('User record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))


def product_data_using_file(topic, file_path):
    logging.info(f"Topic: {topic} file_path: {file_path}")
    schema_str = Generic.get_schema_to_produce_consume_data(file_path=file_path)
    schema_registry_conf = schema_config()
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)
    string_serializer = StringSerializer('utf_8')
    json_serializer = JSONSerializer(schema_str, schema_registry_client, instance_to_dict)
    producer = Producer(sasl_conf())

    print("Producing user records to topic {}. ^C to exit.".format(topic))

    # Serve on_delivery callbacks from previous calls to produce()
    producer.poll(0.0)
    try:
        for instance in Generic.get_object(file_path=file_path):
            print(instance)
            logging.info(f"Topic: {topic} record: {instance.to_dict()}")
            producer.produce(topic=topic,
                             key=string_serializer(str(uuid4())),
                             value=json_serializer(instance,
                                                   SerializationContext(topic, MessageField.VALUE)),
                             on_delivery=delivery_report)

        print("\nFlushing records...")
        producer.flush()
    except KeyboardInterrupt:
        pass
    except ValueError:
        print("Invalid input, discarding record...")
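The Generic entity and the instance_to_dict helper used above come from src/entity/generic.py, which is not reproduced in the article. A minimal sketch of what it could look like, assuming each CSV row is simply wrapped as a dictionary and the JSON schema is derived from the CSV columns:

import json
import pandas as pd


class Generic:
    """Wraps one CSV row as a dictionary-backed record (hypothetical sketch)."""

    def __init__(self, record: dict):
        self.record = record

    def to_dict(self) -> dict:
        return self.record

    @staticmethod
    def dict_to_object(data: dict, ctx):
        # Used by JSONDeserializer to rebuild a Generic from the decoded dict.
        return Generic(record=data)

    @staticmethod
    def get_schema_to_produce_consume_data(file_path: str) -> str:
        # Build a permissive JSON schema whose properties are the CSV columns.
        columns = pd.read_csv(file_path, nrows=1).columns
        schema = {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "title": "SensorRecord",
            "type": "object",
            "properties": {col: {"type": ["string", "number", "null"]} for col in columns},
        }
        return json.dumps(schema)

    @staticmethod
    def get_object(file_path: str):
        # Yield one Generic instance per CSV row.
        df = pd.read_csv(file_path)
        for record in df.to_dict(orient="records"):
            yield Generic(record=record)


def instance_to_dict(instance, ctx) -> dict:
    # Used by JSONSerializer to convert a Generic into a plain dict before encoding.
    return instance.to_dict()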
Output:
from src.kafka_consumer.json_consumer import consumer_using_sample_file
from src.constant import SAMPLE_DIR
import os

if __name__ == '__main__':
    topics = os.listdir(SAMPLE_DIR)
    print(f'topics: [{topics}]')
    for topic in topics:
        sample_topic_data_dir = os.path.join(SAMPLE_DIR, topic)
        sample_file_path = os.path.join(sample_topic_data_dir, os.listdir(sample_topic_data_dir)[0])
        consumer_using_sample_file(topic="kafka-sensor-topic", file_path=sample_file_path)
When we run python consumer_main.py, the file above is executed, and it calls the file below (src/kafka_consumer/json_consumer.py):
import argparse
from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry.json_schema import JSONDeserializer
from src.entity.generic import Generic
from src.kafka_config import sasl_conf
from src.database.mongodb import MongodbOperation


def consumer_using_sample_file(topic, file_path):
    schema_str = Generic.get_schema_to_produce_consume_data(file_path=file_path)
    json_deserializer = JSONDeserializer(schema_str,
                                         from_dict=Generic.dict_to_object)

    consumer_conf = sasl_conf()
    consumer_conf.update({
        'group.id': 'group7',
        'auto.offset.reset': "earliest"})

    consumer = Consumer(consumer_conf)
    consumer.subscribe([topic])

    mongodb = MongodbOperation()
    records = []
    x = 0
    while True:
        try:
            # SIGINT can't be handled when polling, limit timeout to 1 second.
            msg = consumer.poll(1.0)
            if msg is None:
                continue

            record: Generic = json_deserializer(msg.value(),
                                                SerializationContext(msg.topic(), MessageField.VALUE))
            # mongodb.insert(collection_name="car", record=car.record)
            if record is not None:
                records.append(record.to_dict())
                if x % 5000 == 0:
                    mongodb.insert_many(collection_name="sensor", records=records)
                    records = []
                x = x + 1
        except KeyboardInterrupt:
            break

    consumer.close()
Output:
When we run both the producer and the consumer, the pipeline runs on Kafka and the data is collected in near real time.
Output:
From MongoDB, we use this data for preprocessing, EDA, feature engineering, and other data analytics work.
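As a quick illustration, assuming the same database and collection names as in the consumer code above, the stored documents can be pulled into a pandas DataFrame for analysis:

import os
import certifi
import pandas as pd
import pymongo

# Connect with the same MongoDB Atlas URL used by the pipeline.
client = pymongo.MongoClient(os.getenv("MONGO_DB_URL"), tlsCAFile=certifi.where())
collection = client["NehaDB"]["sensor"]

# Pull the stored sensor records into a DataFrame for EDA / feature engineering.
df = pd.DataFrame(list(collection.find({}, {"_id": 0})))
print(df.shape)
print(df.head())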
In this article, we saw how to store and process streaming sensor data: the data is published to Kafka in JSON format and then stored in MongoDB. Streaming data is emitted continuously and at high volume, which means the data changes every second. We created an end-to-end pipeline in which data is fetched into the pipeline with the help of an API, collected in Kafka in the form of topics, and then stored in MongoDB, from where we can use it in a project or for feature engineering.
A. MongoDB stores data in an unstructured, document-oriented format. Since streaming data arrives in unstructured form, MongoDB is a good fit as the database for this pipeline in terms of storage and memory utilization.
A. The purpose is to create a real-time data processing pipeline where data ingested into Kafka topics can be consumed, processed, and stored in MongoDB for further analysis, reporting, or application usage.
A. Use cases include real-time analytics, IoT data processing, log aggregation, social media monitoring, and recommendation systems, where streaming data needs to be processed and stored for further analysis or application usage.
The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.