With spring-cloud-stream's Kafka binder, how do you configure concurrent message consumers (in a single consumer JVM)? If I understand correctly, concurrent message consumption with Kafka requires partitions, but the s-c-s docs indicate that to use partitioning you need to specify partition selection in the producer via partitionKeyExpression or partitionKeyExtractorClass. The Kafka docs mention round-robin partitioning.
The s-c-s docs don't mention spring.cloud.stream.bindings.*.concurrency at all, even though it does seem to matter in the use case described above. With the producer config
spring:
  cloud:
    stream:
      bindings:
        customer-save:
          destination: customer-save
          group: customer-save
          content-type: application/json
          partitionCount: 3
and consumer config
spring:
  cloud:
    stream:
      bindings:
        customer-save:
          destination: customer-save
          group: customer-save
          content-type: application/x-java-object;type=foo.Customer
          partitioned: true
          concurrency: 3
I seem to be getting the behavior I want (at least somewhat). I can see that 3 consumer threads are sometimes active, though there does appear to be some partitioning other than round-robin at play, since some messages seem to wait for a busy consumer thread and are only consumed once that thread finishes. I'd assume that's because those messages are being sent to the same partition.
Is there some default key-extraction and partitioning strategy used on the producer when I don't specify partitionKeyExpression or partitionKeyExtractorClass? Is this an appropriate way to set up an s-c-s consumer with Kafka when you want multiple threads consuming messages in order to increase consumer throughput?
Since your producer is not partitioned (there is no partitionKeyExpression set), the producer side will round-robin over the 3 partitions (if that is not the observed behaviour, please open a ticket on GitHub). If you configure a partitionKeyExpression, the producer will partition the data according to the configured logic.
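For reference, a minimal sketch of what an explicitly partitioned producer binding might look like; payload.id is a hypothetical key expression, and depending on the spring-cloud-stream version the property may need to sit under a producer sub-key of the binding rather than directly on it:
spring:
  cloud:
    stream:
      bindings:
        customer-save:
          destination: customer-save
          partitionKeyExpression: payload.id   # hypothetical: partition by the customer's id
          partitionCount: 3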
On the consumer side, we ensure thread/partition affinity, following the widely respected Kafka convention that messages on a given partition are processed in order. That likely accounts for the behaviour you are observing: if messages A, B, C, D are sent to partitions 0, 1, 2, 0, then D has to wait until A is processed, even if two other threads are available.
One option for increasing throughput is to overpartition, i.e. create more partitions than strictly necessary, which is a fairly typical strategy in Kafka. This spreads the messages out further and increases the chance that different messages are handled by different threads.
The other option for increasing throughput, if you don't care about ordering, is to process messages asynchronously downstream, e.g. by bridging the input channel to an ExecutorChannel, as in the sketch below.
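As a rough illustration (not the only way to wire it), here is a sketch of bridging the Sink input channel to an ExecutorChannel so each message is handed off to a pool thread; the channel name, pool size, and handler are assumptions, and note that per-partition ordering is lost.
import java.util.concurrent.Executors;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.handler.BridgeHandler;

@EnableBinding(Sink.class)
public class AsyncConsumerConfiguration {

    // ExecutorChannel dispatches each message to a pool thread instead of the binder's consumer thread
    @Bean
    public ExecutorChannel asyncCustomerSaveChannel() {
        return new ExecutorChannel(Executors.newFixedThreadPool(10));
    }

    // Bridge the binder's input channel to the executor channel
    @Bean
    @ServiceActivator(inputChannel = Sink.INPUT)
    public BridgeHandler bridgeToExecutor() {
        BridgeHandler bridge = new BridgeHandler();
        bridge.setOutputChannel(asyncCustomerSaveChannel());
        return bridge;
    }

    // Actual processing now runs on the executor's threads (foo.Customer in your setup)
    @ServiceActivator(inputChannel = "asyncCustomerSaveChannel")
    public void handle(Object customer) {
        // hypothetical business logic
    }
}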
Generally speaking, partitioned refers to the ability of a client to receive partitioned data (Kafka clients are always partitioned, but this setting also applies to Rabbit and/or Redis). It is used in conjunction with the properties instanceIndex and instanceCount to ensure that the partitions of a topic are divided correctly between multiple application instances (see also http://docs.spring.io/spring-cloud-stream/docs/1.0.0.M4/reference/htmlsingle/index.html#_instance_index_and_instance_count).
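For completeness, a sketch of how those properties might look on the first of two application instances (the second instance would set instanceIndex to 1):
spring:
  cloud:
    stream:
      instanceCount: 2
      instanceIndex: 0   # instance-specific: 0 on the first instance, 1 on the second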