When dealing with Python (pandas) data frames, it is easy to edit, say, the value in the 10th row of the 5th column. Editing a column based on the value of another column (or columns) is also easy. In other words, the dataframe is mutable and provides great flexibility to work with.
While PySpark derives its basic data types from Python, its own data structures are limited to RDDs, DataFrames, and GraphFrames. These data frames are immutable and offer reduced flexibility for row/column-level handling compared to Python.
During PySpark coding, it is crucial that we stick to Spark and do not stray away into Python. This is pivotal so that we do not lose the parallel computational efficiency that is exclusive to Spark. While handling massive amounts of data, as soon as the processor reaches code lines written solely in Python, the parallel computation advantage is lost: the entire data is crammed into one node, and the job struggles to process it or, worse, fails.
When we write end-to-end Spark code, there will be requirements to customize values at the individual column or row level, and it is imperative to do this while retaining the parallel processing power of Spark.
This blog shows some ways for ‘Row/Column level flexible handling in immutable data frames while sticking to Spark.’
The first go-to solution for column-level manipulations is a UDF. Write the column-level processing code as a Python function and register it as a Spark UDF.
Scenario: There is structured data in one column that has to be converted into a dictionary format.
Input:
id | struct |
1 | "{'2021-06-01':300, '2021-06-02':400, '2021-06-03':300, '2021-06-04':500}" |
Output (generated by the UDF function):
id | dict_val |
1 | [{'date':'2021-06-01', 'count':300}, {'date':'2021-06-02', 'count':400}, {'date':'2021-06-03', 'count':300}, {'date':'2021-06-04', 'count':500}] |
from ast import literal_eval
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
def structure_change_fn(struct_col):
    '''
    Desc   - Changes the format of the struct column to a list of dictionary-style strings
    Input  - Struct column value to be formatted
    Output - Formatted intermediary output (returned as a single string)
    '''
    dict_val = literal_eval(struct_col)
    result = ['{"date":"' + str(val1) + '","count":"' + str(val2) + '"}'
              for val1, val2 in dict_val.items()]
    # The UDF is registered with StringType, so the list is joined into one string
    return '[' + ', '.join(result) + ']'

udf_restructure = F.udf(structure_change_fn, StringType())
input_data = [("1", "{'2021-06-01':300, '2021-06-02':400, '2021-06-03':300, '2021-06-04':500}")]
schema = StructType([
    StructField("id", StringType(), True),
    StructField("struct", StringType(), True)
])
input_df = spark.createDataFrame(data=input_data, schema=schema)
result_df = input_df.withColumn('dict_val', udf_restructure('struct')).select('id', 'dict_val')
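As a quick check, the transformed column can be inspected; the output below is indicative, based on the single sample row above:

result_df.show(truncate=False)
# id | dict_val
# 1  | [{"date":"2021-06-01","count":"300"}, {"date":"2021-06-02","count":"400"}, {"date":"2021-06-03","count":"300"}, {"date":"2021-06-04","count":"500"}]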
Let us now consider the case of deriving a column’s value based on multiple columns.
Scenario: For each subject, the topper's name in each class is given, along with the name of the class holding the overall topper. The two are combined to dynamically generate, for each record, the name of the student who topped across all three classes. The expected output is a description column derived by substituting the top class and top student names into a description text, so the derived description carries customized text for each record.
In this particular scenario, the conditions are assumed to be mutually exclusive.
‘Description_temp’ and ‘Desc_col_edit’ are intermediary outputs
‘Description’ column is the final output
Subject | Class A topper | Class B topper | Class C topper | Overall topper class | Description_temp | Desc_col_edit (inside udf) | Description |
English | Aaron | Brutus | Chloe | C | of Class {3} got the first mark | {2} of Class {3} got the first mark | Chloe of Class C got the first mark |
Maths | Arjun | Billy | Clare | A | of Class {3} got the first mark | {0} of Class {3} got the first mark | Arjun of Class A got the first mark |
Science | Arjun | Balu | Connor | B | of Class {3} got the first mark | {1} of Class {3} got the first mark | Balu of Class B got the first mark |
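For completeness, here is a minimal sketch of building the input dataframe for this scenario; the data comes from the table above, and the column names are assumed to match the UDF call shown further below:

# Minimal sketch of the input dataframe (column names are assumed to match the UDF call below)
input_data = [
    ("English", "Aaron", "Brutus", "Chloe",  "C"),
    ("Maths",   "Arjun", "Billy",  "Clare",  "A"),
    ("Science", "Arjun", "Balu",   "Connor", "B"),
]
columns = ["subject", "class_A_topper", "class_B_topper", "class_C_topper", "overall_topper_class"]
input_df = spark.createDataFrame(input_data, columns)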
def desc_fn(desc_col, format_col0, format_col1, format_col2, format_class_col3):
    '''Returns the Python-formatted description column value'''
    get_class_value = '{0}' if format_class_col3 == 'A' else '{1}' if format_class_col3 == 'B' else '{2}'
    desc_col_edit = get_class_value + desc_col
    # Python string format function fills in the placeholders
    return desc_col_edit.format(format_col0, format_col1, format_col2, format_class_col3)
udf_desc_fn = F.udf(desc_fn, StringType())
# Hard coding the description text in a temp column (F.lit is needed for a literal value)
inter_df = input_df.withColumn('description_temp', F.lit(' of Class {3} got the first mark'))
# Call to the UDF function
final_df = inter_df.withColumn('Description',
                               udf_desc_fn('description_temp', 'class_A_topper', 'class_B_topper',
                                           'class_C_topper', 'overall_topper_class'))
A UDF is utilized here again, but instead of one column value, this UDF takes multiple column values and generates a dynamic description value for every row. This is achieved by calling Python's string format function within the UDF.
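With the sample dataframe sketched above, the result can be verified as follows; the output is indicative and matches the Description column in the table:

final_df.select('subject', 'Description').show(truncate=False)
# English | Chloe of Class C got the first mark
# Maths   | Arjun of Class A got the first mark
# Science | Balu of Class B got the first mark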
Let us now consider the case where there are multiple conditions to be validated and a large number of possible values for the resultant description column.
For every user, their status with respect to three conditions (A, B, and C) is given. '1' represents a condition match, while '-1' represents a condition mismatch. A suitable description has to be generated; the description has a big list of possible values, based on the conditions.
‘Description’ column is the output to be generated based on the user’s values in the Condition columns.
User ID | Condition A | Condition B | Condition C | Description |
1 | -1 | 1 | -1 | Only Condition B is met |
2 | -1 | -1 | 1 | Only Condition C is met |
3 | -1 | -1 | -1 | None of the conditions are met |
4 | -1 | 1 | 1 | Both conditions B and C are met |
5 | 1 | -1 | -1 | Only Condition A is met |
If we follow the previous UDF solution, all the conditions have to be hardcoded within the UDF function as multiple if-else conditions.
Also, these conditions are bound to change. For example, suppose a new condition record is added with Conditions A, B, and C all equal to 1. Such changes will be tiresome and will make the code lengthy.
Instead, the conditions can be split into a separate configuration file. This way,
i) the file can be easily configured and is kept separate from the code, which is a good practice
ii) changes to the conditions do not affect the code
iii) the code is no longer lengthy with nested if-else conditions that are hard to decipher and maintain
The unique scenarios of conditions and their descriptions are written into the config file:
Condition A | Condition B | Condition C | Description |
-1 | -1 | -1 | None of the conditions are met |
-1 | -1 | 1 | Only Condition C is met |
-1 | 1 | -1 | Only Condition B is met |
-1 | 1 | 1 | Both conditions B and C are met |
1 | -1 | -1 | Only Condition A is met |
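Before the join, both files have to be loaded as dataframes; a minimal sketch is shown below, where the file paths and the CSV format are assumptions for illustration:

# Minimal sketch: load the input data and the condition config as dataframes.
# The file paths and the CSV format are assumptions for illustration.
input_df = spark.read.csv("/path/to/input_conditions.csv", header=True, inferSchema=True)
config_df = spark.read.csv("/path/to/condition_config.csv", header=True, inferSchema=True)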
# The input and the config files are read as dataframes
input_df.createOrReplaceTempView('input_tbl')
config_df.createOrReplaceTempView('config_tbl')

join_qry = """
    SELECT input_tbl.user_id,
           input_tbl.condition_A,
           input_tbl.condition_B,
           input_tbl.condition_C,
           config_tbl.description
    FROM input_tbl, config_tbl
    WHERE input_tbl.condition_A * config_tbl.condition_A > 0
      AND input_tbl.condition_B * config_tbl.condition_B > 0
      AND input_tbl.condition_C * config_tbl.condition_C > 0
"""
join_df = spark.sql(join_qry)
The conditions from the input data and the config file are matched.
The main idea here is: if condition_A in the two records matches, then the product of the two values will be 1, i.e., greater than 0.
Condition A (from input) | Condition A (from config) | Product |
1 | 1 | 1 (condition match) |
-1 | -1 | 1 (condition match) |
1 | -1 | -1 (condition mismatch) |
-1 | 1 | -1 (condition mismatch) |
Similarly, the product of all 3 conditions is considered to identify the matching record in the config file.
For every record in the input dataframe, the matching record in the config file is identified this way and the description column is extracted correspondingly from the config file.
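The same matching can also be expressed with the DataFrame API instead of Spark SQL; the sketch below is an equivalent alternative (not part of the original solution) using an explicit cross join and a filter on the products:

from pyspark.sql import functions as F

# Equivalent cross join filtered by the product-of-conditions rule
join_df_alt = (
    input_df.alias("i")
    .crossJoin(config_df.alias("c"))
    .where(
        (F.col("i.condition_A") * F.col("c.condition_A") > 0)
        & (F.col("i.condition_B") * F.col("c.condition_B") > 0)
        & (F.col("i.condition_C") * F.col("c.condition_C") > 0)
    )
    .select("i.user_id", "i.condition_A", "i.condition_B", "i.condition_C", "c.description")
)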
These are sample cases and scenarios that illustrate some ways of handling Spark data frames to edit column-level information dynamically. The solutions provided pertain to the scenarios depicted.