My basic problem is that I can only process 7000 messages from one of my queues (across all machines) in any 1 hour time period. I don't see a way to do this with camel or activemq, so I resorted to implementing my own route stopping/starting logic. I see a number of ways to do this, and I've tried a few of them (only to run into problems).
camelContext.stopRoute(route)
: This works in that messages stop being processed, but when I call camelContext.startRoute(route)
, it leaks a tcp connection which eventually causes the activemq server to hit its limit and die.camelContext.suspendRoute(route)
: This also stops messages from being processed and does not leak connections, but it appears to kill active consumers (visible in the admin panel) that don't reactivate when I call camelContext.resumeRoute(route)
. I think that could eventually lead to no messages being processed off that queue at all, even if I resume.RoutePolicy
. To be fair, I haven't tried this yet, but it seems that it would fall prey to the same problems I had according to the pause method I chose above.Is there a method for solving this problem that I haven't encountered yet?
Using stop is for shutting down Camel and it's not guaranteed that when it's being started again using the start method that Camel will operate consistently. Adds a component to the context. Adds the endpoint to the context using the given URI.
Contents. A Camel route is where the integration flow is defined. For example to integrate two systems then a Camel route can be coded to specify how these systems are integrated. An example could be to take files from a FTP server and send to a ActiveMQ messaging system.
The Direct component provides direct, synchronous invocation of any consumers when a producer sends a message exchange. This endpoint can be used to connect existing routes in the same camel context.
The ActiveMQ component allows messages to be sent to a JMS Queue or Topic or messages to be consumed from a JMS Queue or Topic using Apache ActiveMQ.
Instead of stopping the route, I would recommend to use the Throttler EIP.
from("jms:queue:inbox")
.throttle(7000)
.timePeriodMillis(1000*60*60)
.to("log:result", "mock:result");
The above example will throttle messages received on jms:queue:inbox
before being sent to mock:result
ensuring that a maximum of 7000 messages are sent in any 1 hour window.
Alternatively, for more fine grained control you may define a throttling route policy as shown in Camel's throttling example:
<route routePolicyRef="myPolicy">
<from uri="jms:queue:inbox"/>
<transacted/>
<to uri="log:+++JMS +++?groupSize=100"/>
<to ref="foo"/>
</route>
The throttling police is defined as follows:
<bean id="myPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
<property name="scope" value="Context"/>
<!-- when we hit > 20 inflight exchanges then kick in and suspend the routes -->
<property name="maxInflightExchanges" value="20"/>
<!-- when we hit lower than 10% of the max = 2 then kick in and resume the routes the default percentage is 70% but in this demo we want a low value -->
<property name="resumePercentOfMax" value="10"/>
<!-- output throttling activity at WARN level -->
<property name="loggingLevel" value="WARN"/>
</bean>
EDIT 1:
If you need a global throttling, then you may first let one consumer read the messages, throttling all messages as described above, then re-send them to another queue and let re-reading and processing them by >= 1 distributed consumers.
EDIT 2:
Alternatively, you may implement your own ThrottlingInflightRoutePolicy
accessing a central database holding processing information. That way, you don't need a "single node master throttler". However, also the DB may be a single point of failure.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With