 

Servlet 3 AsyncContext: how to do asynchronous writes?

Problem Description

The Servlet 3.0 API allows detaching a request/response context and answering it later.

However, if I try to write a large amount of data with something like:

AsyncContext ac = getWaitingContext();
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(some_big_data);
out.flush();

the write may actually block (and in trivial test cases it does block) on both Tomcat 7 and Jetty 8. Tutorials recommend creating a thread pool to handle such writes, which generally runs counter to the traditional architecture for serving 10K concurrent connections.

However, if I have 10,000 open connections and a thread pool of, say, 10 threads, even 1% of clients (100 connections) with slow or stalled connections is enough to tie up the whole thread pool and either block the Comet responses completely or slow them down significantly.
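The starvation described above can be reproduced without a servlet container. As a minimal sketch, assuming a stalled client write can be modeled as a task blocked on a CountDownLatch, a one-thread pool shows a fast client's write waiting behind a slow one (PoolStarvationDemo and fastClientWasStarved are hypothetical names):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PoolStarvationDemo {

    // Returns true if the fast client's write stayed pending while the slow client was stalled.
    public static boolean fastClientWasStarved() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1); // one worker, as in the question's demo
        CountDownLatch slowClientDrains = new CountDownLatch(1); // stands in for a client that stops reading
        try {
            // "Slow client": a blocking write that cannot finish until the client drains its socket
            pool.submit(() -> {
                slowClientDrains.await();
                return null;
            });
            // "Fast client": a write that would complete instantly if it ever got a thread
            Future<String> fast = pool.submit(() -> "delivered");

            Thread.sleep(200);                // give the fast write plenty of time
            boolean starved = !fast.isDone(); // still queued behind the stalled write

            slowClientDrains.countDown();     // the slow client finally reads
            String result = fast.get(1, TimeUnit.SECONDS);
            return starved && result.equals("delivered");
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("fast client starved: " + fastClientWasStarved());
    }
}
```

With 10 threads instead of 1, the same effect appears as soon as 10 writes are stalled at the same time.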

The usual practice is to get a "write-ready" notification or an I/O completion notification and then continue pushing the data.

How can this be done with the Servlet 3.0 API, i.e. how do I get either:

  • Asynchronous completion notification for an I/O operation.
  • Non-blocking I/O with write-ready notification.

If this is not supported by the Servlet 3.0 API, are there any server-specific APIs (like Jetty Continuation or Tomcat CometEvent) that allow handling such events truly asynchronously, without faking asynchronous I/O with a thread pool?

Does anybody know?

And if this is not possible, can you confirm it with a reference to the documentation?
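Servlet 3.0 itself has no write-ready callback. Servlet 3.1 (Java EE 7) later added one: after ServletOutputStream.isReady() returns false, the container calls the WriteListener registered with ServletOutputStream.setWriteListener() through onWritePossible() once writing is possible again. The container-free sketch below only models that contract; NonBlockingSink, ChunkedPusher and SlowSink are hypothetical names, and the loop in main stands in for the container's notifications.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class WriteReadyDemo {

    // Hypothetical stand-in for a non-blocking output, modeled on Servlet 3.1's
    // ServletOutputStream.isReady(); this is not a real container API.
    interface NonBlockingSink {
        boolean isReady();                      // true if a write will not block
        void write(byte[] b, int off, int len); // only called while isReady() is true
    }

    // Pushes a large payload in chunks and returns as soon as the sink is full,
    // expecting to be called again on the next "write-ready" notification.
    static final class ChunkedPusher {
        private final byte[] data;
        private final int chunkSize;
        private int offset = 0;

        ChunkedPusher(byte[] data, int chunkSize) {
            this.data = data;
            this.chunkSize = chunkSize;
        }

        // Analogous to WriteListener.onWritePossible(): never blocks.
        void onWritePossible(NonBlockingSink sink) {
            while (offset < data.length && sink.isReady()) {
                int len = Math.min(chunkSize, data.length - offset);
                sink.write(data, offset, len);
                offset += len;
            }
        }

        boolean isDone() {
            return offset == data.length;
        }
    }

    // Test sink: accepts `capacity` writes, then reports "not ready" until drain() is called.
    static final class SlowSink implements NonBlockingSink {
        final ByteArrayOutputStream received = new ByteArrayOutputStream();
        private final int capacity;
        private int writesSinceDrain = 0;

        SlowSink(int capacity) { this.capacity = capacity; }

        public boolean isReady() { return writesSinceDrain < capacity; }

        public void write(byte[] b, int off, int len) {
            received.write(b, off, len);
            writesSinceDrain++;
        }

        void drain() { writesSinceDrain = 0; } // the client read some data
    }

    public static void main(String[] args) {
        byte[] payload = "data:hello\n".repeat(1000).getBytes(StandardCharsets.UTF_8);
        ChunkedPusher pusher = new ChunkedPusher(payload, 512);
        SlowSink sink = new SlowSink(4);

        int notifications = 0;
        while (!pusher.isDone()) {
            pusher.onWritePossible(sink); // returns instead of blocking when the sink is full
            sink.drain();                 // in a container, the next onWritePossible() follows this
            notifications++;
        }
        System.out.println(notifications + " notifications, " + sink.received.size() + " bytes");
    }
}
```

The point of the design is that no thread ever waits on a slow client: each call writes only what the sink accepts and returns, so one thread can serve many connections in turn.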

Problem demonstration in sample code

I have attached the code below, which emulates an event stream.

Notes:

  • It uses ServletOutputStream, which throws IOException, to detect disconnected clients.
  • It sends keep-alive messages to make sure clients are still there.
  • It uses a thread pool to "emulate" asynchronous operations.
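For reference, this is roughly how one event and one keep-alive comment are framed in the Server-Sent Events (text/event-stream) format that the sample emits. It is a minimal sketch; the SseFormat class and its methods are hypothetical names, not part of the original code.

```java
public class SseFormat {

    // One SSE event: an "id:" line, one "data:" line per payload line, then a blank line.
    static String event(long id, String payload) {
        StringBuilder sb = new StringBuilder();
        sb.append("id: ").append(id).append('\n');
        for (String line : payload.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        sb.append('\n'); // a blank line terminates the event
        return sb.toString();
    }

    // A line starting with ':' is a comment that EventSource clients ignore,
    // so it can be written periodically as a keep-alive; writing it to a
    // disconnected client eventually fails with an IOException.
    static String keepAlive() {
        return ":keep-alive\n\n";
    }

    public static void main(String[] args) {
        System.out.print(event(1, "hello\nworld"));
        System.out.print(keepAlive());
    }
}
```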

In this example I explicitly defined a thread pool of size 1 to show the problem:

  • Start the application.
  • From two terminals, run curl http://localhost:8080/path/to/app (one client per terminal).
  • Send a message with curl -d m=message http://localhost:8080/path/to/app
  • Both clients receive the data.
  • Suspend one of the clients (Ctrl+Z) and send the message again with curl -d m=message http://localhost:8080/path/to/app
  • Observe that the other, non-suspended client either receives nothing or, after the message has been transferred, stops receiving keep-alive messages, because the pool's only thread is blocked.

I want to solve this problem without a thread pool, because with 1,000-5,000 open connections I can exhaust the thread pool very quickly.

Here is the sample code:


import java.io.IOException;
import java.util.HashSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.servlet.AsyncContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet(urlPatterns = "", asyncSupported = true)
public class HugeStreamWithThreads extends HttpServlet {

    private long id = 0;
    private String message = "";
    // it is explicitly small (one thread) for demonstration purposes
    private final ThreadPoolExecutor pool =
        new ThreadPoolExecutor(1, 1, 50000L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>());

    // sends keep-alives to all clients once per second
    private final Thread timer = new Thread(new Runnable() {
        public void run() {
            try {
                while (true) {
                    Thread.sleep(1000);
                    sendKeepAlive();
                }
            } catch (InterruptedException e) {
                // exit
            }
        }
    });

    class RunJob implements Runnable {
        volatile long lastUpdate = System.nanoTime();
        long id = 0;
        AsyncContext ac;

        RunJob(AsyncContext ac) {
            this.ac = ac;
        }

        public void keepAlive() {
            if (System.nanoTime() - lastUpdate > 1000000000L)
                pool.submit(this);
        }

        // builds a deliberately huge event-stream message (100,000 data lines)
        String formatMessage(String msg) {
            StringBuilder sb = new StringBuilder();
            sb.append("id: "); // SSE "id" field needs a colon and its own line
            sb.append(id);
            sb.append("\n");
            for (int i = 0; i < 100000; i++) {
                sb.append("data:");
                sb.append(msg);
                sb.append("\n");
            }
            sb.append("\n");
            return sb.toString();
        }

        public void run() {
            String message = null;
            synchronized (HugeStreamWithThreads.this) {
                if (this.id != HugeStreamWithThreads.this.id) {
                    this.id = HugeStreamWithThreads.this.id;
                    message = HugeStreamWithThreads.this.message;
                }
            }
            if (message == null)
                message = ":keep-alive\n\n";
            else
                message = formatMessage(message);

            if (!sendMessage(message))
                return;

            boolean once_again = false;
            synchronized (HugeStreamWithThreads.this) {
                if (this.id != HugeStreamWithThreads.this.id)
                    once_again = true;
            }
            if (once_again)
                pool.submit(this);
        }

        boolean sendMessage(String message) {
            try {
                // blocking write: a stalled client holds the pool thread here
                ServletOutputStream out = ac.getResponse().getOutputStream();
                out.print(message);
                out.flush();
                lastUpdate = System.nanoTime();
                return true;
            } catch (IOException e) {
                ac.complete();
                removeContext(this);
                return false;
            }
        }
    }

    private HashSet<RunJob> asyncContexts = new HashSet<RunJob>();

    @Override
    public void init(ServletConfig config) throws ServletException {
        super.init(config);
        timer.start();
    }

    @Override
    public void destroy() {
        for (;;) {
            try {
                timer.interrupt();
                timer.join();
                break;
            } catch (InterruptedException e) {
                continue;
            }
        }
        pool.shutdown();
        super.destroy();
    }

    protected synchronized void removeContext(RunJob ac) {
        asyncContexts.remove(ac);
    }

    // GET method is used to establish a stream connection
    @Override
    protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        // Content-Type header
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");
        // Access-Control-Allow-Origin header
        response.setHeader("Access-Control-Allow-Origin", "*");

        final AsyncContext ac = request.startAsync();
        ac.setTimeout(0);
        RunJob job = new RunJob(ac);
        asyncContexts.add(job);
        if (id != 0) {
            pool.submit(job);
        }
    }

    private synchronized void sendKeepAlive() {
        for (RunJob job : asyncContexts) {
            job.keepAlive();
        }
    }

    // POST method is used to communicate with the server
    @Override
    protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        request.setCharacterEncoding("utf-8");
        id++;
        message = request.getParameter("m");
        for (RunJob job : asyncContexts) {
            pool.submit(job);
        }
    }
}

The sample above uses threads to prevent blocking. However, if the number of blocked clients exceeds the size of the thread pool, it still blocks.

How could it be implemented without blocking?

asked Aug 23 '12 at 05:08 by Artyom



1 Answer

I've found the Servlet 3.0 asynchronous API tricky to implement correctly, and helpful documentation sparse. After a lot of trial and error with many different approaches, I found a robust solution that I've been very happy with. Comparing my code with yours, I notice one major difference that may help with your particular problem: I write the data through the ServletResponse's writer (getWriter()) rather than through a ServletOutputStream.

Here is my go-to asynchronous servlet class, adapted slightly for your some_big_data case:

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebInitParam;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;

import org.apache.log4j.Logger;

@javax.servlet.annotation.WebServlet(urlPatterns = { "/async" }, asyncSupported = true,
    initParams = { @WebInitParam(name = "threadpoolsize", value = "100") })
public class AsyncServlet extends HttpServlet {

  private static final Logger logger = Logger.getLogger(AsyncServlet.class);

  public static final int CALLBACK_TIMEOUT = 10000; // ms

  /** executor service */
  private ExecutorService exec;

  @Override
  public void init(ServletConfig config) throws ServletException {
    super.init(config);
    int size = Integer.parseInt(getInitParameter("threadpoolsize"));
    exec = Executors.newFixedThreadPool(size);
  }

  @Override
  public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {
    final AsyncContext ctx = req.startAsync();
    final HttpSession session = req.getSession();

    // set the timeout
    ctx.setTimeout(CALLBACK_TIMEOUT);

    // attach listener to respond to lifecycle events of this AsyncContext
    ctx.addListener(new AsyncListener() {
      @Override
      public void onComplete(AsyncEvent event) throws IOException {
        logger.info("onComplete called");
      }

      @Override
      public void onTimeout(AsyncEvent event) throws IOException {
        logger.info("onTimeout called");
      }

      @Override
      public void onError(AsyncEvent event) throws IOException {
        logger.info("onError called: " + event.toString());
      }

      @Override
      public void onStartAsync(AsyncEvent event) throws IOException {
        logger.info("onStartAsync called");
      }
    });

    enqueLongRunningTask(ctx, session);
  }

  /**
   * if something goes wrong in the task, it simply causes timeout condition that causes the async context listener to be invoked (after the fact)
   * <p/>
   * if the {@link AsyncContext#getResponse()} is null, that means this context has already timed out (and context listener has been invoked).
   */
  private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session) {
    exec.execute(new Runnable() {
      @Override
      public void run() {
        String some_big_data = getSomeBigData();
        try {
          ServletResponse response = ctx.getResponse();
          if (response != null) {
            response.getWriter().write(some_big_data);
            ctx.complete();
          } else {
            throw new IllegalStateException(); // this is caught below
          }
        } catch (IllegalStateException ex) {
          // just means the context already timed out; the timeout listener was already called
          logger.error("Request object from context is null! (nothing to worry about.)");
        } catch (Exception e) {
          logger.error("ERROR IN AsyncServlet", e);
        }
      }
    });
  }

  /** placeholder: getSomeBigData() is not defined in the original; plug in your own data source */
  private String getSomeBigData() {
    return "";
  }

  /** destroy the executor */
  @Override
  public void destroy() {
    exec.shutdown();
  }
}
answered Sep 19 '22 at 17:09 by herrtim
