JUC 包中除了 CountDownLatch, CyclicBarrier, Semaphore, 还有一个重要的工具,只不过相对而言使用的不多,什么呢Exchange —— 交换器。用于在两个线程之间交换数据,A 线程将 a 数据交给 B 线程,B 线程将 b 数据交给 a 线程。Exchanger——交换器,是JDK1.5时引入的一个同步器,从字面上就可以看出,这个类的主要作用是交换数据。
Exchanger有点类似于CyclicBarrier,我们知道CyclicBarrier是一个栅栏,到达栅栏的线程需要等待其它一定数量的线程到达后,才能通过栅栏。
Exchanger可以看成是一个双向栅栏,如下图:
而今天,我们将从源码处分析,Exchange 的实现原理。如果大家看过之前关于 SynchronousQueue ,就能够看的出来,Exchange 的原理和他很类似。
package com.conrrentcy.juc;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExchangerSample {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Exchanger exchanger = new Exchanger();
service.execute(new Runnable() {
public void run() {
try {
Thread.sleep((long) (Math.random() * 10000));
String data1 = "thread1 data";
System.out.println("线程" + Thread.currentThread().getName()
+ "正在把数据" + data1 + "换出去");
String data2 = (String) exchanger.exchange(data1);
System.out.println("线程" + Thread.currentThread().getName()
+ "换回的数据为" + data2);
} catch (Exception e) {
}
}
});
service.execute(new Runnable() {
public void run() {
try {
Thread.sleep((long) (Math.random() * 10000));
String data1 = "thread2 data";
System.out.println("线程" + Thread.currentThread().getName()
+ "正在把数据" + data1 + "换出去");
String data2 = (String) exchanger.exchange(data1);
System.out.println("线程" + Thread.currentThread().getName()
+ "换回的数据为" + data2);
} catch (Exception e) {
}
}
});
}
}
源码
类 UML:
内部有 2 个内部类: Node , Participant 重写了 ThreadLocal 的 initialValue 方法。
构造方法如下:
public Exchanger() {
participant = new Participant();
}
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}
就是创建了一个 ThreadLocal 对象,并设置了初始值,一个 Node 对象。
看看这个 node 对象:
@sun.misc.Contended
static final class Node {
int index; // node 在 arena 数组下标
int bound; // 交换器的最后记录值
int collides; // 记录的 CAS 失败数
int hash; // 伪随机的自旋数
Object item; // 这个线程的数据项
volatile Object match; // 别的线程提供的元素,也就是释放他的线程提供的数据 item
volatile Thread parked; // 当阻塞时,设置此线程,不阻塞的话就不必了(因为会自旋)
}
这个 node 对象就是 A ,B 线程实际存储数据的容器。A 线程存在 item 属性上,B 线程存储在 match 线程上,称为匹配。同时,有个线程对象,你应该猜到做什么用处的吧,对,挂起线程的。
和 SynchronousQueue 的区别在于, SynchronousQueue 使用了一个变量来保存数据项,通过 isData 来区别 “存” 操作和 “取” 操作。而 Exchange 使用了 2 个变量,就不用使用 isData 来区分了。
我们再来看看 Exchange 的唯一重要方法 : exchange 方法
- exchange 方法源码分析
代码如下:
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) NULL_ITEM : x; // translate null args
// arena 不是 Null ,返回的却是 null, 说明线程中断了.
// 如果 arena 是 null, 就执行后面的方法.反之,如果不是 null, 执行没有意义.
// 注意,当 slotExchange 有机会被执行,且返回的不是 null, 这个表达式整个就是 false, 下面的表达式就不会执行了.
// 也就是说,当 slot 有效的时候, arena 是没有必要执行的.
if ((arena != null || (v = slotExchange(item, false, 0L)) == null) &&
// 线程中断了,或者返回的是 null. 说明线程中断了
// 如果线程没有中断 ,就执行后面的方法.
((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null))){
throw new InterruptedException();
}
return (v == NULL_ITEM) null : (V)v;
说一下方法的逻辑:
如果执行 slotExchange 有结果,就不再执行 arenaExchange.
如果 slot 被占用了,就执行 arenaExchange.
返回值是什么呢?返回值就是对方线程的数据项,如果 A 线程先调用,那么 A 线程将数据项存在 item 中,B 线程后调用,则 B 线程将数据存在 match 属性中。
A 返回的是 match 属性,b 返回的是 item 属性。
从该方法中,可以看到,有 2 个重要的方法: slotExchange, arenaExchange。先简单说说这两个方法。
- 当没有多线程并发操作 Exchange 的时候,使用 slotExchange 就足够了。 slot 是一个 node 对象。
- 当出现并发了,一个 slot 就不够了,就需要使用一个 node 数组 arena 操作了。
so,我们先看看 slotExchange 方法吧,两个方法的逻辑类似
slotExchange 方法源码分析
代码加注释如下:
private final Object slotExchange(Object item, boolean timed, long ns) {
Node p = participant.get(); // 从 ThreadLocal 中取出 node 对象
Thread t = Thread.currentThread();// 当前线程
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
return null;
for (Node q;;) {// 死循环
// 另一个下线程进入这里, 假设 slot 有值
if ((q = slot) != null) {
// 将 slot 修改为 null
if (U.compareAndSwapObject(this, SLOT, q, null)) {
// 拿到 q 的 item
Object v = q.item;
// 自己的 item 赋值给 match,以让对方线程获取
q.match = item;
// q 线程
Thread w = q.parked;
// slot 的 parked 就是阻塞等待的线程对象.
if (w != null)
U.unpark(w);
// 返回了上一个线程放入的 item
return v;
}
// 如果使用 CAS 修改slot 失败了,说明 slot 被使用了,那就需要创建 arena 数组了
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ)) // SEQ == 256; 默认 BOUND == 0
arena = new Node[(FULL + 2) << ASHIFT];// length = (2 + 2) << 7 == 512
}
// 如果 slot 是 null, 但 arena 有值了,说明有线程竞争 slot 了,返回 null, 执行 arenaExchange 逻辑
else if (arena != null)
return null; // caller must reroute to arenaExchange
else {// 第一次循环,给 p node 的 item 赋值
p.item = item;
// 将 slot 赋值赋值为 p
if (U.compareAndSwapObject(this, SLOT, null, p))
// 赋值成功跳出循环
break;
// 如果 CAS 失败,将 p 的值清空,重来
p.item = null;
}
}
// 当走到这里的时候,说明 slot 是 null, 且 arena 也是 null(没有多线程竞争使用 slot),并且成功将 item 放入了 slot 中.
// 这个时候要做的就是阻塞自己,等待对方取出 slot 的数据项,然后重置 slot 的数据和池化对象的数据
// 伪随机数
int h = p.hash;
// 超时时间
long end = timed System.nanoTime() + ns : 0L;
// 自旋,默认 1024
int spins = (NCPU > 1) SPINS : 1;
Object v;
// 如果这个值不是 null, 说明数据被其他线程拿走了, 并且其他线程将数据赋值给 match 属性,完成了一次交换
while ((v = p.match) == null) {
// 自旋
if (spins > 0) {
// 计算伪随机数
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
// 如果算出来的是0,就使用线程 ID
if (h == 0)
h = SPINS | (int)t.getId();
// 如果不是0,就将自旋数减一,并且让出 CPU 时间片
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
// 如果自旋数不够了,且 slot 还没有得到,就重置自旋数
else if (slot != p)
spins = SPINS;
// 如果 slot == p 了,说明对 slot 赋值成功
// 如果线程没有中断 && 数组不是 null && 没有超时限制
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
// 为线程中的 parkBlocker 属性赋值为 Exchange 自己
U.putObject(t, BLOCKER, this);
// node 节点的阻塞线程为当前线程
p.parked = t;
// 如果这个数据还没有被拿走,阻塞自己
if (slot == p)
U.park(false, ns);
// 线程苏醒后,将 p 的阻塞线程属性清空
p.parked = null;
// 将当前线程的 parkBlocker 属性设置成 null
U.putObject(t, BLOCKER, null);
}
// 如果有超时限制,使用 CAS 将 slot 从 p 变成 null,取消这次交换
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
// 如果CAS成功,如果时间到了 && 线程没有中断 : 返回 time_out 对象: 返回 null
v = timed && ns <= 0L && !t.isInterrupted() TIMED_OUT : null;
// 跳出内层循环
break;
}
}
// 将 p 的 match 属性设置成 null, 表示初始化状态,没有任何匹配 >>> putOrderedObject是putObjectVolatile的内存非立即可见版本.
U.putOrderedObject(p, MATCH, null);
// 重置 item
p.item = null;
// 保留伪随机数,供下次种子数字
p.hash = h;
// 返回
return v;
}
源码还是有点小长的。简单说说逻辑。
Exchange 使用了对象池的技术,将对象保存在 ThreadLocal 中,这个对象(Node)封装了数据项,线程对象等关键数据。
- 当第一个线程进入的时候,会将数据放到 池化对象中,并赋值给 slot 的 item.并阻塞自己(通常不会立即阻塞,而是使用 yield 自旋一会儿),等待对方取值.
- 当第二个线程进入的时候,会拿出存储在 slot item 中的值, 然后对 slot 的 match 赋值,并唤醒上次阻塞的线程.
- 当第一个线程阻塞被唤醒后,说明对方取到值了,就获取 slot 的 match 值, 并重置 slot 的数据和池化对象的数据,并返回自己的数据.
- 如果超时了,就返回 Time_out 对象.
- 如果线程中断了,就返回 null.
在该方法中,会返回 2 种结果,一是有效的 item, 二是 null--- 要么是线程竞争使用 slot 了,创建了 arena 数组,要么是线程中断了.
用一幅图来看看具体逻辑,其实还是挺简单的。
当 slot 被别是线程使用了,那么就需要创建一个 arena 的数组了。通过操纵数组里面的元素来实现数据交换。
Exchanger的多槽位交换
Exchanger最复杂的地方就是它的多槽位交换(arenaExchange),我们先看下,什么时候会触发多槽位交换?
我们之前说了,并发量大的时候会触发多槽交换,这个说法并不准确。
单槽交换(slotExchange)中有这样一段代码:
也就是说,如果在单槽交换中,同时出现了多个配对线程竞争修改slot槽位,导致某个线程CAS修改slot失败时,就会初始化arena多槽数组,后续所有的交换都会走arenaExchange:
多槽交换方法arenaExchange的整体流程和slotExchange类似,主要区别在于它会根据当前线程的数据携带结点Node中的index字段计算出命中的槽位。
arenaExchange与slotExchange方法很相似,只不过arenaExchange针对数组的node元素进行相应的处理而已。arenaExchange先要确定元素的下标位置,p.index是表示p在arena中的序数i(从0开始计数,表示第i个元素),arean中的元素不是连接分布的,每个元素间隔1<<ASHIFT个下标,所以i<<ASHIFT才是第i个元素的下
单槽位交换的逻辑非常简单,Exchanger在实现数据交换时总是默认采用单槽位交换,只有发生了竞争才会开启多槽位交换,毕竟多槽位交换复杂度高并且占用系统资源多,所谓的多槽位其实就是开辟了一个槽位数组,不同的线程定位到数组的不同下标进行数据数据交换,当然这只是一个基本的思想,它的实现过程远远没有这么简单,为了彻底理解多槽位交换的逻辑,我们一步步的来分析。在核心代码之前,我们需要先了解相关的一些基础的辅助变量:
上面的变量不是很好理解,先继续往下看,在多槽位交换机制中,初始化槽位数组的过程在单槽位交换方法slotExchange中,如下:
可以看到,在初始化槽位数组arena之前,先要进行CPU核心个数判断,单核心CPU是不能开启多槽位交换模式的,然后对bound进行了初始化成SEQ,SEQ= MMASK+1=256=100000000(二进制),这个bound其实由两部分组成,二进制低8位用于表示当前arean数组的最大有效索引值,例如现在,bound二进制100000000的低8位都是0,通过bound&MMASK运算,MMASK二进制是8个1,结果就是bound低8位表示的数值0,即在初始化之后数组的最大有效索引就是0,就是只能存放一个有效槽位,这里的有效槽位是什么意思呢?其实就是为了避免伪共享,由于数组元素在分配内存的时候大都是地址连续的,为了避免伪共享,所以在实际存放数据的时候并不会将我们的槽位按顺序存储,而是会按间隔一个缓存行的长度进行存储,两个有效数据之间就由填充数据占据,所以数组中真正可用于存储的下标会很少。
解决了bound的低8位,那么高8位呢?高8位其实是一个版本号的概念,虽然这里的arean数组初始化的长度为 (FULL + 2) << ASHIFT,但是并不是说我们从开始到结束都可以利用该数组所有可利用的下标进行存取,为什么呢?试想如果竞争不是很激烈,数组长度又很大,某个线程占据了某个槽位,剩下空的槽位很多,所以其他来交换的线程很难刚好也寻找到了同一个槽位从而交换成功,很大可能是重新找了一个空槽位也傻傻的等待着别人来交换,虽然Exchanger在实现中会把那些等待太久的线程不断往下标0压缩,从而迫使它们尽快发生交换,但这明显不是最高效的,所以arean数组表面上看起来大小是固定的,其实在内部存储的时候会动态的对数组的大小进行限制,竞争激烈的时候它就扩张,竞争稍缓或者等待交换的线程等待太久就会压缩数组大小,对于扩张的策略是,只有当线程在当前数组的特定有效槽位(bound低8位)情况下,如果线程已经在每一个有效槽位上都进行了尝试交换但是都由于竞争激烈而失败时才扩张数组的实际有效槽位(扩张不会超过初始化的长度),交换失败的次数可以由CAS失败之后Node对象的collides字段记录,那么对应的特定有效槽位则就是bound的高8位来记录,数组的实际有效槽位在不断的变大变小,例如加1后再减1,如果没一个版本号的概念,就相当于存在了ABA的问题。每一个版本的bound都需要重新记录CAS失败的次数collides,所以当bound发生变化之后,都会将collides重置为0.
关于数组实际有效槽位的扩张和缩减是由下面这两个个计算逻辑进行的:
每一次都是在原来的基础上 增加 SEQ +1 , 其实就是分别将bound的低8位和高8位都增加了1,例如:100000000 (初始值) -> 1000000001 (扩张1)-> 1100000010 (再扩张1) -> 10000000001 (缩减1) ......, 不论低8位的实际有效槽位怎么变化,高8位表示的版本号都是不断增长的。
接下来,看看arean数组的有效索引以及对有效索引位的存取方式。上面arean的初始长度是 (FULL + 2) << ASHIFT, 其实实际最多有效索引就是FULL+2个,因为Exchanger按每两个有效索引位之间间隔一个缓存行 1 << ASHIFT的大小来存储,例如:我的笔记本是双核四线程的,NCPU=4,FULL=2, arean数组的长度 (FULL + 2) << ASHIFT = 512, 每两个有效索引间隔 1 << ASHIFT = 128, 假设数组的第一个元素内存偏移地址是0, 那么有效的索引位依次就是:...(填充)..128...(填充)..256.....384.....512. 即最多只有4个有效索引,Exchanger内部其实限制最大只能到FULL,因此不会发生越界。从下面的根据线程取索引编号也可以印证这一点:
关于避免伪共享,Exchanger只是以大多数常见缓存行都是128个地址偏移来编写代码,而且Exchanger还对不满足这种情况进行了处理,那就是在构造实例的时候就会抛出异常:
将多槽位交换相关的东西介绍完了,下面看真正的实现逻辑:
多槽位交换的源码分析印证了上面的描述,但是实现的过程真的有点难懂,也可以用精妙来形容,通过上面的源码可以得出以下几个重要的实现细节:
1. 关于槽位的增长,由U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)语句完成,分别对高8位版本号加1和低8位最大有效索引加1,只有在当前线程在每一个有效索引位都尝试失败(这时候collides==m,m就是当前bound下最大有效索引)之后才会触发槽位的增长,当然有效槽位最大不能超过FULL,槽位增长之后,如果当前处于最左侧0索引,那么就定位到最大索引m处,否则 i-1 即继续往左侧移动进行尝试。
2. 关于槽位的缩减,由U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1)语句完成,分别对高8位版本号加1和低8位最大有效索引减1,只有在当前线程在非0索引位处等待了一段时间都没有等到线程来交换才会尝试缩减槽位(由于实际使用槽位并没有真正到达FULL +2,而是到FULL截至,所以不用担心丢失有效节点),槽位缩减之后,当前线程的索引也会减半,使其往左寻找槽位,这样就增大了交换成功的机率。
3. 关于线程的动态移动,不论槽位缩减还是增长之后,都会重置还没有占据槽位的那些线程的索引以及冲突失败次数collides,对于索引的重置策略是只要不是处于最大索引 m 处,那么就会将其重新定位到最右侧m处,否则定位到m-1,总之线程节点都是在不停的从右往左流动的过程。如果失败次数还没有达到最大(collides < m)或者槽位已经到达最大FULL,或者扩张槽位失败,那么当前线程只要还没到达最左侧的0索引位,那么就将索引减1,继续往左靠。
4. 关于0索引位,这是一个很关键的槽位,你会发现所有的线程节点都会在等待交换的过程中,只要没有成功就向左靠近,所有的线程阻塞都只会发生在0索引槽位,所以在该处交换成功的几率非常高,而在其它有效槽位只会以 spin + yield 的方式进行等待。
内存一致性
由于Exchanger内部在实现交换的时候采用了CAS+volatile对槽位进行更新替换,所以很容易得出通过Exchanger成功交换的每一对线程,每一个线程在调用exchange()方法之前的操作都 happen-before 另一个线程在exchange()方法返回之后的操作。
所以交换成功的每对线程中的任何一方在其exchange方法返回之后都可见另一个线程在调用exchange方法之前的操作。
总结
Exchanger的数据交换内部实现策略支持两个线程情况下的单槽位交换,以及多线程情况下的多槽位交换,在多槽位交换过程中,每个线程最终会和哪一个线程交换是不能确定的,它只能保证能够两两成功交换。单槽位交换很简单,当线程来交换数据的时候,如果发现槽位为空,则以spin + yield + block的方式进行等待,否则就和占据槽位的线程进行交换,然后唤醒等待的线程拿着数据返回。
多槽位的思想其实也好理解,只是它的实现过程非常精细而复杂,当线程来交换数据的时候,如果第一个有效槽位为空,那么占据槽位以spin + yield + block的方式等待,如果发现第一个有效槽位非空,那么就尝试和其进行交换,如果交换失败,说明已经有其它线程抢先尝试与其交换了,那么就往后移动一个有效槽位,如果此处被占据则尝试与其交换,否则就以spin + yield的方式等待,注意此时不会进入block状态,如果等待自旋结束依然没有线程来交换,则往左移动索引,如果在往左移动的过程中一直没有成功与那些槽位交换,最终移动到第一个有效槽位,那又以spin + yield + block的方式进行等待, 当然内部实现机制还会涉及到对多槽位数组有效容量的动态扩展和缩减,以及移动线程节点的过程中的数组容量版本号与交换失败记录的对比等等精密逻辑。