当前位置: 首页>后端>正文

实现guava线程池的优先级队列

定义一个线程类:

实现guava线程池的优先级队列,第1张

线程类需要实现 Comparable 接口,并定义用于比较的字段。

另外,还需要自己实现一个支持优先级的线程池,如下:

package priority;

import com.google.common.collect.Lists;

import com.google.common.collect.Queues;

import com.google.common.util.concurrent.*;

import java.util.ArrayList;

import java.util.Collection;

import java.util.Iterator;

import java.util.List;

import java.util.concurrent.*;

import static com.google.common.base.Preconditions.checkArgument;

public class PriorityThreadPoolExecutorextends ThreadPoolExecutorimplements ListeningExecutorService {

public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

? ? }

public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, TimeUnit unit, BlockingQueue workQueue,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? RejectedExecutionHandler handler) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);

? ? }

public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, TimeUnit unit, BlockingQueue workQueue,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ThreadFactory threadFactory) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

? ? }

public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? long keepAliveTime, TimeUnit unit, BlockingQueue workQueue,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ThreadFactory threadFactory, RejectedExecutionHandler handler) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);

? ? }

@Override

? ? protected RunnableFuturenewTaskFor(Runnable runnable, T value) {

return new ComparableFutureTask(runnable, value);

? ? }

@Override

? ? protected RunnableFuturenewTaskFor(Callable callable) {

return new ComparableFutureTask(callable);

? ? }

protected class ComparableFutureTask

extends FutureTaskimplements Comparable>,ListenableFuture? {

private Objectobject;

? ? ? ? // The execution list to hold our listeners.

? ? ? ? private final ExecutionListexecutionList =new ExecutionList();

? ? ? ? public ComparableFutureTask(Callable callable) {

super(callable);

? ? ? ? ? ? object = callable;

? ? ? ? }

public ComparableFutureTask(Runnable runnable, V result) {

super(runnable, result);

? ? ? ? ? ? object = runnable;

? ? ? ? }

@Override

? ? ? ? @SuppressWarnings("unchecked")

public int compareTo(ComparableFutureTask o) {

if (this == o) {

return 0;

? ? ? ? ? ? }

if (o ==null) {

return -1; // high priority

? ? ? ? ? ? }

if (object !=null && o.object !=null) {

if (object.getClass().equals(o.object.getClass())) {

if (object instanceof Comparable) {

return ((Comparable)object).compareTo(o.object);

? ? ? ? ? ? ? ? ? ? }

}

}

return 0;

? ? ? ? }

@Override

? ? ? ? public void addListener(Runnable listener, Executor exec) {

executionList.add(listener, exec);

? ? ? ? }

/**

* Internal implementation detail used to invoke the listeners.

*/

? ? ? ? @Override

? ? ? ? protected void done() {

executionList.execute();

? ? ? ? }

}

@Override public ListenableFuturesubmit(Runnable task) {

ComparableFutureTask ftask =new ComparableFutureTask(task, null);

? ? ? ? execute(ftask);

? ? ? ? return ftask;

? ? }

@Override public ListenableFuturesubmit(Runnable task, T result) {

ComparableFutureTask ftask =new? ComparableFutureTask(task, result);

? ? ? ? execute(ftask);

? ? ? ? return ftask;

? ? }

@Override public ListenableFuturesubmit(Callable task) {

ComparableFutureTask ftask =new ComparableFutureTask(task);

? ? ? ? execute(ftask);

? ? ? ? return ftask;

? ? }

@Override public T invokeAny(Collection> tasks)

throws InterruptedException, ExecutionException {

try {

return invokeAnyImpl(this, tasks, false, 0);

? ? ? ? }catch (TimeoutException cannotHappen) {

throw new AssertionError();

? ? ? ? }

}

