Netty write 流程

1. ChannelPipline 传播事件

  每一个Channel都会分配一个新的ChannelPipline所有的出入站事件都会流经ChannelPipline来进行处理

  其中处理每一个事件的主要是ChannelHandler例如ChannelOutboundHandlerChannelInboundHandler分别用来处理出入站事件通过调用ChannelHandlerContext实现它将被转发给同一超类型的下一个ChannelHandler进行处理

  ChannelHandlerContext代表了ChannelHandler和ChannelPipline之间的关联每当有ChannelHandler添加到ChannelPipline中时都会创建ChannelHandlerContextChannelHandlerContext的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipline中的其他ChannelHandler之间的交互

  channel的入站事件会从第一个HeadContext的channelHandler一直向后传播而channel的出站事件会从最后的tailContext的channelHandler向前传播一直到HeadContext

  通常ChannelPipline中的每一个ChannelHandler都是通过它的EventLoop(I/O线程)来处理传递给它的事件netty也可以通过channelPipline的addLast方法传递一个自定义的EventExecutorGroup来代替channel本身的I/O线程在出站时最后的HeadContext使用的channel本身的I/O线程

2. 写事件

  netty是一个异步网络I/O框架当调用完写事件后netty会直接返回一个ChannelFuture当数据被写入到底层的socket后netty会通过ChannelFutureListner告知我们写入结果

connection.getChannel().writeAndFlush(rpcMessage)
        .addListener(new FutureListener<Void>() {
            public void operationComplete(Future<Void> f) throws Exception {
                if (f.isSuccess()) {
                    log.info("channel write message success");
                } else {
                    log.error("write message error:", f.cause());
                }

            }
        });

写事件会在ChannelOutboundHandler中向前传播

private void write(Object msg, boolean flush, ChannelPromise promise) {
    ObjectUtil.checkNotNull(msg, "msg");

    ......

    AbstractChannelHandlerContext next = this.findContextOutbound(flush ? 98304 : '耀');
    Object m = this.pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        WriteTask task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }

}

  通过this.findContextOutbound拿到下一个ChannelHandlerContext调用next.invokeWrite如果是调用的是writeAndFlush则会调用next.invokeWriteAndFlush
最终会来到HeadContext,调用outboundBuffer来addMessage

public final void write(Object msg, ChannelPromise promise) {
    this.assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    
    ....
        int size;
        try {
            msg = AbstractChannel.this.filterOutboundMessage(msg);
            // 估计待写数据大小
            size = AbstractChannel.this.pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable var15) {
            try {
                ReferenceCountUtil.release(msg);
            } finally {
                this.safeSetFailure(promise, var15);
            }

            return;
        }

        outboundBuffer.addMessage(msg, size, promise);
    
}

  由于netty是异步网络框架对于write事件并不会直接写进socket中而是添加到待发送数据缓冲队列ChannelOutboundBuffer中之后通过flush操作将队列中所有的msg写进socket中

  这里的filterOutboundMessage是为了检查写入类型过滤不是bytebuf或者FileRegion类型的msg同时对非堆外内存进行转换转换为堆外内存提升性能

protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf)msg;
        return buf.isDirect() ? msg : this.newDirectBuffer(buf);
    } else if (msg instanceof FileRegion) {
        return msg;
    } else {
        throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }
}

3. 待缓冲数据队列–ChannelOutboundBuffer

ChannelOutboundBuffer是一个单链表结构的缓冲队列其队列类型为Entry其作用就是缓存待写入socket的数据信息其中包含的属性有unflushedEntrytailEntryflushedEntry等

  • unflushedEntry 代表只是通过write方法添加到了Entry链表的消息节点它是链表里第一个等待刷新的节点

  • tailEntry Entry链表的最后一个节点

  • flushedEntry 代表被flush方法标记为已刷新的消息节点即可以认为该Entry马上或者已经被发到网络了它指向的是链表里第一个要被刷新出去的节点

 调用addMessage方法之前三个指针的样子


当添加第一个msg后


添加第二个msg


以此类推

  直到达到了ChannelOutboundBuffer的高水位线才会停止添加设置channel为不可写直到恢复到低水位

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size != 0L) {
        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize > (long)this.channel.config().getWriteBufferHighWaterMark()) {
            this.setUnwritable(invokeLater);
        }

    }
}

4. flush

  我们看到write只会将待写入数据放进ChannelOutboundBuffer其实并不会真正的写进socket中而flush操作会依次向前传播最后在headContext中flush所有unflushedEntry

