1. ChannelPipline 传播事件
  每一个Channel都会分配一个新的ChannelPipline,所有的出入站事件都会流经ChannelPipline来进行处理。
  其中处理每一个事件的主要是ChannelHandler,例如ChannelOutboundHandler、ChannelInboundHandler分别用来处理出入站事件。通过调用ChannelHandlerContext实现,它将被转发给同一超类型的下一个ChannelHandler进行处理。
  ChannelHandlerContext代表了ChannelHandler和ChannelPipline之间的关联,每当有ChannelHandler添加到ChannelPipline中时,都会创建ChannelHandlerContext。ChannelHandlerContext的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipline中的其他ChannelHandler之间的交互。
 
  channel的入站事件会从第一个HeadContext的channelHandler一直向后传播,而channel的出站事件会从最后的tailContext的channelHandler向前传播,一直到HeadContext。
  通常ChannelPipline中的每一个ChannelHandler都是通过它的EventLoop(I/O线程)来处理传递给它的事件,netty也可以通过channelPipline的addLast方法,传递一个自定义的EventExecutorGroup来代替channel本身的I/O线程。在出站时,最后的HeadContext使用的channel本身的I/O线程。
2. 写事件
  netty是一个异步网络I/O框架,当调用完写事件后,netty会直接返回一个ChannelFuture,当数据被写入到底层的socket后,netty会通过ChannelFutureListner告知我们写入结果:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 
 | connection.getChannel().writeAndFlush(rpcMessage).addListener(new FutureListener<Void>() {
 public void operationComplete(Future<Void> f) throws Exception {
 if (f.isSuccess()) {
 log.info("channel write message success");
 } else {
 log.error("write message error:", f.cause());
 }
 
 }
 });
 
 | 
写事件会在ChannelOutboundHandler中向前传播:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 
 | private void write(Object msg, boolean flush, ChannelPromise promise) {ObjectUtil.checkNotNull(msg, "msg");
 
 ......
 
 AbstractChannelHandlerContext next = this.findContextOutbound(flush ? 98304 : '耀');
 Object m = this.pipeline.touch(msg, next);
 EventExecutor executor = next.executor();
 if (executor.inEventLoop()) {
 if (flush) {
 next.invokeWriteAndFlush(m, promise);
 } else {
 next.invokeWrite(m, promise);
 }
 } else {
 WriteTask task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise, flush);
 if (!safeExecute(executor, task, promise, m, !flush)) {
 task.cancel();
 }
 }
 
 }
 
 | 
  通过this.findContextOutbound拿到下一个ChannelHandlerContext,调用next.invokeWrite,如果是调用的是writeAndFlush,则会调用next.invokeWriteAndFlush。
最终会来到HeadContext,调用outboundBuffer来addMessage
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 
 | public final void write(Object msg, ChannelPromise promise) {this.assertEventLoop();
 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
 
 ....
 int size;
 try {
 msg = AbstractChannel.this.filterOutboundMessage(msg);
 
 size = AbstractChannel.this.pipeline.estimatorHandle().size(msg);
 if (size < 0) {
 size = 0;
 }
 } catch (Throwable var15) {
 try {
 ReferenceCountUtil.release(msg);
 } finally {
 this.safeSetFailure(promise, var15);
 }
 
 return;
 }
 
 outboundBuffer.addMessage(msg, size, promise);
 
 }
 
 | 
  由于netty是异步网络框架,对于write事件,并不会直接写进socket中,而是添加到待发送数据缓冲队列ChannelOutboundBuffer中,之后通过flush操作,将队列中所有的msg写进socket中。
  这里的filterOutboundMessage是为了检查写入类型,过滤不是bytebuf或者FileRegion类型的msg,同时对非堆外内存进行转换,转换为堆外内存,提升性能。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 
 | protected final Object filterOutboundMessage(Object msg) {if (msg instanceof ByteBuf) {
 ByteBuf buf = (ByteBuf)msg;
 return buf.isDirect() ? msg : this.newDirectBuffer(buf);
 } else if (msg instanceof FileRegion) {
 return msg;
 } else {
 throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
 }
 }
 
 | 
3. 待缓冲数据队列--ChannelOutboundBuffer
ChannelOutboundBuffer是一个单链表结构的缓冲队列,其队列类型为Entry。其作用就是缓存待写入socket的数据信息,其中包含的属性有unflushedEntry、tailEntry、flushedEntry等
- 
unflushedEntry : 代表只是通过write方法添加到了Entry链表的消息节点。它是链表里第一个等待刷新的节点 
- 
tailEntry : Entry链表的最后一个节点 
- 
flushedEntry : 代表被flush方法标记为已刷新的消息节点,即可以认为该Entry马上或者已经被发到网络了,它指向的是链表里第一个要被刷新出去的节点 
 调用addMessage方法之前,三个指针的样子:
 
当添加第一个msg后:
 
添加第二个msg:
 
以此类推:
 
  直到达到了ChannelOutboundBuffer的高水位线,才会停止添加,设置channel为不可写,直到恢复到低水位。
| 12
 3
 4
 5
 6
 7
 8
 9
 
 | private void incrementPendingOutboundBytes(long size, boolean invokeLater) {if (size != 0L) {
 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
 if (newWriteBufferSize > (long)this.channel.config().getWriteBufferHighWaterMark()) {
 this.setUnwritable(invokeLater);
 }
 
 }
 }
 
 | 
4. flush
  我们看到,write只会将待写入数据放进ChannelOutboundBuffer,其实并不会真正的写进socket中,而flush操作会依次向前传播,最后在headContext中flush所有unflushedEntry。