@Override public T invokeAny(

Collection> tasks, long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException {

return invokeAnyImpl(this, tasks, true, unit.toNanos(timeout));

? ? }

@Override public List>invokeAll(Collection> tasks)

throws InterruptedException {

if (tasks ==null) {

throw new NullPointerException();

? ? ? ? }

List> futures =new ArrayList>(tasks.size());

? ? ? ? boolean done =false;

? ? ? ? try {

for (Callable t : tasks) {

ComparableFutureTask f =new ComparableFutureTask(t);

? ? ? ? ? ? ? ? futures.add(f);

? ? ? ? ? ? ? ? execute(f);

? ? ? ? ? ? }

for (Future f : futures) {

if (!f.isDone()) {

try {

f.get();

? ? ? ? ? ? ? ? ? ? }catch (CancellationException ignore) {

}catch (ExecutionException ignore) {

}

}

}

done =true;

? ? ? ? ? ? return futures;

? ? ? ? }finally {

if (!done) {

for (Future f : futures) {

f.cancel(true);

? ? ? ? ? ? ? ? }

}

}

}

@Override public List>invokeAll(

Collection> tasks, long timeout, TimeUnit unit)

throws InterruptedException {

if (tasks ==null || unit ==null) {

throw new NullPointerException();

? ? ? ? }

long nanos = unit.toNanos(timeout);

? ? ? ? List> futures =new ArrayList>(tasks.size());

? ? ? ? boolean done =false;

? ? ? ? try {

for (Callable t : tasks) {

futures.add(new ComparableFutureTask(t));

? ? ? ? ? ? }

long lastTime = System.nanoTime();

? ? ? ? ? ? // Interleave time checks and calls to execute in case

// executor doesn't have any/much parallelism.

? ? ? ? ? ? Iterator> it = futures.iterator();

? ? ? ? ? ? while (it.hasNext()) {

execute((Runnable) (it.next()));

? ? ? ? ? ? ? ? long now = System.nanoTime();

? ? ? ? ? ? ? ? nanos -= now - lastTime;

? ? ? ? ? ? ? ? lastTime = now;

? ? ? ? ? ? ? ? if (nanos <=0) {

return futures;

? ? ? ? ? ? ? ? }

}

for (Future f : futures) {

if (!f.isDone()) {

if (nanos <=0) {

return futures;

? ? ? ? ? ? ? ? ? ? }

try {

f.get(nanos, TimeUnit.NANOSECONDS);

? ? ? ? ? ? ? ? ? ? }catch (CancellationException ignore) {

}catch (ExecutionException ignore) {

}catch (TimeoutException toe) {

return futures;

? ? ? ? ? ? ? ? ? ? }

long now = System.nanoTime();

? ? ? ? ? ? ? ? ? ? nanos -= now - lastTime;

? ? ? ? ? ? ? ? ? ? lastTime = now;

? ? ? ? ? ? ? ? }

}

done =true;

? ? ? ? ? ? return futures;

? ? ? ? }finally {

if (!done) {

for (Future f : futures) {

f.cancel(true);

? ? ? ? ? ? ? ? }

}

}

}

/*

* This following method is a modified version of one found in

* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30

* which contained the following notice:

*

* Written by Doug Lea with assistance from members of JCP JSR-166

* Expert Group and released to the public domain, as explained at

* http://creativecommons.org/publicdomain/zero/1.0/

* Other contributors include Andrew Wright, Jeffrey Hayes,

* Pat Fisher, Mike Judd.

*/

? ? /**

? ? * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}

* implementations.

? ? */ static T invokeAnyImpl(ListeningExecutorService executorService,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Collection> tasks, boolean timed, long nanos)

throws InterruptedException, ExecutionException, TimeoutException {

int ntasks = tasks.size();

? ? ? ? checkArgument(ntasks >0);

? ? ? ? List> futures = Lists.newArrayListWithCapacity(ntasks);

? ? ? ? BlockingQueue> futureQueue = Queues.newLinkedBlockingQueue();

? ? ? ? // For efficiency, especially in executors with limited

// parallelism, check to see if previously submitted tasks are

// done before submitting more of them. This interleaving

// plus the exception mechanics account for messiness of main

// loop.

? ? ? ? try {

// Record exceptions so that if we fail to obtain any

// result, we can throw the last exception we got.

? ? ? ? ? ? ExecutionException ee =null;

? ? ? ? ? ? long lastTime = timed System.nanoTime() :0;

? ? ? ? ? ? Iterator> it = tasks.iterator();

? ? ? ? ? ? futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));

? ? ? ? ? ? --ntasks;

? ? ? ? ? ? int active =1;

? ? ? ? ? ? for (;;) {

Future f = futureQueue.poll();

? ? ? ? ? ? ? ? if (f ==null) {

if (ntasks >0) {

--ntasks;

? ? ? ? ? ? ? ? ? ? ? ? futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));

? ? ? ? ? ? ? ? ? ? ? ? ++active;

? ? ? ? ? ? ? ? ? ? }else if (active ==0) {

break;

? ? ? ? ? ? ? ? ? ? }else if (timed) {

f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS);

? ? ? ? ? ? ? ? ? ? ? ? if (f ==null) {

throw new TimeoutException();

? ? ? ? ? ? ? ? ? ? ? ? }

long now = System.nanoTime();

? ? ? ? ? ? ? ? ? ? ? ? nanos -= now - lastTime;

? ? ? ? ? ? ? ? ? ? ? ? lastTime = now;

? ? ? ? ? ? ? ? ? ? }else {

f = futureQueue.take();

? ? ? ? ? ? ? ? ? ? }

}

