
How to support multiple KeyBy in Flink

In the code sample below, I read a stream of employee records { Country, Employer, Name, Salary, Age } and try to emit the highest-paid employee in every country. Unfortunately, chaining multiple keyBy calls doesn't work.

Only keyBy(Employer) takes effect, so I don't get the correct result. What am I missing?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Employee> streamEmployee = env.addSource(
        new FlinkKafkaConsumer010<ObjectNode>("flink-demo", new JSONDeserializationSchema(), properties))
        .map(new MapFunction<ObjectNode, Employee>() {

            private static final long serialVersionUID = 6111226274068863916L;

            @Override
            public Employee map(ObjectNode value) throws Exception {
                final Gson gson = new GsonBuilder().create();
                Employee uMsg = gson.fromJson(value.toString(), Employee.class);
                return uMsg;
            }
        });

KeyedStream<Employee, String> employeesKeyedByCountryndEmployer = streamEmployee
        .keyBy(new KeySelector<Employee, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String getKey(Employee value) throws Exception {
                // TODO Auto-generated method stub
                return value.getCountry();
            }
        }).keyBy(new KeySelector<Employee, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String getKey(Employee value) throws Exception {
                // TODO Auto-generated method stub
                return value.getEmployer();
            }
        });
// This should emit the highest-paid employee for a given country and a
// given employer
DataStream<Employee> uHighlyPaidEmployee = employeesKeyedByCountryndEmployer.timeWindow(Time.seconds(5))
        .maxBy("salary");

// Assume toString() is overridden , so print works well.
uHighlyPaidEmployee.print();

env.execute("Employee-employer log processor");
Abhijit Pathak asked Sep 19 '17 08:09


People also ask

How does KeyBy work in Flink?

keyBy is one of the most commonly used transformation operators for data streams. It partitions the stream based on a property, or key, of each incoming object. Once keyBy is applied, all objects with the same key are grouped together.

What is keyed stream in Flink?

Flink distributes the events in a data stream to different task slots based on the key. It uses a hashing algorithm to divide the stream into partitions based on the number of slots allocated to the job, so records with the same key always go to the same slot.
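To make the idea concrete, here is a minimal plain-Java sketch of hash-based routing. It is not Flink's actual algorithm (Flink hashes keys into key groups bounded by a max-parallelism setting); the class name KeyPartitionSketch and the method subtaskFor are hypothetical and only illustrate that equal keys always land in the same partition.

```java
public class KeyPartitionSketch {

    // Simplified stand-in for key-based routing. This is an assumption for
    // illustration only, NOT the hash function Flink really uses.
    public static int subtaskFor(Object key, int parallelism) {
        // floorMod keeps the result in [0, parallelism) even for negative hash codes.
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        String[] countries = {"US", "DE", "US", "IN", "DE"};
        for (String country : countries) {
            // Records with an equal key are always routed to the same subtask index.
            System.out.println(country + " -> subtask " + subtaskFor(country, 4));
        }
    }
}
```

Because the target depends only on the key's hash, every record for a given key is processed by the same parallel instance, which is what lets windowed aggregations work per key.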

What are operators in Flink?

An Apache Flink operator transforms one or more data streams into a new data stream. The new data stream contains modified data from the original data stream. Apache Flink provides more than 25 pre-built stream processing operators. For more information, see Operators in the Apache Flink Documentation.


2 Answers

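Chained keyBy calls are not combined: each keyBy repartitions the stream, so only the last one (here, the employer) determines the key. To key by both fields, define a KeySelector that returns a composite key: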

KeyedStream<Employee, Tuple2<String, String>> employeesKeyedByCountryndEmployer = 
  streamEmployee.keyBy(
    new KeySelector<Employee, Tuple2<String, String>>() {

      @Override
      public Tuple2<String, String> getKey(Employee value) throws Exception {
        return Tuple2.of(value.getCountry(), value.getEmployer());
      }
    }
  );
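To see the difference without running a Flink job, here is a rough plain-Java simulation of one window. Everything in it is an assumption for illustration: the trimmed-down Employee class, the made-up sample data, and maxSalaryPerKey, which only approximates what maxBy("salary") does per key inside a window.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class CompositeKeySketch {

    // Hypothetical, trimmed-down stand-in for the Employee class in the question.
    public static final class Employee {
        public final String country;
        public final String employer;
        public final String name;
        public final double salary;

        public Employee(String country, String employer, String name, double salary) {
            this.country = country;
            this.employer = employer;
            this.name = name;
            this.salary = salary;
        }
    }

    // Keeps the highest-paid employee per key; on a tie the earlier record wins.
    public static <K> Map<K, Employee> maxSalaryPerKey(
            List<Employee> window, Function<Employee, K> keySelector) {
        Map<K, Employee> best = new HashMap<>();
        for (Employee e : window) {
            best.merge(keySelector.apply(e), e,
                    (current, candidate) -> candidate.salary > current.salary ? candidate : current);
        }
        return best;
    }

    // Made-up sample data standing in for the records of one 5-second window.
    public static List<Employee> sampleWindow() {
        return Arrays.asList(
                new Employee("US", "Acme", "Alice", 100),
                new Employee("DE", "Acme", "Bob", 120),
                new Employee("US", "Globex", "Carol", 90),
                new Employee("DE", "Globex", "Dan", 80));
    }

    public static void main(String[] args) {
        List<Employee> window = sampleWindow();

        // Mirrors the chained keyBy in the question: only the employer is the key.
        Map<String, Employee> byEmployer = maxSalaryPerKey(window, e -> e.employer);

        // Mirrors the composite key: (country, employer) together form the key.
        Map<List<String>, Employee> byBoth =
                maxSalaryPerKey(window, e -> Arrays.asList(e.country, e.employer));

        System.out.println("keyed by employer only: " + byEmployer.size() + " results");
        System.out.println("keyed by (country, employer): " + byBoth.size() + " results");
    }
}
```

With this sample data, keying by employer alone yields two results (one per employer, mixing countries), while the composite key yields four, one per country and employer pair.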
Fabian Hueske answered Oct 17 '22 15:10


If you replace the anonymous KeySelector class with a lambda expression, you will run into the problems described here (Flink may be unable to infer the generic key type because of type erasure): https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html
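The root cause can be shown in plain Java, without Flink. In the sketch below, the nested KeySelector interface is a hypothetical stand-in for Flink's interface of the same name, and keepsGenericSignature is an illustrative helper, not a Flink API. An anonymous class records its generic type arguments in the class file, while the class generated for a lambda does not, so a framework relying on reflection has less to work with.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;

public class LambdaErasureSketch {

    // Hypothetical stand-in for Flink's KeySelector interface.
    public interface KeySelector<IN, KEY> {
        KEY getKey(IN value) throws Exception;
    }

    // True if the selector's class still carries the type arguments of an
    // interface it implements (e.g. KeySelector<String, List<String>>).
    public static boolean keepsGenericSignature(KeySelector<?, ?> selector) {
        for (Type implemented : selector.getClass().getGenericInterfaces()) {
            if (implemented instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    // Anonymous class: the compiler stores the full generic signature.
    public static KeySelector<String, List<String>> anonymousSelector() {
        return new KeySelector<String, List<String>>() {
            @Override
            public List<String> getKey(String value) {
                return Arrays.asList(value.split(","));
            }
        };
    }

    // Lambda: the generated class implements only the raw interface.
    public static KeySelector<String, List<String>> lambdaSelector() {
        return value -> Arrays.asList(value.split(","));
    }

    public static void main(String[] args) {
        System.out.println("anonymous class keeps generics: "
                + keepsGenericSignature(anonymousSelector()));
        System.out.println("lambda keeps generics: "
                + keepsGenericSignature(lambdaSelector()));
    }
}
```

The linked documentation describes how to supply the missing type information explicitly when you do want to use lambdas; the anonymous class in the answer above sidesteps the issue.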

Horatiu answered Oct 17 '22 15:10