24、Netty入门 - Netty心跳机制源码剖析


Netty作为一个网络框架,提供了诸多功能,比如编码解码等,Netty 还提供了非常重要的一个服务 ---- 心跳机制 heartbeat。通过心跳检查对方是否有效,这在 RPC 框架中是必不可少的功能。下面我们分析一下 Netty 内部 心跳服务源码的实现。


2.1 源码案例

public class MyServer {
    public static void main(String[] args) throws InterruptedException {
        // 创建两个线程组
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .handler(new LoggingHandler(LogLevel.INFO)) // 在bossgroup增加一个日志处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 加入 netty 提供的 IdleStateHandler
                             * 说明
                             * 1. IdleStateHandler 是 netty 提供的处理空闲状态的处理器
                             * 2. readerIdleTime:表示多长时间没有读,就会发送一个心跳检测包,检测是否还是连接状态
                             * 3. writerIdleTime:表示多长时间没有写,就会发送一个心跳检测包,检测是否还是连接状态
                             * 4. allIdleTime:表示多长时间没有读写,就会发送一个心跳检测包,检测是否还是连接状态
                             * 5. 当 IdleStateEvent 触发后,就会传递给管道的下一个handler进行处理,通过调用(触发)下一个 handler 的userEventTrigged 方法,
                             * 在该方法中去处理 IdleStateEvent 事件
                            pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));
                            // 加入一个对空闲检测进一步处理的handler(自定义)
                            pipeline.addLast(new MyServerHandler());

            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();

        }finally {

 * 心跳处理器
public class MyServerHandler extends ChannelInboundHandlerAdapter {
     * @param ctx 上下文
     * @param evt 事件
     * @throws Exception 异常
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){
            // 将 evt 向下转型
            IdleStateEvent event = (IdleStateEvent) evt;
            String eventType = null;
            switch (event.state()){
                case READER_IDLE:
                    eventType = "读空闲";
                case WRITER_IDLE:
                    eventType = "写空闲";
                case ALL_IDLE:
                    eventType = "读写空闲";

            System.out.println(ctx.channel().remoteAddress() + "--超时事件发生-->" + eventType);

如上源码中,我们对 Netty 服务器的(读/写)活动情况进行监听,利用 IdleStateHandler 来完成,其中三个参数分别读、写、读/写 空闲时间。


Netty 提供了 IdleStateHandler ,ReadTimeoutHandler,WriteTimeoutHandler 三个 Handler 检测连接的有效性。

ReadTimeout 事件和 WriteTimeout 事件都会自动关闭连接,而且,属于异常处理,所以,重点分析IdleStateHandler。






当连接的空闲时间(读或者写)太长时,将会触发一个 IdleStateEvent 事件。然后你可以通过你的 ChannelInboundHandler 中重写 userEventTrigged 方法来处理该事件。



如果在指定的时间没有发生读事件,就会抛出这个异常,并自动关闭这个连接。你可以在 exceptionCaught 方法中处理这个异常。



当一个写操作不能在一定的时间内完成时,抛出此异常,并关闭连接。你同样可以在 exceptionCaught 方法中处理这个异常。

ReadTimeout 事件和 WriteTimeout 事件都会自动关闭连接,而且,属于异常处理,所以,这里只重点介绍IdleStateHandler。


首先,当调用 addLast 添加 IdleStateHandler 到 pipeline 中时,会触发 handlerAdd 方法

// 添加操作
pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));

// 该方法
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
        // channelActive() event has been fired already, which means this.channelActive() will
        // not be invoked. We have to initialize here instead.
    } else {
        // channelActive() event has not been fired yet.  this.channelActive() will be invoked
        // and initialization will occur there.

关注initialize 方法,如下代码中,对 IdleStateHandler 的属性进行初始化过程,涉及到以下几个。

private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    switch (state) {
    case 1:
    case 2:

    state = 1;

    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
  • private final boolean observeOutput; : 是否考虑出站时候较慢的情况,默认是false

  • private final long readerIdleTimeNanos; : 读事件空闲时间,0 则禁用事件

  • private final long writerIdleTimeNanos; : 写事件空闲时间,0 则禁用事件

  • private final long allIdleTimeNanos; : 读/写事件空闲时间,0 则禁用事件

源码中,当参数 > 0 时,就创建一个定时任务,每个事件都创建,同时将 state 设置为1,避免重复初始化,调用 initOutputChanged 方法初始化监控出站数据属性。

创建定时任务源码如下,利用 channel 中的 EventLoop 来添加一个 Scheduler,看源码并 debug,观察 EventLoop 中 Scheduler 线程池中是否有新增任务,如下:

ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
    return ctx.executor().schedule(task, delay, unit);

在ScheduledTaskQueue 中的确有3个Task,因为 addLast 的方法中有三个参数都被初始化了,那么我们来看下对应的三个 Task 实现方法。

三个Task 都是 ChannelDuplexHandler 中的内部类,并且有一个共同父类, AbstractIdleTask

  private abstract static class AbstractIdleTask implements Runnable {

        private final ChannelHandlerContext ctx;

        AbstractIdleTask(ChannelHandlerContext ctx) {
            this.ctx = ctx;

        public void run() {
            if (!ctx.channel().isOpen()) {


        protected abstract void run(ChannelHandlerContext ctx);
  • AbstractIdleTask内部类设计用来单例模式,模板模式UML如下


  • 模板方法的通用流程:当通道关闭,就不执行任务,反之则执行抽象方法 run 方法,

3.1 读事件 task 分析

  • 先看 ReaderIdelTimeoutTask 方法中的 run 方法的实现
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

    ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {

    protected void run(ChannelHandlerContext ctx) {
        long nextDelay = readerIdleTimeNanos;
        if (!reading) {
            nextDelay -= ticksInNanos() - lastReadTime;

        if (nextDelay <= 0) {
            // Reader is idle - set a new timeout and notify the callback.
            // 用于取消任务 promise
            readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

            boolean first = firstReaderIdleEvent;
            firstReaderIdleEvent = false;

            try {
                // 再次提交任务
                IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                // 触发用户 handler use
                channelIdle(ctx, event);
            } catch (Throwable t) {
        } else {
            // Read occurred before the timeout - set a new timeout with shorter delay.
            readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);

2、如果读取操作结束了(执行了 channelReadComplete 方法设置),就用当前时间减去给定时间和最后一次读(执行了操作的时间 channelReadComplete 方法设置),如果小于 0 ,就触发事件。反之,继续放入队列。间隔时间是新的计算时间。
3、触发的逻辑是:首先将任务再次放到队列,时间是刚开始设置的时间,返回一个 promise 对象,用于做取消操作。然后,设置 first 属性为 false,表示下一次读取不是第一次了。这个属性在 channelRead 方法会被改成 true。
4、创建一个 IdleStateEvent 类型的写事件对象,将此对象传递给用户的 UserEventTriggered 方法。完成触发事件的操作。
5、总的来说,每次读取操作都会记录一个时间,定时任务时间到了,会计算当前时间和最后一次读的时间的间隔,如果间隔超过了设置的时间,就出发 UserEventTriggered 方法。

3.2 写事件 task 分析

  • 写事件的run方法WriterIdleTimeoutTask, 和读事件的差不多,如下有一点区别
private final class WriterIdleTimeoutTask extends AbstractIdleTask {

    WriterIdleTimeoutTask(ChannelHandlerContext ctx) {

    protected void run(ChannelHandlerContext ctx) {

        long lastWriteTime = IdleStateHandler.this.lastWriteTime;
        long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
        if (nextDelay <= 0) {
            // Writer is idle - set a new timeout and notify the callback.
            writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);

            boolean first = firstWriterIdleEvent;
            firstWriterIdleEvent = false;

            try {
                if (hasOutputChanged(ctx, first)) {

                IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
                channelIdle(ctx, event);
            } catch (Throwable t) {
        } else {
            // Write occurred before the timeout - set a new timeout with shorter delay.
            writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);

  • 不通地方在于,run代码中多了一个判断hasOutputChanged 用来判断出站事件处理是否很慢,例如,我一个出站需要写的数据量大,需要处理20秒,但是我们执行写事件监测的时间是 5秒,此时我们只有第一次会触发,因为 hasOutputChanged 中有判断first 的时候还是会执行异常抛出

3.3 所有事件(读/写)事件 task 分析

private final class AllIdleTimeoutTask extends AbstractIdleTask {

    AllIdleTimeoutTask(ChannelHandlerContext ctx) {

    protected void run(ChannelHandlerContext ctx) {

        long nextDelay = allIdleTimeNanos;
        if (!reading) {
            nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
        if (nextDelay <= 0) {
            // Both reader and writer are idle - set a new timeout and
            // notify the callback.
            allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);

            boolean first = firstAllIdleEvent;
            firstAllIdleEvent = false;

            try {
                if (hasOutputChanged(ctx, first)) {

                IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
                channelIdle(ctx, event);
            } catch (Throwable t) {
        } else {
            // Either read or write occurred before the timeout - set a new
            // timeout with shorter delay.
            allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
  • 与写事件基本一致,不同地方在于事件判断

  • nextDelay -= tickInNanos() - Math.max(lastReadTime, lastWriteTime);当前时间-最后一次写/读的时间,如果>0说明超过了

  • 此处的时间判断是取读,写事件中的最大值,然后像写事件一样,判断是否发生了写很慢的情况


1、IdleStateHandler 实现心跳检测功能,当服务器和客户端没有任务读写,并且超过设置事件,会触发 handler 的 userEventTriggered 方法,用户可以在这个方法中实现自己的逻辑。

2、IdleStateHandler 的实现基于 EventLoop 的定时任务,每次读写都会记录一个值,在定时任务运行的时候,通过计算当前时间和设置时间和上次事件发生时间的结果,来判断是否空闲。

3、内部有 3 个定时任务,分别对应读事件、写事件、读写事件。通常用户监听读写事件就足够了。

4、同时,IdleStateHandler 内部也考虑了一些极端情况:客户端接收缓慢,一次接收数据的速度超过了设置的空闲时间。Netty 通过构造方法中的 observeOutput 属性来决定是否对出站缓冲区的情况进行判断。

5、如果出站缓慢,Netty 不任务这是空闲,也就不触发空闲事件。但第一次无论如何也是要触发的。因为第一次无法判断是出站缓慢还是空闲。当然,出站缓慢的话,可能造成 OOM,OOM比空闲的问题更大。

6、所以,当你的应用出现了内存溢出,OOM之类,并且写空闲极少发生(使用了 observeOutput 为 true),那么就需要注意是不是数据出站速度过慢。

7、还有一个注意的地方:就是 ReadTimeoutHandler,它继承自 IdelStateHandler,当触发读空闲事件的时候,就触发 ctx.fireExceptionCaught 方法,并传入一个 ReadTimeoutException,然后关闭 Socket。

8、而 WriteTimeoutHandler 的实现不是基于 IdleStateHandler 的,他的原理是,当调用 write方法的时候,会创建一个定时任务,任务内容是根据传入的 promise 的完成情况来判断是否超出了写的时间。当定时任务根据指定时间开始运行,发现 promise 的 isDone 方法返回 false,表名还没有写完,说明超时了,则抛出异常。当 write 方法完成后,会打断定时任务。