Netty: Composition, Principles, and Source Code Analysis

Netty in Detail

The ServerBootstrap and Bootstrap startup classes


EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
bootstrap.group(bossGroup, workerGroup);
Two groups are passed in: a boss (acceptor) thread pool and an I/O worker thread pool. When no size is given, the default is Runtime.getRuntime().availableProcessors() * 2 threads.
A NioEventLoopGroup is backed by an EventExecutor array; newChild builds the NioEventLoop array.
Each NioEventLoop holds a selector internally.
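A sketch in plain Java (not Netty source) of how a group can hand out the next executor from its EventExecutor array. Netty's actual chooser reduces round-robin to a bitmask when the array length is a power of two; the class and method names here are illustrative:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical simplification of an EventExecutor chooser: a monotonically
// increasing counter masked by (length - 1) yields round-robin selection
// when length is a power of two.
public class ExecutorChooser {
    private final AtomicLong idx = new AtomicLong();
    private final int length;

    public ExecutorChooser(int length) { this.length = length; }

    // Power-of-two fast path: equivalent to idx % length, but branch-free.
    public int nextPowerOfTwo() {
        return (int) idx.getAndIncrement() & (length - 1);
    }

    public static void main(String[] args) {
        ExecutorChooser chooser = new ExecutorChooser(8);
        for (int i = 0; i < 10; i++) {
            System.out.print(chooser.nextPowerOfTwo() + " "); // cycles 0..7, then wraps
        }
        System.out.println();
    }
}
```

The masking trick is why pool sizes that are powers of two (like the common cores * 2 default on many machines) get a slightly cheaper dispatch path.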


bootstrap.channel(NioServerSocketChannel.class);
Internally this builds a channelFactory: channelFactory(new ReflectiveChannelFactory<C>(channelClass));
Later, ChannelFactory#newChannel is called to instantiate the Channel.
Every instantiation of a NioSocketChannel (or similar type) comes with its own pipeline = newChannelPipeline();
and that ChannelPipeline carries two built-in ChannelHandlerContexts: head and tail.


Server side
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(......);
    }
});
This hands the childHandler straight to the ServerBootstrap.

ServerBootstrap#bind ---> doBind ---> initAndRegister
First a new channel is instantiated: channelFactory.newChannel();
----> ServerBootstrap#init
1: channel.config().setOptions(options); // set the channel options
2: ChannelPipeline p = channel.pipeline(); // fetch the pipeline
3: p.addLast(new ChannelInitializer<Channel>() {
       pipeline.addLast(
           new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)
       );
   })
This adds a ServerBootstrapAcceptor and waits for ChannelInitializer#channelRegistered -----> initChannel.
As soon as the ServerBootstrapAcceptor is attached to the main handler chain, ctx.pipeline().fireChannelRegistered(); is fired.

When the ServerBootstrapAcceptor's channelRead is triggered, it performs the following steps:
1: channel.pipeline().addLast(childHandler); // bind all child handlers to the pipeline
// note that this channel is the one produced by the boss group's accept

2: childGroup.register(child); // hand the channel to the worker NioEventLoopGroup: one of the pool's threads binds the channel to its selector
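Stripped of Netty's abstractions, the accept-then-register handoff above can be sketched with plain java.nio: the boss selector detects OP_ACCEPT, and the accepted child channel is registered with a worker's selector. The class name and two-selector setup are illustrative, not Netty code:

```java
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class AcceptAndRegister {
    public static boolean demo() throws Exception {
        Selector bossSelector = Selector.open();
        Selector workerSelector = Selector.open(); // stands in for one worker NioEventLoop's selector

        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral port
        server.configureBlocking(false);
        server.register(bossSelector, SelectionKey.OP_ACCEPT);

        // A client connects so the boss has something to accept.
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
        SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port));

        bossSelector.select();                    // boss wakes up with OP_ACCEPT
        SocketChannel accepted = server.accept(); // the "child" channel
        accepted.configureBlocking(false);
        // childGroup.register(child) boils down to this: bind the accepted
        // channel to a worker selector, interested in reads.
        SelectionKey childKey = accepted.register(workerSelector, SelectionKey.OP_READ);
        boolean valid = childKey.isValid();

        client.close(); accepted.close(); server.close();
        bossSelector.close(); workerSelector.close();
        return valid;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // true: the child now belongs to the worker
    }
}
```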



Finally, ServerBootstrap#doBind0

gets the bossGroup running:
channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        if (regFuture.isSuccess()) {
            channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        } else {
            promise.setFailure(regFuture.cause());
        }
    }
});








Client side
Bootstrap#connect ----> doResolveAndConnect ----> initAndRegister

channelFactory.newChannel();
This yields a channel.

config().group().register(channel);
A thread (and its selector) is taken from the pool and bound to the channel.

Handlers are then added via the ChannelInitializer.

Bootstrap#doConnect

channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        if (localAddress == null) {
            channel.connect(remoteAddress, connectPromise);
        } else {
            channel.connect(remoteAddress, localAddress, connectPromise);
        }
        connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    }
});

The eventloop thread bound to the channel starts up.
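What connect means at the NIO level can be shown with a runnable java.nio sketch (not Netty code): a non-blocking connect, waiting for OP_CONNECT, completing via finishConnect, then clearing OP_CONNECT from the interest set so select() does not spin — the same sequence processSelectedKey performs later in these notes:

```java
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NonBlockingConnect {
    public static boolean demo() throws Exception {
        // A local server so the connect has somewhere to go.
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

        Selector selector = Selector.open();
        SocketChannel ch = SocketChannel.open();
        ch.configureBlocking(false);
        SelectionKey key = ch.register(selector, SelectionKey.OP_CONNECT);

        if (!ch.connect(new InetSocketAddress("127.0.0.1", port))) {
            selector.select();                 // block until OP_CONNECT is ready
        }
        boolean connected = ch.finishConnect(); // complete the handshake
        // Clear OP_CONNECT, otherwise select() would keep returning immediately.
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);

        SocketChannel accepted = server.accept();
        if (accepted != null) accepted.close();
        ch.close(); server.close(); selector.close();
        return connected;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // true
    }
}
```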

EventLoop

Client side
Bootstrap#connect ----> doResolveAndConnect ----> doConnect

channel.eventLoop().execute(() -> {
    channel.connect
    // a Runnable that performs the connect
})
Because the channel is already bound to an eventloop, execute can be called on it directly.


public void execute(Runnable task) {
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        // the current thread already belongs to this eventloop's thread
        addTask(task);
    } else {
        // otherwise start the thread first;
        // this is how the eventloop thread gets started from the main thread
        // when Bootstrap first connects or binds
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            // if already shut down, remove the task and go to the rejection handler
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
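This lazy-start pattern can be reproduced as a runnable sketch in plain Java — a simplified stand-in for SingleThreadEventExecutor, not its real implementation; the class name and shutdown handling are hypothetical:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// One thread, started lazily on the first execute() from an outside thread;
// tasks submitted from the loop thread itself are only queued.
public class MiniEventLoop {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    public boolean inEventLoop() { return Thread.currentThread() == thread; }

    public void execute(Runnable task) {
        tasks.add(task);                           // addTask(task)
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(() -> {          // startThread(): at most once
                for (;;) {
                    try {
                        tasks.take().run();        // stand-in for the select/runAllTasks loop
                    } catch (InterruptedException e) {
                        return;                    // shutdown path elided
                    }
                }
            });
            thread = t;
            t.start();
        }
    }

    public void shutdown() { if (thread != null) thread.interrupt(); }

    public static void main(String[] args) throws Exception {
        MiniEventLoop loop = new MiniEventLoop();
        CompletableFuture<Boolean> ranInLoop = new CompletableFuture<>();
        // The first execute() from the main thread starts the loop thread,
        // just as Bootstrap's initial connect/bind does.
        loop.execute(() -> ranInLoop.complete(loop.inEventLoop()));
        System.out.println(ranInLoop.get());       // true
        loop.shutdown();
    }
}
```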


startThread---->doStartThread---->SingleThreadEventExecutor.this.run()

NioEventLoop: the core of the event loop is right here!

@Override
protected void run() {
    for (;;) {
        boolean oldWakenUp = wakenUp.getAndSet(false);
        try {
            if (hasTasks()) {
                // the task queue has pending tasks

                // selectNow() never blocks the current thread:
                // it returns the number of ready IO events, or 0 if there are none
                selectNow();

            } else {

                // when hasTasks() is false, select(oldWakenUp) may block the current thread
                select(oldWakenUp);
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }

            // The intuition: when the taskQueue is empty, Netty can afford to block
            // waiting for IO readiness; when the taskQueue has tasks, we naturally
            // want the submitted tasks to run as soon as possible,
            // so Netty calls the non-blocking selectNow() to make sure the
            // taskQueue's tasks get executed promptly.

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();

                processSelectedKeys();

                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

                // Let the IO time be ioTime, with time share ioRatio; then:
                // ioTime / ioRatio = taskTime / taskRatio
                // taskRatio = 100 - ioRatio
                // => taskTime = ioTime * (100 - ioRatio) / ioRatio

            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            ...
        }
    }
}
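The ioRatio budget derived in the comments of run() is worth a concrete worked example. With ioRatio = n, IO gets n% of each iteration and tasks get the rest, so taskTime = ioTime * (100 - ioRatio) / ioRatio:

```java
// Worked example of NioEventLoop's task-time budget formula.
public class IoRatioBudget {
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        // ioRatio 50 (Netty's default): tasks get exactly as long as IO just took.
        System.out.println(taskBudgetNanos(1_000_000, 50)); // 1000000
        // ioRatio 80: tasks get a quarter of the IO time.
        System.out.println(taskBudgetNanos(1_000_000, 80)); // 250000
    }
}
```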




// process the selectedKeys; if the optimized set is null, fall back to the selector's own selectedKeys
private void processSelectedKeys() {
    if (selectedKeys != null) {
        // the selectedKeys field is set when openSelector() is called,
        // with different values depending on the JVM platform;
        // in the common case it is not null
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}



// iterate the selectedKeys for ready IO events and call processSelectedKey on each one
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        selectedKeys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        ...
    }
}






// This is exactly the classic Java NIO Selector processing flow!
// processSelectedKey handles three events:

// OP_READ: readable - the Channel has received new data for the upper layers to read.

// OP_WRITE: writable - the upper layers may write data to the Channel.

// OP_CONNECT: connection established - the TCP connection is up and the Channel is active.
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    ...
    try {
        int readyOps = k.readyOps();

        // readable
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }

        // writable
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // connection established
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
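The bitmask tests above can be tried in isolation with the real java.nio SelectionKey constants; the helper names here are illustrative:

```java
import java.nio.channels.SelectionKey;

// The readyOps checks that processSelectedKey performs, as standalone helpers.
public class ReadyOpsDemo {
    static boolean wantsRead(int readyOps) {
        // OP_READ fires on child channels, OP_ACCEPT on the server channel;
        // readyOps == 0 is also routed to read as a JDK-quirk workaround.
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                || readyOps == 0;
    }

    static int clearConnect(int interestOps) {
        // OP_CONNECT must be cleared once connected, otherwise select() spins.
        return interestOps & ~SelectionKey.OP_CONNECT;
    }

    public static void main(String[] args) {
        System.out.println(wantsRead(SelectionKey.OP_ACCEPT)); // true
        System.out.println(wantsRead(SelectionKey.OP_WRITE));  // false
        System.out.println(clearConnect(SelectionKey.OP_CONNECT | SelectionKey.OP_READ)
                == SelectionKey.OP_READ);                      // true
    }
}
```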




When a channel is instantiated it carries not only a pipeline
but also a NioSocketChannelUnsafe instance, which handles the channel's low-level IO operations.
That is why the code above always works through ch.unsafe().

Channel

NioSocketChannel#NioSocketChannelUnsafe#read

  • Allocate a ByteBuf
  • Read data from the SocketChannel
  • Call pipeline.fireChannelRead to fire an inbound event.
@Override
public final void read() {
    ...
    ByteBuf byteBuf = null;
    int messages = 0;
    boolean close = false;
    try {
        int totalReadAmount = 0;
        boolean readPendingReset = false;
        do {
            byteBuf = allocHandle.allocate(allocator);
            int writable = byteBuf.writableBytes();
            int localReadAmount = doReadBytes(byteBuf);

            // check the read result
            ...

            // pipeline operation
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;

            ...

            totalReadAmount += localReadAmount;

            // check whether auto-read is configured; if not, exit the loop immediately
            ...
        } while (++ messages < maxMessagesPerRead);

        // pipeline operation
        pipeline.fireChannelReadComplete();


        allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close);
    } finally {
    }
}

ChannelPipeline

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}



// The three methods below are what keeps events cycling through the pipeline.
// ChannelPipeline's fireChannelXXXX mostly just delegates to ChannelHandlerContext's fireChannelXXXX.

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}
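The propagation mechanism above — each context finds the next inbound context and invokes its handler, which may in turn forward — can be sketched as a tiny linked chain in plain Java. This is a hypothetical simplification, not Netty's AbstractChannelHandlerContext:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// A minimal head -> h1 -> h2 -> tail chain: an event only moves forward
// when a handler explicitly calls ctx.fireChannelRead, exactly like Netty.
public class MiniPipeline {
    static class Ctx {
        final BiConsumer<Ctx, Object> handler;
        Ctx next; // inbound direction: head towards tail
        Ctx(BiConsumer<Ctx, Object> handler) { this.handler = handler; }
        // findContextInbound() + invokeChannelRead, collapsed into one step
        void fireChannelRead(Object msg) {
            if (next != null) next.handler.accept(next, msg);
        }
    }

    static List<String> propagate() {
        List<String> seen = new ArrayList<>();
        Ctx tail = new Ctx((ctx, msg) -> seen.add("tail")); // tail does not forward
        Ctx h2 = new Ctx((ctx, msg) -> { seen.add("h2"); ctx.fireChannelRead(msg); });
        Ctx h1 = new Ctx((ctx, msg) -> { seen.add("h1"); ctx.fireChannelRead(msg); });
        Ctx head = new Ctx((ctx, msg) -> ctx.fireChannelRead(msg)); // pass-through
        head.next = h1; h1.next = h2; h2.next = tail;

        head.fireChannelRead("msg"); // pipeline.fireChannelRead starts at head
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(propagate()); // [h1, h2, tail]
    }
}
```

If h1 dropped the ctx.fireChannelRead call, h2 and tail would never see the message — which is also why forgetting to forward an event is such a common Netty handler bug.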

ChannelInitializer

Every ChannelInitializer goes through channelRegistered before reaching initChannel.
That is how it can add handlers to the pipeline and then have the ChannelInitializer itself removed afterwards,
finally propagating ctx.pipeline().fireChannelRegistered(); down the pipeline once more.

ServerBootstrap#bind ---->doBind---->
initAndRegister

- Initialize the channel; its internal pipeline is created with head and tail ChannelHandlerContexts
- Add a ChannelInitializer; later, at channelRegistered time, initChannel runs and adds handlers to the ChannelPipeline
- Pick a NioEventLoop and bind the current channel to that eventloop's selector // config().group().register(channel);
- During register, the channel's unsafe register is triggered as well
- This pushes on to unsafe#register0, which starts firing fireChannelRegistered
- That in turn lets the ChannelInitializer's channelRegistered run initChannel to add handlers, then remove the current ChannelInitializer

- Start the eventloop in preparation for startThread; channel.eventLoop().execute(.....)
- Add the runnable task and decide whether startThread is needed
- Enter SingleThreadEventExecutor.run, check hasTasks, and decide whether the select should block
- processSelectedKeys processes the selected keys
- Iterate the selected keys and branch on each key's ready operations
- readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT) ---> unsafe.read ---> pipeline.fireChannelRead();
- readyOps & SelectionKey.OP_WRITE ......
- Finally, ioRatio decides how much time runAllTasks gets