定义一个线程类:
线程类需要实现Comparable接口,同时要定义需要比较的字段,
另外需要自己实现一个优先级的线程池如下:
package priority;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;
import static com.google.common.base.Preconditions.checkArgument;
public class PriorityThreadPoolExecutorextends ThreadPoolExecutorimplements ListeningExecutorService {
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
? ? }
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, TimeUnit unit, BlockingQueue workQueue,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
? ? }
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, TimeUnit unit, BlockingQueue workQueue,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
? ? }
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, TimeUnit unit, BlockingQueue workQueue,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
? ? }
@Override
? ? protected RunnableFuturenewTaskFor(Runnable runnable, T value) {
return new ComparableFutureTask(runnable, value);
? ? }
@Override
? ? protected RunnableFuturenewTaskFor(Callable callable) {
return new ComparableFutureTask(callable);
? ? }
protected class ComparableFutureTask
extends FutureTaskimplements Comparable>,ListenableFuture? {
private Objectobject;
? ? ? ? // The execution list to hold our listeners.
? ? ? ? private final ExecutionListexecutionList =new ExecutionList();
? ? ? ? public ComparableFutureTask(Callable callable) {
super(callable);
? ? ? ? ? ? object = callable;
? ? ? ? }
public ComparableFutureTask(Runnable runnable, V result) {
super(runnable, result);
? ? ? ? ? ? object = runnable;
? ? ? ? }
@Override
? ? ? ? @SuppressWarnings("unchecked")
public int compareTo(ComparableFutureTask o) {
if (this == o) {
return 0;
? ? ? ? ? ? }
if (o ==null) {
return -1; // high priority
? ? ? ? ? ? }
if (object !=null && o.object !=null) {
if (object.getClass().equals(o.object.getClass())) {
if (object instanceof Comparable) {
return ((Comparable)object).compareTo(o.object);
? ? ? ? ? ? ? ? ? ? }
}
}
return 0;
? ? ? ? }
@Override
? ? ? ? public void addListener(Runnable listener, Executor exec) {
executionList.add(listener, exec);
? ? ? ? }
/**
* Internal implementation detail used to invoke the listeners.
*/
? ? ? ? @Override
? ? ? ? protected void done() {
executionList.execute();
? ? ? ? }
}
@Override public ListenableFuturesubmit(Runnable task) {
ComparableFutureTask ftask =new ComparableFutureTask(task, null);
? ? ? ? execute(ftask);
? ? ? ? return ftask;
? ? }
@Override public ListenableFuturesubmit(Runnable task, T result) {
ComparableFutureTask ftask =new? ComparableFutureTask(task, result);
? ? ? ? execute(ftask);
? ? ? ? return ftask;
? ? }
@Override public ListenableFuturesubmit(Callable task) {
ComparableFutureTask ftask =new ComparableFutureTask(task);
? ? ? ? execute(ftask);
? ? ? ? return ftask;
? ? }
@Override public T invokeAny(Collection> tasks)
throws InterruptedException, ExecutionException {
try {
return invokeAnyImpl(this, tasks, false, 0);
? ? ? ? }catch (TimeoutException cannotHappen) {
throw new AssertionError();
? ? ? ? }
}
@Override public T invokeAny(
Collection> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return invokeAnyImpl(this, tasks, true, unit.toNanos(timeout));
? ? }
@Override public List>invokeAll(Collection> tasks)
throws InterruptedException {
if (tasks ==null) {
throw new NullPointerException();
? ? ? ? }
List> futures =new ArrayList>(tasks.size());
? ? ? ? boolean done =false;
? ? ? ? try {
for (Callable t : tasks) {
ComparableFutureTask f =new ComparableFutureTask(t);
? ? ? ? ? ? ? ? futures.add(f);
? ? ? ? ? ? ? ? execute(f);
? ? ? ? ? ? }
for (Future f : futures) {
if (!f.isDone()) {
try {
f.get();
? ? ? ? ? ? ? ? ? ? }catch (CancellationException ignore) {
}catch (ExecutionException ignore) {
}
}
}
done =true;
? ? ? ? ? ? return futures;
? ? ? ? }finally {
if (!done) {
for (Future f : futures) {
f.cancel(true);
? ? ? ? ? ? ? ? }
}
}
}
@Override public List>invokeAll(
Collection> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks ==null || unit ==null) {
throw new NullPointerException();
? ? ? ? }
long nanos = unit.toNanos(timeout);
? ? ? ? List> futures =new ArrayList>(tasks.size());
? ? ? ? boolean done =false;
? ? ? ? try {
for (Callable t : tasks) {
futures.add(new ComparableFutureTask(t));
? ? ? ? ? ? }
long lastTime = System.nanoTime();
? ? ? ? ? ? // Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
? ? ? ? ? ? Iterator> it = futures.iterator();
? ? ? ? ? ? while (it.hasNext()) {
execute((Runnable) (it.next()));
? ? ? ? ? ? ? ? long now = System.nanoTime();
? ? ? ? ? ? ? ? nanos -= now - lastTime;
? ? ? ? ? ? ? ? lastTime = now;
? ? ? ? ? ? ? ? if (nanos <=0) {
return futures;
? ? ? ? ? ? ? ? }
}
for (Future f : futures) {
if (!f.isDone()) {
if (nanos <=0) {
return futures;
? ? ? ? ? ? ? ? ? ? }
try {
f.get(nanos, TimeUnit.NANOSECONDS);
? ? ? ? ? ? ? ? ? ? }catch (CancellationException ignore) {
}catch (ExecutionException ignore) {
}catch (TimeoutException toe) {
return futures;
? ? ? ? ? ? ? ? ? ? }
long now = System.nanoTime();
? ? ? ? ? ? ? ? ? ? nanos -= now - lastTime;
? ? ? ? ? ? ? ? ? ? lastTime = now;
? ? ? ? ? ? ? ? }
}
done =true;
? ? ? ? ? ? return futures;
? ? ? ? }finally {
if (!done) {
for (Future f : futures) {
f.cancel(true);
? ? ? ? ? ? ? ? }
}
}
}
/*
* This following method is a modified version of one found in
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30
* which contained the following notice:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
* Other contributors include Andrew Wright, Jeffrey Hayes,
* Pat Fisher, Mike Judd.
*/
? ? /**
? ? * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
* implementations.
? ? */ static T invokeAnyImpl(ListeningExecutorService executorService,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Collection> tasks, boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
int ntasks = tasks.size();
? ? ? ? checkArgument(ntasks >0);
? ? ? ? List> futures = Lists.newArrayListWithCapacity(ntasks);
? ? ? ? BlockingQueue> futureQueue = Queues.newLinkedBlockingQueue();
? ? ? ? // For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.
? ? ? ? try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
? ? ? ? ? ? ExecutionException ee =null;
? ? ? ? ? ? long lastTime = timed System.nanoTime() :0;
? ? ? ? ? ? Iterator> it = tasks.iterator();
? ? ? ? ? ? futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
? ? ? ? ? ? --ntasks;
? ? ? ? ? ? int active =1;
? ? ? ? ? ? for (;;) {
Future f = futureQueue.poll();
? ? ? ? ? ? ? ? if (f ==null) {
if (ntasks >0) {
--ntasks;
? ? ? ? ? ? ? ? ? ? ? ? futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
? ? ? ? ? ? ? ? ? ? ? ? ++active;
? ? ? ? ? ? ? ? ? ? }else if (active ==0) {
break;
? ? ? ? ? ? ? ? ? ? }else if (timed) {
f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS);
? ? ? ? ? ? ? ? ? ? ? ? if (f ==null) {
throw new TimeoutException();
? ? ? ? ? ? ? ? ? ? ? ? }
long now = System.nanoTime();
? ? ? ? ? ? ? ? ? ? ? ? nanos -= now - lastTime;
? ? ? ? ? ? ? ? ? ? ? ? lastTime = now;
? ? ? ? ? ? ? ? ? ? }else {
f = futureQueue.take();
? ? ? ? ? ? ? ? ? ? }
}
if (f !=null) {
--active;
? ? ? ? ? ? ? ? ? ? try {
return f.get();
? ? ? ? ? ? ? ? ? ? }catch (ExecutionException eex) {
ee = eex;
? ? ? ? ? ? ? ? ? ? }catch (RuntimeException rex) {
ee =new ExecutionException(rex);
? ? ? ? ? ? ? ? ? ? }
}
}
if (ee ==null) {
ee =new ExecutionException(null);
? ? ? ? ? ? }
throw ee;
? ? ? ? }finally {
for (Future f : futures) {
f.cancel(true);
? ? ? ? ? ? }
}
}
/**
? ? * Submits the task and adds a listener that adds the future to {@code queue} when it completes.
*/
? ? private static ListenableFuturesubmitAndAddQueueListener(
ListeningExecutorService executorService, Callable task,
? ? ? ? ? ? final BlockingQueue> queue) {
final ListenableFuture future = executorService.submit(task);
? ? ? ? future.addListener(new Runnable() {
@Override public void run() {
queue.add(future);
? ? ? ? ? ? }
}, MoreExecutors.sameThreadExecutor());
? ? ? ? return future;
? ? }
}
测试方法如下:
public static void main(String[] args) {
PriorityBlockingQueue priorityQueue =new PriorityBlockingQueue(1000);
? ? ThreadPoolExecutor threadPoolExecutor =new PriorityThreadPoolExecutor(2, 2, 1000, TimeUnit.MILLISECONDS,priorityQueue);
? ? ListeningExecutorService processExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
? ? for (int i =0; i <15; i++) {
Random random =new Random();
? ? ? ? ProcessEntry threadA =new ProcessEntry(random.nextInt(30));
? ? ? ? ListenableFuture submit = processExecutorService.submit(threadA);
? ? }
System.out.println("完成");
? ? try {
Thread.sleep(5000);
? ? }catch (InterruptedException e) {
e.printStackTrace();
? ? }
System.out.println("插入小的数字");
? ? ProcessEntry threadb =new ProcessEntry(-1);
? ? processExecutorService.submit(threadb);
? ? System.out.println("插入小的数字2");
? ? ProcessEntry threadb1 =new ProcessEntry(-30);
? ? processExecutorService.submit(threadb1);
? ? System.out.println("插入 大的数字");
? ? ProcessEntry threadA =new ProcessEntry(200);
? ? ListenableFuture processFuture = processExecutorService.submit(threadA);
? ? Futures.addCallback(processFuture, new FutureCallback() {
@Override
? ? ? ? public void onSuccess(Object o) {
System.out.println("200 成功");
? ? ? ? }
@Override
? ? ? ? public void onFailure(Throwable throwable) {
throwable.printStackTrace();
? ? ? ? ? ? System.out.println("200 失败");
? ? ? ? }
});
}
结果:
完成
priority:29
priority:6
插入小的数字
插入小的数字2
插入 大的数字
priority:23
priority:28
priority:23
priority:200
200 成功
priority:22
priority:22
priority:17
priority:16
priority:16
priority:15
priority:7
priority:3
priority:3
priority:1
priority:-1
priority:-30
可以看到是按照从大到小的顺序执行的线程