当前位置: 首页>后端>正文

PriorityBlockingQueue实现任务优先级是乱序的

一、背景

  1. 优先级相同时,要按照插入顺序返回任务
  2. 优先级不同时,要先执行优先级高的任务
  3. 默认的优先级是0,少数情况下才会出现优先级大于0的情况

二、实现

PriorityQueue 和 PriorityBlockingQueue 是比较好的解决方案。
业务中常常需要考虑多线程问题,所以一般都会使用PriorityBlockingQueue去缓存任务队列。

简化代码后如下:

import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.TimeUnit

class Task(val num: Int, val priority: Int = 0) {
    override fun toString() = "Task:$num,$priority"
}

class TaskExecutor {
    private val queue = PriorityBlockingQueue<Task>(11, kotlin.Comparator { o1, o2 ->
        o2.priority - o1.priority
    })

    fun put(task: Task) = queue.put(task)

    @Throws(InterruptedException::class)
    fun poll(timeout: Long, unit: TimeUnit): Task= queue.poll(timeout, unit)

    fun isNotEmpty() = queue.isNotEmpty()
}

put 往队列添加一个任务
poll 从队头取出一个任务用于执行
isNotEmpty 判断队列是否有元素
Comparator 排序规则是按优先级从高到低排序

简化后代码是比较简单和容易理解的。但是在一次解决BUG排除了上层问题后排查到执行器内部时,发现用了PriorityBlockingQueue实现优先级队列是无法满足需求

BUG简述:概率性出现任务乱序,逻辑错误执行。
如原本处于开状态下,任务发送顺序是 开 -> 关 -> 关,预期执行是执行一次关操作,相同状态不执行。
但是概率性会变成 关 -> 开 -> 关执行。 导致多执行一次开和关操作。

问题:优先级相同时,无法保证稳定性
fun main() {
    val test = TaskExecutor()
    test.put(Task(1))
    test.put(Task(2))
    test.put(Task(3))
    test.put(Task(4))

    while (test.isNotEmpty()) {
        println(test.poll(0, TimeUnit.SECONDS))
    }
}

按照使用者的预期,加入的4个任务后取得结果应该是1,2,3,4才是正确的。
运行结果后如下:

Task:1,0
Task:4,0
Task:3,0
Task:2,0

但是意外的是输出结果是1,4,3,2。如果任务2,3,4存在依赖性,则最后执行结果就会出现逻辑错误。
PriorityBlockingQueue内部使用的是PriorityQueue,而PriorityQueue是基于最小堆实现的。所以问题就变成最小堆的维护能否保证稳定性。

关于PriorityQueue的具体实现网上很多而且很详细,该文章就不赘述。

三、分析

结果输出模拟

TaskExecutor类添加打印方法

fun foreachPrint() {
    print("foreach ")
    queue.forEach {
        print("$it ")
    }
    println()
}
foreach Task:1,0 Task:2,0 Task:3,0 Task:4,0 
Task:1,0
foreach Task:4,0 Task:2,0 Task:3,0 
Task:4,0
foreach Task:3,0 Task:2,0 
Task:3,0
foreach Task:2,0 
Task:2,0
最小堆的维护两个操作是无法保证稳定性的
  • 上移:put是将元素放到最后的位置,然后进行上移操作。如果新加的元素优先级比父节点高,则会进行上移操作,这样这个父元素变成最后一个执行的,破坏了稳定性。
  • 下移:poll 是取出队头元素后会将最后的元素放到队头,然后进行下移操作。如果在优先级相同的情况下,该元素就不会再下移。而该元素应该是最后一个执行的但是变成第一个执行,也破坏了稳定性。

就如堆排序是不稳定的排序。所以任务优先级乱序问题就是一个排序比较问题

四、方法

常用排序算法如下
PriorityBlockingQueue实现任务优先级是乱序的,第1张

思路一:类比稳定的排序

  1. 类比插入排序,冒泡排序:使用LinkedListput时从后往前遍历找到插入的位置,或者一直往前冒泡,poll时取队头。

put时间复杂度是O(n), poll时间复杂度是O(1)

  1. 类比基数排序:使用HashMapkey为优先级,valueLinkedList; 再用LinkedList维护任务的优先级从高到低的列表。 put时,通过HashMap获取到队列直接尾插法,并更新列表。poll时获取优先级最高的值,并通过HashMap获取到队列取队头。

put时间复杂度是O(k), poll时间复杂度是O(1) (k为优先级的队列长度)。

思路二:改良比较规则

Comparator不出现比较相同的情况,任意一对都能比出高低。所以除了priority外需要添加一个Stamp机制

  • 时间戳
    时间戳的值是系统控制的,并且相同时间点获取到到Stamp可能是相同的,不能百分百保证Stamp是唯一的。
  • 自增id
    使用i++ 或者AtomicInteger实现一个环形Stamp

该方式适用于所有支持Comparator接口的数据结构,如实现SortedSet接口的 TreeSetCollections.synchronizedSortedSet()ConcurrentSkipListSet,以及PriorityQueuePriorityBlockingQueue

