This article was published as a part of the Data Science Blogathon
One of the major problems everyone faces when they first try Structured Streaming is setting up the environment required to stream their data. There are a few tutorials online on how to set this up. Most of them ask you to install a virtual machine with an Ubuntu operating system and then set up all the required files by editing the bash file. This works fine, but not for everyone. On machines with little memory, a virtual machine can be slow, and the process can get stuck. So, as an easier alternative, I will show you how to set up Structured Streaming on the Windows operating system.
For the setup we use the following tools:
1. Kafka (For streaming of data – acts as producer)
2. Zookeeper
3. Pyspark (For processing the streamed data – acts as a consumer)
4. Jupyter Notebook (Code Editor)
Note that I have placed all the files on the C drive. Also, keep the folder names the same as those of the files you download.
We have to set up the environment variables as we go on installing these files. Refer to these images during the installation for a hassle-free experience.
The last image is Path from System variables.
https://drive.google.com/drive/folders/1kOQAKgo98cPPYcvqpygyqNFIGjrK_bjw?usp=sharing
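Before moving on, it can help to confirm from Python that the environment variables are actually visible. The following is a minimal sketch, not part of the original setup: the variable names JAVA_HOME, SPARK_HOME and HADOOP_HOME are assumptions based on typical Spark-on-Windows installs, so adjust the list to match the names shown in the images.

```python
import os

# Assumed variable names; adjust to match the ones shown in the images.
REQUIRED_VARS = ["JAVA_HOME", "SPARK_HOME", "HADOOP_HOME"]


def missing_env_vars(names, env=None):
    """Return the names that are unset or empty in the given environment."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars(REQUIRED_VARS)
    if missing:
        print("Not set:", ", ".join(missing))
    else:
        print("All required variables are set.")
```

A process only sees the variables that existed when it started, so after changing them, reopen the command prompt and restart Jupyter before running the check.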
The first step is to install Kafka in our system. To do this we have to go to this link:
https://dzone.com/articles/running-apache-kafka-on-windows-os
We first need to install Java 8 and set up its environment variables. You can get all the instructions from the link.
Once we are done with Java, we must install Zookeeper. I have added the Zookeeper files to Google Drive. Feel free to use them, or just follow the instructions given in the link. If you have installed Zookeeper correctly and set up the environment variable, you will see this output when you run zkserver as admin in the command prompt.
Next, install Kafka as per the instructions in the link and run it from the Kafka directory using the following command:
.\bin\windows\kafka-server-start.bat .\config\server.properties
Once everything is set up try creating a topic and checking if it’s working properly. If it does, you have completed Kafka installation.
In this step, we install Spark. You can follow this link to set up Spark on your Windows machine:
https://phoenixnap.com/kb/install-spark-on-windows-10
During one of the steps, it will ask you to set up the winutils file. For your convenience, I have added the file to the drive link I shared, in a folder called Hadoop. Just put that folder on your C drive and set up the environment variable as shown in the images. I highly recommend using the Spark files I have added to Google Drive. The main reason is that streaming data from Kafka requires a Structured Streaming environment that has to be set up manually. I set up everything required and modified the files after a lot of testing. If you prefer a fresh setup, feel free to do one. If the setup is not done correctly, we end up with an error like this while streaming data in PySpark:
Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
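This error means Spark cannot find the Kafka connector. An alternative to modifying the Spark files by hand (this is not the approach used in this tutorial) is to let Spark download the connector through the spark.jars.packages setting, the configuration equivalent of the --packages option of spark-submit. The sketch below only builds the package coordinate. The helper name and the version numbers are assumptions; the Spark and Scala versions must match your own installation.

```python
def kafka_connector_package(spark_version, scala_version="2.12"):
    """Build the Maven coordinate of the Spark Kafka connector for a Spark build."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


# Hypothetical version; use the one reported by pyspark.__version__.
package = kafka_connector_package("3.1.2")
print(package)

# The coordinate would then be passed when building the session, for example:
#   SparkSession.builder.config("spark.jars.packages", package)
```

The setting has to be in place before the first Spark session is created in the notebook, and the first run needs internet access to download the connector.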
Once we are done with Spark, we can stream the required data from a CSV file in a producer and receive it in a consumer through a Kafka topic. I mostly work with Jupyter Notebook, so I have used a notebook for this tutorial.
In your notebook, you first have to install a few libraries:
1. pip install pyspark
2. pip install kafka-python
3. pip install py4j
We have a CSV file that holds the data we want to stream. Let us proceed with the classic Iris dataset. To stream the iris data, we use Kafka as the producer. In Kafka, we create a topic to which we stream the iris data, and the consumer can retrieve the data from this topic.
The following is the producer code to stream iris data:
import time

import numpy as np
import pandas as pd
from kafka import KafkaProducer  # pip install kafka-python

KAFKA_TOPIC_NAME_CONS = "Topic"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost:9092'

if __name__ == "__main__":
    print("Kafka Producer Application Started ... ")

    # Messages are plain strings, so encode them to UTF-8 bytes before sending.
    kafka_producer_obj = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS,
                                       value_serializer=lambda x: x.encode('utf-8'))

    filepath = "IRIS.csv"
    flower_df = pd.read_csv(filepath)
    # Add a running id so that each record can be identified by the consumer.
    flower_df['order_id'] = np.arange(len(flower_df))
    flower_list = flower_df.to_dict(orient="records")

    for flower in flower_list:
        # The field order here must match the schema used by the consumer.
        message_fields_value_list = [
            flower["order_id"],
            flower["sepal_length"],
            flower["sepal_width"],
            flower["petal_length"],
            flower["petal_width"],
            flower["species"],
        ]
        message = ','.join(str(v) for v in message_fields_value_list)
        print("Message Type: ", type(message))
        print("Message: ", message)
        kafka_producer_obj.send(KAFKA_TOPIC_NAME_CONS, message)
        time.sleep(1)  # send one record per second to simulate a stream

    # Make sure buffered messages are delivered before the script ends.
    kafka_producer_obj.flush()
    print("Kafka Producer Application Completed. ")
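The per-row formatting in the producer can be pulled into a small pure function, which makes it possible to check the message format without a running broker. This is a minimal sketch; the names FIELDS and row_to_message are illustrative and not part of the original code, and the sample row is made up in the shape of an Iris record.

```python
# Field order assumed to match the order used when joining values in the producer.
FIELDS = ["order_id", "sepal_length", "sepal_width",
          "petal_length", "petal_width", "species"]


def row_to_message(row, fields=FIELDS):
    """Join the selected fields of one record into a comma-separated string."""
    return ",".join(str(row[name]) for name in fields)


# Illustrative record in the shape produced by DataFrame.to_dict(orient="records").
sample_row = {
    "order_id": 0,
    "sepal_length": 5.1,
    "sepal_width": 3.5,
    "petal_length": 1.4,
    "petal_width": 0.2,
    "species": "Iris-setosa",
}

message = row_to_message(sample_row)
payload = message.encode("utf-8")  # what the value_serializer would hand to Kafka
print(message)
```

Keeping the field order in one list also gives the consumer schema a single place to be compared against.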
To start the producer, we first have to run zkserver as admin in the Windows command prompt and then start Kafka from the command prompt in the Kafka directory using: .\bin\windows\kafka-server-start.bat .\config\server.properties. If you get a “no broker” error, it means Kafka isn’t running properly.
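A quick way to tell whether the broker is reachable before running the producer is to try opening a TCP connection to it. The sketch below uses only the standard library; the helper name is_port_open is an assumption, and localhost:9092 is the address used in the producer code. An open port shows only that something is listening there, not that Kafka is fully healthy.

```python
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False


if __name__ == "__main__":
    if is_port_open("localhost", 9092):
        print("Something is listening on localhost:9092.")
    else:
        print("Nothing is listening on localhost:9092; start Zookeeper and Kafka first.")
```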
The output after running this code on jupyter notebook looks like this:
Now, let us check the consumer. Run the following code in a new notebook to check that it works.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_csv

kafka_topic_name = "Topic"
kafka_bootstrap_servers = 'localhost:9092'

spark = SparkSession \
    .builder \
    .appName("Structured Streaming ") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Construct a streaming DataFrame that reads from topic
flower_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .option("startingOffsets", "latest") \
    .load()

# Kafka delivers the value as bytes; cast it to a string and keep the arrival timestamp
flower_df1 = flower_df.selectExpr("CAST(value AS STRING)", "timestamp")

# Column order must match the order in which the producer joins the fields
flower_schema_string = "order_id INT,sepal_length DOUBLE,sepal_width DOUBLE,petal_length DOUBLE,petal_width DOUBLE,species STRING"

# Parse the comma-separated value into typed columns
flower_df2 = flower_df1 \
    .select(from_csv(col("value"), flower_schema_string) \
    .alias("flower"), "timestamp")

flower_df3 = flower_df2.select("flower.*", "timestamp")
flower_df3.createOrReplaceTempView("flower_find")
flower_find_text = spark.sql("SELECT * FROM flower_find")

# Write each micro-batch to an in-memory table named testedTable
flower_agg_write_stream = flower_find_text \
    .writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode("append") \
    .option("truncate", "false") \
    .format("memory") \
    .queryName("testedTable") \
    .start()

flower_agg_write_stream.awaitTermination(1)
Once you run this you should obtain an output like this:
As you can see, I ran a few queries to check whether data was streaming. The first count was 5, and a few seconds later it had increased to 14, which confirms that data is streaming.
Here, the basic idea is to create a Spark session, which getOrCreate() does as shown in the code. We then read the Kafka stream from our Topic on the specified port, and load() returns it as a streaming data frame. Since the data is streaming, it is useful to keep the timestamp at which each record arrived. We specify the schema much as we would in SQL and finally create a data frame holding the streamed values with their timestamps. With a processing time of 5 seconds, we receive the data in batches. The temporary SQL view lets us query the stream with SQL, and the memory sink keeps the results in an in-memory table (testedTable) in append mode, on which we can run further operations.
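To make the schema step concrete, the following plain-Python sketch mimics for a single message what the schema does on the consumer side: it splits the comma-separated value and casts each part to the declared type. It is only an illustration and not Spark's implementation. The names SCHEMA and parse_message are assumptions, and returning None for a malformed message is a simplification.

```python
# Column names and types, in the same order as the fields sent by the producer.
SCHEMA = [
    ("order_id", int),
    ("sepal_length", float),
    ("sepal_width", float),
    ("petal_length", float),
    ("petal_width", float),
    ("species", str),
]


def parse_message(value):
    """Turn one comma-separated message into a typed record, or None if malformed."""
    parts = value.split(",")
    if len(parts) != len(SCHEMA):
        return None
    try:
        return {name: cast(part) for (name, cast), part in zip(SCHEMA, parts)}
    except ValueError:
        # A part could not be converted to the declared type.
        return None


record = parse_message("0,5.1,3.5,1.4,0.2,Iris-setosa")
print(record)
```

If the column order in the schema and the field order in the producer ever disagree, the values end up in the wrong columns or fail to convert, which is why the two must be kept in sync.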
Refer to the complete code here:
https://github.com/Siddharth1698/Structured-Streaming-Tutorial
This is one of my Spark streaming projects. You can refer to it for more detailed queries and the use of machine learning in Spark:
https://github.com/Siddharth1698/Spotify-Recommendation-System-using-Pyspark-and-Kafka
If you follow these steps, you can easily set up the whole environment and run your first Structured Streaming program with Spark and Kafka. In case of any difficulties in setting it up, feel free to contact me:
https://www.linkedin.com/in/siddharth-m-426a9614a/