背景
笔者在使用 EventBus 的过程中发现有时只能收到最后一次的粘性 Event ,导致业务逻辑出现混乱,下面是笔者的使用示例:
// Event.java
public final class Event {
private final int code;
public Event(int code) {
this.code = code;
}
public int getCode() {
return code;
}
}
复制代码
// Example.java
public class Example {
// 调用多次
public void test(int code) {
EventBus.getDefault().postSticky(new Event(code));
}
// 调用多次 `test(int code)` 后再注册订阅者
public void register() {
EventBus.getDefault().register(this);
}
@Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
public void receiveEvent(Event event) {
// 发现只能收到最后一次的粘性事件
System.out.println(event.getCode());
}
}
复制代码
所以去查看了 EventBus 的源码,接下来我们分析下 EventBus 发送粘性事件的流程。
分析
粘性事件
以下源码基于 EventBus 3.3.1 版本
下面是发送粘性事件的源码:
private final Map<Class<?>, Object> stickyEvents;
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
复制代码
postSticky
代码比较简单,首先对 stickyEvents
进行加锁,接下来把 event 事件的 Class 对象作为 Key,event 事件本身作为 value 放进 Map 中,其中stickyEvents
是 Map 对象,实例是 ConcurrentHashMap
, 其 Key 和 Value 的泛型形参分别是 Class<?>
和 Object
, 它的作用就是用来存储粘性事件;然后调用 post(event)
把粘性事件当作普通事件发送一下。
首先我们看下最后为什么要调用下 post(event)
?
虽然 post(evnet)
上面有注释,简单翻译下:"在放进 Map 后应该再发送一次,以防止订阅者想立即删除此事件",读完注释后,可能还是不太明白,这里笔者认为:在前面存储完粘性事件后,这里调用 post
把粘性事件当作普通事件发送出去,或许是因为现在已经有注册的粘性事件订阅者,此时把已经注册的粘性事件订阅者当作普通事件的订阅者,这样已经注册的粘性事件订阅者可以立即收到相应的事件,只是此时事件不再是粘性的。
在 postSticky
中我们并没有看到粘性事件是在哪里发送的,想一想我们使用粘性事件的目的是什么?当注册订阅者时可以收到之前发送的事件,这样来看,粘性事件的发送是在注册订阅者时,下面是注册订阅者的源码,删除了一些无关代码:
public void register(Object subscriber) {
// 省略无关代码
Class<?> subscriberClass = subscriber.getClass();
// 查找订阅者所有的Event接收方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
复制代码
register
代码也比较简单,首先通过订阅者的 Class 对象查找订阅者所有的Event事件接收方法,然后对 EventBus 对象加锁,遍历所有的Event事件接收方法 subscriberMethods
调用 subscribe
方法,以下是 subscribe
方法源码:
// Key 为 Event Class 对象,Value 为存储 Event 的订阅者和接收 Event 方法对象的集合
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
// Key 为订阅者对象,Value 为订阅者中的 Event Class对象集合
private final Map<Object, List<Class<?>>> typesBySubscriber;
// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// Event Class对象
Class<?> eventType = subscriberMethod.eventType;
// 订阅者和接收 Event 方法对象
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// 根据 Event Class对象,获取订阅者和接收 Event 方法对象的集合
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
// 判断订阅者和接收 Event 方法对象是否为空
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
// 判断是否已经包含了新的订阅者和接收 Event 方法对象,若是包含则认为是重复注册
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
// 这里是按优先级排序插入到集合中
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// 这里是把 Event Class对象添加进对应订阅者的 Event Class对象集合中
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
// 上面已经判断了是否重复注册,所以这里直接添加
subscribedEvents.add(eventType);
// 接下来就是粘性事件的发送逻辑了
// 判断 Event 接收方法是否可以处理粘性事件
if (subscriberMethod.sticky) {
// 这里判断是否考虑 Event 事件类的继承关系,默认为 Ture
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
复制代码
在上面的源码中,增加了不少注释有助于我们读懂源码,在源码的最后就是粘性事件的发送逻辑了,其中有两个分支,其中一个分支根据 Event 事件的继承关系发送事件,另外一个分支根据接收 Event 方法中的 Event Class 对象从 stickyEvents
中直接查找粘性事件,最后两个分支殊途同归,都调用了 checkPostStickyEventToSubscription
方法:
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
// If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
// --> Strange corner case, which we don't take care of here.
postToSubscription(newSubscription, stickyEvent, isMainThread());
}
}
复制代码
checkPostStickyEventToSubscription
方法很简单,对粘性事件做下判空处理,继续调用 postToSubscription
方法,传入订阅者与接收 Event 方法对象,粘性事件和是否是主线程布尔值:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;A
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
复制代码
postToSubscription
方法比较长,但是比较好理解,就是根据接收 Event 方法上的 @Subscribe
注解中传入的线程模型进行事件的分发,具体的事件分发流程,有空再分析,本文就先不分析了,现在我们只需知道最后都会调用 invokeSubscriber(Subscription subscription, Object event)
方法即可:
void invokeSubscriber(Subscription subscription, Object event) {
try {
// 反射调用 Event 接收方法传入 Event 事件
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
复制代码
终于在 invokeSubscriber
方法中找到调用 Event 接收方法的地方了,原来 EventBus 最后是通过反射调用 Event 接收方法并传入相应 Event 事件的。
分析完 Event 事件的发送流程,好像没有发现为什么有时收不到粘性事件。
我们回过头来再看下笔者的使用示例,为了方便查看,下面贴出使用示例代码:
// Example.java
public class Example {
// 调用多次
public void test(int code) {
EventBus.getDefault().postSticky(new Event(code));
}
// 调用多次 `test(int code)` 后再注册订阅者
public void register() {
EventBus.getDefault().register(this);
}
@Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
public void receiveEvent(Event event) {
// 发现只能收到最后一次的粘性事件
System.out.println(event.getCode());
}
}
复制代码
可能细心的读者已经发现 test
方法调用了,问题应该出在 postSticky
方法中,让我们再次查看 postSticky
方法:
private final Map<Class<?>, Object> stickyEvents;
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
复制代码
根据前面分析 postSticky
方法的结果,stickyEvents
用于存储粘性事件,它是个 Map 结构,而 stickyEvents
的 Key 正是 Event 的 Class 对象,根据 Map 结构的存储原理:如果存在相同的 Key,则覆盖 Value 的值,而 stickyEvents
的 Value 正是 Event 本身。
终于真相大白,多次调用 test
方法发送粘性事件,EventBus 只会存储最后一次的粘性事件。
小结
EventBus 针对同一个粘性 Event 事件只会存储最后一次发送的粘性事件。
EventBus 的上述实现可能是因为多次发送同一个粘性事件,则认为之前的事件是过期事件应该抛弃,因此只传递最新的粘性事件。
EventBus 的这种实现无法满足笔者的业务逻辑需求,笔者希望多次发送的粘性事件,订阅者都能接收到,而不是只接收最新的粘性事件,可以理解为粘性事件必达订阅者,下面让我们修改 EventBus 的源码来满足需求吧。
修改
上一节我们分析了粘性事件的发送流程,为了满足粘性事件必达的需求,基于现有粘性事件流程,我们可以仿照粘性事件的发送来提供一个发送必达消息的方法。
Subscribe
首先我们定义 Event 接收方法可以接收粘性事件是在 @Subscribe
中 sticky = true
, 所以我们可以修改 Subscribe
注解,增加粘性事件必达的方法:
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
ThreadMode threadMode() default ThreadMode.POSTING;
/**
* If true, delivers the most recent sticky event (posted with
* {@link EventBus#postSticky(Object)}) to this subscriber (if event available).
*/
boolean sticky() default false;
// 增加消息必达的方法
boolean rendezvous() default false;
/** Subscriber priority to influence the order of event delivery.
* Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before
* others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of
* delivery among subscribers with different {@link ThreadMode}s! */
int priority() default 0;
}
复制代码
rendezvous
以为约会、约定的意思,可以理解为不见不散,在这里它有两层作用,其一是标记方法可以接收粘性事件,其二是标记方法接收的事件是必达的。
findSubscriberMethods
接下来就需要解析 rendezvous
了,我们先看看 sticky
是如何解析的,在上一节我们分析了 register
方法,方便查看,下面再贴出 register
方法源码:
public void register(Object subscriber) {
// 省略无关代码
Class<?> subscriberClass = subscriber.getClass();
// 查找订阅者所有的Event接收方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
复制代码
上一节分析中,我们没有分析查找订阅者中所有的 Event 接收方法 findSubscriberMethods
,接下来我们分析下在 findSubscriberMethods
方法是如何查找 Event 接收方法的:
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 先从缓存中查找
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
// 是否忽略生成索引,默认为False,所以这里走else分支
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// 查找Event接收方法
subscriberMethods = findUsingInfo(subscriberClass);
}
// 如果订阅者和订阅者父类中没有Event接收方法则抛出异常
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
// 添加进缓存中
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
复制代码
调用 findSubscriberMethods
方法需要传入订阅者 Class 对象,通过笔者在源码中增加的注释分析发现默认调用 findUsingInfo
方法查找 Event 接收方法,我们继续跟踪 findUsingInfo
方法:
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
// FindState对订阅者Class对象和Event接收方法进行了一层封装
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass); // ①
while (findState.clazz != null) {
// 查找订阅者信息,包含订阅者Class对象、 订阅者父类、Event接收方法等
findState.subscriberInfo = getSubscriberInfo(findState); // ②
// 在 ① initForSubscriber中会把subscriberInfo置为null,
// 在 ② getSubscriberInfo中没有Index对象,
// 所以第一次时这里会走else分支
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
// 查找Event接收方法
findUsingReflectionInSingleClass(findState);
}
// 查找父类的Event接收方法
findState.moveToSuperclass();
}
// 通过findState返回Event接收方法,并回收findState
return getMethodsAndRelease(findState);
}
复制代码
根据笔者在源码中的注释分析,在 findUsingInfo
方法中使用「享元模式」对 FindState
进行回收利用,避免创建大量临时的 FindState
对象占用内存,最后再次调用 findUsingReflectionInSingleClass
方法查找 Event 接收方法,看方法名字应该是使用反射查找,findUsingReflectionInSingleClass
源码较长,删减一些不关心的代码:
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
// 通过反射获取当前类中声明的所有方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// 删减不关心的代码
}
// 遍历所有方法
for (Method method : methods) {
// 获取方法的修饰符
int modifiers = method.getModifiers();
// 判断方法是否是public的;是否是抽象方法,是否是静态方法,是否是桥接方法,是否是合成方法
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
// 获取方法的形参Class对象数组
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
// 获取方法上的Subscribe注解
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
// 检测是否已经添加了相同签名的方法,考虑子类复写父类方法的情况
if (findState.checkAdd(method, eventType)) {
// 获取注解的参数
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky(),
// 这里我们添加rendezvous参数 ①
subscribeAnnotation.rendezvous()));
}
}
}
// 删减不关心的代码
}
// 删减不关心的代码
}
}
复制代码
在 findUsingReflectionInSingleClass
方法中通过反射获取订阅者中声明的所有方法,然后遍历所有方法:
- 首先判断方法的修饰符是否符合,
- 其次判断方法是否只有一个形参,
- 再次判断方法是否有
Subscribe
注解, - 然后检测是否已经添加了相同签名的方法,主要是考虑子类复写父类方法这种情况,
- 最后获取
Subscribe
注解的参数,在这里我们解析rendezvous
,封装进SubscriberMethod
中。
在 SubscriberMethod
中增加 rendezvous
字段,删除不关心的代码:
public class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
// 增加 `rendezvous` 字段
final boolean rendezvous;
/** Used for efficient comparison */
String methodString;
public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode,
int priority, boolean sticky,
// 增加 `rendezvous` 形参
boolean rendezvous) {
this.method = method;
this.threadMode = threadMode;
this.eventType = eventType;
this.priority = priority;
this.sticky = sticky;
this.rendezvous = rendezvous;
}
}
复制代码
postRendezvous
好的,rendezvous
已经解析出来了,接下来我们对外提供发送必达事件的接口:
// 选择List存储必达事件,使用Pair封装必达事件的Key和Value
private final List<Pair<Class<?>, Object>> rendezvousEvents;
public void postRendezvous(Object event) {
synchronized (rendezvousEvents) {
rendezvousEvents.add(Pair.create(event.getClass(), event));
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
复制代码
上面的源码,我们通过仿照 postSticky
方法实现了 postRendezvous
方法,在 postSticky
方法中使用 Map 存储粘性事件,不过我们在 postRendezvous
方法中使用 List 存储必达事件,保证必达事件不会因为 Key 相同而被覆盖丢失,最后也是调用 post
方法尝试先发送一次必达事件。
register
在上一节中我们分析了粘性事件是在 register
中调用 subscribe
方法进行发送的,这里我们仿照粘性事件的发送逻辑,实现必达事件的发送逻辑,我们可以在 subscribe
方法最后增加发送必达事件的逻辑,以下源码省略了一些不关心的代码:
private final List<Pair<Class<?>, Object>> rendezvousEvents;
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// 省略不关心的代码
// 粘性事件发送逻辑
if (subscriberMethod.sticky) {
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
// 新增必达事件发送逻辑
// 判断方法是否可以接收必达事件
if (subscriberMethod.rendezvous) {
if (eventInheritance) {
for (Pair<Class<?>, Object> next : rendezvousEvents) {
Class<?> candidateEventType = next.first;
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = next.second;
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object rendezvousEvent = getRendezvousEvent(eventType);
if (rendezvousEvent != null) {
checkPostStickyEventToSubscription(newSubscription, rendezvousEvent);
}
}
}
}
复制代码
在 subscribe
方法中,我们通过仿照粘性事件的发送逻辑增加了必达事件的发送:
- 首先判断 Event 接收方法是否可以接收必达事件
- 其次考虑 Event 必达事件的继承关系,
- 最后两个分支都调用
checkPostStickyEventToSubscription
方法发送必达事件
happy~
总结
使用第三方库时,发现问题不要慌张,带着问题去查看源码总有一番收获,这也告诫我们在使用第三库时最好先搞明白它的实现原理,遇到问题时不至于束手无策。
通过分析 EventBus 的源码,我们有以下收获:
- 明白了我们注册订阅者时 EventBus 做了哪些事情
- 知晓了我们发送粘性事件时,EventBus 是如何处理及何时发送粘性事件的
- 了解到 EventBus 是通过反射调用 Event 事件的接收方法
- 学习了 EventBus 中的一些优化点,比如对
FindState
使用「享元模式」避免创建大量临时对象占用内存 - 进一步了解到对并发的处理
通过以上收获,我们成功修改 EventBus 源码实现了我们必达事件的需求。
到这里我们已经完成了必达事件的发送,不过我们还剩下获取必达事件,移除必达事件没有实现,最后 EventBus 中还有单元测试 module,我们还没有针对 rendezvous
编写单元测试,读者有兴趣的话,可以自己试着实现。
希望可以帮到你~