In many Big Data situations it is preferable to work with a small buffer of records at a time, rather than processing one record at a time.
The natural example is calling some external API that supports batching for efficiency.
How can we do this in Kafka Streams? I cannot find anything in the API that looks like what I want.
So far I have:
builder.stream[String, String]("my-input-topic")
  .mapValues(externalApiCall)
  .to("my-output-topic")
What I want is:
builder.stream[String, String]("my-input-topic")
  .batched(chunkSize = 2000)
  .map(externalBatchedApiCall)
  .to("my-output-topic")
In Scala and Akka Streams the function is called grouped or batch. In Spark Structured Streaming we can do mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall)).
With such an operator, batch processing could be implemented directly on top of Apache Kafka, keeping Kafka's advantages while making the external calls efficient.
Batching messages enables a Kafka producer to increase its throughput: reducing the number of network requests the producer makes to send data improves the performance of the system. The cost of that increased throughput is increased latency.
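On the producer side that trade-off is exposed as configuration rather than code. A minimal sketch of tuning it, assuming a local broker; the batch size and linger time are illustrative values, not recommendations:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BatchingProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Accumulate up to 64 KiB per partition before sending a request ...
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
        // ... and wait up to 50 ms for a batch to fill: throughput bought with latency.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 50);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10_000; i++) {
                producer.send(new ProducerRecord<>("my-output-topic", "key-" + i, "value-" + i));
            }
        }
    }
}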
Kafka Streams is a client library for processing and analyzing data stored in Kafka. It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management and real-time querying of application state.
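For reference, here is the question's per-record pipeline as a complete Java application; externalApiCall is a placeholder standing in for the external service, not part of the original post:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class SingleRecordApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "single-record-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // The per-record version from the question: one external call per message.
        builder.<String, String>stream("my-input-topic")
               .mapValues(SingleRecordApp::externalApiCall)
               .to("my-output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    // Hypothetical stand-in for the external API call.
    static String externalApiCall(String value) {
        return value;
    }
}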
This doesn't seem to exist yet. Watch this space: https://issues.apache.org/jira/browse/KAFKA-7432
You could use a queue, something like the code below:
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.stereotype.Component;

import static org.apache.kafka.streams.processor.PunctuationType.WALL_CLOCK_TIME;

@Component
@Slf4j
public class NormalTopic1StreamProcessor extends AbstractStreamProcessor<String> {

    // AbstractStreamProcessor is this project's own base class exposing streamsBuilder.
    public NormalTopic1StreamProcessor(KafkaStreamsConfiguration configuration) {
        super(configuration);
    }

    @Override
    Topology buildTopology() {
        KStream<String, String> kStream =
                streamsBuilder.stream("normalTopic", Consumed.with(Serdes.String(), Serdes.String()));
        // .peek((key, value) -> log.info("message received by stream 0"));
        kStream.process(() -> new AbstractProcessor<String, String>() {
            // Buffer up to 100 values; the queue capacity is the batch size.
            final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
            final List<String> collection = new ArrayList<>();

            @Override
            public void init(ProcessorContext context) {
                super.init(context);
                // Also flush once a minute (wall-clock time) so a quiet topic
                // does not leave a partial batch stuck in the queue.
                context.schedule(Duration.ofMinutes(1), WALL_CLOCK_TIME, timestamp -> {
                    processQueue();
                    context.commit();
                });
            }

            @Override
            public void process(String key, String value) {
                queue.add(value);
                // Flush as soon as the buffer is full.
                if (queue.remainingCapacity() == 0) {
                    processQueue();
                }
            }

            public void processQueue() {
                queue.drainTo(collection);
                long count = collection.stream().peek(System.out::println).count();
                if (count > 0) {
                    // A batched external API call would go here.
                    System.out.println("count is " + count);
                    collection.clear();
                }
            }
        });
        // Note: this writes the original records through unchanged; the
        // processor above only consumes and prints the batches.
        kStream.to("normalTopic1");
        return streamsBuilder.build();
    }
}
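One caveat with the snippet above: process() is a terminal operation, so nothing from the batch ever reaches normalTopic1; kStream.to() writes the original records through unchanged. If the goal from the question is to put the results of externalBatchedApiCall on the output topic, a flatTransform-based sketch along the following lines should work. The batched call and the flush interval are assumptions, and the buffer lives in memory only, so it is not fault-tolerant across restarts:

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class BatchingTopology {

    static final int CHUNK_SIZE = 2000;

    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("my-input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .flatTransform(BatchingTransformer::new)
               .to("my-output-topic");
        return builder;
    }

    // Buffers values and emits the external API's results one full chunk at a time.
    static class BatchingTransformer
            implements Transformer<String, String, Iterable<KeyValue<String, String>>> {

        private final List<String> buffer = new ArrayList<>(CHUNK_SIZE);
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            // Also flush on a timer so a quiet topic doesn't strand a partial batch.
            context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, ts -> {
                for (KeyValue<String, String> kv : flush()) {
                    context.forward(kv.key, kv.value);
                }
            });
        }

        @Override
        public Iterable<KeyValue<String, String>> transform(String key, String value) {
            buffer.add(value);
            return buffer.size() >= CHUNK_SIZE ? flush() : Collections.emptyList();
        }

        private List<KeyValue<String, String>> flush() {
            List<KeyValue<String, String>> out = new ArrayList<>();
            for (String result : externalBatchedApiCall(buffer)) {
                out.add(KeyValue.pair(null, result));
            }
            buffer.clear();
            return out;
        }

        @Override
        public void close() { }
    }

    // Hypothetical batched call from the question; replace with the real API client.
    static List<String> externalBatchedApiCall(List<String> values) {
        return new ArrayList<>(values);
    }
}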