1. ChannelPipline 传播事件
每一个Channel都会分配一个新的ChannelPipline,所有的出入站事件都会流经ChannelPipline来进行处理。
其中处理每一个事件的主要是ChannelHandler,例如ChannelOutboundHandler、ChannelInboundHandler分别用来处理出入站事件。通过调用ChannelHandlerContext实现,它将被转发给同一超类型的下一个ChannelHandler进行处理。
ChannelHandlerContext代表了ChannelHandler和ChannelPipline之间的关联,每当有ChannelHandler添加到ChannelPipline中时,都会创建ChannelHandlerContext。ChannelHandlerContext的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipline中的其他ChannelHandler之间的交互。
channel的入站事件会从第一个HeadContext的channelHandler一直向后传播,而channel的出站事件会从最后的tailContext的channelHandler向前传播,一直到HeadContext。
通常ChannelPipline中的每一个ChannelHandler都是通过它的EventLoop(I/O线程)来处理传递给它的事件,netty也可以通过channelPipline的addLast方法,传递一个自定义的EventExecutorGroup来代替channel本身的I/O线程。在出站时,最后的HeadContext使用的channel本身的I/O线程。
2. 写事件
netty是一个异步网络I/O框架,当调用完写事件后,netty会直接返回一个ChannelFuture,当数据被写入到底层的socket后,netty会通过ChannelFutureListner告知我们写入结果:
connection.getChannel().writeAndFlush(rpcMessage)
.addListener(new FutureListener<Void>() {
public void operationComplete(Future<Void> f) throws Exception {
if (f.isSuccess()) {
log.info("channel write message success");
} else {
log.error("write message error:", f.cause());
}
}
});
写事件会在ChannelOutboundHandler中向前传播:
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
......
AbstractChannelHandlerContext next = this.findContextOutbound(flush ? 98304 : '耀');
Object m = this.pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
WriteTask task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
}
通过this.findContextOutbound拿到下一个ChannelHandlerContext,调用next.invokeWrite,如果是调用的是writeAndFlush,则会调用next.invokeWriteAndFlush。
最终会来到HeadContext,调用outboundBuffer来addMessage
public final void write(Object msg, ChannelPromise promise) {
this.assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
....
int size;
try {
msg = AbstractChannel.this.filterOutboundMessage(msg);
// 估计待写数据大小
size = AbstractChannel.this.pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable var15) {
try {
ReferenceCountUtil.release(msg);
} finally {
this.safeSetFailure(promise, var15);
}
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
由于netty是异步网络框架,对于write事件,并不会直接写进socket中,而是添加到待发送数据缓冲队列ChannelOutboundBuffer中,之后通过flush操作,将队列中所有的msg写进socket中。
这里的filterOutboundMessage是为了检查写入类型,过滤不是bytebuf或者FileRegion类型的msg,同时对非堆外内存进行转换,转换为堆外内存,提升性能。
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf)msg;
return buf.isDirect() ? msg : this.newDirectBuffer(buf);
} else if (msg instanceof FileRegion) {
return msg;
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
}
3. 待缓冲数据队列--ChannelOutboundBuffer
ChannelOutboundBuffer是一个单链表结构的缓冲队列,其队列类型为Entry。其作用就是缓存待写入socket的数据信息,其中包含的属性有unflushedEntry、tailEntry、flushedEntry等
-
unflushedEntry : 代表只是通过write方法添加到了Entry链表的消息节点。它是链表里第一个等待刷新的节点
-
tailEntry : Entry链表的最后一个节点
-
flushedEntry : 代表被flush方法标记为已刷新的消息节点,即可以认为该Entry马上或者已经被发到网络了,它指向的是链表里第一个要被刷新出去的节点
调用addMessage方法之前,三个指针的样子:
当添加第一个msg后:
添加第二个msg:
以此类推:
直到达到了ChannelOutboundBuffer的高水位线,才会停止添加,设置channel为不可写,直到恢复到低水位。
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size != 0L) {
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > (long)this.channel.config().getWriteBufferHighWaterMark()) {
this.setUnwritable(invokeLater);
}
}
}
4. flush
我们看到,write只会将待写入数据放进ChannelOutboundBuffer,其实并不会真正的写进socket中,而flush操作会依次向前传播,最后在headContext中flush所有unflushedEntry。
public final void flush() {
this.assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer != null) {
outboundBuffer.addFlush();
this.flush0();
}
}
addFlush操作会将所有unflushedEntry标记为flushedEntry,然后进行flush。
public void addFlush() {
Entry entry = this.unflushedEntry;
if (entry != null) {
if (this.flushedEntry == null) {
this.flushedEntry = entry;
}
do {
++this.flushed;
if (!entry.promise.setUncancellable()) {
int pending = entry.cancel();
this.decrementPendingOutboundBytes((long)pending, false, true);
}
entry = entry.next;
} while(entry != null);
this.unflushedEntry = null;
}
}
所有待写入socket的entry会从flushedEntry开始,unflushedEntry指向null,同时所有entry都变成不可取消的状态。
接下来就是真正的开始write数据到底层的socket了
protected void flush0() {
if (!this.inFlush0) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer != null && !outboundBuffer.isEmpty()) {
this.inFlush0 = true;
if (AbstractChannel.this.isActive()) {
try {
AbstractChannel.this.doWrite(outboundBuffer);
} catch (Throwable var10) {
this.handleWriteError(var10);
} finally {
this.inFlush0 = false;
}
}
.....
}
}
}
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
java.nio.channels.SocketChannel ch = this.javaChannel();
int writeSpinCount = this.config().getWriteSpinCount();
do {
if (in.isEmpty()) {
this.clearOpWrite();
return;
}
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig)this.config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, (long)maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
writeSpinCount -= this.doWrite0(in);
break;
case 1:
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
this.incompleteWrite(true);
return;
}
this.adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes((long)localWrittenBytes);
--writeSpinCount;
break;
default:
long attemptedBytes = in.nioBufferSize();
long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0L) {
this.incompleteWrite(true);
return;
}
this.adjustMaxBytesPerGatheringWrite((int)attemptedBytes, (int)localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
}
} while(writeSpinCount > 0);
this.incompleteWrite(writeSpinCount < 0);
写一直发生在这个do while里面,writeSpinCount表示最大写的次数,默认是16次;通过调用nioBuffers方法,从flushedEntry开始,将bytebuf转换成JDK的ByteBuffer数组。每写完一个ByteBuffer,都会进行remove操作,然后write下一个entry。
这里注意两点:
- maxBytesPerGatheringWrite
每次写入数据的大小,都是适应调整的。maxBytesPerGatheringWrite 决定每次 write 可以从 channelOutboundBuffer 中最多发送数据,初始值为 SO_SNDBUF大小 * 2 = 293976 = 146988 << 1。
private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
if (attempted == written) {
if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
((NioSocketChannelConfig)this.config).setMaxBytesPerGatheringWrite(attempted << 1);
}
} else if (attempted > 4096 && written < attempted >>> 1) {
((NioSocketChannelConfig)this.config).setMaxBytesPerGatheringWrite(attempted >>> 1);
}
}
如果这次尝试写入量attempted与真实写入量written一样,并且真实写入量的2倍大于最大可写入量,netty会扩大下次写入量为这次写入量的2倍written * 2, 如果真实写入量的2倍小于这次最大可写入量,则表示需求也不是很大,就不会进行扩容。而真实写入量小于尝试写入的一半,则会缩小下次最大写入量为attempted的1/2。当然有最小值的限制。
- writeSpinCount
默认一次写入最多循环16次,第一种情况是,如果超过16次,并且数据没有写完,则会强制退出channel的write,添加到IO线程的task中,让其他的channel去执行任务,如果是未超过16次,但是socket写满了,则会停止写入,注册写事件,等待socket可写为止。
protected final void incompleteWrite(boolean setOpWrite) {
if (setOpWrite) {
this.setOpWrite();
} else {
this.clearOpWrite();
this.eventLoop().execute(this.flushTask);
}
}
channel的写事件会向前传播,进行write和flush,依次添加进待刷新队列,然后是执行flush操作,在flush操作时,也会根据写入量动态进行放缩来调整写入量,同时也不会一直让一个耗时的entry一直进行写入,如果超过writeSpinCount次数,则会添加进futuretask,进行下次写入,要么是socket写满了,等待可写为止。当最终写入socket后,会通过channelPromise,进行异步通知。