
window function in pyspark with example

In this post, we will learn about the window function in pyspark with an example.

What is a window function?

A window function in pyspark acts in a similar way to a GROUP BY clause in SQL, except that it keeps every row instead of collapsing them.

It groups a set of rows based on a particular column and performs an aggregate function over each group.

Sample program for creating dataframe

For understanding the concept better, we will create a dataframe containing the salary details of some employees using the below program.

# Importing the libraries and creating the SparkContext / SparkSession
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc=SparkContext.getOrCreate()
spark=SparkSession.builder.getOrCreate()
# Creating dictionary with employee and their salary details
dict1=[{"Emp_id" : 123 , "Dep_name" : "Computer" , "Salary" : 2500 } , {"Emp_id" : 456 , "Dep_name" : "Economy" , "Salary" : 4500 } , {"Emp_id" : 789 , "Dep_name" : "History" , "Salary" : 6700 } , {"Emp_id" : 564 , "Dep_name" : "Computer" , "Salary" : 1400 } , {"Emp_id" : 987 , "Dep_name" : "History" , "Salary" : 3450 } , {"Emp_id" : 678 , "Dep_name" : "Economy" , "Salary" : 6700 }]
# Creating RDD from the dictionary created above
rdd1=sc.parallelize(dict1)
# Converting RDD to dataframe
df1=rdd1.toDF()
print("Printing the dataframe df1")
df1.show()
Printing the dataframe df1
+--------+------+------+
|Dep_name|Emp_id|Salary|
+--------+------+------+
|Computer|   123|  2500|
| Economy|   456|  4500|
| History|   789|  6700|
|Computer|   564|  1400|
| History|   987|  3450|
| Economy|   678|  6700|
+--------+------+------+
How to use the window function in our program?

In the below segment of code, the window function is used to get the sum of the salaries over each department.

The following imports are required before executing the code.

from pyspark.sql import Window
from pyspark.sql.functions import sum

partitionBy takes the column name on which the grouping needs to be done.

df = df1.withColumn("Sum",sum('Salary').over(Window.partitionBy('Dep_name')))
print("Printing the result")
df.show()
Printing the result
+--------+------+------+-----+
|Dep_name|Emp_id|Salary|  Sum|
+--------+------+------+-----+
|Computer|   123|  2500| 3900|
|Computer|   564|  1400| 3900|
| History|   789|  6700|10150|
| History|   987|  3450|10150|
| Economy|   456|  4500|11200|
| Economy|   678|  6700|11200|
+--------+------+------+-----+
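
For comparison, a plain groupBy aggregation on the same data returns only one row per department instead of keeping every employee row, which is the key difference from the window function. A minimal sketch (not part of the original post), assuming the df1 created above:

# groupBy collapses each department into a single row
df1.groupBy('Dep_name').sum('Salary').show()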
Other aggregate functions

In the same way, we can use the other aggregate functions such as max(), avg(), min() and collect_list().

Below are a few examples of those aggregate functions.

window function with some other aggregate functions
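
A minimal sketch applying a few of these aggregates over the same department window, assuming the df1 dataframe and the Window import from above (the new column names here are just for illustration):

from pyspark.sql.functions import max, min, avg, collect_list
# Applying several aggregate functions over the same window
dep_window = Window.partitionBy('Dep_name')
df_agg = df1.withColumn("Max", max('Salary').over(dep_window)) \
            .withColumn("Min", min('Salary').over(dep_window)) \
            .withColumn("Avg", avg('Salary').over(dep_window)) \
            .withColumn("Salaries", collect_list('Salary').over(dep_window))
df_agg.show()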

Reference

https://medium.com/@rbahaguejr/window-function-on-pyspark-17cc774b833a

Window functions in pyspark with advanced aggregate functions like row_number(), rank() and dense_rank() will be discussed in our other blogs.


Left-anti and Left-semi join in pyspark

In this post, we will learn about the Left-anti and Left-semi joins in pyspark dataframes with examples.

Sample program for creating dataframes

Let us start with the creation of two dataframes. After that, we will move into the concept of the Left-anti and Left-semi joins in pyspark dataframes.

# Creating the SparkContext / SparkSession
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc=SparkContext.getOrCreate()
spark=SparkSession.builder.getOrCreate()
# Creating two dictionaries with Employee and Department details
dict=[{"Emp_id" : 123 , "Emp_name" : "Raja" },{"Emp_id" : 234 , "Emp_name" : "Sindu"},{"Emp_id" : 456 , "Emp_name" : "Ravi"}]
dict1=[{"Emp_id" : 123 , "Dep_name" : "Computer" } , {"Emp_id" : 456 , "Dep_name" : "Economy"} , {"Emp_id" : 789 , "Dep_name" : "History"}]
# Creating RDDs from the above dictionaries using parallelize method
rdd=sc.parallelize(dict)
rdd1=sc.parallelize(dict1)
# Converting RDDs to dataframes
df=rdd.toDF()
df1=rdd1.toDF()
print("Printing the first dataframe")
df.show()
print("Printing the second dataframe")
df1.show()
Printing the first dataframe
+------+--------+
|Emp_id|Emp_name|
+------+--------+
|   123|    Raja|
|   234|   Sindu|
|   456|    Ravi|
+------+--------+
Printing the second dataframe
+--------+------+
|Dep_name|Emp_id|
+--------+------+
|Computer|   123|
| Economy|   456|
| History|   789|
+--------+------+
What is Left-anti join?

A left-anti join returns only the records from the left dataframe that do not have matching records in the right dataframe.

As we can see in the sample programs below, only the columns from the left dataframe are returned by the Left-anti and Left-semi joins, and not all the columns from both the dataframes as in other types of joins.

Sample program – Left-anti join

Emp_id 234 is available only in the left dataframe and not in the right dataframe, so it is the only record returned.

# Left-anti join between the two dataframes df and df1 based on the column Emp_id
df2=df.join(df1,['Emp_id'], how = 'left_anti')
print("Printing the result of left-anti below")
df2.show()
Printing the result of left-anti below
+------+--------+
|Emp_id|Emp_name|
+------+--------+
|   234|   Sindu|
+------+--------+
What is Left-semi join?

A left-semi join returns the records from the left dataframe that have a match in the right dataframe.

In the below sample program, two Emp_ids, 123 and 456, are available in both the dataframes, so they are picked up here.

Sample program – Left-semi join
# Left-semi join between two dataframes df and df1
df3=df.join(df1,['Emp_id'], how = 'left_semi')
print("Printing the result of left-semi below")
df3.show()
Printing the result of left-semi below
+------+--------+
|Emp_id|Emp_name|
+------+--------+
|   123|    Raja|
|   456|    Ravi|
+------+--------+

Other types of joins are the outer join and the inner join in pyspark.

Reference

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=join#pyspark.sql.DataFrame.join


Outer join in pyspark dataframe with example

In this post, we will learn about the outer join in pyspark dataframes with examples.

There are other types of joins as well, like the inner join, left-anti join and left-semi join; refer to our other posts to learn about them.

What you will learn

At the end of this tutorial, you will have learned how to perform outer joins on pyspark dataframes.

Types of outer join

Types of outer join in pyspark dataframes are as follows:

  • Right outer join / Right join
  • Left outer join / Left join
  • Full outer join / Outer join / Full join
Sample program for creating two dataframes

We will start with the creation of two dataframes before moving into the topic of outer join in pyspark dataframes.

#Creating the SparkContext / SparkSession
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc=SparkContext.getOrCreate()
spark=SparkSession.builder.getOrCreate()
#Creating dictionaries
dict=[{"Emp_id" : 123 , "Emp_name" : "Raja" },{"Emp_id" : 234 , "Emp_name" : "Sindu"},{"Emp_id" : 456 , "Emp_name" : "Ravi"}]
dict1=[{"Emp_id" : 123 , "Dep_name" : "Computer" } , {"Emp_id" : 456 , "Dep_name" : "Economy"} , {"Emp_id" : 789 , "Dep_name" : "History"}]
# Creating RDDs from the above dictionaries using parallelize method
rdd=sc.parallelize(dict)
rdd1=sc.parallelize(dict1)
# Converting RDDs to dataframes
df=rdd.toDF()
df1=rdd1.toDF()
print("Printing the first dataframe")
df.show()
print("Printing the second dataframe")
df1.show()
Printing the first dataframe
+------+--------+
|Emp_id|Emp_name|
+------+--------+
|   123|    Raja|
|   234|   Sindu|
|   456|    Ravi|
+------+--------+
Printing the second dataframe
+--------+------+
|Dep_name|Emp_id|
+--------+------+
|Computer|   123|
| Economy|   456|
| History|   789|
+--------+------+
What is Right outer join?

The right outer join gives us all the records from the right dataframe along with the matching records from the left dataframe.

The remaining unmatched columns from the left dataframe are populated with null.

Sample program – Right outer join / Right join

Within the join syntax, the type of join to be performed is mentioned as right_outer or right.

As the Emp_name for Emp_id 789 is not available in the left dataframe, it is populated with null in the following result.

# Right outer join / Right join 
df2=df.join(df1,['Emp_id'], how = 'right_outer')
print("Printing the result of right outer / right join")
df2.show()
Printing the result of right outer / right join
+------+--------+--------+
|Emp_id|Emp_name|Dep_name|
+------+--------+--------+
|   789|    null| History|
|   123|    Raja|Computer|
|   456|    Ravi| Economy|
+------+--------+--------+
What is Left outer join?

This join is used to retrieve all the records from the left dataframe along with their matching records from the right dataframe.

The type of join can be mentioned either as left_outer or left.

Sample program – Left outer join / Left join

In the below example, for Emp_id 234, Dep_name is populated with null as there is no record for this Emp_id in the right dataframe.

# Left outer join / Left join
df3=df.join(df1,['Emp_id'], how = 'left_outer')
print("Printing the result of Left outer join / Left join")
df3.show()
Printing the result of Left outer join / Left join
+------+--------+--------+
|Emp_id|Emp_name|Dep_name|
+------+--------+--------+
|   234|   Sindu|    null|
|   123|    Raja|Computer|
|   456|    Ravi| Economy|
+------+--------+--------+
What is Full outer join?

The full outer join generates a result with all the records from both the dataframes. The columns of the unmatched records are populated with null.

Sample program – Full outer join / Full join / Outer join

All the Emp_ids from both the dataframes are combined in this case, with null populated for the unavailable values.

# Full outer join / Full join / Outer join
df4=df.join(df1,['Emp_id'], how = 'full_outer')
print("Printing the result of Full outer join")
df4.show()
Printing the result of Full outer join
+------+--------+--------+
|Emp_id|Emp_name|Dep_name|
+------+--------+--------+
|   789|    null| History|
|   234|   Sindu|    null|
|   123|    Raja|Computer|
|   456|    Ravi| Economy|
+------+--------+--------+
Reference

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.join.html?highlight=outer%20join


Inner join in pyspark dataframe with example

In this post, we will learn about the inner join in pyspark dataframes with an example.

Types of join in pyspark dataframe

Before proceeding with the post, we will get familiar with the types of join available in pyspark dataframe.

Types of join: inner join, cross join, outer join, full join, full_outer join, left join, left_outer join, right join, right_outer join, left_semi join, and left_anti join.
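
Whichever type we need is passed through the how parameter of join(). A quick sketch (not from the original post), assuming two dataframes df and df1 sharing the column Emp_id, like the ones created below:

df.join(df1, ['Emp_id'], how='inner')       # matching records only
df.join(df1, ['Emp_id'], how='left_outer')  # all records from df
df.join(df1, ['Emp_id'], how='left_anti')   # records of df with no match in df1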

What is Inner join?

Similar to SQL, the inner join helps us to get the matching records between two datasets. To understand it better, we will create two dataframes with the following piece of code.

Sample program for creating two dataframes
from pyspark.sql import SparkSession, Row
spark = SparkSession.builder.appName("Inner Join").getOrCreate()
sc = spark.sparkContext
# Creating dictionary with columns Emp_id and Emp_name
dict=[{"Emp_id" : 123 , "Emp_name" : "Raja" }, {"Emp_id" : 456 , "Emp_name" : "Ravi"}]
# Creating RDD from the above dictionary using parallelize method
rdd=sc.parallelize(dict)
# Converting RDD to dataframe
df=rdd.toDF()
print("Printing the first dataframe df")
df.show()
Printing the first dataframe df 
+------+--------+
|Emp_id|Emp_name|
+------+--------+
|   123|    Raja|
|   456|    Ravi|
+------+--------+
# Creating dictionary with columns Emp_id and Dep_name 
dict1=[{"Emp_id" : 123 , "Dep_name" : "Computer" } , {"Emp_id" : 456 ,"Dep_name"  :"Economy"} , {"Emp_id" : 789 , "Dep_name" : "History"}]
# Creating RDD from the above dictionary using parallelize method
rdd1=sc.parallelize(dict1)
# Converting RDD to dataframe  
df1=rdd1.toDF()
print("Printing the second dataframe df1")
df1.show()
Printing the second dataframe df1
+--------+------+
|Dep_name|Emp_id|
+--------+------+
|Computer|   123|
| Economy|   456|
| History|   789|
+--------+------+
How to do an inner join?

The syntax of join requires three parameters to be passed –

1) The dataframe to be joined with

2) The column to be joined on

3) The type of join to be done

By default, an inner join is performed if no input is passed for the third parameter.

First method

Let us see the first method in understanding Inner join in pyspark dataframe with example.

# Inner joining the two dataframes df and df1 based on the column Emp_id 
df2=df.join(df1,['Emp_id'], how = 'inner')
print("Printing the dataframe df2")
df2.show()
Printing the dataframe df2
+------+--------+--------+
|Emp_id|Emp_name|Dep_name|
+------+--------+--------+
|   123|    Raja|Computer|
|   456|    Ravi| Economy|
+------+--------+--------+
Second method
# Inner joining the two dataframes df and df1 based on the column Emp_id with default join i.e inner join
df3=df.join(df1,['Emp_id'])
print("Printing the dataframe df3")
df3.show()
Printing the dataframe df3
+------+--------+--------+
|Emp_id|Emp_name|Dep_name|
+------+--------+--------+
|   123|    Raja|Computer|
|   456|    Ravi| Economy|
+------+--------+--------+
Reference

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=join#pyspark.sql.DataFrame.join


Where condition in pyspark with example

In this post, we will understand the usage of where condition in pyspark with example.

Where condition in pyspark

The where condition in pyspark works in a similar manner to the WHERE clause in SQL.

A plain equality comparison cannot be used to filter null or non-null values. In that case, the where condition together with isNull() / isNotNull() helps us to deal with the null values as well.

Sample program in pyspark

In the below sample program, the dictionary data1 is created with key and value pairs, and the dataframe df1 is created with rows and columns.

Using the createDataFrame method, the dictionary data1 is converted to the dataframe df1.

Here, we can use isNull() or isNotNull() to filter the null or non-null values.

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Filtering Null records") \
    .getOrCreate()
# Creating dictionary
data1=[{"Name" : 'Usha', "Class" : 7, "Marks" : 250 }, \
{"Name" : 'Rajesh' , "Class" : 5, "Marks" : None }]
# Converting dictionary to dataframe
df1=spark.createDataFrame(data1)
print("Printing dataframe df1")
df1.show()
# Filtering Null records
df2=df1.where(df1["Marks"].isNull())
print("Printing dataframe df2")
df2.show()
# Filtering Non-Null records
df3=df1.where(df1["Marks"].isNotNull())
print("Printing dataframe df3")
df3.show()
Output

The dataframe df1 is created from the dictionary with one null record and one non-null record using the above sample program.

The dataframe df2 holds only the null records, whereas the dataframe df3 holds the non-null records.

Other than filtering null and non-null values, we can even use where() to filter based on any particular value, as shown in the short example after the output below.

Printing dataframe df1
+-----+-----+------+
|Class|Marks|  Name|
+-----+-----+------+
|    7|  250|  Usha|
|    5| null|Rajesh|
+-----+-----+------+
Printing dataframe df2
+-----+-----+------+
|Class|Marks|  Name|
+-----+-----+------+
|    5| null|Rajesh|
+-----+-----+------+
Printing dataframe df3
+-----+-----+----+
|Class|Marks|Name|
+-----+-----+----+
|    7|  250|Usha|
+-----+-----+----+
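
For instance, a short sketch (not from the original post) filtering on a particular value with the same df1:

# Filtering based on a particular value using where()
df4=df1.where(df1["Class"] == 7)
df4.show()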
Reference

https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.filter

Please refer to the below link for understanding the filter condition in pyspark with example.

https://beginnersbug.com/how-to-use-filter-condition-in-pyspark/


How to use filter condition in pyspark

Filter condition in pyspark

In this post, we will learn how to use filter condition in pyspark with example.

Sample program using filter condition

We will create a dataframe using the following sample program.

Then we filter the dataframe based on marks and store the result in another dataframe.

The following classes are imported at the beginning of the code.

import findspark
findspark.init()
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
sc=SparkContext.getOrCreate()
spark=SparkSession.builder.getOrCreate()
#creating dataframe with three records
df=sc.parallelize([Row(name='Gokul',Class=10,marks=480,grade='A'),Row(name='Usha',Class=12,marks=450,grade='A'),Row(name='Rajesh',Class=12,marks=430,grade='B')]).toDF()
print("Printing df dataframe below")
df.show()
#Filtering based on the marks
df1=df.filter(col("marks")==480)
print("Printing df1 dataframe below")
df1.show()
Output

The following dataframes are created as the result of the above sample program.

Here, the filter condition helps us to filter the records having marks equal to 480 from the dataframe.

Printing df dataframe below
+-----+-----+-----+------+
|Class|grade|marks|  name|
+-----+-----+-----+------+
|   10|    A|  480| Gokul|
|   12|    A|  450|  Usha|
|   12|    B|  430|Rajesh|
+-----+-----+-----+------+
Printing df1 dataframe below
+-----+-----+-----+-----+
|Class|grade|marks| name|
+-----+-----+-----+-----+
|   10|    A|  480|Gokul|
+-----+-----+-----+-----+

Note that we must use the double equals operator (==) inside the filter condition.
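
A quick illustration of the point, using the same dataframe df (a minimal sketch, not from the original post):

df.filter(col("marks") == 480).show()
# A single "=" is an assignment and raises a SyntaxError inside filter()
# df.filter(col("marks") = 480)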

I hope that everyone now has an idea of how to use the filter condition in pyspark.

Reference

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=filter#pyspark.sql.DataFrame

https://beginnersbug.com/where-condition-in-pyspark-with-example/


from_unixtime in pyspark with example

In this post, we will learn about from_unixtime in pyspark with an example.

Sample program

In order to pass a date value into a column of a dataframe, we can go with this option.

Using lit() we can pass any literal value into a dataframe, but a date passed this way cannot be retrieved properly as a date.

Here, unix_timestamp() and from_unixtime() help us to do the above easily.

import findspark
findspark.init()
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
sc=SparkContext.getOrCreate()
spark=SparkSession.builder.getOrCreate()
# Converting the literal date string using unix_timestamp() and from_unixtime()
df=spark.range(1).select(from_unixtime(unix_timestamp(lit('2018-09-30 00:00:00')),'yyyy-MM-dd'))
print("Printing df below")
df.show(truncate=False)
Output
Printing df below
+-----------------------------------------------------------------------------------+
|from_unixtime(unix_timestamp(2018-09-30 00:00:00, yyyy-MM-dd HH:mm:ss), yyyy-MM-dd)|
+-----------------------------------------------------------------------------------+
|                                                                         2018-09-30|
+-----------------------------------------------------------------------------------+
Reference

https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.from_unixtime

how to get the current date in pyspark with example

How to change the date format in pyspark


how to add/subtract months to the date in pyspark

In this post, we will learn how to add/subtract months to a date in pyspark with examples.

Creating dataframe – Sample program

With the following program, we first create a dataframe df with a column dt populated with the date value '2019-02-28'.

import findspark
findspark.init()
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
sc=SparkContext.getOrCreate()
spark=SparkSession.builder.getOrCreate()
#Creating a dataframe df with date column
df=spark.createDataFrame([('2019-02-28',)],['dt'])
print("Printing df below")
df.show()
Output

The dataframe is created with the date value as below.

Printing df below
+----------+
|        dt|
+----------+
|2019-02-28|
+----------+
Adding months – Sample program

In the next step, we will create another dataframe df1 by adding months to the column dt using add_months().

date_format() is used to put the string '2019-02-28' into the specified date format before it is passed to add_months().

You can get to know more about date_format() from https://beginnersbug.com/how-to-change-the-date-format-in-pyspark/

#Adding the months
df1=df.withColumn("months_add",add_months(date_format('dt','yyyy-MM-dd'),1))
print("Printing df1 below")
df1.show()
Output

add_months(column name, number of months) requires two inputs – the date column to be considered and the number of months by which to increment or decrement.

Printing df1 below
+----------+----------+
|        dt|months_add|
+----------+----------+
|2019-02-28|2019-03-31|
+----------+----------+
Subtracting months – Sample program

We can even decrement the months by giving a negative value.

#Subtracting the months
df2=df.withColumn("months_sub",add_months(date_format('dt','yyyy-MM-dd'),-1))
print("Printing df2 below")
df2.show()
Output

Hence we get the date one month back using the same function.

Printing df2 below
+----------+----------+
|        dt|months_sub|
+----------+----------+
|2019-02-28|2019-01-31|
+----------+----------+
Reference

https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.add_months

from_unixtime in pyspark with example


How to change the date format in pyspark

In this post, we will learn how to change the date format in pyspark.

Creating dataframe

In order to understand this better, we will create a dataframe having the date format yyyy-MM-dd.

Note: in createDataFrame, the letters D and F need to be capitalized.

#Importing libraries required
import findspark
findspark.init()
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

sc=SparkContext.getOrCreate()
spark=SparkSession.builder.getOrCreate()
#creating dataframe with date column
df=spark.createDataFrame([('2019-02-28',)],['dt'])
df.show()
Output

With the above code, a dataframe named df is created with dt as one of its columns, as below.

+----------+
|        dt|
+----------+
|2019-02-28|
+----------+
Changing the format

With the dataframe created from the above code, the function date_format() is used to modify the format of the date.

date_format(<column_name>,<format required>)

#Changing the format of the date
df.select(date_format('dt','yyyy/MM/dd').alias('new_dt')).show()
Output

Thus we convert the date 2019-02-28 into the format 2019/02/28.

+----------+
|    new_dt|
+----------+
|2019/02/28|
+----------+
Reference

https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.date_format

how to get the current date in pyspark with example


how to get the current date in pyspark with example

In this post, we will learn how to get the current date in pyspark with an example.

Getting current date

The following lines help to get the current date and time.

import findspark
from pyspark.sql import Row
from pyspark import SparkContext , SparkConf
import datetime
now = datetime.datetime.now()
#Getting Current date and time
print (now.strftime("%Y-%m-%d %H:%M:%S"))
Output
2020-02-26 21:21:03
Getting current date and current timestamp within dataframe

current_date() helps to get the current date and current_timestamp() is used to get the current timestamp.

import findspark
findspark.init()
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *

sc=SparkContext.getOrCreate()
spark=SparkSession.builder.getOrCreate()
#creating dataframe with one record
df=sc.parallelize([Row(name='Gokul',Class=10,marks=480,grade='A')]).toDF()
print("Printing df dataframe below ")
df.show()
#Getting current date and timestamp
df.withColumn("currentdt",current_date()).withColumn("timestamp",current_timestamp()).show()
Output
Printing df dataframe below 
+-----+-----+-----+-----+
|Class|grade|marks| name|
+-----+-----+-----+-----+
|   10|    A|  480|Gokul|
+-----+-----+-----+-----+
+-----+-----+-----+-----+----------+--------------------+
|Class|grade|marks| name| currentdt|           timestamp|
+-----+-----+-----+-----+----------+--------------------+
|   10|    A|  480|Gokul|2020-02-27|2020-02-27 21:45:...|
+-----+-----+-----+-----+----------+--------------------+
Reference

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=date

renaming dataframe column in pyspark