Map Slots in Hadoop
In Hadoop, map slots and reduce slots are separate and cannot be used interchangeably; dynamic borrowing between them can improve slot utilization and system throughput. All other Hadoop ecosystem components work on top of these three major components: HDFS, YARN and MapReduce. We will discuss all Hadoop ecosystem components in detail in coming posts. Compared to Hadoop 1.x, the Hadoop 2.x architecture is designed quite differently.
In Hadoop, the JobTracker sets up TaskTrackers, and each TaskTracker assigns its slots to be either map slots or reduce slots. Slot assignments cannot be changed dynamically while an application is running: reassigning a slot's function requires the TaskTracker to get permission from the JobTracker, which consumes time. A MapReduce workload generally contains a set of jobs, each of which executes map tasks in map slots and reduce tasks in reduce slots, so reduce slots sit idle until the occupied map slots finish.
One of the three components of Hadoop is Map Reduce. The first component of Hadoop that is, Hadoop Distributed File System (HDFS) is responsible for storing the file. The second component that is, Map Reduce is responsible for processing the file.
Suppose there is a word file containing some text. Let us name this file sample.txt. Note that we use Hadoop to deal with huge files, but for the sake of easy explanation we are taking a small text file as an example. So, let's assume that this sample.txt file contains a few lines of text.
Assume the file has eight lines of text in all. While storing this file in Hadoop, HDFS breaks it into four parts named first.txt, second.txt, third.txt, and fourth.txt, so each part contains two lines: the first two lines go into first.txt, the next two into second.txt, the next two into third.txt, and the last two into fourth.txt. All these parts are stored on DataNodes, and the NameNode holds the metadata about them. All of this is the task of HDFS.
Now, suppose a user wants to process this file. This is where MapReduce comes into the picture. Suppose the user wants to run a query on sample.txt. Instead of bringing sample.txt to the local computer, we send the query to the data. To keep track of our request, we use the Job Tracker (a master service). The Job Tracker traps our request and keeps track of it.
Now suppose that the user wants to run this query on sample.txt and wants the output in a result.output directory. Let the name of the file containing the query be query.jar. So, the user will submit a request made up of three parts (a sketch of the corresponding driver code follows the list below):
- query.jar : query file that needs to be processed on the input file.
- sample.txt: input file.
- result.output: directory in which output of the processing will be received.
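For illustration, here is a minimal sketch of the kind of driver class such a query.jar might contain. The class name QueryDriver is a hypothetical placeholder and the built-in identity Mapper/Reducer stand in for the real query logic; only the standard Hadoop job-setup API calls are assumed.

```java
// A minimal sketch of the kind of driver class a query.jar might contain.
// The built-in identity Mapper/Reducer stand in for the real query logic.
// Typical submission (illustrative): hadoop jar query.jar QueryDriver sample.txt result.output
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class QueryDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "query");
        job.setJarByClass(QueryDriver.class);
        job.setMapperClass(Mapper.class);      // identity map; a real query supplies its own class
        job.setReducerClass(Reducer.class);    // identity reduce; a real query supplies its own class
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // input file, e.g. sample.txt
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output directory, e.g. result.output
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```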
So, now the Job Tracker traps this request and asks the Name Node to run it on sample.txt. The Name Node then provides the metadata to the Job Tracker. The Job Tracker now knows that sample.txt is stored across first.txt, second.txt, third.txt, and fourth.txt. As each of these four parts has three replicas stored in HDFS, the Job Tracker communicates with the Task Tracker (a slave service) for each part, but only with the one replica of each part that resides nearest to it.
Note: Applying the desired code to the local first.txt, second.txt, third.txt, and fourth.txt is a process. This process is called Map.
In Hadoop terminology, the main file sample.txt is called the input file and its four parts are called input splits. So, in Hadoop, the number of mappers for an input file equals the number of input splits of that file. In the above case, the input file sample.txt has four input splits, hence four mappers will run to process it. The responsibility of handling these mappers lies with the Job Tracker.
Note that the Task Trackers are slave services to the Job Tracker. If any of the local machines breaks down, the processing of that part of the file would stop and halt the complete process. To guard against this, each Task Tracker sends a heartbeat, along with its number of slots, to the Job Tracker every 3 seconds; this is called the status of the Task Tracker. If a Task Tracker goes down, the Job Tracker waits for 10 heartbeat intervals, that is, 30 seconds, and if it still receives no status it assumes that the Task Tracker is either dead or extremely busy. It then communicates with the Task Tracker holding another copy of the same file part and directs it to process the desired code over it. Similarly, the slot information is used by the Job Tracker to keep track of how many tasks the Task Tracker is currently serving and how many more can be assigned to it. In this way, the Job Tracker keeps track of our request.
Now, suppose that the system has generated individual outputs for first.txt, second.txt, third.txt, and fourth.txt. But this is not the user's desired output. To produce the desired output, all these individual outputs have to be merged, or reduced, into a single output. This reduction of multiple outputs into one is also a process, and it is done by the Reducer. In Hadoop, as many reducers as there are, that many output files are generated; by default, there is one reducer per job.
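As a side note, the number of reducers (and therefore the number of output files) can be set explicitly on a job. The sketch below is illustrative only; the class name is ours, but Job.setNumReduceTasks is the standard Hadoop call.

```java
// Sketch: controlling how many reducers (and therefore output files) a job uses.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReducerCountSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "reducer-count-demo");
        // One reducer (the default) yields a single output file;
        // four reducers yield four part-r-0000x files in the output directory.
        job.setNumReduceTasks(4);
        System.out.println("Reducers requested: " + job.getNumReduceTasks());
    }
}
```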
Note: Map and Reduce are two different processes of the second component of Hadoop, that is, Map Reduce. These are also called phases of Map Reduce. Thus we can say that Map Reduce has two phases. Phase 1 is Map and Phase 2 is Reduce.
Functioning of Map Reduce
Now, let us go back to our sample.txt file with the same content. Again it is divided into four input splits, namely first.txt, second.txt, third.txt, and fourth.txt. Now suppose we want to count the number of occurrences of each word in the file. The output of this 'word count' job will list every distinct word in the file along with the number of times it appears.
Thus, in order to get this output, the user has to send this query to the data. Suppose the 'word count' query is packaged in the file wordcount.jar. The request then follows the same pattern as before, with wordcount.jar as the query file, sample.txt as the input file, and result.output as the output directory.
Types of File Format in Hadoop
Now, as we know, there are four input splits, so four mappers will be running, one on each input split. But mappers don't run directly on the input splits, because the input splits contain text and mappers don't understand text; mappers understand only (key, value) pairs. Thus the text in the input splits first needs to be converted into (key, value) pairs. This is achieved by Record Readers. We can therefore also say that there are as many record readers as there are input splits.
In Hadoop terminology, each line in the text is termed a 'record'. How the record reader converts this text into a (key, value) pair depends on the format of the file. In Hadoop, there are four such formats, available as predefined classes. The four formats are listed below (a driver-side sketch of selecting a format follows the list):
- TextInputFormat
- KeyValueTextInputFormat
- SequenceFileInputFormat
- SequenceFileAsTextInputFormat
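Which of these classes is used is chosen in the driver. The following is a hedged sketch (the class name InputFormatSketch is ours) showing how a job might select an input format; the Job.setInputFormatClass call and the format classes are the standard Hadoop ones.

```java
// Sketch: choosing the input format, and hence the (key, value) shape seen by mappers.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class InputFormatSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "input-format-demo");

        // Default behaviour: keys are byte offsets (LongWritable), values are whole lines (Text).
        job.setInputFormatClass(TextInputFormat.class);

        // Alternative: split each line at the first tab into a (Text key, Text value) pair.
        job.setInputFormatClass(KeyValueTextInputFormat.class);
    }
}
```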
By default, a file is in TextInputFormat. The record reader reads one record (line) at a time and converts each record into a (key, value) pair according to the file's input format. For the time being, let's assume that the first input split first.txt is in TextInputFormat. The record reader working on this input split then converts each record into the form (byte offset, entire line). For example, first.txt has the content 'Hello I am GeeksforGeeks' on its first line and 'How can I help you' on its second.
So, the output of the record reader is two pairs (since there are two records in the file). The first pair is (0, Hello I am GeeksforGeeks) and the second is (26, How can I help you); the second pair has a byte offset of 26 because the first line contains 25 characters and the newline character (\n) is also counted. Thus, the record reader produces as many (key, value) pairs as there are records. Now, the mapper runs once for each of these pairs, and similarly the other mappers run on the (key, value) pairs of the other input splits. In this way, Hadoop breaks a big task into smaller tasks and executes them in parallel.
Shuffling and Sorting
Now, the mapper provides an output corresponding to each (key, value) pair provided by the record reader. Let us take the first input split, first.txt. The two pairs generated for this split by the record reader are (0, Hello I am GeeksforGeeks) and (26, How can I help you). The mapper takes one of these pairs at a time and produces output like (Hello, 1), (I, 1), (am, 1), and (GeeksforGeeks, 1) for the first pair and (How, 1), (can, 1), (I, 1), (help, 1), and (you, 1) for the second pair. Similarly, we have the outputs of all the mappers. Note that this data contains duplicate keys, such as (I, 1), and these duplicates also need to be taken care of. This data is called intermediate data. Before this intermediate data is passed to the reducer, it first goes through two more stages, called shuffling and sorting.
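A word-count mapper that behaves as described above might look like the following sketch; the class name WordCountMapper and the tokenization details are ours, while the Mapper API is standard Hadoop.

```java
// Sketch: a word-count mapper that turns each (byte offset, line) record
// into one (word, 1) pair per word, as described above.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // e.g. (0, "Hello I am GeeksforGeeks") -> (Hello,1) (I,1) (am,1) (GeeksforGeeks,1)
        for (String token : line.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }
}
```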
- Shuffling Phase: This phase combines all values associated with an identical key. For example, if the word 'Are' appears three times in the input file, the mappers emit (Are, 1) three times, and after the shuffling phase the output will be (Are, [1,1,1]).
- Sorting Phase: Once shuffling is done, the output is sent to the sorting phase, where all the (key, value) pairs are sorted automatically. In Hadoop, sorting is an automatic process because keys implement the built-in WritableComparable interface.
After the completion of the shuffling and sorting phases, the resulting output is sent to the reducer. If there are n grouped (key, value-list) pairs after shuffling and sorting, the reducer runs n times, once per key, and produces the final processed output. In the above case, the resultant output of the reducer will be stored in the result.output directory, as specified in the query submitted to process the data.
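A matching reducer sketch is shown below; it receives each word together with its grouped values, for example (Are, [1,1,1]), and sums them. The class name WordCountReducer is ours; the Reducer API is standard Hadoop.

```java
// Sketch: a word-count reducer that sums the grouped values for each word,
// e.g. (Are, [1,1,1]) -> (Are, 3).
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();
        }
        total.set(sum);
        context.write(word, total); // written to a part file under result.output
    }
}
```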
It's not uncommon for a beginner to think of Spark as a replacement for Hadoop. The term "Hadoop" is used interchangeably to refer to the Hadoop ecosystem, Hadoop MapReduce, or Hadoop HDFS. Apache Spark came in as a very strong contender to replace the Hadoop MapReduce computation engine.
This blog aims to give a better understanding of what motivated Spark and how it evolved successfully into a strong contender to MapReduce.
We will divide this blog into three parts, plus an appendix:
- MapReduce computation engine in a nutshell
- Cons of MapReduce as motivation for Spark
  - A look at the drawbacks of MapReduce
  - How Spark addressed them
- How Spark works
  - Behind the scenes of a Spark application running in a cluster
- Appendix
  - A look at other attempts, like Corona, made to make up for the downsides of the MapReduce engine
1. MapReduce (MR) computation in a nutshell
I'll not go deep into the details, but let's see a bird's-eye view of how Hadoop MapReduce works. The figure below shows a typical Hadoop cluster running two MapReduce applications. Each application's Map (M) and Reduce (R) jobs are marked in black and white respectively.
- NameNode and DataNode:
- NameNode + DataNodes essentially make up HDFS.
- NameNode only stores the metadata of HDFS i.e., it stores the list of all files in the file system (not its data), and keeps a track of them across the cluster.
- DataNodes store the actual data of files.
- DataNode JVMs send a heartbeat to the NameNode JVM every 3 seconds.
- JobTracker (JT) and TaskTracker (TT):
- JobTracker JVM is the brain of the MapReduce Engine and runs on NameNode.
- The JobTracker creates jobs and allocates them to TaskTrackers, which run on the DataNodes.
- TaskTrackers run the tasks and report task status to the JobTracker.
- Inside each TaskTracker JVM, we have slots where our jobs run. These slots are hardcoded to be either Map slots or Reduce slots; one cannot run a reduce job in a map slot and vice versa.
- Parallelism in MapReduce is achieved by having multiple parallel map & reduce jobs running as processes in respective TaskTracker JVM slots.
- Job execution: In a typical MapReduce application, we chain multiple map and reduce jobs together. Execution starts by reading a chunk of data from HDFS, running one phase of map-reduce computation, and writing the results back to HDFS; those results are then read into the next map-reduce phase and written back to HDFS again. There is usually a loop here, where this process runs over and over again (a hedged sketch of such chaining follows this list).
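To make that read/compute/write loop concrete, here is a hedged sketch of two chained jobs in which the first job's HDFS output directory becomes the second job's input. The paths and the class name are illustrative placeholders; the default identity map/reduce is used just to keep the sketch short.

```java
// Sketch: chaining two MapReduce jobs; each phase writes its results to HDFS
// and the next phase reads them back, as described above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainedJobsSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path("/data/input");                 // illustrative paths
        Path intermediate = new Path("/data/intermediate");
        Path output = new Path("/data/output");

        Job first = Job.getInstance(conf, "phase-1");         // identity map/reduce by default
        FileInputFormat.addInputPath(first, input);
        FileOutputFormat.setOutputPath(first, intermediate);  // results written to HDFS
        if (!first.waitForCompletion(true)) System.exit(1);

        Job second = Job.getInstance(conf, "phase-2");
        FileInputFormat.addInputPath(second, intermediate);   // results read back from HDFS
        FileOutputFormat.setOutputPath(second, output);
        System.exit(second.waitForCompletion(true) ? 0 : 1);
    }
}
```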
2. Cons of Map-Reduce as motivation for Spark
One can say that Spark has taken direct motivation from the downsides of MapReduce computation system. Let’s see the drawbacks of MapReduce computation engine and how Spark addressed them:
- Parallelism via processes:
- MapReduce: MapReduce doesn’t run Map and Reduce jobs as threads. They are processes which are heavyweight compared to threads.
- Spark: Spark runs its jobs by spawning different threads running inside the executor.
- CPU Utilization:
- MapReduce: The slots within a TaskTracker, where Map and Reduce jobs get executed, are not generic slots that can run either kind of job; they are categorized into two types, one for Map jobs and the other for Reduce jobs. Why does this matter? When you start a MapReduce application, it might spend hours in just the Map phase, and during that time none of the Reduce slots are used. This is why CPU utilization tends not to be high: all those Reduce slots sit empty. Facebook came up with an improvement to address this a bit; if you are interested, please check the Appendix section below.
- Spark: Similar to TaskTracker in MapReduce, Spark has Executor JVM’s on each machine. But, unlike hardcoded Map and Reduce slots in TaskTracker, these slots are generic where any task can run.
- Extensive reads and writes:
- MapReduce: A whole lot of intermediate results are written to HDFS and then read back by the next job. The data handshake between any two jobs chained together happens via reads and writes to HDFS.
- Spark: Spark is an in-memory processing engine; all of the data and intermediate results are kept in memory. This is one of the reasons you get 10-100x faster speeds: efficient use of memory (a small Spark word-count sketch follows this list).
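As a concrete contrast, here is a hedged Spark word-count sketch using the Java API. The input and output paths come from command-line arguments and the local-mode fallback is for testing only; the point is that the intermediate (word, 1) pairs are handled in memory rather than being written to HDFS between steps.

```java
// Sketch: word count on Spark; intermediate (word, 1) pairs live in memory
// as RDD partitions instead of being written to HDFS between phases.
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class SparkWordCountSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SparkWordCountSketch")
                .setIfMissing("spark.master", "local[*]"); // local fallback for testing only
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaRDD<String> lines = sc.textFile(args[0]);       // e.g. sample.txt on HDFS
            JavaPairRDD<String, Integer> counts = lines
                    .flatMap(line -> Arrays.asList(line.split("\\s+")).iterator())
                    .mapToPair(word -> new Tuple2<>(word, 1))
                    .reduceByKey(Integer::sum);                 // shuffle stays off HDFS
            counts.saveAsTextFile(args[1]);                     // e.g. result.output
        }
    }
}
```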
Note: Facebook came up with Corona to address some of these cons and it did achieve 17% performance improvements on MapReduce Jobs. I’ve detailed it in Appendix.
3. How Spark works:
Now that we have seen the disadvantages of MapReduce and how Spark addressed them, it's time to jump in and look briefly at the internals of Spark. In particular, I'll mainly try to cover what a Spark application running in a cluster looks like. The picture below depicts a Spark cluster:
Let's look at the different components shown in the above picture:
- Spark Master, Worker and Executor JVMs: The Spark Master and Worker JVMs are the resource managers. All Worker JVMs register themselves with the Spark Master. They are very small; for example, the Master uses about 500MB of RAM and a Worker about 1GB.
- Master: The Master JVM's job is to decide and schedule the launch of Executor JVMs across the nodes. But the Master will not launch the executors itself.
- Worker:It is the Worker who heartbeats with Master and launches the executors as per schedule.
- Executor:Executor JVM has generic slots where tasks run as threads. Also, all the data needed to run a task is cached within Executor memory.
- Driver: When we start our Spark application with the spark-submit command, a driver starts, and that driver contacts the Spark Master to launch executors and run the tasks. Basically, the Driver is a representative of our application and does all the communication with Spark.
- Task: A task is the smallest unit of execution and works on one partition of our data. Spark actually calls these slots cores: the --executor-cores setting defines the number of tasks that run within an Executor. For example, if we set --executor-cores to six, we can run six simultaneous threads within the Executor JVM.
- Resilience: The Worker JVM's only job is to launch Executor JVMs whenever the Master tells it to do so. If an Executor crashes, the Worker will restart it. If a Worker JVM crashes, the Master will restart it. The Master takes care of restarting the driver JVM as well. But if the driver restarts, all the Executors will have to restart too.
- Flexible distribution of CPU resources: By CPU resources, we are referring to the tasks/threads running within an Executor. Let's assume that the second machine in the cluster has a lot more RAM and CPU resources. Can we run more threads on this second machine? Yes: tweak the spark-env.sh file on that machine and set SPARK_WORKER_CORES to 10. If the same setting is 6 on the other machines, the Master will launch 10 threads/tasks on the second machine and 6 on the remaining ones. You could still oversubscribe in general. SPARK_WORKER_CORES tells the Worker JVM how many cores/tasks it can give out to its underlying Executor JVMs (a hedged configuration sketch follows this list).
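For completeness, similar sizing can also be expressed on the application side. The sketch below is illustrative only (the values are made up); spark.executor.cores and spark.executor.memory are real configuration keys, and in a real deployment they are usually passed via spark-submit rather than hardcoded.

```java
// Sketch: expressing executor sizing on the application side. In a real cluster these
// values are usually passed to spark-submit (or set via spark-env.sh, e.g. SPARK_WORKER_CORES).
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ExecutorSizingSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("ExecutorSizingSketch")
                .setIfMissing("spark.master", "local[*]")  // local fallback for testing only
                .set("spark.executor.cores", "6")          // tasks (threads) per Executor JVM
                .set("spark.executor.memory", "4g");       // heap per Executor JVM (illustrative)
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // ... build RDDs and run jobs here; each Executor can run 6 concurrent tasks ...
        }
    }
}
```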
4. Conclusion:
We’ve seen:
- The initial motivation behind Spark
- Why it evolved successfully as a strong contender to MapReduce
- Why Spark is orders of magnitude faster than traditional Hadoop's MapReduce system
- An overview of a Spark application running in a cluster
5. Appendix:
5.1 Corona - An attempt to make up for the downsides of MapReduce and improve CPU utilization
Facebook came up with Corona to address the CPU utilization problem that MapReduce has. In their Hadoop cluster, Facebook was running hundreds of MapReduce (MR) jobs, with many more in the backlog waiting to run because all the MR slots were occupied, yet they noticed that CPU utilization was fairly low (~60%). This was odd, because all the Map (M) and Reduce (R) slots were full and a large backlog was waiting for a free slot. What they found was that in traditional MR, once a Map job finishes, the TaskTracker has to let the JobTracker know that there is an empty slot, and the JobTracker then allots this slot to the next job. This handshake between TaskTracker and JobTracker was taking ~15-20 seconds before the next job took up the freed slot. The reason is that the heartbeat interval between TaskTracker and JobTracker is 3 seconds, so free slots are discovered at most once every 3 seconds, and the next job is not necessarily assigned on the very next heartbeat. So Facebook added Corona, a more aggressive job scheduler that sits on top of the JobTracker. MapReduce took 66 seconds to fill a slot while Corona took about 55 seconds (~17% improvement). Slots here are M or R process IDs.
5.2 Legend:
- MR - MapReduce
- M - Map Job
- R - Reduce Job
- JT - JobTracker
- TT - TaskTracker