Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Reading JSON files into Spark Dataset and adding columns from a separate Map

Spark 2.1 and Scala 2.11 here. I have a large Map[String,Date] that has 10K key/value pairs in it. I also have 10K JSON files living on a file system that is accessible to Spark:

mnt/
    some/
        path/
            data00001.json
            data00002.json
            data00003.json
            ...
            data10000.json

Each KV pair in the map corresponds to its respective JSON file (hence the 1st map KV pair corresponds to data00001.json, etc.)

I want to read all these JSON files into 1 large Spark Dataset and, while I'm at it, add two new columns to this dataset (that don't exist in the JSON files). Each map key will be the value for the first new column, and each key's value will be the value for the second new column:

val objectSummaries = getScalaList()
val dataFiles = objectSummaries.filter { _.getKey.endsWith("data.json") }
val dataDirectories = dataFiles.map(dataFile => {
  val keyComponents = dataFile.getKey.split("/")
  val parent = if (keyComponents.length > 1) keyComponents(keyComponents.length - 2) else "/"
  (parent, dataFile.getLastModified)
})

// TODO: How to take each KV pair from dataDirectories above and store them as the values for the
// two new columns?
val allDataDataset = spark.read.json("mnt/some/path/*.json")
  .withColumn("new_col_1", dataDirectories._1)
  .withColumn("new_col_2", dataDirectories._2)

I've confirmed that Spark will honor the wildcard (mnt/some/path/*.json) and read all the JSON files into a single Dataset when I remove the withColumn methods and do a allData.show(). So I'm all good there.

What I'm struggling with is: how do I add the two new columns and then pluck out all the key/value map elements correctly?

like image 204
smeeb Avatar asked Aug 01 '17 18:08

smeeb


2 Answers

I think you should create your own datasource for this. This new datasource would know about your particular folder structure and content structure.

like image 32
Michel Lemay Avatar answered Oct 19 '22 20:10

Michel Lemay


If I understood correctly you want to correlate a KV from map with dataframes from json files.

I'll try to simplify the problem to only 3 files and 3 key values all ordered.

val kvs = Map("a" -> 1, "b" -> 2, "c" -> 3)
val files = List("data0001.json", "data0002.json", "data0003.json")

Define a case class for handling more easy files, key, values

case class FileWithKV(fileName: String, key: String, value: Int)

Will zip the files and kvs

val filesWithKVs = files.zip(kvs)
  .map(p => FileWithKV(p._1, p._2._1, p._2._2))

It will look like this

filesWithKVs: List[FileWithKV] = List(FileWithKV(data0001.json,a,1), FileWithKV(data0002.json,b,2), FileWithKV(data0003.json,c,3))

We start then with an initial dataframe, from the head of our collection and then will start folding left to construct the entire dataframe that will hold all the files, with all the columns dynamically generated from KV

val head = filesWithKVs.head
val initialDf = spark
.read.json(head.filename)
.withColumn(s"new_col_1", lit(head.key)) 
.withColumn(s"new_col_2", lit(head.value))

Now the folding part

val dfAll = filesWithKVs.tail.foldLeft(initialDf)((df, fileWithKV) => {
    val newDf = spark
    .read.json(fileWithKV.filename)
    .withColumn(s"new_col_1", lit(fileWithKV.key)) 
    .withColumn(s"new_col_2", lit(fileWithKV.value))
    // union the dataframes to capture file by file, key value with key value
    df.union(newDf)
})

The dataframe will look like this, assuming that in the json files will be a column named bar and a value foo, for each of the 3 json files

+---+----------+----------+
|bar|new_col_1 |new_col_2 |
+---+----------+----------+
|foo|         a|         1|
|foo|         b|         2|
|foo|         c|         3|
+---+----------+----------+
like image 107
dumitru Avatar answered Oct 19 '22 22:10

dumitru