I use Spark 2.1.
I am trying to read records from Kafka using Spark Structured Streaming, deserialize them and apply aggregations afterwards.
I have the following code:
SparkSession spark = SparkSession
        .builder()
        .appName("Statistics")
        .getOrCreate();
Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaUri)
        .option("subscribe", "Statistics")
        .option("startingOffsets", "earliest")
        .load();
df.selectExpr("CAST(value AS STRING)");
Instead of casting the value field to a String, I want to deserialize it into my own object.
I have a custom deserializer for this, with the following method:
public StatisticsRecord deserialize(String s, byte[] bytes)
How can I do this in Java?
The only relevant resource I have found is https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html, but its examples are in Scala.
Define a schema for your JSON messages.
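import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StructType schema = DataTypes.createStructType(new StructField[] {
                DataTypes.createStructField("Id", DataTypes.IntegerType, false),
                DataTypes.createStructField("Name", DataTypes.StringType, false),
                DataTypes.createStructField("DOB", DataTypes.DateType, false) });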
Now read the messages as shown below. MessageData is a JavaBean for your JSON message.
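import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.functions;

Dataset<MessageData> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaUri)
            .option("subscribe", "Statistics")
            .option("startingOffsets", "earliest")
            .load()
            // Kafka's value column is binary; cast it to a string so from_json can parse it
            .selectExpr("CAST(value AS STRING) as message")
            .select(functions.from_json(functions.col("message"), schema).as("json"))
            .select("json.*")
            .as(Encoders.bean(MessageData.class));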
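The MessageData bean is referenced but not shown. Below is a minimal sketch, assuming the JSON messages carry exactly the three fields from the schema above. The class and its property names are illustrative, not taken from the original. Encoders.bean expects a public class with a public no-argument constructor and getter/setter pairs. DateType maps to java.sql.Date. Bean property "id" is matched to column "Id" only under Spark's default case-insensitive resolution (spark.sql.caseSensitive=false).

```java
import java.io.Serializable;
import java.sql.Date;

// Hypothetical JavaBean mirroring the schema: Id (IntegerType), Name (StringType), DOB (DateType).
public class MessageData implements Serializable {
    private Integer id;
    private String name;
    private Date dob;

    // Encoders.bean needs a public no-argument constructor.
    public MessageData() {}

    // Property "id"; resolves to column "Id" assuming case-insensitive analysis.
    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    // getDOB/setDOB yield the bean property name "DOB", matching the "DOB" column exactly.
    public Date getDOB() { return dob; }
    public void setDOB(Date dob) { this.dob = dob; }
}
```

If you need your existing StatisticsRecord instead, the same pattern applies: give it matching getters and setters and pass StatisticsRecord.class to Encoders.bean.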