在从零开始实现Netty极简版 (二)中, 我们已经实现了异步接口, 接下来的目标便是实现ChannelPipeline,也就是Netty中的责任链。
ChannelPipeline是Netty非常重要的特性, 因为要学习使用Netty, 那就必须要了解ChannelPipeline, 在Netty中, 网络通讯几乎所有的关键操作都跟ChannelPipeline有关系, 包括bind, accept, read, write等等。
由于之前没有实现ChannelPipeline,所以这些相关操作都不是在ChannelPipeline中完成的, 而在本文中, 这些操作都将被被移入它们应该在的位置。
源码地址
启动代码
ChannelInboundHandler handler = (ctx,msg) -> { |
可以看到, 这次不仅支持了数据读取, 还支持了数据写入, 数据写入也是异步操作, 操作完成后将输出”回复成功”。
新的接口
Netty定义为Pipeline定义了大量相关的概念, 具体体现为许多个接口, 这里每个接口里都定义了大量的方法, 这些接口定义的方法看上去很类似, 但在概念上是有区别的。
由于方法太多会分散注意力, 因此本文只定义了会用到的少许几个方法, 这样看起来会清爽很多。
下面是接口定义://没有定义任何方法
public interface ChannelHandler {}
public interface ChannelInboundInvoker {
ChannelInboundInvoker fireChannelRead(Object msg);
}
public interface ChannelOutboundInvoker {
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
ChannelFuture write(Object msg);
}
public interface ChannelInboundHandler extends ChannelHandler {
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
}
public interface ChannelOutboundHandler extends ChannelHandler{
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
}
public interface ChannelHandlerContext extends ChannelInboundInvoker, ChannelOutboundInvoker{
ChannelHandler handler();
}
public interface ChannelPipeline extends ChannelInboundInvoker,ChannelOutboundInvoker {
Channel channel();
ChannelPipeline addLast(ChannelHandler handlers);
}
在xxxInvoker 和 xxxHandler中有名称相似的方法, 比如ChannelOutboundHandler和ChannelOutboundInvoker都有write方法, 实际上除了write方法,它们还有很多类似的方法, 只是我这个实现里没有。那他们的概念有什么区别, 请继续看下去吧。
责任传递的实现AbstractChannelHandlerContext
Netty中责任链的实现是在AbstractChannelHandlerContext中实现的,他包装了ChannelHandler, 在Netty中是ChannelHandlerContext所有实现类的基类, ChannelHandlerContext就是依赖它来让消息在责任链中传递的。
在AbstractChannelHandlerContext中要处理很多类型的消息传递, 这些方法实现都大同小异, 对于一个xxx操作, 会定义xxx方法以及invokeXxx方法
xxx方法负责确定该操作是出站还是入站操作, 找到下一个对应的
AbstractChannelHandlerContext, 然后在它对应的的EventLoop中调用invokeXxxinvokeXxx方法负责调用自己包装的handler的相应方法
下面是write方法的例子,其他方法请参阅源码
public ChannelFuture write(Object msg) { |
findContextOutbound方法和findContextInbound方法展示了Netty中的责任链是如何把AbstractChannelHandlerContext串联起来的:
private AbstractChannelHandlerContext findContextOutbound() { |
责任链的初始化
责任链的是随Channel一起初始化的, 并且会和对应的Channel不离不弃, 相守一生。
在本文的实现中, ChannelPipeline只有一个惟一的实现类, 那就是DefaultChannelPipeline。
public DefaultChannelPipeline(Channel channel) { |
可以从构造函数中看到, DefaultChannelPipeline关联了对应的Channel,并初始化了TailContext和HeadContext两个AbstractChannelHandlerContext,并且将他们连接了起来。
TailContext在我们的实现中暂时并没什么用, 而HeadContext却比较重要, 它是Netty责任链的最外侧, 它是所有入站消息的第一站, 也是所有出站消息的最后一站, 因此它有兜底的义务。
下面是它的的部分代码
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
channel.unsafe().bind(localAddress,promise);
}
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
channel.unsafe().write(msg,promise);
}
在上面的代码里, 你终于见到了unsafe, 其实我原计划是不想定义unsafe的, 想把它的功能全部定义到channel中去, 但我现在发现这是行不通的, 下面会解释。
在Channel中定义Unsafe
首先看全新的Channel定义
public interface Channel { |
可以看到Unsafe和Channel都定义了相同签名的bind方法, 而这两个bind方法的概念是不同的:
Channel中的bind方法是一个操作请求, 这个请求会被投递到Channel对应的ChannelPiple中。Unsafe中的bind方法是一个直接操作, 它直接调用了doBind,而后者调用java的Nio相关操作除了
bind外, 还有一些其他操作也是这个情况, 因此Netty设计者才区分了Channel和Unsafe
//AbstractNioChannel中的bind实现 |
阶段三总结
到此位置, ChannelPipeline就已经加入到了我们的Netty实现中, 同样的, 还有一些不太重要的细节本文中没有提到, 可以自行查看源代码。