从零开始实现Netty极简版 (一) 基本骨架

第一步的任务是实现一个基本骨架, 让程序能够跑起来。

程序能够监听一个指定的端口接收客户端端连接,并在控制台打印出客户端发送过来的字符串。

源码地址

启动代码

服务端启动代码如下:(是不是跟netty的启动代码很相似?)

ChannelHandler handler = (o) -> {
byte[] bytes = (byte[])o;
String msg = new String(bytes);
System.out.println("接收到消息:" + msg);
};
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(handler);
b.bind(6666);

以上的测试监听了6666端口,并将接收到的文本消息打印到控制台

实现思路

这一步的目标就是为了能让程序跑起来,因此错误处理,异步处理等等能不管的尽量不管,一切尽可能的简单。

主要实现的内容包括EventLoopChannel,以及一个自定义的ChannelHandler,为了便于理解, 总的的代码控制在300行左右。

接口定义

这里定义了EventLoopGroup, EventLoop, Channel, ChannelHandler四个接口,都非常非常的简单。

public interface EventLoopGroup {
//获取组内的下一个EventLoop
EventLoop next();
//将channel注册到EventLoopGroup
void register(Channel channel);
}

public interface EventLoop extends EventLoopGroup {
// 纯粹的概念定义,方法全部继承自EventLoopGroup
}

//我这里没有unsafe,所以把unsafe的功能整进Channel了
public interface Channel {
//将Channel注册到EventLoop
void register(EventLoop eventLoop);
//绑定指定端口
void bind(SocketAddress localAddress);
//处理事件
Object doReadMessages();
//获取关联的EventLoop
EventLoop eventLoop();
//这个方法是我为了简化代码自己发明的,netty中没有
void setHandler(ChannelHandler handler);
}

public interface ChannelHandler {
//没错,在我的实现里,ChannelHandler只有一个方法
void channelRead(Object msg);
}

实现NioEventLoopGroup

接着是实现NioEventLoopGroup,这个逻辑很简单,就是初始化指定数目的NioEventLoop,并实现一个选择算法

public class NioEventLoopGroup implements EventLoopGroup {
private final int nThreads;
NioEventLoop[] eventLoops;
int curIndex = 0;
//初始化线程数量
public NioEventLoopGroup(int nThreads) {
this.nThreads = nThreads;
initEventLoops();
}
private void initEventLoops(){
eventLoops = new NioEventLoop[nThreads];
//直接初始化所有的子NioEventLoop
for (int i=0;i<nThreads;i++)
eventLoops[i] = new NioEventLoop();
}
@Override
public void register(Channel channel) {
next().register(channel);
}
@Override
//实现选择算法
public EventLoop next() {
return eventLoops[curIndex++ % nThreads];
}
}

实现NioEventLoop

下面是NioEventLoop,NioEventLoop的实现相对来说复杂一些,它负责创建选择器,以及进行select循环

public class NioEventLoop implements EventLoop, Runnable {
private Selector selector;
NioEventLoop() {
try {
selector = Selector.open();
//Netty里面是等有任务以后才初始化线程,我这里管不了这么多了,在构造函数直接启动线程
Thread t= new Thread(this);
t.start();
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public EventLoop next() {
return this;
}

@Override
public void register(Channel channel) {
channel.register(this);
}

public Selector unwrappedSelector() {
return selector;
}

@Override
public void run() {
while (true) {
try {
int readyChannels = selector.select(512);
processSelectedKeysPlain(selector.selectedKeys());
Thread.yield();
} catch (Throwable e) {
e.printStackTrace();
}
}

}

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys){
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final AbstractNioChannel a = (AbstractNioChannel)k.attachment();
i.remove();
a.read();
if (!i.hasNext()) {
break;
}
}
}
}

实现AbstractNioChannel

接着就是要实现NioServerSocketChannel以及NioSocketChannel了,因为这两个类比较相似,为了避免重复代码,所以提取了一个抽象基类AbstractNioChannel

