Netty内存池之PoolThreadCache详解

PoolThreadCahche是Netty内存管理中能够实现高效内存申请和释放的一个重要原因,Netty会为每一个线程都维护一个PoolThreadCache对象,当进行内存申请时,首先会尝试从PoolThreadCache中申请,如果无法从中申请到,则会尝试从Netty的公共内存池中申请。本文首先会对PoolThreadCache的数据结构进行讲解,然后会介绍Netty是如何初始化PoolThreadCache的,最后会介绍如何在PoolThreadCache中申请内存和如何将内存释放到PoolThreadCache中。

1. PoolThreadCache数据结构

PoolThreadCache的数据结构与PoolArena的主要属性结构非常相似,但细微位置有很大的不同。在PoolThreadCache中,其维护了三个数组(我们以直接内存的缓存方式为例进行讲解),如下所示:

// 存储tiny类型的内存缓存,该数组长度为32,其中只有下标为1~31的元素缓存了有效数据,第0号位空置。
// 这里内存大小的存储方式也与PoolSubpage类似,数组的每一号元素都存储了不同等级的内存块,每个等级的
// 内存块的内存大小差值为16byte,比如第1号位维护了大小为16byte的内存块,第二号为维护了大小为32byte的
// 内存块,依次类推,第31号位维护了大小为496byte的内存块。
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
// 存储small类型的内存缓存,该数组长度为4,数组中每个元素中维护的内存块大小也是成等级递增的,并且这里
// 的递增方式是按照2的指数次幂进行的,比如第0号为维护的是大小为512byte的内存块,第1号位维护的是大小为
// 1024byte的内存块,第2号位维护的是大小为2048byte的内存块,第3号位维护的是大小为4096byte的内存块
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
// 存储normal类型的内存缓存。需要注意的是,这里虽说是维护的normal类型的缓存,但是其只维护2<<13,2<<14
// 和2<<15三个大小的内存块,而该数组的大小也正好为3,因而这三个大小的内存块将被依次放置在该数组中。
// 如果申请的目标内存大于2<<15,那么Netty会将申请动作交由PoolArena进行。
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

这三个数组分别保存了tiny,small和normal类型的缓存数据,不同于PoolArena的使用PoolSubpage和PoolChunk进行内存的维护,这里都是使用MemoryRegionCache进行的。另外,在MemoryRegionCache中保存了一个有界队列,对于tiny类型的缓存,该队列的长度为512,对于small类型的缓存,该队列的长度为256,对于normal类型的缓存,该队列的长度为64。在进行内存释放的时候,如果队列已经满了,那么就会将该内存块释放回PoolArena中。这里需要说明的是,这里的队列中的元素统一使用的是Entry这种数据结构,该结构的主要属性如下:

static final class Entry<T> {
// 用于循环利用当前Entry对象的处理器,该处理器的实现原理,我们后续将进行讲解
final Handle<Entry<?>> recyclerHandle;
// 记录了当前内存块是从哪一个PoolChunk中申请得来的
PoolChunk<T> chunk;
// 如果是直接内存,该属性记录了当前内存块所在的ByteBuffer对象
ByteBuffer nioBuffer;
// 由于当前申请的内存块在PoolChunk以及PoolSubpage中的位置是可以通过一个长整型参数来表示的,
// 这个长整型参数就是这里的handle,因而这里直接将其记录下来,以便后续需要将当前内存块释放到
// PoolArena中时,能够快速获取其所在的位置
long handle = -1;
}

PoolThreadCache中维护每一个内存块最终都是使用的一个Entry对象来进行的,从上面的属性可以看出,记录该内存块最重要的属性是chunk和handle,chunk记录了当前内存块所在的PoolChunk对象,而handle则记录了当前内存块是在PoolChunk和PoolSubpage中的哪个位置(关于PoolChunk,PoolSubpage和PoolArena的实现原理,建议读者阅读一下前面的文章,这样有助于读者快速理解相关原理)。如此,对于Netty使用的PoolThreadCache的存储结构我们就有了一个比较清晰的认识。

下面我们通过一幅图来对PoolThreadCache的数据结构进行一个整体的演示:

如上图所示展示的就是PoolThreadCache的结构示意图。从图中可以看出在一个PoolThreadCache中,主要有三个MemoryRegionCache数组用于存储tiny,small和normal类型的内存块。每个MemoryRegionCache中有一个队列,队列中的元素类型为Entry。Entry的作用就是存储缓存的内存块的,其存储的方式主要是通过记录当前内存块所在的PoolChunk和标志其在PoolChunk中位置的handle参数。对于不同类型的数组,队列的长度是不一样的,tiny类型的是512,small类型的是256,normal类型的则是64。

2. PoolThreadCache初始化

