 

Apache Camel: polling a REST endpoint

Tags:

apache-camel

I have a REST endpoint, sample.org, which returns a JSON response of the form

{
  "response" : "pending" 
}

My route looks like this

from("http://sample.org")
.marshal(xmlFormatConverterUtil.getxmlJsonDataFormat())  //To convert into json as I receive data in xml format which needs to be converted to json

I read about the polling consumer but couldn't find an example of how to keep polling the endpoint until it returns the response "success".

Should a polling consumer be used? If so, could an example relevant to my case be shown? Any other resources on polling REST endpoints would be very useful.

anirudh_raja asked Dec 24 '22 23:12


2 Answers

You need to start from a timer instead and then call the REST endpoint. You can then check the result and, if it is positive, stop the route using the controlbus. A filter can be used to check whether the response is still pending and, if so, stop routing the current message; the next timer tick will then try again.

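Something along the lines of this route (a sketch: the timer period, the route id and the direct:success endpoint are placeholders):

from("timer:poll?period=5000").routeId("pollRoute")
  .to("http://sample.org")
  .marshal(xmlFormatConverterUtil.getxmlJsonDataFormat())
  .convertBodyTo(String.class)
  // still pending: stop routing this message; the next timer tick tries again
  .filter(body().contains("pending"))
     .stop()
  .end()
  // something that handles the positive response
  .to("direct:success")
  // stop the route from within itself; async=true lets the in-flight message finish
  .to("controlbus:route?routeId=pollRoute&action=stop&async=true");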

You can find more details at

  • http://camel.apache.org/timer
  • http://camel.apache.org/controlbus
  • http://camel.apache.org/how-can-i-stop-a-route-from-a-route.html
  • http://camel.apache.org/message-filter.html

Claus Ibsen answered Feb 19 '23 19:02


I had a similar problem and ended up writing a custom endpoint for polling.

It works as a producer and polls the specified uri until a specified predicate has been met or polling reaches the maximum number of tries.

from("direct:start")
  .to("poll:http://example.com/status?maxRetries=3&successPredicate=#statusSuccess")

The polling endpoint delegates to a simple processor, which uses a polling consumer to do the actual polling.

import java.util.function.Predicate;

import com.google.common.base.Preconditions;
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PollProcessor implements Processor {

    // Assumes SLF4J for logging and Guava for Preconditions
    private static final Logger log = LoggerFactory.getLogger(PollProcessor.class);

    private final String uri;
    private final long requestTimeoutMs;
    private final long period;
    private final int maxTries;
    private final Predicate<Exchange> successPredicate;

    public PollProcessor(String uri, long requestTimeoutMs, long period, int maxTries, Predicate<Exchange> successPredicate) {
        Preconditions.checkArgument(maxTries > 0);
        Preconditions.checkArgument(period >= 0);
        Preconditions.checkNotNull(successPredicate);

        this.uri = uri;
        this.requestTimeoutMs = requestTimeoutMs;
        this.period = period;
        this.maxTries = maxTries;
        this.successPredicate = successPredicate;
    }

    @Override
    public void process(Exchange exchange) throws Exception {
        PollingConsumer consumer = exchange.getContext().getEndpoint(uri).createPollingConsumer();

        for (int tryNumber = 1; tryNumber <= maxTries; ++tryNumber) {
            // receive() returns null if nothing arrives within the timeout
            Exchange pollExchange = consumer.receive(requestTimeoutMs);
            if (pollExchange != null && successPredicate.test(pollExchange)) {
                // Hand the successful poll result (and any exception) to the calling exchange
                exchange.setOut(pollExchange.getOut());
                exchange.setException(pollExchange.getException());
                return;
            }

            log.warn("Polling {} failed try number {}, waiting {} ms for next try...", uri, tryNumber, period);
            Thread.sleep(period);
        }

        throw new RuntimeException("Polling failed maximum allowed number of tries [" + maxTries + "], see log for details.");
    }
}
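
The retry loop at the heart of this processor does not depend on Camel. As a minimal, dependency-free sketch of the same idea (poll until a predicate is met or the maximum number of tries is reached), something like the following could work. The Poller class, the pollUntil method and the canned status values are hypothetical names made up for illustration; they are not part of Camel or of the custom endpoint above.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper, not part of Camel: polls a source until a predicate
// accepts the result or the maximum number of tries is reached.
final class Poller {

    static <T> T pollUntil(Supplier<T> source, Predicate<T> success,
                           int maxTries, long periodMs) {
        if (maxTries <= 0 || periodMs < 0) {
            throw new IllegalArgumentException("maxTries must be > 0 and periodMs >= 0");
        }
        for (int tryNumber = 1; tryNumber <= maxTries; tryNumber++) {
            T result = source.get();
            // A null result is treated as "nothing received yet"
            if (result != null && success.test(result)) {
                return result;
            }
            if (tryNumber < maxTries) {
                try {
                    Thread.sleep(periodMs); // wait before the next try
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while polling", e);
                }
            }
        }
        throw new IllegalStateException("Polling failed after " + maxTries + " tries");
    }

    public static void main(String[] args) {
        // Canned responses stand in for the REST endpoint (an assumption for this demo)
        Iterator<String> responses = List.of("pending", "pending", "success").iterator();
        String status = pollUntil(responses::next, "success"::equals, 5, 10);
        System.out.println(status); // prints "success"
    }
}
```

In a real setup the supplier would wrap the HTTP call and the predicate would inspect the parsed response; unlike the processor above, this sketch skips the sleep after the final failed try.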

DeeperUnderstanding answered Feb 19 '23 19:02