Google Colab is a lifesaver for data scientists when it comes to working with huge datasets and running complex models.
While for data engineers, PySpark is, simply put, a demigod!
So what happens when we take these two, each the finest player in their respective category, and combine them together?
We get the perfect solution (almost) for all your data science and machine learning problems!
In this article, we will see how we can run PySpark in a Google Colaboratory notebook. We will also perform some basic data exploratory tasks common to most data science problems. So, let’s get cracking!
Note – I am assuming you are already familiar with the basics of Spark and Google Colab. If not, I recommend going over the following articles before reading this one:
The first thing you want to do when you are working on Colab is mounting your Google Drive. This will enable you to access any directory on your Drive inside the Colab notebook.
from google.colab import drive
drive.mount('/content/drive')
Once you have done that, the next obvious step is to load the data.
Bonus – You can find some amazing hacks for Google Colab in this article!
Now, I am assuming that you will be working with a large enough dataset. Therefore, the best way to upload data to Drive is in a zip format. Just drag and drop your zip folder inside any directory you want on Drive.
Unzipping this data is not a hassle at all. You just have to provide the path to the zip folder along with the !unzip command.
!unzip "/content/drive/My Drive/AV articles/PySpark on Colab/black_friday_train.zip"
If you aren’t sure of the exact location of the folder, you can check it from the side panel in Colab.
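If you would rather check from code, a quick directory listing works just as well. A minimal sketch, assuming the same Drive path used above:
!ls "/content/drive/My Drive/AV articles/PySpark on Colab/"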
Right, let’s set up Spark
Spark is written in the Scala programming language and requires the Java Virtual Machine (JVM) to run. Therefore, our first task is to download Java.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
Next, we download Apache Spark (prebuilt for Hadoop 2.7) and extract it:
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
Note – At the time of writing this article, 3.0.1 was the latest version of Apache Spark. But Spark is developing quite rapidly. So, if there is a newer version of Spark when you are executing this code, then you just need to replace 3.0.1, wherever you see it, with the latest version.
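If you expect to switch versions often, it can help to keep the version string in a single variable and let Colab substitute it into the shell commands, so you only ever change it in one place. The sketch below assumes the release is available under archive.apache.org (older releases usually are); verify the URL for the version you pick:
# Keep the version strings in one place (assumed URL pattern; verify for your release)
SPARK_VERSION = "3.0.1"
HADOOP_VERSION = "2.7"
SPARK_DIR = f"spark-{SPARK_VERSION}-bin-hadoop{HADOOP_VERSION}"

!wget -q https://archive.apache.org/dist/spark/spark-{SPARK_VERSION}/{SPARK_DIR}.tgz
!tar xf {SPARK_DIR}.tgz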
There is one last thing that we need to install and that is the findspark library. It will locate Spark on the system and import it as a regular library.
!pip install -q findspark
Then, we set the environment paths so that the system can locate Java and Spark:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"
Time for the real test!
We need to locate Spark in the system. For that, we import findspark and use the findspark.init() method.
import findspark
findspark.init()
findspark.find()
Now, we can import SparkSession from pyspark.sql and create a SparkSession, which is the entry point to Spark.
You can give a name to the session using appName() and add some configurations with config() if you wish.
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()
spark
If everything goes well, running spark in a cell should display the SparkSession details (Spark version, master, and app name).
If you want to view the Spark UI, you would have to include a few more lines of code to create a public URL for the UI page.
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels
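The curl command dumps the raw JSON from the ngrok API. If you only want the public URL, you can pipe the response through a small Python one-liner. This assumes the tunnel actually came up; if ngrok failed to start, there is nothing to parse and the command will throw a JSON error:
!curl -s http://localhost:4040/api/tunnels | python3 -c "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"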
Now you should be able to view the jobs and their stages at the link created.
Great! Now let’s get started with PySpark!
First things first, we need to load the dataset. We will use the read.csv method. The inferSchema parameter tells Spark to automatically determine the data type of each column, but to do so it has to go over the data once. If you don’t want that extra pass, you can instead provide the schema explicitly through the schema parameter.
df = spark.read.csv("train.csv", header=True, inferSchema=True)
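If you prefer to skip the extra pass over the data, here is roughly what an explicit schema could look like. The column names and types below are my reading of the Black Friday file, so treat this as a sketch and verify it against printSchema() before relying on it:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Assumed column names and types for the Black Friday data; verify against printSchema()
schema = StructType([
    StructField("User_ID", IntegerType(), True),
    StructField("Product_ID", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Age", StringType(), True),
    StructField("Occupation", IntegerType(), True),
    StructField("City_Category", StringType(), True),
    StructField("Stay_In_Current_City_Years", StringType(), True),
    StructField("Marital_Status", IntegerType(), True),
    StructField("Product_Category_1", IntegerType(), True),
    StructField("Product_Category_2", IntegerType(), True),
    StructField("Product_Category_3", IntegerType(), True),
    StructField("Purchase", IntegerType(), True),
])

df_explicit = spark.read.csv("train.csv", header=True, schema=schema)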
This will create a Spark dataframe.
Bonus – There are multiple data sources in Spark and you can know all about them in this article!
We are using the Black Friday dataset from the DataHack platform. It contains purchase summaries of various customers of a retail company from the past month. We are provided with customer demographics, purchase details, and total purchase amounts. The goal is to predict the purchase amount per customer for various products.
It is now time to use the PySpark dataframe functions to explore our data. And along the way, we will keep comparing it with the Pandas dataframes.
The first step in an exploratory data analysis is to check out the schema of the dataframe. This will give you a bird’s-eye view of the columns in the dataframe along with their data types.
df.printSchema()
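If you just need the column names or the (name, type) pairs as plain Python objects, for example to build a list of numeric columns, the columns and dtypes attributes have you covered:
# Column names as a list, and (name, type) tuples
print(df.columns)
print(df.dtypes)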
Now you would obviously want to have a view of the actual data as well.
Just as a Pandas dataframe has the head() function, a Spark dataframe has the show() function. You can pass the number of rows you want to print within the parentheses.
df.show(5)
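A small aside: show() truncates long values by default. Passing truncate=False prints them in full:
df.show(5, truncate=False)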
If you want to know the total number of rows in the dataframe, which you would, just use the count() function.
df.count()
550068
Sometimes you might want to view only specific columns from the dataframe. For that, you can leverage the capabilities of Spark SQL.
Using the select() function, you can specify the columns you want to view.
df.select("User_ID","Gender","Age","Occupation").show(5)
Often when we are working with numeric features, we want to have a look at the statistics regarding the dataframe. The describe() function is best suited for such purposes.
It is pretty similar to Pandas’ describe() function, but it reports fewer statistics and describes the string columns as well.
df.describe().show()
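describe() also accepts specific column names if you only care about a few of them:
df.describe("Purchase", "Occupation").show()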
The distinct() function will come in handy when you want to determine the unique values in the categorical columns of the dataframe.
df.select("City_Category").distinct().show()
We can use the groupBy function to group the dataframe column values and then apply an aggregate function on them to derive some useful insight.
Here, we can group the various city categories in the dataframe and determine the total Purchase per City category. For this, we have to use the sum aggregate function from the Spark SQL functions module.
from pyspark.sql import functions as F

df.groupBy("City_Category").agg(F.sum("Purchase")).show()
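You are not limited to one aggregate per group. A sketch along the same lines, computing the average purchase and the number of purchases per city category (the alias names are just illustrative):
# Multiple aggregates in a single pass, sorted by city category
df.groupBy("City_Category")\
  .agg(F.avg("Purchase").alias("avg_purchase"),
       F.count("Purchase").alias("num_purchases"))\
  .orderBy("City_Category")\
  .show()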
Now, we all know that real-world data is rarely free of missing values. Therefore, it is prudent to always check for them and handle them if present. The snippet below counts the null values in each column by combining F.when and F.isnull inside a list comprehension.
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()
We have a few columns with null values, so it is best to replace them. In our dataset, a null value in the Product_Category_2 or Product_Category_3 column most likely means the user didn’t buy a product from that category, so it makes sense to replace these nulls with 0.
We will use the fillna() function to replace the null values. Since Spark dataframes are immutable, we need to store the result in a new dataframe.
df = df.fillna({'Product_Category_2':0, 'Product_Category_3':0})
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()
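If imputation had not made sense for these columns, the other common option would have been to drop the affected rows before this step. A minimal sketch of that alternative (not applied here, since we have already filled the nulls):
# Drop rows that have nulls in the given columns instead of imputing them
df_dropped = df.na.drop(subset=["Product_Category_2", "Product_Category_3"])
df_dropped.count()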
Finally, after doing all the analysis, if you want to save your results into a new CSV file, you can do so using the write.csv() function.
df.write.csv("/content/drive/My Drive/AV articles/PySpark on Colab/preprocessed_data")
Note that write.csv() does not produce a single CSV file; it creates a directory containing one part file per partition of the dataframe. You can check how many partitions the dataframe has with getNumPartitions():
df.rdd.getNumPartitions()
2
Since our dataframe has two partitions here, the output directory will contain two part files. If you would rather have everything in one CSV file, you can convert the Spark dataframe to a Pandas dataframe with toPandas() and save it the usual way:
# Spark df to Pandas df
df_pd = df.toPandas()

# Store result
df_pd.to_csv("/content/drive/My Drive/AV articles/PySpark on Colab/pandas_preprocessed_data.csv")
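Another way to end up with a single file while staying on the Spark side is to coalesce the dataframe into one partition before writing. A sketch with a hypothetical output folder; keep in mind that this pushes all the data through a single partition, so it is only sensible for data that comfortably fits in one:
# Collapse to one partition so write.csv produces a single part file (output path is illustrative)
df.coalesce(1).write.csv(
    "/content/drive/My Drive/AV articles/PySpark on Colab/single_file_output",
    header=True
)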