My understanding of Spark's fileStream() method is that it takes three types as parameters: Key, Value, and Format. In the case of text files, the appropriate types are LongWritable, Text, and TextInputFormat.
First, I want to understand the nature of these types. Intuitively, I would guess that the Key in this case is the line number of the file, and the Value is the text on that line. So, in the following example of a text file:
Hello
Test
Another Test
The first row of the DStream would have a Key of 1 (or 0?) and a Value of Hello.
Is this correct?
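For reference, here is a minimal sketch I would use to check this; the app name, directory path, and batch interval are placeholders, not taken from anywhere specific:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("text-file-stream").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))

// Key = LongWritable, Value = Text, Format = TextInputFormat
val textStream = ssc.fileStream[LongWritable, Text, TextInputFormat]("/tmp/textDir")

// Convert the Hadoop writables to plain Scala values and print a few pairs per batch
textStream.map { case (k, v) => (k.get(), v.toString) }.print()

ssc.start()
ssc.awaitTermination()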
Second part of my question: I looked at the decompiled implementation of ParquetInputFormat and noticed something curious:
public class ParquetInputFormat<T>
       extends FileInputFormat<Void, T> {
  //...

public class TextInputFormat
       extends FileInputFormat<LongWritable, Text>
       implements JobConfigurable {
  //...
TextInputFormat extends FileInputFormat with the types LongWritable and Text, whereas ParquetInputFormat extends the same class with the types Void and T.
Does this mean that I must create a Value class to hold an entire row of my parquet data, and then pass the types <Void, MyClass, ParquetInputFormat<MyClass>> to ssc.fileStream()?
If so, how should I implement MyClass?
EDIT 1: I have noticed a readSupportClass which is to be passed to ParquetInputFormat objects. What kind of class is this, and how is it used to parse the parquet file? Is there some documentation that covers this?
EDIT 2: As far as I can tell, this is impossible. If anybody knows how to stream parquet files into Spark, please feel free to share...
My sample for reading parquet files in Spark Streaming is below.
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.spark.streaming.{Seconds, StreamingContext}
import parquet.hadoop.ParquetInputFormat

val ssc = new StreamingContext(sparkConf, Seconds(2))
// Materialize each parquet record as an Avro GenericRecord
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "parquet.avro.AvroReadSupport")
val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]](
  directory, { path: Path => path.toString.endsWith("parquet") }, true, ssc.sparkContext.hadoopConfiguration)
val lines = stream.map(row => {
  println("row:" + row.toString())
  row
})
A few notes: I referred to the source code below when creating this sample. I also could not find good examples, so I would like to wait for a better answer.
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
https://github.com/Parquet/parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
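As a follow-up sketch of my own (not from the sources above): once the stream yields Avro GenericRecord values, individual columns can be read by name. The field name "myField" below is hypothetical and must be replaced with a column from your own parquet/Avro schema.

// "myField" is a hypothetical column name; GenericRecord.get returns the raw Avro value
val fieldValues = stream.map { case (_, record) => String.valueOf(record.get("myField")) }
fieldValues.print()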
You can access the parquet files by adding some parquet-specific hadoop settings:
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(conf, Seconds(5))

// Schema of the parquet files being streamed in
val schema = StructType(Seq(
  StructField("a", StringType, nullable = false)
  // ... remaining fields ...
))
val schemaJson = schema.json
val fileDir = "/tmp/fileDir"

// Parquet-specific hadoop settings so ParquetInputFormat produces Spark's UnsafeRow
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport")
ssc.sparkContext.hadoopConfiguration.set("org.apache.spark.sql.parquet.row.requested_schema", schemaJson)
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_BINARY_AS_STRING.key, "false")
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, "false")
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, "false")

val streamRdd = ssc.fileStream[Void, UnsafeRow, ParquetInputFormat[UnsafeRow]](
  fileDir, (t: org.apache.hadoop.fs.Path) => true, false)
streamRdd.count().print()

ssc.start()
ssc.awaitTermination()
This code was prepared with Spark 2.1.0.
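A possible follow-up (my own sketch, assuming the first declared field "a" really is a string column): the values arrive as UnsafeRow objects, so columns are read positionally against the schema passed in the configuration above.

// Read column 0 ("a") from each UnsafeRow; the ordinal and type must match the declared schema
val aValues = streamRdd.map { case (_, row) => row.getString(0) }
aValues.print()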