Kafka to MongoDB: Building a Streamlined Data Pipeline

Neha Vishwakarma Last Updated : 14 Mar, 2024
10 min read

Introduction

Data is fuel for the IT industry and the Data Science Project in today’s online world. IT industries rely heavily on real-time insights derived from streaming data sources. Handling and processing the streaming data is the hardest work for Data Analysis. We know that streaming data is data that is emitted at high volume in a continuous processing which means that the data changes every second. To handle this data we are using Confluent Platform – A self-managed, enterprise-grade distribution of Apache Kafka.

Kafka is a distributed, fault that can be handled by architecture, which serves as a popular choice for managing high-throughput data streams. The data then from Kafka are collected in the MongoDB in the form of collections. In this article, we are going to create the end-to-end pipeline in which data are fetched with the help of API in the Pipeline and then collected in Kafka in the form of topics and then stored in MongoDB from there we can use it in the project or do the feature engineering.

Data Pipeline

Learning Objectives

  • Learn what is streaming data and how to handle streaming data with the help of Kafka.
  • Understand the Confluent Platform – A self-managed, enterprise-grade distribution of Apache Kafka.
  • Store the data which is collected by Kafka into the MongoDB which is a NoSQL database that stores the unstructured data.
  • Create a fully end-to-end pipeline to fetch and store the data in the Database.

This article was published as a part of the Data Science Blogathon.

Identify the Problem

To handle the streaming data that is coming from the sensor, vehicles that contain sensor data are produced per second and it is hard to handle and preprocess the data to use in the Data Science project. So, to address this problem we are creating the end-to-end pipeline which handles the data and stores the data.

What is Streaming Data?

Streaming data is data that is continuously generated by different sources which is unstructured data. It refers to a continuous flow of data generated from different sources in real-time or near real-time. In traditional batch processing where data is collected and this streaming data is processed as it is generated. Streaming data can be IOT data like temperature sensors and GPS trackers, or Machine Data Like data this is generated by machines and industrial equipment like telemetry data from vehicles and manufacturing machinery. There are streaming data processing platforms such as Apache-Kafka.

What is Kafka?

Apache Kafka is a platform used for building real-time data pipelines and streaming applications. The Kafka Streams API is a powerful library that allows for on-the-fly processing, letting you collect and create windowing parameters, perform joins of data within a stream, and more. Apache Kafka consists of a storage layer and a compute layer that combines efficient, real-time data ingestion, streaming data pipelines, and storage across systems.

What Will Be the Approach?

This is a machine learning pipeline to help us know how to publish and process the data to and from Kafka confluent in JSON format. There are two parts of kafka data processing consumer and producer. To store the streaming data from the different producers and store it in confluent and then deserialization on data is done and that data are stored in Database.

Kafka

System Architecture Overview

We are processing the streaming data with the help of confluent kafka and the Kafka is divided into two two-part:

  • Kafka Producer: The Kafka Producer is responsible for producing and sending data to Kafka topics.
  • Kafka Consumer: Kafka Consumer is to read and process data from the Kafka topics.
Start
│
├─ Kafka Consumer (Read from Kafka Topic)
│   ├─ Process Message
│   └─ Store Data in MongoDB
│
├─ Kafka Producer (Generate Sensor Data)
│   ├─ Send Data to Kafka Topic
│   └─ Repeat
│
End

What are Components?

  • Topics: Topics are logical channels or categories which are published by producers. Each topic is divided into one or more partitions, and each topic is replicated across multiple brokers for fault tolerance. Producers publish data on specific topics, and consumers subscribe to topics to use data.
  • Producers: An Apache Kafka Producer is a client application that publishes (writes) events to a Kafka-cluster. Applications that send data into topics are known as Kafka producers. This section gives an overview of the Kafka producer.
Producer
  • Consumer: Kafka consumers are responsible for reading data and processing from Kafka topics and processing them. Consumers can be part of any application that needs to use and react to data from Kafka. They subscribe to one or more topics and data from Kafka brokers. Consumers can be organized into consumer groups, where each consumer group has one or more consumers and each topic of a topic is consumed by only one consumer within the group. This allows for parallel processing and load balancing of data consumption.
Consumer

What is the Project Structure?

This shows the flowchart of the project how the folder and the files are divided in project:

flowchart/
│
├── consumer.drawio.svg
├── flow of kafka.drawio
└── producer.drawio.svg

