
Convert JSON objects to RDD

I don't know if this question is a duplicate, but somehow none of the answers I came across seem to work for me (maybe I'm doing something wrong).

I have a class defined thus:

case class myRec(
                 time: String,
                 client_title: String,
                 made_on_behalf: Double,
                 country: String,
                 email_address: String,
                 phone: String)

and a sample JSON file that contains records (objects) in the form

[{...}{...}{...}...] 

i.e.

[{"time": "2015-05-01 02:25:47",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Brussel",
"email_address": "[email protected]"},
{"time": "2015-05-01 04:15:03",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Bundesliga",
"email_address": "[email protected]"},
{"time": "2015-05-01 06:29:18",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Japan",
"email_address": "[email protected]"}...]

My build.sbt has libraryDependencies += "com.owlike" % "genson-scala_2.11" % "1.3" for scalaVersion := "2.11.7".

I have a Scala function defined thus:

//PS: Other imports already made
import com.owlike.genson.defaultGenson_

//PS: Spark context already defined
def prepData(infile: String): RDD[myRec] = {
  val input = sc.textFile(infile)
  // Read JSON data into my record case class
  input.mapPartitions(records =>
    records.map(record => fromJson[myRec](record))
  )
}

And I'm calling the function

prepData("file://path/to/abc.json")

Is there any way of doing this, or is there another JSON library I can use to convert the file to an RDD?

I also tried the following, and neither approach seems to work:

Using ScalaObjectMapper

PS: I don't want to go through Spark SQL to process the JSON file.

Thanks!

asked by Jyd


2 Answers

Jyd, not using Spark SQL for JSON is an interesting choice, but it's very much doable. There is an example of how to do this in the Learning Spark book's examples (disclaimer: I am one of the co-authors, so a little biased). The examples are on GitHub at https://github.com/databricks/learning-spark, but here is the relevant code snippet:

import org.apache.spark.SparkContext
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

case class Person(name: String, lovesPandas: Boolean) // Note: must be a top level class

object BasicParseJsonWithJackson {

  def main(args: Array[String]) {
    if (args.length < 3) {
      println("Usage: [sparkmaster] [inputfile] [outputfile]")
      sys.exit(1)
    }
    val master = args(0)
    val inputFile = args(1)
    val outputFile = args(2)
    val sc = new SparkContext(master, "BasicParseJsonWithJackson", System.getenv("SPARK_HOME"))
    val input = sc.textFile(inputFile)

    // Parse it into a specific case class. We use mapPartitions because:
    // (a) ObjectMapper is not serializable, so we would either have to create a singleton object
    //     encapsulating ObjectMapper on the driver and send data back to the driver to go through it,
    //     or let each task create its own ObjectMapper, which is expensive inside a plain map.
    // (b) Creating one ObjectMapper per partition with mapPartitions avoids both the serialization
    //     problem and the per-record object-creation cost.
    val result = input.mapPartitions(records => {
        // mapper object created on each executor node
        val mapper = new ObjectMapper with ScalaObjectMapper
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        mapper.registerModule(DefaultScalaModule)
        // We use flatMap to handle errors:
        // we return an empty collection (None) if we encounter an issue and a
        // single-element collection (Some(_)) if everything is ok.
        records.flatMap(record => {
          try {
            Some(mapper.readValue(record, classOf[Person]))
          } catch {
            case e: Exception => None
          }
        })
    }, true)
    result.filter(_.lovesPandas).mapPartitions(records => {
      val mapper = new ObjectMapper with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      records.map(mapper.writeValueAsString(_))
    })
      .saveAsTextFile(outputFile)
  }
}

Note this uses Jackson (specifically "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.3" & "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.3.3" dependencies).
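For reference, those coordinates would look roughly like this in build.sbt (a sketch only; the _2.10 suffix assumes a Scala 2.10 build, so match it to your scalaVersion):

// hypothetical sbt fragment for the Jackson-based approach above
libraryDependencies ++= Seq(
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.3",
  "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.3.3"
)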

I just noticed that your question had some sample input, and as @zero323 pointed out, line-by-line parsing isn't going to work. Instead you would do:

    val input = sc.wholeTextFiles(inputFile).map(_._2)

    // Parse it into a specific case class. We use mapPartitions because:
    // (a) ObjectMapper is not serializable, so we would either have to create a singleton object
    //     encapsulating ObjectMapper on the driver and send data back to the driver to go through it,
    //     or let each task create its own ObjectMapper, which is expensive inside a plain map.
    // (b) Creating one ObjectMapper per partition with mapPartitions avoids both the serialization
    //     problem and the per-record object-creation cost.
    val result = input.mapPartitions(records => {
        // mapper object created on each executor node
        val mapper = new ObjectMapper with ScalaObjectMapper
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        mapper.registerModule(DefaultScalaModule)
        // We use flatMap to handle errors:
        // we return an empty list (Nil) if we encounter an issue and the
        // parsed list of people if everything is ok.
        records.flatMap(record => {
          try {
            // readValue[List[Person]] comes from the ScalaObjectMapper mixin and keeps the
            // element type through erasure (classOf[List[Person]] would lose it)
            mapper.readValue[List[Person]](record)
          } catch {
            case e: Exception => Nil
          }
        })
    })
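Mapping this back onto the question, a rough sketch of prepData using the same whole-file Jackson approach could look like the following. It is untested and makes a few assumptions: the Jackson dependencies above are on the classpath, the SparkContext is passed in explicitly just to keep the sketch self-contained, and phone is dropped from the case class because the sample records don't carry it.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

// phone is left out because the sample records above don't contain it
case class MyRec(
  time: String,
  client_title: String,
  made_on_behalf: Double,
  country: String,
  email_address: String)

def prepData(sc: SparkContext, infile: String): RDD[MyRec] = {
  // wholeTextFiles yields one element per file, since each file holds a single JSON array
  val input = sc.wholeTextFiles(infile).map(_._2)
  input.mapPartitions { records =>
    // one mapper per partition, as in the snippet above
    val mapper = new ObjectMapper with ScalaObjectMapper
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    records.flatMap { record =>
      // skip files that fail to parse instead of failing the whole job
      try mapper.readValue[List[MyRec]](record)
      catch { case e: Exception => Nil }
    }
  }
}

// usage (same placeholder path as in the question):
// val recs = prepData(sc, "file://path/to/abc.json")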
answered by Holden


Just for fun, you can try to split individual documents using a specific delimiter. While it won't work on complex nested documents, it should handle the example input without using wholeTextFiles:

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.conf.Configuration
import net.liftweb.json.{parse, JObject, JField, JString, JInt}

case class MyRec(
  time: String,
  client_title: String,
  made_on_behalf: Double,
  country: String,
  email_address: String)

@transient val conf = new Configuration
conf.set("textinputformat.record.delimiter", "},\n{")
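// Splitting on "},\n{" strips the delimiter itself, so each split is a bare record body;
// the first split still carries the file's leading "[{" and the last its trailing "}]",
// which is what clean() below normalizes back into standalone JSON objects.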

def clean(s: String) = {
   val p = "(?s)\\[?\\{?(.*?)\\}?\\]?".r
   s match {
     case p(x) => Some(s"{$x}")
     case _ => None
   }
}

def toRec(os: Option[String]) = {
  os match {
    case Some(s) =>
      for {
        JObject(o) <- parse(s)
        JField("time", JString(time)) <- o
        JField("client_title", JString(client_title)) <- o
        JField("made_on_behalf", JInt(made_on_behalf)) <- o
        JField("country", JString(country)) <- o
        JField("email_address", JString(email)) <- o
      } yield MyRec(time, client_title, made_on_behalf.toDouble, country, email)
    case _ => Nil
  }
}

val records = sc.newAPIHadoopFile("some.json",
    classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
      .map{case (_, txt) => clean(txt.toString)}
      .flatMap(toRec)
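
A quick sanity check (a sketch; "some.json" above is a placeholder for the actual path):

// pull a few parsed records back to the driver and print them
records.take(3).foreach(println)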
answered by zero323