I have a small use case in Apache Flink, which is a batch processing system. I need to process a collection of files, and the processing of each file must be handled by one machine. I have the code below. Only one task slot is ever occupied, and the files are processed one after the other. I have 6 nodes (so 6 task managers) and have configured 4 task slots on each node, so I expect 24 files to be processed at a time.
import java.io.File
import scala.sys.process._
import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.util.Collector

class MyMapPartitionFunction extends RichMapPartitionFunction[File, Int] {
  override def mapPartition(
      myfiles: java.lang.Iterable[File],
      out: Collector[Int]): Unit = {
    val temp = myfiles.iterator()
    while (temp.hasNext()) {
      val fp1 = getRuntimeContext.getDistributedCache.getFile("hadoopRun.sh")
      val file = new File(temp.next().toURI)
      Process(
        "/bin/bash ./run.sh " + argumentsList(3) + "/" + file.getName + " " +
          argumentsList(7) + "/" + file.getName + ".csv",
        new File(fp1.getAbsoluteFile.getParent))
        .lines
        .foreach(println)
      out.collect(1)
    }
  }
}
I launched Flink with the ./bin/start-cluster.sh command, and the web user interface shows 6 task managers and 24 task slots.
The folder contains about 49 files. When I run mapPartition on this collection, I expect 49 parallel processes to be spawned. But in my infrastructure they are all processed one after the other, which means that only one machine (one task manager) handles all 49 files. Since I have 24 task slots in total, I expect 24 files to be processed simultaneously.
Any pointers will surely help here. I have these parameters in the flink-conf.yaml file:
jobmanager.heap.mb: 2048
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
parallelism.default: 24
Thanks in advance. Can someone shed some light on where I am going wrong?
A Flink program consists of multiple tasks (transformations/operators, data sources, and sinks). A task is split into several parallel instances for execution and each parallel instance processes a subset of the task's input data. The number of parallel instances of a task is called its parallelism.
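To make the parallelism notion concrete, here is a minimal sketch (object name and values are hypothetical, assuming the Flink DataSet Scala API) of how a job-wide default and a per-operator override are set:

```scala
import org.apache.flink.api.scala._

object ParallelismDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(24)                 // job-wide default parallelism
    env.fromElements(1, 2, 3, 4, 5)
      .map(_ * 2).setParallelism(4)        // override for this single operator
      .print()
  }
}
```

Operators that do not set an explicit value inherit the job-wide default, which in turn falls back to parallelism.default from flink-conf.yaml.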
As David described, the problem is that env.fromCollection(Iterable[T]) creates a DataSource with a non-parallel InputFormat. Therefore, the DataSource is executed with a parallelism of 1. The subsequent operators (mapPartition) inherit this parallelism from the source so that they can be chained (this saves us one network shuffle).

The way to solve this problem is to either explicitly rebalance the source DataSet via

env.fromCollection(folders).rebalance()

or to explicitly set the desired parallelism at the subsequent operator (mapPartition):

env.fromCollection(folders).mapPartition(...).setParallelism(49)
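Putting the pieces together, the fixed job could look like the sketch below (the input directory and the per-file work are placeholders; the real job would invoke the shell script from the question):

```scala
import java.io.File
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object DistributedFileJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // hypothetical input directory; replace with the real folder of 49 files
    val folders: Seq[File] = new File("/data/input").listFiles().toSeq
    env.fromCollection(folders)
      .rebalance()              // redistribute the single source split across all slots
      .mapPartition { (files: Iterator[File], out: Collector[Int]) =>
        files.foreach { file =>
          // launch the per-file shell script here, as in the question
          out.collect(1)
        }
      }
      .setParallelism(24)       // one subtask per available task slot
      .print()
  }
}
```

With rebalance() in place, the file names are round-robin distributed to the 24 mapPartition subtasks, so up to 24 files are processed at the same time.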