Saturday, October 22, 2016

HBase Application


API Available

Java API: full-featured; does not require an extra gateway daemon, only the client libraries/configs installed (a short client sketch follows the outline below)

REST API: easy to use and stateless, but slower than the Java API; requires a REST gateway server; data can be fetched directly over HTTP requests

Thrift API: multi-language support, lightweight; requires a Thrift gateway server






Configuration
  • Key-values that define client application properties
DDL
  • HBase Admin
    create/modify/delete tables
  • H*Descriptor
    Describe and manipulate metadata
DML
  • Mutations: Incr/Delete/Put/Check-and-Put/Multiput*
Query
  • Scan/Get/Filter
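
As a rough illustration of the pieces in the outline above (Configuration, Admin/DDL, Put/Get/Scan), here is a minimal sketch using the HBase 1.x Java client. The table name "notes", column family "cf" and qualifier "title" are made-up examples, not anything from a real cluster.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseApiSketch {
 public static void main(String[] args) throws IOException {
  // Configuration: key-values that define client application properties
  // (reads hbase-site.xml from the classpath)
  Configuration conf = HBaseConfiguration.create();
  try (Connection conn = ConnectionFactory.createConnection(conf)) {
   // DDL: create a table with one column family
   TableName name = TableName.valueOf("notes"); // made-up table name
   try (Admin admin = conn.getAdmin()) {
    if (!admin.tableExists(name)) {
     HTableDescriptor desc = new HTableDescriptor(name);
     desc.addFamily(new HColumnDescriptor("cf"));
     admin.createTable(desc);
    }
   }
   try (Table table = conn.getTable(name)) {
    // DML: Put a value into row1 / cf:title
    Put put = new Put(Bytes.toBytes("row1"));
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("title"), Bytes.toBytes("hello hbase"));
    table.put(put);

    // Query: Get a single row and read the cell back as byte[]
    Result r = table.get(new Get(Bytes.toBytes("row1")));
    byte[] value = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("title"));
    System.out.println(Bytes.toString(value));

    // Query: Scan a row-key range (rows are sorted by row key)
    Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row9"));
    try (ResultScanner scanner = table.getScanner(scan)) {
     for (Result row : scanner) {
      System.out.println(Bytes.toString(row.getRow()));
     }
    }
   }
  }
 }
}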

















HBase System Architecture, Compared to Others





The location of the .META. region server is tracked by ZooKeeper,
so a client can find .META. by asking ZooKeeper.
Once .META. is read and cached, the client can locate any row of data stored in HBase.





HBase VS Other NoSQL

  • Scales to very large data sets
  • Strict consistency
  • Availability is not perfect, but it can be managed

  • Integrates with Hadoop
    Very efficient bulk loads and MapReduce analysis
  • Ordered range partitions (not hash partitioning)
  • Automatically shards/scales (just add more servers)
  • Sparse storage (per row)

Compared to an RDBMS

Enterprise Hadoop Cluster Architecture


Master services: each is a single point of failure (SPOF) tied to one node

  • NameNode, JobTracker / ResourceManager
  • Hive Metastore, HiveServer2
  • Impala StateStore, Catalog Server
  • Spark Master
Node kernel environment setup
  • ulimit, /etc/security/limits.conf to configure nofile
    By default Linux only allows 1024 open file descriptors per process.
    For a Hadoop cluster or HBase this is not enough, so we need to raise it.
  • THP (Transparent Huge Pages), ACPI, memory overcommit issues
    THP: a newer Linux kernel feature whose behavior does not play well with Hadoop workloads and can cause high memory/CPU usage, so it is better to turn it off.
    ACPI: power management; it can also hurt performance, so it is better to turn it off.
    Memory overcommit: the system stops granting memory to an application once a lot of memory is committed (around 50% of the total), so we need to either raise the limit or turn the check off.
  • Customize the configuration for each node's role
        High memory? How should swap be configured?
        High disk IO? Then we may not need a high overcommit setting.
        High CPU, high system load?
Enterprise Hadoop Cluster Data Management

HDFS config
  • HDFS block size: dfs.block.size
  • Replication factor: dfs.replication, default is 3 (see the small client-side sketch after this list)
  • Turn on dfs.permissions? fs.permissions.umask-mode
  • User permissions
  • The DataNode's dfs data partition should be separate from the Linux system partition; otherwise a full data disk can keep the system from starting.
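
A small, hedged sketch of how two of these settings surface to a Java client: dfs.blocksize (the Hadoop 2 name for dfs.block.size) and dfs.replication can be read from the loaded Configuration, and the replication factor can be overridden per file through the FileSystem API. The path below is just an example.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsConfigSketch {
 public static void main(String[] args) throws IOException {
  // Loads core-site.xml / hdfs-site.xml from the classpath
  Configuration conf = new Configuration();
  System.out.println("dfs.blocksize   = " + conf.get("dfs.blocksize"));
  System.out.println("dfs.replication = " + conf.get("dfs.replication", "3"));

  FileSystem fs = FileSystem.get(conf);
  // Override the replication factor for a single file (example path)
  fs.setReplication(new Path("/data01/LICENSE.txt"), (short) 2);
  fs.close();
 }
}
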
Resource Allocation
  • CPU, Memory, Disk IO, Network IO
  • HDFS, MapReduce(Yarn), Hive jobs
  • HBase, Hive, Impala, Spark resource allocation, limitations
Enterprise Hadoop Cluster Task Scheduling

Oozie
  • Dispatches HDFS and MapReduce jobs, Pig, Hive, Sqoop, Java apps, shell scripts, email
  • Takes advantage of cluster resources
  • An alternative to cron jobs
    Starts jobs in a distributed fashion
    Starts multiple jobs in parallel
    Supports fault tolerance, retries, and alerts within a workflow

ZooKeeper
  • Synchronizes node state
  • Handles the communication between HBase's master server and its region servers
  • Tracks region state for HBase's tables
    online or splitting?
Hadoop Cluster Monitors

Cloudera Manager's monitoring tool
  • Powerful, rich set of metrics, monitors Impala well
  • But wastes system resources
Ganglia
  • Monitors by collecting Hadoop metrics
  • Uses its own gmond daemon to collect CPU, memory, network IO
  • Collects JMX, IPC, RPC data
  • Weak point: no disk IO metrics by default
Graphite
  • Good third-party plugins; can push KPIs from Java applications (StatsD, Yammer Metrics)
  • Can collect server data using plugins (collectd, logster, jmxtrans)
Splunk: expensive
Nagios, Icinga


Hadoop Cluster Issue and Limitation

Issues
  • HA: too many single points of failure
  • Upgrades are risky
  • Wastes a lot of memory
Limitations
  • No solution yet for cross-data-center deployment
  • Bottlenecks under high concurrency
  • Impala and Spark cannot handle many concurrent queries, and there is no solution yet
  • No matter which data model or database, joins are a big issue
    Spark is still not good at this; it is more like MapReduce + Pig



































HBase Table and Storage Design

HBase is a set of tables defined by Key-Values

Definitions:

  • Row Key: each row has a unique Row Key
  • Column Family: used to group different columns; defined when the table is created
  • Column Qualifier: identifies each column; can be added at run time
    Expandable: each row may have a different number of Qualifiers
  • Timestamp: a cell can hold different versions of its data, distinguished by timestamp (for consistency)

A Row is referred to by a Row Key.
A Column is referred to by a Column Family and a Column Qualifier.
A Cell is referred to by a Row and a Column.
A Cell can contain multiple versions of a value, distinguished by timestamps.

The above four coordinates combine to form a Key in the following format:
    [Row Key]/[Column Family]:[Column Qualifier]/[timestamp]

With this Key, we can address the unique value we want to read.

Every value we get back is in byte[] format; the developer needs to know the structure of the value themselves (see the sketch below).

Row operations are strongly consistent.
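
To make the [Row Key]/[Column Family]:[Column Qualifier]/[timestamp] addressing concrete, here is a hedged sketch that reads several timestamped versions of one cell and decodes the byte[] values itself. The table, family and qualifier names are made up, and it assumes the column family keeps multiple versions.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseCellVersionsSketch {
 public static void main(String[] args) throws IOException {
  Configuration conf = HBaseConfiguration.create();
  byte[] row = Bytes.toBytes("row1");          // Row Key
  byte[] family = Bytes.toBytes("cf");         // Column Family
  byte[] qualifier = Bytes.toBytes("title");   // Column Qualifier

  try (Connection conn = ConnectionFactory.createConnection(conf);
       Table table = conn.getTable(TableName.valueOf("notes"))) {
   Get get = new Get(row);
   get.addColumn(family, qualifier); // narrow the read to one column
   get.setMaxVersions(3);            // ask for up to 3 timestamped versions

   Result result = table.get(get);
   List<Cell> cells = result.getColumnCells(family, qualifier);
   for (Cell cell : cells) {
    // Each cell is one [row]/[family]:[qualifier]/[timestamp] -> value entry;
    // the value comes back as byte[] and we must know how to decode it.
    long ts = cell.getTimestamp();
    String value = Bytes.toString(CellUtil.cloneValue(cell));
    System.out.println(ts + " -> " + value);
   }
  }
 }
}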






How does HBase store data in the file system?

  1. Contiguous ranges of rows are grouped into the same Region
  2. Each Region is divided into several units, one per Column Family
  3. Each of these units is stored as its own file(s) in the file system
  4. The key-values in each file are kept in lexicographical order of their keys (row key, then column qualifier, then timestamp)
So if we open one HBase data file, it will only contain
  1. Contiguous rows
  2. Values from a single Column Family
  3. Values ordered by key




Special Table

Table .META.
This table uses the same format as regular tables.
It can live on any node of the HBase cluster, and it stores the information needed to locate the region that holds a given row.











Friday, August 12, 2016

TransferQueue

If someone is already waiting to take an element, we don't need to put it in the queue; we can transfer it directly to the receiver.

Java 7 introduced the TransferQueue. This is essentially a BlockingQueue with an
additional operation—transfer(). This operation will immediately transfer a work
item to a receiver thread if one is waiting. Otherwise it will block until there is a thread
available to take the item. This can be thought of as the “recorded delivery” option—
the thread that was processing the item won’t begin processing another item until it
has handed off the current item. This allows the system to regulate the speed at which
the upstream thread pool takes on new work.
It would also be possible to regulate this by using a blocking queue of bounded
size, but the TransferQueue has a more flexible interface. In addition, your code may
show a performance benefit by replacing a BlockingQueue with a TransferQueue.
This is because the TransferQueue implementation has been written to take into
account modern compiler and processor features and can operate with great efficiency.
As with all discussions of performance, however, you must measure and prove
benefits and not simply assume them. You should also be aware that Java 7 ships with
only one implementation of TransferQueue—the linked version.
In the next code example, we’ll look at how easy it is to drop in a TransferQueue as
a replacement for a BlockingQueue. Just these simple changes to listing 4.13 will
upgrade it to a TransferQueue implementation, as you can see here.



import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

public class BlockingQueueTest1 {
 public static void main(String[] args) {

  final Update.Builder ub = new Update.Builder();
  final TransferQueue<Update> lbq = new LinkedTransferQueue<Update>();
  MicroBlogExampleThread t1 = new MicroBlogExampleThread(lbq, 10) {
   public void doAction() {
    text = text + "X";
    Update u = ub.author(new Author("Tallulah")).updateText(text)
      .build();
    boolean handed = false;
    try {
     handed = updates.tryTransfer(u, 100, TimeUnit.MILLISECONDS);//tryTransfer
    } catch (InterruptedException e) {
    }
    if (!handed)
     System.out.println("Unable to hand off Update to Queue due to timeout");
    else System.out.println("Offered");
   }
  };
  MicroBlogExampleThread t2 = new MicroBlogExampleThread(lbq, 200) {
   public void doAction() {
    Update u = null;
    try {
     u = updates.take();
     System.out.println("Took");
    } catch (InterruptedException e) {
     return;
    }
   }
  };

  t1.start();
  t2.start();
 }
}


abstract class MicroBlogExampleThread extends Thread {
 protected final TransferQueue<Update> updates;
 protected String text = "";
 protected final int pauseTime;
 private boolean shutdown = false;

 public MicroBlogExampleThread(TransferQueue<Update> lbq_, int pause_) {
  updates = lbq_;
  pauseTime = pause_;
 }

 public synchronized void shutdown() {
  shutdown = true;
 }

 @Override
 public void run() {
  while (!shutdown) {
   doAction();
   try {
    Thread.sleep(pauseTime);
   } catch (InterruptedException e) {
    shutdown = true;
   }
  }
 }

 public abstract void doAction();
}









Fine-Grained Control of BlockingQueue

We have learned BlockingQueue
http://gvace.blogspot.com/2016/08/blockingqueue.html

In addition to the simple take() and offer() API, BlockingQueue offers another way
to interact with the queue that provides even more control, at the cost of a bit of
extra complexity. This is the possibility of putting or taking with a timeout, to allow
the thread encountering issues to back out from its interaction with the queue and do something else instead.


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueTest1 {
 public static void main(String[] args) {

  final Update.Builder ub = new Update.Builder();
  final BlockingQueue<Update> lbq = new LinkedBlockingQueue<>(100);
  MicroBlogExampleThread t1 = new MicroBlogExampleThread(lbq, 10) {
   public void doAction() {
    text = text + "X";
    Update u = ub.author(new Author("Tallulah")).updateText(text)
      .build();
    boolean handed = false;
    try {
     handed = updates.offer(u, 100, TimeUnit.MILLISECONDS);//set timeout
    } catch (InterruptedException e) {
    }
    if (!handed)
     System.out.println("Unable to hand off Update to Queue due to timeout");
    else System.out.println("Offered");
   }
  };
  MicroBlogExampleThread t2 = new MicroBlogExampleThread(lbq, 1000) {
   public void doAction() {
    Update u = null;
    try {
     u = updates.take();
     System.out.println("Took");
    } catch (InterruptedException e) {
     return;
    }
   }
  };

  t1.start();
  t2.start();
 }
}
abstract class MicroBlogExampleThread extends Thread {
 protected final BlockingQueue<Update> updates;
 protected String text = "";
 protected final int pauseTime;
 private boolean shutdown = false;

 public MicroBlogExampleThread(BlockingQueue<Update> lbq_, int pause_) {
  updates = lbq_;
  pauseTime = pause_;
 }

 public synchronized void shutdown() {
  shutdown = true;
 }

 @Override
 public void run() {
  while (!shutdown) {
   doAction();
   try {
    Thread.sleep(pauseTime);
   } catch (InterruptedException e) {
    shutdown = true;
   }
  }
 }

 public abstract void doAction();
}




This was followed up by TransferQueue, which is a little more efficient:
http://gvace.blogspot.com/2016/08/transferqueue.html





Thursday, August 11, 2016

BlockingQueue

The BlockingQueue is a queue that has two additional special properties:
■ When trying to put() to the queue, it will cause the putting thread to wait for
space to become available if the queue is full.
■ When trying to take() from the queue, it will cause the taking thread to block
if the queue is empty.
These two properties are very useful because if one thread (or pool of threads) is outstripping
the ability of the other to keep up, the faster thread is forced to wait, thus
regulating the overall system.


Two implementations of BlockingQueue
Java ships with two basic implementations of the BlockingQueue interface: the
LinkedBlockingQueue and the ArrayBlockingQueue. They offer slightly different
properties; for example, the array implementation is very efficient when an exact
bound is known for the size of the queue, whereas the linked implementation may be
slightly faster under some circumstances.

It can be better to have this,

BlockingQueue<WorkUnit<MyAwesomeClass>>

where WorkUnit (or QueueObject, or whatever you want to call the container class) is
a packaging interface or class that may look something like this:
public class WorkUnit<T> {
 private final T workUnit;

 public T getWork() {
  return workUnit;
 }

 public WorkUnit(T workUnit_) {
  workUnit = workUnit_;
 }
}

The reason for doing this is that this level of indirection provides a place to add additional
metadata without compromising the conceptual integrity of the contained type
(MyAwesomeClass in this example).
This is surprisingly useful. Use cases where additional metadata is helpful are
abundant. Here are a few examples:
■ Testing (such as showing the change history for an object)
■ Performance indicators (such as time of arrival or quality of service)
■ Runtime system information (such as how this instance of MyAwesomeClass has
been routed)

Example

public class Appointment<T> {
 private final T toBeSeen;

 public T getPatient() {
  return toBeSeen;
 }

 public Appointment(T incoming) {
  toBeSeen = incoming;
 }
}
abstract class Pet {
 protected final String name;

 public Pet(String name) {
  this.name = name;
 }

 public abstract void examine();
}
class Cat extends Pet {
 public Cat(String name) {
  super(name);
 }

 public void examine() {
  System.out.println("Meow!");
 }
}
class Dog extends Pet {
 public Dog(String name) {
  super(name);
 }

 public void examine() {
  System.out.println("Woof!");
 }
}




From this simple model, you can see that we can model the veterinarian’s queue as
LinkedBlockingQueue<Appointment<Pet>>, with the Appointment class taking the role
of WorkUnit.




import java.util.concurrent.BlockingQueue;

public class Veterinarian extends Thread {
 protected final BlockingQueue<Appointment<Pet>> appts;
 protected String text = "";
 protected final int restTime;
 private boolean shutdown = false;

 public Veterinarian(BlockingQueue<Appointment<Pet>> lbq, int pause) {
  appts = lbq;
  restTime = pause;
 }

 public synchronized void shutdown() {
  shutdown = true;
 }

 @Override
 public void run() {
  while (!shutdown) {
   seePatient();
   try {
    Thread.sleep(restTime);
   } catch (InterruptedException e) {
    shutdown = true;
   }
  }
 }

 public void seePatient() {
  try {
   Appointment<Pet> ap = appts.take();
   Pet patient = ap.getPatient();
   patient.examine();
  } catch (InterruptedException e) {
   shutdown = true;
  }
 }
}


The following example starts two threads: one enqueues pets, the other takes pets from the queue and examines them.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


public class BlockingQueueTest {
 public static final BlockingQueue<Appointment<Pet>> QUEUE = new LinkedBlockingQueue<Appointment<Pet>>(10);
 public static void main(String[] args){
  new Thread(){
   @Override
   public void run(){
    for(int i=0;i<10;i++){
     Pet p1 = new Cat("cat"+i);
     Pet p2 = new Dog("dog"+i);
     Appointment<Pet> app1 = new Appointment<Pet>(p1);
     Appointment<Pet> app2 = new Appointment<Pet>(p2);
     try {
      QUEUE.put(app1);
      System.out.println("EnqueueCat"+i);
      QUEUE.put(app2);
      System.out.println("EnqueueDog"+i);
      Thread.sleep(300);
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
   }
  }.start();
  
  Veterinarian veter = new Veterinarian(QUEUE, 500);
  veter.start();
 }
}

We have investigated fine-grained BlockingQueue usage:
http://gvace.blogspot.com/2016/08/fine-grained-control-of-blockingqueue.html

CopyOnWriteArrayList

Whenever the list is altered, the entire backing array
is copied, so any read or traversal already in progress keeps seeing the old array.

As the name suggests, the CopyOnWriteArrayList class is a replacement for
the standard ArrayList class. CopyOnWriteArrayList has been made threadsafe
by the addition of copy-on-write semantics, which means that any operations
that mutate the list will create a new copy of the array backing the list
(as shown in figure 4.8). This also means that any iterators formed don’t have to
worry about any modifications that they didn’t expect.
This approach to shared data is ideal when a quick, consistent snapshot of data
(which may occasionally be different between readers) is more important than perfect
synchronization (and the attendant performance hit). This is often seen in non-missioncritical
data.
Let’s look at an example of copy-on-write in action. Consider a timeline of microblogging
updates. This is a classic example of data that isn’t 100 percent mission-critical
and where a performant, self-consistent snapshot for each reader is preferred over
total global consistency. This listing shows a holder class that represents an individual
user’s view of their timeline.


import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class MicroBlogTimeline {
 private final CopyOnWriteArrayList<Update> updates;
 private final ReentrantLock lock;
 private final String name;
 private Iterator<Update> it;

 public MicroBlogTimeline(String name_,
   CopyOnWriteArrayList<Update> updates_, ReentrantLock lock_) {
  this.name = name_;
  this.updates = updates_;
  this.lock = lock_;
 }

 public void addUpdate(Update update_) {
  updates.add(update_);
 }

 public void prep() { // Set up Iterator
  it = updates.iterator();
 }

 public void printTimeline() {
  lock.lock();
  try {
   if (it != null) {
    System.out.print(name + ": ");
    while (it.hasNext()) {
     Update s = it.next();
     System.out.print(s + ", ");
    }
    System.out.println();
   }
  } finally {
   lock.unlock();
  }
 }
}

class CopyOnWriteArrayListTest {
 public static void main(String[] args) {
  final CountDownLatch firstLatch = new CountDownLatch(1);
  final CountDownLatch secondLatch = new CountDownLatch(1);
  final Update.Builder ub = new Update.Builder();
  final CopyOnWriteArrayList<Update> l = new CopyOnWriteArrayList<>();
  l.add(ub.author(new Author("Ben")).updateText("I like pie").build());
  l.add(ub.author(new Author("Charles")).updateText("I like ham on rye").build());
  ReentrantLock lock = new ReentrantLock();
  final MicroBlogTimeline tl1 = new MicroBlogTimeline("TL1", l, lock);
  final MicroBlogTimeline tl2 = new MicroBlogTimeline("TL2", l, lock);
  Thread t1 = new Thread() {
   public void run() {
    l.add(ub.author(new Author("Jeffrey")).updateText("I like a lot of things").build());
    tl1.prep();
    firstLatch.countDown();
    try {
     secondLatch.await();
    } catch (InterruptedException e) {
    }
    tl1.printTimeline();
   }
  };
  Thread t2 = new Thread() {
   public void run() {
    try {
     firstLatch.await();
     l.add(ub.author(new Author("Gavin")).updateText("I like otters").build());
     tl2.prep();
     secondLatch.countDown();
    } catch (InterruptedException e) {
    }
    tl2.printTimeline();
   }
  };
  t1.start();
  t2.start();
 }
}



There is a lot of scaffolding in the listing—unfortunately this is difficult to avoid.
There are quite a few things to notice about this code:
■ CountDownLatch is used to maintain close control over what is happening
between the two threads.
■ If the CopyOnWriteArrayList was replaced with an ordinary List, the
result would be a ConcurrentModificationException.
■ This is also an example of a Lock object being shared between two threads to
control access to a shared resource (in this case, STDOUT). This code would be
much messier if expressed in the block-structured view.


As you can see, the second output line (tagged as TL1) is missing the final update (the
one that mentions otters), despite the fact that the latching meant that tl1's timeline was
printed after the list had been modified. This demonstrates that the Iterator held by
tl1 still refers to the array as it was when prep() was called, so the addition of the final
update is invisible to it. This is the copy-on-write property that we want these objects to display.


ConcurrentHashMap

The classic HashMap uses a function (the hash function) to determine which “bucket” it will store the key/value pair in. This is where the “hash” part of the class’s name comes from. This suggests a rather straightforward multithreaded generalization—instead of needing to lock the whole structure when making a change, it’s only necessary to lock the bucket that’s being altered.


The ConcurrentHashMap class also implements the ConcurrentMap interface, which contains some new methods to provide truly atomic functionality (a small sketch follows the list):
■ putIfAbsent()—Adds the key/value pair to the HashMap if the key isn’t already present.
■ remove()—Atomically removes the key/value pair only if the key is present and the value is equal to the current state.
■ replace()—The API provides two different forms of this method for atomic replacement in the HashMap.
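
A small sketch of the three atomic operations just listed (the map name and keys are only illustrative); they avoid the check-then-act races you would otherwise have to guard with synchronized blocks:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConcurrentMapAtomicsSketch {
 public static void main(String[] args) {
  ConcurrentMap<String, Long> arrivalTime = new ConcurrentHashMap<>();

  // putIfAbsent(): only the first arrival time for a key is recorded
  arrivalTime.putIfAbsent("update-1", 1000L);
  arrivalTime.putIfAbsent("update-1", 2000L); // ignored, key already present
  System.out.println(arrivalTime.get("update-1")); // 1000

  // replace(key, expectedOldValue, newValue): succeeds only if the current
  // value matches what we expect (compare-and-set semantics)
  boolean replaced = arrivalTime.replace("update-1", 1000L, 1500L);
  System.out.println(replaced + " -> " + arrivalTime.get("update-1")); // true -> 1500

  // remove(key, expectedValue): removes only if the value is still the one we saw
  boolean removed = arrivalTime.remove("update-1", 9999L); // wrong value, no-op
  System.out.println(removed + ", still present: " + arrivalTime.containsKey("update-1"));
 }
}
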
As an example, you can replace the synchronized methods in listing with regular, unsynchronized access if you alter the HashMap called arrivalTime to be a ConcurrentHashMap as well. Notice the lack of locks in the following listing—there is no explicit synchronization at all.


Original HashMap
import java.util.HashMap;
import java.util.Map;

public class ExampleTimingNode implements SimpleMicroBlogNode {
 private final String identifier;
 private final Map<Update, Long> arrivalTime = new HashMap<>();

 public ExampleTimingNode(String identifier_) {
  identifier = identifier_;
 }

 public synchronized String getIdentifier() {
  return identifier;
 }

 public synchronized void propagateUpdate(Update update_) {
  long currentTime = System.currentTimeMillis();
  arrivalTime.put(update_, currentTime);
 }

 public synchronized boolean confirmUpdateReceived(Update update_) {
  Long timeRecvd = arrivalTime.get(update_);
  return timeRecvd != null;
 }
}

Replace with ConcurrentHashMap

import java.util.concurrent.ConcurrentHashMap;

public class ExampleMicroBlogTimingNode implements SimpleMicroBlogNode {

 private final ConcurrentHashMap<Update, Long> arrivalTime = new ConcurrentHashMap<Update, Long>();

 public void propagateUpdate(Update upd_) {
  arrivalTime.putIfAbsent(upd_, System.currentTimeMillis());
 }

 public boolean confirmUpdateReceived(Update upd_) {
  return arrivalTime.get(upd_) != null;
 }
}

CountDownLatch

CountDownLatch: lets a group of threads start together

This is achieved by providing an int value (the count) when constructing a new
instance of CountDownLatch. After that point, two methods are used to control the
latch: countDown() and await(). The former reduces the count by 1, and the latter
causes the calling thread to wait until the count reaches 0 (it does nothing if the count
is already 0 or less). This simple mechanism allows the minimum preparation pattern
to be easily deployed.



import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

public class ProcessingThreadTest {
 public static final int MAX_THREADS = 10;
 public static class ProcessingThread extends Thread {
  private final String ident;
  private final CountDownLatch latch;
  public ProcessingThread(String ident_, CountDownLatch cdl_) {
   ident = ident_;
   latch = cdl_;
  }
  public String getIdentifier() {
   return ident;
  }
  public void initialize() {
   latch.countDown();
   try {
    latch.await();
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
   
  }
  @Override
  public void run() {
   initialize();
   while(true){
    System.out.println(getIdentifier());
    try {
     Thread.sleep(500);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  }
 }
 public static void main(String[] args) {
  final int quorum = 1 + (int) (MAX_THREADS / 2);
  final CountDownLatch cdl = new CountDownLatch(quorum);
  final Set<ProcessingThread> nodes = new HashSet<>();
  try {
   for (int i = 0; i < MAX_THREADS; i++) {
    ProcessingThread local = new ProcessingThread("localhost:" + (9000 + i), cdl);
    nodes.add(local);
    local.start();
    Thread.sleep(1000);
    System.out.println("-----------------");
   }
  } catch (InterruptedException e) {
  } finally {
  }
 }
}









Wednesday, August 10, 2016

ReentrantLock usage and fix DeadLock

Test code:
Two MicroBlogNodes update themselves and treat each other as backups; each then calls confirm on its backup.

public class UpdateTest{
 public static void main(String[] args){
  Update.Builder ub = new Update.Builder();
  Author myAuthor = new Author();
  final Update u1 = ub.author(myAuthor).updateText("Hello1").build();
  final Update u2 = ub.author(myAuthor).updateText("Hello2").build();
  
  final MicroBlogNode mbn1 = new MicroBlogNode("11");
  final MicroBlogNode mbn2 = new MicroBlogNode("22");
  new Thread(){
   @Override
   public void run(){
    mbn1.propagateUpdate(u1, mbn2);
   }
  }.start();
  new Thread(){
   @Override
   public void run(){
    mbn2.propagateUpdate(u2, mbn1);
   }
  }.start();
 }
}

Look at a deadlock first, without ReentrantLock:

public class MicroBlogNode{
 private final String ident;
 public MicroBlogNode(String ident_) {
  ident = ident_;
 }
 public String getIdent() {
  return ident;
 }
 public synchronized void propagateUpdate(Update upd_, MicroBlogNode backup_) {
  System.out.println(ident + ": recvd: " + upd_.getUpdateText()
    + " ; backup: " + backup_.getIdent());
  backup_.confirmUpdate(this, upd_);
 }
 public synchronized void confirmUpdate(MicroBlogNode other_, Update update_) {
  System.out.println(ident + ": recvd confirm: "
    + update_.getUpdateText() + " from " + other_.getIdent());
 }
}
When thread A is inside mbn1.propagateUpdate and thread B is inside mbn2.propagateUpdate at the same time, each holds its own monitor while waiting for the other's confirmUpdate, which needs the lock the other thread is holding. This causes a deadlock.

Then we use ReentrantLock instead of synchronized, but this version still deadlocks in the same way.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MicroBlogNode1{
 private final String ident;
 private final Lock lock = new ReentrantLock();
 public MicroBlogNode1(String ident_) {
  ident = ident_;
 }
 public String getIdent() {
  return ident;
 }
 public void propagateUpdate(Update upd_, MicroBlogNode1 backup_) {
  lock.lock();
  try {
   System.out.println(ident + ": recvd: " + upd_.getUpdateText()
     + " ; backup: " + backup_.getIdent());
   backup_.confirmUpdate(this, upd_);
  }finally{
   lock.unlock();
  }
 }
 public void confirmUpdate(MicroBlogNode1 other_, Update update_) {
  lock.lock();
  try {
   System.out.println(ident + ": recvd confirm: "
    + update_.getUpdateText() + " from " + other_.getIdent());
  }finally{
   lock.unlock();
  }
 }
}

To try to fix the deadlock, we can use tryLock with a timeout and retry until we get the lock.
But this version can still deadlock: there will be moments when threads A and B each hold their own lock while blocking inside each other's confirmUpdate, waiting for the other lock.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MicroBlogNode2 {
 private final String ident;
 private final Lock lock = new ReentrantLock();
 public MicroBlogNode2(String ident_) {
  ident = ident_;
 } 
 private String getIdent() {
  return ident;
 }
 public void propagateUpdate(Update upd_, MicroBlogNode2 backup_) {
  boolean acquired = false;
  while (!acquired) {
   try {
    int wait = (int) (Math.random() * 10);
    acquired = lock.tryLock(wait, TimeUnit.MILLISECONDS);
    if (acquired) {
     System.out.println(ident + ": recvd: "
       + upd_.getUpdateText() + " ; backup: "
       + backup_.getIdent());
     backup_.confirmUpdate(this, upd_);
    } else {
     Thread.sleep(wait);
    }
   } catch (InterruptedException e) {
   } finally {
      // Release our own lock if we acquired it
    if (acquired)
     lock.unlock();
   }
  }
 }
 public void confirmUpdate(MicroBlogNode2 other_, Update upd_) {
  lock.lock();// Attempts to lock other thread
  try {
   System.out.println(ident + ": recvd confirm: "
     + upd_.getUpdateText() + " from " + other_.getIdent());
  } finally {
   lock.unlock();
  }
 }
}

To really fix the deadlock, a thread needs to release its own lock when it fails to acquire the other node's lock, back off, and retry.
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MicroBlogNode3 {
 private final String ident;
 private final Lock lock = new ReentrantLock();
 public MicroBlogNode3(String ident_) {
  ident = ident_;
 }
 public String getIdent() {
  return ident;
 }
 public void propagateUpdate(Update upd_, MicroBlogNode3 backup_) {
  boolean acquired = false;
  boolean done = false;
  while (!done) {
   int wait = (int) (Math.random() * 10);
   try {
    acquired = lock.tryLock(wait, TimeUnit.MILLISECONDS);
    if (acquired) {
     System.out.println(ident + ": recvd: "
       + upd_.getUpdateText() + " ; backup: "
       + backup_.getIdent());
     done = backup_.tryConfirmUpdate(this, upd_); //Examine return from tryConfirmUpdate()
    }
   } catch (InterruptedException e) {
   } finally {
    if (acquired)
     lock.unlock();
   }
   if (!done)
    try {
     Thread.sleep(wait);
    } catch (InterruptedException e) {
    }
  }
 }
 public boolean tryConfirmUpdate(MicroBlogNode3 other_, Update upd_) {
  boolean acquired = false;
  try {
   long startTime = System.currentTimeMillis();
   int wait = (int) (Math.random() * 10);
   acquired = lock.tryLock(wait, TimeUnit.MILLISECONDS);
   if (acquired) {
    long elapsed = System.currentTimeMillis() - startTime;
    System.out.println(ident + ": recvd confirm: "
      + upd_.getUpdateText() + " from " + other_.getIdent()
      + " - took " + elapsed + " millis");
    return true;
   }
  } catch (InterruptedException e) {
  } finally {
   if (acquired)
    lock.unlock();
  }
  return false;
 }
}





















Builder Pattern for Immutability

For immutability, we can use either a Factory or a Builder.

If the constructor has a lot of parameters, the code gets messy, and you can end up with dozens of telescoping constructors.

Builder pattern:
This is a combination of two constructs:

  • A static inner class that implements a generic builder interface
  • And a private constructor for the immutable class itself.

The static inner class is the builder for the immutable class, and it provides the only way that a developer can get hold of new instances of the immutable type.
One very common implementation is for the Builder class to have exactly the same fields as the immutable class, but to allow mutation of the fields.
This listing shows how you might use this to model a microblogging update (again, building on the earlier listings in this chapter).


public interface ObjBuilder<T> {
 T build();
}


public class Update {

   //   Final fields must be initialized in constructor

 private final Author author;
 private final String updateText;

 private Update(Builder b_) {
  author = b_.author;
  updateText = b_.updateText;
 }

  // Builder class must be static inner

 public static class Builder implements ObjBuilder<Update> {
  private Author author;
  private String updateText;
  
  // Methods on Builder return Builder for chain calls

  public Builder author(Author author_) {
   author = author_;
   return this;
  }

  public Builder updateText(String updateText_) {
   updateText = updateText_;
   return this;
  }

  public Update build() {
   return new Update(this);
  }
   //hashCode() and equals() methods omitted
   
 }
}
class UpdateTest{
 public static void main(String[] args){
  Update.Builder ub = new Update.Builder();
  Author myAuthor = new Author();
  Update u = ub.author(myAuthor ).updateText("Hello").build();
 }
}



With this code, you could then create a new Update object like this:
 Update.Builder ub = new Update.Builder();
 Update u = ub.author(myAuthor).updateText("Hello").build();




















Tuesday, August 2, 2016

Reference Type Strong/Soft/Weak/Phantom

Strong
Regular reference

Soft
Used for memory caches; collected only when memory is running low

Weak
Used for canonical mappings; collected at the next GC once the referent is only weakly reachable. Example: ClassLoader-related caches, WeakHashMap

Phantom
Used only to monitor whether an object has been garbage collected; get() always returns null, so it must be used together with a ReferenceQueue

Reference Queue
When a referent has been collected, its Reference object is enqueued on the ReferenceQueue and can be polled from it


import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;

public class ReferenceType {
 public static void main(String[] args) throws InterruptedException{
  String s1 = new String("aaa");
  String s2 = new String("bbb");
  String s3 = new String("ccc");
  
  ReferenceQueue<String> srq = new ReferenceQueue<String>();
  SoftReference<String> ss = new SoftReference<String>(s1,srq);
  ReferenceQueue<String> wrq = new ReferenceQueue<String>();
  WeakReference<String> ws = new WeakReference<String>(s2,wrq);
  ReferenceQueue<String> prq = new ReferenceQueue<String>();
  PhantomReference<String> ps = new PhantomReference<String>(s3,prq);
  
  s1 = null;
  s2 = null;
  s3 = null;
  
  System.gc();
    
  System.out.println("ss="+ss.get());
  System.out.println("srq="+srq.poll());
  System.out.println("ws="+ws.get());
  System.out.println("wrq="+wrq.poll());
  System.out.println("ps="+ps.get());
  System.out.println("prq="+prq.poll());
 }
}


Output result
ss=aaa
srq=null
ws=null
wrq=java.lang.ref.WeakReference@15db9742
ps=null
prq=java.lang.ref.PhantomReference@6d06d69c




Saturday, May 28, 2016

Hadoop 2 HA setup config with ZooKeeper

Hadoop 2.x
  • HDFS: NN Federation, HA(High Availability)
  • MapReduce: Runs on YARN
  • YARN: Resource Manage System
HDFS 2.x

Fix HDFS 1.0 single node failure
  • HDFS HA: Master/Backup NameNode
  • If Master NameNode failure, then switch to Backup NameNode
Fix the HDFS 1.0 memory limitation problem
  • HDFS Federation
  • Expandable, support multiple NameNode
  • Every NameNode manages a part of directory
  • All NameNode share all DataNode storage resources
HDFS 2.x only changes the architecture; it does not change the way HDFS is used.
It is transparent to HDFS users.
It is compatible with HDFS 1.x commands and APIs.



HDFS HA(High Availability)




ZK: ZooKeeper
ZKFC: ZooKeeper Failover Controller
JN: Journal Node
NN: NameNode
DN: DataNode

Looking at the flow bottom-up:

DataNodes report to both the Active NameNode and the Standby NameNode.
DataNodes only accept commands from the Active NameNode.

The NameNode's state (edit log) is not stored only on the NameNode itself; it is written to a cluster of JournalNodes.
When the Active NameNode fails, the Standby NameNode automatically catches up from the JournalNodes and continues the work.
For a planned upgrade of the Active NameNode, we can issue a command to switch the Active and Standby roles manually.

The Failover Controllers report heartbeats to ZooKeeper; ZooKeeper is used to coordinate HA.
Clients address the logical nameservice rather than a specific NameNode; the client-side failover proxy provider (configured below) routes requests to whichever NameNode is currently active.
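
A hedged sketch of what this means for client code: the client only names the logical nameservice (mycluster, matching the configs below) and the client-side failover proxy provider finds the active NameNode. It assumes the HA core-site.xml/hdfs-site.xml are on the classpath.

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HaClientSketch {
 public static void main(String[] args) throws IOException {
  // Picks up fs.defaultFS=hdfs://mycluster plus the dfs.ha.* settings
  Configuration conf = new Configuration();

  // No NameNode hostname here, only the logical nameservice;
  // failover to the standby is handled by the client-side proxy provider.
  FileSystem fs = FileSystem.get(URI.create("hdfs://mycluster"), conf);
  for (FileStatus status : fs.listStatus(new Path("/"))) {
   System.out.println(status.getPath());
  }
  fs.close();
 }
}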


Automatic failover switch
  • ZooKeeper Failover Controller: monitors the status of its NameNode
  • Registers the NameNode with ZooKeeper
  • When the active NameNode fails, each ZKFC (ZooKeeper Failover Controller) tries to acquire the lock on behalf of its NameNode (see the sketch below).
  • The NameNode that acquires the lock becomes Active
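
The lock in the last bullet is essentially an ephemeral znode in ZooKeeper: whichever ZKFC creates it first wins, and the znode disappears automatically if that session dies. A rough sketch of the pattern (the znode path and payload are made up; this is not the actual ZKFC implementation):

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ActiveLockSketch {
 public static void main(String[] args) throws Exception {
  final CountDownLatch connected = new CountDownLatch(1);
  ZooKeeper zk = new ZooKeeper("hadoop-yarn.gvace.com:2181", 5000, new Watcher() {
   public void process(WatchedEvent event) {
    if (event.getState() == Event.KeeperState.SyncConnected) {
     connected.countDown();
    }
   }
  });
  connected.await();

  try {
   // EPHEMERAL: the znode is deleted automatically when this session ends,
   // so a crashed "active" releases the lock and the standby can grab it.
   zk.create("/example-active-lock", "nn1".getBytes(),
     ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
   System.out.println("I am active");
  } catch (KeeperException.NodeExistsException e) {
   System.out.println("Someone else is active, staying standby");
  }
  // ... do work, keep the session alive ...
  zk.close();
 }
}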


HDFS 2.x Federation
  • Federation is used for very large amounts of data: we need multiple NameNodes, each independent, but all sharing the same set of DataNodes.
  • Independent means the NameNodes are completely independent of each other, so if we want HA for the NameNodes, each NameNode needs its own HA configuration.
  • Uses namenodes/namespaces to spread data storage and management across different nodes, so the namespace layer can be expanded simply by adding machines.
  • Spreads the load of a single namenode across multiple nodes, so efficiency does not drop as HDFS data grows. We can use multiple namespaces for different types of applications, allocating data storage and management to different namenodes by application type.


Yarn
Yet Another Resource Negotiator

Hadoop 2.0 introduced the ResourceManager and the ApplicationMaster.
  • They separate the resource management and task scheduling that the JobTracker did in MRv1.
    Replaced: JobTracker and TaskTracker from Hadoop 1.0
    With: ResourceManager and ApplicationMaster
  • ResourceManager: cluster-wide resource management and scheduling (only one per cluster)
  • ApplicationMaster: handles application-level concerns such as task scheduling, task monitoring, fault tolerance, etc. Each application has one ApplicationMaster
YARN: lets multiple computing frameworks run in one cluster
It gives many frameworks a common interface to the cluster.
  • Each application has one ApplicationMaster
  • Multiple frameworks can run on YARN at the same time: MapReduce, Spark, Storm, etc.



HA HDFS Cluster build example

                NameNode  DataNode  ZooKeeper  ZKFC  JournalNode  ResourceManager  NodeManager
hadoop-yarn        1                    1       1                        1
hadoop-node1       1         1          1       1         1                             1
hadoop-node2                 1          1                 1                             1
hadoop-node3                 1                            1                             1

Followed this doc:
We need

  • dfs.nameservices as name service ID
  • dfs.ha.namenodes.[name service ID]: as NameNode IDs
  • dfs.namenode.rpc-address.[nameservice ID].[name node ID]
  • dfs.namenode.http-address.[nameservice ID].[name node ID]
  • dfs.namenode.shared.edits.dir
  • dfs.client.failover.proxy.provider.[nameservice ID]
  • dfs.ha.fencing.methods
  • fs.defaultFS
  • dfs.journalnode.edits.dir
I started from here and built a basic distributed Hadoop first: http://gvace.blogspot.com/2016/05/hadoop-centos-build.html
Then applied some changes:
  • create soft link in /usr/lib: hadoop -> hadoop-2.6.4
  • change tmp folder from /usr/lib/hadoop-2.6.4/data/tmp/ to /var/opt/hadoop/data/tmp
  • install ZooKeeper-3.4.8
  • create soft link in /usr/lib: zookeeper -> zookeeper-3.4.8
  • create folder /var/opt/zookeeper/tmp/data


hdfs-site.xml
Instead of configuring a single NameNode, we configure a nameservice (cluster)
        <property>
                <name>dfs.nameservices</name>
                <value>mycluster</value>
        </property>
        <property>
                <name>dfs.ha.namenodes.mycluster</name>
                <value>nn1,nn2</value>
        </property>
        <property>
                <name>dfs.namenode.rpc-address.mycluster.nn1</name>
                <value>hadoop-yarn.gvace.com:8020</value>
        </property>
        <property>
                <name>dfs.namenode.rpc-address.mycluster.nn2</name>
                <value>hadoop-node1.gvace.com:8020</value>
        </property>
        <property>
                <name>dfs.namenode.http-address.mycluster.nn1</name>
                <value>hadoop-yarn.gvace.com:50070</value>
        </property>
        <property>
                <name>dfs.namenode.http-address.mycluster.nn2</name>
                <value>hadoop-node1.gvace.com:50070</value>
        </property>
        <property>
                <name>dfs.namenode.shared.edits.dir</name>
                <value>qjournal://hadoop-node1.gvace.com:8485;hadoop-node2.gvace.com:8485;hadoop-node3.gvace.com:8485/mycluster</value>
        </property>
        <property>
                <name>dfs.client.failover.proxy.provider.mycluster</name>
                <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
        </property>
        <property>
                <name>dfs.ha.fencing.methods</name>
                <value>sshfence</value>
        </property>
        <property>
                <name>dfs.ha.fencing.ssh.private-key-files</name>
                <value>/home/yushan/.ssh/id_rsa</value>
        </property>
        <property>
                <name>dfs.journalnode.edits.dir</name>
                <value>/var/opt/hadoop/data/tmp/journal/edits</value>
        </property>
        <property>
                <name>dfs.ha.automatic-failover.enabled</name>
                <value>true</value>
        </property>


yarn-site.xml

<configuration>

<!-- Site specific YARN configuration properties -->
        <property>
                <description>The hostname of the RM.</description>
                <name>yarn.resourcemanager.hostname</name>
                <value>hadoop-yarn.gvace.com</value>
        </property>
        <property>
                <name>yarn.nodemanager.aux-services</name>
                <value>mapreduce_shuffle</value>
        </property>

</configuration>


core-site.xml
Instead of pointing at a single NameNode, we point at the nameservice (cluster)
<configuration>
        <property>
                <name>fs.defaultFS</name>
                <value>hdfs://mycluster</value>
        </property>
        <property>
                <name>ha.zookeeper.quorum</name>
                <value>hadoop-yarn.gvace.com:2181,hadoop-node1.gvace.com:2181,hadoop-node2.gvace.com:2181</value>
        </property>
<!--
        <property>
                <name>fs.default.name</name>
                <value>hdfs://hadoop-yarn.gvace.com:8020</value>
                <description>NameNode</description>
        </property>
-->
        <property>
                <name>hadoop.tmp.dir</name>
                <value>/var/opt/hadoop/data/tmp</value>
        </property>
</configuration>



Also install ZooKeeper and assign an id to each ZooKeeper node

Create the file /var/opt/zookeeper/tmp/data/myid on each ZooKeeper node
Put the content "1", "2", "3" respectively into each node's myid file; this is the id of the ZooKeeper node

Configure ZooKeeper
In /usr/lib/zookeeper/conf, copy zoo_sample.cfg to zoo.cfg
Edit zoo.cfg: set and create dataDir, and list each ZooKeeper node's host with its server id

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/var/opt/zookeeper/tmp/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=hadoop-yarn.gvace.com:2888:3888
server.2=hadoop-node1.gvace.com:2888:3888
server.3=hadoop-node2.gvace.com:2888:3888




Then start zookeeper on each ZooKeeper Node
./zkServer.sh start
You will see QuorumPeerMain running as a Java process in jps

Start each Journal Node
hadoop-daemon.sh start journalnode

Format one of the NameNodes
./bin/hdfs namenode -format

Start the formatted NameNode
./sbin/hadoop-daemon.sh start namenode

Bootstrap the Standby NameNode (run this on the standby host); this synchronizes the formatted image files from the already-formatted NameNode
./bin/hdfs namenode -bootstrapStandby

Format the ZKFC znode in ZooKeeper, on just one of the NameNode hosts
./bin/hdfs zkfc -formatZK

Stop all services from just one NameNode
sbin/stop-dfs.sh

Start all services from just one NameNode
sbin/start-dfs.sh

We cannot be sure which NameNode will become Active; it depends on which one acquires the lock first.


After all the install/config/format steps, restart one NameNode









































Saturday, May 21, 2016

MapReduce Examples

Running MapReduce in YARN

We have some default example of MapReduce jars from Hadoop


Example: WordCount

Put a text file into HDFS; I put it at /data01/LICENSE.txt
I want the output to go to /data01/LICENSE_WordCount
(I was expecting the output to be a single file, but it has to be a folder, so LICENSE_WordCount here is a folder name)

Run MR
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar wordcount /data01/LICENSE.txt /data01/LICENSE_WordCount


The result folder contains an empty file named _SUCCESS, which marks that the job succeeded,
and a result file: part-r-00000
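
For reference, the bundled wordcount example is essentially the classic mapper/reducer pair sketched below (my own reconstruction of the standard example, not code copied from the 2.6.4 jar):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
   StringTokenizer itr = new StringTokenizer(value.toString());
   while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one); // emit (word, 1) for every token
   }
  }
 }

 public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
   int sum = 0;
   for (IntWritable val : values) {
    sum += val.get(); // add up all the 1s for this word
   }
   result.set(sum);
   context.write(key, result);
  }
 }

 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = Job.getInstance(conf, "word count");
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));   // e.g. /data01/LICENSE.txt
  FileOutputFormat.setOutputPath(job, new Path(args[1])); // e.g. /data01/LICENSE_WordCount
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}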




Example: pi

Pick one of them; what I chose is pi
Run it:
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar pi

It requires two arguments: <nMaps> and <nSamples>
Usage: org.apache.hadoop.examples.QuasiMonteCarlo <nMaps> <nSamples>
Generic options supported are
-conf <configuration file>     specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|resourcemanager:port>    specify a ResourceManager
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
We assigned 5 maps, each map with 20 samples
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar pi 5 20





Looking at the running status of the MapReduce job, we find:


We assigned 5 maps, each map with 20 samples.
Each map task has a container, but the running MapReduce job shows 6 containers.
The one additional container is the ApplicationMaster.



Looking at the result of running MR job:
[yushan@hadoop-yarn hadoop-2.6.4]$ bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar pi 5 20
Number of Maps  = 5
Samples per Map = 20
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Starting Job
16/05/21 15:23:50 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/05/21 15:23:50 INFO input.FileInputFormat: Total input paths to process : 5
16/05/21 15:23:51 INFO mapreduce.JobSubmitter: number of splits:5
16/05/21 15:23:51 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1463856930642_0002
16/05/21 15:23:51 INFO impl.YarnClientImpl: Submitted application application_1463856930642_0002
16/05/21 15:23:51 INFO mapreduce.Job: The url to track the job: http://hadoop-yarn.gvace.com:8088/proxy/application_1463856930642_0002/
16/05/21 15:23:51 INFO mapreduce.Job: Running job: job_1463856930642_0002
16/05/21 15:23:59 INFO mapreduce.Job: Job job_1463856930642_0002 running in uber mode : false
16/05/21 15:23:59 INFO mapreduce.Job:  map 0% reduce 0%
16/05/21 15:24:36 INFO mapreduce.Job:  map 100% reduce 0%
16/05/21 15:24:36 INFO mapreduce.Job: Task Id : attempt_1463856930642_0002_m_000002_0, Status : FAILED
Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal
16/05/21 15:24:37 INFO mapreduce.Job:  map 80% reduce 0%
16/05/21 15:24:47 INFO mapreduce.Job:  map 100% reduce 0%
16/05/21 15:24:50 INFO mapreduce.Job:  map 100% reduce 100%
16/05/21 15:24:50 INFO mapreduce.Job: Job job_1463856930642_0002 completed successfully
16/05/21 15:24:50 INFO mapreduce.Job: Counters: 51
File System Counters FILE: Number of bytes read=116
FILE: Number of bytes written=643059
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1385
HDFS: Number of bytes written=215
HDFS: Number of read operations=23
HDFS: Number of large read operations=0
HDFS: Number of write operations=3
Job Counters  Failed map tasks=1
Launched map tasks=6
Launched reduce tasks=1
Other local map tasks=1
Data-local map tasks=5
Total time spent by all maps in occupied slots (ms)=189730
Total time spent by all reduces in occupied slots (ms)=9248
Total time spent by all map tasks (ms)=189730
Total time spent by all reduce tasks (ms)=9248
Total vcore-milliseconds taken by all map tasks=189730
Total vcore-milliseconds taken by all reduce tasks=9248
Total megabyte-milliseconds taken by all map tasks=194283520
Total megabyte-milliseconds taken by all reduce tasks=9469952
Map-Reduce Framework Map input records=5
Map output records=10
Map output bytes=90
Map output materialized bytes=140
Input split bytes=795
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=140
Reduce input records=10
Reduce output records=0
Spilled Records=20
Shuffled Maps =5
Failed Shuffles=0
Merged Map outputs=5
GC time elapsed (ms)=16465
CPU time spent (ms)=5220
Physical memory (bytes) snapshot=670711808
Virtual memory (bytes) snapshot=5995196416
Total committed heap usage (bytes)=619401216
Shuffle Errors BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters  Bytes Read=590
File Output Format Counters  Bytes Written=97
Job Finished in 60.235 seconds
Estimated value of Pi is 3.20000000000000000000


Looking at the important factors:

Job ID
job_1463856930642_0002
Format of the job id: job_[timestamp]_[job serial number]
0002: the job serial number, which starts from 0001 and increments for each submitted job

Task Id
Task Id : attempt_1463856930642_0002_m_000002_0, Status : FAILED
Format of the task attempt id: attempt_[timestamp]_[job serial number]_m_[task serial number]_[attempt number]
m means Map
r means Reduce

Counters
MapReduce has 6 counters

  1. File System Counters
  2. Job Counters 
  3. Map-Reduce Framework
  4. Shuffle Errors
  5. File Input Format Counters 
  6. File Output Format Counters 

MapReduce History Server

Provides the MapReduce job history, for example: number of maps, number of reduces, job submit time, job start time, job finish time, etc.
When we click on an application's History link, it shows ERR_CONNECTION_REFUSED.
It points to port 19888, but port 19888 is not open by default.

Using netstat -tnlp we can confirm the port really is not open.

To Start History Server
sbin/mr-jobhistory-daemon.sh start historyserver

Web UI
http://hadoop-yarn.gvace.com:19888/

To Stop History Server
sbin/mr-jobhistory-daemon.sh stop historyserver

























Hadoop HDFS Web UI


Now go to http://hadoop-yarn.gvace.com:50070/ and we will see the NameNode info page

Now go to http://hadoop-yarn.gvace.com:50090/ and we will see the SecondaryNameNode info page






Started:Sat May 21 11:21:01 EDT 2016
Version:2.6.4, r5082c73637530b0b7e115f9625ed7fac69f937e6
Compiled:2016-02-12T09:45Z by jenkins from (detached from 5082c73)
Cluster ID:CID-4fc1b111-3c85-4e0a-9cd5-e44ec8fe0192
Block Pool ID:BP-796093918-192.168.56.95-1463114304816

Block Pool ID: new in Hadoop 2.
HDFS files are stored as blocks on disk; these blocks are grouped into a physical pool to be managed.
Every NameNode has a pool, and each pool has an ID.

Cluster ID and Block Pool ID are concepts introduced for NameNode Federation.




Logs

IN $HADOOP_HOME/logs

hadoop-yushan-namenode-hadoop-yarn.gvace.com.log 156419 bytes May 21, 2016 12:54:32 PM
hadoop-yushan-namenode-hadoop-yarn.gvace.com.out 4913 bytes May 21, 2016 11:31:45 AM



.log file: written by log4j; contains most application logging, configured by log4j.properties
.out file: captures standard output and standard error; usually does not contain much


log file name format:
[framework]-[username]-[process name]-[host name].log/.out
e.g. hadoop-yushan-namenode-hadoop-yarn.gvace.com.log


Looking at hadoop-yushan-namenode-hadoop-yarn.gvace.com.out
(it records the process limits; you can also run the command ulimit -a to see them)


NameNode Journal Status

Edits of NameNode


NameNode Storage



Startup Progress
The SecondaryNameNode periodically merges fsimage and edits, updates fsimage, and deletes the old edits.
Each time it merges them, it saves a checkpoint.

Thursday, May 12, 2016

Hadoop 2 CentOS build(from pseudo to real distributed)

We are going to have four machines:
192.168.56.95    hadoop-yarn.gvace.com       hadoop-yarn
192.168.56.96    hadoop-node1.gvace.com     hadoop-node1
192.168.56.97    hadoop-node2.gvace.com     hadoop-node2
192.168.56.98    hadoop-node3.gvace.com     hadoop-node3
We can run all of them as virtual machines: start with the first one, then clone hadoop-yarn to the other nodes.

We will build a pseudo-distributed setup first, starting with only hadoop-yarn.gvace.com (I will use VirtualBox), which runs everything by itself.
After everything tests OK in pseudo-distributed mode, we move on to a real distributed setup (again using VirtualBox to build all the servers).

Linux Version
CentOS-7-x86_64-Minimal-1511.iso
In VirtualBox

Hadoop Version
hadoop-2.6.4

JDK Version
OpenJDK 7

Disable SELinux
File: /etc/sysconfig/selinux
Config: SELINUX=disabled

Network config:

Choose your own host-only interface as set up in VirtualBox; mine here is enp0s8
Set a static IP on the same network as the host machine
/etc/sysconfig/network-scripts/ifcfg-enp0s8 
DEVICE=enp0s8
BOOTPROTO=none
ONBOOT=yes
NETWORK=192.168.56.0
NETMASK=255.255.255.0
IPADDR=192.168.56.95
USERCTL=no

If you want to connect to the internet (optional)
This version of CentOS does not provide a default DNS. Choose your own NAT interface as set up in VirtualBox; mine here is enp0s3
/etc/sysconfig/network-scripts/ifcfg-enp0s3
Config
BOOTPROTO=dhcp
ONBOOT=yes
DNS1=8.8.8.8
DNS2=4.2.2.2 
Install net-tools for convenience, e.g. for the ifconfig command (optional)
sudo yum install net-tools

Change Hostname
Change to my own HOSTNAME in /etc/sysconfig/network
hostname hadoop-yarn.gvace.com
Also change in hosts /etc/hosts
127.0.0.1   localhost
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.56.95    hadoop-yarn.gvace.com      hadoop-yarn
192.168.56.96    hadoop-node1.gvace.com     hadoop-node1
192.168.56.97    hadoop-node2.gvace.com     hadoop-node2
192.168.56.98    hadoop-node3.gvace.com     hadoop-node3

Note: the standard format is the IP, then the fully qualified hostname (with domain), then the short hostname

Also change in here /etc/hostname
hadoop-yarn
This may not be necessary, but let's put it here first.

Setup SSH

Some intro to ssh
http://gvace.blogspot.com/2016/04/ssh.html

A distributed Hadoop setup relies heavily on SSH communication,
so the NameNode server needs to be able to talk to every DataNode server.
The very first step is to call "start-dfs.sh", and this script uses SSH to communicate with the other servers.
We need to enable passwordless SSH from the NameNode server to every slave.

A pseudo-distributed Hadoop server also needs to SSH to itself if we use scripts like "start-dfs.sh".

So let's try start-dfs.sh first and see which server/IP/hostname it asks us to log in to over SSH.


As we can see, start-dfs.sh asked me to enter a password three times:

  1. namenode: yushan@hadoop-yarn.gvace.com
  2. datanode: 0.0.0.0
  3. secondarynamenode: 0.0.0.0

We know 0.0.0.0 means localhost, but in a distributed setup, which is what we will normally run, there will be many different DataNodes with different hostnames/IPs, and all of them need passwordless SSH.

So on the NameNode server:
First, generate a public/private key pair: ssh-keygen -t rsa
Second, copy my public key to myself: ssh-copy-id yushan@localhost  (this solves SSH to 0.0.0.0)
Third, copy my public key to every other server: ssh-copy-id yushan@datanode1
In the real world, when starting start-dfs.sh and start-yarn.sh, we may be asked for a password a few more times.
So note the host names that ask for a password, and copy the public key to those hosts. This makes sure Hadoop can run without manually typing passwords.




Install JDK
sudo yum install java-1.7.0-openjdk-devel
If the install succeeded, we can run the java and javac commands to verify it.
Use "whereis java" to find where it was installed; follow the links and you will finally find the install location.
Here it is: /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101-2.6.6.1.el7_2.x86_64
This is too long. Luckily there are also several much shorter links in /usr/lib/jvm.
They point through /etc/alternatives/ and all finally point to the same JDK directory.
We can use /usr/lib/jvm/java-1.7.0 as $JAVA_HOME

Set JDK env
Create the file /etc/profile.d/java.sh
export JAVA_HOME=/usr/lib/jvm/java-1.7.0
export PATH=.:$JAVA_HOME/bin:$PATH
Update new configs: source /etc/profile

Install Hadoop

download hadoop-2.6.4.tar.gz
extract it to /usr/lib/

Config files

We can find the default xml files for core/hdfs/yarn/mapred in the source code, so we can pick out the settings we need to change from the default xml files and put them into our own xml files.

In /usr/lib/hadoop/conf
  1. hadoop-env.sh
  2. yarn-env.sh(optional)
  3. mapred-env.sh(optional)
  4. core-site.xml
  5. hdfs-site.xml
  6. yarn-site.xml
  7. mapred-site.xml
hadoop-env.sh

Change JAVA_HOME to the JDK directory
export JAVA_HOME=/usr/lib/jvm/java-1.7.0

yarn-env.sh(optional)
Uncomment and change:
export JAVA_HOME=/usr/lib/jvm/java-1.7.0

mapred-env.sh(optional)
Uncomment and change:
export JAVA_HOME=/usr/lib/jvm/java-1.7.0

core-site.xml
Assign the HDFS default name and the data/tmp directory; create this directory manually
The default name needs to point to the NameNode's hostname
<configuration>
        <property>
                <name>fs.default.name</name>
                <value>hdfs://hadoop-yarn.gvace.com:8020</value>
                <description>NameNode</description>
        </property>
        <property>
                <name>hadoop.tmp.dir</name>
                <value>/usr/lib/hadoop-2.6.4/data/tmp</value>
        </property>
</configuration>

Note that fs.default.name is deprecated in Hadoop 2; fs.defaultFS should be used instead.
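Since hadoop.tmp.dir must exist before formatting, create it up front (a small sketch using the value from core-site.xml above):

# Create the directory configured as hadoop.tmp.dir
mkdir -p /usr/lib/hadoop-2.6.4/data/tmp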

hdfs-site.xml
dfs.replication defaults to 3.
Rule: the replication factor must NOT be bigger than the number of datanodes.

<configuration>
        <property>
                <name>dfs.replication</name>
                <value>1</value>
        </property>
        <property>
                <name>dfs.permissions</name>
                <value>false</value>
        </property>
</configuration>


yarn-site.xml

<configuration>
        <property>
                <name>yarn.nodemanager.aux-services</name>
                <value>mapreduce_shuffle</value>
        </property>
</configuration>


mapred-site.xml
<configuration>
        <property>
                <name>mapreduce.framework.name</name>
                <value>yarn</value>
        </property>
</configuration>


Format Hadoop

hadoop namenode -format

We may run into problems caused by user permissions.

This is the error I got.
16/05/02 00:47:35 INFO namenode.FSNamesystem: fsOwner=yushan
16/05/02 00:47:35 INFO namenode.FSNamesystem: supergroup=supergroup
16/05/02 00:47:35 INFO namenode.FSNamesystem: isPermissionEnabled=false
16/05/02 00:47:35 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=100
16/05/02 00:47:35 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
16/05/02 00:47:36 INFO namenode.FSEditLog: dfs.namenode.edits.toleration.length = 0
16/05/02 00:47:36 INFO namenode.NameNode: Caching file names occuring more than 10 times 
java.io.IOException: Cannot create directory /user/lib/hadoop-2.6.4/data/tmp/dfs/name/current

As we see, by default the Hadoop user is the user running the command, and the group is "supergroup".
We can change ownership of the whole /usr/lib/hadoop-2.6.4 folder and all of its sub-folders/files to that username and group (create the group first if it does not exist), and set permission 755 on the whole folder. That fixes the error.

hadoop namenode -format cannot simply be run more than once.
If we want to format again, we can delete everything under /usr/lib/hadoop-2.6.4/data/tmp/ and then format again.
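A sketch of a clean re-format (assuming the hadoop.tmp.dir from core-site.xml above):

# Stop HDFS daemons first if they are running
sbin/hadoop-daemon.sh stop namenode
sbin/hadoop-daemon.sh stop datanode

# Wipe the old NameNode/DataNode state, then format again
rm -rf /usr/lib/hadoop-2.6.4/data/tmp/*
bin/hdfs namenode -format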

The namenode format generates a Cluster ID
You can also assign a Cluster ID if needed
bin/hdfs namenode -format -clusterid yarn-cluster

The format output also prints the include/exclude host lists, which are empty here since no dfs.hosts / dfs.hosts.exclude files are configured:
namenode.HostFileManager: read includes:
HostSet(
)
namenode.HostFileManager: read excludes:
HostSet(
)


Safe Mode

  • When the NameNode starts, it loads fsimage into memory and replays the operations from edits.
  • Once the in-memory state is built, it writes a new fsimage and an empty edits file.
  • During this time the NameNode is in Safe Mode, and it is read-only for clients.
  • While in Safe Mode, the NameNode collects block reports from the datanodes. A data block that has reached the minimum replication count is considered "safe".
  • When a (configurable) fraction of the data blocks are "safe", the NameNode waits a short while longer and then exits Safe Mode.
  • When it detects that a data block's replication is below the minimum, that block is re-replicated until it reaches the minimum.
    A block's locations are not determined by the NameNode; they are reported by the datanodes as block lists.


Note that Safe Mode and dfs.permissions are two different things: "Safe mode is off" just means the NameNode has finished the startup checks above and HDFS is writable again. Setting dfs.permissions to false in hdfs-site.xml is what lets any user create/edit/delete files in HDFS, regardless of Safe Mode.
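Safe Mode can also be inspected and controlled manually with dfsadmin (a quick sketch):

# Check whether the NameNode is currently in Safe Mode
bin/hdfs dfsadmin -safemode get

# Force the NameNode to leave (or enter) Safe Mode
bin/hdfs dfsadmin -safemode leave
bin/hdfs dfsadmin -safemode enter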



Different ways of starting Hadoop services
We have the following scripts to start/stop services
  • start-all.sh (Deprecated)
  • start-dfs.sh
  • start-yarn.sh
  • hadoop-daemons.sh
  • hadoop-daemon.sh
  • yarn-daemons.sh
  • yarn-daemon.sh
  • slaves.sh
Looking inside each file, they actually call each other.
If we use start-dfs.sh and start-yarn.sh, their slave sub-scripts rely on SSH, so they can start all services on the remote machines. After SSH-ing to a remote server, they use hadoop-daemon.sh to start each individual service.
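In other words, start-dfs.sh is roughly equivalent to running hadoop-daemon.sh over SSH on each host yourself. A simplified sketch (not the exact script logic; the SecondaryNameNode is started the same way):

# Start the NameNode locally...
/usr/lib/hadoop-2.6.4/sbin/hadoop-daemon.sh start namenode

# ...and start a DataNode on every host listed in the slaves file
for host in $(cat /usr/lib/hadoop-2.6.4/etc/hadoop/slaves); do
    ssh $host /usr/lib/hadoop-2.6.4/sbin/hadoop-daemon.sh start datanode
done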
Single-node pseudo-distributed mode

Start HDFS

NameNode, DataNode, SecondaryNameNode

Start namenode
sbin/hadoop-daemon.sh start namenode

Start datanode
sbin/hadoop-daemon.sh start datanode

Start secondarynamenode
sbin/hadoop-daemon.sh start secondarynamenode

Now go to http://hadoop-yarn.gvace.com:50070/ and we will see the NameNode info page.

Now go to http://hadoop-yarn.gvace.com:50090/ and we will see the SecondaryNameNode info page.

Start YARN
ResourceManager, NodeManager

Start ResourceManager
sbin/yarn-daemon.sh start resourcemanager

Start NodeManager

sbin/yarn-daemon.sh start nodemanager

Now go to http://hadoop-yarn.gvace.com:8088/ and we will see the YARN (ResourceManager) web UI.
Now go to http://hadoop-yarn.gvace.com:8042/ and we will see the NodeManager web UI.
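At this point all five daemons should be up; jps (which ships with the JDK) is a quick way to confirm (the PIDs below are just illustrative):

jps
# 12345 NameNode
# 12346 DataNode
# 12347 SecondaryNameNode
# 12348 ResourceManager
# 12349 NodeManager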

Stop HDFS and YARN

NameNode, DataNode, SecondaryNameNode, ResourceManager, NodeManager

Stop namenode
sbin/hadoop-daemon.sh stop namenode

Stop datanode
sbin/hadoop-daemon.sh stop datanode

Stop secondarynamenode
sbin/hadoop-daemon.sh stop secondarynamenode

Stop ResourceManager
sbin/yarn-daemon.sh stop resourcemanager

Stop NodeManager
sbin/yarn-daemon.sh stop nodemanager

Distributed Mode

Everything above was for pseudo-distributed mode. Once pseudo mode works, we can apply mostly the same configs to the slaves (datanodes). If all nodes are VirtualBox VMs, we can clone the NameNode and then apply a few small config changes to the new nodes. (Remember to select "reinitialize the MAC address of all network cards" when cloning in VirtualBox/VMware.)

Before we clone the machine, we need to do some config changes for the Namenode.

hdfs-site.xml
We increase dfs.replication to 3 (which is also the default).
As we know, Safe Mode depends on dfs.replication, so dfs.replication must be lower than or equal to the number of slaves we have.
In this case we have 3 slaves, so a replication factor of 3 is just right.

We also move the SecondaryNameNode from the hadoop-yarn server to hadoop-node1; normally the NameNode and SecondaryNameNode do not live on the same server.

<configuration>
        <property>
                <name>dfs.replication</name>
                <value>3</value>
        </property>
        <property>
                <name>dfs.permissions</name>
                <value>false</value>
        </property>
        <property>
                <name>dfs.namenode.secondary.http-address</name>
                <value>hadoop-node1.gvace.com:50090</value>
        </property>
        <property>
                <name>dfs.namenode.secondary.https-address</name>
                <value>hadoop-node1.gvace.com:50091</value>
        </property>
</configuration>

yarn-site.xml
YARN will still run on the NameNode host; I just replace the default value of 0.0.0.0 with the real hostname hadoop-yarn.

<configuration>

<!-- Site specific YARN configuration properties -->
        <property>
                <description>The hostname of the RM.</description>
                <name>yarn.resourcemanager.hostname</name>
                <value>hadoop-yarn.gvace.com</value>
        </property>
        <property>
                <name>yarn.nodemanager.aux-services</name>
                <value>mapreduce_shuffle</value>
        </property>
        <property>
                <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
                <value>org.apache.hadoop.mapred.ShuffleHandler</value>
        </property>
</configuration>


slaves
The slaves config file is located in the same folder as hdfs-site.xml and yarn-site.xml.
Add the following lines to slaves, which list all the datanodes:

hadoop-node1.gvace.com
hadoop-node2.gvace.com
hadoop-node3.gvace.com

Up to here, we can shut down the NameNode VM and start cloning, if you are also using VMs for all the nodes.

Again, here is the list of all four boxes:
192.168.56.95    hadoop-yarn.gvace.com
192.168.56.96    hadoop-node1.gvace.com
192.168.56.97    hadoop-node2.gvace.com
192.168.56.98    hadoop-node3.gvace.com

The goal is: run start-dfs.sh and start-yarn.sh from the NameNode without typing a password, which requires that the NameNode can SSH to all the other nodes without a password.
To do this, make sure every slave host has the NameNode's public key in ~/.ssh/authorized_keys.
(If, before cloning, you could already SSH from the NameNode host to its own hostname without a password, then we will probably be fine, since each cloned host also carries the NameNode's public key.)

We need to change the following configs on each node (node1, node2, node3), as the sketch after this list shows:
Change the IP: /etc/sysconfig/network-scripts/ifcfg-enp0s8
Change the hostname: /etc/sysconfig/network
Change the hostname: /etc/hostname
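For example, on the clone that becomes hadoop-node1 (a sketch; the interface name enp0s8 and the IPs come from this cluster, other setups may differ):

# Set the new hostname (writes /etc/hostname on CentOS 7)
sudo hostnamectl set-hostname hadoop-node1.gvace.com

# Change the static IP of the host-only interface to 192.168.56.96
sudo vi /etc/sysconfig/network-scripts/ifcfg-enp0s8
#   IPADDR=192.168.56.96

# Restart networking (or reboot) so the new IP takes effect
sudo systemctl restart network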

After all nodes are configured, we run start-dfs.sh from the NameNode manually once, from the command line.
If it prompts for yes/no, type yes. (The first time start-dfs.sh runs in distributed mode, its output lines may be queued up, so whenever the output contains "yes/no", just type "yes", no matter which output line you are currently on.)

All we want to make sure of is that SSH works from the NameNode to every other node without a password.

[Screenshot: final result of start-dfs.sh]
[Screenshot: final result of start-yarn.sh]


The web URLs change a little depending on which service runs on which server:
NameNode: http://hadoop-yarn:50070/

SecondaryNameNode: http://hadoop-node1.gvace.com:50090/
Yarn(Resource Manager): http://hadoop-yarn.gvace.com:8088/
Each slave server has a Node Manager
Node Manager: http://hadoop-node1.gvace.com:8042/
Node Manager: http://hadoop-node2.gvace.com:8042/
Node Manager: http://hadoop-node3.gvace.com:8042/

MapReduce History Server
It provides the MapReduce job logs, for example: number of maps, number of reduces, job submit time, job start time, job finish time, etc.
When we click on the History link of an application, it shows ERR_CONNECTION_REFUSED.
The link points to port 19888, but port 19888 is not open by default.

Using netstat -tnlp we can confirm that the port is indeed not open.
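For example (an empty result means nothing is listening on 19888 yet):

# Look for a listener on the history server port
netstat -tnlp | grep 19888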

To Start History Server
sbin/mr-jobhistory-daemon.sh start historyserver

Web UI
http://hadoop-yarn.gvace.com:19888/

To Stop History Server
sbin/mr-jobhistory-daemon.sh stop historyserver