I have a Cassandra table that for simplicity looks something like:
    key: text
    jsonData: text
    blobData: blob
I can create a basic DataFrame for this using Spark and the spark-cassandra-connector:
    val df = sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
      .load()
I'm struggling, though, to expand the JSON data into its underlying structure. I ultimately want to be able to filter based on the attributes within the JSON string and return the blob data. Something like jsonData.foo = "bar" and return blobData. Is this currently possible?
You can select one or more columns of a Spark DataFrame by passing their names to the select() function. Since a DataFrame is immutable, this creates a new DataFrame containing only the selected columns. The show() function displays the DataFrame contents.
Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame using the read.json() function, which loads data from a directory of JSON files where each line of the files is a JSON object. Note that a file offered as a JSON file here is not a typical JSON file: each line must contain a separate, self-contained valid JSON object.
Spark >= 2.4
If needed, the schema can be determined using the schema_of_json function (please note that this assumes that an arbitrary row is a valid representative of the schema).
    import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
    import collection.JavaConverters._

    val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
    df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))
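Because the first row may not be representative, one alternative is to let Spark infer the schema from the whole column by reading it as a JSON dataset. The following is only a sketch, written for spark-shell or a Scala script. It assumes Spark >= 2.2 (where read.json accepts a Dataset[String]) and a local SparkSession. The sample rows and the names spark, inferred and parsed are illustrative stand-ins and are not taken from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json

// Assumption: a local session for illustration only.
val spark = SparkSession.builder().master("local[*]").appName("json-schema").getOrCreate()
import spark.implicits._

// Stand-in for the Cassandra table; the second row has a field the first one lacks.
val df = Seq(
  ("1", """{"k": "foo"}""", "blob_1"),
  ("2", """{"k": "bar", "v": 3.0}""", "blob_2")
).toDF("key", "jsonData", "blobData")

// Scan every JSON string, so fields missing from the first row are still picked up.
val inferred = spark.read.json(df.select($"jsonData").as[String]).schema

// Parse the column with the inferred schema.
val parsed = df.withColumn("jsonData", from_json($"jsonData", inferred))
parsed.printSchema()
```

The trade-off is an extra pass over the data to infer the schema, which may be expensive on a large table.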
Spark >= 2.1
You can use the from_json function:
    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
      StructField("k", StringType, true),
      StructField("v", DoubleType, true)
    ))

    df.withColumn("jsonData", from_json($"jsonData", schema))
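Once the column is parsed into a struct, the filter from the question (jsonData.foo = "bar", return blobData) can be written with dot syntax on the nested field. This is a minimal sketch under a few assumptions: it uses a local SparkSession, sample rows in place of the Cassandra table, a string standing in for the blob, and the field k in place of foo. The names spark, matches and result are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Assumption: a local session for illustration only.
val spark = SparkSession.builder().master("local[*]").appName("json-filter").getOrCreate()
import spark.implicits._

// Stand-in rows; blobData is a string here, whereas the real column is a blob.
val df = Seq(
  ("1", """{"k": "foo", "v": 1.0}""", "blob_1"),
  ("2", """{"k": "bar", "v": 3.0}""", "blob_2")
).toDF("key", "jsonData", "blobData")

val schema = StructType(Seq(
  StructField("k", StringType, true),
  StructField("v", DoubleType, true)
))

// Parse once, then address nested fields with dot syntax.
val matches = df
  .withColumn("jsonData", from_json($"jsonData", schema))
  .where($"jsonData.k" === "bar")
  .select($"blobData")

val result = matches.as[String].collect().toSeq
```

Rows whose JSON cannot be parsed get a null struct from from_json, so they simply fail the filter.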
Spark >= 1.6
You can use get_json_object, which takes a column and a path:
    import org.apache.spark.sql.functions.get_json_object

    val exprs = Seq("k", "v").map(
      c => get_json_object($"jsonData", s"$$.$c").alias(c))

    df.select($"*" +: exprs: _*)
and extracts fields to individual strings, which can then be cast to the expected types.
The path argument is expressed using dot syntax, with a leading $. denoting the document root (since the code above uses string interpolation, $ has to be escaped, hence $$.).
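The same extraction can be used directly in a filter, casting where a numeric comparison is needed. This is a sketch under stated assumptions: it uses SparkSession (Spark 2.0+) for brevity, although the get_json_object expressions themselves are the Spark 1.6 API, and the sample rows and the names spark, k, v and blobs are illustrative. The paths are plain string literals here, so $ needs no escaping.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object

// Assumption: a local session for illustration only.
val spark = SparkSession.builder().master("local[*]").appName("json-path").getOrCreate()
import spark.implicits._

// Stand-in rows; blobData is a string here, whereas the real column is a blob.
val df = Seq(
  ("1", """{"k": "foo", "v": 1.0}""", "blob_1"),
  ("2", """{"k": "bar", "v": 3.0}""", "blob_2")
).toDF("key", "jsonData", "blobData")

// get_json_object returns strings, so cast before comparing numerically.
val k = get_json_object($"jsonData", "$.k")
val v = get_json_object($"jsonData", "$.v").cast("double")

val blobs = df
  .where(k === "bar" && v > 2.0)
  .select($"blobData")
  .as[String]
  .collect()
  .toSeq
```

Each get_json_object call parses the JSON string again, so for many fields a single from_json pass may be preferable where it is available.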
Spark <= 1.5
Is this currently possible?
As far as I know it is not directly possible. You can try something similar to this:
    val df = sc.parallelize(Seq(
      ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
      ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
    )).toDF("key", "jsonData", "blobData")
I assume that the blob field cannot be represented in JSON. Otherwise you can omit splitting and joining:
    import org.apache.spark.sql.Row

    // Keep the blobs aside, keyed by a renamed column to avoid ambiguity in the join.
    val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")

    // Wrap each row in a JSON document so that read.json can infer the nested schema.
    val jsons = sqlContext.read.json(df.drop("blobData").map{
      case Row(key: String, json: String) =>
        s"""{"key": "$key", "jsonData": $json}"""
    })

    val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
    parsed.printSchema

    // root
    //  |-- jsonData: struct (nullable = true)
    //  |    |-- k: string (nullable = true)
    //  |    |-- v: double (nullable = true)
    //  |-- key: string (nullable = true)
    //  |-- blobData: string (nullable = true)
An alternative (cheaper, although more complex) approach is to use a UDF to parse JSON and output a struct or map column. For example, something like this:
    import net.liftweb.json.parse
    import org.apache.spark.sql.functions.udf

    case class KV(k: String, v: Int)

    // Parse the JSON string and extract it into a case class, which Spark maps to a struct.
    val parseJson = udf((s: String) => {
      implicit val formats = net.liftweb.json.DefaultFormats
      parse(s).extract[KV]
    })

    val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
    parsed.show

    // +---+--------------------+------------------+----------+
    // |key|            jsonData|          blobData|parsedJSON|
    // +---+--------------------+------------------+----------+
    // |  1|{"k": "foo", "v":...|some_other_field_1|   [foo,1]|
    // |  2|{"k": "bar", "v":...|some_other_field_2|   [bar,3]|
    // +---+--------------------+------------------+----------+

    parsed.printSchema

    // root
    //  |-- key: string (nullable = true)
    //  |-- jsonData: string (nullable = true)
    //  |-- blobData: string (nullable = true)
    //  |-- parsedJSON: struct (nullable = true)
    //  |    |-- k: string (nullable = true)
    //  |    |-- v: integer (nullable = false)