I have a KStream x KStream join which is breaking down with the following exception.
Exception in thread "my-clicks-and-recs-join-streams-4c903fb1-5938-4919-9c56-2c8043b86986-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_15, processor=KSTREAM-SOURCE-0000000001, topic=my_outgoing_recs_prod, partition=15, offset=9248896
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_15] exception caught when producing
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87)
at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:59)
at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:105)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:107)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:100)
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:64)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
... 3 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
I am joining a Click topic with a Recommendation topic. The Click objects are really small (less than a KB). A Recommendation, on the other hand, might be big, occasionally bigger than 1 MB.
I Googled for the exception and found (here) that I need to set max.request.size in the producer configs.
What I don't understand is, where does the producer come into the picture in a streams join? The topic in the exception above (topic=my_outgoing_recs_prod) is the recommendations topic, not the final joined topic. Isn't the streaming application supposed to just "consume" from it?
Nevertheless, I tried setting the property with config.put("max.request.size", "31457280"), which is 30 MB. I don't expect the recommendation records to exceed that limit. Still, the code is crashing.
I cannot change the configs in the Kafka cluster but, if needed, I can change the properties of the relevant topics in Kafka.
Could someone suggest what else I can try?
If nothing works, I am willing to ignore such oversized messages. However, I don't know a way of handling this RecordTooLargeException.
My code to perform the join is as follows.
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, JOINER_ID + "-" + System.getenv("HOSTNAME"));
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, booststrapList);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
config.put("max.request.size", "314572800");
config.put("message.max.bytes", "314572800");
config.put("max.message.bytes", "314572800");
KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> clicksStream = builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Serdes.String(), Serdes.ByteArray(), clicksTopic);
KStream<String, byte[]> recsStream = builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Serdes.String(), Serdes.ByteArray(), recsTopic);
KStream<String, ClickRec> join = clicksStream.join(
        recsStream,
        (click, recs) -> new ClickRec(click, recs),
        JoinWindows.of(windowMillis).until(3 * windowMillis));
join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);
KafkaStreams streams = new KafkaStreams(builder, config);
streams.cleanUp();
streams.start();
ClickRec is the joined object (which is far smaller than a Recommendation object, and I don't expect it to be bigger than a few KBs).
Where do I put a try...catch in the code above to recover from such occasionally oversized objects?
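As for the try...catch: the send that fails runs on a Kafka Streams worker thread (note "StreamThread-1" in the exception), so wrapping streams.start() in a try...catch will not see it. The closest hook in this API is an uncaught-exception handler on the KafkaStreams instance; it lets you observe the failure and react (log, shut down), but it does not skip the oversized record. A minimal sketch, reusing the streams object from the code above and registering the handler before streams.start():

// Fires when a stream thread dies with an unhandled exception, such as the StreamsException
// wrapping the RecordTooLargeException above. This only observes the failure; the record is not skipped.
streams.setUncaughtExceptionHandler((thread, throwable) ->
        System.err.println("Stream thread " + thread.getName() + " died: " + throwable));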
There are multiple configs at different levels:
message.max.bytes (broker setting, default is 1000012) (cf. http://kafka.apache.org/documentation/#brokerconfigs)
max.message.bytes (topic setting, default is 1000012) (cf. http://kafka.apache.org/documentation/#topicconfigs)
max.request.size (producer setting, default is 1048576) (cf. http://kafka.apache.org/documentation/#producerconfigs)

Your stack trace indicates that you need to change the setting at the broker or topic level:
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
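Since you say you can change topic-level properties, one way to raise max.message.bytes on a topic from Java is the AdminClient that ships with the Kafka clients (0.11+). This is only a sketch: the bootstrap server and topic name below are placeholders, and (as explained further down) the topic that actually receives these large records is an internal changelog topic that Kafka Streams creates for the join, so that is the one whose limit has to go up.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RaiseTopicMessageSize {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder

        // Placeholder name: for a Streams join this is the internal changelog topic,
        // typically named "<application.id>-<store-name>-changelog".
        String topic = "my-join-store-changelog";

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
            Config topicConfig = new Config(Collections.singleton(
                    new ConfigEntry("max.message.bytes", "31457280"))); // 30 MB
            admin.alterConfigs(Collections.singletonMap(resource, topicConfig)).all().get();
        }
    }
}

The same change can also be made with the kafka-configs.sh command-line tool if shell access to the cluster is easier.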
Maybe you also need to increase the producer setting, max.request.size.
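To pass that producer setting through Kafka Streams, you can put it into the same Properties object you hand to KafkaStreams. Using the ProducerConfig constant together with StreamsConfig.producerPrefix (if your Streams version has it) makes explicit that the value is meant for the internal producer; the plain "max.request.size" key, as used in the question, should also be forwarded to it. A sketch, reusing the config object from the question:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Raise the request size of the producer Kafka Streams uses internally
// (it writes both the join result and the changelog records).
config.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), "31457280"); // 30 MB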
Why do you need this in the first place:
As you perform a KStream-KStream join, the join operator builds up state (it must buffer records from both streams in order to compute the join). State is by default backed by a Kafka topic -- the local state is basically a cache while the Kafka topic is the source of truth. Thus, all your records will be written to this "changelog topic" that Kafka Streams creates automatically.
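If dropping such records is acceptable, as the question suggests, one option (a sketch on top of the code from the question, not something Kafka Streams does for you) is to filter oversized values out of the recommendations stream before the join, so they never reach the window store or its changelog. Note that those recommendations then simply never produce a join result. The size cap below is a hypothetical value just under the 1 MB default; tune it to whatever your topics and brokers actually allow.

// Drop oversized recommendation values before they can hit the join's window store.
final int maxValueBytes = 900 * 1024; // hypothetical cap

KStream<String, byte[]> boundedRecsStream = recsStream.filter(
        (key, value) -> value != null && value.length <= maxValueBytes);

KStream<String, ClickRec> join = clicksStream.join(
        boundedRecsStream,
        (click, recs) -> new ClickRec(click, recs),
        JoinWindows.of(windowMillis).until(3 * windowMillis));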