当前位置 : 首页 » 文章分类 :  开发  »  Netty

Netty

Netty 是一个异步事件驱动网络应用框架,可用来快速开发高性能且可维护的客户端服务器应用。

Netty
https://netty.io/

netty / netty
https://github.com/netty/netty

netty 4.0 的一项重大改进就是将全部入栈、出栈流量数据都放入堆外内存中了。


Netty 堆外内存

Netty堆外内存泄漏排查,这一篇全讲清楚了
https://segmentfault.com/a/1190000021469481

先要熟悉带 Cleaner 的 DirectByteBuffer 的堆外内存回收原理。

noCleaner 的 DirectByteBuffer

Netty 在 4.1 引入 noCleaner 策略:创建不带 Cleaner 的 DirectByteBuffer 对象,这样做的好处是绕开带 Cleaner 的 DirectByteBuffer 执行构造方法和执行 Cleaner 的 clean() 方法中一些额外开销,当堆外内存不够的时候,不会触发 System.gc(),提高性能。

以非池化直接内存申请为例 UnpooledByteBufAllocator.newDirectBuffer() 申请堆外内存时,先判断 noCleaner 是否可用

  • noCleaner 为 true:创建 InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf 对象,即 noCleaner 的 DirectByteBuffer
  • noCleaner 为 false:创建 InstrumentedUnpooledUnsafeDirectByteBuf 对象,即 hasCleaner 的 DirectByteBuffer
  1. public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
  2. public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetector) {
  3. this(preferDirect, disableLeakDetector, PlatformDependent.useDirectBufferNoCleaner());
  4. }
  5. public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetector, boolean tryNoCleaner) {
  6. super(preferDirect);
  7. this.disableLeakDetector = disableLeakDetector;
  8. noCleaner = tryNoCleaner && PlatformDependent.hasUnsafe() && PlatformDependent.hasDirectBufferNoCleanerConstructor();
  9. }
  10. @Override
  11. protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
  12. final ByteBuf buf;
  13. if (PlatformDependent.hasUnsafe()) {
  14. buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
  15. new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
  16. } else {
  17. buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
  18. }
  19. return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
  20. }
  21. }

noCleaner 的结果来自静态方法 PlatformDependent.useDirectBufferNoCleaner()

  1. public final class PlatformDependent {
  2. private static final boolean USE_DIRECT_BUFFER_NO_CLEANER;
  3. static {
  4. long maxDirectMemory = SystemPropertyUtil.getLong("io.netty.maxDirectMemory", -1);
  5. if (maxDirectMemory == 0 || !hasUnsafe() || !PlatformDependent0.hasDirectBufferNoCleanerConstructor()) {
  6. USE_DIRECT_BUFFER_NO_CLEANER = false;
  7. DIRECT_MEMORY_COUNTER = null;
  8. } else {
  9. USE_DIRECT_BUFFER_NO_CLEANER = true;
  10. if (maxDirectMemory < 0) {
  11. maxDirectMemory = MAX_DIRECT_MEMORY;
  12. if (maxDirectMemory <= 0) {
  13. DIRECT_MEMORY_COUNTER = null;
  14. } else {
  15. DIRECT_MEMORY_COUNTER = new AtomicLong();
  16. }
  17. } else {
  18. DIRECT_MEMORY_COUNTER = new AtomicLong();
  19. }
  20. }
  21. }
  22. public static boolean useDirectBufferNoCleaner() {
  23. return USE_DIRECT_BUFFER_NO_CLEANER;
  24. }
  25. }

PlatformDependent 类在初始化时会根据系统属性、-XX:MaxDirectMemorySize 和 -Dio.netty.maxDirectMemory 等判断是否使用 noCleaner 策略。
有 arthas 的话,可以直接执行 ognl '@io.netty.util.internal.PlatformDependent@useDirectBufferNoCleaner()' 确认 Netty 使用的是否是 noCleaner 的堆外内存

noCleaner 的 DirectByteBuffer 和 hasCleaner 的 DirectByteBuffer 的区别如下:
1、构造方式不同:
noCleaner 的 DirectByteBuffer:由反射调用 private DirectByteBuffer(long addr, int cap) 创建,cleaner 是 null
hasCleaner 的 DirectByteBuffer:由 new DirectByteBuffer(int cap) 创建

2、释放内存的方式不同
noCleaner 的 DirectByteBuffer:使用 UnSafe.freeMemory(address);
hasCleaner 的 DirectByteBuffer:使用 DirectByteBuffer 的 Cleaner 的 clean() 方法

Netty 在启动时需要判断检查当前环境、环境配置参数是否允许 noCleaner 策略(具体逻辑位于 PlatformDependent 的 static 代码块),例如运行在 Android 下时,是没有 Unsafe 类的,不允许使用noCleaner 策略,如果不允许,则使用 hasCleaner 策略


为什么要使用 noCleaner 的堆外内存

一是因为带 cleaner 的 DirectByteBuffer 堆外内存自动回收不实时:需要 ByteBuffer 对象被 GC 线程回收才会触发,如果 ByteBuffer 对象进入老年代后才变得可回收,则需要等到发送频率较低老年代 GC 才会触发

