I don't know if this question is a duplicate, but somehow none of the answers I came across seem to work for me (maybe I'm doing something wrong).
I have a class defined thus:
case class myRec(
  time: String,
  client_title: String,
  made_on_behalf: Double,
  country: String,
  email_address: String,
  phone: String)
and a sample JSON file that contains records (objects) in the form
[{...}{...}{...}...]
i.e.
[{"time": "2015-05-01 02:25:47",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Brussel",
"email_address": "[email protected]"},
{"time": "2015-05-01 04:15:03",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Bundesliga",
"email_address": "[email protected]"},
{"time": "2015-05-01 06:29:18",
"client_title": "Mr.",
"made_on_behalf": 0,
"country": "Japan",
"email_address": "[email protected]"}...]
My build.sbt has
libraryDependencies += "com.owlike" % "genson-scala_2.11" % "1.3"
for scalaVersion := "2.11.7".
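For context, the relevant part of build.sbt looks roughly like this (the spark-core line is only a placeholder I'm showing for completeness; the exact Spark version isn't the point here):

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"        % "1.4.1" % "provided", // placeholder Spark version
  "com.owlike"       %  "genson-scala_2.11" % "1.3"
)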
I have a Scala function defined thus:
// PS: Other imports already made
import com.owlike.genson.defaultGenson._

// PS: Spark context already defined
def prepData(infile: String): RDD[myRec] = {
  val input = sc.textFile(infile)
  // Read JSON data into my record case class
  input.mapPartitions(records =>
    records.map(record => fromJson[myRec](record))
  )
}
And I'm calling the function:
prepData("file://path/to/abc.json")
Is there any way of doing this, or is there another JSON library I can use to convert to an RDD?
I also tried using ScalaObjectMapper, and that doesn't seem to work either.
PS: I don't want to go through Spark SQL to process the JSON file.
Thanks!
Jyd, not using Spark SQL for JSON is an interesting choice, but it's very much doable. There is an example of how to do this in the Learning Spark book's examples (disclaimer: I am one of the co-authors, so a little biased). The examples are on GitHub at https://github.com/databricks/learning-spark, but here is the relevant code snippet:
// Imports used by this example
import org.apache.spark.SparkContext
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

case class Person(name: String, lovesPandas: Boolean) // Note: must be a top-level class

object BasicParseJsonWithJackson {
  def main(args: Array[String]) {
    if (args.length < 3) {
      println("Usage: [sparkmaster] [inputfile] [outputfile]")
      sys.exit(1)
    }
    val master = args(0)
    val inputFile = args(1)
    val outputFile = args(2)
    val sc = new SparkContext(master, "BasicParseJsonWithJackson", System.getenv("SPARK_HOME"))
    val input = sc.textFile(inputFile)

    // Parse it into a specific case class. We use mapPartitions because:
    // (a) ObjectMapper is not serializable, so we either create a singleton object encapsulating
    //     ObjectMapper on the driver and have to send data back to the driver to go through the
    //     singleton object, or we let each node create its own ObjectMapper, but that's expensive
    //     in a map.
    // (b) To create an ObjectMapper on each node without it being too expensive, we create one per
    //     partition with mapPartitions. This solves the serialization and object-creation
    //     performance hit.
    val result = input.mapPartitions(records => {
      // mapper object created on each executor node
      val mapper = new ObjectMapper with ScalaObjectMapper
      mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
      mapper.registerModule(DefaultScalaModule)
      // We use flatMap to handle errors: an empty result (None) if we encounter an issue,
      // and a one-element result (Some(_)) if everything is ok.
      records.flatMap(record => {
        try {
          Some(mapper.readValue(record, classOf[Person]))
        } catch {
          case e: Exception => None
        }
      })
    }, true)

    result.filter(_.lovesPandas)
      .mapPartitions(records => {
        val mapper = new ObjectMapper with ScalaObjectMapper
        mapper.registerModule(DefaultScalaModule)
        records.map(mapper.writeValueAsString(_))
      })
      .saveAsTextFile(outputFile)
  }
}
Note this uses Jackson (specifically the "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.3" and "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.3.3" dependencies).
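In build.sbt form that's roughly (a sketch; the module artifact quoted above is the Scala 2.10 build, so pick the suffix and version matching your Scala version):

libraryDependencies ++= Seq(
  "com.fasterxml.jackson.core"   % "jackson-databind"          % "2.3.3",
  "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.3.3"
)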
I just noticed that your question had some sample input, and as @zero323 pointed out, line-by-line parsing isn't going to work. Instead you would do:
val input = sc.wholeTextFiles(inputFile).map(_._2)

// Parse it into a specific case class, creating one ObjectMapper per partition
// with mapPartitions for the same reasons as above.
val result = input.mapPartitions(records => {
  // mapper object created on each executor node
  val mapper = new ObjectMapper with ScalaObjectMapper
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  // We use flatMap to handle errors: an empty list (Nil) if we encounter an issue,
  // and the list of parsed records if everything is ok.
  records.flatMap(record => {
    try {
      // ScalaObjectMapper's Manifest-based readValue keeps the element type,
      // so the whole JSON array is parsed into a List[Person]
      mapper.readValue[List[Person]](record)
    } catch {
      case e: Exception => Nil
    }
  })
})
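Adapted to the myRec case class from your question, the whole-file version would look roughly like this (a sketch: it assumes the Jackson dependencies above, that sc and the RDD import are already in scope as in your snippet, and that myRec is defined at the top level; adjust the ScalaObjectMapper import if your jackson-module-scala version places it elsewhere):

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

def prepData(infile: String): RDD[myRec] = {
  // Each element is the full text of one JSON file, i.e. the whole [{...},{...},...] array
  val input = sc.wholeTextFiles(infile).map(_._2)
  input.mapPartitions { fileTexts =>
    // One mapper per partition, as above
    val mapper = new ObjectMapper with ScalaObjectMapper
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    fileTexts.flatMap { text =>
      try mapper.readValue[List[myRec]](text)
      catch { case e: Exception => Nil } // skip files that fail to parse
    }
  }
}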
Just for fun, you can try to split individual documents using a specific delimiter. While it won't work on complex nested documents, it should handle the example input without using wholeTextFiles:
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.conf.Configuration
import net.liftweb.json.{parse, JObject, JField, JString, JInt}

case class MyRec(
  time: String,
  client_title: String,
  made_on_behalf: Double,
  country: String,
  email_address: String)

@transient val conf = new Configuration
conf.set("textinputformat.record.delimiter", "},\n{")

// Strip any leading "[{" / trailing "}]" left over from splitting on the
// delimiter and re-wrap the fragment in braces so it is a standalone object
def clean(s: String) = {
  val p = "(?s)\\[?\\{?(.*?)\\}?\\]?".r
  s match {
    case p(x) => Some(s"{$x}")
    case _ => None
  }
}

def toRec(os: Option[String]) = {
  os match {
    case Some(s) =>
      for {
        JObject(o) <- parse(s)
        JField("time", JString(time)) <- o
        JField("client_title", JString(client_title)) <- o
        JField("made_on_behalf", JInt(made_on_behalf)) <- o
        JField("country", JString(country)) <- o
        JField("email_address", JString(email)) <- o
      } yield MyRec(time, client_title, made_on_behalf.toDouble, country, email)
    case _ => Nil
  }
}

val records = sc.newAPIHadoopFile("some.json",
    classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
  .map { case (_, txt) => clean(txt.toString) }
  .flatMap(toRec)
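To see what clean does with the pieces that delimiter produces, here is a quick check on hand-written fragments (a sketch with shortened records; no Spark needed):

// Fragments mimicking what splitting on "},\n{" yields:
// the first keeps the leading "[{", the last keeps the trailing "}]".
val first  = """[{"time": "2015-05-01 02:25:47", "client_title": "Mr.""""
val middle = """"time": "2015-05-01 04:15:03", "client_title": "Mr.""""
val last   = """"time": "2015-05-01 06:29:18", "client_title": "Mr."}]"""

Seq(first, middle, last).flatMap(clean).foreach(println)
// Each fragment comes back re-wrapped as a standalone JSON object, e.g.
// {"time": "2015-05-01 02:25:47", "client_title": "Mr."}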