Setting up Real-time Structured Streaming with Spark and Kafka on Windows OS

Siddharth Last Updated : 29 Jun, 2021

This article was published as a part of the Data Science Blogathon

Introduction

One of the major problems everyone faces when first trying Structured Streaming is setting up the environment required to stream data. A few online tutorials cover this setup. Most of them ask you to install a virtual machine with an Ubuntu operating system and then configure everything by editing the bash file. This works, but not for everyone. On machines with limited memory, a virtual machine can be slow, and the process can get stuck. As an easier alternative, I will show how to set up Structured Streaming directly on the Windows operating system.

Tools used

For the setup we use the following tools:

1. Kafka (For streaming of data – acts as producer)

2. Zookeeper

3. PySpark (For processing the streamed data – acts as a consumer)

4. Jupyter Notebook (Code Editor)

Environment variables

Note that I have placed all the files on the C drive. The folder names should also match the names of the files you download.

We have to set up the environment variables as we go on installing these files. Refer to these images during the installation for a hassle-free experience.

The last image is Path from System variables.

 

[Image: Real-time Structured streaming environment variables]

[Image: Real-time Structured streaming 2]

[Image: Real-time Structured streaming 3]

Required files

https://drive.google.com/drive/folders/1kOQAKgo98cPPYcvqpygyqNFIGjrK_bjw?usp=sharing

Installing Kafka

The first step is to install Kafka on our system. To do this, follow this link:

https://dzone.com/articles/running-apache-kafka-on-windows-os

We first need to install Java 8 and set up the environment variables. You can get all the instructions from the link.

Once we are done with Java, we must install ZooKeeper. I have added the ZooKeeper files to Google Drive. Feel free to use them, or follow the instructions given in the link. If you have installed ZooKeeper correctly and set up the environment variable, you will see this output when you run zkserver as admin in the command prompt.

[Image: prompt]

Next, install Kafka as per the instructions in the link and run it from the Kafka directory using the following command:

.\bin\windows\kafka-server-start.bat .\config\server.properties

Once everything is set up, try creating a topic and check that it works properly. If it does, the Kafka installation is complete.

[Image: Real-time Structured streaming kafka install]
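The topic check mentioned above can be sketched as follows. This is a minimal helper that only builds the command lines for the kafka-topics.bat tool shipped with Kafka; it does not run them. The install folder C:\kafka and the helper name are assumptions for illustration, and the --bootstrap-server flag assumes Kafka 2.2 or newer (older releases take --zookeeper localhost:2181 instead).

```python
import ntpath  # Windows path rules, regardless of where this script runs


def kafka_topic_commands(kafka_home, topic, bootstrap="localhost:9092"):
    """Build the create and list command lines for kafka-topics.bat."""
    tool = ntpath.join(kafka_home, "bin", "windows", "kafka-topics.bat")
    create = [
        tool, "--create",
        "--bootstrap-server", bootstrap,
        "--replication-factor", "1",
        "--partitions", "1",
        "--topic", topic,
    ]
    list_topics = [tool, "--list", "--bootstrap-server", bootstrap]
    return create, list_topics


# C:\kafka is a hypothetical install location; use your own Kafka folder.
create_cmd, list_cmd = kafka_topic_commands(r"C:\kafka", "Topic")
print(" ".join(create_cmd))
print(" ".join(list_cmd))
```

Paste the printed commands into a command prompt while ZooKeeper and the broker are running. If the topic name shows up in the list output, the broker is accepting requests.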

Installing Spark 

In this step, we install Spark. You can follow this link to set up Spark on your Windows machine.

https://phoenixnap.com/kb/install-spark-on-windows-10

During one of the steps, it will ask you to set up the winutils file. For convenience, I have added the file to the drive link I shared, in a folder called Hadoop. Put that folder on your C drive and set up the environment variable as shown in the images. I highly recommend using the Spark files I have added to Google Drive. The main reason is that streaming data from Kafka requires manually setting up the Structured Streaming environment; I have already set up everything required and modified the files after a lot of testing. If you prefer a fresh setup, feel free to do so. If the setup is not done correctly, we end up with an error like this while streaming data in PySpark:

Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
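This error usually means that the Kafka connector for Structured Streaming is not on Spark's classpath. As an alternative to using modified Spark files, the connector can be requested through the spark.jars.packages setting. The sketch below is a hedged example, not the setup used in this article: the helper names are hypothetical, and the Spark version 3.1.2 and Scala version 2.12 are assumptions that must match your own installation (check pyspark.__version__).

```python
def kafka_connector_coordinate(spark_version, scala_version="2.12"):
    """Return the Maven coordinate of the Spark-Kafka connector."""
    return (
        "org.apache.spark:spark-sql-kafka-0-10_"
        f"{scala_version}:{spark_version}"
    )


def build_session(spark_version):
    """Create a SparkSession that downloads the Kafka connector on startup."""
    # Imported here so the coordinate helper works without pyspark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName("Structured Streaming")
        .master("local[*]")
        .config("spark.jars.packages",
                kafka_connector_coordinate(spark_version))
        .getOrCreate()
    )


# 3.1.2 is an assumed version; replace it with the version you installed.
print(kafka_connector_coordinate("3.1.2"))
```

The setting only takes effect when the session is first created, so in a notebook the kernel may need a restart before calling build_session. The first run also needs internet access to download the connector.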

Once we are done with Spark, we can stream the required data from a CSV file in a producer and receive it in a consumer through a Kafka topic. I mostly work with Jupyter Notebook, so I have used a notebook for this tutorial.

In your notebook, first install a few libraries:

1. pip install pyspark

2. pip install kafka-python

3. pip install py4j

How does structured streaming work with Pyspark?

 

[Image: Real-time Structured streaming pyspark]

We have a CSV file with the data we want to stream. Let us proceed with the classic Iris dataset. To stream the iris data, we use Kafka as a producer. In Kafka, we create a topic to which we stream the iris data, and the consumer retrieves the data from this topic.
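To make the message format concrete, here is a small sketch of how one row of the dataset could travel through the topic as a comma-separated string and be split back into named fields on the other side. The column names match the code in this article; the helper names and the sample values are made up for illustration, and no Kafka connection is involved.

```python
import csv
import io

# Column order shared by the producer and the consumer.
FIELDS = ["order_id", "sepal_length", "sepal_width",
          "petal_length", "petal_width", "species"]


def row_to_message(row):
    """Serialize one record (a dict) into a comma-separated string."""
    return ",".join(str(row[name]) for name in FIELDS)


def message_to_row(message):
    """Split a comma-separated message back into a dict of strings."""
    values = next(csv.reader(io.StringIO(message)))
    return dict(zip(FIELDS, values))


# Illustrative sample record, not read from the real CSV file.
sample = {"order_id": 0, "sepal_length": 5.1, "sepal_width": 3.5,
          "petal_length": 1.4, "petal_width": 0.2, "species": "Iris-setosa"}

message = row_to_message(sample)
print(message)  # 0,5.1,3.5,1.4,0.2,Iris-setosa
print(message_to_row(message))
```

Because the message carries no column names, the producer and the consumer have to agree on the field order. The consumer's schema string plays the role of FIELDS here.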

The following is the producer code to stream iris data:

import time

import numpy as np
import pandas as pd
from kafka import KafkaProducer

# Requires: pip install kafka-python

KAFKA_TOPIC_NAME_CONS = "Topic"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost:9092'

if __name__ == "__main__":
    print("Kafka Producer Application Started ... ")

    kafka_producer_obj = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS,
                                       value_serializer=lambda x: x.encode('utf-8'))
    
    filepath = "IRIS.csv"
    
    flower_df = pd.read_csv(filepath)
  
    flower_df['order_id'] = np.arange(len(flower_df))

    
    flower_list = flower_df.to_dict(orient="records")
       

    # Column order of each message; the consumer schema must match it
    field_names = ["order_id", "sepal_length", "sepal_width",
                   "petal_length", "petal_width", "species"]

    for record in flower_list:
        # Join the field values into one comma-separated string
        message = ','.join(str(record[name]) for name in field_names)
        print("Message Type: ", type(message))
        print("Message: ", message)
        kafka_producer_obj.send(KAFKA_TOPIC_NAME_CONS, message)
        # Send one record per second to simulate a live stream
        time.sleep(1)

    # Deliver any messages still buffered before exiting
    kafka_producer_obj.flush()

    print("Kafka Producer Application Completed. ")

To start the producer, we first run zkserver as admin in the Windows command prompt and then start Kafka from the command prompt in the Kafka directory using: .\bin\windows\kafka-server-start.bat .\config\server.properties. If you get a “no broker” error, it means Kafka isn’t running properly.
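Before running the producer, it can help to confirm that both services are actually listening. The following is a minimal sketch using only the Python standard library. It assumes ZooKeeper is on its default port 2181 and the broker on port 9092 as used in this article; the function names are hypothetical.

```python
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Assumed default ports; change them if your configuration differs.
SERVICES = {"ZooKeeper": 2181, "Kafka broker": 9092}


def report(host="localhost"):
    """Print whether each service accepts connections."""
    for name, port in SERVICES.items():
        state = "reachable" if is_port_open(host, port) else "NOT reachable"
        print(f"{name} on {host}:{port} is {state}")


report()
```

An open port only shows that something is listening there. It does not prove the broker is healthy, but a closed port 9092 is a quick explanation for a “no broker” error.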

The output after running the producer code in a Jupyter notebook looks like this:

[Image: output 1]

Now, let us check the consumer. Run the following code in a new notebook to see if it works.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_csv

kafka_topic_name = "Topic"
kafka_bootstrap_servers = 'localhost:9092'

spark = SparkSession \
        .builder \
        .appName("Structured Streaming ") \
        .master("local[*]") \
        .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Construct a streaming DataFrame that reads from topic
flower_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "latest") \
        .load()

flower_df1 = flower_df.selectExpr("CAST(value AS STRING)", "timestamp")


flower_schema_string = "order_id INT,sepal_length DOUBLE,sepal_width DOUBLE,petal_length DOUBLE,petal_width DOUBLE,species STRING"



flower_df2 = flower_df1 \
        .select(from_csv(col("value"), flower_schema_string) \
                .alias("flower"), "timestamp")


flower_df3 = flower_df2.select("flower.*", "timestamp")

    
flower_df3.createOrReplaceTempView("flower_find")
flower_find_text = spark.sql("SELECT * FROM flower_find")
flower_agg_write_stream = flower_find_text \
        .writeStream \
        .trigger(processingTime='5 seconds') \
        .outputMode("append") \
        .option("truncate", "false") \
        .format("memory") \
        .queryName("testedTable") \
        .start()

flower_agg_write_stream.awaitTermination(1)

Once you run this you should obtain an output like this:

[Image: output 2]

As you can see, I ran a few queries to check whether data was streaming. The first count was 5, and a few seconds later it increased to 14, which confirms that data is streaming.
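The same check can be automated by reading the row count a few times and confirming that it grows. The sketch below is one possible way to do it. The helper names are hypothetical, and the demo uses stand-in values (5 and 14 from the run above, 9 made up) instead of a live Spark session.

```python
import time


def poll_counts(get_count, polls=3, interval=5.0, sleep=time.sleep):
    """Call get_count() several times, pausing between calls."""
    counts = []
    for i in range(polls):
        counts.append(get_count())
        if i < polls - 1:
            sleep(interval)
    return counts


def is_streaming(counts):
    """Data is arriving if the last count is larger than the first."""
    return len(counts) >= 2 and counts[-1] > counts[0]


# In the consumer notebook, get_count could query the in-memory table:
# get_count = lambda: spark.sql(
#     "SELECT COUNT(*) AS n FROM testedTable").first()["n"]

# Stand-in values so the sketch runs without Spark.
fake_counts = iter([5, 9, 14])
counts = poll_counts(lambda: next(fake_counts), interval=0)
print(counts, is_streaming(counts))
```

If the count stays flat across several polls, the producer has probably stopped or the consumer is not subscribed to the same topic.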

The idea behind the consumer code is to create a Spark session using getOrCreate() and read the data that Kafka streams on our topic at the specified port. We read the Kafka stream with readStream and load the data using load(). Since the data is streaming, it is useful to keep the timestamp at which each record arrived. We specify the schema as we would in SQL and create a data frame with the streamed values and their timestamps. With a processing time of 5 seconds, we receive the data in batches. The memory sink stores the data in append mode as a temporary in-memory table named testedTable, which we can query with Spark SQL and process like any other Spark data frame.
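To illustrate how the 5-second trigger groups records, here is a simplified pure-Python model. It is not Spark's scheduler: it assumes each trigger picks up every record that arrived before it and ignores processing delays. The function name and the 12 sample arrival times are made up for illustration.

```python
def micro_batches(arrival_times, trigger_interval):
    """Group arrival times (in seconds) into one batch per trigger."""
    batches = []
    pending = sorted(arrival_times)
    tick = trigger_interval
    while pending:
        # Records that arrived before this trigger form the next batch.
        batch = [t for t in pending if t < tick]
        pending = pending[len(batch):]
        if batch:
            batches.append((tick, batch))
        tick += trigger_interval
    return batches


# One record per second, like the producer's time.sleep(1).
arrivals = list(range(12))

total = 0
for tick, batch in micro_batches(arrivals, 5):
    total += len(batch)  # append mode: new rows are added to the table
    print(f"trigger at {tick}s: {len(batch)} new rows, {total} in table")
```

In this model each trigger adds about five rows, so a row count that keeps growing between queries is what the append-mode table should show.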

Refer to the complete code here:

https://github.com/Siddharth1698/Structured-Streaming-Tutorial

This is one of my Spark streaming projects. You can refer to it for more detailed queries and the use of machine learning in Spark:

https://github.com/Siddharth1698/Spotify-Recommendation-System-using-Pyspark-and-Kafka

References

1. https://github.com/Siddharth1698/Structured-Streaming-Tutorial

2. https://dzone.com/articles/running-apache-kafka-on-windows-os

3. https://phoenixnap.com/kb/install-spark-on-windows-10

4. https://drive.google.com/drive/u/0/folders/1kOQAKgo98cPPYcvqpygyqNFIGjrK_bjw

5. https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

6. Thumbnail Image -> https://unsplash.com/photos/ImcUkZ72oUs

Conclusion

If you follow these steps, you can set up the whole environment and run your first Structured Streaming program with Spark and Kafka. If you have any difficulty setting it up, feel free to contact me:

[email protected]

https://www.linkedin.com/in/siddharth-m-426a9614a/

The media shown in this article are not owned by Analytics Vidhya and are used at the Author’s discretion.
