Is there a way to use a schema to convert Avro messages from Kafka into a DataFrame with Spark? The schema file for user records:
{
"fields": [
{ "name": "firstName", "type": "string" },
{ "name": "lastName", "type": "string" }
],
"name": "user",
"type": "record"
}
The following snippets, adapted from the SqlNetworkWordCount example and from "Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages", read in the messages.
object Injection {
  val parser = new Schema.Parser()
  val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
  val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}
...
messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._
  // Decode each Avro payload, then map it onto the User case class
  val df = rdd.map(message => Injection.injection.invert(message._2).get)
    .map(record => User(record.get("firstName").toString, record.get("lastName").toString)).toDF()
  df.show()
})
case class User(firstName: String, lastName: String)
Somehow I can't find any way other than a case class to convert Avro messages to a DataFrame. Is it possible to use the schema instead? I'm using Spark 1.6.2 and Kafka 0.10.
The complete code, in case you're interested.
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}

object ReadMessagesFromKafka {

  object Injection {
    val parser = new Schema.Parser()
    val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
  }

  def main(args: Array[String]): Unit = {
    val brokers = "127.0.0.1:9092"
    val topics = "test"

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("ReadMessagesFromKafka").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, topicsSet)

    messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Decode each Avro payload, then map it onto the User case class
      val df = rdd.map(message => Injection.injection.invert(message._2).get)
        .map(record => User(record.get("firstName").toString, record.get("lastName").toString)).toDF()
      df.show()
    })

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class User(firstName: String, lastName: String)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
Going from Avro to a Pandas DataFrame is also a three-step process. Create a list to store the records: this list will hold dictionary objects that you can later convert to a Pandas DataFrame. Read and parse the Avro file: use fastavro.reader() to read the file and then iterate over the records.
OP has probably resolved the issue by now, but I solved it in a fairly general way, so I thought it might be helpful to post here for future reference.
Generally speaking, you should convert the Avro schema to a Spark StructType, convert each object in your RDD to a Row, and then use:
spark.createDataFrame(<RDD[obj] mapped to RDD[Row]>, <schema as StructType>)
In order to convert the Avro schema I used spark-avro like so:
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
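As a minimal sketch of that conversion, the block below parses the user schema from the question and turns it into a StructType. It assumes the Databricks spark-avro artifact is on the classpath, that its `SchemaConverters` object is accessible in the version you use (in newer Spark releases the equivalent lives in `org.apache.spark.sql.avro`), and the object name `AvroSchemaToStructType` is only illustrative.

```scala
import com.databricks.spark.avro.SchemaConverters // assumption: Databricks spark-avro package
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType

// Hypothetical holder object, named for illustration only
object AvroSchemaToStructType {
  // The user schema from the question, inlined so the sketch is self-contained
  val userSchemaJson: String =
    """{
      |  "type": "record",
      |  "name": "user",
      |  "fields": [
      |    { "name": "firstName", "type": "string" },
      |    { "name": "lastName", "type": "string" }
      |  ]
      |}""".stripMargin

  val avroSchema: Schema = new Schema.Parser().parse(userSchemaJson)

  // A top-level Avro record maps to a StructType, hence the cast
  val sqlSchema: StructType =
    SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
}
```

The resulting `sqlSchema` keeps the field order of the Avro schema, which matters when the rows are built later.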
The conversion of the RDD was trickier. If your schema is simple you can probably just do a simple map, something like this:
rdd.map(obj => {
  val seq = Seq(obj.getName(), obj.getAge())
  Row.fromSeq(seq)
})
In this example the object has two fields, name and age.
The important thing is to make sure the elements in the Row will match the order and types of the fields in the StructType from before.
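For the GenericRecord objects in the question, one way to keep order and types aligned is to walk the record's own schema instead of naming getters by hand. This is only a sketch for flat records: the helper name `RecordToRow.toRow` is hypothetical, and it assumes string fields arrive as Avro `Utf8` (or another `CharSequence`) and need converting to `String` before Spark will accept them as StringType.

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import scala.collection.JavaConverters._

// Hypothetical helper, named for illustration only
object RecordToRow {
  /** Builds a Row whose values follow the field order of the record's Avro schema. */
  def toRow(record: GenericRecord): Row = {
    val values = record.getSchema.getFields.asScala.map { field =>
      record.get(field.pos()) match {
        case s: CharSequence => s.toString // Avro Utf8 is not a java.lang.String
        case other           => other      // other primitives are passed through unchanged
      }
    }.toList
    Row.fromSeq(values)
  }
}
```

Inside `foreachRDD`, this could be combined with the StructType obtained from `SchemaConverters`, along the lines of `sqlContext.createDataFrame(rdd.map(m => RecordToRow.toRow(Injection.injection.invert(m._2).get)), sqlSchema)`, where `sqlSchema` is whatever name you gave the converted schema.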
In my particular case I had a much more complex object, which I wanted to handle generically to support future schema changes, so my code was much more complex.
The method suggested by OP should also work in some cases, but it will be hard to apply to complex objects (anything that is not a primitive or a case class).
Another tip: if you have a class within a class, you should convert the inner class to a Row as well, so that the wrapping class is converted to something like:
Row(Any,Any,Any,Row,...)
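A rough sketch of that nesting rule for Avro GenericRecords follows. It is not the code I used, just an illustration: the name `NestedRecordToRow` is hypothetical, and it deliberately handles only nested records and strings, so arrays, maps, enums and other Avro types would need their own cases.

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import scala.collection.JavaConverters._

// Hypothetical helper, named for illustration only
object NestedRecordToRow {
  /** Recursively converts a value so that every nested record becomes a nested Row. */
  def convert(value: Any): Any = value match {
    case r: GenericRecord =>
      // Recurse field by field, preserving the order declared in the Avro schema
      Row.fromSeq(r.getSchema.getFields.asScala.map(f => convert(r.get(f.pos()))).toList)
    case s: CharSequence => s.toString // Avro Utf8 -> String
    case other           => other      // assumption: remaining values are already Spark-compatible
  }

  def toRow(record: GenericRecord): Row = convert(record).asInstanceOf[Row]
}
```

The StructType produced from the same Avro schema nests in the same way, so the inner Row lines up with the inner struct.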
You can also look at the spark-avro project I mentioned earlier to see how it converts objects to rows. I used some of the logic there myself.
If anyone reading this needs further help, ask me in the comments and I'll try to help.
A similar problem is also solved here.