We have two kinds of messages coming into Flink: control messages and data messages.
We have a separate source stream for each kind of message, and we have attached the same sink to both streams. What we want is to broadcast the control messages so that every parallel instance of the sink receives them.
Below is the code for this:
package com.ranjit.com.flinkdemo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;

public class FlinkBroadcast {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStream<String> ctrl_message_stream = env.socketTextStream("localhost", 8088);
        // Note: broadcast() returns a new stream; this statement on its own has no effect.
        ctrl_message_stream.broadcast();

        DataStream<String> message_stream = env.socketTextStream("localhost", 8087);

        RollingSink<String> sink = new RollingSink<String>("/base/path");
        sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
        sink.setWriter(new StringWriter<String>());
        sink.setBatchSize(1024 * 1024 * 400); // 400 MB

        ctrl_message_stream.broadcast().addSink(sink);
        message_stream.addSink(sink);

        env.execute("stream");
    }
}
But what I have observed is that this creates 4 sink instances, and the control messages are broadcast only to 2 of them (the ones created by the control message stream). So my understanding is that both streams would have to go through the same chain of operators for the broadcast to reach every sink, which we don't want, since the data messages go through multiple transformations of their own. We have written our own sink which inspects each message; if it is a control message, it only rolls the file.
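As a rough, minimal sketch of that custom sink (assuming a plain String payload and a "CONTROL:" prefix to mark control messages; the prefix and the helper methods are made up for illustration, not our real implementation):

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class ControlAwareRollingSink extends RichSinkFunction<String> {

    @Override
    public void invoke(String value) throws Exception {
        if (value.startsWith("CONTROL:")) {
            // Control message: do not write it, only roll the current file.
            rollCurrentFile();
        } else {
            // Data message: append it to the currently open file.
            writeToCurrentFile(value);
        }
    }

    private void rollCurrentFile() {
        // Close the current part file and open a fresh one (details omitted).
    }

    private void writeToCurrentFile(String value) {
        // Write the record to the current part file (details omitted).
    }
}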
Example code demonstrating the broadcast + union approach:
package com.gslab.com.dataSets;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkBroadcast {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        List<String> controlMessageList = new ArrayList<String>();
        controlMessageList.add("controlMessage1");
        controlMessageList.add("controlMessage2");

        List<String> dataMessageList = new ArrayList<String>();
        dataMessageList.add("Person1");
        dataMessageList.add("Person2");
        dataMessageList.add("Person3");
        dataMessageList.add("Person4");

        DataStream<String> controlMessageStream = env.fromCollection(controlMessageList);
        DataStream<String> dataMessageStream = env.fromCollection(dataMessageList);

        // Wrap each control message in an Avro GenericRecord.
        // (The schema is re-parsed per record here, which is inefficient but keeps the example short.)
        DataStream<GenericRecord> controlMessageGenericRecordStream = controlMessageStream.map(new MapFunction<String, GenericRecord>() {
            @Override
            public GenericRecord map(String value) throws Exception {
                Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/controlMessageSchema.avsc")));
                gr.put("TYPE", value);
                return gr;
            }
        });

        // Wrap each data message in an Avro GenericRecord with a different schema.
        DataStream<GenericRecord> dataMessageGenericRecordStream = dataMessageStream.map(new MapFunction<String, GenericRecord>() {
            @Override
            public GenericRecord map(String value) throws Exception {
                Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/dataMessageSchema.avsc")));
                gr.put("FIRSTNAME", value);
                gr.put("LASTNAME", value + ": lastname");
                return gr;
            }
        });

        // Display the data records before the union.
        dataMessageGenericRecordStream.map(new MapFunction<GenericRecord, GenericRecord>() {
            @Override
            public GenericRecord map(GenericRecord value) throws Exception {
                System.out.println("data before union: " + value);
                return value;
            }
        });

        // Broadcast the control stream, union it with the data stream, and display the result.
        controlMessageGenericRecordStream.broadcast().union(dataMessageGenericRecordStream).map(new MapFunction<GenericRecord, GenericRecord>() {
            @Override
            public GenericRecord map(GenericRecord value) throws Exception {
                System.out.println("data after union: " + value);
                return value;
            }
        });

        env.execute("stream");
    }
}
Output:
05/09/2016 13:02:12 Source: Collection Source(1/1) switched to FINISHED
05/09/2016 13:02:12 Source: Collection Source(1/1) switched to FINISHED
05/09/2016 13:02:13 Map(1/2) switched to FINISHED
05/09/2016 13:02:13 Map(2/2) switched to FINISHED
data after union: {"TYPE": "controlMessage1"}
data before union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2: lastname"}
data after union: {"TYPE": "controlMessage1"}
data before union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1: lastname"}
data after union: {"TYPE": "controlMessage2"}
data after union: {"TYPE": "controlMessage2"}
data after union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1"}
data before union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4: lastname"}
data before union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3: lastname"}
data after union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2"}
data after union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3"}
05/09/2016 13:02:13 Map -> Map(2/2) switched to FINISHED
data after union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4"}
05/09/2016 13:02:13 Map -> Map(1/2) switched to FINISHED
05/09/2016 13:02:13 Map(1/2) switched to FINISHED
05/09/2016 13:02:13 Map(2/2) switched to FINISHED
05/09/2016 13:02:13 Job execution switched to status FINISHED.
As we can see, the LASTNAME value is not correct after the union: it is getting replaced by the FIRSTNAME value in each record. This appears to be a serialization issue with two different Avro GenericRecord schemas flowing through one union, rather than a problem with broadcast() itself, since Flink falls back to a generic serializer for GenericRecord.
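If the generic serialization of GenericRecord is indeed the culprit, one thing worth trying (an assumption on our side, not a verified fix for this exact setup) is to force Flink to serialize generic types with Avro rather than Kryo:

// Assumption: Avro-based serialization of generic types may avoid the field
// mix-up seen above. enableForceAvro() is part of Flink's ExecutionConfig.
env.getConfig().enableForceAvro();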
Your code essentially terminates both streams with their own copy of the sink you define. What you would want instead is something like this:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

DataStream<String> ctrl_message_stream = env.socketTextStream("localhost", 8088);
DataStream<String> message_stream = env.socketTextStream("localhost", 8087);

RollingSink<String> sink = new RollingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
sink.setWriter(new StringWriter<String>());
sink.setBatchSize(1024 * 1024 * 400); // 400 MB

ctrl_message_stream.broadcast().union(message_stream).addSink(sink);

env.execute("stream");
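With this layout there is a single sink at the end of the union, so each of its two parallel instances receives every control message (thanks to broadcast()) plus its share of the data messages, and a message-aware sink like the one sketched earlier can roll its file whenever a control message arrives.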