How can I force Spark/Hadoop to ignore the .gz extension on a file and read it as uncompressed plain text?

I have code along the lines of:

val lines: RDD[String] = sparkSession.sparkContext.textFile("s3://mybucket/file.gz")

The URL ends in .gz, but this is a result of legacy code. The file is plain text, with no compression involved. However, Spark insists on reading it as a gzip file, which obviously fails. How can I make it ignore the extension and simply read the file as text?

Based on this article, I've tried setting, in various places, a configuration that doesn't include the gzip codec, e.g.:

sparkContext.getConf.set("spark.hadoop.io.compression.codecs", classOf[DefaultCodec].getCanonicalName)

This doesn't seem to have any effect.
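(For illustration, one of those "various places" is the SparkSession builder itself; a minimal sketch, with an illustrative app name:)

import org.apache.hadoop.io.compress.DefaultCodec
import org.apache.spark.sql.SparkSession

// Intent: restrict the Hadoop codec list to DefaultCodec only,
// so that nothing claims the ".gz" extension.
val spark = SparkSession
  .builder()
  .appName("plain-text-despite-gz") // illustrative
  .config("spark.hadoop.io.compression.codecs", classOf[DefaultCodec].getCanonicalName)
  .getOrCreate()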

Since the files are on S3, I can't simply rename them without copying the entire file.

asked Mar 05 '18 by Alex Hall

1 Answer

First solution: Shading GzipCodec

The idea is to shadow/shade the GzipCodec defined in the package org.apache.hadoop.io.compress, by copying its Java source file into your own project (under src/main/java/org/apache/hadoop/io/compress, so it keeps the same fully qualified name) and replacing this method:

public String getDefaultExtension() {
  return ".gz";
}

with:

public String getDefaultExtension() {
  return ".whatever";
}

When building your project, this has the effect of using your definition of GzipCodec instead of the one provided by the dependencies (this is the shadowing of GzipCodec).

This way, when reading your file, textFile() falls back to the default (uncompressed) handling, since the gzip codec no longer matches your file's extension.

The downside of this solution is that you won't be able to process real gzip files within the same app.
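On the upside, no call-site change is needed: once the shaded class is on the classpath, the original call from the question works unchanged, because .gz no longer maps to any codec:

val lines: RDD[String] = sparkSession.sparkContext.textFile("s3://mybucket/file.gz")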


Second solution: Using newAPIHadoopFile with a custom/modified TextInputFormat

You can use newAPIHadoopFile (instead of textFile) with a custom/modified TextInputFormat which ignores the extension and always reads the input as plain text (no compression codec).

We'll write our own reader based on the default one (TextInputFormat). The idea is to remove the part which sees that the file is named .gz and therefore tries to decompress it before reading.

Instead of calling sparkContext.textFile,

// plain text file with a .gz extension:
sparkContext.textFile("s3://mybucket/file.gz")

we can use the underlying sparkContext.newAPIHadoopFile which allows us to specify how to read the input:

import org.apache.hadoop.mapreduce.lib.input.FakeGzInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}

sparkContext
  .newAPIHadoopFile(
    "s3://mybucket/file.gz",
    classOf[FakeGzInputFormat], // This is our custom reader
    classOf[LongWritable],
    classOf[Text],
    new Configuration(sparkContext.hadoopConfiguration)
  )
  .map { case (_, text) => text.toString }

The usual way of calling newAPIHadoopFile would be with TextInputFormat. This is the class that wraps how the file is read, and it's where a compression codec is chosen based on the file extension.

Let's call our format FakeGzInputFormat and implement it as follows, as an extension of TextInputFormat (this is a Java file; put it under src/main/java/org/apache/hadoop/mapreduce/lib/input so it lives in the same package):

package org.apache.hadoop.mapreduce.lib.input;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import com.google.common.base.Charsets;

public class FakeGzInputFormat extends TextInputFormat {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split,
        TaskAttemptContext context
    ) {

        String delimiter =
            context.getConfiguration().get("textinputformat.record.delimiter");

        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);

        // Here we use our custom `FakeGzLineRecordReader` instead of
        // `LineRecordReader`:
        return new FakeGzLineRecordReader(recordDelimiterBytes);
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return true; // plain text is splittable (as opposed to gzip)
    }
}

In fact we have to go one level deeper and also replace the default LineRecordReader (Java) with our own (let's call it FakeGzLineRecordReader).

As it's quite difficult to inherit from LineRecordReader, we can copy it (into src/main/java/org/apache/hadoop/mapreduce/lib/input) and slightly modify (and simplify) its initialize(InputSplit genericSplit, TaskAttemptContext context) method so that it never looks up a compression codec and always reads the input as plain text:

(The only changes compared to the original LineRecordReader are marked with comments explaining what's happening.)

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.LimitedPrivate({"MapReduce", "Pig"})
@InterfaceStability.Evolving
public class FakeGzLineRecordReader extends RecordReader<LongWritable, Text> {
    private static final Logger LOG =
            LoggerFactory.getLogger(FakeGzLineRecordReader.class);
    public static final String MAX_LINE_LENGTH =
            "mapreduce.input.linerecordreader.line.maxlength";

    private long start;
    private long pos;
    private long end;
    private SplitLineReader in;
    private FSDataInputStream fileIn;
    private Seekable filePosition;
    private int maxLineLength;
    private LongWritable key;
    private Text value;
    private byte[] recordDelimiterBytes;

    public FakeGzLineRecordReader(byte[] recordDelimiter) {
        this.recordDelimiterBytes = recordDelimiter;
    }

    // This has been simplified a lot since we don't need to handle compression
    // codecs.
    public void initialize(
        InputSplit genericSplit,
        TaskAttemptContext context
    ) throws IOException {

        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();

        final FileSystem fs = file.getFileSystem(job);
        fileIn = fs.open(file);

        fileIn.seek(start);
        in = new UncompressedSplitLineReader(
            fileIn, job, this.recordDelimiterBytes, split.getLength()
        );
        filePosition = fileIn;

        if (start != 0) {
            start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
    }

    // Simplified as input is not compressed:
    private int maxBytesToConsume(long pos) {
        return (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
    }

    // Simplified as input is not compressed:
    private long getFilePosition() {
        return pos;
    }

    // Unchanged from the original LineRecordReader: skips a leading UTF-8
    // byte-order mark when the read starts at position 0.
    private int skipUtfByteOrderMark() throws IOException {
        int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength,
                Integer.MAX_VALUE);
        int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos));
        pos += newSize;
        int textLength = value.getLength();
        byte[] textBytes = value.getBytes();
        if ((textLength >= 3) && (textBytes[0] == (byte)0xEF) &&
                (textBytes[1] == (byte)0xBB) && (textBytes[2] == (byte)0xBF)) {
            LOG.info("Found UTF-8 BOM and skipped it");
            textLength -= 3;
            newSize -= 3;
            if (textLength > 0) {
                textBytes = value.copyBytes();
                value.set(textBytes, 3, textLength);
            } else {
                value.clear();
            }
        }
        return newSize;
    }

    // Unchanged from the original LineRecordReader: reads the next line,
    // always reading one extra line past the end of the split.
    public boolean nextKeyValue() throws IOException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
            if (pos == 0) {
                newSize = skipUtfByteOrderMark();
            } else {
                newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
                pos += newSize;
            }

            if ((newSize == 0) || (newSize < maxLineLength)) {
                break;
            }

            LOG.info("Skipped line of size " + newSize + " at pos " +
                    (pos - newSize));
        }
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        } else {
            return true;
        }
    }

    @Override
    public LongWritable getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    public float getProgress() {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
        }
    }

    public synchronized void close() throws IOException {
        // Simplified: there is no decompressor to return to the codec pool,
        // so we only need to close the underlying line reader.
        if (in != null) {
            in.close();
        }
    }
}
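Putting it together: with FakeGzInputFormat and FakeGzLineRecordReader on the classpath, the newAPIHadoopFile call shown earlier yields the file's lines as plain text. A minimal recap, with a hypothetical sanity check at the end (path and context as in the question):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.FakeGzInputFormat
import org.apache.spark.rdd.RDD

val lines: RDD[String] = sparkContext
  .newAPIHadoopFile(
    "s3://mybucket/file.gz",
    classOf[FakeGzInputFormat], // our custom reader
    classOf[LongWritable],
    classOf[Text],
    new Configuration(sparkContext.hadoopConfiguration)
  )
  .map { case (_, text) => text.toString }

lines.take(5).foreach(println) // should print plain-text lines, not gzip garbage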
answered Sep 22 '22 by Xavier Guihot