Categories
pyspark

Inner join in pyspark dataframe with example

In this post, we will learn about inner join on PySpark dataframes, with an example.

Types of join in pyspark dataframe

Before proceeding with the post, we will get familiar with the types of join available in pyspark dataframe.

Types of join: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti. These are the values accepted by the how parameter of join().

What is inner join?

As in SQL, an inner join returns only the records that have a match in both datasets. To understand it better, we will create two dataframes with the following piece of code.

Sample program for creating two dataframes
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Inner Join").getOrCreate()
# The SparkContext is needed for the parallelize method
sc = spark.sparkContext
# Creating a list of dictionaries with columns Emp_id and Emp_name
emp_data=[{"Emp_id" : 123 , "Emp_name" : "Raja" }, {"Emp_id" : 456 , "Emp_name" : "Ravi"}]
# Creating RDD from the above list using parallelize method
rdd=sc.parallelize(emp_data)
# Converting RDD to dataframe
df=rdd.toDF()
print("Printing the first dataframe df")
df.show()
Printing the first dataframe df 
+------+--------+
|Emp_id|Emp_name|
+------+--------+
|   123|    Raja|
|   456|    Ravi|
+------+--------+
# Creating a list of dictionaries with columns Emp_id and Dep_name
dict1=[{"Emp_id" : 123 , "Dep_name" : "Computer" } , {"Emp_id" : 456 , "Dep_name" : "Economy"} , {"Emp_id" : 789 , "Dep_name" : "History"}]
# Creating RDD from the above list using parallelize method
rdd1=sc.parallelize(dict1)
# Converting RDD to dataframe  
df1=rdd1.toDF()
print("Printing the second dataframe df1")
df1.show()
Printing the second dataframe df1
+--------+------+
|Dep_name|Emp_id|
+--------+------+
|Computer|   123|
| Economy|   456|
| History|   789|
+--------+------+

How to do an inner join?

The join() method takes up to three parameters:

1) The dataframe to be joined with

2) The column (or list of columns) to match on

3) The type of join to perform

The third parameter is optional. If it is omitted, an inner join is performed by default.

First method

The first method passes the join type explicitly through the how parameter.

# Inner joining the two dataframes df and df1 based on the column Emp_id 
df2=df.join(df1,['Emp_id'], how = 'inner')
print("Printing the dataframe df2")
df2.show()
Printing the dataframe df2
+------+--------+--------+
|Emp_id|Emp_name|Dep_name|
+------+--------+--------+
|   123|    Raja|Computer|
|   456|    Ravi| Economy|
+------+--------+--------+

Second method

# Inner joining the two dataframes df and df1 on the column Emp_id, relying on the default join type (inner)
df3=df.join(df1,['Emp_id'])
print("Printing the dataframe df3")
df3.show()
Printing the dataframe df3
+------+--------+--------+
|Emp_id|Emp_name|Dep_name|
+------+--------+--------+
|   123|    Raja|Computer|
|   456|    Ravi| Economy|
+------+--------+--------+
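
To make the matching rule concrete, here is a minimal plain-Python sketch (not PySpark itself) of what an inner join on Emp_id computes for the two datasets above. The helper name inner_join is hypothetical and exists only for this illustration; Spark performs the same matching in a distributed way.

```python
# Plain-Python illustration of inner-join semantics; not PySpark code.
employees = [
    {"Emp_id": 123, "Emp_name": "Raja"},
    {"Emp_id": 456, "Emp_name": "Ravi"},
]
departments = [
    {"Emp_id": 123, "Dep_name": "Computer"},
    {"Emp_id": 456, "Dep_name": "Economy"},
    {"Emp_id": 789, "Dep_name": "History"},
]


def inner_join(left, right, key):
    """Return merged rows for every left/right pair that shares the same key value."""
    # Index the right-hand rows by key so each left row can find its matches.
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)

    joined = []
    for left_row in left:
        # Rows without a match on the other side are simply skipped.
        for right_row in index.get(left_row[key], []):
            joined.append({**left_row, **right_row})
    return joined


result = inner_join(employees, departments, "Emp_id")
for row in result:
    print(row)
```

Emp_id 789 exists only in the second dataset, so it does not appear in the result, just as it is missing from df2 and df3 above.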
Reference

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=join#pyspark.sql.DataFrame.join

Where condition in pyspark with example

In this post, we will understand the usage of where condition in pyspark with example.

Where condition in pyspark

The where() method in PySpark works in a similar manner to the WHERE clause in SQL.

where() is an alias for filter(), so the two behave identically. Null values cannot be matched with ordinary comparison operators; the column methods isNull() and isNotNull() are used for that instead.

Sample program in pyspark

In the sample program below, a list of dictionaries data1 is created with key and value pairs.

Using the createDataFrame method, data1 is converted to a dataframe df1.

Here, we use isNull() or isNotNull() to filter the null or non-null values.

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Filtering Null records") \
    .getOrCreate()
# Creating a list of dictionaries
data1=[{"Name" : 'Usha', "Class" : 7, "Marks" : 250 },
       {"Name" : 'Rajesh' , "Class" : 5, "Marks" : None }]
# Converting the list of dictionaries to a dataframe
df1=spark.createDataFrame(data1)
print("Printing dataframe df1")
df1.show()
# Filtering Null records
df2=df1.where(df1["Marks"].isNull())
print("Printing dataframe df2")
df2.show()
# Filtering Non-Null records
df3=df1.where(df1["Marks"].isNotNull())
print("Printing dataframe df3")
df3.show()
Output

The dataframe df1 is created from the dictionary with one null record and one non-null record using the above sample program.

The dataframe df2 keeps only the null records, whereas the dataframe df3 keeps only the non-null records.

Other than filtering null and non-null values, we can also use where() to filter on particular values.

Printing dataframe df1
+-----+-----+------+
|Class|Marks|  Name|
+-----+-----+------+
|    7|  250|  Usha|
|    5| null|Rajesh|
+-----+-----+------+
Printing dataframe df2
+-----+-----+------+
|Class|Marks|  Name|
+-----+-----+------+
|    5| null|Rajesh|
+-----+-----+------+
Printing dataframe df3
+-----+-----+----+
|Class|Marks|Name|
+-----+-----+----+
|    7|  250|Usha|
+-----+-----+----+
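
The reason a dedicated null check is needed can be sketched in plain Python. The snippet below is an illustration of SQL-style null handling, not PySpark code: the helpers sql_eq and where are hypothetical stand-ins. In SQL semantics, comparing anything with NULL yields NULL rather than true, and a where condition keeps only the rows for which the condition is true.

```python
# Plain-Python illustration of SQL null semantics; not PySpark code.
rows = [
    {"Name": "Usha", "Class": 7, "Marks": 250},
    {"Name": "Rajesh", "Class": 5, "Marks": None},
]


def sql_eq(a, b):
    """SQL-style equality: any comparison involving NULL (None) gives NULL."""
    if a is None or b is None:
        return None
    return a == b


def where(data, predicate):
    """Keep rows whose predicate is exactly True; False and NULL are both dropped."""
    return [row for row in data if predicate(row) is True]


# An equality test against NULL never matches, so nothing is returned.
eq_null = where(rows, lambda r: sql_eq(r["Marks"], None))

# Explicit null checks play the role of isNull() and isNotNull().
is_null = where(rows, lambda r: r["Marks"] is None)
is_not_null = where(rows, lambda r: r["Marks"] is not None)

# Filtering on a particular value works as usual for non-null rows.
marks_250 = where(rows, lambda r: sql_eq(r["Marks"], 250))

print(eq_null)
print([r["Name"] for r in is_null])
print([r["Name"] for r in is_not_null])
print([r["Name"] for r in marks_250])
```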
Reference

https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.filter

Please refer to the below link for understanding the filter condition in pyspark with example.

https://beginnersbug.com/how-to-use-filter-condition-in-pyspark/

from_unixtime in pyspark with example

In this post, we will learn about from_unixtime in PySpark with an example.

Sample program

To put a fixed date value into a dataframe column, we can use the approach below.

Using lit() we can pass any constant value into the dataframe. A date passed this way is a plain string, though, so it has to be converted before it can be formatted as a date.

Here, unix_timestamp() converts the string into seconds since the epoch, and from_unixtime() formats those seconds back into a string in the required pattern.

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_unixtime, unix_timestamp, lit
spark=SparkSession.builder.getOrCreate()
# spark.range(1) gives a one-row dataframe to select the literal from
df=spark.range(1).select(from_unixtime(unix_timestamp(lit('2018-09-30 00:00:00')),'yyyy-MM-dd'))
print("Printing df below")
df.show()
Output
Printing df below
+-----------------------------------------------------------------------------------+
|from_unixtime(unix_timestamp(2018-09-30 00:00:00, yyyy-MM-dd HH:mm:ss), yyyy-MM-dd)|
+-----------------------------------------------------------------------------------+
|                                                                         2018-09-30|
+-----------------------------------------------------------------------------------+
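
The round trip performed by the two functions can be sketched with Python's standard library. This is an illustration only: the two helpers below merely borrow the Spark function names, they use Python format codes (%Y-%m-%d) rather than Spark patterns (yyyy-MM-dd), and they assume UTC, whereas Spark uses the session time zone.

```python
# Plain-Python illustration of the string -> epoch seconds -> string round trip.
from datetime import datetime, timezone


def unix_timestamp(text, fmt="%Y-%m-%d %H:%M:%S"):
    """Parse a timestamp string and return seconds since 1970-01-01 (UTC assumed)."""
    parsed = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    return int(parsed.timestamp())


def from_unixtime(seconds, fmt="%Y-%m-%d %H:%M:%S"):
    """Format epoch seconds back into a string using the given format (UTC assumed)."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime(fmt)


seconds = unix_timestamp("2018-09-30 00:00:00")
formatted = from_unixtime(seconds, "%Y-%m-%d")

print(seconds)
print(formatted)
```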
Reference

https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.from_unixtime

how to add/subtract months to the date in pyspark

In this post, we will learn how to add/subtract months to the date in PySpark with examples.

Creating dataframe – Sample program

With the following program, we first create a dataframe df with dt as one of its columns, populated with the date value '2019-02-28'.

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark=SparkSession.builder.getOrCreate()
#Creating a dataframe df with date column
df=spark.createDataFrame([('2019-02-28',)],['dt'])
print("Printing df below")
df.show()
Output

The dataframe is created with the date value as below .

Printing df below
+----------+
|        dt|
+----------+
|2019-02-28|
+----------+
Adding months – Sample program

In the next step, we will create another dataframe df1 by adding months to the column dt using add_months().

date_format() returns the value of dt as a string in the given pattern, here yyyy-MM-dd; add_months() then interprets that string as a date.

You can learn more about date_format() from https://beginnersbug.com/how-to-change-the-date-format-in-pyspark/

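#Adding the months
df1=df.withColumn("months_add",add_months(date_format('dt','yyyy-MM-dd'),1))
print("Printing df1 below")
df1.show()
Output

add_months(column name, number of months) requires two inputs: the date column to be considered and the number of months to be incremented or decremented.

The output below is from Spark 2.x, where a date that falls on the last day of its month is moved to the last day of the resulting month. From Spark 3.0 onwards this adjustment is not made, and the same call returns 2019-03-28.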

Printing df1 below
+----------+----------+
|        dt|months_add|
+----------+----------+
|2019-02-28|2019-03-31|
+----------+----------+
Subtracting months – Sample program

We can also go back in time by passing a negative number of months.

#Subtracting the months
df2=df.withColumn("months_sub",add_months(date_format('dt','yyyy-MM-dd'),-1))
print("Printing df2 below")
df2.show()
Output

Hence we get the date one month earlier using the same function. The last-day-of-month result shown here is Spark 2.x behaviour; Spark 3.0 and later return 2019-01-28.

Printing df2 below
+----------+----------+
|        dt|months_sub|
+----------+----------+
|2019-02-28|2019-01-31|
+----------+----------+
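
The month arithmetic can be sketched in plain Python to show where the two results come from. This is an illustration under stated assumptions, not Spark's implementation: the helper add_months and its adjust_to_month_end flag are hypothetical. With the flag on, it mimics the Spark 2.x behaviour (a last-day-of-month input stays on the last day of the month); with the flag off, it mimics Spark 3.0 and later, where the day is kept and only clamped when the target month is shorter.

```python
# Plain-Python illustration of month arithmetic; not Spark's implementation.
import calendar
from datetime import date


def add_months(d, months, adjust_to_month_end=False):
    """Shift a date by a number of months (negative values go backwards)."""
    # Count months from year 0 so that negative offsets roll over years correctly.
    total = d.year * 12 + (d.month - 1) + months
    year, month_index = divmod(total, 12)
    month = month_index + 1

    last_day_of_target = calendar.monthrange(year, month)[1]
    input_is_month_end = d.day == calendar.monthrange(d.year, d.month)[1]

    if adjust_to_month_end and input_is_month_end:
        day = last_day_of_target           # Spark 2.x style adjustment
    else:
        day = min(d.day, last_day_of_target)  # keep the day, clamp if needed
    return date(year, month, day)


start = date(2019, 2, 28)
print(add_months(start, 1, adjust_to_month_end=True))    # Spark 2.x style
print(add_months(start, 1))                               # Spark 3.x style
print(add_months(start, -1, adjust_to_month_end=True))   # Spark 2.x style
print(add_months(start, -1))                              # Spark 3.x style
```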
Reference

https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.add_months

How to change the date format in pyspark

In this post, we will learn how to change the date format in PySpark.

Creating dataframe

To understand this better, we will create a dataframe having a date in the format yyyy-MM-dd.

Note: method names are case-sensitive; in createDataFrame the letters D and F must be capital.

#Importing libraries required
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark=SparkSession.builder.getOrCreate()
#creating dataframe with date column
df=spark.createDataFrame([('2019-02-28',)],['dt'])
df.show()
Output

With the above code, a dataframe named df is created with dt as one of its columns, as below.

+----------+
|        dt|
+----------+
|2019-02-28|
+----------+
Changing the format

With the dataframe created from the above code, the function date_format() is used to modify its format.

date_format(<column_name>,<format required>)

#Changing the format of the date
df.select(date_format('dt','yyyy/MM/dd').alias('new_dt')).show()
Output

Thus we convert the date from the format 2019-02-28 to the format 2019/02/28.

+----------+
|    new_dt|
+----------+
|2019/02/28|
+----------+
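
For comparison, the same reformatting can be sketched with Python's standard library. This is only an illustration of the idea (parse with one pattern, write with another); note that Python uses format codes such as %Y, %m and %d, while date_format() uses Spark pattern letters such as yyyy, MM and dd.

```python
# Plain-Python illustration of changing a date's textual format.
from datetime import datetime

original = "2019-02-28"

# Parse the string using the pattern it is currently written in ...
parsed = datetime.strptime(original, "%Y-%m-%d")

# ... and write it back out using the pattern we want.
new_dt = parsed.strftime("%Y/%m/%d")

print(new_dt)
```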
Reference

https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.date_format

how to get the current date in pyspark with example

In this post, we will learn to get the current date in PySpark with an example.

Getting current date

The following lines get the current date and time using Python's datetime module.

import datetime
now = datetime.datetime.now()
#Getting Current date and time
print (now.strftime("%Y-%m-%d %H:%M:%S"))
Output
2020-02-26 21:21:03
Getting current date and current timestamp within dataframe

current_date() returns the current date and current_timestamp() returns the current timestamp, each as a dataframe column.

import findspark 
findspark.init() 
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *

# A SparkSession must exist before toDF() can be called on an RDD
spark=SparkSession.builder.getOrCreate()
sc=spark.sparkContext
#creating dataframe with one record
df=sc.parallelize([Row(name='Gokul',Class=10,marks=480,grade='A')]).toDF()
print("Printing df dataframe below ")
df.show()
#Getting current date and timestamp
df.withColumn("currentdt",current_date()).withColumn("timestamp",current_timestamp()).show()
Output
Printing df dataframe below 
+-----+-----+-----+-----+
|Class|grade|marks| name|
+-----+-----+-----+-----+
|   10|    A|  480|Gokul|
+-----+-----+-----+-----+
+-----+-----+-----+-----+----------+--------------------+
|Class|grade|marks| name| currentdt|           timestamp|
+-----+-----+-----+-----+----------+--------------------+
|   10|    A|  480|Gokul|2020-02-27|2020-02-27 21:45:...|
+-----+-----+-----+-----+----------+--------------------+
Reference

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=date

renaming dataframe column in pyspark

In this post, we will learn about renaming a dataframe column in PySpark.

Sample program

withColumn() is used for creating a new column in a dataframe.

withColumnRenamed(), on the other hand, is used for renaming an existing column.

Note: method names are case-sensitive, so the capital letters in withColumn and withColumnRenamed must be typed exactly as shown.

import findspark 
findspark.init() 
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
# A SparkSession must exist before toDF() can be called on an RDD
spark=SparkSession.builder.getOrCreate()
sc=spark.sparkContext
#creating dataframe with three records
df=sc.parallelize([Row(name='Gokul',Class=10,marks=480,grade='A'),Row(name='Usha',Class=12,marks=450,grade='A'),Row(name='Rajesh',Class=12,marks=430,grade='B')]).toDF()
print("Printing df dataframe ")
df.show()
# Creating new column as Remarks
df1=df.withColumn("Remarks",lit('Good'))
print("Printing df1 dataframe")
df1.show()
#Renaming the column Remarks as Feedback
df2=df1.withColumnRenamed('Remarks','Feedback')
print("Printing df2 dataframe")
df2.show()
Output
Printing df dataframe 
+-----+-----+-----+------+
|Class|grade|marks|  name|
+-----+-----+-----+------+
|   10|    A|  480| Gokul|
|   12|    A|  450|  Usha|
|   12|    B|  430|Rajesh|
+-----+-----+-----+------+

Printing df1 dataframe
+-----+-----+-----+------+-------+
|Class|grade|marks|  name|Remarks|
+-----+-----+-----+------+-------+
|   10|    A|  480| Gokul|   Good|
|   12|    A|  450|  Usha|   Good|
|   12|    B|  430|Rajesh|   Good|
+-----+-----+-----+------+-------+

Printing df2 dataframe
+-----+-----+-----+------+--------+
|Class|grade|marks|  name|Feedback|
+-----+-----+-----+------+--------+
|   10|    A|  480| Gokul|    Good|
|   12|    A|  450|  Usha|    Good|
|   12|    B|  430|Rajesh|    Good|
+-----+-----+-----+------+--------+
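
What the rename does to each record can be sketched in plain Python. The helper with_column_renamed below is a hypothetical stand-in, not the PySpark method: it renames one key in every row and leaves the rows untouched when the key is absent, which mirrors the documented behaviour of withColumnRenamed() being a no-op for a column name that is not in the schema.

```python
# Plain-Python illustration of renaming a column; not PySpark code.
def with_column_renamed(rows, existing, new):
    """Return new rows with the key `existing` renamed to `new`, keeping key order."""
    return [
        {(new if key == existing else key): value for key, value in row.items()}
        for row in rows
    ]


rows = [
    {"name": "Gokul", "Remarks": "Good"},
    {"name": "Usha", "Remarks": "Good"},
]

renamed = with_column_renamed(rows, "Remarks", "Feedback")
unchanged = with_column_renamed(rows, "Comments", "Feedback")  # no such column

print(renamed)
print(unchanged == rows)
```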
printSchema()

The function printSchema() helps us to view the schema of a dataframe.

df2.printSchema()
root
 |-- Class: long (nullable = true)
 |-- grade: string (nullable = true)
 |-- marks: long (nullable = true)
 |-- name: string (nullable = true)
 |-- Feedback: string (nullable = false)
Reference

https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.withColumnRenamed

When otherwise in pyspark with examples

In this post, we will learn about when otherwise in PySpark with examples.

when() and otherwise() are used as conditional statements, like an if else statement.

In the examples below we will cover a single condition, multiple conditions, and conditions combined with logical operators.

Sample program – Single condition check

In the example below, df is a dataframe with three records.

df1 is a new dataframe created from df by adding one more column named First_Level.

import findspark 
findspark.init() 
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *

# A SparkSession must exist before toDF() can be called on an RDD
spark=SparkSession.builder.getOrCreate()
sc=spark.sparkContext
#creating dataframe with three records
df=sc.parallelize([Row(name='Gokul',Class=10,marks=480,grade='A'),Row(name='Usha',Class=12,marks=450,grade='A'),Row(name='Rajesh',Class=12,marks=430,grade='B')]).toDF() 
print("Printing df dataframe below ")
df.show() 
df1=df.withColumn("First_Level",when(col("grade") =='A',"Good").otherwise("Average")) 
print("Printing df1 dataframe below ")
df1.show()
Output
Printing df dataframe below
+-----+-----+-----+------+
|Class|grade|marks|  name|
+-----+-----+-----+------+
|   10|    A|  480| Gokul|
|   12|    A|  450|  Usha|
|   12|    B|  430|Rajesh|
+-----+-----+-----+------+
Printing df1 dataframe below
+-----+-----+-----+------+-----------+
|Class|grade|marks|  name|First_Level|
+-----+-----+-----+------+-----------+
|   10|    A|  480| Gokul|       Good|
|   12|    A|  450|  Usha|       Good|
|   12|    B|  430|Rajesh|    Average|
+-----+-----+-----+------+-----------+
Sample program – Multiple checks

We can check multiple conditions by chaining several when() calls before otherwise(), as below.

import findspark 
findspark.init() 
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *

# A SparkSession must exist before toDF() can be called on an RDD
spark=SparkSession.builder.getOrCreate()
sc=spark.sparkContext
#creating dataframe with three records
df=sc.parallelize([Row(name='Gokul',Class=10,marks=480,grade='A'),Row(name='Usha',Class=12,marks=450,grade='A'),Row(name='Rajesh',Class=12,marks=430,grade='B')]).toDF() 
print("Printing df dataframe below")
df.show()
#In below line we are using multiple condition
df2=df.withColumn("Second_Level",when(col("grade") == 'A','Excellent').when(col("grade") == 'B','Good').otherwise("Average"))
print("Printing df2 dataframe below")
df2.show() 
Output

The column Second_Level is created from the above program

Printing df dataframe below
+-----+-----+-----+------+
|Class|grade|marks|  name|
+-----+-----+-----+------+
|   10|    A|  480| Gokul|
|   12|    A|  450|  Usha|
|   12|    B|  430|Rajesh|
+-----+-----+-----+------+

Printing df2 dataframe below
+-----+-----+-----+------+------------+
|Class|grade|marks|  name|Second_Level|
+-----+-----+-----+------+------------+
|   10|    A|  480| Gokul|   Excellent|
|   12|    A|  450|  Usha|   Excellent|
|   12|    B|  430|Rajesh|        Good|
+-----+-----+-----+------+------------+
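
The way a chain of conditions is evaluated can be sketched in plain Python. The helper case_when below is a hypothetical stand-in, not a PySpark function: conditions are tried in order, the first one that matches decides the value, and the default plays the role of otherwise(). Without a default, an unmatched row gets None, which corresponds to null in a dataframe.

```python
# Plain-Python illustration of chained when/otherwise evaluation; not PySpark code.
def case_when(branches, default=None):
    """Build a function that returns the value of the first matching branch."""
    def evaluate(row):
        for condition, value in branches:
            if condition(row):
                return value       # first match wins, later branches are ignored
        return default             # acts like otherwise()
    return evaluate


rows = [
    {"name": "Gokul", "grade": "A"},
    {"name": "Usha", "grade": "A"},
    {"name": "Rajesh", "grade": "B"},
]

second_level = case_when(
    [
        (lambda r: r["grade"] == "A", "Excellent"),
        (lambda r: r["grade"] == "B", "Good"),
    ],
    default="Average",
)

levels = [second_level(row) for row in rows]
print(levels)
```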
Sample program with logical operators & and |

The logical operators & (AND) and | (OR) can be used inside when otherwise, as below.

import findspark 
findspark.init() 
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
# A SparkSession must exist before toDF() can be called on an RDD
spark=SparkSession.builder.getOrCreate()
sc=spark.sparkContext
#creating dataframe with four records
df=sc.parallelize([Row(name='Gokul',Class=10,marks=480,grade='A'),Row(name='Usha',Class=12,marks=450,grade='A'),Row(name='Rajesh',Class=12,marks=430,grade='B'),Row(name='Mahi',Class=5,marks=350,grade='C')]).toDF() 
print("Printing df dataframe")
df.show()
# In below line we are using logical operators
df3=df.withColumn("Third_Level",when((col("grade") =='A') | (col("marks") > 450) ,"Excellent").when((col("grade") =='B') | ((col("marks") > 400) & (col("marks") < 450)),"Good").otherwise("Average") )
print("Printing  df3 dataframe ")
df3.show()
Output
Printing df dataframe
+-----+-----+-----+------+
|Class|grade|marks|  name|
+-----+-----+-----+------+
|   10|    A|  480| Gokul|
|   12|    A|  450|  Usha|
|   12|    B|  430|Rajesh|
|    5|    C|  350|  Mahi|
+-----+-----+-----+------+

Printing  df3 dataframe 
+-----+-----+-----+------+-----------+
|Class|grade|marks|  name|Third_Level|
+-----+-----+-----+------+-----------+
|   10|    A|  480| Gokul|  Excellent|
|   12|    A|  450|  Usha|  Excellent|
|   12|    B|  430|Rajesh|       Good|
|    5|    C|  350|  Mahi|    Average|
+-----+-----+-----+------+-----------+
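
Each condition in the program above is wrapped in its own parentheses. That is needed because, in Python, & and | bind more tightly than comparison operators such as == and >. A small sketch with plain integers (an illustration, not PySpark code) shows how the meaning changes when the parentheses are left out:

```python
# Plain-Python illustration of operator precedence for & versus ==.
x, y = 1, 2

# Intended meaning: both comparisons are evaluated first, then combined.
with_parens = (x == 1) & (y == 2)

# Without parentheses Python evaluates 1 & y first, so this is read as
# the chained comparison x == (1 & y) == 2, which is a different test.
without_parens = x == 1 & y == 2

print(with_parens)
print(without_parens)
```

With column expressions the unparenthesised form typically fails or gives an unintended condition, so it is safest to always parenthesise each comparison before combining them.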
Reference

https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.when
