Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I load Avros in Spark using the schema on-board the Avro file(s)?

I am running CDH 4.4 with Spark 0.9.0 from a Cloudera parcel.

I have a bunch of Avro files that were created via Pig's AvroStorage UDF. I want to load these files in Spark, using a generic record or the schema onboard the Avro files. So far I've tried this:

import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import java.net.URI
import java.io.BufferedInputStream
import java.io.File
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.mapred.FsInput

val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"
val inURI = new URI(input)
val inPath = new Path(inURI)

val fsInput = new FsInput(inPath, sc.hadoopConfiguration)
val reader =  new GenericDatumReader[GenericRecord]
val dataFileReader = DataFileReader.openReader(fsInput, reader)
val schemaString = dataFileReader.getSchema

val buf = scala.collection.mutable.ListBuffer.empty[GenericRecord]
while(dataFileReader.hasNext)  {
  buf += dataFileReader.next
}
sc.parallelize(buf)

This works for one file, but it can't scale - I am loading all the data into local RAM and then distributing it across the spark nodes from there.

like image 920
rjurney Avatar asked May 29 '14 23:05

rjurney


People also ask

How do I load an Avro file into spark?

Since spark-avro module is external, there is no . avro API in DataFrameReader or DataFrameWriter . To load/save data in Avro format, you need to specify the data source option format as avro (or org. apache.

How do I get schema from Avro file in Pyspark?

Using Avro Schemaavsc file and provide this file using option() while reading an Avro file. This schema provides the structure of the Avro file with field names and it's data types. Alternatively, we can also specify the StructType using the schema method.


2 Answers

To answer my own question:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapred.AvroInputFormat
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import java.io.BufferedInputStream
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.FsInput
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.hadoop.mapred.JobConf
import java.io.File
import java.net.URI

// spark-shell -usejavacp -classpath "*.jar"

val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"

val jobConf= new JobConf(sc.hadoopConfiguration)
val rdd = sc.hadoopFile(
  input,
  classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
  classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
  classOf[org.apache.hadoop.io.NullWritable],
  10)
val f1 = rdd.first
val a = f1._1.datum
a.get("rawLog") // Access avro fields
like image 176
rjurney Avatar answered Sep 18 '22 15:09

rjurney


This works for me:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

...
val path = "hdfs:///path/to/your/avro/folder"
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
like image 33
dexter Avatar answered Sep 19 '22 15:09

dexter