Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark job with large text file in gzip format

I'm running a Spark job which is taking way too long time to process the input file. The input file is 6.8 GB in Gzip format and it contains 110 M lines of text. I know it's in Gzip format, so it's not splittable and only one executor will be used to read that file.

As part of my debug process, I decided just to see how long does it take to convert that gzip file to parquet. My idea was that once I convert to parquet files and then if I run my original Spark job on that file, in that case it will use multiple executors and input file will be processed in parallel.

But even the small job is taking a long time than I expected. Here is my code:

val input = sqlContext.read.text("input.gz")
input.write.parquet("s3n://temp-output/")

When I extracted that file in my laptop (16 GB RAM), it took less than 2 minutes. When I run it on Spark cluster, my expectation was that it will take same or even less time since executor memory I was using is 58 GB. It took ~20 minutes.

What am I missing here? I'm sorry if it sounds pretty amateur but I'm fairly new in Spark.

What's the optimal way to run Spark job on gzip file? Assume I do not have option to create that file in other file format (bzip2, snappy, lzo).

like image 209
dreamer Avatar asked Mar 12 '23 00:03

dreamer


1 Answers

When doing input-process-output type of Spark jobs, there are three separate issues to consider:

  1. Input parallelism
  2. Processing parallelism
  3. Output parallelism

In your case, the input parallelism is 1 because in your question you claim that you cannot change the input format or granularity.

You are also doing essentially no processing so you can't get any gains there.

However, you can control the output parallelism, which will give you two benefits:

  • Multiple CPUs will write, thus decreasing the total time of the write operation.

  • Your output will be split in multiple files allowing you to take advantage of input parallelism in later processing.

To increase parallelism, you have to increase the number of partitions, which you can do with repartition(), e.g.,

val numPartitions = ...
input.repartition(numPartitions).write.parquet("s3n://temp-output/")

When choosing the optimal number of partitions, there are a number of different factors to consider.

  • Data size
  • Parition skew
  • Cluster RAM size
  • Number of cores in the cluster
  • The type of follow-on processing you'll do
  • The size of cluster (RAM & cores) you'll use for follow-on processing
  • The system you are writing to

Without knowing your goals and constraints it is difficult to make a solid recommendation but here are a couple general guidelines to work with:

  • Since your partitions won't be skewed (the above use of repartition will use a hash partitioner that corrects for skew), you will get the fastest throughput if you set the number of partitions equal to the number of executor cores, assuming that you are using nodes with sufficient I/O.

  • When you process data, you really want an entire partition to be able to "fit" in the RAM allocated to a single executor core. What "fit" means here depends on your processing. If you are doing a simple map transformation, the data may be streamed. If you are doing something involving ordering then the RAM needs grow substantially. If you are using Spark 1.6+, you'll get the benefit of more flexible memory management. If you are using an earlier version, you'll have to be more careful. Job execution grinds to a halt when Spark has to start "buffering" to disk. On-disk size and in-RAM size can be very, very different. The latter varies based on how you process the data and how much benefit Spark can get from predicate pushdown (Parquet supports that). Use the Spark UI to see how much RAM various job stages take.

BTW, unless your data has a very specific structure, do not hard-code partition numbers because then your code will run sub-optimally on clusters of varying sizes. Instead, use the following trick to determine the number of executors in a cluster. You can then multiply by the number of cores per executor based on the machines you are using.

// -1 is for the driver node
val numExecutors = sparkContext.getExecutorStorageStatus.length - 1

Just as a point of reference, on our team, where we use rather complex data structures, which means that RAM size >> disk size, we aim to keep S3 objects in the 50-250Mb range for processing on nodes where each executor core has 10-20Gb RAM.

Hope this helps.

like image 166
Sim Avatar answered Mar 15 '23 08:03

Sim