Learning about window functions in PySpark can be challenging but is worth the effort. Window functions are a powerful tool for analyzing data and can help you gain insights you might otherwise miss. By understanding how to use window functions in Spark, you can take your data analysis skills to the next level and make more informed decisions. Whether you are working with large or small datasets, learning window functions in Spark will allow you to manipulate and analyze data in new and exciting ways.
In this blog, we will first understand the concept of window functions and then discuss how to use them with Spark SQL and the PySpark DataFrame API, so that by the end of this article you will know how to use window functions with real datasets and extract essential insights for the business.
Window functions help analyze data within a group of rows that are related to each other. They enable users to perform complex transformations on the rows of a dataframe or dataset that are associated with each other based on some partitioning and ordering criteria.
Window functions operate on a specific partition of a dataframe or dataset, defined by a set of partitioning columns, while the ORDER BY clause arranges the rows within each partition in a specific order. The window function then performs its calculation over a sliding window of rows that includes the current row and a subset of the preceding and/or following rows, as specified by the window frame.
Some common examples of window functions include calculating moving averages, ranking or sorting rows based on a specific column or group of columns, calculating running totals, and finding the first or last value in a group of rows. With these powerful window functions, users can perform complex analyses and aggregations over large datasets with relative ease, which makes Spark a popular tool for big data processing and analytics.
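For instance, a moving average is expressed by attaching an explicit window frame to the window specification. The snippet below is a minimal, self-contained sketch; the store, month, and revenue columns belong to a made-up dataset used only for illustration.
# Minimal sketch: 3-row moving average per store (hypothetical data)
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("moving_avg_demo").getOrCreate()

revenue = spark.createDataFrame(
    [("A", 1, 100), ("A", 2, 200), ("A", 3, 300), ("B", 1, 50), ("B", 2, 150)],
    ["store", "month", "revenue"])

# Frame = the two preceding rows plus the current row, per store, ordered by month
moving_avg_window = Window.partitionBy("store").orderBy("month").rowsBetween(-2, 0)

revenue.withColumn("moving_avg", F.avg("revenue").over(moving_avg_window)).show()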
Spark SQL supports three kinds of window functions: ranking functions (such as rank, dense_rank, and row_number), analytic or value functions (such as lead, lag, first_value, and last_value), and aggregate functions (such as sum, avg, min, max, and count) applied over a window.
We will create a sample dataframe so that we can practically work with the different window functions, and we will use this data to answer some questions with the help of window functions.
The dataframe contains employee details such as the employee number, name, job title, hire date, salary, commission, and department number. In total we have seven columns, which are as follows:
# Imports and Spark session used throughout the examples
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("window-functions").getOrCreate()

# Create Sample Dataframe
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)
]

# create dataframe
emp_df = spark.createDataFrame(employees,
    ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"])
emp_df.show()
# Output:
+-----+------+---------+---------+----+----+------+
|empno| ename| job| hiredate| sal|comm|deptno|
+-----+------+---------+---------+----+----+------+
| 7369| SMITH| CLERK|17-Dec-80| 800| 20| 10|
| 7499| ALLEN| SALESMAN|20-Feb-81|1600| 300| 30|
| 7521| WARD| SALESMAN|22-Feb-81|1250| 500| 30|
| 7566| JONES| MANAGER| 2-Apr-81|2975| 0| 20|
| 7654|MARTIN| SALESMAN|28-Sep-81|1250|1400| 30|
| 7698| BLAKE| MANAGER| 1-May-81|2850| 0| 30|
| 7782| CLARK| MANAGER| 9-Jun-81|2450| 0| 10|
| 7788| SCOTT| ANALYST|19-Apr-87|3000| 0| 20|
| 7629| ALEX| SALESMAN|28-Sep-79|1150|1400| 30|
| 7839| KING|PRESIDENT|17-Nov-81|5000| 0| 10|
| 7844|TURNER| SALESMAN| 8-Sep-81|1500| 0| 30|
| 7876| ADAMS| CLERK|23-May-87|1100| 0| 20|
+-----+------+---------+---------+----+----+------+
# Checking the schema
emp_df.printSchema()
# Output:-
root
|-- empno: long (nullable = true)
|-- ename: string (nullable = true)
|-- job: string (nullable = true)
|-- hiredate: string (nullable = true)
|-- sal: long (nullable = true)
|-- comm: long (nullable = true)
|-- deptno: long (nullable = true)
Create a temporary view of the DataFrame emp_df with the name "emp". This allows us to query the DataFrame using SQL syntax in Spark SQL as if it were a table. The temporary view is only valid for the duration of the Spark session.
emp_df.createOrReplaceTempView("emp")
Here we will solve several problem statements using window functions.
First, let us find the salary rank of each employee within their department, using the RANK function.
# Using Spark SQL
rank_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal,
RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()
# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
ranking_result_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.rank().over(windowSpec).alias('rank'))
ranking_result_df.show()
# Output:-
+-----+------+---------+------+----+----+
|empno| ename| job|deptno| sal|rank|
+-----+------+---------+------+----+----+
| 7839| KING|PRESIDENT| 10|5000| 1|
| 7782| CLARK| MANAGER| 10|2450| 2|
| 7369| SMITH| CLERK| 10| 800| 3|
| 7788| SCOTT| ANALYST| 20|3000| 1|
| 7566| JONES| MANAGER| 20|2975| 2|
| 7876| ADAMS| CLERK| 20|1100| 3|
| 7698| BLAKE| MANAGER| 30|2850| 1|
| 7499| ALLEN| SALESMAN| 30|1600| 2|
| 7844|TURNER| SALESMAN| 30|1500| 3|
| 7521| WARD| SALESMAN| 30|1250| 4|
| 7654|MARTIN| SALESMAN| 30|1250| 4|
| 7629| ALEX| SALESMAN| 30|1150| 6|
+-----+------+---------+------+----+----+
Approach for PySpark Code
Define a window specification that partitions the data by deptno and orders it by sal in descending order, then apply F.rank() over that window to assign a rank to each row.
Output:
The outcome shows the salary rank of each employee within their department.
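As a quick usage example (a sketch building on the ranking_result_df computed above), the rank column makes it easy to keep only the top earner in each department:
# Keep only the highest-paid employee in each department (rank == 1)
top_earners_df = ranking_result_df.filter(col('rank') == 1)
top_earners_df.show()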
Next, to handle tied salaries without leaving gaps in the ranking, we use the DENSE_RANK function.
# Using Spark SQL
dense_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal,
DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC)
AS dense_rank FROM emp""")
dense_df.show()
# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
dense_ranking_df=emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.dense_rank().over(windowSpec).alias('dense_rank'))
dense_ranking_df.show()
# Output:-
+-----+------+---------+------+----+----------+
|empno| ename| job|deptno| sal|dense_rank|
+-----+------+---------+------+----+----------+
| 7839| KING|PRESIDENT| 10|5000| 1|
| 7782| CLARK| MANAGER| 10|2450| 2|
| 7369| SMITH| CLERK| 10| 800| 3|
| 7788| SCOTT| ANALYST| 20|3000| 1|
| 7566| JONES| MANAGER| 20|2975| 2|
| 7876| ADAMS| CLERK| 20|1100| 3|
| 7698| BLAKE| MANAGER| 30|2850| 1|
| 7499| ALLEN| SALESMAN| 30|1600| 2|
| 7844|TURNER| SALESMAN| 30|1500| 3|
| 7521| WARD| SALESMAN| 30|1250| 4|
| 7654|MARTIN| SALESMAN| 30|1250| 4|
| 7629| ALEX| SALESMAN| 30|1150| 5|
+-----+------+---------+------+----+----------+
Approach for PySpark Code
Use the same window specification (partition by deptno, order by sal descending) and apply F.dense_rank(), so that tied salaries share a rank and no rank numbers are skipped.
Output:
The outcome shows the dense salary rank within each department; note that ALEX now gets rank 5 instead of 6, because no rank is skipped after the tie.
To assign a unique, sequential number to every employee within a department, even when salaries tie, we use the ROW_NUMBER function.
# Using Spark SQL
row_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal,
ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
AS row_num FROM emp """)
row_df.show()
# Using PySpark code
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
row_num_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.row_number().over(windowSpec).alias('row_num'))
row_num_df.show()
# Output:-
+-----+------+---------+------+----+-------+
|empno| ename| job|deptno| sal|row_num|
+-----+------+---------+------+----+-------+
| 7839| KING|PRESIDENT| 10|5000| 1|
| 7782| CLARK| MANAGER| 10|2450| 2|
| 7369| SMITH| CLERK| 10| 800| 3|
| 7788| SCOTT| ANALYST| 20|3000| 1|
| 7566| JONES| MANAGER| 20|2975| 2|
| 7876| ADAMS| CLERK| 20|1100| 3|
| 7698| BLAKE| MANAGER| 30|2850| 1|
| 7499| ALLEN| SALESMAN| 30|1600| 2|
| 7844|TURNER| SALESMAN| 30|1500| 3|
| 7521| WARD| SALESMAN| 30|1250| 4|
| 7654|MARTIN| SALESMAN| 30|1250| 5|
| 7629| ALEX| SALESMAN| 30|1150| 6|
+-----+------+---------+------+----+-------+
Approach for PySpark Code
Use the same window specification and apply F.row_number(), which assigns a unique, consecutive number to every row, even when salaries tie.
Output:
The output shows the row number of each employee within their department, based on descending salary order.
To compute a running total of salaries within each department, we use the SUM aggregate as a window function.
# Using Spark SQL
running_sum_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal,
SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC)
AS running_total FROM emp
""")
running_sum_df.show()
# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
running_sum_sal_df= emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.sum('sal').over(windowSpec).alias('running_total'))
running_sum_sal_df.show()
# Output:-
+-----+------+---------+------+----+-------------+
|empno| ename| job|deptno| sal|running_total|
+-----+------+---------+------+----+-------------+
| 7839| KING|PRESIDENT| 10|5000| 5000|
| 7782| CLARK| MANAGER| 10|2450| 7450|
| 7369| SMITH| CLERK| 10| 800| 8250|
| 7788| SCOTT| ANALYST| 20|3000| 3000|
| 7566| JONES| MANAGER| 20|2975| 5975|
| 7876| ADAMS| CLERK| 20|1100| 7075|
| 7698| BLAKE| MANAGER| 30|2850| 2850|
| 7499| ALLEN| SALESMAN| 30|1600| 4450|
| 7844|TURNER| SALESMAN| 30|1500| 5950|
| 7521| WARD| SALESMAN| 30|1250| 8450|
| 7654|MARTIN| SALESMAN| 30|1250| 8450|
| 7629| ALEX| SALESMAN| 30|1150| 9600|
+-----+------+---------+------+----+-------------+
Approach for PySpark Code
Use the same window specification and apply F.sum('sal') over it; because the window is ordered, the sum accumulates from the highest salary down to the current row.
Output:
The output shows a running total of salaries within each department.
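One detail worth noting: with the default window frame, rows that tie on the ORDER BY column share the same running total, which is why WARD and MARTIN both show 8450. If a strictly row-by-row cumulative sum is needed instead, the frame can be made explicit. The following is a small sketch under that assumption:
# Explicit ROWS frame: accumulate one row at a time, even when salaries tie
rows_window = Window.partitionBy(col('deptno')).orderBy(col('sal').desc()) \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
emp_df.select('empno', 'ename', 'deptno', 'sal',
    F.sum('sal').over(rows_window).alias('running_total')).show()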
To find the next salary within each department, we use the LEAD function.
The lead() window function returns the value of an expression from the row that comes a given number of rows after the current row within the window partition. The syntax of the lead function is: lead(col, offset=1, default=None); if no row exists at that offset, the default value is returned.
# Using Spark SQL
next_sal_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal, LEAD(sal, 1)
OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
""")
next_sal_df.show()
# Output:-
+-----+------+---------+------+----+--------+
|empno| ename| job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839| KING|PRESIDENT| 10|5000| 2450|
| 7782| CLARK| MANAGER| 10|2450| 800|
| 7369| SMITH| CLERK| 10| 800| null|
| 7788| SCOTT| ANALYST| 20|3000| 2975|
| 7566| JONES| MANAGER| 20|2975| 1100|
| 7876| ADAMS| CLERK| 20|1100| null|
| 7698| BLAKE| MANAGER| 30|2850| 1600|
| 7499| ALLEN| SALESMAN| 30|1600| 1500|
| 7844|TURNER| SALESMAN| 30|1500| 1250|
| 7521| WARD| SALESMAN| 30|1250| 1250|
| 7654|MARTIN| SALESMAN| 30|1250| 1150|
| 7629| ALEX| SALESMAN| 30|1150| null|
+-----+------+---------+------+----+--------+
# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.lead('sal', offset=1, default=0).over(windowSpec).alias('next_val'))
next_salary_df.show()
# Output:-
+-----+------+---------+------+----+--------+
|empno| ename| job|deptno| sal|next_val|
+-----+------+---------+------+----+--------+
| 7839| KING|PRESIDENT| 10|5000| 2450|
| 7782| CLARK| MANAGER| 10|2450| 800|
| 7369| SMITH| CLERK| 10| 800| 0|
| 7788| SCOTT| ANALYST| 20|3000| 2975|
| 7566| JONES| MANAGER| 20|2975| 1100|
| 7876| ADAMS| CLERK| 20|1100| 0|
| 7698| BLAKE| MANAGER| 30|2850| 1600|
| 7499| ALLEN| SALESMAN| 30|1600| 1500|
| 7844|TURNER| SALESMAN| 30|1500| 1250|
| 7521| WARD| SALESMAN| 30|1250| 1250|
| 7654|MARTIN| SALESMAN| 30|1250| 1150|
| 7629| ALEX| SALESMAN| 30|1150| 0|
+-----+------+---------+------+----+--------+
Approach for PySpark Code
Using the same window specification, F.lead('sal', offset=1, default=0) fetches the salary from the next row in the partition; the last employee in each department has no following row, so the default value 0 is returned (the SQL version, which sets no default, returns null).
Output:
The output contains the salary of the next employee in the department, based on the order of descending salary.
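As a small follow-up sketch (reusing the windowSpec defined above), lead() is often combined with simple arithmetic, for example to measure the gap between each salary and the next one in the department:
# Gap between each salary and the next-lower salary in the same department
gap_df = emp_df.withColumn('next_val', F.lead('sal', 1).over(windowSpec)) \
    .withColumn('gap_to_next', col('sal') - col('next_val'))
gap_df.select('ename', 'deptno', 'sal', 'next_val', 'gap_to_next').show()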
To calculate the previous salary, we use the LAG function.
The lag() window function returns the value of an expression at a given offset before the current row within the window partition. The syntax of the lag function is: lag(col, offset=1, default=None).over(windowSpec); if no row exists at that offset, the default value is returned.
# Using Spark SQL
previous_sal_df = spark.sql(
"""SELECT empno, ename, job, deptno, sal, LAG(sal, 1)
OVER (PARTITION BY deptno ORDER BY sal DESC)
AS prev_val FROM emp
""")
previous_sal_df.show()
# Output:-
+-----+------+---------+------+----+--------+
|empno| ename| job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839| KING|PRESIDENT| 10|5000| null|
| 7782| CLARK| MANAGER| 10|2450| 5000|
| 7369| SMITH| CLERK| 10| 800| 2450|
| 7788| SCOTT| ANALYST| 20|3000| null|
| 7566| JONES| MANAGER| 20|2975| 3000|
| 7876| ADAMS| CLERK| 20|1100| 2975|
| 7698| BLAKE| MANAGER| 30|2850| null|
| 7499| ALLEN| SALESMAN| 30|1600| 2850|
| 7844|TURNER| SALESMAN| 30|1500| 1600|
| 7521| WARD| SALESMAN| 30|1250| 1500|
| 7654|MARTIN| SALESMAN| 30|1250| 1250|
| 7629| ALEX| SALESMAN| 30|1150| 1250|
+-----+------+---------+------+----+--------+
# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
prev_sal_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.lag('sal', offset=1, default=0).over(windowSpec).alias('prev_val'))
prev_sal_df.show()
# Output:-
+-----+------+---------+------+----+--------+
|empno| ename| job|deptno| sal|prev_val|
+-----+------+---------+------+----+--------+
| 7839| KING|PRESIDENT| 10|5000| 0|
| 7782| CLARK| MANAGER| 10|2450| 5000|
| 7369| SMITH| CLERK| 10| 800| 2450|
| 7788| SCOTT| ANALYST| 20|3000| 0|
| 7566| JONES| MANAGER| 20|2975| 3000|
| 7876| ADAMS| CLERK| 20|1100| 2975|
| 7698| BLAKE| MANAGER| 30|2850| 0|
| 7499| ALLEN| SALESMAN| 30|1600| 2850|
| 7844|TURNER| SALESMAN| 30|1500| 1600|
| 7521| WARD| SALESMAN| 30|1250| 1500|
| 7654|MARTIN| SALESMAN| 30|1250| 1250|
| 7629| ALEX| SALESMAN| 30|1150| 1250|
+-----+------+---------+------+----+--------+
Approach for PySpark Code
Using the same window specification, F.lag('sal', offset=1, default=0) fetches the salary from the previous row in the partition; the first employee in each department has no preceding row, so the default value 0 is returned (the SQL version returns null).
Output:
The output represents the previous salary for each employee within their department, based on ordering the salaries in descending order.
To show the highest salary of each department alongside every employee, we use the FIRST_VALUE function.
# Using Spark SQL
first_val_df = spark.sql("""SELECT empno, ename, job, deptno, sal,
FIRST_VALUE(sal) OVER (PARTITION BY deptno ORDER BY sal DESC)
AS first_val FROM emp """)
first_val_df.show()
# Using PySpark
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
first_value_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal',
F.first('sal').over(windowSpec).alias('first_val'))
first_value_df.show()
# Output:-
+-----+------+---------+------+----+---------+
|empno| ename| job|deptno| sal|first_val|
+-----+------+---------+------+----+---------+
| 7839| KING|PRESIDENT| 10|5000| 5000|
| 7782| CLARK| MANAGER| 10|2450| 5000|
| 7369| SMITH| CLERK| 10| 800| 5000|
| 7788| SCOTT| ANALYST| 20|3000| 3000|
| 7566| JONES| MANAGER| 20|2975| 3000|
| 7876| ADAMS| CLERK| 20|1100| 3000|
| 7698| BLAKE| MANAGER| 30|2850| 2850|
| 7499| ALLEN| SALESMAN| 30|1600| 2850|
| 7844|TURNER| SALESMAN| 30|1500| 2850|
| 7521| WARD| SALESMAN| 30|1250| 2850|
| 7654|MARTIN| SALESMAN| 30|1250| 2850|
| 7629| ALEX| SALESMAN| 30|1150| 2850|
+-----+------+---------+------+----+---------+
Approach for PySpark Code
Using the same window specification, F.first('sal') returns the first value in the ordered window, which, with the salaries sorted in descending order, is the highest salary in the department.
Output:
The output shows the highest salary of each department next to every employee in the employee DataFrame.
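A related sketch that goes slightly beyond the original problem set: to return the lowest salary per department with last()/LAST_VALUE, the window frame has to be extended over the whole partition, because the default frame of an ordered window stops at the current row:
# last() needs a full-partition frame to see the department's lowest salary
full_window = Window.partitionBy(col('deptno')).orderBy(col('sal').desc()) \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
emp_df.select('ename', 'deptno', 'sal',
    F.last('sal').over(full_window).alias('last_val')).show()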
In this article, we learned about window functions. Spark SQL supports three kinds of window functions: ranking functions, aggregate functions, and value functions. Using these functions, we worked on a dataset to extract important and valuable insights. Spark window functions offer powerful tools for data analysis, such as ranking, analytics, and value computations. Whether analyzing salary insights by department or working through practical examples with PySpark and SQL, these functions provide essential tools for effective data processing and analysis in Spark.
This case study should help you better understand PySpark window functions. If you have any opinions or questions, comment below. Connect with me on LinkedIn for further discussion. Keep learning!