Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring state machine in multithreaded environment

I have an event queue with n message listeners. When a message arrives, one message listener takes it and executes a new instance of a state machine. The problem I'm facing is that although multiple messages are handled in parallel, the state machine starts executes actions sequentially although they are invoked by different state machine instances as you can see here:

2017-10-18 16:11:03.740  INFO 30282 --- [lTaskExecutor-1] o.s.s.support.LifecycleObjectSupport     : started org.springframework.statemachine.support.DefaultStateMachineExecutor@6ddb1ea6
2017-10-18 16:11:03.741  INFO 30282 --- [lTaskExecutor-1] o.s.s.support.LifecycleObjectSupport     : started EVALUATE_IS_WALKTHROUGH SAVE END START  / START / uuid=b922b6b1-a441-4924-8531-d45e0e0c9c40 / id=null
2017-10-18 16:11:03.740  INFO 30282 --- [TaskExecutor-10] o.s.s.support.LifecycleObjectSupport     : started org.springframework.statemachine.support.DefaultStateMachineExecutor@13b6ace4
2017-10-18 16:11:03.741  INFO 30282 --- [TaskExecutor-10] o.s.s.support.LifecycleObjectSupport     : started EVALUATE_IS_WALKTHROUGH SAVE END START  / START / uuid=e06a8c1d-beed-41c6-bc63-d8c1a3a56169 / id=null
2017-10-18 16:11:03.759  INFO 30282 --- [pool-5-thread-1] i.b.b.e.processors.actions.SaveAction    : [io.botbit.backend.events.processors.actions.SaveAction@607e4071] Saving event Event[id=null,     
2017-10-18 16:11:24.046  INFO 30282 --- [pool-5-thread-1] i.b.b.e.processors.actions.SaveAction    : [io.botbit.backend.events.processors.actions.SaveAction@607e4071] Saving event Event[id=null, 
2017-10-18 16:11:44.058  INFO 30282 --- [pool-5-thread-1] i.b.b.e.p.a.EvaluateIsWalkthroughAction  : Evaluation is WT,,,
2017-10-18 16:11:44.059  INFO 30282 --- [pool-5-thread-1] o.s.s.support.LifecycleObjectSupport     : stopped org.springframework.statemachine.support.DefaultStateMachineExecutor@6ddb1ea6
2017-10-18 16:11:44.060  INFO 30282 --- [pool-5-thread-1] o.s.s.support.LifecycleObjectSupport     : stopped EVALUATE_IS_WALKTHROUGH SAVE END START  /  / uuid=b922b6b1-a441-4924-8531-d45e0e0c9c40 / id=null
2017-10-18 16:11:44.060  INFO 30282 --- [pool-5-thread-1] i.b.b.e.p.a.EvaluateIsWalkthroughAction  : Evaluation is WT,,,
2017-10-18 16:11:44.061  INFO 30282 --- [pool-5-thread-1] o.s.s.support.LifecycleObjectSupport     : stopped org.springframework.statemachine.support.DefaultStateMachineExecutor@13b6ace4
2017-10-18 16:11:44.061  INFO 30282 --- [pool-5-thread-1] o.s.s.support.LifecycleObjectSupport     : stopped EVALUATE_IS_WALKTHROUGH SAVE END START  /  / uuid=e06a8c1d-beed-41c6-bc63-d8c1a3a56169 / id=nul

I think this happens because every action is run within the same thread ([pool-5-thread-1]). I need every instance to run fully in parallel, and by this I mean the states machine are executed in parallel but also its actions.

Any help will be appreciated, thanks!

@Component
public class EventConsumer {
    private final static Logger logger = Logger.getLogger(EventConsumer.class);
    @Autowired
    private StateMachineFactory<String, String> eventProcessorFactory;

    public void consume(Event event) {
        logger.info("Received <" + event + ">");
        StateMachine<String, String> eventProcessor = eventProcessorFactory.getStateMachine();
        eventProcessor.getExtendedState().getVariables().put("event", event);
        eventProcessor.start();
        eventProcessor.sendEvent(CommonEvents.SUCCESS);
    }

    public void consume(Collection<Event> events) {
        for (Event event : events) {
            this.consume(event);
        }
    }
}

And this is the state machine configuration

@Configuration
@EnableStateMachineFactory
public class WiFiConnectEventProcessorConfig extends StateMachineConfigurerAdapter<String, String> {

    @Autowired
    SaveAction saveAction;
    @Autowired
    DeprecateAddVisitationAction addVisitation;
    @Autowired
    EvaluateIsWalkthroughAction isWTAction;

    @Override
    public void configure(StateMachineStateConfigurer<String, String> states) throws Exception {
        states.withStates().initial(WiFiConnectStates.START).state(WiFiConnectStates.SAVE, saveAction)
                .state(WiFiConnectStates.DEPRECATE_ADD_VISITATION, addVisitation)
                .state(WiFiConnectStates.EVALUATE_IS_WALKTHROUGH, isWTAction).end(WiFiConnectStates.END);
    }

    @Override
    public void configure(StateMachineTransitionConfigurer<String, String> transitions) throws Exception {
        transitions.withExternal().source(WiFiConnectStates.START).target(WiFiConnectStates.SAVE)
                .event(CommonEvents.SUCCESS)
                .and().withExternal().source(WiFiConnectStates.SAVE)
                .target(WiFiConnectStates.DEPRECATE_ADD_VISITATION).event(CommonEvents.SUCCESS)
                .and().withExternal()
                .source(WiFiConnectStates.DEPRECATE_ADD_VISITATION).target(WiFiConnectStates.EVALUATE_IS_WALKTHROUGH)
                .event(CommonEvents.SUCCESS)
                .and().withExternal().source(WiFiConnectStates.EVALUATE_IS_WALKTHROUGH)
                .target(WiFiConnectStates.END).event(CommonEvents.SUCCESS);
    }
}
like image 885
Alejandro Raiczyk Avatar asked Nov 08 '22 15:11

Alejandro Raiczyk


1 Answers

You need to create your own TaskScheduler, and config it as follows within your StateMachineConfig @Configuration file. Choose a proper PoolSize for your needs.

    @Override
    public void configure(StateMachineConfigurationConfigurer<TaskState, TaskEvent> config) throws Exception {
        config.withConfiguration()
                .taskScheduler(myTaskScheduler());
    }

    @Bean
    public TaskScheduler myTaskScheduler() {
        final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10);
        return scheduler;
    }
like image 192
Shawn Zhang Avatar answered Nov 14 '22 23:11

Shawn Zhang