Welcome everyone

Guava Cache 原理分析

java 汪明鑫 137浏览 0评论

没有用过Guava Cache的同学先看一下介绍的文章 http://xinyeshuaiqi.cn/2020/06/08/local-cache/

本文主要介绍一些Guava Cache的一些数据结构和底层原理

先看一眼核心类图:

吗的,图有点大,看着好糊。。。

CacheBuilder使用了建造者模式,负责构建cache

LocalCache本身是一个Map,内部封装了Segment,通过分段锁,降低锁的粒度,提高并发度

Segment内部又有一个数组,数组的元素是ReferenceEntry, 又是一个链表

然后LocalCache还有2内部类:

1、LocalManualCache实现了Cache,具有了所有cache方法。

2、LocalLoadingCache实现了LoadingCache,具有了所有LoadingCache方法。

3、LocalLoadingCache继承了LocalManualCache,那么对外暴露的LocalLoadingCache的方法既有自身需要的,又有cache应该具有的。

4、通过LocalLoadingCache和LocalManualCache的父子关系实现了LocalCache的细节。

这个操作到底有多大意义,我没太看明白,是不是要一个内部类也可以做到,甚至不要内部类,感觉还更清晰点。

存储结构大概是这个样子:

ReferenceEntry 支持不同的引用类型:

回想我们是如何操作一个Map呢?put KV, get K

但是LocalCache并不需要我们主动put值,而是构造缓存时指定loader,get时不存在再触发缓存load

 @CheckReturnValue
  public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(
      CacheLoader<? super K1, V1> loader) {
    checkWeightWithWeigher();
    return new LocalCache.LocalLoadingCache<>(this, loader);
  }

那我们就顺着这个build的主流程往下去看!

核心的初始化代码就在这里:

/**
   * Creates a new, empty map with the specified strategy, initial capacity and concurrency level.
   */
  LocalCache(
      CacheBuilder<? super K, ? super V> builder, @Nullable CacheLoader<? super K, V> loader) {
    concurrencyLevel = Math.min(builder.getConcurrencyLevel(), MAX_SEGMENTS);

    // 。。。
    // 一些赋值操作

    int segmentShift = 0;
    int segmentCount = 1;

   // evictsBySize()表示是否限制容量
    while (segmentCount < concurrencyLevel && (!evictsBySize() || segmentCount * 20 <= maxWeight)) {

      // 这时的segmentShift还是表示segmentCount是2的多少次幂
      ++segmentShift;

      //  segmentCount是满足while条件的最大值的2倍
      segmentCount <<= 1;
    }

    // 最终的segmentShift用于取hash的高位的相应位数,用来计算寻找一个元素在哪个segment中
    this.segmentShift = 32 - segmentShift;
   
   // 掩码,取hash的低位的相应位数的值,即为在segment中的角标
    segmentMask = segmentCount - 1;
   
   // 先创建数组
    this.segments = newSegmentArray(segmentCount);

   // 设置每个segment中数组的大小,保整每个segmentSize大小为2的n次方
    // 总的segmentSize大于等于initialCapacity
    // segmentCapacity = initialCapacity 除以 segmentCount 向上取整
    int segmentCapacity = initialCapacity / segmentCount;
    if (segmentCapacity * segmentCount < initialCapacity) {
      ++segmentCapacity;
    }

// segmentSize = 不小于segmentCapacity的 最小的 2的整数幂
    // segmentSize用作段的初始容量
    int segmentSize = 1;
    while (segmentSize < segmentCapacity) {
      segmentSize <<= 1;
    }

   // 初始化数组的每个segment
    if (evictsBySize()) {
      // Ensure sum of segment max weights = overall max weights
      long maxSegmentWeight = maxWeight / segmentCount + 1;
      long remainder = maxWeight % segmentCount;
      for (int i = 0; i < this.segments.length; ++i) {
        if (i == remainder) {
          maxSegmentWeight--;
        }
        this.segments[i] =
            createSegment(segmentSize, maxSegmentWeight, builder.getStatsCounterSupplier().get());
      }
    } else {
      for (int i = 0; i < this.segments.length; ++i) {
        this.segments[i] =
            createSegment(segmentSize, UNSET_INT, builder.getStatsCounterSupplier().get());
      }
    }
  }
Segment(
        LocalCache<K, V> map,
        int initialCapacity,
        long maxSegmentWeight,
        StatsCounter statsCounter) {
      this.map = map;
      this.maxSegmentWeight = maxSegmentWeight;
      this.statsCounter = checkNotNull(statsCounter);
      initTable(newEntryArray(initialCapacity));

      keyReferenceQueue = map.usesKeyReferences() ? new ReferenceQueue<>() : null;

      valueReferenceQueue = map.usesValueReferences() ? new ReferenceQueue<>() : null;

      recencyQueue =
          map.usesAccessQueue() ? new ConcurrentLinkedQueue<>() : LocalCache.discardingQueue();

      writeQueue = map.usesWriteQueue() ? new WriteQueue<>() : LocalCache.discardingQueue();

      accessQueue = map.usesAccessQueue() ? new AccessQueue<>() : LocalCache.discardingQueue();
    }