//AbstractNioChannel负责NioServerSocketChannel以及NioSocketChannel的公共操作
public abstract class AbstractNioChannel implements Channel {
private NioEventLoop eventLoop; //关联的eventLoop
protected Channel parent; //父Channel 暂时没用
private final SelectableChannel ch; //Nio的Channel
protected final int readInterestOp; //关注的操作
protected ChannelHandler handler; //这是我发明的handler
@Override
public void setHandler(ChannelHandler handler){
this.handler = handler;
}

//构造函数,接收parent, Nio的Channel以及关注的OP
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (Throwable e) {

}
}
@Override
public void register(EventLoop eventLoop){
this.eventLoop = (NioEventLoop)eventLoop;
doRegister();

}
private void doRegister(){
try {
//channel向Selector注册就在此处
javaChannel().register(eventLoop.unwrappedSelector(), readInterestOp,this);
//因为此时Selector可能正在select,所以唤醒它一下,重新进行select
eventLoop.unwrappedSelector().wakeup();
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
@Override
public NioEventLoop eventLoop() {
return eventLoop;
}
@Override
public void bind(SocketAddress localAddress) {}
protected SelectableChannel javaChannel() {
return ch;
}
public void read(){
Object msg = doReadMessages();
if(msg != null && handler != null){
handler.channelRead(msg);
}

}
}

实现NioServerSocketChannel

NioServerSocketChannel实际上就是Nio的ServerSocketChannel的封装,主要负责监听端口,接受连接

//负责监听端口,接受连接
public class NioServerSocketChannel extends AbstractNioChannel {
public NioServerSocketChannel() throws IOException {
super(null, ServerSocketChannel.open(), SelectionKey.OP_ACCEPT);
}

@Override
public void bind(SocketAddress localAddress) {
try {
//bind操作
javaChannel().bind(localAddress);
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public Object doReadMessages() {
try {
//accept操作
SocketChannel ch = javaChannel().accept();
if(ch != null){
return new NioSocketChannel(this, ch);
}
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

@Override
protected ServerSocketChannel javaChannel() {
return (ServerSocketChannel) super.javaChannel();
}

}

实现NioSocketChannel

NioSocketChannel实际上就是Nio的SocketChannel的封装,主要负责接收数据

public class NioSocketChannel extends AbstractNioChannel{
private ByteBuffer buffer = ByteBuffer.allocate(1024);
NioSocketChannel(Channel parent, SocketChannel socket){
super(parent,socket, SelectionKey.OP_READ);
}

@Override
protected SocketChannel javaChannel() {
return (SocketChannel) super.javaChannel();
}

@Override
public Object doReadMessages() {
try {
buffer.clear();
javaChannel().read(buffer);
buffer.flip();
if(buffer.limit() > 0){
byte[] result = new byte[buffer.limit()];
buffer.get(result);
return result;
}
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}

实现ServerBootstrap

最后要实现的就是启动器ServerBootstrap了,这个类的主要工作就是负责组装以上实现的内容

//服务端启动器
public class ServerBootstrap {
private Class<? extends Channel> channelClass;
private EventLoopGroup parentGroup;
private EventLoopGroup childGroup;
private ChannelHandler childHandler;

public void bind(int port){
Channel channel = initAndRegister();
channel.bind(new InetSocketAddress(port));
}

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
this.parentGroup = parentGroup;
this.childGroup = childGroup;
return this;
}
public ServerBootstrap channel(Class<? extends Channel> channelClass) {
this.channelClass = channelClass;
return this;
}

public ServerBootstrap childHandler(ChannelHandler childHandler){
this.childHandler = childHandler;
return this;
}

private Channel initAndRegister(){
try {
//创建NioServerSocketChannel
Channel channel = channelClass.newInstance();
init(channel);
//注册到主EventLoopGroup
parentGroup.register(channel);
return channel;
} catch (Throwable e) {
e.printStackTrace();
}
return null;
}

private void init(Channel channel){
channel.setHandler(new ServerBootstrapAcceptor(childGroup,childHandler));

}

private static class ServerBootstrapAcceptor implements ChannelHandler{
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
ServerBootstrapAcceptor(EventLoopGroup childGroup,ChannelHandler childHandler){
this.childGroup = childGroup;
this.childHandler = childHandler;
}
@Override
public void channelRead(Object msg) {
final Channel child = (Channel)msg;
child.setHandler(childHandler);
//在此处将接收到的NioSocketChannel注册到childGroup
childGroup.register(child);
}
}
}

阶段一总结

到此为止,这个极度缩水版的netty的骨架已经完成了,在netty中,bind和accept都是在Channel的pipeline中完成的,但我这个实现并没有pipeline,所以直接放在的Channel中,以后的更新中我会把这些特性都加入进来。