 

Flink job is not distributed across machines

I have a small use case in Apache Flink, which is a batch processing system. I need to process a collection of files, and the processing of each file must be handled by one machine. I have the code below. Only one task slot is ever occupied, and the files are processed one after the other. I have 6 nodes (so 6 task managers) and have configured 4 task slots on each node, so I expect 24 files to be processed at a time.

import java.io.File
import scala.sys.process._
import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.util.Collector

class MyMapPartitionFunction extends RichMapPartitionFunction[File, Int] {
  override def mapPartition(
      myfiles: java.lang.Iterable[File],
      out: Collector[Int]): Unit = {
    val temp = myfiles.iterator()
    while (temp.hasNext()) {
      // locate the script shipped via Flink's distributed cache
      val fp1 = getRuntimeContext.getDistributedCache.getFile("hadoopRun.sh")
      val file = new File(temp.next().toURI)
      // run the shell script on this file and print its output
      Process(
        "/bin/bash ./run.sh " + argumentsList(3) + "/" + file.getName + " " + argumentsList(7) + "/" + file.getName + ".csv",
        new File(fp1.getAbsoluteFile.getParent))
        .lines
        .foreach(println)
      out.collect(1)
    }
  }
}

I launched Flink with the ./bin/start-cluster.sh command, and the web user interface shows 6 task managers and 24 task slots.

The folder contains about 49 files. When I call mapPartition on this collection, I expect 49 parallel processes to be spawned. But in my infrastructure they are all processed one after the other, which means that a single machine (one task manager) handles all 49 filenames. With 4 task slots configured on each of the 6 nodes, I expect 24 files to be processed simultaneously.

Any pointers would surely help here. These are the parameters in my flink-conf.yaml file:

jobmanager.heap.mb: 2048
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
parallelism.default: 24

Thanks in advance. Can someone shed light on where I am going wrong?

asked May 04 '17 by Bala



1 Answer

As David described, the problem is that env.fromCollection(Iterable[T]) creates a DataSource with a non-parallel InputFormat. Therefore, the DataSource is executed with a parallelism of 1. The subsequent operators (mapPartition) inherit this parallelism from the source so that they can be chained (this saves one network shuffle).

The way to solve this problem is to either explicitly rebalance the source DataSet via

env.fromCollection(folders).rebalance()

or to explicitly set the wished parallelism at the subsequent operator (mapPartition):

env.fromCollection(folders).mapPartition(...).setParallelism(49)
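
Put together, a minimal sketch of the fixed job could look like the following. This is not runnable in isolation (it needs a Flink cluster and the DataSet API on the classpath), and the input path, the argumentsList script parameters, and MyMapPartitionFunction are placeholders taken from the question:

    import java.io.File
    import org.apache.flink.api.scala._

    object DistributedFileJob {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // hypothetical input: one element per file to process
        val folders: Seq[File] = new File("/data/input").listFiles().toSeq

        env.fromCollection(folders)     // non-parallel source (parallelism 1)
          .rebalance()                  // redistribute elements round-robin to all subtasks
          .mapPartition(new MyMapPartitionFunction)
          .setParallelism(24)           // one subtask per available task slot
          .print()
      }
    }

The rebalance() breaks the operator chain between the source and mapPartition, so the 49 file elements are spread round-robin across the 24 mapPartition subtasks instead of all staying on the single source subtask.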
answered Oct 21 '22 by Till Rohrmann