从零开始实现Netty极简版 (三) 实现ChannelPipeline

在从零开始实现Netty极简版 (二)中, 我们已经实现了异步接口, 接下来的目标便是实现ChannelPipeline,也就是Netty中的责任链。

ChannelPipeline是Netty非常重要的特性, 因为要学习使用Netty, 那就必须要了解ChannelPipeline, 在Netty中, 网络通讯几乎所有的关键操作都跟ChannelPipeline有关系, 包括bind, accept, read, write等等。

由于之前没有实现ChannelPipeline,所以这些相关操作都不是在ChannelPipeline中完成的, 而在本文中, 这些操作都将被被移入它们应该在的位置。
源码地址

启动代码

ChannelInboundHandler handler = (ctx,msg) -> {
byte[] bytes = (byte[])msg;
String str = new String(bytes);
System.out.println("接收到消息:" + str);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
ctx.write(buffer).addListener((future) -> {
System.out.println("回复成功");
});
};
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(handler);
//这里改成异步调用
b.bind(6666).addListener((future)->{
System.out.println("绑定端口成功");
});

可以看到, 这次不仅支持了数据读取, 还支持了数据写入, 数据写入也是异步操作, 操作完成后将输出”回复成功”。

新的接口

Netty定义为Pipeline定义了大量相关的概念, 具体体现为许多个接口, 这里每个接口里都定义了大量的方法, 这些接口定义的方法看上去很类似, 但在概念上是有区别的。

由于方法太多会分散注意力, 因此本文只定义了会用到的少许几个方法, 这样看起来会清爽很多。

下面是接口定义:

//没有定义任何方法
public interface ChannelHandler {}

public interface ChannelInboundInvoker {
ChannelInboundInvoker fireChannelRead(Object msg);
}

public interface ChannelOutboundInvoker {
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
ChannelFuture write(Object msg);
}

public interface ChannelInboundHandler extends ChannelHandler {
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
}

public interface ChannelOutboundHandler extends ChannelHandler{
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
}

public interface ChannelHandlerContext extends ChannelInboundInvoker, ChannelOutboundInvoker{
ChannelHandler handler();
}

public interface ChannelPipeline extends ChannelInboundInvoker,ChannelOutboundInvoker {
Channel channel();
ChannelPipeline addLast(ChannelHandler handlers);
}

在xxxInvoker 和 xxxHandler中有名称相似的方法, 比如ChannelOutboundHandlerChannelOutboundInvoker都有write方法, 实际上除了write方法,它们还有很多类似的方法, 只是我这个实现里没有。那他们的概念有什么区别, 请继续看下去吧。

责任传递的实现AbstractChannelHandlerContext

Netty中责任链的实现是在AbstractChannelHandlerContext中实现的,他包装了ChannelHandler, 在Netty中是ChannelHandlerContext所有实现类的基类, ChannelHandlerContext就是依赖它来让消息在责任链中传递的。

AbstractChannelHandlerContext中要处理很多类型的消息传递, 这些方法实现都大同小异, 对于一个xxx操作, 会定义xxx方法以及invokeXxx方法

  • xxx方法负责确定该操作是出站还是入站操作, 找到下一个对应的AbstractChannelHandlerContext, 然后在它对应的的EventLoop中调用invokeXxx

  • invokeXxx方法负责调用自己包装的handler的相应方法

下面是write方法的例子,其他方法请参阅源码

    public ChannelFuture write(Object msg) {
AbstractChannelHandlerContext next = findContextOutbound();
ChannelPromise promise = new DefaultChannelPromise(pipeline.channel(),eventLoop());
EventLoop eventLoop = next.eventLoop();
if (eventLoop.inEventLoop()) {
next.invokeWrite(msg, promise);
}else {
safeExecute(eventLoop,()-> {next.invokeWrite(msg, promise);},promise,msg);
}

return promise;
}

private void invokeWrite(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
t.printStackTrace();
}
}

findContextOutbound方法和findContextInbound方法展示了Netty中的责任链是如何把AbstractChannelHandlerContext串联起来的:

private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}

责任链的初始化

责任链的是随Channel一起初始化的, 并且会和对应的Channel不离不弃, 相守一生。

在本文的实现中, ChannelPipeline只有一个惟一的实现类, 那就是DefaultChannelPipeline

public DefaultChannelPipeline(Channel channel) {
this.channel = channel;
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}

可以从构造函数中看到, DefaultChannelPipeline关联了对应的Channel,并初始化了TailContextHeadContext两个AbstractChannelHandlerContext,并且将他们连接了起来。

TailContext在我们的实现中暂时并没什么用, 而HeadContext却比较重要, 它是Netty责任链的最外侧, 它是所有入站消息的第一站, 也是所有出站消息的最后一站, 因此它有兜底的义务。

下面是它的的部分代码

@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
channel.unsafe().bind(localAddress,promise);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
channel.unsafe().write(msg,promise);
}

在上面的代码里, 你终于见到了unsafe, 其实我原计划是不想定义unsafe的, 想把它的功能全部定义到channel中去, 但我现在发现这是行不通的, 下面会解释。

在Channel中定义Unsafe

首先看全新的Channel定义

public interface Channel {
void register(EventLoop eventLoop, ChannelPromise promise);
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
EventLoop eventLoop();
ChannelPipeline pipeline();
Unsafe unsafe();

interface Unsafe{
void bind(SocketAddress localAddress, ChannelPromise promise);
void write(Object msg, ChannelPromise promise);

}

}

可以看到UnsafeChannel都定义了相同签名的bind方法, 而这两个bind方法的概念是不同的:

  • Channel中的bind方法是一个操作请求, 这个请求会被投递到Channel对应的ChannelPiple中。

  • Unsafe中的bind方法是一个直接操作, 它直接调用了doBind,而后者调用java的Nio相关操作

  • 除了bind外, 还有一些其他操作也是这个情况, 因此Netty设计者才区分了Channel和Unsafe

//AbstractNioChannel中的bind实现
public ChannelFuture bind(SocketAddress localAddress,ChannelPromise promise) {
return pipeline.bind(localAddress,promise);
}

//NioUnsafe中的bind实现
public void bind(SocketAddress localAddress, ChannelPromise promise) {
// 其他操作省略 ...
doBind(localAddress); //直接调用doBind
}

//NioServerSocketChannel中的doBind
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress);
}

阶段三总结

到此位置, ChannelPipeline就已经加入到了我们的Netty实现中, 同样的, 还有一些不太重要的细节本文中没有提到, 可以自行查看源代码。