RecordTooLargeException in Kafka streams join

I have a KStream x KStream join which is breaking down with the following exception.

Exception in thread "my-clicks-and-recs-join-streams-4c903fb1-5938-4919-9c56-2c8043b86986-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_15, processor=KSTREAM-SOURCE-0000000001, topic=my_outgoing_recs_prod, partition=15, offset=9248896
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_15] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
    at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:59)
    at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:105)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:107)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:100)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:64)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
    ... 3 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

I am joining a Click topic with a Recommendation topic. The Click objects are really small (less than a KB). Recommendation, on the other hand, might be big, occasionally bigger than 1 MB.

I googled the exception and found (here) that I need to set max.request.size in the producer configs.

What I don't understand is where the producer comes into the picture in a streams join. The topic in the exception above, topic=my_outgoing_recs_prod, is the recommendations topic, not the final joined topic. Isn't the streaming application supposed to just "consume" from it?

Nevertheless, I tried setting the property with config.put("max.request.size", "31457280");, which is 30 MB. I don't expect a recommendation record to exceed that limit, yet the code still crashes.

I cannot change the configs of the Kafka cluster, but, if needed, I can change the properties of the relevant topics.

Could someone suggest what else I can try?

If nothing works, I am willing to ignore such oversized messages. However, I don't know a way of handling this RecordTooLargeException.

My code to perform the join is as follows.

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, JOINER_ID + "-" + System.getenv("HOSTNAME"));
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, booststrapList);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
config.put("max.request.size", "314572800");
config.put("message.max.bytes", "314572800");
config.put("max.message.bytes", "314572800");


KStreamBuilder builder = new KStreamBuilder();

KStream<String, byte[]> clicksStream = builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Serdes.String(), Serdes.ByteArray(), clicksTopic);
KStream<String, byte[]> recsStream = builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Serdes.String(), Serdes.ByteArray(), recsTopic);

KStream<String, ClickRec> join = clicksStream.join(
        recsStream,
        (click, recs) -> new ClickRec(click, recs),
        JoinWindows.of(windowMillis).until(3*windowMillis));

join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);

KafkaStreams streams = new KafkaStreams(builder, config);
streams.cleanUp();
streams.start();

ClickRec is the joined object (which is far smaller than a Recommendation object; I don't expect it to be bigger than a few KB).

Where do I put a try...catch in the code above to recover from such occasionally oversized objects?

Asked Oct 05 '17 by Nik


1 Answer

There are multiple configs at different levels:

  1. The broker setting message.max.bytes (default is 1000012) (cf. http://kafka.apache.org/documentation/#brokerconfigs)
  2. The topic-level config max.message.bytes (default is 1000012) (cf. http://kafka.apache.org/documentation/#topicconfigs)
  3. The producer setting max.request.size (default is 1048576) (cf. http://kafka.apache.org/documentation/#producerconfigs)

Your stack trace indicates that you need to change the setting at the broker or topic level:

Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

You may need to increase the producer setting as well.
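
A minimal sketch of how those settings could be passed through the Streams config (assuming a Streams version that provides StreamsConfig.producerPrefix and StreamsConfig.topicPrefix, roughly 1.0+; the 30 MB value is only an illustration):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties config = new Properties();
// ... application.id, bootstrap.servers, and serdes as in the question ...

// Producer level: let the producer used internally by Streams send larger requests.
config.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), "31457280"); // 30 MB

// Topic-level default for the internal (changelog/repartition) topics that Streams creates;
// this only takes effect when those internal topics are first created.
config.put(StreamsConfig.topicPrefix(TopicConfig.MAX_MESSAGE_BYTES_CONFIG), "31457280");

Note that the topic. prefixed setting applies when Streams creates its internal topics; internal topics that already exist keep the config they were created with.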

Why do you need this in the first place:

As you perform a KStream-KStream join, the join operator builds up state (it must buffer records from both streams in order to compute the join). State is by default backed by a Kafka topic -- the local state is basically a cache while the Kafka topic is the source of truth. Thus, all your records will be written to this "changelog topic" that Kafka Streams creates automatically.
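
Since you mention you can change topic properties: if the changelog topic for the join window store already exists, one option is to raise max.message.bytes on that topic directly. Below is a hedged sketch using the Java AdminClient (available in 0.11+ clients); the broker address and changelog topic name are hypothetical placeholders, since Streams names changelog topics <application.id>-<storeName>-changelog.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RaiseChangelogMessageSize {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", "broker:9092"); // hypothetical broker address

        // Hypothetical changelog topic name; list the topics to find the real one.
        String changelogTopic = "my-joiner-app-KSTREAM-JOINTHIS-0000000004-store-changelog";

        try (AdminClient admin = AdminClient.create(adminProps)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, changelogTopic);
            Config newConfig = new Config(Collections.singletonList(
                    new ConfigEntry("max.message.bytes", "31457280"))); // 30 MB
            admin.alterConfigs(Collections.singletonMap(topic, newConfig)).all().get();
        }
    }
}

The same change can also be made from the command line with the kafka-configs.sh tool.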

Answered Oct 17 '22 by Matthias J. Sax