本文共 9359 字,大约阅读时间需要 31 分钟。
引导一个应用程序是指对它进行配置,并使它运行起来的过程。Netty处理引导的方式使你的应用程序和网络层相隔离,无论它是客户端还是服务器,所有的框架组件都将会在后台结合在一起并且启用
相对于将具体的引导类分别看作用于服务器和客户端的引导来说,记住它们的本意是用来支撑不同的应用程序的功能的将有所裨益。也就是说,服务器致力于使用一个父Channel来接受来自客户端的连接,并创建子Channel以用于它们之间的通信;而客户端将最可能只需要一个单独的、没有父Channel的Channel 来用于所有的网络交互。
客户端引导代码实现:
Bootstrap bootstrap = new Bootstrap();
服务端引导代码实现:
ServerBootstrap serverBootstrap = new ServerBootstrap(); ①serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); ②
分析ServerBootstrap源码:
public class ServerBootstrap extends AbstractBootstrap{ private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class); private final Map , Object> childOptions = new LinkedHashMap , Object>(); private final Map , Object> childAttrs = new LinkedHashMap , Object>(); private final ServerBootstrapConfig config = new ServerBootstrapConfig(this); private volatile EventLoopGroup childGroup; private volatile ChannelHandler childHandler; public ServerBootstrap() { } ...
对于ServerBootstrap的父类AbstractBootstrap:
public abstract class AbstractBootstrap, C extends Channel> implements Cloneable { }
AbstractBootstrap是一个辅助类,用来很容易的启动一个Channel,该方法的签名中,子类型B是其父类型的一个类型参数,因此可以返回到运行时实例的引用以支持方法的链式调用
对于②处的代码,首先调用的是ServerBootstrap类中group(bossGroup, workerGroup)方法:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this; }
该方法将parentGroup赋值给AbstractBootstrap父类中的EventLoopGroup group成员变量,父类实现代码如下,同时将childGroup赋值给ServerBootstrap类的EventLoopGroup childGroup对象。这些EventLoopGroup将用来处理整个程序的I/O操作和事件的处理。
ServerBootstrap类的父类AbstractBootstrap中group具体实现:
//用来处理将要创建关于client对应的Channel相关事件public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } if (this.group != null) { throw new IllegalStateException("group set already"); } this.group = group; return (B) this; }
parentGroup可以看作acceptor用来接受来自客户端的连接,也就是对应的ServerChannel,ServerChannel的实现负责创建子Channel。而childGroup可以看作是client所对应的子Channel
②调用完group方法之后,调用的是channel(NioServerSocketChannel.class)方法,对于NioServerSocketChannel类:
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class); private static ServerSocketChannel newSocket(SelectorProvider provider) { try { /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. * * See #2308. */ return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } } private final ServerSocketChannelConfig config; /** * 默认的无参构造函数 * Create a new instance */ public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } ...
对于SelectorProvider.provider(),是系统级别针对selector的操作:
public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
该类通过系统类型实现了基于NIO selector方式的SelectorProvider来接受新的连接,该实例只会在第一次进行初始化,后续的调用将直接返回
回到上述channel(NioServerSocketChannel.class)方法:
public B channel(Class channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory(channelClass)); }
对于ReflectiveChannelFactory类,可以发现是通过无参的构造方法来反射创建实例对象:
public class ReflectiveChannelFactoryimplements ChannelFactory { private final Class clazz; public ReflectiveChannelFactory(Class clazz) { if (clazz == null) { throw new NullPointerException("clazz"); } this.clazz = clazz; } @Override public T newChannel() { try { return clazz.getConstructor().newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } ...
对于channelFactory方法可以追溯到AbstractBootstrap类中的:
public B channelFactory(ChannelFactory channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return (B) this; }
对于返回的B对象,查看类申明可以看到:
public abstract class AbstractBootstrap, C extends Channel> implements Cloneable
追溯到前面我们申明ServerBootstrap实例:
public class ServerBootstrap extends AbstractBootstrap
因此可以知道返回的B代表ServerBootstrap
对于②处后续的childHandler方法:
public ServerBootstrap childHandler(ChannelHandler childHandler) { if (childHandler == null) { throw new NullPointerException("childHandler"); } this.childHandler = childHandler; return this; }
添加一个ChannelHandler来处理Channel的请求,对于所添加的MyServerInitializer代码实现如下:
public class MyServerInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //这里的若干ChannelHandler只是举例说明 pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new MyServerHandler()); }}
对于所继承的ChannelInitializer类实现:
@Sharablepublic abstract class ChannelInitializerextends ChannelInboundHandlerAdapter { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class); private final ConcurrentMap initMap = PlatformDependent.newConcurrentHashMap(); protected abstract void initChannel(C ch) throws Exception; ① @Override @SuppressWarnings("unchecked") public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { if (initChannel(ctx)) { ctx.pipeline().fireChannelRegistered(); ② } else { ctx.fireChannelRegistered(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause); ctx.close(); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { initChannel(ctx); } } @SuppressWarnings("unchecked") private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); ① } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { remove(ctx); ③ } return true; } return false; } private void remove(ChannelHandlerContext ctx) { try { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); } } finally { initMap.remove(ctx); } }}
ChannelInitializer是一个特殊的实现了ChannelInboundHandler的handler,当Channel被注册到了它的EventLoop之后,会回调ChannelInitializer类中的channelRegistered方法,从而会调用我们所实现的initChannel()方法(见①处),由于我们调用了Channel初始化的操作也就是initChannel方法,因此我们需要对pipeline从头开始传递事件以避免丢失事件(见②)
需要注意的是代码③处,当initChannel方法执行完后,也就是将一个或多个ChannelHandler添加到ChannelPipeline之后会从ChannelPipeline中移除它自己,因为之后的不再需要这个特殊的ChannelInitializer所对应的ChannelHandler来处理任何操作
这是很netty很巧妙的一个设计,利用ChannelHandle被添加到ChannelPipeline的特性一次性来添加更多的ChannelHandle
转载地址:http://hqrgi.baihongyu.com/