I am reading a Kafka topic using Spark 2.1.1 (Kafka 0.10+), and the payload is a JSON string. I'd like to parse the string against a schema and move on to the business logic.
Everyone seems to suggest that I should use from_json to parse the JSON strings; however, it doesn't compile in my project. The error is:

not found: value from_json
.select(from_json($"json", txnSchema) as "data")
When I try the same lines in spark-shell, it works just fine:
val df = stream
.select($"value" cast "string" as "json")
.select(from_json($"json", txnSchema) as "data")
.select("data.*")
Any idea what I could be doing wrong, given that this works in the shell but fails at compile time in the IDE?
Here's the code:
import org.apache.spark.sql._

object Kafka10Cons3 extends App {
  val spark = SparkSession
    .builder
    .appName(Util.getProperty("AppName"))
    .master(Util.getProperty("spark.master"))
    .getOrCreate

  import spark.implicits._

  val stream = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker"))
    .option("subscribe", src_topic)
    .load

  val txnSchema = Util.getTxnStructure

  val df = stream
    .select($"value" cast "string" as "json")
    .select(from_json($"json", txnSchema) as "data")
    .select("data.*")
}
You're probably just missing the relevant import:

import org.apache.spark.sql.functions._

You have imported spark.implicits._ and org.apache.spark.sql._, but neither of those brings the standalone functions of the functions object (from_json among them) into scope. This is also why the same lines work in spark-shell: the shell imports org.apache.spark.sql.functions._ automatically at startup.
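For reference, here is a minimal, self-contained sketch of the consumer with that import in place. The broker address, topic name, and schema fields are placeholders standing in for the question's Util helpers and src_topic:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._   // the missing import; from_json lives here
import org.apache.spark.sql.types._

object Kafka10Cons3 extends App {
  val spark = SparkSession.builder
    .appName("Kafka10Cons3")
    .master("local[*]")
    .getOrCreate

  import spark.implicits._                // enables the $"..." column syntax

  // Stand-in for Util.getTxnStructure; field names are made up for illustration
  val txnSchema = new StructType()
    .add("txnId", StringType)
    .add("amount", DoubleType)

  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder broker
    .option("subscribe", "transactions")                    // placeholder topic
    .load
    .select($"value" cast "string" as "json")
    .select(from_json($"json", txnSchema) as "data")        // now resolves
    .select("data.*")
}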
In my case I was also importing com.wizzardo.tools.json, which looks like it also has a from_json function. That must have been the one the compiler picked up (since it was imported first?), and it was apparently incompatible with my version of Spark.
Make sure you are not importing from_json from some other JSON library, as that library may be incompatible with the version of Spark you are using.
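If you genuinely need both libraries in the same file, Scala's rename-on-import avoids the ambiguity. A small sketch, reusing the stream and txnSchema from the question:

// Rename Spark's from_json on import so it cannot collide with a
// same-named function from another JSON library:
import org.apache.spark.sql.functions.{from_json => sparkFromJson}

val df = stream
  .select($"value" cast "string" as "json")
  .select(sparkFromJson($"json", txnSchema) as "data")
  .select("data.*")

// Alternatively, skip the import and fully qualify the call:
// .select(org.apache.spark.sql.functions.from_json($"json", txnSchema) as "data")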