博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty源码分析之-引导详解(4)
阅读量:4290 次
发布时间:2019-05-27

本文共 9359 字,大约阅读时间需要 31 分钟。

引导一个应用程序是指对它进行配置,并使它运行起来的过程。Netty处理引导的方式使你的应用程序和网络层相隔离,无论它是客户端还是服务器,所有的框架组件都将会在后台结合在一起并且启用

相对于将具体的引导类分别看作用于服务器和客户端的引导来说,记住它们的本意是用来支撑不同的应用程序的功能的将有所裨益。也就是说,服务器致力于使用一个父Channel来接受来自客户端的连接,并创建子Channel以用于它们之间的通信;而客户端将最可能只需要一个单独的、没有父Channel的Channel 来用于所有的网络交互。

客户端引导代码实现:

Bootstrap bootstrap = new Bootstrap();

服务端引导代码实现:

ServerBootstrap serverBootstrap = new ServerBootstrap();   ①serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer());   ②

分析ServerBootstrap源码:

public class ServerBootstrap extends AbstractBootstrap
{
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class); private final Map
, Object> childOptions = new LinkedHashMap
, Object>(); private final Map
, Object> childAttrs = new LinkedHashMap
, Object>(); private final ServerBootstrapConfig config = new ServerBootstrapConfig(this); private volatile EventLoopGroup childGroup; private volatile ChannelHandler childHandler; public ServerBootstrap() { } ...

对于ServerBootstrap的父类AbstractBootstrap:

public abstract class AbstractBootstrap, C extends Channel> implements Cloneable {
}

AbstractBootstrap是一个辅助类,用来很容易的启动一个Channel,该方法的签名中,子类型B是其父类型的一个类型参数,因此可以返回到运行时实例的引用以支持方法的链式调用


对于②处的代码,首先调用的是ServerBootstrap类中group(bossGroup, workerGroup)方法:

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {        super.group(parentGroup);        if (childGroup == null) {            throw new NullPointerException("childGroup");        }        if (this.childGroup != null) {            throw new IllegalStateException("childGroup set already");        }        this.childGroup = childGroup;        return this;    }

该方法将parentGroup赋值给AbstractBootstrap父类中的EventLoopGroup group成员变量,父类实现代码如下,同时将childGroup赋值给ServerBootstrap类的EventLoopGroup childGroup对象。这些EventLoopGroup将用来处理整个程序的I/O操作和事件的处理。

ServerBootstrap类的父类AbstractBootstrap中group具体实现:

//用来处理将要创建关于client对应的Channel相关事件public B group(EventLoopGroup group) {        if (group == null) {            throw new NullPointerException("group");        }        if (this.group != null) {            throw new IllegalStateException("group set already");        }        this.group = group;        return (B) this;    }

parentGroup可以看作acceptor用来接受来自客户端的连接,也就是对应的ServerChannel,ServerChannel的实现负责创建子Channel。而childGroup可以看作是client所对应的子Channel


②调用完group方法之后,调用的是channel(NioServerSocketChannel.class)方法,对于NioServerSocketChannel类:

