一、背景
- 优先级相同时,要按照插入顺序返回任务
- 优先级不同时,要先执行优先级高的任务
- 默认的优先级是0,少数情况下才会出现优先级大于0的情况
二、实现
PriorityQueue 和 PriorityBlockingQueue 是比较好的解决方案。
业务中常常需要考虑多线程问题,所以一般都会使用PriorityBlockingQueue去缓存任务队列。
简化代码后如下:
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.TimeUnit
class Task(val num: Int, val priority: Int = 0) {
override fun toString() = "Task:$num,$priority"
}
class TaskExecutor {
private val queue = PriorityBlockingQueue<Task>(11, kotlin.Comparator { o1, o2 ->
o2.priority - o1.priority
})
fun put(task: Task) = queue.put(task)
@Throws(InterruptedException::class)
fun poll(timeout: Long, unit: TimeUnit): Task= queue.poll(timeout, unit)
fun isNotEmpty() = queue.isNotEmpty()
}
put
往队列添加一个任务
poll
从队头取出一个任务用于执行
isNotEmpty
判断队列是否有元素
Comparator
排序规则是按优先级从高到低排序
简化后代码是比较简单和容易理解的。但是在一次解决BUG排除了上层问题后排查到执行器内部时,发现用了PriorityBlockingQueue实现优先级队列是无法满足需求。
BUG简述:概率性出现任务乱序,逻辑错误执行。
如原本处于开状态下,任务发送顺序是 开 -> 关 -> 关,预期执行是执行一次关操作,相同状态不执行。
但是概率性会变成 关 -> 开 -> 关执行。 导致多执行一次开和关操作。
问题:优先级相同时,无法保证稳定性
fun main() {
val test = TaskExecutor()
test.put(Task(1))
test.put(Task(2))
test.put(Task(3))
test.put(Task(4))
while (test.isNotEmpty()) {
println(test.poll(0, TimeUnit.SECONDS))
}
}
按照类使用者的预期,加入的4个任务后取得结果应该是1,2,3,4才是正确的。
运行结果后如下:
Task:1,0
Task:4,0
Task:3,0
Task:2,0
但是意外的是输出结果是1,4,3,2。如果任务2,3,4存在依赖性,则最后执行结果就会出现逻辑错误。
PriorityBlockingQueue
内部使用的是PriorityQueue
,而PriorityQueue
是基于最小堆实现的。所以问题就变成最小堆的维护能否保证稳定性。
关于PriorityQueue的具体实现网上很多而且很详细,该文章就不赘述。
三、分析
结果输出模拟
在TaskExecutor类添加打印方法
fun foreachPrint() {
print("foreach ")
queue.forEach {
print("$it ")
}
println()
}
foreach Task:1,0 Task:2,0 Task:3,0 Task:4,0
Task:1,0
foreach Task:4,0 Task:2,0 Task:3,0
Task:4,0
foreach Task:3,0 Task:2,0
Task:3,0
foreach Task:2,0
Task:2,0
最小堆的维护两个操作是无法保证稳定性的
- 上移:
put
是将元素放到最后的位置,然后进行上移操作。如果新加的元素优先级比父节点高,则会进行上移操作,这样这个父元素变成最后一个执行的,破坏了稳定性。 - 下移:
poll
是取出队头元素后会将最后的元素放到队头,然后进行下移操作。如果在优先级相同的情况下,该元素就不会再下移。而该元素应该是最后一个执行的但是变成第一个执行,也破坏了稳定性。
就如堆排序是不稳定的排序。所以任务优先级乱序问题就是一个排序比较问题。
四、方法
常用排序算法如下
思路一:类比稳定的排序
- 类比插入排序,冒泡排序:使用
LinkedList
。put
时从后往前遍历找到插入的位置,或者一直往前冒泡,poll
时取队头。
put
时间复杂度是O(n)
,poll
时间复杂度是O(1)
- 类比基数排序:使用
HashMap
,key
为优先级,value
为LinkedList
; 再用LinkedList
维护任务的优先级从高到低的列表。put
时,通过HashMap
获取到队列直接尾插法,并更新列表。poll
时获取优先级最高的值,并通过HashMap
获取到队列取队头。
put
时间复杂度是O(k)
,poll
时间复杂度是O(1)
(k为优先级的队列长度)。
思路二:改良比较规则
让Comparator
不出现比较相同的情况,任意一对都能比出高低。所以除了priority
外需要添加一个Stamp机制。
- 时间戳
时间戳的值是系统控制的,并且相同时间点获取到到Stamp可能是相同的,不能百分百保证Stamp是唯一的。 - 自增id
使用i++ 或者AtomicInteger
实现一个环形Stamp
该方式适用于所有支持Comparator
接口的数据结构,如实现SortedSet
接口的 TreeSet
、Collections.synchronizedSortedSet()
、 ConcurrentSkipListSet
,以及PriorityQueue
和PriorityBlockingQueue
等
基于PriorityBlockingQueue添加Stamp,
put
时间复杂度是O(logn)
,poll
时间复杂度是O(logn)
以下代码是基于AtomicInteger
实现的TaskIdCreator类
class TaskIdCreator {
private var creator = AtomicInteger(Int.MAX_VALUE - 2)
@Volatile var top = 0
fun createId(): Int {
top = creator.getAndIncrement()
return top
}
fun compare(taskId1: Int, taskId2: Int): Int {
return when {
taskId1 == taskId2 -> 0
top in taskId1 until taskId2 -> 1
top in taskId2 until taskId1 -> -1
else -> taskId1 - taskId2
}
}
}
TaskIdCreator负责生成环形Stamp,以及提供比较规则能够处理int越界的情况。
@Volatile 修饰top是避免:当put线程使top从
Int.MAX_VALUE
->Int.MIN_VALUE
后,poll线程比较时top的值未同步还是为Int.MAX_VALUE
,可能导致后加入的任务先执行,存在乱序执行的问题。关于volatile
在多线程中的应用,这不再赘述。
起到关键作用的是修改了排序规则
kotlin.Comparator { o1, o2 ->
if (o2.priority != o1.priority) {
return@Comparator o2.priority - o1.priority
}
return@Comparator Task.ID_CREATOR.compare(o1.taskId, o2.taskId)
}
排序规则:
- 按优先级从高到低排序
- 相同优先级的,按
taskId
从旧到新排序
完整的文件如下:
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
class TaskIdCreator {
private var creator = AtomicInteger(Int.MAX_VALUE - 2)
@Volatile
var top = 0
fun createId(): Int {
top = creator.getAndIncrement()
return top
}
fun compare(taskId1: Int, taskId2: Int): Int {
if (taskId1 == taskId2)
return 0
if (taskId1 > top || taskId2 > top) {
if (taskId1 <= top) {
return 1
} else if (taskId2 <= top) {
return -1
}
}
return taskId1 - taskId2
}
}
class Task(val num: Int, val priority: Int = 0) {
var taskId = ID_CREATOR.createId()
private set
override fun toString() = "Task:$num,$priority,$taskId"
companion object {
val ID_CREATOR = TaskIdCreator()
}
}
class TaskExecutor {
private val queue = PriorityBlockingQueue<Task>(11, kotlin.Comparator { o1, o2 ->
if (o2.priority != o1.priority) {
return@Comparator o2.priority - o1.priority
}
return@Comparator Task.ID_CREATOR.compare(o1.taskId, o2.taskId)
})
fun put(task: Task) = queue.put(task)
@Throws(InterruptedException::class)
fun poll(timeout: Long, unit: TimeUnit): Task= queue.poll(timeout, unit)
fun isNotEmpty() = queue.isNotEmpty()
fun foreachPrint() {
print("foreach ")
queue.forEach {
print("$it ")
}
println()
}
}
fun main() {
val test = TaskExecutor()
test.put(Task(1))
test.put(Task(2))
test.put(Task(3))
test.put(Task(4))
test.put(Task(5, priority = 2))
test.put(Task(6, priority = 2))
while (test.isNotEmpty()) {
test.foreachPrint()
println(test.poll(0, TimeUnit.SECONDS))
}
}
结果输出模拟
foreach Task:5,2,-2147483647 Task:1,0,2147483645 Task:6,2,-2147483646 Task:4,0,-2147483648 Task:2,0,2147483646 Task:3,0,2147483647
Task:5,2,-2147483647
foreach Task:6,2,-2147483646 Task:1,0,2147483645 Task:3,0,2147483647 Task:4,0,-2147483648 Task:2,0,2147483646
Task:6,2,-2147483646
foreach Task:1,0,2147483645 Task:2,0,2147483646 Task:3,0,2147483647 Task:4,0,-2147483648
Task:1,0,2147483645
foreach Task:2,0,2147483646 Task:4,0,-2147483648 Task:3,0,2147483647
Task:2,0,2147483646
foreach Task:3,0,2147483647 Task:4,0,-2147483648
Task:3,0,2147483647
foreach Task:4,0,-2147483648
Task:4,0,-2147483648
输出结果是符合预期的,先输出优先级高的5,6;再输出1,2,3,4。
五、总结
使用了不稳定的(比较方式)数据结构,通过一个变量priority
是会出现乱序问题的。不稳定意味着compare为0时,无法保证相对顺序不变。解决的方法也比较多,本篇文章只实现了基于PriorityBlockingQueue
的解決方案, 未作多方案的性能对比和实现,后续补上并发条件下的比较。
非并发条件下的任务队列,直接比较算法复杂度即可,主要是预估场景中的n, k中可能的大小,然后对比
O(n)
,O(k)
,O(logn)
。