先上github链接: https://github.com/PhantomThief/zknotify-cache
架构模式:
client端在构建缓存时,需要制定一个zk的路径
当源数据变更需要刷新本地缓存时,就更新zk该路径下的值
基于zk的watcher机制,就可以触发所有的监听者,从而拉取最新的数据,更新本地缓存,保证数据最终一致性。
我们拉下代码,看看这个组件的源码
先上类图
Broadcaster 主要封装了对zk的操作 看下ReloadableCache核心接口
public interface ReloadableCache<T> extends Supplier<T> {
/**
* 当第一次构建失败时,本方法会上抛异常
*/
@Override
T get();
/**
* 通知全局缓存更新
* 注意:如果本地缓存没有初始化,本方法并不会初始化本地缓存并重新加载
*
* 如果需要初始化本地缓存,请先调用 {@link ReloadableCache#get()}
*/
void reload();
/**
* 更新本地缓存的本地副本
* 注意:如果本地缓存没有初始化,本方法并不会初始化并刷新本地的缓存
*
* 如果需要初始化本地缓存,请先调用 {@link ReloadableCache#get()}
*/
void reloadLocal();
}
然后重点看下本地缓存实现类 ZkNotifyReloadCache
构建时需要指定cacheFactory,指定向zk注册的path等
@Override
public T get() {
if (cachedObject == null) {
synchronized (ZkNotifyReloadCache.this) {
if (cachedObject == null) {
if (entered) {
logger.warn("发现循环引用,请不要在 ReloadableCache factory 内引用自身,如果希望取到之前的缓存值,请参考"
+ " com.github.phantomthief.localcache.CacheFactoryEx.get");
}
entered = true;
try {
cachedObject = init();
} finally {
entered = false;
}
}
}
}
return cachedObject;
}
获取本地缓存值时,如果为空,就触发init
这里用到double check的单例模式
在init时还会向zk注册path,以及对应触发监听的行为
再看下主动触发更新缓存的方法:
@Override
public void reload() {
if (broadcaster != null && notifyZkPaths != null) {
String content = String.valueOf(currentTimeMillis());
notifyZkPaths.forEach(notifyZkPath -> broadcaster.broadcast(notifyZkPath,
content));
} else {
logger.warn("no zk broadcast or notify zk path found. ignore reload.");
}
}
broadcast 的内容就是当前时间戳
@Override
public void broadcast(String path, String content) {
String realPath = makePath(zkPrefix, path);
try {
try {
curatorFactory.get().setData().forPath(realPath, content.getBytes(UTF_8));
} catch (KeeperException.NoNodeException e) {
curatorFactory.get().create().creatingParentsIfNeeded().forPath(realPath,
content.getBytes(UTF_8));
}
} catch (Throwable e) {
throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
可以看到底层时依赖 Curator 操作ZK的
最后看一个测试用例
@Test
void test() {
AtomicInteger count = new AtomicInteger();
ZkNotifyReloadCache<String> cache = ZkNotifyReloadCache.<String> newBuilder()
.withCacheFactory(() -> build(count))
.withNotifyZkPath("/test")
.withCuratorFactory(() -> curatorFramework)
.build();
assertEquals(cache.get(), "0");
cache.reload();
sleepUninterruptibly(1, SECONDS);
assertEquals(cache.get(), "1");
cache.reloadLocal();
assertEquals(cache.get(), "2");
}
那么目前这个本地缓存还有什么问题不?
首先过于依赖ZK,如果ZK挂了,或者出现一些网络问题,本地缓存就丢失更新了
因此可以在构建缓存时给一个主动触发更新的时间,比如几分钟更新一次,就算没有zk通知
这样就通过推拉结合的方式尽量保证本地缓存的最终一致性。
还有就是缓存构建时的洪荒流量,试想如果监听一个zk path下的实例有数百个,同时触发更新一起拉数据构建新的本地缓存,对下游数据源本身就有一定压力,因此也可以指定缓存构建时的打散时间,比如我打散10s构建缓存,那么压力就降低到十分之一,对下游起到一定的保护作用。
转载请注明:汪明鑫的个人博客 » 基于ZK的注册监听的本地缓存
说点什么
您将是第一位评论人!