
Read and process a batch of messages from Kafka


I would like to read a batch of messages from a Kafka topic and, either at regular time intervals or once the number of messages read reaches a certain count, send them as a batch to a downstream system. At the moment my Kafka Streams topology is terminated by a processor that holds the messages and then processes the accumulated batch in the punctuate method.

I'm not sure this is robust, however: if the application crashes before punctuate is called, I think some messages would be lost (i.e. the consumer considers them processed, but they never appear in the downstream system).

import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;

// Bounded queue that buffers values until the batch is flushed downstream
batchQueue = new LinkedBlockingQueue<String>(batchSize);

KStream<String, String> inputStream = builder
    .stream(Serdes.String(), Serdes.String(), "source-topic");

inputStream.process(new ProcessorSupplier<String, String>() {

    @Override
    public Processor<String, String> get() {
        return new AbstractProcessor<String, String>() {

            @Override
            public void init(ProcessorContext context) {
                super.init(context);
                // Schedule periodic punctuate() calls every flushPeriod ms
                context.schedule(flushPeriod);
            }

            @Override
            public void process(String key, String value) {
                batchQueue.add(value);
                // Flush early once the queue is full
                if (batchQueue.remainingCapacity() == 0) {
                    processQueue();
                }
            }

            @Override
            public void punctuate(long timestamp) {
                // Flush on the timer and commit the consumed offsets
                processQueue();
                context().commit();
            }

            @Override
            public void close() {}
        };
    }
});
  • Is there a way to make this approach more robust? Perhaps windowing, but I don't really understand it.
  • Do I have to use Kafka Connect for this? I'm leaning away from it due to its error-handling capability: https://groups.google.com/forum/#!topic/confluent-platform/OBuLbVHbuyI
asked Mar 20 '17 by bm1729

People also ask

Can Kafka do batch processing?

Yes. Batch processing can be implemented straightforwardly with Apache Kafka, so you retain Kafka's advantages while keeping the operation efficient.

How do I consume multiple messages from Kafka topic?

You should configure a batch listener; then you can set the max.poll.records property to specify your batch size. Note that setting this value too low might decrease overall performance, since you'll need to make more polls to the broker to fetch the same number of records.
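
For illustration, here is a minimal sketch using the plain Java consumer client rather than a framework-level batch listener; the broker address, group id, topic, and batch size are placeholders, and max.poll.records is the standard consumer property mentioned above.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BatchConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // placeholder broker
        props.put("group.id", "batch-consumer");              // placeholder group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", "500");     // cap on records returned by a single poll()
        props.put("enable.auto.commit", "false"); // commit manually, only after the batch is handled

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("source-topic"));
            while (true) {
                // poll() returns up to max.poll.records messages as one batch
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(1000));
                if (!batch.isEmpty()) {
                    // ... forward the whole batch to the downstream system here ...
                    consumer.commitSync();        // commit offsets only once the batch succeeded
                }
            }
        }
    }
}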

How do I process a Kafka message?

IBM Integration Bus provides two built-in nodes for processing Kafka messages, which use the Apache Kafka Java™ client: KafkaConsumer node, which subscribes to a Kafka topic and propagates the feed of published messages to nodes connected downstream in the flow.


1 Answer

Is there a way to make this approach more robust? Perhaps windowing, but I don't really understand it.

I'd recommend decoupling the data transformation part (for which I'd use Kafka's Streams API) from the data ingestion part, where you write to your downstream system (for which I'd use Kafka's Connect API).

In short, why should your transformation logic be coupled with and need to worry about the specifics (here: expensive inserts!) of one of the downstream systems that this data will eventually be forwarded to? The responsibility of the transformation should ideally be transformation only, and it should not be concerned with operational aspects of an external, downstream system. If, for example, you eventually wanted to forward the transformed data into a second downstream system (or a third, ...), then a coupled approach would mean you'd have to update/redeploy/... your application, even though none of its transformation logic changed.

Another benefit of decoupling transformation and ingestion is that your transformation logic will be much simpler because it does not have to account for failures due to the downstream system(s) being slow, unavailable, etc. For example, it does not need to implement/test a complicated retry logic.
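
To make that split concrete, here is a rough sketch using the current StreamsBuilder API; the application id, topic names, and the toy uppercase transformation are placeholders. The Streams application only transforms and writes back to a Kafka topic, and a Connect sink connector would then read that topic and handle the batched writes to the downstream system.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class TransformOnlyApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transform-only-app");  // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("source-topic");

        // Transformation only: no batching, no retries, no knowledge of the downstream system
        input.mapValues(value -> value.toUpperCase())   // stand-in for the real transformation
             .to("transformed-topic");                  // a Connect sink connector reads from here

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

With this layout, adding or changing a downstream sink only touches Connect configuration; the transformation code stays untouched.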

Do I have to use Kafka Connect for this?

No, you don't need to use Kafka Connect for this, but it's arguably the best tool for this task.

I'm leaning away from [Kafka Connect] due to its error-handling capability: https://groups.google.com/forum/#!topic/confluent-platform/OBuLbVHbuyI

In the latest versions of Kafka Connect, the error handling is actually pretty good. Also, the problem in the linked discussion can IIRC easily be solved by providing a more robust converter (think: serializer/deserializer) for Connect to use.
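
For reference, newer Connect versions let you tune this per connector. A hypothetical sink connector configuration might enable the error-handling options like this; the connector name, class, topics, and dead-letter topic below are placeholders, and a real config would also need connector-specific settings such as connection details.

# Hypothetical sink connector config; only the errors.* settings are the point here.
name=downstream-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics=transformed-topic
# Keep the connector running when individual records fail to convert or transform...
errors.tolerance=all
# ...log the failures...
errors.log.enable=true
errors.log.include.messages=true
# ...and route the bad records to a dead letter queue topic for later inspection.
errors.deadletterqueue.topic.name=downstream-sink-dlq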

Also, as mentioned in that link, the specific problem discussed there becomes much less of an issue when you validate the compatibility of data before it is written to Kafka. You can achieve this by leveraging Confluent's schema registry (https://github.com/confluentinc/schema-registry and its docs) or similar tools. Since you raised the question "how can I make this more robust", thinking about data serialization and evolution is another important aspect I'd look at before deploying to production.
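
As a sketch of what that looks like on the producing side (assuming Confluent's Avro serializer is on the classpath; the broker and registry URLs are placeholders): the registry-aware serializer checks each schema against the registry's compatibility settings, so incompatible data is rejected before it ever reaches the topic.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

public class RegistryAwareProducerConfig {
    public static KafkaProducer<Object, Object> buildProducer() {
        // Assumes Confluent's Avro serializer (io.confluent:kafka-avro-serializer) is on the classpath.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // On produce, the serializer registers/validates schemas with the registry;
        // a schema that violates the subject's compatibility rules makes the send fail.
        props.put("schema.registry.url", "http://localhost:8081");  // placeholder registry URL
        return new KafkaProducer<>(props);
    }
}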

Hope this helps!

answered Sep 25 '22 by Michael G. Noll