This article was published as a part of the Data Science Blogathon.
Innovations and new technologies are transforming the world in many ways. The Internet, the web, and smartphones have become necessities of modern life. In developed nations, much work has already moved from manual to digital processes, and developing countries are also adopting modern technologies to drive national growth.
Today, sectors such as automobiles, finance, technology, aviation, food, and media all generate data. This data comes in many formats and in massive quantities, and we need modern tools and technologies to dig hidden insights out of it.
In this article, we will work on a case study using PySpark and try to find some hidden information from the data to answer some questions. To work on this case study, I am using the Databricks Community Edition.
A small restaurant sells some fantastic Indian food items and needs our help to stay afloat. It has captured some data from its first few months of operation but has yet to learn how to use that data to run the business.
We want to use the data to answer a few questions about its customers, especially their visiting patterns, how much money they have spent, and which menu items are their favorites.
The restaurant has sales data, which records customers' orders; menu data, which describes the food items; and members data, which records the join date of each customer who has purchased a membership.
We are going to create three dataframes, which contain the following information:-
1. Sales dataframe:- This dataframe contains information related to sales. It has three columns: customer_id, order_date, and product_id.
# create sales data
# (`spark` is the SparkSession that a Databricks notebook provides)
sales_data = [
    ('A', '2021-01-01', '1'),
    ('A', '2021-01-01', '2'),
    ('A', '2021-01-07', '2'),
    ('A', '2021-01-10', '3'),
    ('A', '2021-01-11', '3'),
    ('A', '2021-01-11', '3'),
    ('B', '2021-01-01', '2'),
    ('B', '2021-01-02', '2'),
    ('B', '2021-01-04', '1'),
    ('B', '2021-01-11', '1'),
    ('B', '2021-01-16', '3'),
    ('B', '2021-02-01', '3'),
    ('C', '2021-01-01', '3'),
    ('C', '2021-01-01', '1'),
    ('C', '2021-01-07', '3')]

# cols
sales_cols = ["customer_id", "order_date", "product_id"]

# create the sales dataframe
sales_df = spark.createDataFrame(data=sales_data, schema=sales_cols)

# view the data
sales_df.show()
The Sales dataframe will look like this:-
+-----------+----------+----------+
|customer_id|order_date|product_id|
+-----------+----------+----------+
|          A|2021-01-01|         1|
|          A|2021-01-01|         2|
|          A|2021-01-07|         2|
|          A|2021-01-10|         3|
|          A|2021-01-11|         3|
|          A|2021-01-11|         3|
|          B|2021-01-01|         2|
|          B|2021-01-02|         2|
|          B|2021-01-04|         1|
|          B|2021-01-11|         1|
|          B|2021-01-16|         3|
|          B|2021-02-01|         3|
|          C|2021-01-01|         3|
|          C|2021-01-01|         1|
|          C|2021-01-07|         3|
+-----------+----------+----------+
Now we will create the menu dataframe.
2. Menu dataframe:- This dataframe contains information related to the food items. It has three columns: product_id, product_name, and price.
# menu data
menu_data = [
    ('1', 'palak_paneer', 100),
    ('2', 'chicken_tikka', 150),
    ('3', 'jeera_rice', 120),
    ('4', 'kheer', 110),
    ('5', 'vada_pav', 80),
    ('6', 'paneer_tikka', 180)]

# cols
menu_cols = ['product_id', 'product_name', 'price']

# create the menu dataframe
menu_df = spark.createDataFrame(data=menu_data, schema=menu_cols)

# view the data
menu_df.show()
The Menu dataframe will look like this:-
+----------+-------------+-----+
|product_id| product_name|price|
+----------+-------------+-----+
|         1| palak_paneer|  100|
|         2|chicken_tikka|  150|
|         3|   jeera_rice|  120|
|         4|        kheer|  110|
|         5|     vada_pav|   80|
|         6| paneer_tikka|  180|
+----------+-------------+-----+
3. Members dataframe:- This dataframe has details of the customers who hold a membership with this restaurant. It has two columns: customer_id and join_date. The join_date column holds the date on which the membership was purchased.
# create member data
members_data = [
    ('A', '2021-01-07'),
    ('B', '2021-01-09')]

# cols
members_cols = ["customer_id", "join_date"]

# create the members dataframe
members_df = spark.createDataFrame(data=members_data, schema=members_cols)

# view the data
members_df.show()
The Members dataframe will look like this:-
+-----------+----------+
|customer_id| join_date|
+-----------+----------+
|          A|2021-01-07|
|          B|2021-01-09|
+-----------+----------+
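# Total amount spent by each customer
# solution:-
total_spent_df = (sales_df.join(menu_df, 'product_id')
                  .groupBy('customer_id')
                  .agg({'price': 'sum'})
                  .withColumnRenamed('sum(price)', 'total_spent_amounts')
                  .orderBy('customer_id'))

total_spent_df.show()

# output:-
+-----------+-------------------+
|customer_id|total_spent_amounts|
+-----------+-------------------+
|          A|                760|
|          B|                740|
|          C|                340|
+-----------+-------------------+

Approach:- Join sales_df with menu_df on product_id to attach a price to every order, group by customer_id, sum the price column, rename the aggregate to total_spent_amounts, and sort by customer_id.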
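As a sanity check, the totals can be reproduced without Spark. The sketch below is not part of the original solution; it assumes the sample sales rows and menu prices shown earlier and uses only the Python standard library. The names `sales`, `price`, and `total_spent` are illustrative.

```python
from collections import defaultdict

# Same sample rows as sales_df: (customer_id, order_date, product_id)
sales = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
# product_id -> price, taken from menu_df
price = {"1": 100, "2": 150, "3": 120, "4": 110, "5": 80, "6": 180}

# "Join" each sale to its price and accumulate per customer
total_spent = defaultdict(int)
for customer_id, _order_date, product_id in sales:
    total_spent[customer_id] += price[product_id]

for customer_id in sorted(total_spent):
    print(customer_id, total_spent[customer_id])
```

The dictionary lookup plays the role of the join, and the running sum plays the role of groupBy followed by sum.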
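from pyspark.sql.functions import countDistinct

# Number of distinct days on which each customer visited
# solution:-
# alias() names the column directly instead of relying on the
# auto-generated name of the aggregate column
res_visit_df = (sales_df.groupBy('customer_id')
                .agg(countDistinct('order_date').alias('visit_days')))

res_visit_df.show()

# output:-
+-----------+----------+
|customer_id|visit_days|
+-----------+----------+
|          A|         4|
|          B|         6|
|          C|         2|
+-----------+----------+

Approach:- Group sales_df by customer_id and count the distinct order_date values, so that several orders placed on the same day count as a single visit.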
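The effect of counting distinct dates rather than rows can be seen in a small Spark-free sketch. It assumes the same sample sales rows as above; `visit_dates` and `visit_days` are illustrative names, not part of the original solution.

```python
from collections import defaultdict

# Same sample rows as sales_df: (customer_id, order_date, product_id)
sales = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]

# A set keeps each order_date once, which mirrors countDistinct
visit_dates = defaultdict(set)
for customer_id, order_date, _product_id in sales:
    visit_dates[customer_id].add(order_date)

visit_days = {c: len(dates) for c, dates in visit_dates.items()}
print(visit_days)
```

Customer A has six order rows but only four distinct dates, which is why the visit count is 4 rather than 6.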
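from pyspark.sql import Window
from pyspark.sql.functions import dense_rank

# First item(s) purchased by each customer
# create windowspec
windowSpec = Window.partitionBy("customer_id").orderBy("order_date")

items_purchased_df = (sales_df.withColumn("dense_rank", dense_rank().over(windowSpec))
                      .filter("dense_rank == 1")
                      .join(menu_df, 'product_id')
                      .select('customer_id', 'product_name')
                      .orderBy('customer_id'))

items_purchased_df.show()

# output:-
+-----------+-------------+
|customer_id| product_name|
+-----------+-------------+
|          A|chicken_tikka|
|          A| palak_paneer|
|          B|chicken_tikka|
|          C| palak_paneer|
|          C|   jeera_rice|
+-----------+-------------+

Approach:- Partition the sales by customer_id, order each partition by order_date, and apply dense_rank so that every item bought on a customer's first day gets rank 1. Keep the rank-1 rows and join them with menu_df to get the product names.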
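The "rank 1 by date" idea can be illustrated without a window function. This is a minimal sketch, assuming the same sample rows; it relies on ISO-formatted date strings sorting correctly as plain strings. `names`, `first_date`, and `first_items` are illustrative names.

```python
from collections import defaultdict

# Same sample rows as sales_df: (customer_id, order_date, product_id)
sales = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
# Only the products that appear in the sales rows are needed here
names = {"1": "palak_paneer", "2": "chicken_tikka", "3": "jeera_rice"}

# Earliest order date per customer
first_date = {}
for customer_id, order_date, _product_id in sales:
    if customer_id not in first_date or order_date < first_date[customer_id]:
        first_date[customer_id] = order_date

# Every item bought on that earliest date (ties are all kept, like dense_rank == 1)
first_items = defaultdict(set)
for customer_id, order_date, product_id in sales:
    if order_date == first_date[customer_id]:
        first_items[customer_id].add(names[product_id])

for customer_id in sorted(first_items):
    print(customer_id, sorted(first_items[customer_id]))
```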
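from pyspark.sql.functions import count

# Most purchased item on the menu and how many times it was ordered
# code
most_purchased_df = (sales_df.join(menu_df, 'product_id')
                     .groupBy('product_id', 'product_name')
                     .agg(count('product_id').alias('product_count'))
                     .orderBy('product_count', ascending=False)
                     .drop('product_id')
                     .limit(1))

most_purchased_df.show()

# output:-
+------------+-------------+
|product_name|product_count|
+------------+-------------+
|  jeera_rice|            7|
+------------+-------------+

Approach:- Join sales_df with menu_df, count the orders for each product, sort the counts in descending order, and keep only the first row.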
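The same count can be checked with `collections.Counter`. This sketch assumes the same sample rows and is only a cross-check of the PySpark result; `names` and `product_counts` are illustrative names.

```python
from collections import Counter

# Same sample rows as sales_df: (customer_id, order_date, product_id)
sales = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
names = {"1": "palak_paneer", "2": "chicken_tikka", "3": "jeera_rice"}

# Count how often each product name appears across all orders
product_counts = Counter(names[product_id] for _c, _d, product_id in sales)

# most_common(1) corresponds to orderBy(desc) followed by limit(1)
top_name, top_count = product_counts.most_common(1)[0]
print(top_name, top_count)
```

Like `limit(1)`, `most_common(1)` returns a single row even if two products were tied for first place; with this sample data there is no tie.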
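from pyspark.sql import Window
from pyspark.sql.functions import col, count, dense_rank

# Most popular item for each customer
# code:-
popular_item_df = (sales_df.join(menu_df, 'product_id')
                   .groupBy('customer_id', 'product_name')
                   .agg(count('product_id').alias('count_orders'))
                   .withColumn('dense_rank',
                               dense_rank().over(Window.partitionBy("customer_id")
                                                 .orderBy(col("count_orders").desc())))
                   .filter('dense_rank = 1')
                   .drop('dense_rank'))

popular_item_df.show()

# output:-
+-----------+-------------+------------+
|customer_id| product_name|count_orders|
+-----------+-------------+------------+
|          A|   jeera_rice|           3|
|          B|chicken_tikka|           2|
|          B| palak_paneer|           2|
|          B|   jeera_rice|           2|
|          C|   jeera_rice|           2|
+-----------+-------------+------------+

Approach:- Count the orders for each customer and product, then apply dense_rank within each customer, ordered by that count in descending order. Keeping rank 1 returns every tied favorite, which is why customer B has three rows.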
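The tie handling is the interesting part of this step, so here is a small Spark-free sketch of it. It assumes the same sample rows; `per_customer` and `favorites` are illustrative names.

```python
from collections import Counter, defaultdict

# Same sample rows as sales_df: (customer_id, order_date, product_id)
sales = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
names = {"1": "palak_paneer", "2": "chicken_tikka", "3": "jeera_rice"}

# Order count per (customer, product)
per_customer = defaultdict(Counter)
for customer_id, _order_date, product_id in sales:
    per_customer[customer_id][names[product_id]] += 1

# Keep every product whose count equals the customer's maximum,
# which is what filtering on dense_rank = 1 does
favorites = {}
for customer_id, counts in per_customer.items():
    best = max(counts.values())
    favorites[customer_id] = sorted(n for n, k in counts.items() if k == best)

print(favorites)
```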
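from pyspark.sql import Window
from pyspark.sql.functions import dense_rank

# First item purchased by each customer after becoming a member
# code:-
windowSpec = Window.partitionBy("customer_id").orderBy("order_date")

items_after_member_df = (sales_df.join(members_df, 'customer_id')
                         .filter(sales_df.order_date >= members_df.join_date)
                         .withColumn('dense_rank', dense_rank().over(windowSpec))
                         .filter('dense_rank = 1')
                         .join(menu_df, 'product_id')
                         .select('customer_id', 'order_date', 'product_name'))

items_after_member_df.show()

# output:-
+-----------+----------+-------------+
|customer_id|order_date| product_name|
+-----------+----------+-------------+
|          B|2021-01-11| palak_paneer|
|          A|2021-01-07|chicken_tikka|
+-----------+----------+-------------+

Approach:- Join sales_df with members_df, keep only the orders placed on or after join_date, rank them by order_date within each customer, and keep the earliest one. Customer C has no membership, so the inner join drops C.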
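A plain-Python version of the filter-then-earliest logic, assuming the same sample rows and join dates. It is a cross-check rather than the article's method; `join_date`, `after`, and `first_after` are illustrative names.

```python
# Same sample rows as sales_df: (customer_id, order_date, product_id)
sales = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
names = {"1": "palak_paneer", "2": "chicken_tikka", "3": "jeera_rice"}
# customer_id -> join_date, taken from members_df
join_date = {"A": "2021-01-07", "B": "2021-01-09"}

# Inner join with members plus the "on or after join_date" filter
after = [(c, d, p) for c, d, p in sales if c in join_date and d >= join_date[c]]

# Earliest qualifying date per customer
first_after = {}
for c, d, _p in after:
    first_after[c] = min(first_after.get(c, d), d)

result = sorted((c, d, names[p]) for c, d, p in after if d == first_after[c])
print(result)
```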
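from pyspark.sql import Window
from pyspark.sql.functions import col, dense_rank

# Item(s) purchased just before each customer became a member
# code:-
windowSpec = Window.partitionBy("customer_id").orderBy(col("order_date").desc())

items_before_member_df = (sales_df.join(members_df, 'customer_id')
                          .filter(sales_df.order_date < members_df.join_date)
                          .withColumn('dense_rank', dense_rank().over(windowSpec))
                          .filter('dense_rank = 1')
                          .join(menu_df, 'product_id')
                          .select('customer_id', 'order_date', 'product_name'))

items_before_member_df.show()

# output:-
+-----------+----------+-------------+
|customer_id|order_date| product_name|
+-----------+----------+-------------+
|          B|2021-01-04| palak_paneer|
|          A|2021-01-01| palak_paneer|
|          A|2021-01-01|chicken_tikka|
+-----------+----------+-------------+

Approach:- Join sales_df with members_df, keep only the orders placed before join_date, rank them by order_date in descending order within each customer, and keep rank 1, the latest pre-membership date. Customer A bought two items on that date, so both appear.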
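The mirror image of the previous step can be sketched the same way: filter to orders before the join date and keep the latest date instead of the earliest. The sketch assumes the same sample rows and join dates; `before` and `last_before` are illustrative names.

```python
# Same sample rows as sales_df: (customer_id, order_date, product_id)
sales = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
names = {"1": "palak_paneer", "2": "chicken_tikka", "3": "jeera_rice"}
join_date = {"A": "2021-01-07", "B": "2021-01-09"}

# Orders placed strictly before the customer's join date
before = [(c, d, p) for c, d, p in sales if c in join_date and d < join_date[c]]

# Latest pre-membership date per customer (descending dense_rank == 1)
last_before = {}
for c, d, _p in before:
    last_before[c] = max(last_before.get(c, d), d)

result = sorted((c, d, names[p]) for c, d, p in before if d == last_before[c])
print(result)
```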
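# sum is imported under another name so that Python's built-in sum is not shadowed
from pyspark.sql.functions import countDistinct, sum as sum_

# Items bought and amount spent by each member before joining
# code:-
total_items_spent_df = (sales_df.join(menu_df, 'product_id')
                        .join(members_df, 'customer_id')
                        .filter(sales_df.order_date < members_df.join_date)
                        .groupBy('customer_id')
                        .agg(countDistinct('product_id').alias('item_counts'),
                             sum_('price').alias('total_amount')))

total_items_spent_df.show()

# output:-
+-----------+-----------+------------+
|customer_id|item_counts|total_amount|
+-----------+-----------+------------+
|          A|          2|         250|
|          B|          2|         400|
+-----------+-----------+------------+

Approach:- Join all three dataframes, keep only the orders placed before join_date, and then, for each customer, count the distinct products and sum the price. Note that countDistinct counts distinct menu items, not order rows.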
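The difference between distinct items and order rows is easy to miss, so the sketch below computes both. It assumes the same sample rows, prices, and join dates; `order_rows` is an extra illustrative field that the original query does not produce.

```python
# Same sample rows as sales_df: (customer_id, order_date, product_id)
sales = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
price = {"1": 100, "2": 150, "3": 120, "4": 110, "5": 80, "6": 180}
join_date = {"A": "2021-01-07", "B": "2021-01-09"}

# Pre-membership orders as (customer_id, product_id)
before = [(c, p) for c, d, p in sales if c in join_date and d < join_date[c]]

summary = {}
for customer_id in sorted({c for c, _p in before}):
    products = [p for c, p in before if c == customer_id]
    summary[customer_id] = {
        "order_rows": len(products),        # what count() would return
        "item_counts": len(set(products)),  # what countDistinct() returns
        "total_amount": sum(price[p] for p in products),
    }

print(summary)
```

Customer B ordered chicken_tikka twice before joining, so B has three order rows but only two distinct items; the total amount of 400 still includes both chicken_tikka orders.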
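# sum is imported under another name so that Python's built-in sum is not shadowed
from pyspark.sql.functions import col, when, sum as sum_

# Reward points earned by each customer
# code:-
# product_id is stored as a string, so it is compared with '3'
earned_points_df = (sales_df.join(menu_df, 'product_id')
                    .withColumn('points', when(col('product_id') == '3', col('price') * 20)
                                .otherwise(col('price') * 10))
                    .groupBy('customer_id')
                    .agg(sum_('points').alias('rewards_points')))

earned_points_df.show()

# output:-
+-----------+--------------+
|customer_id|rewards_points|
+-----------+--------------+
|          B|          9800|
|          C|          5800|
|          A|         11200|
+-----------+--------------+

Approach:- Join sales_df with menu_df and compute the points for each order with when/otherwise: product_id 3 (jeera_rice) earns price × 20 points, and every other item earns price × 10 points. Then sum the points for each customer.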
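The points rule is simple arithmetic, so it can be verified directly. The sketch below assumes the same sample rows and prices and the rule encoded in the query above (product_id 3 earns 20 points per unit of price, everything else earns 10); `points_for` is an illustrative helper.

```python
from collections import defaultdict

# Same sample rows as sales_df: (customer_id, order_date, product_id)
sales = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
price = {"1": 100, "2": 150, "3": 120, "4": 110, "5": 80, "6": 180}


def points_for(product_id):
    """Points for one order: 20x price for product 3, 10x price otherwise."""
    multiplier = 20 if product_id == "3" else 10
    return price[product_id] * multiplier


rewards_points = defaultdict(int)
for customer_id, _order_date, product_id in sales:
    rewards_points[customer_id] += points_for(product_id)

print(dict(rewards_points))
```

For customer C, for example, the two jeera_rice orders earn 2 × 120 × 20 = 4800 points and the palak_paneer order earns 100 × 10 = 1000, for 5800 in total.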
from pyspark.sql.functions import col, when

# All orders with a flag showing whether the customer was a member at the time
# code:-
all_data_df = (sales_df.join(menu_df, 'product_id', 'left')
               .join(members_df, 'customer_id', 'left')
               .withColumn('member', when(col('order_date') < col('join_date'), 'N')
                           .when(col('order_date') >= col('join_date'), 'Y')
                           .otherwise('N'))
               # product_name is kept because it appears in the output below
               .drop('product_id', 'join_date'))

all_data_df.show()

# output:-
+-----------+----------+-------------+-----+------+
|customer_id|order_date| product_name|price|member|
+-----------+----------+-------------+-----+------+
|          A|2021-01-01| palak_paneer|  100|     N|
|          A|2021-01-01|chicken_tikka|  150|     N|
|          A|2021-01-07|chicken_tikka|  150|     Y|
|          A|2021-01-10|   jeera_rice|  120|     Y|
|          A|2021-01-11|   jeera_rice|  120|     Y|
|          A|2021-01-11|   jeera_rice|  120|     Y|
|          B|2021-01-04| palak_paneer|  100|     N|
|          B|2021-01-11| palak_paneer|  100|     Y|
|          B|2021-01-01|chicken_tikka|  150|     N|
|          B|2021-01-02|chicken_tikka|  150|     N|
|          B|2021-01-16|   jeera_rice|  120|     Y|
|          B|2021-02-01|   jeera_rice|  120|     Y|
|          C|2021-01-01| palak_paneer|  100|     N|
|          C|2021-01-01|   jeera_rice|  120|     N|
|          C|2021-01-07|   jeera_rice|  120|     N|
+-----------+----------+-------------+-----+------+

Approach:- Left-join sales_df with menu_df and members_df so that customer C, who has no membership, is kept with a null join_date. Flag each order Y if it was placed on or after join_date and N otherwise. For customer C both comparisons evaluate to null, so the rows fall through to otherwise('N').
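The flag logic, including the non-member case, can be sketched in plain Python. The sketch assumes the same sample rows and join dates; a missing key in `join_date` stands in for the null produced by the left join.

```python
from collections import Counter

# Same sample rows as sales_df: (customer_id, order_date, product_id)
sales = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
join_date = {"A": "2021-01-07", "B": "2021-01-09"}


def member_flag(customer_id, order_date):
    """'Y' if the order was placed on or after the join date, else 'N'."""
    joined = join_date.get(customer_id)  # None for non-members such as C
    return "Y" if joined is not None and order_date >= joined else "N"


flags = [(c, d, member_flag(c, d)) for c, d, _p in sales]
flag_counts = Counter(flag for _c, _d, flag in flags)
print(flag_counts)
```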
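from pyspark.sql import Window
from pyspark.sql.functions import col, rank, when

# Rank each member's orders by date; non-member orders get a null rank
# code:-
ranking_final_df = (sales_df.join(menu_df, 'product_id', 'left')
                    .join(members_df, 'customer_id', 'left')
                    .withColumn('is_member', when(col('order_date') < col('join_date'), 'N')
                                .when(col('order_date') >= col('join_date'), 'Y')
                                .otherwise('N'))
                    .withColumn('rank', when(col('is_member') == 'N', None)
                                .when(col('is_member') == 'Y',
                                      rank().over(Window.partitionBy('customer_id', 'is_member')
                                                  .orderBy('order_date')))
                                .otherwise(0)))

Approach:- Build the same is_member flag as in the previous step, then rank the member orders by order_date within each customer using rank(). Orders placed before membership, and all orders from non-members, get a null rank.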
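To make the ranking rule concrete, here is a Spark-free sketch of it. It assumes the same sample rows and join dates and implements rank() as one plus the number of member orders with a strictly earlier date, so orders tied on a date share a rank. The names `member_rows` and `ranked` are illustrative.

```python
# Same sample rows as sales_df: (customer_id, order_date, product_id)
sales = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
join_date = {"A": "2021-01-07", "B": "2021-01-09"}


def is_member(customer_id, order_date):
    joined = join_date.get(customer_id)
    return joined is not None and order_date >= joined


# Orders placed while the customer was a member
member_rows = [(c, d) for c, d, _p in sales if is_member(c, d)]

ranked = []
for c, d, _p in sales:
    if is_member(c, d):
        # rank = 1 + number of this customer's member orders with an earlier date
        earlier = sum(1 for mc, md in member_rows if mc == c and md < d)
        ranked.append((c, d, "Y", earlier + 1))
    else:
        ranked.append((c, d, "N", None))

for row in ranked:
    print(row)
```

Customer A's two orders on 2021-01-11 are tied, so both receive rank 3.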
In this article, we worked on a case study using restaurant data. Using PySpark, we found some hidden insights in the data that can help the restaurant increase its sales. The owner can now see which items are popular and which need more attention. Based on these insights, the owner can try to increase sales in several ways, for example by offering well-targeted discounts to customers with less risk of making a loss.
Some important takeaways from this article are as follows:-
1. We worked with restaurant sales data and gathered some insightful information that can benefit a business in various ways.
2. We worked with different PySpark functions, including join, groupBy, withColumn, withColumnRenamed, and partitionBy, and with window functions such as rank and dense_rank, to answer the questions.
3. Furthermore, we walked through step-by-step solutions to each problem using PySpark functions and looked at the output at the end of each problem statement.
I hope this article helps you to understand the PySpark functions in more detail. If you have any opinions or questions, then comment down below. Connect with me on LinkedIn for further discussion. Keep Learning!!!
The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.