
How to read/write protocol buffer messages with Apache Spark?

I want to read/write protocol buffer messages from/to HDFS with Apache Spark. I have found these suggested approaches:

1) Convert protobuf messages to JSON with Google's Gson library and then read/write them with Spark SQL. This solution is explained in this link, but I think converting to JSON is an extra task (a rough sketch of this approach is shown after this list).

2) Convert to Parquet files. There are the parquet-mr and sparksql-protobuf GitHub projects for this approach, but I don't want Parquet files because I always work with all columns (not just some), so the Parquet format gives me no advantage (at least I think so).

3) ScalaPB. Maybe it's what I am looking for, but it is written in Scala, which I know nothing about; I am looking for a Java-based solution. This YouTube video introduces ScalaPB and explains how to use it (for Scala developers).

4) Through sequence files. This is what I am looking for, but I have found nothing about it. So, my question is: how can I write protobuf messages to a sequence file on HDFS, and read them back from it? Any other suggestion would be useful.

5) Through Twitter's Elephant Bird library.
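
For reference, a rough sketch of option 1 (this is only my assumption of how it would look, not code from the linked article): it uses JsonFormat from protobuf-java-util instead of Gson, since JsonFormat understands protobuf messages directly, and SensorReading / loadMessagesSomehow() are made-up placeholders for a compiled message class and whatever produces the messages.

import com.google.protobuf.util.JsonFormat
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("proto-to-json").getOrCreate()
import spark.implicits._

// A local collection of already-parsed protobuf messages
// (SensorReading and loadMessagesSomehow() are hypothetical placeholders)
val messages: Seq[SensorReading] = loadMessagesSomehow()

// JsonFormat renders each message as a JSON string
val printer = JsonFormat.printer().omittingInsignificantWhitespace()
val jsonDS = spark.createDataset(messages.map(m => printer.print(m)))

// Spark SQL infers a schema from the JSON strings
val df = spark.read.json(jsonDS)
df.printSchema()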

asked Aug 30 '18 by DAVID_ROA


People also ask

How do I read Protobuf files in Pyspark?

You cannot do this natively, because Spark does not provide a reader for the .pb format. You can read the file into an RDD and use existing libraries, such as the one pointed out by @MarcGravell, to convert it to a JSON RDD so that you can create your DataFrame.

What is Protobuf message?

Protocol Buffers (Protobuf) is a free and open-source, cross-platform data format used to serialize structured data. It is useful for programs that communicate with each other over a network, or for storing data.

What is ScalaPB?

ScalaPB is a protocol buffer compiler (protoc) plugin for Scala. It generates Scala case classes, parsers and serializers for your protocol buffers. The generated case classes can co-exist in the same project alongside Java-generated Protocol Buffer code.
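
As a rough, made-up illustration (all names are hypothetical): assuming a person.proto compiled by ScalaPB into a Person case class, a round trip looks roughly like this.

// Hypothetical ScalaPB-generated case class from person.proto;
// the package and field names are illustrative only
import com.example.protos.person.Person

val p = Person(name = "Ada", id = 1)

// The generated companion object provides binary (de)serialization
val bytes: Array[Byte] = p.toByteArray
val restored: Person = Person.parseFrom(bytes)
assert(restored == p)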


1 Answer

Though it is a bit hidden among the points, you seem to be asking how to write to a sequence file in Spark. I found an example here.

// Import the Hadoop Writable types (NullWritable, IntWritable, Text, ...)
import org.apache.hadoop.io._

// To read a sequence file we first need one, so let us start by writing.
// Read the source data from plain text files.
val dataRDD = sc.textFile("/public/retail_db/orders")

// Sequence files store key/value pairs of Writable types.
// Int and String are converted to IntWritable and Text automatically;
// other types must be wrapped explicitly, e.g. a Long key or value
// would need new LongWritable(value).

// Save with a NullWritable key and the whole line as the value
dataRDD.
  map(x => (NullWritable.get(), x)).
  saveAsSequenceFile("/user/`whoami`/orders_seq")
// Make sure to replace `whoami` with the appropriate OS user id

// Save with an Int key (first field) and a String value (second field).
// Note the different output path, since the previous one already exists.
dataRDD.
  map(x => (x.split(",")(0).toInt, x.split(",")(1))).
  saveAsSequenceFile("/user/`whoami`/orders_seq_int")
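
Building on that, here is a minimal sketch of how the same idea could be applied to protobuf messages, which is what the question actually asks about. Treat it as an assumption rather than a tested recipe: SensorReading stands for any Java-generated protobuf class, and buildMessagesSomehow() is a placeholder for whatever produces the messages; each message is serialized to bytes and stored as a BytesWritable value.

import org.apache.hadoop.io.{BytesWritable, NullWritable}

// Writing: serialize each protobuf message and wrap the bytes in a BytesWritable
// (SensorReading and buildMessagesSomehow() are hypothetical placeholders)
val messagesRDD = sc.parallelize(buildMessagesSomehow())
messagesRDD.
  map(msg => (NullWritable.get(), new BytesWritable(msg.toByteArray))).
  saveAsSequenceFile("/user/`whoami`/proto_seq")

// Reading: load the sequence file and parse each value back into a message.
// copyBytes() is used because Hadoop reuses Writable instances between records.
val restoredRDD = sc.
  sequenceFile("/user/`whoami`/proto_seq", classOf[NullWritable], classOf[BytesWritable]).
  map { case (_, bytes) => SensorReading.parseFrom(bytes.copyBytes()) }

The same pattern should work for any message type, as long as the generated protobuf class is available on the executors' classpath.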
answered Oct 07 '22 by Dennis Jaheruddin