
How to read a compressed (gzip) file without extension in Spark

I am new to Spark and have a fun task in hand where I have to read a bunch of files from S3, which have some xml content in them.

These files are compressed (Gzip) but do not have that extension.

I read some questions on this topic here where people suggest extending the default codec in Spark and forcing a different extension.

But in my case there is no extension at all, and the files have UUID-like hexadecimal names such as 2c7358ca472ad91057da84adfba.

asked Aug 24 '18 by g.cyberian

1 Answer

You can use newAPIHadoopFile (instead of textFile) with a custom/modified TextInputFormat which forces the use of the GzipCodec.

Instead of calling sparkContext.textFile,

// gzip compressed but no .gz extension:
sparkContext.textFile("s3://mybucket/uuid")

we can use the underlying sparkContext.newAPIHadoopFile which allows us to specify how to read the input:

import org.apache.hadoop.mapreduce.lib.input.GzipInputFormatWithoutExtension
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}

sparkContext
  .newAPIHadoopFile(
    "s3://mybucket/uuid",
    classOf[GzipInputFormatWithoutExtension], // This is our custom reader
    classOf[LongWritable],
    classOf[Text],
    new Configuration(sparkContext.hadoopConfiguration)
  )
  .map { case (_, text) => text.toString }
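
Once this is in place the resulting RDD[String] behaves like any other. As a quick smoke test, here is a minimal sketch; the val name xmlLines for the RDD built above, the use of the scala-xml module, and the assumption that each record is a self-contained XML document are all mine:

import scala.xml.XML

// If the gzip decoding did not kick in, this parse fails loudly on binary garbage,
// which makes it a convenient sanity check.
val docs = xmlLines.map(line => XML.loadString(line))
docs.take(1).foreach(doc => println(doc.label))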

The usual way of calling newAPIHadoopFile would be with TextInputFormat: that's the part which wraps how the file is read, and it's where the compression codec is chosen based on the file extension.
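
To make that concrete, this is roughly the lookup the stock code path relies on; a standalone sketch using Hadoop's CompressionCodecFactory (the bucket and file names are made up):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

val factory = new CompressionCodecFactory(new Configuration())
// A ".gz" suffix resolves to the GzipCodec...
println(factory.getCodec(new Path("s3://mybucket/data.gz")))
// ...while an extension-less UUID name resolves to no codec at all (null),
// so the bytes would be handed to the reader as if they were plain text.
println(factory.getCodec(new Path("s3://mybucket/2c7358ca472ad91057da84adfba")))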

Let's call it GzipInputFormatWithoutExtension and implement it as follows, as an extension of TextInputFormat (this is a Java file; put it under src/main/java/org/apache/hadoop/mapreduce/lib/input so it sits in the same org.apache.hadoop.mapreduce.lib.input package as Hadoop's own input formats):

package org.apache.hadoop.mapreduce.lib.input;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import com.google.common.base.Charsets;

public class GzipInputFormatWithoutExtension extends TextInputFormat {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split,
        TaskAttemptContext context
    ) {

        String delimiter =
            context.getConfiguration().get("textinputformat.record.delimiter");

        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);

        // Here we use our custom `GzipWithoutExtensionLineRecordReader`
        // instead of `LineRecordReader`:
        return new GzipWithoutExtensionLineRecordReader(recordDelimiterBytes);
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false; // gzip isn't a splittable codec (as opposed to bzip2)
    }
}

In fact we have to go one level deeper and also replace the default LineRecordReader (Java) with our own (let's call it GzipWithoutExtensionLineRecordReader).

As it's quite difficult to inherit from LineRecordReader, we can copy it into src/main/java/org/apache/hadoop/mapreduce/lib/input (keeping the same package) and slightly modify (and simplify) its initialize(InputSplit genericSplit, TaskAttemptContext context) method, forcing the use of the Gzip codec.

(The only changes compared to the original LineRecordReader are marked with comments explaining what's happening.)

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;

import org.apache.hadoop.io.compress.*;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.LimitedPrivate({"MapReduce", "Pig"})
@InterfaceStability.Evolving
public class GzipWithoutExtensionLineRecordReader extends RecordReader<LongWritable, Text> {
    private static final Logger LOG =
            LoggerFactory.getLogger(GzipWithoutExtensionLineRecordReader.class);
    public static final String MAX_LINE_LENGTH =
            "mapreduce.input.linerecordreader.line.maxlength";

    private long start;
    private long pos;
    private long end;
    private SplitLineReader in;
    private FSDataInputStream fileIn;
    private Seekable filePosition;
    private int maxLineLength;
    private LongWritable key;
    private Text value;
    private boolean isCompressedInput;
    private Decompressor decompressor;
    private byte[] recordDelimiterBytes;

    public GzipWithoutExtensionLineRecordReader(byte[] recordDelimiter) {
        this.recordDelimiterBytes = recordDelimiter;
    }

    public void initialize(
        InputSplit genericSplit,
        TaskAttemptContext context
    ) throws IOException {

        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();

        // open the file and seek to the start of the split
        final FileSystem fs = file.getFileSystem(job);
        fileIn = fs.open(file);

        // This line is modified to force the use of the GzipCodec:
        // CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
        CompressionCodecFactory ccf = new CompressionCodecFactory(job);
        CompressionCodec codec = ccf.getCodecByClassName(GzipCodec.class.getName());

        // This part has been extremely simplified as we don't have to handle
        // all the different codecs:
        isCompressedInput = true;
        decompressor = CodecPool.getDecompressor(codec);
        if (start != 0) {
            throw new IOException(
                "Cannot seek in " + codec.getClass().getSimpleName() + " compressed stream"
            );
        }

        in = new SplitLineReader(
            codec.createInputStream(fileIn, decompressor), job, this.recordDelimiterBytes
        );
        filePosition = fileIn;
        if (start != 0) {
            start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
    }

    private int maxBytesToConsume(long pos) {
        return isCompressedInput
                ? Integer.MAX_VALUE
                : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
    }

    private long getFilePosition() throws IOException {
        long retVal;
        if (isCompressedInput && null != filePosition) {
            retVal = filePosition.getPos();
        } else {
            retVal = pos;
        }
        return retVal;
    }

    private int skipUtfByteOrderMark() throws IOException {
        int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength,
                Integer.MAX_VALUE);
        int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos));
        pos += newSize;
        int textLength = value.getLength();
        byte[] textBytes = value.getBytes();
        if ((textLength >= 3) && (textBytes[0] == (byte)0xEF) &&
                (textBytes[1] == (byte)0xBB) && (textBytes[2] == (byte)0xBF)) {
            LOG.info("Found UTF-8 BOM and skipped it");
            textLength -= 3;
            newSize -= 3;
            if (textLength > 0) {
                textBytes = value.copyBytes();
                value.set(textBytes, 3, textLength);
            } else {
                value.clear();
            }
        }
        return newSize;
    }

    public boolean nextKeyValue() throws IOException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
            if (pos == 0) {
                newSize = skipUtfByteOrderMark();
            } else {
                newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
                pos += newSize;
            }

            if ((newSize == 0) || (newSize < maxLineLength)) {
                break;
            }

            LOG.info("Skipped line of size " + newSize + " at pos " +
                    (pos - newSize));
        }
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        } else {
            return true;
        }
    }

    @Override
    public LongWritable getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
        }
    }

    public synchronized void close() throws IOException {
        try {
            if (in != null) {
                in.close();
            }
        } finally {
            if (decompressor != null) {
                CodecPool.returnDecompressor(decompressor);
                decompressor = null;
            }
        }
    }
}
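
One practical note on the build: the two Java files above need Hadoop's MapReduce client classes on the compile classpath (they normally come in transitively with Spark, but being explicit doesn't hurt). A minimal sbt sketch, where the versions are placeholders to be matched to your cluster:

// build.sbt (versions are assumptions; align them with your Spark/Hadoop distribution)
libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"    % "2.3.1" % "provided",
  "org.apache.hadoop" %  "hadoop-client" % "2.7.3" % "provided"
)
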
answered Nov 15 '22 by Xavier Guihot