对于PoolThreadCache的初始化,这里单独拿出来讲解的原因是,其初始化过程是与PoolThreadLocalCache所绑定的。PoolThreadLocalCache的作用与Java中的ThreadLocal的作用非常类似,其有一个initialValue()方法,用于在无法从PoolThreadLocalCache中获取数据时,通过调用该方法初始化一个。另外其提供了一个get()方法和和remove()方法,分别用于从PoolThreadLocalCache中将当前绑定的数据给清除。这里我们首先看看获取PoolThreadCache的入口代码:

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
// 从PoolThreadLocalCache中尝试获取一个PoolThreadCache对象,
// 如果不存在,则自行初始化一个返回
PoolThreadCache cache = threadCache.get();
// 由于当前方法是需要返回一个direct buffer,因而这里直接使用cache中的directArena
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
// 如果directArena不为空,则直接调用其allocate()方法申请内存
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
// 如果当前缓存中由于某种原因无法获取到directArena,则直接创建一个存有直接内存的ByteBuf,
// 一般情况下不会走到这一步
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
// 为ByteBuf设置内存泄露检测功能
return toLeakAwareBuffer(buf);
}

从上面的代码中可以看出,在最开始的时候,就会通过PoolThreadLocalCache尝试获取一个PoolThreadCache对象,如果不存在,其会自行初始化一个。这里我们直接看其是如何初始化的,如下是PoolThreadLocalCache.initialValue()方法的源码:

@Override
protected synchronized PoolThreadCache initialValue() {
// 这里leastUsedArena()就是获取对应的PoolArena数组中最少被使用的那个Arena,将其返回。
// 这里的判断方式是通过比较PoolArena.numThreadCaches属性来进行的,该属性记录了当前PoolArena被
// 多少个线程所占用了。这里采用的思想就是,找到最少被使用的那个PoolArena,将其存入新的线程缓存中
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
Thread current = Thread.currentThread();
// 只有在指定了为每个线程使用缓存,或者当前线程是FastThreadLocalThread的子类型时,才会使用线程缓存
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
// 如果指定了不使用缓存,或者线程换粗对象不是FastThreadLocalThread类型的,则创建一个PoolThreadCache
// 对象,该对象中是不做任何缓存的,因为初始化数据都是0
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}
// 在PoolArena数组中找到被最少线程占用的对象,将其返回。这样做的目的是,由于内存池是多个线程都可以
// 访问的公共区域,因而当这里就需要对内存池进行划分,以减少线程之间的竞争。
PoolArena<T> minArena = arenas[0];
for (int i = 1; i < arenas.length; i++) {
PoolArena<T> arena = arenas[i];
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
minArena = arena;
}
}
return minArena;
}

从上述代码可以看出,对于PoolThreadCache的初始化,其首先会查找PoolArena数组中被最少线程占用的那个arena,然后将其封装到一个新建的PoolThreadCache中。

3. 内存申请

需要注意的是,PoolThreadCache申请内存并不是说其会创建一块内存,或者说其会到PoolArena中申请内存,而是指,其本身已经缓存有内存块,而当前申请的内存块大小正好与其一致,就会将该内存块返回;PoolThreadCache中的内存块都是在当前线程使用完创建的ByteBuf对象后,通过调用其release()方法释放内存时直接缓存到当前PoolThreadCache中的,其并不会直接将内存块返回给PoolArena。这里我们直接看一下其allocate()方法是如何实现的:

// 申请tiny类型的内存块
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
// 申请small类型的内存块
boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf,
int reqCapacity, int normCapacity) {
return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
}
// 申请normal类型的内存块
boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf,
int reqCapacity, int normCapacity) {
return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
}
// 从MemoryRegionCache中申请内存
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
return false;
}
// 从MemoryRegionCache中申请内存,本质上就是从其队列中申请,如果存在,则初始化申请到的内存块
boolean allocated = cache.allocate(buf, reqCapacity);
// 这里是如果当前PoolThreadCache中申请内存的次数达到了8192次,则对内存块进行一次trim()操作,
// 对使用较少的内存块,将其返还给PoolArena,以供给其他线程使用
if (++allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
return allocated;
}

这里对于内存块的申请,我们可以看到,PoolThreadCache是将其分为tiny,small和normal三种不同的方法来调用的,而具体大小的区分其实是在PoolArena中进行区分的(读者可以阅读本人前面的关于PoolArena介绍的文章)。在对应的内存数组中找到MemoryRegionCache对象之后,通过调用allocate()方法来申请内存,申请完之后还会检查当前缓存申请次数是否达到了8192次,达到了则对缓存中使用的内存块进行检测,将较少使用的内存块返还给PoolArena。这里我们首先看一下获取MemoryRegionCache的代码是如何实现的,也即cacheForTiny(),cacheForSmall()和cacheForNormal()的代码:

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
// 计算当前数组下标索引,由于tiny类型的内存块每一层级相差16byte,因而这里的计算方式就是
// 将目标内存大小除以16
int idx = PoolArena.tinyIdx(normCapacity);
// 返回tiny类型的数组中对应位置的MemoryRegionCache
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
// 计算当前数组下标的索引,由于small类型的内存块大小都是2的指数次幂,因而这里就是将目标内存大小
// 除以1024之后计算其偏移量
int idx = PoolArena.smallIdx(normCapacity);
// 返回small类型的数组中对应位置的MemoryRegionCache
if (area.isDirect()) {
return cache(smallSubPageDirectCaches, idx);
}
return cache(smallSubPageHeapCaches, idx);
}
private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
// 对于normal类型的缓存,这里也是首先将其向右位移13位,也就是8192,然后取2的对数,这样就
// 可以得到其在数组中的位置,然后返回normal类型的数组中对应位置的MemoryRegionCache
if (area.isDirect()) {
int idx = log2(normCapacity >> numShiftsNormalDirect);
return cache(normalDirectCaches, idx);
}
int idx = log2(normCapacity >> numShiftsNormalHeap);
return cache(normalHeapCaches, idx);
}

