I have an event queue with n message listeners. When a message arrives, one listener picks it up and runs a new instance of a state machine. The problem is that although multiple messages are handled in parallel, the state machines execute their actions sequentially, even though the actions are invoked by different state machine instances, as you can see here:
2017-10-18 16:11:03.740 INFO 30282 --- [lTaskExecutor-1] o.s.s.support.LifecycleObjectSupport : started org.springframework.statemachine.support.DefaultStateMachineExecutor@6ddb1ea6
2017-10-18 16:11:03.741 INFO 30282 --- [lTaskExecutor-1] o.s.s.support.LifecycleObjectSupport : started EVALUATE_IS_WALKTHROUGH SAVE END START / START / uuid=b922b6b1-a441-4924-8531-d45e0e0c9c40 / id=null
2017-10-18 16:11:03.740 INFO 30282 --- [TaskExecutor-10] o.s.s.support.LifecycleObjectSupport : started org.springframework.statemachine.support.DefaultStateMachineExecutor@13b6ace4
2017-10-18 16:11:03.741 INFO 30282 --- [TaskExecutor-10] o.s.s.support.LifecycleObjectSupport : started EVALUATE_IS_WALKTHROUGH SAVE END START / START / uuid=e06a8c1d-beed-41c6-bc63-d8c1a3a56169 / id=null
2017-10-18 16:11:03.759 INFO 30282 --- [pool-5-thread-1] i.b.b.e.processors.actions.SaveAction : [io.botbit.backend.events.processors.actions.SaveAction@607e4071] Saving event Event[id=null,
2017-10-18 16:11:24.046 INFO 30282 --- [pool-5-thread-1] i.b.b.e.processors.actions.SaveAction : [io.botbit.backend.events.processors.actions.SaveAction@607e4071] Saving event Event[id=null,
2017-10-18 16:11:44.058 INFO 30282 --- [pool-5-thread-1] i.b.b.e.p.a.EvaluateIsWalkthroughAction : Evaluation is WT,,,
2017-10-18 16:11:44.059 INFO 30282 --- [pool-5-thread-1] o.s.s.support.LifecycleObjectSupport : stopped org.springframework.statemachine.support.DefaultStateMachineExecutor@6ddb1ea6
2017-10-18 16:11:44.060 INFO 30282 --- [pool-5-thread-1] o.s.s.support.LifecycleObjectSupport : stopped EVALUATE_IS_WALKTHROUGH SAVE END START / / uuid=b922b6b1-a441-4924-8531-d45e0e0c9c40 / id=null
2017-10-18 16:11:44.060 INFO 30282 --- [pool-5-thread-1] i.b.b.e.p.a.EvaluateIsWalkthroughAction : Evaluation is WT,,,
2017-10-18 16:11:44.061 INFO 30282 --- [pool-5-thread-1] o.s.s.support.LifecycleObjectSupport : stopped org.springframework.statemachine.support.DefaultStateMachineExecutor@13b6ace4
2017-10-18 16:11:44.061 INFO 30282 --- [pool-5-thread-1] o.s.s.support.LifecycleObjectSupport : stopped EVALUATE_IS_WALKTHROUGH SAVE END START / / uuid=e06a8c1d-beed-41c6-bc63-d8c1a3a56169 / id=nul
I think this happens because every action runs on the same thread ([pool-5-thread-1]). I need every instance to run fully in parallel: not only the state machines themselves, but also their actions.
Any help will be appreciated, thanks!
@Component
public class EventConsumer {

    private final static Logger logger = Logger.getLogger(EventConsumer.class);

    @Autowired
    private StateMachineFactory<String, String> eventProcessorFactory;

    public void consume(Event event) {
        logger.info("Received <" + event + ">");
        // Each message gets its own state machine instance.
        StateMachine<String, String> eventProcessor = eventProcessorFactory.getStateMachine();
        eventProcessor.getExtendedState().getVariables().put("event", event);
        eventProcessor.start();
        eventProcessor.sendEvent(CommonEvents.SUCCESS);
    }

    public void consume(Collection<Event> events) {
        for (Event event : events) {
            this.consume(event);
        }
    }
}
And this is the state machine configuration:
@Configuration
@EnableStateMachineFactory
public class WiFiConnectEventProcessorConfig extends StateMachineConfigurerAdapter<String, String> {

    @Autowired
    SaveAction saveAction;

    @Autowired
    DeprecateAddVisitationAction addVisitation;

    @Autowired
    EvaluateIsWalkthroughAction isWTAction;

    @Override
    public void configure(StateMachineStateConfigurer<String, String> states) throws Exception {
        states.withStates()
            .initial(WiFiConnectStates.START)
            .state(WiFiConnectStates.SAVE, saveAction)
            .state(WiFiConnectStates.DEPRECATE_ADD_VISITATION, addVisitation)
            .state(WiFiConnectStates.EVALUATE_IS_WALKTHROUGH, isWTAction)
            .end(WiFiConnectStates.END);
    }

    @Override
    public void configure(StateMachineTransitionConfigurer<String, String> transitions) throws Exception {
        transitions
            .withExternal()
                .source(WiFiConnectStates.START).target(WiFiConnectStates.SAVE)
                .event(CommonEvents.SUCCESS)
            .and().withExternal()
                .source(WiFiConnectStates.SAVE).target(WiFiConnectStates.DEPRECATE_ADD_VISITATION)
                .event(CommonEvents.SUCCESS)
            .and().withExternal()
                .source(WiFiConnectStates.DEPRECATE_ADD_VISITATION).target(WiFiConnectStates.EVALUATE_IS_WALKTHROUGH)
                .event(CommonEvents.SUCCESS)
            .and().withExternal()
                .source(WiFiConnectStates.EVALUATE_IS_WALKTHROUGH).target(WiFiConnectStates.END)
                .event(CommonEvents.SUCCESS);
    }
}
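You need to define your own TaskScheduler bean and register it in your state machine @Configuration class, as follows. State actions are run through the state machine's TaskScheduler, and the default one appears to be backed by a single thread, which would explain why every action in your log runs on [pool-5-thread-1]. Choose a pool size that fits your load, and replace the TaskState and TaskEvent type parameters with your own (String, String in your configuration).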
@Override
public void configure(StateMachineConfigurationConfigurer<TaskState, TaskEvent> config) throws Exception {
    config.withConfiguration()
        .taskScheduler(myTaskScheduler());
}

@Bean
public TaskScheduler myTaskScheduler() {
    final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    scheduler.setPoolSize(10);
    return scheduler;
}
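To see why the pool size matters, here is a minimal sketch in plain Java with no Spring involved. It is only an analogy for what the scheduler does, not Spring Statemachine's own code: the class name SchedulerPoolDemo, the method distinctThreads and the 100 ms sleep are made up for illustration, with the sleep standing in for a slow action such as SaveAction. It schedules a few blocking tasks and counts how many different threads ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SchedulerPoolDemo {

    // Hypothetical helper: schedules `tasks` blocking jobs on a scheduler with
    // `poolSize` threads and returns how many distinct threads executed them.
    public static int distinctThreads(int poolSize, int tasks) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                scheduler.schedule(() -> {
                    threadNames.add(Thread.currentThread().getName());
                    try {
                        Thread.sleep(100); // stand-in for a slow state action
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();
                    }
                }, 0, TimeUnit.MILLISECONDS);
            }
            done.await(); // wait until every task has finished
        } finally {
            scheduler.shutdown();
        }
        return threadNames.size();
    }

    public static void main(String[] args) throws InterruptedException {
        // One thread: every task queues up behind the previous one.
        System.out.println("pool size 1 -> threads used: " + distinctThreads(1, 4));
        // Several threads: the tasks can overlap.
        System.out.println("pool size 4 -> threads used: " + distinctThreads(4, 4));
    }
}
```

With a pool size of 1 all tasks share one thread and run back to back, which is the same pattern as the single [pool-5-thread-1] in the log above. With a larger pool the tasks are spread over several threads, which is the effect setPoolSize(10) is meant to have on the state actions.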