I am trying to do Structured Streaming from Kafka and am planning to store checkpoints in HDFS. I read a Cloudera blog recommending against storing checkpoints in HDFS for Spark Streaming: https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/. Does the same issue apply to Structured Streaming checkpoints?
In Structured Streaming, if my Spark program is down for some time, how do I get the latest offset from the checkpoint directory and load data after that offset? I am storing checkpoints in a directory as shown below.
df.writeStream \
    .format("text") \
    .option("path", "/files") \
    .option("checkpointLocation", "checkpoints/chkpt") \
    .start()
Update:
This is my Structured Streaming program; it reads a Kafka message, decompresses it, and writes it to HDFS.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KafkaServer) \
    .option("subscribe", KafkaTopics) \
    .option("failOnDataLoss", "false") \
    .load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()
# zip_extract is a UDF to decompress the stream (a sketch is shown below)
decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))
query = decomp.writeStream \
    .format("text") \
    .option("path", "/Data_directory_inHDFS") \
    .option("checkpointLocation", "/path_in_HDFS") \
    .start()
query.awaitTermination()
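For reference, zip_extract itself is not shown above. A minimal sketch of such a UDF, assuming the Kafka payload is gzip-compressed text and the UDF receives the raw bytes, would look something like this (my actual UDF may differ):

import gzip
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def zip_extract(payload):
    # Decompress a gzip-compressed payload (bytes) into a UTF-8 string.
    # Returns None for empty messages.
    if payload is None:
        return None
    return gzip.decompress(payload).decode("utf-8")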
Storing checkpoints on long-term storage (HDFS, AWS S3, etc.) is the preferred approach. I would like to add one point here: the property "failOnDataLoss" should not be set to false, as that is not best practice. Data loss is something no one can afford. Other than that, you are on the right path.
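In code, that means leaving "failOnDataLoss" at its default (true) and pointing "checkpointLocation" at HDFS. A sketch of your pipeline with those changes (the host and paths are placeholders, and zip_extract is your UDF):

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KafkaServer) \
    .option("subscribe", KafkaTopics) \
    .load()

decomp = df.select(zip_extract("value").alias("decompress"))

query = decomp.writeStream \
    .format("text") \
    .option("path", "hdfs://namenode:8020/data/transactions") \
    .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/chkpt") \
    .start()

# On restart, Spark reads the offsets recorded under checkpointLocation and
# resumes from the last committed batch; no manual offset handling is needed.
query.awaitTermination()

This also answers the offset question: with a durable checkpointLocation, simply restarting the query resumes it after the last committed offset.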