Spark v2.4:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master('local[15]') \
    .appName('Notebook') \
    .config('spark.sql.debug.maxToStringFields', 2000) \
    .config('spark.sql.maxPlanStringLength', 2000) \
    .config('spark.debug.maxToStringFields', 2000) \
    .getOrCreate()

df = spark.createDataFrame(spark.range(1000).rdd.map(lambda x: list(range(100))))
df.repartition(1).write.mode('overwrite').parquet('test.parquet')
df = spark.read.parquet('test.parquet')
df.select('*').explain()
== Physical Plan ==
ReadSchema: struct<_1:bigint,_2:bigint,_3:bigint,_4:bigint,_5:bigint,_6:bigint,_7:bigint,_8:bigint,_9:bigint,...
Note: spark.debug.maxToStringFields helped a bit by expanding FileScan parquet [_1#302L,_2#303L,... 76 more fields], but not the schema part.
Note 2: I am not only interested in the ReadSchema, but also in PartitionFilters, PushedFilters, ... which are all truncated.
Spark 3.0 introduced explain('formatted'), which lays out the information differently and applies no truncation.
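A minimal sketch of the Spark 3.0+ call, using the df from the question (the exact plan text varies by query and version):

# Spark 3.0+ only: 'formatted' prints a compact operator tree followed by one
# detail block per operator, with ReadSchema and PushedFilters shown in full.
df.select('*').explain('formatted')

# equivalent keyword form
df.select('*').explain(mode='formatted')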
Run the explain command with a true argument to see both the logical and physical plans; without it, only the physical plan is shown. The physical plan describes the operators that will actually be executed as RDD operations, and the optimized logical plan is where Spark applies its own optimizations.
explain(mode="simple") displays only the physical plan; explain(mode="extended") displays both the logical and physical plans (equivalent to explain(True)); explain(mode="codegen") displays the generated Java code that will be executed.
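As a sketch, again on the df from the question (the mode argument requires Spark 3.0+, while explain(True) also works on 2.x):

# Spark 2.x and later: prints parsed, analyzed and optimized logical plans
# plus the physical plan.
df.select('*').explain(True)

# Spark 3.0+ keyword forms
df.select('*').explain(mode='simple')    # physical plan only
df.select('*').explain(mode='extended')  # logical + physical plans
df.select('*').explain(mode='codegen')   # generated Java code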
To get the schema of a Spark DataFrame, call printSchema() on the DataFrame object: printSchema() prints the schema to the console (stdout), while show() displays the content of the DataFrame.
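For example, on the parquet DataFrame from the question, the schema can be printed in full, without the 100-character truncation seen in the plan output:

# prints the full schema tree to stdout
df.printSchema()

# the schema is also available programmatically as a StructType
print(df.schema.simpleString())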
The logical plan just depicts what I expect as output after applying a series of transformations like join, filter, where, groupBy, etc. on a particular table. The physical plan is responsible for deciding the type of join, the order in which the filter, where, and groupBy clauses are executed, and so on.
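As an illustration (operator names vary between Spark versions), a small aggregation query on the test data makes the difference visible: the optimized logical plan still contains generic Aggregate and Filter nodes, while the physical plan commits to concrete operators such as HashAggregate, Exchange and a FileScan with PushedFilters.

from pyspark.sql import functions as F

q = (spark.read.parquet('test.parquet')
         .where(F.col('_1') > 10)
         .groupBy('_2')
         .count())

# extended output: compare the == Optimized Logical Plan == and
# == Physical Plan == sections
q.explain(True)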
I am afraid there is no easy way. In https://github.com/apache/spark/blob/v2.4.2/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L57 each metadata value is hard-coded to be abbreviated to no more than 100 characters:
override def simpleString: String = {
  val metadataEntries = metadata.toSeq.sorted.map {
    case (key, value) =>
      key + ": " + StringUtils.abbreviate(redact(value), 100)
  }
  // ...
In the end I have been using
import org.apache.spark.sql.execution.FileSourceScanExec

// Rebuild the metadata of a file scan node without the 100-character
// abbreviation, keeping only the entries we care about.
def full_file_meta(f: FileSourceScanExec) = {
  val metadataEntries = f.metadata.toSeq.sorted.flatMap {
    case (key, value) if Set(
      "Location", "PartitionCount",
      "PartitionFilters", "PushedFilters"
    ).contains(key) =>
      Some(key + ": " + value.toString)
    case other => None
  }

  val metadataStr = metadataEntries.mkString("[\n ", ",\n ", "\n]")
  s"${f.nodeNamePrefix}${f.nodeName}$metadataStr"
}

// `data` is the DataFrame whose plan we want to inspect
val ep = data.queryExecution.executedPlan

// collect the untruncated metadata of every file scan node in the plan
print(ep.flatMap {
  case f: FileSourceScanExec => full_file_meta(f) :: Nil
  case other => Nil
}.mkString(",\n"))
It is a hack, but better than nothing.