I am new to the big data ecosystem and just getting started.
I have read several articles about reading a Kafka topic using Spark Streaming, but I would like to know whether it is possible to read from Kafka using a Spark batch job instead of streaming. If so, could you point me to some articles or code snippets to get me started?
The second part of my question is about writing to HDFS in Parquet format. Once I read from Kafka, I assume I will have an RDD. I would convert this RDD into a DataFrame and then write the DataFrame as a Parquet file. Is this the right approach?
Any help appreciated.
Thanks
To read data from Kafka and write it to HDFS in Parquet format with a Spark batch job rather than a streaming job, you can use the Kafka source that ships with Spark Structured Streaming, which also supports batch queries.
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
It comes with Kafka as a built-in source, i.e., we can poll data from Kafka. It is compatible with Kafka broker versions 0.10.0 or higher.
For pulling the data from Kafka in batch mode, you can create a Dataset/DataFrame for a defined range of offsets.
// Subscribe to 1 topic; defaults to the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
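In the second example above, the per-partition JSON uses -2 to mean "earliest" and -1 to mean "latest". As far as I understand the Kafka integration guide, a batch query cannot start at latest or end at earliest. Writing that JSON by hand is error-prone, so here is a minimal sketch of a helper that builds it from a Scala map. `KafkaOffsets` and `toJson` are hypothetical names and not part of Spark, and the helper assumes topic names need no JSON escaping.

```scala
// Hypothetical helper (not part of Spark) for building the JSON expected by
// the startingOffsets / endingOffsets options of the Kafka source.
object KafkaOffsets {
  // Sentinel offsets understood by the Spark Kafka source
  val Earliest: Long = -2L
  val Latest: Long = -1L

  /** Renders Map(topic -> Map(partition -> offset)) as a JSON string.
    * Topics and partitions are sorted so the output is deterministic.
    * Assumes topic names contain no characters that need JSON escaping. */
  def toJson(offsets: Map[String, Map[Int, Long]]): String =
    offsets.toSeq.sortBy(_._1).map { case (topic, partitions) =>
      val body = partitions.toSeq.sortBy(_._1)
        .map { case (partition, offset) => s""""$partition":$offset""" }
        .mkString(",")
      s""""$topic":{$body}"""
    }.mkString("{", ",", "}")
}

// Reproduces the startingOffsets value used in the second example
val starting = KafkaOffsets.toJson(Map(
  "topic1" -> Map(0 -> 23L, 1 -> KafkaOffsets.Earliest),
  "topic2" -> Map(0 -> KafkaOffsets.Earliest)))
// starting == {"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}
```

The result can be passed directly as `.option("startingOffsets", starting)`.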
Each row in the source has the following schema:
| Column | Type |
|:-----------------|--------------:|
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | timestamp |
| timestampType | int |
Now, to write the data to HDFS in Parquet format, you can use the following code, where the path is a placeholder for a directory on your cluster:
df.write.parquet("hdfs:///path/to/data.parquet")
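Putting the read and the write together, a complete batch job might look roughly like the sketch below. It is only an illustration under some assumptions: the `spark-sql-kafka-0-10` package matching your Spark version is on the classpath, the job receives brokers, topic and output path as its three arguments, and the names `KafkaToParquetBatch`, `JobConfig` and `kafkaOptions` are made up for this example. Note that no RDD-to-DataFrame conversion is needed, because the Kafka source already returns a DataFrame.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object KafkaToParquetBatch {
  // Hypothetical job parameters; not a Spark type
  final case class JobConfig(brokers: String, topic: String, outputPath: String)

  // Options for a bounded (batch) read of the whole topic
  def kafkaOptions(cfg: JobConfig): Map[String, String] = Map(
    "kafka.bootstrap.servers" -> cfg.brokers,
    "subscribe" -> cfg.topic,
    "startingOffsets" -> "earliest",
    "endingOffsets" -> "latest")

  def main(args: Array[String]): Unit = {
    // Expects exactly three arguments; fails with a MatchError otherwise
    val Array(brokers, topic, outputPath) = args
    val cfg = JobConfig(brokers, topic, outputPath)

    val spark = SparkSession.builder()
      .appName("kafka-to-parquet-batch")
      .getOrCreate()

    // spark.read (not readStream) gives a bounded DataFrame
    val raw = spark.read
      .format("kafka")
      .options(kafkaOptions(cfg))
      .load()

    // key and value arrive as binary; cast them and keep the Kafka metadata
    val records = raw.select(
      col("key").cast("string").as("key"),
      col("value").cast("string").as("value"),
      col("topic"), col("partition"), col("offset"), col("timestamp"))

    // Append so that repeated runs add files instead of failing
    records.write.mode("append").parquet(cfg.outputPath)

    spark.stop()
  }
}
```

This sketch re-reads the whole topic on every run. To pick up only new records, you would need to persist the last offsets written and pass them as `startingOffsets` on the next run.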
For more information on Spark Structured Streaming + Kafka, please refer to the Structured Streaming + Kafka Integration Guide in the Spark documentation.
I hope it helps!
You already have a couple of good answers on the topic.
I just want to stress one thing: be careful about streaming directly into a Parquet table. Parquet performs best when its row groups are large enough (for simplicity, say file sizes on the order of 64-256 MB) to take advantage of dictionary compression, bloom filters, etc. (A Parquet file can contain multiple row groups, and normally does, although a row group cannot span multiple Parquet files.)
If you stream directly into a Parquet table, you will very likely end up with a bunch of tiny Parquet files (depending on the mini-batch size of Spark Streaming and the volume of data). Querying such files can be very slow. For example, Parquet may need to read every file's footer to reconcile the schema, which is a big overhead. If this is the case, you will need a separate process that, as a workaround, reads the older files and writes them back "merged" (this is not a simple file-level merge; the process actually has to read all the Parquet data and write out larger Parquet files).
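The "merge" workaround described above could be sketched as a small Spark job like the one below. It is a rough illustration, not a production compactor: `CompactParquet` and `targetFileCount` are hypothetical names, the 128 MB target is an arbitrary value within the range mentioned earlier, and the resulting file sizes are only approximate because the data is re-encoded when it is rewritten. The output directory must differ from the input directory.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object CompactParquet {
  /** Number of output files so that each is roughly targetBytes (at least 1). */
  def targetFileCount(totalBytes: Long, targetBytes: Long): Int = {
    require(targetBytes > 0, "targetBytes must be positive")
    // Ceiling division, clamped to a minimum of one file
    math.max(1L, (totalBytes + targetBytes - 1) / targetBytes).toInt
  }

  def main(args: Array[String]): Unit = {
    // Expects input and output directories; they must not be the same path
    val Array(inputDir, outputDir) = args
    val spark = SparkSession.builder().appName("compact-parquet").getOrCreate()

    // Total on-disk size of the small files, via the Hadoop FileSystem API
    val path = new Path(inputDir)
    val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
    val totalBytes = fs.getContentSummary(path).getLength

    val numFiles = targetFileCount(totalBytes, 128L * 1024 * 1024)

    // Read all the small files and rewrite them as numFiles larger ones
    spark.read.parquet(inputDir)
      .repartition(numFiles)
      .write.mode("overwrite")
      .parquet(outputDir)

    spark.stop()
  }
}
```

`repartition` triggers a full shuffle, which is the price of getting evenly sized output files; `coalesce` avoids the shuffle but can produce uneven files.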
This workaround may defeat the original purpose of data "streaming". You could also look at other technologies, such as Apache Kudu, Apache Kafka, Apache Druid, or Kinesis, that may work better here.
Update: since I posted this answer, a strong new player has appeared: Delta Lake (https://delta.io/). If you're used to Parquet, you'll find Delta very attractive (in fact, Delta is built on top of a Parquet layer plus metadata). Delta Lake offers:
ACID transactions on Spark:
Use Kafka Streams. Spark Streaming is a misnomer (it's mini-batch under the hood, at least up to 2.2).
https://eng.verizondigitalmedia.com/2017/04/28/Kafka-to-Hdfs-ParquetSerializer/