 

Spark sampling options in JSON reader ignored?

In the following two examples, the number of tasks run and the corresponding run time imply that the sampling options have no effect: both are similar to jobs run without any sampling options.

val df = spark.read.option("samplingRatio", 0.001).json("s3a://test/*.json.bz2")

val df = spark.read.option("sampleSize",100).json("s3a://test/*.json.bz2")

I know that explicit schemas are best for performance, but in cases where convenience matters, sampling is useful.

I'm new to Spark; am I using these options incorrectly? I attempted the same approach in PySpark, with the same results:

df = spark.read.options(samplingRatio=0.1).json("s3a://test/*.json.bz2")

df = spark.read.options(samplingRatio=None).json("s3a://test/*.json.bz2")
asked Jun 12 '19 by kermatt


1 Answer

TL;DR None of the options you use will have a significant impact on the execution time:

  • sampleSize is not among valid JSONOptions or JSONOptionsInRead so it will be ignored.

  • samplingRatio is a valid option, but internally it uses PartitionwiseSampledRDD, so the process is linear in the number of records. Therefore sampling can only reduce the inference cost, not the IO, which is likely the bottleneck here (the only way to skip the inference scan entirely is to provide an explicit schema, as sketched after this list).

  • Setting samplingRatio to None is equivalent to no sampling. PySpark OptionUtils simply discards None options, and samplingRatio defaults to 1.0.
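
For comparison, the only way to skip the inference scan entirely is to provide an explicit schema up front. A minimal PySpark sketch (the field names below are hypothetical, not taken from the question's data):

from pyspark.sql.types import StructType, StructField, LongType, StringType

# Hypothetical schema - replace the fields with the ones present in your JSON files.
schema = StructType([
    StructField("id", LongType()),
    StructField("payload", StringType()),
])

# With an explicit schema Spark skips the inference pass; nothing is read
# until an action is run on df.
df = spark.read.schema(schema).json("s3a://test/*.json.bz2")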

You can try to sample the data explicitly. In Python:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

def infer_json_schema(path: str, sample_size: int, **kwargs: str) -> StructType:
    spark = SparkSession.builder.getOrCreate()
    # Read the raw JSON lines as plain text, keep only the first sample_size
    # lines, and flatten the Rows into strings.
    sample = spark.read.text(path).limit(sample_size).rdd.flatMap(lambda x: x)
    # Infer the schema from the sampled lines only.
    return spark.read.options(**kwargs).json(sample).schema
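
A possible way to call it (a usage sketch, not part of the original answer; the sample size and option value are just examples):

schema = infer_json_schema("s3a://test/*.json.bz2", 1000, primitivesAsString="true")
df = spark.read.schema(schema).json("s3a://test/*.json.bz2")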

In Scala:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

def inferJsonSchema(
    path: String, sampleSize: Int, options: Map[String, String]): StructType = {
  val spark = SparkSession.builder.getOrCreate()
  import spark.implicits._  // needed for the .as[String] encoder

  // Read the raw JSON lines as plain text and keep only the first sampleSize lines.
  val sample = spark.read.text(path).limit(sampleSize).as[String]
  // Infer the schema from the sampled lines only.
  spark.read.options(options).json(sample).schema
}

Please keep in mind that, for this to work well, the sample size should be at most equal to the expected size of a partition. Limits in Spark escalate quickly (see for example my answer to Spark count vs take and length), and you can easily end up scanning the whole input.

answered Nov 10 '22 by user10938362