Hadoop Study Notes 07: MapReduce ReduceTask Source Code Analysis

Preface: this article briefly introduces the steps a ReduceTask runs through, and how the ReduceTask uses the iterator pattern to read data, avoiding the OOM problem that loading big data into memory up front would cause.

Source code analysis

Three steps are documented in the Reducer class that we override:

  • Shuffle: records with identical keys are pulled to one partition
  • Sort: MapTask has already sorted the data by key, so no full sort is needed once the data reaches the reduce side; the Sort here is really a single merge sort that puts identical keys together
  • Reduce: the reduce computation itself (a minimal Reducer sketch follows this list)
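
To fix ideas before diving into the framework source, here is a minimal sum-style Reducer (a generic sketch, not code from this article); Shuffle and Sort happen inside the framework, and only the Reduce step is user code:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// A minimal sum-style Reducer: one reduce() call per key group.
public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        // "values" is the lazy iterator this article analyzes below
        for (IntWritable v : values) {
            sum += v.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}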

Let's start with ReduceTask's run method, where the three steps above are all reflected.

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

  if (isMapOrReduce()) {
    // reduceTask work is divided into three phases: copy, group sort, and reduce execution
    copyPhase = getProgress().addPhase("copy");
    sortPhase = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }
  // start thread that will handle communication with parent
  TaskReporter reporter = startReporter(umbilical);

  boolean useNewApi = job.getUseNewReducer();
  initialize(job, getJobID(), reporter, useNewApi);

  // check if it is a cleanupJobTask
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }

  // initialize the codec
  codec = initCodec();
  RawKeyValueIterator rIter = null;
  ShuffleConsumerPlugin shuffleConsumerPlugin = null;

  Class combinerClass = conf.getCombinerClass();
  CombineOutputCollector combineCollector =
    (null != combinerClass) ?
      new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

  Class<? extends ShuffleConsumerPlugin> clazz =
    job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);

  shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
  LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

  ShuffleConsumerPlugin.Context shuffleContext =
    new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
                super.lDirAlloc, reporter, codec,
                combinerClass, combineCollector,
                spilledRecordsCounter, reduceCombineInputCounter,
                shuffledMapsCounter,
                reduceShuffleBytes, failedShuffleCounter,
                mergedMapOutputsCounter,
                taskStatus, copyPhase, sortPhase, this,
                mapOutputFile, localMapFiles);
  // the initialization process of shuffle
  shuffleConsumerPlugin.init(shuffleContext);

  // perform the shuffle to pull map records, eventually returning the iterator
  rIter = shuffleConsumerPlugin.run();

  // free up the data structures
  mapOutputFilesOnDisk.clear();

  sortPhase.complete(); // sort is complete
  setPhase(TaskStatus.Phase.REDUCE);
  statusUpdate(umbilical);
  Class keyClass = job.getMapOutputKeyClass();
  Class valueClass = job.getMapOutputValueClass();
  // get the grouping comparator
  RawComparator comparator = job.getOutputValueGroupingComparator();

  if (useNewApi) {
    runNewReducer(job, umbilical, reporter, rIter, comparator,
                  keyClass, valueClass);
  } else {
    runOldReducer(job, umbilical, reporter, rIter, comparator,
                  keyClass, valueClass);
  }

  shuffleConsumerPlugin.close();
  done(umbilical, reporter);
}

First, step one: the shuffle process pulls records with the same key to one partition and eventually returns the iterator rIter. Because big data cannot all be loaded into memory at once, reading records from disk into memory one by one through an iterator is the better fit.
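
Conceptually, consuming rIter looks like the following sketch (hypothetical driver code, not Hadoop framework code; it uses only the RawKeyValueIterator methods that appear in the source quoted later in this article):

// Drain the iterator one record at a time; memory use stays constant
// no matter how large the total input is.
while (rIter.next()) {                        // advance to the next record on disk
    DataInputBuffer rawKey = rIter.getKey();  // serialized bytes of the current key
    DataInputBuffer rawVal = rIter.getValue();// serialized bytes of the current value
    // deserialize and process this single record, then let it go
}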

Next, step two: get the grouping comparator via RawComparator comparator = job.getOutputValueGroupingComparator().

public RawComparator getOutputValueGroupingComparator() {
  Class<? extends RawComparator> theClass = getClass(
    JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
  if (theClass == null) {
    return getOutputKeyComparator();
  }
  return ReflectionUtils.newInstance(theClass, this);
}

public RawComparator getOutputKeyComparator() {
  Class<? extends RawComparator> theClass = getClass(
    JobContext.KEY_COMPARATOR, null, RawComparator.class);
  if (theClass != null)
    return ReflectionUtils.newInstance(theClass, this);
  // falls back to the key's own comparator
  return WritableComparator.get(
    getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}

We already ran into getOutputKeyComparator in the MapTask notes. This method first looks for a user-defined grouping comparator; if none is set, it falls back to the sort comparator, and if no sort comparator is set either, the key's own comparator is used.

Semantically, the grouping comparator answers whether two keys are the same, a boolean; the sort comparator usually returns -1, 0, or 1, meaning less than, equal to, or greater than. In reality the comparator here also returns an int, which is simply compared against 0. We will meet this piece of code again below.

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
                                   currentRawKey.getLength(),
                                   nextKey.getData(),
                                   nextKey.getPosition(),
                                   nextKey.getLength() - nextKey.getPosition()
                                   ) == 0;
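
For illustration, a user-defined grouping comparator might look like the following sketch (a hypothetical class and key layout, not from this article's job): it groups Text keys by the part before the first tab, and only whether compare returns 0 matters for grouping.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical example: keys like "user42\t2020-01-01" are grouped by the
// "user42" prefix, so one reduce() call sees all of that user's records.
public class FirstFieldGroupComparator extends WritableComparator {
    protected FirstFieldGroupComparator() {
        super(Text.class, true); // create key instances so compare() gets real objects
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String fa = a.toString().split("\t", 2)[0];
        String fb = b.toString().split("\t", 2)[0];
        return fa.compareTo(fb); // grouping only cares whether this is 0
    }
}

It would be registered with job.setGroupingComparatorClass(FirstFieldGroupComparator.class), which is what populates the JobContext.GROUP_COMPARATOR_CLASS entry read above.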

Then step three: executing the reduce method. Let's look at the runNewReducer method.

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(JobConf job,
                   final TaskUmbilicalProtocol umbilical,
                   final TaskReporter reporter,
                   RawKeyValueIterator rIter,
                   RawComparator<INKEY> comparator,
                   Class<INKEY> keyClass,
                   Class<INVALUE> valueClass
                   ) throws IOException, InterruptedException, ClassNotFoundException {
  // wrap value iterator to report progress.
  final RawKeyValueIterator rawIter = rIter;
  rIter = new RawKeyValueIterator() {
    public void close() throws IOException {
      rawIter.close();
    }
    public DataInputBuffer getKey() throws IOException {
      return rawIter.getKey();
    }
    public Progress getProgress() {
      return rawIter.getProgress();
    }
    public DataInputBuffer getValue() throws IOException {
      return rawIter.getValue();
    }
    public boolean next() throws IOException {
      boolean ret = rawIter.next();
      reporter.setProgress(rawIter.getProgress().getProgress());
      return ret;
    }
  };
  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
        getTaskID(), reporter);
  // make a reducer
  org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
    (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
      ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
  // the RecordWriter used to write out reduce records
  org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
    new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
  job.setBoolean("mapred.skip.on", isSkipping());
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
  // create the reduce context and set some of reduce's runtime parameters
  org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
    createReduceContext(reducer, job, getTaskID(),
                        rIter, reduceInputKeyCounter,
                        reduceInputValueCounter,
                        trackedRW,
                        committer,
                        reporter, comparator, keyClass,
                        valueClass);
  try {
    reducer.run(reducerContext);
  } finally {
    trackedRW.close(reducerContext);
  }
}

createReduceContext sets up reduce's runtime parameters and wraps rIter, the iterator that directly manipulates the data. The resulting context object is then passed to reducer.run(reducerContext).

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
      // If a back up store is used, reset it
      Iterator<VALUEIN> iter = context.getValues().iterator();
      if (iter instanceof ReduceContext.ValueIterator) {
        ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
      }
    }
  } finally {
    cleanup(context);
  }
}

Whereas context.nextKeyValue() in Mapper checks whether another record exists, context.nextKey() in Reducer checks whether another group of data exists.

public boolean nextKey() throws IOException, InterruptedException {
  while (hasMore && nextKeyIsSame) { // nextKeyIsSame defaults to false
    nextKeyValue();
  }
  if (hasMore) {
    if (inputKeyCounter != null) {
      inputKeyCounter.increment(1);
    }
    return nextKeyValue();
  } else {
    return false;
  }
}

It first checks whether another record exists and whether it still belongs to the same key group, looping while both hold (this skips over any values of the current group that reduce did not consume). Either way, as long as data remains, the nextKeyValue method is eventually executed.

public boolean nextKeyValue() throws IOException, InterruptedException {
  // if there is no more data, return false
  if (!hasMore) {
    key = null;
    value = null;
    return false;
  }
  // whether this is the first value of a new key group
  firstValue = !nextKeyIsSame;
  // read the key
  DataInputBuffer nextKey = input.getKey();
  currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
                    nextKey.getLength() - nextKey.getPosition());
  buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
  key = keyDeserializer.deserialize(key);
  // read the value
  DataInputBuffer nextVal = input.getValue();
  buffer.reset(nextVal.getData(), nextVal.getPosition(),
               nextVal.getLength() - nextVal.getPosition());
  value = valueDeserializer.deserialize(value);

  currentKeyLength = nextKey.getLength() - nextKey.getPosition();
  currentValueLength = nextVal.getLength() - nextVal.getPosition();

  if (isMarked) {
    backupStore.write(nextKey, nextVal);
  }

  // after reading the current kv, advance to the next record and
  // record whether one exists
  hasMore = input.next();
  if (hasMore) {
    // peek at the key of the next record
    nextKey = input.getKey();
    // compare whether the next record has the same key, updating nextKeyIsSame
    nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
                                       currentRawKey.getLength(),
                                       nextKey.getData(),
                                       nextKey.getPosition(),
                                       nextKey.getLength() - nextKey.getPosition()
                                       ) == 0;
  } else {
    nextKeyIsSame = false;
  }
  inputValueCounter.increment(1);
  return true;
}

After reading the current record, this method checks whether a next record exists and, if so, whether it belongs to the same key group as the current one, updating hasMore and nextKeyIsSame, which nextKey then uses for its own checks.

Now look at the reduce method in Reducer: among its parameters, context.getValues() returns an Iterable whose iterator is a ValueIterator.

public Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
  return iterable;
}

protected class ValueIterable implements Iterable<VALUEIN> {
  private ValueIterator iterator = new ValueIterator();
  @Override
  public Iterator<VALUEIN> iterator() {
    return iterator;
  }
}

// methods of the ValueIterator returned above:
@Override
public boolean hasNext() {
  try {
    if (inReset && backupStore.hasNext()) {
      return true;
    }
  } catch (Exception e) {
    e.printStackTrace();
    throw new RuntimeException("hasNext failed", e);
  }
  return firstValue || nextKeyIsSame;
}

@Override
public VALUEIN next() {
  if (inReset) {
    try {
      if (backupStore.hasNext()) {
        backupStore.next();
        DataInputBuffer next = backupStore.nextValue();
        buffer.reset(next.getData(), next.getPosition(),
                     next.getLength() - next.getPosition());
        value = valueDeserializer.deserialize(value);
        return value;
      } else {
        inReset = false;
        backupStore.exitResetMode();
        if (clearMarkFlag) {
          clearMarkFlag = false;
          isMarked = false;
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
      throw new RuntimeException("next value iterator failed", e);
    }
  }

  // if this is the first record, we don't need to advance
  if (firstValue) {
    firstValue = false;
    return value;
  }
  // if this isn't the first record and the next key is different, they
  // can't advance it here.
  if (!nextKeyIsSame) {
    throw new NoSuchElementException("iterate past last value");
  }
  // otherwise, go to the next key/value pair
  try {
    nextKeyValue();
    return value;
  } catch (IOException ie) {
    throw new RuntimeException("next value iterator failed", ie);
  } catch (InterruptedException ie) {
    // this is bad, but we can't modify the exception list of java.util
    throw new RuntimeException("next value iterator interrupted", ie);
  }
}

In hasNext, a next value exists as long as this is the first record of the current group, or the next record belongs to the same group as this one. In next, the first record of a group returns value directly; otherwise nextKeyValue is executed, i.e., ReduceContextImpl's input fetches the next record, and value is returned at the end.
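
One practical consequence worth noting: as the deserialize(value) calls above show, the framework reuses a single value object across iterations, so holding references to it goes wrong. A hedged sketch of the pitfall, as a snippet inside some reduce(Text key, Iterable<IntWritable> values, Context ctx):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;

List<IntWritable> cached = new ArrayList<>();
for (IntWritable v : values) {
    cached.add(v);                           // BUG: every element is the same reused object
    // cached.add(new IntWritable(v.get())); // correct: copy the current value out
}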

Conclusion

To briefly summarize the ReduceTask workflow:

  • ReduceTask wraps the pulled data in an iterator
  • When the reduce method is called, the values it receives are an iterator; the data is not loaded into memory
  • The iterator's hasNext method checks whether this is the first item of a group of data, or whether the next record belongs to the same group (nextKeyIsSame)
  • The next method returns value directly for the first item of a group; otherwise it uses the real iterator to fetch the record and update nextKeyIsSame

Clearly, the iterator pattern is used throughout to avoid the OOM problems that a large data volume would otherwise cause. And since MapTask has already sorted the data, the iterator can process all of it linearly with a single pass of I/O.
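
To distill the idea, here is a toy, self-contained re-implementation of the pattern (illustrative only, not Hadoop code): one linear pass over a pre-sorted stream, with a one-record lookahead playing the role of nextKeyIsSame.

import java.util.Iterator;
import java.util.List;

// Toy demo of the grouped-iterator pattern over a pre-sorted stream.
public class GroupingDemo {
    public static void main(String[] args) {
        // already sorted by key, as MapTask output would be
        Iterator<String[]> input = List.of(
            new String[]{"a", "1"}, new String[]{"a", "2"},
            new String[]{"b", "3"}).iterator();

        String[] current = input.hasNext() ? input.next() : null;
        while (current != null) {
            String key = current[0];
            int sum = 0;
            // consume values while the lookahead stays in the same group
            // (the analogue of firstValue || nextKeyIsSame)
            while (current != null && current[0].equals(key)) {
                sum += Integer.parseInt(current[1]);             // the "reduce" work
                current = input.hasNext() ? input.next() : null; // one-record lookahead
            }
            System.out.println(key + " -> " + sum);
        }
    }
}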