my applications should have 2 core endpoints: push, pull for sending and fetching data.
Pull operation should works asynchronously and result DeferredResult. When user call pull service over rest, new DefferedResult is created and stored into Map<Long, DefferedResult> results = new ConcurrentHashMap<>()
where is waiting for new data or until timeout is expired.
Push operation call user over rest as well, and this operation checks map of results for recipient of data pushed by this operation. When map contains result of recipient, these data are set to his result, DefferedResult is returned.
Here is base code:
@Service
public class FooServiceImpl {
Map<Long, DefferedResult> results = new ConcurrentHashMap<>();
@Transactional
@Override
public DeferredResult<String> pull(Long userId) {
// here is database call, String data = fooRepository.getNewData(); where I check if there are some new data in database, and if there are, just return it, if not add deferred result into collection to wait for it
DeferredResult<String> newResult = new DeferredResult<>(5000L);
results.putIfAbsent(userId, newResult);
newResult.onCompletion(() -> results.remove(userId));
// if (data != null)
// newResult.setResult(data);
return newResult;
}
@Transactional
@Override
public void push(String data, Long recipientId) {
// fooRepository.save(data, recipientId);
if (results.containsKey(recipientId)) {
results.get(recipientId).setResult(data);
}
}
}
Code is working as I expected problem is that should also works for multiple users. I guess the max active users which will call pull operation will max 1000. So every call of pull take max 5 seconds as I set in DefferedResult but it isn't.
As you can see in image, if I immediately call rest of pull operation from my javascript client multiple times you can see that tasks will executed sequentially instead of simultaneously. Tasks which I fired as last take about 25 seconds, but I need that when 1000 users execute at same time pull operation, that operation should take max 5 seconds + latency.
How to configure my app to execute these tasks simultaneously and ensure each each task will about 5 seconds or less (when another user send something to waiting user)? I tried add this configuration into property file:
server.tomcat.max-threads=1000
and also this configuration:
@Configuration
public class AsyncConfig extends AsyncSupportConfigurer {
@Override
protected AsyncTaskExecutor getTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(1000);
taskExecutor.initialize();
return taskExecutor;
}
}
But it didn't help, still same result. Can you help me configure it please?
EDIT:
This is how I calling this service from angular:
this.http.get<any>(this.url, {params})
.subscribe((data) => {
console.log('s', data);
}, (error) => {
console.log('e', error);
});
When I tried call it multiple times with pure JS code like this:
function httpGet()
{
var xmlHttp = new XMLHttpRequest();
xmlHttp.open( "GET", 'http://localhost:8080/api/pull?id=1', true );
xmlHttp.send( null );
return xmlHttp.responseText;
}
setInterval(httpGet, 500);
it will execute every request call much faster (about 7 seconds). I expected that increasing is caused database calling in service, but it still better than 25 sec. Do I have something wrong with calling this service in angular?
EDIT 2:
I tried another form of testing and instead of browser I used jMeter. I execute 100 requests in 100 threads and here is result:
As you can see requests will be proceed by 10, and after reach 50 requests application throw exception:
java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms.
at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:667) ~[HikariCP-2.7.8.jar:na]
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183) ~[HikariCP-2.7.8.jar:na]
at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148) ~[HikariCP-2.7.8.jar:na]
at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) ~[HikariCP-2.7.8.jar:na]
at org.hibernate.engine.jdbc.connections.internal.DatasourceConnectionProviderImpl.getConnection(DatasourceConnectionProviderImpl.java:122) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:35) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:106) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:136) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at org.hibernate.internal.SessionImpl.connection(SessionImpl.java:523) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
at sun.reflect.GeneratedMethodAccessor61.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:223) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:207) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle.doGetConnection(HibernateJpaDialect.java:391) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.orm.jpa.vendor.HibernateJpaDialect.beginTransaction(HibernateJpaDialect.java:154) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:400) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:474) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at sk.moe.zoya.service.impl.FooServiceImpl$$EnhancerBySpringCGLIB$$ebab570a.pull(<generated>) ~[classes/:na]
at sk.moe.zoya.web.FooController.pull(FooController.java:25) ~[classes/:na]
at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:635) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) [tomcat-embed-core-8.5.29.jar:8.5.29]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.29.jar:8.5.29]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.29.jar:8.5.29]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
2018-06-02 13:21:47.163 WARN 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163 WARN 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.164 ERROR 26978 --- [io-8080-exec-69] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection] with root cause
I also comment code where I use Repositories to ensure there is nothing with database, and same result. ALso I set uniqe userId for each request with AtomicLong class.
EDIT 3:
I find out when I comment also @Transactional
everything works fine! So can you tell me how to set spring's transactions for large amount of operations without increasing delay?
I added spring.datasource.maximumPoolSize=1000
to increase pool size which I guess shoulds, so the only problem is how to speed up methods with @Transactional.
Every call to pull method is annotated with @Transactional because I need at first load data from database and check if there are new data, because yes, I do not have to do creating waiting deferred result. push methods have to be annotation with @Transaction as well, because there I need at first store received data in database and next set that value to waiting results. For my data I am using Postgres.
It seems the problem here is that you're running out of connections in the database pool.
You have your method tagged with @Transaction
but your controller is also expecting the result of the method i.e. the DeferredResult
to be delivered as soon as possible such that the thread is set free.
Now, this is what happens when you run a request:
@Transaction
functionality is implemented in a Spring proxy which must open a connection, call your subject method and then commit or rollback the transaction.fooService.pull
method, it is in fact calling a proxy.DeferredResult
, which is then passed to the controller for it to return.Now, the problem is that DeferredResult
is designed in such a way that it should be used asynchronously. In other words, the promise is expected to be resolved later in some other thread, and we are supposed to free the request thread as soon as possible.
In fact, Spring documentation on DeferredResult says:
@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
DeferredResult<String> deferredResult = new DeferredResult<String>();
// Save the deferredResult somewhere..
return deferredResult;
}
// From some other thread...
deferredResult.setResult(data);
The problem in your code is precisely that the DeferredResult
is being solved in the same request thread.
So, the thing is that when the Spring proxy requests a connection to the database pool, when you do your heavy load tests, many requests will find the pool is full and does not have connections available. So the request is put on hold, but at that point your DeferredResult
has not been created yet, so its timeout functionality does not exist.
Your request is basically waiting for some connection from the database pool to become available. So, let's say 5 seconds pass, then the request gets a connection, and now you get DeferredResult
which the controller uses to handle the response. Eventually, 5 seconds later it timeout. So you have to add your time waiting for a connection from the pool and your time waiting for the DeferredResult
to get resolved.
That's why you probably see that, when you test with JMeter, the request time gradually increases as connections get depleted from the database pool.
You can enable some logging for the thread pool by adding the following your application.properties file:
logging.level.com.zaxxer.hikari=DEBUG
You could also configure the size of your database pool and even add some JMX support such that you can monitor it from Java Mission Control:
spring.datasource.hikari.maximumPoolSize=10
spring.datasource.hikari.registerMbeans=true
Using JMX support you will be able to see how the database pool gets depleted.
The trick here consists in moving the logic that resolves the promise to another thread:
@Override
public DeferredResult pull(Long previousId, String username) {
DeferredResult result = createPollingResult(previousId, username);
CompletableFuture.runAsync(() -> {
//this is where you encapsulate your db transaction
List<MessageDTO> messages = messageService.findRecents(previousId, username); // should be final or effective final
if (messages.isEmpty()) {
pollingResults.putIfAbsent(username, result);
} else {
result.setResult(messages);
}
});
return result;
}
By doing this, your DeferredResult
is returned immediately and Spring can do its magic of asynchronous request handling while it sets free that precious Tomcat thread.
As many guys mentioned it's not correct way to test the performance. You asked to do automated requests at certain period of time as you were doing in XMLHttpRequest. You can use interval
of Observable
as:
import {Observable} from "rxjs/Observable";
import {Subscription} from "rxjs/Subscription";
private _intervalSubscription: Subscription;
ngOnInit() {
this._intervalSubscription = Observable.interval(500).subscribe(x => {
this.getDataFromServer();
});
}
ngOnDestroy(): void {
this._intervalSubscription.unsubscribe();
}
getDataFromServer() {
// do your network call
this.http.get<any>(this.url, {params})
.subscribe((data) => {
console.log('s', data);
}, (error) => {
console.log('e', error);
});
}
This is the best possible way of doing polling from client side.
EDIT 1
private prevRequestTime: number;
ngAfterViewInit(): void {
this.getDataFromServer();
}
getDataFromServer() {
this.prevRequestTime = Date.now();
// do your network call
this.http.get<any>(this.url, {params})
.subscribe((data) => {
console.log('s', data);
this.scheduleRequestAgain();
}, (error) => {
console.log('e', error);
this.scheduleRequestAgain();
});
}
scheduleRequestAgain() {
let diff = Date.now() - this.prevRequestTime;
setTimeout(this.getDataFromServer(), diff);
}
I think you need producer and consumer structure model. i write code for you. i hope it will help you.
This is sample code :
DeferredResultStrore
@Component
public class DeferredResultStrore {
private Queue<DeferredResult<String>> responseBodyQueue;
private HashMap<String, List<DeferredResult<InterfaceModel>>> groupMap;
private final long resultTimeOut;
public DeferredResultStrore() {
responseBodyQueue = new LinkedBlockingQueue<DeferredResult<String>>();
groupMap = new HashMap<String, List<DeferredResult<InterfaceModel>>>();
// write time.
resultTimeOut = 1000 * 60 * 60;
}
public Queue<DeferredResult<String>> getResponseBodyQueue() {
return responseBodyQueue;
}
public HashMap<String, List<DeferredResult<InterfaceModel>>> getGroupMap() {
return groupMap;
}
public long getResultTimeOut() {
return resultTimeOut;
}
}
DeferredResultService
public interface DeferredResultService {
public DeferredResult<?> biteResponse(HttpServletResponse resp, HttpServletRequest req);
public DeferredResult<?> biteGroupResponse(String key, HttpServletResponse resp);
}
DeferredResultServiceImpl
@Service
public class DeferredResultServiceImpl implements DeferredResultService {
@Autowired
private DeferredResultStrore deferredResultStore;
@Override
public DeferredResult<?> biteResponse(final HttpServletResponse resp, HttpServletRequest req) {
final DeferredResult<String> defResult = new DeferredResult<String>(deferredResultStore.getResultTimeOut());
removeObserver(resp, defResult, null);
deferredResultStore.getResponseBodyQueue().add(defResult);
return defResult;
}
@Override
public DeferredResult<?> biteGroupResponse(String key, final HttpServletResponse resp) {
final DeferredResult<InterfaceModel> defResult = new DeferredResult<InterfaceModel>(
deferredResultStore.getResultTimeOut());
List<DeferredResult<InterfaceModel>> defResultList = null;
removeObserver(resp, defResult, key);
if (deferredResultStore.getGroupMap().containsKey(key)) {
defResultList = deferredResultStore.getGroupMap().get(key);
defResultList.add(defResult);
} else {
defResultList = new ArrayList<DeferredResult<InterfaceModel>>();
defResultList.add(defResult);
deferredResultStore.getGroupMap().put(key, defResultList);
}
return defResult;
}
private void removeObserver(final HttpServletResponse resp, final DeferredResult<?> defResult, final String key) {
defResult.onCompletion(new Runnable() {
public void run() {
if (key != null) {
List<DeferredResult<InterfaceModel>> defResultList = deferredResultStore.getGroupMap().get(key);
if (defResultList != null) {
for (DeferredResult<InterfaceModel> deferredResult : defResultList) {
if (deferredResult.hashCode() == defResult.hashCode()) {
defResultList.remove(deferredResult);
}
}
}
} else {
if (!deferredResultStore.getResponseBodyQueue().isEmpty()) {
deferredResultStore.getResponseBodyQueue().remove(defResult);
}
}
}
});
defResult.onTimeout(new Runnable() {
public void run() {
// 206
resp.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
if (key != null) {
List<DeferredResult<InterfaceModel>> defResultList = deferredResultStore.getGroupMap().get(key);
if (defResultList != null) {
for (DeferredResult<InterfaceModel> deferredResult : defResultList) {
if (deferredResult.hashCode() == defResult.hashCode()) {
InterfaceModel model = new InterfaceModel();
model.setId(key);
model.setMessage("onTimeout");
deferredResult.setErrorResult(model);
defResultList.remove(deferredResult);
}
}
}
} else {
defResult.setErrorResult("onTimeout");
deferredResultStore.getResponseBodyQueue().remove(defResult);
}
}
});
}
}
PushService
public interface PushService {
public boolean pushMessage(String message);
public boolean pushGroupMessage(String key, String topic, HttpServletResponse resp);
}
PushServiceImpl
@Service
public class PushServiceImpl implements PushService {
@Autowired
private DeferredResultStrore deferredResultStore;
@Override
public boolean pushMessage(String message) {
if (!deferredResultStore.getResponseBodyQueue().isEmpty()) {
for (DeferredResult<String> deferredResult : deferredResultStore.getResponseBodyQueue()) {
deferredResult.setResult(message);
}
deferredResultStore.getResponseBodyQueue().remove();
}
return true;
}
@Override
public boolean pushGroupMessage(String key, String topic, HttpServletResponse resp) {
List<DeferredResult<InterfaceModel>> defResultList = null;
// select data in DB. that is sample group push service. need to connect db.
InterfaceModel model = new InterfaceModel();
model.setMessage("write group message.");
model.setId(key);
if (deferredResultStore.getGroupMap().containsKey(key)) {
defResultList = deferredResultStore.getGroupMap().get(key);
for (DeferredResult<InterfaceModel> deferredResult : defResultList) {
deferredResult.setResult(model);
}
deferredResultStore.getGroupMap().remove(key);
}
return true;
}
}
InterfaceModel
public class InterfaceModel {
private String message;
private int idx;
private String id;
// DB Column
public InterfaceModel() {
// TODO Auto-generated constructor stub
}
public InterfaceModel(String message, int idx, String id) {
this.message = message;
this.idx = idx;
this.id = id;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public int getIdx() {
return idx;
}
public void setIdx(int idx) {
this.idx = idx;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}
web.xml
async-supported very important in settings.
<servlet>
<servlet-name>appServlet</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>/WEB-INF/spring/appServlet/servlet-context.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
<async-supported>true</async-supported>
</servlet>
Java base
@Bean
public ServletRegistrationBean dispatcherServlet() {
ServletRegistrationBean registration = new ServletRegistrationBean(
new DispatcherServlet(), "/");
registration.setAsyncSupported(true);
return registration;
}
In fact :
A DeferredResult is associated with an open request. When the request completes, the DeferredResult is removed from the map, and then, the client issues a new long polling request, which adds a new DeferredResult instance
Spring Boot will automatically register any Servlet beans in your application context with the servlet container. By default async supported is set to true so there's nothing for you to do beyond creating a bean for your Servlet.
@Aligtor, for you => public @interface EnableAsync Enables Spring's asynchronous method execution capability, similar to functionality found in Spring's XML namespace.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With