Welcome everyone

简易EventBus实现

java 汪明鑫 129浏览 0评论

Event Bus 事件总线,是观察者模式的实践。

 

首先明确我们的被观察者

可以是一个事件、消息、或者简单的对象

package pers.wmx.springbootfreemarkerdemo.eventbus.event;

/**
 * Event object describing a placed order, identified by a user id and an order id.
 * Instances are mutable: both ids can be changed through the setters.
 *
 * @author wangmingxin03
 * Created on 2021-12-13
 */
public class OrderPlaceEvent {
    /** Id of the user the order belongs to. */
    private long userId;
    /** Id of the order itself. */
    private long orderId;

    /**
     * Creates an event for the given user and order.
     *
     * @param userId  id of the user
     * @param orderId id of the order
     */
    public OrderPlaceEvent(long userId, long orderId) {
        this.orderId = orderId;
        this.userId = userId;
    }

    /** @return the user id carried by this event */
    public long getUserId() {
        return this.userId;
    }

    /** @return the order id carried by this event */
    public long getOrderId() {
        return this.orderId;
    }

    /** @param userId the new user id */
    public void setUserId(long userId) {
        this.userId = userId;
    }

    /** @param orderId the new order id */
    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }
}

 

然后定义观察者

package pers.wmx.springbootfreemarkerdemo.eventbus.observer;


import lombok.extern.slf4j.Slf4j;
import pers.wmx.springbootfreemarkerdemo.eventbus.Subscribe;

/**
 * Sample observer that subscribes to {@code String} events and logs each one it receives.
 *
 * @author wangmingxin03
 * Created on 2021-12-13
 */
@Slf4j
public class AObserver {

    /**
     * Handler for {@code String} events; writes the message to the log at info level.
     *
     * @param msg the posted message
     */
    @Subscribe
    public void handle(final String msg) {
        log.info("AObserver msg:{}", msg);
    }

}

 

package pers.wmx.springbootfreemarkerdemo.eventbus.observer;


import lombok.extern.slf4j.Slf4j;
import pers.wmx.springbootfreemarkerdemo.eventbus.Subscribe;

/**
 * Sample observer that subscribes to {@code String} events and logs each one it receives.
 *
 * @author wangmingxin03
 * Created on 2021-12-13
 */
@Slf4j
public class BObserver {

    /**
     * Handler for {@code String} events; writes the message to the log at info level.
     *
     * @param msg the posted message
     */
    @Subscribe
    public void handle(final String msg) {
        log.info("BObserver msg:{}", msg);
    }

}

 

package pers.wmx.springbootfreemarkerdemo.eventbus.observer;


import lombok.extern.slf4j.Slf4j;
import pers.wmx.springbootfreemarkerdemo.eventbus.Subscribe;
import pers.wmx.springbootfreemarkerdemo.eventbus.event.OrderPlaceEvent;

/**
 * Sample observer with three subscriptions, one per event type:
 * {@code String}, {@code Integer} and {@link OrderPlaceEvent}. Each handler only logs.
 *
 * @author wangmingxin03
 * Created on 2021-12-13
 */
@Slf4j
public class CObserver {

    /**
     * Logs a posted {@code String} event.
     *
     * @param msg the posted message
     */
    @Subscribe
    public void handle(final String msg) {
        log.info("CObserver msg:{}", msg);
    }

    /**
     * Logs a posted {@code Integer} event.
     *
     * @param msg the posted number
     */
    @Subscribe
    public void handle(final Integer msg) {
        log.info("CObserver msg:{}", msg);
    }

    /**
     * Logs the user id and order id of a posted {@link OrderPlaceEvent}.
     *
     * @param event the posted order event
     */
    @Subscribe
    public void handle(final OrderPlaceEvent event) {
        final long userId = event.getUserId();
        final long orderId = event.getOrderId();
        log.info("CObserver | userId:{}, orderId:{}", userId, orderId);
    }

}

 

通过注解标明要监听事件

方法的唯一参数的类型,就是该方法要监听的具体事件类型

package pers.wmx.springbootfreemarkerdemo.eventbus;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Marks the method of an observer that should receive posted events.
 * The annotation is retained at runtime so it can be discovered reflectively,
 * and it may only be placed on methods.
 *
 * @author wangmingxin03
 * Created on 2021-12-13
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Subscribe {
}

 

我们抽象一层 ObserverAction,用来表示被 @Subscribe 注解的方法(以及该方法所属的观察者对象)

package pers.wmx.springbootfreemarkerdemo.eventbus;

import java.lang.reflect.Method;

import com.google.common.base.Preconditions;

import lombok.extern.slf4j.Slf4j;

