I am currently generating async client code using wsdl-to-java which is being used to query a SOAP web service. Here is a snippet of the generated async method:
@WebMethod(operationName = "GetSession")
public Future<?> getSessionAsync(
@WebParam(partName = "parameters", name = "GetSessionRequest")
mynamespace.GetSessionRequest parameters,
@WebParam(partName = "ResponseHeader", mode = WebParam.Mode.OUT, name = "ResponseHeader", header = true)
javax.xml.ws.Holder<mydatacontract.ResponseHeader> responseHeader,
@WebParam(name = "asyncHandler", targetNamespace = "")
AsyncHandler<myservice.GetSessionResponse> asyncHandler
);
I am calling the above generated code in a wrapper class:
getSession(GetSessionRequest request) {
Future<?> response = generatedClient.getSessionAsync(request, responseHeader, handler)
}
handler(Response<GetSessionResponse> response) {
// no access to SOAP XML at this point?
}
As per my understanding, the generated code takes care of serialization/de-serialization and I do not have access to the raw SOAP response. There are ways to log the SOAP XML response as described here but I need to access this in code as the response needs to be dumped into the database.
Is there any way to access this in the handler without touching the generated client code?
UPDATE:
I am able to read the ResponseContext which is of type java.util.Map<String, Object>. But this does not return the raw SOAP XML that I am looking for.
Also, using an inbound Interceptor would mean that I lose the context of the calling function. This is required to store the XML response associated with each call in the database.
UPDATE 2:
The Future returns an object of type Response, which is defined in Response.java. Its Javadoc states the following:
The interface provides methods used to obtain the
payload and context of a message sent in response to an operation
invocation.
However, I can only retrieve the context; I cannot find any property that exposes the payload.
I found an SO answer that has a solution for Axis here. Is it possible to do something similar in CXF?
The problem: SOAP messages can be intercepted, but only at a location far removed from the original call site. It's difficult to pass the SOAP message back to the original call site, particularly in multithreaded or asynchronous environments.
The only solution I can see is to dedicate one JAX-WS Proxy, with its own Handler, to each request in flight. Having a single Proxy for the whole application would be a bottleneck, so this requires multithreading tools to allow for parallel and async execution.
Here's my idea, in code. First I go through it step-by-step, and at the end there's a dump of all code.
UPDATE: I've replaced the LinkedBlockingQueue with a static ThreadLocal<SoapApiWrapper>
instance, and the executor with a newWorkStealingPool(). See the edit history for the changes!
I've set it up to use http://www.dneonline.com/calculator.asmx. It compiles and runs, but I haven't spent a lot of time ensuring it works correctly or is optimal. I'm sure there are issues (my CPU fan is working hard, even though I'm not running code). Be warned!
(Does anyone know of a better public SOAP API that I can either run locally or flood with
requests?)
If you'd like to test, here are some public SOAP APIs:
https://documenter.getpostman.com/view/8854915/Szf26WHn
Implement a SOAPHandler class, called SoapMessageHandler, that will capture messages.
public class SoapMessageHandler implements SOAPHandler<SOAPMessageContext> {
// capture messages in a list
private final List<SOAPMessageContext> messages = new ArrayList<>();
// get & clear messages
public List<SOAPMessageContext> collectMessages() {
var m = new ArrayList<>(messages);
messages.clear();
return m;
}
@Override
public boolean handleMessage(SOAPMessageContext context) {
messages.add(context); // collect message
return true;
}
@Override
public boolean handleFault(SOAPMessageContext context) {
messages.add(context); // collect error
return true;
}
// getHeaders() and close(MessageContext) are omitted here; see the full listing at the end
}
Define a SoapApiWrapper class that
1. creates a SoapMessageHandler,
2. creates one JAX-WS Proxy (the connection),
3. adds the handler to the Proxy.
class SoapApiWrapper {
// 1. create a handler
private final SoapMessageHandler soapMessageHandler = new SoapMessageHandler();
private final CalculatorSoap connection;
public SoapApiWrapper() {
// 2. create one connection
var factoryBean = new JaxWsProxyFactoryBean();
factoryBean.setAddress("http://www.dneonline.com/calculator.asmx");
factoryBean.setServiceClass(CalculatorSoap.class);
// 3. add the Handler
factoryBean.setHandlers(Collections.singletonList(soapMessageHandler));
connection = factoryBean.create(CalculatorSoap.class);
}
}
Define a SoapApiManager that has
1. an ExecutorService, which will manage the SOAP requests and responses,
2. a ThreadLocal<SoapApiWrapper>, so each thread has a JAX-WS Proxy (idea from https://stackoverflow.com/a/16680215/4161471).
public class SoapApiManager {
// 1. request executor
private static final ExecutorService executorService = Executors.newWorkStealingPool(THREAD_LIMIT);
private static final ThreadLocal<SoapApiWrapper> soapApiWrapper = ThreadLocal.withInitial(SoapApiWrapper::new);
}
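The per-thread idea can be tried in isolation, without any SOAP classes. The following is a minimal sketch, assuming a hypothetical `Wrapper` class as a stand-in for SoapApiWrapper; `PerThreadInstanceDemo` and `countWrappers` are names made up for this illustration. It counts how many instances `ThreadLocal.withInitial(...)` creates when many tasks run on a small work-stealing pool.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class PerThreadInstanceDemo {

  /** Hypothetical stand-in for SoapApiWrapper: one instance per worker thread. */
  static final class Wrapper {}

  /** Runs taskCount tasks on a pool with the given parallelism and returns how many Wrappers were created. */
  static int countWrappers(int threadLimit, int taskCount) {
    ExecutorService pool = Executors.newWorkStealingPool(threadLimit);
    ThreadLocal<Wrapper> perThread = ThreadLocal.withInitial(Wrapper::new);
    // Wrapper does not override equals(), so the set distinguishes instances by identity
    Set<Wrapper> seen = ConcurrentHashMap.newKeySet();
    try {
      var futures = IntStream.range(0, taskCount)
          .mapToObj(i -> CompletableFuture.runAsync(() -> seen.add(perThread.get()), pool))
          .collect(Collectors.toList());
      futures.forEach(CompletableFuture::join); // wait for every task
      return seen.size();
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    // Far fewer wrappers than tasks: each worker thread creates one and then reuses it
    System.out.println("wrappers created: " + countWrappers(4, 100));
  }
}
```

Because the tasks here never block, the pool should not need more worker threads than its parallelism, so the number of wrappers stays at or below `threadLimit` however many tasks are submitted.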
The SoapApiManager has a method, submitRequest(...). It will return the SOAP API response and the SOAP messages.
public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest(
SoapRequestRunner<ResponseT> requestRunner
) {
//...
}
The parameter is a SoapRequestRunner, a lambda that accepts a JAX-WS Proxy and returns a SOAP
Response.
@FunctionalInterface
interface SoapRequestRunner<ResponseT> {
ResponseT sendRequest(CalculatorSoap calculatorSoap);
}
When invoked, submitRequest(...) performs the following:
1. runs the SoapRequestRunner with CompletableFuture.supplyAsync(...), using our ExecutorService,
2. fetches a SoapApiWrapper from the ThreadLocal,
3. sends the request (by passing the SoapRequestRunner to the SoapApiWrapper's JAX-WS Proxy),
4. receives the response,
5. collects the raw SOAP messages from the SoapApiWrapper's SOAPHandler,
6. bundles the response and the messages into a SoapResponseHolder.
public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest(
SoapRequestRunner<ResponseT> requestRunner
) { // 1. use CompletableFuture & executorService
return CompletableFuture.supplyAsync(createRequestCall(requestRunner), executorService);
}
private <ResponseT> Supplier<SoapResponseHolder<ResponseT>> createRequestCall(
SoapRequestRunner<ResponseT> requestRunner
) {
return () -> {
SoapApiWrapper api = soapApiWrapper.get(); // 2. fetch this thread's API Wrapper from the ThreadLocal
var response = requestRunner.sendRequest(api.connection); // 3&4. request & response
var messages = api.soapMessageHandler.collectMessages(); // 5. extract raw SOAP messages
return new SoapResponseHolder<>(response, messages); // 6. bundle into DTO
};
}
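To see why this keeps each call's messages together with its result, here is a self-contained sketch of the same flow that runs without CXF. It assumes a hypothetical `FakeApi` that plays the role of both the JAX-WS Proxy and its handler by recording simplified request and response strings; `CapturePipelineSketch`, `Holder` and `submit` are likewise illustrative names, not part of the real code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class CapturePipelineSketch {

  /** Hypothetical stand-in for proxy + handler: records what it "sends" and "receives". */
  static final class FakeApi {
    private final List<String> captured = new ArrayList<>();

    int add(int a, int b) {
      captured.add("<Add><intA>" + a + "</intA><intB>" + b + "</intB></Add>");
      int sum = a + b;
      captured.add("<AddResponse><AddResult>" + sum + "</AddResult></AddResponse>");
      return sum;
    }

    /** Returns the captured messages and clears the buffer, like collectMessages(). */
    List<String> collectMessages() {
      var copy = new ArrayList<>(captured);
      captured.clear();
      return copy;
    }
  }

  /** The result of a call plus the messages captured while producing it. */
  static final class Holder<T> {
    final T response;
    final List<String> messages;

    Holder(T response, List<String> messages) {
      this.response = response;
      this.messages = messages;
    }
  }

  private static final ExecutorService POOL = Executors.newWorkStealingPool(4);
  private static final ThreadLocal<FakeApi> API = ThreadLocal.withInitial(FakeApi::new);

  static <T> CompletableFuture<Holder<T>> submit(Function<FakeApi, T> request) {
    return CompletableFuture.supplyAsync(() -> {
      FakeApi api = API.get();          // this worker thread's own instance
      T response = request.apply(api);  // synchronous call on this thread
      // No other request can run on this thread in between, so the buffer
      // holds exactly the messages that belong to this call.
      return new Holder<>(response, api.collectMessages());
    }, POOL);
  }

  public static void main(String[] args) {
    Holder<Integer> holder = submit(api -> api.add(5, 4)).join();
    System.out.println(holder.response + " " + holder.messages);
  }
}
```

The correlation relies on the call being synchronous inside the task: the request, the response and the collection of messages all happen on one thread before that thread picks up another task.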
public class Main {
public static void main(String[] args) {
SoapApiManager apiManager = new SoapApiManager();
apiManager
.submitRequest((soapApi) -> soapApi.add(5, 4))
.thenAccept(response -> {
// we can get the SOAP API response
var sum = response.getResponse();
// and also the intercepted messages!
var messages = response.getMessages();
var allXml = messages.stream().map(Main::getRawXml).collect(Collectors.joining("\n---\n"));
System.out.println("sum: " + sum + ",\n" + allXml);
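})
.join(); // wait for completion: the work-stealing pool's threads are daemon threads, so main could otherwise exit first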
}
public static String getRawXml(SOAPMessageContext context) {
try {
ByteArrayOutputStream byteOS = new ByteArrayOutputStream();
context.getMessage().writeTo(byteOS);
return byteOS.toString(StandardCharsets.UTF_8);
} catch (SOAPException | IOException e) {
throw new RuntimeException(e);
}
}
}
Example output (from a run that added 73 and 32):
sum: 105,
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
<soap:Body>
<Add xmlns="http://tempuri.org/">
<intA>73</intA>
<intB>32</intB>
</Add>
</soap:Body>
</soap:Envelope>
---
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body>
<AddResponse xmlns="http://tempuri.org/">
<AddResult>105</AddResult>
</AddResponse>
</soap:Body>
</soap:Envelope>
Here's a working example, complete with validation of the responses.
It creates REQUESTS_COUNT requests and submits them all to SoapApiManager.
Each request prints the hashcode of the JAX-WS Proxy and the thread's name (I wanted to check
that the proxies were being reused), and the basic input/output (e.g. -9 - 99 = -108).
There's validation to make sure each SoapResponseHolder has the correct result and raw SOAP
messages, and that the correct number of requests were sent.
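The validation below checks the result with a plain string search for `<AddResult>`, which depends on how the XML happens to be serialised. As an alternative, here is a rough sketch that parses one captured message with the JDK's DOM parser instead. `AddResultExtractor` and `extractAddResult` are hypothetical names, and the `http://tempuri.org/` namespace is taken from the sample output above. It expects a single message, not the joined `allXml` string.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.xml.sax.SAXException;

final class AddResultExtractor {

  /** Parses one raw SOAP response and returns the integer inside its AddResult element. */
  static int extractAddResult(String rawSoapXml) {
    try {
      var factory = DocumentBuilderFactory.newInstance();
      factory.setNamespaceAware(true); // needed to look elements up by namespace
      var document = factory.newDocumentBuilder()
          .parse(new ByteArrayInputStream(rawSoapXml.getBytes(StandardCharsets.UTF_8)));
      var nodes = document.getElementsByTagNameNS("http://tempuri.org/", "AddResult");
      if (nodes.getLength() != 1) {
        throw new IllegalArgumentException("Expected one AddResult, found " + nodes.getLength());
      }
      return Integer.parseInt(nodes.item(0).getTextContent().trim());
    } catch (ParserConfigurationException | SAXException | IOException e) {
      throw new IllegalArgumentException("Could not parse SOAP message", e);
    }
  }

  public static void main(String[] args) {
    String response =
        "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
            + "<soap:Header/><soap:Body>"
            + "<AddResponse xmlns=\"http://tempuri.org/\"><AddResult>105</AddResult></AddResponse>"
            + "</soap:Body></soap:Envelope>";
    System.out.println(extractAddResult(response)); // prints 105
  }
}
```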
Main.java
import com.github.underscore.lodash.Xml;
import com.github.underscore.lodash.Xml.XmlStringBuilder.Step;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.xml.soap.SOAPException;
import javax.xml.ws.handler.soap.SOAPMessageContext;
public class Main implements AutoCloseable {
private final SoapApiManager apiManager = new SoapApiManager();
private static final int THREAD_COUNT = 4;
private static final int REQUESTS_COUNT = 500;
private final AtomicInteger i = new AtomicInteger();
public static void main(String[] args)
throws InterruptedException {
try (var m = new Main()) {
m.run();
}
}
private void run() throws InterruptedException {
var executor = Executors.newFixedThreadPool(THREAD_COUNT);
var tasks = Stream.generate(() -> Map.entry(randomInt(), randomInt()))
.limit(REQUESTS_COUNT)
.map(intA -> (Callable<Boolean>) () -> {
sendAndValidateRequest(intA.getKey(), intA.getValue());
i.incrementAndGet();
return true;
})
.collect(Collectors.toList());
executor.invokeAll(tasks);
var waiter = Executors.newSingleThreadScheduledExecutor();
waiter.scheduleWithFixedDelay(
() -> {
var size = i.get();
System.out.println(">waiting... (size " + size + ")");
if (size >= REQUESTS_COUNT) {
System.out.println(">finished waiting! " + size);
waiter.shutdownNow();
}
},
3, 3, TimeUnit.SECONDS
);
System.out.println("Finished sending tasks " + waiter.awaitTermination(10, TimeUnit.SECONDS));
waiter.shutdownNow();
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
executor.shutdown();
System.out.println(
"executor.awaitTermination " + executor.awaitTermination(10, TimeUnit.SECONDS));
if (!executor.isTerminated()) {
System.out.println("executor.shutdownNow " + executor.shutdownNow());
}
if (i.get() != REQUESTS_COUNT) {
throw new RuntimeException(
"Test did not execute " + REQUESTS_COUNT + " times, actual: " + i.get()
);
}
}
private int randomInt() {
return ThreadLocalRandom.current().nextInt(-100, 100);
}
private void sendAndValidateRequest(int a, int b) {
apiManager
.submitRequest((soapApi) -> {
var response = soapApi.add(a, b);
System.out.printf(
"[%-12s / %-18s] %4d %s %3d = %4d\n",
soapApi.hashCode(),
Thread.currentThread().getName(),
a,
(b >= 0 ? "+" : "-"),
Math.abs(b),
response
);
return response;
})
.thenAcceptAsync(response -> {
var sum = response.getResponse();
var messages = response.getMessages();
var allXml = messages.stream().map(Main::getRawXml)
.collect(Collectors.joining("\n---\n"));
if (sum != a + b) {
throw new RuntimeException(
"Bad sum, sent " + a + " + " + b + ", result: " + sum + ", xml: " + allXml
);
}
if (messages.size() != 2) {
throw new RuntimeException(
"Bad messages, expected 1 request and 1 response, but got " + messages.size()
+ ", xml: " + allXml
);
}
if (!allXml.contains("<AddResult>" + (a + b) + "</AddResult>")) {
throw new RuntimeException(
"Bad result, did not contain AddResult=" + (a + b) + ", actual: " + allXml
);
}
});
}
public static String getRawXml(SOAPMessageContext context) {
try (var byteOS = new ByteArrayOutputStream()) {
context.getMessage().writeTo(byteOS);
var rawSoap = byteOS.toString(StandardCharsets.UTF_8);
return Xml.formatXml(rawSoap, Step.TWO_SPACES);
} catch (SOAPException | IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() {
apiManager.close();
}
}
SoapApiManager.java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import javax.xml.ws.handler.soap.SOAPMessageContext;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.tempuri.CalculatorSoap;
public class SoapApiManager implements AutoCloseable {
private static final int THREAD_LIMIT = Math.min(Runtime.getRuntime().availableProcessors(), 5);
private static final ExecutorService executorService = Executors.newWorkStealingPool(THREAD_LIMIT);
private static final ThreadLocal<SoapApiWrapper> soapApiWrapper = ThreadLocal.withInitial(SoapApiWrapper::new);
@Override
public void close() {
executorService.shutdown();
}
private static class SoapApiWrapper {
private final CalculatorSoap connection;
private final SoapMessageHandler soapMessageHandler = new SoapMessageHandler();
public SoapApiWrapper() {
var factoryBean = new JaxWsProxyFactoryBean();
factoryBean.setAddress("http://www.dneonline.com/calculator.asmx");
factoryBean.setServiceClass(CalculatorSoap.class);
factoryBean.setHandlers(Collections.singletonList(soapMessageHandler));
connection = factoryBean.create(CalculatorSoap.class);
}
}
public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest(
SoapRequestRunner<ResponseT> requestRunner
) {
return CompletableFuture.supplyAsync(createRequestCall(requestRunner), executorService);
}
private <ResponseT> Supplier<SoapResponseHolder<ResponseT>> createRequestCall(
SoapRequestRunner<ResponseT> requestRunner
) {
return () -> {
SoapApiWrapper api = soapApiWrapper.get();
var response = requestRunner.sendRequest(api.connection);
var messages = api.soapMessageHandler.collectMessages();
return new SoapResponseHolder<>(response, messages);
};
}
@FunctionalInterface
interface SoapRequestRunner<ResponseT> {
ResponseT sendRequest(CalculatorSoap calculatorSoap);
}
public static class SoapResponseHolder<ResponseT> {
private final List<SOAPMessageContext> messages;
private final ResponseT response;
SoapResponseHolder(
ResponseT response,
List<SOAPMessageContext> messages
) {
this.response = response;
this.messages = messages;
}
public ResponseT getResponse() {
return response;
}
public List<SOAPMessageContext> getMessages() {
return messages;
}
}
}
SoapMessageHandler.java
package org.example;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import javax.xml.namespace.QName;
import javax.xml.ws.handler.MessageContext;
import javax.xml.ws.handler.soap.SOAPHandler;
import javax.xml.ws.handler.soap.SOAPMessageContext;
public class SoapMessageHandler implements SOAPHandler<SOAPMessageContext> {
private final List<SOAPMessageContext> messages = new ArrayList<>();
public List<SOAPMessageContext> collectMessages() {
var m = new ArrayList<>(messages);
messages.clear();
return m;
}
@Override
public Set<QName> getHeaders() {
return Collections.emptySet();
}
@Override
public boolean handleMessage(SOAPMessageContext context) {
messages.add(context);
return true;
}
@Override
public boolean handleFault(SOAPMessageContext context) {
messages.add(context);
return true;
}
@Override
public void close(MessageContext context) {
}
}
build.gradle.kts
plugins {
java
id("com.github.bjornvester.wsdl2java") version "1.2"
}
group = "org.example"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
implementation(enforcedPlatform("org.apache.cxf:cxf-bom:3.4.4"))
implementation("org.apache.cxf:cxf-core")
implementation("org.apache.cxf:cxf-rt-frontend-jaxws")
implementation("org.apache.cxf:cxf-rt-transports-http")
implementation("org.apache.cxf:cxf-rt-databinding-jaxb")
// implementation("org.apache.cxf:cxf-rt-transports-http-jetty")
implementation("org.apache.cxf:cxf-rt-transports-http-hc")
implementation("com.sun.activation:javax.activation:1.2.0")
implementation("javax.annotation:javax.annotation-api:1.3.2")
implementation("com.sun.xml.messaging.saaj:saaj-impl:1.5.1")
implementation("com.github.javadev:underscore:1.68")
//<editor-fold desc="JAXB">
implementation("org.jvnet.jaxb2_commons:jaxb2-basics-runtime:1.11.1")
xjcPlugins("org.jvnet.jaxb2_commons:jaxb2-basics:1.11.1")
//</editor-fold>
//<editor-fold desc="Test">
testImplementation(enforcedPlatform("org.junit:junit-bom:5.7.2")) // JUnit 5 BOM
testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.0")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
//</editor-fold>
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(11))
}
}
wsdl2java {
cxfVersion.set("3.4.4")
options.addAll("-xjc-Xequals", "-xjc-XhashCode")
}
tasks.test {
useJUnitPlatform()
}