001package com.hfg.util; 002 003import java.util.concurrent.*; 004 005//------------------------------------------------------------------------------ 006/** 007 * A version of ThreadPoolExecutor that works with a PriorityBlockingQueue. 008 * 009 * @author J. Alex Taylor, hairyfatguy.com 010 */ 011//------------------------------------------------------------------------------ 012// com.hfg XML/HTML Coding Library 013// 014// This library is free software; you can redistribute it and/or 015// modify it under the terms of the GNU Lesser General Public 016// License as published by the Free Software Foundation; either 017// version 2.1 of the License, or (at your option) any later version. 018// 019// This library is distributed in the hope that it will be useful, 020// but WITHOUT ANY WARRANTY; without even the implied warranty of 021// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 022// Lesser General Public License for more details. 023// 024// You should have received a copy of the GNU Lesser General Public 025// License along with this library; if not, write to the Free Software 026// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 027// 028// J. Alex Taylor, President, Founder, CEO, COO, CFO, OOPS hairyfatguy.com 029// jataylor@hairyfatguy.com 030//------------------------------------------------------------------------------ 031 032public class PrioritizedThreadPoolExecutor extends ThreadPoolExecutor 033{ 034 //-------------------------------------------------------------------------- 035 /** 036 * Creates a new {@code PrioritizedThreadPoolExecutor} with the given initial 037 * parameters and 038 * {@linkplain Executors#defaultThreadFactory default thread factory}. 039 * 040 * @param corePoolSize the number of threads to keep in the pool, even 041 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 042 * @param maximumPoolSize the maximum number of threads to allow in the 043 * pool 044 * @param keepAliveTime when the number of threads is greater than 045 * the core, this is the maximum time that excess idle threads 046 * will wait for new tasks before terminating. 047 * @param unit the time unit for the {@code keepAliveTime} argument 048 * @param workQueue the queue to use for holding tasks before they are 049 * executed. This queue will hold only the {@code Runnable} 050 * tasks submitted by the {@code execute} method. 051 * @throws IllegalArgumentException if one of the following holds:<br> 052 * {@code corePoolSize < 0}<br> 053 * {@code keepAliveTime < 0}<br> 054 * {@code maximumPoolSize <= 0}<br> 055 * {@code maximumPoolSize < corePoolSize} 056 * @throws NullPointerException if {@code workQueue} 057 * or {@code handler} is null 058 */ 059 public PrioritizedThreadPoolExecutor(int corePoolSize, 060 int maximumPoolSize, 061 long keepAliveTime, 062 TimeUnit unit, 063 PriorityBlockingQueue<Runnable> workQueue) 064 { 065 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 066 } 067 068 //-------------------------------------------------------------------------- 069 /** 070 * Creates a new {@code PrioritizedThreadPoolExecutor} with the given initial 071 * parameters and 072 * {@linkplain Executors#defaultThreadFactory default thread factory}. 073 * 074 * @param corePoolSize the number of threads to keep in the pool, even 075 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 076 * @param maximumPoolSize the maximum number of threads to allow in the 077 * pool 078 * @param keepAliveTime when the number of threads is greater than 079 * the core, this is the maximum time that excess idle threads 080 * will wait for new tasks before terminating. 081 * @param unit the time unit for the {@code keepAliveTime} argument 082 * @param workQueue the queue to use for holding tasks before they are 083 * executed. This queue will hold only the {@code Runnable} 084 * tasks submitted by the {@code execute} method. 085 * @param handler the handler to use when execution is blocked 086 * because the thread bounds and queue capacities are reached 087 * @throws IllegalArgumentException if one of the following holds:<br> 088 * {@code corePoolSize < 0}<br> 089 * {@code keepAliveTime < 0}<br> 090 * {@code maximumPoolSize <= 0}<br> 091 * {@code maximumPoolSize < corePoolSize} 092 * @throws NullPointerException if {@code workQueue} 093 * or {@code handler} is null 094 */ 095 public PrioritizedThreadPoolExecutor(int corePoolSize, 096 int maximumPoolSize, 097 long keepAliveTime, 098 TimeUnit unit, 099 PriorityBlockingQueue<Runnable> workQueue, 100 RejectedExecutionHandler handler) 101 { 102 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); 103 } 104 105 106 //-------------------------------------------------------------------------- 107 @Override 108 public Future<?> submit(Runnable inTask) 109 { 110 return submit(inTask, Priority.MEDIUM); 111 } 112 113 //-------------------------------------------------------------------------- 114 public Future<?> submit(Runnable inTask, Priority inPriority) 115 { 116 if (inTask == null) throw new NullPointerException(); 117 RunnableFuture<Void> ftask = newTaskFor(inTask, null, inPriority); 118 execute(ftask); 119 return ftask; 120 } 121 122 //-------------------------------------------------------------------------- 123 @Override 124 public <T> Future<T> submit(Runnable inTask, T inResult) 125 { 126 return submit(inTask, inResult, Priority.MEDIUM); 127 } 128 129 //-------------------------------------------------------------------------- 130 public <T> Future<T> submit(Runnable task, T result, Priority inPriority) 131 { 132 if (task == null) throw new NullPointerException(); 133 RunnableFuture<T> ftask = newTaskFor(task, result, inPriority); 134 execute(ftask); 135 return ftask; 136 } 137 138 //-------------------------------------------------------------------------- 139 @Override 140 public <T> Future<T> submit(Callable<T> inTask) 141 { 142 if (inTask == null) throw new NullPointerException(); 143 RunnableFuture<T> ftask = newTaskFor(inTask); 144 execute(ftask); 145 return ftask; 146 } 147 148 //-------------------------------------------------------------------------- 149 public <T> Future<T> submit(Callable<T> inTask, Priority inPriority) 150 { 151 if (inTask == null) throw new NullPointerException(); 152 RunnableFuture<T> ftask = newTaskFor(inTask, inPriority); 153 execute(ftask); 154 return ftask; 155 } 156 157 //-------------------------------------------------------------------------- 158 /** 159 * Returns a {@code RunnableFuture} for the given runnable and default 160 * value. 161 * 162 * @param inRunnable the runnable task being wrapped 163 * @param inValue the default value for the returned future 164 * @param <T> the type of the given value 165 * @return a {@code RunnableFuture} which, when run, will run the 166 * underlying runnable and which, as a {@code Future}, will yield 167 * the given value as its result and provide for cancellation of 168 * the underlying task 169 * @since 1.6 170 */ 171 @Override 172 protected <T> RunnableFuture<T> newTaskFor(Runnable inRunnable, T inValue) 173 { 174 return newTaskFor(inRunnable, inValue, Priority.MEDIUM); 175 } 176 177 //-------------------------------------------------------------------------- 178 /** 179 * Returns a {@code RunnableFuture} for the given runnable and default 180 * value. 181 * 182 * @param inRunnable the runnable task being wrapped 183 * @param inValue the default value for the returned future 184 * @param <T> the type of the given value 185 * @param inPriority the job's priority 186 * @return a {@code RunnableFuture} which, when run, will run the 187 * underlying runnable and which, as a {@code Future}, will yield 188 * the given value as its result and provide for cancellation of 189 * the underlying task 190 * @since 1.6 191 */ 192 protected <T> RunnableFuture<T> newTaskFor(Runnable inRunnable, T inValue, Priority inPriority) 193 { 194 return new PrioritizedFutureTask<T>(inRunnable, inValue).setPriority(inPriority); 195 } 196 197 //-------------------------------------------------------------------------- 198 /** 199 * Returns a {@code RunnableFuture} for the given callable task. 200 * 201 * @param inCallable the callable task being wrapped 202 * @param <T> the type of the callable's result 203 * @return a {@code RunnableFuture} which, when run, will call the 204 * underlying callable and which, as a {@code Future}, will yield 205 * the callable's result as its result and provide for 206 * cancellation of the underlying task 207 * @since 1.6 208 */ 209 @Override 210 protected <T> RunnableFuture<T> newTaskFor(Callable<T> inCallable) 211 { 212 return newTaskFor(inCallable, Priority.MEDIUM); 213 } 214 215 //-------------------------------------------------------------------------- 216 /** 217 * Returns a {@code RunnableFuture} for the given callable task. 218 * 219 * @param inCallable the callable task being wrapped 220 * @param <T> the type of the callable's result 221 * @param inPriority the job's priority 222 * @return a {@code RunnableFuture} which, when run, will call the 223 * underlying callable and which, as a {@code Future}, will yield 224 * the callable's result as its result and provide for 225 * cancellation of the underlying task 226 * @since 1.6 227 */ 228 protected <T> RunnableFuture<T> newTaskFor(Callable<T> inCallable, Priority inPriority) 229 { 230 return new PrioritizedFutureTask<T>(inCallable).setPriority(inPriority); 231 } 232}