I have a producer-consumer scenario where the producers produce much faster than the consumers can consume. Generally the solution is to make the producers block, since a producer/consumer pipeline only operates as fast as its slowest component. Throttling or blocking the producers is not a good fit here, though, because our application gives the consumers enough time to catch up later.
Here's a diagram depicting a full "phase" in our application vs. a more common scenario:
        Our Application                   Common Scenario
2N +--------+--------+
   |PPPPPPPP|oooooooo|                                        P = Producer
   |PPPPPPPP|oooooooo|                                        C = Consumer
 N +--------+--------+     N +--------+--------+--------+     o = Other Work
   |CPCPCPCP|CCCCCCCC|       |CPCPCPCP|CPCPCPCP|oooooooo|     N = number of tasks
   |CPCPCPCP|CCCCCCCC|       |CPCPCPCP|CPCPCPCP|oooooooo|
   -------------------       ----------------------------
   0       T/2      T        0       T/2      T      3T/2
The idea is to maximize throughput by not inhibiting the producers.
The data on which our tasks operate is easily serialized, so I plan to implement a filesystem solution for spilling all the tasks that can't be immediately satisfied.
I'm using Java's ThreadPoolExecutor with a BlockingQueue of bounded capacity to ensure we don't run out of memory. The problem is implementing such a "tiered" queue: tasks that fit in memory are queued there immediately, and the rest are queued on disk.
I've come up with two possible solutions:

1. Write my own BlockingQueue from scratch, using the LinkedBlockingQueue or ArrayBlockingQueue implementation as a reference. This may be as simple as copying the implementation in the standard library and adding filesystem read/writes.
2. Keep the standard BlockingQueue implementation, implement a separate FilesystemQueue for storing my data, and use one or more threads to dequeue files, create Runnables, and enqueue them using the ThreadPoolExecutor.

Is either of these reasonable, and is there potentially a better approach?
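For concreteness, here's a rough sketch of what I have in mind for the second option. Everything in it is a placeholder, not existing code: the class name SpillingExecutor, the spill-directory layout, and the pool sizes are illustrative only.

    import java.io.*;
    import java.nio.file.*;
    import java.util.concurrent.*;

    // Sketch of option 2: keep the standard bounded queue and spill overflow to
    // disk, with a drainer that re-submits persisted tasks when room frees up.
    public class SpillingExecutor {

        /** A task whose payload can be serialized to disk when the in-memory queue is full. */
        static class Task implements Runnable, Serializable {
            final String payload;                       // stands in for the real, serializable data
            Task(String payload) { this.payload = payload; }
            @Override public void run() { /* consume the payload here */ }
        }

        private final Path spillDir;
        private final ThreadPoolExecutor pool;

        SpillingExecutor(Path spillDir) throws IOException {
            this.spillDir = Files.createDirectories(spillDir);
            this.pool = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(10_000),              // bounded in-memory tier
                    (task, executor) -> spillToDisk((Task) task)); // overflow tier: the filesystem
        }

        /** Producers call this; it never blocks on a full queue. */
        void submit(Task task) { pool.execute(task); }

        private void spillToDisk(Task task) {
            try {
                Path file = Files.createTempFile(spillDir, "task-", ".ser");
                try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
                    out.writeObject(task);
                }
            } catch (IOException e) {
                throw new RejectedExecutionException("could not spill task to disk", e);
            }
        }

        /** Drainer: when the in-memory queue has room again, reload spilled tasks. */
        void drainSpilled() throws IOException, ClassNotFoundException {
            try (DirectoryStream<Path> files = Files.newDirectoryStream(spillDir, "task-*.ser")) {
                for (Path file : files) {
                    if (pool.getQueue().remainingCapacity() == 0) return; // still full, try again later
                    try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
                        pool.execute((Task) in.readObject());
                    }
                    Files.delete(file);
                }
            }
        }
    }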
Before going for a more complex solution, are you really confident that using a bounded BlockingQueue is a deal-breaker for you? It may turn out that increasing your heap size and preallocating a generous enough capacity is still OK for you. It lets you avoid complexity and performance uncertainty, at the price of GC pauses that are well within your comfort zone.
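To illustrate, purely as a sketch with made-up numbers (tune the capacity, thread count, and -Xmx to your own per-task footprint and heap budget), the "just make the bounded queue big enough" setup could look like:

    import java.util.concurrent.*;

    // Sketch: a generously sized bounded queue feeding the consumer pool.
    public class BigBoundedQueueExample {
        public static void main(String[] args) {
            int capacity = 1_000_000;  // roughly the number of tasks that pile up in one "phase"
            BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(capacity);

            ThreadPoolExecutor consumers = new ThreadPoolExecutor(
                    8, 8, 0L, TimeUnit.MILLISECONDS, queue,
                    new ThreadPoolExecutor.CallerRunsPolicy()); // fallback only if even this fills up

            // Producers just submit; with a generous capacity they should rarely hit the fallback.
            for (int i = 0; i < 10; i++) {
                final int id = i;
                consumers.execute(() -> System.out.println("consumed task " + id));
            }
            consumers.shutdown();
        }
    }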
Still, if your workload is so unbalanced that it can take advantage of persisting a volume of messages that cannot fit in memory (compared to a proven MPMC blocking queue), it sounds like you need a simpler, smaller version of ActiveMQ or its Apollo offshoot. Depending on your application, you may find ActiveMQ's other features useful, in which case you can use it directly. If not, you're probably better off searching the JMS space, as bowmore suggests.
The first option is to increase the available heap size, as suggested by Dimitar Dimitrov, using the -Xmx memory flag, e.g. java -Xmx2048m.
From Oracle's Documentation: Note that the JVM uses more memory than just the heap. For example Java methods, thread stacks and native handles are allocated in memory separate from the heap, as well as JVM internal data structures.
There is also a diagram of how the Java heap memory is categorized.
The second option is to use a library that implements the requested functionality; for that purpose you can use ashes-queue. From the project's overview: "This is a simple FIFO implementation in Java which has persistent support. That is, if the queue is full, the overflowing messages will be persisted and when there are available slots, they will be put back into memory."
The third option is to create your own implementation. For that, you may review this thread, which guides you through exactly that. Your suggestions both fall under this third option, and both are reasonable. From an implementation point of view, you should go with the first one, as it will give you an easier implementation and a cleaner design.
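As a very rough, hypothetical skeleton of what such a hand-rolled "tiered" queue could look like (the class name TieredQueue, the spill-file layout, and the locking are all illustrative, and multi-consumer ordering and race subtleties are glossed over; it is not a full BlockingQueue implementation):

    import java.io.*;
    import java.nio.file.*;
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.LinkedBlockingQueue;

    // In-memory tier backed by LinkedBlockingQueue, disk tier of serialized elements
    // that is drained back into memory as capacity frees up.
    public class TieredQueue<E extends Serializable> {
        private final LinkedBlockingQueue<E> memory;
        private final Deque<Path> onDisk = new ArrayDeque<>();   // FIFO of spill files
        private final Path spillDir;

        public TieredQueue(int memoryCapacity, Path spillDir) throws IOException {
            this.memory = new LinkedBlockingQueue<>(memoryCapacity);
            this.spillDir = Files.createDirectories(spillDir);
        }

        /** Never blocks: goes to memory if there is room, otherwise to disk. */
        public synchronized void put(E element) throws IOException {
            // spill if memory is full, or if older elements already sit on disk (keeps FIFO order)
            if (!onDisk.isEmpty() || !memory.offer(element)) {
                Path file = Files.createTempFile(spillDir, "elem-", ".ser");
                try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
                    out.writeObject(element);
                }
                onDisk.addLast(file);
            }
        }

        /** Tops up the memory tier from disk, then takes from memory (blocking if both are empty). */
        public E take() throws InterruptedException, IOException, ClassNotFoundException {
            refillFromDisk();
            return memory.take();
        }

        private synchronized void refillFromDisk() throws IOException, ClassNotFoundException {
            while (!onDisk.isEmpty() && memory.remainingCapacity() > 0) {
                Path file = onDisk.pollFirst();
                try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
                    @SuppressWarnings("unchecked")
                    E reloaded = (E) in.readObject();
                    memory.offer(reloaded);
                }
                Files.delete(file);
            }
        }
    }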
This sounds like the ideal situation to use a JMS queue, rather than a file system.
Instead of using a blocking queue, post the messages on a persistent JMS queue. You could still try the tiered approach, combining a JMS queue in parallel with a BlockingQueue and posting to the JMS queue when the BlockingQueue is full, but I'm sure the pure JMS approach will work fine by itself.
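As a sketch of the pure-JMS approach, assuming an ActiveMQ broker at tcp://localhost:61616 and a queue named "tasks" (both are assumptions; any JMS provider would look much the same):

    import javax.jms.*;
    import org.apache.activemq.ActiveMQConnectionFactory;

    // Producers send persistent messages; the broker keeps them on disk,
    // so producers never wait for consumers to catch up.
    public class JmsSpillExample {
        public static void main(String[] args) throws JMSException {
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection connection = factory.createConnection();
            try {
                connection.start();
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Queue queue = session.createQueue("tasks");

                // Producer side: persistent delivery survives broker restarts and memory pressure.
                MessageProducer producer = session.createProducer(queue);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                producer.send(session.createTextMessage("serialized task payload"));

                // Consumer side: pull messages at whatever rate the consumers can sustain.
                MessageConsumer consumer = session.createConsumer(queue);
                TextMessage received = (TextMessage) consumer.receive(1000);
                if (received != null) {
                    System.out.println("consumed: " + received.getText());
                }
            } finally {
                connection.close();
            }
        }
    }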