
BZip2 compressed input for Apache Flink

I have a wikipedia dump compressed with bzip2 (downloaded from http://dumps.wikimedia.org/enwiki/), but I don't want to unpack it: I want to process it while decompressing on the fly.

I know that it's possible to do this in plain Java (see e.g. Java - Read BZ2 file and uncompress/parse on the fly), but I was wondering how to do it in Apache Flink. What I probably need is something like https://github.com/whym/wikihadoop, but for Flink instead of Hadoop.
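For context, here is a minimal sketch of that on-the-fly approach in Scala (an assumption on my part: Apache Commons Compress is on the classpath, and the file name is a placeholder for the actual dump):

import java.io.{BufferedInputStream, FileInputStream}
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import scala.io.Source

// Wrap the raw file stream in a bzip2 decompressor and read line by line;
// the second constructor argument handles multistream dumps (concatenated bz2 blocks)
val fileIn = new BufferedInputStream(new FileInputStream("enwiki-latest-pages-articles.xml.bz2"))
val bz2In = new BZip2CompressorInputStream(fileIn, true)
Source.fromInputStream(bz2In, "UTF-8").getLines().take(5).foreach(println)
bz2In.close()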

Alexey Grigorev asked Apr 03 '15

1 Answer

It is possible to read compressed files in the following formats in Apache Flink:

org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec

As you can see from the package names, Flink does this using Hadoop's InputFormats. This is an example for reading gz files with Flink's Scala API (you need at least Flink 0.8.1):

import org.apache.flink.api.scala._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

def main(args: Array[String]) {

  val env = ExecutionEnvironment.getExecutionEnvironment

  // Set up a Hadoop TextInputFormat; it decompresses gz input transparently
  val job = new JobConf()
  val hadoopInput = new TextInputFormat()
  FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))

  // Wrap the Hadoop input as a Flink DataSet of (byte offset, line) tuples
  val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)

  lines.print

  env.execute("Read gz files")
}
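Since Hadoop's TextInputFormat selects the decompression codec from the file extension, the same code should handle the bzip2 dump if you simply point the path at the .bz2 file (a sketch; the file name is a placeholder):

// BZip2Codec is selected automatically based on the .bz2 extension
FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/enwiki-latest-pages-articles.xml.bz2"))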

Out of the box, Apache Flink itself only has built-in support for .deflate files. Adding support for more compression codecs would be easy to do, but it hasn't been done yet.

Using HadoopInputFormats with Flink doesn't cause any performance loss: Flink has built-in serialization support for Hadoop's Writable types.
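Since the Hadoop input produces (LongWritable, Text) tuples, a typical first step is to map them to plain Strings before further processing (a minimal sketch, continuing the example above):

// Drop the byte offset and convert each Hadoop Text value to a String
val textLines = lines.map(_._2.toString)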

Robert Metzger answered Sep 23 '22