I am aware that it is not possible to maintain ordering across multiple partitions in Kafka, and that partition ordering is only guaranteed for a single consumer within a group (for a single partition). However, with Kafka Streams 0.10, is it now possible to achieve this? If we use the timestamp feature so that each message in each partition maintains its order, then at the consumer side, let's say with Kafka Streams 0.10, is this now possible? Assuming we receive all messages, could we not sort all the partitions based on the consumed timestamp and perhaps forward them on to a separate topic for consumption?
At the moment I need to maintain ordering, but this means having a single partition with a single consumer thread. I wanted to change this to multiple partitions to increase parallelism, but somehow still 'get them in order'.
Any thoughts? Thank you.
Ordering guarantee with Apache Kafka: “Apache Kafka preserves the order of messages within a partition. This means that if messages were sent from the producer in a specific order, the broker will write them to a partition in that order and all consumers will read them in that order.”
The consumers in a group divide the topic's partitions among themselves as fairly as possible, such that each partition is consumed by only a single consumer from the group. When there are fewer consumers than partitions, some consumers will read messages from more than one partition.
A consumer can be assigned to consume multiple partitions, but the rule in Kafka is that only one consumer in a consumer group can be assigned to consume messages from a given partition of a topic. Hence, multiple Kafka consumers from the same consumer group cannot read the same message from a partition.
Using the right partitioning strategies allows your application to handle terabytes of data at scale with minimal latency. A Kafka producer can write to different partitions in parallel, which generally means that it can achieve higher levels of throughput.
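For example, the usual way to keep related messages in order while still spreading load over several partitions is to give them the same key: the default partitioner hashes the key, so all records with that key land in the same partition and remain in send order relative to each other. A minimal sketch, where the topic name "temperature-readings" and the key "sensor-42" are made up for illustration:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The default partitioner hashes the key, so every record keyed
            // by "sensor-42" goes to the same partition and keeps its order.
            producer.send(new ProducerRecord<>("temperature-readings", "sensor-42", "21.5"));
            producer.send(new ProducerRecord<>("temperature-readings", "sensor-42", "21.7"));
        }
    }
}

This gives per-key ordering with parallelism across keys, which is often a practical substitute for the global ordering asked about below.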
There are two problems you are facing in such a situation:
I am aware that it is not possible to maintain ordering across multiple partitions in Kafka and that partition ordering is only guaranteed for a single consumer within a group (for a single partition). However, with Kafka Streams 0.10, is it now possible to achieve this?
The short answer is: No, it is still not possible to achieve global order when you are reading from Kafka topics that have multiple partitions.
Also, "partition ordering" means "partition ordering based on the offsets of the messages in a partition". The ordering guarantee is not related to the timestamps of the messages.
Lastly, ordering is only guaranteed if max.in.flight.requests.per.connection == 1. From the producer configuration settings in the Apache Kafka documentation:

max.in.flight.requests.per.connection (default: 5): The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).
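For reference, the producer side of that guarantee could be configured roughly as follows. This is only a sketch; the broker address is a placeholder and the retry count is arbitrary:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class OrderedProducerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Allow only one unacknowledged request at a time, so a retried send
        // cannot overtake a later send and re-order messages within a partition.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}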
Note that at this point we are talking about a combination of consumer behavior (which is what your original question started out with) and producer behavior in Kafka.
If we use the timestamp feature so that each message in each partition maintains its order, then at the consumer side, let's say with Kafka Streams 0.10, is this now possible?
Even with the timestamp feature we still don't achieve "each message in each partition maintains the order". Why? Because of the possibility of late-arriving / out-of-order messages.
A partition is ordered by offsets, but it is not guaranteed to be ordered by timestamps. The following contents of a partition are perfectly possible in practice (timestamps are normally milliseconds since the epoch):
Partition offsets    0    1    2    3    4    5    6    7    8
Timestamps           15   16   16   17   15   18   18   19   17
                                         ^^
                                         oops, late-arriving data!
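You can observe this from the consumer side: since 0.10 every ConsumerRecord carries a timestamp() in addition to its offset, so within a partition the offsets only ever increase while the timestamps may jump backwards. A small sketch, with a made-up topic name and group id:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TimestampOrderCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "timestamp-order-check");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("temperature-readings"));
            Map<Integer, Long> maxTsPerPartition = new HashMap<>();
            while (true) {
                // poll() returns records in offset order per partition, but
                // their timestamps are not guaranteed to be monotonic.
                for (ConsumerRecord<String, String> record : consumer.poll(100)) {
                    long maxSoFar = maxTsPerPartition
                            .getOrDefault(record.partition(), Long.MIN_VALUE);
                    if (record.timestamp() < maxSoFar) {
                        System.out.printf("late record: partition=%d offset=%d ts=%d%n",
                                record.partition(), record.offset(), record.timestamp());
                    }
                    maxTsPerPartition.put(record.partition(),
                            Math.max(maxSoFar, record.timestamp()));
                }
            }
        }
    }
}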
What are late-arriving / out-of-order messages? Imagine you have sensors scattered all over the world, all of which measure their local temperature and send the latest measurement to a Kafka topic. Some sensors may have unreliable Internet connectivity, thus their measurements may arrive with a delay of minutes, hours, or even days. Eventually their delayed measurements will make it to Kafka, but they will arrive "late". Same for mobile phones in a city: Some may run out of battery/energy and need to be recharged before they can send their data, some may lose Internet connectivity because you're driving underground, etc.
Assuming we receive all messages, could we not sort all the partitions based on the consumed timestamp and perhaps forward them on to a separate topic for consumption?
In theory yes, but in practice that's quite difficult. The assumption "we receive all messages" is actually challenging for a streaming system (even for a batch processing system, though presumably the problem of late-arriving data is often simply ignored here). You never know whether you actually have received "all messages" -- because of the possibility of late-arriving data. If you receive a late-arriving message, what do you want to happen? Re-process/re-sort "all" the messages again (now including the late-arriving message), or ignore the late-arriving message (thus computing incorrect results)? In a sense, any such global ordering achieved by "let's sort all of them" is either very costly or best effort.
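If you still wanted to attempt it, a best-effort approach is to buffer records for some grace period, emit them in timestamp order once the watermark has passed, and forward them to another topic. Anything arriving later than the grace period is still forwarded out of order, which is exactly the trade-off described above. A rough sketch only, with made-up topic names and an arbitrary one-minute grace period; note that the output topic would itself need a single partition for the re-established order to survive downstream:

import java.util.Collections;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BestEffortReorder {
    static final long GRACE_MS = 60_000; // assumed grace period for late data

    public static void main(String[] args) {
        Properties cProps = new Properties();
        cProps.put("bootstrap.servers", "localhost:9092");
        cProps.put("group.id", "reorder");
        cProps.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        cProps.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        Properties pProps = new Properties();
        pProps.put("bootstrap.servers", "localhost:9092");
        pProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        pProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pProps)) {

            consumer.subscribe(Collections.singletonList("measurements"));
            // Buffer ordered by timestamp; records are held back for GRACE_MS
            // so that moderately late data can still be slotted in.
            PriorityQueue<ConsumerRecord<String, String>> buffer = new PriorityQueue<>(
                    Comparator.comparingLong((ConsumerRecord<String, String> r) -> r.timestamp()));
            long maxSeenTs = Long.MIN_VALUE;

            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(100)) {
                    buffer.add(record);
                    maxSeenTs = Math.max(maxSeenTs, record.timestamp());
                }
                // Emit everything older than the watermark. Anything arriving
                // more than GRACE_MS after its timestamp will still be
                // forwarded out of order -- this is best effort only.
                while (!buffer.isEmpty()
                        && buffer.peek().timestamp() <= maxSeenTs - GRACE_MS) {
                    ConsumerRecord<String, String> r = buffer.poll();
                    producer.send(new ProducerRecord<>("measurements-sorted",
                            r.key(), r.value()));
                }
            }
        }
    }
}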