I've been working on updating a Flink processor (Flink 1.9) that reads from Kafka and then writes back to Kafka. The processor was originally written against a Kafka 0.10.2 cluster, and we have now deployed a new Kafka cluster running version 2.2, so I set out to update the processor to use the latest FlinkKafkaConsumer and FlinkKafkaProducer (as suggested by the Flink docs). However, I've run into some problems with the Kafka producer: I'm unable to get it to serialize data using the deprecated constructors (not surprising), and I've been unable to find any implementations or examples online of how to implement a serializer (all the examples use the older Kafka connectors).
The current implementation (for Kafka 0.10.2) is as follows:
FlinkKafkaProducer010<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer010<String>(
        "playerSessions",
        new SimpleStringSchema(),
        producerProps,
        (FlinkKafkaPartitioner) null
);
When trying to implement the corresponding FlinkKafkaProducer:
FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
        "playerSessions",
        new SimpleStringSchema(),
        producerProps,
        null
);
I get the following error:
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:525)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:483)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:357)
at com.ebs.flink.sessionprocessor.SessionProcessor.main(SessionProcessor.java:122)
and I haven't been able to figure out why. The constructor for FlinkKafkaProducer is also deprecated, and when I try the non-deprecated constructor I can't figure out how to serialize the data. This is how it would look:
FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
        "playerSessions",
        new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
                return null;
            }
        },
        producerProps,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);
But I don't understand how to implement the KafkaSerializationSchema and I find no examples of this online or in the Flink docs.
Does anyone have any experience implementing this, or any tips on why the FlinkKafkaProducer throws a NullPointerException in that step?
If you are just sending Strings to Kafka:
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String> {

    private final String topic;

    public ProducerStringSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        // Write the String as UTF-8 bytes into the record value; no key is set.
        return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}
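With that schema, the wiring from the question would look roughly like this (a minimal sketch reusing the "playerSessions" topic and the producerProps from above):
FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<>(
        "playerSessions",
        new ProducerStringSerializationSchema("playerSessions"),
        producerProps,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);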
For sending a Java object:
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ObjSerializationSchema implements KafkaSerializationSchema<MyPojo> {

    private final String topic;
    private transient ObjectMapper mapper;

    public ObjSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(MyPojo obj, Long timestamp) {
        // The schema itself is serialized and shipped to the task managers,
        // so the ObjectMapper is created lazily on first use instead of in the constructor.
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            byte[] b = mapper.writeValueAsBytes(obj);
            return new ProducerRecord<>(topic, b);
        } catch (JsonProcessingException e) {
            // Fail the record instead of silently producing a null value.
            throw new RuntimeException("Failed to serialize " + obj, e);
        }
    }
}
In your code:
.addSink(new FlinkKafkaProducer<>(producerTopic, new ObjSerializationSchema(producerTopic),
params.getProperties(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
To deal with the timeout in the case of FlinkKafkaProducer.Semantic.EXACTLY_ONCE you should read https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-011-and-newer, in particular this part:
Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions that were started before taking a checkpoint, after recovering from the said checkpoint. If the time between Flink application crash and completed restart is larger than Kafka’s transaction timeout there will be data loss (Kafka will automatically abort transactions that exceeded timeout time). Having this in mind, please configure your transaction timeout appropriately to your expected down times.
Kafka brokers by default have transaction.max.timeout.ms set to 15 minutes. This property will not allow to set transaction timeouts for the producers larger than it’s value. FlinkKafkaProducer011 by default sets the transaction.timeout.ms property in producer config to 1 hour, thus transaction.max.timeout.ms should be increased before using the Semantic.EXACTLY_ONCE mode.
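So either raise transaction.max.timeout.ms on the brokers, or lower the producer's transaction.timeout.ms so it fits within the broker limit. A minimal sketch of the latter, added to the producerProps from the question (the 15-minute value is just an example matching the broker default):
// Keep the producer's transaction timeout at or below the broker's
// transaction.max.timeout.ms (15 minutes by default).
producerProps.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));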