
rank and dense rank in pyspark dataframe

In this post, let us learn about rank and dense_rank in a pyspark dataframe using window functions, with examples.

Rank and dense rank

The rank and dense rank in pyspark dataframe help us to rank the records based on a particular column.

This works in a similar manner to the row_number function. To understand the row_number function better, please refer to the row_number post below.

The row_number function simply assigns a distinct sequential number to every row, even when the ordering column contains duplicate values. Rank and dense_rank, on the other hand, assign the same rank to rows with duplicate values.

Sample program – creating dataframe

We can create a dataframe containing the salary details of some employees from different departments using the below program.

from pyspark.sql import Row
# Creating dictionary with employee and their salary details 
dict1=[{"Emp_id" : 123 , "Dep_name" : "Computer"  , "Salary" : 2500 } , {"Emp_id" : 456 ,"Dep_name"  :"Economy" , "Salary" : 4500} , {"Emp_id" : 789 , "Dep_name" : "Economy" , "Salary" : 7200 } , {"Emp_id" : 564 , "Dep_name" : "Computer" , "Salary" : 1400 } , {"Emp_id" : 987 , "Dep_name" : "History" , "Salary" : 3450 }, {"Emp_id" :678 , "Dep_name" :"Economy" ,"Salary": 4500},{"Emp_id" : 943 , "Dep_name" : "Computer" , "Salary" : 3200 }]
# Creating RDD from the dictionary created above
rdd1=sc.parallelize(dict1)
# Converting RDD to dataframe
df1=rdd1.toDF()
print("Printing the dataframe df1")
df1.show()
Printing the dataframe df1
+--------+------+------+
|Dep_name|Emp_id|Salary|
+--------+------+------+
|Computer|   123|  2500|
| Economy|   456|  4500|
| Economy|   789|  7200|
|Computer|   564|  1400|
| History|   987|  3450|
| Economy|   678|  4500|
|Computer|   943|  3200|
+--------+------+------+
Sample program – rank()

In order to use rank and dense_rank in our program, we require the below imports.

from pyspark.sql import Window
from pyspark.sql.functions import rank,dense_rank

from pyspark.sql import Window
from pyspark.sql.functions import rank
df2=df1.withColumn("rank",rank().over(Window.partitionBy("Dep_name").orderBy("Salary")))
print("Printing the dataframe df2")
df2.show()

In the below output, the Economy department contains two employees with the first rank. This is because both employees have the same salary.

But instead of the next higher salary being assigned the second rank, it is assigned the third rank. This is how the rank function works: it skips positions in the ranking order after a tie.

Printing the dataframe df2
+--------+------+------+----+
|Dep_name|Emp_id|Salary|rank|
+--------+------+------+----+
|Computer|   564|  1400|   1|
|Computer|   123|  2500|   2|
|Computer|   943|  3200|   3|
| History|   987|  3450|   1|
| Economy|   456|  4500|   1|
| Economy|   678|  4500|   1|
| Economy|   789|  7200|   3|
+--------+------+------+----+
Sample program – dense_rank()

With dense_rank, no positions are skipped in the ranking order. For the same scenario discussed earlier, the next higher salary is assigned the second rank instead of the third.

from pyspark.sql import Window
from pyspark.sql.functions import dense_rank
df3=df1.withColumn("denserank",dense_rank().over(Window.partitionBy("Dep_name").orderBy("Salary")))
print("Printing the dataframe df3")
df3.show()
Printing the dataframe df3
+--------+------+------+---------+
|Dep_name|Emp_id|Salary|denserank|
+--------+------+------+---------+
|Computer|   564|  1400|        1|
|Computer|   123|  2500|        2|
|Computer|   943|  3200|        3|
| History|   987|  3450|        1|
| Economy|   456|  4500|        1|
| Economy|   678|  4500|        1|
| Economy|   789|  7200|        2|
+--------+------+------+---------+
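
To compare rank(), dense_rank() and row_number() side by side on the same tied salaries, all three can be computed over the same window. The below snippet is a minimal sketch that reuses the df1 created above; the column names rank, dense_rank and row_num are only illustrative.

from pyspark.sql import Window
from pyspark.sql.functions import rank, dense_rank, row_number
# One window definition shared by all three ranking functions
win = Window.partitionBy("Dep_name").orderBy("Salary")
df4 = df1.withColumn("rank", rank().over(win)) \
         .withColumn("dense_rank", dense_rank().over(win)) \
         .withColumn("row_num", row_number().over(win))
df4.show()

For the two Economy employees earning 4500, rank and dense_rank both return 1 while row_num assigns 1 and 2; the next salary then gets rank 3 but dense_rank 2.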
Reference

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Column.over


row_number in pyspark dataframe

In this post, we will learn to use row_number in pyspark dataframe with examples.

What is row_number?

The row_number function in a pyspark dataframe assigns consecutive numbering over a set of rows.
The window function in pyspark dataframe helps us to achieve this.
To know more about window functions, please refer to the window function post below.

Creating dataframe 

Before moving into the concept, let us create a dataframe using the below program.

from pyspark.sql import Row
# Creating dictionary with employee and their salary details 
dict1=[{"Emp_id" : 123 , "Dep_name" : "Computer"  , "Salary" : 2500 } , {"Emp_id" : 456 ,"Dep_name"  :"Economy" , "Salary" : 4500} , {"Emp_id" : 789 , "Dep_name" : "Economy" , "Salary" : 7200 } , {"Emp_id" : 564 , "Dep_name" : "Computer" , "Salary" : 1400 } , {"Emp_id" : 987 , "Dep_name" : "History" , "Salary" : 3450 }, {"Emp_id" :678 , "Dep_name" :"Economy" ,"Salary": 6700},{"Emp_id" : 943 , "Dep_name" : "Computer" , "Salary" : 3200 }]
# Creating RDD from the dictionary created above
rdd1=sc.parallelize(dict1)
# Converting RDD to dataframe
df1=rdd1.toDF()
print("Printing the dataframe df1")
df1.show()

Thus we created the below dataframe with the salary details of some employees from various departments.

Printing the dataframe df1
+--------+------+------+
|Dep_name|Emp_id|Salary|
+--------+------+------+
|Computer|   123|  2500|
| Economy|   456|  4500|
| Economy|   789|  7200|
|Computer|   564|  1400|
| History|   987|  3450|
| Economy|   678|  6700|
|Computer|   943|  3200|
+--------+------+------+
Sample program – row_number

With the below segment of code, we can populate the row number based on the Salary for each department separately.

We need to import the following libraries before using Window and row_number in the code.

The orderBy clause sorts the values before the row numbers are generated.

from pyspark.sql import Window
from pyspark.sql.functions import row_number
df2=df1.withColumn("row_num",row_number().over(Window.partitionBy("Dep_name").orderBy("Salary")))
print("Printing the dataframe df2")
df2.show()
Printing the dataframe df2
+--------+------+------+-------+
|Dep_name|Emp_id|Salary|row_num|
+--------+------+------+-------+
|Computer|   564|  1400|      1|
|Computer|   123|  2500|      2|
|Computer|   943|  3200|      3|
| History|   987|  3450|      1|
| Economy|   456|  4500|      1|
| Economy|   678|  6700|      2|
| Economy|   789|  7200|      3|
+--------+------+------+-------+
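
A common use of row_number is picking the top record in each group, for example the highest-paid employee per department. The below snippet is a minimal sketch reusing the df1 created above; desc() from pyspark.sql.functions sorts the salaries in descending order, and the column name row_num is only illustrative.

from pyspark.sql import Window
from pyspark.sql.functions import row_number, desc
# Number rows within each department from highest to lowest salary
win_desc = Window.partitionBy("Dep_name").orderBy(desc("Salary"))
# Keep only the first row of each department, i.e. the highest salary
top_paid = df1.withColumn("row_num", row_number().over(win_desc)).filter("row_num = 1")
top_paid.show()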
Reference

https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.row_number


window function in pyspark with example

In this post, we will learn about the window function in pyspark with examples.

What is a window function?

A window function in pyspark acts in a similar way to a GROUP BY clause in SQL, with one key difference: the rows are not collapsed into a single row per group.

It groups a set of rows based on a particular column, performs an aggregating function over the group, and attaches the result to every row of the group.

Sample program for creating dataframe

For understanding the concept better, we will create a dataframe containing the salary details of some employees using the below program.

# Creating dictionary with employee and their salary details 
dict1=[{"Emp_id" : 123 , "Dep_name" : "Computer"  , "Salary" : 2500 } , {"Emp_id" : 456 ,"Dep_name"  :"Economy" , "Salary" : 4500} , {"Emp_id" : 789 , "Dep_name" : "History" , "Salary" : 6700 } , {"Emp_id" : 564 , "Dep_name" : "Computer" , "Salary" : 1400 } , {"Emp_id" : 987 , "Dep_name" : "History" , "Salary" : 3450 }, {"Emp_id" :678 , "Dep_name" :"Economy" ,"Salary": 6700}]
# Creating RDD from the dictionary created above
rdd1=sc.parallelize(dict1)
# Converting RDD to dataframe
df1=rdd1.toDF()
print("Printing the dataframe df1")
df1.show()
Printing the dataframe df1
+--------+------+------+
|Dep_name|Emp_id|Salary|
+--------+------+------+
|Computer|   123|  2500|
| Economy|   456|  4500|
| History|   789|  6700|
|Computer|   564|  1400|
| History|   987|  3450|
| Economy|   678|  6700|
+--------+------+------+
How to use window function in our program?

In the below segment of code, the window function is used to get the sum of the salaries over each department.

The following imports are required before executing the code.

from pyspark.sql import Window
# sum here refers to pyspark's aggregate function, which shadows Python's built-in sum
from pyspark.sql.functions import sum

partitionBy takes the column name based on which the grouping needs to be done.

df = df1.withColumn("Sum",sum('Salary').over(Window.partitionBy('Dep_name')))
print("Printing the result")
df.show()
Printing the result
+--------+------+------+-----+
|Dep_name|Emp_id|Salary|  Sum|
+--------+------+------+-----+
|Computer|   123|  2500| 3900|
|Computer|   564|  1400| 3900|
| History|   789|  6700|10150|
| History|   987|  3450|10150|
| Economy|   456|  4500|11200|
| Economy|   678|  6700|11200|
+--------+------+------+-----+
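
For comparison, computing the same totals with a plain groupBy collapses each department into a single row, whereas the window version above keeps every employee row. A minimal sketch, reusing df1 from above:

# sum here is pyspark's aggregate function, not Python's built-in sum
from pyspark.sql.functions import sum as sum_
# One row per department; the individual employee rows are lost
df1.groupBy("Dep_name").agg(sum_("Salary").alias("Sum")).show()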
Other aggregate functions

In the same way, we can apply the other aggregate functions as well. Some of those aggregate functions are max(), avg(), min() and collect_list().

Below are a few examples of those aggregate functions.

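The below snippet is a minimal sketch of those aggregate functions applied over the same department window, reusing the df1 created above; the output column names are only illustrative.

from pyspark.sql import Window
# These imports shadow Python's built-in max and min inside this script
from pyspark.sql.functions import max, min, avg, collect_list
win = Window.partitionBy("Dep_name")
# Every row receives the department-level aggregates alongside its own salary
df_agg = df1.withColumn("Max_sal", max("Salary").over(win)) \
            .withColumn("Min_sal", min("Salary").over(win)) \
            .withColumn("Avg_sal", avg("Salary").over(win)) \
            .withColumn("Salaries", collect_list("Salary").over(win))
df_agg.show(truncate=False)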

Reference

https://medium.com/@rbahaguejr/window-function-on-pyspark-17cc774b833a

Window functions in pyspark with ranking functions such as row_number(), rank() and dense_rank() are discussed in the other posts in this series.