在从零开始实现Netty极简版 (一)中, 我们实现一个基本骨架, 接下来的目标便是实现异步操作。
因此服务的启动代码需要做相应升级//升级前, bind是同步操作
b.bind(6666);
//升级后,bind成为了异步操作,可以添加listener
b.bind(6666).addListener((future)->{
System.out.println("绑定端口成功");
});
新的接口
Netty中实现异步操作主要依靠其的ChannelFuture和ChannelPromise, 以及对应的事件监听器ChannelFutureListener,因此需要定义这三个新的接口
public interface ChannelFuture { |
实现DefaultChannelPromise
DefaultChannelPromise实现了ChannelPromise,因为ChannelPromise继承自ChannelFuture,所以它也实现了ChannelFuture,在本文中,ChannelFuture有且仅有DefaultChannelPromise这一个实现。
下面是主要的代码
public class DefaultChannelPromise implements ChannelPromise { |
同步方法升级为异步方法
因为现在定义了异步接口, 所以之前实现的同步方法都需要升级, 下面是需要升级的地方
Channel
//升级前
void register(EventLoop eventLoop);
void bind(SocketAddress localAddress);
//升级后
void register(EventLoop eventLoop, ChannelPromise promise);
void bind(SocketAddress localAddress, ChannelPromise promise);EventLoopGroup
//升级前
void register(Channel channel);
//升级后
ChannelFuture register(Channel channel);EventLoop
//升级前这个接口是空的,升级后如下
boolean inEventLoop(); //判断当前是否已经在事件循环的线程中
void execute(Runnable command); //提交任务到事件循环线程中运行
其中EventLoop做了重大升级, 增加了两个方法, inEventLoop的作用是判断当前线程是否是事件循环的线程,如果不是,就需要再手工调用execute方法,提交任务到事件循环的线程中, Netty就是通过这种方式保证单个channel的所有操作都是在同一线程中进行的。
升级NioEventLoop
有了异步提交任务的功能后, NioEventLoop的任务越来越重了, 首先需要对register方法进行升级://升级前
public void register(Channel channel) {
channel.register(this);
}
//升级后
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel,this));
}
public ChannelFuture register(final ChannelPromise promise){
promise.channel().register(this,promise);
return promise;
}
出了register方法签名的改动外, NioEventLoop还增加了线程池的功能,能够对其提交任务。
因此在它的事件循环中,不仅需要调用selector.select(),还需要忙里偷闲来完成异步提交的任务,所以需要在它的死循环中加入runAllTasks()
//死循环内部代码 |
被提交的任务被暂存在一个队列中final private Queue<Runnable> taskQueue = new LinkedBlockingQueue<>();
public void execute(Runnable task) {
taskQueue.offer(task);
selector.wakeup(); //唤醒selector, 以免它因无事可做而一直阻塞
}
protected void runAllTasks(){
for (;;) {
Runnable task = taskQueue.poll();
if(task == null)
break;
safeExecute(task);
}
}
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable e) {
e.printStackTrace();
}
}
升级AbstractNioChannel
AbstractNioChannel的升级主要是在更新register方法为异步方法
public void register(EventLoop eventLoop, ChannelPromise promise) { |
升级ServerBootstrap
最后是升级ServerBootstrap,也就是它的bind方法
//升级前 |
阶段二总结
还有几处很小的修改本文没有提到, 详情可以查看完整的源代码, 到此为止, 异步机制就已经加入了我们的极简版Netty实现了。