/**
 * Binds one handler method to the observer instance it must be invoked on.
 *
 * <p>Two actions are equal when they wrap the <em>same</em> observer instance (compared by
 * identity) and the same {@link Method}. This lets a {@code Set} of actions deduplicate
 * entries, so registering one observer instance more than once does not cause an event
 * to be delivered to it several times.
 *
 * @author wangmingxin03
 * Created on 2021-12-13
 */
@Slf4j
public class ObserverAction {
    /** Observer instance the handler is invoked on. */
    private final Object target;

    /** Handler method; made accessible in the constructor so non-public handlers work. */
    private final Method method;

    /**
     * @param target observer instance that owns {@code method}; must not be null
     * @param method handler method to invoke on {@code target}; must not be null
     * @throws NullPointerException if either argument is null
     */
    public ObserverAction(Object target, Method method) {
        this.target = Preconditions.checkNotNull(target);
        this.method = Preconditions.checkNotNull(method);
        this.method.setAccessible(true);
    }

    /**
     * Invokes the handler with the given event. Any exception raised by the reflective
     * call (including one thrown by the handler itself) is logged and not propagated,
     * so one failing handler does not affect the caller.
     *
     * @param event the event passed as the handler's single argument
     */
    public void execute(Object event) {
        try {
            method.invoke(target, event);
        } catch (Exception e) {
            log.error("execute exception ", e);
        }
    }

    /**
     * Equality is based on observer identity plus method equality; see the class comment.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof ObserverAction)) {
            return false;
        }
        ObserverAction other = (ObserverAction) obj;
        // Identity comparison on purpose: two distinct observer instances that happen to be
        // equals() to each other must still both receive events.
        return target == other.target && method.equals(other.method);
    }

    @Override
    public int hashCode() {
        return 31 * System.identityHashCode(target) + method.hashCode();
    }
}

 

然后我们需要注册我们的监听者,下面就是最核心的逻辑

package pers.wmx.springbootfreemarkerdemo.eventbus;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;

import lombok.extern.slf4j.Slf4j;

/**
 * Registry of observers: maps an event type to the handler methods subscribed to it.
 *
 * @author wangmingxin03
 * Created on 2021-12-13
 */
@Slf4j
public class ObserverRegistry {
    /** Event type (the handler's declared parameter type) -> handlers registered for it. */
    private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<ObserverAction>> registry =
            new ConcurrentHashMap<>();

    /**
     * Registers every valid {@link Subscribe}-annotated method declared by the observer's class.
     *
     * @param observer the observer instance whose handlers are added to the registry
     */
    public void register(Object observer) {
        // Collect all handlers of this observer, grouped by the event type they accept.
        Map<Class<?>, Collection<ObserverAction>> observerActions = findAllObserverActions(observer);

        for (Map.Entry<Class<?>, Collection<ObserverAction>> entry : observerActions.entrySet()) {
            // computeIfAbsent creates the per-type set atomically on first use.
            registry.computeIfAbsent(entry.getKey(), eventType -> new CopyOnWriteArraySet<>())
                    .addAll(entry.getValue());
        }
    }

    /**
     * Builds the handlers of one observer, grouped by the declared parameter type of each
     * handler method.
     */
    private Map<Class<?>, Collection<ObserverAction>> findAllObserverActions(Object observer) {
        Map<Class<?>, Collection<ObserverAction>> observerActions = new HashMap<>();

        for (Method method : getAnnotatedMethods(observer.getClass())) {
            // getAnnotatedMethods only returns methods with exactly one parameter.
            Class<?> eventType = method.getParameterTypes()[0];
            observerActions.computeIfAbsent(eventType, type -> new ArrayList<>())
                    .add(new ObserverAction(observer, method));
        }

        return observerActions;
    }

    /**
     * Returns the methods declared directly on {@code clazz} that carry {@link Subscribe}
     * and take exactly one parameter. Annotated methods with any other parameter count
     * are logged as invalid and skipped.
     */
    private List<Method> getAnnotatedMethods(Class<?> clazz) {
        List<Method> methods = Lists.newArrayList();

        for (Method method : clazz.getDeclaredMethods()) {
            if (!method.isAnnotationPresent(Subscribe.class)) {
                continue;
            }

            // A handler must have exactly one parameter: the event type it listens for.
            if (method.getParameterTypes().length != 1) {
                log.error("invalid method :{}", method.toString());
            } else {
                methods.add(method);
            }
        }

        return methods;
    }

