
spring-cloud-stream kafka consumer concurrency

With spring-cloud-stream's Kafka binder, how do you configure concurrent message consumers (in a single consumer JVM)? If I understand correctly, concurrent message consumption with Kafka requires partitions, but the s-c-s docs indicate that to use partitioning you need to specify partition selection in the producer via partitionKeyExpression or partitionKeyExtractorClass. The Kafka docs mention round-robin partitioning.

The s-c-s docs don't mention spring.cloud.stream.bindings.*.concurrency at all, though it does seem to matter in the use case I've described above. With this producer config

spring:
  cloud:
    stream:
      bindings:
        customer-save:
          destination: customer-save
          group: customer-save
          content-type: application/json
          partitionCount: 3

and consumer config

spring:
  cloud:
    stream:
      bindings:
        customer-save: 
          destination: customer-save
          group: customer-save
          content-type: application/x-java-object;type=foo.Customer
          partitioned: true
          concurrency: 3

I seem to be getting the behavior I want (at least somewhat). I can see that 3 consumer threads are sometimes active, though some partitioning other than round robin seems to be at play: some messages appear to wait for a busy consumer thread and are consumed only once that thread is finished. I'd assume that's because those messages are being sent to the same partition.

Is there some default key extraction and partitioning strategy that gets used on the producer when I don't specify partitionKeyExpression or partitionKeyExtractorClass? Is this an appropriate way to set up an s-c-s consumer with Kafka when you want multiple threads consuming messages in order to increase consumer throughput?

gadams00 asked Mar 07 '16 21:03


1 Answer

Since your producer is not partitioned (no partitionKeyExpression is set), the producer side will round-robin over the 3 partitions (if that is not the observed behaviour, please open a ticket on GitHub). If you configure a partitionKeyExpression, the producer will instead partition the data according to the configured logic.

On the consumer side, we ensure thread/partition affinity, because this is a widely respected Kafka convention: messages on a given partition are processed in order. That might account for the behaviour you are observing. If messages A, B, C, D are sent to partitions 0, 1, 2, 0, then D has to wait until A is processed, even if two other threads are available.
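The A, B, C, D example can be traced with a small plain-Java sketch. It is only an illustration: the class and method names are hypothetical, it assumes strict round robin starting at partition 0, and it models each partition as a queue drained by a single thread rather than using any Kafka or spring-cloud-stream API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of Kafka or spring-cloud-stream.
class PartitionAffinitySketch {

    // Assumed strict round robin: the n-th message (0-based) goes to partition n % partitionCount.
    static int roundRobinPartition(int messageIndex, int partitionCount) {
        return messageIndex % partitionCount;
    }

    // Groups messages into one queue per partition. With thread/partition affinity,
    // each queue is drained in order by exactly one consumer thread.
    static List<List<String>> queuesByPartition(List<String> messages, int partitionCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            queues.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            queues.get(roundRobinPartition(i, partitionCount)).add(messages.get(i));
        }
        return queues;
    }

    public static void main(String[] args) {
        List<List<String>> queues = queuesByPartition(Arrays.asList("A", "B", "C", "D"), 3);
        for (int p = 0; p < queues.size(); p++) {
            System.out.println("partition " + p + " -> thread " + p + " handles " + queues.get(p));
        }
    }
}
```

Under these assumptions partition 0 holds both A and D, so D sits behind A on the same thread while the threads for partitions 1 and 2 go idle once B and C are done.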

One option for increasing throughput is to overpartition (which is a fairly typical strategy in Kafka). This would spread out messages even further and would increase the chance that messages are sent to different threads.

The other option for you to increase throughput if you don't care about ordering would be to process messages asynchronously downstream: e.g. by bridging the input channel to an ExecutorChannel.
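The idea behind the asynchronous hand-off can be sketched with plain java.util.concurrent. This is not Spring Integration's ExecutorChannel itself, only an analogue of what bridging to one achieves: the receiving thread submits each message to a worker pool instead of processing it inline. The class name, the process method and the pool size are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only; a plain-JDK analogue, not the ExecutorChannel API.
class AsyncHandoffSketch {

    // Stand-in for the real per-message work.
    static String process(String message) {
        return "processed " + message;
    }

    // Hands every message to a worker pool, then collects the results.
    // Results are returned in submission order, but the order in which the
    // workers actually run the tasks is not guaranteed.
    static List<String> processAll(List<String> messages, int workerThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(workerThreads);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String message : messages) {
                Callable<String> task = () -> process(message);
                pending.add(pool.submit(task)); // returns immediately
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : pending) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(processAll(Arrays.asList("A", "B", "C", "D"), 3));
    }
}
```

The trade-off is the one named above: once work is handed to a pool, two messages from the same partition may be processed out of order, so this only suits cases where ordering does not matter.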

Generally speaking, partitioned refers to the ability of a client to receive partitioned data (Kafka clients are always partitioned, but this setting also applies to Rabbit and/or Redis). It is used in conjunction with the properties instanceIndex and instanceCount to ensure that the partitions of a topic are divided correctly between multiple application instances (see also http://docs.spring.io/spring-cloud-stream/docs/1.0.0.M4/reference/htmlsingle/index.html#_instance_index_and_instance_count).
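One way to picture how instanceIndex and instanceCount could divide a topic is the sketch below. The modulo rule it uses (a partition belongs to the instance whose index equals partition % instanceCount) is an assumption made for illustration and is not stated above; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; the modulo assignment rule is an assumption.
class InstancePartitionSketch {

    // Returns the partitions an instance would own under the assumed rule
    // partition % instanceCount == instanceIndex.
    static List<Integer> partitionsFor(int instanceIndex, int instanceCount, int partitionCount) {
        List<Integer> owned = new ArrayList<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            if (partition % instanceCount == instanceIndex) {
                owned.add(partition);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        int instanceCount = 2;
        int partitionCount = 3;
        for (int index = 0; index < instanceCount; index++) {
            System.out.println("instance " + index + " owns partitions "
                    + partitionsFor(index, instanceCount, partitionCount));
        }
    }
}
```

With 3 partitions and 2 instances, this rule gives partitions 0 and 2 to instance 0 and partition 1 to instance 1, so every partition has exactly one owner.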

Marius Bogoevici answered Oct 06 '22 18:10