
How to use Futures with Kafka Streams

I have a Kafka cluster from which I consume two topics and join them. With the result of the join I perform some database operations. All DB operations are asynchronous, so they return a Future (scala.concurrent.Future, though the same question applies to java.util.concurrent.CompletableFuture). As a result I end up with code like this:

val firstSource: KTable[String, Obj]
val secondSource: KTable[String, Obj2]

def enrich(data: ObjAndObj2): Future[EnrichedObj]
def saveResultToStorage(enrichedData: Future[EnrichedObj]): Future[Unit]

firstSource.leftJoin(secondSource, joinFunc)
           .mapValues(enrich)
           .foreach(saveResultToStorage)

Is it okay to pass Future values through the stream like this, or is there a better way to handle async tasks (like .mapAsync in Akka Streams)?

Arthur Kushka asked Feb 15 '17 10:02





1 Answer

I have this same issue. From what I can tell, Kafka Streams is not designed to handle multi-rate streaming the same way Akka Streams is. Kafka Streams has no equivalent of the multi-rate primitives Akka has like mapAsync, throttle, conflate, buffer, batch, etc. Kafka Streams is good at handling joins between topics and stateful aggregations of data. Akka Streams is good at multi-rate and asynchronous processing.

You have a few options for handling this:

  • Make a blocking call in the Kafka Streams app. This is the easiest option, and it is fine if running the Future calls in parallel would not give much more throughput than running them one at a time. Kafka Streams creates one task per input partition and spreads those tasks across its stream threads (num.stream.threads) and application instances, so you can use the partitioning of the Kafka topic(s) being processed to drive parallelism.
  • Handle the enrichment in Akka Streams using the Reactive Kafka library, publish the enriched result to another Kafka topic, and then consume that topic in your Kafka Streams application. This is what we do when the async call achieves much higher throughput in parallel than its end-to-end latency would allow sequentially, such as a web service call or a query to a NoSQL database.
  • Publish all your enrichment data to its own topic, read it as a KTable, and join it in the Kafka Streams app. Joining stream data with enrichment data via KTables is exactly what Kafka Streams is good at. We use this if the enrichment data can be represented as a table. It does not work if the enrichment data must be computed on the fly.
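
As a rough illustration of the first option, here is a minimal sketch of wrapping the async call so that it blocks the calling thread. ObjAndObj2 and EnrichedObj are stand-ins for the question's types, the body of enrich is a placeholder, and enrichBlocking and its 5-second default timeout are hypothetical choices that do not come from the question.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockingEnrichSketch {
  // Hypothetical stand-ins for the types used in the question.
  final case class ObjAndObj2(key: String, payload: String)
  final case class EnrichedObj(key: String, payload: String, extra: String)

  implicit val ec: ExecutionContext = ExecutionContext.global

  // Placeholder for the real asynchronous enrichment (e.g. a DB or web service call).
  def enrich(data: ObjAndObj2): Future[EnrichedObj] =
    Future(EnrichedObj(data.key, data.payload, extra = data.payload.toUpperCase))

  // Blocks the calling thread until the Future completes or the timeout elapses.
  // Await.result rethrows the Future's failure, or throws a TimeoutException on timeout.
  def enrichBlocking(data: ObjAndObj2, timeout: FiniteDuration = 5.seconds): EnrichedObj =
    Await.result(enrich(data), timeout)

  def main(args: Array[String]): Unit = {
    val result = enrichBlocking(ObjAndObj2("k1", "hello"))
    println(result)
  }
}
```

In the topology from the question this would be wired in as .mapValues(d => enrichBlocking(d)), so the next operator sees a plain EnrichedObj rather than a Future. Because Await.result rethrows failures and timeouts, the application would still need to decide how the stream thread should handle those exceptions.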
Charles Crain answered Sep 21 '22 19:09