Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Need to wait for asynchronous api callback before I return from method in Java

  import java.util.concurrent.CountDownLatch;    import quickfix.Initiator;     public class UserSession {     private final CountDownLatch latch = new CountDownLatch(1);  public String await() {         try {             System.out.println("waiting...");             if (latch.await(5, TimeUnit.SECONDS))                 System.out.println("released!");             else                 System.out.println("timed out");             return secret;         } catch (InterruptedException e) {             // TODO Auto-generated catch block             System.out.println(e.getMessage());             e.printStackTrace();         }         return null;     }      public void countdown(String s) {         System.out.println("In countdown: "+s+ ". Latch count: "+latch.getCount());         secret = s;         latch.countDown();         System.out.println("Latch count: "+latch.getCount());     }   }     public class LogonHandler extends AbstractHandler {      public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)          throws IOException, ServletException         {             Map<String,String[]> query = request.getParameterMap();              if (query.containsKey("method")) {                 if (query.get("method")[0].compareTo(method) == 0) {                     baseRequest.setHandled(true);                     response.getWriter().println(logon(query));                 }             }             else                 baseRequest.setHandled(false);         }      private String logon(Map<String,String[]> query) {         if (query.containsKey("username") && query.containsKey("password") &&           query.containsKey("sendercompid")) {              app.mapUser(query.get("sendercompid")[0], new   UserSession(query.get("username")[0], query.get("password")[0]));              SessionID session = new SessionID(new BeginString("FIX.4.4"), new SenderCompID(query.get("sendercompid")[0]), new TargetCompID("PARFX"));              try {                 ThreadedSocketInitiator tsi = new ThreadedSocketInitiator(app, app.getFileStoreFactory(), settings, app.getLogFactory(), app.getMessageFactory());                 UserSession userSession = new UserSession(query.get("username")[0], query.get("password")[0]);                 userSession.setInitiator(tsi);                  tsi.start();                 return userSession.await();             } catch (ConfigError e) {                 // TODO Auto-generated catch block                 e.printStackTrace();                 return e.toString();             }         }         return "fail";     }   }   public class QuickfixjApplication implements Application {     private Map<String,UserSession> users = new HashMap<String,UserSession>();      public void mapUser(String s, UserSession u) {         users.put(s, u);     }      public void toAdmin(Message message, SessionID sessionId) {          try {             if (message.getHeader().getField(new StringField(MsgType.FIELD)).valueEquals(Logon.MSGTYPE)) {                 UserSession user = users.get(sessionId.getSenderCompID());                 message.setField(new Username(user.getUsername()));                 message.setField(new Password(user.getPassword()));             }         } catch (FieldNotFound e) {             e.printStackTrace();         }     }      public void fromAdmin(Message message, SessionID sessionId)         throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {          if (message.getHeader().getField(new StringField(MsgType.FIELD)).valueEquals(Logon.MSGTYPE)) {             System.out.println(message.toString());             UserSession user = users.get(sessionId.getSenderCompID());             user.countdown(message.toString());         }     } } 

Ok, I've tried to only include the minimum amount of code here. There are three interesting classes, UserSession is the internal glue between the Jetty handler and the QuickFix/j application.

The LogonHandler receives an HTTP logon request and tries to log a user onto a QuickFix/j application session.

QuickFix/j is sending a logon message to a FIX server, this logon request / response is asynchronous. The HTTP logon request is of course synchronous. So we have to wait for the reply from the FIX server before we return from the HTTP request. I do this using CountDownLatch and this UserSession object.

When I create the QuickFix/j session object I also create a UserSession object and add it to a map (that happens in the LogonHandler logon method).

There are two callbacks in the QuickFix/j application object, toAdmin() and fromAdmin(). In fromAdmin() I check if the message is a logon response and if it is I call a method of UserSession to countdown the latch. In debugging the code I see that the fromAdmin() method is hit, the UserSession object is found in the map and the countdown() method is called and the latch.getCount() goes from 1 to 0, but the latch.await() method in UserSession await() never returns. It always times out.

like image 568
shaz Avatar asked Feb 18 '13 04:02

shaz


1 Answers

You could use CountDownLatch like this:

public class LogonHandler implements Handler {     private final CountDownLatch loginLatch = new CountDownLatch (1);      private boolean callbackResults;      public void serverResponseCallback(boolean result) {         callbackResults = result;         loginLatch.countDown ();     }      public boolean tryLogon(Credentials creds) throws InterruptedException {         SomeServer server = new SomeServer(address);         server.tryLogon (creds.getName (), creds.getPass ());         loginLatch.await ();         return callbackResults;     } } 

If you want to limit waiting time by, for example, 5 seconds, then instead of loginLatch.await () use the following:

if (loginLatch.await (5L, TimeUnit.SECONDS))     return callbackResults; else     return false; // Timeout exceeded 
like image 187
Mikhail Vladimirov Avatar answered Sep 21 '22 17:09

Mikhail Vladimirov