How do I flatMap a row of arrays into multiple rows in Apache spark using Java?

I have a JSON data file that contains one property, "tags", which is an array of strings. The Apache Spark DataFrame schema looks like this:

root
 |-- acceptedAnswerId: long (nullable = true)
 |-- answerCount: long (nullable = true)
 |-- body: string (nullable = true)
 |-- score: long (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)
 |-- viewCount: long (nullable = true)

I'd like to explode each row into several rows in Java. I can find similar answers using Scala, but I haven't been able to convert the solution to Java. Any suggestions?

The "tags" property in the JSON looks like:

"tags":["c#",".net","compression","decompression"]
ErhWen Kuo asked Dec 28 '25 16:12

2 Answers

To make the solution easier to visualize, a sample JSON record looks like this:

{"id":4,"score":358,"viewCount":24247,"answerCount":13,"commentCount":1,"favoriteCount":28,"tags":["c#","winforms","type-conversion","opacity"]}

Below is a Java snippet that reads the JSON data into a DataFrame (Spark 1.x API):

JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);

String jsonData = "{\"id\":4,\"score\":358,\"viewCount\":24247,\"tags\":[\"c#\",\"winforms\",\"type-conversion\",\"opacity\"]}";

List<String> dataSet = Arrays.asList(jsonData);

JavaRDD<String> distData = sc.parallelize(dataSet);

DataFrame stackoverflow_Posts = sqlContext.read().json(distData);

stackoverflow_Posts.printSchema(); //let's print out the DataFrame schema (Output#1)

stackoverflow_Posts.show(); //let's show the DataFrame content (Output#2)

The schema (Output#1) looks like this:

root
 |-- id: long (nullable = true)
 |-- score: long (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- viewCount: long (nullable = true)

The data (Output#2) looks like this:

+---+-----+--------------------+---------+
| id|score|                tags|viewCount|
+---+-----+--------------------+---------+
|  4|  358|[c#, winforms, ty...|    24247|
+---+-----+--------------------+---------+

Based on zero323's answer, I continued processing:

DataFrame expanded = stackoverflow_Posts.withColumn("tag", org.apache.spark.sql.functions.explode(stackoverflow_Posts.col("tags")));
        
expanded.printSchema(); //let's print out the DataFrame schema again (Output#3)
        
expanded.show(); //let's show the DataFrame content (Output#4)

The schema (Output#3) looks like this:

root
 |-- id: long (nullable = true)
 |-- score: long (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- viewCount: long (nullable = true)
 |-- tag: string (nullable = true)

And the data after explode (Output#4):

+---+-----+--------------------+---------+---------------+
| id|score|                tags|viewCount|            tag|
+---+-----+--------------------+---------+---------------+
|  4|  358|[c#, winforms, ty...|    24247|             c#|
|  4|  358|[c#, winforms, ty...|    24247|       winforms|
|  4|  358|[c#, winforms, ty...|    24247|type-conversion|
|  4|  358|[c#, winforms, ty...|    24247|        opacity|
+---+-----+--------------------+---------+---------------+

The result looks very much like using SQL to join two tables.
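The resemblance is not accidental: exploding an array column behaves like an inner join between the original table and a derived (id, tag) table on id. Here is a minimal plain-Java sketch of that equivalence (no Spark involved; the in-memory "tables" and the join helper are hypothetical illustrations, not Spark API), reproducing the four rows of Output#4:

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ExplodeAsJoin {

    // Inner join on id: each (id, tag) row picks up the matching post columns.
    static List<String> join(Map<Long, String> posts, List<Map.Entry<Long, String>> tagRows) {
        return tagRows.stream()
                .filter(e -> posts.containsKey(e.getKey()))
                .map(e -> e.getKey() + "|" + posts.get(e.getKey()) + "|" + e.getValue())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "posts" table: id -> the non-array columns (score|viewCount)
        Map<Long, String> posts = new LinkedHashMap<>();
        posts.put(4L, "358|24247");

        // derived "tag" table: one (id, tag) row per element of the tags array
        List<Map.Entry<Long, String>> tagRows = Arrays.asList(
                new AbstractMap.SimpleEntry<>(4L, "c#"),
                new AbstractMap.SimpleEntry<>(4L, "winforms"),
                new AbstractMap.SimpleEntry<>(4L, "type-conversion"),
                new AbstractMap.SimpleEntry<>(4L, "opacity"));

        // four rows, one per tag, just like Output#4
        join(posts, tagRows).forEach(System.out::println);
    }
}
```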

ErhWen Kuo answered Dec 30 '25 06:12


You can simply use the explode function:

DataFrame df = ...
DataFrame expanded = df.withColumn(
  "tag", org.apache.spark.sql.functions.explode(df.col("tags"))).drop("tags");
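Conceptually, explode is the flatMap the question asks about: a row whose tags array has n elements becomes n output rows, one per tag. A minimal plain-Java sketch of that expansion (no Spark; the Post class and explodeTags helper here are hypothetical stand-ins for a DataFrame row and the explode operation):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ExplodeSketch {

    // Hypothetical stand-in for one DataFrame row: an id plus a tags array.
    static final class Post {
        final long id;
        final List<String> tags;
        Post(long id, List<String> tags) { this.id = id; this.tags = tags; }
    }

    // Like explode(tags): each input row yields one (id, tag) row per element.
    static List<String[]> explodeTags(List<Post> rows) {
        return rows.stream()
                .flatMap(p -> p.tags.stream()
                        .map(tag -> new String[]{Long.toString(p.id), tag}))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Post> rows = Arrays.asList(
                new Post(4L, Arrays.asList("c#", "winforms", "type-conversion", "opacity")));
        for (String[] row : explodeTags(rows)) {
            System.out.println(row[0] + " | " + row[1]);
        }
    }
}
```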
zero323 answered Dec 30 '25 05:12