Saturday, May 28, 2016

Hadoop 2 HA setup config with ZooKeeper

Hadoop 2.x
  • HDFS: NN Federation, HA(High Availability)
  • MapReduce: Runs on YARN
  • YARN: Resource Management System
HDFS 2.x

Fixes the HDFS 1.0 NameNode single point of failure
  • HDFS HA: Master/Backup NameNode
  • If the Master NameNode fails, switch to the Backup NameNode
Fixes the HDFS 1.0 NameNode memory limitation problem
  • HDFS Federation
  • Scalable: supports multiple NameNodes
  • Every NameNode manages a part of the directory tree (namespace)
  • All NameNodes share all DataNode storage resources
HDFS 2.x only changes the architecture; it does not change the way HDFS is used.
It is transparent to HDFS users.
It is compatible with HDFS 1.x commands and APIs.



HDFS HA(High Availability)




ZK: ZooKeeper
ZKFC: ZooKeeper Failover Controller
JN: Journal Node
NN: NameNode
DN: DataNode

Looking at the flow bottom-up:

DataNodes report to both the Active NameNode and the Standby NameNode.
DataNodes only accept commands from the Active NameNode.

The NameNode's state (its edit log) is not kept only on the NameNode itself; it is also written to a cluster of JournalNodes.
When the Active NameNode fails, the Standby NameNode automatically picks up the edits from the JournalNodes and continues the work.
When we want to upgrade the Active NameNode manually, we can send a command to switch the Active and Standby NameNodes.

The FailoverControllers report heartbeats to ZooKeeper; ZooKeeper is used to elect the Active NameNode for HA.
The Client is configured with both NameNodes and, through the failover proxy provider, finds the Active NameNode and then talks to it directly.
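
Once HA is configured, clients address the logical nameservice instead of a specific NameNode host, and the failover proxy provider finds the Active NameNode for them. A minimal sketch, using the mycluster nameservice configured later in this post:

# "mycluster" is the logical nameservice, not a physical host,
# so this keeps working after a failover.
hdfs dfs -ls hdfs://mycluster/

# With fs.defaultFS set to hdfs://mycluster in core-site.xml, the prefix can be omitted:
hdfs dfs -ls /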


Automatic failover switch
  • ZooKeeper Failover Controller (ZKFC): monitors the status of its NameNode
  • Registers the NameNode with ZooKeeper
  • When the Active NameNode fails, each ZKFC tries to acquire a lock in ZooKeeper for its NameNode (see the sketch below).
  • The NameNode whose ZKFC acquires the lock becomes Active
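
You can see this lock by browsing ZooKeeper with zkCli.sh. This is only a sketch: the znode names below (/hadoop-ha/mycluster, ActiveStandbyElectorLock) are what I expect for this setup, so verify them on your own cluster:

# Connect to any ZooKeeper node in the quorum
/usr/lib/zookeeper/bin/zkCli.sh -server hadoop-yarn.gvace.com:2181

# Inside the zkCli shell: HA state for the nameservice lives under /hadoop-ha
ls /hadoop-ha/mycluster
# Expected children: [ActiveBreadCrumb, ActiveStandbyElectorLock]
# ActiveStandbyElectorLock is the ephemeral lock held by the Active NameNode's ZKFC.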


HDFS 2.x Federation
  • Federation is used for very large amounts of data: we need multiple NameNodes; each NameNode is independent, but all of them share the DataNodes.
  • Independent means the NameNodes are completely independent of each other, so if we want HA for the NameNodes, each NameNode needs its own HA configuration.
  • Federation uses namenodes/namespaces to split data storage and management across different nodes, so the namenode/namespace layer can scale by simply adding machines (see the config sketch below).
  • It spreads the load of one namenode over multiple nodes, so efficiency does not drop when the HDFS data grows large. We can use multiple namespaces for different types of applications, allocating data storage/management to different namenodes based on application type.
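
A minimal federation sketch in hdfs-site.xml, just to show the idea. The nameservice IDs (ns1, ns2) and hosts are hypothetical; the cluster built below uses a single HA nameservice instead:

        <property>
                <name>dfs.nameservices</name>
                <value>ns1,ns2</value>
        </property>
        <!-- Each nameservice is managed by its own independent NameNode -->
        <property>
                <name>dfs.namenode.rpc-address.ns1</name>
                <value>namenode1.example.com:8020</value>
        </property>
        <property>
                <name>dfs.namenode.rpc-address.ns2</name>
                <value>namenode2.example.com:8020</value>
        </property>

Every DataNode registers with all the NameNodes listed, so the nameservices share one DataNode pool.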


Yarn
Yet Another Resource Negotiator

Hadoop 2.0 introduced the ResourceManager and the ApplicationMaster.
  • They split the MRv1 JobTracker's resource management from its task scheduling.
    Replaced: JobTracker and TaskTracker from Hadoop 1.0
    With: ResourceManager and ApplicationMaster
  • ResourceManager: handles resource management and scheduling for the whole cluster (only one per cluster)
  • ApplicationMaster: handles application-level work such as task scheduling, task monitoring, and fault tolerance; each application has one ApplicationMaster
YARN lets multiple computation frameworks share one cluster.
It gives many frameworks a common interface to cluster resources.
  • Each application has one ApplicationMaster
  • Multiple frameworks can run on YARN at the same time: MapReduce, Spark, Storm, etc. (see the example below)
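
As a quick check that MapReduce runs on YARN, you can submit the bundled example job. The jar path below assumes the Hadoop 2.6.4 layout used in this post; adjust it for your version:

# Submit the bundled pi estimator as a YARN application
cd /usr/lib/hadoop
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar pi 2 10

# List the applications known to the ResourceManager
bin/yarn application -list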



HA HDFS Cluster build example

              NameNode  DataNode  ZooKeeper  ZKFC  JournalNode  ResourceManager  NodeManager
hadoop-yarn      1                    1       1                        1
hadoop-node1     1          1         1       1         1                             1
hadoop-node2                1         1                 1                             1
hadoop-node3                1                           1                             1

Followed this doc:
We need:

  • dfs.nameservices as name service ID
  • dfs.ha.namenodes.[name service ID]: as NameNode IDs
  • dfs.namenode.rpc-address.[nameservice ID].[name node ID]
  • dfs.namenode.http-address.[nameservice ID].[name node ID]
  • dfs.namenode.shared.edits.dir
  • dfs.client.failover.proxy.provider.[nameservice ID]
  • dfs.ha.fencing.methods
  • fs.defaultFS
  • dfs.journalnode.edits.dir
I started from here and built a basic distributed Hadoop first: http://gvace.blogspot.com/2016/05/hadoop-centos-build.html
Then applied some changes (see the shell sketch after this list):
  • create soft link in /usr/lib: hadoop -> hadoop-2.6.4
  • change tmp folder from /usr/lib/hadoop-2.6.4/data/tmp/ to /var/opt/hadoop/data/tmp
  • install ZooKeeper-3.4.8
  • create soft link in /usr/lib: zookeeper -> zookeeper-3.4.8
  • create folder /var/opt/zookeeper/tmp/data
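
A shell sketch of those changes, assuming the tarballs are already unpacked under /usr/lib (adjust the names/paths to your own layout):

# Version-independent soft links
ln -s /usr/lib/hadoop-2.6.4 /usr/lib/hadoop
ln -s /usr/lib/zookeeper-3.4.8 /usr/lib/zookeeper

# Data directories outside of the install tree
mkdir -p /var/opt/hadoop/data/tmp
mkdir -p /var/opt/zookeeper/tmp/data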


hdfs-site.xml
Instead of configuring a single NameNode, we configure a NameNode cluster (a nameservice)
        <property>
                <name>dfs.nameservices</name>
                <value>mycluster</value>
        </property>
        <property>
                <name>dfs.ha.namenodes.mycluster</name>
                <value>nn1,nn2</value>
        </property>
        <property>
                <name>dfs.namenode.rpc-address.mycluster.nn1</name>
                <value>hadoop-yarn.gvace.com:8020</value>
        </property>
        <property>
                <name>dfs.namenode.rpc-address.mycluster.nn2</name>
                <value>hadoop-node1.gvace.com:8020</value>
        </property>
        <property>
                <name>dfs.namenode.http-address.mycluster.nn1</name>
                <value>hadoop-yarn.gvace.com:50070</value>
        </property>
        <property>
                <name>dfs.namenode.http-address.mycluster.nn2</name>
                <value>hadoop-node1.gvace.com:50070</value>
        </property>
        <property>
                <name>dfs.namenode.shared.edits.dir</name>
                <value>qjournal://hadoop-node1.gvace.com:8485;hadoop-node2.gvace.com:8485;hadoop-node3.gvace.com:8485/mycluster</value>
        </property>
        <property>
                <name>dfs.client.failover.proxy.provider.mycluster</name>
                <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
        </property>
        <property>
                <name>dfs.ha.fencing.methods</name>
                <value>sshfence</value>
        </property>
        <property>
                <name>dfs.ha.fencing.ssh.private-key-files</name>
                <value>/home/yushan/.ssh/id_rsa</value>
        </property>
        <property>
                <name>dfs.journalnode.edits.dir</name>
                <value>/var/opt/hadoop/data/tmp/journal/edits</value>
        </property>
        <property>
                <name>dfs.ha.automatic-failover.enabled</name>
                <value>true</value>
        </property>


yarn-site.xml
Set the ResourceManager host and enable the MapReduce shuffle service for the NodeManagers
<configuration>

<!-- Site specific YARN configuration properties -->
        <property>
                <description>The hostname of the RM.</description>
                <name>yarn.resourcemanager.hostname</name>
                <value>hadoop-yarn.gvace.com</value>
        </property>
        <property>
                <name>yarn.nodemanager.aux-services</name>
                <value>mapreduce_shuffle</value>
        </property>

</configuration>


core-site.xml
Instead of pointing at a single NameNode, we point the default filesystem at the nameservice and list the ZooKeeper quorum
<configuration>
        <property>
                <name>fs.defaultFS</name>
                <value>hdfs://mycluster</value>
        </property>
        <property>
                <name>ha.zookeeper.quorum</name>
                <value>hadoop-yarn.gvace.com:2181,hadoop-node1.gvace.com:2181,hadoop-node2.gvace.com:2181</value>
        </property>
<!--
        <property>
                <name>fs.default.name</name>
                <value>hdfs://hadoop-yarn.gvace.com:8020</value>
                <description>NameNode</description>
        </property>
-->
        <property>
                <name>hadoop.tmp.dir</name>
                <value>/var/opt/hadoop/data/tmp</value>
        </property>
</configuration>
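
To sanity-check that the HA config is being picked up, hdfs getconf can print the values back (assuming the files above are in the node's Hadoop conf directory):

# Should print hdfs://mycluster
hdfs getconf -confKey fs.defaultFS

# Should list both NameNode hosts of the nameservice
hdfs getconf -namenodes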



Also install ZooKeeper, and assign an id to each ZooKeeper node (see the commands below).

Create the file /var/opt/zookeeper/tmp/data/myid on each ZooKeeper node.
Put "1", "2", "3" into the myid file of each node respectively; it is the id of that ZooKeeper node.

Configure ZooKeeper:
In /usr/lib/zookeeper/conf, copy zoo_sample.cfg to zoo.cfg.
Edit zoo.cfg: set (and create) dataDir, and list each ZooKeeper node's host with its server id.
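
A sketch of those two steps (run the echo with the matching id on each node; server.1 is hadoop-yarn in the zoo.cfg below):

# On hadoop-yarn (server.1); use "2" on hadoop-node1 and "3" on hadoop-node2
echo 1 > /var/opt/zookeeper/tmp/data/myid

# Start from the sample config shipped with ZooKeeper
cp /usr/lib/zookeeper/conf/zoo_sample.cfg /usr/lib/zookeeper/conf/zoo.cfg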

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/var/opt/zookeeper/tmp/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=hadoop-yarn.gvace.com:2888:3888
server.2=hadoop-node1.gvace.com:2888:3888
server.3=hadoop-node2.gvace.com:2888:3888




Then start zookeeper on each ZooKeeper Node
./zkServer.sh start
You will see QuorumPeerMain running as a Java process in the jps output.
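
Once all three ZooKeeper nodes are started, you can also check each node's role; one should report itself as leader and the others as followers:

# Run on each ZooKeeper node
./zkServer.sh status
# Mode: leader    (on one node)
# Mode: follower  (on the other nodes)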

Start each Journal Node
hadoop-daemon.sh start journalnode

Format HDFS on one of the NameNodes
./bin/hdfs namenode -format

Start the formatted NameNode
./sbin/hadoop-daemon.sh start namenode

Bootstrap the Standby NameNode (run this on the Standby NameNode host); it synchronizes the formatted image files from the formatted NameNode
./bin/hdfs namenode -bootstrapStandby

Format the ZKFC state in ZooKeeper, on just one of the NameNode hosts
./bin/hdfs zkfc -formatZK

Stop all HDFS services (run from just one NameNode host)
sbin/stop-dfs.sh

Start all HDFS services (run from just one NameNode host)
sbin/start-dfs.sh

We cannot be sure which NameNode will become Active; it depends on which NameNode acquires the lock first.
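
To check which one ended up Active, hdfs haadmin can query each NameNode by the IDs configured above (nn1 and nn2):

./bin/hdfs haadmin -getServiceState nn1
# active  (or standby)
./bin/hdfs haadmin -getServiceState nn2
# standby (or active)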


After all the install/config/format steps, you can restart one NameNode and watch the Standby take over as Active.