
Java Config Inbound File Adapter Transaction Management

I have a simple IntegrationFlow that I'm trying to secure with transaction management. From a high level, it polls a directory for a file, gzips that file to a new location on disk, and then uploads that second file to S3. If it commits, I want to publish both paths to one channel (where they'll be deleted); if it rolls back, I want to publish them to a different channel (where they'll be archived).

My (abbreviated) attempted config looks like this:

@Bean
public IntegrationFlow fromFileFlow(@Qualifier("inboundMessageDirectory") Path inboundMessageDirectory,
                                    @Qualifier("consumableFileFilter") FileListFilter<File> fileListFilter,
                                    @Qualifier(ChannelNames.TO_GZIP_SERVICE) MessageChannel outbound,
                                    @Qualifier("transactionSynchronizationFactory") TransactionSynchronizationFactory transactionSynchronizationFactory) {
    return IntegrationFlows
            .from(s -> s.file(inboundMessageDirectory.toFile()),
                    e -> e.poller(Pollers
                            .fixedDelay(100)
                            .transactionSynchronizationFactory(transactionSynchronizationFactory)
                            .transactional(new PseudoTransactionManager())
                            .get()))
            .channel(outbound)
            .get();
}

The services then look like:

@ServiceActivator(inputChannel = ChannelNames.TO_GZIP_SERVICE, outputChannel = ChannelNames.TO_S3_SERVICE)
public Path gzip(@Payload Path path, @Header(ApplicationHeaders.REFERENCE_ID) String referenceId, @Header(ApplicationHeaders.DATA_TYPE) String dataType,
                 @Header(ApplicationHeaders.BUCKET_END) long bucketEnd) throws IOException {
     // ...
}

@ServiceActivator(inputChannel = ChannelNames.TO_S3_SERVICE)
@Retryable(interceptor = RETRY_INTERCEPTOR_BEAN_NAME)
public void sendToS3(@Payload Path path, @Header(ApplicationHeaders.BUCKET_END) long bucketStart,
                     @Header(ApplicationHeaders.DATA_TYPE) String dataType) throws IOException {
     // ...
}

My custom TransactionSynchronizationProcessor (I use the DefaultTransactionSynchronizationFactory) is implemented more or less as follows. The code not shown extracts the paths from the message payloads and stores them in the IntegrationResourceHolder's attributes:

@Override
public void processBeforeCommit(IntegrationResourceHolder holder) {
    updatePaths(holder);
}

@Override
public void processAfterCommit(IntegrationResourceHolder holder) {
    updateAndSend(successChannel, holder);
}

@Override
public void processAfterRollback(IntegrationResourceHolder holder) {
    updateAndSend(failureChannel, holder);
}

My understanding is that, because all of the interstitial channels are direct channels, the transaction should encompass the services too. Because the processor updates the paths before commit, after commit, and after rollback, I would expect it to visit the message at the start, grab the ungzipped path, then visit it again at the end, get the gzipped path, and act on both. However, only the ungzipped path is ever extracted.

Clearly, my application (and understanding) of the transactions is missing something. What is the right way for me to implement the desired behavior?

Asked by jwilner, Mar 23 '26 03:03


1 Answer

No, you misunderstood it a bit.

The transactional and transactionSynchronizationFactory options on the poller are concerned only with the message source. The final result of the flow, as data, isn't important to the transactional resource. All it needs to know is whether to commit or roll back the transaction.

That's why IntegrationResourceHolder has a message property: it carries the source data that has to be dealt with at the end of the transaction.

Try to imagine a plain DB transaction, and the data at the end of that transaction when we roll it back. Right, that data doesn't exist! Only the input of the transaction matters.

However, you can achieve your requirements with an additional ResourceHolder and ResourceHolderSynchronization, used somewhere before the S3 store. A simple ThreadLocal may help, too, but be sure to clear it properly at the end of the transaction.
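As a rough illustration of the ThreadLocal idea, here is a minimal plain-Java sketch. The class name TrackedPaths and its record and drain methods are hypothetical and not part of Spring Integration. It assumes, as in the question, that every channel is a direct channel, so the poller thread runs the whole flow and sees its own thread-local list. Each service would call record with the path it produced, and the synchronization processor would call drain in processAfterCommit and processAfterRollback.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper (not a Spring class): collects paths produced on the current thread. */
final class TrackedPaths {

    // One list per thread. Assumption: direct channels keep the whole flow on the poller thread.
    private static final ThreadLocal<List<Path>> PATHS = ThreadLocal.withInitial(ArrayList::new);

    private TrackedPaths() {
    }

    /** Called by a service (for example gzip) with the path it has just produced. */
    static void record(Path path) {
        PATHS.get().add(path);
    }

    /**
     * Called at the end of the transaction. Returns a read-only copy of the
     * recorded paths and always clears the thread-local, so nothing leaks
     * into the next poll that runs on the same thread.
     */
    static List<Path> drain() {
        try {
            return Collections.unmodifiableList(new ArrayList<>(PATHS.get()));
        } finally {
            PATHS.remove();
        }
    }
}
```

Draining in both the commit and the rollback callback is what keeps the state from surviving into the next poll. If any step is later moved to an executor or queue channel, the recorded paths would no longer be visible to the poller thread and this approach would stop working.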

Alternatively, consider carrying everything you need in the headers of the message produced by s.file. The AbstractMessageSourceAdvice, available since Spring Integration 4.2, would be useful for this purpose.

Answered by Artem Bilan, Mar 25 '26 16:03


