Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Hadoop: intermediate merge failed

I'm running into a strange issue. When I run my Hadoop job over a large dataset (>1TB compressed text files), several of the reduce tasks fail, with stacktraces like these:

java.io.IOException: Task: attempt_201104061411_0002_r_000044_0 - The reduce copier failed
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:385)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:240)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
    at org.apache.hadoop.mapred.Child.main(Child.java:234)
Caused by: java.io.IOException: Intermediate merge failed
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2714)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2639)
Caused by: java.lang.RuntimeException: java.io.EOFException
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:128)
    at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:373)
    at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:139)
    at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:103)
    at org.apache.hadoop.mapred.Merger$MergeQueue.adjustPriorityQueue(Merger.java:335)
    at org.apache.hadoop.mapred.Merger$MergeQueue.next(Merger.java:350)
    at org.apache.hadoop.mapred.Merger.writeFile(Merger.java:156)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2698)
    ... 1 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:375)
    at com.__.hadoop.pixel.segments.IpCookieCountFilter$IpAndIpCookieCount.readFields(IpCookieCountFilter.java:241)
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:125)
    ... 8 more
java.io.IOException: Task: attempt_201104061411_0002_r_000056_0 - The reduce copier failed
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:385)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:240)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
    at org.apache.hadoop.mapred.Child.main(Child.java:234)
Caused by: java.io.IOException: Intermediate merge failed
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2714)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2639)
Caused by: java.lang.RuntimeException: java.io.EOFException
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:128)
    at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:373)
    at org.apache.hadoop.util.PriorityQueue.upHeap(PriorityQueue.java:123)
    at org.apache.hadoop.util.PriorityQueue.put(PriorityQueue.java:50)
    at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:447)
    at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:381)
    at org.apache.hadoop.mapred.Merger.merge(Merger.java:107)
    at org.apache.hadoop.mapred.Merger.merge(Merger.java:93)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2689)
    ... 1 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:180)
    at org.apache.hadoop.io.Text.readString(Text.java:402)
    at com.__.hadoop.pixel.segments.IpCookieCountFilter$IpAndIpCookieCount.readFields(IpCookieCountFilter.java:240)
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:122)
    ... 9 more

Not all of my reducers fail. Several often succeed before I see failures with others. As you can see, the stacktraces always seem to originate from IPAndIPCookieCount.readFields() and always in the in-memory merge stage, but not always from the same part of readFields.

This job succeeds when running over smaller datasets (about 1/30th the size). There are nearly as many outputs as inputs to the job, but each output record is shorter. This job is essentially an implementation of a secondary sort.

We are using the CDH3 Hadoop distribution.

Here is my custom WritableComparable implementation:

public static class IpAndIpCookieCount implements WritableComparable<IpAndIpCookieCount> {

        private String ip;
        private int ipCookieCount;

        public IpAndIpCookieCount() {
            // empty constructor for hadoop
        }

        public IpAndIpCookieCount(String ip, int ipCookieCount) {
            this.ip = ip;
            this.ipCookieCount = ipCookieCount;
        }

        public String getIp() {
            return ip;
        }

        public int getIpCookieCount() {
            return ipCookieCount;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            ip = Text.readString(in);
            ipCookieCount = in.readInt();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            Text.writeString(out, ip);
            out.writeInt(ipCookieCount);
        }

        @Override
        public int compareTo(IpAndIpCookieCount other) {
            int firstComparison = ip.compareTo(other.getIp());
            if (firstComparison == 0) {
                int otherIpCookieCount = other.getIpCookieCount();
                if (ipCookieCount == otherIpCookieCount) {
                    return 0;
                } else {
                    return ipCookieCount < otherIpCookieCount ? 1 : -1;
                }
            } else {
                return firstComparison;
            }
        }

        @Override
        public boolean equals(Object o) {
            if (o instanceof IpAndIpCookieCount) {
                IpAndIpCookieCount other = (IpAndIpCookieCount) o;
                return ip.equals(other.getIp()) && ipCookieCount == other.getIpCookieCount();
            } else {
                return false;
            }
        }

        @Override
        public int hashCode() {
            return ip.hashCode() ^ ipCookieCount;
        }

    }

The readFields method is very simple, and I can't see any problems in this class. Additionally, I have seen other people getting essentially the same stack trace:

  • http://lucene.472066.n3.nabble.com/Reduce-Copier-Failed-td2120228.html
  • https://groups.google.com/a/cloudera.org/group/cdh-user/browse_thread/thread/3544da912bf66506
  • http://www.listware.net/201010/hadoop-common-user/70382-merging-of-the-local-fs-files-threw-an-exception-javaioioexception-javalangruntimeexception-javaioeofexception.html
  • http://mail-archives.apache.org/mod_mbox/hadoop-mapreduce-user/201101.mbox/%[email protected]%3E
  • http://web.archiveorange.com/archive/v/5nvvZTgeqwCRQ3F9vEzI

None seemed to have actually figured out the issue behind this. The last two seem to suggest that this could be a memory issue (although these stacktraces aren't OutOfMemoryExceptions). Like the second to last post in that list of links, I have tried setting the number of reducers higher (up to 999), but I still get failures. I have not (yet) tried to allocate more memory to reduce tasks, as that would require us to reconfigure our cluster.

Is this a bug in Hadoop? Or am I doing something wrong?

EDIT: My data is partitioned by day. If I run the job 7 times, once for each day, all 7 complete. If I run one job over all 7 days, it fails. The large report over all 7 days will see exactly the same keys as the smaller ones do (in aggregate), but obviously not in the same order, at the same reducers, etc.

like image 630
ajduff574 Avatar asked Apr 07 '11 15:04

ajduff574


1 Answers

I think this is an artifact of Cloudera's backport of MAPREDUCE-947 to CDH3. This patch results in the formation of a _SUCCESS file for successful job.

Also a _SUCCESS file is created in the output folder for successful jobs. A configuration parameter mapreduce.fileoutputcommitter.marksuccessfuljobs can be set to false to disable creation of _SUCCESS file, or to true to enable creation of the _SUCCESS file.

Looking at your error,

Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:180)

and comparing it to errors I've seen for this issue before,

Exception in thread "main" java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:180)
    at java.io.DataInputStream.readFully(DataInputStream.java:152)
    at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1465)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1437)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1424)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1419)
    at org.apache.hadoop.mapred.SequenceFileOutputFormat.getReaders(SequenceFileOutputFormat.java:89)
    at org.apache.nutch.crawl.CrawlDbReader.processStatJob(CrawlDbReader.java:323)
    at org.apache.nutch.crawl.CrawlDbReader.main(CrawlDbReader.java:511)

and on the Mahout mailing list

Exception in thread "main" java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:180)
    at java.io.DataInputStream.readFully(DataInputStream.java:152)
    at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1457)
    at
org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1435)
    at
org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1424)
    at
org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1419)
    at
org.apache.mahout.df.mapreduce.partial.Step0Job.parseOutput(Step0Job.java:145)
    at
org.apache.mahout.df.mapreduce.partial.Step0Job.run(Step0Job.java:119)
    at
org.apache.mahout.df.mapreduce.partial.PartialBuilder.parseOutput(PartialBuilder.java:115)
    at org.apache.mahout.df.mapreduce.Builder.build(Builder.java:338)
    at
org.apache.mahout.df.mapreduce.BuildForest.buildForest(BuildForest.java:195)

it seems that DataInputStream.readFully is choked by this file.

I would suggest setting mapreduce.fileoutputcommitter.marksuccessfuljobs to false and retrying your job - it should work.

like image 199
viksit Avatar answered Sep 21 '22 09:09

viksit