spark submit options

In this post, let us get to know about the spark-submit options.

spark submit options

The spark-submit options mentioned below contain placeholders marked with < >, which we replace with the respective values. Following are a few of the prominent spark-submit options available.

Each option consists of an option name and its value. For instance, --master is the option name and its value is passed next to it.

spark-submit \
--master < > \
--deploy-mode < > \
--keytab < > \
--principal < > \
--driver-memory < > \
--executor-memory < > \
--executor-cores < > \
--num-executors < > \
--class < > \
--conf <key>=<value> \
<jar name>

master : The master URL for the cluster, for example yarn, spark://host:port, or local[*] for local mode.

deploy mode : default – client

This is applicable only for a cluster set up such as YARN or standalone. It can have client or cluster as the value, depending on where the driver program needs to run: on a worker node (cluster mode) or on the local machine from which the job is submitted (client mode).

keytab and principal : 

Every host that provides a service must have a local file called a keytab. This file contains pairs of Kerberos principals and encrypted keys, and it allows scripts to authenticate using Kerberos automatically, without requiring human interaction or access to a password stored in a plain-text file.

driver-memory : Memory allocated to the driver program, which contains the main method. The default value is 1 GB.

executor-memory : Executors are the worker processes responsible for running the individual tasks in a given Spark job. This option sets the memory allocated to each executor, which should be assigned based on some calculation.

executor-cores : Number of cores each executor can have.

num-executors : Specifies the number of executors to launch. Running executors with too much memory often results in excessive garbage collection delays, whereas running tiny executors (with a single core and just enough memory to run a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM. So it is better to calculate the values before assigning them.

class : The fully qualified name of the application's main class.
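These same options can also be supplied as configuration keys when building a SparkSession in code. Below is a minimal PySpark sketch with illustrative placeholder values (assuming a configured YARN environment); note that settings such as deploy mode and driver memory are normally effective only when passed on the spark-submit command line, since the driver JVM is already running by the time this code executes.

from pyspark.sql import SparkSession

# Equivalent configuration keys for the spark-submit options above.
# The values below are illustrative placeholders, not recommendations.
spark = (
    SparkSession.builder
    .appName("submit-options-demo")
    .master("yarn")                            # --master
    .config("spark.executor.memory", "2g")     # --executor-memory
    .config("spark.executor.cores", "2")       # --executor-cores
    .config("spark.executor.instances", "4")   # --num-executors
    .getOrCreate()
)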

Other options :

--conf <key>=<value>

For instance, --conf spark.sql.files.maxPartitionBytes=134217728 (i.e. 128 MB expressed in bytes).

spark.sql.files.maxPartitionBytes : default (128 MB)
spark.sql.files.openCostInBytes : default (4 MB)
spark.sql.files.minPartitionNum
spark.sql.broadcastTimeout : default (300 seconds)
spark.sql.autoBroadcastJoinThreshold : default (10 MB)
spark.sql.shuffle.partitions : default (200)
spark.sql.sources.parallelPartitionDiscovery.threshold : default (32)
spark.sql.sources.parallelPartitionDiscovery.parallelism : default (10000)
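These spark.sql.* settings can also be read and changed at runtime through spark.conf instead of --conf; a minimal PySpark sketch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sql-conf-demo").getOrCreate()

# Read the current value and override it for this session only.
print(spark.conf.get("spark.sql.shuffle.partitions"))   # usually "200"
spark.conf.set("spark.sql.shuffle.partitions", "50")
spark.conf.set("spark.sql.files.maxPartitionBytes", str(128 * 1024 * 1024))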

Reference

https://spark.apache.org/docs/latest/sql-performance-tuning.html#other-configuration-options


spark local and standalone mode

In this post, let us have a look at the Spark local and standalone modes.

Local mode

Other than the local and standalone modes which we are going to see in this post, there are a few other deployment modes as well.

Local mode is the default mode of Spark, which runs everything on the same machine.

If the --master flag is not mentioned in the command, whether spark-shell or spark-submit, it means Spark is running in local mode.

Another way is to pass the --master option with local as the argument, which defaults to 1 thread.

We can even increase the number of threads by providing the required number within the square brackets. For instance, spark-shell --master local[2].

By using an asterisk instead, like local[*], we can use as many threads as the number of processors available to the Java virtual machine.

spark-submit --class <class name> --master local[8] <jar file>
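The local master can also be set programmatically when creating the session; a small PySpark sketch:

from pyspark.sql import SparkSession

# Local mode with as many worker threads as there are available cores,
# equivalent to passing --master local[*] on the command line.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("local-mode-demo")
    .getOrCreate()
)

print(spark.sparkContext.master)   # local[*]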

Standalone mode

  • Spark standalone cluster in client deploy mode
  • Spark standalone cluster in cluster deploy mode with supervise
  • Run a Python application on a Spark standalone cluster
Spark standalone cluster in client deploy mode

The application is submitted from a gateway machine that is physically co-located with the worker machines. The input and output of the application are attached to the console, so this mode is well suited for applications that involve a REPL (i.e. the Spark shell). In client mode, the driver launches directly within the spark-submit process, which acts as a client to the cluster.

spark-submit --class <class name> --master <spark://host id> --executor-memory 20G --total-executor-cores 100 <jar name> 
Spark standalone cluster in cluster deploy mode with supervise

For a Spark standalone cluster with cluster deploy mode, you can also provide --supervise, so that the driver is restarted automatically if it fails with a non-zero exit code.

Some applications are submitted from a machine far away from the worker machines. In that case, it is common to use cluster mode to minimize network latency between the driver and the executors.

spark-submit --class <class name> --master <spark://host id> --deploy-mode cluster --supervise --executor-memory 20G --total-executor-cores 100 <jar name>
Run a Python application on a Spark standalone cluster

Currently, the standalone mode does not support cluster mode for Python applications.

spark-submit --master <spark://host id> <python file>

Reference

https://spark.apache.org/docs/latest/submitting-applications.html


spark client and cluster mode

In this post, let us learn about the Spark client and cluster modes.

Where should we use them ?

The spark-submit options require the master and deployment mode related information.

For the YARN cluster manager, the master will be yarn and the deploy mode will vary depending on the requirement.

spark-submit --master <master-url> --deploy-mode <deploy-mode> <other options>

What is Driver program ?

Before getting into the topic, let us get to know about the driver program. When a user submits any Spark job, the very first step is that the driver program gets triggered. It plays a prominent role in controlling the executors on the worker nodes.

What are the main types of deployment modes in YARN ?

Based on where exactly the driver program gets triggered, we can differentiate the types of modes. The YARN client mode is the default mode.

  • YARN client mode
  • YARN cluster mode

Client mode

In client mode, the driver program launches on the same edge node where the Spark job is submitted. The driver program utilizes resources like memory and CPU from the same machine from which we submit the job.

If multiple jobs try to use client mode, we might encounter out-of-memory issues because of over-utilization of resources on that machine. So it is better to use client mode only for development and debugging purposes. It supports the Spark shell as well.

spark-submit --master yarn --deploy-mode client --driver-memory 4g --executor-memory 2g --executor-cores 1 --num-executors 5 --class <class name> <jar name>

Cluster mode

In cluster mode, the driver program launches on one of the worker nodes rather than on the edge node from which the Spark job is submitted. One disadvantage is that it does not support the Spark shell.

We can run multiple jobs simultaneously in cluster mode without any trouble. For this reason, cluster mode is used whenever we are deploying to production.

spark-submit --master yarn --deploy-mode cluster --driver-memory 4g --executor-memory 2g --executor-cores 1 --num-executors 5 --class <class name> <jar name>
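From inside a running application, one way to check which deploy mode is in effect is through the runtime configuration; a minimal PySpark sketch (spark.submit.deployMode is set by spark-submit, so we fall back to client when it is absent):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("deploy-mode-check").getOrCreate()

# "client" or "cluster", depending on how the job was submitted.
print(spark.conf.get("spark.submit.deployMode", "client"))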

Any common features between client and cluster mode ?

Yes, we do have a few common characteristics between the client and cluster modes.

1. Application Master – requests the resources
2. Executors are initiated by – YARN NodeManagers
3. Other persistent services by – YARN NodeManagers and YARN ResourceManager

Reference

https://spark.apache.org/docs/latest/running-on-yarn.html


createTempView and createGlobalTempView

In this post, let us learn the difference between createTempView and createGlobalTempView.

createOrReplaceTempView

In Spark 2.0, createOrReplaceTempView came into the picture to replace registerTempTable. It creates or replaces an in-memory reference to the DataFrame in the form of a local temporary view. The lifetime of this view is tied to the SparkSession in which it was created.

df=spark.sql("select * from table")
df.createOrReplaceTempView("ViewName")
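Once registered, the view can be queried with Spark SQL like any other table; for example:

# The view behaves like a table within the current SparkSession.
spark.sql("select * from ViewName").show()

# Equivalent shortcut using the catalog.
spark.table("ViewName").show()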

Both createOrReplaceTempView and createTempView are used for creating a temporary view from an existing DataFrame.

If the view already exists, createOrReplaceTempView replaces the existing view with the new one, whereas createTempView throws an 'already exists' exception.

The following command is used for dropping the view. Another way to take the view out of scope is by shutting down the session using stop().

spark.catalog.dropTempView("ViewName")

Libraries required :

pyspark.sql.DataFrame.createOrReplaceTempView
pyspark.sql.DataFrame.createTempView
pyspark.sql.Catalog.dropTempView

createOrReplaceGlobalTempView

It creates a reference in the form of a global temporary view which can be used across Spark sessions within the same application. The lifetime of this view is tied to the Spark application itself.

df=spark.sql("select * from table")
df.createOrReplaceGlobalTempView("ViewName")
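A global temporary view is registered in the system-preserved global_temp database, so queries have to qualify the view name with that prefix; a small sketch:

# Global temporary views live in the global_temp database.
spark.sql("select * from global_temp.ViewName").show()

# The same view is visible from another session of the same application.
spark.newSession().sql("select * from global_temp.ViewName").show()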

Following is the command to drop the view; alternatively, the session can be stopped with stop().

spark.catalog.dropGlobalTempView("ViewName")

Libraries required

pyspark.sql.DataFrame.createOrReplaceGlobalTempView
pyspark.sql.DataFrame.createGlobalTempView
pyspark.sql.Catalog.dropGlobalTempView

In order to get more information on the difference between createTempView and createGlobalTempView, please refer to the following URL: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.createGlobalTempView.html


spark executor memory and cores

In this post, let us discuss the Spark executor memory and cores.

Factors to consider while calculating

The spark-submit configuration parameters include the number of executors, executor cores and executor memory. Determining values for these involves a set of calculations and depends on a few factors which we will see below.

Hadoop/OS daemons : NameNode, Secondary NameNode, DataNode, JobTracker and TaskTracker are daemons running in the background while the Spark application runs through a cluster manager like YARN. We need to allocate a few cores for these daemons as well, approximately 1 core per node.

YARN ApplicationMaster (AM) : It negotiates the required resources from the ResourceManager and works along with the NodeManagers to execute and monitor the containers and their resource consumption. It needs roughly 1 GB of memory and 1 executor.

HDFS throughput : The HDFS client achieves full write throughput with ~5 tasks per executor, whereas it struggles with a lot of concurrent threads. So it is better to keep the number of cores per executor at 5 or fewer.

MemoryOverhead :

Full memory requested to YARN per executor = spark.executor.memory + spark.yarn.executor.memoryOverhead

spark.yarn.executor.memoryOverhead = max(384 MB, 7% of spark.executor.memory)

So, if we request 20 GB per executor, YARN will actually allocate 20 GB + memoryOverhead = 20 GB + 7% of 20 GB = ~21.4 GB of memory for us.

Cluster configuration

We will calculate the number of executors, executor cores and executor memory for the following cluster configuration :
10 nodes
16 cores per node
64 GB RAM per node

Running executors with too much memory often results in excessive garbage collection delays.
Whereas running tiny executors (with a single core and just enough memory needed to run a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM.

Calculating values for parameters

1) To identify the number of available cores within the cluster
Number of cores = available number of nodes in a cluster * number of cores per node
= 10 * 16
= 160 cores

2) Total memory = available number of nodes in a cluster * memory per node
= 10 * 64 GB
= 640 GB

3) Within the 160 available cores, one core per node needs to be allocated for the daemons
Remaining cores after allocating one core per node for daemons = 160 – (1 * number of nodes )
= 160 – (1 * 10)
= 160 – 10
= 150 cores

4) Total executors = remaining cores / 5 (recommended concurrent tasks per executor for maximum throughput)
= 150 / 5
= 30

5) Executors per node = Total executors / number of nodes
= 30 / 10
= 3

6) Memory per executor = Total memory / Total executors
= 640 GB / 30
= ~21 GB

7) MemoryOverhead = max(384 MB, 7% of spark.executor.memory)
= max(384 MB, 7% of 21 GB)
= Max(384MB, 1.47 GB)
= ~2 GB

8) Final Executor memory = Memory per executor – memory overhead
= 21 – 2
= 19 GB

9) Remaining executors = Total executors – one executor for the ApplicationMaster
= 30 – 1
= 29

So, the maximum values we can have for the configuration parameters are as follows :

Executor memory – 19 GB

Total available executors – 29

And for driver memory, we can use the same size as the executor memory; it does not require any specific calculation.
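The same arithmetic can be scripted to try out other cluster sizes. Below is a minimal Python sketch using the same assumptions as above (1 core per node reserved for daemons, 5 cores per executor, 7% memory overhead rounded up to whole GBs, and 1 executor reserved for the ApplicationMaster):

import math

def executor_sizing(nodes, cores_per_node, memory_per_node_gb,
                    cores_per_executor=5, overhead_fraction=0.07):
    # Leave 1 core per node for the Hadoop/OS daemons.
    usable_cores = (cores_per_node - 1) * nodes
    total_executors = usable_cores // cores_per_executor
    executors_per_node = total_executors // nodes
    memory_per_executor_gb = memory_per_node_gb // executors_per_node
    # Overhead: max(384 MB, 7% of executor memory), rounded up to whole GBs.
    overhead_gb = math.ceil(max(0.384, overhead_fraction * memory_per_executor_gb))
    executor_memory_gb = memory_per_executor_gb - overhead_gb
    # Reserve one executor for the YARN ApplicationMaster.
    return total_executors - 1, executor_memory_gb

print(executor_sizing(10, 16, 64))   # (29, 19) for the cluster above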

Reference

https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html


RDD, DataFrame and Dataset in Spark

In this post, let us learn about RDD, DataFrame and Dataset in Spark.

Resilient Distributed Datasets – RDD

RDD is the fundamental data structure in Spark. It is:

  • fault-tolerant
  • in memory
  • immutable distributed collection of elements
  • partitioned across nodes in the cluster and operate in parallel
  • low-level API that offers transformation and action

Two ways to create RDDs:

  1. Parallelizing an existing collection
  2. Referencing external storage system

Parallelizing an existing collection:

val a = Array(1, 2, 3, 4)
val b = sc.parallelize(a)

Referencing external storage system:

An RDD can also be created by referencing an external storage system such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
Text file RDDs are created using SparkContext's textFile method, which takes a URI for the file (either a local path or an hdfs://, s3a://, etc. URI) and reads it as a collection of lines.

val c = sc.textFile("data.txt")

By default, Spark creates one partition for each block of the file (default block size in HDFS – 128MB).

We can even increase the number of partitions by passing a larger value as the second argument to the textFile method, but we cannot have fewer partitions than blocks.
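The same applies in PySpark, where the minimum number of partitions is passed as the second argument; a small sketch assuming an existing SparkContext sc, as in the Scala examples above:

# Ask for at least 10 partitions while reading the file.
rdd = sc.textFile("data.txt", 10)
print(rdd.getNumPartitions())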

Dataframe

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.

DataFrames can be created from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs.

The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row], while in the Java API, users need to use Dataset<Row> to represent a DataFrame.
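As a small PySpark illustration (assuming an existing SparkSession named spark), a DataFrame with named columns can be built directly from a local collection:

# Build a DataFrame from a local collection with named columns.
df = spark.createDataFrame([(1, "spark"), (2, "hadoop")], ["id", "name"])
df.show()
df.printSchema()

# Fields of a Row can be accessed by name.
print(df.first().name)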

Dataset

A Dataset is a distributed collection of data and a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.

A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).

The Dataset API is available in Scala and Java. Python does not have support for the Dataset API. But due to Python's dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally: row.columnName). The case for R is similar.

RDDs and Datasets are type safe, which means the compiler knows the columns and their data types, whether Long, String, etc. With a DataFrame, on the other hand, every time you call an action such as collect(), the result comes back as an Array of Rows rather than as values of the typed Long or String data types.