详细图解NettyReactor启动全流程

本系列Netty源码解析文章基于 4.1.56.Final版本。

大家第一眼看到这幅流程图,是不是脑瓜子嗡嗡的呢?

大家先不要惊慌,问题不大,本文笔者的目的就是要让大家清晰的理解这幅流程图,从而深刻的理解Netty Reactor的启动全流程,包括其中涉及到的各种代码设计实现细节。

在上篇文章​​《聊聊Netty那些事儿之Reactor在Netty中的实现(创建篇)》​​中我们详细介绍了Netty服务端核心引擎组件主从Reactor组模型 NioEventLoopGroup以及Reactor模型 NioEventLoop的创建过程。最终我们得到了netty Reactor模型的运行骨架如下:

现在Netty服务端程序的骨架是搭建好了,本文我们就基于这个骨架来深入剖析下Netty服务端的启动过程。

我们继续回到上篇文章提到的Netty服务端代码模板中,在创建完主从Reactor线程组:bossGroup,workerGroup后,接下来就开始配置Netty服务端的启动辅助类ServerBootstrap了。

public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//创建主从Reactor线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主从Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型
.option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项
.handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer() {//设置从Reactor中注册channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server. 绑定端口启动服务,开始监听accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

在上篇文章中我们对代码模板中涉及到ServerBootstrap的一些配置方法做了简单的介绍,大家如果忘记的话,可以在返回去回顾一下。

ServerBootstrap类其实没有什么特别的逻辑,主要是对Netty启动过程中需要用到的一些核心信息进行配置管理,比如:

  • Netty的核心引擎组件主从Reactor线程组:bossGroup,workerGroup。通过ServerBootstrap#group方法配置。
  • Netty服务端使用到的Channel类型:NioServerSocketChannel ,通过ServerBootstrap#channel方法配置。以及配置NioServerSocketChannel时用到的SocketOption。SocketOption用于设置底层JDK NIO Socket的一些选项。通过ServerBootstrap#option方法进行配置。

主ReactorGroup中的MainReactor管理的Channel类型为NioServerSocketChannel,如图所示主要用来监听端口,接收客户端连接,为客户端创建初始化NioSocketChannel,然后采用round-robin轮询的方式从图中从ReactorGroup中选择一个SubReactor与该客户端NioSocketChannel进行绑定。

从ReactorGroup中的SubReactor管理的Channel类型为NioSocketChannel,它是netty中定义客户端连接的一个模型,每个连接对应一个。如图所示SubReactor负责监听处理绑定在其上的所有NioSocketChannel上的IO事件。

  • 保存服务端NioServerSocketChannel和客户端NioSocketChannel对应pipeline中指定的ChannelHandler。用于后续Channel向Reactor注册成功之后,初始化Channel里的pipeline。

不管是服务端用到的NioServerSocketChannel还是客户端用到的NioSocketChannel,每个Channel实例都会有一个Pipeline,Pipeline中有多个ChannelHandler用于编排处理对应Channel上感兴趣的IO事件。

ServerBootstrap结构中包含了netty服务端程序启动的所有配置信息,在我们介绍启动流程之前,先来看下ServerBootstrap的源码结构:

ServerBootstrap

ServerBootstrap的继承结构比较简单,继承层次的职责分工也比较明确。

ServerBootstrap主要负责对主从Reactor线程组相关的配置进行管理,其中带child前缀的配置方法是对从Reactor线程组的相关配置管理。从Reactor线程组中的Sub Reactor负责管理的客户端NioSocketChannel相关配置存储在ServerBootstrap结构中。

父类AbstractBootstrap则是主要负责对主Reactor线程组相关的配置进行管理,以及主Reactor线程组中的Main Reactor负责处理的服务端ServerSocketChannel相关的配置管理。

一、 配置主从Reactor线程组

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主从Reactor
public class ServerBootstrap extends AbstractBootstrap {
//Main Reactor线程组
volatile EventLoopGroup group;
//Sub Reactor线程组
private volatile EventLoopGroup childGroup;
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
//父类管理主Reactor线程组
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
}

二、 配置服务端ServerSocketChannel

ServerBootstrap b = new ServerBootstrap();
b.channel(NioServerSocketChannel.class);
public class ServerBootstrap extends AbstractBootstrap {
//用于创建ServerSocketChannel ReflectiveChannelFactory
private volatile ChannelFactory channelFactory;
public B channel(Class channelClass) {
return channelFactory(new ReflectiveChannelFactory(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
@Deprecated
public B channelFactory(ChannelFactory channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
}

在向ServerBootstrap配置服务端ServerSocketChannel的channel方法中,其实是创建了一个ChannelFactory工厂实例ReflectiveChannelFactory,在Netty服务端启动的过程中,会通过这个ChannelFactory去创建相应的Channel实例。

我们可以通过这个方法来配置netty的IO模型,下面为ServerSocketChannel在不同IO模型下的实现:

EventLoopGroupReactor线程组在不同IO模型下的实现:

我们只需要将IO模型的这些核心接口对应的实现类前缀改为对应IO模型的前缀,就可以轻松在Netty中完成对IO模型的切换。

1、 ReflectiveChannelFactory

public class ReflectiveChannelFactory implements ChannelFactory {
//NioServerSocketChannelde 构造器
private final Constructor constructor;
public ReflectiveChannelFactory(Class clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
//反射获取NioServerSocketChannel的构造器
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
@Override
public T newChannel() {
try {
//创建NioServerSocketChannel实例
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
}

从类的签名我们可以看出,这个工厂类是通过泛型加反射的方式来创建对应的Channel实例。

  • 泛型参数T extends Channel表示的是要通过工厂类创建的Channel类型,这里我们初始化的是NioServerSocketChannel。
  • 在ReflectiveChannelFactory的构造器中通过反射的方式获取NioServerSocketChannel的构造器。
  • 在newChannel方法中通过构造器反射创建NioServerSocketChannel实例。

注意这时只是配置阶段,NioServerSocketChannel此时并未被创建。它是在启动的时候才会被创建出来。

三、为NioServerSocketChannel配置ChannelOption

ServerBootstrap b = new ServerBootstrap();
//设置被MainReactor管理的NioServerSocketChannel的Socket选项
b.option(ChannelOption.SO_BACKLOG, 100)
public abstract class AbstractBootstrap, C extends Channel> implements Cloneable {
//serverSocketChannel中的ChannelOption配置
private final Map, Object> options = new LinkedHashMap, Object>();
public B option(ChannelOption option, T value) {
ObjectUtil.checkNotNull(option, "option");
synchronized (options) {
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
}
return self();
}
}

无论是服务端的NioServerSocketChannel还是客户端的NioSocketChannel它们的相关底层Socket选项ChannelOption配置全部存放于一个Map类型的数据结构中。

由于客户端NioSocketChannel是由从Reactor线程组中的Sub Reactor来负责处理,所以涉及到客户端NioSocketChannel所有的方法和配置全部是以child前缀开头。

ServerBootstrap b = new ServerBootstrap();
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
public class ServerBootstrap extends AbstractBootstrap {
//客户端SocketChannel对应的ChannelOption配置
private final Map, Object> childOptions = new LinkedHashMap, Object>();
public ServerBootstrap childOption(ChannelOption childOption, T value) {
ObjectUtil.checkNotNull(childOption, "childOption");
synchronized (childOptions) {
if (value == null) {
childOptions.remove(childOption);
} else {
childOptions.put(childOption, value);
}
}
return this;
}
}

相关的底层Socket选项,netty全部枚举在ChannelOption类中,笔者这里就不一一列举了,在本系列后续相关的文章中,笔者还会为大家详细的介绍这些参数的作用。

public class ChannelOption extends AbstractConstant> {
..................省略..............
public static final ChannelOption SO_BROADCAST = valueOf("SO_BROADCAST");
public static final ChannelOption SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
public static final ChannelOption SO_SNDBUF = valueOf("SO_SNDBUF");
public static final ChannelOption SO_RCVBUF = valueOf("SO_RCVBUF");
public static final ChannelOption SO_REUSEADDR = valueOf("SO_REUSEADDR");
public static final ChannelOption SO_LINGER = valueOf("SO_LINGER");
public static final ChannelOption SO_BACKLOG = valueOf("SO_BACKLOG");
public static final ChannelOption SO_TIMEOUT = valueOf("SO_TIMEOUT");
..................省略..............
}

四、为服务端NioServerSocketChannel中的Pipeline配置ChannelHandler

   //serverSocketChannel中pipeline里的handler(主要是acceptor)
private volatile ChannelHandler handler;
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}

向NioServerSocketChannel中的Pipeline添加ChannelHandler分为两种方式:

  • 显式添加:显式添加的方式是由用户在main线程中通过ServerBootstrap#handler的方式添加。如果需要添加多个ChannelHandler,则可以通过ChannelInitializer向pipeline中进行添加。

关于ChannelInitializer后面笔者会有详细介绍,这里大家只需要知道ChannelInitializer是一种特殊的ChannelHandler,用于初始化pipeline。适用于向pipeline中添加多个ChannelHandler的场景。

  ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主从Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型
.handler(new ChannelInitializer() {
@Override
protected void initChannel(NioServerSocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(channelhandler1)
.addLast(channelHandler2)
......
.addLast(channelHandler3);
}
})
  • 隐式添加:隐式添加主要添加的就是主ReactorGroup的核心组件也就是下图中的acceptor,Netty中的实现为ServerBootstrapAcceptor,本质上也是一种ChannelHandler,主要负责在客户端连接建立好后,初始化客户端NioSocketChannel,在从Reactor线程组中选取一个Sub Reactor,将客户端NioSocketChannel 注册到Sub Reactor中的selector上。

隐式添加ServerBootstrapAcceptor是由Netty框架在启动的时候负责添加,用户无需关心。

在本例中,NioServerSocketChannel的PipeLine中只有两个ChannelHandler,一个由用户在外部显式添加的LoggingHandler,另一个是由Netty框架隐式添加的ServerBootstrapAcceptor。

其实我们在实际项目使用的过程中,不会向netty服务端NioServerSocketChannel添加额外的ChannelHandler,NioServerSocketChannel只需要专心做好自己最重要的本职工作接收客户端连接就好了。这里额外添加一个LoggingHandler只是为了向大家展示ServerBootstrap的配置方法。

五、 为客户端NioSocketChannel中的Pipeline配置ChannelHandler

  final EchoServerHandler serverHandler = new EchoServerHandler();
serverBootstrap.childHandler(new ChannelInitializer() {//设置从Reactor中注册channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
    //socketChannel中pipeline中的处理handler
private volatile ChannelHandler childHandler;
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}

向客户端NioSocketChannel中的Pipeline里添加ChannelHandler完全是由用户自己控制显式添加,添加的数量不受限制。

由于在Netty的IO线程模型中,是由单个Sub Reactor线程负责执行客户端NioSocketChannel中的Pipeline,一个Sub Reactor线程负责处理多个NioSocketChannel上的IO事件,如果Pipeline中的ChannelHandler添加的太多,就会影响Sub Reactor线程执行其他NioSocketChannel上的Pipeline,从而降低IO处理效率,降低吞吐量。

所以Pipeline中的ChannelHandler不易添加过多,并且不能再ChannelHandler中执行耗时的业务处理任务。

在我们通过ServerBootstrap配置netty服务端启动信息的时候,无论是向服务端NioServerSocketChannel的pipeline中添加ChannelHandler,还是向客户端NioSocketChannel的pipeline中添加ChannelHandler,当涉及到多个ChannelHandler添加的时候,我们都会用到ChannelInitializer,那么这个ChannelInitializer究竟是何方圣神,为什么要这样做呢?我们接着往下看。

ChannelInitializer

首先ChannelInitializer它继承于ChannelHandler,它自己本身就是一个ChannelHandler,所以它可以添加到childHandler中。

其他的父类大家这里可以不用管,后面文章中笔者会一一为大家详细介绍。

那为什么不直接添加ChannelHandler而是选择用ChannelInitializer呢?

这里主要有两点原因:

  • 前边我们提到,客户端NioSocketChannel是在服务端accept连接后,在服务端NioServerSocketChannel中被创建出来的。但是此时我们正处于配置ServerBootStrap阶段,服务端还没有启动,更没有客户端连接上来,此时客户端NioSocketChannel还没有被创建出来,所以也就没办法向客户端NioSocketChannel的pipeline中添加ChannelHandler。
  • 客户端NioSocketChannel中Pipeline里可以添加任意多个ChannelHandler,但是Netty框架无法预知用户到底需要添加多少个ChannelHandler,所以Netty框架提供了回调函数ChannelInitializer#initChannel,使用户可以自定义ChannelHandler的添加行为。

当客户端NioSocketChannel注册到对应的Sub Reactor上后,紧接着就会初始化NioSocketChannel中的Pipeline,此时Netty框架会回调ChannelInitializer#initChannel执行用户自定义的添加逻辑。

public abstract class ChannelInitializer extends ChannelInboundHandlerAdapter {
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
//当channelRegister事件发生时,调用initChannel初始化pipeline
if (initChannel(ctx)) {
.................省略...............
} else {
.................省略...............
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
//此时客户单NioSocketChannel已经创建并初始化好了
initChannel((C) ctx.channel());
} catch (Throwable cause) {
.................省略...............
} finally {
.................省略...............
}
return true;
}
return false;
}
protected abstract void initChannel(C ch) throws Exception;
.................省略...............
}

这里由netty框架回调的ChannelInitializer#initChannel方法正是我们自定义的添加逻辑。

   final EchoServerHandler serverHandler = new EchoServerHandler();
serverBootstrap.childHandler(new ChannelInitializer() {//设置从Reactor中注册channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});

到此为止,Netty服务端启动所需要的必要配置信息,已经全部存入ServerBootStrap启动辅助类中。

接下来要做的事情就是服务端的启动了。

// Start the server. 绑定端口启动服务,开始监听accept事件
ChannelFuture f = serverBootStrap.bind(PORT).sync();

Netty服务端的启动

经过前面的铺垫终于来到了本文的核心内容----Netty服务端的启动过程。

如代码模板中的示例所示,Netty服务端的启动过程封装在io.netty.bootstrap.AbstractBootstrap#bind(int)函数中。

接下来我们看一下Netty服务端在启动过程中究竟干了哪些事情?

大家看到这副启动流程图先不要慌,接下来的内容笔者会带大家各个击破它,在文章的最后保证让大家看懂这副流程图。

我们先来从netty服务端启动的入口函数开始我们今天的源码解析旅程:

    public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
//校验Netty核心组件是否配置齐全
validate();
//服务端开始启动,绑定端口地址,接收客户端连接
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
//异步创建,初始化,注册ServerSocketChannel到main reactor上
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
........serverSocketChannel向Main Reactor注册成功后开始绑定端口....,
} else {
//详细图解NettyReactor启动全流程
网站链接:http://www.csdahua.cn/qtweb/news42/312592.html

网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网