I am trying to use SQL on a spark data frame. But the data frame has 1 value has string (which is JSON like structure) :
I saved my data frame to temp table : TestTable
When I did desc :
col_name data_type
requestId string
name string
features string
But features values is a json :
{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}
I just want to query on TestTable where totalSpent > 10. Can some tell me how do I do this ?
My JSON file looks like :
{
"requestId": 232323,
"name": "ravi",
"features": "{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}"
}
features is a string. I only need totalSpent in that. I tried with :
val features = StructType(
Array(StructField("totalSpent",LongType,true),
StructField("movies",LongType,true)
))
val schema = StructType(Array(
StructField("requestId",StringType,true),
StructField("name",StringType,true),
StructField("features",features,true),
)
)
val records = sqlContext.read.schema(schema).json(filePath)
Since each request has one JSON string of features. But this gives me error.
When I tried with
val records = sqlContext.jsonFile(filePath)
records.printSchema
shows me :
root
|-- requestId: string (nullable = true)
|-- features: string (nullable = true)
|-- name: string (nullable = true)
Can I use parallelize inside StructField while creating schema ? I tried with :
I first tried with :
val customer = StructField("features",StringType,true)
val events = sc.parallelize(customer :: Nil)
val schema = StructType(Array(
StructField("requestId",StringType,true),
StructField("name", StructType(events, true),true),
StructField("features",features,true),
)
)
This gives me error as well. Also tried :
import net.liftweb.json.parse
case class KV(k: String, v: Int)
val parseJson = udf((s: String) => {
implicit val formats = net.liftweb.json.DefaultFormats
parse(s).extract[KV]
})
val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show
This gives me :
<console>:78: error: object liftweb is not a member of package net
import net.liftweb.json.parse
Tried :
I tried with :
val parseJson = udf((s: String) => {
sqlContext.read.json(s)
})
val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show
But again error.
Tried :
import org.json4s._
import org.json4s.jackson.JsonMethods._
val parseJson = udf((s: String) => {
parse(s)
})
val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show
But it gives me :
java.lang.UnsupportedOperationException: Schema for type org.json4s.JValue is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
This gives me proper schema (based on the answer given by zero323 :
val extractFeatures = udf((features: String) => Try {
implicit val formats = DefaultFormats
parse(features).extract[Features]
}.toOption)
val parsed = records.withColumn("features", extractFeatures($"features"))
parsed.printSchema
But When I query :
val value = parsed.filter($"requestId" === "232323" ).select($"features.totalSpent")
value.show gives null
.
When you return data from UDF it has to be representable as SQL types and JSON AST is not. One approach is to create a case class similar to this one:
case class Features(
places: Integer,
movies: Integer,
totalPlacesVisited: Integer,
totalSpent: Integer,
SpentMap: Map[String, Integer],
benefits: Map[String, Integer]
)
and use it to extract
objects:
val df = Seq((
232323, "ravi",
"""{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}"""
)).toDF("requestId", "name", "features")
val extractFeatures = udf((features: String) =>
parse(features).extract[Features])
val parsed = df.withColumn("features", extractFeatures($"features"))
parsed.show(false)
// +---------+----+-----------------------------------------------------------------+
// |requestId|name|features |
// +---------+----+-----------------------------------------------------------------+
// |232323 |ravi|[11,2,0,13,Map(Movie -> 2, Park Visit -> 11),Map(freeTime -> 13)]|
// +---------+----+-----------------------------------------------------------------+
parsed.printSchema
// root
// |-- requestId: integer (nullable = false)
// |-- name: string (nullable = true)
// |-- features: struct (nullable = true)
// | |-- places: integer (nullable = true)
// | |-- movies: integer (nullable = true)
// | |-- totalPlacesVisited: integer (nullable = true)
// | |-- totalSpent: integer (nullable = true)
// | |-- SpentMap: map (nullable = true)
// | | |-- key: string
// | | |-- value: integer (valueContainsNull = true)
// | |-- benefits: map (nullable = true)
// | | |-- key: string
// | | |-- value: integer (valueContainsNull = true)
Depending on the other records and expected usage you should adjust representation and add relevant error handling logic.
You can also use DSL to access individual fields as strings:
val getMovieSpent = udf((s: String) =>
compact(render(parse(s) \\ "SpentMap" \\ "Movie")))
df.withColumn("movie_spent", getMovieSpent($"features").cast("bigint")).show
// +---------+----+--------------------+-----------+
// |requestId|name| features|movie_spent|
// +---------+----+--------------------+-----------+
// | 232323|ravi|{"places":11,"mov...| 2|
// +---------+----+--------------------+-----------+
For alternative approaches see How to query JSON data column using Spark DataFrames?
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With