I am pretty new in WSO2 ESB and I have the implement a custom message processor with this specific behavior: perform an operation after that an element is retrieved from the message store and before the sequence related to this message processor is performed.
I try to explain it in details.
This is my ESB message processor definition:
<?xml version="1.0" encoding="UTF-8"?>
<!---<messageProcessor class="org.apache.synapse.message.processor.impl.sampler.SamplingProcessor" messageStore="transferFromMessageStore" name="transferFromMessageProcessor" xmlns="http://ws.apache.org/ns/synapse">-->
<messageProcessor class="com.mycompany.toolkit.messageprocessor.SamplingProcessorHeaderRateLimitation" messageStore="transferFromMessageStore" name="transferFromMessageProcessor" xmlns="http://ws.apache.org/ns/synapse">
<parameter name="sequence">transferProcessorSequence</parameter>
<parameter name="interval">1000</parameter>
<parameter name="is.active">true</parameter>
<parameter name="concurrency">1</parameter>
</messageProcessor>
It retrieve some elements (XML documents) form the transferFromMessageStore (a queue) and pass this object to the transferProcessorSequence.xml sequence that use it. As you can see at this time I have implemented a custom message processor SamplingProcessorHeaderRateLimit that simply extends the org.apache.synapse.message.processor.impl.sampler.SamplingProcessor WSO2 class. At this time it only show a log when the init() method is performed. I deployed it on my Carbon server and it works.
Here you can find the entire project code.
Ok but from what I have understood to obtain the desired behavior I have not to simply extend the SamplingProcessor class because in order to do custom implementation between every message consumption and dispatch to the sequence, need to extend the class SamplingService class, this one.
I think that I need to override execute() or fetch(MessageConsumer msgConsumer).
At this time should be ok also insert a log, something that write into the log file each time that an element is retrieved from the message store and before that the sequence related to the message processor is performed.
Is it possible?
So my main main doubs are:
1) Have I to create a class extending the SamplingService class into the same project in which I am implementing my custom message processor (this behavior have to be used only for this specific message processor in my WSO2 ESB project, all the other message processor used in this project have to use the standard SamplingService implementation).
2) Another doubt is related about how this custom SamplingService implementation is passed to my custom message processor. Into the the SamplingProcessor WSO2 class (how to associate a specific custom message processor implementation with a custom SamplingService implementation handling its lifecycle).
1) Have I to create a class extending the SamplingService class into the same project in which I am implementing my custom message processor (this behavior have to be used only for this specific message processor in my WSO2 ESB project, all the other message processor used in this project have to use the standard SamplingService implementation).
Your custom SamplingProcessorHeaderRateLimitation
will only consume messages coming in to transferFromMessageStore
and will inject messages it consumed and processed only to sequence transferProcessorSequence
. All other paths will not get processed by this message processor.
2) Another doubt is related about how this custom SamplingService implementation is passed to my custom message processor. Into the the SamplingProcessor WSO2 class (how to associate a specific custom message processor implementation with a custom SamplingService implementation handling its lifecycle).
If you look at the Source Code you implemented SamplingProcessorHeaderRateLimitation.getTask()
You have tied your custom SamplingService2
with your custom SamplingProcessorHeaderRateLimitation
@Override
protected Task getTask() {
logger.info("getTask() START");
System.out.println("getTask() START");
logger.info("getTask() END");
System.out.println("getTask() END");
return (Task) new SamplingService2(this, synapseEnvironment, CONCURRENCY, SEQUENCE, isProcessorStartAsDeactivated());
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With