
How to increase Flink taskmanager.numberOfTaskSlots to run a job without a Flink server (in an IDE or as a fat jar)

I have a question about running a Flink streaming job in an IDE or as a fat jar, without deploying it to a Flink server.

The problem is that I cannot run the job in the IDE when it needs more than one task slot.

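import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProperties.setProperty("group.id", "test");
        env.setParallelism(1);

        // the source gets its own slot sharing group
        DataStream<String> kafkaSource = env
            .addSource(new FlinkKafkaConsumer010<>("flink-source", new SimpleStringSchema(), kafkaProperties))
            .name("Kafka-Source")
            .slotSharingGroup("Kafka-Source");

        // the sink is in a second group, so the job needs two slots in total
        kafkaSource.print().slotSharingGroup("Print");

        env.execute("Flink Streaming Java API Skeleton");
    }
}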

I know that this job needs 2 slots, and that on a Flink cluster I can run two TaskManagers to provide them. But how can I run it locally in the IDE?

Currently I have to give every operator the same slotSharingGroup name when running locally so that the job fits into one slot, but that is not flexible.

How do you handle it?

Aleksandr Filichkin asked Jul 19 '18 07:07




1 Answer

What you are describing is a known bug, and there is a corresponding JIRA issue for it.

The way to circumvent this problem is to manually set the number of task slots with which the TaskExecutor is started. You can do this via the TaskManagerOptions.NUM_TASK_SLOTS configuration option:

final int parallelism = ...;
final Configuration configuration = new Configuration();
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, configuration);
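
To choose the value passed to NUM_TASK_SLOTS, it can help to estimate how many slots the job needs. The following is a rough sketch in plain Java (no Flink dependency), assuming the usual rule that operators in the same slot sharing group can share a slot, so each group needs as many slots as its highest operator parallelism and the job needs the sum over all groups. The class name SlotEstimate and the method requiredSlots are hypothetical helpers for illustration, not part of the Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SlotEstimate {

    /**
     * Estimates the number of slots a job needs: for each slot sharing
     * group take the highest operator parallelism, then sum over all groups.
     * This is an approximation of Flink's scheduling, not a Flink API call.
     */
    public static int requiredSlots(Map<String, int[]> parallelismByGroup) {
        int total = 0;
        for (int[] parallelisms : parallelismByGroup.values()) {
            int max = 0;
            for (int p : parallelisms) {
                max = Math.max(max, p);
            }
            total += max;
        }
        return total;
    }

    public static void main(String[] args) {
        // The job from the question: two groups, one operator each, parallelism 1.
        Map<String, int[]> groups = new LinkedHashMap<>();
        groups.put("Kafka-Source", new int[] {1});
        groups.put("Print", new int[] {1});
        System.out.println(requiredSlots(groups));
    }
}
```

For the job in the question this estimate gives 2, which matches the value set for NUM_TASK_SLOTS above.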
Till Rohrmann answered Sep 28 '22 17:09
