 

Spark streaming reads file twice from NFS

I am using Spark Streaming (Spark 2.4.6) to read data files from an NFS mount point. However, sometimes the streaming job checkpoints the same file differently for different batches, which produces duplicates. Does anyone have a similar issue?

Here is an example from the checkpoint:

$ hdfs dfs -cat checkpoint/App/sources/0/15419.compact | grep 'export_dat_20210923.gz'

{"path":"file:///data_uploads/app/export_dat_20210923.gz","timestamp":1632398460000,"batchId":14994} {"path":"file:/data_uploads/app/export_dat_20210923.gz","timestamp":1632398460000,"batchId":14997}

asked Sep 24 '21 by kevi


People also ask

How does Spark read Streaming data?

Use readStream.format("socket") on the SparkSession object to read data from a socket, providing the host and port options for where you want to stream data from.
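A minimal sketch of the socket source, assuming a SparkSession named spark and a local socket on port 9999 (both hypothetical):

val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()    // returns a streaming DataFrame with a single "value" column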

What is a batch interval in Spark Streaming?

The batch interval tells Spark for what duration to fetch data; for example, if it is 1 minute, Spark fetches the data for the last 1 minute (source: spark.apache.org). The data then arrives as a continuous stream of batches, and this continuous stream is called a DStream.
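For example, a 1-minute batch interval with the DStream API looks roughly like this (the app name and monitored directory are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf  = new SparkConf().setAppName("nfs-stream-example")   // hypothetical app name
val ssc   = new StreamingContext(conf, Seconds(60))            // fetch data in 1-minute batches
val lines = ssc.textFileStream("/data_uploads/app")            // DStream of new files in the directory
lines.print()
ssc.start()
ssc.awaitTermination()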

What is back pressure in Spark Streaming?

Backpressure refers to the situation where a system receives data at a higher rate than it can process during a temporary load spike. A sudden spike in traffic can cause bottlenecks in downstream dependencies, which slows down stream processing.
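For the DStream API, backpressure is enabled through configuration; a sketch (the initial rate value is illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")       // let Spark adapt the ingestion rate
  .set("spark.streaming.backpressure.initialRate", "1000")   // initial records/sec before feedback kicks in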

Do Spark Streaming programs run continuously?

Users specify a streaming computation by writing a batch computation (using Spark's DataFrame/Dataset API), and the engine automatically incrementalizes this computation (runs it continuously).

Can spark read and write to HDFS?

Though Spark supports reading from and writing to files on multiple file systems such as Amazon S3, Hadoop HDFS, Azure, and GCP, HDFS is the most commonly used at the time of writing this article. Like any other file system, we can read and write TEXT, CSV, Avro, Parquet, and JSON files in HDFS.
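A small sketch of batch reads and writes against HDFS (the namenode address and paths are hypothetical):

// read a CSV file from HDFS and write it back as Parquet
val df = spark.read
  .option("header", "true")
  .csv("hdfs://namenode:8020/data/input/export.csv")

df.write
  .mode("overwrite")
  .parquet("hdfs://namenode:8020/data/output/export_parquet")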

How to write streaming Dataframe to console in spark?

val df = spark.readStream
  .schema(jsonSchema)              // provide the schema of the JSON files
  .json("c:/tmp/stream_folder")

Use writeStream.format("console") to write the streaming DataFrame to the console.
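Putting the two together, a runnable sketch (the schema columns and folder are placeholders):

import org.apache.spark.sql.types._

val jsonSchema = new StructType()
  .add("id", StringType)
  .add("value", DoubleType)        // hypothetical columns

val df = spark.readStream
  .schema(jsonSchema)
  .json("c:/tmp/stream_folder")

val query = df.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()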

What is readstream and writestream in Spark Streaming?

Spark Streaming uses readStream to monitor a folder and process files that arrive in the directory in real time, and uses writeStream to write out the resulting DataFrame or Dataset.
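A sketch of a file-source stream with an explicit checkpoint location, similar to the setup in the question (the schema, output path, and checkpoint path are placeholders):

val files = spark.readStream
  .schema(jsonSchema)                               // schema defined as in the previous sketch
  .json("/data_uploads/app")                        // directory monitored for new files

val query = files.writeStream
  .format("parquet")
  .option("path", "/data_output/app")
  .option("checkpointLocation", "checkpoint/App")   // where processed-file metadata is compacted
  .start()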

What is the use of Apache Spark Streaming?

Spark Streaming is a scalable, high-throughput, fault-tolerant stream processing system that supports both batch and streaming workloads. It is an extension of the core Spark API to process real-time data from sources like Kafka, Flume, and Amazon Kinesis, to name a few. The processed data can be pushed to databases, Kafka, live dashboards, etc.




1 Answer

The exactly-once guarantee comes with multiple assumptions about the source (replayable), the checkpoint (HDFS-compatible, fault-tolerant), and the sink (idempotent).

When writing files with Structured Streaming, you won't always get idempotency out of the box. If different batches write to different files or partitions, that can cause duplicates by design. For example, as described in this article, using globbed paths results in duplicates.

The problem is described in this article.

There are several idempotent targets (e.g. Elasticsearch), and there are also suggestions on how to write in an idempotent manner, for example:

You can create idempotent sinks by implementing logic that first checks for the existence of the incoming result in the datastore. If the result already exists, the write should appear to succeed from the perspective of your Spark job, but in reality your data store ignored the duplicate data. If the result doesn't exist, then the sink should insert this new result into its storage.
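One way to approximate this with Structured Streaming is foreachBatch plus an existence check; a minimal sketch, assuming a streaming DataFrame named streamingDF and a hypothetical JDBC table "results" keyed by "id":

import org.apache.spark.sql.DataFrame

val jdbcOpts = Map(
  "url"      -> "jdbc:postgresql://db-host:5432/appdb",   // hypothetical connection details
  "dbtable"  -> "results",
  "user"     -> "app",
  "password" -> "secret"
)

val query = streamingDF.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // read the keys that already exist in the target table
    val existing = batch.sparkSession.read.format("jdbc").options(jdbcOpts).load().select("id")
    // keep only the rows not yet present, then append them
    batch.join(existing, Seq("id"), "left_anti")
      .write.format("jdbc").options(jdbcOpts).mode("append").save()
  }
  .start()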

answered Sep 30 '22 by Yosi Dahari