
How to implement a custom SamplingService for a custom message processor that logs after an element is retrieved and before the sequence is performed?

I am pretty new to WSO2 ESB and I have to implement a custom message processor with this specific behavior: perform an operation after an element is retrieved from the message store and before the sequence related to this message processor is executed.

I will try to explain it in detail.

This is my ESB message processor definition:

<?xml version="1.0" encoding="UTF-8"?>
<!---<messageProcessor class="org.apache.synapse.message.processor.impl.sampler.SamplingProcessor" messageStore="transferFromMessageStore" name="transferFromMessageProcessor" xmlns="http://ws.apache.org/ns/synapse">-->
<messageProcessor class="com.mycompany.toolkit.messageprocessor.SamplingProcessorHeaderRateLimitation" messageStore="transferFromMessageStore" name="transferFromMessageProcessor" xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="sequence">transferProcessorSequence</parameter>
    <parameter name="interval">1000</parameter>
    <parameter name="is.active">true</parameter>
    <parameter name="concurrency">1</parameter>
</messageProcessor>

It retrieves elements (XML documents) from transferFromMessageStore (a queue) and passes each one to the transferProcessorSequence.xml sequence, which uses it. As you can see, I have so far implemented a custom message processor, SamplingProcessorHeaderRateLimitation, that simply extends the org.apache.synapse.message.processor.impl.sampler.SamplingProcessor WSO2 class. At the moment it only writes a log entry when the init() method runs. I deployed it on my Carbon server and it works.

Here you can find the entire project code.

OK, but from what I have understood, simply extending the SamplingProcessor class is not enough to obtain the desired behavior: to run custom logic between each message consumption and its dispatch to the sequence, I need to extend the SamplingService class, this one.

I think that I need to override execute() or fetch(MessageConsumer msgConsumer).

For now it would be enough to insert a log statement, something that writes to the log file each time an element is retrieved from the message store and before the sequence related to the message processor is executed.

Is it possible?

So my main doubts are:

1) Do I have to create a class extending SamplingService in the same project in which I am implementing my custom message processor? This behavior must apply only to this specific message processor in my WSO2 ESB project; all the other message processors in the project must keep using the standard SamplingService implementation.

2) How is this custom SamplingService implementation passed to my custom message processor? In other words, looking at the SamplingProcessor WSO2 class, how do I associate a specific custom message processor implementation with a custom SamplingService implementation handling its lifecycle?

AndreaNobili asked Jul 26 '17 08:07


1 Answer

1) Do I have to create a class extending SamplingService in the same project in which I am implementing my custom message processor? This behavior must apply only to this specific message processor in my WSO2 ESB project; all the other message processors in the project must keep using the standard SamplingService implementation.

Your custom SamplingProcessorHeaderRateLimitation will only consume messages arriving in transferFromMessageStore and will inject the messages it has consumed and processed only into the transferProcessorSequence sequence. No other path is handled by this message processor.

2) How is this custom SamplingService implementation passed to my custom message processor? In other words, looking at the SamplingProcessor WSO2 class, how do I associate a specific custom message processor implementation with a custom SamplingService implementation handling its lifecycle?

If you look at the source code you implemented, SamplingProcessorHeaderRateLimitation.getTask() already ties your custom SamplingService2 to your custom SamplingProcessorHeaderRateLimitation:

@Override
protected Task getTask() {
    logger.info("getTask() START");
    System.out.println("getTask() START");
    logger.info("getTask() END");
    System.out.println("getTask() END");
    return (Task) new SamplingService2(this, synapseEnvironment, CONCURRENCY, SEQUENCE, isProcessorStartAsDeactivated());
}
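
To make the wiring easier to picture, here is a minimal, self-contained sketch of the same pattern in plain Java. It does not use the Synapse classes: BaseService, LoggingService, BaseProcessor and LoggingProcessor are hypothetical stand-ins for SamplingService, SamplingService2, SamplingProcessor and SamplingProcessorHeaderRateLimitation, and the message store is reduced to an in-memory queue of strings. The real method signatures, such as fetch(MessageConsumer msgConsumer), differ, so treat this as an illustration of where the extra step could go rather than as the actual API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Stand-in for the stock service (NOT the Synapse class, only its rough shape).
class BaseService {
    private final Queue<String> store;                  // plays the message store
    final List<String> dispatched = new ArrayList<>();  // what reached the "sequence"

    BaseService(Queue<String> store) { this.store = store; }

    // Take the next message from the store, or null when the store is empty.
    protected String fetch() { return store.poll(); }

    // Hand the message to the sequence (here it is only recorded).
    protected void dispatch(String message) { dispatched.add(message); }

    // One polling cycle: fetch, then dispatch if a message was available.
    public void execute() {
        String message = fetch();
        if (message != null) {
            dispatch(message);
        }
    }
}

// Custom service: hooks in after retrieval and before dispatch.
class LoggingService extends BaseService {
    final List<String> log = new ArrayList<>();

    LoggingService(Queue<String> store) { super(store); }

    @Override
    protected String fetch() {
        String message = super.fetch();
        if (message != null) {
            log.add("fetched: " + message);  // the extra step goes here
        }
        return message;
    }
}

// Stand-in for the stock processor: a factory method picks the service.
class BaseProcessor {
    protected final Queue<String> store;

    BaseProcessor(Queue<String> store) { this.store = store; }

    protected BaseService getTask() { return new BaseService(store); }
}

// Only this processor returns the custom service; others keep the default.
class LoggingProcessor extends BaseProcessor {
    LoggingProcessor(Queue<String> store) { super(store); }

    @Override
    protected BaseService getTask() { return new LoggingService(store); }
}

public class SamplingSketch {
    public static void main(String[] args) {
        Queue<String> store = new ArrayDeque<>(List.of("doc-1", "doc-2"));
        BaseService task = new LoggingProcessor(store).getTask();
        task.execute();
        task.execute();
        task.execute();  // store is now empty: nothing is logged or dispatched
        System.out.println(((LoggingService) task).log);
        System.out.println(task.dispatched);
    }
}
```

Because the custom service is created only inside the overridden getTask(), any processor that does not override it keeps the default service, which is the behavior asked for in point 1.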

shazin answered Nov 08 '22 12:11