In the last article, we discussed Apache Spark, the big data ecosystem, and the role of Apache Spark in big data processing. If you haven't read it yet, you can find it on this page.
This article discusses Spark MLlib in detail, with an implementation in Python. In big data, the primary goal is to process data and extract insights. Spark lets you build data pipelines with machine learning support, so training and prediction can run in near real time. MLlib, Spark's machine learning library, implements machine learning operations on top of Spark's DataFrame-based APIs.
MLlib supports various data sources, including Parquet, CSV, JSON, and JDBC. It also has dedicated data sources for images and for LIBSVM files. The LIBSVM source loads data as a DataFrame with a label column and a feature vector column.
This article was published as a part of the Data Science Blogathon.
Spark MLlib provides functions for feature extraction from raw data, dimensionality reduction, string tokenization, and other tasks.
We can make machine learning pipelines using Transformers, Estimators, and the Pipeline API.
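To make the Transformer, Estimator, and Pipeline vocabulary concrete, here is a minimal plain-Python sketch of the pattern. The classes `Doubler`, `MeanCenterer`, and `MeanCentererModel`, and the helpers `fit_pipeline` and `apply_pipeline`, are hypothetical stand-ins invented for illustration; they are not Spark classes and only mimic the fit/transform idea.

```python
# Toy illustration of the Estimator / Transformer / Pipeline pattern.
# All names here are hypothetical stand-ins, not Spark's own classes.

class Doubler:
    """A Transformer: maps data to data and has nothing to learn."""
    def transform(self, rows):
        return [x * 2 for x in rows]


class MeanCentererModel:
    """A fitted model, which is itself a Transformer."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [x - self.mean for x in rows]


class MeanCenterer:
    """An Estimator: fit() learns from data and returns a Transformer."""
    def fit(self, rows):
        return MeanCentererModel(sum(rows) / len(rows))


def fit_pipeline(stages, rows):
    """Fit stages in order, feeding each stage the previous stage's output."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows) if hasattr(stage, "fit") else stage
        rows = model.transform(rows)
        fitted.append(model)
    return fitted


def apply_pipeline(fitted, rows):
    """Apply already-fitted stages to new data without refitting."""
    for model in fitted:
        rows = model.transform(rows)
    return rows


train_values = [1.0, 2.0, 3.0]
fitted = fit_pipeline([Doubler(), MeanCenterer()], train_values)
print(apply_pipeline(fitted, train_values))  # [-2.0, 0.0, 2.0]
print(apply_pipeline(fitted, [4.0]))         # [4.0]
```

The key point is that the mean is learned once during fitting and then reused on new data, which is the same reason a fitted scaler is reused on test data later in this article.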
We are going to implement a Regression model in Spark:
Installing and setting up PySpark is straightforward. Using the pip package manager, you can install PySpark in any cloud-hosted Python notebook.
!pip install pyspark
!pip install findspark
import findspark
findspark.init()  # locate the Spark installation so pyspark can be imported
import pandas as pd
import matplotlib.pyplot as plt
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
We need to create a Spark session to load our data frames.
# Create the Spark context
sc = SparkContext()
# Create (or reuse) the Spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames Machine learning") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
You can verify the Spark session instance by evaluating spark in a notebook cell.
In this article, we mainly use the following Spark ML libraries. For more information about ML libraries, refer to this link.
pyspark.ml.feature: creates and processes feature vectors. It contains VectorAssembler, Normalizer, StandardScaler, etc.
pyspark.ml.stat: statistical tasks such as correlation, hypothesis testing, and standard deviation.
pyspark.ml.regression: regression models.
from pyspark.ml.feature import VectorAssembler, Normalizer, StandardScaler
from pyspark.ml.stat import Correlation
from pyspark.ml.regression import LinearRegression
We first load the data into a pandas DataFrame. Later, using MLlib classes, we convert it into feature vectors for model building.
Loading the car data from cars.csv:
# Supply the path or URL to cars.csv in the empty string below
cars2 = pd.read_csv('', header=None, names=["mpg", "hp", "weight"])
We are interested in loading only three columns: mpg (miles per gallon), hp (horsepower), and weight.
We use the function createDataFrame to load the data into a Spark DataFrame.
sdf = spark.createDataFrame(cars2)
Feature vectors are used to work with ML models in Spark. Using the VectorAssembler transformer, we can combine DataFrame columns into a single feature vector column.
Here hp and weight are the input features, and mpg (miles per gallon) is the target label.
assembler = VectorAssembler(
    inputCols=["hp", "weight"],
    outputCol="features")
output = assembler.transform(sdf).select('features', 'mpg')
Here output is a Spark DataFrame that contains the columns features and mpg.
train, test = output.randomSplit([0.75, 0.25])
Roughly 75% of the data is used for training and 25% for testing. randomSplit assigns rows at random, so the actual split sizes are approximate.
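The sketch below illustrates, in plain Python and without Spark, why a weighted random split yields only approximate proportions: each row is assigned independently with the given probability. The helper `random_split` and its `seed` default are assumptions made for this illustration and are not Spark's implementation.

```python
import random


def random_split(rows, train_weight=0.75, seed=42):
    """Assign each row to train with probability train_weight, else to test.

    Hypothetical helper for illustration only; not Spark's randomSplit.
    """
    rng = random.Random(seed)  # fixed seed makes the split reproducible
    train_rows, test_rows = [], []
    for row in rows:
        if rng.random() < train_weight:
            train_rows.append(row)
        else:
            test_rows.append(row)
    return train_rows, test_rows


rows = list(range(1000))
train_rows, test_rows = random_split(rows)
# The sizes are close to 750 / 250 but usually not exact.
print(len(train_rows), len(test_rows))
```

Fixing the seed makes the split repeatable, which helps when comparing models across runs.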
In this part, we perform feature engineering and fundamental statistics to improve our training data by creating new features and dropping redundant features.
This process involves studying how the different features behave and how they are related. We use statistics to understand the relationships between features.
Spark ML offers built-in statistical functions. We can use the Correlation class from the stat library to compute different types of correlation between hp and weight.
Correlation function → Correlation.corr(df_name, "feature_col", "type_of_correlation")
r1 = Correlation.corr(train, "features", "pearson").head()
print("Pearson correlation matrix:\n" + str(r1[0]))
r2 = Correlation.corr(train, "features", "spearman").head()
print("Spearman correlation matrix:\n" + str(r2[0]))
We observed a correlation of 0.86 between the features "weight" and "hp". That is logical, as a vehicle with higher horsepower will likely have a bigger engine and thus weigh more.
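To show what the two correlation types measure, here is a small plain-Python sketch. Pearson works on the raw values, while Spearman is Pearson applied to the ranks of the values. The functions `pearson`, `ranks`, and `spearman` are written for this illustration, and the sample numbers are made up rather than taken from cars.csv.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of spreads."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = sqrt(sum((x - mx) ** 2 for x in xs))
    sy = sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)


def ranks(values):
    """1-based ranks; tied values share the average of their positions."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    result = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        avg = (i + j) / 2 + 1
        for k in range(i, j + 1):
            result[order[k]] = avg
        i = j + 1
    return result


def spearman(xs, ys):
    """Spearman correlation: Pearson correlation of the ranks."""
    return pearson(ranks(xs), ranks(ys))


# Made-up sample values, not taken from cars.csv
hp = [90, 110, 150, 200, 95]
weight = [2200, 2600, 3300, 4100, 2500]
print("Pearson: ", pearson(hp, weight))
print("Spearman:", spearman(hp, weight))
```

Because weight rises whenever hp rises in this made-up sample, the Spearman value is 1 even though the relationship is not perfectly linear.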
We can visualize the correlation between horsepower and weight.
plt.scatter(cars2["hp"], cars2["weight"])
plt.title("Correlation between Horsepower and Weight")
Normalization helps our model converge faster and more efficiently, so it is good practice to normalize feature vectors before training.
Spark's Normalizer rescales each feature vector (each row) to unit p-norm. With p=1.0 and non-negative features, every component ends up between 0 and 1, and the components of each row sum to 1.
Spark ML provides the Normalizer class to normalize our feature vectors.
normalizer = Normalizer(inputCol="features", outputCol="features_normalized", p=1.0)
train_norm = normalizer.transform(train)
print("Normalized using L^1 norm")
train_norm.show(5, truncate=False)
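As a rough illustration of what p-norm normalization does to a single row, the plain-Python sketch below divides each component by the row's norm. The helper `normalize` and the sample row are assumptions for this example. Returning zero vectors unchanged is a choice made here to avoid division by zero.

```python
def normalize(vector, p=1.0):
    """Scale a vector so that its p-norm equals 1.

    Hypothetical helper for illustration; zero vectors are returned unchanged.
    """
    norm = sum(abs(v) ** p for v in vector) ** (1.0 / p)
    if norm == 0:
        return list(vector)
    return [v / norm for v in vector]


row = [100.0, 300.0]  # made-up (hp, weight)-style row
print(normalize(row))                # [0.25, 0.75]
print(normalize([3.0, 4.0], p=2.0))  # [0.6, 0.8]
```

Note that this operates row by row. It changes the relative scale of features within a row, which differs from rescaling each column independently.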
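Standardization scales the features so that each column has unit variance and, optionally, zero mean. It changes the scale of the data, not the shape of its distribution.
Standard scaling does not restrict data values to a fixed range.
Spark provides StandardScaler for standardization. By default it only scales to unit standard deviation (withStd=True, withMean=False). Pass withMean=True to center the features as well.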
standard_scaler = StandardScaler(inputCol="features", outputCol="features_scaled")
# Fit the scaler on the training data only
train_model = standard_scaler.fit(train)
train_scaled = train_model.transform(train)
train_scaled.show(5, truncate=False)
Note: If we scale the training data, we must also scale the testing data with the same fitted scaler model.
test_scaled = train_model.transform(test)
test_scaled.show(5, truncate=False)
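The reason for reusing the fitted scaler can be sketched in plain Python: the column statistics are computed from the training rows only and then applied unchanged to the test rows. The helpers `fit_scaler` and `scale_rows`, the `with_mean` flag, and the sample rows are all assumptions for this illustration. The sketch uses the sample standard deviation and would fail on a constant column, where the deviation is zero.

```python
from statistics import mean, stdev


def fit_scaler(rows):
    """Learn per-column mean and sample standard deviation from the rows."""
    cols = list(zip(*rows))
    return [mean(c) for c in cols], [stdev(c) for c in cols]


def scale_rows(rows, means, stds, with_mean=False):
    """Divide each column by its std; optionally subtract the mean first."""
    scaled = []
    for row in rows:
        scaled.append([
            ((v - m) if with_mean else v) / s
            for v, m, s in zip(row, means, stds)
        ])
    return scaled


# Made-up (hp, weight)-style rows
train_rows = [[100.0, 2000.0], [150.0, 3000.0], [200.0, 4000.0]]
test_rows = [[125.0, 2500.0]]

means, stds = fit_scaler(train_rows)  # statistics come from training data only
print(scale_rows(train_rows, means, stds, with_mean=True))
# [[-1.0, -1.0], [0.0, 0.0], [1.0, 1.0]]
print(scale_rows(test_rows, means, stds, with_mean=True))
# [[-0.5, -0.5]]
```

If the test rows were scaled with their own statistics instead, the same raw value would map to different scaled values in training and testing, and the model's coefficients would no longer apply.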
At this step, our scaled feature vectors are ready. We will train Spark's built-in LinearRegression model on the standard-scaled features.
If you want to revise your regression concepts, refer to this link.
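As a quick refresher on what a least-squares linear regression fit computes, here is a plain-Python sketch that solves the normal equations for an intercept and two coefficients. The helpers `solve` and `fit_ols` are written for this illustration and say nothing about which solver Spark uses internally. The data is made up and generated from a known formula, so the recovered coefficients can be checked.

```python
def solve(a, b):
    """Solve the linear system a x = b by Gaussian elimination with pivoting."""
    n = len(b)
    m = [row[:] + [b[i]] for i, row in enumerate(a)]  # augmented matrix
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(col + 1, n):
            factor = m[r][col] / m[col][col]
            for c in range(col, n + 1):
                m[r][c] -= factor * m[col][c]
    x = [0.0] * n
    for r in range(n - 1, -1, -1):  # back substitution
        s = sum(m[r][c] * x[c] for c in range(r + 1, n))
        x[r] = (m[r][n] - s) / m[r][r]
    return x


def fit_ols(features, labels):
    """Return [intercept, coef_1, ..., coef_k] minimizing squared error."""
    rows = [[1.0] + list(f) for f in features]  # leading 1.0 models the intercept
    k = len(rows[0])
    xtx = [[sum(r[i] * r[j] for r in rows) for j in range(k)] for i in range(k)]
    xty = [sum(r[i] * y for r, y in zip(rows, labels)) for i in range(k)]
    return solve(xtx, xty)


# Made-up data generated from: label = 40 - 2*x1 - 3*x2
features = [[1.0, 1.0], [2.0, 1.0], [1.0, 3.0], [3.0, 2.0]]
labels = [40 - 2 * a - 3 * b for a, b in features]
params = fit_ols(features, labels)
print(params)  # approximately [40.0, -2.0, -3.0]
```

With noise-free data the fit recovers the generating formula. On real data such as the cars dataset, the coefficients are the values that minimize the squared prediction error.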
We create the model with the LinearRegression class and train it with the function fit().
from pyspark.ml.regression import LinearRegression
# Create a LR model
lr = LinearRegression(featuresCol='features_scaled', labelCol='mpg', maxIter=100)
# Fit the model
lrModel = lr.fit(train_scaled)
# coefficients and intercept
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))
# Summary
trainingSummary = lrModel.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("R-squared: %f" % trainingSummary.r2)
RMSE: 4.11 → On the training data, our model's mpg predictions are off by about 4.11 units (root mean squared error).
lrModel.coefficients → Returns the regression coefficients.
Once our model is trained, we can use the function transform() to get predictions on new, unseen scaled data. It automatically adds a prediction column to our test data.
lrModel.transform(test_scaled).show(5)
As you can see, mpg (true mpg) and prediction (predicted mpg) are fairly close. For reference, the RMSE on the training data was 4.11 units.
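To make the two summary metrics concrete, the plain-Python sketch below computes RMSE and R-squared by hand from true and predicted values. The functions `rmse` and `r_squared` are written for this illustration, and the numbers are made up rather than taken from the model above.

```python
from math import sqrt


def rmse(y_true, y_pred):
    """Root mean squared error: square root of the mean squared difference."""
    n = len(y_true)
    return sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / n)


def r_squared(y_true, y_pred):
    """Share of the label's variance explained by the predictions."""
    mean_y = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)
    return 1 - ss_res / ss_tot


# Made-up values for illustration
mpg = [18.0, 22.0, 30.0, 26.0]
prediction = [20.0, 20.0, 28.0, 28.0]
print(rmse(mpg, prediction))       # 2.0
print(r_squared(mpg, prediction))  # 0.8
```

RMSE is expressed in the units of the label (here mpg), while R-squared is unitless. Computing both on the test predictions, not only on the training data, gives a fairer picture of model quality.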
This article discussed the machine learning capabilities of Apache Spark in Python and walked through the steps involved in building a machine learning pipeline.
Spark also supports many add-on packages for specific tasks, and it can be used to build pipelines for computer vision and natural language processing.
A. Normalizing in PySpark means rescaling numerical features with classes such as StandardScaler or MinMaxScaler from pyspark.ml.feature, so that features are comparable and machine learning models perform better.
A. Normalization in Spark, like in PySpark, is the process of scaling numerical data to a standard range, such as between 0 and 1, to ensure that features are on a common scale for fair comparisons and to mitigate the influence of varying feature scales on machine learning models.
A. Regexp_replace in PySpark is a function used for text manipulation. It enables you to search for substrings within a text column using regular expressions and replace them with a specified replacement string. This function is useful for cleaning and modifying text data in data processing tasks.
A. The Standard Scaler in PySpark is a feature transformation technique that standardizes numerical features by subtracting the mean and dividing by the standard deviation. It scales data to have a mean of 0 and a standard deviation of 1, making it suitable for machine learning algorithms that are sensitive to feature scaling.
The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.