sample_data/
│
└── aps_failure_training_set1.csv

env/

sensor_data-pipeline_kafka
/
│
├── src/
│   ├── database/
│   │   ├── mongodb.py
│   │
│   ├── kafka_config/
│   │   └──__init__.py/
│   │       
│   │
│   ├── constant/
│   │   ├── __init__.py
│   │ 
│   │
│   ├── data_access/
│   │   ├── user_data.py
│   │   └── user_embedding_data.py
│   │
│   ├── entity/
│   │   ├── __init__.py
│   │   └── generic.py
│   │
│   ├── exception/
│   │   └── (exception handling)
│   │
│   ├── kafka_logger/
│   │   └── (logging configuration)
│   │
│   ├── kafka_consumer/
│   │   └── util.py
│   │
│   └── __init__.py
│
└── logs/
    └── (log files)



.dockerignore
.gitignore
Dockerfile
schema.json
consumer_main.py
producer_main.py
requirments.txt
setup.py
  • Kafka Producer: The producer is the main part that generates sensor data (e.g., from CSV files in sample_data/) and publishes it to a Kafka topic. error conditions that may occur during data generation or publishing.
  • Kafka Broker(s): Kafka brokers store and replicate data across the Kafka cluster, handle data partitioning, and ensure fault tolerance and high availability.
  • Kafka Consumer(s): Consumers read data from Kafka topics, process it (e.g., transformations, aggregations), and store it in MongoDB. They also monitor error conditions that may occur during data processing.
  • MongoDB: MongoDB stores the sensor data received from Kafka consumers. It provides a query for data retrieval and ensures data durability through replication and fault tolerance mechanisms.
Start
│
├── Kafka Producer ──────────────────┐
│   ├── Generate Sensor Data         │
│   └── Publish Data to Kafka Topic  │
│                                    │
│   └── Error Handling                
│                                    │
├── Kafka Broker(s) ─────────────────┤
│   ├── Store and Replicate Data     │
│   └── Handle Data Partitioning     │
│                                    │
├── Kafka Consumer(s) ───────────────┤
│   ├── Read Data from Kafka Topic   │
│   ├── Process Data                 │
│   └── Store Data in MongoDB        │
│                                    │
│   └── Error Handling                
│                                    │
├── MongoDB ────────────────────────┤
│   ├── Store Sensor Data            │
│   ├── Provide Query Interface      │
│   └── Ensure Data Durability       │
│                                    │
└── End                              │

Prerequisites For Data Storing

Confluent Kafka

  • Create Account: To understand Kafka, you need Confluent You love Apache Kafka®, but not managing it. Cloud-native, complete, and fully managed service goes above & beyond Kafka so your best people can focus on delivering value to your business.
Kafka
  • Create Topics:
  • Go to the Home page and on the sidebar
  • Go to Environment then click on Default
  • Go to the topics
  • select the New topic give the name of the topic

MongoDB

Create signup and then sign in on the MongoDB Atlas and save the connection link of the Mongodb Atlas for further use. 

Mongo DB

Step-by-Step Guide for Project Setup

  • Python Installation: Ensure Python is installed on your machine. You can download and install Python from the official website.
  • Conda version: Check the conda version in the terminal.
  • Virtual Environment Creation: Create a virtual environment using venv.
conda create -p venv python==3.10 -y
  • Virtual Environment Activation: Activate the virtual environment:
conda activate venv/
  • Install Required Packages: Use pip to install the necessary dependencies listed in the requirements.txt file:
pip install -r requirements.txt
  • we have to set some of the environment variables in local system This is the confluent cloud Cluster Environment Variable
API_KEY
API_SECRET_KEY
BOOTSTRAP_SERVER
SCHEMA_REGISTRY_API_KEY
SCHEMA_REGISTRY_API_SECRET
ENDPOINT_SCHEMA_URL

Environment Variable

Update the credential in .env file and run below command to run your application in the docker.

  • Create .env file in root dir of your project if it is not available paste the below content and update the credentials
API_KEY=asgdakhlsa
API_SECRET_KEY=dsdfsdf
BOOTSTRAP_SERVER=sdfasd
SCHEMA_REGISTRY_API_KEY=sdfsaf
SCHEMA_REGISTRY_API_SECRET=sdfasdf
ENDPOINT_SCHEMA_URL=sdafasf
MONGO_DB_URL=sdfasdfas

  • Running the Producer and consumer file:
