We are generating data at an unprecedented pace. Honestly, I can’t keep up with the sheer volume of data around the world! I’m sure you’ve come across an estimate of how much data is being produced – McKinsey, Gartner, IBM, etc. all offer their own figures.
Here are some mind-boggling numbers for your reference: more than 500 million tweets, 90 billion emails, and 65 million WhatsApp messages are sent in a single day! About 4 petabytes of data are generated on Facebook alone every 24 hours. That’s incredible!
This, of course, comes with challenges of its own. How does a data science team capture this amount of data? How do you process it and build machine learning models from it? These are exciting questions if you’re a data scientist or a data engineer.
And this is where Spark comes into the picture. Spark is written in Scala and provides APIs for Scala, Java, Python, and R. PySpark is the Python API for Spark.
One traditional way to handle Big Data is to use a distributed framework like Hadoop, but these frameworks require a lot of read-write operations on a hard disk, which makes them very expensive in terms of time. Computational power is also a significant hurdle.
PySpark deals with this efficiently and in an easy-to-understand manner. So in this article, we will start learning all about it. We’ll understand what Spark is, how to install it on your machine, and then we’ll dive deep into the different Spark functionalities and components. There’s a whole bunch of code here too, so let’s have some fun!
Here’s a quick introduction to the world of Big Data in case you need a refresher. Keep in mind that the numbers have exceeded what’s shown there, and only three years have passed since we published that article!
Apache Spark is an open-source, distributed cluster-computing framework for fast processing, querying, and analysis of Big Data.
It is one of the most widely used data processing frameworks in enterprises today. It’s true that Spark can be costly because it needs a lot of RAM for in-memory computation, but it is still a hot favorite among Data Scientists and Big Data Engineers. And you’ll see why that’s the case in this article.
Organizations that typically relied on MapReduce-like frameworks are now shifting to Apache Spark. Because Spark computes in memory, it can be up to 100 times faster than Hadoop MapReduce on some workloads. Spark is popular among data scientists because it distributes and caches data in memory, which helps them optimize machine learning algorithms on Big Data.
I recommend checking out Spark’s official page here for more details. It has extensive documentation and is a good reference guide for all things Spark.
One simple way to install Spark is via pip. But that’s not the recommended method according to Spark’s official documentation, since the Python package for Spark is not intended to replace all the other use cases.
There’s a high chance you’ll run into a lot of errors when implementing even basic functionality. The pip package is only suitable for interacting with an existing cluster (be it standalone Spark, YARN, or Mesos).
So, the first step is to download the latest version of Apache Spark from here. Unzip and move the compressed file:
tar xzvf spark-2.4.4-bin-hadoop2.7.tgz
mv spark-2.4.4-bin-hadoop2.7 spark
sudo mv spark/ /usr/lib/
Make sure that Java is installed on your system. I highly recommend Java 8 because Spark version 2 has known issues with Java 9 and later versions.
sudo apt install default-jre
sudo apt install openjdk-8-jdk
When you are working on a small project that contains very few source code files, it is easier to compile them manually. But what if you are working on a bigger project that has hundreds of source code files? You would need to use build tools in that case.
SBT, short for Scala Build Tool, manages your Spark project and also the dependencies of the libraries that you have used in your code.
Keep in mind that you don’t need to install this if you are using PySpark. But if you are using Java or Scala to build Spark applications, then you need to install SBT on your machine. Run the commands below to install SBT:
echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo apt-key add
sudo apt-get update
sudo apt-get install sbt
Next, open the configuration directory of Spark and make a copy of the default Spark environment template. This is already present there as spark-env.sh.template. Open this using the editor:
cd /usr/lib/spark/conf/
cp spark-env.sh.template spark-env.sh
sudo gedit spark-env.sh
Now, in the file spark-env.sh, add the JAVA_HOME path and assign memory limit to SPARK_WORKER_MEMORY. Here, I have assigned it to be 4GB:
## add variables
JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
SPARK_WORKER_MEMORY=4g
Open and edit the .bashrc file using the command below. The .bashrc file is a script that is executed whenever you start a new terminal session:
## open .bashrc file
gedit ~/.bashrc
Add the below environment variables in the file:
## add following variables
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export SBT_HOME=/usr/share/sbt/bin/sbt-launch.jar
export SPARK_HOME=/usr/lib/spark
export PATH=$PATH:$JAVA_HOME/bin
export PATH=$PATH:$SBT_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
export PYSPARK_PYTHON=python3
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
Now, source the .bashrc file. This reloads the updated script in the current terminal session:
## source bashrc file
source ~/.bashrc
Now, type pyspark in the terminal. It will open Jupyter in your default browser, and a Spark context (the entry point to the Spark services) will automatically be initialized with the variable name sc:
A Spark application is an instance of the Spark Context. It consists of a driver process and a set of executor processes.
The driver process is the heart of a Spark application. It distributes and schedules tasks across the executors, and it holds all the relevant information about the application throughout its lifespan. In short, the driver coordinates the reading of data and the execution of tasks within the Spark cluster.
The executors are responsible for actually executing the work that the driver assigns them. So, each executor is responsible for only two things: executing the code assigned to it by the driver, and reporting the state of that computation back to the driver.
We know that a driver process controls the Spark application. The driver process makes itself available to the user as an object called the Spark Session.
The Spark Session instance is the way Spark executes user-defined manipulations across the cluster. In Scala and Python, the Spark Session variable is available as spark when you start up the console:
Partitioning means that the complete data is not present in a single place. It is divided into multiple chunks and these chunks are placed on different nodes.
If you have one partition, Spark will only have a parallelism of one, even if you have thousands of executors. Also, if you have many partitions but only one executor, Spark will still only have a parallelism of one because there is only one computation resource.
In Spark, the lower level APIs allow us to define the number of partitions.
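The rule of thumb above can be written down as a tiny model. This is only a sketch in plain Python, not a Spark API: the function name effective_parallelism is made up for illustration, and it assumes each executor runs one task at a time.

```python
def effective_parallelism(num_partitions: int, num_executors: int) -> int:
    """Upper bound on tasks that can run at the same time (simplified model).

    Assumption: one task per executor at a time. Real executors can have
    several cores, so treat this as a rough picture only.
    """
    if num_partitions < 0 or num_executors < 0:
        raise ValueError("counts must be non-negative")
    # A task works on one partition, so whichever resource is scarcer wins.
    return min(num_partitions, num_executors)


print(effective_parallelism(1, 1000))  # one partition, many executors -> 1
print(effective_parallelism(200, 1))   # many partitions, one executor -> 1
print(effective_parallelism(200, 8))   # -> 8
```

In other words, adding executors only helps while there are more partitions than executors, and adding partitions only helps while there are idle executors.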
Let’s take a simple example to understand how partitioning helps us get results faster. We will create a list of 20 million random numbers between 10 and 1000 and count the numbers greater than or equal to 200.
Let’s see how fast we can do this with just one partition:
from random import randint
# create a list of random numbers between 10 and 1000
my_large_list = [randint(10,1000) for x in range(0,20000000)]
# create one partition of the list
my_large_list_one_partition = sc.parallelize(my_large_list,numSlices=1)
# check number of partitions
print(my_large_list_one_partition.getNumPartitions())
# >> 1
# filter numbers greater than or equal to 200
my_large_list_one_partition = my_large_list_one_partition.filter(lambda x : x >= 200)
# code was run in a jupyter notebook
# to calculate the time taken to execute the following command
%%time
# count the number of elements in filtered list
print(my_large_list_one_partition.count())
It took 34.5 ms to filter the results with one partition:
Now, let’s increase the number of partitions to 5 and check if we get any improvements in the execution time:
# create five partitions of the list
my_large_list_with_five_partition = sc.parallelize(my_large_list, numSlices=5)
# filter numbers greater than or equal to 200
my_large_list_with_five_partition = my_large_list_with_five_partition.filter(lambda x : x >= 200)
%%time
# count the number of elements in the filtered list
print(my_large_list_with_five_partition.count())
# >> 16162207
It took 11.1 ms to filter the results using five partitions:
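The idea behind the speed-up can be pictured without Spark at all. The sketch below is plain Python and makes some assumptions: the helper names split_into_partitions and count_at_least are invented for illustration, the list is much smaller than the one above, and the chunks are processed one after another rather than in parallel. What it does show is that each chunk can be counted independently and the partial counts simply added up.

```python
from random import randint, seed


def split_into_partitions(data, num_partitions):
    """Split a list into num_partitions contiguous chunks of near-equal size."""
    n = len(data)
    return [data[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]


def count_at_least(chunk, threshold):
    """The work done on a single partition: count values >= threshold."""
    return sum(1 for x in chunk if x >= threshold)


seed(0)  # fixed seed so repeated runs use the same numbers
numbers = [randint(10, 1000) for _ in range(100_000)]

partitions = split_into_partitions(numbers, 5)
# Each partial count depends on its own chunk only, so these could run in parallel.
partial_counts = [count_at_least(p, 200) for p in partitions]
total = sum(partial_counts)

print([len(p) for p in partitions])
print(total == count_at_least(numbers, 200))  # True
```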
Data structures are immutable in Spark. This means that they cannot be changed once created. But if we cannot change it, how are we supposed to use it?
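So, in order to make any change, we need to instruct Spark on how we would like to modify our data. These instructions are called transformations.
Recall the example we saw above. We asked Spark to filter the numbers greater than or equal to 200; that was essentially one type of transformation. There are two types of transformations in Spark: narrow transformations, where each partition of the result can be computed from a single partition of the parent data (filter and map are examples), and wide transformations, where a partition of the result may need data from several parent partitions (grouping by key, for example).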
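The difference between a transformation that can work on each partition separately and one that needs data from several partitions can be sketched in plain Python. Everything here is hypothetical and only for illustration: the two small partitions, the helper names narrow_map and wide_group_by_key, and the toy routing rule that decides where a key ends up. It is not how Spark implements a shuffle.

```python
from collections import defaultdict

# Two hypothetical partitions of (key, value) records.
partitions = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]]


def narrow_map(parts, fn):
    """Each output partition is built from exactly one input partition."""
    return [[fn(record) for record in part] for part in parts]


def wide_group_by_key(parts, num_out):
    """Each output partition may receive records from every input partition."""
    out = [defaultdict(list) for _ in range(num_out)]
    for part in parts:
        for key, value in part:
            # Toy routing rule: all records with the same key land together.
            target = sum(key.encode()) % num_out
            out[target][key].append(value)
    return [dict(d) for d in out]


doubled = narrow_map(partitions, lambda kv: (kv[0], kv[1] * 2))
grouped = wide_group_by_key(partitions, num_out=2)

print(doubled)  # [[('a', 2), ('b', 4)], [('a', 6), ('c', 8)]]
print(grouped)  # [{'b': [2]}, {'a': [1, 3], 'c': [4]}]
```

Notice that the values for key "a" started out in two different partitions and had to be brought together, which is the expensive part.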
Let’s say you have a very large data file that contains millions of rows. You need to perform analysis on that by doing some manipulations like mapping, filtering, random split or even very basic addition or subtraction.
Now, for large datasets, even a basic transformation will take millions of operations to execute.
It is essential to optimize these operations when working with Big Data, and Spark handles it in a very creative way. All you need to do is tell Spark which transformations you want to apply to the dataset, and Spark will maintain the series of transformations. When you ask Spark for the results, it then works out the best path, performs the required transformations, and gives you the result.
Now, let’s take an example. You have a text file of 1 GB and have created 10 partitions of it. You also performed some transformations and, in the end, you asked to see what the first line looks like. In this case, Spark will read the file only from the first partition and give you the result, because your request does not require reading the complete file.
Let’s take a few practical examples to see how Spark performs lazy evaluation. In the first step, we create a list of about 10 million numbers and an RDD with 3 partitions:
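If lazy evaluation feels abstract, here is a toy model of it in plain Python. The LazyDataset class is made up for this illustration and is not Spark’s implementation: map only records a step, and nothing runs until an action such as first or count is called.

```python
class LazyDataset:
    """Toy stand-in for a lazily evaluated dataset (not Spark's implementation)."""

    def __init__(self, source, steps=()):
        self._source = source  # any iterable
        self._steps = steps    # recorded transformations, not yet applied

    def map(self, fn):
        # Transformation: record the step and return a new object.
        return LazyDataset(self._source, self._steps + (fn,))

    def _iterate(self):
        for item in self._source:
            for fn in self._steps:
                item = fn(item)
            yield item

    def first(self):
        # Action: pull a single element through the recorded steps.
        return next(self._iterate())

    def count(self):
        # Action: has to walk through the whole source.
        return sum(1 for _ in self._iterate())


calls = []


def lower(line):
    calls.append(line)  # remember every line that really gets processed
    return line.lower()


lines = ["FIRST line", "SECOND line", "THIRD line"]
ds = LazyDataset(lines).map(lower)

print(len(calls))   # 0, nothing has run yet
print(ds.first())   # first line
print(len(calls))   # 1, only one line was processed
```

Calling ds.count() afterwards would process all three lines, because that result cannot be produced from the first element alone.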
# create a sample list
my_list = [i for i in range(1,10000000)]
# parallelize the data
rdd_0 = sc.parallelize(my_list,3)
rdd_0
Next, we will perform a very basic transformation, like adding 4 to each number. Note that Spark at this point in time has not started any transformation. It only records a series of transformations in the form of RDD Lineage. You can see that RDD lineage using the function toDebugString:
# add value 4 to each number
rdd_1 = rdd_0.map(lambda x : x+4)
# RDD object
print(rdd_1)
# get the RDD Lineage
print(rdd_1.toDebugString())
We can see that PythonRDD[1] is connected with ParallelCollectionRDD[0]. Now, let’s go ahead and add one more transformation to add 20 to all the elements of the list.
You might be thinking it would be better if we added 24 in a single step instead of making an extra step. But check the RDD Lineage after this step:
# add value 20 to each number
rdd_2 = rdd_1.map(lambda x : x+20)
# RDD Object
print(rdd_2)
# get the RDD Lineage
print(rdd_2.toDebugString())
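We can see that the lineage does not contain a separate step for each map. Spark has pipelined both transformations into a single stage on top of ParallelCollectionRDD[0], so each element is read once and gets 4 and then 20 added in the same pass, which has the same effect as adding 24 in one go. So, Spark automatically works out an efficient path to perform any action and only performs the transformations when required.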
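One way to picture what happens to the two chained map steps is function composition: both functions are applied to each element in a single pass instead of building an intermediate result in between. The snippet below is a plain-Python sketch of that idea. The compose helper is written for this illustration and says nothing about Spark’s internals.

```python
from functools import reduce


def compose(*fns):
    """Return one function that applies fns from left to right."""
    return lambda x: reduce(lambda acc, fn: fn(acc), fns, x)


def add_4(x):
    return x + 4


def add_20(x):
    return x + 20


data = list(range(1, 6))

# Two passes: an intermediate list is built after the first map.
two_passes = [add_20(y) for y in [add_4(x) for x in data]]

# One pass: both functions are applied to each element in turn.
pipelined = compose(add_4, add_20)
one_pass = [pipelined(x) for x in data]

print(one_pass)                # [25, 26, 27, 28, 29]
print(one_pass == two_passes)  # True
```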
Let’s take another example to understand the Lazy Evaluation process.
Suppose we have a text file and we created an RDD of it with 4 partitions. Now, we define some transformations like converting the text data to lower case, slicing the words, adding some prefix to the words, etc.
But in the end, when we perform an action like getting the first element of the transformed data, Spark performs the transformations on the first partition only as there is no need to view the complete data to execute the requested result:
# create a RDD of the text file with Number of Partitions = 4
my_text_file = sc.textFile('tokens_spark.txt',minPartitions=4)
# RDD Object
print(my_text_file)
# convert to lower case
my_text_file = my_text_file.map(lambda x : x.lower())
# Updated RDD Object
print(my_text_file)
# Get the RDD Lineage
print(my_text_file.toDebugString())
Here, we have converted the words to lower case and kept only the first two characters of each word (and then requested the first word).
# slice the words
my_text_file = my_text_file.map(lambda x : x[:2])
# RDD Object
print(my_text_file)
# Get the RDD Lineage
print(my_text_file.toDebugString())
# Get the first element after all the transformations
print(my_text_file.first())
What happened here? We created 4 partitions of the text file. But the result we asked for did not require reading and transforming all the partitions, so Spark processed only what it needed.
What if we want to count the unique words? Then we need to read all the partitions, and that’s exactly what Spark does (note that countApproxDistinct returns an approximate count):
print(my_text_file.countApproxDistinct())
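The contrast between the two requests can be sketched in plain Python. The partition contents and the helper names first_element and count_distinct are hypothetical, and the count here is exact rather than approximate, so this only illustrates how many partitions each request has to touch.

```python
# Hypothetical contents of four partitions of a token file.
partitions = [["sp", "ar", "sp"], ["py", "sp"], ["ml", "ar"], ["py", "rd"]]


def first_element(parts):
    """Return the first element and how many partitions were read to find it."""
    read = 0
    for part in parts:
        read += 1
        if part:
            return part[0], read
    raise ValueError("dataset is empty")


def count_distinct(parts):
    """Return the exact distinct count and how many partitions were read."""
    seen = set()
    read = 0
    for part in parts:
        read += 1
        seen.update(part)  # every partition can contain unseen tokens
    return len(seen), read


print(first_element(partitions))   # ('sp', 1)
print(count_distinct(partitions))  # (5, 4)
```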
MLlib is Spark’s scalable Machine Learning library. It consists of common machine learning algorithms like Regression, Classification, Dimensionality Reduction, and some utilities to perform basic statistical operations on the data.
In this article, we will go through some of the data types that MLlib provides. We’ll cover topics like feature extraction and building machine learning pipelines in upcoming articles.
MLlib supports two types of Local Vectors: dense and sparse. Sparse Vectors are used when most of the numbers are zero. To create a sparse vector, you need to provide the length of the vector, the indices of the non-zero values (which must be strictly increasing), and the non-zero values themselves.
from pyspark.mllib.linalg import Vectors
## Dense Vector
print(Vectors.dense([1,2,3,4,5,6,0]))
# >> DenseVector([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 0.0])
### SPARSE VECTOR
### Vectors.sparse( length, index_of_non_zero_values, non_zero_values)
### Indices values should be strictly increasing
print(Vectors.sparse(10, [0,1,2,4,5], [1.0,5.0,3.0,5.0,7]))
# >> SparseVector(10, {0: 1.0, 1: 5.0, 2: 3.0, 4: 5.0, 5: 7.0})
print(Vectors.sparse(10, [0,1,2,4,5], [1.0,5.0,3.0,5.0,7]).toArray())
# >> array([1., 5., 3., 0., 5., 7., 0., 0., 0., 0.])
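To make the sparse representation concrete, here is a small plain-Python sketch that expands the three pieces (length, indices, values) into an ordinary list. The helper sparse_to_dense is written for this illustration and is not part of MLlib. It uses the same numbers as the sparse vector above.

```python
def sparse_to_dense(size, indices, values):
    """Expand (size, indices, values) into a plain list of floats."""
    if len(indices) != len(values):
        raise ValueError("indices and values must have the same length")
    if any(b <= a for a, b in zip(indices, indices[1:])):
        raise ValueError("indices must be strictly increasing")
    if indices and (indices[0] < 0 or indices[-1] >= size):
        raise ValueError("index out of range")
    dense = [0.0] * size          # every position starts as zero
    for i, v in zip(indices, values):
        dense[i] = float(v)       # fill in only the stored entries
    return dense


print(sparse_to_dense(10, [0, 1, 2, 4, 5], [1.0, 5.0, 3.0, 5.0, 7]))
# [1.0, 5.0, 3.0, 0.0, 5.0, 7.0, 0.0, 0.0, 0.0, 0.0]
```

Only five numbers and five indices are stored, even though the vector has ten positions. The saving grows with the share of zeros.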
A Labeled Point is a local vector with a label assigned to it. You must have solved supervised problems where you have some target corresponding to some features. A Labeled Point works exactly the same way: you provide a vector as the set of features and a label associated with it.
from pyspark.mllib.regression import LabeledPoint
# set a Label against a Dense Vector
point_1 = LabeledPoint(1,Vectors.dense([1,2,3,4,5]))
# Features
print(point_1.features)
# Label
print(point_1.label)
Local Matrices are stored on a single machine. MLlib supports both dense and sparse matrices. In a Sparse matrix, non-zero entry values are stored in the Compressed Sparse Column (CSC) format in column-major order.
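Column-major order and the CSC format are easier to follow with a small decoder. The two helpers below are plain Python written for this illustration (they are not MLlib functions), and they use the same numbers as the dense and sparse matrices created in this section. In CSC, the column pointers say which slice of the stored values belongs to each column, and the row indices say which row each stored value sits in.

```python
def dense_from_column_major(num_rows, num_cols, values):
    """Rebuild the rows of a matrix from a flat, column-major list of values."""
    return [[float(values[c * num_rows + r]) for c in range(num_cols)]
            for r in range(num_rows)]


def dense_from_csc(num_rows, num_cols, col_ptrs, row_indices, values):
    """Expand a Compressed Sparse Column description into a list of rows."""
    dense = [[0.0] * num_cols for _ in range(num_rows)]
    for c in range(num_cols):
        # Entries of column c are stored at positions col_ptrs[c] .. col_ptrs[c + 1] - 1.
        for k in range(col_ptrs[c], col_ptrs[c + 1]):
            dense[row_indices[k]][c] = float(values[k])
    return dense


print(dense_from_column_major(3, 2, [1, 2, 3, 4, 5, 6]))
# [[1.0, 4.0], [2.0, 5.0], [3.0, 6.0]]

print(dense_from_csc(3, 3, [0, 1, 2, 3], [0, 0, 2], [9, 6, 8]))
# [[9.0, 6.0, 0.0], [0.0, 0.0, 0.0], [0.0, 0.0, 8.0]]
```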
# import the Matrices
from pyspark.mllib.linalg import Matrices
# create a dense matrix of 3 Rows and 2 columns
matrix_1 = Matrices.dense(3, 2, [1,2,3,4,5,6])
print(matrix_1)
# >> DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], False)
print(matrix_1.toArray())
"""
>> array([[1., 4.],
          [2., 5.],
          [3., 6.]])
"""
# create a sparse matrix
matrix_2 = Matrices.sparse(3, 3, [0, 1, 2, 3], [0, 0, 2], [9, 6, 8])
print(matrix_2)
# SparseMatrix(3, 3, [0, 1, 2, 3], [0, 0, 2], [9.0, 6.0, 8.0], False)
print(matrix_2.toArray())
"""
>> array([[9., 6., 0.],
          [0., 0., 0.],
          [0., 0., 8.]])
"""
Distributed matrices are stored in one or more RDDs. It is very important to choose the right format of distributed matrices. Four types of distributed matrices have been implemented so far: Row Matrix, Indexed Row Matrix, Coordinate Matrix, and Block Matrix.
# Distributed Data Type - Row Matrix
from pyspark.mllib.linalg.distributed import RowMatrix
# create RDD
rows = sc.parallelize([[1,2,3], [4,5,6], [7,8,9], [10,11,12]])
# create a distributed Row Matrix
row_matrix = RowMatrix(rows)
print(row_matrix)
# >> <pyspark.mllib.linalg.distributed.RowMatrix at 0x7f425884d7f0>
print(row_matrix.numRows())
# >> 4
print(row_matrix.numCols())
# >> 3
# Indexed Row Matrix
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
# create RDD
indexed_rows = sc.parallelize([
IndexedRow(0, [0,1,2]),
IndexedRow(1, [1,2,3]),
IndexedRow(2, [3,4,5]),
IndexedRow(3, [4,2,3]),
IndexedRow(4, [2,2,5]),
IndexedRow(5, [4,5,5])
])
# create IndexedRowMatrix
indexed_rows_matrix = IndexedRowMatrix(indexed_rows)
print(indexed_rows_matrix.numRows())
# >> 6
print(indexed_rows_matrix.numCols())
# >> 3
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
# Create an RDD of coordinate entries with the MatrixEntry class:
matrix_entries = sc.parallelize([MatrixEntry(0, 5, 2), MatrixEntry(1, 1, 1), MatrixEntry(1, 5, 4)])
# Create a CoordinateMatrix from an RDD of MatrixEntries.
c_matrix = CoordinateMatrix(matrix_entries)
# number of columns
print(c_matrix.numCols())
# >> 6
# number of rows
print(c_matrix.numRows())
# >> 2
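The reported size (2 rows, 6 columns) is consistent with taking the largest row and column index among the entries and adding one. Here is a plain-Python sketch of that reasoning, using the same three entries as (row, column, value) tuples. The helpers coordinate_shape and coordinate_to_dense are made up for this illustration and are not MLlib functions.

```python
def coordinate_shape(entries):
    """Infer (num_rows, num_cols) from (row, col, value) triples."""
    if not entries:
        return (0, 0)
    return (max(i for i, _, _ in entries) + 1,
            max(j for _, j, _ in entries) + 1)


def coordinate_to_dense(entries):
    """Place every (row, col, value) triple into a zero-filled list of rows."""
    num_rows, num_cols = coordinate_shape(entries)
    dense = [[0.0] * num_cols for _ in range(num_rows)]
    for i, j, value in entries:
        dense[i][j] = float(value)
    return dense


entries = [(0, 5, 2), (1, 1, 1), (1, 5, 4)]

print(coordinate_shape(entries))  # (2, 6)
for row in coordinate_to_dense(entries):
    print(row)
# [0.0, 0.0, 0.0, 0.0, 0.0, 2.0]
# [0.0, 1.0, 0.0, 0.0, 0.0, 4.0]
```

Only three of the twelve positions hold a value, which is why this format suits matrices that are both large and very sparse.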
# import the libraries
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix
# Create an RDD of sub-matrix blocks.
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 3, [1, 2, 1, 2, 1, 2, 1, 2, 1])),
((1, 1), Matrices.dense(3, 3, [3, 4, 5, 3, 4, 5, 3, 4, 5])),
((2, 0), Matrices.dense(3, 3, [1, 1, 1, 1, 1, 1, 1, 1, 1]))])
# Create a BlockMatrix from an RDD of sub-matrix blocks of size 3X3
b_matrix = BlockMatrix(blocks, 3, 3)
# columns per block
print(b_matrix.colsPerBlock)
# >> 3
# rows per block
print(b_matrix.rowsPerBlock)
# >> 3
# convert the block matrix to local matrix
local_mat = b_matrix.toLocalMatrix()
# print local matrix
print(local_mat.toArray())
"""
>> array([[1., 2., 1., 0., 0., 0.],
          [2., 1., 2., 0., 0., 0.],
          [1., 2., 1., 0., 0., 0.],
          [0., 0., 0., 3., 3., 3.],
          [0., 0., 0., 4., 4., 4.],
          [0., 0., 0., 5., 5., 5.],
          [1., 1., 1., 0., 0., 0.],
          [1., 1., 1., 0., 0., 0.],
          [1., 1., 1., 0., 0., 0.]])
"""
We’ve covered quite a lot of ground today. Spark is one of the more fascinating frameworks in data science and one I feel you should at least be familiar with.
This is just the start of our PySpark learning journey! I plan to cover a lot more ground in this series with multiple articles spanning different machine learning tasks.
In the upcoming PySpark articles, we will see how to do feature extraction, create Machine Learning Pipelines, and build models. In the meantime, feel free to leave your thoughts and feedback in the comments section below.
To use PySpark effectively, you need a good understanding of fundamental Python concepts like data structures, control flow, functions, and object-oriented programming.
Experienced PySpark developers can earn over $100,000 per year. The average salary for a PySpark developer in the United States is $112,329 per year.
1. PySpark is a more powerful tool for processing large and unstructured data.
2. It’s faster than SQL due to distributed processing across multiple machines.
3. PySpark’s MLlib library enables machine learning tasks like predictive modeling and recommendation systems.
Boost Python Proficiency: Get comfy with Python.
Grasp Apache Spark Basics: Know its architecture.
Install Apache Spark: Set up locally or in the cloud.
Learn PySpark APIs: Focus on sql, ml, streaming.
Practice with Examples: Work on projects for hands-on experience.