Saturday, October 22, 2016

HBase Application


Available APIs

Java API: Full-featured; does not require an extra server daemon, only the client libraries/configs installed

REST API: Easy to use and stateless, but slower than the Java API; requires a REST gateway server; data can be fetched directly over HTTP

Thrift API: Multi-language support, lightweight; requires a Thrift gateway server






Configuration
  • Key-values that define client application properties
DDL
  • Admin (HBaseAdmin)
    create/modify/delete tables
  • H*Descriptor (HTableDescriptor, HColumnDescriptor)
    describe and manipulate table/column-family metadata
DML
  • Mutations: Incr/Delete/Put/Check-and-Put/Multiput*
Query
  • Scan/Get/Filter
(A minimal Java client sketch follows below.)
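
The following is a minimal sketch of these pieces using the HBase 1.x Java client; the ZooKeeper quorum host, table name demo_table, column family cf, qualifier q1, and row key row1 are hypothetical placeholders, not values from this post.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseClientSketch {
 public static void main(String[] args) throws Exception {
  // Configuration: key-values that define client application properties
  Configuration conf = HBaseConfiguration.create();
  conf.set("hbase.zookeeper.quorum", "hadoop-yarn.gvace.com"); // placeholder host

  try (Connection conn = ConnectionFactory.createConnection(conf)) {
   // DDL: Admin + HTableDescriptor/HColumnDescriptor
   Admin admin = conn.getAdmin();
   TableName tn = TableName.valueOf("demo_table");
   if (!admin.tableExists(tn)) {
    HTableDescriptor htd = new HTableDescriptor(tn);
    htd.addFamily(new HColumnDescriptor("cf"));
    admin.createTable(htd);
   }

   // DML: Put (a mutation)
   Table table = conn.getTable(tn);
   Put put = new Put(Bytes.toBytes("row1"));
   put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("value1"));
   table.put(put);

   // Query: Get and Scan
   Result r = table.get(new Get(Bytes.toBytes("row1")));
   System.out.println(Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q1"))));
   try (ResultScanner scanner = table.getScanner(new Scan())) {
    for (Result row : scanner) {
     System.out.println(Bytes.toString(row.getRow()));
    }
   }

   table.close();
   admin.close();
  }
 }
}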

















HBase System Architecture, Compared to Others





The region server hosting .META. is registered in ZooKeeper,
so the Master node knows where to read it by talking to ZooKeeper.
Once .META. is cached, any data stored in HBase can be located.





HBase VS Other NoSQL

  • Great scalability to very large data sets
  • Strict consistency
  • Availability is not perfect, but it can be managed

  • Integrates with Hadoop
    Very efficient bulk loads and MapReduce analysis
  • Ordered range partitions (not hash)
  • Automatically shards/scales (just add more servers)
  • Sparse storage (per row)

Compare to RDBMS

Enterprise Hadoop Cluster Architecture


Master services: single points of failure (SPOF), each tied to one node

  • NameNode, JobTracker / ResourceManager
  • Hive Metastore, HiveServer2
  • Impala StateStore, Catalog Server
  • Spark Master
Node kernel environment setup
  • ulimit, /etc/security/limits.conf to configure nofile
    By default Linux only allows 1024 open file descriptors per process.
    For a Hadoop cluster or HBase this is not enough, so we need to raise this limit.
  • THP (Transparent Huge Pages), ACPI, memory overcommit issues
    THP: A newer Linux kernel feature whose caching behavior does not play well with Hadoop and can cause high memory usage, so it is better to turn it off.
    ACPI: Power management; it can also cause high memory usage, so it is better to turn it off.
    Memory overcommit: the system stops giving more memory to an application once a commit threshold (e.g. 50% of total) is reached, so we need to either raise the setting or disable it.
  • Customize the configuration for each functional node type
        High memory? How should swap be configured?
        High disk IO? Then we may not need high overcommit.
        High CPU, high system load?
Enterprise Hadoop Cluster Data Management

HDFS config
  • HDFS Block Size: dfs.block.size
  • Replication Factor: dfs.replication, default is 3
  • Turn on dfs.permissions? fs.permissions.umask-mode
  • User permission
  • The DataNode's dfs data partition should be separate from the Linux system partition; otherwise a full disk may prevent the system from starting.
Resource Allocation
  • CPU, Memory, Disk IO, Network IO
  • HDFS, MapReduce(Yarn), Hive jobs
  • HBase, Hive, Impala, Spark resource allocation, limitations
Enterprise Hadoop Cluster Task Scheduling

Oozie
  • Dispatches HDFS and MapReduce jobs, Pig, Hive, Sqoop, Java apps, shell scripts, email
  • Takes advantage of cluster resources
  • An alternative to cron jobs
    Start jobs in a distributed way
    Start multiple jobs in parallel
    Fault tolerance, retries, and alerting within a workflow

ZooKeeper
  • Synchronizes node state
  • Handles the communication between HBase's master server and region servers
  • Synchronizes region state for HBase tables
    online or split?
Hadoop Cluster Monitors

Cloudera Manager's Monitor Tool
  • Powerful, full of monitored metrics, monitors Impala well
  • Wastes system resources
Ganglia
  • Monitors by collecting Hadoop metrics
  • Uses its own gmond to collect CPU, memory, and network IO
  • Collects JMX, IPC, RPC data
  • Weak point: no disk IO by default
Graphite
  • Good third-party plugins; can push KPIs from Java applications (StatsD, Yammer Metrics)
  • Can collect server data using plugins (collectd, logster, jmxtrans)
Splunk: expensive
Nagios, Icinga


Hadoop Cluster Issue and Limitation

Issues
  • HA: too many single points of failure
  • Upgrades are risky
  • Wastes too much memory
Limitations
  • No good solution for cross-data-center deployments
  • High-concurrency bottlenecks
  • Impala and Spark cannot handle too many queries at the same time, and there is no solution yet
  • No matter which model or database, joins are a big issue
    Spark is still not good at this; it is more like MapReduce + Pig



































HBase Table and Storage Design

HBase is a set of tables defined by Key-Values

Definitions:

  • Row Key: Each row has a unique Row Key
  • Column Family: Used to group different columns; defined when the table is created
  • Column Qualifier: Identifies each column; can be added at run time
    Expandable: each row may have a different number of Qualifiers
  • timestamp: A cell can hold different versions of data keyed by timestamp (for consistency)

A Row is referred to by a Row Key.
A Column is referred to by a Column Family and a Column Qualifier.
A Cell is referred to by a Row and a Column.
A Cell can contain multiple versions of a value, distinguished by timestamp.

These four parts combine into a Key with the following format:
    [Row Key]/[Column Family]:[Column Qualifier]/[timestamp]

With this Key, we can find the unique value we want to read.

Every value we get back is a raw byte[]; developers need to know the structure of the value themselves.

Rows are strongly consistent.
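
As a minimal sketch of reading a cell addressed by this four-part key with the HBase 1.x Java client (the Table handle, row key row1, family cf, and qualifier q1 are hypothetical):

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CellVersionRead {
 // Reads the stored versions of one cell addressed by
 // [Row Key]/[Column Family]:[Column Qualifier]/[timestamp]
 static void printVersions(Table table) throws Exception {
  Get get = new Get(Bytes.toBytes("row1"));                 // Row Key
  get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"));  // Column Family : Column Qualifier
  get.setMaxVersions(3);                                    // ask for up to 3 timestamped versions
  Result result = table.get(get);
  for (Cell cell : result.getColumnCells(Bytes.toBytes("cf"), Bytes.toBytes("q1"))) {
   long ts = cell.getTimestamp();                           // the timestamp part of the key
   byte[] raw = CellUtil.cloneValue(cell);                  // value is a raw byte[]; the caller
   System.out.println(ts + " -> " + Bytes.toString(raw));   // must know how to decode it
  }
 }
}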






How does HBase store tables in the file system?

  1. Contiguous rows are grouped into the same Region
  2. Each Region is divided by Column Family into several units
  3. Each of these units is stored as a single file in the file system
  4. The values in each file are in lexicographical order (I think ordered by Column Qualifier)
So if we open one HBase data file, it will only contain
  1. Contiguous rows
  2. Values from the same Column Family
  3. (I think) values ordered by Column Qualifier




Special Table

Table .META.
This table uses the same format as regular tables.
It can live on any node of the HBase cluster, and it stores the information needed to find the target region.











Friday, August 12, 2016

TransferQueue

If someone is already waiting to take the element, we don't need to put it in the queue; we can transfer it directly to the receiver.

Java 7 introduced the TransferQueue. This is essentially a BlockingQueue with an
additional operation—transfer(). This operation will immediately transfer a work
item to a receiver thread if one is waiting. Otherwise it will block until there is a thread
available to take the item. This can be thought of as the “recorded delivery” option—
the thread that was processing the item won’t begin processing another item until it
has handed off the current item. This allows the system to regulate the speed at which
the upstream thread pool takes on new work.
It would also be possible to regulate this by using a blocking queue of bounded
size, but the TransferQueue has a more flexible interface. In addition, your code may
show a performance benefit by replacing a BlockingQueue with a TransferQueue.
This is because the TransferQueue implementation has been written to take into
account modern compiler and processor features and can operate with great efficiency.
As with all discussions of performance, however, you must measure and prove
benefits and not simply assume them. You should also be aware that Java 7 ships with
only one implementation of TransferQueue—the linked version.
In the next code example, we’ll look at how easy it is to drop in a TransferQueue as
a replacement for a BlockingQueue. Just these simple changes to listing 4.13 will
upgrade it to a TransferQueue implementation, as you can see here.



import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

public class BlockingQueueTest1 {
 public static void main(String[] args) {

  final Update.Builder ub = new Update.Builder();
  final TransferQueue<Update> lbq = new LinkedTransferQueue<Update>();
  MicroBlogExampleThread t1 = new MicroBlogExampleThread(lbq, 10) {
   public void doAction() {
    text = text + "X";
    Update u = ub.author(new Author("Tallulah")).updateText(text)
      .build();
    boolean handed = false;
    try {
     handed = updates.tryTransfer(u, 100, TimeUnit.MILLISECONDS);//tryTransfer
    } catch (InterruptedException e) {
    }
    if (!handed)
     System.out.println("Unable to hand off Update to Queue due to timeout");
    else System.out.println("Offered");
   }
  };
  MicroBlogExampleThread t2 = new MicroBlogExampleThread(lbq, 200) {
   public void doAction() {
    Update u = null;
    try {
     u = updates.take();
     System.out.println("Took");
    } catch (InterruptedException e) {
     return;
    }
   }
  };

  t1.start();
  t2.start();
 }
}


abstract class MicroBlogExampleThread extends Thread {
 protected final TransferQueue<Update> updates;
 protected String text = "";
 protected final int pauseTime;
 private boolean shutdown = false;

 public MicroBlogExampleThread(TransferQueue<Update> lbq_, int pause_) {
  updates = lbq_;
  pauseTime = pause_;
 }

 public synchronized void shutdown() {
  shutdown = true;
 }

 @Override
 public void run() {
  while (!shutdown) {
   doAction();
   try {
    Thread.sleep(pauseTime);
   } catch (InterruptedException e) {
    shutdown = true;
   }
  }
 }

 public abstract void doAction();
}









Fine-Grained Control of BlockingQueue

We have learned BlockingQueue
http://gvace.blogspot.com/2016/08/blockingqueue.html

In addition to the simple take() and offer() API, BlockingQueue offers another way
to interact with the queue that provides even more control, at the cost of a bit of
extra complexity. This is the possibility of putting or taking with a timeout, to allow
the thread encountering issues to back out from its interaction with the queue and do something else instead.


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueTest1 {
 public static void main(String[] args) {

  final Update.Builder ub = new Update.Builder();
  final BlockingQueue<Update> lbq = new LinkedBlockingQueue<>(100);
  MicroBlogExampleThread t1 = new MicroBlogExampleThread(lbq, 10) {
   public void doAction() {
    text = text + "X";
    Update u = ub.author(new Author("Tallulah")).updateText(text)
      .build();
    boolean handed = false;
    try {
     handed = updates.offer(u, 100, TimeUnit.MILLISECONDS);//set timeout
    } catch (InterruptedException e) {
    }
    if (!handed)
     System.out.println("Unable to hand off Update to Queue due to timeout");
    else System.out.println("Offered");
   }
  };
  MicroBlogExampleThread t2 = new MicroBlogExampleThread(lbq, 1000) {
   public void doAction() {
    Update u = null;
    try {
     u = updates.take();
     System.out.println("Took");
    } catch (InterruptedException e) {
     return;
    }
   }
  };

  t1.start();
  t2.start();
 }
}
abstract class MicroBlogExampleThread extends Thread {
 protected final BlockingQueue<Update> updates;
 protected String text = "";
 protected final int pauseTime;
 private boolean shutdown = false;

 public MicroBlogExampleThread(BlockingQueue<Update> lbq_, int pause_) {
  updates = lbq_;
  pauseTime = pause_;
 }

 public synchronized void shutdown() {
  shutdown = true;
 }

 @Override
 public void run() {
  while (!shutdown) {
   doAction();
   try {
    Thread.sleep(pauseTime);
   } catch (InterruptedException e) {
    shutdown = true;
   }
  }
 }

 public abstract void doAction();
}




This is followed by TransferQueue, which is a little more efficient.
http://gvace.blogspot.com/2016/08/transferqueue.html





Thursday, August 11, 2016

BlockingQueue

The BlockingQueue is a queue that has two additional special properties:
■ When trying to put() to the queue, it will cause the putting thread to wait for
space to become available if the queue is full.
■ When trying to take() from the queue, it will cause the taking thread to block
if the queue is empty.
These two properties are very useful because if one thread (or pool of threads) is outstripping
the ability of the other to keep up, the faster thread is forced to wait, thus
regulating the overall system.


Two implementations of BlockingQueue
Java ships with two basic implementations of the BlockingQueue interface: the
LinkedBlockingQueue and the ArrayBlockingQueue. They offer slightly different
properties; for example, the array implementation is very efficient when an exact
bound is known for the size of the queue, whereas the linked implementation may be
slightly faster under some circumstances.
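
As a quick illustration of that choice (a minimal sketch; the capacity of 1024 and the String element type are arbitrary example values):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueChoice {
 public static void main(String[] args) {
  // Exact bound known up front: the array-backed implementation is a good fit
  BlockingQueue<String> fixed = new ArrayBlockingQueue<>(1024);

  // Linked implementation: may be slightly faster in some cases; the bound is optional
  BlockingQueue<String> linked = new LinkedBlockingQueue<>(1024);

  fixed.offer("work-item");
  linked.offer("work-item");
  System.out.println(fixed.size() + " " + linked.size());
 }
}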

It can be better to have this,

BlockingQueue<WorkUnit<MyAwesomeClass>>

where WorkUnit (or QueueObject, or whatever you want to call the container class) is
a packaging interface or class that may look something like this:
public class WorkUnit<T> {
 private final T workUnit;

 public T getWork() {
  return workUnit;
 }

 public WorkUnit(T workUnit_) {
  workUnit = workUnit_;
 }
}

The reason for doing this is that this level of indirection provides a place to add additional
metadata without compromising the conceptual integrity of the contained type
(MyAwesomeClass in this example).
This is surprisingly useful. Use cases where additional metadata is helpful are
abundant. Here are a few examples:
■ Testing (such as showing the change history for an object)
■ Performance indicators (such as time of arrival or quality of service)
■ Runtime system information (such as how this instance of MyAwesomeClass has
been routed)

Example

public class Appointment<T> {
 private final T toBeSeen;

 public T getPatient() {
  return toBeSeen;
 }

 public Appointment(T incoming) {
  toBeSeen = incoming;
 }
}
abstract class Pet {
 protected final String name;

 public Pet(String name) {
  this.name = name;
 }

 public abstract void examine();
}
class Cat extends Pet {
 public Cat(String name) {
  super(name);
 }

 public void examine() {
  System.out.println("Meow!");
 }
}
class Dog extends Pet {
 public Dog(String name) {
  super(name);
 }

 public void examine() {
  System.out.println("Woof!");
 }
}




From this simple model, you can see that we can model the veterinarian’s queue as
LinkedBlockingQueue<Appointment<Pet>>, with the Appointment class taking the role
of WorkUnit.




import java.util.concurrent.BlockingQueue;

public class Veterinarian extends Thread {
 protected final BlockingQueue<Appointment<Pet>> appts;
 protected String text = "";
 protected final int restTime;
 private boolean shutdown = false;

 public Veterinarian(BlockingQueue<Appointment<Pet>> lbq, int pause) {
  appts = lbq;
  restTime = pause;
 }

 public synchronized void shutdown() {
  shutdown = true;
 }

 @Override
 public void run() {
  while (!shutdown) {
   seePatient();
   try {
    Thread.sleep(restTime);
   } catch (InterruptedException e) {
    shutdown = true;
   }
  }
 }

 public void seePatient() {
  try {
   Appointment<Pet> ap = appts.take();
   Pet patient = ap.getPatient();
   patient.examine();
  } catch (InterruptedException e) {
   shutdown = true;
  }
 }
}


The following is an example that starts two threads: one enqueues pets, and the other (the Veterinarian) examines pets from the queue.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


public class BlockingQueueTest {
 public static final BlockingQueue<Appointment<Pet>> QUEUE = new LinkedBlockingQueue<Appointment<Pet>>(10);
 public static void main(String[] args){
  new Thread(){
   @Override
   public void run(){
    for(int i=0;i<10;i++){
     Pet p1 = new Cat("cat"+i);
     Pet p2 = new Dog("dog"+i);
     Appointment<Pet> app1 = new Appointment<Pet>(p1);
     Appointment<Pet> app2 = new Appointment<Pet>(p2);
     try {
      QUEUE.put(app1);
      System.out.println("EnqueueCat"+i);
      QUEUE.put(app2);
      System.out.println("EnqueueDog"+i);
      Thread.sleep(300);
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
   }
  }.start();
  
  Veterinarian veter = new Veterinarian(QUEUE, 500);
  veter.start();
 }
}

We have investigated fine-grained BlockingQueue usage here:
http://gvace.blogspot.com/2016/08/fine-grained-control-of-blockingqueue.html

CopyOnWriteArrayList

If the list is altered while a read or a traversal is taking place, the entire array
must be copied.

As the name suggests, the CopyOnWriteArrayList class is a replacement for
the standard ArrayList class. CopyOnWriteArrayList has been made threadsafe
by the addition of copy-on-write semantics, which means that any operations
that mutate the list will create a new copy of the array backing the list
(as shown in figure 4.8). This also means that any iterators formed don’t have to
worry about any modifications that they didn’t expect.
This approach to shared data is ideal when a quick, consistent snapshot of data
(which may occasionally be different between readers) is more important than perfect
synchronization (and the attendant performance hit). This is often seen in non-missioncritical
data.
Let’s look at an example of copy-on-write in action. Consider a timeline of microblogging
updates. This is a classic example of data that isn’t 100 percent mission-critical
and where a performant, self-consistent snapshot for each reader is preferred over
total global consistency. This listing shows a holder class that represents an individual
user’s view of their timeline.


import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class MicroBlogTimeline {
 private final CopyOnWriteArrayList<Update> updates;
 private final ReentrantLock lock;
 private final String name;
 private Iterator<Update> it;

 public MicroBlogTimeline(String name_,
   CopyOnWriteArrayList<Update> updates_, ReentrantLock lock_) {
  this.name = name_;
  this.updates = updates_;
  this.lock = lock_;
 }

 public void addUpdate(Update update_) {
  updates.add(update_);
 }

 public void prep() { // Set up Iterator
  it = updates.iterator();
 }

 public void printTimeline() {
  lock.lock();
  try {
   if (it != null) {
    System.out.print(name + ": ");
    while (it.hasNext()) {
     Update s = it.next();
     System.out.print(s + ", ");
    }
    System.out.println();
   }
  } finally {
   lock.unlock();
  }
 }
}

class CopyOnWriteArrayListTest {
 public static void main(String[] args) {
  final CountDownLatch firstLatch = new CountDownLatch(1);
  final CountDownLatch secondLatch = new CountDownLatch(1);
  final Update.Builder ub = new Update.Builder();
  final CopyOnWriteArrayList<Update> l = new CopyOnWriteArrayList<>();
  l.add(ub.author(new Author("Ben")).updateText("I like pie").build());
  l.add(ub.author(new Author("Charles")).updateText("I like ham on rye").build());
  ReentrantLock lock = new ReentrantLock();
  final MicroBlogTimeline tl1 = new MicroBlogTimeline("TL1", l, lock);
  final MicroBlogTimeline tl2 = new MicroBlogTimeline("TL2", l, lock);
  Thread t1 = new Thread() {
   public void run() {
    l.add(ub.author(new Author("Jeffrey")).updateText("I like a lot of things").build());
    tl1.prep();
    firstLatch.countDown();
    try {
     secondLatch.await();
    } catch (InterruptedException e) {
    }
    tl1.printTimeline();
   }
  };
  Thread t2 = new Thread() {
   public void run() {
    try {
     firstLatch.await();
     l.add(ub.author(new Author("Gavin")).updateText("I like otters").build());
     tl2.prep();
     secondLatch.countDown();
    } catch (InterruptedException e) {
    }
    tl2.printTimeline();
   }
  };
  t1.start();
  t2.start();
 }
}



There is a lot of scaffolding in the listing—unfortunately this is difficult to avoid.
There are quite a few things to notice about this code:
■ CountDownLatch is used to maintain close control over what is happening
between the two threads.
■ If the CopyOnWriteArrayList were replaced with an ordinary List, the
result would be a ConcurrentModificationException.
■ This is also an example of a Lock object being shared between two threads to
control access to a shared resource (in this case, STDOUT). This code would be
much messier if expressed in the block-structured view.


As you can see, the second output line (tagged as TL1) is missing the final update (the
one that mentions otters), despite the fact that the latching meant that tl1 was
accessed after the list had been modified. This demonstrates that the Iterator held
by tl1 was a snapshot taken before the final addition, so that addition was invisible
to tl1. This is the copy-on-write property that we want these objects to display.


ConcurrentHashMap

The classic HashMap uses a function (the hash function) to determine which “bucket” it will store the key/value pair in. This is where the “hash” part of the class’s name comes from. This suggests a rather straightforward multithreaded generalization—instead of needing to lock the whole structure when making a change, it’s only necessary to lock the bucket that’s being altered.


The ConcurrentHashMap class also implements the ConcurrentMap interface, which contains some new methods to provide truly atomic functionality:
■ putIfAbsent()—Adds the key/value pair to the HashMap if the key isn’t already present.
■ remove()—Atomically removes the key/value pair only if the key is present and the value is equal to the current state.
■ replace()—The API provides two different forms of this method for atomic replacement in the HashMap.
As an example, you can replace the synchronized methods in listing with regular, unsynchronized access if you alter the HashMap called arrivalTime to be a ConcurrentHashMap as well. Notice the lack of locks in the following listing—there is no explicit synchronization at all.


Original HashMap
import java.util.HashMap;
import java.util.Map;

public class ExampleTimingNode implements SimpleMicroBlogNode {
 private final String identifier;
 private final Map<Update, Long> arrivalTime = new HashMap<>();

 public ExampleTimingNode(String identifier_) {
  identifier = identifier_;
 }

 public synchronized String getIdentifier() {
  return identifier;
 }

 public synchronized void propagateUpdate(Update update_) {
  long currentTime = System.currentTimeMillis();
  arrivalTime.put(update_, currentTime);
 }

 public synchronized boolean confirmUpdateReceived(Update update_) {
  Long timeRecvd = arrivalTime.get(update_);
  return timeRecvd != null;
 }
}

Replace with ConcurrentHashMap

import java.util.concurrent.ConcurrentHashMap;

public class ExampleMicroBlogTimingNode implements SimpleMicroBlogNode {

 private final ConcurrentHashMap<Update, Long> arrivalTime = new ConcurrentHashMap<Update, Long>();

 public void propagateUpdate(Update upd_) {
  arrivalTime.putIfAbsent(upd_, System.currentTimeMillis());
 }

 public boolean confirmUpdateReceived(Update upd_) {
  return arrivalTime.get(upd_) != null;
 }
}

CountDownLatch

CountDownLatch: Let's start together

This is achieved by providing an int value (the count) when constructing a new
instance of CountDownLatch. After that point, two methods are used to control the
latch: countDown() and await(). The former reduces the count by 1, and the latter
causes the calling thread to wait until the count reaches 0 (it does nothing if the count
is already 0 or less). This simple mechanism allows the minimum preparation pattern
to be easily deployed.



import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

public class ProcessingThreadTest {
 public static final int MAX_THREADS = 10;
 public static class ProcessingThread extends Thread {
  private final String ident;
  private final CountDownLatch latch;
  public ProcessingThread(String ident_, CountDownLatch cdl_) {
   ident = ident_;
   latch = cdl_;
  }
  public String getIdentifier() {
   return ident;
  }
  public void initialize() {
   latch.countDown();
   try {
    latch.await();
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
   
  }
  @Override
  public void run() {
   initialize();
   while(true){
    System.out.println(getIdentifier());
    try {
     Thread.sleep(500);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  }
 }
 public static void main(String[] args) {
  final int quorum = 1 + (int) (MAX_THREADS / 2);
  final CountDownLatch cdl = new CountDownLatch(quorum);
  final Set<ProcessingThread> nodes = new HashSet<>();
  try {
   for (int i = 0; i < MAX_THREADS; i++) {
    ProcessingThread local = new ProcessingThread("localhost:" + (9000 + i), cdl);
    nodes.add(local);
    local.start();
    Thread.sleep(1000);
    System.out.println("-----------------");
   }
  } catch (InterruptedException e) {
  } finally {
  }
 }
}









Wednesday, August 10, 2016

ReentrantLock usage and fix DeadLock

Test code:
Two MicroBlogNode instances update themselves and treat each other as a backup, then call confirm on the backup.

public class UpdateTest{
 public static void main(String[] args){
  Update.Builder ub = new Update.Builder();
  Author myAuthor = new Author();
  final Update u1 = ub.author(myAuthor).updateText("Hello1").build();
  final Update u2 = ub.author(myAuthor).updateText("Hello2").build();
  
  final MicroBlogNode mbn1 = new MicroBlogNode("11");
  final MicroBlogNode mbn2 = new MicroBlogNode("22");
  new Thread(){
   @Override
   public void run(){
    mbn1.propagateUpdate(u1, mbn2);
   }
  }.start();
  new Thread(){
   @Override
   public void run(){
    mbn2.propagateUpdate(u2, mbn1);
   }
  }.start();
 }
}

Look at a deadlock first, without ReentrantLock:

public class MicroBlogNode{
 private final String ident;
 public MicroBlogNode(String ident_) {
  ident = ident_;
 }
 public String getIdent() {
  return ident;
 }
 public synchronized void propagateUpdate(Update upd_, MicroBlogNode backup_) {
  System.out.println(ident + ": recvd: " + upd_.getUpdateText()
    + " ; backup: " + backup_.getIdent());
  backup_.confirmUpdate(this, upd_);
 }
 public synchronized void confirmUpdate(MicroBlogNode other_, Update update_) {
  System.out.println(ident + ": recvd confirm: "
    + update_.getUpdateText() + " from " + other_.getIdent());
 }
}
When Thread A is inside propagateUpdate and Thread B is also inside propagateUpdate, each holds its own lock while waiting for the other to release its lock. This causes a deadlock.

Next we use ReentrantLock instead of synchronized, but this version still deadlocks in the same way.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MicroBlogNode1{
 private final String ident;
 private final Lock lock = new ReentrantLock();
 public MicroBlogNode1(String ident_) {
  ident = ident_;
 }
 public String getIdent() {
  return ident;
 }
 public void propagateUpdate(Update upd_, MicroBlogNode1 backup_) {
  lock.lock();
  try {
   System.out.println(ident + ": recvd: " + upd_.getUpdateText()
     + " ; backup: " + backup_.getIdent());
   backup_.confirmUpdate(this, upd_);
  }finally{
   lock.unlock();
  }
 }
 public void confirmUpdate(MicroBlogNode1 other_, Update update_) {
  lock.lock();
  try {
   System.out.println(ident + ": recvd confirm: "
    + update_.getUpdateText() + " from " + other_.getIdent());
  }finally{
   lock.unlock();
  }
 }
}

Trying to fix the deadlock, we can use tryLock with a timeout and retry until we get the lock.
But this can still cause a livelock (a busy form of deadlock), because there will be periods where Thread A and Thread B each keep their own lock while retrying to acquire the other's.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MicroBlogNode2 {
 private final String ident;
 private final Lock lock = new ReentrantLock();
 public MicroBlogNode2(String ident_) {
  ident = ident_;
 } 
 private String getIdent() {
  return ident;
 }
 public void propagateUpdate(Update upd_, MicroBlogNode2 backup_) {
  boolean acquired = false;
  while (!acquired) {
   try {
    int wait = (int) (Math.random() * 10);
    acquired = lock.tryLock(wait, TimeUnit.MILLISECONDS);
    if (acquired) {
     System.out.println(ident + ": recvd: "
       + upd_.getUpdateText() + " ; backup: "
       + backup_.getIdent());
     backup_.confirmUpdate(this, upd_);
    } else {
     Thread.sleep(wait);
    }
   } catch (InterruptedException e) {
   } finally {
     // release our own lock if it was acquired
    if (acquired)
     lock.unlock();
   }
  }
 }
 public void confirmUpdate(MicroBlogNode2 other_, Update upd_) {
  lock.lock();// Attempts to lock other thread
  try {
   System.out.println(ident + ": recvd confirm: "
     + upd_.getUpdateText() + " from " + other_.getIdent());
  } finally {
   lock.unlock();
  }
 }
}

To really fix the deadlock, the thread needs to release its own lock when it fails to acquire the other's lock.
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MicroBlogNode3 {
 private final String ident;
 private final Lock lock = new ReentrantLock();
 public MicroBlogNode3(String ident_) {
  ident = ident_;
 }
 public String getIdent() {
  return ident;
 }
 public void propagateUpdate(Update upd_, MicroBlogNode3 backup_) {
  boolean acquired = false;
  boolean done = false;
  while (!done) {
   int wait = (int) (Math.random() * 10);
   try {
    acquired = lock.tryLock(wait, TimeUnit.MILLISECONDS);
    if (acquired) {
     System.out.println(ident + ": recvd: "
       + upd_.getUpdateText() + " ; backup: "
       + backup_.getIdent());
     done = backup_.tryConfirmUpdate(this, upd_); //Examine return from tryConfirmUpdate()
    }
   } catch (InterruptedException e) {
   } finally {
    if (acquired)
     lock.unlock();
   }
   if (!done)
    try {
     Thread.sleep(wait);
    } catch (InterruptedException e) {
    }
  }
 }
 public boolean tryConfirmUpdate(MicroBlogNode3 other_, Update upd_) {
  boolean acquired = false;
  try {
   long startTime = System.currentTimeMillis();
   int wait = (int) (Math.random() * 10);
   acquired = lock.tryLock(wait, TimeUnit.MILLISECONDS);
   if (acquired) {
    long elapsed = System.currentTimeMillis() - startTime;
    System.out.println(ident + ": recvd confirm: "
      + upd_.getUpdateText() + " from " + other_.getIdent()
      + " - took " + elapsed + " millis");
    return true;
   }
  } catch (InterruptedException e) {
  } finally {
   if (acquired)
    lock.unlock();
  }
  return false;
 }
}





















Builder Pattern for Immutability

For immutability, we can use either Factory or Builder

If we have a lot of parameters in the constructor, the code gets messy, and you may end up with a large number of telescoping constructors.

Builder pattern:
This is a combination of two constructs:

  • A static inner class that implements a generic builder interface
  • And a private constructor for the immutable class itself.

The static inner class is the builder for the immutable class, and it provides the only way that a developer can get hold of new instances of the immutable type.
One very common implementation is for the Builder class to have exactly the same fields as the immutable class, but to allow mutation of the fields.
This listing shows how you might use this to model a microblogging update (again, building on the earlier listings in this chapter).


public interface ObjBuilder<T> {
 T build();
}


public class Update {

   //   Final fields must be initialized in constructor

 private final Author author;
 private final String updateText;

 private Update(Builder b_) {
  author = b_.author;
  updateText = b_.updateText;
 }

  // Builder class must be static inner

 public static class Builder implements ObjBuilder<Update> {
  private Author author;
  private String updateText;
  
  // Methods on Builder return Builder for chain calls

  public Builder author(Author author_) {
   author = author_;
   return this;
  }

  public Builder updateText(String updateText_) {
   updateText = updateText_;
   return this;
  }

  public Update build() {
   return new Update(this);
  }
   //hashCode() and equals() methods omitted
   
 }
}
class UpdateTest{
 public static void main(String[] args){
  Update.Builder ub = new Update.Builder();
  Author myAuthor = new Author();
  Update u = ub.author(myAuthor ).updateText("Hello").build();
 }
}



With this code, you could then create a new Update object like this:
 Update.Builder ub = new Update.Builder();
 Update u = ub.author(myAuthor).updateText("Hello").build();




















Tuesday, August 2, 2016

Reference Type Strong/Soft/Weak/Phantom

Strong
A regular reference.

Soft
Used for memory caches; collected only when memory runs low.

Weak
Used for holding data without preventing collection; collected once only weak references remain. Example: ClassLoader.

Phantom
Used only for monitoring whether an object has already been collected.

Reference Queue
When the referenced object has been collected, the reference can be polled from the ReferenceQueue.


import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;

public class ReferenceType {
 public static void main(String[] args) throws InterruptedException{
  String s1 = new String("aaa");
  String s2 = new String("bbb");
  String s3 = new String("ccc");
  
  ReferenceQueue<String> srq = new ReferenceQueue<String>();
  SoftReference<String> ss = new SoftReference<String>(s1,srq);
  ReferenceQueue<String> wrq = new ReferenceQueue<String>();
  WeakReference<String> ws = new WeakReference<String>(s2,wrq);
  ReferenceQueue<String> prq = new ReferenceQueue<String>();
  PhantomReference<String> ps = new PhantomReference<String>(s3,prq);
  
  s1 = null;
  s2 = null;
  s3 = null;
  
  System.gc();
    
  System.out.println("ss="+ss.get());
  System.out.println("srq="+srq.poll());
  System.out.println("ws="+ws.get());
  System.out.println("wrq="+wrq.poll());
  System.out.println("ps="+ps.get());
  System.out.println("prq="+prq.poll());
 }
}


Output result
ss=aaa
srq=null
ws=null
wrq=java.lang.ref.WeakReference@15db9742
ps=null
prq=java.lang.ref.PhantomReference@6d06d69c




Saturday, May 28, 2016

Hadoop 2 HA setup config with ZooKeeper

Hadoop 2.x
  • HDFS: NN Federation, HA(High Availability)
  • MapReduce: Runs on YARN
  • YARN: resource management system
HDFS 2.x

Fixes the HDFS 1.0 single-node (NameNode) failure problem
  • HDFS HA: master/backup NameNode
  • If the master NameNode fails, switch to the backup NameNode
Fixes the HDFS 1.0 memory limitation problem
  • HDFS Federation
  • Expandable: supports multiple NameNodes
  • Every NameNode manages a part of the directory tree
  • All NameNodes share all DataNode storage resources
HDFS 2.x only changes the architecture, not the way it is used.
It is transparent to HDFS users.
Compatible with HDFS 1.x commands and APIs.



HDFS HA(High Availability)




ZK: ZooKeeper
ZKFC: ZooKeeper Failover Controller
JN: Journal Node
NN: NameNode
DN: DataNode

Looking at the flow Bottom up.

DataNodes report to both the Active NameNode and the Standby NameNode.
DataNodes only accept commands from the Active NameNode.

The NameNode's state info is not stored in the NameNode itself; it is stored in a cluster of JournalNodes.
When the Active NameNode fails, the Standby NameNode automatically picks up the state from the JournalNodes and continues the work.
When the Active NameNode needs a manual upgrade, we can send a command to switch the Active and Standby NameNodes.

FailoverControllers report heartbeats to ZooKeeper; ZooKeeper is used for HA.
The client sends a request to ZooKeeper, ZooKeeper tells the client which NameNode to work with, and then the client talks to that NameNode.


Auto Failover switch
  • ZooKeeper Failover Controller (ZKFC): monitors the status of its NameNode
  • Registers the NameNode with ZooKeeper
  • When the Active NameNode fails, each ZKFC tries to acquire the lock for its NameNode.
  • The NameNode that acquires the lock becomes Active


HDFS 2.x Federation
  • Federation is used for very large data sets, where we need multiple NameNodes; each NameNode is independent but shares all DataNodes.
  • Independent means the NameNodes are completely independent of each other, so if we want HA for the NameNodes, each one needs its own HA configuration.
  • Uses namenode/namespace separation to spread data storage and management across different nodes, so namenodes/namespaces can be expanded simply by adding machines.
  • Spreads the load of one namenode across multiple nodes, so efficiency does not drop when the HDFS data is large. We can use multiple namespaces to separate different types of applications, allocating storage/management to different namenodes based on application type.


Yarn
Yet Another Resource Negotiator

Hadoop 2.0 introduced the ResourceManager and the ApplicationMaster.
  • Separates the resource management and task scheduling that MRv1's JobTracker did.
    Replaced: JobTracker and TaskTracker from Hadoop 1.0
    With: ResourceManager and ApplicationMaster
  • ResourceManager: cluster-wide resource management and scheduling (only one per cluster)
  • ApplicationMaster: handles application-level concerns such as task scheduling, task monitoring, fault tolerance, etc. Each application has one ApplicationMaster.
YARN lets multiple computation frameworks share one cluster.
It provides a common interface that many frameworks can plug into.
  • Each application has one ApplicationMaster
  • Multiple frameworks can run on YARN at the same time: MapReduce, Spark, Storm, etc.



HA HDFS Cluster build example

NameNode DataNode ZooKeeper ZKFC JournalNode ResourceManager DataManager
hadoop-yarn 1 1 1 1
hadoop-node1 1 1 1 1 1 1
hadoop-node2 1 1 1 1
hadoop-node3 1 1 1

Followed this doc:
We need

  • dfs.nameservices as name service ID
  • dfs.ha.namenodes.[name service ID]: as NameNode IDs
  • dfs.namenode.rpc-address.[nameservice ID].[name node ID]
  • dfs.namenode.http-address.[nameservice ID].[name node ID]
  • dfs.namenode.shared.edits.dir
  • dfs.client.failover.proxy.provider.[nameservice ID]
  • dfs.ha.fencing.methods
  • fs.defaultFS
  • dfs.journalnode.edits.dir
I started from here, building a basic distributed Hadoop first: http://gvace.blogspot.com/2016/05/hadoop-centos-build.html
Then apply some changes:
  • create soft link in /usr/lib: hadoop -> hadoop-2.6.4
  • change tmp folder from /usr/lib/hadoop-2.6.4/data/tmp/ to /var/opt/hadoop/data/tmp
  • install ZooKeeper-3.4.8
  • create soft link in /usr/lib: zookeeper -> zookeeper-3.4.8
  • create folder /var/opt/zookeeper/tmp/data


hdfs-site.xml
Instead of config single NameNode, we build cluster
        <property>
                <name>dfs.nameservices</name>
                <value>mycluster</value>
        </property>
        <property>
                <name>dfs.ha.namenodes.mycluster</name>
                <value>nn1,nn2</value>
        </property>
        <property>
                <name>dfs.namenode.rpc-address.mycluster.nn1</name>
                <value>hadoop-yarn.gvace.com:8020</value>
        </property>
        <property>
                <name>dfs.namenode.rpc-address.mycluster.nn2</name>
                <value>hadoop-node1.gvace.com:8020</value>
        </property>
        <property>
                <name>dfs.namenode.http-address.mycluster.nn1</name>
                <value>hadoop-yarn.gvace.com:50070</value>
        </property>
        <property>
                <name>dfs.namenode.http-address.mycluster.nn2</name>
                <value>hadoop-node1.gvace.com:50070</value>
        </property>
        <property>
                <name>dfs.namenode.shared.edits.dir</name>
                <value>qjournal://hadoop-node1.gvace.com:8485;hadoop-node2.gvace.com:8485;hadoop-node3.gvace.com:8485/mycluster</value>
        </property>
        <property>
                <name>dfs.client.failover.proxy.provider.mycluster</name>
                <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
        </property>
        <property>
                <name>dfs.ha.fencing.methods</name>
                <value>sshfence</value>
        </property>
        <property>
                <name>dfs.ha.fencing.ssh.private-key-files</name>
                <value>/home/yushan/.ssh/id_rsa</value>
        </property>
        <property>
                <name>dfs.journalnode.edits.dir</name>
                <value>/var/opt/hadoop/data/tmp/journal/edits</value>
        </property>
        <property>
                <name>dfs.ha.automatic-failover.enabled</name>
                <value>true</value>
        </property>


yarn-site.xml
<configuration>

<!-- Site specific YARN configuration properties -->
        <property>
                <description>The hostname of the RM.</description>
                <name>yarn.resourcemanager.hostname</name>
                <value>hadoop-yarn.gvace.com</value>
        </property>
        <property>
                <name>yarn.nodemanager.aux-services</name>
                <value>mapreduce_shuffle</value>
        </property>

</configuration>


core-site.xml
Instead of using single NameNode, we use cluster
<configuration>
        <property>
                <name>fs.defaultFS</name>
                <value>hdfs://mycluster</value>
        </property>
        <property>
                <name>ha.zookeeper.quorum</name>
                <value>hadoop-yarn.gvace.com:2181,hadoop-node1.gvace.com:2181,hadoop-node2.gvace.com:2181</value>
        </property>
<!--
        <property>
                <name>fs.default.name</name>
                <value>hdfs://hadoop-yarn.gvace.com:8020</value>
                <description>NameNode</description>
        </property>
-->
        <property>
                <name>hadoop.tmp.dir</name>
                <value>/var/opt/hadoop/data/tmp</value>
        </property>
</configuration>



Also install ZooKeeper and assign an id to each ZooKeeper node.

Create the file /var/opt/zookeeper/tmp/data/myid on each ZooKeeper node.
Put "1", "2", or "3" in each node's myid file; this is the id of that ZooKeeper node.

Config ZooKeeper
In /usr/lib/zookeeper/conf, copy zoo.cfg from zoo_sample.cfg.
Edit zoo.cfg: set (and create) dataDir, and list each ZooKeeper node's host with its server id.

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/var/opt/zookeeper/tmp/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=hadoop-yarn.gvace.com:2888:3888
server.2=hadoop-node1.gvace.com:2888:3888
server.3=hadoop-node2.gvace.com:2888:3888




Then start zookeeper on each ZooKeeper Node
./zkServer.sh start
You will see QuorumPeerMain running as a Java application in jps

Start each Journal Node
hadoop-daemon.sh start journalnode

Format on one of the NameNode
./bin/hdfs namenode -format

Start the formatted NameNode
./sbin/hadoop-daemon.sh start namenode

Start the Standby NameNode from the Standby NameNode host; this will synchronize the formatted image files from the formatted NameNode
./bin/hdfs namenode -bootstrapStandby

Format ZKFC, on just one of the NameNode hosts
./bin/hdfs zkfc -formatZK

Stop all services from just one NameNode
sbin/stop-dfs.sh

Start all services from just one NameNode
sbin/start-dfs.sh

We cannot be sure which NameNode will become Active; it depends on which NameNode acquires the lock first.


After all the install/config/format steps and a NameNode restart:









































Saturday, May 21, 2016

MapReduce Examples

Running MapReduce in YARN

We have some default example of MapReduce jars from Hadoop


Example: WordCount

Put a text file into HDFS; I put it at /data01/LICENSE.txt.
I want the output to go to /data01/LICENSE_WordCount.txt.
(I was thinking the output would be just one file, but it has to be a folder, so LICENSE_WordCount.txt here is actually a folder name.)

Run MR
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar wordcount /data01/LICENSE.txt /data01/LICENSE_WordCount


The result folder contains an empty file named _SUCCESS, which marks that the job succeeded,
and a result file: part-r-00000




Example: pi

Pick one of the examples; I chose pi.
Run it:
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar pi

It requires two arguments: <nMaps> and <nSamples>
Usage: org.apache.hadoop.examples.QuasiMonteCarlo <nMaps> <nSamples>
Generic options supported are
-conf <configuration file>     specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|resourcemanager:port>    specify a ResourceManager
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
We assigned 5 maps, each map with 20 samples
bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar pi 5 20





Looking at the running status of the MapReduce job, we found the following.


We assigned 5 maps, each map with 20 samples.
Each map has a container, but the running MapReduce job has 6 containers.
The one additional container is the ApplicationMaster.



Looking at the result of running MR job:
[yushan@hadoop-yarn hadoop-2.6.4]$ bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.4.jar pi 5 20
Number of Maps  = 5
Samples per Map = 20
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Starting Job
16/05/21 15:23:50 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/05/21 15:23:50 INFO input.FileInputFormat: Total input paths to process : 5
16/05/21 15:23:51 INFO mapreduce.JobSubmitter: number of splits:5
16/05/21 15:23:51 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1463856930642_0002
16/05/21 15:23:51 INFO impl.YarnClientImpl: Submitted application application_1463856930642_0002
16/05/21 15:23:51 INFO mapreduce.Job: The url to track the job: http://hadoop-yarn.gvace.com:8088/proxy/application_1463856930642_0002/
16/05/21 15:23:51 INFO mapreduce.Job: Running job: job_1463856930642_0002
16/05/21 15:23:59 INFO mapreduce.Job: Job job_1463856930642_0002 running in uber mode : false
16/05/21 15:23:59 INFO mapreduce.Job:  map 0% reduce 0%
16/05/21 15:24:36 INFO mapreduce.Job:  map 100% reduce 0%
16/05/21 15:24:36 INFO mapreduce.Job: Task Id : attempt_1463856930642_0002_m_000002_0, Status : FAILED
Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal
16/05/21 15:24:37 INFO mapreduce.Job:  map 80% reduce 0%
16/05/21 15:24:47 INFO mapreduce.Job:  map 100% reduce 0%
16/05/21 15:24:50 INFO mapreduce.Job:  map 100% reduce 100%
16/05/21 15:24:50 INFO mapreduce.Job: Job job_1463856930642_0002 completed successfully
16/05/21 15:24:50 INFO mapreduce.Job: Counters: 51
File System Counters
FILE: Number of bytes read=116
FILE: Number of bytes written=643059
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1385
HDFS: Number of bytes written=215
HDFS: Number of read operations=23
HDFS: Number of large read operations=0
HDFS: Number of write operations=3
Job Counters
Failed map tasks=1
Launched map tasks=6
Launched reduce tasks=1
Other local map tasks=1
Data-local map tasks=5
Total time spent by all maps in occupied slots (ms)=189730
Total time spent by all reduces in occupied slots (ms)=9248
Total time spent by all map tasks (ms)=189730
Total time spent by all reduce tasks (ms)=9248
Total vcore-milliseconds taken by all map tasks=189730
Total vcore-milliseconds taken by all reduce tasks=9248
Total megabyte-milliseconds taken by all map tasks=194283520
Total megabyte-milliseconds taken by all reduce tasks=9469952
Map-Reduce Framework
Map input records=5
Map output records=10
Map output bytes=90
Map output materialized bytes=140
Input split bytes=795
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=140
Reduce input records=10
Reduce output records=0
Spilled Records=20
Shuffled Maps =5
Failed Shuffles=0
Merged Map outputs=5
GC time elapsed (ms)=16465
CPU time spent (ms)=5220
Physical memory (bytes) snapshot=670711808
Virtual memory (bytes) snapshot=5995196416
Total committed heap usage (bytes)=619401216
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=590
File Output Format Counters
Bytes Written=97
Job Finished in 60.235 seconds
Estimated value of Pi is 3.20000000000000000000


Looking at the important factors:

Job ID
job_1463856930642_0002
Format for job id: job_timestamp_[job serial number]
0002: job serial number, starts from 0, up to 1000

Task Id
Task Id : attempt_1463856930642_0002_m_000002_0, Status : FAILED
Format for task id: attempt_timestamp_[job serial number]_m_[task serial number]_0
m means Map
r means Reduce

Counters
MapReduce has 6 counter groups

  1. File System Counters
  2. Job Counters 
  3. Map-Reduce Framework
  4. Shuffle Errors
  5. File Input Format Counters 
  6. File Output Format Counters 

MapReduce History Server

Provides the MapReduce job history, for example: number of maps, number of reduces, job submit time, job start time, job finish time, etc.
When we click on History for an application, it shows ERR_CONNECTION_REFUSED.
It points to port 19888, but port 19888 is not opened by default.

Using netstat -tnlp we can see that the port is really not open.

To Start History Server
sbin/mr-jobhistory-daemon.sh start historyserver

Web UI
http://hadoop-yarn.gvace.com:19888/

To Stop History Server
sbin/mr-jobhistory-daemon.sh stop historyserver

























Hadoop HDFS Web UI


Now go to http://hadoop-yarn.gvace.com:50070/ and we will see the NameNode info page.

Now go to http://hadoop-yarn.gvace.com:50090/ and we will see the SecondaryNameNode info page.






Started:Sat May 21 11:21:01 EDT 2016
Version:2.6.4, r5082c73637530b0b7e115f9625ed7fac69f937e6
Compiled:2016-02-12T09:45Z by jenkins from (detached from 5082c73)
Cluster ID:CID-4fc1b111-3c85-4e0a-9cd5-e44ec8fe0192
Block Pool ID:BP-796093918-192.168.56.95-1463114304816

Block Pool ID: new in Hadoop 2.
HDFS files are stored as blocks on disk; these blocks are grouped into a pool to be managed.
Every NameNode has a pool, and each pool has an ID.

Cluster ID and Block Pool ID are concepts introduced for NameNode Federation.




Logs

IN $HADOOP_HOME/logs

hadoop-yushan-namenode-hadoop-yarn.gvace.com.log 156419 bytes May 21, 2016 12:54:32 PM
hadoop-yushan-namenode-hadoop-yarn.gvace.com.out 4913 bytes May 21, 2016 11:31:45 AM



.log file: written by log4j, holds most application logs, configured by log4j.properties
.out file: captures standard output and standard error, usually does not contain much


log file name format:
[framework]-[username]-[process name]-[host name].log/.out, e.g. hadoop-yushan-namenode-hadoop-yarn.gvace.com.log


The content of hadoop-yushan-namenode-hadoop-yarn.gvace.com.out
can also be seen by running the command: ulimit -a


NameNode Journal Status

Edits of NameNode


NameNode Storage



Startup Progress
The SecondaryNameNode periodically merges fsimage and the edits log, updating fsimage and deleting the old edits.
Each time it merges, it saves a checkpoint.

Wednesday, May 11, 2016

MapReduce Concept

MapReduce: an offline batch processing framework

Move the computation, not the data

Requires key->value pairs as both input and output

Splits a job into two steps: Map and Reduce
  • Map: processes input data in parallel
  • Reduce: combines the Map results
Shuffle connects Map and Reduce
  • Map task: writes its data to the local hard disk
  • Reduce task: reads a copy of the data from each Map task

Only good for offline batch jobs

pros
  • Fault tolerance, expansibility
  • Good for simple batch tasks

cons
  • Costs resources and uses a lot of disk, which makes it inefficient

















Shuffler






Primitive Variable Type

int -> IntWritable
long -> LongWritable
float -> FloatWritable
double -> DoubleWritable
String -> Text
char -> Text


Example:
IntWritable value = new IntWritable(100);
int v = value.get();
Example for String:
Text text = new Text("abc");
String str = text.toString();
File InputFormat
RecordReader
An interface that reads the input file and converts each record into a key/value pair for the mapper.

File Input Format
The default file input format is TextInputFormat.
By default, TextInputFormat uses (see the mapper sketch below)

  • LongWritable as the mapper's input key
  • Text as the mapper's input value
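
As a small illustrative sketch of those default types (a word-count style mapper; the class name LineMapper and the word-splitting rule are hypothetical examples):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// With the default TextInputFormat, the mapper's input key is the byte offset of the
// line in the file (LongWritable) and the input value is the line itself (Text).
public class LineMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
 private static final IntWritable ONE = new IntWritable(1);

 @Override
 protected void map(LongWritable offset, Text line, Context context)
   throws IOException, InterruptedException {
  for (String word : line.toString().split("\\s+")) {
   if (!word.isEmpty()) {
    context.write(new Text(word), ONE); // emit word -> 1
   }
  }
 }
}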


There other three File Input Format
And they REQUIRE to specify Type of KeyInput, ValueInput Manually

  • KeyValueTextInputFormat
    Seperate line with white space, using first word as key, the rest part as value
  • SequenceFileInputFormat
  • SequenceFileAsTextInputFormat

In One Map Task

Throughout the whole procedure the data is key-value based, and everything is done on the key, not the value: partitioning by key, sorting by key, merging by key, never by value.

Partition
Separates the data into different parts and puts the result into the memory buffer.
Partitioning can be customized; the default is hash-mod, which aims for load balance (but hash-mod can produce an unbalanced load, so we may need a custom partitioner to balance it).
This partition later determines which part goes to which Reduce machine.


Spill
Before the results in the memory buffer exceed the limit (default 100MB), we have to write the data to disk as a file so we can keep processing more data. This is called a spill.
The sorted and partitioned data is spilled to disk; this is purely a buffering concern, like using disk as an extension of memory.
The spill is done by a single background thread and does not block partitioning (writing data into the memory buffer).
spill.percent defaults to 0.8.
When the spill thread starts, it sorts those 80MB (0.8 * 100MB) of keys.



Sort
Sorting can be customized; we can decide how to compare and sort the data. The default compares by byte/ASCII value (dictionary order).


Merge on disk(Combiner)
A combiner may not exist, since some map results can never be merged together.
So the combiner is not required.
Merge on disk (combiner) can be customized; the default rule is based on the hash value.
Once the memory buffer is spilled, the buffer is cleared; when we process new data we have no knowledge of the previous partition state.
And the spilled data is already a file on disk, which we cannot change.
To bring all spilled results into one file, we have to re-read all spilled files and merge them on disk (combine). In this procedure the data is merged together by the combine rule.
The goals of merge on disk (combiner):
  1. Put all spilled results from one map task into one single file.
  2. Shrink the size of the map results to be transmitted.
  3. Based on the combiner rule, each partition of the result chooses its merge destination. (Not sure if this is right.)



Conclusion: we can customize these three steps (a minimal sketch follows this list):

  1. Partition
  2. Sort
  3. Combiner
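
A minimal sketch of wiring in such customizations with the Hadoop 2.x MapReduce API; the first-letter partitioning rule and the use of IntSumReducer as a combiner are hypothetical examples, not the defaults of the jobs in this post:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class CustomShuffleSetup {

 // 1. Partition: replace the default hash-mod rule with a custom one
 //    (here: route keys by their first character -- just an example rule).
 public static class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numReduceTasks) {
   if (numReduceTasks == 0 || key.getLength() == 0) return 0;
   return key.charAt(0) % numReduceTasks;
  }
 }

 static void configure(Job job) {
  job.setPartitionerClass(FirstLetterPartitioner.class); // 1. Partition
  job.setSortComparatorClass(Text.Comparator.class);     // 2. Sort (default comparator, set explicitly)
  job.setCombinerClass(IntSumReducer.class);             // 3. Combiner (pre-aggregates map output)
 }
}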





In One Reduce Task
Mixture of in-memory and on-disk data.
This part is NOT customizable. The reduce task receives (copies) different pieces of results from multiple maps and puts them into its memory buffer.

The received data is merged by key. This may also exceed the memory buffer limit,
so it will also spill the results to a file on disk each time the memory buffer limit is reached.

Then it merges again across all spilled data (plus the data still in the memory buffer).

All merged data is passed to reduce sequentially, one key at a time, not in parallel.

Then the data is passed to the Reducer, which can be customized, with a Key and an Iterable<Value>.

Finally the handled results are output through the RecordWriter.

And the file format are

  • part-00000
  • part-00001
  • part-00002
  • etc...

Output File Directory

  • _SUCCESS
  • _logs/
  • part-00000

Before the MapReduce job is done, there is an empty file _TEMP as a flag.
After the job is done, the file name is changed to _SUCCESS; it is still an empty flag file.








MapReduce Split Size

If a block is too big, we split it into smaller pieces and each piece is processed by its own Map Task.
If a block is small, it stays as one piece and gets only one Map Task.
  • max.split (100M)
  • min.split (10M)
  • block (64M)
  • split size = max(min.split, min(max.split, block)) (worked example below)
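
Worked example with the values listed above: split size = max(10M, min(100M, 64M)) = max(10M, 64M) = 64M, so each 64M block becomes exactly one split and one Map Task.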







MapReduce Logs


MapReduce has two types of logs

  1. history job logs
  2. Container logs
History job logs

Stored in HDFS (staging directory)
  • number of maps used
  • number of reduce used
  • job submission time
  • job start time
  • job finish time
  • how many job succeed
  • how many job failed
  • how many jobs each queue ran
  • etc...
Container Logs

Stored in ${HADOOP_HOME}/logs/userlogs
  • ApplicationMaster Logs
  • Task Logs
  • default log location: ${HADOOP_HOME}/logs/userlogs