Re: ThreadPoolExecutor with blocking execute?
wesley.hall@gmail.com wrote:
castillo.bryan@gmail.com wrote:
I thought I could use a ThreadPoolExecutor for a producer/consumer
relationship. I wanted to have a fixed queue size for the pool, which
blocked on the producer side if the queue was full, until a slot in the
queue was open. I can see that a RejectedExecutionHandler is called
when the queue is full and there are some pre-existing handlers, to
drop the Runnable or to run the Runnable in the current thread, but no
support for waiting until a slot is empty. I thought that running the
Runnable in the current thread is pretty close, but if multiple slots
open up, while the current thread is busy with a Runnable, it can't
give more tasks to waiting threads.
So I wrote this simple class to block until a slot is empty. Does this
seem reasonable? Does something like this already exist in the JDK that
I missed?
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {
private static class BlockingQueuePut implements
RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor
executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException ie) {
throw new RejectedExecutionException(ie);
}
}
}
public BlockingThreadPoolExecutor(int coreThreadSize, int
maxThreadSize, int queueSize) {
super(
coreThreadSize,
maxThreadSize,
5,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize),
new BlockingQueuePut());
}
}
Whats wrong with this?:
BlockingQueue<Runnable> fixedSizeQueue = new
ArrayBlockingQueue<Runnable>(size);
Executor executor = new ThreadPoolExecutor(........., fixedSizeQueue);
Just add tasks to the fixedSizeQueue, which will block if the queue
overflows?
Seems much simpler to me. Would doesn't this solve your problem?
No, by default ThreadPoolExecutor does not block when the queue is
full. It throws a RejectedExecutionException.
If you run the code below you will see that happen.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestExecutorService {
public static void runTest(ExecutorService executor, final long
sleepTime, int itemsToRun)
throws InterruptedException
{
System.err.println("Starting test.");
for (int i=0; i<itemsToRun; i++) {
final int id = i+1;
System.err.println("enqueing item " + id + ".");
executor.execute(new Runnable() {
public void run() {
System.err.println("Running " + id);
try {
Thread.sleep(sleepTime);
} catch (InterruptedException ie) {}
System.err.println("Finished " + id);
}
});
}
System.err.println("Waiting for shutdown.");
executor.shutdown();
while (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
; // do nothing
}
}
public static void main(String[] args) {
try {
ExecutorService executor = new ThreadPoolExecutor(1, 10, 5,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
//ExecutorService executor = new BlockingThreadPoolExecutor(1, 10,
5);
runTest(executor, 1000, 50);
}
catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}