public class NioServerSocketChannel extends AbstractNioMessageChannel                             implements io.netty.channel.socket.ServerSocketChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class); private static ServerSocketChannel newSocket(SelectorProvider provider) { try { /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. * * See #2308. */ return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } } private final ServerSocketChannelConfig config; /** * 默认的无参构造函数 * Create a new instance */ public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } ...

对于SelectorProvider.provider(),是系统级别针对selector的操作:

public static SelectorProvider provider() {        synchronized (lock) {            if (provider != null)                return provider;            return AccessController.doPrivileged(                new PrivilegedAction
() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }

该类通过系统类型实现了基于NIO selector方式的SelectorProvider来接受新的连接,该实例只会在第一次进行初始化,后续的调用将直接返回


回到上述channel(NioServerSocketChannel.class)方法:

public B channel(Class
channelClass) {
if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory
(channelClass)); }

对于ReflectiveChannelFactory类,可以发现是通过无参的构造方法来反射创建实例对象:

public class ReflectiveChannelFactory
implements ChannelFactory
{
private final Class
clazz; public ReflectiveChannelFactory(Class
clazz) {
if (clazz == null) { throw new NullPointerException("clazz"); } this.clazz = clazz; } @Override public T newChannel() { try { return clazz.getConstructor().newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } ...

对于channelFactory方法可以追溯到AbstractBootstrap类中的:

public B channelFactory(ChannelFactory
channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return (B) this; }

对于返回的B对象,查看类申明可以看到:

public abstract class AbstractBootstrap, C extends Channel> implements Cloneable

追溯到前面我们申明ServerBootstrap实例:

public class ServerBootstrap extends AbstractBootstrap

因此可以知道返回的B代表ServerBootstrap


对于②处后续的childHandler方法:

public ServerBootstrap childHandler(ChannelHandler childHandler) {        if (childHandler == null) {            throw new NullPointerException("childHandler");        }        this.childHandler = childHandler;        return this;    }

添加一个ChannelHandler来处理Channel的请求,对于所添加的MyServerInitializer代码实现如下:

public class MyServerInitializer extends ChannelInitializer
{
@Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //这里的若干ChannelHandler只是举例说明 pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new MyServerHandler()); }}

对于所继承的ChannelInitializer类实现:

@Sharablepublic abstract class ChannelInitializer
extends ChannelInboundHandlerAdapter {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class); private final ConcurrentMap
initMap = PlatformDependent.newConcurrentHashMap(); protected abstract void initChannel(C ch) throws Exception; ① @Override @SuppressWarnings("unchecked") public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { if (initChannel(ctx)) { ctx.pipeline().fireChannelRegistered(); ② } else { ctx.fireChannelRegistered(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause); ctx.close(); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { initChannel(ctx); } } @SuppressWarnings("unchecked") private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); ① } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { remove(ctx); ③ } return true; } return false; } private void remove(ChannelHandlerContext ctx) { try { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); } } finally { initMap.remove(ctx); } }}

ChannelInitializer是一个特殊的实现了ChannelInboundHandler的handler,当Channel被注册到了它的EventLoop之后,会回调ChannelInitializer类中的channelRegistered方法,从而会调用我们所实现的initChannel()方法(见①处),由于我们调用了Channel初始化的操作也就是initChannel方法,因此我们需要对pipeline从头开始传递事件以避免丢失事件(见②)

需要注意的是代码③处,当initChannel方法执行完后,也就是将一个或多个ChannelHandler添加到ChannelPipeline之后会从ChannelPipeline中移除它自己,因为之后的不再需要这个特殊的ChannelInitializer所对应的ChannelHandler来处理任何操作

这是很netty很巧妙的一个设计,利用ChannelHandle被添加到ChannelPipeline的特性一次性来添加更多的ChannelHandle

转载地址:http://hqrgi.baihongyu.com/

你可能感兴趣的文章
Android面试神器之Rxjava破冰
查看>>
面试神器第二弹:Rxjava熟悉——操作符
查看>>
带你过一遍Android 多主题框架——MagicaSakura
查看>>
这款神器你不学就要落后了!
查看>>
谷歌开源跨平台UI框架——Flutter
查看>>
Android进阶必学:自定义注解之动态代理
查看>>
Android进阶必学:自定义注解之反射
查看>>
Android进阶之注解解析和自定义注解
查看>>
你想成为Android高级工程师你还得学习Hook
查看>>
菜鸟带你Hook技术实战
查看>>
BAT面试题集锦——Java基础(一)
查看>>
BAT面试题集锦——Java基础(二)
查看>>
Retrofit原理解析最简洁的思路
查看>>
Okhttp的源码解读
查看>>
【Android P】 JobScheduler服务源码解析(二) ——框架解析
查看>>
【Android P】 JobScheduler服务源码解析(三)—— 使用Job需要注意的点
查看>>
string和wstring相互转换
查看>>
c++读取utf8等不同编码文件
查看>>
STL中的vector
查看>>
C++中的map
查看>>