I recently upgraded to Spark 2.0 and I'm seeing some strange behavior when trying to create a simple Dataset from JSON strings. Here's a minimal test case:
SparkSession spark = SparkSession.builder().appName("test").master("local[1]").getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
    "{\"name\":\"tom\",\"title\":\"engineer\",\"roles\":[\"designer\",\"developer\"]}",
    "{\"name\":\"jack\",\"title\":\"cto\",\"roles\":[\"designer\",\"manager\"]}"
));

JavaRDD<String> mappedRdd = rdd.map(json -> {
    System.out.println("mapping json: " + json);
    return json;
});

Dataset<Row> data = spark.read().json(mappedRdd);
data.show();
And the output:
mapping json: {"name":"tom","title":"engineer","roles":["designer","developer"]}
mapping json: {"name":"jack","title":"cto","roles":["designer","manager"]}
mapping json: {"name":"tom","title":"engineer","roles":["designer","developer"]}
mapping json: {"name":"jack","title":"cto","roles":["designer","manager"]}
+----+--------------------+--------+
|name| roles| title|
+----+--------------------+--------+
| tom|[designer, develo...|engineer|
|jack| [designer, manager]| cto|
+----+--------------------+--------+
It seems that the "map" function is being executed twice even though I'm only performing one action. I thought that Spark would lazily build an execution plan, then execute it when needed, but this makes it seem that in order to read data as JSON and do anything with it, the plan will have to be executed at least twice.
In this simple case it doesn't matter, but when the map function is long-running, this becomes a big problem. Is this right, or am I missing something?
It happens because you don't provide a schema for the DataFrameReader. As a result, Spark has to eagerly scan the data set to infer the output schema. Since mappedRdd is not cached, it will be evaluated twice: once for the schema-inference scan and once more when data.show executes the plan.
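Alternatively, if the schema can't be supplied up front, caching the mapped RDD keeps the expensive map from running twice: the first pass (schema inference) materializes the data into the cache, and the action then reads from it. A minimal Java sketch, added here for illustration (it is not part of the original answer):

// Persist the mapped RDD so the expensive map() runs at most once.
mappedRdd.cache(); // or persist(StorageLevel.MEMORY_ONLY())

// The eager inference scan evaluates mappedRdd and fills the cache...
Dataset<Row> data = spark.read().json(mappedRdd);
// ...so show() reads the cached partitions and map() is not re-executed.
data.show();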
If you want to prevent this, you should provide a schema for the reader (Scala syntax):
val schema: org.apache.spark.sql.types.StructType = ???
spark.read.schema(schema).json(mappedRdd)
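For the Java code above, here is a sketch of the same fix with the StructType written out; the field names, types, and nullability are assumptions read off the sample records, so treat it as an illustration rather than the one correct schema:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// name and title are strings; roles is an array of strings.
StructType schema = DataTypes.createStructType(new StructField[]{
    DataTypes.createStructField("name", DataTypes.StringType, true),
    DataTypes.createStructField("roles",
        DataTypes.createArrayType(DataTypes.StringType), true),
    DataTypes.createStructField("title", DataTypes.StringType, true)
});

// With an explicit schema there is no inference scan, so map()
// runs only once, when show() triggers the actual job.
Dataset<Row> data = spark.read().schema(schema).json(mappedRdd);
data.show();

With this in place, each "mapping json: ..." line should be printed exactly once.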