Problem Solved Eventually check my solution in the bottom
Recently I am trying to run the recommender example in the chaper6 (listing 6.1 ~ 6.4)from the Mahout in Action. But I encountered a problem and I have googled around but I can't find the solution.
Here is the problem: I have a pair of mapper-reducer
public final class WikipediaToItemPrefsMapper extends
Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {
private static final Pattern NUMBERS = Pattern.compile("(\\d+)");
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
Matcher m = NUMBERS.matcher(line);
m.find();
VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
VarLongWritable itemID = new VarLongWritable();
while (m.find()) {
itemID.set(Long.parseLong(m.group()));
context.write(userID, itemID);
}
}
}
public class WikipediaToUserVectorReducer
extends
Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {
public void reduce(VarLongWritable userID,
Iterable<VarLongWritable> itemPrefs, Context context)
throws IOException, InterruptedException {
Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
for (VarLongWritable itemPref : itemPrefs) {
userVector.set((int) itemPref.get(), 1.0f);
}
context.write(userID, new VectorWritable(userVector));
}
}
The reducer output a userID and a userVector and it looks like this: 98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}
Then I want to use another pair of mapper-reducer to process this data
public class UserVectorSplitterMapper
extends
Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {
public void map(VarLongWritable key, VectorWritable value, Context context)
throws IOException, InterruptedException {
long userID = key.get();
Vector userVector = value.get();
Iterator<Vector.Element> it = userVector.iterateNonZero();
IntWritable itemIndexWritable = new IntWritable();
while (it.hasNext()) {
Vector.Element e = it.next();
int itemIndex = e.index();
float preferenceValue = (float) e.get();
itemIndexWritable.set(itemIndex);
context.write(itemIndexWritable,
new VectorOrPrefWritable(userID, preferenceValue));
}
}
}
When I try to run the job, it cast error says
org.apache.hadoop.io.Text cannot be cast to org.apache.mahout.math.VectorWritable
the first mapper-reducer write the output into the hdfs, and the second mapper-reducer try to read the output, the mapper can cast the 98955 to VarLongWritable, but can't convert {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0} to VectorWritable, So I am wondering is there a way to make the first mapper-reducer directly send the output to the second pair, then there is no need to do the data converting. I have looked up Hadoop in action, and hadoop: the definitive guide, it seems there is no such a way to do that, any suggestions?
Solution: By using SequenceFileOutputFormat, we can output and save the reduce result of the first MapReduce workflow on the DFS, then the second MapReduce workflow can read the temporary file as input by using SequenceFileInputFormat class as parameter when creating the mapper. Since the vector would be saved in binary sequence file which has specific format, the SequenceFileInputFormat can read it and transform it back to vector format.
Here are some example code:
confFactory ToItemPrefsWorkFlow = new confFactory
(new Path("/dbout"), //input file path
new Path("/mahout/output.txt"), //output file path
TextInputFormat.class, //input format
VarLongWritable.class, //mapper key format
Item_Score_Writable.class, //mapper value format
VarLongWritable.class, //reducer key format
VectorWritable.class, //reducer value format
**SequenceFileOutputFormat.class** //The reducer output format
);
ToItemPrefsWorkFlow.setMapper( WikipediaToItemPrefsMapper.class);
ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
JobConf conf1 = ToItemPrefsWorkFlow.getConf();
confFactory UserVectorToCooccurrenceWorkFlow = new confFactory
(new Path("/mahout/output.txt"),
new Path("/mahout/UserVectorToCooccurrence"),
SequenceFileInputFormat.class, //notice that the input format of mapper of the second work flow is now SequenceFileInputFormat.class
//UserVectorToCooccurrenceMapper.class,
IntWritable.class,
IntWritable.class,
IntWritable.class,
VectorWritable.class,
SequenceFileOutputFormat.class
);
UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();
JobClient.runJob(conf1);
JobClient.runJob(conf2);
If you have any problem with this please feel free to contact me
The Hadoop Java programs are consist of Mapper class and Reducer class along with the driver class. Hadoop Mapper is a function or task which is used to process all input records from a file and generate the output which works as input for Reducer. It produces the output by returning new key-value pairs.
On local disk, this Mapper output is first stored in a buffer whose default size is 100MB which can be configured with io.sort.mb property. The output of the mapper can be written to HDFS if and only if the job is Map job only, In that case, there will be no Reducer task so the intermediate output is our final output which can be written on HDFS.
The output of each mapper is sent to the sorter which will sort the key-value pairs according to its key value. Shuffling also takes place during the sorting process and the output will be sent to the Reducer part and final output is produced. Let’s take an example to understand the working of Reducer.
The output of each mapper is sent to the sorter which will sort the key-value pairs according to its key value. Shuffling also takes place during the sorting process and the output will be sent to the Reducer part and final output is produced.
You need to explicitly configure the output of the first job to use the SequenceFileOutputFormat and define the output key and value classes:
job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(VarLongWritable.class);
job.setOutputKeyClass(VectorWritable.class);
Without seeing your driver code, i'm guessing you're using TextOutputFormat as the output, of the first job, and TextInputFormat as the input to the second - and this input format sends pairs of <Text, Text>
to the second mapper
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With