How to process data in chunks/batches with Kafka Streams?

In many Big Data situations it is preferable to work with a small buffer of records at a time, rather than record by record.

The natural example is calling some external API that supports batching for efficiency.

How can we do this in Kafka Streams? I cannot find anything in the API that looks like what I want.

So far I have:

builder.stream[String, String]("my-input-topic")
  .mapValues(externalApiCall)
  .to("my-output-topic")

What I want is:

builder.stream[String, String]("my-input-topic")
  .batched(chunkSize = 2000)
  .map(externalBatchedApiCall)
  .to("my-output-topic")

In Scala and Akka Streams the function is called grouped or batch. In Spark Structured Streaming we can do mapPartitions(_.grouped(2000).map(externalBatchedApiCall)).
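
To illustrate, here is roughly the shape I am after, written with Akka Streams' grouped (a sketch only; externalBatchedApiCall is a placeholder that just reports the batch size):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

object GroupedSketch extends App {
  implicit val system: ActorSystem = ActorSystem("batching")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // Placeholder for the real batched API call.
  def externalBatchedApiCall(batch: Seq[Int]): String =
    s"processed ${batch.size} records"

  Source(1 to 10000)
    .grouped(2000) // re-chunk the stream into Seq[Int] of up to 2000 elements
    .map(externalBatchedApiCall)
    .runForeach(println)
}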

asked Sep 17 '18 by samthebest

People also ask

Can Kafka do batch processing?

Yes. Batch processing can be implemented on top of Apache Kafka, so you keep Kafka's advantages while still operating on records in batches for efficiency.

How does batching work in Kafka?

Batching messages enables a Kafka producer to increase its throughput. Reducing the number of network requests the producer makes in order to send data will improve the performance of the system. The cost of increased throughput is increased latency.
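
For example, the producer's batching is controlled by the batch.size and linger.ms settings. A minimal sketch in Scala (the broker address and tuning values here are illustrative, not recommendations):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object BatchingProducerSketch extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536") // bytes to accumulate per partition before sending
  props.put(ProducerConfig.LINGER_MS_CONFIG, "50")     // ms to wait for more records to fill a batch

  val producer = new KafkaProducer[String, String](props)
  (1 to 100).foreach(i => producer.send(new ProducerRecord("some-topic", s"key-$i", s"value-$i")))
  producer.close() // flushes any outstanding batches
}

A larger batch.size combined with a non-zero linger.ms means fewer, bigger network requests, which is exactly the throughput-for-latency trade described above.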

Can Kafka be used for stream processing?

Kafka Streams is a client library for processing and analyzing data stored in Kafka. It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management and real-time querying of application state.
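
A minimal Kafka Streams application in Scala looks like this (a sketch; the topic names and the uppercase transform are placeholders):

import java.util.Properties
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

object MinimalStreamsApp extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "minimal-streams-app")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val builder = new StreamsBuilder()
  builder.stream[String, String]("my-input-topic")
    .mapValues(_.toUpperCase) // per-record transformation; the DSL has no built-in batching stage
    .to("my-output-topic")

  new KafkaStreams(builder.build(), props).start()
}

Note that the DSL operates record by record, which is why the question above asks for a batched stage.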


2 Answers

This doesn't seem to exist yet. Watch this space: https://issues.apache.org/jira/browse/KAFKA-7432

answered Sep 25 '22 by samthebest

You could use a queue, something like the below:

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

import static org.apache.kafka.streams.processor.PunctuationType.WALL_CLOCK_TIME;

@Component
@Slf4j
public class NormalTopic1StreamProcessor extends AbstractStreamProcessor<String> {
    // AbstractStreamProcessor is our own base class; it exposes streamsBuilder.

    public NormalTopic1StreamProcessor(KafkaStreamsConfiguration configuration) {
        super(configuration);
    }

    @Override
    Topology buildTopology() {
        KStream<String, String> kStream = streamsBuilder.stream("normalTopic", Consumed.with(Serdes.String(), Serdes.String()));
        kStream.process(() -> new AbstractProcessor<String, String>() {
            // Buffer up to 100 values; a full queue triggers a flush.
            final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
            final List<String> collection = new ArrayList<>();

            @Override
            public void init(ProcessorContext context) {
                super.init(context);
                // Time-based flush, so a partially filled batch is not
                // held forever on a quiet topic.
                context.schedule(Duration.of(1, ChronoUnit.MINUTES), WALL_CLOCK_TIME, timestamp -> {
                    processQueue();
                    context().commit();
                });
            }

            @Override
            public void process(String key, String value) {
                queue.add(value);
                if (queue.remainingCapacity() == 0) {
                    processQueue(); // size-based flush
                }
            }

            public void processQueue() {
                queue.drainTo(collection);
                // Replace the println with the real batched API call over `collection`.
                long count = collection.stream().peek(System.out::println).count();
                if (count > 0) {
                    System.out.println("count is " + count);
                    collection.clear();
                }
            }
        });
        // Note: this forwards the original records unchanged; the batch
        // processing above happens as a side effect.
        kStream.to("normalTopic1");
        return streamsBuilder.build();
    }

}
answered Sep 25 '22 by Rajesh Rai