import java.util.concurrent.CountDownLatch; import quickfix.Initiator; public class UserSession { private final CountDownLatch latch = new CountDownLatch(1); public String await() { try { System.out.println("waiting..."); if (latch.await(5, TimeUnit.SECONDS)) System.out.println("released!"); else System.out.println("timed out"); return secret; } catch (InterruptedException e) { // TODO Auto-generated catch block System.out.println(e.getMessage()); e.printStackTrace(); } return null; } public void countdown(String s) { System.out.println("In countdown: "+s+ ". Latch count: "+latch.getCount()); secret = s; latch.countDown(); System.out.println("Latch count: "+latch.getCount()); } } public class LogonHandler extends AbstractHandler { public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { Map<String,String[]> query = request.getParameterMap(); if (query.containsKey("method")) { if (query.get("method")[0].compareTo(method) == 0) { baseRequest.setHandled(true); response.getWriter().println(logon(query)); } } else baseRequest.setHandled(false); } private String logon(Map<String,String[]> query) { if (query.containsKey("username") && query.containsKey("password") && query.containsKey("sendercompid")) { app.mapUser(query.get("sendercompid")[0], new UserSession(query.get("username")[0], query.get("password")[0])); SessionID session = new SessionID(new BeginString("FIX.4.4"), new SenderCompID(query.get("sendercompid")[0]), new TargetCompID("PARFX")); try { ThreadedSocketInitiator tsi = new ThreadedSocketInitiator(app, app.getFileStoreFactory(), settings, app.getLogFactory(), app.getMessageFactory()); UserSession userSession = new UserSession(query.get("username")[0], query.get("password")[0]); userSession.setInitiator(tsi); tsi.start(); return userSession.await(); } catch (ConfigError e) { // TODO Auto-generated catch block e.printStackTrace(); return e.toString(); } } return "fail"; } } public class QuickfixjApplication implements Application { private Map<String,UserSession> users = new HashMap<String,UserSession>(); public void mapUser(String s, UserSession u) { users.put(s, u); } public void toAdmin(Message message, SessionID sessionId) { try { if (message.getHeader().getField(new StringField(MsgType.FIELD)).valueEquals(Logon.MSGTYPE)) { UserSession user = users.get(sessionId.getSenderCompID()); message.setField(new Username(user.getUsername())); message.setField(new Password(user.getPassword())); } } catch (FieldNotFound e) { e.printStackTrace(); } } public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon { if (message.getHeader().getField(new StringField(MsgType.FIELD)).valueEquals(Logon.MSGTYPE)) { System.out.println(message.toString()); UserSession user = users.get(sessionId.getSenderCompID()); user.countdown(message.toString()); } } }
Ok, I've tried to only include the minimum amount of code here. There are three interesting classes, UserSession is the internal glue between the Jetty handler and the QuickFix/j application.
The LogonHandler receives an HTTP logon request and tries to log a user onto a QuickFix/j application session.
QuickFix/j is sending a logon message to a FIX server, this logon request / response is asynchronous. The HTTP logon request is of course synchronous. So we have to wait for the reply from the FIX server before we return from the HTTP request. I do this using CountDownLatch and this UserSession object.
When I create the QuickFix/j session object I also create a UserSession object and add it to a map (that happens in the LogonHandler logon method).
There are two callbacks in the QuickFix/j application object, toAdmin() and fromAdmin(). In fromAdmin() I check if the message is a logon response and if it is I call a method of UserSession to countdown the latch. In debugging the code I see that the fromAdmin() method is hit, the UserSession object is found in the map and the countdown() method is called and the latch.getCount() goes from 1 to 0, but the latch.await() method in UserSession await() never returns. It always times out.
You could use CountDownLatch
like this:
public class LogonHandler implements Handler { private final CountDownLatch loginLatch = new CountDownLatch (1); private boolean callbackResults; public void serverResponseCallback(boolean result) { callbackResults = result; loginLatch.countDown (); } public boolean tryLogon(Credentials creds) throws InterruptedException { SomeServer server = new SomeServer(address); server.tryLogon (creds.getName (), creds.getPass ()); loginLatch.await (); return callbackResults; } }
If you want to limit waiting time by, for example, 5 seconds, then instead of loginLatch.await ()
use the following:
if (loginLatch.await (5L, TimeUnit.SECONDS)) return callbackResults; else return false; // Timeout exceeded
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With