Event Bus 事件总线,是观察者模式的实践。
首先明确我们的被观察者
可以是一个事件、消息、或者简单的对象
package pers.wmx.springbootfreemarkerdemo.eventbus.event;
/**
* @author wangmingxin03
* Created on 2021-12-13
*/
public class OrderPlaceEvent {
private long userId;
private long orderId;
public OrderPlaceEvent(long userId, long orderId) {
this.userId = userId;
this.orderId = orderId;
}
public long getUserId() {
return userId;
}
public void setUserId(long userId) {
this.userId = userId;
}
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
}
然后定义观察者
package pers.wmx.springbootfreemarkerdemo.eventbus.observer;
import lombok.extern.slf4j.Slf4j;
import pers.wmx.springbootfreemarkerdemo.eventbus.Subscribe;
/**
* @author wangmingxin03
* Created on 2021-12-13
*/
@Slf4j
public class AObserver {
@Subscribe
public void handle(String msg) {
log.info("AObserver msg:{}", msg);
}
}
package pers.wmx.springbootfreemarkerdemo.eventbus.observer;
import lombok.extern.slf4j.Slf4j;
import pers.wmx.springbootfreemarkerdemo.eventbus.Subscribe;
/**
* @author wangmingxin03
* Created on 2021-12-13
*/
@Slf4j
public class BObserver {
@Subscribe
public void handle(String msg) {
log.info("BObserver msg:{}", msg);
}
}
package pers.wmx.springbootfreemarkerdemo.eventbus.observer;
import lombok.extern.slf4j.Slf4j;
import pers.wmx.springbootfreemarkerdemo.eventbus.Subscribe;
import pers.wmx.springbootfreemarkerdemo.eventbus.event.OrderPlaceEvent;
/**
* @author wangmingxin03
* Created on 2021-12-13
*/
@Slf4j
public class CObserver {
@Subscribe
public void handle(String msg) {
log.info("CObserver msg:{}", msg);
}
@Subscribe
public void handle(Integer msg) {
log.info("CObserver msg:{}", msg);
}
@Subscribe
public void handle(OrderPlaceEvent event) {
log.info("CObserver | userId:{}, orderId:{}",
event.getUserId(), event.getOrderId());
}
}
通过注解标明要监听事件
方法的参数就是监听具体的事件类型了
package pers.wmx.springbootfreemarkerdemo.eventbus;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 标明观察者中哪个函数接收消息
*
* @author wangmingxin03
* Created on 2021-12-13
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {
}
我们抽象一层 ObserverAction
表明被@Subscribe
注解的方法
package pers.wmx.springbootfreemarkerdemo.eventbus;
import java.lang.reflect.Method;
import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
/**
* @author wangmingxin03
* Created on 2021-12-13
*/
@Slf4j
public class ObserverAction {
private Object target;
private Method method;
public ObserverAction(Object target, Method method) {
this.target = Preconditions.checkNotNull(target);
this.method = method;
this.method.setAccessible(true);
}
public void execute(Object event) {
try {
method.invoke(target, event);
} catch (Exception e) {
log.error("execute exception ", e);
}
}
}
然后我们需要注册我们的监听者,下面就是最核心的逻辑
package pers.wmx.springbootfreemarkerdemo.eventbus;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
/**
* 管理观察者
*
* @author wangmingxin03
* Created on 2021-12-13
*/
@Slf4j
public class ObserverRegistry {
private ConcurrentMap<Class<?>, CopyOnWriteArraySet<ObserverAction>> registry =
new ConcurrentHashMap<>();
/**
* 注册观察者
**/
public void register(Object observer) {
// 拿到这个观察者需要的所有观察事件
Map<Class<?>, Collection<ObserverAction>> observerActions = findAllObserverActions(observer);
for(Map.Entry<Class<?>, Collection<ObserverAction>> entry : observerActions.entrySet()) {
Class<?> eventType = entry.getKey();
Collection<ObserverAction> eventActions = entry.getValue();
CopyOnWriteArraySet<ObserverAction> registryEventActions = registry.get(eventType);
if (registryEventActions == null) {
registry.putIfAbsent(eventType, new CopyOnWriteArraySet<>());
registryEventActions = registry.get(eventType);
}
registryEventActions.addAll(eventActions);
}
}
private Map<Class<?>, Collection<ObserverAction>> findAllObserverActions(Object observer) {
Map<Class<?>, Collection<ObserverAction>> observerActions = new HashMap<>();
Class<?> clazz = observer.getClass();
List<Method> methods = getAnnotatedMethods(clazz);
for (Method method : methods) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
if (!observerActions.containsKey(eventType)) {
observerActions.put(eventType, new ArrayList<>());
}
observerActions.get(eventType).add(new ObserverAction(observer, method));
}
return observerActions;
}
/**
* 获取有 @Subscribe 注解的方法
**/
private List<Method> getAnnotatedMethods(Class<?> clazz) {
List<Method> methods = Lists.newArrayList();
for (Method method : clazz.getDeclaredMethods()) {
if (method.isAnnotationPresent(Subscribe.class)) {
Class<?>[] parameterTypes = method.getParameterTypes();
// 方法参数只能有一个 -> 事件类型
if (parameterTypes.length != 1) {
log.error("invalid method :{}", method.toString());
} else {
methods.add(method);
}
}
}
return methods;
}
public List<ObserverAction> getMatchedObserverActions(Object event) {
log.info("getMatchedObserverActions | registry:{}", JSON.toJSON(registry));
List<ObserverAction> observerActions = Lists.newArrayList();
Class<?> clazz = event.getClass();
for(Map.Entry<Class<?>, CopyOnWriteArraySet<ObserverAction>> entry : registry.entrySet()) {
Class<?> eventType = entry.getKey();
CopyOnWriteArraySet<ObserverAction> eventActions = entry.getValue();
if (clazz.isAssignableFrom(eventType)) {
observerActions.addAll(eventActions);
}
}
return observerActions;
}
}
其实核心就是维护一个本地的注册表 事件类型到监听方法的映射
然后再定义一下我们对外暴露的事件总线SDK
package pers.wmx.springbootfreemarkerdemo.eventbus;
import java.util.List;
import java.util.concurrent.Executor;
import com.google.common.util.concurrent.MoreExecutors;
/**
* @author wangmingxin03
* Created on 2021-12-13
*/
public class EventBus {
private Executor executor;
private ObserverRegistry registry = new ObserverRegistry();
public EventBus() {
this(MoreExecutors.directExecutor());
}
protected EventBus(Executor executor) { this.executor = executor; }
public void register(Object observer) {
registry.register(observer);
}
public void post(Object event) {
List<ObserverAction> observerActions = registry.getMatchedObserverActions(event);
for (ObserverAction observerAction : observerActions) {
executor.execute(() -> {
observerAction.execute(event);
});
}
}
}
register
往事件总线注册观察者
post
发布事件
再整个异步的
package pers.wmx.springbootfreemarkerdemo.eventbus;
import java.util.concurrent.Executor;
/**
* @author wangmingxin03
* Created on 2021-12-13
*/
public class AsyncEventBus extends EventBus {
public AsyncEventBus(Executor executor) {
super(executor);
}
}
再看下测试类入口,跑下
package pers.wmx.springbootfreemarkerdemo.eventbus;
import java.util.concurrent.Executors;
import pers.wmx.springbootfreemarkerdemo.eventbus.event.OrderPlaceEvent;
import pers.wmx.springbootfreemarkerdemo.eventbus.observer.AObserver;
import pers.wmx.springbootfreemarkerdemo.eventbus.observer.BObserver;
import pers.wmx.springbootfreemarkerdemo.eventbus.observer.CObserver;
/**
* @author wangmingxin03
* Created on 2021-12-13
*/
public class EventBusMain {
private static final EventBus EVENT_BUS =
new AsyncEventBus(Executors.newFixedThreadPool(10));
public static void main(String[] args) {
// 注册三个观察者 扔到本地注册表
EVENT_BUS.register(new AObserver());
EVENT_BUS.register(new BObserver());
EVENT_BUS.register(new CObserver());
EVENT_BUS.post("haha");
EVENT_BUS.post(666);
EVENT_BUS.post(new OrderPlaceEvent(123, 777));
}
}
转载请注明:汪明鑫的个人博客 » 简易EventBus实现
说点什么
您将是第一位评论人!