Apparently what I was hoping to do is outside of the scope of thrift... If I make sure there is never more than one client on the port, everything is a-ok. Of course this kind of defeats the purpose as I'd like to have several reusable connections open to the server to improve response times and lower overhead.
If anyone has a suggestion of an alternate way to achieve this, it would be appreciated(or if my conclusion is wrong)
I have a multi-component application that is mostly connected up by thrift(mostly java->php connections).
So far it all has appeared to be fine, but having introduced a Java->Java connection where the client end is a servlet that can launch hundreds of requests a second.
The method being accessed has the following interface:
bool pvCheck(1:i32 toolId) throws(1:DPNoToolException nte),
To make sure it wasn't something weird on the service end I replaced the implementation with a trivial one:
@Override
public boolean pvCheck(int toolId) throws TException {
//boolean ret = api.getViewsAndDec(toolId);
return true;
}
So long as there's not many connections it goes by fine but as soon as connections come close together, connections start getting stuck in the reader.
If I pull one of them up in the debugger the stack looks like this:
Daemon Thread [http-8080-197] (Suspended)
BufferedInputStream.read(byte[], int, int) line: 308
TSocket(TIOStreamTransport).read(byte[], int, int) line: 126
TSocket(TTransport).readAll(byte[], int, int) line: 84
TBinaryProtocol.readAll(byte[], int, int) line: 314
TBinaryProtocol.readI32() line: 262
TBinaryProtocol.readMessageBegin() line: 192
DumboPayment$Client.recv_pvCheck() line: 120
DumboPayment$Client.pvCheck(int) line: 105
Receiver.performTask(HttpServletRequest, HttpServletResponse) line: 157
Receiver.doGet(HttpServletRequest, HttpServletResponse) line: 109
Receiver(HttpServlet).service(HttpServletRequest, HttpServletResponse) line: 617
Receiver(HttpServlet).service(ServletRequest, ServletResponse) line: 717
ApplicationFilterChain.internalDoFilter(ServletRequest, ServletResponse) line: 290
ApplicationFilterChain.doFilter(ServletRequest, ServletResponse) line: 206
StandardWrapperValve.invoke(Request, Response) line: 233
StandardContextValve.invoke(Request, Response) line: 191
StandardHostValve.invoke(Request, Response) line: 127
ErrorReportValve.invoke(Request, Response) line: 102
StandardEngineValve.invoke(Request, Response) line: 109
CoyoteAdapter.service(Request, Response) line: 298
Http11AprProcessor.process(long) line: 859
Http11AprProtocol$Http11ConnectionHandler.process(long) line: 579
AprEndpoint$Worker.run() line: 1555
Thread.run() line: 619
This seems to be triggered by data getting corrupted as I get the following exceptions:
10/11/22 18:38:55 WARN logger.Receiver: pvCheck had an exception
org.apache.thrift.TApplicationException: pvCheck failed: unknown result
at *.thrift.generated.DumboPayment$Client.recv_pvCheck(DumboPayment.java:135)
at *.thrift.generated.DumboPayment$Client.pvCheck(DumboPayment.java:105)
at *.Receiver.performTask(Receiver.java:157)
at *.Receiver.doGet(Receiver.java:109)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:617)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:717)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:298)
at org.apache.coyote.http11.Http11AprProcessor.process(Http11AprProcessor.java:859)
at org.apache.coyote.http11.Http11AprProtocol$Http11ConnectionHandler.process(Http11AprProtocol.java:579)
at org.apache.tomcat.util.net.AprEndpoint$Worker.run(AprEndpoint.java:1555)
at java.lang.Thread.run(Thread.java:619)
and
10/11/22 17:59:46 ERROR [/ninja_ar].[Receiver]: サーブレット Receiver のServlet.service()が例外を投げました
java.lang.OutOfMemoryError: Java heap space
at org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:296)
at org.apache.thrift.protocol.TBinaryProtocol.readString(TBinaryProtocol.java:290)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:198)
at *.thrift.generated.DumboPayment$Client.recv_pvCheck(DumboPayment.java:120)
at *.thrift.generated.DumboPayment$Client.pvCheck(DumboPayment.java:105)
at *.Receiver.performTask(Receiver.java:157)
at *.Receiver.doGet(Receiver.java:109)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:690)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:803)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:269)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:188)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:210)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:172)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:117)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:108)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:151)
at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:870)
at org.apache.coyote.http11.Http11BaseProtocol$Http11ConnectionHandler.processConnection(Http11BaseProtocol.java:665)
at org.apache.tomcat.util.net.PoolTcpEndpoint.processSocket(PoolTcpEndpoint.java:528)
at org.apache.tomcat.util.net.LeaderFollowerWorkerThread.runIt(LeaderFollowerWorkerThread.java:81)
at org.apache.tomcat.util.threads.ThreadPool$ControlRunnable.run(ThreadPool.java:685)
at java.lang.Thread.run(Thread.java:636)
Perhaps I'm way off the mark but I'm pretty sure these are related to the client continuing to attempt to read when there's nothing being sent.
Both the server and client are using the java binary protocol.
I wrote a simple client pool class which lets me reuse clients, these are the main functions:
public synchronized Client getClient() {
if(clientQueue.isEmpty()) {
return newClient();
} else {
return clientQueue.getLast();
}
}
private synchronized Client newClient() {
int leftToTry = serverArr.length;
Client cli = null;
while(leftToTry > 0 && cli == null) {
log.info("Creating new connection to " +
serverArr[roundRobinPos] + port);
TTransport transport = new TSocket(serverArr[roundRobinPos], port);
TProtocol protocol = new TBinaryProtocol(transport);
cli = new Client(protocol);
try {
transport.open();
} catch (TTransportException e) {
cli = null;
log.warn("Failed connection to " +
serverArr[roundRobinPos] + port);
}
roundRobinPos++;
if(roundRobinPos >= serverArr.length) {
roundRobinPos = 0;
}
leftToTry--;
}
return cli;
}
public void returnClient(Client cli) {
clientQueue.addFirst(cli);
}
The client applications(namely tomcat servlets) access it in the following way:
Client dpayClient = null;
if(dpay != null
&& (dpayClient = dpay.getClient()) != null) {
try {
dpayClient.pvCheck(requestParameters.getId());
} catch (DPNoToolException e) {
return;
} catch (TException e) {
log.warn("pvCheck had an exception", e);
} finally {
if(dpayClient != null) {
dpay.returnClient(dpayClient);
}
}
}
The actual thrift connection is upped in the following manner
private boolean initThrift(int port, Configuration conf) {
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
DPaymentHandler handler = new DPaymentHandler(conf);
DumboPayment.Processor processor =
new DumboPayment.Processor(handler);
InetAddress listenAddress;
try {
listenAddress = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
LOG.error("Failed in thrift init", e);
return false;
}
TServerTransport serverTransport;
try {
serverTransport = new TServerSocket(
new InetSocketAddress(listenAddress, port));
} catch (TTransportException e) {
LOG.error("Failed in thrift init", e);
return false;
}
TTransportFactory transportFactory = new TTransportFactory();
TServer server = new TThreadPoolServer(processor, serverTransport,
transportFactory, protocolFactory);
LOG.info("Starting Dumbo Payment thrift server on " +
listenAddress + ":" + Integer.toString(port));
server.serve();
return true;
}
Been stuck on this for a while now... It may well be I'm missing something obvious. I'd really appreciate any help with this.
If any additional information is needed, please let me know. There's a whole mouthful there, so I wanted to try to keep stuff to the most (hopefully) relevant.
My guess is that you have multiple threads trying to use the clients at the same time, and I'm not entirely sure that bulletproof. You might try to use the async interface as well as construct a threadsafe resource pool for accessing your clients.
Using Thrift-0.5.0.0, here is an example of creating an AsyncClient for your thrift generated code:
Factory fac = new AsyncClient.Factory(new TAsyncClientManager(), new TProtocolFactory() {
@Override
public TProtocol getProtocol( TTransport trans ) {
return new TBinaryProtocol(trans);
}
});
AsyncClient cl = fac.getAsyncClient( new TNonblockingSocket( "127.0.0.1", 12345 ));
However, if you look through the source, you'll notice that it's got a single thread message handler, even though it uses a NIO socket, you might find this to be a bottleneck. To get more, you'll have to make more async clients, check them out, and return them correctly.
To simplify this, I've made a quick little class to manage them. The only thing you need to do to modify it to fit your needs is to include your packages and it should work for you, even though I have not really tested it much (at all, really):
public class Thrift {
// This is the request
private static abstract class ThriftRequest {
private void go( final Thrift thrift, final AsyncClient cli ) {
on( cli );
thrift.ret( cli );
}
public abstract void on( AsyncClient cli );
}
// Holds all of our Async Clients
private final ConcurrentLinkedQueue<AsyncClient> instances = new ConcurrentLinkedQueue<AsyncClient>();
// Holds all of our postponed requests
private final ConcurrentLinkedQueue<ThriftRequest> requests = new ConcurrentLinkedQueue<ThriftRequest>();
// Holds our executor, if any
private Executor exe = null;
/**
* This factory runs in thread bounce mode, meaning that if you call it from
* many threads, execution bounces between calling threads depending on when
* execution is needed.
*/
public Thrift(
final int clients,
final int clients_per_message_processing_thread,
final String host,
final int port ) throws IOException {
// We only need one protocol factory
TProtocolFactory proto_fac = new TProtocolFactory() {
@Override
public TProtocol getProtocol( final TTransport trans ) {
return new TBinaryProtocol( trans );
}
};
// Create our clients
Factory fac = null;
for ( int i = 0; i < clients; i++ ) {
if ( fac == null || i % clients_per_message_processing_thread == 0 ) {
fac = new AsyncClient.Factory(
new TAsyncClientManager(),
proto_fac );
}
instances.add( fac.getAsyncClient( new TNonblockingSocket(
host,
port ) ) );
}
}
/**
* This factory runs callbacks in whatever mode the executor is setup for,
* not on calling threads.
*/
public Thrift( Executor exe,
final int clients,
final int clients_per_message_processing_thread,
final String host,
final int port ) throws IOException {
this( clients, clients_per_message_processing_thread, host, port );
this.exe = exe;
}
// Call this to grab an instance
public void
req( final ThriftRequest req ) {
final AsyncClient cli;
synchronized ( instances ) {
cli = instances.poll();
}
if ( cli != null ) {
if ( exe != null ) {
// Executor mode
exe.execute( new Runnable() {
@Override
public void run() {
req.go( Thrift.this, cli );
}
} );
} else {
// Thread bounce mode
req.go( this, cli );
}
return;
}
// No clients immediately available
requests.add( req );
}
private void ret( final AsyncClient cli ) {
final ThriftRequest req;
synchronized ( requests ) {
req = requests.poll();
}
if ( req != null ) {
if ( exe != null ) {
// Executor mode
exe.execute( new Runnable() {
@Override
public void run() {
req.go( Thrift.this, cli );
}
} );
} else {
// Thread bounce mode
req.go( this, cli );
}
return;
}
// We did not need this immediately, hold onto it
instances.add( cli );
}
}
An example of how to use it:
// Make the pool
Thrift t = new Thrift( 10, "localhost", 8000 );
// Use the pool
t.req( new ThriftRequest() {
@Override
public void on( AsyncClient cli ) {
cli.MyThriftMethod( "stringarg", 111, new AsyncMethodCallback<AsyncClient.MyThriftMethod_call>() {
@Override
public void onError( Throwable throwable ) {
}
@Override
public void onComplete( MyThriftMethod_call response ) {
}
});
}
} );
You might want to experiment with different server modes like the THsHaServer to see what works best for your environment.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With