
How to send the output of one mapper-reducer directly to another mapper-reducer without saving the output to HDFS

Tags:

hadoop

mahout

Problem solved eventually; see my solution at the bottom.


Recently I have been trying to run the recommender example from chapter 6 (listings 6.1 to 6.4) of Mahout in Action, but I ran into a problem and could not find a solution by searching.

Here is the problem. I have a mapper-reducer pair:

public final class WikipediaToItemPrefsMapper extends
    Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String line = value.toString();
    Matcher m = NUMBERS.matcher(line);
    m.find();
    VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
    VarLongWritable itemID = new VarLongWritable();
    while (m.find()) {
        itemID.set(Long.parseLong(m.group()));
        context.write(userID, itemID);
    }
}
}

public class WikipediaToUserVectorReducer
    extends
    Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

public void reduce(VarLongWritable userID,
        Iterable<VarLongWritable> itemPrefs, Context context)
        throws IOException, InterruptedException {
    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (VarLongWritable itemPref : itemPrefs) {
        userVector.set((int) itemPref.get(), 1.0f);
    }
    context.write(userID, new VectorWritable(userVector));
}
}

The reducer outputs a userID and a userVector, which looks like this: 98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}

Then I want to use another mapper-reducer pair to process this data:

public class UserVectorSplitterMapper
    extends
    Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

public void map(VarLongWritable key, VectorWritable value, Context context)
        throws IOException, InterruptedException {
    long userID = key.get();
    Vector userVector = value.get();
    Iterator<Vector.Element> it = userVector.iterateNonZero();
    IntWritable itemIndexWritable = new IntWritable();
    while (it.hasNext()) {
        Vector.Element e = it.next();
        int itemIndex = e.index();
        float preferenceValue = (float) e.get();
        itemIndexWritable.set(itemIndex);
        context.write(itemIndexWritable, 
                new VectorOrPrefWritable(userID, preferenceValue));
    }
}
}

When I try to run the job, it fails with a cast error:

org.apache.hadoop.io.Text cannot be cast to org.apache.mahout.math.VectorWritable

The first mapper-reducer writes its output to HDFS, and the second mapper-reducer tries to read that output. The mapper can cast 98955 to VarLongWritable, but it cannot convert {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0} to VectorWritable. Is there a way to make the first mapper-reducer send its output directly to the second pair, so that no data conversion is needed? I have looked through Hadoop in Action and Hadoop: The Definitive Guide, and there seems to be no way to do that. Any suggestions?


Problem solved

Solution: By using SequenceFileOutputFormat, the reduce output of the first MapReduce workflow is saved on the DFS as a binary sequence file. The second MapReduce workflow can then read that temporary file as its input by specifying SequenceFileInputFormat as the input format when configuring the job. Because the sequence file records the key and value classes together with the serialized data, SequenceFileInputFormat can read it and deserialize each value back into a VectorWritable.

Here is some example code:

confFactory ToItemPrefsWorkFlow = new confFactory(
        new Path("/dbout"),             // input file path
        new Path("/mahout/output.txt"), // output file path
        TextInputFormat.class,          // input format
        VarLongWritable.class,          // mapper key format
        Item_Score_Writable.class,      // mapper value format
        VarLongWritable.class,          // reducer key format
        VectorWritable.class,           // reducer value format
        SequenceFileOutputFormat.class  // the reducer output format
);
ToItemPrefsWorkFlow.setMapper(WikipediaToItemPrefsMapper.class);
ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
JobConf conf1 = ToItemPrefsWorkFlow.getConf();

confFactory UserVectorToCooccurrenceWorkFlow = new confFactory(
        new Path("/mahout/output.txt"),
        new Path("/mahout/UserVectorToCooccurrence"),
        SequenceFileInputFormat.class, // notice that the input format of the second workflow's mapper is now SequenceFileInputFormat.class
        //UserVectorToCooccurrenceMapper.class,
        IntWritable.class,
        IntWritable.class,
        IntWritable.class,
        VectorWritable.class,
        SequenceFileOutputFormat.class
);
UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();

JobClient.runJob(conf1);
JobClient.runJob(conf2);
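
For anyone wondering why the binary format makes the difference, here is a minimal sketch that uses only java.io and no Hadoop or Mahout classes. SparseVectorRecord and roundTrip are hypothetical names invented for this illustration; the class is only a stand-in for VectorWritable and is not Mahout's implementation. It shows that the printed text form of a vector is just a string, while a value written field by field in binary can be read back as the same typed object, which is roughly the idea behind a sequence file round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

class BinaryRoundTripSketch {

    /** Hypothetical stand-in for a sparse user vector; NOT Mahout's VectorWritable. */
    static final class SparseVectorRecord {
        final Map<Integer, Double> entries = new TreeMap<>();

        // Binary form: the entry count, followed by (index, value) pairs.
        void write(DataOutputStream out) throws IOException {
            out.writeInt(entries.size());
            for (Map.Entry<Integer, Double> e : entries.entrySet()) {
                out.writeInt(e.getKey());
                out.writeDouble(e.getValue());
            }
        }

        // Rebuilds a typed record from the binary form produced by write().
        static SparseVectorRecord read(DataInputStream in) throws IOException {
            SparseVectorRecord v = new SparseVectorRecord();
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                int index = in.readInt();
                double value = in.readDouble();
                v.entries.put(index, value);
            }
            return v;
        }

        @Override
        public String toString() {
            return entries.toString();
        }
    }

    /** Serializes the record to bytes and reads it back as a typed object. */
    static SparseVectorRecord roundTrip(SparseVectorRecord v) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                v.write(out);
            }
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return SparseVectorRecord.read(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SparseVectorRecord userVector = new SparseVectorRecord();
        for (int item : new int[] {590, 22, 9059, 3, 2, 1}) {
            userVector.entries.put(item, 1.0);
        }

        // Text route: what arrives is a String, not a vector object.
        Object asText = userVector.toString();
        System.out.println(asText instanceof SparseVectorRecord);        // prints false

        // Binary route: the reader gets back an equal, typed record.
        SparseVectorRecord restored = roundTrip(userVector);
        System.out.println(restored.entries.equals(userVector.entries)); // prints true
    }
}
```

In the real job the same roles are played by the Writable classes and the sequence file formats, so the second mapper receives a VectorWritable instead of a Text line.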

If you have any problems with this, please feel free to contact me.

asked Apr 22 '12 at 08:04 by dotcomXY



1 Answer

You need to explicitly configure the output of the first job to use the SequenceFileOutputFormat and define the output key and value classes:

job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(VarLongWritable.class);
job.setOutputValueClass(VectorWritable.class);

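Without seeing your driver code, I'm guessing you're using TextOutputFormat as the output format of the first job and TextInputFormat as the input format of the second. TextInputFormat hands each line to the second mapper as a <LongWritable, Text> pair (byte offset and line contents), so the value arrives as Text rather than VectorWritable.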

answered Nov 15 '22 at 11:11 by Chris White