NullPointerException from Hadoop's JobSplitWriter / SerializationFactory when calling InputSplit's getClass()

I'm getting a NullPointerException when launching a MapReduce job. It's being thrown by the SerializationFactory's getSerializer() method. I'm using a custom InputSplit, InputFormat, RecordReader and MapReduce value class.

I know the error is being thrown some time after the splits are being created by my InputFormat class, but before the creation of the RecordReader. As far as I can tell, it is occurring directly after the "cleaning up the staging area" message.

By checking the Hadoop source in the places indicated by the stack trace, it looks like the error is occurring when getSerializer() receives a null Class<T> pointer. JobSplitWriter's writeNewSplits() calls that method like this:

Serializer<T> serializer = factory.getSerializer((Class<T>) split.getClass());

Thus, I assume that when getClass() is being called on my custom InputSplit objects, it's returning a null pointer, but that is just baffling. Any ideas?
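
If it's relevant, my understanding is that the factory picks a serialization based on the io.serializations configuration key, so a standalone check like the following (untested sketch; SerializationCheck is just a throwaway name) should print which serializations are registered:

import org.apache.hadoop.conf.Configuration;

public class SerializationCheck {
    public static void main( String[] args ) {
        Configuration conf = new Configuration();

        // With an out-of-the-box configuration this typically lists
        // org.apache.hadoop.io.serializer.WritableSerialization (plus, depending
        // on the Hadoop version, the Avro serializations), none of which accepts
        // a plain class that implements neither Writable nor the Avro interfaces.
        System.out.println( conf.get( "io.serializations" ) );
    }
}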

The full stack trace from the error follows:

12/06/24 14:26:49 INFO mapred.JobClient: Cleaning up the staging area hdfs://localhost:54310/tmp/hadoop-s3cur3/mapred/staging/s3cur3/.staging/job_201206240915_0035
Exception in thread "main" java.lang.NullPointerException
    at org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
    at org.apache.hadoop.mapreduce.split.JobSplitWriter.writeNewSplits(JobSplitWriter.java:123)
    at org.apache.hadoop.mapreduce.split.JobSplitWriter.createSplitFiles(JobSplitWriter.java:74)
    at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:968)
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:979)
    at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:174)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:897)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:500)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:530)
    at edu.cs.illinois.cogcomp.hadoopinterface.infrastructure.CuratorJob.start(CuratorJob.java:94)
    at edu.cs.illinois.cogcomp.hadoopinterface.HadoopInterface.main(HadoopInterface.java:58)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)

Thanks!

EDIT: My code for the custom InputSplit follows:

import . . .

/**
 * A document directory within the input directory. 
 * Returned by DirectoryInputFormat.getSplits()
 * and passed to DirectoryInputFormat.createRecordReader().
 *
 * Represents the data to be processed by an individual Map process.
 */
public class DirectorySplit extends InputSplit {
    /**
     * Constructs a DirectorySplit object
     * @param docDirectoryInHDFS The location (in HDFS) of this
     *            document's directory, complete with all annotations.
     * @param fs The filesystem associated with this job
     */
    public DirectorySplit( Path docDirectoryInHDFS, FileSystem fs )
            throws IOException {
        this.inputPath = docDirectoryInHDFS;
        hash = FileSystemHandler.getFileNameFromPath(inputPath);
        this.fs = fs;
    }

    /**
     * Get the size of the split so that the input splits can be sorted by size.
     * Here, we calculate the size to be the number of bytes in the original
     * document (i.e., ignoring all annotations).
     *
     * @return The number of characters in the original document
     */
    @Override
    public long getLength() throws IOException, InterruptedException {
        Path origTxt = new Path( inputPath, "original.txt" );
        String msg = "Getting length of split " + hash;
        HadoopInterface.logger.log( msg );
        return FileSystemHandler.getFileSizeInBytes( origTxt, fs );
    }

    /**
     * Get the list of nodes where the data for this split would be local.
     * This list includes all nodes that contain any of the required data---it's
     * up to Hadoop to decide which one to use.
     *
     * @return An array of the nodes for whom the split is local
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        FileStatus status = fs.getFileStatus(inputPath);

        BlockLocation[] blockLocs = fs.getFileBlockLocations( status, 0,
                                                              status.getLen() );

        HashSet<String> allBlockHosts = new HashSet<String>();
        for( BlockLocation blockLoc : blockLocs ) {
            allBlockHosts.addAll( Arrays.asList( blockLoc.getHosts() ) );
        }

        // toArray() without an argument returns Object[], which cannot be cast
        // to String[]; pass a typed array instead.
        return allBlockHosts.toArray( new String[allBlockHosts.size()] );
    }

    /**
     * @return The hash of the document that this split handles
     */
    public String toString() {
        return hash;
    }

    private Path inputPath;
    private String hash;
    private FileSystem fs;
}

Asked Jun 24 '12 by s3cur3

1 Answer

InputSplit does not extend Writable; you will need to explicitly declare that your input split implements Writable.
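
Presumably no registered serialization accepts a non-Writable class, so SerializationFactory.getSerialization() comes back null and the getSerializer() call at SerializationFactory.java:73 dereferences that null, which is the NPE you see. A minimal sketch of the fix follows; it assumes the split only needs to carry the directory path between the client and the tasks, and it reuses FileSystemHandler from your own code (the unchanged methods are omitted):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

public class DirectorySplit extends InputSplit implements Writable {

    private Path inputPath;
    private String hash;
    private FileSystem fs;

    // Required: the framework instantiates the split reflectively on the task
    // side and then calls readFields() to populate it.
    public DirectorySplit() { }

    public DirectorySplit( Path docDirectoryInHDFS, FileSystem fs )
            throws IOException {
        this.inputPath = docDirectoryInHDFS;
        this.hash = FileSystemHandler.getFileNameFromPath( inputPath );
        this.fs = fs;
    }

    @Override
    public void write( DataOutput out ) throws IOException {
        // Serialize only what is needed to rebuild the split remotely.
        Text.writeString( out, inputPath.toString() );
    }

    @Override
    public void readFields( DataInput in ) throws IOException {
        inputPath = new Path( Text.readString( in ) );
        hash = FileSystemHandler.getFileNameFromPath( inputPath );
        // The FileSystem handle cannot be serialized; re-acquire it here
        // (e.g. fs = FileSystem.get( new Configuration() )) if getLength()
        // or getLocations() will be called on the deserialized split.
    }

    // getLength(), getLocations(), and toString() stay as in the question.
}

The no-argument constructor is as important as the two Writable methods: the deserializer creates the split via reflection before reading its fields back in, so without it the job would typically fail with an instantiation error instead.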

Answered by Chris White