OK,下面来看下核心的加载数据流程

@Override
    public V get(K key, final Callable<? extends V> valueLoader) throws ExecutionException {
      checkNotNull(valueLoader);
      return localCache.get(
          key,
          new CacheLoader<Object, V>() {
            @Override
            public V load(Object key) throws Exception {
              return valueLoader.call();
            }
          });
    }
V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
    int hash = hash(checkNotNull(key));
    return segmentFor(hash).get(key, hash, loader);
  }

通过hash定位segment, 从segment取数

V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
      checkNotNull(key);
      checkNotNull(loader);
      try {

        // count 表示segment中的元素个数
        if (count != 0) { // read-volatile
          // don't call getLiveEntry, which would ignore loading values
          // 找到对应的entry
          ReferenceEntry<K, V> e = getEntry(key, hash);
          if (e != null) {
            long now = map.ticker.read();  // 获取当前纳秒时间戳

            // 拿到存活数据
            V value = getLiveValue(e, now);

            if (value != null) {
              recordRead(e, now);  // 记录accessTime
              statsCounter.recordHits(1);

              // 缓存刷新 --- 后面再聚焦看下
              return scheduleRefresh(e, key, hash, value, now, loader);
            }
            
            // 如果值为空,看看是不是因为正在加载,如果正在加载阻塞等待
            ValueReference<K, V> valueReference = e.getValueReference();
            if (valueReference.isLoading()) {
              return waitForLoadingValue(e, key, valueReference);
            }
          }
        }

        // at this point e is either null or expired;
        // segment为空或者key、value不存在或者缓存过期了
        return lockedGetOrLoad(key, hash, loader);
      } catch (ExecutionException ee) {
        Throwable cause = ee.getCause();
        if (cause instanceof Error) {
          throw new ExecutionError((Error) cause);
        } else if (cause instanceof RuntimeException) {
          throw new UncheckedExecutionException(cause);
        }
        throw ee;
      } finally {
        postReadCleanup();
      }
    }

整个流程图看着就会清楚些

多个线程请求,只会有一个线程去真正加载,其他线程阻塞

我们再来看下scheduleRefresh 和 lockedGetOrLoad

 V scheduleRefresh(
        ReferenceEntry<K, V> entry,
        K key,
        int hash,
        V oldValue,
        long now,
        CacheLoader<? super K, V> loader) {
      if (map.refreshes()
          && (now - entry.getWriteTime() > map.refreshNanos)
          && !entry.getValueReference().isLoading()) {
        V newValue = refresh(key, hash, loader, true);
        if (newValue != null) {
          return newValue;
        }
      }
      return oldValue;
    }

如果新值没刷出来,直接返回旧值

@Nullable
    V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) {
      final LoadingValueReference<K, V> loadingValueReference =
          insertLoadingValueReference(key, hash, checkTime);
      if (loadingValueReference == null) {
        return null;
      }

      ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader);
      if (result.isDone()) {
        try {
          return Uninterruptibles.getUninterruptibly(result);
        } catch (Throwable t) {
          // don't let refresh exceptions propagate; error was already logged
        }
      }
      return null;
    }

刷新是开启一个新的线程去刷新的

再来看下 lockedGetOrLoad

V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
      ReferenceEntry<K, V> e;
      ValueReference<K, V> valueReference = null;
      LoadingValueReference<K, V> loadingValueReference = null;
      boolean createNewEntry = true;

      // 加锁
      lock();
      try {
        // re-read ticker once inside the lock
        long now = map.ticker.read();
        preWriteCleanup(now);

        int newCount = this.count - 1;
        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);

        // 定位到entry
        ReferenceEntry<K, V> first = table.get(index);

        for (e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash
              && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            valueReference = e.getValueReference();
            if (valueReference.isLoading()) {
              // 是否已经在加载了,已经加载的话,就不用在加载啦
              createNewEntry = false;
            } else {
              V value = valueReference.get();

              // value 已经被GC回收啦
              if (value == null) {
                enqueueNotification(
                    entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
              } else if (map.isExpired(e, now)) {
                  // 是否过期
                // This is a duplicate check, as preWriteCleanup already purged expired
                // entries, but let's accommodate an incorrect expiration queue.
                enqueueNotification(
                    entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
              } else {
                 // 如果读到的value存在且没有超时,说明value是有效值
                recordLockedRead(e, now);
                statsCounter.recordHits(1);
                // we were concurrent with loading; don't consider refresh
                return value;
              }

              // 读到无效缓存,则移除
              // immediately reuse invalid entries
              writeQueue.remove(e);
              accessQueue.remove(e);
              this.count = newCount; // write-volatile
            }
            break;
          }
        }
 
        // 没有找到对应的entry, 需要创建一个新entry
        if (createNewEntry) {
          loadingValueReference = new LoadingValueReference<>();

          if (e == null) {
            e = newEntry(key, hash, first);
            e.setValueReference(loadingValueReference);
            table.set(index, e);
          } else {
            e.setValueReference(loadingValueReference);
          }
        }
      } finally {
        // 写操作完成,释放锁
        unlock();
        postWriteCleanup();
      }

      if (createNewEntry) {
        try {
          // Synchronizes on the entry to allow failing fast when a recursive load is
          // detected. This may be circumvented when an entry is copied, but will fail fast most
          // of the time.
          // 如果当前线程触发了缓存加载的操作,就针对e加锁等待加载完成
          synchronized (e) {
            return loadSync(key, hash, loadingValueReference, loader);
          }
        } finally {
          statsCounter.recordMisses(1);
        }
      } else {
        // The entry already exists. Wait for loading.
        // 如果当前线程不是触发缓存加载的操作的线程,就阻塞等待缓存加载完成
        return waitForLoadingValue(e, key, valueReference);
      }
    }
