Welcome everyone

基于ZK的注册监听的本地缓存

java 汪明鑫 995浏览 0评论

先上github链接: https://github.com/PhantomThief/zknotify-cache


架构模式:

client端在构建缓存时,需要制定一个zk的路径

当源数据变更需要刷新本地缓存时,就更新zk该路径下的值

基于zk的watcher机制,就可以触发所有的监听者,从而拉取最新的数据,更新本地缓存,保证数据最终一致性。

我们拉下代码,看看这个组件的源码

先上类图

Broadcaster 主要封装了对zk的操作
看下ReloadableCache核心接口

public interface ReloadableCache<T> extends Supplier<T> {

    /**
     * 当第一次构建失败时,本方法会上抛异常
     */
    @Override
    T get();

    /**
     * 通知全局缓存更新
     * 注意:如果本地缓存没有初始化,本方法并不会初始化本地缓存并重新加载
     *
     * 如果需要初始化本地缓存,请先调用 {@link ReloadableCache#get()}
     */
    void reload();

    /**
     * 更新本地缓存的本地副本
     * 注意:如果本地缓存没有初始化,本方法并不会初始化并刷新本地的缓存
     *
     * 如果需要初始化本地缓存,请先调用 {@link ReloadableCache#get()}
     */
    void reloadLocal();
}

然后重点看下本地缓存实现类 ZkNotifyReloadCache

构建时需要指定cacheFactory,指定向zk注册的path等

@Override
    public T get() {
        if (cachedObject == null) {
            synchronized (ZkNotifyReloadCache.this) {
                if (cachedObject == null) {
                    if (entered) {
                        logger.warn("发现循环引用,请不要在 ReloadableCache factory 内引用自身,如果希望取到之前的缓存值,请参考"
                                + " com.github.phantomthief.localcache.CacheFactoryEx.get");
                    }
                    entered = true;
                    try {
                        cachedObject = init();
                    } finally {
                        entered = false;
                    }
                }
            }
        }
        return cachedObject;
    }

获取本地缓存值时,如果为空,就触发init

这里用到double check的单例模式

在init时还会向zk注册path,以及对应触发监听的行为

再看下主动触发更新缓存的方法:

@Override
    public void reload() {
        if (broadcaster != null && notifyZkPaths != null) {
            String content = String.valueOf(currentTimeMillis());
            notifyZkPaths.forEach(notifyZkPath -> broadcaster.broadcast(notifyZkPath,
                    content));
        } else {
            logger.warn("no zk broadcast or notify zk path found. ignore reload.");
        }
    }
broadcast 的内容就是当前时间戳
 @Override
    public void broadcast(String path, String content) {
        String realPath = makePath(zkPrefix, path);
        try {
            try {
                curatorFactory.get().setData().forPath(realPath, content.getBytes(UTF_8));
            } catch (KeeperException.NoNodeException e) {
                curatorFactory.get().create().creatingParentsIfNeeded().forPath(realPath,
                        content.getBytes(UTF_8));
            }
        } catch (Throwable e) {
            throwIfUnchecked(e);
            throw new RuntimeException(e);
        }
    }

可以看到底层时依赖 Curator 操作ZK的

最后看一个测试用例

@Test
    void test() {
        AtomicInteger count = new AtomicInteger();
        ZkNotifyReloadCache<String> cache = ZkNotifyReloadCache.<String> newBuilder()
                .withCacheFactory(() -> build(count))
                .withNotifyZkPath("/test")
                .withCuratorFactory(() -> curatorFramework)
                .build();
        assertEquals(cache.get(), "0");
        cache.reload();
        sleepUninterruptibly(1, SECONDS);
        assertEquals(cache.get(), "1");
        cache.reloadLocal();
        assertEquals(cache.get(), "2");
    }

那么目前这个本地缓存还有什么问题不?

首先过于依赖ZK,如果ZK挂了,或者出现一些网络问题,本地缓存就丢失更新了

因此可以在构建缓存时给一个主动触发更新的时间,比如几分钟更新一次,就算没有zk通知

这样就通过推拉结合的方式尽量保证本地缓存的最终一致性。

还有就是缓存构建时的洪荒流量,试想如果监听一个zk path下的实例有数百个,同时触发更新一起拉数据构建新的本地缓存,对下游数据源本身就有一定压力,因此也可以指定缓存构建时的打散时间,比如我打散10s构建缓存,那么压力就降低到十分之一,对下游起到一定的保护作用。

转载请注明:汪明鑫的个人博客 » 基于ZK的注册监听的本地缓存

喜欢 (2)

说点什么

您将是第一位评论人!

提醒
avatar
wpDiscuz