
Kafka Structured Streaming checkpoint

I am trying to do Structured Streaming from Kafka, and I am planning to store checkpoints in HDFS. I read a Cloudera blog post recommending against storing checkpoints in HDFS for Spark Streaming. Does the same issue apply to Structured Streaming checkpoints? https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/

In Structured Streaming, if my Spark program is down for a certain time, how do I get the latest offset from the checkpoint directory and load data after that offset? I am storing checkpoints in a directory as shown below.

df.writeStream \
    .format("text") \
    .option("path", "/files") \
    .option("checkpointLocation", "checkpoints/chkpt") \
    .start()

Update:

This is my Structured Streaming program. It reads a Kafka message, decompresses it, and writes the result to HDFS.

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KafkaServer) \
    .option("subscribe", KafkaTopics) \
    .option("failOnDataLoss", "false") \
    .load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()

# zip_extract is a UDF to decompress the stream
decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))

# placeholder HDFS paths
query = decomp.writeStream \
    .format("text") \
    .option("path", "/Data_directory_inHDFS") \
    .option("checkpointLocation", "/pathinHDFS/") \
    .start()

query.awaitTermination()
ranjith reddy asked Jan 04 '23 08:01



1 Answer

Storing checkpoints on long-term storage (HDFS, AWS S3, etc.) is the preferred approach. I would like to add one point here: the property "failOnDataLoss" should not be set to false, as that is not best practice. Data loss is something no one can afford. Otherwise, you are on the right path.
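
On the question of reading the latest offset: when a query is restarted with the same checkpointLocation, Spark normally resumes from the offsets recorded there, so no manual lookup should be needed. If you only want to inspect what was recorded, here is a rough sketch in plain Python. It assumes the checkpoint directory has been copied to the local filesystem (for example with hdfs dfs -get), and it assumes the usual layout of an "offsets" subdirectory with one file per batch id, whose first line is a version marker, second line is batch metadata, and remaining lines are one JSON object per source. The function name latest_logged_offsets is made up for this illustration and is not part of Spark.

```python
import json
from pathlib import Path


def latest_logged_offsets(checkpoint_dir):
    """Return (batch_id, [offsets per source]) for the newest batch file
    under <checkpoint_dir>/offsets, or None if no batch has been logged.

    Hypothetical helper for inspection only; the file layout is an assumption.
    """
    offsets_dir = Path(checkpoint_dir) / "offsets"
    # Batch files are named by their integer batch id; this skips hidden
    # files such as ".0.crc" and any temporary files.
    batch_files = [p for p in offsets_dir.iterdir() if p.name.isdigit()]
    if not batch_files:
        return None
    # Compare ids numerically so that batch 10 sorts after batch 9.
    newest = max(batch_files, key=lambda p: int(p.name))
    lines = newest.read_text().splitlines()
    # Assumed layout: line 0 = version (e.g. "v1"), line 1 = batch metadata,
    # lines 2+ = one JSON object per source; "-" is assumed to mean "no offset".
    sources = [
        json.loads(line)
        for line in lines[2:]
        if line.strip() and line.strip() != "-"
    ]
    return int(newest.name), sources
```

For a Kafka source, each returned object should map a topic name to its partitions and offsets, something like {"mytopic": {"0": 1234}}. Treat the result as read-only diagnostics; editing checkpoint files by hand is risky.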

Naman Agarwal answered Jan 12 '23 01:01
