My scenario with Spring Integration is:
I tried many configurations; the best one so far is the following:
<bean id="serializer" class="com.MySerializerDeserializer" />
<task:executor id="myTaskExecutor" pool-size="5-300" queue-capacity="500000"/>
<int-ip:tcp-connection-factory id="serverTcpConFact"
type="server"
port="5566"
using-nio="true"
single-use="false"
so-timeout="5000"
task-executor="myTaskExecutor"
deserializer="serializer"
serializer="serializer"/>
<int-ip:tcp-inbound-channel-adapter id="tcpInboundAdapter"
channel="tcpInbound"
connection-factory="serverTcpConFact" />
<int:channel id="tcpInbound" />
<int:service-activator input-channel="tcpInbound"
ref="importService"
method="handler" />
<bean id="importService" class="com.MyImportService" />
The serialization class is:
public class MySerializerDeserializer implements Serializer< MyMessage >, Deserializer< MyMessage > {
@Override
public MyMessage deserialize(InputStream inputStream) throws IOException {
DataInputStream dis = new DataInputStream(inputStream);
int size = dis.readInt();
byte[] b = new byte[size];
dis.read(b);
MyMessage s = new MyMessage ();
String value = new String(b);
s.setTest(value);
return s;
I use this code to test the server:
class SimpleThread extends Thread {
public void run() {
try {
String id = java.util.UUID.randomUUID().toString();
try (Socket echoSocket = new Socket("localhost", 5566)) {
DataOutputStream dos = new DataOutputStream(echoSocket.getOutputStream());
for (int i = 0; i < 500; i++) {
String s = id + " " + i;
dos.writeInt(s.length());
dos.write(s.getBytes());
dos.flush();
System.out.println(id + " - " + i + " - " + s.length());
}
}
When I run this with a single thread it works fine, but the problem appears when I run more than one thread, like this:
for (int i = 0; i < 5; i++) {
new SimpleThread().start();
}
The Spring Integration server gets stuck and I get the following WARN:
WARN [THREAD ID=myTaskExecutor-1] 2014-06-25 12:42:18 WARN org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory:566 - Timing out TcpNioConnection 127.0.0.1:56273:5566:4e3caf61-1101-4881-a1cf-1c31610b33f3
And it doesn't work: the server can't receive the messages.
With this error:

Where am I wrong? Thank you.
EDIT
I modified the thread pool in this way:
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="io-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="50000" />
<property name="rejectedExecutionHandler">
<bean class="org.springframework.integration.util.CallerBlocksPolicy">
<constructor-arg value="10000" />
</bean>
</property>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="assembler-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="50000" />
<property name="rejectedExecutionHandler">
<bean class="org.springframework.integration.util.CallerBlocksPolicy">
<constructor-arg value="10000" />
</bean>
</property>
</bean>
</constructor-arg>
</bean>
The server is more responsive, but I still have the problem.
EDIT
The import service is:
public class ImportService {
public void handler(MyMessage inp) {
System.out.println(Thread.currentThread().getName() + "******" + inp.getTest());
}
}
I just ran your test exactly as described (copied your code) and all worked as expected. 500 messages delivered to the other side.
I suggest you turn on TRACE level logging, and maybe add some debug logging to your deserializer.
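As a rough illustration of the debug logging suggested above, here is a minimal sketch of length-prefixed framing with logging around each read. The class `LoggingFrameCodec` and its methods are hypothetical names, not part of the original code or of Spring Integration, and `java.util.logging` is used only to keep the sketch self-contained. It also swaps `read(byte[])` for `readFully(byte[])` and writes the byte count instead of `String.length()`. Whether either of these affects the behaviour reported in the question is an assumption, not something confirmed in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper (not from the original post): length-prefixed frames with debug logging. */
public class LoggingFrameCodec {

    private static final Logger LOG = Logger.getLogger(LoggingFrameCodec.class.getName());

    /** Writes a 4-byte length header followed by the UTF-8 bytes of the text. */
    public static void writeFrame(OutputStream out, String text) throws IOException {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        DataOutputStream dos = new DataOutputStream(out);
        dos.writeInt(payload.length); // byte count, which can differ from text.length()
        dos.write(payload);
        dos.flush();
    }

    /** Reads one frame; the body of a deserialize(InputStream) method could delegate here. */
    public static String readFrame(InputStream in) throws IOException {
        DataInputStream dis = new DataInputStream(in);
        int size = dis.readInt();
        LOG.log(Level.FINE, "Frame header read, expecting {0} bytes", size);
        byte[] payload = new byte[size];
        // readFully blocks until the whole payload has arrived;
        // read(byte[]) is allowed to return after filling only part of the array.
        dis.readFully(payload);
        String text = new String(payload, StandardCharsets.UTF_8);
        LOG.log(Level.FINE, "Frame body read: {0}", text);
        return text;
    }

    public static void main(String[] args) throws IOException {
        // Round trip two frames through an in-memory buffer instead of a socket.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        writeFrame(buffer, "id 0");
        writeFrame(buffer, "id 1");
        InputStream in = new ByteArrayInputStream(buffer.toByteArray());
        System.out.println(readFrame(in));
        System.out.println(readFrame(in));
    }
}
```

With the logger level set to FINE, each frame produces one log line for the header and one for the body, which makes it easier to see whether a connection stalls between the two.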