这里对于数组位置的计算,主要是根据各个数组数据存储方式的不同而进行的,而它们最终都是通过一个MemoryRegionCache存储的,因而只需要返回该缓存对象即可。下面我们继续看一下MemoryRegionCache.allocate()方法是如何申请内存的:

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
// 尝试从队列中获取,如果队列中不存在,说明没有对应的内存块,则返回false,表示申请失败
Entry<T> entry = queue.poll();
if (entry == null) {
return false;
}

// 走到这里说明队列中存在对应的内存块,那么通过其存储的Entry对象来初始化ByteBuf对象,
// 如此即表示申请内存成功
initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
// 对entry对象进行循环利用
entry.recycle();
// 更新当前已经申请的内存数量
++allocations;
return true;
}

可以看到,MemoryRegionCache申请内存的方式主要是从队列中取,如果取到了,则使用该内存块初始化一个ByteBuf对象。

前面我们讲到,PoolThreadCache会对其内存块使用次数进行计数,这么做的目的在于,如果一个ThreadPoolCache所缓存的内存块使用较少,那么就可以将其释放到PoolArena中,以便于其他线程可以申请使用。PoolThreadCache会在其内存总的申请次数达到8192时遍历其所有的MemoryRegionCache,然后调用其trim()方法进行内存释放,如下是该方法的源码:

public final void trim() {
// size表示当前MemoryRegionCache中队列的最大可存储容量,allocations表示当前MemoryRegionCache
// 的内存申请次数,size-allocations的含义就是判断当前申请的次数是否连队列的容量都没达到
int free = size - allocations;
allocations = 0;
// 如果申请的次数连队列的容量都没达到,则释放该内存块
if (free > 0) {
free(free);
}
}
private int free(int max) {
int numFreed = 0;
// 依次从队列中取出Entry数据,调用freeEntry()方法释放该Entry
for (; numFreed < max; numFreed++) {
Entry<T> entry = queue.poll();
if (entry != null) {
freeEntry(entry);
} else {
return numFreed;
}
}
return numFreed;
}
private void freeEntry(Entry entry) {
// 通过当前Entry中保存的PoolChunk和handle等数据释放当前内存块
PoolChunk chunk = entry.chunk;
long handle = entry.handle;
ByteBuffer nioBuffer = entry.nioBuffer;
entry.recycle();
chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer);
}

4. 内存释放

​ 对于内存的释放,其原理比较简单,一般的释放内存的入口在ByteBuf对象中。当调用ByteBuf.release()方法的时候,其首先会将释放动作委托给PoolChunk的free()方法,PoolChunk则会判断当前是否是池化的ByteBuf,如果是池化的ByteBuf,则调用PoolThreadCache.add()方法将其添加到PoolThreadCache中,也就是说在释放内存时,其实际上是释放到当前线程的PoolThreadCache中的。如下是add()方法的源码:

boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
long handle, int normCapacity, SizeClass sizeClass) {
// 通过当前释放的内存块的大小计算其应该放到哪个等级的MemoryRegionCache中
MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
if (cache == null) {
return false;
}

// 将内存块释放到目标MemoryRegionCache中
return cache.add(chunk, nioBuffer, handle);
}
public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) {
// 这里会尝试从缓存中获取一个Entry对象,如果没获取到则创建一个
Entry<T> entry = newEntry(chunk, nioBuffer, handle);
// 将实例化的Entry对象放到队列里
boolean queued = queue.offer(entry);
if (!queued) {
entry.recycle();
}
return queued;
}

5. 小结

本文首先详细讲解了PoolThreadCache的数据结构,并且说明了其中需要注意的点,然后介绍了PoolThreadCache的实例化方式,接着从申请和释放内存两个角度介绍了PoolThreadCache源码的实现方式。

获取资料:

本次给大家推荐一个Java架构学习群,里面包括:(高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)以及Java进阶学习路线图

对Java架构感兴趣的程序猿,欢迎加入Q群:468897908,不管你是刚入行得还是大牛我都欢迎,还有大牛整理的一套高效率学习路线和教程与您免费分享,同时每天更新视频资料。

最后,祝大家早日学有所成。

发表评论
留言与评论(共有 0 条评论)
   
验证码:

相关文章

推荐文章

'); })();