
Spark concurrent writes on same HDFS location

I have Spark code that saves a DataFrame to an HDFS location (a date-partitioned path) in JSON format using append mode.

df.write.mode("append").format("json").save(hdfsPath)

Sample HDFS location: /tmp/table1/datepart=20190903

I am consuming data from upstream in a NiFi cluster. Each node in the NiFi cluster creates a flow file for the data it consumes, and my Spark code processes that flow file. Because NiFi is distributed, my Spark code is executed from different NiFi nodes in parallel, each trying to save data into the same HDFS location.

I cannot store the output of the Spark job in different directories because my data is partitioned on date.

This process has run once daily for the last 14 days, and my Spark job has failed 4 times with different errors. First Error:

java.io.IOException: Failed to rename FileStatus{path=hdfs://tmp/table1/datepart=20190824/_temporary/0/task_20190824020604_0000_m_000000/part-00000-101aa2e2-85da-4067-9769-b4f6f6b8f276-c000.json; isDirectory=false; length=0; replication=3; blocksize=268435456; modification_time=1566630365451; access_time=1566630365034; owner=hive; group=hive; permission=rwxrwx--x; isSymlink=false} to hdfs://tmp/table1/datepart=20190824/part-00000-101aa2e2-85da-4067-9769-b4f6f6b8f276-c000.json

Second Error:

java.io.FileNotFoundException: File hdfs://tmp/table1/datepart=20190825/_temporary/0 does not exist.

Third Error:

java.io.FileNotFoundException: File hdfs://tmp/table1/datepart=20190901/_temporary/0/task_20190901020450_0000_m_000000 does not exist.

Fourth Error:

java.io.FileNotFoundException: File hdfs://tmp/table1/datepart=20190903/_temporary/0 does not exist.

Here are my questions:

  1. I am not able to reproduce this scenario. How can I do that?
  2. On all 4 occasions the errors relate to the _temporary directory. Is it because 2 or more jobs are trying to save data into the same HDFS location in parallel, and while doing so Job A deleted the _temporary directory of Job B? (Because they share the same location, all jobs use the common path _temporary/0/.)

If it is a concurrency problem, I can run all NiFi processors from the primary node, but then I will lose performance.

Need your expert advice.

Thanks in advance.

asked Sep 03 '19 by Gaurav_Bhide

1 Answer

It seems the problem is that two Spark jobs are independently trying to write to the same place, causing conflicts: the faster one cleans up the shared working directory before the slower one is done with it.
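To reproduce the failure on demand (your question 1), it should be enough to start two append jobs against the same output path at the same time. Here is a minimal sketch, assuming PySpark and an illustrative path (the thread-based setup approximates two independent NiFi-triggered jobs): both jobs stage their files under <path>/_temporary/0, so whichever commits first deletes _temporary out from under the other.

# Hypothetical reproduction sketch: two concurrent append jobs writing
# to the same HDFS date partition.
from threading import Thread
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("concurrent-write-repro").getOrCreate()
path = "/tmp/table1/datepart=20190903"  # same partition for both jobs

def append_job(start):
    # Each job stages its output under <path>/_temporary/0 and the
    # committer deletes _temporary when the job commits.
    df = spark.range(start, start + 1000000)
    df.write.mode("append").format("json").save(path)

threads = [Thread(target=append_job, args=(i * 1000000,)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# One of the jobs will typically fail with a FileNotFoundException on
# _temporary/0, matching the errors in the question.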

The most straightforward solution may be to avoid this.

As I understand your setup, the NiFi node that runs the processor also determines the node where the Spark job runs (a 1-1 relationship?).

If that is the case, you should be able to solve this by routing the work in NiFi to nodes that do not interfere with each other. Check out the load-balancing strategy (a property of the queue) that depends on attributes. Of course, you would need to define the right attribute, but something like the directory or table name should go a long way.
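If changing the NiFi routing is not feasible, another common pattern (my sketch, not part of this answer; paths and names are illustrative) is to let each job write to its own unique staging directory and then move the committed part files into the shared date partition. Spark's part-file names contain a UUID, so moves from different jobs should not collide.

import uuid
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

final_dir = "/tmp/table1/datepart=20190903"
staging_dir = "/tmp/table1/_staging/" + str(uuid.uuid4())  # unique per job

df = spark.read.json("/tmp/input")  # placeholder for the real flow-file data
df.write.mode("append").format("json").save(staging_dir)

# Move the committed part files into the final partition via the Hadoop
# FileSystem API (renames within one HDFS namespace are cheap metadata ops).
hadoop = sc._jvm.org.apache.hadoop.fs
fs = hadoop.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.mkdirs(hadoop.Path(final_dir))
for status in fs.listStatus(hadoop.Path(staging_dir)):
    name = status.getPath().getName()
    if name.startswith("part-"):
        fs.rename(status.getPath(), hadoop.Path(final_dir + "/" + name))
fs.delete(hadoop.Path(staging_dir), True)

With this approach each job only ever touches its own _temporary directory, so the committers no longer race.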

answered Oct 03 '22 by Dennis Jaheruddin