 

Java ExecutorService and ThreadPoolExecutor

I am experiencing a strange problem. I am trying to use the producer/consumer model; please tell me if I have done something wrong here. When I use an ExecutorService created with Executors.newFixedThreadPool(4), I never get an exception and the program runs, but when I construct a ThreadPoolExecutor directly, it throws the exception shown below. I can't figure out what the error is. Please advise!

Code using ExecutorService:

ArrayBlockingQueue<BillableList> list = new ArrayBlockingQueue<BillableList>(2);
ThreadFactory threadFactory = Executors.defaultThreadFactory();
ExecutorService threadPool = Executors.newFixedThreadPool(4, threadFactory);

threadPool.execute(new BillingConsu(network, "consumer->" + Thread.currentThread(), list));
threadPool.execute(new BillingConsu(network, "consumer->" + Thread.currentThread(), list));
threadPool.execute(new BillingConsu(network, "consumer->" + Thread.currentThread(), list));

Future producerStatus = threadPool.submit(new BillProdu(this.network, "Producer", list));
producerStatus.get();
threadPool.shutdown();

while (!threadPool.isTerminated()) {
    threadPool.shutdown();
    threadPool.awaitTermination(10, TimeUnit.SECONDS);
}

Code using ThreadPoolExecutor:

ArrayBlockingQueue<BillableList> list = new ArrayBlockingQueue<BillableList>(4);
BlockingQueue<Runnable> worksQueue = new ArrayBlockingQueue<Runnable>(100);
RejectedExecutionHandler executionHandler = new MyRejectedExecutionHandelerImpl();
ThreadFactory threadFactory = Executors.defaultThreadFactory();
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 5, 10,
        TimeUnit.SECONDS, worksQueue, threadFactory, executionHandler);

Future producerStatus = threadPool.submit(new BillProdu(this.network, "Producer", list));
producerStatus.get();

threadPool.execute(new BillingConsu(network, "consumer 1", list));
threadPool.execute(new BillingConsu(network, "consumer 2", list));
threadPool.execute(new BillingConsu(network, "consumer 3", list));
threadPool.execute(new BillingConsu(network, "consumer 4", list));
threadPool.shutdown();

while (!threadPool.isTerminated()) {
    threadPool.shutdown();
    threadPool.awaitTermination(10, TimeUnit.SECONDS);
}

Exception when I run the ThreadPoolExecutor version:

Exception in thread "pool-1-thread-2" java.lang.ExceptionInInitializerError
    at org.apache.axis.utils.Messages.<clinit>(Messages.java:36)
    at org.apache.axis.configuration.EngineConfigurationFactoryFinder$1.run(EngineConfigurationFactoryFinder.java:141)
    at java.security.AccessController.doPrivileged(Native Method)
    at org.apache.axis.configuration.EngineConfigurationFactoryFinder.newFactory(EngineConfigurationFactoryFinder.java:113)
    at org.apache.axis.configuration.EngineConfigurationFactoryFinder.newFactory(EngineConfigurationFactoryFinder.java:160)
    at org.apache.axis.client.Service.getEngineConfiguration(Service.java:813)
    at org.apache.axis.client.Service.getAxisClient(Service.java:104)
    at org.apache.axis.client.Service.<init>(Service.java:113)
    at org.tempuri.OnlineBillingLocator.<init>(OnlineBillingLocator.java:28)
    at com.mixem.sdc.sms.StsSmsConnection.<init>(StsSmsConnection.java:40)
    at BillingConsu.doStsBilling(BillingConsu.java:202)
    at BillingConsu.run(BillingConsu.java:60)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:619)
Caused by: java.lang.NullPointerException
    at java.io.FileOutputStream.<init>(FileOutputStream.java:172)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:102)
    at org.apache.log4j.FileAppender.setFile(FileAppender.java:290)
    at LogFileWriter.append(LogFileWriter.java:45)
    at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
    at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
    at org.apache.log4j.Category.callAppenders(Category.java:206)
    at org.apache.log4j.Category.forcedLog(Category.java:391)
    at org.apache.log4j.Category.log(Category.java:856)
    at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177)
    at org.apache.axis.i18n.ProjectResourceBundle.getBundle(ProjectResourceBundle.java:264)
    at org.apache.axis.i18n.MessagesConstants.<clinit>(MessagesConstants.java:32) 

Log4j properties file

log4j.rootLogger = DEBUG, fileout
log4j.appender.fileout = LogFileWriter
log4j.appender.fileout.layout.ConversionPattern = %d{ABSOLUTE} %5p %c - %m%n
log4j.appender.fileout.layout = org.apache.log4j.PatternLayout
log4j.appender.fileout.File = /logs/billinglogs.log

LogFileWriter Append Code

@Override
public void append(LoggingEvent event) {
    try {
        setFile(appendLevelToFileName((String) MDC.get(ORIG_LOG_FILE_NAME),
                event.getLevel().toString()), fileAppend, bufferedIO, bufferSize);
    } catch (IOException ie) {
        errorHandler.error("Error occurred while setting file for the log level " + event.getLevel(), ie,
                ErrorCode.FILE_OPEN_FAILURE);
    }
    super.append(event);
}

MDC put code inside LogFileWriter

@Override
public void activateOptions() {
    MDC.put(ORIG_LOG_FILE_NAME, fileName);
    super.activateOptions();
}
Madan Madan asked May 28 '13 10:05





1 Answer

As I expected, this is failing because of thread-locality. The following call is almost certainly returning null:

MDC.get(ORIG_LOG_FILE_NAME)

When and where do you call MDC.put? The issue is that MDC stores its values in a thread-local map. When a task runs in the pool, it logs from a different thread than the one that called MDC.put. That thread has no entry for ORIG_LOG_FILE_NAME, so MDC.get returns null.

Imagine your application looks similar to

Main-Thread
  MDC.put -> sets thread-local-map(Main-Thread, ORIG_LOG_FILE_NAME)

Executor-Thread-1
Executor-Thread-2
Executor-Thread-N

Now when you are in Executor-Thread-1..N it will do

Executor-Thread-N
    MDC.get(Executor-Thread-N, ORIG_LOG_FILE_NAME)

It will return null

If you run outside of the executor's threads, it works:

Main-Thread
   MDC.get(Main-Thread, ORIG_LOG_FILE_NAME) // will be non-null
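The thread-locality sketched above can be reproduced without log4j. The following is only an illustration: a plain java.lang.ThreadLocal named LOG_FILE_NAME (a hypothetical stand-in, not part of the question's code) plays the role of the MDC entry, and the class and method names are made up for the example. The real MDC may differ in detail; for instance, some log4j versions let a newly created thread inherit a copy of its creator's values.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration only: a plain ThreadLocal stands in for the MDC entry.
class ThreadLocalVisibilityDemo {

    // Hypothetical stand-in for MDC's per-thread value of ORIG_LOG_FILE_NAME.
    static final ThreadLocal<String> LOG_FILE_NAME = new ThreadLocal<String>();

    /** Reads the thread-local on a pool thread and returns what that thread sees. */
    static String valueSeenByPoolThread() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Callable<String> read = LOG_FILE_NAME::get; // executed on the pool thread
            return pool.submit(read).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Set on the main thread only, like MDC.put on the main thread.
        LOG_FILE_NAME.set("/logs/billinglogs.log");
        System.out.println("main thread sees: " + LOG_FILE_NAME.get());
        System.out.println("pool thread sees: " + valueSeenByPoolThread());
    }
}
```

In this sketch the main thread reads back the file name it stored, while the pool thread reads null, which is the situation that leads to the NullPointerException in FileOutputStream.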

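So your next question is, "Why is it not failing with an ExecutorService?" It probably is failing, or would fail, and the failure may simply not be reported. I also notice that the order in which you submit tasks differs between the two versions: with newFixedThreadPool you start the consumers before the producer, whereas with ThreadPoolExecutor you wait for the producer to finish before starting the consumers. You may want to make the two match and see whether you get the same output.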
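As a general illustration of how a failure inside a pool can go unreported (not a diagnosis of the code in the question): a task passed to submit has its exception stored in the returned Future, and nothing is printed unless someone calls get. The class and method names below are hypothetical, and the thrown exception merely simulates a failure.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustration only: shows that submit() captures a task's exception in its Future.
class UnreportedFailureDemo {

    /** Submits a task that always throws; returns the captured failure, or null if none. */
    static Throwable failureCapturedBySubmit() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<?> result = pool.submit(new Runnable() {
                public void run() {
                    throw new IllegalStateException("simulated logging failure");
                }
            });
            try {
                result.get(); // the failure only surfaces here
                return null;
            } catch (ExecutionException e) {
                return e.getCause(); // the original exception thrown by the task
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("captured: " + failureCapturedBySubmit());
    }
}
```

Tasks started with execute behave differently: an uncaught exception reaches the worker thread's uncaught-exception handler, which by default prints a stack trace like the one in the question.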

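Edit: You may want to try this as a fix:

// Reuse a single default factory so thread naming stays consistent.
final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
ThreadFactory threadFactory = new ThreadFactory() {
    public Thread newThread(final Runnable r) {
        return defaultFactory.newThread(new Runnable() {
            public void run() {
                // Runs on the new pool thread, so the value lands in that thread's MDC.
                MDC.put(ORIG_LOG_FILE_NAME, fileName);
                r.run();
            }
        });
    }
};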
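A self-contained sketch of the same idea, runnable without log4j, might look like the following. It assumes a plain ThreadLocal named LOG_FILE_NAME as a hypothetical stand-in for the MDC entry; seedingFactory and the class name are likewise invented for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Illustration only: a plain ThreadLocal stands in for the MDC entry.
class SeededThreadFactoryDemo {

    // Hypothetical stand-in for MDC's per-thread value of ORIG_LOG_FILE_NAME.
    static final ThreadLocal<String> LOG_FILE_NAME = new ThreadLocal<String>();

    /** Wraps the default factory so every new worker seeds its own thread-local first. */
    static ThreadFactory seedingFactory(final String fileName) {
        final ThreadFactory delegate = Executors.defaultThreadFactory();
        return new ThreadFactory() {
            public Thread newThread(final Runnable worker) {
                return delegate.newThread(new Runnable() {
                    public void run() {
                        LOG_FILE_NAME.set(fileName); // runs on the new pool thread
                        worker.run();                // then enter the pool's worker loop
                    }
                });
            }
        };
    }

    /** Reads the thread-local on a pool thread built by the seeding factory. */
    static String valueSeenByPoolThread(String fileName) {
        ExecutorService pool = Executors.newFixedThreadPool(2, seedingFactory(fileName));
        try {
            Callable<String> read = LOG_FILE_NAME::get;
            return pool.submit(read).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool thread sees: " + valueSeenByPoolThread("/logs/billinglogs.log"));
    }
}
```

Because the seeding happens inside run, it executes on the worker thread itself rather than on the thread that created the pool, so every task later scheduled on that worker sees the value.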
John Vint answered Sep 30 '22 19:09