V loadSync(
        K key,
        int hash,
        LoadingValueReference<K, V> loadingValueReference,
        CacheLoader<? super K, V> loader)
        throws ExecutionException {
      ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
      return getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
    }
 public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
      try {
        stopwatch.start();
        V previousValue = oldValue.get();
        if (previousValue == null) {
          // 没有旧值 load
          V newValue = loader.load(key);
          return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
        }

        // 有旧值 reload
        ListenableFuture<V> newValue = loader.reload(key, previousValue);
        if (newValue == null) {
          return Futures.immediateFuture(null);
        }
        // To avoid a race, make sure the refreshed value is set into loadingValueReference
        // *before* returning newValue from the cache query.
        return transform(
            newValue,
            newResult -> {
              LoadingValueReference.this.set(newResult);
              return newResult;
            },
            directExecutor());
      } catch (Throwable t) {
        ListenableFuture<V> result = setException(t) ? futureValue : fullyFailedFuture(t);
        if (t instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        return result;
      }
    }

加载过程看的差不多了,再来看下元素是什么时候过期移除的

移除不是自动移除的,其实是伴随着get 的行为来做过期移除逻辑

如果是配置的 refreshAfterWrite 就是触发刷新,刷不到新值就直接返回旧值

如果是配置了 expireAfterWrite、expireAfterAccess呢

在上面的get方法中有一个方法调用 com.google.common.cache.LocalCache.Segment#getLiveValue

/**
     * Gets the value from an entry. Returns null if the entry is invalid, partially-collected,
     * loading, or expired.
     */
    V getLiveValue(ReferenceEntry<K, V> entry, long now) {
      if (entry.getKey() == null) {
        tryDrainReferenceQueues();
        return null;
      }
      V value = entry.getValueReference().get();
      if (value == null) {
        tryDrainReferenceQueues();
        return null;
      }
     
     // 判断是否过期
      if (map.isExpired(entry, now)) {
        // 过期清理
        tryExpireEntries(now);
        return null;
      }
      return value;
    }
/** Returns true if the entry has expired. */
  boolean isExpired(ReferenceEntry<K, V> entry, long now) {
    checkNotNull(entry);
    if (expiresAfterAccess() && (now - entry.getAccessTime() >= expireAfterAccessNanos)) {
      // 配置了expiresAfterAccess,上次的accessTime已过期
      return true;
    }
    if (expiresAfterWrite() && (now - entry.getWriteTime() >= expireAfterWriteNanos)) {
      // 配置了expiresAfterWrite,上次的writeTime已过期
      return true;
    }
    return false;
  }
/** Cleanup expired entries when the lock is available. */
    void tryExpireEntries(long now) {
      // 尝试加锁清理entry
      if (tryLock()) {
        try {
          expireEntries(now);
        } finally {
          unlock();
          // don't call postWriteCleanup as we're in a read
        }
      }
    }
@GuardedBy("this")
    void expireEntries(long now) {
      drainRecencyQueue();

      ReferenceEntry<K, V> e;
      while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
        if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
          throw new AssertionError();
        }
      }
      while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
        if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
          throw new AssertionError();
        }
      }
    }

从 writeQueue 和 accessQueue 中取出元素清理

最后小结一下:

通过CacheBuilder设置参数创建缓存,LocalCache封装了对Segment的操作,

Segment里又有一个数组,数组的每个元素都是一个entry,entry也是一个链表,存储元素

entry 的key,value支持不同的引用类型

可以自定义刷新时间和过期时间

刷新如果刷不出来新值,直接返回旧值

然后entry的清理是伴随着get操作进行的

通过加锁的方式和设置entry loading 状态来避免重复加载

可以注册 RemoveLienster, 数据移除可以接受通知

支持缓存命中情况的统计

转载请注明:汪明鑫的个人博客 » Guava Cache 原理分析

喜欢 (4)

说点什么

您将是第一位评论人!

提醒
avatar
wpDiscuz