
Hadoop: job runs okay on smaller set of data but fails with large dataset

I have the following situation:

I have a 3-machine cluster with the following configuration.

Master

Usage of /:   91.4% of 74.41GB 
MemTotal:       16557308 kB
MemFree:          723736 kB 

Slave 01

Usage of /:   52.9% of 29.76GB
MemTotal:       16466220 kB 
MemFree:         5320860 kB

Slave 02

Usage of /:   19.0% of 19.84GB
MemTotal:       16466220 kB
MemFree:         6173564 kB

hadoop/conf/core-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/work/app/hadoop/tmp</value>
  <description>A base for other temporary directories.</description>
</property>

<property>
  <name>fs.default.name</name>
  <value>hdfs://master:54310</value>
  <description>The name of the default file system.  A URI whose
  scheme and authority determine the FileSystem implementation.  The
  uri's scheme determines the config property (fs.SCHEME.impl) naming
  the FileSystem implementation class.  The uri's authority is used to
  determine the host, port, etc. for a filesystem.</description>
</property>
<property>
  <name>dfs.datanode.max.xcievers</name>
  <value>4096</value>
</property>
</configuration>

hadoop/conf/mapred-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
  <name>mapred.job.tracker</name>
  <value>master:54311</value>
  <description>The host and port that the MapReduce job tracker runs
  at.  If "local", then jobs are run in-process as a single map
  and reduce task.
  </description>
</property>

<property>
  <name>mapred.reduce.tasks</name>
  <value>1</value>
</property>

<property>
  <name>mapred.map.tasks</name>
  <value>100</value>
</property>

<property>
  <name>mapred.task.timeout</name>
  <value>0</value>
</property>

<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx512m</value>
</property>
</configuration>

hadoop/conf/hdfs-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
  <name>dfs.replication</name>
  <value>3</value>
  <description>Default block replication.
  The actual number of replications can be specified when the file is created.
  The default is used if replication is not specified in create time.
  </description>
</property>
<property>
  <name>dfs.datanode.socket.write.timeout</name>
  <value>0</value>
</property>
</configuration>
  • I have over 2 million XML documents (each ~400 KB)
  • The map tasks open each of these XML documents and emit them as JSON
  • The reduce task receives each JSON document as a string, applies a transformation, and emits it
  • No. of map tasks: 100
  • No. of reduce tasks: 1
  • The entire job runs fine when the number of documents = 10,000
  • When the number of documents = 278,262, the job fails and I see the following issues

On WebUI

on slave-01, slave-02

java.lang.Throwable: Child Error
    at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:271)
Caused by: java.io.IOException: Task process exit with nonzero status of 255.
    at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:258)

On master

java.lang.RuntimeException: java.io.IOException: Spill failed
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:132)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:261)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:255)
Caused by: java.io.IOException: Spill failed
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1029)
    at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:592)
    at org.apache.hadoop.streaming.PipeMapRed$MROutputThread.run(PipeMapRed.java:381)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/spill1.out
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
    at org.apache.hadoop.mapred.MapOutputFile.getSpillFileForWrite(MapOutputFile.java:121)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1392)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$1800(MapTask.java:853)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1344)

java.lang.Throwable: Child Error
    at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:271)
Caused by: java.io.IOException: Creation of /work/app/hadoop/tmp/mapred/local/userlogs/job_201207220051_0001/attempt_201207220051_0001_m_000004_2 failed.
    at org.apache.hadoop.mapred.TaskLog.createTaskAttemptLogDir(TaskLog.java:102)
    at org.apache.hadoop.mapred.DefaultTaskController.createLogDir(DefaultTaskController.java:71)
    at org.apache.hadoop.mapred.TaskRunner.prepareLogFiles(TaskRunner.java:316)
    at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:228)

-------
java.lang.Throwable: Child Error
    at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:271)
Caused by: java.io.IOException: Creation of /work/app/hadoop/tmp/mapred/local/userlogs/job_201207220051_0001/attempt_201207220051_0001_m_000004_2.cleanup failed.
    at org.apache.hadoop.mapred.TaskLog.createTaskAttemptLogDir(TaskLog.java:102)
    at org.apache.hadoop.mapred.DefaultTaskController.createLogDir(DefaultTaskController.java:71)
    at org.apache.hadoop.mapred.TaskRunner.prepareLogFiles(TaskRunner.java:316)
    at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:228)

When I check the logs on the slaves, this is what I find in hadoop-hduser-datanode-hadoop-01.log

2012-07-22 09:26:52,795 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Receiving block blk_-5384386931827098009_1010 src: /10.0.0.81:51402 dest: /10.0.0.82:50010
2012-07-22 09:26:52,800 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: IOException in BlockReceiver constructor. Cause is 
2012-07-22 09:26:52,800 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: writeBlock blk_-5384386931827098009_1010 received exception java.io.IOException: Unexpected problem in creating temporary file for blk_-5384386931827098009_1010.  File /work/app/hadoop/tmp/dfs/data/tmp/blk_-5384386931827098009 should not be present, but is.
2012-07-22 09:26:52,800 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(10.0.0.82:50010, storageID=DS-735951984-127.0.1.1-50010-1342943517618, infoPort=50075, ipcPort=50020):DataXceiver
java.io.IOException: Unexpected problem in creating temporary file for blk_-5384386931827098009_1010.  File /work/app/hadoop/tmp/dfs/data/tmp/blk_-5384386931827098009 should not be present, but is.
        at org.apache.hadoop.hdfs.server.datanode.FSDataset$FSVolume.createTmpFile(FSDataset.java:426)
        at org.apache.hadoop.hdfs.server.datanode.FSDataset$FSVolume.createTmpFile(FSDataset.java:404)
        at org.apache.hadoop.hdfs.server.datanode.FSDataset.createTmpFile(FSDataset.java:1249)
        at org.apache.hadoop.hdfs.server.datanode.FSDataset.writeToBlock(FSDataset.java:1138)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.<init>(BlockReceiver.java:99)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:299)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:107)
        at java.lang.Thread.run(Thread.java:662)

Please help me understand what I need to do in order to resolve this issue.

daydreamer asked Jul 22 '12 16:07



1 Answer

Since your job has a reduce phase (you configured one reducer), your mappers write their output to the local disk of the node they run on, as opposed to HDFS. To be more precise, mappers don't write to the local disk immediately. Instead, they buffer the output in memory until the buffer reaches a threshold (see the "io.sort.mb" config setting) and then write the buffered records to local disk. That write is called spilling. I think the problem is that when Hadoop tries to spill to disk, the nodes running your mappers don't have enough disk space to hold all the data the mappers generate. The "Could not find any valid local directory for output/spill1.out" error in your trace is consistent with this.

You mentioned that the mappers emit each document as a JSON string. Assuming ~100 KB per document (it may well be larger than this), the map output would amount to 278,262 x 100 KB = ~28 GB, while each of your slaves has only about 15 GB of free space.
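The estimate above can be checked with a few lines of arithmetic. This is a minimal sketch in Python: the disk figures are taken from the question, the 100 KB per document is the assumption made above, and the helper names (`free_gb`, `map_output_gb`) are made up for illustration. It also assumes the Hadoop local directories live on the same volume as `/`, which the question does not confirm.

```python
# Rough comparison of estimated map output against free local disk.
# Disk sizes and usage percentages come from the question; the 100 KB
# per-document JSON size is an assumption, not a measurement.

def free_gb(total_gb: float, used_percent: float) -> float:
    """Free space on a volume, given its size and the percentage in use."""
    return total_gb * (1.0 - used_percent / 100.0)


def map_output_gb(num_docs: int, kb_per_doc: float) -> float:
    """Uncompressed map output in GB (decimal units: 1 GB = 10**6 KB)."""
    return num_docs * kb_per_doc / 1_000_000


nodes = {
    "master": free_gb(74.41, 91.4),
    "slave01": free_gb(29.76, 52.9),
    "slave02": free_gb(19.84, 19.0),
}
needed = map_output_gb(278_262, 100)

for name, free in nodes.items():
    print(f"{name}: {free:.1f} GB free")
print(f"estimated map output: {needed:.1f} GB")
```

Under these assumptions no single node has room for the full uncompressed map output. That matters with a single reducer, because one reduce task has to fetch the output of every mapper.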

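The easiest way, I think, is to compress the intermediate output from your mappers using the following two config settings. (These are the newer property names; on Hadoop 1.x, which your stack traces suggest you are running, the equivalent settings are "mapred.compress.map.output" and "mapred.map.output.compression.codec".)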

<property>
  <name>mapreduce.map.output.compress</name>
  <value>true</value>
</property>
<property>
  <name>mapreduce.map.output.compress.codec</name>
  <value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>

Since your data is all JSON/text data, I think you will benefit from any compression algorithm supported by Hadoop.

As an FYI, if your document count grows way beyond 2 million, you should consider adding more memory to your master, because the NameNode keeps the metadata for every file in memory. As a rule of thumb, each file, directory, or block takes up about 150 bytes (or 300 MB per 1 million files, counting one file object and one block object per file). In practice, however, I'd reserve 1 GB per 1 million files.
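The rule of thumb works out as follows. This is a rough sketch, assuming every document fits in a single HDFS block (reasonable for ~400 KB files) and ignoring directories; the function name `namenode_heap_mb` is hypothetical.

```python
# Rule-of-thumb NameNode memory estimate: about 150 bytes per
# namespace object, with one object per file and one per block.

BYTES_PER_OBJECT = 150  # rule-of-thumb figure, not a measured value


def namenode_heap_mb(num_files: int, blocks_per_file: int = 1) -> float:
    """Estimated NameNode memory in MB (decimal) for the given file count."""
    objects = num_files * (1 + blocks_per_file)
    return objects * BYTES_PER_OBJECT / 1_000_000


print(namenode_heap_mb(1_000_000))  # 300.0
print(namenode_heap_mb(2_000_000))  # 600.0
```

The numbers are a lower bound on what to provision, which is why reserving 1 GB per million files leaves comfortable headroom.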

Edenbauer answered Nov 06 '22 02:11