1. ChannelPipline 传播事件
每一个Channel都会分配一个新的ChannelPipline,所有的出入站事件都会流经ChannelPipline来进行处理。
其中处理每一个事件的主要是ChannelHandler,例如ChannelOutboundHandler、ChannelInboundHandler分别用来处理出入站事件。通过调用ChannelHandlerContext实现,它将被转发给同一超类型的下一个ChannelHandler进行处理。
ChannelHandlerContext代表了ChannelHandler和ChannelPipline之间的关联,每当有ChannelHandler添加到ChannelPipline中时,都会创建ChannelHandlerContext。ChannelHandlerContext的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipline中的其他ChannelHandler之间的交互。
channel的入站事件会从第一个HeadContext的channelHandler一直向后传播,而channel的出站事件会从最后的tailContext的channelHandler向前传播,一直到HeadContext。
通常ChannelPipline中的每一个ChannelHandler都是通过它的EventLoop(I/O线程)来处理传递给它的事件,netty也可以通过channelPipline的addLast方法,传递一个自定义的EventExecutorGroup来代替channel本身的I/O线程。在出站时,最后的HeadContext使用的channel本身的I/O线程。
2. 写事件
netty是一个异步网络I/O框架,当调用完写事件后,netty会直接返回一个ChannelFuture,当数据被写入到底层的socket后,netty会通过ChannelFutureListner告知我们写入结果:
1 2 3 4 5 6 7 8 9 10 11
| connection.getChannel().writeAndFlush(rpcMessage) .addListener(new FutureListener<Void>() { public void operationComplete(Future<Void> f) throws Exception { if (f.isSuccess()) { log.info("channel write message success"); } else { log.error("write message error:", f.cause()); }
} });
|
写事件会在ChannelOutboundHandler中向前传播:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| private void write(Object msg, boolean flush, ChannelPromise promise) { ObjectUtil.checkNotNull(msg, "msg");
......
AbstractChannelHandlerContext next = this.findContextOutbound(flush ? 98304 : '耀'); Object m = this.pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { WriteTask task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise, flush); if (!safeExecute(executor, task, promise, m, !flush)) { task.cancel(); } }
}
|
通过this.findContextOutbound拿到下一个ChannelHandlerContext,调用next.invokeWrite,如果是调用的是writeAndFlush,则会调用next.invokeWriteAndFlush。
最终会来到HeadContext,调用outboundBuffer来addMessage
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public final void write(Object msg, ChannelPromise promise) { this.assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; .... int size; try { msg = AbstractChannel.this.filterOutboundMessage(msg); size = AbstractChannel.this.pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable var15) { try { ReferenceCountUtil.release(msg); } finally { this.safeSetFailure(promise, var15); }
return; }
outboundBuffer.addMessage(msg, size, promise); }
|
由于netty是异步网络框架,对于write事件,并不会直接写进socket中,而是添加到待发送数据缓冲队列ChannelOutboundBuffer中,之后通过flush操作,将队列中所有的msg写进socket中。
这里的filterOutboundMessage是为了检查写入类型,过滤不是bytebuf或者FileRegion类型的msg,同时对非堆外内存进行转换,转换为堆外内存,提升性能。
1 2 3 4 5 6 7 8 9 10
| protected final Object filterOutboundMessage(Object msg) { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf)msg; return buf.isDirect() ? msg : this.newDirectBuffer(buf); } else if (msg instanceof FileRegion) { return msg; } else { throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); } }
|
3. 待缓冲数据队列--ChannelOutboundBuffer
ChannelOutboundBuffer是一个单链表结构的缓冲队列,其队列类型为Entry。其作用就是缓存待写入socket的数据信息,其中包含的属性有unflushedEntry、tailEntry、flushedEntry等
-
unflushedEntry : 代表只是通过write方法添加到了Entry链表的消息节点。它是链表里第一个等待刷新的节点
-
tailEntry : Entry链表的最后一个节点
-
flushedEntry : 代表被flush方法标记为已刷新的消息节点,即可以认为该Entry马上或者已经被发到网络了,它指向的是链表里第一个要被刷新出去的节点
调用addMessage方法之前,三个指针的样子:
当添加第一个msg后:
添加第二个msg:
以此类推:
直到达到了ChannelOutboundBuffer的高水位线,才会停止添加,设置channel为不可写,直到恢复到低水位。
1 2 3 4 5 6 7 8 9
| private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size != 0L) { long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); if (newWriteBufferSize > (long)this.channel.config().getWriteBufferHighWaterMark()) { this.setUnwritable(invokeLater); }
} }
|
4. flush
我们看到,write只会将待写入数据放进ChannelOutboundBuffer,其实并不会真正的写进socket中,而flush操作会依次向前传播,最后在headContext中flush所有unflushedEntry。
1 2 3 4 5 6 7 8
| public final void flush() { this.assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer != null) { outboundBuffer.addFlush(); this.flush0(); } }
|
addFlush操作会将所有unflushedEntry标记为flushedEntry,然后进行flush。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public void addFlush() { Entry entry = this.unflushedEntry; if (entry != null) { if (this.flushedEntry == null) { this.flushedEntry = entry; }
do { ++this.flushed; if (!entry.promise.setUncancellable()) { int pending = entry.cancel(); this.decrementPendingOutboundBytes((long)pending, false, true); }
entry = entry.next; } while(entry != null);
this.unflushedEntry = null; } }
|
所有待写入socket的entry会从flushedEntry开始,unflushedEntry指向null,同时所有entry都变成不可取消的状态。
接下来就是真正的开始write数据到底层的socket了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| protected void flush0() { if (!this.inFlush0) { ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer != null && !outboundBuffer.isEmpty()) { this.inFlush0 = true; if (AbstractChannel.this.isActive()) { try { AbstractChannel.this.doWrite(outboundBuffer); } catch (Throwable var10) { this.handleWriteError(var10); } finally { this.inFlush0 = false; }
} ..... } } }
protected void doWrite(ChannelOutboundBuffer in) throws Exception { java.nio.channels.SocketChannel ch = this.javaChannel(); int writeSpinCount = this.config().getWriteSpinCount();
do { if (in.isEmpty()) { this.clearOpWrite(); return; }
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig)this.config).getMaxBytesPerGatheringWrite(); ByteBuffer[] nioBuffers = in.nioBuffers(1024, (long)maxBytesPerGatheringWrite); int nioBufferCnt = in.nioBufferCount(); switch (nioBufferCnt) { case 0: writeSpinCount -= this.doWrite0(in); break; case 1: ByteBuffer buffer = nioBuffers[0]; int attemptedBytes = buffer.remaining(); int localWrittenBytes = ch.write(buffer); if (localWrittenBytes <= 0) { this.incompleteWrite(true); return; }
this.adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes((long)localWrittenBytes); --writeSpinCount; break; default: long attemptedBytes = in.nioBufferSize(); long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes <= 0L) { this.incompleteWrite(true); return; }
this.adjustMaxBytesPerGatheringWrite((int)attemptedBytes, (int)localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; } } while(writeSpinCount > 0);
this.incompleteWrite(writeSpinCount < 0);
|
写一直发生在这个do while里面,writeSpinCount表示最大写的次数,默认是16次;通过调用nioBuffers方法,从flushedEntry开始,将bytebuf转换成JDK的ByteBuffer数组。每写完一个ByteBuffer,都会进行remove操作,然后write下一个entry。
这里注意两点:
- maxBytesPerGatheringWrite
每次写入数据的大小,都是适应调整的。maxBytesPerGatheringWrite 决定每次 write 可以从 channelOutboundBuffer 中最多发送数据,初始值为 SO_SNDBUF大小 * 2 = 293976 = 146988 << 1。
1 2 3 4 5 6 7 8 9 10
| private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) { if (attempted == written) { if (attempted << 1 > oldMaxBytesPerGatheringWrite) { ((NioSocketChannelConfig)this.config).setMaxBytesPerGatheringWrite(attempted << 1); } } else if (attempted > 4096 && written < attempted >>> 1) { ((NioSocketChannelConfig)this.config).setMaxBytesPerGatheringWrite(attempted >>> 1); }
}
|
如果这次尝试写入量attempted与真实写入量written一样,并且真实写入量的2倍大于最大可写入量,netty会扩大下次写入量为这次写入量的2倍written * 2, 如果真实写入量的2倍小于这次最大可写入量,则表示需求也不是很大,就不会进行扩容。而真实写入量小于尝试写入的一半,则会缩小下次最大写入量为attempted的1/2。当然有最小值的限制。
- writeSpinCount
默认一次写入最多循环16次,第一种情况是,如果超过16次,并且数据没有写完,则会强制退出channel的write,添加到IO线程的task中,让其他的channel去执行任务,如果是未超过16次,但是socket写满了,则会停止写入,注册写事件,等待socket可写为止。
1 2 3 4 5 6 7 8 9
| protected final void incompleteWrite(boolean setOpWrite) { if (setOpWrite) { this.setOpWrite(); } else { this.clearOpWrite(); this.eventLoop().execute(this.flushTask); }
}
|
channel的写事件会向前传播,进行write和flush,依次添加进待刷新队列,然后是执行flush操作,在flush操作时,也会根据写入量动态进行放缩来调整写入量,同时也不会一直让一个耗时的entry一直进行写入,如果超过writeSpinCount次数,则会添加进futuretask,进行下次写入,要么是socket写满了,等待可写为止。当最终写入socket后,会通过channelPromise,进行异步通知。