Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What is the right way to "pause" a route using camel with activemq?

My basic problem is that I can only process 7000 messages from one of my queues (across all machines) in any 1 hour time period. I don't see a way to do this with camel or activemq, so I resorted to implementing my own route stopping/starting logic. I see a number of ways to do this, and I've tried a few of them (only to run into problems).

  1. camelContext.stopRoute(route): This works in that messages stop being processed, but when I call camelContext.startRoute(route), it leaks a tcp connection which eventually causes the activemq server to hit its limit and die.
  2. camelContext.suspendRoute(route): This also stops messages from being processed and does not leak connections, but it appears to kill active consumers (visible in the admin panel) that don't reactivate when I call camelContext.resumeRoute(route). I think that could eventually lead to no messages being processed off that queue at all, even if I resume.
  3. implementing a custom RoutePolicy. To be fair, I haven't tried this yet, but it seems that it would fall prey to the same problems I had according to the pause method I chose above.

Is there a method for solving this problem that I haven't encountered yet?

like image 757
Denise Avatar asked Oct 09 '14 16:10

Denise


People also ask

What is stop in camel?

Using stop is for shutting down Camel and it's not guaranteed that when it's being started again using the start method that Camel will operate consistently. Adds a component to the context. Adds the endpoint to the context using the given URI.

How does a camel route work?

Contents. A Camel route is where the integration flow is defined. For example to integrate two systems then a Camel route can be coded to specify how these systems are integrated. An example could be to take files from a FTP server and send to a ActiveMQ messaging system.

What is direct in camel route?

The Direct component provides direct, synchronous invocation of any consumers when a producer sends a message exchange. This endpoint can be used to connect existing routes in the same camel context.

What is ActiveMQ in Apache Camel?

The ActiveMQ component allows messages to be sent to a JMS Queue or Topic or messages to be consumed from a JMS Queue or Topic using Apache ActiveMQ.


1 Answers

Instead of stopping the route, I would recommend to use the Throttler EIP.

from("jms:queue:inbox")
    .throttle(7000)
    .timePeriodMillis(1000*60*60)
    .to("log:result", "mock:result");

The above example will throttle messages received on jms:queue:inbox before being sent to mock:result ensuring that a maximum of 7000 messages are sent in any 1 hour window.

Alternatively, for more fine grained control you may define a throttling route policy as shown in Camel's throttling example:

<route routePolicyRef="myPolicy">
    <from uri="jms:queue:inbox"/>
    <transacted/>
    <to uri="log:+++JMS +++?groupSize=100"/>
    <to ref="foo"/>
</route>

The throttling police is defined as follows:

<bean id="myPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
    <property name="scope" value="Context"/>
    <!-- when we hit > 20 inflight exchanges then kick in and suspend the routes -->
    <property name="maxInflightExchanges" value="20"/>
    <!-- when we hit lower than 10% of the max = 2 then kick in and resume the routes the default percentage is 70% but in this demo we want a low value -->
    <property name="resumePercentOfMax" value="10"/>
    <!-- output throttling activity at WARN level -->
    <property name="loggingLevel" value="WARN"/>
</bean>

EDIT 1:

If you need a global throttling, then you may first let one consumer read the messages, throttling all messages as described above, then re-send them to another queue and let re-reading and processing them by >= 1 distributed consumers.

EDIT 2:

Alternatively, you may implement your own ThrottlingInflightRoutePolicy accessing a central database holding processing information. That way, you don't need a "single node master throttler". However, also the DB may be a single point of failure.

like image 55
Peter Keller Avatar answered Sep 30 '22 01:09

Peter Keller