最近在看netty的东西,为了学以致用,顺带看了下netty作为dubbo传输层的一种实现是怎么应用的

Netty体系结构

Netty 是一款异步的事件驱动的网络应用程序框架,支持快速地开发可维护的高性能的面向协议的服务器 和客户端。Netty屏蔽了一些Java NIO上的细节,业务只要实现Netty暴露出来的回调接口,即可方便实现业务逻辑。

Netty主要有以下几个核心概念

  • Channel,原生socket的包装,暴露了一些方便实用的接口
  • EventLoop,Netty核心处理逻辑,绑定到一个线程上,处理Channel的整个生命周期
  • ChannelFuture,提供对异步结果处理的若干回调接口
  • ChannelHandler,Channel生命周期各节点的回调接口,类似filter之于servlet,interceptor之于spring mvc
  • ChannelPipeline,ChannelHandler的处理链

image-20181031144818484

image-20181031144852568

Dubbo中的应用

dubbo在服务端传输层附近的主要工作是:

  • 数据传输上,protocol调用信息(方法名,参数等)包装成invocation,invocation中会有dubbo协议维护的额外信息(比如attachment),然后交给序列化层序列化,然后交给netty传输
  • 调用关系上,通过ProxyFactory讲业务service实现包装成invoker,再由protocol层包装成exporter对外提供网络服务

消费端反过来即可

netty服务由NettyServer对外暴露,DubboProtocol根据invocation找到invoker,由invoker调用具体service实现

代码分析开始

server启动

protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

写过netty程序的人应该很熟悉这里了,一个典型的netty应用程序启动流程

  • 初始化ServerBootstrap
  • 初始化两个NioEventLoopGroup,一个用于接受连接,一个用于处理业务逻辑
  • 安插3个ChannelHandler

ChannelHandler

dubbo使用了3个ChannelHandler,其中2个解编码器,一个业务handler,一个一个说

NettyCodecAdapter

适配了解码器和编码器,内部类包装了构造的时候传进去的Codec2对象,底下由这个Codec2实现解码和编码

private class InternalEncoder extends MessageToByteEncoder {

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        ...
        codec.encode(channel, buffer, msg);
        ...
    }
}
private class InternalDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
		...
        msg = codec.decode(channel, message);
   		...
}

codec可以找到跟dubbo协议相关的DubboCodec类,在重载的decodeBody方法中,包装DecodeableRpcInvocation,塞进Request.data中,返回这个Request给下级handler

NettyServerHandler

NettyServerHandler是个ChannelInboundHandler,既然是个ChannelInboundHandler,那么找数据处理逻辑肯定奔着channelRead去了

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        handler.received(channel, msg);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

这个内部handler由构造的时候传入,看上层代码发现NettyServer本身也是ChannelHandler,继续跟踪会发现实际是DubboProtocol中的一个匿名内部类的对象requestHandler经过几层包装而来

image-20181031163323615

DecodeHandler

transport层(面向message),这个handler解到上级decoder解码后的request(内部data是一个DecodeableRpcInvocation对象),进入received逻辑

public void received(Channel channel, Object message) throws RemotingException {
    ...
    if (message instanceof Request) {
        decode(((Request) message).getData());
    }
	...
    handler.received(channel, message);
}

很明显会命中这个if,对data进行decode

private void decode(Object message) {
    if (message != null && message instanceof Decodeable) {
        ...
        ((Decodeable) message).decode();
        ...
    } // ~ end of if
} // ~ end of method decode

进入DecodeableRpcInvocation的decode方法(太长,不贴了),主要步骤是读取方法名/参数类型/参数/attachment,完了之后这个DecodeableRpcInvocation算是正式可用了

结束之后调用内部handler继续

HeaderExchangeHandler

exchanger层(面向request/response),最终调用构造的时候接到的DubboProtocol.reply方法

public void received(Channel channel, Object message) throws RemotingException {
    ...
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    Response response = handleRequest(exchangeChannel, request);
                    channel.send(response);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        }
    ...
}

Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    ...
    // find handler by message class.
    Object msg = req.getData();
    try {
        // handle data.
        Object result = handler.reply(channel, msg);
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}

DubboProtocol

protocol层(面向invoker, invocation),拿到上层解析完的invocation,找到invoker,调用代理的service实现

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        Invocation inv = (Invocation) message;
        Invoker<?> invoker = getInvoker(channel, inv);
        ...
        RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
        return invoker.invoke(inv);
    }
    ...
}

自此,从网络字节流invocation的逐级转化完成,接下去就是invoker调用service的过程,下一篇继续