I have a list of maps in Java, essentially representing rows.
List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);
Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);
dataList.add(row1);
dataList.add(row2);
I'm trying to create a Spark DataFrame from it.
I've tried to convert it into JavaRDD<Map<String, Object>> using
JavaRDD<Map<String,Object>> rows = sc.parallelize(dataList);
But I'm not sure how to go from here to Dataset<Row>. I've seen Scala examples but none in Java.
I also tried converting the list to a JSON string and reading that back.
String jsonStr = mapper.writeValueAsString(dataList);
But it seems like I would have to write it to a file and then read it using
Dataset<Row> df = spark.read().json(pathToFile);
I would prefer to do it in-memory if possible rather than write to a file and read it back.
SparkConf sparkConf = new SparkConf().setAppName("SparkTest").setMaster("local[*]")
.set("spark.sql.shuffle.partitions", "1");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
List<Map<String, Object>> dataList = new ArrayList<>();
Map<String, Object> row1 = new HashMap<>();
row1.put("fund", "f1");
row1.put("broker", "b1");
row1.put("qty", 100);
Map<String, Object> row2 = new HashMap<>();
row2.put("fund", "f2");
row2.put("broker", "b2");
row2.put("qty", 200);
dataList.add(row1);
dataList.add(row2);
ObjectMapper mapper = new ObjectMapper();
String jsonStr = mapper.writeValueAsString(dataList);
JavaRDD<Map<String,Object>> rows = sc.parallelize(dataList);
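// not sure this overload can work here: Map.class is an interface, not a JavaBean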
Dataset<Row> data = sparkSession.createDataFrame(rows, Map.class);
data.show();
You do not need to use RDDs at all (and the createDataFrame(rows, Map.class) attempt cannot work: that overload infers the schema from JavaBean getters, which Map, being an interface, does not have). What you need to do is extract the desired schema from your list of maps, transform your list of maps into a list of rows, and then use spark.createDataFrame.
In Java, that's a bit painful, particularly when creating the Row objects, but here is how it could go:
import java.util.stream.Collectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.types.*;
import scala.collection.JavaConverters;

// column order taken from the first row (this assumes all maps share the same keys)
List<String> cols = new ArrayList<>(dataList.get(0).keySet());
List<Row> rows = dataList
    .stream()
    // stringify every value, in the chosen column order
    .map(row -> cols.stream().map(c -> (Object) row.get(c).toString()))
    .map(row -> row.collect(Collectors.toList()))
    // Row$.MODULE$.fromSeq expects a Scala Seq, hence the conversion
    .map(row -> JavaConverters.asScalaBufferConverter(row).asScala().toSeq())
    .map(Row$.MODULE$::fromSeq)
    .collect(Collectors.toList());
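As a side note, if the Scala interop above feels heavy, Spark also ships a Java-friendly helper, org.apache.spark.sql.RowFactory; a sketch of the same mapping with it (reusing the cols list, still stringifying every value):
List<Row> rowList = dataList.stream()
    // Stream.toArray() yields an Object[], which matches RowFactory.create's varargs
    .map(m -> RowFactory.create(cols.stream().map(c -> m.get(c).toString()).toArray()))
    .collect(Collectors.toList());
And if you would rather keep the JavaRDD from your question, createDataFrame also has an overload taking a JavaRDD<Row> together with the same StructType.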
// every column as a nullable StringType, matching the stringified values above
StructType schema = new StructType(
    cols.stream()
        .map(c -> new StructField(c, DataTypes.StringType, true, new Metadata()))
        .collect(Collectors.toList())
        .toArray(new StructField[0])
);
Dataset<Row> result = spark.createDataFrame(rows, schema);
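Since you said you would prefer to stay in memory, note that the JSON route does not need a file either: since Spark 2.2, DataFrameReader.json also accepts a Dataset<String>. A minimal sketch, reusing your ObjectMapper and letting Spark infer the column types (needs org.apache.spark.sql.Encoders):
List<String> jsonRows = new ArrayList<>();
for (Map<String, Object> m : dataList) {
    // one JSON object per record; writeValueAsString throws a checked JsonProcessingException
    jsonRows.add(mapper.writeValueAsString(m));
}
Dataset<String> jsonDataset = spark.createDataset(jsonRows, Encoders.STRING());
Dataset<Row> df = spark.read().json(jsonDataset);
Be aware the inferred schema will differ from the all-string one above (qty comes back as a long, for example).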