I have a bunch of snappy-compressed server logs in S3, and I need to process them using streaming on Elastic MapReduce. How do I tell Amazon and Hadoop that the logs are already compressed (before they are pulled into HDFS!) so that they can be decompressed before being sent to the streaming mapper script?
The only documentation I can find is here: http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/HadoopDataCompression.html#emr-using-snappy , and it seems to refer to intermediate compression, not files that arrive in HDFS already compressed.
BTW, I'm mainly working in python, so bonus points if you have a solution in boto!
The answer is, "it can't be done." At least, not for the specific case of applying hadoop streaming to snappy-compressed files originating outside of hadoop.
I (thoroughly!) explored two main options to come to this conclusion: (1) attempting to use hadoop's built-in snappy codec, as suggested by highlycaffeinated, or (2) writing my own streaming mapper to consume and decompress the snappy files.
For option (1), it appears that hadoop's snappy codec wraps the data in its own block framing when it writes files. Since my files were compressed with plain snappy outside hadoop, hadoop's built-in codec can't decompress our files.
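To see the mismatch concretely, here's a minimal check — a sketch only, assuming the python-snappy package and files small enough to read into memory: raw snappy files decompress in one shot, while files written by hadoop's codec won't.

import snappy  # pip install python-snappy

def is_raw_snappy(path):
    """Return True if the file is plain (whole-file) snappy with no hadoop framing."""
    with open(path, 'rb') as f:
        data = f.read()
    try:
        snappy.decompress(data)   # succeeds for raw snappy produced outside hadoop
        return True
    except Exception:             # hadoop's block-framed output (or corrupt data) fails here
        return False

print(is_raw_snappy('my-file.snappy'))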
One symptom of this problem was a heap space error:
2013-04-03 20:14:49,739 FATAL org.apache.hadoop.mapred.Child (main): Error running child : java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:102)
at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:82)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:76)
at java.io.InputStream.read(InputStream.java:85)
...
When I switched to a much larger instance and cranked up the mapred.child.java.opts setting, I got a new error:
java.io.IOException: IO error in map input file s3n://my-bucket/my-file.snappy
Hadoop's snappy codec just doesn't work with externally generated files.
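For reference, bumping the heap amounts to passing -D mapred.child.java.opts=... to the streaming jar. In boto 2.x, submitting the streaming step that way looks roughly like the sketch below — bucket names, paths, and region are placeholders, and you should double-check the StreamingStep and run_jobflow signatures (and where your boto version places step_args relative to -mapper/-reducer) against the boto docs:

import boto.emr
from boto.emr.step import StreamingStep

conn = boto.emr.connect_to_region('us-east-1')

step = StreamingStep(
    name='snappy streaming attempt',
    mapper='s3n://my-bucket/scripts/mapper.py',
    reducer='s3n://my-bucket/scripts/reducer.py',
    input='s3n://my-bucket/logs/',
    output='s3n://my-bucket/output/',
    # generic options such as -D go in step_args
    step_args=['-D', 'mapred.child.java.opts=-Xmx4096m'])

jobflow_id = conn.run_jobflow(
    name='snappy-logs-test',
    log_uri='s3n://my-bucket/emr-logs/',
    num_instances=3,
    master_instance_type='m1.large',
    slave_instance_type='m1.large',
    steps=[step])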
For option (2), the problem is that hadoop streaming doesn't distinguish between \n, \r, and \r\n line breaks — they all get treated as record terminators and stripped. Since snappy compression ends up sprinkling those byte values throughout compressed files, this is fatal for any mapper that needs the compressed bytes intact (there's a sketch of the kind of mapper I mean below the trace). Here's my error trace:
2013-04-03 22:29:50,194 WARN org.apache.hadoop.mapred.Child (main): Error running child
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:372)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:586)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
...
With a little work on hadoop's Java classes (see here, for example) we could probably fix the \r vs \n problem. But as I said initially, my goal was to build within the hadoop streaming module, without touching Java. With that constraint, there doesn't seem to be any way to resolve this problem.
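For concreteness, the option-(2) mapper I mean is something like the sketch below (hypothetical, and doomed for the reason above — it assumes python-snappy): it needs the compressed bytes intact on stdin, but streaming has already chewed through the \r/\n bytes by the time they arrive, so the decompress call blows up.

#!/usr/bin/env python
# Hypothetical option-(2) streaming mapper: slurp the snappy bytes from stdin,
# decompress, then emit one record per log line.
import sys
import snappy

def main():
    data = sys.stdin.read()               # should be the raw snappy file bytes...
    plaintext = snappy.decompress(data)   # ...but streaming has mangled the \r/\n bytes, so this throws
    for line in plaintext.splitlines():
        sys.stdout.write('%s\t1\n' % line)   # placeholder for real map logic

if __name__ == '__main__':
    main()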
In the end, I went back to the guys generating the files this cluster is consuming and persuaded them to switch to gzip or lzo.
PS - On option (2), I played around with splitting records on different characters (e.g. textinputformat.record.delimiter=X), but it felt very hacky and didn't work anyway.
PPS - Another workaround would be to write scripts to download the files from S3, decompress them, and then run -copyFromLocal to pull them into HDFS. Computationally, there's nothing wrong with this, but from a workflow perspective it would introduce all kinds of hassles.
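Something like this sketch, for anyone who wants to go that route anyway (hypothetical names and paths; assumes boto 2.x, python-snappy, the hadoop CLI on the box, and files that fit in memory):

import subprocess
import boto
import snappy

def stage_to_hdfs(bucket_name, key_name, local_path, hdfs_dir):
    # 1. pull the snappy file down from S3
    key = boto.connect_s3().get_bucket(bucket_name).get_key(key_name)
    compressed = key.get_contents_as_string()

    # 2. decompress it locally (plain snappy, no hadoop framing)
    with open(local_path, 'wb') as f:
        f.write(snappy.decompress(compressed))

    # 3. push the decompressed file into HDFS for the streaming job
    subprocess.check_call(['hadoop', 'fs', '-copyFromLocal', local_path, hdfs_dir])

stage_to_hdfs('my-bucket', 'logs/my-file.snappy', '/tmp/my-file.log', '/user/hadoop/logs/')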