In the code sample below, I am trying to read a stream of employee records { Country, Employer, Name, Salary, Age }
and output the highest-paid employee in every country. Unfortunately, chaining multiple keyBy calls doesn't work.
Only keyBy(Employer) takes effect, so I don't get the correct result. What am I missing?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Employee> streamEmployee = env.addSource(
        new FlinkKafkaConsumer010<ObjectNode>("flink-demo", new JSONDeserializationSchema(), properties))
        .map(new MapFunction<ObjectNode, Employee>() {
            private static final long serialVersionUID = 6111226274068863916L;
            @Override
            public Employee map(ObjectNode value) throws Exception {
                final Gson gson = new GsonBuilder().create();
                Employee uMsg = gson.fromJson(value.toString(), Employee.class);
                return uMsg;
            }
        });
KeyedStream<Employee, String> employeesKeyedByCountryndEmployer = streamEmployee
        .keyBy(new KeySelector<Employee, String>() {
            private static final long serialVersionUID = -6867736771747690202L;
            @Override
            public String getKey(Employee value) throws Exception {
                // TODO Auto-generated method stub
                return value.getCountry();
            }
        }).keyBy(new KeySelector<Employee, String>() {
            private static final long serialVersionUID = -6867736771747690202L;
            @Override
            public String getKey(Employee value) throws Exception {
                // TODO Auto-generated method stub
                return value.getEmployer();
            }
        });
// This should display the highest-paid employee in a given country,
// for a given employer.
DataStream<Employee> uHighlyPaidEmployee = employeesKeyedByCountryndEmployer.timeWindow(Time.seconds(5))
        .maxBy("salary");
// Assume toString() is overridden, so print() works well.
uHighlyPaidEmployee.print();
env.execute("Employee-employer log processor");
KeyBy is one of the most commonly used transformation operators for data streams. It partitions the data stream based on certain properties, or keys, of the incoming data objects. Once keyBy is applied, all data objects with the same key are grouped together.
Flink distributes the events in a data stream to different task slots based on the key. It hashes each key to divide the stream into partitions, based on the number of slots allocated to the job, so records with the same key always go to the same slot.
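To make the routing idea concrete, here is a minimal plain-Java sketch. It assumes the simplest possible scheme (key hash modulo partition count); Flink's actual assignment goes through key groups and an extra hash step, so treat this as an illustration only. HashPartitionDemo and partitionFor are hypothetical names, not part of the Flink API.

```java
public class HashPartitionDemo {

    // Simplified assumption: partition index = non-negative key hash modulo partition count.
    // This is NOT Flink's actual algorithm, only the general idea behind it.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String[] countries = {"DE", "US", "IN", "DE", "US"};
        for (String country : countries) {
            // Equal keys always map to the same partition index.
            System.out.println(country + " -> partition " + partitionFor(country, 4));
        }
    }
}
```

Math.floorMod is used instead of % so that keys with a negative hashCode() still land on a valid, non-negative index.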
An Apache Flink operator transforms one or more data streams into a new data stream. The new data stream contains modified data from the original data stream. Apache Flink provides more than 25 pre-built stream processing operators. For more information, see Operators in the Apache Flink Documentation.
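Each keyBy call repartitions the stream from scratch, so the second call replaces the first instead of refining it. To key by both fields, define a KeySelector that returns a composite key: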
KeyedStream<Employee, Tuple2<String, String>> employeesKeyedByCountryndEmployer =
        streamEmployee.keyBy(
                new KeySelector<Employee, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> getKey(Employee value) throws Exception {
                        return Tuple2.of(value.getCountry(), value.getEmployer());
                    }
                }
        );
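If it helps to see what the composite key buys you without running a Flink job, here is a rough plain-Java analogue of "key by (country, employer), then take the max by salary". Everything in it is an assumption for illustration: the Employee class is a cut-down stand-in for the one in the question, a two-element List plays the role of Tuple2<String, String>, and a HashMap stands in for Flink's keyed window state.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompositeKeyDemo {

    // Hypothetical, cut-down stand-in for the Employee POJO from the question.
    static final class Employee {
        final String country;
        final String employer;
        final String name;
        final double salary;

        Employee(String country, String employer, String name, double salary) {
            this.country = country;
            this.employer = employer;
            this.name = name;
            this.salary = salary;
        }
    }

    // A two-element list has value-based equals() and hashCode(), so it can
    // act as a composite key, much like Tuple2<String, String> in the Flink code.
    static List<String> compositeKey(Employee e) {
        return Arrays.asList(e.country, e.employer);
    }

    // Keeps the highest-paid employee for each (country, employer) pair.
    // On a salary tie, the employee seen first is kept.
    static Map<List<String>, Employee> highestPaidPerKey(List<Employee> employees) {
        Map<List<String>, Employee> best = new HashMap<>();
        for (Employee e : employees) {
            best.merge(compositeKey(e), e, (a, b) -> a.salary >= b.salary ? a : b);
        }
        return best;
    }

    public static void main(String[] args) {
        List<Employee> batch = Arrays.asList(
                new Employee("DE", "Acme", "Anna", 5000),
                new Employee("DE", "Acme", "Bert", 7000),
                new Employee("DE", "Globex", "Cara", 6000),
                new Employee("US", "Acme", "Dave", 8000));
        highestPaidPerKey(batch).forEach((key, e) ->
                System.out.println(key + " -> " + e.name + " (" + e.salary + ")"));
    }
}
```

Keying by employer alone would merge "DE/Acme" and "US/Acme" into a single group, which matches the symptom described in the question.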
If you try to replace this code with a lambda expression, you will run into the problems described here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html
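As far as I understand it, the underlying issue is that a lambda carries less generic type information than an anonymous class, so a type such as Tuple2<String, String> cannot always be recovered from it. The plain-Java sketch below shows that general JVM behavior, not Flink's own type extraction logic. It uses java.util.function.Function as a stand-in for KeySelector, and the class and method names are hypothetical.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class LambdaErasureDemo {

    // Returns true if the object's class records generic type arguments
    // for at least one interface it implements.
    static boolean exposesTypeArguments(Object fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    // Anonymous class: the compiler stores Function<String, List<String>>
    // in the class file, so reflection can read the type arguments.
    static Function<String, List<String>> anonymousSelector() {
        return new Function<String, List<String>>() {
            @Override
            public List<String> apply(String s) {
                return Arrays.asList(s, s);
            }
        };
    }

    // Lambda: the runtime-generated class implements the raw Function
    // interface (behavior of the standard JDK; assumed here).
    static Function<String, List<String>> lambdaSelector() {
        return s -> Arrays.asList(s, s);
    }

    public static void main(String[] args) {
        System.out.println("anonymous class: " + exposesTypeArguments(anonymousSelector()));
        System.out.println("lambda:          " + exposesTypeArguments(lambdaSelector()));
    }
}
```

The linked page describes the options Flink offers when the type cannot be inferred, so check it for the version you are using.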