Data Processing With Apache Spark
Spark is a big data processing framework that enables fast, advanced analytics computation over Hadoop clusters.
Spark Architecture
A Spark application consists of a driver program and a set of executors.
The driver program uses the SparkContext object to coordinate the application, which runs as an independent set of processes on the cluster.
The SparkContext can connect to several types of cluster managers:
- Standalone cluster manager
- Mesos
- YARN
These cluster managers allocate resources across applications. Once connected to a cluster manager, the application proceeds through the steps below.
Spark Application Execution Steps
- The Spark driver is launched to invoke the main method of the Spark application.
- The driver asks the cluster manager for resources to run the application, i.e. to launch executors that run tasks.
- The cluster manager launches executors.
- SparkContext acquires executors on nodes in the cluster, which are processes that run computations and store data for your application.
- SparkContext sends your application code (defined by JAR or Python files passed to SparkContext) to the executors.
- SparkContext sends tasks to the executors to run.
- Once SparkContext.stop() is executed from the driver or the main method has exited, all the executors are terminated and the cluster resources are released by the cluster manager.
Reference: http://spark.apache.org/docs/latest/cluster-overview.html
Standalone Cluster Manager
Spark provides a simple standalone mode where Spark manages the cluster without an external cluster manager. Each node should have the Spark binary installed. In standalone mode we can manually deploy the Spark master and workers.
First we can start the master:
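A sketch of the command, assuming it is run from the Spark installation directory:
./sbin/start-master.sh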
The master web UI can be seen at http://localhost:8080 by default. We can connect each of the workers to this master by specifying the master address:
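On each worker node, a sketch of the command; <master-host> is a placeholder for the actual master hostname, and 7077 is the default master port:
./sbin/start-slave.sh spark://<master-host>:7077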
We can also launch the Spark standalone cluster with a launch script by specifying the hostnames of all the Spark worker machines in the conf/slaves file in the Spark directory.
To further configure the Spark cluster, edit conf/spark-env.sh (a sample is sketched after this list):
- Set SPARK_WORKER_INSTANCES, which determines the number of Worker instances to run per node (the default is 1)
- Set SPARK_WORKER_CORES, the number of cores that one Worker can use
- Set SPARK_WORKER_MEMORY, the total amount of memory that can be used on one machine (Worker node) for running Spark programs
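A minimal conf/spark-env.sh sketch; the values are examples only and should be sized to your nodes:
export SPARK_WORKER_INSTANCES=1   # number of Worker instances per node
export SPARK_WORKER_CORES=4       # cores each Worker may use
export SPARK_WORKER_MEMORY=8g     # total memory per Worker node for Spark programs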
Copy this configuration file to the same folder on all Worker nodes and start the cluster by running sbin/start-all.sh.
Various details for configuring the master and slaves are documented well at https://spark.apache.org/docs/1.2.1/spark-standalone.html
YARN Cluster Manager
Reference: http://hortonworks.com/blog/apache-hadoop-yarn-concepts-and-applications/
There are two deploy modes that can be used to launch Spark applications on YARN.
- Cluster Mode
- Client Mode
YARN Cluster Mode
In cluster mode, the Spark driver runs inside an Application Master process, which itself runs on one of the node managers in the cluster and is managed by YARN. The client can go away after initiating the application.
While running the application in cluster mode, the driver runs on a different machine than the client, so SparkContext.addJar won’t work out of the box with files that are local to the client. To make files on the client available to SparkContext.addJar, include them with the --jars option in the launch command.
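A hedged sketch of a cluster-mode submission; the class name, application jar, and dependency jar paths are placeholders:
bin/spark-submit --master yarn --deploy-mode cluster \
  --class com.example.MyApp \
  --jars /local/path/extra-dependency.jar \
  /local/path/my-app.jar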
YARN Client Mode
In this mode the driver program runs on the YARN client machine, i.e. where the command to submit the Spark application is entered; this need not be a machine in the YARN cluster. Although the driver program runs on the client machine, the tasks are executed on the executors in the node managers of the YARN cluster, and the Application Master is only used for requesting resources from YARN.
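The same submission in client mode (again with placeholder names) differs only in the deploy mode; the driver then runs inside the spark-submit process on the client machine:
bin/spark-submit --master yarn --deploy-mode client --class com.example.MyApp /local/path/my-app.jar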
Which mode to use
When deciding which mode to use, one of the factors to consider is the latency between the driver and the executors. If you are submitting the application from your laptop and the network latency from your laptop to the worker nodes is low, you can use client mode. Otherwise use cluster mode so that the driver and workers are co-located, reducing the latency. Instead of submitting applications from your local machine, it is also a good idea to submit them from a gateway machine that is co-located with the worker nodes.
When the --master parameter is yarn, the ResourceManager’s address is picked up from the Hadoop configuration. Later, in the configuration setup, the location of these Hadoop configuration files needs to be provided.
Default Ports
- Spark standalone master UI: 8080
- SparkContext web UI: 4040
- Spark History Server: 18080
- HDFS NameNode: 50070
- YARN ResourceManager: 8088
Spark History Server
The Spark History Server displays information about the history of completed Spark applications. It provides application history from event logs stored in the file system. It periodically checks in the background for applications that have finished and renders a UI to show the history of applications by parsing the associated event logs.
You can start the history server by executing:
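A sketch of the command, run from the Spark directory; it assumes event logging is enabled for the applications whose history should be shown:
./sbin/start-history-server.sh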
The web interface for the Spark history server is at port 18080 by default.
There are various other options to configure how the history server manages logs. For example, spark.history.fs.cleaner.maxAge can be set to 7d to retain only the last 7 days of logs.
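A minimal sketch of the related settings in conf/spark-defaults.conf; the log directory is a placeholder and must already exist:
spark.eventLog.enabled            true
spark.eventLog.dir                hdfs:///spark-logs
spark.history.fs.logDirectory     hdfs:///spark-logs
spark.history.fs.cleaner.enabled  true
spark.history.fs.cleaner.maxAge   7d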
Ref: http://spark.apache.org/docs/latest/monitoring.html
Spark Infrastructure
One of the main advantages of Spark is much faster data processing compared to Hadoop MapReduce. Spark does its processing in memory, so to leverage this we should add more RAM and CPU to the Spark infrastructure. The typical recommendations are:
- 4-8 disks per node, where each disk is 1-2 TB
- 8-16 cores per node
- 32 GB or more memory per node; about 75% of this for Spark and the rest for the OS and other applications
During development you can also determine how much memory is used for a certain dataset size: load part of your dataset into a Spark RDD and use the Storage tab of Spark’s monitoring UI (http://<driver-node>:4040) to see its size in memory.
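A minimal spark-shell sketch, with a placeholder input path; after the count, the cached RDD and its in-memory size show up in the Storage tab:
val sample = sc.textFile("hdfs:///path/to/sample.txt") // placeholder path
sample.cache()  // mark the RDD to be kept in memory
sample.count()  // action that materializes (and caches) the RDD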
Spark Installation
The objective here is to install Spark and get it running in standalone mode and over YARN in client mode. This is a development cluster used for local testing.
My current stack on Mac is the following:
JDK 1.8, Scala 2.11, Hadoop 2.7.1
Download
http://spark.apache.org/downloads.html
The current version of Spark is 1.6.1.
Build
Running Spark on YARN requires a Spark build with YARN support.
Binary distributions can be downloaded from the downloads page of the project website or you can build it yourself.
The various options for compiling Spark are described at the links below; a sample build command follows them.
https://spark.apache.org/docs/latest/building-spark.html
http://spark.apache.org/downloads.html
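As a hedged example, a Maven build with YARN support might look like the following; the profile names and Hadoop version flag depend on the Spark version being built, so check the build documentation:
build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.7.1 -DskipTests clean package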
Configuration
Edit the conf file with the following content:
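A minimal sketch for conf/spark-env.sh so that Spark can locate the Hadoop and YARN configuration; the Hadoop path is a placeholder for your installation:
export HADOOP_CONF_DIR=/usr/local/hadoop-2.7.1/etc/hadoop  # directory containing core-site.xml and yarn-site.xml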
Run Spark Shell in Standalone mode
bin/spark-shell
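With no arguments the shell runs in local mode; to attach it to the standalone master started earlier, pass the master URL (the host and port below are the defaults):
bin/spark-shell --master spark://localhost:7077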
Run HDFS from the Hadoop installation folder:
./sbin/start-dfs.sh
The NameNode UI should be up at http://localhost:50070/dfshealth.html#tab-overview
Run YARN from the Hadoop installation folder:
./sbin/start-yarn.sh
The YARN cluster manager UI should be up at http://localhost:8088/cluster
bin/spark-shell --master yarn
Spark jobs can be seen at http://localhost:4040/jobs/
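The Pi result discussed next comes from the SparkPi example; a hedged sketch of a cluster-mode submission (the examples jar name depends on the downloaded distribution):
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster lib/spark-examples*.jar 10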
To view the output result “Pi is roughly 3.140784” while running in cluster mode, go to the YARN cluster manager UI (http://localhost:8088/cluster). Here you can see the Pi application marked as finished. Click on the Application ID and open the stdout log file.
MLlib Correlations example:
./bin/run-example org.apache.spark.examples.mllib.Correlations
MLlib Linear Regression example:
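A hedged guess at the corresponding command, based on the examples shipped with the distribution (the class and sample data path may differ by version):
./bin/run-example org.apache.spark.examples.mllib.LinearRegression data/mllib/sample_linear_regression_data.txt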
Once the work is done, stop the cluster:
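A sketch of the shutdown commands; the first stops the Spark standalone daemons from the Spark directory, and the last two stop YARN and HDFS from the Hadoop installation folder:
./sbin/stop-all.sh
./sbin/stop-yarn.sh
./sbin/stop-dfs.sh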
Refer to the official doc for running jobs on Yarn
http://spark.apache.org/docs/latest/running-on-yarn.html
Spark Programming
In Spark, computations are expressed in terms of actions and transformations on RDDs (Resilient Distributed Datasets). An RDD is an immutable collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. Operations over an RDD are automatically parallelized across the cluster. RDDs can be persisted in memory, which helps in creating a fast pipeline of operations without costly disk seeks.
Driver
Every Spark application consists of a driver program that launches various parallel operations on a cluster. Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster. Once you have a SparkContext, you can use it to build RDDs; for example, the call to sc.textFile creates an RDD. To run operations on an RDD you call functions on it, for example the count action. There are more than 80 high-level functions that we can use to query the data.
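A minimal spark-shell sketch of these two calls; sc is provided by the shell and the input path is a placeholder:
val lines = sc.textFile("hdfs:///path/to/input.txt") // creates an RDD of lines
val numLines = lines.count()                         // action: number of lines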
Once created, RDDs offer two types of operations: transformations and actions.
- Transformations construct a new RDD from a previous one.
- Actions, on the other hand, compute a result based on an RDD.
Spark computes RDDs only in a lazy fashion, that is, the first time they are used in an action. By default an RDD is recomputed each time you run an action on it; to reuse it across actions you have to call the persist method. Persisting an RDD on disk, instead of in memory, is also possible.
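A spark-shell sketch, with a placeholder input path, showing a lazy transformation and an in-memory persist; StorageLevel.DISK_ONLY could be used instead to persist on disk:
import org.apache.spark.storage.StorageLevel
val lines  = sc.textFile("hdfs:///path/to/input.txt")
val errors = lines.filter(_.contains("ERROR"))  // transformation: nothing is computed yet
errors.persist(StorageLevel.MEMORY_ONLY)        // keep the result in memory after the first action
errors.count()  // first action computes and caches the RDD
errors.first()  // reuses the cached partitions instead of recomputing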
To parallelize a collection, i.e. a dataset that is already in memory, you can use the parallelize() method. The elements of the collection are copied to an RDD and operated on in parallel.
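A small spark-shell sketch:
val nums = sc.parallelize(1 to 1000)                  // distribute an in-memory collection as an RDD
val sumOfSquares = nums.map(x => x * x).reduce(_ + _) // parallel map and reduce over the partitions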
Use the aggregate function to do a parallel computation across partitions, providing one function to accumulate results within a partition and a custom function to combine the results of the computations from the different partitions.
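For example, a sketch computing the mean of an RDD with aggregate, accumulating a (sum, count) pair within each partition and then combining the pairs across partitions:
val data = sc.parallelize(List(1.0, 2.0, 3.0, 4.0))
val (sum, count) = data.aggregate((0.0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),       // fold a value into a partition's accumulator
  (a, b)   => (a._1 + b._1, a._2 + b._2))     // combine accumulators from different partitions
val mean = sum / count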
Some functions are available only on certain types of RDDs, such as mean() and variance() on numeric RDDs or join() on key/value pair RDDs.
Pair RDD
Pair RDDs are RDDs containing key-value pairs. Pair RDDs have a reduceByKey() method that can aggregate data separately for each key, and a join() method that can merge two RDDs together by grouping elements with the same key.
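A spark-shell sketch with made-up data:
val sales  = sc.parallelize(Seq(("apple", 3), ("banana", 2), ("apple", 4)))
val prices = sc.parallelize(Seq(("apple", 1.0), ("banana", 0.5)))
val totals = sales.reduceByKey(_ + _)   // ("apple", 7), ("banana", 2)
val joined = totals.join(prices)        // ("apple", (7, 1.0)), ("banana", (2, 0.5))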
Grouping
Instead of doing groupByKey and then applying a map over the list of values for each key, use reduceByKey, which is more efficient as it avoids the middle step of creating a list of values per key.
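A sketch of the groupByKey version, with a placeholder pairs RDD of key-value counts:
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val sumsGrouped = pairs.groupByKey().mapValues(_.sum)  // shuffles every value, then sums the grouped lists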
This is inefficient; it is better to use the reduceByKey method:
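The equivalent with reduceByKey, using the same pairs RDD; values are combined on each partition before the shuffle:
val sumsReduced = pairs.reduceByKey(_ + _)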