001package com.hfg.util;
002
003import java.util.concurrent.*;
004
005//------------------------------------------------------------------------------
006/**
007 * A version of ThreadPoolExecutor that works with a PriorityBlockingQueue.
008 *
009 * @author J. Alex Taylor, hairyfatguy.com
010 */
011//------------------------------------------------------------------------------
012// com.hfg XML/HTML Coding Library
013//
014// This library is free software; you can redistribute it and/or
015// modify it under the terms of the GNU Lesser General Public
016// License as published by the Free Software Foundation; either
017// version 2.1 of the License, or (at your option) any later version.
018//
019// This library is distributed in the hope that it will be useful,
020// but WITHOUT ANY WARRANTY; without even the implied warranty of
021// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
022// Lesser General Public License for more details.
023//
024// You should have received a copy of the GNU Lesser General Public
025// License along with this library; if not, write to the Free Software
026// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
027//
028// J. Alex Taylor, President, Founder, CEO, COO, CFO, OOPS hairyfatguy.com
029// jataylor@hairyfatguy.com
030//------------------------------------------------------------------------------
031
032public class PrioritizedThreadPoolExecutor extends ThreadPoolExecutor
033{
034   //--------------------------------------------------------------------------
035   /**
036    * Creates a new {@code PrioritizedThreadPoolExecutor} with the given initial
037    * parameters and
038    * {@linkplain Executors#defaultThreadFactory default thread factory}.
039    *
040    * @param corePoolSize the number of threads to keep in the pool, even
041    *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
042    * @param maximumPoolSize the maximum number of threads to allow in the
043    *        pool
044    * @param keepAliveTime when the number of threads is greater than
045    *        the core, this is the maximum time that excess idle threads
046    *        will wait for new tasks before terminating.
047    * @param unit the time unit for the {@code keepAliveTime} argument
048    * @param workQueue the queue to use for holding tasks before they are
049    *        executed.  This queue will hold only the {@code Runnable}
050    *        tasks submitted by the {@code execute} method.
051    * @throws IllegalArgumentException if one of the following holds:<br>
052    *         {@code corePoolSize < 0}<br>
053    *         {@code keepAliveTime < 0}<br>
054    *         {@code maximumPoolSize <= 0}<br>
055    *         {@code maximumPoolSize < corePoolSize}
056    * @throws NullPointerException if {@code workQueue}
057    *         or {@code handler} is null
058    */
059   public PrioritizedThreadPoolExecutor(int corePoolSize,
060                                        int maximumPoolSize,
061                                        long keepAliveTime,
062                                        TimeUnit unit,
063                                        PriorityBlockingQueue<Runnable> workQueue)
064   {
065      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
066   }
067
068   //--------------------------------------------------------------------------
069   /**
070    * Creates a new {@code PrioritizedThreadPoolExecutor} with the given initial
071    * parameters and
072    * {@linkplain Executors#defaultThreadFactory default thread factory}.
073    *
074    * @param corePoolSize the number of threads to keep in the pool, even
075    *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
076    * @param maximumPoolSize the maximum number of threads to allow in the
077    *        pool
078    * @param keepAliveTime when the number of threads is greater than
079    *        the core, this is the maximum time that excess idle threads
080    *        will wait for new tasks before terminating.
081    * @param unit the time unit for the {@code keepAliveTime} argument
082    * @param workQueue the queue to use for holding tasks before they are
083    *        executed.  This queue will hold only the {@code Runnable}
084    *        tasks submitted by the {@code execute} method.
085    * @param handler the handler to use when execution is blocked
086    *        because the thread bounds and queue capacities are reached
087    * @throws IllegalArgumentException if one of the following holds:<br>
088    *         {@code corePoolSize < 0}<br>
089    *         {@code keepAliveTime < 0}<br>
090    *         {@code maximumPoolSize <= 0}<br>
091    *         {@code maximumPoolSize < corePoolSize}
092    * @throws NullPointerException if {@code workQueue}
093    *         or {@code handler} is null
094    */
095   public PrioritizedThreadPoolExecutor(int corePoolSize,
096                                        int maximumPoolSize,
097                                        long keepAliveTime,
098                                        TimeUnit unit,
099                                        PriorityBlockingQueue<Runnable> workQueue,
100                                        RejectedExecutionHandler handler)
101   {
102      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
103   }
104
105
106   //--------------------------------------------------------------------------
107   @Override
108   public Future<?> submit(Runnable inTask)
109   {
110      return submit(inTask, Priority.MEDIUM);
111   }
112
113   //--------------------------------------------------------------------------
114   public Future<?> submit(Runnable inTask, Priority inPriority)
115   {
116      if (inTask == null) throw new NullPointerException();
117      RunnableFuture<Void> ftask = newTaskFor(inTask, null, inPriority);
118      execute(ftask);
119      return ftask;
120   }
121
122   //--------------------------------------------------------------------------
123   @Override
124   public <T> Future<T> submit(Runnable inTask, T inResult)
125   {
126      return submit(inTask, inResult, Priority.MEDIUM);
127   }
128
129   //--------------------------------------------------------------------------
130   public <T> Future<T> submit(Runnable task, T result, Priority inPriority)
131   {
132      if (task == null) throw new NullPointerException();
133      RunnableFuture<T> ftask = newTaskFor(task, result, inPriority);
134      execute(ftask);
135      return ftask;
136   }
137
138   //--------------------------------------------------------------------------
139   @Override
140   public <T> Future<T> submit(Callable<T> inTask)
141   {
142      if (inTask == null) throw new NullPointerException();
143      RunnableFuture<T> ftask = newTaskFor(inTask);
144      execute(ftask);
145      return ftask;
146   }
147
148   //--------------------------------------------------------------------------
149   public <T> Future<T> submit(Callable<T> inTask, Priority inPriority)
150   {
151      if (inTask == null) throw new NullPointerException();
152      RunnableFuture<T> ftask = newTaskFor(inTask, inPriority);
153      execute(ftask);
154      return ftask;
155   }
156
157   //--------------------------------------------------------------------------
158   /**
159    * Returns a {@code RunnableFuture} for the given runnable and default
160    * value.
161    *
162    * @param inRunnable the runnable task being wrapped
163    * @param inValue the default value for the returned future
164    * @param <T> the type of the given value
165    * @return a {@code RunnableFuture} which, when run, will run the
166    * underlying runnable and which, as a {@code Future}, will yield
167    * the given value as its result and provide for cancellation of
168    * the underlying task
169    * @since 1.6
170    */
171   @Override
172   protected <T> RunnableFuture<T> newTaskFor(Runnable inRunnable, T inValue)
173   {
174      return newTaskFor(inRunnable, inValue, Priority.MEDIUM);
175   }
176
177   //--------------------------------------------------------------------------
178   /**
179    * Returns a {@code RunnableFuture} for the given runnable and default
180    * value.
181    *
182    * @param inRunnable the runnable task being wrapped
183    * @param inValue the default value for the returned future
184    * @param <T> the type of the given value
185    * @param inPriority the job's priority
186    * @return a {@code RunnableFuture} which, when run, will run the
187    * underlying runnable and which, as a {@code Future}, will yield
188    * the given value as its result and provide for cancellation of
189    * the underlying task
190    * @since 1.6
191    */
192   protected <T> RunnableFuture<T> newTaskFor(Runnable inRunnable, T inValue, Priority inPriority)
193   {
194      return new PrioritizedFutureTask<T>(inRunnable, inValue).setPriority(inPriority);
195   }
196
197   //--------------------------------------------------------------------------
198   /**
199    * Returns a {@code RunnableFuture} for the given callable task.
200    *
201    * @param inCallable the callable task being wrapped
202    * @param <T> the type of the callable's result
203    * @return a {@code RunnableFuture} which, when run, will call the
204    * underlying callable and which, as a {@code Future}, will yield
205    * the callable's result as its result and provide for
206    * cancellation of the underlying task
207    * @since 1.6
208    */
209   @Override
210   protected <T> RunnableFuture<T> newTaskFor(Callable<T> inCallable)
211   {
212      return newTaskFor(inCallable, Priority.MEDIUM);
213   }
214
215   //--------------------------------------------------------------------------
216   /**
217    * Returns a {@code RunnableFuture} for the given callable task.
218    *
219    * @param inCallable the callable task being wrapped
220    * @param <T> the type of the callable's result
221    * @param inPriority the job's priority
222    * @return a {@code RunnableFuture} which, when run, will call the
223    * underlying callable and which, as a {@code Future}, will yield
224    * the callable's result as its result and provide for
225    * cancellation of the underlying task
226    * @since 1.6
227    */
228   protected <T> RunnableFuture<T> newTaskFor(Callable<T> inCallable, Priority inPriority)
229   {
230      return new PrioritizedFutureTask<T>(inCallable).setPriority(inPriority);
231   }
232}