public class MyChatServer {
public static void main(String[] args) {
//步骤1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//步骤2
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new MyChatServerHandler());
}
});
//步骤3
ChannelFuture f = b.bind(8888).sync();
//等待服务端监听端口关闭
f.channel().closeFuture().sync();
} catch (
Exception e) {
} finally {
//优雅关闭,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
EventLoopGroup workerGroup = new NioEventLoopGroup();
跟
//NioEventLoopGroup
public NioEventLoopGroup() {
this(0);
}
跟
//NioEventLoopGroup
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
跟
参数多传入了一个selector
//NioEventLoopGroup
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
跟
参数多传了一个DefaultSelectStrategyFactory.INSTANCE ,一个工厂类
//NioEventLoopGroup
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
跟
参数多传了一个RejectedExecutionHandlers.reject()
//NioEventLoopGroup
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
跟
DEFAULT_EVENT_LOOP_THREADS是一个默认的数(电脑的处理器核数*2(超线程)*2)(数不是重点)
//MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
跟
//MultithreadEventExecutorGroup
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
跟
//MultithreadEventExecutorGroup
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
//if 检查,过
if (nThreads <= 0) {
}
//初始化,这是传的参数,参数就是null
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//定义数组长度
children = new EventExecutor[nThreads];
//初始化数组
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//对每一个EventExecutor初始化,并且传入args参数
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
} finally {
if (!success) {//用debug发现不走这里,略过
}
}
}
//初始化chooser,并且把上面的children传入
chooser = chooserFactory.newChooser(children);
//创建一个监听器
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
//对每一个EventExecutor添加监听器
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
跟到这里基本算是跟完了
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//MyChatServerHandler是自己定义的方法
socketChannel.pipeline().addLast(new MyChatServerHandler());
}
});
//ServerBootstrap
public ServerBootstrap() { }
//ServerBootstrap
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
//给ServerBootstrap的父类AbstractBootstrap里的成员变量赋值
super.group(parentGroup);
//if省略
//给自己的成员变量赋值
this.childGroup = childGroup;
return this;
}
//AbstractBootstrap
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
//ReflectiveChannelFactory,看名字就是反射工厂类
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
//将NioServerSocketChannel.class赋值给clazz成员变量
this.clazz = clazz;
}
//AbstractBootstrap
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
return channelFactory((ChannelFactory<C>) channelFactory);
}
//AbstractBootstrap
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
//if省略
//给成员变量赋值
this.channelFactory = channelFactory;
//AbstractBootstrap定义的参数泛型如下
//public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable
//在ServerBootStrap继承AbstractBootstrap中给B的赋值为ServerBootStrap
//public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>
//能成功返回
return (B) this;
}
//AbstractBootstrap
public B handler(ChannelHandler handler) {
//if省略
//给成员变量赋值
this.handler = handler;
return (B) this;
}
//AbstractBootstrap
public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
//添加参数,添加参数,添加参数
options.put(option, value);
}
}
return (B) this;
}
//ServerBootStrap
public ServerBootstrap childHandler(ChannelHandler childHandler) {
//if省略
//给成员变量赋值
this.childHandler = childHandler;
return this;
}
ChannelFuture f = b.bind(8888).sync();
//AbstractBootstrap
private ChannelFuture doBind(final SocketAddress localAddress) {
//initAndRegister重点代码
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) { //debug我的没走,略
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
//给注册号的channel添加监听器
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
//出现异常
promise.setFailure(cause);
} else {
//没有出现异常
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
//AbstractBootstrap
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//步骤3.2.1
channel = channelFactory.newChannel();
//步骤3.2.2
init(channel);
} catch (Throwable t) {}
步骤3.2.3
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) { //debug我的没走。略
}
return regFuture;
}
channel = channelFactory.newChannel();
跟进去发现是一个接口
public interface ChannelFactory<T extends Channel> {
T newChannel();
}
然后发现上面接口的实现类有ReflectiveChannelFactory(这个类在上面步骤2的3中有创建过)
//ReflectiveChannelFactory
public T newChannel() {
try {
//反射、反射、反射
//clazz在上面赋值过NioServerSocketChannel
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
此时应该调用NioServerSocketChannel的无参方法
//NioServerSocketChannel
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
跟newSocket(DEFAULT_SELECTOR_PROVIDER) 方法
//NioServerSocketChannel
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
//返回一个ServerSocketChannel
return provider.openServerSocketChannel();
} catch (IOException e) { }
}
跟this(newSocket(provider))的this方法(☆)
//NioServerSocketChannel
public NioServerSocketChannel(ServerSocketChannel channel) {
//SelectionKey.OP_ACCEPT,SelectionKey.OP_ACCEPT,SelectionKey.OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
//config中存入了当前NioServerSocketChannel
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
跟上面代码的super方法,一直跟
//AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
//将channel赋值给成员变量
this.ch = ch;
//Nio感兴趣的键
this.readInterestOp = readInterestOp;
try {
//Nio知识,设置为非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
//略
}
}
跟上面的super方法
//AbstractChannel
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
//初始化pipline对象
pipeline = newChannelPipeline();
}
此时NioServerSocketChannel已经创建完了,并且channel里带有ChannelPipeline对象
AbstractBootstrap
abstract void init(Channel channel) throws Exception;
找方法的实现类
//ServerBootStrap
//给bossGroup(parentGroup)的channel添加参数,并且添加了一个关键、关键、关键的ChannelInitializer
void init(Channel channel) throws Exception {
//获取bossGroup(parentGroup)的option(ChannelOption.SO_BACKLOG, 1024)这种参数并且传给channel(不是重点)
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
//获取bossGroup的attr这种参数并且传给channel(不是重点)
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//获取channel中的ChannelPipeline
ChannelPipeline p = channel.pipeline();
//获取workerGroup(childGroup)及其一些参数childHandler、ChildOption、ChildAttrs,这些参数都在创建ServerBootStrap的时候赋过值
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//重点、重点、重点\重点、重点、重点
//下面会重点解释这个地方
//P是ServerSocketChannnel中的ChannelPipeline,添加了一个ChannelInitializer整体(不用看里面的内容)
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
//对应handler(new LoggingHandler(LogLevel.INFO))
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//重点、重点、重点(后面会解释)
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
一直跟上面的的 p.addLast(new ChannelInitializer<Channel>() 的addLast方法
//DefaultChannelPipeline
//ChannelPipeline.addList虽然看似添加handler,其实是添加包含handler的ChannelHandlerContext
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//检查、省略
checkMultiplicity(handler);
//创建一个ChannelHandlerContext并且把hander传入进去
newCtx = newContext(group, filterName(name, handler), handler);
//将ChannelHandlerContext添加到创建的NioServerSocketChannel的Pipeline中
addLast0(newCtx);
//走这个方法就返回
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
//debug后面的没走,下面的略
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
跟上面代码中的 addLast0(newCtx) 方法
//DefaultChannelPipeline
//双向链表操作,将ctx添加到链表中
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
此时NioServerSocketChannel已经初始化完毕,主要的工作就是给channel里的成员变量赋值,并且添加了一ChannelInitializer类 ,并且吧包含handler的ChannelHandlerContext加到双向链表中
跟AbstractBootstrap中的initAndRegister方法里的
ChannelFuture regFuture = config().group().register(channel);
3.2.3.1跟config()方法
//ServerBootStrap
//将自己传给ServerBootstrapConfig
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
public final ServerBootstrapConfig config() {
return config;
}
3.2.3.2一直跟group()方法
//AbstractBootstrap
//group的成员变量的赋值在b.group(bossGroup, workerGroup),其实就是bossGroup
volatile EventLoopGroup group;
//返回成员变量
public final EventLoopGroup group() {
return group;
}
3.2.3.3跟register()方法
//MultithreadEventExecutorGroup
public ChannelFuture register(Channel channel) {
//next方法返回一个EventLoop,自己看
//跟register方法
return next().register(channel);
}
跟register方法
//SingleThreadEventLoop
public ChannelFuture register(Channel channel) {
//DefaultChannelPromise传入了一个channel和this(EventExcuter)
return register(new DefaultChannelPromise(channel, this));
}
跟regiser方法
//SingleThreadEventLoop
public ChannelFuture register(final ChannelPromise promise) {
//检查,省略
ObjectUtil.checkNotNull(promise, "promise");
//重点关注register
promise.channel().unsafe().register(this, promise);
return promise;
}
跟register方法
//AbstractChannel
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//if省略
AbstractChannel.this.eventLoop = eventLoop;
//下面会执行register0(promise)
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
}
}
}
跟register0方法
//AbstractChannel
private void register0(ChannelPromise promise) {
try {
//if省略
boolean firstRegistration = neverRegistered;
//真正的注册方法
doRegister();
neverRegistered = false;
registered = true;
//调用handlerAdd方法
pipeline.invokeHandlerAddedIfNeeded();
//这个方法中有一个触发监听器的功能
safeSetSuccess(promise);
//调用ChannelRegiser方法
pipeline.fireChannelRegistered();
if (isActive()) {//debug没有执行,省略
}
} catch (Throwable t) {
}
}
跟上面代码的doRegister方法,实现真正的注册
//AbstractNioChannel
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//java Nio 的知识,注册
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
}
}
}
跟AbstractChannel的register方法里的
//AbstractChannel的register方法里的
pipeline.invokeHandlerAddedIfNeeded();
//DefaultChannelPipeline
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}
跟callHandlerAddedForAllHandlers()方法
//DefaultChannelPipeline
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
this.pendingHandlerCallbackHead = null;
}
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
//跟这个方法
task.execute();
task = task.next;
}
}
跟task.excute()方法
//DefaultChannelPipeline
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
}
}
}
跟callHandlerAdd0(ctx)方法
//DefaultChannelPipeline
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
} catch (Throwable t) {
//省略
}
}
其中看ctx.handler(),ctx就是ChannelHandlerContext创建的时候就会传入hander,所以一个ChannelHandlerContext对应一个handler
下面跟 ctx.handler().handlerAdded(ctx) handlerAdded方法 (☆☆☆☆☆)
此时你应该懂你的NioServerSocket中的handler有哪些???
还记得ServerBootStrap中的init方法吗?如下所示
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
p.addLast(new ChannelInitializer<Channel>() pipeline中添加了ChannelInitializer ,所以要调用ChannelInitializer的handlerAdd方法
跟ChannelInitializer里的handlerAdded方法
//ChannelInitializer
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
//跟下面方法
initChannel(ctx);
}
}
跟
//ChannelInitializer
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
//这个initChannel方法就是自定义的方法
initChannel((C) ctx.channel());
} catch (Throwable cause) {
} finally {
//最后将ctx移除,实现批量处理,因为这个类的作用是添加自定义的Handler,本身没处理的能力,留着干嘛,删了
remove(ctx);
}
return true;
}
return false;
}
在跟这个 initChannel((C) ctx.channel()) 方法,这个方法的内容不能在跟了,在跟就跟丢了,内容如下
//ServerBootStrap里的init方法的一部分
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
initChannel方法添加了
跟AbstractChannel的registero方法里的safeSetSuccess(promise)代码
//AbstractChannel
protected final void safeSetSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}
跟 promise.trySuccess()方法
//DefaultChannelPipeline
@Override
public boolean trySuccess() {
return trySuccess(null);
}
跟
//DefaultChannelPipeline
@Override
public boolean trySuccess(V result) {
if (setSuccess0(result)) {
//通知监听器
notifyListeners();
return true;
}
return false;
}
跟AbstractChannel的registero方法里的pipeline.fireChannelRegistered()代码
他的运行原理和调用handerAdd方法差不多
channel.active没有调用