Saturday, May 28, 2016

Hadoop 2 HA setup config with ZooKeeper

Hadoop 2.x
  • HDFS: NN Federation, HA(High Availability)
  • MapReduce: Runs on YARN
  • YARN: Resource Management System
HDFS 2.x

Fixes the HDFS 1.0 single point of failure
  • HDFS HA: Master/Backup NameNode
  • If the master NameNode fails, switch to the backup NameNode
Fixes the HDFS 1.0 memory limitation problem
  • HDFS Federation
  • Expandable, supports multiple NameNodes
  • Every NameNode manages a part of the directory tree
  • All NameNodes share all DataNode storage resources
HDFS 2.x only changed the architecture; the way of usage does not change.
It is transparent to HDFS users.
Compatible with HDFS 1.x commands and APIs.

HDFS HA(High Availability)

ZK: ZooKeeper
ZKFC: ZooKeeper Failover Controller
JN: Journal Node
NN: NameNode
DN: DataNode

Looking at the flow bottom-up.

DataNodes report to both the Active NameNode and the Standby NameNode.
DataNodes only receive commands from the Active NameNode.

The NameNode's state info is not stored only in the NameNode itself; it is written to a cluster of JournalNodes.
When the Active NameNode fails, the Standby NameNode automatically picks up the info from the JournalNodes and continues the work.
For planned maintenance on the Active NameNode, we can send a command to manually switch the Active and Standby NameNodes.

The FailoverControllers report heartbeats to ZooKeeper; ZooKeeper is used to coordinate HA.
(Note: the HDFS client does not actually ask ZooKeeper which NameNode is active; it uses the configured failover proxy provider to locate the Active NameNode, then talks to that NameNode.)

Auto failover switch
  • ZooKeeper Failover Controller (ZKFC): monitors the status of its NameNode
  • Registers the NameNode to ZooKeeper
  • When the Active NameNode fails, each ZKFC tries to acquire a lock in ZooKeeper for its NameNode.
  • The NameNode whose ZKFC acquired the lock becomes Active
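The two properties that enable this automatic failover are sketched below (the nameservice and ZooKeeper hostnames follow this post's layout and are my own example values):

```xml
<!-- hdfs-site.xml -->
<property>
  <name>dfs.ha.automatic-failover.enabled</name>
  <value>true</value>
</property>
<!-- core-site.xml: the ZooKeeper quorum the ZKFCs register with -->
<property>
  <name>ha.zookeeper.quorum</name>
  <value>hadoop-yarn:2181,hadoop-node1:2181,hadoop-node2:2181</value>
</property>
```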

HDFS 2.x Federation
  • Federation is used for super large amounts of data: we need multiple NameNodes; each NameNode is independent, but all of them share all the DataNodes.
  • Independent means the NameNodes are completely independent of each other, so if we want HA for the NameNodes, each NameNode needs to configure its own HA.
  • Uses namenodes/namespaces to split data storage and management across different nodes, so the namenode/namespace layer can expand by simply adding machines.
  • Spreads the load of one namenode across multiple nodes, so efficiency does not drop when the HDFS data is big. We can use multiple namespaces to separate different types of applications, allocating data storage/management to different namenodes based on application type.

YARN: Yet Another Resource Negotiator

Hadoop 2.0 introduced the ResourceManager and the ApplicationMaster.
  • They separate the JobTracker's resource management and task dispatch duties from MRv1.
    Replaced: JobTracker and TaskTracker from Hadoop 1.0
    With: ResourceManager and ApplicationMaster
  • ResourceManager: resource management and scheduling for the whole cluster (only one per cluster)
  • ApplicationMaster: handles application-level concerns, like task dispatch, task monitoring, fault tolerance, etc. Each application has one ApplicationMaster
YARN lets multiple computation frameworks run in one cluster.
It provides a common interface that many frameworks can plug into.
  • Each application has one ApplicationMaster
  • Multiple frameworks can run on YARN at the same time: MapReduce, Spark, Storm, etc.

HA HDFS Cluster build example

                NameNode  DataNode  ZooKeeper  ZKFC  JournalNode  ResourceManager  NodeManager
hadoop-yarn        1                    1        1                       1
hadoop-node1       1         1          1        1        1                              1
hadoop-node2                 1          1                 1                              1
hadoop-node3                 1                            1                              1

Following the official HDFS High Availability (Quorum Journal Manager) documentation, we need:

  • dfs.nameservices as name service ID
  • dfs.ha.namenodes.[name service ID]: as NameNode IDs
  • dfs.namenode.rpc-address.[nameservice ID].[name node ID]
  • dfs.namenode.http-address.[nameservice ID].[name node ID]
  • dfs.namenode.shared.edits.dir
  • dfs.client.failover.proxy.provider.[nameservice ID]
  • dfs.ha.fencing.methods
  • fs.defaultFS
  • dfs.journalnode.edits.dir
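Putting those properties together, a sketch of the two site files ("mycluster", "nn1"/"nn2", the hostnames, and the journal directory are my own example values; adjust to your cluster):

```xml
<!-- hdfs-site.xml -->
<property><name>dfs.nameservices</name><value>mycluster</value></property>
<property><name>dfs.ha.namenodes.mycluster</name><value>nn1,nn2</value></property>
<property><name>dfs.namenode.rpc-address.mycluster.nn1</name><value>hadoop-yarn:8020</value></property>
<property><name>dfs.namenode.rpc-address.mycluster.nn2</name><value>hadoop-node1:8020</value></property>
<property><name>dfs.namenode.http-address.mycluster.nn1</name><value>hadoop-yarn:50070</value></property>
<property><name>dfs.namenode.http-address.mycluster.nn2</name><value>hadoop-node1:50070</value></property>
<property><name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://hadoop-node1:8485;hadoop-node2:8485;hadoop-node3:8485/mycluster</value></property>
<property><name>dfs.client.failover.proxy.provider.mycluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value></property>
<property><name>dfs.ha.fencing.methods</name><value>sshfence</value></property>
<property><name>dfs.journalnode.edits.dir</name><value>/var/opt/hadoop/journal</value></property>
<!-- core-site.xml -->
<property><name>fs.defaultFS</name><value>hdfs://mycluster</value></property>
```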
I started from my earlier basic distributed Hadoop build (the May 12 post below), then applied some changes:
  • create soft link in /usr/lib: hadoop -> hadoop-2.6.4
  • change tmp folder from /usr/lib/hadoop-2.6.4/data/tmp/ to /var/opt/hadoop/data/tmp
  • install ZooKeeper-3.4.8
  • create soft link in /usr/lib: zookeeper -> zookeeper-3.4.8
  • create folder /var/opt/zookeeper/tmp/data

Instead of configuring a single NameNode, we build a cluster: core-site.xml points fs.defaultFS at the nameservice, and hdfs-site.xml carries the HA properties listed above.

yarn-site.xml sets the ResourceManager hostname:

<configuration>
<!-- Site specific YARN configuration properties -->
        <property>
                <name>yarn.resourcemanager.hostname</name>
                <value>hadoop-yarn</value>
                <description>The hostname of the RM.</description>
        </property>
</configuration>

Also install ZooKeeper, and assign an id to each ZooKeeper node.

Create the file /var/opt/zookeeper/tmp/data/myid on each ZooKeeper node.
Assign content "1", "2", "3" respectively in each node's myid file; this is the id of that ZooKeeper node.
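A quick sketch of the myid step (the data directory matches the one created earlier; DATA_DIR is overridable so the sketch can be tried anywhere; run the matching echo on each node):

```shell
# Write this node's id into the myid file; use "2" on the second node, "3" on the third.
DATA_DIR="${DATA_DIR:-/var/opt/zookeeper/tmp/data}"
mkdir -p "$DATA_DIR"
echo "1" > "$DATA_DIR/myid"
cat "$DATA_DIR/myid"
```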

Config ZooKeeper
In /usr/lib/zookeeper/conf, copy zoo_sample.cfg to zoo.cfg.
In zoo.cfg, assign and create dataDir, and list each ZooKeeper node's host with its server id:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/var/opt/zookeeper/tmp/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
# servers in the quorum: one line per ZooKeeper node, id matching its myid
server.1=hadoop-yarn:2888:3888
server.2=hadoop-node1:2888:3888
server.3=hadoop-node2:2888:3888

Then start ZooKeeper on each ZooKeeper node
./bin/zkServer.sh start
You will see QuorumPeerMain running as a Java application in jps

Start each JournalNode: sbin/hadoop-daemon.sh start journalnode

Format on one of the NameNode
./bin/hdfs namenode -format

Start the formatted NameNode
./sbin/hadoop-daemon.sh start namenode

Bootstrap the Standby NameNode, from the Standby NameNode host; this synchronizes the formatted image files from the formatted NameNode
./bin/hdfs namenode -bootstrapStandby

Format ZKFC, on just one of the NameNode hosts
./bin/hdfs zkfc -formatZK

Stop all services, from just one NameNode
sbin/stop-dfs.sh

Start all services, from just one NameNode
sbin/start-dfs.sh

We cannot be sure which NameNode will become Active; it depends on which NameNode acquires the lock first.

After all the install/config/format steps are done, restart the whole cluster once.

Saturday, May 21, 2016

MapReduce Examples

Running MapReduce in YARN

Hadoop ships with some default MapReduce example jars.

Example: WordCount

Put a text file into the hdfs system; I put it at /data01/LICENSE.txt
I want the output result at /data01/LICENSE_WordCount
(I was thinking the output result would be just one file, but it has to be a folder, so here LICENSE_WordCount is a folder name)

Run MR
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar wordcount /data01/LICENSE.txt /data01/LICENSE_WordCount

The result folder contains an empty file named _SUCCESS, which marks that the job succeeded,
and a result file: part-r-00000. We can view it with: bin/hdfs dfs -cat /data01/LICENSE_WordCount/part-r-00000

Example: pi

Pick one of them; I chose pi.
Run it:
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar pi

It requires two arguments: <nMaps> and <nSamples>
Usage: org.apache.hadoop.examples.QuasiMonteCarlo <nMaps> <nSamples>
Generic options supported are
-conf <configuration file>     specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|resourcemanager:port>    specify a ResourceManager
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
We assigned 5 maps, each map with 20 samples
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar pi 5 20

Looking at the running status of the MapReduce job, we find:

We assigned 5 maps, each map with 20 samples.
Each map task runs in a container, but the running MapReduce job has 6 containers.
The one additional container is the ApplicationMaster.

Looking at the result of running MR job:
[yushan@hadoop-yarn hadoop-2.6.4]$ bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar pi 5 20
Number of Maps  = 5
Samples per Map = 20
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Starting Job
16/05/21 15:23:50 INFO client.RMProxy: Connecting to ResourceManager at /
16/05/21 15:23:50 INFO input.FileInputFormat: Total input paths to process : 5
16/05/21 15:23:51 INFO mapreduce.JobSubmitter: number of splits:5
16/05/21 15:23:51 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1463856930642_0002
16/05/21 15:23:51 INFO impl.YarnClientImpl: Submitted application application_1463856930642_0002
16/05/21 15:23:51 INFO mapreduce.Job: The url to track the job:
16/05/21 15:23:51 INFO mapreduce.Job: Running job: job_1463856930642_0002
16/05/21 15:23:59 INFO mapreduce.Job: Job job_1463856930642_0002 running in uber mode : false
16/05/21 15:23:59 INFO mapreduce.Job:  map 0% reduce 0%
16/05/21 15:24:36 INFO mapreduce.Job:  map 100% reduce 0%
16/05/21 15:24:36 INFO mapreduce.Job: Task Id : attempt_1463856930642_0002_m_000002_0, Status : FAILED
Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal
16/05/21 15:24:37 INFO mapreduce.Job:  map 80% reduce 0%
16/05/21 15:24:47 INFO mapreduce.Job:  map 100% reduce 0%
16/05/21 15:24:50 INFO mapreduce.Job:  map 100% reduce 100%
16/05/21 15:24:50 INFO mapreduce.Job: Job job_1463856930642_0002 completed successfully
16/05/21 15:24:50 INFO mapreduce.Job: Counters: 51
File System Counters FILE: Number of bytes read=116
FILE: Number of bytes written=643059
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1385
HDFS: Number of bytes written=215
HDFS: Number of read operations=23
HDFS: Number of large read operations=0
HDFS: Number of write operations=3
Job Counters  Failed map tasks=1
Launched map tasks=6
Launched reduce tasks=1
Other local map tasks=1
Data-local map tasks=5
Total time spent by all maps in occupied slots (ms)=189730
Total time spent by all reduces in occupied slots (ms)=9248
Total time spent by all map tasks (ms)=189730
Total time spent by all reduce tasks (ms)=9248
Total vcore-milliseconds taken by all map tasks=189730
Total vcore-milliseconds taken by all reduce tasks=9248
Total megabyte-milliseconds taken by all map tasks=194283520
Total megabyte-milliseconds taken by all reduce tasks=9469952
Map-Reduce Framework Map input records=5
Map output records=10
Map output bytes=90
Map output materialized bytes=140
Input split bytes=795
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=140
Reduce input records=10
Reduce output records=0
Spilled Records=20
Shuffled Maps =5
Failed Shuffles=0
Merged Map outputs=5
GC time elapsed (ms)=16465
CPU time spent (ms)=5220
Physical memory (bytes) snapshot=670711808
Virtual memory (bytes) snapshot=5995196416
Total committed heap usage (bytes)=619401216
Shuffle Errors BAD_ID=0
File Input Format Counters  Bytes Read=590
File Output Format Counters  Bytes Written=97
Job Finished in 60.235 seconds
Estimated value of Pi is 3.20000000000000000000

Looking at the important fields:

Job ID
Format for a job id: job_[timestamp]_[job serial number]
0002: the job serial number, zero-padded to 4 digits and incremented for each submitted job

Task Id
Task Id : attempt_1463856930642_0002_m_000002_0, Status : FAILED
Format for a task attempt id: attempt_[timestamp]_[job serial number]_m_[task serial number]_[attempt number]
m means Map
r means Reduce

MapReduce reports 6 counter groups

  1. File System Counters
  2. Job Counters 
  3. Map-Reduce Framework
  4. Shuffle Errors
  5. File Input Format Counters 
  6. File Output Format Counters 

MapReduce History Server

Provides MapReduce's working logs, for example: Number of Maps, Number of Reduce, job submit time, job start time, job finish time, etc..
When we click on History of application, it's showing ERR_CONNECTION_REFUSED
It's pointing to port 19888, but port 19888 is not opened by default.

Using netstat -tnlp we can see the port really is not open.

To start the History Server
sbin/mr-jobhistory-daemon.sh start historyserver

Web UI: http://hadoop-yarn:19888/

To stop the History Server
sbin/mr-jobhistory-daemon.sh stop historyserver

Hadoop HDFS Web UI

Now go to http://hadoop-yarn:50070 and we will see the namenode info page

Now go to http://hadoop-node1:50090 and we will see the secondarynamenode info page

Started:Sat May 21 11:21:01 EDT 2016
Version:2.6.4, r5082c73637530b0b7e115f9625ed7fac69f937e6
Compiled:2016-02-12T09:45Z by jenkins from (detached from 5082c73)
Cluster ID:CID-4fc1b111-3c85-4e0a-9cd5-e44ec8fe0192
Block Pool ID:BP-796093918-

Block Pool ID: new in Hadoop 2.
HDFS files are stored as blocks on the hard drive; these blocks are grouped into a pool to be managed.
Every Namenode has a pool, and each pool has an ID.

Cluster ID and Block Pool ID are concepts for NameNode Federation.


In $HADOOP_HOME/logs there are .log files (e.g. 156419 bytes, May 21, 2016 12:54:32 PM) and .out files (e.g. 4913 bytes, May 21, 2016 11:31:45 AM)

.log file: written by log4j; most application logs go here; configured by log4j.properties
.out file: standard output and standard error; usually does not have a lot in it

log file name format:
[framework]-[username]-[process name]-[host name].log / .out

can also run command: ulimit -a

NameNode Journal Status

Edits of NameNode

NameNode Storage

Startup Progress
The SecondaryNameNode merges fsimage and edits periodically, updates fsimage, and deletes the old edits.
Each time it merges, it saves a checkpoint.

Thursday, May 12, 2016

Hadoop 2 CentOS build(from pseudo to real distributed)

We gonna have four machines:       hadoop-yarn     hadoop-node1     hadoop-node2     hadoop-node3
We can have all of them as virtual machines, start from the first one, then clone hadoop-yarn to other nodes.

We will start by building a pseudo-distributed setup first, with only one machine (I will use VirtualBox), which runs everything by itself.
After everything tests OK in pseudo-distributed mode, we move on to real distributed mode (I will use VirtualBox to build all servers).

Linux Version
In VirtualBox

Hadoop Version

JDK Version
OpenJDK 7

Disable iptables and SELinux
File: /etc/sysconfig/selinux
Config: SELINUX=disabled

Network config:

Choose your own host-only interface which set from virtualbox, mine here is enp0s8
Set a static IP with the same network from host machine

If you want to connect to the internet (optional):
This version of CentOS does not configure DNS by default. Choose your own NAT interface as set in VirtualBox; mine here is enp0s3.
Install net-tools for convenience, e.g. the ifconfig command (optional)
sudo yum install net-tools

Change Hostname
Change to my own HOSTNAME in /etc/sysconfig/network
Also change /etc/hosts (the IPs below are placeholders; use your own static host-only addresses):
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
<static-ip-1>      hadoop-yarn
<static-ip-2>      hadoop-node1
<static-ip-3>      hadoop-node2
<static-ip-4>      hadoop-node3

Note: the standard config is: IP, followed by hostname with domain, followed by hostname

Also change /etc/hostname
This may not be necessary, but let's put it here first.

Setup SSH

Some intro to ssh

For distributed hadoop, everything is related to ssh communication.
The Namenode server is required to talk to any of the Datanode servers.
The first step is to call start-dfs.sh, and this script uses ssh to communicate with the other servers.
We need to enable SSH without password from the Namenode server to every slave.

For a pseudo hadoop server, it is also required to ssh to itself if we use start-dfs.sh-like scripts.

So let's try start-dfs.sh first, and see what server/ip/hostname it requests an ssh login for.

As we see, it requested a password from me three times:

  1. namenode: localhost
  2. datanode: localhost
  3. secondarynamenode: 0.0.0.0

We know these all mean the local machine, but in distributed hadoop, which is normally what we will run, there will be a lot of different datanodes with different hostnames/IPs. All of these require passwordless remote ssh.

So, on the Namenode server:
First, generate a public/private key pair: ssh-keygen -t rsa
Second, copy my public key to myself: ssh-copy-id yushan@localhost  (this solves ssh to the local machine)
Third, copy my public key to every other server: ssh-copy-id yushan@datanode1
In the real world, when starting start-dfs.sh and start-yarn.sh, we may be asked for a password a few more times.
So remember the host names that ask for a password, and copy the public key to those hosts. This makes sure your hadoop system runs without manually typing passwords.

Install JDK
sudo yum install java-1.7.0-openjdk-devel
If install succeed, we can try command java and javac to verify.
Use "whereis java" to find where it is installed; following its symlinks you will finally find the install location.
Here it is: /usr/lib/jvm/java-1.7.0-openjdk-
This is too long. Luckily there are also several much shorter links in /usr/lib/jvm.
They point via /etc/alternatives/, and all finally point to the same jdk directory.
We can use /usr/lib/jvm/java-1.7.0 as $JAVA_HOME

Set JDK env
Create a file /etc/profile.d/java.sh (the name is up to you; any *.sh file there is sourced):
export JAVA_HOME=/usr/lib/jvm/java-1.7.0
export PATH=.:$JAVA_HOME/bin:$PATH
Update new configs: source /etc/profile

Install Hadoop

download hadoop-2.6.4.tar.gz
extract it to /usr/lib/

Config files

We can find default xml files in source code for core/hdfs/yarn/mapred. So we can pick up the things we need to change from the default xml files, and put them into our xml files.

In /usr/lib/hadoop/etc/hadoop:
  1. hadoop-env.sh
  2. mapred-env.sh
  3. yarn-env.sh
  4. core-site.xml
  5. hdfs-site.xml
  6. yarn-site.xml
  7. mapred-site.xml

In hadoop-env.sh, change JAVA_HOME to the jdk directory:
export JAVA_HOME=/usr/lib/jvm/java-1.7.0
In mapred-env.sh, uncomment and change:
export JAVA_HOME=/usr/lib/jvm/java-1.7.0
In yarn-env.sh, uncomment and change:
export JAVA_HOME=/usr/lib/jvm/java-1.7.0

Assign the hdfs default name and the data/tmp directory (manually create this directory).
This needs to point to the hostname of the Namenode.

I just found that fs.default.name is deprecated in hadoop2. We can use fs.defaultFS instead.

dfs.replication default value is 3
Rule: replication number must NOT bigger than datanode number.
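Putting the notes above together, a minimal sketch of the two site files for this pseudo-distributed box (hostname, port 8020, and the tmp path follow this post; adjust to yours, and replication is 1 because there is only one datanode):

```xml
<!-- core-site.xml -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://hadoop-yarn:8020</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/usr/lib/hadoop-2.6.4/data/tmp</value>
  </property>
</configuration>
<!-- hdfs-site.xml -->
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
```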





Format Hadoop

hadoop namenode -format

We may hit some issues caused by user permissions.

This is the error I got:
16/05/02 00:47:35 INFO namenode.FSNamesystem: fsOwner=yushan
16/05/02 00:47:35 INFO namenode.FSNamesystem: supergroup=supergroup
16/05/02 00:47:35 INFO namenode.FSNamesystem: isPermissionEnabled=false
16/05/02 00:47:35 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=100
16/05/02 00:47:35 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
16/05/02 00:47:36 INFO namenode.FSEditLog: dfs.namenode.edits.toleration.length = 0
16/05/02 00:47:36 INFO namenode.NameNode: Caching file names occuring more than 10 times Cannot create directory /user/lib/hadoop-2.6.4/data/tmp/dfs/name/current

As we see, by default the hadoop user is the creating user's username, and the group is "supergroup".
We can chown the whole folder /usr/lib/hadoop-2.6.4 and all its sub folders/files to that username and group (create the group if it does not exist), and chmod 755 the whole folder. That fixes it.

We cannot run hadoop namenode -format more than once.
If we do want to format again, delete all files under /usr/lib/hadoop-2.6.4/data/tmp/ first, then format again.

The namenode format generates a Cluster ID
You can also assign a Cluster ID if needed
bin/hdfs namenode -format -clusterid yarn-cluster


Safe Mode

  • When the namenode starts, it loads fsimage into memory, and replays the operations from edits.
  • Once the in-memory image is built, it creates a new fsimage and an empty edits file.
  • Right at this moment, the namenode is in Safe Mode: it is read-only for clients.
  • Meanwhile, the namenode collects block reports from datanodes. When a data block reaches the minimum replication threshold, it can be seen as "safe".
  • When a (configurable) portion of the data blocks are "safe", it waits a moment longer, then exits Safe Mode.
  • When it detects that a data block's replication is below the minimum, that data block is re-replicated until it reaches the minimum.
    A data block's position is not persisted by the Namenode; it is stored on the datanodes and reported as block lists.

Separately, if in hdfs-site.xml we set dfs.permissions to false,
any user can enter hdfs and do file create/edit/delete operations.

Different ways of starting Hadoop services
We have the following scripts to start/stop services
  • start-all.sh / stop-all.sh (Deprecated)
  • start-dfs.sh / stop-dfs.sh
  • start-yarn.sh / stop-yarn.sh
  • hadoop-daemon.sh / yarn-daemon.sh
Looking at each file, they are actually calling each other.
If we use start-dfs.sh and start-yarn.sh, they require ssh in their slave sub-scripts, so they can start all services on remote nodes. After ssh-ing to a remote server, they use hadoop-daemon.sh (or yarn-daemon.sh) to start each individual service.
Single pseudo mode 

Start HDFS

NameNode, DataNode, SecondaryNameNode

Start namenode
sbin/hadoop-daemon.sh start namenode

Start datanode
sbin/hadoop-daemon.sh start datanode

Start secondarynamenode
sbin/hadoop-daemon.sh start secondarynamenode

Now go to http://localhost:50070 and we will see the namenode info page

Now go to http://localhost:50090 and we will see the secondarynamenode info page

Start YARN
ResourceManager, NodeManager

Start ResourceManager
sbin/yarn-daemon.sh start resourcemanager

Start NodeManager

sbin/yarn-daemon.sh start nodemanager

Now go to http://localhost:8088 and we will see the yarn info page
Now go to http://localhost:8042 and we will see the yarn node info page

Stop HDFS and YARN

NameNode, DataNode, SecondaryNameNode, ResourceManager, NodeManager

Stop namenode
sbin/hadoop-daemon.sh stop namenode

Stop datanode
sbin/hadoop-daemon.sh stop datanode

Stop secondarynamenode
sbin/hadoop-daemon.sh stop secondarynamenode

Stop ResourceManager
sbin/yarn-daemon.sh stop resourcemanager

Stop NodeManager
sbin/yarn-daemon.sh stop nodemanager

Distributed Mode

Everything above was for pseudo mode. Once pseudo mode is done, we can apply mostly the same configs to the slaves (datanodes). If all nodes are virtual machines, we can clone the Namenode, then apply a few config changes to the new nodes. (Remember to select "reinitialize MAC address for all network cards" when cloning in VirtualBox/VMware.)

Before we clone the machine, we need to do some config changes for the Namenode.

We increase dfs.replication to 3 (the default is also 3).
As we know, safe mode is related to dfs.replication, so dfs.replication must be lower than or equal to the number of slaves we have.
In this case we have 3 slaves, so 3 replications is just about right.

We also move the SecondaryNameNode from the hadoop-yarn server to hadoop-node1; normally the NameNode and SecondaryNameNode are not on the same server.


yarn will still run on the Namenode; I just replace the default value of yarn.resourcemanager.hostname with the real hostname hadoop-yarn:

<configuration>
<!-- Site specific YARN configuration properties -->
        <property>
                <name>yarn.resourcemanager.hostname</name>
                <value>hadoop-yarn</value>
                <description>The hostname of the RM.</description>
        </property>
</configuration>

The slaves config file is located in the same folder as hdfs-site.xml and yarn-site.xml.
Add the following lines to slaves, which list all the datanodes:
hadoop-node1
hadoop-node2
hadoop-node3

Up to here, we can shutdown Namenode and start clone if you also using VM for all the nodes.

Again, here is the list of all four boxes:

The goal is: use start-dfs.sh and start-yarn.sh from the Namenode without a password; this requires that the Namenode can ssh to all other nodes without a password.
To do this, make sure every slave host has the Namenode's public key in ~/.ssh/authorized_keys.
(If, before cloning, you could ssh from the Namenode host to the Namenode hostname itself without a password, then we will probably be OK, since each cloned host will have the Namenode's public key.)

We need to change the following configs on each node, as node1, node2, node3:
change the IP: /etc/sysconfig/network-scripts/ifcfg-enp0s8
change hostname: /etc/sysconfig/network
change hostname: /etc/hostname

After all nodes are configured, run start-dfs.sh and start-yarn.sh from the Namenode manually once, from the command line.
If prompted for yes/no, type yes. (For the first run in distributed mode, the output lines may be queued up, so whenever you see output containing "yes/no", just type "yes", no matter which output line you are on.)

All we want to make sure is: ssh can go through from Namenode to every other nodes without password.

This is the final result (jps on each node) after we try start-dfs.sh and start-yarn.sh.

The web URLs change a bit based on which service runs on which server
NameNode: http://hadoop-yarn:50070/

Yarn (Resource Manager): http://hadoop-yarn:8088/
Each slave server has a Node Manager
Node Manager: http://hadoop-node1:8042/
Node Manager: http://hadoop-node2:8042/
Node Manager: http://hadoop-node3:8042/

MapReduce History Server
Provides MapReduce's working logs, for example: Number of Maps, Number of Reduce, job submit time, job start time, job finish time, etc..
When we click on History of application, it's showing ERR_CONNECTION_REFUSED
It's pointing to port 19888, but port 19888 is not opened by default.

Using netstat -tnlp we can see the port really is not open.

To start the History Server
sbin/mr-jobhistory-daemon.sh start historyserver

Web UI: http://hadoop-yarn:19888/

To stop the History Server
sbin/mr-jobhistory-daemon.sh stop historyserver

Wednesday, May 11, 2016

MapReduce Concept

MapReduce: Offline task framework

Move the calculation, not the data

Requires Key->Value as both input and output

Separates a task into two steps: Map and Reduce
  • Map: processes input data in parallel
  • Reduce: combines Map results
Shuffle connects Map and Reduce
  • Map Task: writes data to the local hard disk
  • Reduce Task: reads a copy of data from each Map Task
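The map → shuffle → reduce idea can be sketched with a plain shell pipeline (an analogy, not Hadoop code): tr plays the mapper, sort plays the shuffle that groups equal keys, and uniq -c plays the reducer:

```shell
# mapper: split lines into one word per line
# shuffle: sort brings equal keys together
# reducer: count each group of equal keys
printf 'hello world\nhello hadoop\n' | tr ' ' '\n' | sort | uniq -c
```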

Only good for offline batch tasks

Pros:
  • fault tolerance, expansibility
  • good for simple batch tasks
Cons:
  • costs resources, uses up too much hard disk, which causes low efficiency


Primitive Variable Types

Hadoop wraps Java primitive types in Writable classes.
Example for int:
IntWritable value = new IntWritable(100);
int v = value.get();
Example for String:
Text text = new Text("abc");
String str = text.toString();
InputFormat
An interface which reads the file and converts each record for the mapper.

File Input Format
The default File Input Format is TextInputFormat
By default, TextInputFormat uses

  • LongWritable as KeyInput of Mapper
  • Text as ValueInput of Mapper

There are three other File Input Formats,
and they REQUIRE you to specify the KeyInput and ValueInput types manually:

  • KeyValueTextInputFormat
    Splits each line on a separator (tab by default), using the first part as key and the rest as value
  • SequenceFileInputFormat
  • SequenceFileAsTextInputFormat

In One Map Task

In the whole procedure, the data is Key-Value based, and we do everything on the Key, not the Value: we partition by key, sort by key, and merge by key, not by value.

The map output is separated into different parts, and the results are put into a Memory Buffer.
Partitioning is customizable; the default is hash-mod, which aims for data load balance (but hash-mod may cause imbalance, so we may need a custom partitioner to balance the load).
This partition later determines which part goes to which Reduce machine.
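The hash-mod rule can be sketched like this (plain shell, with cksum as a stand-in hash; Hadoop's default HashPartitioner really uses the key's hashCode modulo the number of reduce tasks):

```shell
# partition(key) = hash(key) mod number_of_reducers
reducers=3
for key in apple banana cherry; do
  hash=$(printf '%s' "$key" | cksum | cut -d' ' -f1)   # CRC as a cheap deterministic hash
  echo "$key -> partition $(( hash % reducers ))"
done
```

Equal keys always hash to the same partition, which is what guarantees that one reducer sees all values for a given key.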

When the results in the Memory Buffer are about to exceed the limit (default 100MB), we have to write data to disk as one file, so we can keep parsing more data. This is called spill.
So the sorted and partitioned data is spilled to disk. This is purely a buffering consideration, like using disk as an extension of memory.
Spill is done by a separate thread; it does not block the partitioning (writing data into the Memory Buffer).
The spill threshold (mapreduce.map.sort.spill.percent) defaults to 0.8.
When the spill thread starts, it sorts those 80MB (0.8 * 100MB) of keys.
Sort can have customization, we can determine how to compare and sort the data, default is compared by ascii value(dictionary order).

Merge on disk (Combiner)
A combiner may not exist, since some map results can never be merged together.
So the combiner is not required.
Merge on disk (Combiner) is customizable; the default rule is hash value.
Once the memory buffer is spilled and cleared, while parsing new data we have no clue about the previous partition's contents.
And each spilled file is already written to disk, so we cannot change that file.
To bring all spill results into one file, we have to re-read all spilled files and merge them on disk (Combiner). In this procedure, the data is merged together by the combine rule.
The goals of merge on disk (Combiner):
  1. put all spill results from one map task into one single file.
  2. shrink the size of the map results to be transmitted
  3. based on the combiner rule, each partition of the result chooses its merge destination. (Not sure if this is right)

Conclusion: we can do customization on these three steps

  1. Partition
  2. Sort
  3. Combiner

In One Reduce Task
A mixture of in-memory and on-disk data.
This part is NOT customizable. The reduce task receives (copies) different pieces of results from multiple maps, and puts them in a Memory Buffer.

It merges the received data with the same key. This may also exceed the Memory Buffer size limit,
so it will also spill results to a file on disk each time the Memory Buffer limit is reached.

Then it merges again across all the spilled data (plus the data still in the Memory Buffer).

All merged data is passed to reduce in sequence, one by one, not in parallel.

Then the data is passed to the Reducer (customizable), as a Key plus an Iterable<Value>.

Finally the handled results are output by the RecordWriter.

And the output file names are

  • part-00000
  • part-00001
  • part-00002
  • etc...

Output File Directory

  • _logs/
  • part-00000

Before the MapReduce job is done, the output folder contains a _temporary directory as a flag.
After the job succeeds, an empty _SUCCESS marker file is created.

MapReduce Split Size

If a block is bigger than the split size, it is cut into smaller pieces, and each piece is processed by its own Map Task.
If a block is small, it yields only one piece and only one Map Task.
  • max.split (100M)
  • min.split (10M)
  • block (64M)
  • split size = max(min.split, min(max.split, block))
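With the example numbers above, the formula picks the block size; a quick sketch (sizes in MB):

```shell
# split = max(min_split, min(max_split, block))
min_split=10; max_split=100; block=64
inner=$(( max_split < block ? max_split : block ))   # min(100, 64) = 64
split=$(( min_split > inner ? min_split : inner ))   # max(10, 64)  = 64
echo "$split"   # prints 64
```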

MapReduce Logs

MapReduce has two types of logs

  1. history job logs
  2. Container logs
History job logs

Stored in hdfs, under the job staging/history directory
  • number of maps used
  • number of reduce used
  • job submission time
  • job start time
  • job finish time
  • how many jobs succeeded
  • how many jobs failed
  • how many jobs each queue ran
  • etc...
Container Logs

Stored in ${HADOOP_HOME}/logs/userlogs by default
  • ApplicationMaster logs
  • Task logs


Four Components

The Client requests a job from the Resource Manager.

Resource Manager

  • handles Client requests
  • starts/monitors the ApplicationMasters
  • monitors the NodeManagers
  • resource allocation and scheduling


Node Manager
Sends status heartbeats to the Resource Manager.
  • manages this single node's own resources
  • handles commands from the ResourceManager
  • handles commands from the ApplicationMaster

Container: (in a NodeManager) a unit of resources (memory, disk size, cpu, network io, disk io); each Container runs one job task. A task in a Container also sends its Task Status to the App Master.

App Master: (in a NodeManager) the manager (monitor, coordinator) for one app. It calculates how many Containers it needs, requests the Containers from the Resource Manager, and, once it gets them, runs the app.
  • cuts data into blocks
  • requests resources from the Resource Manager, and allocates them to jobs
  • monitors jobs and handles fault tolerance
Examples: MapReduce App Master, MPI App Master, etc.

MapReduce in Yarn flow

Tuesday, May 10, 2016

HDFS and MapReduce introduction

Hadoop Distributed File System

namenode: Only one
datanode: many

NameNode:
has the file names, directory tree, file properties (time, replication, permission), and the DataNode positions of the blocks
  • receives user requests
  • maintains the file system directory structure
  • manages the relationship between files and blocks, and between blocks and datanodes

DataNode:
  • stores files
  • files are separated into blocks (default 128M), stored on the hard drive
  • also keeps checksums of the data
  • to ensure safety, each file has multiple copies

Secondary NameNode

  • monitors HDFS status; a background assistant process
  • grabs snapshots of HDFS periodically


JobTracker: only one

  • receives user job requests
  • allocates jobs to TaskTrackers
  • monitors TaskTracker running status

TaskTracker: many

  • runs the jobs assigned by the JobTracker

Hadoop Directory

  • sbin/: management scripts
  • bin/: user command scripts, including hdfs, yarn, etc.
  • etc/: config files (xml, sh, properties, template)
  • include/: C++ headers
  • lib/: dynamic and static native libraries
  • share/: public Hadoop jar packages and docs
  • libexec/: shell scripts for Hadoop configuration, log configs, environment arguments, etc.

Monday, May 9, 2016

Maven Compile

sudo apt-get install autoconf automake libtool cmake

sudo apt-get install ncurses-base ncurses-bin

sudo apt-get install openssl

sudo apt-get install protobuf-compiler

Download protobuf-2.6.1, then build and install it:
 ./configure && make && make check && sudo make install

sudo apt-get install lzop zlib-bin gcc gcc-4.9-base

Install maven

sudo apt-get install findbugs


mvn package -Pdist,native,docs,src -DskipTests -Dtar


Hit a warning:
-Dmaven.multiModuleProjectDirectory system property is not set. Check $M2_HOME environment variable and mvn script match.
It looks like Maven needs a more specific argument to run, so add one to the environment:
export MAVEN_OPTS="-Dmaven.multiModuleProjectDirectory=$M2_HOME"
We may need more options if we hit memory issues, so it may finally look like this:

export MAVEN_OPTS="-Xms256m -Xmx512m -Dmaven.multiModuleProjectDirectory=$M2_HOME"

Sunday, May 8, 2016

HADOOP src code

Import hadoop src to eclipse
Go to hadoop src folder, enter hadoop-maven-plugins/
cd hadoop-maven-plugins/
mvn install
Then go back to src folder
mvn eclipse:eclipse -DskipTests
This takes a while to download dependencies.

 * NameNode serves as both directory namespace manager and
 * "inode table" for the Hadoop DFS.  There is a single NameNode
 * running in any DFS deployment.  (Well, except when there
 * is a second backup/failover NameNode.)
 * The NameNode controls two critical tables:
 *   1)  filename->blocksequence (namespace)
 *   2)  block->machinelist ("inodes")
 * The first table is stored on disk and is very precious.
 * The second table is rebuilt every time the NameNode comes
 * up.
 * 'NameNode' refers to both this class as well as the 'NameNode server'.
 * The 'FSNamesystem' class actually performs most of the filesystem
 * management.  The majority of the 'NameNode' class itself is concerned
 * with exposing the IPC interface and the http server to the outside world,
 * plus some configuration management.
 * NameNode implements the ClientProtocol interface, which allows
 * clients to ask for DFS services.  ClientProtocol is not
 * designed for direct use by authors of DFS client code.  End-users
 * should instead use the org.apache.nutch.hadoop.fs.FileSystem class.
 * NameNode also implements the DatanodeProtocol interface, used by
 * DataNode programs that actually store DFS data blocks.  These
 * methods are invoked repeatedly and automatically by all the
 * DataNodes in a DFS deployment.
 * NameNode also implements the NamenodeProtocol interface, used by
 * secondary namenodes or rebalancing processes to get partial namenode's
 * state, for example partial blocksMap etc.

Everything we need to use is just FileSystem, which is implemented by DistributedFileSystem.

All communication between NameNode and DataNode goes through RPC, so to the user every function call looks like it runs locally, when it is actually carried out by a proxy.
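The proxy idea can be shown with the JDK's own dynamic proxies (plain Java, not Hadoop's RPC class; the interface and handler here are invented): the caller invokes an ordinary interface method, and a handler intercepts it, which is where a real RPC layer would serialize the call and send it over the wire.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Toy illustration of the RPC-proxy idea: the call looks local,
// but everything runs through the invocation handler.
public class RpcProxyDemo {
    interface ClientProtocol {
        String getBlockLocations(String path);
    }

    static ClientProtocol connect() {
        return (ClientProtocol) Proxy.newProxyInstance(
                ClientProtocol.class.getClassLoader(),
                new Class<?>[]{ClientProtocol.class},
                (Object p, Method m, Object[] a) -> {
                    // A real RPC layer would serialize m.getName() and the
                    // arguments, send them to the NameNode, and return its reply.
                    return "remote call: " + m.getName() + "(" + a[0] + ")";
                });
    }

    public static void main(String[] args) {
        ClientProtocol nn = connect();
        // Looks like a local method call, but runs through the handler.
        System.out.println(nn.getBlockLocations("/data/file1"));
        // prints: remote call: getBlockLocations(/data/file1)
    }
}
```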

ClientProtocol and DatanodeProtocol

public interface ClientProtocol extends VersionedProtocol {

// create rpc server

    this.server = RPC.getServer(this, socAddr.getHostName(),
        socAddr.getPort(), handlerCount, false, conf, namesystem
            .getDelegationTokenSecretManager());

//also starts a web server

httpServer = new HttpServer("hdfs", infoHost, infoPort,
              infoPort == 0, conf,
              SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN))


 * Protocol that a DFS datanode uses to communicate with the NameNode.
 * It's used to upload current load information and block reports.
 * The only way a NameNode can communicate with a DataNode is by
 * returning values from these functions.
    serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY,
    clientPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
public interface DatanodeProtocol extends VersionedProtocol {

   * sendHeartbeat() tells the NameNode that the DataNode is still
   * alive and well.  Includes some status info, too.
   * It also gives the NameNode a chance to return
   * an array of "DatanodeCommand" objects.
   * A DatanodeCommand tells the DataNode to invalidate local block(s), 
   * or to copy them to other DataNodes, etc.
The DataNode calls sendHeartbeat() on the NameNode; the NameNode returns a DatanodeCommand[], and the DataNode then does its work by following those returned commands.
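That heartbeat/command loop can be modeled in a few lines (invented names, not Hadoop source): the NameNode's only channel back to a DataNode is the return value of the DataNode's own heartbeat call.

```java
import java.util.List;

// Minimal model of the heartbeat pattern described above.
public class HeartbeatDemo {
    enum Command { INVALIDATE_BLOCK, REPLICATE_BLOCK }

    // Stand-in for the NameNode side of DatanodeProtocol: the reply
    // is the NameNode's chance to hand out work.
    static List<Command> sendHeartbeat(String dataNodeId, long freeBytes) {
        // Pretend the NameNode wants an under-replicated block copied.
        return List.of(Command.REPLICATE_BLOCK);
    }

    public static void main(String[] args) {
        // DataNode loop: heartbeat, then execute whatever came back.
        for (Command c : sendHeartbeat("dn1", 1_000_000L)) {
            System.out.println("dn1 executing " + c);
        }
        // prints: dn1 executing REPLICATE_BLOCK
    }
}
```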