基于PriorityBlockingQueue添加Stamp,put时间复杂度是O(logn),poll时间复杂度是O(logn)

以下代码是基于AtomicInteger实现的TaskIdCreator

class TaskIdCreator {
    private var creator = AtomicInteger(Int.MAX_VALUE - 2)

    @Volatile var top = 0
    
    fun createId(): Int {
        top = creator.getAndIncrement()
        return top
    }

    fun compare(taskId1: Int, taskId2: Int): Int {
        return when {
            taskId1 == taskId2 -> 0
            top in taskId1 until taskId2 -> 1
            top in taskId2 until taskId1 -> -1
            else -> taskId1 - taskId2
        }
    }
}

TaskIdCreator负责生成环形Stamp,以及提供比较规则能够处理int越界的情况。

@Volatile 修饰top是避免:当put线程使top从Int.MAX_VALUE ->Int.MIN_VALUE后,poll线程比较时top的值未同步还是为Int.MAX_VALUE,可能导致后加入的任务先执行,存在乱序执行的问题。关于volatile在多线程中的应用,这不再赘述。

起到关键作用的是修改了排序规则

kotlin.Comparator { o1, o2 ->
    if (o2.priority != o1.priority) {
        return@Comparator o2.priority - o1.priority
    }
    return@Comparator Task.ID_CREATOR.compare(o1.taskId, o2.taskId)
}

排序规则:

  1. 按优先级从高到低排序
  2. 相同优先级的,按taskId从旧到新排序

完整的文件如下:

import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

class TaskIdCreator {

    private var creator = AtomicInteger(Int.MAX_VALUE - 2)

    @Volatile
    var top = 0

    fun createId(): Int {
        top = creator.getAndIncrement()
        return top
    }

    fun compare(taskId1: Int, taskId2: Int): Int {
        if (taskId1 == taskId2)
            return 0
        if (taskId1 > top || taskId2 > top) {
            if (taskId1 <= top) {
                return 1
            } else if (taskId2 <= top) {
                return -1
            }
        }
        return taskId1 - taskId2
    }

}

class Task(val num: Int, val priority: Int = 0) {

    var taskId = ID_CREATOR.createId()
        private set

    override fun toString() = "Task:$num,$priority,$taskId"

    companion object {
        val ID_CREATOR = TaskIdCreator()
    }
}

class TaskExecutor {
    private val queue = PriorityBlockingQueue<Task>(11, kotlin.Comparator { o1, o2 ->
        if (o2.priority != o1.priority) {
            return@Comparator o2.priority - o1.priority
        }
        return@Comparator Task.ID_CREATOR.compare(o1.taskId, o2.taskId)
    })

    fun put(task: Task) = queue.put(task)

    @Throws(InterruptedException::class)
    fun poll(timeout: Long, unit: TimeUnit): Task= queue.poll(timeout, unit)

    fun isNotEmpty() = queue.isNotEmpty()

    fun foreachPrint() {
        print("foreach ")
        queue.forEach {
            print("$it ")
        }
        println()
    }
}

fun main() {
    val test = TaskExecutor()
    test.put(Task(1))
    test.put(Task(2))
    test.put(Task(3))
    test.put(Task(4))

    test.put(Task(5, priority = 2))
    test.put(Task(6, priority = 2))

    while (test.isNotEmpty()) {
        test.foreachPrint()
        println(test.poll(0, TimeUnit.SECONDS))
    }
}
结果输出模拟
foreach Task:5,2,-2147483647 Task:1,0,2147483645 Task:6,2,-2147483646 Task:4,0,-2147483648 Task:2,0,2147483646 Task:3,0,2147483647 
Task:5,2,-2147483647
foreach Task:6,2,-2147483646 Task:1,0,2147483645 Task:3,0,2147483647 Task:4,0,-2147483648 Task:2,0,2147483646 
Task:6,2,-2147483646
foreach Task:1,0,2147483645 Task:2,0,2147483646 Task:3,0,2147483647 Task:4,0,-2147483648 
Task:1,0,2147483645
foreach Task:2,0,2147483646 Task:4,0,-2147483648 Task:3,0,2147483647 
Task:2,0,2147483646
foreach Task:3,0,2147483647 Task:4,0,-2147483648 
Task:3,0,2147483647
foreach Task:4,0,-2147483648 
Task:4,0,-2147483648

输出结果是符合预期的,先输出优先级高的5,6;再输出1,2,3,4。

五、总结

使用了不稳定的(比较方式)数据结构,通过一个变量priority是会出现乱序问题的。不稳定意味着compare为0时,无法保证相对顺序不变。解决的方法也比较多,本篇文章只实现了基于PriorityBlockingQueue的解決方案, 未作多方案的性能对比和实现,后续补上并发条件下的比较。

非并发条件下的任务队列,直接比较算法复杂度即可,主要是预估场景中的n, k中可能的大小,然后对比O(n),O(k),O(logn)


https://www.xamrdz.com/backend/3x41924297.html

相关文章: