从零开始实现Netty极简版 (二) ChanelFuture和ChannelPromise

在从零开始实现Netty极简版 (一)中, 我们实现一个基本骨架, 接下来的目标便是实现异步操作。

源码地址

因此服务的启动代码需要做相应升级

//升级前, bind是同步操作
b.bind(6666);

//升级后,bind成为了异步操作,可以添加listener
b.bind(6666).addListener((future)->{
System.out.println("绑定端口成功");
});

新的接口

Netty中实现异步操作主要依靠其的ChannelFutureChannelPromise, 以及对应的事件监听器ChannelFutureListener,因此需要定义这三个新的接口

public interface ChannelFuture {
//返回关联的channel
Channel channel();
//判断操作是否完成
boolean isDone();
//等待操作完成
ChannelFuture await() throws InterruptedException;
//添加监听器,监听操作完成
ChannelFuture addListener(ChannelFutureListener listener);
}

public interface ChannelPromise extends ChannelFuture{
//这里只实现一个方法,就是设置操作成功, 因为我们暂时不考虑异常情况!
ChannelPromise setSuccess();
}

public interface ChannelFutureListener {
void operationComplete(ChannelFuture future);
}

实现DefaultChannelPromise

DefaultChannelPromise实现了ChannelPromise,因为ChannelPromise继承自ChannelFuture,所以它也实现了ChannelFuture,在本文中,ChannelFuture有且仅有DefaultChannelPromise这一个实现。

下面是主要的代码

public class DefaultChannelPromise implements ChannelPromise {
//省略部分代码...

//阻塞等待操作完成
@Override
public ChannelPromise await() throws InterruptedException {
if (isDone()) {
return this;
}
//当操作没有完成时,挂起当前线程
synchronized (this) {
while (!isDone()) {
wait();
}
}
return this;
}

//添加监听器
@Override
public ChannelFuture addListener(ChannelFutureListener listener) {
synchronized (this){
listeners.add(listener);
}
//检查一下是不是早就完成了
if (isDone()) {
//通知所有监听器
notifyListeners();
}
return this;
}

//设置操作完成
@Override
public ChannelPromise setSuccess() {
isSuccess = true;
synchronized (this){
//唤醒所有被await挂起的线程
notifyAll();
}
//调用所有监听器
notifyListeners();
return this;
}
}

同步方法升级为异步方法

因为现在定义了异步接口, 所以之前实现的同步方法都需要升级, 下面是需要升级的地方

  • Channel

    //升级前
    void register(EventLoop eventLoop);
    void bind(SocketAddress localAddress);
    //升级后
    void register(EventLoop eventLoop, ChannelPromise promise);
    void bind(SocketAddress localAddress, ChannelPromise promise);
  • EventLoopGroup

    //升级前
    void register(Channel channel);
    //升级后
    ChannelFuture register(Channel channel);
  • EventLoop

    //升级前这个接口是空的,升级后如下
    boolean inEventLoop(); //判断当前是否已经在事件循环的线程中
    void execute(Runnable command); //提交任务到事件循环线程中运行

其中EventLoop做了重大升级, 增加了两个方法, inEventLoop的作用是判断当前线程是否是事件循环的线程,如果不是,就需要再手工调用execute方法,提交任务到事件循环的线程中, Netty就是通过这种方式保证单个channel的所有操作都是在同一线程中进行的。

升级NioEventLoop

有了异步提交任务的功能后, NioEventLoop的任务越来越重了, 首先需要对register方法进行升级:

//升级前
public void register(Channel channel) {
channel.register(this);
}

//升级后
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel,this));

}

public ChannelFuture register(final ChannelPromise promise){
promise.channel().register(this,promise);
return promise;
}

出了register方法签名的改动外, NioEventLoop还增加了线程池的功能,能够对其提交任务。

因此在它的事件循环中,不仅需要调用selector.select(),还需要忙里偷闲来完成异步提交的任务,所以需要在它的死循环中加入runAllTasks()

//死循环内部代码
//升级前
int readyChannels = selector.select(512);
processSelectedKeysPlain(selector.selectedKeys());
Thread.yield();

//升级后
int readyChannels = selector.select(512);
processSelectedKeysPlain(selector.selectedKeys());
runAllTasks(); //执行异步提交的任务
Thread.yield();

被提交的任务被暂存在一个队列中

final private Queue<Runnable> taskQueue = new LinkedBlockingQueue<>();

public void execute(Runnable task) {
taskQueue.offer(task);
selector.wakeup(); //唤醒selector, 以免它因无事可做而一直阻塞
}

protected void runAllTasks(){
for (;;) {
Runnable task = taskQueue.poll();
if(task == null)
break;
safeExecute(task);
}
}

protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable e) {
e.printStackTrace();
}
}

升级AbstractNioChannel

AbstractNioChannel的升级主要是在更新register方法为异步方法

public void register(EventLoop eventLoop, ChannelPromise promise) {
this.eventLoop = (NioEventLoop)eventLoop;
//确保注册操作是在关联的EventLoop中完成
if(eventLoop.inEventLoop()){
register0(promise);
}else {
eventLoop.execute(()-> register0(promise));
}

}

private void register0(ChannelPromise promise){
doRegister();
//操作都完成了,设置promise状态为完成
promise.setSuccess();
}

升级ServerBootstrap

最后是升级ServerBootstrap,也就是它的bind方法

//升级前
public void bind(int port){
Channel channel = initAndRegister();
channel.bind(new InetSocketAddress(port));
}

//升级后
public ChannelFuture bind(int port){
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
final ChannelPromise promise = new DefaultChannelPromise(channel,channel.eventLoop());
regFuture.addListener((future) -> {
channel.bind(new InetSocketAddress(port),promise);
});
return promise;
}

阶段二总结

还有几处很小的修改本文没有提到, 详情可以查看完整的源代码, 到此为止, 异步机制就已经加入了我们的极简版Netty实现了。