    /**
     * Finds every handler able to receive the given event: handlers whose declared
     * parameter type is the event's class or one of its superclasses/interfaces.
     *
     * @param event the posted event; must not be null
     * @return the matching handlers, possibly empty
     */
    public List<ObserverAction> getMatchedObserverActions(Object event) {
        log.info("getMatchedObserverActions | registry:{}", JSON.toJSON(registry));

        List<ObserverAction> observerActions = Lists.newArrayList();

        Class<?> eventClass = event.getClass();
        for (Map.Entry<Class<?>, CopyOnWriteArraySet<ObserverAction>> entry : registry.entrySet()) {
            Class<?> eventType = entry.getKey();

            // The handler's parameter type must be assignable FROM the event's runtime class,
            // i.e. be the same type or a supertype. The reverse check would skip handlers
            // declared for a supertype and would hand supertype events to subtype handlers,
            // which then fail inside Method.invoke with an argument type mismatch.
            if (eventType.isAssignableFrom(eventClass)) {
                observerActions.addAll(entry.getValue());
            }
        }

        return observerActions;
    }
}

 

其实核心就是维护一个本地的注册表:事件类型到监听方法的映射

然后再定义一下我们对外暴露的事件总线SDK

 

package pers.wmx.springbootfreemarkerdemo.eventbus;

import java.util.List;
import java.util.concurrent.Executor;

import com.google.common.util.concurrent.MoreExecutors;

/**
 * Entry point of the event bus: observers are added with {@link #register(Object)} and
 * events are delivered with {@link #post(Object)}. Handlers are run through the
 * configured {@link Executor}; the no-arg constructor uses Guava's direct executor.
 *
 * @author wangmingxin03
 * Created on 2021-12-13
 */
public class EventBus {
    /** Registry holding the event-type to handler mapping. */
    private ObserverRegistry registry = new ObserverRegistry();

    /** Executor every handler invocation is submitted to. */
    private Executor executor;

    /** Creates a bus that dispatches through {@code MoreExecutors.directExecutor()}. */
    public EventBus() {
        this(MoreExecutors.directExecutor());
    }

    /**
     * Creates a bus that dispatches through the given executor.
     *
     * @param executor executor used to run handler invocations
     */
    protected EventBus(Executor executor) {
        this.executor = executor;
    }

    /**
     * Adds the observer's handler methods to the registry.
     *
     * @param observer the observer to register
     */
    public void register(Object observer) {
        this.registry.register(observer);
    }

    /**
     * Submits one task per matching handler to the executor; each task invokes that
     * handler with the event.
     *
     * @param event the event to deliver
     */
    public void post(Object event) {
        this.registry.getMatchedObserverActions(event)
                .forEach(action -> this.executor.execute(() -> action.execute(event)));
    }
}

 

register 往事件总线注册观察者

post 发布事件

 

再实现一个异步版本

package pers.wmx.springbootfreemarkerdemo.eventbus;

import java.util.concurrent.Executor;

/**
 * {@link EventBus} variant whose executor is supplied by the caller, so handler
 * invocations run on whatever threads that executor provides.
 *
 * @author wangmingxin03
 * Created on 2021-12-13
 */
public class AsyncEventBus extends EventBus {

    /**
     * @param executor executor that handler invocations are submitted to
     */
    public AsyncEventBus(final Executor executor) {
        super(executor);
    }

}

 

再看下测试类入口,跑下

package pers.wmx.springbootfreemarkerdemo.eventbus;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import pers.wmx.springbootfreemarkerdemo.eventbus.event.OrderPlaceEvent;
import pers.wmx.springbootfreemarkerdemo.eventbus.observer.AObserver;
import pers.wmx.springbootfreemarkerdemo.eventbus.observer.BObserver;
import pers.wmx.springbootfreemarkerdemo.eventbus.observer.CObserver;

/**
 * Demo entry point: registers three observers on an asynchronous event bus and posts
 * a {@code String}, an {@code Integer} and an {@link OrderPlaceEvent}.
 *
 * @author wangmingxin03
 * Created on 2021-12-13
 */
public class EventBusMain {
    /** Worker pool backing the bus; kept in a field so it can be shut down after the demo. */
    private static final java.util.concurrent.ExecutorService EXECUTOR =
            Executors.newFixedThreadPool(10);

    private static final EventBus EVENT_BUS = new AsyncEventBus(EXECUTOR);

    public static void main(String[] args) {
        try {
            // Register the three observers in the bus's local registry.
            EVENT_BUS.register(new AObserver());
            EVENT_BUS.register(new BObserver());
            EVENT_BUS.register(new CObserver());

            EVENT_BUS.post("haha");
            EVENT_BUS.post(666);
            EVENT_BUS.post(new OrderPlaceEvent(123, 777));
        } finally {
            // The pool's worker threads are non-daemon, so without a shutdown the JVM never
            // exits after main returns. shutdown() still lets already-submitted handler
            // tasks run to completion before the workers terminate.
            EXECUTOR.shutdown();
        }
    }

}

 

 

 

 

 

 

转载请注明:汪明鑫的个人博客 » 简易EventBus实现

喜欢 (0)

说点什么

您将是第一位评论人!

提醒
avatar
wpDiscuz