
Anonymous function in scala

In this post, let us learn about anonymous functions in Scala.

Anonymous function in scala

An anonymous function is a function without a specific name. It is also called a lambda function.

The term lambda comes from lambda calculus, the mathematical foundation of functional programming.

Basic syntax

For a brief introduction to functions in Scala, go through the previous Scala post on functions.

The following is the basic syntax to define an anonymous function with a single parameter.

val a: Int => Int = (x : Int) => x * 2
println(a(3))
6

There are alternative ways to define the same function, shown below.

val b = (x : Int) => x * 2
println(b(3))
6

An even shorter form uses the underscore placeholder syntax:

val c = (_: Int) * 2
println(c(3))
6

With multiple parameters

If you have multiple parameters, you need to enclose them within parentheses. The following is the syntax for an anonymous function with multiple parameters.

val d: (Int,Int) => Int = (x : Int, y: Int) => x * y
println(d(3,3))
9

With no parameter

In the following example, println(e) prints the function instance, whereas println(e()) prints the value returned by the function. We may be used to referring to objects the former way, but to invoke a lambda we must use the latter.

val e: () => Int = () => 2
println(e())
2

Using Curly braces

The following is an anonymous function defined using curly braces.

val f = { (colour: String) => colour.toInt}
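It can be invoked like any of the earlier examples. A quick usage sketch, not from the original post (note that toInt throws a NumberFormatException if the string is not a number):

println(f("25"))
25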

More syntactic sugar

The definition val g: Int => Int = (x: Int) => x+1 is equivalent to the shorter form below.

val g: Int => Int = _+1
println(g(3))
4

Similarly, val h: (Int,Int) => Int = (a,b) => a+b is equivalent to the form below.

val h: (Int,Int) => Int = _+ _
println(h(3,3))
6

callbyvalue and callbyname in scala

In this post, let us learn about call by value and call by name in Scala.

Generating random numbers

Before starting the topic, let us see how to generate random numbers. We are going to use this in the code segments that follow.

  val r = scala.util.Random
  println(r.nextInt)
  println(r.nextInt)
  println(r.nextInt)
  println(r.nextInt)
1242716978
868935609
1888491218
-1140363327

callbyvalue

With call by value, the argument is evaluated before the function is invoked, and that single evaluated value is used wherever the parameter appears inside the function. In the example below, the value of a remains the same across both invocations of println.

syntax :

function_name(value_name : datatype)  

val r = scala.util.Random
callbyvalue(r.nextInt)

def callbyvalue(a: Long): Unit = {
  println(" value of a :" + a)
  println(" value of a :" + a)
}
 value of a :1644239565
 value of a :1644239565

callbyname

With call by name, the expression we pass is re-evaluated every time it is used inside the function. In the example below, the value of a differs between the two invocations of println.

syntax :

function_name(value_name : => datatype)

val r = scala.util.Random
callbyname(r.nextInt)

def callbyname(a: => Long): Unit = {
  println("value of a :" + a)
  println("value of a :" + a)
}
value of a :761546004
value of a :-892955369
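To make the contrast concrete, here is a minimal sketch, not from the original post, that passes the same kind of expression to both styles; System.nanoTime() is used only as an illustrative expression whose value changes on every evaluation.

def twiceByValue(a: Long): Unit = {
  println(a)
  println(a)
}

def twiceByName(a: => Long): Unit = {
  println(a)
  println(a)
}

// evaluated once, so the same timestamp prints twice
twiceByValue(System.nanoTime())

// re-evaluated at each use, so two different timestamps print
twiceByName(System.nanoTime())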

syntax difference

The syntax difference between call by value and call by name in Scala is highlighted below.

function_name(value_name : datatype)  

function_name(value_name : => datatype)

https://beginnersbug.com/values-variables-and-datatypes-in-scala/


Expressions and functions in scala

In this post, let us learn about expressions and functions in Scala.

Expressions

Expressions are constructs that evaluate to a value in Scala. We have different operators for building them; let us look at those below.

Mathematical operators:
+ -> Addition
- -> Subtraction
* -> Multiplication
/ -> Division
& -> bitwise AND
| -> bitwise OR
^ -> bitwise exclusive OR
<< -> bitwise left shift
>> -> bitwise right shift
>>> -> bitwise right shift with zero extension

Relational operators:
== -> Equal to
!= -> not equal to
> -> greater than
>= -> greater than or equal to
< -> less than
<= -> less than or equal to

Boolean operators :
! -> negation(unary operator)
&& -> logical AND(binary operator)
|| -> logical OR(binary operator)

other operators:

+= , -= , *= , /=
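A minimal sketch, not from the original post (the object name is illustrative), showing a few of these operators evaluated as expressions:

object ExpressionsDemo extends App {
  println(2 + 3 * 4)      // 14, mathematical operators
  println(1 << 3)         // 8, bitwise left shift
  println(5 != 3)         // true, relational operator
  println(true && false)  // false, boolean operator

  var counter = 10
  counter += 5            // shorthand for counter = counter + 5
  println(counter)        // 15
}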

Functions

The following is a function that adds two values.

def – keyword

a and b – parameters

Int – data type (the first letter must be capitalized)

1. Normal function :
def func(a: Int, b: Int): Int = {
  a + b
}
println(func(20, 10))
30

It is not mandatory to provide the return type for a normal (non-recursive) function, since the compiler can infer it. Below is the same function without an explicit return type.

def func(a: Int, b: Int) = {
  a + b
}
println(func(20, 10))
30
2. Function without parameters

A function can also be defined without any parameters, as shown below.

// invoking with parentheses
def func1(): Int = 42
println(func1())
42

Another way to invoke a parameterless function in Scala is without parentheses, as shown below (note that newer Scala versions deprecate this auto-application style).

// invoking without parentheses
def func1(): Int = 42
println(func1)
42
3. Recursive function

A recursive function calls itself repeatedly until a terminating condition is reached.

def func1(a: String, b: Int): String = {
  if (b == 1) a
  else a + func1("Success", b - 1)
}
println(func1("Success", 5))
SuccessSuccessSuccessSuccessSuccess

We must specify the return type for a recursive function; otherwise the compiler reports an error, as shown below.

def func1(a: String ,b: Int) = {
    if (b==1)  a
    else a + (func1("Success",b-1))
  }
  println(func1("Success", 5))
identifier expected but '=' found.
    def func1(a: String ,b: Int) :  = {

values variables and datatypes in scala

In this post, let us learn about values, variables and datatypes in Scala.

Values

Values are immutable, which means they cannot be modified after assignment. In the example below, ValuesVariablesTypes is the object created for this tutorial.

  • val – keyword (only in lower case) .
  • Int – datatype  
object ValuesVariablesTypes extends App {
    val x : Int = 43 ;
    print(x)
}
43
Process finished with exit code 0

We try to modify the value in the case below, and it fails with a reassignment error. The immutability of val does not allow us to modify values.

object ValuesVariablesTypes extends App {
    val x : Int = 43 ;
    print(x)
    x = 44 ;
}
reassignment to val
    x = 44 ;

It is not mandatory to specify the datatype, as the compiler can infer it automatically. So the case below works as well.

object ValuesVariablesTypes extends App {
    val x = 43 ;
    print(x)
}
43
Process finished with exit code 0

Datatypes

A few of the main datatypes, declared with and without an explicit type annotation, are as follows:

Datatype         | With type annotation              | With type inference
String           | val a : String = "Hello"          | val a = "Hello"
Boolean          | val b : Boolean = false           | val b = false
Char             | val c : Char = 'a'                | val c = 'a'
Short (2 bytes)  | val d : Short = 7416              | val d = 7416 (inferred as Int)
Int (4 bytes)    | val e : Int = 74167416            | val e = 74167416
Long (8 bytes)   | val f : Long = 7416741674167416L  | val f = 7416741674167416L (L suffix)
Float (4 bytes)  | val g : Float = 14.0f             | val g = 14.0f (f suffix)
Double (8 bytes) | val h : Double = 3.63             | val h = 3.63
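A minimal sketch, not from the original post (the object name is illustrative), showing these declarations compiling together, with comments noting what the compiler infers when the annotation is dropped:

object DatatypesDemo extends App {
  val a: String = "Hello"
  val b = false               // inferred as Boolean
  val c = 'a'                 // inferred as Char
  val d: Short = 7416         // annotation needed, a bare literal is inferred as Int
  val e = 74167416            // inferred as Int
  val f = 7416741674167416L   // the L suffix makes it a Long
  val g = 14.0f               // the f suffix makes it a Float
  val h = 3.63                // inferred as Double
  println(s"$a $b $c $d $e $f $g $h")
}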

Variables

Variables, declared with the var keyword, can be modified; this is the difference between values and variables.

object ValuesVariablesTypes extends App {
    var Y = 43 ;
    println(Y)
    Y = 44 ;
    print(Y)
}
43
44
Process finished with exit code 0

Hope this post gives an idea about values, variables and datatypes in Scala.


spark submit options

In this post, let us get to know about spark-submit options.

spark submit options

The spark-submit options mentioned below contain < > placeholders, which we can replace with the respective values. The following are a few of the prominent spark-submit options available.

Each entry consists of an option name followed by its value. For instance, --master is the option name and its value is passed next to it.

spark-submit \
--master < > \
--deploy-mode < > \
--keytab < > \
--principal < > \
--driver-memory < > \
--executor-memory < > \
--executor-cores < > \
--num-executors < > \
--conf <key>=<value> \
--class < > \
<jar name>

master : the master URL for the cluster manager, e.g. yarn, spark://host:port, or local[*] for local mode.

deploy mode : default – client

This is applicable only for cluster setups such as YARN or standalone. It can have client or cluster as the value, depending on where the driver program needs to run: on a worker node (cluster mode) or on the machine from which the job is submitted (client mode).

keytab and principal : 

Every host that provides a service must have a local file called a keytab, which contains pairs of Kerberos principals and encrypted keys. It allows scripts to authenticate using Kerberos automatically, without any human involvement, by reading the key material stored in the file.

driver-memory : memory allocated to the driver program, which contains the main method. The default value is 1 GB.

executor-memory : memory allocated to each executor. Executors are the processes launched on worker nodes that run the individual tasks of a Spark job; the value is usually derived from a sizing calculation.

executor-cores : Number of cores each executor can have.

num-executors : the number of executors to launch. Running executors with too much memory often results in excessive garbage collection delays, whereas running tiny executors (with a single core and just enough memory for a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM. It is better to calculate this value before assigning it.

class : the fully qualified name of the application's main class.

Other options :

--conf <key>=<value>

For instance, --conf spark.sql.files.maxPartitionBytes=134217728 (128 MB expressed in bytes).

spark.sql.files.maxPartitionBytes : default (128 MB)
spark.sql.files.openCostInBytes : default (4 MB)
spark.sql.files.minPartitionNum
spark.sql.broadcastTimeout : default (300)
spark.sql.autoBroadcastJoinThreshold : default (10 MB)
spark.sql.shuffle.partitions : default (200)
spark.sql.sources.parallelPartitionDiscovery.threshold : default (32)
spark.sql.sources.parallelPartitionDiscovery.parallelism : default (10000)
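These spark.sql settings can also be supplied from application code instead of --conf, for example through the SparkSession builder. A minimal Scala sketch (the application name is illustrative):

import org.apache.spark.sql.SparkSession

// equivalent of passing --conf key=value to spark-submit
val spark = SparkSession.builder()
  .appName("conf-demo")
  .config("spark.sql.shuffle.partitions", "200")
  .config("spark.sql.autoBroadcastJoinThreshold", "10485760") // 10 MB in bytes
  .getOrCreate()

println(spark.conf.get("spark.sql.shuffle.partitions"))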

Reference

https://spark.apache.org/docs/latest/sql-performance-tuning.html#other-configuration-options


spark local and standalone mode

In this post, let us have a look at Spark local and standalone modes.

Local mode

Other than the local and standalone modes which we are going to see in this post, there are a few other deployment modes as well.

Local mode is the default mode of Spark, which runs everything on the same machine.

If the --master flag is not mentioned on the command, whether spark-shell or spark-submit, the application runs in local mode.

Another way is to pass the --master option with local as the argument, which defaults to 1 thread.

We can even increase the number of threads by providing the required number within the square brackets. For instance, spark-shell --master local[2].

By using an asterisk instead, like local[*], we can use as many threads as there are processors available to the Java virtual machine.

spark-submit --class <class name> --master local[8] <jar file>
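The same choice can also be made programmatically when building the session, which is convenient for local testing. A minimal Scala sketch (the application name is illustrative):

import org.apache.spark.sql.SparkSession

// run Spark locally with as many worker threads as available processors
val spark = SparkSession.builder()
  .appName("local-demo")
  .master("local[*]")
  .getOrCreate()

println(spark.sparkContext.master)  // prints local[*]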

Standalone mode

  • Spark standalone cluster in client deploy mode
  • Spark standalone cluster in cluster deploy mode with supervise
  • Run a Python application on a Spark standalone cluster
Spark standalone cluster in client deploy mode

The application is submitted from a gateway machine that is physically co-located with the worker machines. The input and output of the application are attached to the console, so this mode suits applications that involve a REPL (i.e. the Spark shell). In client mode, the driver is launched directly within the spark-submit process, which acts as a client to the cluster.

spark-submit --class <class name> --master <spark://host id> --executor-memory 20G --total-executor-cores 100 <jar name> 
Spark standalone cluster in cluster deploy mode with supervise

For a Spark standalone cluster with cluster deploy mode, you can also provide --supervise, so that the driver is restarted automatically if it fails with a non-zero exit code.

Some applications are submitted from a machine far away from the worker machines. In that case it is common to use cluster mode to minimize network latency between the drivers and the executors.

spark-submit --class <class name> --master <spark://host id> --deploy-mode cluster --supervise --executor-memory 20G --total-executor-cores 100 <jar name>
Run a Python application on a Spark standalone cluster

Currently, the standalone mode does not support cluster mode for Python applications.

spark-submit --master <spark://host id> <python file>

Reference

https://spark.apache.org/docs/latest/submitting-applications.html


spark client and cluster mode

In this post, let us learn about Spark client and cluster modes.

Where should we use them?

The spark-submit options require master and deploy-mode related information.

For the YARN cluster manager, the master will be yarn, and the deploy mode will vary depending on the requirement.

spark-submit --master <master-url> --deploy-mode <deploy-mode> <other options>

What is a driver program?

Before getting into the topic, let us get to know about the driver program. When a user submits a Spark job, the very first step is that the driver program starts. It plays a prominent role in controlling the executors on the worker nodes.

What are the main types of deployment modes in YARN ?

Based on where the driver program actually runs, we can differentiate between the two modes. YARN client mode is the default.

  • YARN client mode
  • YARN cluster mode

Client mode

In client mode, the driver program launches on the same edge node from which the Spark job is submitted, so it consumes resources like memory and CPU from that machine.

If multiple jobs use client mode at the same time, we might encounter out-of-memory issues because of over-utilization of the edge node's resources. So it is better to use client mode only for development and debugging purposes. It supports the Spark shell as well.

spark-submit --master yarn --deploy-mode client --driver-memory 4g --executor-memory 2g --executor-cores 1 --num-executors 5 

Cluster mode

The driver program launches on one of the worker nodes rather than on the edge node from which the Spark job is submitted. One disadvantage is that it does not support the Spark shell.

We can run multiple jobs simultaneously in cluster mode without any trouble. For this reason, cluster mode is used when deploying to production.

spark-submit --master yarn --deploy-mode cluster --driver-memory 4g --executor-memory 2g --executor-cores 1 --num-executors 5 
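To verify which mode a submitted application actually runs in, the deploy mode can be read back from the SparkContext. A minimal Scala sketch, assuming a SparkSession named spark has already been created:

// prints "client" or "cluster" depending on --deploy-mode,
// and the master URL that was passed via --master
println(spark.sparkContext.deployMode)
println(spark.sparkContext.master)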

Any common features between client and cluster mode?

Yes, there are a few characteristics common to both client and cluster mode.

1. The Application Master requests the resources in both modes.
2. Executors are started by the YARN NodeManagers.
3. Other persistent services are provided by the YARN NodeManagers and the YARN ResourceManager.

Reference

https://spark.apache.org/docs/latest/running-on-yarn.html


createTempView and createGlobalTempView

In this post, let us learn the difference between createTempView and createGlobalTempView.

createOrReplaceTempView

In Spark 2.0, createOrReplaceTempView came into the picture to replace registerTempTable. It creates or replaces a reference to the DataFrame in the form of a local temporary view. The lifetime of this view is tied to the SparkSession.

df=spark.sql("select * from table")
df.createOrReplaceTempView("ViewName")

Both createOrReplaceTempView and createTempView are used for creating a temporary view from an existing DataFrame.

If the view already exists, createOrReplaceTempView replaces the existing view with the new one, whereas createTempView throws an 'already exists' exception.

The following command is used for dropping the view. Another way to take the view out of scope is to shut down the session using stop().

spark.catalog.dropTempView("ViewName")

Libraries required :

pyspark.sql.DataFrame.createOrReplaceTempView
pyspark.sql.DataFrame.createTempView
pyspark.sql.Catalog.dropTempView

createOrReplaceGlobalTempView

It creates a reference in the form of a global temporary view, which can be used across Spark sessions. The lifetime of this view is tied to the Spark application itself.

df=spark.sql("select * from table")
df.createOrReplaceGlobalTempView("ViewName")
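A global temporary view is tied to the system-preserved database global_temp, so it has to be queried with that qualifier. A minimal sketch, shown here in Scala although the qualified name works the same way from PySpark:

// assumes a SparkSession named spark and a DataFrame df already exist
df.createOrReplaceGlobalTempView("ViewName")

// global temporary views live in the reserved global_temp database
spark.sql("select * from global_temp.ViewName").show()

// they stay visible to other sessions of the same Spark application
spark.newSession().sql("select * from global_temp.ViewName").show()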

The following is the command to drop the view; alternatively, the session can be stopped using stop().

spark.catalog.dropGlobalTempView("ViewName")

Libraries required

pyspark.sql.DataFrame.createOrReplaceGlobalTempView
pyspark.sql.DataFrame.createGlobalTempView
pyspark.sql.Catalog.dropGlobalTempView

In order to get more information on the difference between createTempView and createGlobalTempView, please refer to the following URL: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.createGlobalTempView.html


spark executor memory and cores

In this post, let us discuss Spark executor memory and cores.

Factors to consider while calculating

The spark-submit configuration parameters include the number of executors, executor cores and executor memory. Determining values for these involves a set of calculations and depends on a few factors, which we will see below.

Hadoop/OS daemons : NameNode, Secondary NameNode, DataNode, JobTracker and TaskTracker are daemons running in the background while a Spark application runs through a cluster manager like YARN. We need to allocate a few cores for these daemons as well, approximately 1 core per node.

YARN ApplicationMaster (AM): this negotiates resources from the ResourceManager and works with the NodeManagers to execute and monitor the containers and their resource consumption. It needs roughly 1024 MB of memory and 1 executor.

HDFS throughput: the HDFS client achieves full write throughput with ~5 tasks per executor, whereas it struggles with many concurrent threads. So it is better to keep the number of cores per executor at 5 or fewer.

MemoryOverhead:
Full memory requested from YARN per executor =
spark.executor.memory + spark.yarn.executor.memoryOverhead

spark.yarn.executor.memoryOverhead =
max(384 MB, 7% of spark.executor.memory)

So, if we request 20 GB per executor, the AM will actually get 20 GB + memoryOverhead = 20 GB + 7% of 20 GB ≈ 21.4 GB of memory for us.

Cluster configuration

Let us calculate the number of executors, executor cores and executor memory for the following cluster configuration:
10 Nodes
16 cores per Node
64GB RAM per Node

Running executors with too much memory often results in excessive garbage collection delays.
Whereas running tiny executors (with a single core and just enough memory needed to run a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM.

Calculating values for parameters

1) To identify the number of available cores within the cluster
Number of cores = available number of nodes in a cluster * number of cores per node
= 10 * 16
= 160 cores

2) Total memory = available number of nodes in a cluster * memory per node
= 10 * 64 GB
= 640 GB

3) Within the available 160 cores, one core per node needs to be allocated for the daemons
Remaining cores after allocating one core per node for daemons = 160 – (1 * number of nodes )
= 160 – (1 * 10)
= 160 – 10
= 150 cores

4) Total executor = number of cores / 5(recommended concurrent threads for maximum throughput )
= 150 / 5
= 30

5) Executor per node = Total executor / number of nodes
= 30 / 10
= 3

6) Memory per executor = Total memory / Total executor
= 640 GB / 30
= 21 GB

7) MemoryOverhead = Max(384MB, 7% of spark.executor-memory)
= Max(384MB, 7% 21GB)
= Max(384MB, 1.47 GB)
= ~2 GB

8) Final Executor memory = Memory per executor – memory overhead
= 21 – 2
= 19 GB

9) Remaining executors = Total executor – One executor for Application manager
= 30 – 1
= 29

So, the maximum values we can have for the configuration parameters are as follows:

Executor memory – 19 GB 

Total available executors – 29

And for driver memory, we can use the same size as the executor memory; it does not require any specific calculation.
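The arithmetic above can be captured in a small Scala sketch, not from the original post; the node counts and the 5-cores-per-executor rule of thumb are the assumptions used in this walkthrough:

object ExecutorSizing extends App {
  val nodes = 10
  val coresPerNode = 16
  val memoryPerNodeGb = 64

  val usableCores = nodes * coresPerNode - nodes              // 1 core per node for daemons -> 150
  val totalExecutors = usableCores / 5                        // ~5 tasks per executor -> 30
  val executorsPerNode = totalExecutors / nodes               // -> 3
  val memoryPerExecutorGb = nodes * memoryPerNodeGb / totalExecutors  // -> 21
  val overheadGb = math.max(0.384, 0.07 * memoryPerExecutorGb)        // max(384 MB, 7%) -> ~1.47
  val executorMemoryGb = memoryPerExecutorGb - math.ceil(overheadGb)  // -> 19.0
  val availableExecutors = totalExecutors - 1                 // 1 executor for the ApplicationMaster -> 29

  println(s"--num-executors $availableExecutors --executor-cores 5 --executor-memory ${executorMemoryGb.toInt}g")
}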

Reference

https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html


RDD , Dataframe and Dataset in spark

In this post, let us learn about RDD, DataFrame and Dataset in Spark.

Resilient Distributed Datasets – RDD

RDD is the fundamental data structure in Spark.

  • fault-tolerant
  • in memory
  • immutable distributed collection of elements
  • partitioned across nodes in the cluster and operate in parallel
  • low-level API that offers transformation and action

Two ways to create RDDs:

  1. Parallelizing an existing collection
  2. Referencing external storage system

Parallelizing an existing collection:

val a = Array(1, 2, 3, 4)
val b = sc.parallelize(a)

Referencing external storage system:

RDDs can reference an external storage system such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
Text file RDDs are created using SparkContext's textFile method, which takes a URI for the file (either a local path or an hdfs://, s3a://, etc. URI) and reads it as a collection of lines.

val c = sc.textFile("data.txt")

By default, Spark creates one partition for each block of the file (default block size in HDFS – 128MB).

We can even increase the number of partitions by passing a larger value as the second argument to the textFile method, but we cannot have fewer partitions than blocks.
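For example, a second argument to textFile requests a minimum number of partitions. A minimal sketch (data.txt and the value name are illustrative):

// ask for at least 10 partitions while reading the file
val c10 = sc.textFile("data.txt", 10)
println(c10.getNumPartitions)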

Dataframe

A DataFrame is a Dataset organized into named columns, conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.

DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs.

The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.
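A minimal Scala sketch creating a DataFrame from an in-memory collection (the column names and values are illustrative, and a SparkSession named spark is assumed):

import spark.implicits._

// a DataFrame is a Dataset[Row] with named columns
val df = Seq((1, "spark"), (2, "scala")).toDF("id", "name")
df.printSchema()
df.show()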

Dataset

A Dataset is a distributed collection of data and a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.

A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).
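A minimal Scala sketch constructing a Dataset from JVM objects through a case class and applying a typed, functional transformation (the names and values are illustrative, and a SparkSession named spark is assumed):

case class Person(name: String, age: Long)

import spark.implicits._
val ds = Seq(Person("Andy", 32), Person("Maria", 25)).toDS()

// filter receives typed Person objects, checked at compile time
ds.filter(p => p.age > 30).show()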

The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.

RDDs and Datasets are type safe, meaning the compiler knows the columns and their data types, whether Long, String, etc. But with a DataFrame, every time you call an action, collect() for instance, it returns the result as an Array of Rows, not as values of the specific data types.