Here's a quick introduction to building machine learning pipelines using PySpark.
The ability to build these machine learning pipelines is a must-have skill for any aspiring data scientist.
This is a hands-on article with a structured PySpark code approach – so get your favorite Python IDE ready!
Introduction
Take a moment to ponder this – what are the skills an aspiring data scientist needs to possess to land an industry role?
A machine learning project has a lot of moving components that need to be tied together before we can successfully execute it. Knowing how to build an end-to-end machine learning pipeline is a prized asset. As a data scientist (aspiring or established), you should know how these machine learning pipelines work.
This is, to put it simply, the amalgamation of two disciplines – data science and software engineering. These two go hand-in-hand for a data scientist. It isn’t just about building models – we need to have the software skills to build enterprise-level systems.
So in this article, we will focus on the basic idea behind building these machine learning pipelines using PySpark. This is a hands-on article so fire up your favorite Python IDE and let’s get going!
Note: This is part 2 of my PySpark for beginners series. If you haven't read it yet, you may want to start with the introductory article.
An essential (and first) step in any data science project is to understand the data before building any Machine Learning model. Most data science aspirants stumble here – they just don’t spend enough time understanding what they’re working with. There’s a tendency to rush in and build models – a fallacy you must avoid.
We will follow this principle in this article. I’ll follow a structured approach throughout to ensure we don’t miss out on any critical step.
So first, let’s take a moment and understand each variable we’ll be working with here. We are going to use a dataset from a recently concluded India vs Bangladesh cricket match. Let’s see the different variables we have in the dataset:
Batsman: Unique batsman id (Integer)
Batsman_Name: Name of the batsman (String)
Bowler: Unique bowler id (Integer)
Bowler_Name: Name of the bowler (String)
Commentary: Description of the event as broadcasted (String)
Detail: Another string describing the events such as wickets and extra deliveries (String)
Dismissed: Unique Id of the batsman if dismissed (String)
Id: Unique row id (String)
Isball: Whether the delivery was legal or not (Boolean)
Isboundary: Whether the batsman hit a boundary or not (Binary)
Iswicket: Whether the batsman was dismissed or not (Binary)
Over: Over number (Double)
Runs: Runs on that particular delivery (Integer)
Timestamp: Time at which the data was recorded (Timestamp)
So let’s begin, shall we?
Reading a CSV file
When we power up Spark, the SparkSession object is available by default under the name spark. We can use it to read multiple types of files, such as CSV, JSON, and text. This enables us to save the data as a Spark dataframe.
By default, it considers the data type of all the columns as a string. You can check the data types by using the printSchema function on the dataframe:
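Here is a minimal sketch of this step (the file name below is an assumption; point it at wherever you saved the dataset):

# read the CSV file into a Spark dataframe (file name is assumed)
my_data = spark.read.csv('ind-ban-comment.csv', header=True)

# every column is read as a string by default
my_data.printSchema()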
Now, we do not want all the columns in our dataset to be treated as strings. So what can we do about that?
We can define a custom schema for our dataframe in Spark. For this, we need to create an object of StructType, which takes a list of StructField. Each StructField is defined with a column name, the data type of the column, and whether null values are allowed for that particular column.
Refer to the below code snippet to understand how to create this custom schema:
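Here is one possible version of that schema, based on the variable descriptions above (the exact types in the original code may differ; the two binary flags are read as integers here):

from pyspark.sql.types import (StructType, StructField, IntegerType, StringType,
                               BooleanType, DoubleType, TimestampType)

# custom schema: column name, data type, and whether nulls are allowed
my_schema = StructType([
    StructField('Batsman', IntegerType(), True),
    StructField('Batsman_Name', StringType(), True),
    StructField('Bowler', IntegerType(), True),
    StructField('Bowler_Name', StringType(), True),
    StructField('Commentary', StringType(), True),
    StructField('Detail', StringType(), True),
    StructField('Dismissed', StringType(), True),
    StructField('Id', StringType(), True),
    StructField('Isball', BooleanType(), True),
    StructField('Isboundary', IntegerType(), True),   # binary flag stored as an integer
    StructField('Iswicket', IntegerType(), True),     # binary flag stored as an integer
    StructField('Over', DoubleType(), True),
    StructField('Runs', IntegerType(), True),
    StructField('Timestamp', TimestampType(), True),
])

# re-read the file with the custom schema (file name is assumed)
my_data = spark.read.csv('ind-ban-comment.csv', schema=my_schema, header=True)
my_data.printSchema()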
In any machine learning project, we always have a few columns that are not required for solving the problem. I’m sure you’ve come across this dilemma before as well, whether that’s in the industry or in an online hackathon.
In our instance, we can use the drop function to remove these columns from the data. Use the asterisk (*) before the list to unpack it, so that drop receives each column name as a separate argument:
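A sketch of that step, assuming we only drop the id-style columns (adjust the list to your own use case):

# columns assumed not to be needed for the analysis
drop_cols = ['Batsman', 'Bowler', 'Id']

# the asterisk unpacks the list so drop() receives each column name separately
my_data = my_data.drop(*drop_cols)
my_data.columns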
Unlike Pandas, Spark dataframes do not have a shape attribute for checking the dimensions of the data. We can instead use the code below to check the dimensions of the dataset:
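Something along these lines does the job:

# Spark dataframes have no .shape, so combine count() with the number of columns
print((my_data.count(), len(my_data.columns)))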
Spark’s describe function gives us most of the statistical results like mean, count, min, max, and standard deviation. You can use the summary function to get the quartiles of the numeric variables as well:
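For example, on the numeric columns of our dataset:

# count, mean, stddev, min and max for the selected columns
my_data.select('Isboundary', 'Iswicket', 'Over', 'Runs').describe().show()

# summary() also returns the 25%, 50% and 75% quartiles
my_data.select('Over', 'Runs').summary().show()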
It’s rare when we get a dataset without any missing values. Can you remember the last time that happened?
It is important to check the number of missing values present in all the columns. Knowing the count helps us treat the missing values before building any machine learning model using that data.
So, you can use the code below to find the null value count in your dataset:
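One common way to do this (a sketch; the original snippet may have been written differently):

from pyspark.sql.functions import col, count, when

# count the null values in every column
my_data.select([count(when(col(c).isNull(), c)).alias(c)
                for c in my_data.columns]).show()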
Unlike Pandas, we do not have the value_counts() function in Spark dataframes. You can use the groupBy function to calculate the unique value counts of categorical variables:
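For instance, to count how many deliveries each batsman faced (the column choice is illustrative):

# rough equivalent of Pandas value_counts()
my_data.groupBy('Batsman_Name').count().orderBy('count', ascending=False).show()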
Most machine learning algorithms accept the data only in numerical form. So, it is essential to convert any categorical variables present in our dataset into numbers.
Remember that we cannot simply drop them from our dataset as they might contain useful information. It would be a nightmare to lose that just because we don’t want to figure out how to use them!
Let’s see some of the methods to encode categorical variables using PySpark.
String Indexing
String Indexing is similar to Label Encoding. It assigns a unique integer value to each category. 0 is assigned to the most frequent category, 1 to the next most frequent value, and so on. We have to define the input column name that we want to index and the output column name in which we want the results:
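A sketch of string-indexing the two name columns (the output column names are my own choice):

from pyspark.ml.feature import StringIndexer

# index the two name columns; the most frequent name gets index 0.0
batsman_indexer = StringIndexer(inputCol='Batsman_Name', outputCol='Batsman_Index')
bowler_indexer = StringIndexer(inputCol='Bowler_Name', outputCol='Bowler_Index')

my_data = batsman_indexer.fit(my_data).transform(my_data)
my_data = bowler_indexer.fit(my_data).transform(my_data)

my_data.select('Batsman_Name', 'Batsman_Index', 'Bowler_Name', 'Bowler_Index').show(5)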
One-Hot Encoding
One-hot encoding is a concept every data scientist should know. I've relied on it multiple times when dealing with categorical variables. It's a lifesaver!
Here’s the caveat – Spark’s OneHotEncoder does not directly encode the categorical variable.
First, we need to use the String Indexer to convert the variable into numerical form and then use OneHotEncoderEstimator to encode multiple columns of the dataset.
It creates a Sparse Vector for each row:
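A sketch that carries over the index columns created above (the output column names are assumptions; on Spark 3.x, swap in OneHotEncoder, which replaced this class):

# OneHotEncoderEstimator is the Spark 2.3/2.4 name; in Spark 3.x it is called OneHotEncoder
from pyspark.ml.feature import OneHotEncoderEstimator

# one-hot encode the indexed columns; each row gets a sparse vector
encoder = OneHotEncoderEstimator(inputCols=['Batsman_Index', 'Bowler_Index'],
                                 outputCols=['Batsman_OHE', 'Bowler_OHE'])
my_data = encoder.fit(my_data).transform(my_data)

my_data.select('Batsman_Index', 'Batsman_OHE', 'Bowler_Index', 'Bowler_OHE').show(5)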
Vector Assembler
A vector assembler combines a given list of columns into a single vector column.
This is typically used at the end of the data exploration and pre-processing steps. At this stage, we usually work with a few raw or transformed features that can be used to train our model.
The Vector Assembler converts them into a single feature column in order to train the machine learning model (such as Logistic Regression). It accepts numeric, boolean and vector type columns:
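A sketch using the columns built so far (the input list and the output column name 'vector' are assumptions):

from pyspark.ml.feature import VectorAssembler

# VectorAssembler does not accept nulls, so fill the missing numeric values first
my_data = my_data.fillna(0, subset=['Isboundary', 'Iswicket', 'Over', 'Runs'])

# combine the numeric and encoded columns into a single feature vector
assembler = VectorAssembler(inputCols=['Isboundary', 'Iswicket', 'Over', 'Runs',
                                       'Batsman_Index', 'Bowler_Index',
                                       'Batsman_OHE', 'Bowler_OHE'],
                            outputCol='vector')
final_data = assembler.transform(my_data)
final_data.select('vector').show(5, truncate=False)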
Machine Learning Pipelines
A machine learning project typically involves steps like data preprocessing, feature extraction, model fitting and evaluating results. We need to perform a lot of transformations on the data in sequence. As you can imagine, keeping track of them can potentially become a tedious task.
This is where machine learning pipelines come in.
A pipeline allows us to maintain the data flow of all the relevant transformations that are required to reach the end result.
We need to define the stages of the pipeline which act as a chain of command for Spark to run. Here, each stage is either a Transformer or an Estimator.
Transformers and Estimators
As the name suggests, Transformers convert one dataframe into another either by updating the current values of a particular column (like converting categorical columns to numeric) or mapping it to some other values by using a defined logic.
An Estimator implements the fit() method on a dataframe and produces a model. For example, LogisticRegression is an Estimator that trains a classification model when we call the fit() method.
Let’s understand this with the help of some examples.
Examples of Pipelines
Let’s create a sample dataframe with three columns as shown below. Here, we will define some of the stages in which we want to transform the data and see how to set up the pipeline:
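Something like this works as a toy dataframe (the id column and the category values are made up for illustration):

# a small illustrative dataframe with one numeric and two categorical columns
sample_df = spark.createDataFrame([
    (1, 'L101', 'R'),
    (2, 'L201', 'C'),
    (3, 'D111', 'R'),
    (4, 'F210', 'R'),
    (5, 'D110', 'C'),
], ['id', 'category_1', 'category_2'])

sample_df.show()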
We have created the dataframe. Suppose we have to transform the data in the below order:
stage_1: Label Encode or String Index the column category_1
stage_2: Label Encode or String Index the column category_2
stage_3: One-Hot Encode the indexed column category_2
At each stage, we will pass the input and output column names, and set up the pipeline by passing the defined stages as a list to the Pipeline object.
The pipeline model then performs certain steps one by one in a sequence and gives us the end result. Let’s see how to implement the pipeline:
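A sketch of the three-stage pipeline (the output column names are my own choice; on Spark 3.x use OneHotEncoder instead of OneHotEncoderEstimator):

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

# define the three stages described above
stage_1 = StringIndexer(inputCol='category_1', outputCol='category_1_index')
stage_2 = StringIndexer(inputCol='category_2', outputCol='category_2_index')
stage_3 = OneHotEncoderEstimator(inputCols=['category_2_index'],
                                 outputCols=['category_2_OHE'])

# chain the stages in a Pipeline, fit it, and transform the dataframe
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3])
pipeline_model = pipeline.fit(sample_df)
sample_df_updated = pipeline_model.transform(sample_df)

sample_df_updated.show()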
Now, let’s take a more complex example of setting up a pipeline. Here, we will do transformations on the data and build a logistic regression model.
For this, we will create a sample dataframe which will be our training dataset with four features and the target label:
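A toy training set along those lines (the values are made up; the label is binary):

# an illustrative training dataframe with four features and a target label
sample_data_train = spark.createDataFrame([
    (2.0, 'A', 'S10', 40, 1.0),
    (1.0, 'X', 'E10', 25, 1.0),
    (4.0, 'X', 'S20', 10, 0.0),
    (3.0, 'Z', 'S10', 20, 0.0),
    (4.0, 'A', 'E10', 30, 1.0),
    (2.0, 'Z', 'S10', 40, 0.0),
    (5.0, 'X', 'D10', 10, 1.0),
], ['feature_1', 'feature_2', 'feature_3', 'feature_4', 'label'])

sample_data_train.show()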
stage_1: Label Encode or String Index the column feature_2
stage_2: Label Encode or String Index the column feature_3
stage_3: One Hot Encode the indexed column of feature_2 and feature_3
stage_4: Create a vector of all the features required to train a Logistic Regression model
stage_5: Build a Logistic Regression model
We have to define the stages by providing the input column name and output column name. The final stage would be to build a logistic regression model. And in the end, when we run the pipeline on the training dataset, it will run the steps in a sequence and add new columns to the dataframe (like rawPrediction, probability, and prediction).
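A sketch of the five stages (the intermediate column names are my own choice; Spark 3.x users should use OneHotEncoder):

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

# stages 1 and 2: index the two categorical feature columns
stage_1 = StringIndexer(inputCol='feature_2', outputCol='feature_2_index')
stage_2 = StringIndexer(inputCol='feature_3', outputCol='feature_3_index')

# stage 3: one-hot encode both indexed columns
stage_3 = OneHotEncoderEstimator(inputCols=['feature_2_index', 'feature_3_index'],
                                 outputCols=['feature_2_encoded', 'feature_3_encoded'])

# stage 4: assemble all the features into a single vector column
stage_4 = VectorAssembler(inputCols=['feature_1', 'feature_2_encoded',
                                     'feature_3_encoded', 'feature_4'],
                          outputCol='features')

# stage 5: the logistic regression estimator
stage_5 = LogisticRegression(featuresCol='features', labelCol='label')

# fit the whole pipeline on the training data
regression_pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, stage_4, stage_5])
model = regression_pipeline.fit(sample_data_train)

# transforming adds the rawPrediction, probability and prediction columns
sample_data_train = model.transform(sample_data_train)
sample_data_train.select('features', 'label', 'prediction').show()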
Congrats! We have successfully set up the pipeline. Let's create a sample test dataset without the labels. This time, we do not need to define all the steps again; we will just pass the data through the fitted pipeline and we are done!
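For example (the values are made up, and every category below also appears in the training data so the fitted indexers can handle it):

# an illustrative test set with the same feature columns but no label
sample_data_test = spark.createDataFrame([
    (3.0, 'Z', 'S10', 40),
    (1.0, 'X', 'E10', 20),
    (4.0, 'A', 'S20', 10),
    (3.0, 'A', 'S10', 20),
], ['feature_1', 'feature_2', 'feature_3', 'feature_4'])

# reuse the fitted pipeline model; every stage runs again on the new data
results = model.transform(sample_data_test)
results.select('features', 'prediction').show()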
This was a short but intuitive article on how to build machine learning pipelines using PySpark. I'll say it again because it's that important: you need to know how these pipelines work. This is a big part of your role as a data scientist.
Have you worked on an end-to-end machine learning project before? Or been a part of a team that built these pipelines in an industry setting? Let’s connect in the comments section below and discuss.
I’ll see you in the next article on this PySpark for beginners series. Happy learning!