11. How the request is encoded

If you are already familiar with the dubbo code, you can go straight to its DubboCodec class to see how requests are encoded.

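If you are not, and are only just starting to read the code, that is fine too. A little exploration along the following lines is enough to find it.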

Finding out where the encoder is and what it is

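Continuing from the service call in the previous article and stepping further down, we can see that the request is sent out through a Channel. The call stack is: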

NettyChannel.send(Object, boolean) line: 95	
NettyClient(AbstractClient).send(Object, boolean) line: 270	
NettyClient(AbstractPeer).send(Object) line: 51	
HeaderExchangeChannel.request(Object, int) line: 112

Since this is a netty channel, let's look at its pipeline; the encoder and decoder should show up there.

The channel's ChannelPipeline pipeline field:

DefaultChannelPipeline{(decoder = com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalDecoder), (encoder = com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalEncoder), (handler = com.alibaba.dubbo.remoting.transport.netty.NettyHandler)}

Stepping into InternalEncoder, it is not hard to see that it hands the work over to DubboCodec.

DubboCodec analysis

encode

The msg object passed into encode is of class com.alibaba.dubbo.remoting.exchange.Request.

This step mainly does two things:

  1. Encode the header
  2. Pick the serializer that is used for the rest of the encoding
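To make the header step concrete, here is a minimal sketch in plain Java that lays out the 16 header bytes at the same offsets encodeRequest uses (magic at 0, flag at 2, request id at 4, body length at 12). The constant values (magic 0xdabb, the three flag bits, and 2 as the hessian2 content type id) are what I recall from the Dubbo 2.x source, so treat them as assumptions; DubboHeaderSketch and buildHeader are hypothetical names, not part of dubbo.

```java
import java.nio.ByteBuffer;

class DubboHeaderSketch {
    // Assumed values, recalled from ExchangeCodec in Dubbo 2.x.
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_TWOWAY = (byte) 0x40;
    static final byte FLAG_EVENT = (byte) 0x20;
    static final byte HESSIAN2_ID = 2; // assumed content type id of hessian2

    // Hypothetical helper: builds the header for a request whose body length is already known.
    static byte[] buildHeader(byte serializationId, boolean twoWay, boolean event,
                              long requestId, int bodyLength) {
        ByteBuffer header = ByteBuffer.allocate(HEADER_LENGTH); // big-endian by default
        header.putShort(0, MAGIC);                               // bytes 0..1: magic number
        byte flag = (byte) (FLAG_REQUEST | serializationId);     // byte 2: flags + serializer id
        if (twoWay) flag |= FLAG_TWOWAY;
        if (event) flag |= FLAG_EVENT;
        header.put(2, flag);
        // byte 3 is left as 0; encodeRequest does not write it
        header.putLong(4, requestId);                            // bytes 4..11: request id
        header.putInt(12, bodyLength);                           // bytes 12..15: body length
        return header.array();
    }

    public static void main(String[] args) {
        byte[] h = buildHeader(HESSIAN2_ID, true, false, 1L, 100);
        StringBuilder sb = new StringBuilder();
        for (byte b : h) sb.append(String.format("%02x ", b));
        System.out.println(sb.toString().trim());
    }
}
```

The serializer id shares byte 2 with the flag bits, which is why picking the serializer and encoding the header happen together.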

encodeRequestData

encodeRequestData encodes the data part of the Request object. The data is of class com.alibaba.dubbo.rpc.RpcInvocation. Its toString looks like this:

RpcInvocation [methodName=queryUser, parameterTypes=[class java.lang.Integer], arguments=[100], attachments={path=com.code260.ss.dubbo.demov.facade.service.UserService, interface=com.code260.ss.dubbo.demov.facade.service.UserService, version=0.0.0, timeout=600000, token=123456}]

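The overall logic is: first write a few special fields such as version, path and methodName, then write the request arguments one by one, and finally write the attachments map. Each argument goes through one layer of processing that mainly targets callbacks; if the argument is not a callback, it is returned as-is and handed to the encoder for serialization.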
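The write order can be sketched without any dubbo classes. The snippet below only collects the values in the order encodeRequestData writes them; it does not serialize anything. RequestDataOrderSketch, desc, descAll and fieldsInWriteOrder are hypothetical names. The attachment keys "dubbo", "path" and "version" are what I believe the Constants keys resolve to, desc is my assumption of what ReflectUtils.getDesc produces (JVM-style type descriptors), and "x.y.z" is a placeholder for the default dubbo version, not a real value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RequestDataOrderSketch {
    // JVM-style descriptor of one type, e.g. Integer -> "Ljava/lang/Integer;", int[] -> "[I".
    static String desc(Class<?> c) {
        if (c.isArray()) return "[" + desc(c.getComponentType());
        if (c.isPrimitive()) {
            if (c == void.class) return "V";
            if (c == boolean.class) return "Z";
            if (c == byte.class) return "B";
            if (c == char.class) return "C";
            if (c == double.class) return "D";
            if (c == float.class) return "F";
            if (c == int.class) return "I";
            if (c == long.class) return "J";
            return "S"; // the only primitive left is short
        }
        return "L" + c.getName().replace('.', '/') + ";";
    }

    // Concatenated descriptors of all parameter types.
    static String descAll(Class<?>[] types) {
        StringBuilder sb = new StringBuilder();
        for (Class<?> t : types) sb.append(desc(t));
        return sb.toString();
    }

    // Returns the values in the order they would be written to the ObjectOutput.
    static List<Object> fieldsInWriteOrder(String defaultDubboVersion, String methodName,
                                           Class<?>[] parameterTypes, Object[] arguments,
                                           Map<String, String> attachments) {
        List<Object> fields = new ArrayList<>();
        String dubbo = attachments.get("dubbo");
        fields.add(dubbo != null ? dubbo : defaultDubboVersion); // dubbo version, with fallback
        fields.add(attachments.get("path"));                     // service path
        fields.add(attachments.get("version"));                  // service version
        fields.add(methodName);
        fields.add(descAll(parameterTypes));
        if (arguments != null) {
            for (Object arg : arguments) fields.add(arg);        // callback handling omitted
        }
        fields.add(attachments);                                 // attachments map goes last
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> attachments = new LinkedHashMap<>();
        attachments.put("path", "com.code260.ss.dubbo.demov.facade.service.UserService");
        attachments.put("version", "0.0.0");
        List<Object> fields = fieldsInWriteOrder("x.y.z", "queryUser",
                new Class<?>[] {Integer.class}, new Object[] {100}, attachments);
        for (Object f : fields) System.out.println(f);
    }
}
```

For the queryUser invocation shown above, the parameter descriptor under this assumption is Ljava/lang/Integer; and the single argument 100 follows it.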

The out parameter of this method is of class com.alibaba.dubbo.common.serialize.support.hessian.Hessian2ObjectOutput.

Part of the code and the call stack are attached below for reference.

Call stack:

DubboCodec.encodeRequestData(Channel, ObjectOutput, Object) line: 186	
DubboCodec(ExchangeCodec).encodeRequest(Channel, ChannelBuffer, Request) line: 236	
DubboCodec(ExchangeCodec).encode(Channel, ChannelBuffer, Object) line: 75	
DubboCountCodec.encode(Channel, ChannelBuffer, Object) line: 39	
NettyCodecAdapter$InternalEncoder.encode(ChannelHandlerContext, Channel, Object) line: 81	
NettyCodecAdapter$InternalEncoder(OneToOneEncoder).handleDownstream(ChannelHandlerContext, ChannelEvent) line: 66	
DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline$DefaultChannelHandlerContext, ChannelEvent) line: 591	
DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(ChannelEvent) line: 776	
NettyHandler(SimpleChannelHandler).writeRequested(ChannelHandlerContext, MessageEvent) line: 304	
NettyHandler.writeRequested(ChannelHandlerContext, MessageEvent) line: 99	
NettyHandler(SimpleChannelHandler).handleDownstream(ChannelHandlerContext, ChannelEvent) line: 266	
DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline$DefaultChannelHandlerContext, ChannelEvent) line: 591	
DefaultChannelPipeline.sendDownstream(ChannelEvent) line: 582	
Channels.write(Channel, Object, SocketAddress) line: 611	
Channels.write(Channel, Object) line: 578	
NioClientSocketChannel(AbstractChannel).write(Object) line: 251	
NettyChannel.send(Object, boolean) line: 98	
NettyClient(AbstractClient).send(Object, boolean) line: 270	
NettyClient(AbstractPeer).send(Object) line: 51	
HeaderExchangeChannel.request(Object, int) line: 112	
HeaderExchangeClient.request(Object, int) line: 91

The code that encodes the request:

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        Serialization serialization = getSerialization(channel);
        // header.
        byte[] header = new byte[HEADER_LENGTH];
        // set magic number.
        Bytes.short2bytes(MAGIC, header);

        // set request and serialization flag.
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
        if (req.isEvent()) header[2] |= FLAG_EVENT;

        // set request id.
        Bytes.long2bytes(req.getId(), header, 4);

        // encode request data.
        int savedWriteIndex = buffer.writerIndex();
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            encodeEventData(channel, out, req.getData());
        } else {
            encodeRequestData(channel, out, req.getData());
        }
        out.flushBuffer();
        bos.flush();
        bos.close();
        int len = bos.writtenBytes();
        checkPayload(channel, len);
        // backfill the body length into header bytes 12..15.
        Bytes.int2bytes(len, header, 12);

        // write
        buffer.writerIndex(savedWriteIndex);
        buffer.writeBytes(header); // write header.
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }
    

    @Override
    protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
        RpcInvocation inv = (RpcInvocation) data;

        out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION));
        out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
        out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

        out.writeUTF(inv.getMethodName());
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null) {
            for (int i = 0; i < args.length; i++) {
                out.writeObject(encodeInvocationArgument(channel, inv, i));
            }
        }
        out.writeObject(inv.getAttachments());
    }
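One detail of encodeRequest worth isolating is the writerIndex juggling: the body length is only known after serialization, so the code skips HEADER_LENGTH bytes, writes the body, and then goes back to fill in the header. Below is a minimal sketch of that pattern using java.nio.ByteBuffer instead of dubbo's ChannelBuffer. BackfillSketch and encodeFrame are hypothetical names, and only the length field of the header is filled in here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BackfillSketch {
    static final int HEADER_LENGTH = 16;

    // Reserves header space, writes the body, then backfills the length at offset 12.
    // Returns the number of body bytes written.
    static int encodeFrame(ByteBuffer buffer, byte[] body) {
        int saved = buffer.position();
        buffer.position(saved + HEADER_LENGTH);       // skip the header for now
        buffer.put(body);                             // stand-in for the serializer output
        int len = buffer.position() - (saved + HEADER_LENGTH);
        buffer.putInt(saved + 12, len);               // absolute put, position is unchanged
        return len;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        int len = encodeFrame(buffer, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("body length = " + len + ", length field = " + buffer.getInt(12)
                + ", position = " + buffer.position());
    }
}
```

Reserving the header first avoids serializing the body into a temporary buffer just to learn its size.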