I am experiencing a strange problem with a producer/consumer model; please tell me if I have done something wrong. When I use an ExecutorService with a fixed pool of 4 threads, I never get an exception and the program runs, but when I construct a ThreadPoolExecutor directly, it throws the exception shown below. I can't figure out what the error is. Please advise!
Code using ExecutorService:
ArrayBlockingQueue<BillableList> list =new ArrayBlockingQueue<BillableList>(2);
ThreadFactory threadFactory = Executors.defaultThreadFactory();
ExecutorService threadPool = Executors.newFixedThreadPool(4, threadFactory);
threadPool.execute(new BillingConsu(network,"consumer->"+Thread.currentThread(), list));
threadPool.execute(new BillingConsu(network,"consumer->"+Thread.currentThread(), list));
threadPool.execute(new BillingConsu(network,"consumer->"+Thread.currentThread(), list));
Future producerStatus = threadPool.submit(new BillProdu(this.network,"Producer", list));
producerStatus.get();
threadPool.shutdown();
while (!threadPool.isTerminated()) {
threadPool.shutdown();
threadPool.awaitTermination(10, TimeUnit.SECONDS);
}
Code using ThreadPoolExecutor:
ArrayBlockingQueue<BillableList> list =new ArrayBlockingQueue<BillableList>(4);
BlockingQueue<Runnable> worksQueue = new ArrayBlockingQueue<Runnable>(100);
RejectedExecutionHandler executionHandler = new MyRejectedExecutionHandelerImpl();
ThreadFactory threadFactory = Executors.defaultThreadFactory();
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5,5, 10,
TimeUnit.SECONDS, worksQueue,threadFactory, executionHandler);
Future producerStatus = threadPool.submit(new BillProdu(this.network,"Producer", list));
producerStatus.get();
threadPool.execute(new BillingConsu(network,"consumer 1", list));
threadPool.execute(new BillingConsu(network,"consumer 2", list));
threadPool.execute(new BillingConsu(network,"consumer 3", list));
threadPool.execute(new BillingConsu(network,"consumer 4", list));
threadPool.shutdown();
while (!threadPool.isTerminated()) {
threadPool.shutdown();
threadPool.awaitTermination(10, TimeUnit.SECONDS);
}
Exception when I run the ThreadPoolExecutor version:
Exception in thread "pool-1-thread-2" java.lang.ExceptionInInitializerError
at org.apache.axis.utils.Messages.<clinit>(Messages.java:36)
at org.apache.axis.configuration.EngineConfigurationFactoryFinder$1.run (EngineConfigurationFactoryFinder.java:141)
at java.security.AccessController.doPrivileged(Native Method)
at org.apache.axis.configuration.EngineConfigurationFactoryFinder.newFactory (EngineConfigurationFactoryFinder.java:113)
at org.apache.axis.configuration.EngineConfigurationFactoryFinder.newFactory (EngineConfigurationFactoryFinder.java:160)
at org.apache.axis.client.Service.getEngineConfiguration(Service.java:813)
at org.apache.axis.client.Service.getAxisClient(Service.java:104)
at org.apache.axis.client.Service.<init>(Service.java:113)
at org.tempuri.OnlineBillingLocator.<init>(OnlineBillingLocator.java:28)
at com.mixem.sdc.sms.StsSmsConnection.<init>(StsSmsConnection.java:40)
at BillingConsu.doStsBilling(BillingConsu.java:202)
at BillingConsu.run(BillingConsu.java:60)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)
Caused by: java.lang.NullPointerException
at java.io.FileOutputStream.<init>(FileOutputStream.java:172)
at java.io.FileOutputStream.<init>(FileOutputStream.java:102)
at org.apache.log4j.FileAppender.setFile(FileAppender.java:290)
at LogFileWriter.append(LogFileWriter.java:45)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders (AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177)
at org.apache.axis.i18n.ProjectResourceBundle.getBundle(ProjectResourceBundle.java:264)
at org.apache.axis.i18n.MessagesConstants.<clinit>(MessagesConstants.java:32)
Log4J properties File
log4j.rootLogger = DEBUG, fileout
log4j.appender.fileout = LogFileWriter
log4j.appender.fileout.layout.ConversionPattern = %d{ABSOLUTE} %5p %c - %m%n
log4j.appender.fileout.layout = org.apache.log4j.PatternLayout
log4j.appender.fileout.File = /logs/billinglogs.log
LogFileWriter Append Code
@Override
public void append(LoggingEvent event) {
try {
setFile(appendLevelToFileName((String) MDC.get(ORIG_LOG_FILE_NAME),
event.getLevel().toString()), fileAppend, bufferedIO,bufferSize);
} catch (IOException ie) {
errorHandler.error("Error occurred while setting file for the log level "+ event.getLevel(), ie,
ErrorCode.FILE_OPEN_FAILURE);
}
super.append(event);
}
MDC put code inside LogFileWriter
@Override
public void activateOptions() {
MDC.put(ORIG_LOG_FILE_NAME, fileName);
super.activateOptions();
}
ExecutorService is an extension of the Executor interface. It adds methods that return a Future for a submitted task and methods to shut down the thread pool. Once shutdown is called, the pool accepts no new tasks but still completes any tasks already submitted.
ScheduledExecutorService is an ExecutorService that can schedule tasks to run after a delay, or to run repeatedly with a fixed interval between executions. Tasks are executed asynchronously by a worker thread, not by the thread that hands the task to the ScheduledExecutorService.
The Fork/Join framework in Java 7 supports divide-and-conquer algorithms: a central ForkJoinPool executes ForkJoinTasks that split themselves into smaller subtasks.
ThreadPoolExecutor is an ExecutorService to execute each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods. It also provides various utility methods to check current threads statistics and control them.
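Since the question contrasts "ExecutorService" with "ThreadPoolExecutor", it may help to check what the factory method actually hands back. The sketch below is only an illustration: the class name FixedPoolCheck and its helper method are made up for this example, and the concrete return type is an observation about the standard JDK implementation, not something the ExecutorService return type promises.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class FixedPoolCheck {

    /**
     * Returns "core/max" pool sizes if the pool created by
     * Executors.newFixedThreadPool is a ThreadPoolExecutor, otherwise null.
     */
    static String describeFixedPool(int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            if (!(pool instanceof ThreadPoolExecutor)) {
                return null; // some other implementation; nothing to report
            }
            ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
            return tpe.getCorePoolSize() + "/" + tpe.getMaximumPoolSize();
        } finally {
            pool.shutdown(); // no tasks were submitted, so this returns immediately
        }
    }

    public static void main(String[] args) {
        System.out.println("fixed pool of 4 -> " + describeFixedPool(4));
    }
}
```

If the helper reports equal core and maximum sizes, the two versions in the question use the same executor class, and the difference in behaviour more likely comes from how and when the tasks are submitted.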
As I expected, you are failing because of thread-locality. This call is almost certainly returning null:
MDC.get(ORIG_LOG_FILE_NAME)
When and where do you call MDC.put? The issue is that MDC uses a thread-local map. When the pool runs your task, the logging happens on a separate worker thread. That thread never registered a value with MDC, so get returns null. A null file name passed on to setFile would then presumably explain the NullPointerException raised in the FileOutputStream constructor in your stack trace.
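The thread-local behaviour described above can be reproduced without log4j. This is a minimal sketch that uses a plain java.lang.ThreadLocal as a stand-in for MDC, on the assumption that MDC behaves like a simple per-thread map; the class ThreadLocalDemo and the field CONTEXT are hypothetical names for this example only.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadLocalDemo {

    // Stand-in for MDC: one value slot per thread (not log4j's class).
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<String>();

    /** Sets a value on the calling thread, then reads the slot from a pool thread. */
    static String readFromPoolThread() {
        CONTEXT.set("/logs/billinglogs.log"); // visible to the calling thread only
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<String> seenByWorker = pool.submit(new Callable<String>() {
                public String call() {
                    return CONTEXT.get(); // runs on the worker thread
                }
            });
            return seenByWorker.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String onWorker = readFromPoolThread();
        System.out.println("caller sees: " + CONTEXT.get());
        System.out.println("worker sees: " + onWorker);
    }
}
```

The caller still sees the value it stored, while the worker thread sees null, which is the same situation the appender ends up in when it calls MDC.get on a pool thread.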
Imagine your application looks similar to
Main-Thread
MDC.put -> sets thread-local-map(Main-Thread, ORIG_LOG_FILE_NAME)
Executor-Thread-1
Executor-Thread-2
Executor-Thread-N
Now when you are in Executor-Thread-1..N it will do
Executor-Thread-N
MDC.get(Executor-Thread-N, ORIG_LOG_FILE_NAME)
It will return null
If you run outside of the Executor Service threads it works
Main-Thread
MDC.get(Main-Thread, ORIG_LOG_FILE_NAME) // will be non-null
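So your next question is, "Why is it not failing with an ExecutorService?" It probably is failing, or would fail, and the error may simply not be reported. I also notice that the order of submission differs between the two versions: the ExecutorService version starts the consumers before the producer, while the ThreadPoolExecutor version runs the producer to completion before it starts any consumer. You may want to match them up and see whether you get the same output.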
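One way a failure can go unreported is when a task is handed over with submit rather than execute: whatever the task throws is stored in the returned Future and only surfaces if someone calls get. The following is a hedged sketch of that general mechanism, not a diagnosis of the code in the question; SilentFailureDemo and the exception message are invented for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SilentFailureDemo {

    /** Submits a task that always throws and returns the cause stored in its Future. */
    static Throwable causeOfFailure() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<?> status = pool.submit(new Runnable() {
                public void run() {
                    throw new IllegalStateException("simulated consumer failure");
                }
            });
            try {
                status.get(); // without this call the failure would stay hidden
                return null;
            } catch (ExecutionException e) {
                return e.getCause(); // the exception thrown inside run()
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("stored in the Future: " + causeOfFailure());
    }
}
```

Tasks started with execute behave differently: an uncaught exception propagates out of the worker thread and is normally printed by the default uncaught-exception handler, which is how the stack trace in the question became visible.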
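Edit: You may want to try this as a fix:
// Wraps the default factory so that every pool thread registers the file name
// with MDC before it starts running tasks. fileName must be in scope here.
final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
ThreadFactory threadFactory = new ThreadFactory() {
    public Thread newThread(final Runnable r) {
        return defaultFactory.newThread(new Runnable() {
            public void run() {
                MDC.put(ORIG_LOG_FILE_NAME, fileName); // executes on the new worker thread
                r.run();
            }
        });
    }
};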
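To see the wrapping thread factory at work without log4j on the classpath, here is a self-contained sketch of the same idea. A plain ThreadLocal again stands in for MDC (an assumption for illustration), and the names ContextThreadFactoryDemo, CONTEXT and withContext are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ContextThreadFactoryDemo {

    // Stand-in for MDC: one value slot per thread (not log4j's class).
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<String>();

    /** Builds a factory whose threads store the given value before running any task. */
    static ThreadFactory withContext(final String value) {
        final ThreadFactory delegate = Executors.defaultThreadFactory();
        return new ThreadFactory() {
            public Thread newThread(final Runnable r) {
                return delegate.newThread(new Runnable() {
                    public void run() {
                        CONTEXT.set(value); // executes on the new worker thread
                        r.run();            // then enters the pool's worker loop
                    }
                });
            }
        };
    }

    /** Reads the slot from a pool thread created by the wrapping factory. */
    static String readFromPoolThread(String value) {
        ExecutorService pool = Executors.newFixedThreadPool(2, withContext(value));
        try {
            return pool.submit(new Callable<String>() {
                public String call() {
                    return CONTEXT.get();
                }
            }).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker sees: " + readFromPoolThread("/logs/billinglogs.log"));
    }
}
```

Because the value is set inside the wrapper's run method, it is stored on the worker thread itself, so every task that later runs on that thread can read it. The same factory can be passed to either Executors.newFixedThreadPool or the ThreadPoolExecutor constructor.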