另一方面,Netty 需要基于 ByteBuf.release() 方法执行其他操作,例如池化内存释放回内存池,否则该对象会被内存池一直标记为已使用。


-XX:MaxDirectMemorySize 和 -Dio.netty.maxDirectMemory

-XX:MaxDirectMemorySize
用于限制 Netty 中 hasCleaner 策略的 DirectByteBuffer 堆外内存的大小,默认值是 JVM 能从操作系统申请的最大内存,如果内存本身没限制,则值为 Long.MAX_VALUE 个字节(默认值由 Runtime.getRuntime().maxMemory() 返回),代码位于 java.nio.Bits#reserveMemory() 方法中
-XX:MaxDirectMemorySize 无法限制 Netty 中 noCleaner 策略的 DirectByteBuffer 堆外内存的大小

-Dio.netty.maxDirectMemory
用于限制 noCleaner 策略下 Netty 的 DirectByteBuffer 分配的最大堆外内存的大小,如果该值为0,则使用 hasCleaner 策略,代码位于 PlatformDependent#incrementMemoryCounter() 方法中


查看 Netty 使用了多少堆外内存

有 arthas 的话,先通过 ognl '@io.netty.util.internal.PlatformDependent@useDirectBufferNoCleaner()' 确认使用的是否是 noCleaner 的堆外内存。

1、hasCleaner 的 DirectByteBuffer 使用量监控
java.nio.Bits 类是有记录 hasCleaner 的 DirectByteBuffer 堆外内存的使用情况,但是该类是包级别的访问权限,不能直接获取,可以通过 MXBean 来获取

  1. List<BufferPoolMXBean> bufferPoolMXBeans = ManagementFactoryHelper.getBufferPoolMXBeans();
  2. BufferPoolMXBean directBufferMXBean = bufferPoolMXBeans.get(0);
  3. // hasCleaner的DirectBuffer的数量
  4. long count = directBufferMXBean.getCount();
  5. // hasCleaner的DirectBuffer的堆外内存占用大小,单位字节
  6. long memoryUsed = directBufferMXBean.getMemoryUsed();

此外,MappedByteBuffer 是基于 FileChannelImpl.map 进行进行 mmap 内存映射(零拷贝的一种实现)得到的另外一种堆外内存的 ByteBuffer,可以通过 ManagementFactoryHelper.getBufferPoolMXBeans().get(1) 获取到该堆外内存的监控指标

2、noCleaner 的 DirectByteBuffer 使用量监控
看静态方法 PlatformDependent.usedDirectMemory() 的返回即可
arthas 执行

  1. ognl '@io.netty.util.internal.PlatformDependent@maxDirectMemory()'
  2. ognl '@io.netty.util.internal.PlatformDependent@usedDirectMemory()'

Netty 内存泄漏检测

Netty 自带了内存泄漏检测工具,可用于检测出 ByteBuf 对象被 GC 回收但 ByteBuf 管理的内存没有释放的情况,但不适用 ByteBuf 对象还没被 GC 回收内存泄漏的情况,例如任务队列积压。

Netty 提供 4 个内存泄漏检测级别:

  • disabled 完全关闭内存泄露检测
  • simple 以约1%的抽样率检测是否泄露,默认级别
  • advanced 抽样率同simple,但显示详细的泄露报告
  • paranoid 抽样率为100%,显示报告信息同 advanced

在命令行参数配置,比如 -Dio.netty.leakDetectionLevel=paranoid

Netty 内存泄漏检测原理(WeakReference 的引用队列)

Netty 内存泄漏检测的原理是利用弱引用(WeakReference)的引用队列(refQueue),通过将 ByteBuf 对象用弱引用包装起来,当发生GC时,如果 GC 线程检测到 ByteBuf 对象只被弱引用对象关联,会将该 WeakReference 加入 refQueue;
当 ByteBuf 内存被正常释放,会调用 WeakReference 的 clear() 方法解除对 ByteBuf 的引用,后续 GC 线程不会再将该 WeakReference 加入 refQueue;
Netty 在每次创建 ByteBuf 时,基于抽样率,抽样命中时会轮询(poll) refQueue 中的 WeakReference 对象,轮询返回的非null的 WeakReference 关联的 ByteBuf 即为泄漏的堆外内存

如果禁用了内存泄漏检测,直接返回 ByteBuf,否则通过 toLeakAwareBuffer(buf) 返回一个经过包装的 ByteBuf:

toLeakAwareBuffer() 中根据不同的内存泄漏检测级别返回不同的包装结构

  1. public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
  2. protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
  3. ResourceLeakTracker<ByteBuf> leak;
  4. switch (ResourceLeakDetector.getLevel()) {
  5. case SIMPLE:
  6. leak = AbstractByteBuf.leakDetector.track(buf);
  7. if (leak != null) {
  8. buf = new SimpleLeakAwareByteBuf(buf, leak);
  9. }
  10. break;
  11. case ADVANCED:
  12. case PARANOID:
  13. leak = AbstractByteBuf.leakDetector.track(buf);
  14. if (leak != null) {
  15. buf = new AdvancedLeakAwareByteBuf(buf, leak);
  16. }
  17. break;
  18. default:
  19. break;
  20. }
  21. return buf;
  22. }
  23. }

