Spark 2.1 and Scala 2.11 here. I have a large Map[String,Date]
that has 10K key/value pairs in it. I also have 10K JSON files living on a file system that is accessible to Spark:
mnt/
some/
path/
data00001.json
data00002.json
data00003.json
...
data10000.json
Each KV pair in the map corresponds to its respective JSON file (hence the 1st map KV pair corresponds to data00001.json
, etc.)
I want to read all these JSON files into 1 large Spark Dataset
and, while I'm at it, add two new columns to this dataset (that don't exist in the JSON files). Each map key will be the value for the first new column, and each key's value will be the value for the second new column:
val objectSummaries = getScalaList()
val dataFiles = objectSummaries.filter { _.getKey.endsWith("data.json") }
val dataDirectories = dataFiles.map(dataFile => {
val keyComponents = dataFile.getKey.split("/")
val parent = if (keyComponents.length > 1) keyComponents(keyComponents.length - 2) else "/"
(parent, dataFile.getLastModified)
})
// TODO: How to take each KV pair from dataDirectories above and store them as the values for the
// two new columns?
val allDataDataset = spark.read.json("mnt/some/path/*.json")
.withColumn("new_col_1", dataDirectories._1)
.withColumn("new_col_2", dataDirectories._2)
I've confirmed that Spark will honor the wildcard (mnt/some/path/*.json
) and read all the JSON files into a single Dataset when I remove the withColumn
methods and do a allData.show()
. So I'm all good there.
What I'm struggling with is: how do I add the two new columns and then pluck out all the key/value map elements correctly?
I think you should create your own datasource for this. This new datasource would know about your particular folder structure and content structure.
If I understood correctly you want to correlate a KV from map with dataframes from json files.
I'll try to simplify the problem to only 3 files and 3 key values all ordered.
val kvs = Map("a" -> 1, "b" -> 2, "c" -> 3)
val files = List("data0001.json", "data0002.json", "data0003.json")
Define a case class for handling more easy files, key, values
case class FileWithKV(fileName: String, key: String, value: Int)
Will zip the files and kvs
val filesWithKVs = files.zip(kvs)
.map(p => FileWithKV(p._1, p._2._1, p._2._2))
It will look like this
filesWithKVs: List[FileWithKV] = List(FileWithKV(data0001.json,a,1), FileWithKV(data0002.json,b,2), FileWithKV(data0003.json,c,3))
We start then with an initial dataframe, from the head of our collection and then will start folding left to construct the entire dataframe that will hold all the files, with all the columns dynamically generated from KV
val head = filesWithKVs.head
val initialDf = spark
.read.json(head.filename)
.withColumn(s"new_col_1", lit(head.key))
.withColumn(s"new_col_2", lit(head.value))
Now the folding part
val dfAll = filesWithKVs.tail.foldLeft(initialDf)((df, fileWithKV) => {
val newDf = spark
.read.json(fileWithKV.filename)
.withColumn(s"new_col_1", lit(fileWithKV.key))
.withColumn(s"new_col_2", lit(fileWithKV.value))
// union the dataframes to capture file by file, key value with key value
df.union(newDf)
})
The dataframe will look like this, assuming that in the json files will be a column named bar and a value foo, for each of the 3 json files
+---+----------+----------+
|bar|new_col_1 |new_col_2 |
+---+----------+----------+
|foo| a| 1|
|foo| b| 2|
|foo| c| 3|
+---+----------+----------+
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With