python producer_main.py 
python consumer_main.py

How to Implement Code?

  • src/: This directory is the main folder for all source code files. Within this directory, we have the following subdirectories:
    consumer/: This directory contains the code for the Kafka_consumer, responsible for reading data from Kafka topics and processing it.
    producer/: This directory contains the code for the Kafka_producer, responsible for generating and sending sensor data to Kafka topics.
  • README.md: This Markdown file contains documentation and instructions for the project, including an overview of its purpose, the instructions, usage guidelines, and any information.
  • requirements.txt: This file lists the Python library required for project. Each dependency is listed along with its version number. Tools like pip can use this file to install the necessary dependencies automatically.
sensor_data-pipeline_kafka/
│
├── src/
│   ├── consumer/
│   │   ├── __init__.py
│   │   └── kafka_consumer.py
│   │
│   ├── producer/
│   │   ├── __init__.py
│   │   └── kafka_producer.py
│   │
│   └── __init__.py
│
├── README.md
└── requirements.txt

mongodb.py: To Connect the MongoDB Altas through the link we are writing the python script

import pymongo
import os


import certifi
ca = certifi.where()
db_link ="mongodb+srv://Neha:<password>@cluster0.jsogkox.mongodb.net/"

class MongodbOperation:

    def __init__(self) -> None:

        #self.client = pymongo.MongoClient(os.getenv('MONGO_DB_URL'),tlsCAFile=ca)
        self.client = pymongo.MongoClient(db_link,tlsCAFile=ca)
        self.db_name="NehaDB"

    def insert_many(self,collection_name,records:list):
        self.client[self.db_name][collection_name].insert_many(records)
        

    def insert(self,collection_name,record):
        self.client[self.db_name][collection_name].insert_one(record)
        

enter your URL which is copy from the MongoDb Altas

Output:

from src.kafka_producer.json_producer import product_data_using_file
from src.constant import SAMPLE_DIR
import os
if __name__ == '__main__':
    
    topics = os.listdir(SAMPLE_DIR)
    print(f'topics: [{topics}]')
    for topic in topics:
        sample_topic_data_dir = os.path.join(SAMPLE_DIR,topic)
        sample_file_path = os.path.join(sample_topic_data_dir,os.listdir(sample_topic_data_dir)[0])
        product_data_using_file(topic=topic,file_path=sample_file_path)
        

This file runs then we call the python producer_main.py  and this is going to call below file:

import argparse
from uuid import uuid4
from src.kafka_config import sasl_conf, schema_config
from six.moves import input
from src.kafka_logger import logging
from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
import pandas as pd
from typing import List
from src.entity.generic import Generic, instance_to_dict

FILE_PATH = "C:/Users/RAJIV/Downloads/ml-data-pipeline-main/sample_data/kafka-sensor-topic.csv"



def delivery_report(err, msg):
    """
    Reports the success or failure of a message delivery.
    Args:
        err (KafkaError): The error that occurred on None on success.
        msg (Message): The message that was produced or failed.
    """

    if err is not None:
        logging.info("Delivery failed for User record {}: {}".format(msg.key(), err))
        return
    logging.info('User record {} successfully produced to {} [{}] at offset {}'\
    .format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))


def product_data_using_file(topic,file_path):
    logging.info(f"Topic: {topic} file_path:{file_path}")
    schema_str = Generic.get_schema_to_produce_consume_data(file_path=file_path)
    schema_registry_conf = schema_config()
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)
    string_serializer = StringSerializer('utf_8')
    json_serializer = JSONSerializer(schema_str, schema_registry_client, \
    instance_to_dict)
    producer = Producer(sasl_conf())

    print("Producing user records to topic {}. ^C to exit.".format(topic))
    # while True:
    # Serve on_delivery callbacks from previous calls to produce()
    producer.poll(0.0)
    try:
        for instance in Generic.get_object(file_path=file_path):
            print(instance)
            logging.info(f"Topic: {topic} file_path:{instance.to_dict()}")
            producer.produce(topic=topic,
                             key=string_serializer(str(uuid4()), instance.to_dict()),
                             value=json_serializer(instance, \
                             SerializationContext(topic, MessageField.VALUE)),
                             on_delivery=delivery_report)
            print("\nFlushing records...")
            producer.flush()
    except KeyboardInterrupt:
        pass
    except ValueError:
        print("Invalid input, discarding record...")
        pass

    