track() 中根据采样级别 随机/全部 采样将 ByteBuf 包装为 DefaultResourceLeak

  1. public class ResourceLeakDetector<T> {
  2. private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
  3. public final ResourceLeakTracker<T> track(T obj) {
  4. return track0(obj);
  5. }
  6. @SuppressWarnings("unchecked")
  7. private DefaultResourceLeak track0(T obj) {
  8. Level level = ResourceLeakDetector.level;
  9. if (level == Level.DISABLED) {
  10. return null;
  11. }
  12. // PARANOID 以下级别,随机抽样
  13. if (level.ordinal() < Level.PARANOID.ordinal()) {
  14. if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
  15. reportLeak();
  16. return new DefaultResourceLeak(obj, refQueue, allLeaks);
  17. }
  18. return null;
  19. }
  20. // PARANOID 级别,全部采样
  21. reportLeak();
  22. return new DefaultResourceLeak(obj, refQueue, allLeaks);
  23. }
  24. }

DefaultResourceLeak 是弱引用 WeakReference 的子类,内部调用了 WeakReference 的带引用队列的构造方法,传入 refQueue

  1. private static final class DefaultResourceLeak<T> extends WeakReference<Object> implements ResourceLeakTracker<T>, ResourceLeak {
  2. DefaultResourceLeak(
  3. Object referent,
  4. ReferenceQueue<Object> refQueue,
  5. Set<DefaultResourceLeak<?>> allLeaks) {
  6. super(referent, refQueue);
  7. assert referent != null;
  8. // Store the hash of the tracked object to later assert it in the close(...) method.
  9. // It's important that we not store a reference to the referent as this would disallow it from
  10. // be collected via the WeakReference.
  11. trackedHash = System.identityHashCode(referent);
  12. allLeaks.add(this);
  13. // Create a new Record so we always have the creation stacktrace included.
  14. headUpdater.set(this, new TraceRecord(TraceRecord.BOTTOM));
  15. this.allLeaks = allLeaks;
  16. }
  17. }

Netty 中的三种帧编码方式

方式 解码 特点
固定长度 FixedLengthFrameDecoder 简单,浪费空间
分隔符 DelimiterBasedFrameDecoder 简单,需要转义分隔符
length字段 LengthFieldBasedFrameDecoder 推荐

Netty解决粘包半包问题(含netty实例)
https://segmentfault.com/a/1190000023664467

FixedLengthFrameDecoder

DelimiterBasedFrameDecoder

LengthFieldBasedFrameDecoder


RxNetty

RxNetty 是将 Reactive 异步机制(类似Node.js) 带入了 Netty.

RxNetty是Netty的Reactive扩展
https://www.jdon.com/46629


Netty Pipeline 流水线

一条 Netty 通道需要很多业务处理器(Handler)来处理业务。每条通道内部都有一条流水线(Pipeline)将 Handler 装配起来。Netty 的业务处理器流水线 ChannelPipeline 是基于责任链设计模式来设计的,内部是一个双向链表结构,能够支持动态地添加和删除业务处理器。

在 Netty 的设计中 Handler 是无状态的,不保存和 Channel 有关的信息。Handler 的目标是将自己的处理逻辑做得很通用,可以给不同的 Channel 使用。与 Handler 不同的是,Pipeline 是有状态的,保存了 Channel 的关系。于是,Handler 和 Pipeline 之间需要一个中间角色将它们联系起来: ChannelHandlerContext(通道处理器上下文)

当业务处理器被添加到流水线中时会为其专门创建一个 ChannelHandlerContext 实例,主要封装了 ChannelHandler(通道处理器)和 ChannelPipeline(通道流水线)之间的关联关系。所以,流水线 ChannelPipeline 中的双向链接实质是一个由 ChannelHandlerContext 组成的双向链表。

Channel 拥有一条 ChannelPipeline,每一个流水线节点为一个 ChannelHandlerContext 上下文对象,每一个上下文中包裹了一个 ChannelHandler。在 ChannelHandler 的入站/出站处理方法中,Netty 会传递一个 Context 实例作为实际参数。处理器中的回调代码可以通过 Context 实参,在业务处理过程中去获取 ChannelPipeline 实例或者 Channel 实例。


上一篇 MySQL-Replication 复制

下一篇 MySQL-DataTypes 数据类型

阅读
评论
2.4k
阅读预计10分钟
创建日期 2021-07-30
修改日期 2023-01-31
类别
标签

页面信息

location:
protocol: http:
host: devgou.com
hostname: devgou.com
origin: http://devgou.com
pathname: /article/Netty/
href: http://devgou.com/article/Netty/
document:
referrer:
navigator:
platform: Linux x86_64
userAgent: Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; ClaudeBot/1.0; +claudebot@anthropic.com)

评论