Use schema to convert AVRO messages with Spark to DataFrame

Is there a way to use a schema to convert Avro messages from Kafka into a DataFrame with Spark? The schema file for user records:

{
  "fields": [
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" }
  ],
  "name": "user",
  "type": "record"
}

The code below, adapted from the SqlNetworkWordCount example and from "Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages", reads in the messages:

object Injection {
  val parser = new Schema.Parser()
  val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
  val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}

...

messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.map(message => Injection.injection.invert(message._2).get)
    .map(record => User(record.get("firstName").toString, record.get("lastName").toString)).toDF()

  df.show()
})

case class User(firstName: String, lastName: String)

I can't find any way other than a case class to convert Avro messages to a DataFrame. Is it possible to use the schema instead? I'm using Spark 1.6.2 and Kafka 0.10.

The complete code, in case you're interested.

import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}

object ReadMessagesFromKafka {
  object Injection {
    val parser = new Schema.Parser()
    val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
  }

  def main(args: Array[String]): Unit = {
    val brokers = "127.0.0.1:9092"
    val topics = "test"

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("ReadMessagesFromKafka").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, topicsSet)

    messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      val df = rdd.map(message => Injection.injection.invert(message._2).get)
        .map(record => User(record.get("firstName").toString, record.get("lastName").toString)).toDF()

      df.show()
    })

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class User(firstName: String, lastName: String)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

Asked by Sascha Vetter, Aug 20 '16 01:08



1 Answer

The OP has probably resolved this by now, but for future reference: I solved the problem in a fairly general way, so I thought it would be helpful to post it here.

Generally speaking, you should convert the Avro schema to a Spark StructType, convert the objects in your RDD to Row, and then use:

spark.createDataFrame(<RDD[obj] mapped to RDD[Row]>, <schema as StructType>)

In order to convert the Avro schema I used spark-avro like so:

SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
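
For context, here is a minimal sketch of that schema conversion applied to the user schema from the question. It assumes a spark-avro release in which SchemaConverters is public under com.databricks.spark.avro (on Spark 2.4 and later the equivalent object lives in org.apache.spark.sql.avro). The names AvroToStructType, userSchemaJson and structTypeOf are made up for illustration.

```scala
import com.databricks.spark.avro.SchemaConverters // assumption: package depends on the spark-avro version
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType

object AvroToStructType {
  // The user schema from the question, inlined so the sketch is self-contained.
  val userSchemaJson: String =
    """{
      |  "type": "record",
      |  "name": "user",
      |  "fields": [
      |    { "name": "firstName", "type": "string" },
      |    { "name": "lastName", "type": "string" }
      |  ]
      |}""".stripMargin

  // Convert an Avro record schema to a StructType, failing loudly for non-record schemas.
  def structTypeOf(avroSchema: Schema): StructType =
    SchemaConverters.toSqlType(avroSchema).dataType match {
      case struct: StructType => struct
      case other =>
        throw new IllegalArgumentException(s"Expected a record schema, got $other")
    }

  val avroSchema: Schema = new Schema.Parser().parse(userSchemaJson)
  val structType: StructType = structTypeOf(avroSchema)
}
```

The pattern match does the same job as the asInstanceOf cast above, but gives a readable error if the top-level Avro type is not a record.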

Converting the RDD was trickier. If your schema is simple, you can probably get away with a simple map, something like this:

rdd.map(obj => {
  val seq = Seq(obj.getName(), obj.getAge())
  Row.fromSeq(seq)
})

In this example the object has two fields, name and age.

The important thing is to make sure the elements in the Row will match the order and types of the fields in the StructType from before.
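
One way to keep the order aligned without listing fields by hand is to walk the record's own Avro schema. The following is only a sketch for flat records whose fields are primitives or strings. FlatRecordToRow, toRow and toDataFrame are hypothetical names, and it uses sqlContext.createDataFrame because the question is on Spark 1.6.

```scala
import scala.collection.JavaConverters._
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.StructType

object FlatRecordToRow {
  // Build a Row whose values follow the field order of the record's Avro schema.
  def toRow(record: GenericRecord): Row = {
    val values = record.getSchema.getFields.asScala.map { field =>
      record.get(field.pos()) match {
        // Avro usually hands strings back as Utf8; Spark's StringType expects String.
        case text: CharSequence => text.toString
        case other => other
      }
    }
    Row.fromSeq(values.toList)
  }

  // Assumes structType was derived from the same Avro schema as the records.
  def toDataFrame(sqlContext: SQLContext,
                  records: RDD[GenericRecord],
                  structType: StructType): DataFrame =
    sqlContext.createDataFrame(records.map(r => toRow(r)), structType)
}
```

Because both the Row and the StructType are derived from the same Avro schema, adding a field to the schema file should not require touching this code. Unions, enums, bytes, arrays and maps are not handled here.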

In my particular case I had a much more complex object, which I wanted to handle generically to support future schema changes, so my code was considerably more involved.

The method suggested by the OP should also work in some cases, but it will be hard to apply to complex objects (anything that is not a primitive or a case class).

Another tip: if you have a class within a class, you should convert the inner class to a Row as well, so that the wrapping class is converted to something like:

Row(Any,Any,Any,Row,...)
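
A rough sketch of how that nesting could be produced for Avro GenericRecord values is a small recursive conversion. NestedRecordToRow, toRow and convert are illustrative names, not part of spark-avro, and only nested records and strings are handled.

```scala
import scala.collection.JavaConverters._
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row

object NestedRecordToRow {
  // Convert a record to a Row, visiting fields in schema order.
  def toRow(record: GenericRecord): Row =
    Row.fromSeq(
      record.getSchema.getFields.asScala
        .map(field => convert(record.get(field.pos())))
        .toList)

  // Convert a single field value; nested records become nested Rows.
  def convert(value: Any): Any = value match {
    case nested: GenericRecord => toRow(nested)
    case text: CharSequence => text.toString
    case other => other // primitives and null pass through unchanged
  }
}
```

The matching StructType then needs a nested StructType at the same position, which is what the spark-avro schema conversion produces for a nested Avro record.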

You can also look at the spark-avro project I mentioned earlier to see how it converts objects to rows. I reused some of that logic myself.

If anyone reading this needs further help, ask in the comments and I'll try to help.

A similar problem is also solved here.

Answered by Tal Joffe, Sep 29 '22 16:09