I am using Flink DataStream API where there where racks are available & I want to calculate "average"of temperature group by rack IDs. My window duration is of 40 seconds & my window is sliding every 10 seconds...Following is my code where I am calculating sum of temperatures every 10 seconds for every rackID,but now I want to calculate average temperatures::
static Properties properties=new Properties();
public static Properties getProperties()
properties.setProperty("bootstrap.servers", "");
properties.setProperty("zookeeper.connect", "");
//properties.setProperty("deserializer.class", "kafka.serializer.StringEncoder");
//properties.setProperty("group.id", "akshay");
properties.setProperty("auto.offset.reset", "earliest");
return properties;
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
Properties props=Program.getProperties();
DataStream<TemperatureEvent> dstream=env.addSource(new FlinkKafkaConsumer09<TemperatureEvent>("TemperatureEvent", new TemperatureEventSchema(), props)).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
DataStream<TemperatureEvent> ds1=dstream.keyBy("rackId").timeWindow(Time.seconds(40), Time.seconds(10)).sum("temperature");
env.execute("Temperature Consumer");
How can I calcluate average temperature for the above example ??
As far as I can tell, you need to write the average function yourself. You can find an example here:
In your case, you would probably replace
with something like
.apply(new Avg());
and implement the Avg class:
public class Avg implements WindowFunction<TemperatureEvent, TemperatureEvent, Long, org.apache.flink.streaming.api.windowing.windows.Window> {
public void apply(Long key, Window window, Iterable<TemperatureEvent> values, Collector<TemperatureEvent> out) {
long sum = 0L;
int count = 0;
for (TemperatureEvent value : values) {
sum += value.getTemperature();
count ++;
TemperatureEvent result = values.iterator().next();
result.setTemperature(sum / count);
Note: If there's any chance that your function will be called on an empty window (e.g. by using custom triggers), you need a check before accessing elements.head
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With