I read the documentation about the prefetch buffer.
As I understand it, if I assign a prefetch value of 1 to consumer A, ActiveMQ pushes one message at a time to A. Only once A sends an acknowledgement to ActiveMQ does ActiveMQ push another message to A.
My question is: where do I assign the prefetch value for a consumer?
Do I need to assign the prefetch value in the consumer program? If so, can you explain with some simple code?
Thanks.
As per the ActiveMQ manual:
ActiveMQ uses a prefetch limit on how many messages can be streamed to a consumer at any point in time. Once the prefetch limit is reached, no more messages are dispatched to the consumer until the consumer starts sending back acknowledgements of messages (to indicate that the message has been processed). The actual prefetch limit value can be specified on a per consumer basis.
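To make the dispatch rule in that quote concrete, here is a toy model in plain Java. It is only a sketch of the behaviour the manual describes, not ActiveMQ code: the class `PrefetchSimulation` and its methods are made up for illustration, and real brokers handle acknowledgement modes, batching and redelivery that this ignores.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Toy model of prefetch-limited dispatch (hypothetical, not part of ActiveMQ). */
public class PrefetchSimulation {
    // Messages still waiting on the "broker" side.
    private final Deque<String> pending = new ArrayDeque<>();
    // Messages dispatched to the consumer but not yet acknowledged.
    private final List<String> inFlight = new ArrayList<>();
    private final int prefetchLimit;

    public PrefetchSimulation(int prefetchLimit, List<String> messages) {
        this.prefetchLimit = prefetchLimit;
        this.pending.addAll(messages);
    }

    /** Dispatches messages until the prefetch limit is reached; returns what was sent. */
    public List<String> dispatch() {
        List<String> sent = new ArrayList<>();
        while (inFlight.size() < prefetchLimit && !pending.isEmpty()) {
            String message = pending.poll();
            inFlight.add(message);
            sent.add(message);
        }
        return sent;
    }

    /** Acknowledges the oldest unacknowledged message, freeing one prefetch slot. */
    public void acknowledgeOldest() {
        if (!inFlight.isEmpty()) {
            inFlight.remove(0);
        }
    }

    public static void main(String[] args) {
        PrefetchSimulation broker =
                new PrefetchSimulation(1, Arrays.asList("m1", "m2", "m3"));
        System.out.println(broker.dispatch()); // [m1]
        System.out.println(broker.dispatch()); // [] (limit reached, nothing acknowledged yet)
        broker.acknowledgeOldest();
        System.out.println(broker.dispatch()); // [m2]
    }
}
```

With a limit of 1 this matches the understanding in the question: the second message is held back until the first one is acknowledged.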
To change the prefetch size for all consumer types you would use a connection URI similar to:
tcp://localhost:61616?jms.prefetchPolicy.all=50
To change the prefetch size for just queue consumer types you would use a connection URI similar to:
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
It can also be configured on a per consumer basis using Destination Options.
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
consumer = session.createConsumer(queue);
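So yes, the value can be set in the consumer program itself, because both options are just strings your consumer passes to the ActiveMQ client. As a rough sketch, the helper below builds those two strings with plain Java. The class `PrefetchOptions` and its method names are invented for this example; only the option keys `jms.prefetchPolicy.queuePrefetch` and `consumer.prefetchSize` come from the manual excerpts above.

```java
/** Hypothetical helper that builds the option strings shown above. */
public class PrefetchOptions {

    /** Appends the queue prefetch option to a broker URL (applies to the whole connection). */
    public static String brokerUrlWithQueuePrefetch(String baseUrl, int prefetch) {
        String separator = baseUrl.contains("?") ? "&" : "?";
        return baseUrl + separator + "jms.prefetchPolicy.queuePrefetch=" + prefetch;
    }

    /** Appends the per-consumer prefetch option to a queue name (a destination option). */
    public static String queueNameWithPrefetch(String queueName, int prefetch) {
        String separator = queueName.contains("?") ? "&" : "?";
        return queueName + separator + "consumer.prefetchSize=" + prefetch;
    }

    public static void main(String[] args) {
        // In a real consumer this string would be passed to
        // new ActiveMQConnectionFactory(url).
        String url = brokerUrlWithQueuePrefetch("tcp://localhost:61616", 1);
        System.out.println(url);

        // In a real consumer this string would be passed to
        // new ActiveMQQueue(name) before calling session.createConsumer(queue).
        String name = queueNameWithPrefetch("TEST.QUEUE", 1);
        System.out.println(name);
    }
}
```

Use the URL form when every queue consumer on the connection should share the same limit, and the destination option when only one consumer needs a different value.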
This is an old thread, but for anyone who finds it later:
If you are using Spring with ActiveMQ, you can set the prefetch policy on the connection factory bean:
<!-- A connection to ActiveMQ -->
<bean id="orderStatusAmqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL">
        <value>tcp://localhost:61616</value>
    </property>
    <property name="prefetchPolicy" ref="prefetchPolicy" />
    <property name="optimizeAcknowledge" value="true" />
    <property name="useAsyncSend" value="true" />
    <property name="trustedPackages">
        <list>
            <value>com.myapp.tradingplatform</value>
            <value>java</value>
        </list>
    </property>
</bean>
<bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy">
    <property name="queuePrefetch" value="1000" />
</bean>
<!-- A cached connection to wrap the ActiveMQ connection -->
<bean id="orderStatusCachedConnectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory">
        <ref bean="orderStatusAmqConnectionFactory" />
    </property>
    <property name="sessionCacheSize">
        <value>100</value>
    </property>
</bean>