| 12
 3
 4
 5
 6
 7
 8
 
 | public final void flush() {this.assertEventLoop();
 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
 if (outboundBuffer != null) {
 outboundBuffer.addFlush();
 this.flush0();
 }
 }
 
 | 
addFlush操作会将所有unflushedEntry标记为flushedEntry,然后进行flush。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 
 | public void addFlush() {Entry entry = this.unflushedEntry;
 if (entry != null) {
 if (this.flushedEntry == null) {
 this.flushedEntry = entry;
 }
 
 do {
 ++this.flushed;
 if (!entry.promise.setUncancellable()) {
 int pending = entry.cancel();
 this.decrementPendingOutboundBytes((long)pending, false, true);
 }
 
 entry = entry.next;
 } while(entry != null);
 
 this.unflushedEntry = null;
 }
 }
 
 | 
 
  所有待写入socket的entry会从flushedEntry开始,unflushedEntry指向null,同时所有entry都变成不可取消的状态。
接下来就是真正的开始write数据到底层的socket了
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 
 | protected void flush0() {if (!this.inFlush0) {
 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
 if (outboundBuffer != null && !outboundBuffer.isEmpty()) {
 this.inFlush0 = true;
 if (AbstractChannel.this.isActive()) {
 try {
 AbstractChannel.this.doWrite(outboundBuffer);
 } catch (Throwable var10) {
 this.handleWriteError(var10);
 } finally {
 this.inFlush0 = false;
 }
 
 }
 .....
 }
 }
 }
 
 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
 java.nio.channels.SocketChannel ch = this.javaChannel();
 int writeSpinCount = this.config().getWriteSpinCount();
 
 do {
 if (in.isEmpty()) {
 this.clearOpWrite();
 return;
 }
 
 int maxBytesPerGatheringWrite = ((NioSocketChannelConfig)this.config).getMaxBytesPerGatheringWrite();
 ByteBuffer[] nioBuffers = in.nioBuffers(1024, (long)maxBytesPerGatheringWrite);
 int nioBufferCnt = in.nioBufferCount();
 switch (nioBufferCnt) {
 case 0:
 writeSpinCount -= this.doWrite0(in);
 break;
 case 1:
 ByteBuffer buffer = nioBuffers[0];
 int attemptedBytes = buffer.remaining();
 int localWrittenBytes = ch.write(buffer);
 if (localWrittenBytes <= 0) {
 this.incompleteWrite(true);
 return;
 }
 
 this.adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
 in.removeBytes((long)localWrittenBytes);
 --writeSpinCount;
 break;
 default:
 long attemptedBytes = in.nioBufferSize();
 long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
 if (localWrittenBytes <= 0L) {
 this.incompleteWrite(true);
 return;
 }
 
 this.adjustMaxBytesPerGatheringWrite((int)attemptedBytes, (int)localWrittenBytes, maxBytesPerGatheringWrite);
 in.removeBytes(localWrittenBytes);
 --writeSpinCount;
 }
 } while(writeSpinCount > 0);
 
 this.incompleteWrite(writeSpinCount < 0);
 
 | 
  写一直发生在这个do while里面,writeSpinCount表示最大写的次数,默认是16次;通过调用nioBuffers方法,从flushedEntry开始,将bytebuf转换成JDK的ByteBuffer数组。每写完一个ByteBuffer,都会进行remove操作,然后write下一个entry。
  这里注意两点:
- maxBytesPerGatheringWrite
  每次写入数据的大小,都是适应调整的。maxBytesPerGatheringWrite 决定每次 write 可以从 channelOutboundBuffer 中最多发送数据,初始值为 SO_SNDBUF大小 * 2 = 293976 = 146988 << 1。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 
 | private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {if (attempted == written) {
 if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
 ((NioSocketChannelConfig)this.config).setMaxBytesPerGatheringWrite(attempted << 1);
 }
 } else if (attempted > 4096 && written < attempted >>> 1) {
 ((NioSocketChannelConfig)this.config).setMaxBytesPerGatheringWrite(attempted >>> 1);
 }
 
 }
 
 | 
  如果这次尝试写入量attempted与真实写入量written一样,并且真实写入量的2倍大于最大可写入量,netty会扩大下次写入量为这次写入量的2倍written * 2, 如果真实写入量的2倍小于这次最大可写入量,则表示需求也不是很大,就不会进行扩容。而真实写入量小于尝试写入的一半,则会缩小下次最大写入量为attempted的1/2。当然有最小值的限制。
- writeSpinCount
  默认一次写入最多循环16次,第一种情况是,如果超过16次,并且数据没有写完,则会强制退出channel的write,添加到IO线程的task中,让其他的channel去执行任务,如果是未超过16次,但是socket写满了,则会停止写入,注册写事件,等待socket可写为止。
| 12
 3
 4
 5
 6
 7
 8
 9
 
 | protected final void incompleteWrite(boolean setOpWrite) {if (setOpWrite) {
 this.setOpWrite();
 } else {
 this.clearOpWrite();
 this.eventLoop().execute(this.flushTask);
 }
 
 }
 
 | 
  channel的写事件会向前传播,进行write和flush,依次添加进待刷新队列,然后是执行flush操作,在flush操作时,也会根据写入量动态进行放缩来调整写入量,同时也不会一直让一个耗时的entry一直进行写入,如果超过writeSpinCount次数,则会添加进futuretask,进行下次写入,要么是socket写满了,等待可写为止。当最终写入socket后,会通过channelPromise,进行异步通知。