I have Spark code that saves a DataFrame to an HDFS location (a date-partitioned path) in JSON format using append mode.
df.write.mode("append").format("json").save(hdfsPath)
Sample HDFS location: /tmp/table1/datepart=20190903
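For context, here is a simplified sketch of how the write fits together; deriving datepart from the run date is illustrative, the actual path construction is omitted:

from datetime import date

# Simplified: datepart derived from the run date, e.g. "20190903".
datepart = date.today().strftime("%Y%m%d")
hdfsPath = "/tmp/table1/datepart=%s" % datepart

# Append mode adds new part files to the existing partition directory.
df.write.mode("append").format("json").save(hdfsPath)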
I am consuming data from upstream in a NiFi cluster. Each node in the NiFi cluster creates a flow file for the data it consumes, and my Spark code processes that flow file. Because NiFi is distributed, my Spark code gets executed from different NiFi nodes in parallel, all trying to save data into the same HDFS location.
I cannot store the output of the Spark jobs in different directories because my data is partitioned on date.
This process has been running once daily for the last 14 days, and my Spark job has failed 4 times with different errors. First Error:
java.io.IOException: Failed to rename FileStatus{path=hdfs://tmp/table1/datepart=20190824/_temporary/0/task_20190824020604_0000_m_000000/part-00000-101aa2e2-85da-4067-9769-b4f6f6b8f276-c000.json; isDirectory=false; length=0; replication=3; blocksize=268435456; modification_time=1566630365451; access_time=1566630365034; owner=hive; group=hive; permission=rwxrwx--x; isSymlink=false} to hdfs://tmp/table1/datepart=20190824/part-00000-101aa2e2-85da-4067-9769-b4f6f6b8f276-c000.json
Second Error:
java.io.FileNotFoundException: File hdfs://tmp/table1/datepart=20190825/_temporary/0 does not exist.
Third Error:
java.io.FileNotFoundException: File hdfs://tmp/table1/datepart=20190901/_temporary/0/task_20190901020450_0000_m_000000 does not exist.
Fourth Error:
java.io.FileNotFoundException: File hdfs://tmp/table1/datepart=20190903/_temporary/0 does not exist.
Here is my problem/issue: if this is a concurrency problem, I can run all NiFi processors from the primary node only, but then I will lose performance.
Need your expert advice.
Thanks in advance.
It seems the problem is that two Spark jobs are independently trying to write to the same place, causing conflicts: the fastest one clears up the shared _temporary working directory before the second one is done with it.
The most straightforward solution may be to avoid this concurrency altogether.
As I understand how you use NiFi and Spark, the node where NiFi runs also determines the node where Spark runs (there is a 1-to-1 relationship?).
If that is the case, you should be able to solve this by routing the work in NiFi to nodes that do not interfere with each other. Check out the load-balancing strategy (a property of the queue) called "Partition by attribute", which routes flow files based on an attribute value. Of course you would need to define the right attribute, but something like the directory or table name should go a long way. If re-routing is not an option, there is also a code-level workaround, sketched below.
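A hedged sketch of that workaround, assuming PySpark and access to the JVM Hadoop FileSystem API through the SparkContext (stagingPath and finalPath are illustrative names): each job writes to its own staging directory, so no two jobs ever share a _temporary directory, and then moves its part files into the date partition.

import uuid

# Unique staging directory per job run, so concurrent jobs never share
# a _temporary directory (path is illustrative).
stagingPath = "/tmp/table1_staging/" + uuid.uuid4().hex
finalPath = "/tmp/table1/datepart=20190903"

# This job is the only writer in its staging directory.
df.write.mode("overwrite").format("json").save(stagingPath)

# Move the finished part files into the shared date partition. On HDFS,
# rename within the same filesystem is a cheap metadata operation.
jvm = spark.sparkContext._jvm
conf = spark.sparkContext._jsc.hadoopConfiguration()
fs = jvm.org.apache.hadoop.fs.FileSystem.get(conf)
Path = jvm.org.apache.hadoop.fs.Path

fs.mkdirs(Path(finalPath))
for status in fs.listStatus(Path(stagingPath)):
    name = status.getPath().getName()
    if name.startswith("part-"):  # skip _SUCCESS and other markers
        fs.rename(status.getPath(), Path(finalPath + "/" + name))
fs.delete(Path(stagingPath), True)  # remove the emptied staging directory

Since Spark's part file names already contain a per-job UUID (as visible in your first error), moving them into the shared partition should not collide with files from other runs.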