Output:

from src.kafka_consumer.json_consumer import consumer_using_sample_file

from src.constant import SAMPLE_DIR
import os
if __name__=='__main__':

    topics = os.listdir(SAMPLE_DIR)
    print(f'topics: [{topics}]')
    for topic in topics:
        sample_topic_data_dir = os.path.join(SAMPLE_DIR,topic)
        sample_file_path = os.path.join(sample_topic_data_dir,os.listdir(sample_topic_data_dir)[0])
        consumer_using_sample_file(topic="kafka-sensor-topic",file_path = sample_file_path)
        

This file runs then we call the python consumer_main.py  and this is going to call below file:

import argparse

from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry.json_schema import JSONDeserializer
from src.entity.generic import Generic
from src.kafka_config import sasl_conf
from src.database.mongodb import MongodbOperation

def consumer_using_sample_file(topic,file_path):
    schema_str = Generic.get_schema_to_produce_consume_data(file_path=file_path)
    json_deserializer = JSONDeserializer(schema_str,
                                         from_dict=Generic.dict_to_object)

    consumer_conf = sasl_conf()
    consumer_conf.update({
        'group.id': 'group7',
        'auto.offset.reset': "earliest"})

    consumer = Consumer(consumer_conf)
    consumer.subscribe([topic])

    mongodb = MongodbOperation()
    records = []
    x = 0
    while True:
        try:
            # SIGINT can't be handled when polling, limit timeout to 1 second.
            msg = consumer.poll(1.0)
            if msg is None:
                continue

            record: Generic = json_deserializer(msg.value(), \
            SerializationContext(msg.topic(), MessageField.VALUE))

            # mongodb.insert(collection_name="car",record=car.record)

            if record is not None:
                records.append(record.to_dict())
                if x % 5000 == 0:
                    mongodb.insert_many(collection_name="sensor", records=records)
                    records = []
            x = x + 1
        except KeyboardInterrupt:
            break

    consumer.close()

Output:

When we run both the consumer and producer then the system is running on the kafka and the information/data are collected faster

Output:

"
Data in MongoDB

From MongoDB we are using this data to preprocess in EDA, feature engineering and data analytics work are done on this data.

Conclusion

In this article, we understand how we are storing and processing the streaming data from the sensor to the Kafka in the form of JSON format then we store the data to MongoDB.  We know that streaming data is data that is emitted at high volume in a continuous processing which means that the data changes every second. We have created the end-to-end pipeline in which data are fetched with the help of API in the Pipeline and then collected in Kafka in the form of topics and then stored in MongoDB from there we can use it in the project or do the feature engineering.

Key Takeaways

  • Learn what is streaming data and how to handle streaming data with the help of Kafka.
  • understand the Confluent Platform – A self-managed, enterprise-grade distribution of Apache Kafka.
  • store the data which is collected by Kafka into the MongoDB which is a NoSQL database that stores the unstructured data.
  • Create a fully end-to-end pipeline to fetch and store the data in the Database.
  • understand the functionality of each component of the project implement it on the docker and implement it on a cloud to use at any time.

Resources

Frequently Asked Questions

Q1. Why we are using MongoDB?

A. MongoDB stores the data in unstructured data. streaming data are unstructured forms of data for memory utilization we are using MongoDB as the database.

Q2. What is the purpose of building a data pipeline from Kafka to MongoDB?

A. The purpose is to create a real-time data processing pipeline where data ingested into Kafka topics can be consumed, processed, and stored in MongoDB for further analysis, reporting, or application usage.

Q3. What are some potential use cases for this Kafka-to-MongoDB data pipeline?

A. Use cases include real-time analytics, IoT data processing, log aggregation, social media monitoring, and recommendation systems, where streaming data needs to be processed and stored for further analysis or application usage.

The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.

I'm Neha Vishwakarma, a data science enthusiast with a background in information technology, dedicated to using data to inform decisions and tackle complex challenges. My passion lies in uncovering hidden insights through numbers.

Responses From Readers

We use cookies essential for this site to function well. Please click to help us improve its usefulness with additional cookies. Learn about our use of cookies in our Privacy Policy & Cookies Policy.

Show details