This article was published as a part of the Data Science Blogathon.
In this tutorial, we will learn about the basic building block of PySpark, the Resilient Distributed Dataset, popularly known as the PySpark RDD. Before we work with it, let’s understand its basic concept.
class pyspark.RDD ( jrdd, ctx, jrdd_deserializer = AutoBatchedSerializer(PickleSerializer()) )
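In practice you do not call this constructor yourself; an RDD is created through a SparkContext. The snippet below is a minimal sketch of setting up the sc object used in the rest of this tutorial (the master "local" and the application name are just example values):

from pyspark import SparkContext

# "local" runs Spark on a single machine; the second argument is an arbitrary application name
sc = SparkContext("local", "PySpark RDD basics")

With sc in hand, we can create an RDD from a Python list using parallelize():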
words = sc.parallelize(
    ["scala", "Java", "Hadoop", "spark", "Akka",
     "spark vs hadoop", "pyspark", "pyspark and spark"]
)
---------------------------------------count.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "count app")
words = sc.parallelize(
    ["scala", "Java", "Hadoop", "spark", "Akka",
     "spark vs hadoop", "pyspark", "pyspark and spark"]
)

# count() returns the number of elements in the RDD
counts = words.count()
print("Number of elements in RDD -> %i" % counts)
---------------------------------------count.py---------------------------------------
$SPARK_HOME/bin/spark-submit count.py
Number of elements in RDD -> 8
---------------------------------------collect.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Collect app")
words = sc.parallelize(
    ["scala", "Java", "Hadoop", "spark", "Akka",
     "spark vs hadoop", "pyspark", "pyspark and spark"]
)

# collect() returns all the elements of the RDD to the driver as a list
coll = words.collect()
print("Elements in RDD -> %s" % coll)
---------------------------------------collect.py---------------------------------------
$SPARK_HOME/bin/spark-submit collect.py
Elements in RDD -> ['scala', 'Java', 'Hadoop', 'spark', 'Akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "ForEach app")
words = sc.parallelize(
    ["scala", "Java", "Hadoop", "spark", "Akka",
     "spark vs hadoop", "pyspark", "pyspark and spark"]
)

# foreach() applies the given function to every element of the RDD
def f(x):
    print(x)

fore = words.foreach(f)
----------------------------------------foreach.py---------------------------------------
$SPARK_HOME/bin/spark-submit foreach.py
scala
Java
Hadoop
spark
Akka
spark vs hadoop
pyspark
pyspark and spark
----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Filter app")
words = sc.parallelize(
    ["scala", "Java", "Hadoop", "spark", "Akka",
     "spark vs hadoop", "pyspark", "pyspark and spark"]
)

# filter() keeps only the elements that satisfy the predicate,
# here the strings containing 'spark'
word_filter = words.filter(lambda x: 'spark' in x)
filtered = word_filter.collect()
print("Filtered RDD -> %s" % filtered)
----------------------------------------filter.py---------------------------------------
$SPARK_HOME/bin/spark-submit filter.py
Filtered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
----------------------------------------map.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Map app")
words = sc.parallelize(
    ["scala", "Java", "Hadoop", "spark", "Akka",
     "spark vs hadoop", "pyspark", "pyspark and spark"]
)

# map() applies the function to each element and returns a new RDD,
# here turning every word into a (word, 1) key-value pair
word_map = words.map(lambda x: (x, 1))
mapping = word_map.collect()
print("Key-value pair -> %s" % mapping)
----------------------------------------map.py---------------------------------------
$SPARK_HOME/bin/spark-submit map.py
Key-value pair -> [('scala', 1), ('Java', 1), ('Hadoop', 1), ('spark', 1), ('Akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1)]
-----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add

sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])

# reduce() aggregates all elements with the given binary operator,
# here summing them with operator.add
adding = nums.reduce(add)
print("Adding all elements -> %i" % adding)
-----------------------------------------reduce.py---------------------------------------
$SPARK_HOME/bin/spark-submit reduce.py
Adding all elements -> 15
----------------------------------------join.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])

# join() combines the two pair RDDs on their keys,
# producing (key, (value_from_x, value_from_y)) tuples
joined = x.join(y)
final = joined.collect()
print("Join RDD -> %s" % final)
----------------------------------------join.py---------------------------------------
$SPARK_HOME/bin/spark-submit join.py
Join RDD -> [('spark', (1, 2)), ('hadoop', (4, 5))]
---------------------------------------cache.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Cache app")
words = sc.parallelize(
    ["scala", "Java", "Hadoop", "spark", "Akka",
     "spark vs hadoop", "pyspark", "pyspark and spark"]
)

# cache() marks the RDD to be kept in memory after it is first computed;
# is_cached reports whether a storage level has been set
words.cache()
caching = words.persist().is_cached
print("Words have been cached -> %s" % caching)
---------------------------------------cache.py---------------------------------------
$SPARK_HOME/bin/spark-submit cache.py
Words have been cached -> True
RDD stands for Resilient Distributed Dataset: a collection of elements partitioned across the nodes of a cluster so that they can be processed in parallel. RDDs are immutable, meaning that you cannot change an RDD once you create it; operations such as map() or filter() instead return a new RDD derived from the original.
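As a small illustration of this immutability, here is a minimal sketch, assuming the same SparkContext sc as in the scripts above (the names nums and squares are just for illustration): a transformation builds a new RDD and leaves the original untouched.

nums = sc.parallelize([1, 2, 3, 4, 5])
squares = nums.map(lambda x: x * x)  # transformation: returns a new RDD
print(nums.collect())                # [1, 2, 3, 4, 5] -- the original RDD is unchanged
print(squares.collect())             # [1, 4, 9, 16, 25]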
The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.