Why does SparkSession execute twice for one action?

I recently upgraded to Spark 2.0 and I'm seeing some strange behavior when trying to create a simple Dataset from JSON strings. Here's a simple test case:

 import java.util.Arrays;

 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;

 SparkSession spark = SparkSession.builder().appName("test").master("local[1]").getOrCreate();
 JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

 JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
         "{\"name\":\"tom\",\"title\":\"engineer\",\"roles\":[\"designer\",\"developer\"]}",
         "{\"name\":\"jack\",\"title\":\"cto\",\"roles\":[\"designer\",\"manager\"]}"
 ));

 JavaRDD<String> mappedRdd = rdd.map(json -> {
     System.out.println("mapping json: " + json);
     return json;
 });

 Dataset<Row> data = spark.read().json(mappedRdd);
 data.show();

And the output:

mapping json: {"name":"tom","title":"engineer","roles":["designer","developer"]}
mapping json: {"name":"jack","title":"cto","roles":["designer","manager"]}
mapping json: {"name":"tom","title":"engineer","roles":["designer","developer"]}
mapping json: {"name":"jack","title":"cto","roles":["designer","manager"]}
+----+--------------------+--------+
|name|               roles|   title|
+----+--------------------+--------+
| tom|[designer, develo...|engineer|
|jack| [designer, manager]|     cto|
+----+--------------------+--------+

It seems that the "map" function is being executed twice even though I'm only performing one action. I thought Spark would lazily build an execution plan and then execute it when needed, but this makes it seem that, in order to read data as JSON and do anything with it, the plan has to be executed at least twice.

In this simple case it doesn't matter, but when the map function is long-running, this becomes a big problem. Is this right, or am I missing something?

asked by Matt Holtzman


1 Answer

It happens because you don't provide a schema for the DataFrameReader. As a result, Spark has to eagerly scan the data set to infer the output schema.

Since mappedRdd is not cached, it will be evaluated twice:

  • once for schema inference
  • once when you call data.show
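One workaround (not part of the original answer; a sketch in Java matching the question's code) is to cache mappedRdd. The inference pass still runs, but it materializes and caches the RDD, so the map function itself executes only once:

 // Sketch: persist the mapped RDD so the map results are reused.
 // The schema-inference pass fills the cache; data.show() then reads
 // the cached partitions instead of re-running the map function.
 mappedRdd.cache();

 Dataset<Row> data = spark.read().json(mappedRdd);
 data.show();

This trades memory for recomputation, which only helps when the map is expensive relative to caching its output.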

If you want to prevent the extra scan entirely, you should provide a schema for the reader (Scala syntax):

val schema: org.apache.spark.sql.types.StructType = ???
spark.read.schema(schema).json(mappedRdd)
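Since the question is written in Java, here is a rough Java equivalent (a sketch, not part of the original answer; the field names and types are taken from the sample JSON):

 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;

 // Build the schema by hand so Spark can skip the inference scan.
 StructType schema = DataTypes.createStructType(new StructField[]{
     DataTypes.createStructField("name", DataTypes.StringType, true),
     DataTypes.createStructField("roles",
         DataTypes.createArrayType(DataTypes.StringType), true),
     DataTypes.createStructField("title", DataTypes.StringType, true)
 });

 // With an explicit schema there is no inference pass, so the map
 // function runs only once, when data.show() triggers the job.
 Dataset<Row> data = spark.read().schema(schema).json(mappedRdd);
 data.show();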
answered by zero323