Wednesday, May 11, 2016

MapReduce Concept

MapReduce: an offline batch-processing framework

Move the computation to the data, not the data to the computation

Requires Key->Value pairs as both input and output

Separates a job into two steps: Map and Reduce
  • Map: processes the input data in parallel
  • Reduce: combines the Map results
Shuffle connects Map and Reduce
  • Map Task: writes its output to the local hard disk
  • Reduce Task: copies its share of the output from each Map Task
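
The Map -> Shuffle -> Reduce flow can be sketched in plain Java, without Hadoop. This is only an illustration under assumptions: the word-count rule and the class name WordCountSketch are made up for the example, and everything runs in one process instead of on a cluster.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the Map -> Shuffle -> Reduce flow (no Hadoop involved).
final class WordCountSketch {

    // Map: turn one input line into (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // Shuffle: group all map output values by key, keeping the keys sorted.
    static TreeMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce: combine all values of one key into a single result.
    static int reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Runs the three steps over all input lines.
    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String line : lines) {
            mapped.addAll(map(line));
        }
        Map<String, Integer> result = new TreeMap<>();
        shuffle(mapped).forEach((key, values) -> result.put(key, reduce(key, values)));
        return result;
    }
}
```

Calling WordCountSketch.run on the lines "a b a" and "b c" counts each word across both lines. In a real job the map calls would run in parallel on different machines, and the shuffle would move data over the network.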

Only good for offline batch tasks

Advantages:
  • Fault tolerance, scalability
  • Good for simple batch tasks

Disadvantages:
  • Resource-hungry: heavy use of the hard disk, which makes it inefficient


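Primitive Variable Type

Hadoop wraps Java primitive types in Writable classes (package org.apache.hadoop.io).
Example for int:
IntWritable value = new IntWritable(100);
int v = value.get();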
Example for String:
Text text = new Text("abc");
String str = text.toString();
File InputFormat
An interface that reads the input files and converts each record into a Key->Value pair for the Mapper.

The default File Input Format is TextInputFormat
By default, TextInputFormat uses

  • LongWritable as KeyInput of Mapper (the byte offset of the line in the file)
  • Text as ValueInput of Mapper (the content of the line)

There are three other File Input Formats
With these, the types of KeyInput and ValueInput must be specified manually

  • KeyValueTextInputFormat
    Splits each line at the first separator (a tab by default): the part before it is the key, the rest is the value
  • SequenceFileInputFormat
  • SequenceFileAsTextInputFormat
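
A minimal sketch of how KeyValueTextInputFormat turns a line into a key and a value, written in plain Java. The class and method names are hypothetical, and the handling of a line without a separator (whole line as key, empty value) is stated here as an assumption of the sketch.

```java
// Sketch of splitting one text line into a key and a value.
final class KeyValueLineSketch {

    // Returns {key, value}, split at the FIRST occurrence of the separator.
    // Assumption: if the separator is absent, the whole line becomes the key
    // and the value is empty.
    static String[] split(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos < 0) {
            return new String[] { line, "" };
        }
        return new String[] { line.substring(0, pos), line.substring(pos + 1) };
    }
}
```

For example, splitting "user1<TAB>clicked the button" on a tab gives the key "user1" and the value "clicked the button"; later tabs in the line stay inside the value.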

In One Map Task

Throughout the whole procedure the data is Key-Value based, and every step works on the Key, not the Value: we partition by key, sort by key, and merge by key.

Partition
Split the map output into partitions and put the result into the Memory Buffer.
Partitioning can be customized. The default is Hash-Mod (hash of the key modulo the number of Reduce Tasks), which aims to balance the data load. Hash-Mod can still produce an unbalanced load when the keys are skewed, so a custom Partitioner may be needed to rebalance it.
The partition determines which Reduce Task each part of the output will later go to.
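
The Hash-Mod rule can be sketched as follows. The class name is hypothetical; the formula mirrors the idea of hashing the key and taking it modulo the number of Reduce Tasks, with the sign bit masked so the index is never negative.

```java
// Sketch of a Hash-Mod partition function.
final class HashModPartitionSketch {

    // Returns the index of the Reduce Task (0 .. numReduceTasks - 1)
    // that should receive the given key.
    static int partition(Object key, int numReduceTasks) {
        // Mask the sign bit so a negative hashCode cannot give a negative index.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

The same key always lands in the same partition, which is what lets one Reduce Task see all values of a key. It also shows where the imbalance comes from: if most records share a few keys, their partitions get most of the data no matter how good the hash is.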

Spill
Before the results in the Memory Buffer exceed the limit (100MB by default), the data has to be written to disk as a file so that the map can keep processing more data. This is called a spill.
The sorted and partitioned data is spilled to the local disk. This is purely a buffering consideration: the disk is used as an extension of memory.
The spill is done by a separate thread, so it does not block the map from writing more data into the Memory Buffer.
The spill threshold (spill.percent) is 0.8 by default.
When the spill thread starts, it has to sort the 80MB (0.8 * 100MB) of buffered data by key.
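
The threshold arithmetic above, as a small sketch. The names are hypothetical, and the spill counter assumes that every spill empties the buffer completely, which is a simplification of what really happens.

```java
// Sketch of the spill threshold arithmetic.
final class SpillThresholdSketch {

    // Number of buffered bytes at which the spill thread starts.
    static long spillThresholdBytes(int bufferMb, double spillPercent) {
        return Math.round(bufferMb * 1024L * 1024L * spillPercent);
    }

    // Counts how many spills start while records of the given sizes are
    // written. Assumption: each spill empties the buffer.
    static int countSpills(int[] recordSizesBytes, long thresholdBytes) {
        long buffered = 0;
        int spills = 0;
        for (int size : recordSizesBytes) {
            buffered += size;
            if (buffered >= thresholdBytes) {
                spills++;
                buffered = 0;
            }
        }
        return spills;
    }
}
```

With a 100MB buffer and spill.percent 0.8, spillThresholdBytes(100, 0.8) is 80MB expressed in bytes.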

Sort
Sorting can be customized: we can define how keys are compared and ordered. By default, Text keys are compared by byte value (dictionary order).
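
A sketch of the difference between the default dictionary order and a custom ordering, using plain Java comparators on String keys. The NUMERIC_DESC rule is a made-up example of a customization, and String ordering only matches byte ordering for ASCII keys.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of default versus custom key ordering.
final class KeySortSketch {

    // Default-style ordering: dictionary order of the keys (ASCII keys assumed).
    static final Comparator<String> DICTIONARY = Comparator.naturalOrder();

    // Hypothetical custom rule: treat keys as numbers, largest first.
    static final Comparator<String> NUMERIC_DESC =
            Comparator.comparingLong((String k) -> Long.parseLong(k)).reversed();

    // Returns a sorted copy of the keys; the input list is left untouched.
    static List<String> sorted(List<String> keys, Comparator<String> order) {
        List<String> copy = new ArrayList<>(keys);
        copy.sort(order);
        return copy;
    }
}
```

For the keys "10", "9", "100", dictionary order puts "9" last because it compares character by character, while the numeric rule orders them 100, 10, 9.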

Merge on disk(Combiner)
A Combiner may not exist, since for some jobs the map results can never be merged together.
So a Combiner is not required.
The Combiner can be customized. There is no default Combiner: without one, the spill files are only merged, not combined.
Once the Memory Buffer has been spilled and cleared, newly processed data has no knowledge of the partitions already written out.
And a spilled file is already on the local disk, so that file is not changed afterwards.
To bring all spilled results into one file, all spill files have to be read again and merged on disk. During this procedure, records with the same key are merged by the combine rule.
The goal of Merge on disk(Combiner):
  1. put all spilled results from one map task into one single file.
  2. shrink the size of the map results to be transmitted
  3. based on the combiner rule, each partition of the result chooses its merge destination. (Not sure if this is right)
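
A sketch of merging several spilled results into one while combining equal keys. The combine rule used here (summing counts) and all names are assumptions for illustration; a real combiner applies whatever rule the job defines.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of merging spilled results with a combine rule.
final class CombinerSketch {

    // Merges several spills of (word -> count) into one result sorted by key,
    // summing the counts of equal keys (the combine rule assumed here).
    static TreeMap<String, Integer> mergeSpills(List<Map<String, Integer>> spills) {
        TreeMap<String, Integer> merged = new TreeMap<>();
        for (Map<String, Integer> spill : spills) {
            for (Map.Entry<String, Integer> entry : spill.entrySet()) {
                merged.merge(entry.getKey(), entry.getValue(), Integer::sum);
            }
        }
        return merged;
    }
}
```

Two spills holding (a, 2), (b, 1) and (a, 3), (c, 1) become a single result with three records instead of four, which is the size reduction named in goal 2.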

Conclusion: we can customize these three steps

  1. Partition
  2. Sort
  3. Combiner

In One Reduce Task
Mixture of in-memory and on-disk data
This step is NOT customizable. The Reduce Task receives (copies) its pieces of the results from multiple Map Tasks and puts them in the Memory Buffer.

Merge the received data with the same key. This may also exceed the Memory Buffer size limit.
So it also spills the results to a file on the local disk each time the Memory Buffer limit is reached.

Then merge again over all spilled data (and the data still in the Memory Buffer).

All merged data is passed to reduce in sequence, one key at a time, not in parallel.

The data is then passed to the Reducer, which can be customized, as a Key and an Iterable<Value>
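
A sketch of the last step: walking the merged, key-sorted data and calling reduce once per key with all of its values. The reduce rule (maximum value per key) and the names are hypothetical; only the shape "Key plus Iterable<Value>" comes from the notes above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of feeding merged, key-sorted data to a reduce function.
final class ReduceSideSketch {

    // Hypothetical user-defined reduce rule: keep the maximum value per key.
    static int reduce(String key, Iterable<Integer> values) {
        int max = Integer.MIN_VALUE;
        for (int v : values) {
            max = Math.max(max, v);
        }
        return max;
    }

    // Walks input that is already sorted by key and calls reduce once per
    // key, one key after another (sequentially, not in parallel).
    static Map<String, Integer> runReduce(List<Map.Entry<String, Integer>> sortedPairs) {
        Map<String, Integer> output = new LinkedHashMap<>();
        int i = 0;
        while (i < sortedPairs.size()) {
            String key = sortedPairs.get(i).getKey();
            List<Integer> values = new ArrayList<>();
            // Collect the run of consecutive records that share this key.
            while (i < sortedPairs.size() && sortedPairs.get(i).getKey().equals(key)) {
                values.add(sortedPairs.get(i).getValue());
                i++;
            }
            output.put(key, reduce(key, values));
        }
        return output;
    }
}
```

Because the input is sorted by key, equal keys sit next to each other, so grouping needs only one pass and no extra lookup structure.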

Finally the processed results are written out by the RecordWriter

The output files are named

  • part-00000
  • part-00001
  • part-00002
  • etc...
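
The naming pattern in the list above is "part-" followed by a five-digit, zero-padded number. A tiny sketch of that pattern, assuming the number is the index of the Reduce Task that wrote the file (the class and method names are hypothetical):

```java
import java.util.Locale;

// Sketch of the part-NNNNN naming pattern.
final class PartFileNameSketch {

    // Builds the output file name for the given partition index.
    static String partFileName(int partition) {
        return String.format(Locale.ROOT, "part-%05d", partition);
    }
}
```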

Output File Directory

  • _logs/
  • part-00000

While the MapReduce Job is running, the output directory contains a _temporary directory.
After the job finishes successfully, an empty file named _SUCCESS is created as a flag.
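
A downstream step can use the _SUCCESS flag to decide whether the output is complete. The sketch below checks a directory on the local file system with java.nio; that is an assumption made to keep the example self-contained, since real job output usually lives in HDFS and would be checked through the Hadoop file system API instead.

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of checking the _SUCCESS flag in an output directory.
final class SuccessFlagSketch {

    // True when the output directory contains the _SUCCESS flag file.
    static boolean jobSucceeded(Path outputDir) {
        return Files.isRegularFile(outputDir.resolve("_SUCCESS"));
    }
}
```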

MapReduce Split Size

If the input is too big, it is split into smaller pieces (splits), and each piece is processed by a single Map Task.
If the input is smaller than one split, it has only one piece and only one Map Task.
  • max.split(100M)
  • min.split(10M)
  • block(64M)
  • split size = max(min.split, min(max.split, block))
With the values above, the split size is max(10M, min(100M, 64M)) = 64M.
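
The split size formula as a small Java sketch, using the example values from the list. The class name is hypothetical, and numSplits is a simplified ceiling division; Hadoop itself lets the last split run slightly larger than the others, which this sketch ignores.

```java
// Sketch of the split size formula.
final class SplitSizeSketch {

    static final long MB = 1024L * 1024L;

    // split size = max(min.split, min(max.split, block))
    static long splitSize(long minSplit, long maxSplit, long blockSize) {
        return Math.max(minSplit, Math.min(maxSplit, blockSize));
    }

    // Simplified estimate of the number of Map Tasks for one file:
    // the file size divided by the split size, rounded up.
    static long numSplits(long fileSize, long splitSize) {
        return (fileSize + splitSize - 1) / splitSize;
    }
}
```

Lowering max.split below the block size gives more, smaller splits (more Map Tasks); raising min.split above the block size gives fewer, larger ones.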

MapReduce Logs

MapReduce has two types of logs

  1. history job logs
  2. Container logs
History job logs

Stored in HDFS, in the staging directory
  • number of maps used
  • number of reduces used
  • job submission time
  • job start time
  • job finish time
  • how many jobs succeeded
  • how many jobs failed
  • how many jobs each queue ran
  • etc...
Container Logs

Stored in ${HADOOP_HOME}/logs/userlogs
  • ApplicationMaster Logs
  • Task Logs
  • by default, logs are saved in ${HADOOP_HOME}/logs/userlogs
