Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Mule ESB - two files as input (wait for both)

I want to make a flow where:

  1. waiting for two files: file_name.xdf and file_name.dff : if both files (I want to process two files in the same time (waiting for both), the file name of those files shoud be the same)
  2. process those files convert to byte array
  3. run groovy script

How can I make the first point?

like image 904
lukisp Avatar asked Apr 23 '13 22:04

lukisp


1 Answers

You can aggregate the files based on their "base name" (i.e. take the file name without the extension), and then process each file in the aggregated set.

Aggregation could be performed using a Collection-Aggregator or a Custom-Aggregator, here an example for each one:

Using a Collection-Aggregator:

<flow name="two-files-per-process-with-collection-aggregator">
    <file:inbound-endpoint path="/file-in" moveToDirectory="/file-in-process" responseTimeout="10000" doc:name="Read files" >
        <file:filename-regex-filter pattern=".*\.aaa|.*\.bbb" caseSensitive="true"/>
    </file:inbound-endpoint>
    <set-property propertyName="MULE_CORRELATION_ID" value="#[message.inboundProperties.originalFilename.substring(0,message.inboundProperties.originalFilename.lastIndexOf('.'))]" doc:name="Set Correlation-Id"/>
    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE" value="2" doc:name="Set Correlation-Group-Size"/>
    <collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
    <foreach doc:name="For Each">
        <logger message="#[message.inboundProperties.originalFilename]" level="INFO" doc:name="Some process"/>
    </foreach>
</flow>


Using a Custom-Aggregator (you will need a custom java class):

<flow name="two-files-per-process-with-custom-aggregator">
    <file:inbound-endpoint path="/file-in" moveToDirectory="/file-in-process" responseTimeout="10000" doc:name="Read files">
        <file:filename-regex-filter pattern=".*\.aaa|.*\.bbb" caseSensitive="true"/>
    </file:inbound-endpoint>
    <custom-aggregator failOnTimeout="true" class="org.mnc.MatchFileNames" doc:name="Custom Aggregator"/>
    <foreach doc:name="For Each">
        <logger message="#[message.inboundProperties.originalFilename]" level="INFO" doc:name="Some process"/>
    </foreach>
</flow>

And this is a possible implementation for the custom aggregator (it could be more elegant:

package org.mnc;

import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.routing.RoutingException;
import org.mule.routing.AbstractAggregator;
import org.mule.routing.EventGroup;
import org.mule.routing.correlation.EventCorrelatorCallback;

public class MatchFileNames extends AbstractAggregator {

    @Override
    protected EventCorrelatorCallback getCorrelatorCallback(final MuleContext muleContext) {
        return new EventCorrelatorCallback() {

            @Override
            public boolean shouldAggregateEvents(EventGroup events) {
                return events.size()==2;
            }

            @Override
            public EventGroup createEventGroup(MuleEvent event, Object id) {
                String filename = event.getMessage().getInboundProperty("originalFilename");
                String commonFilename = filename.substring(0, filename.lastIndexOf('.'));
                System.out.println(filename + " -> " + commonFilename);
                return new EventGroup(commonFilename, muleContext, 2, true, storePrefix);
            }

            @Override
            public MuleEvent aggregateEvents(EventGroup events) throws RoutingException {
                return events.getMessageCollectionEvent();
            }
        };
    }
}
like image 147
MarcosNC Avatar answered Jan 04 '23 12:01

MarcosNC