public final void flush() {
    this.assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer != null) {
        outboundBuffer.addFlush();
        this.flush0();
    }
}

addFlush操作会将所有unflushedEntry标记为flushedEntry然后进行flush

public void addFlush() {
    Entry entry = this.unflushedEntry;
    if (entry != null) {
        if (this.flushedEntry == null) {
            this.flushedEntry = entry;
        }

        do {
            ++this.flushed;
            if (!entry.promise.setUncancellable()) {
                int pending = entry.cancel();
                this.decrementPendingOutboundBytes((long)pending, false, true);
            }

            entry = entry.next;
        } while(entry != null);

        this.unflushedEntry = null;
    }

}

  所有待写入socket的entry会从flushedEntry开始unflushedEntry指向null同时所有entry都变成不可取消的状态
接下来就是真正的开始write数据到底层的socket了

protected void flush0() {
    if (!this.inFlush0) {
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer != null && !outboundBuffer.isEmpty()) {
            this.inFlush0 = true;
            if (AbstractChannel.this.isActive()) {
                try {
                    AbstractChannel.this.doWrite(outboundBuffer);
                } catch (Throwable var10) {
                    this.handleWriteError(var10);
                } finally {
                    this.inFlush0 = false;
                }

            } 
            .....
        }
    }
}

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    java.nio.channels.SocketChannel ch = this.javaChannel();
    int writeSpinCount = this.config().getWriteSpinCount();

    do {
        if (in.isEmpty()) {
            this.clearOpWrite();
            return;
        }

        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig)this.config).getMaxBytesPerGatheringWrite();
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, (long)maxBytesPerGatheringWrite);
        int nioBufferCnt = in.nioBufferCount();
        switch (nioBufferCnt) {
            case 0:
                writeSpinCount -= this.doWrite0(in);
                break;
            case 1:
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    this.incompleteWrite(true);
                    return;
                }

                this.adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                in.removeBytes((long)localWrittenBytes);
                --writeSpinCount;
                break;
            default:
                long attemptedBytes = in.nioBufferSize();
                long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0L) {
                    this.incompleteWrite(true);
                    return;
                }

                this.adjustMaxBytesPerGatheringWrite((int)attemptedBytes, (int)localWrittenBytes, maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
        }
    } while(writeSpinCount > 0);

    this.incompleteWrite(writeSpinCount < 0);

  写一直发生在这个do while里面writeSpinCount表示最大写的次数默认是16次通过调用nioBuffers方法从flushedEntry开始将bytebuf转换成JDK的ByteBuffer数组每写完一个ByteBuffer都会进行remove操作然后write下一个entry

  这里注意两点

  1. maxBytesPerGatheringWrite

  每次写入数据的大小都是适应调整的maxBytesPerGatheringWrite 决定每次 write 可以从 channelOutboundBuffer 中最多发送数据初始值为 SO_SNDBUF大小 * 2 = 293976 = 146988 << 1

private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
    if (attempted == written) {
        if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
            ((NioSocketChannelConfig)this.config).setMaxBytesPerGatheringWrite(attempted << 1);
        }
    } else if (attempted > 4096 && written < attempted >>> 1) {
        ((NioSocketChannelConfig)this.config).setMaxBytesPerGatheringWrite(attempted >>> 1);
    }

}

  如果这次尝试写入量attempted与真实写入量written一样并且真实写入量的2倍大于最大可写入量netty会扩大下次写入量为这次写入量的2倍written * 2 如果真实写入量的2倍小于这次最大可写入量则表示需求也不是很大就不会进行扩容而真实写入量小于尝试写入的一半则会缩小下次最大写入量为attempted的1/2当然有最小值的限制

  1. writeSpinCount

  默认一次写入最多循环16次第一种情况是如果超过16次并且数据没有写完则会强制退出channel的write添加到IO线程的task中让其他的channel去执行任务如果是未超过16次但是socket写满了则会停止写入注册写事件等待socket可写为止

protected final void incompleteWrite(boolean setOpWrite) {
    if (setOpWrite) {
        this.setOpWrite();
    } else {
        this.clearOpWrite();
        this.eventLoop().execute(this.flushTask);
    }

}

  channel的写事件会向前传播进行write和flush依次添加进待刷新队列然后是执行flush操作在flush操作时也会根据写入量动态进行放缩来调整写入量同时也不会一直让一个耗时的entry一直进行写入如果超过writeSpinCount次数则会添加进futuretask进行下次写入要么是socket写满了等待可写为止当最终写入socket后会通过channelPromise进行异步通知