
Spark JSON text field to RDD

I've got a Cassandra table with a text column named snapshot that contains JSON objects:

[identifier, timestamp, snapshot]

I understand that to run transformations on that field with Spark, I need to extract it into its own RDD so that transformations can be applied against the JSON schema.

Is that correct? How should I go about doing that?

Edit: So far, I've managed to create an RDD from a single text field:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.datastax.spark.connector._ // provides sc.cassandraTable

val conf = new SparkConf().setAppName("signal-aggregation")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots")
val first = snapshots.first()
// wrap the single JSON string in an RDD so jsonRDD can infer its schema
val firstJson = sqlContext.jsonRDD(sc.parallelize(Seq(first._3)))
firstJson.printSchema()

Which shows me the JSON schema. Good!

How do I tell Spark to apply that schema to every row of the snapshots table, so that I get an RDD over the snapshot field of each row?

asked May 04 '15 by galex

1 Answer

Almost there, you just want to pass an RDD[String] containing your JSON into the jsonRDD method:

val conf = new SparkConf().setAppName("signal-aggregation")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots")
val jsons = snapshots.map(_._3) // take the third tuple element, the snapshot JSON (RDD[String])
val jsonSchemaRDD = sqlContext.jsonRDD(jsons) // pass the whole RDD in directly
jsonSchemaRDD.registerTempTable("testjson")
sqlContext.sql("SELECT * FROM testjson where .... ").collect 

A quick example:

val stringRDD = sc.parallelize(Seq(""" 
  { "isActive": false,
    "balance": "$1,431.73",
    "picture": "http://placehold.it/32x32",
    "age": 35,
    "eyeColor": "blue"
  }""",
   """{
    "isActive": true,
    "balance": "$2,515.60",
    "picture": "http://placehold.it/32x32",
    "age": 34,
    "eyeColor": "blue"
  }""", 
  """{
    "isActive": false,
    "balance": "$3,765.29",
    "picture": "http://placehold.it/32x32",
    "age": 26,
    "eyeColor": "blue"
  }""")
)
sqlContext.jsonRDD(stringRDD).registerTempTable("testjson")
sqlContext.sql("SELECT age from testjson").collect
//res24: Array[org.apache.spark.sql.Row] = Array([35], [34], [26])
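
Note that jsonRDD is deprecated as of Spark 1.4 in favour of the DataFrameReader API; the equivalent call there would be:

// Spark 1.4+ equivalent of sqlContext.jsonRDD(jsons)
val df = sqlContext.read.json(jsons)
df.registerTempTable("testjson")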
answered Oct 06 '22 by RussS