
Flink batch: data-local planning on HDFS?



We've been playing a bit with Flink. So far we've been using Spark and standard M/R on Hadoop 2.x / YARN.

Apart from Flink's execution model on YARN, which AFAIK is not dynamic like Spark's (where executors dynamically acquire and release virtual cores in YARN), the main point of the question is as follows.

Flink seems just amazing: for the streaming APIs, I'd only say that it's brilliant and over the top.

Batch APIs: processing graphs are very powerful. They are optimised and run in parallel in a unique way, leveraging cluster scalability much more than Spark and others, and they perfectly optimise very complex DAGs that share common processing steps.

The only drawback I found, which I hope is just my misunderstanding and lack of knowledge, is that Flink doesn't seem to prefer data-local processing when planning batch jobs that read their input from HDFS.

Unfortunately it's not a minor one, because in 90% of use cases you have big data in partitioned storage on HDFS and usually you do something like:

  • read and filter (e.g. take only failures or successes)
  • aggregate, reduce, work with it

The first part, when done in plain M/R or Spark, is always planned with the 'prefer local processing' idiom: data is processed by the same node that holds the data blocks, which is faster and avoids transferring data over the network.

In our tests with a 3-node cluster, set up specifically to test this feature and behaviour, Flink handled HDFS blocks perfectly: e.g. if a file was made up of 3 blocks, Flink created 3 input splits and scheduled them in parallel. But without the data-locality pattern.
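To make the block/split relationship concrete: HDFS stores a file as fixed-size blocks, so the block count is the file size divided by the block size, rounded up. The sketch below assumes the Hadoop 2.x default block size of 128 MB and one input split per block, which is what we observed in this test; the split count is not guaranteed to equal the block count in every configuration. The function name is hypothetical.

```python
# Hadoop 2.x default HDFS block size (dfs.blocksize): 128 MB
DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024


def hdfs_block_count(file_size_bytes, block_size_bytes=DEFAULT_BLOCK_SIZE):
    """Number of HDFS blocks for a file (hypothetical helper).

    Ceiling division: a partially filled last block still counts as a block.
    Assuming one input split per block, this is also the number of splits.
    """
    return -(-file_size_bytes // block_size_bytes)


# A 300 MB file spans 3 blocks (128 + 128 + 44 MB), i.e. 3 input splits.
print(hdfs_block_count(300 * 1024 * 1024))  # 3
```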

Please share your opinion, I hope I just missed something or maybe it's already coming in a new version. Thanks in advance to anyone taking the time to answer this.

Carlo Medas asked Jan 06 '23 15:01

1 Answer

Flink uses a different approach to local input split processing than Hadoop and Spark. For each input split, Hadoop creates a Map task that is preferably scheduled on a node hosting the data referred to by the split.
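As a rough illustration of the Hadoop-style model (one task per split, placed on a replica host when that host has capacity), here is a minimal sketch. It is not Hadoop's actual scheduler: the `Split` type, the slot model, and the function name are hypothetical, and it assumes the cluster has at least one free slot per split so no task has to wait.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Split:
    split_id: int
    hosts: tuple  # names of the nodes holding a replica of this split's block


def schedule_task_per_split(splits, free_slots):
    """Create one map task per split, placing it on a replica host if possible.

    free_slots maps node name -> number of free task slots (hypothetical model).
    Returns {split_id: (node, is_local)}.
    """
    slots = dict(free_slots)
    if sum(slots.values()) < len(splits):
        raise ValueError("sketch assumes at least one free slot per split")
    placement = {}
    for split in splits:
        # Prefer a node that stores the data and still has capacity.
        node = next((h for h in split.hosts if slots.get(h, 0) > 0), None)
        is_local = node is not None
        if not is_local:
            # Otherwise run on any node with capacity; the block is read remotely.
            node = next(n for n, free in slots.items() if free > 0)
        slots[node] -= 1
        placement[split.split_id] = (node, is_local)
    return placement


splits = [Split(0, ("node1",)), Split(1, ("node2",)), Split(2, ("node3",))]
print(schedule_task_per_split(splits, {"node1": 1, "node2": 1, "node3": 1}))
# {0: ('node1', True), 1: ('node2', True), 2: ('node3', True)}
```

Because the number of tasks follows the number of splits, each task can be placed next to its data up front.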

In contrast, Flink uses a fixed number of data source tasks, i.e., the number of data source tasks depends on the configured parallelism of the operator, not on the number of input splits. These data source tasks are started on some node in the cluster and begin requesting input splits from the master (JobManager). For input splits of files in HDFS, the JobManager assigns the splits with locality preference, so there is locality-aware reading from HDFS. However, if the number of parallel tasks is much lower than the number of HDFS nodes, many splits will be read remotely, because source tasks remain on the node on which they were started and fetch one split after the other (local ones first, remote ones later). Race conditions may also happen if your splits are very small, as the first data source task might rapidly request and process all splits before the other source tasks make their first request.
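The pull-based model described above can be simulated in a few lines. This is a simplified sketch, not Flink's actual assigner: the class and function names are hypothetical, the remote-split choice is simply "first remaining split", and tasks request splits in strict round-robin order (as if every split took equally long to process).

```python
class LocalityAwareSplitAssigner:
    """Master-side split assigner (hypothetical, simplified): hands out splits
    on request and prefers a split that has a replica on the requesting host."""

    def __init__(self, splits):
        # splits: list of (split_id, hosts) pairs, hosts = replica locations
        self.unassigned = list(splits)
        self.local_count = 0
        self.remote_count = 0

    def next_split(self, host):
        if not self.unassigned:
            return None  # nothing left: the requesting source task finishes
        local = next((s for s in self.unassigned if host in s[1]), None)
        if local is not None:
            self.unassigned.remove(local)
            self.local_count += 1
            return local[0]
        # No local split left for this host, so hand out a remote one.
        split_id, _ = self.unassigned.pop(0)
        self.remote_count += 1
        return split_id


def run_sources(task_hosts, splits):
    """Simulate a fixed number of source tasks pulling splits until none remain.

    task_hosts[i] is the node on which parallel source task i was started; the
    number of tasks is the operator parallelism, independent of len(splits).
    Round-robin requests stand in for equal processing times; real request
    timing varies, which is where the race condition comes from.
    """
    assigner = LocalityAwareSplitAssigner(splits)
    assigned = [[] for _ in task_hosts]
    active = list(range(len(task_hosts)))
    while active:
        still_active = []
        for task in active:
            split_id = assigner.next_split(task_hosts[task])
            if split_id is not None:
                assigned[task].append(split_id)
                still_active.append(task)
        active = still_active
    return assigned, assigner.local_count, assigner.remote_count


splits = [(0, ("node1",)), (1, ("node2",)), (2, ("node3",))]
print(run_sources(["node1"], splits))                    # parallelism 1
# ([[0, 1, 2]], 1, 2)
print(run_sources(["node1", "node2", "node3"], splits))  # one task per node
# ([[0], [1], [2]], 3, 0)
```

With parallelism 1, the single source task reads its local split first and then pulls the other two remotely. With one task per node, every read is local. Placement also matters: `run_sources(["node1", "node1", "node2"], splits)` yields only 1 local read, because the second task on `node1` grabs split 1 (local to `node2`) before the `node2` task asks for it.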

IIRC, the number of local and remote input split assignments is written to the JobManager log file and might also be displayed in the web dashboard. That might help to debug the issue further. If you identify a problem that does not seem to match what I explained above, it would be great if you could get in touch with the Flink community via the user mailing list to figure out what the problem is.

Fabian Hueske answered Mar 08 '23 02:03