 

Unable to configure "Keep Alive" in Camel HTTP component


I'm having some trouble setting up the HTTP component correctly. A microservice pulls JSON content from a provider, processes it and sends it to the next service for further processing. The main problem is that this microservice creates a ton of socket connections in the CLOSE_WAIT state. I understand that the whole concept of "keep-alive" is to keep the connection open until I close it, but the server may drop the connection for some reason, which leaves the socket in CLOSE_WAIT.
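CLOSE_WAIT means the peer has closed its end of the TCP connection while the local application has not yet closed its socket. This can be reproduced with plain JDK sockets on the loopback interface. In the following sketch (class and method names are hypothetical), the server side closes its end right away. The client Socket still reports isClosed() == false, and only a read returning -1 reveals that the peer is gone. Until close() is called on the client, the OS lists that socket as CLOSE_WAIT.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical demo of a half-closed (CLOSE_WAIT) connection using only JDK sockets. */
final class HalfClosedDemo {

    private HalfClosedDemo() {}

    /**
     * Lets the server side close a loopback connection and reports what the client
     * observes before it closes its own socket.
     */
    static String probe() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort())) {
            client.setSoTimeout(5_000); // avoid hanging forever if something goes wrong
            // the server accepts and immediately closes its end, which sends a FIN
            server.accept().close();
            // at the OS level the client socket is now in CLOSE_WAIT, but Java still
            // considers it open because close() has not been called on it
            boolean closedBeforeRead = client.isClosed();
            // only an actual read reveals the peer's close: end of stream (-1)
            int firstByte = client.getInputStream().read();
            return "closedBeforeRead=" + closedBeforeRead + " read=" + firstByte;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } // try-with-resources closes the client socket, which ends CLOSE_WAIT
    }

    public static void main(String[] args) {
        System.out.println(probe());
    }
}
```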

I've created a small service for debugging/testing purposes that makes a GET call to Google, but even this connection stays open until I close the program. I've tried many different solutions:

  • .setHeader("Connection", constant("Close"))
  • -Dhttp.keepAlive=false as VM argument
  • Switching from Camel-Http to Camel-Http4
  • httpClient.soTimeout=500 (Camel-HTTP), httpClient.socketTimeout=500 and connectionTimeToLive=500 (Camel-HTTP4)
  • .setHeader("Connection", simple("Keep-Alive")) and .setHeader("Keep-Alive", simple("timeout=10")) (Camel-HTTP4)
  • Using the debugger to change the value returned by DefaultConnectionKeepAliveStrategy in Camel-HTTP4 from -1 (keep alive indefinitely) to a specific value. That works, but I was not able to inject my own strategy.
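A custom ConnectionKeepAliveStrategy typically honors a server-supplied Keep-Alive: timeout=N value and otherwise returns a finite fallback instead of the -1 (keep alive indefinitely) mentioned in the last bullet. The sketch below models that decision on the raw header value using only the JDK. The class KeepAliveDuration and the 30-second fallback are assumptions. In real code, this logic would sit inside getKeepAliveDuration(HttpResponse, HttpContext).

```java
import java.util.Locale;

/** Hypothetical helper modelling a capped keep-alive decision (JDK only). */
final class KeepAliveDuration {

    private KeepAliveDuration() {}

    /**
     * Returns the keep-alive duration in milliseconds for a raw Keep-Alive header value
     * such as "timeout=10, max=100". Falls back to defaultMillis when the header is
     * absent or carries no valid timeout, instead of returning -1 ("keep forever").
     */
    static long fromHeader(String keepAliveHeader, long defaultMillis) {
        if (keepAliveHeader == null) {
            return defaultMillis;
        }
        for (String element : keepAliveHeader.split(",")) {
            String[] pair = element.trim().split("=", 2);
            if (pair.length == 2 && pair[0].trim().toLowerCase(Locale.ROOT).equals("timeout")) {
                try {
                    long seconds = Long.parseLong(pair[1].trim());
                    if (seconds >= 0) {
                        return seconds * 1000L;
                    }
                } catch (NumberFormatException ignored) {
                    // malformed value: keep looking and eventually use the fallback
                }
            }
        }
        return defaultMillis;
    }

    public static void main(String[] args) {
        System.out.println(fromHeader("timeout=10, max=100", 30_000L));
        System.out.println(fromHeader(null, 30_000L));
    }
}
```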

but I had no success. So maybe one of you can help me:

  • How can I tell Camel-HTTP to close a connection after a specific amount of time has passed? For example, the service pulls from the content provider every hour. After 3-4 hours, the HttpComponent should close the connection after the pull and reopen it for the next pull. Currently every connection is put back into the MultiThreadedHttpConnectionManager and the socket stays open.
  • If that's not possible with Camel-HTTP: how can I inject an HttpClientBuilder when my route is created? I know it should be possible via the httpClient option, but I don't understand that specific part of the documentation.

Thank you all for your help

asked Aug 01 '16 at 14:08 by Markus Zimmer




2 Answers

This can be done by closing connections that have been idle for a configured amount of time. The Camel HTTP4 component exposes its connection manager, which provides a method for exactly that.

Cast the connection manager returned by org.apache.camel.component.http4.HttpComponent#getClientConnectionManager() to PoolingHttpClientConnectionManager:

        PoolingHttpClientConnectionManager poolingClientConnectionManager =
                (PoolingHttpClientConnectionManager) httpComponent.getClientConnectionManager();

        // closes the pooled connections that have not been used for the last 5 seconds
        poolingClientConnectionManager.closeIdleConnections(5000, TimeUnit.MILLISECONDS);

See the Javadoc of PoolingHttpClientConnectionManager#closeIdleConnections(long, TimeUnit): [http://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/http/impl/conn/PoolingHttpClientConnectionManager.html#closeIdleConnections(long, java.util.concurrent.TimeUnit)]

answered Sep 28 '22 at 02:09 by Ankit


Unfortunately, none of the proposed answers got rid of the connections stuck in CLOSE_WAIT on my side; they remained until the application was finally closed.

I reproduced this problem with the following test case:

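import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spring.javaconfig.CamelConfiguration;
import org.apache.camel.test.spring.CamelSpringTestSupport;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.support.AbstractApplicationContext;

public class HttpInvocationTest extends CamelSpringTestSupport {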

  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  @EndpointInject(uri = "mock:success")
  private MockEndpoint successEndpoint;
  @EndpointInject(uri = "mock:failure")
  private MockEndpoint failureEndpoint;

  @Override
  protected AbstractApplicationContext createApplicationContext() {
    return new AnnotationConfigApplicationContext(ContextConfig.class);
  }

  @Configuration
  @Import(HttpClientSpringTestConfig.class)
  public static class ContextConfig extends CamelConfiguration {

    @Override
    public List<RouteBuilder> routes() {
      List<RouteBuilder> routes = new ArrayList<>(1);
      routes.add(new RouteBuilder() {
        @Override
        public void configure() {
          from("direct:start")
            .log(LoggingLevel.INFO, LOG, "Invoking external URL: ${header[TEST_URL]}")
            .setHeader("Connection", constant("close"))
            .recipientList(header("TEST_URL"))
            .log(LoggingLevel.DEBUG, "HTTP response code: ${header["+Exchange.HTTP_RESPONSE_CODE+"]}")
            // CopyBodyToHeaders is a helper bean of the author that is not shown here
            .bean(CopyBodyToHeaders.class)
            .choice()
              .when(header(Exchange.HTTP_RESPONSE_CODE).isGreaterThanOrEqualTo(300))
                .to("mock:failure")
              .otherwise()
                .to("mock:success");
        }
      });
      return routes;
    }
  }

  @Test
  public void testHttpInvocation() throws Exception {
    successEndpoint.expectedMessageCount(1);
    failureEndpoint.expectedMessageCount(0);

    ProducerTemplate template = context.createProducerTemplate();

    template.sendBodyAndHeader("direct:start", null, "TEST_URL", "http4://meta.stackoverflow.com");

    successEndpoint.assertIsSatisfied();
    failureEndpoint.assertIsSatisfied();

    Exchange exchange = successEndpoint.getExchanges().get(0);
    Map<String, Object> headers = exchange.getIn().getHeaders();
    String body = exchange.getIn().getBody(String.class);
    for (String key : headers.keySet()) {
      LOG.info("Header: {} -> {}", key, headers.get(key));
    }
    LOG.info("Body: {}", body);

    // keep the JVM alive for two minutes so the socket states can be inspected with netstat
    Thread.sleep(120000);
  }
}

and repeatedly ran netstat -ab -p tcp | grep 151.101.129.69, where 151.101.129.69 is the IP address of meta.stackoverflow.com.

This produced output like:

tcp4       0      0  192.168.0.10.52183     151.101.129.69.https   ESTABLISHED      37562       2118
tcp4       0      0  192.168.0.10.52182     151.101.129.69.http    ESTABLISHED        885        523

right after the invocation, followed by

tcp4       0      0  192.168.0.10.52183     151.101.129.69.https   CLOSE_WAIT       37562       2118
tcp4       0      0  192.168.0.10.52182     151.101.129.69.http    CLOSE_WAIT         885        523

output until the application was closed, because the request was still sent with a Connection: keep-alive header. This happened even with a configuration like the one below:

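import java.lang.invoke.MethodHandles;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

import org.apache.camel.CamelContext;
import org.apache.camel.component.http4.HttpClientConfigurer;
import org.apache.camel.component.http4.HttpComponent;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration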
@EnableConfigurationProperties(HttpClientSettings.class)
public class HttpClientSpringTestConfig {

  private final static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  @Resource
  private HttpClientSettings httpClientSettings;

  @Resource
  private CamelContext camelContext;

  private SocketConfig httpClientSocketConfig() {
    /*
      socket timeout:
      Monitors the time passed between two consecutive incoming messages over the connection and
      raises a SocketTimeoutException if no message was received within the given timeout interval
     */
    LOG.info("Creating a SocketConfig with a socket timeout of {} seconds", httpClientSettings.getSoTimeout());
    return SocketConfig.custom()
        .setSoTimeout(httpClientSettings.getSoTimeout() * 1000)
        .setSoKeepAlive(false)
        .setSoReuseAddress(false)
        .build();
  }

  private RequestConfig httpClientRequestConfig() {
    /*
      connection timeout:
      The time span the application will wait for a connection to get established. If the connection
      is not established within the given amount of time a ConnectionTimeoutException will be raised.
     */
    LOG.info("Creating a RequestConfig with a socket timeout of {} seconds and a connection timeout of {} seconds",
             httpClientSettings.getSoTimeout(), httpClientSettings.getConTimeout());
    return RequestConfig.custom()
        .setConnectTimeout(httpClientSettings.getConTimeout() * 1000)
        .setSocketTimeout(httpClientSettings.getSoTimeout() * 1000)
        .build();
  }

  @Bean(name = "httpClientConfigurer")
  public HttpClientConfigurer httpConfiguration() {
    ConnectionKeepAliveStrategy myStrategy = new ConnectionKeepAliveStrategy() {
      @Override
      public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
        return 5 * 1000;
      }
    };

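    PoolingHttpClientConnectionManager conMgr =
        new PoolingHttpClientConnectionManager();
    // one-off call on a still empty pool: it closes nothing now and is never repeated
    conMgr.closeIdleConnections(5, TimeUnit.SECONDS);

    // note: because an explicit connection manager is set, HttpClientBuilder ignores
    // setDefaultSocketConfig(...) and setConnectionTimeToLive(...); both only apply to
    // the pool the builder would otherwise create itself
    return builder -> builder.setDefaultSocketConfig(httpClientSocketConfig())
                             .setDefaultRequestConfig(httpClientRequestConfig())
                             .setConnectionTimeToLive(5, TimeUnit.SECONDS)
                             .setKeepAliveStrategy(myStrategy)
                             .setConnectionManager(conMgr);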
  }

  @PostConstruct
  public void init() {
    LOG.debug("Initializing HTTP clients");
    HttpComponent httpComponent = camelContext.getComponent("http4", HttpComponent.class);
    httpComponent.setHttpClientConfigurer(httpConfiguration());
    HttpComponent httpsComponent = camelContext.getComponent("https4", HttpComponent.class);
    httpsComponent.setHttpClientConfigurer(httpConfiguration());
  }
}

or defining the settings directly on the respective HttpComponent.

Examining the proposed methods in the HttpClient code makes it obvious that they are single-shot operations, not settings that HttpClient re-checks periodically on its own.
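The difference between a one-off cleanup call and a periodic one can be illustrated without HttpClient. In the toy model below (ToyPool and PooledConnection are hypothetical, not HttpClient types), closeIdle(...) only inspects the connections that exist at the moment it is called, similar to closeIdleConnections(...). A connection released afterwards is never touched unless the call is repeated.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical toy pool: each entry remembers when it was last released. */
final class ToyPool {

    static final class PooledConnection {
        final String route;
        final long lastUsedMillis;

        PooledConnection(String route, long lastUsedMillis) {
            this.route = route;
            this.lastUsedMillis = lastUsedMillis;
        }
    }

    private final List<PooledConnection> available = new ArrayList<>();

    /** Returns a connection to the pool at the given (simulated) time. */
    void release(String route, long nowMillis) {
        available.add(new PooledConnection(route, nowMillis));
    }

    /** One-shot sweep: closes entries unused for at least idleMillis at nowMillis. */
    int closeIdle(long idleMillis, long nowMillis) {
        int closed = 0;
        for (Iterator<PooledConnection> it = available.iterator(); it.hasNext(); ) {
            if (nowMillis - it.next().lastUsedMillis >= idleMillis) {
                it.remove();
                closed++;
            }
        }
        return closed;
    }

    int size() {
        return available.size();
    }

    public static void main(String[] args) {
        ToyPool pool = new ToyPool();
        // configuration time (t = 0): nothing is pooled yet, so the sweep is a no-op
        System.out.println("closed at t=0: " + pool.closeIdle(5_000, 0));
        // the hourly pull releases its connection at t = 10 s
        pool.release("provider", 10_000);
        // without another sweep the connection simply stays in the pool
        System.out.println("still pooled: " + pool.size());
        // a sweep that runs later (e.g. from a scheduler) does close it
        System.out.println("closed at t=20s: " + pool.closeIdle(5_000, 20_000));
    }
}
```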

The PoolingHttpClientConnectionManager Javadoc further states:

The handling of stale connections was changed in version 4.4. Previously, the code would check every connection by default before re-using it. The code now only checks the connection if the elapsed time since the last use of the connection exceeds the timeout that has been set. The default timeout is set to 2000ms

This check only happens when an attempt is made to reuse a connection. That makes sense for a connection pool, especially when multiple messages are exchanged over the same connection. For single-shot invocations, which should behave more like Connection: close, a connection might not be reused for quite some time. It is then left open or half-closed, because nobody reads from it again and thereby notices that it could be closed.
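The rule from the quoted Javadoc comes down to a time comparison that is evaluated only when a pooled connection is leased again. The JDK-only sketch below models it; the class and method names are hypothetical. In HttpClient itself the threshold is set with PoolingHttpClientConnectionManager#setValidateAfterInactivity(int). Nothing in this rule runs while a connection just sits in the pool, which is why a half-closed socket can stay in CLOSE_WAIT until the next lease or an explicit cleanup.

```java
/** Hypothetical model of the "validate after inactivity" rule of HttpClient 4.4+. */
final class StaleCheckRule {

    private StaleCheckRule() {}

    /**
     * Decides, at the moment a pooled connection is leased again, whether it has to be
     * checked for staleness first. A non-positive threshold disables the check, matching
     * the documented behavior of setValidateAfterInactivity(int).
     */
    static boolean needsStaleCheck(long lastUsedMillis, long leaseTimeMillis,
                                   int validateAfterInactivityMillis) {
        if (validateAfterInactivityMillis <= 0) {
            return false;
        }
        return leaseTimeMillis - lastUsedMillis >= validateAfterInactivityMillis;
    }

    public static void main(String[] args) {
        int defaultThreshold = 2_000; // the 2000 ms default quoted above
        // reused after 1 s: no check; reused an hour later: checked
        System.out.println(needsStaleCheck(0, 1_000, defaultThreshold));
        System.out.println(needsStaleCheck(0, 3_600_000, defaultThreshold));
    }
}
```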

I noticed that I already solved such an issue a while back with traditional HttpClients and started to port this solution to Camel, which worked out quite easily.

The solution basically consists of registering the HttpClients' connection managers with a service that periodically (every 5 seconds in my case) calls closeExpiredConnections() and closeIdleConnections(...) on them.

This logic is kept in a singleton enum, as it actually lives in a library used by a couple of applications, each running in its own JVM.

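import java.lang.invoke.MethodHandles;
import java.lang.ref.PhantomReference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.apache.http.conn.HttpClientConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**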
 * This singleton monitor will check every few seconds for idle and stale connections and perform
 * a cleanup on the connections using the registered connection managers.
 */
public enum IdleConnectionMonitor {

  INSTANCE;

  private final static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** The execution service which runs the cleanup every 5 seconds **/
  private ScheduledExecutorService executorService =
      Executors.newScheduledThreadPool(1, new NamingThreadFactory());
  /** The actual thread which performs the monitoring **/
  private IdleConnectionMonitorThread monitorThread = new IdleConnectionMonitorThread();

  IdleConnectionMonitor() {
    // execute the thread every 5 seconds till the application is shutdown (or the shutdown method
    // is invoked)
    executorService.scheduleAtFixedRate(monitorThread, 5, 5, TimeUnit.SECONDS);
  }

  /**
   * Registers a {@link HttpClientConnectionManager} to monitor for stale connections
   */
  public void registerConnectionManager(HttpClientConnectionManager connMgr) {
    monitorThread.registerConnectionManager(connMgr);
  }

  /**
   * Request to stop the monitoring for stale HTTP connections.
   */
  public void shutdown() {
    executorService.shutdown();
    try {
      if (!executorService.awaitTermination(3, TimeUnit.SECONDS)) {
        LOG.warn("Connection monitor shutdown not finished after 3 seconds!");
      }
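    } catch (InterruptedException iEx) {
      LOG.warn("Execution service was interrupted while waiting for graceful shutdown");
      // restore the interrupt flag so that callers can still react to the interruption
      Thread.currentThread().interrupt();
    }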
  }

  /**
   * Upon invocation, the list of registered connection managers will be iterated through and if a
   * referenced object is still reachable {@link HttpClientConnectionManager#closeExpiredConnections()}
   * and {@link HttpClientConnectionManager#closeIdleConnections(long, TimeUnit)} will be invoked
   * in order to cleanup stale connections.
   * <p/>
   * This runnable implementation holds a weakly referable list of {@link
   * HttpClientConnectionManager} objects. If a connection manager is only reachable by {@link
   * WeakReference}s or {@link PhantomReference}s it gets eligible for garbage collection and thus
   * may return null values. If this is the case, the connection manager will be removed from the
   * internal list of registered connection managers to monitor.
   */
  private static class IdleConnectionMonitorThread implements Runnable {

    // we store only weak references to connection managers in the list, as this thread may
    // outlive a connection manager; weak references allow the garbage collector to collect
    // unused managers as soon as possible
    private final List<WeakReference<HttpClientConnectionManager>> registeredConnectionManagers =
        Collections.synchronizedList(new ArrayList<>());

    @Override
    public void run() {

      LOG.trace("Executing connection cleanup");
      // iterating a synchronizedList requires holding its lock; otherwise a concurrent
      // registration could cause a ConcurrentModificationException
      synchronized (registeredConnectionManagers) {
        Iterator<WeakReference<HttpClientConnectionManager>> conMgrs =
            registeredConnectionManagers.iterator();
        while (conMgrs.hasNext()) {
          WeakReference<HttpClientConnectionManager> weakConMgr = conMgrs.next();
          HttpClientConnectionManager conMgr = weakConMgr.get();
          if (conMgr != null) {
            LOG.trace("Found connection manager: {}", conMgr);
            conMgr.closeExpiredConnections();
            conMgr.closeIdleConnections(30, TimeUnit.SECONDS);
          } else {
            // the manager was garbage collected, so stop monitoring it
            conMgrs.remove();
          }
        }
      }
    }

    void registerConnectionManager(HttpClientConnectionManager connMgr) {
      registeredConnectionManagers.add(new WeakReference<>(connMgr));
    }
  }

  private static class NamingThreadFactory implements ThreadFactory {

    @Override
    public Thread newThread(Runnable r) {
      Thread t = new Thread(r);
      t.setName("Connection Manager Monitor");
      return t;
    }
  }
}

As mentioned, this singleton service spawns its own thread that invokes the two methods above every 5 seconds. closeExpiredConnections() closes pooled connections whose keep-alive duration or time-to-live has expired, while closeIdleConnections(30, TimeUnit.SECONDS) closes connections that have not been used for 30 seconds.
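The two sweeps use different criteria. The JDK-only model below (class names hypothetical, not HttpClient code) roughly mirrors how HttpClient's pool decides. A released connection gets an expiry of release time plus the keep-alive duration, capped by the connection time-to-live. A keep-alive of -1 means no expiry at all, so closeExpiredConnections() never touches such a connection, whereas closeIdleConnections(30, TimeUnit.SECONDS) does once it has been unused for 30 seconds.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the two sweeps invoked by IdleConnectionMonitor. */
final class SweepModel {

    static final class Entry {
        final long updated; // when the connection was last released to the pool
        final long expiry;  // updated + keep-alive duration, capped by the time-to-live

        Entry(long updated, long keepAliveMillis, long validityDeadline) {
            this.updated = updated;
            // a non-positive keep-alive (e.g. -1) means "no expiry" apart from the TTL
            long byKeepAlive = keepAliveMillis > 0 ? updated + keepAliveMillis : Long.MAX_VALUE;
            this.expiry = Math.min(byKeepAlive, validityDeadline);
        }
    }

    final List<Entry> pool = new ArrayList<>();

    /** Like closeExpiredConnections(): drops entries whose expiry has been reached. */
    int closeExpired(long now) {
        int before = pool.size();
        pool.removeIf(e -> now >= e.expiry);
        return before - pool.size();
    }

    /** Like closeIdleConnections(idle, unit): drops entries unused for at least idleMillis. */
    int closeIdle(long idleMillis, long now) {
        int before = pool.size();
        pool.removeIf(e -> e.updated <= now - idleMillis);
        return before - pool.size();
    }

    public static void main(String[] args) {
        SweepModel model = new SweepModel();
        model.pool.add(new Entry(0, 5_000, Long.MAX_VALUE)); // server sent Keep-Alive: timeout=5
        model.pool.add(new Entry(0, -1, Long.MAX_VALUE));    // strategy returned -1, no TTL
        System.out.println("expired after 6 s: " + model.closeExpired(6_000));
        System.out.println("idle for 30 s: " + model.closeIdle(30_000, 30_000));
    }
}
```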

To integrate this service with Camel, EventNotifierSupport can be used to let Camel shut down the monitor thread once the context is closing down.

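import java.lang.invoke.MethodHandles;
import java.util.EventObject;

import org.apache.camel.management.event.CamelContextStartedEvent;
import org.apache.camel.management.event.CamelContextStoppingEvent;
import org.apache.camel.support.EventNotifierSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This Camel service will take care of the lifecycle management of {@link IdleConnectionMonitor}
 * and invoke {@link IdleConnectionMonitor#shutdown()} once Camel is closing down in order to stop
 * listening for stale connections.
 */
public class IdleConnectionMonitorService extends EventNotifierSupport {

  private final static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  // initialized eagerly so that shutdown() also works if this notifier was registered only
  // after the CamelContextStartedEvent had already been fired
  private final IdleConnectionMonitor connectionMonitor = IdleConnectionMonitor.INSTANCE;

  @Override
  public void notify(EventObject event) {
    if (event instanceof CamelContextStartedEvent) {
      LOG.info("Start listening for closable HTTP connections");
    } else if (event instanceof CamelContextStoppingEvent) {
      LOG.info("Shutting down listener for open HTTP connections");
      connectionMonitor.shutdown();
    }
  }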

  @Override
  public boolean isEnabled(EventObject event) {
    return event instanceof CamelContextStartedEvent || event instanceof CamelContextStoppingEvent;
  }

  public IdleConnectionMonitor getConnectionMonitor() {
    return this.connectionMonitor;
  }
}

To take advantage of that service, the connection manager used by the HttpClient that Camel creates internally needs to be registered with it, which is done in the code block below:

private void registerHttpClientConnectionManager(HttpClientConnectionManager conMgr) {
  if (!getIdleConnectionMonitorService().isPresent()) {
    // register the service with Camel so that on a shutdown the monitoring thread will be stopped
    camelContext.getManagementStrategy().addEventNotifier(new IdleConnectionMonitorService());
  }
  IdleConnectionMonitor.INSTANCE.registerConnectionManager(conMgr);
}

private Optional<IdleConnectionMonitorService> getIdleConnectionMonitorService() {
  for (EventNotifier eventNotifier : camelContext.getManagementStrategy().getEventNotifiers()) {
    if (eventNotifier instanceof IdleConnectionMonitorService) {
      return Optional.of((IdleConnectionMonitorService) eventNotifier);
    }
  }
  return Optional.empty();
}

Last but not least, the connection manager defined in httpConfiguration inside HttpClientSpringTestConfig (in my case) needs to be passed to the register function introduced above:

PoolingHttpClientConnectionManager conMgr = new PoolingHttpClientConnectionManager();
registerHttpClientConnectionManager(conMgr);

This might not be the prettiest solution, but it does close the half-closed connections on my machine.


Edit:

I just learned that you can use a NoConnectionReuseStrategy, which puts the connection into TIME_WAIT rather than CLOSE_WAIT, so the OS removes it after a short while. Unfortunately, the request is still sent with a Connection: keep-alive header. This strategy creates a new connection per request, i.e. if you get a 301 Moved Permanently response, the redirect is followed on a new connection.
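The cost of this strategy can be illustrated by counting connections. In the hypothetical model below, ReuseDecision stands in for ConnectionReuseStrategy#keepAlive(HttpResponse, HttpContext). NEVER_REUSE mirrors NoConnectionReuseStrategy, which always answers false. ALWAYS_REUSE is a simplification of a keep-alive friendly default and is an assumption of this sketch. A redirect followed by the final request then needs one connection with reuse and two without.

```java
/** Hypothetical model of how many TCP connections a client opens to one host. */
final class ReuseModel {

    private ReuseModel() {}

    /** Stand-in for ConnectionReuseStrategy#keepAlive(HttpResponse, HttpContext). */
    interface ReuseDecision {
        boolean keepAlive(int responseStatus);
    }

    /** Simplified keep-alive friendly strategy (assumption of this sketch). */
    static final ReuseDecision ALWAYS_REUSE = status -> true;

    /** Mirrors NoConnectionReuseStrategy, whose keepAlive(...) always returns false. */
    static final ReuseDecision NEVER_REUSE = status -> false;

    /** Counts the connections opened for a sequence of requests to the same host. */
    static int connectionsOpened(int[] responseStatuses, ReuseDecision decision) {
        int opened = 0;
        boolean connectionOpen = false;
        for (int status : responseStatuses) {
            if (!connectionOpen) {
                opened++; // a fresh TCP connection is needed for this request
            }
            // false: the client closes the socket itself, so the local side ends up in
            // TIME_WAIT (cleared by the OS) instead of lingering in CLOSE_WAIT
            connectionOpen = decision.keepAlive(status);
        }
        return opened;
    }

    public static void main(String[] args) {
        int[] redirectThenOk = {301, 200};
        System.out.println("reuse:    " + connectionsOpened(redirectThenOk, ALWAYS_REUSE));
        System.out.println("no reuse: " + connectionsOpened(redirectThenOk, NEVER_REUSE));
    }
}
```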

To use NoConnectionReuseStrategy, the httpClientConfigurer bean would change to the following:

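@Bean(name = "httpClientConfigurer")
public HttpClientConfigurer httpConfiguration() {
    // socket and request config come from the helper methods of HttpClientSpringTestConfig;
    // NoConnectionReuseStrategy is org.apache.http.impl.NoConnectionReuseStrategy
    return builder -> builder.setDefaultSocketConfig(httpClientSocketConfig())
        .setDefaultRequestConfig(httpClientRequestConfig())
        .setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE);
}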
answered Sep 28 '22 at 01:09 by Roman Vottner