In the following two examples, the number of tasks run and the corresponding run time suggest that the sampling options have no effect; both behave like jobs run without any sampling options:
val df = spark.read.option("samplingRatio", 0.001).json("s3a://test/*.json.bz2")
val df = spark.read.option("sampleSize", 100).json("s3a://test/*.json.bz2")
I know that explicit schemas are best for performance, but for convenience sampling is useful.
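For reference, this is the kind of explicit schema I mean; the field names are just placeholders:
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val explicitSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)
))
// With a user-supplied schema, Spark skips inference entirely
val dfExplicit = spark.read.schema(explicitSchema).json("s3a://test/*.json.bz2")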
I am new to Spark, so am I using these options incorrectly? I attempted the same approach in PySpark, with the same results:
df = spark.read.options(samplingRatio=0.1).json("s3a://test/*.json.bz2")
df = spark.read.options(samplingRatio=None).json("s3a://test/*.json.bz2")
TL;DR None of the options you use will have a significant impact on the execution time:

- sampleSize is not among the valid JSONOptions or JSONOptionsInRead, so it will be ignored.
- samplingRatio is a valid option, but internally it uses PartitionwiseSampledRDD, so the process is still linear in the number of records. Therefore sampling can only reduce the inference cost, not the IO, which is likely the bottleneck here.
- Setting samplingRatio to None is equivalent to no sampling: PySpark's OptionUtils simply discards None options, and samplingRatio defaults to 1.0.

You can try to sample the data explicitly. In Python:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

def infer_json_schema(path: str, sample_size: int, **kwargs: str) -> StructType:
    spark = SparkSession.builder.getOrCreate()
    # Read the raw JSON lines as plain text and keep only the first sample_size records
    sample = spark.read.text(path).limit(sample_size).rdd.flatMap(lambda x: x)
    # Run schema inference on the sample only
    return spark.read.options(**kwargs).json(sample).schema
In Scala:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

def inferJsonSchema(
    path: String, sampleSize: Int, options: Map[String, String]): StructType = {
  val spark = SparkSession.builder.getOrCreate()
  import spark.implicits._  // needed for .as[String]
  // Read the raw JSON lines as plain text and keep only the first sampleSize records
  val sample = spark.read.text(path).limit(sampleSize).as[String]
  // Run schema inference on the sample only
  spark.read.options(options).json(sample).schema
}
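For example, a minimal sketch of how the inferred schema might then be reused for the full read; the sample size below is an arbitrary placeholder:
val inferredSchema = inferJsonSchema(
  "s3a://test/*.json.bz2", sampleSize = 1000, options = Map.empty)

// Reuse the sampled schema so the full read skips inference entirely
val df = spark.read.schema(inferredSchema).json("s3a://test/*.json.bz2")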
Please keep in mind that, to work well, the sample size should be at most equal to the expected size of a partition. Limits in Spark escalate quickly (see for example my answer to Spark count vs take and length), and you can easily end up scanning the whole input.