if (f !=null) {

--active;

? ? ? ? ? ? ? ? ? ? try {

return f.get();

? ? ? ? ? ? ? ? ? ? }catch (ExecutionException eex) {

ee = eex;

? ? ? ? ? ? ? ? ? ? }catch (RuntimeException rex) {

ee =new ExecutionException(rex);

? ? ? ? ? ? ? ? ? ? }

}

}

if (ee ==null) {

ee =new ExecutionException(null);

? ? ? ? ? ? }

throw ee;

? ? ? ? }finally {

for (Future f : futures) {

f.cancel(true);

? ? ? ? ? ? }

}

}

/**

? ? * Submits the task and adds a listener that adds the future to {@code queue} when it completes.

*/

? ? private static ListenableFuturesubmitAndAddQueueListener(

ListeningExecutorService executorService, Callable task,

? ? ? ? ? ? final BlockingQueue> queue) {

final ListenableFuture future = executorService.submit(task);

? ? ? ? future.addListener(new Runnable() {

@Override public void run() {

queue.add(future);

? ? ? ? ? ? }

}, MoreExecutors.sameThreadExecutor());

? ? ? ? return future;

? ? }

}

测试方法如下:

public static void main(String[] args) {

PriorityBlockingQueue priorityQueue =new PriorityBlockingQueue(1000);

? ? ThreadPoolExecutor threadPoolExecutor =new PriorityThreadPoolExecutor(2, 2, 1000, TimeUnit.MILLISECONDS,priorityQueue);

? ? ListeningExecutorService processExecutorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

? ? for (int i =0; i <15; i++) {

Random random =new Random();

? ? ? ? ProcessEntry threadA =new ProcessEntry(random.nextInt(30));

? ? ? ? ListenableFuture submit = processExecutorService.submit(threadA);

? ? }

System.out.println("完成");

? ? try {

Thread.sleep(5000);

? ? }catch (InterruptedException e) {

e.printStackTrace();

? ? }

System.out.println("插入小的数字");

? ? ProcessEntry threadb =new ProcessEntry(-1);

? ? processExecutorService.submit(threadb);

? ? System.out.println("插入小的数字2");

? ? ProcessEntry threadb1 =new ProcessEntry(-30);

? ? processExecutorService.submit(threadb1);

? ? System.out.println("插入 大的数字");

? ? ProcessEntry threadA =new ProcessEntry(200);

? ? ListenableFuture processFuture = processExecutorService.submit(threadA);

? ? Futures.addCallback(processFuture, new FutureCallback() {

@Override

? ? ? ? public void onSuccess(Object o) {

System.out.println("200 成功");

? ? ? ? }

@Override

? ? ? ? public void onFailure(Throwable throwable) {

throwable.printStackTrace();

? ? ? ? ? ? System.out.println("200 失败");

? ? ? ? }

});

}

结果:

完成

priority:29

priority:6

插入小的数字

插入小的数字2

插入 大的数字

priority:23

priority:28

priority:23

priority:200

200 成功

priority:22

priority:22

priority:17

priority:16

priority:16

priority:15

priority:7

priority:3

priority:3

priority:1

priority:-1

priority:-30

可以看到是按照从大到小的顺序执行的线程


https://www.xamrdz.com/backend/3q91937933.html

相关文章: