Saturday, May 28, 2016

Hadoop 2 HA setup config with ZooKeeper

Hadoop 2.x
  • HDFS: NN Federation, HA(High Availability)
  • MapReduce: Runs on YARN
  • YARN: Resource Management System
HDFS 2.x

Fixes the HDFS 1.0 single point of failure
  • HDFS HA: Active/Standby NameNode
  • If the Active NameNode fails, switch to the Standby NameNode
Fixes the HDFS 1.0 memory limitation problem
  • HDFS Federation
  • Horizontally expandable, supports multiple NameNodes
  • Every NameNode manages part of the namespace (directory tree)
  • All NameNodes share all DataNode storage resources
HDFS 2.x only changes the architecture; the way you use it stays the same.
The changes are transparent to HDFS users.
Compatible with HDFS 1.x commands and APIs.



HDFS HA(High Availability)




ZK: ZooKeeper
ZKFC: ZooKeeper Failover Controller
JN: Journal Node
NN: NameNode
DN: DataNode

Looking at the flow bottom-up:

DataNodes report to both the Active NameNode and the Standby NameNode.
DataNodes only accept commands from the Active NameNode.

The NameNode's state (edit log) is not stored only on the NameNode itself; it is written to a cluster of JournalNodes.
When the Active NameNode fails, the Standby NameNode automatically reads the state from the JournalNodes and continues the work.
For planned maintenance on the Active NameNode, we can send a command to manually switch the Active and Standby roles.

The ZKFCs report heartbeats to ZooKeeper, which coordinates the HA election.
The HDFS client does not talk to ZooKeeper directly; it is configured with both NameNodes (through the failover proxy provider) and its requests are transparently routed to whichever NameNode is currently Active.
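For example, a minimal Java sketch of such a client (assuming the mycluster nameservice configured later in this post, with the *-site.xml files on the classpath):

// Minimal sketch: an HDFS client using the HA nameservice URI.
// Assumes core-site.xml / hdfs-site.xml (shown later in this post) are on the classpath,
// so the ConfiguredFailoverProxyProvider can pick the current Active NameNode.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;

public class HaClientDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();            // loads *-site.xml from the classpath
        FileSystem fs = FileSystem.get(URI.create("hdfs://mycluster"), conf);
        for (FileStatus status : fs.listStatus(new Path("/"))) {
            System.out.println(status.getPath());             // works no matter which NameNode is Active
        }
        fs.close();
    }
}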


Auto Failover switch
  • ZooKeeper Failover Controller (ZKFC): monitors the status of its NameNode
  • Registers the NameNode with ZooKeeper
  • When the Active NameNode fails, each ZKFC tries to acquire a lock (znode) in ZooKeeper for its NameNode.
  • The NameNode whose ZKFC acquires the lock becomes Active


HDFS 2.x Federation
  • Federation is used for very large amounts of data: we need multiple NameNodes; each NameNode is independent, but they all share the same DataNodes.
  • Independent means the NameNodes are completely independent of each other, so if we want HA for the NameNodes, each NameNode needs its own HA configuration.
  • Uses namenodes/namespaces to separate data storage and management onto different nodes, so namenode/namespace capacity can expand by simply adding machines.
  • Can spread the load of one namenode across multiple nodes, so efficiency does not drop when HDFS data gets big. We can use multiple namespaces to separate different types of applications, allocating data storage/management to different namenodes based on application type.


Yarn
Yet Another Resource Negotiator

Hadoop 2.0 introduced the ResourceManager and ApplicationMaster.
  • They separate MRv1 JobTracker's resource management from its task scheduling.
    Replaced: JobTracker and TaskTracker from Hadoop 1.0
    With: ResourceManager and ApplicationMaster
  • ResourceManager: cluster-wide resource management and scheduling (only one per cluster)
  • ApplicationMaster: handles application-level work such as task scheduling, task monitoring, and fault tolerance. Each application has one ApplicationMaster.
YARN lets multiple computing frameworks run in one cluster.
It gives many frameworks a common interface to plug into.
  • Each application has one ApplicationMaster
  • Multiple frameworks can run on YARN at the same time: MapReduce, Spark, Storm, etc.



HA HDFS Cluster build example

              NameNode  DataNode  ZooKeeper  ZKFC  JournalNode  ResourceManager  NodeManager
hadoop-yarn      1                    1        1                       1
hadoop-node1     1         1          1        1        1                             1
hadoop-node2               1          1                 1                             1
hadoop-node3               1                            1                             1

Followed this doc:
We need the following properties:

  • dfs.nameservices as name service ID
  • dfs.ha.namenodes.[name service ID]: as NameNode IDs
  • dfs.namenode.rpc-address.[nameservice ID].[name node ID]
  • dfs.namenode.http-address.[nameservice ID].[name node ID]
  • dfs.namenode.shared.edits.dir
  • dfs.client.failover.proxy.provider.[nameservice ID]
  • dfs.ha.fencing.methods
  • fs.defaultFS
  • dfs.journalnode.edits.dir
I started from here and built a basic distributed Hadoop first: http://gvace.blogspot.com/2016/05/hadoop-centos-build.html
Then apply with some changes:
  • create soft link in /usr/lib: hadoop -> hadoop-2.6.4
  • change tmp folder from /usr/lib/hadoop-2.6.4/data/tmp/ to /var/opt/hadoop/data/tmp
  • install ZooKeeper-3.4.8
  • create soft link in /usr/lib: zookeeper -> zookeeper-3.4.8
  • create folder /var/opt/zookeeper/tmp/data


hdfs-site.xml
Instead of configuring a single NameNode, we configure the HA cluster (nameservice)
        <property>
                <name>dfs.nameservices</name>
                <value>mycluster</value>
        </property>
        <property>
                <name>dfs.ha.namenodes.mycluster</name>
                <value>nn1,nn2</value>
        </property>
        <property>
                <name>dfs.namenode.rpc-address.mycluster.nn1</name>
                <value>hadoop-yarn.gvace.com:8020</value>
        </property>
        <property>
                <name>dfs.namenode.rpc-address.mycluster.nn2</name>
                <value>hadoop-node1.gvace.com:8020</value>
        </property>
        <property>
                <name>dfs.namenode.http-address.mycluster.nn1</name>
                <value>hadoop-yarn.gvace.com:50070</value>
        </property>
        <property>
                <name>dfs.namenode.http-address.mycluster.nn2</name>
                <value>hadoop-node1.gvace.com:50070</value>
        </property>
        <property>
                <name>dfs.namenode.shared.edits.dir</name>
                <value>qjournal://hadoop-node1.gvace.com:8485;hadoop-node2.gvace.com:8485;hadoop-node3.gvace.com:8485/mycluster</value>
        </property>
        <property>
                <name>dfs.client.failover.proxy.provider.mycluster</name>
                <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
        </property>
        <property>
                <name>dfs.ha.fencing.methods</name>
                <value>sshfence</value>
        </property>
        <property>
                <name>dfs.ha.fencing.ssh.private-key-files</name>
                <value>/home/yushan/.ssh/id_rsa</value>
        </property>
        <property>
                <name>dfs.journalnode.edits.dir</name>
                <value>/var/opt/hadoop/data/tmp/journal/edits</value>
        </property>
        <property>
                <name>dfs.ha.automatic-failover.enabled</name>
                <value>true</value>
        </property>


yarn-site.xml
<configuration>

<!-- Site specific YARN configuration properties -->
        <property>
                <description>The hostname of the RM.</description>
                <name>yarn.resourcemanager.hostname</name>
                <value>hadoop-yarn.gvace.com</value>
        </property>
        <property>
                <name>yarn.nodemanager.aux-services</name>
                <value>mapreduce_shuffle</value>
        </property>

</configuration>


core-site.xml
Instead of pointing at a single NameNode, we point at the nameservice
<configuration>
        <property>
                <name>fs.defaultFS</name>
                <value>hdfs://mycluster</value>
        </property>
        <property>
                <name>ha.zookeeper.quorum</name>
                <value>hadoop-yarn.gvace.com:2181,hadoop-node1.gvace.com:2181,hadoop-node2.gvace.com:2181</value>
        </property>
<!--
        <property>
                <name>fs.default.name</name>
                <value>hdfs://hadoop-yarn.gvace.com:8020</value>
                <description>NameNode</description>
        </property>
-->
        <property>
                <name>hadoop.tmp.dir</name>
                <value>/var/opt/hadoop/data/tmp</value>
        </property>
</configuration>



Also install ZooKeeper and assign an id to each ZooKeeper node.

Create the file /var/opt/zookeeper/tmp/data/myid on each ZooKeeper node.
Put "1", "2", "3" respectively into each node's myid file; this is the id of the ZooKeeper node.

Config ZooKeeper
In /usr/lib/zookeeper/conf, copy zoo.cfg from zoo_sample.cfg
Edit zoo.cfg: set (and create) dataDir, and list each ZooKeeper node's host with its server id

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/var/opt/zookeeper/tmp/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=hadoop-yarn.gvace.com:2888:3888
server.2=hadoop-node1.gvace.com:2888:3888
server.3=hadoop-node2.gvace.com:2888:3888




Then start zookeeper on each ZooKeeper Node
./zkServer.sh start
You will see QuorumPeerMain running as a Java process in jps

Start each Journal Node
hadoop-daemon.sh start journalnode

Format HDFS on one of the NameNodes
./bin/hdfs namenode -format

Start the formatted NameNode
./sbin/hadoop-daemon.sh start namenode

Bootstrap the Standby NameNode (run this on the Standby NameNode); this synchronizes the formatted image files from the NameNode formatted above
./bin/hdfs namenode -bootstrapStandby

Format the ZKFC state in ZooKeeper, on just one of the NameNode hosts
./bin/hdfs zkfc -formatZK
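
To double-check that the format created the HA znode in ZooKeeper, here is a minimal sketch using the ZooKeeper Java client (the /hadoop-ha parent path is the default; the child znode should match dfs.nameservices):

// Minimal sketch: verify that "hdfs zkfc -formatZK" created the HA parent znode.
// Assumes the ZooKeeper quorum from core-site.xml; /hadoop-ha/<nameservice> is the default layout.
import org.apache.zookeeper.ZooKeeper;

public class CheckHaZnode {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper(
                "hadoop-yarn.gvace.com:2181,hadoop-node1.gvace.com:2181,hadoop-node2.gvace.com:2181",
                5000, event -> { });                              // no-op watcher
        System.out.println(zk.getChildren("/hadoop-ha", false));  // expect [mycluster]
        zk.close();
    }
}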

Stop all services from just one NameNode
sbin/stop-dfs.sh

Start all services from just one NameNode
sbin/start-dfs.sh

We cannot be sure which NameNode will become Active; it depends on which one acquires the lock first.


After all the install/config/format steps, restart one NameNode.









































Saturday, May 21, 2016

MapReduce Examples

Running MapReduce in YARN

Hadoop ships with some example MapReduce jars.


Example: WordCount

Put a text file into HDFS; I put it at /data01/LICENSE.txt
I want the output to go to /data01/LICENSE_WordCount
(I thought the output would be a single file like LICENSE_WordCount.txt, but it has to be a folder), so here LICENSE_WordCount is a folder name

Run MR
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar wordcount /data01/LICENSE.txt /data01/LICENSE_WordCount


The result folder contains an empty file named _SUCCESS, which marks that the job succeeded,
and a result file: part-r-00000
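
For reference, the bundled wordcount is essentially the classic mapper/reducer pair below. This is a sketch of the standard Hadoop WordCount, not necessarily the exact source inside the example jar:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Map: emit (word, 1) for every token in the line
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Reduce: sum the counts for each word
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);   // combiner is optional but cheap here
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));     // e.g. /data01/LICENSE.txt
        FileOutputFormat.setOutputPath(job, new Path(args[1]));   // e.g. /data01/LICENSE_WordCount
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}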




Example: pi

Pick one of the examples; I chose pi.
Run it:
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar pi

It requires two arguments: <nMaps> and <nSamples>
Usage: org.apache.hadoop.examples.QuasiMonteCarlo <nMaps> <nSamples>
Generic options supported are
-conf <configuration file>     specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|resourcemanager:port>    specify a ResourceManager
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
We assigned 5 maps, each map with 20 samples
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar pi 5 20





Looking at the running status of the MapReduce job, we find:

We assigned 5 maps, each map with 20 samples.
Each Map has a Container, but the running MapReduce job shows 6 containers.
The one additional container is the ApplicationMaster.
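
The same application list can be fetched programmatically. A minimal sketch with the YARN client API (the printed fields are just what I chose to show):

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class ListApps {
    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new Configuration());   // picks up yarn-site.xml from the classpath
        yarnClient.start();
        List<ApplicationReport> apps = yarnClient.getApplications();
        for (ApplicationReport app : apps) {
            System.out.println(app.getApplicationId() + "  "
                    + app.getName() + "  "
                    + app.getYarnApplicationState());
        }
        yarnClient.stop();
    }
}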



Looking at the result of running MR job:
[yushan@hadoop-yarn hadoop-2.6.4]$ bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar pi 5 20
Number of Maps  = 5
Samples per Map = 20
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Starting Job
16/05/21 15:23:50 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/05/21 15:23:50 INFO input.FileInputFormat: Total input paths to process : 5
16/05/21 15:23:51 INFO mapreduce.JobSubmitter: number of splits:5
16/05/21 15:23:51 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1463856930642_0002
16/05/21 15:23:51 INFO impl.YarnClientImpl: Submitted application application_1463856930642_0002
16/05/21 15:23:51 INFO mapreduce.Job: The url to track the job: http://hadoop-yarn.gvace.com:8088/proxy/application_1463856930642_0002/
16/05/21 15:23:51 INFO mapreduce.Job: Running job: job_1463856930642_0002
16/05/21 15:23:59 INFO mapreduce.Job: Job job_1463856930642_0002 running in uber mode : false
16/05/21 15:23:59 INFO mapreduce.Job:  map 0% reduce 0%
16/05/21 15:24:36 INFO mapreduce.Job:  map 100% reduce 0%
16/05/21 15:24:36 INFO mapreduce.Job: Task Id : attempt_1463856930642_0002_m_000002_0, Status : FAILED
Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal
16/05/21 15:24:37 INFO mapreduce.Job:  map 80% reduce 0%
16/05/21 15:24:47 INFO mapreduce.Job:  map 100% reduce 0%
16/05/21 15:24:50 INFO mapreduce.Job:  map 100% reduce 100%
16/05/21 15:24:50 INFO mapreduce.Job: Job job_1463856930642_0002 completed successfully
16/05/21 15:24:50 INFO mapreduce.Job: Counters: 51
File System Counters
FILE: Number of bytes read=116
FILE: Number of bytes written=643059
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1385
HDFS: Number of bytes written=215
HDFS: Number of read operations=23
HDFS: Number of large read operations=0
HDFS: Number of write operations=3
Job Counters
Failed map tasks=1
Launched map tasks=6
Launched reduce tasks=1
Other local map tasks=1
Data-local map tasks=5
Total time spent by all maps in occupied slots (ms)=189730
Total time spent by all reduces in occupied slots (ms)=9248
Total time spent by all map tasks (ms)=189730
Total time spent by all reduce tasks (ms)=9248
Total vcore-milliseconds taken by all map tasks=189730
Total vcore-milliseconds taken by all reduce tasks=9248
Total megabyte-milliseconds taken by all map tasks=194283520
Total megabyte-milliseconds taken by all reduce tasks=9469952
Map-Reduce Framework
Map input records=5
Map output records=10
Map output bytes=90
Map output materialized bytes=140
Input split bytes=795
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=140
Reduce input records=10
Reduce output records=0
Spilled Records=20
Shuffled Maps =5
Failed Shuffles=0
Merged Map outputs=5
GC time elapsed (ms)=16465
CPU time spent (ms)=5220
Physical memory (bytes) snapshot=670711808
Virtual memory (bytes) snapshot=5995196416
Total committed heap usage (bytes)=619401216
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=590
File Output Format Counters
Bytes Written=97
Job Finished in 60.235 seconds
Estimated value of Pi is 3.20000000000000000000


Looking at the important factors:

Job ID
job_1463856930642_0002
Format for job id: job_timestamp_[job serial number]
0002: job serial number, starts from 0, up to 1000

Task Id
Task Id : attempt_1463856930642_0002_m_000002_0, Status : FAILED
Format for task id: attempt_timestamp_[job serial number]_m_[task serial number]_[attempt number]
m means Map
r means Reduce

Counters
The job output shows 6 counter groups (a sketch for reading counters programmatically follows the list)

  1. File System Counters
  2. Job Counters 
  3. Map-Reduce Framework
  4. Shuffle Errors
  5. File Input Format Counters 
  6. File Output Format Counters 
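
If the job is submitted from Java code, the same counters can be read after completion. A minimal sketch (assuming job is the Job object you submitted and waited on):

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskCounter;

public class PrintCounters {
    // Call after job.waitForCompletion(true) on the Job you submitted.
    static void printCounters(Job job) throws Exception {
        Counters counters = job.getCounters();
        System.out.println("Launched map tasks   = "
                + counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue());
        System.out.println("Map input records    = "
                + counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
        System.out.println("Reduce input records = "
                + counters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue());
    }
}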

MapReduce History Server

Provides the MapReduce working logs, for example: number of maps, number of reduces, job submit time, job start time, job finish time, etc.
When we click on History for an application, it shows ERR_CONNECTION_REFUSED.
It points to port 19888, but port 19888 is not opened by default.

Using netstat -tnlp we can see the port is really not open.

To Start History Server
sbin/mr-jobhistory-daemon.sh start historyserver

Web UI
http://hadoop-yarn.gvace.com:19888/

To Stop History Server
sbin/mr-jobhistory-daemon.sh stop historyserver

























Hadoop HDFS Web UI


Now go to http://hadoop-yarn.gvace.com:50070/ and we will see the NameNode info page.

Now go to http://hadoop-yarn.gvace.com:50090/ and we will see the SecondaryNameNode info page.






Started:Sat May 21 11:21:01 EDT 2016
Version:2.6.4, r5082c73637530b0b7e115f9625ed7fac69f937e6
Compiled:2016-02-12T09:45Z by jenkins from (detached from 5082c73)
Cluster ID:CID-4fc1b111-3c85-4e0a-9cd5-e44ec8fe0192
Block Pool ID:BP-796093918-192.168.56.95-1463114304816

Block Pool ID: new in Hadoop 2.
HDFS files are stored as blocks on the hard drive, and these blocks are grouped into a pool to be managed.
Every NameNode has a pool, and each pool has an ID.

Cluster ID and Block Pool ID are concepts for Namenode Federation.




Logs

IN $HADOOP_HOME/logs

hadoop-yushan-namenode-hadoop-yarn.gvace.com.log 156419 bytes May 21, 2016 12:54:32 PM
hadoop-yushan-namenode-hadoop-yarn.gvace.com.out 4913 bytes May 21, 2016 11:31:45 AM



.log file: written by log4j; holds most application logs; configured by log4j.properties
.out file: captures standard output and standard error; usually does not contain much


log file name format:
[framework]-[username]-[process name]-[host name].log / .out
Example: hadoop-yushan-namenode-hadoop-yarn.gvace.com.log


Looking at hadoop-yushan-namenode-hadoop-yarn.gvace.com.out: its content is similar to what the command ulimit -a prints, which you can also run yourself.


NameNode Journal Status

Edits of NameNode


NameNode Storage



Startup Progress
The SecondaryNameNode periodically merges fsimage and edits, updates fsimage, and clears edits.
Each time it merges them, it saves a checkpoint.

Wednesday, May 11, 2016

MapReduce Concept

MapReduce: offline batch computing framework

Move the computation, not the data

Requires Key->Value pairs as both input and output

Separates a task into two phases: Map and Reduce
  • Map: processes input data in parallel
  • Reduce: combines the Map results
Shuffle connects Map and Reduce
  • Map Task: writes its output to the local hard disk
  • Reduce Task: reads a copy of the data from each Map Task

pros
  • Fault tolerance, expandability
  • Good for simple batch tasks

cons
  • Only good for offline batch processing
  • Costs resources; heavy hard-disk usage makes it inefficient

















Shuffler






Primitive Variable Type

Java type      Writable type
int            IntWritable
long           LongWritable
float          FloatWritable
double         DoubleWritable
String         Text
char           Text


Example:
IntWritable value = new IntWritable(100);
int v = value.get();
Example for String:
Text text = new Text("abc");
String str = text.toString();
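
These wrapper types can also be composed into a custom value class. A minimal sketch of a hand-rolled Writable (the class name and fields here are made up for illustration):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

// Hypothetical composite value: a word plus a count, serialized with Hadoop's Writable protocol.
public class WordStat implements Writable {
    private final Text word = new Text();
    private final IntWritable count = new IntWritable();

    public void set(String w, int c) {
        word.set(w);
        count.set(c);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        word.write(out);      // fields must be written...
        count.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word.readFields(in);  // ...and read back in the same order
        count.readFields(in);
    }

    @Override
    public String toString() {
        return word + "\t" + count.get();
    }
}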
File InputFormat
RecordReader
An interface that reads the input file and converts each record into a key/value pair for the mapper.

File Input Format
The default File Input Format is TextInputFormat.
By default, TextInputFormat uses

  • LongWritable (the byte offset of the line) as the mapper's input key
  • Text (the line content) as the mapper's input value


There are three other File Input Formats,
and they REQUIRE you to specify the mapper's input key/value types manually (a configuration sketch follows this list):

  • KeyValueTextInputFormat
    Splits each line at a separator (tab by default), using the first part as the key and the rest as the value
  • SequenceFileInputFormat
  • SequenceFileAsTextInputFormat
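
A minimal sketch of wiring an input format onto a job (the separator property name below is the Hadoop 2 key; treat it as an assumption and check your version):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class InputFormatDemo {
    static Job newJob() throws Exception {
        Configuration conf = new Configuration();
        // Split each input line at the first "," instead of the default tab.
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        Job job = Job.getInstance(conf, "kv-input-demo");
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // With KeyValueTextInputFormat the mapper's input key and value are both Text.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        return job;
    }
}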

In One Map Task

Throughout the whole procedure the data is key/value based, and everything is driven by the key, not the value: we partition by key, sort by key, and merge by key, not by value.

Partition
Separates the map output into different parts and puts the results into the memory buffer.
Partitioning can be customized (see the sketch below); the default is hash-mod, which is meant to balance the data load (but hash-mod can still produce an unbalanced load, so we may need a custom partitioner to balance it).
Based on this partition, it is later determined which part goes to which Reduce machine.
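
A minimal sketch of a custom partitioner (the keying rule is made up for illustration; the default HashPartitioner just does key.hashCode() mod the number of reducers):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical example: route keys starting with a digit to partition 0,
// and spread everything else with the usual hash-mod rule.
public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (numPartitions <= 1) {
            return 0;
        }
        String s = key.toString();
        if (s.isEmpty() || Character.isDigit(s.charAt(0))) {
            return 0;
        }
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}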


Spill
Before the results in the memory buffer exceed the limit (default 100MB), we have to write the data to disk as a file so we can keep processing more data. This is called a spill.
So the sorted and partitioned data is spilled to disk. This is just a buffering consideration, like using the disk as an extension of memory.
The spill is done by a separate thread, so it does not block the partitioning step (writing data into the memory buffer).
The spill percentage defaults to 0.8.
When the spill thread starts, it sorts those 80MB (0.8 * 100MB) of keys.


Sort
Sorting can be customized; we can determine how to compare and sort the data. The default comparison is by the keys' ASCII values (dictionary order).


Merge on disk (Combiner)
A combiner may not exist, since some map results should never be merged together,
so the combiner is not required.
The merge on disk (combiner) can be customized; the default grouping rule is by the key's hash value.
Once the memory buffer is spilled, the buffer is cleared; when we process new data, we have no knowledge of the previous partition contents.
And the spilled data is already a file on local disk, so we cannot change that file.
To bring all spilled results into one file, we have to re-read all the spilled files and merge them on disk (this is where the combiner runs), so in this procedure the data is merged together by the combine rule.
The goals of merge on disk (Combiner):
  1. put all spilled results from one map task into one single file
  2. shrink the size of the map results to be transmitted
  3. based on the combiner rule, each partition of the result chooses its merge destination (I am not sure if this is right)



Conclusion: we can customize these three steps (a wiring sketch follows the list)

  1. Partition
  2. Sort
  3. Combiner
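
A minimal sketch of wiring the three customization points onto a job. The classes referenced are the sketches from earlier in this post (FirstCharPartitioner, WordCount.IntSumReducer) plus a hypothetical reverse-order comparator:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;

public class CustomShuffleWiring {

    // Hypothetical sort comparator: reverse dictionary order on Text keys.
    public static class ReverseTextComparator extends WritableComparator {
        protected ReverseTextComparator() {
            super(Text.class, true);
        }
        @Override
        @SuppressWarnings("rawtypes")
        public int compare(WritableComparable a, WritableComparable b) {
            return b.compareTo(a);   // flip the default order
        }
    }

    static void wire(Job job) {
        job.setPartitionerClass(FirstCharPartitioner.class);        // 1. Partition
        job.setSortComparatorClass(ReverseTextComparator.class);    // 2. Sort
        job.setCombinerClass(WordCount.IntSumReducer.class);        // 3. Combiner (e.g. the WordCount reducer)
    }
}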





In One Reduce Task
A mixture of in-memory and on-disk data.
This part is NOT customizable. The reduce task receives (copies) different pieces of results from multiple maps and puts them into its memory buffer.

It merges the received data by key. This can also exceed the memory buffer limit,
so it likewise spills results to a file on local disk each time the memory buffer limit is reached.

Then it merges again over all the spilled data (plus the data still in the memory buffer).

All merged data is passed to reduce in sequence, one group at a time, not in parallel.

Then the data is passed to the Reducer, which can be customized, with a Key and an Iterable<Value>.

Finally the handled results are output through the RecordWriter.

And the output file names are

  • part-00000
  • part-00001
  • part-00002
  • etc...

Output File Directory

  • _SUCCESS
  • _logs/
  • part-00000

Before the MapReduce job is done, there is an empty flag file named _TEMP.
After the job is done, the file is renamed to _SUCCESS, and it is still an empty flag file.








MapReduce Split Size

If a block is too big, we split it into smaller pieces, and each piece is processed by its own Map Task.
If a block is small, it stays as one piece and gets only one Map Task.
(A small calculation sketch follows this list.)
  • max.split (100M)
  • min.split (10M)
  • block (64M)
  • split size = max(min.split, min(max.split, block))
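
A tiny sketch of that formula with the numbers above (the property names differ across Hadoop versions, so only the example figures are used here):

public class SplitSizeDemo {
    // split size = max(minSplit, min(maxSplit, blockSize))
    static long splitSize(long minSplit, long maxSplit, long blockSize) {
        return Math.max(minSplit, Math.min(maxSplit, blockSize));
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // With max.split=100M, min.split=10M, block=64M the split ends up at the block size: 64M.
        System.out.println(splitSize(10 * mb, 100 * mb, 64 * mb) / mb + "M");
    }
}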







MapReduce Logs


MapReduce has two types of logs

  1. history job logs
  2. Container logs
History job logs

Stored in HDFS (staging directory)
  • number of maps used
  • number of reduce used
  • job submission time
  • job start time
  • job finish time
  • how many jobs succeeded
  • how many jobs failed
  • how many jobs each queue ran
  • etc...
Container Logs

Stored in ${HADOOP_HOME}/logs/userlogs
  • ApplicationMaster Logs
  • Task Logs
  • by default the logs are saved in ${HADOOP_HOME}/logs/userlogs


























Yarn

Four Components


The Client submits a job request to the ResourceManager

Resource Manager

  • handles Client requests
  • starts/monitors ApplicationMasters
  • monitors NodeManagers
  • resource allocation and scheduling


NodeManager

The NodeManager sends status heartbeats to the ResourceManager
  • manages the resources of its single node
  • handles commands from the ResourceManager
  • handles commands from the ApplicationMaster



Container: (in NodeManager) a unit of resources (memory, disk size, CPU, network I/O, disk I/O); each Container runs one task of a job. Tasks in Containers also send their task status to the App Master.

App Master: (in NodeManager) the manager (monitors, manages) of one application. It calculates how many Containers it needs, requests the Containers from the ResourceManager, and once it gets the Containers, runs the app.
  • splits data into blocks
  • requests resources from the ResourceManager and allocates resources to tasks
  • monitors tasks and handles failures
Examples: MapReduce App Master, MPI App Master, etc.




MapReduce in Yarn flow
















Tuesday, May 10, 2016

HDFS and MapReduce introduction

HDFS
Hadoop Distributed File System

namenode: Only one
datanode: many

namenode
holds the file names, directories, file properties (time, replication, permission), and the DataNode positions of the blocks
  • receives user requests
  • maintains the file system directory structure
  • manages the mapping between files and blocks, and between blocks and DataNodes

datanode
  • stores file data
  • files are split into blocks (default 128M) stored on the hard drive
  • also keeps checksums of the data
  • to ensure safety, files have multiple replicas

Secondary NameNode

  • monitors HDFS status; a background assistant process
  • grabs snapshots of HDFS metadata periodically




MapReduce

JobTracker: only one

  • receives user job requests
  • allocates jobs to TaskTrackers
  • monitors TaskTracker running status

TaskTracker: many

  • Runs the jobs assigned by the JobTracker

Hadoop Directory

bin/
basic command scripts such as hadoop, hdfs, yarn, etc.

sbin/
management scripts (start/stop daemons), built on top of bin/


etc/
configuration files (xml, sh, properties, template)

include/
C++ header files

lib/
native libraries (dynamic and static)

share/
public Hadoop jar packages and docs


libexec/
shell scripts used by Hadoop's configuration; sets up logging, environment variables, etc.

Monday, May 9, 2016

Maven Compile

sudo apt-get install autoconf automake libtool cmake

sudo apt-get install ncurses-base ncurses-bin

sudo apt-get install openssl

sudo apt-get install protobuf-compiler

Download protobuf-2.6.1, 
 ./configure
 make && make check && make install

sudo apt-get install lzop zlib-bin gcc gcc-4.9-base

Install maven

sudo apt-get install findbugs

Example

mvn package -Pdist,native,docs,src -DskipTests -Dtar

Maven

Met a warning:
-Dmaven.multiModuleProjectDirectory system property is not set. Check $M2_HOME environment variable and mvn script match.
Looks like Maven requires a more specific argument to run, so add an argument to the environment.
export MAVEN_OPTS="-Dmaven.multiModuleProjectDirectory=$M2_HOME"
We may need more if we hit memory issues, so it may finally look like this:

export MAVEN_OPTS="-Xms256m -Xmx512m -Dmaven.multiModuleProjectDirectory=$M2_HOME"








Sunday, May 8, 2016

HADOOP src code

Import the Hadoop src into Eclipse
Go to the Hadoop src folder and enter hadoop-maven-plugins/
cd hadoop-maven-plugins/
mvn install
Then go back to the src folder
mvn eclipse:eclipse -DskipTests
It takes a while to download repositories



/**********************************************************
 * NameNode serves as both directory namespace manager and
 * "inode table" for the Hadoop DFS.  There is a single NameNode
 * running in any DFS deployment.  (Well, except when there
 * is a second backup/failover NameNode.)
 *
 * The NameNode controls two critical tables:
 *   1)  filename->blocksequence (namespace)
 *   2)  block->machinelist ("inodes")
 *
 * The first table is stored on disk and is very precious.
 * The second table is rebuilt every time the NameNode comes
 * up.
 *
 * 'NameNode' refers to both this class as well as the 'NameNode server'.
 * The 'FSNamesystem' class actually performs most of the filesystem
 * management.  The majority of the 'NameNode' class itself is concerned
 * with exposing the IPC interface and the http server to the outside world,
 * plus some configuration management.
 *
 * NameNode implements the ClientProtocol interface, which allows
 * clients to ask for DFS services.  ClientProtocol is not
 * designed for direct use by authors of DFS client code.  End-users
 * should instead use the org.apache.nutch.hadoop.fs.FileSystem class.
 *
 * NameNode also implements the DatanodeProtocol interface, used by
 * DataNode programs that actually store DFS data blocks.  These
 * methods are invoked repeatedly and automatically by all the
 * DataNodes in a DFS deployment.
 *
 * NameNode also implements the NamenodeProtocol interface, used by
 * secondary namenodes or rebalancing processes to get partial namenode's
 * state, for example partial blocksMap etc.
 **********************************************************/

Everything we need to use is just FileSystem, which is implemented by DistributedFileSystem.

All communication between NameNode and DataNode goes through RPC, so to the user every call looks like it runs locally, but it is actually carried out through a proxy.
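
As a quick illustration of the client-facing side, a minimal FileSystem sketch (the paths are made-up examples; with fs.defaultFS pointing at HDFS, FileSystem.get returns a DistributedFileSystem and each call below becomes an RPC to the NameNode):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileSystemDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();     // picks up core-site.xml / hdfs-site.xml
        FileSystem fs = FileSystem.get(conf);         // DistributedFileSystem when fs.defaultFS is hdfs://

        Path dir = new Path("/demo");                 // example path
        fs.mkdirs(dir);

        // Write and read back a small file; data streams go to/from DataNodes.
        try (FSDataOutputStream out = fs.create(new Path(dir, "hello.txt"), true)) {
            out.writeUTF("hello hdfs");
        }
        try (FSDataInputStream in = fs.open(new Path(dir, "hello.txt"))) {
            System.out.println(in.readUTF());
        }
        fs.close();
    }
}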


ClientProtocol and DatanodeProtocol



public interface ClientProtocol extends VersionedProtocol {

// create rpc server


    this.server = RPC.getServer(this, socAddr.getHostName(),
        socAddr.getPort(), handlerCount, false, conf, namesystem
        .getDelegationTokenSecretManager());



//also starts a web server
startHttpServer(conf);

httpServer = new HttpServer("hdfs", infoHost, infoPort,
              infoPort == 0, conf,
              SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN))


Communications:


/**********************************************************************
 * Protocol that a DFS datanode uses to communicate with the NameNode.
 * It's used to upload current load information and block reports.
 *
 * The only way a NameNode can communicate with a DataNode is by
 * returning values from these functions.
 *
 **********************************************************************/
@KerberosInfo(
    serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY,
    clientPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
public interface DatanodeProtocol extends VersionedProtocol {


  /**
   * sendHeartbeat() tells the NameNode that the DataNode is still
   * alive and well.  Includes some status info, too.
   * It also gives the NameNode a chance to return
   * an array of "DatanodeCommand" objects.
   * A DatanodeCommand tells the DataNode to invalidate local block(s), 
   * or to copy them to other DataNodes, etc.
   */
The DataNode calls sendHeartbeat() on the NameNode; the NameNode returns a DatanodeCommand[] so the DataNode can do its work by following the returned commands.
























Saturday, May 7, 2016

Namenode SecondaryNameNode DataNode Hadoop

NameNode

The NameNode handles all operations except storing block data.
The DataNode only stores block data.

The NameNode's metadata is stored in ${hadoop.tmp.dir}/dfs/name/current
The fsimage file stores the NameNode's core metadata; we call it the name table, and it is critical
because it ties all the pieces of block data together into files. It can only be accessed by one process at a time.

The file "${hadoop.tmp.dir}/dfs/name/in_use.lock" means this metadata directory is locked by one process (the NameNode).
So when a NameNode tries to start, it checks the lock first; if there is no lock, it can start.

For name table redundancy, we can set dfs.name.dir to multiple values, separated by commas with no whitespace. We can use NFS to store the name table on multiple servers.


${hadoop.tmp.dir}/dfs/name/current/edits: the transaction (edit) log file.
Can also use comma-delimited paths for redundancy.



SecondaryNameNode

Since the NameNode is busy serving user requests, the SecondaryNameNode takes over the job of checkpointing the metadata.
The SecondaryNameNode downloads fsimage and edits from the NameNode, merges them into a new fsimage, saves it locally, and sends the new fsimage back to the NameNode so the NameNode can reset its edits log.

By default the SecondaryNameNode is installed on the NameNode host, but it should be separated from the NameNode and moved to another server.

As we said, the SecondaryNameNode acts like a backup of the NameNode's fsimage, but un-checkpointed changes are not in the SecondaryNameNode's copy. So if we recover from the SecondaryNameNode's fsimage, it may not be fully up to date, because some recent changes were not yet merged into fsimage. The SecondaryNameNode is therefore only a cold backup.


DataNode


block: the unit of data. Default 67108864 bytes = 64MB (can be changed with dfs.block.size)
A DataNode writes at least one block at a time; however, in HDFS a file smaller than a block does not occupy the whole block of disk space, which is good for hard-drive usage.

A file name ending with ".meta" is the checksum/verification file for one data block.

Version file:


dfs.replication configures how many replicas to keep





















Wednesday, May 4, 2016

HDFS shell commands

Hadoop 2.x

bin/hdfs dfs -ls /
bin/hdfs dfs -lsr /
bin/hdfs dfs -mkdir /data01

bin/hdfs dfs -put abc.txt /dir1
This -put operation does not handle duplicates, so an existing file cannot be overwritten.
If "/dir1" does not exist, "dir1" becomes a file name, not a directory.

bin/hdfs dfs -get /dir1/abc.txt ~/

bin/hdfs dfs -text /dir1/abc.txt

bin/hdfs dfs -rm /dir1/abx.txt
bin/hdfs dfs -rmr /dir1
bin/hdfs dfs -cp /dir1/abx.txt /dir2/abc.txt

bin/hdfs dfs -help


hadoop default user directory: /user/username, for example /user/yushan

check safe mode
bin/hdfs dfsadmin -safemode get


[yushan@hadoop-yarn hadoop-2.6.4]$ bin/hdfs getconf
hdfs getconf is a utility for getting configuration information from the config files.

hadoop getconf
[-namenodes] gets list of namenodes in the cluster.
[-secondaryNameNodes] gets list of secondary namenodes in the cluster.
[-backupNodes] gets list of backup nodes in the cluster.
[-includeFile] gets the include file path that defines the datanodes that can join the cluster.
[-excludeFile] gets the exclude file path that defines the datanodes that need to decommissioned.
[-nnRpcAddresses] gets the namenode rpc addresses
[-confKey [key]] gets a specific key from the configuration

0.0.0.0 means localhost
bin/hdfs getconf -namenodes
bin/hdfs getconf -confKey hadoop.tmp.dir



Hadoop 1.x

Commands

hadoop fs -ls /
hadoop fs -lsr /



hadoop fs -mkdir /dir1

hadoop fs -put abc.txt /dir1
This -put operation does not handle duplicates, so an existing file cannot be overwritten.
If "/dir1" does not exist, "dir1" becomes a file name, not a directory.

hadoop fs -get /dir1/abc.txt ~/

hadoop fs -text /dir1/abc.txt

hadoop fs -rm /dir1/abx.txt
hadoop fs -rmr /dir1
hadoop fs -cp /dir1/abx.txt /dir2/abc.txt

hadoop fs -help


hadoop default user directory: /user/username
So if we cannot access the default directory, we need to create it manually.



Tuesday, May 3, 2016

Virtual Box Network Config


See this video

https://www.youtube.com/watch?v=Jk5Kfm2-Muk

To make the virtual machine connect to both the host and the public Internet,
we need a Host-only Adapter and NAT.

The Host-only Adapter lets the virtual machine connect ONLY to the host server.
NAT lets the virtual machine connect to the public Internet; it does not require IP setup, it just uses DHCP.


In addition, sometimes we need a Bridged Adapter as an alternate way to reach other IPs on the LAN.

Bridged Adapter
lets the virtual machine connect to the public Internet; we need to make sure the IP, subnet, and gateway are in the same range as the host.

Sunday, May 1, 2016

Hadoop Install and Config

Install

  • Virtual Machine(optional)
  • Config iptables and ssh
  • Config hostname
  • Make sure we have JDK
  • Extract hadoop to /usr/local/hadoop
  • Set environment
  • Config files
  • Format Hadoop
  • Start Hadoop
  • Auto Start Hadoop
  • Warning
Virtual Machine(optional)
Required so that the virtual machine and the host can connect, so we can use ssh to configure the hosts
http://gvace.blogspot.com/2016/05/virtual-box-network-config.html

Config iptables and ssh

iptables: make sure ip is not blocked
http://gvace.blogspot.com/2016/04/linux-network-configs.html

ssh: allows remote terminal access and lets the Hadoop nodes communicate with each other
http://gvace.blogspot.com/2016/04/ssh.html

Config hostname
Config a hostname, and add known hostnames to config, so each node can communicate with hostnames instead of IPs.
http://gvace.blogspot.com/2016/04/linux-network-configs.html


Make sure we have JDK
java -version
javac -version

Download Hadoop and extract
http://hadoop.apache.org/

Extract hadoop to /usr/local/hadoop 

Set Environment

/etc/profile.d/jdk.sh
#JDK Environment
export J2SDKDIR=/usr/lib/jvm/java-7-oracle
export J2REDIR=/usr/lib/jvm/java-7-oracle/jre
export PATH=$PATH:/usr/lib/jvm/java-7-oracle/bin:/usr/lib/jvm/java-7-oracle/db/bin:/usr/lib/jvm/java-7-oracle/jre/bin
export JAVA_HOME=/usr/lib/jvm/java-7-oracle
export DERBY_HOME=/usr/lib/jvm/java-7-oracle/db

/etc/profile.d/hadoop-env.sh
#Hadoop Environment
export PATH=$PATH:/usr/local/hadoop/bin
export HADOOP_HOME=/usr/local/hadoop 
#Optional if see warning "HADOOP_HOME is deprecated"
export HADOOP_HOME_WARN_SUPPRESS=1


Config files


In /usr/local/hadoop/conf
  1. hadoop-env.sh
  2. core-site.xml
  3. hdfs-site.xml
  4. mapred-site.xml
hadoop-env.sh

Change JAVA_HOME to the JDK directory
JAVA_HOME=/usr/lib/jvm/java-7-oracle/

core-site.xml
Assign hdfs default name, tmp directory
<configuration>
 <property>
  <name>fs.default.name</name>
  <value>hdfs://hmain:9000</value>
  <description>NameNode</description>
 </property>
 <property>
  <name>hadoop.tmp.dir</name>
  <value>/usr/local/hadoop/tmp</value>
 </property>
</configuration>


hdfs-site.xml
<configuration>
        <property>
                <name>dfs.replication</name>
                <value>1</value>
        </property>
        <property>
                <name>dfs.permissions</name>
                <value>false</value>
        </property>
</configuration>


mapred-site.xml
<configuration>
 <property>
  <name>mapred.job.tracker</name>
  <value>hmain:9001</value>
  <description>Change hostname and port</description>
 </property>
</configuration>


Format Hadoop

hadoop namenode -format

We may run into some user permission issues.

This is the error I got.
16/05/02 00:47:35 INFO namenode.FSNamesystem: fsOwner=yushan
16/05/02 00:47:35 INFO namenode.FSNamesystem: supergroup=supergroup
16/05/02 00:47:35 INFO namenode.FSNamesystem: isPermissionEnabled=false
16/05/02 00:47:35 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=100
16/05/02 00:47:35 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
16/05/02 00:47:36 INFO namenode.FSEditLog: dfs.namenode.edits.toleration.length = 0
16/05/02 00:47:36 INFO namenode.NameNode: Caching file names occuring more than 10 times
16/05/02 00:47:36 ERROR namenode.NameNode: java.io.IOException: Cannot create directory /usr/local/hadoop/tmp/dfs/name/current

As we see, by default the Hadoop owner is the username that ran the command, and the group is "supergroup".
We can change the ownership of the whole folder /usr/local/hadoop and all its subfolders/files to that username and group (create the group if it does not exist) and assign 755 on the whole folder. This fixes it.

Do not run hadoop namenode -format more than once.
If we did format more than once, we can delete the /usr/local/hadoop/tmp folder and format again.


Start Hadoop

Run start-all.sh

See java processes: Run jps
We will have 5 processes from Hadoop

2701 SecondaryNameNode
2989 Jps
2793 JobTracker
2550 DataNode
2940 TaskTracker
2410 NameNode


Auto Start Hadoop
If we want Hadoop to run as user "yushan" and start automatically when the machine boots, before any user or ssh login,

we need to add the command to /etc/rc.local, because this file runs before user login
#Start Hadoop at machine startup(before login) as user yushan
su yushan -c '/usr/local/hadoop/bin/start-all.sh'
If we want Hadoop to start when user "yushan" logs in,
just put the above script into /etc/profile.d/hadoop.sh


Stop Hadoop

Run stop-all.sh


Warning

If a warning like "HADOOP_HOME is deprecated" is raised,
go back to the environment config hadoop-env.sh and add the new line

export PATH=$PATH:/usr/local/hadoop/bin
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_HOME_WARN_SUPPRESS=1