11.如何编码请求
关于请求的编码,如果你熟悉过dubbo代码,直接看其DubboCodec类即可。
如果不熟悉,属于刚开始看那种,也没关系。我们用下面的办法探索一下就能发现。
探索出编码器在哪里是什么
我们先从上篇文章的服务调用继续往下跟,可以发现请求要通过过Channel发送出去。调用栈如下:
NettyChannel.send(Object, boolean) line: 95
NettyClient(AbstractClient).send(Object, boolean) line: 270
NettyClient(AbstractPeer).send(Object) line: 51
HeaderExchangeChannel.request(Object, int) line: 112
既然是netty的channel,我们尝试看下其pipeline,应该能发现编解码器。
channel的ChannelPipeline pipeline字段:
DefaultChannelPipeline{(decoder = com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalDecoder), (encoder = com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalEncoder), (handler = com.alibaba.dubbo.remoting.transport.netty.NettyHandler)}
到InternalEncoder里跟一下不难发现对接了DubboCodec。
DubboCodec分析
encode
encode部分传进来的msg的Object的class是class com.alibaba.dubbo.remoting.exchange.Request。
这个过程主要完成的是:
- 编码头部
- 选出序列化器,用于后续编码
encodeRequestData
encodeRequestData这个部分用来编码Request对象中data部分,data是class com.alibaba.dubbo.rpc.RpcInvocation。看下其toString如下:
RpcInvocation [methodName=queryUser, parameterTypes=[class java.lang.Integer], arguments=[100], attachments={path=com.code260.ss.dubbo.demov.facade.service.UserService, interface=com.code260.ss.dubbo.demov.facade.service.UserService, version=0.0.0, timeout=600000, token=123456}]
整个逻辑是先写入一些特殊字段比如version,path,methodName等,再挨个写入请求参数,参数会过一层处理主要是针对回调的,如果不是回调方式的,原样返回给编码器进行序列化即可。
这个方法入参out的class是com.alibaba.dubbo.common.serialize.support.hessian.Hessian2ObjectOutput。
附上部分代码及调用栈供你参考
调用栈:
DubboCodec.encodeRequestData(Channel, ObjectOutput, Object) line: 186
DubboCodec(ExchangeCodec).encodeRequest(Channel, ChannelBuffer, Request) line: 236
DubboCodec(ExchangeCodec).encode(Channel, ChannelBuffer, Object) line: 75
DubboCountCodec.encode(Channel, ChannelBuffer, Object) line: 39
NettyCodecAdapter$InternalEncoder.encode(ChannelHandlerContext, Channel, Object) line: 81
NettyCodecAdapter$InternalEncoder(OneToOneEncoder).handleDownstream(ChannelHandlerContext, ChannelEvent) line: 66
DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline$DefaultChannelHandlerContext, ChannelEvent) line: 591
DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(ChannelEvent) line: 776
NettyHandler(SimpleChannelHandler).writeRequested(ChannelHandlerContext, MessageEvent) line: 304
NettyHandler.writeRequested(ChannelHandlerContext, MessageEvent) line: 99
NettyHandler(SimpleChannelHandler).handleDownstream(ChannelHandlerContext, ChannelEvent) line: 266
DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline$DefaultChannelHandlerContext, ChannelEvent) line: 591
DefaultChannelPipeline.sendDownstream(ChannelEvent) line: 582
Channels.write(Channel, Object, SocketAddress) line: 611
Channels.write(Channel, Object) line: 578
NioClientSocketChannel(AbstractChannel).write(Object) line: 251
NettyChannel.send(Object, boolean) line: 98
NettyClient(AbstractClient).send(Object, boolean) line: 270
NettyClient(AbstractPeer).send(Object) line: 51
HeaderExchangeChannel.request(Object, int) line: 112
HeaderExchangeClient.request(Object, int) line: 91
编码请求的代码:
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
// header.
byte[] header = new byte[HEADER_LENGTH];
// set magic number.
Bytes.short2bytes(MAGIC, header);
// set request and serialization flag.
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
if (req.isEvent()) header[2] |= FLAG_EVENT;
// set request id.
Bytes.long2bytes(req.getId(), header, 4);
// encode request data.
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
encodeRequestData(channel, out, req.getData());
}
out.flushBuffer();
bos.flush();
bos.close();
int len = bos.writtenBytes();
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
RpcInvocation inv = (RpcInvocation) data;
out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION));
out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
out.writeUTF(inv.getMethodName());
out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
Object[] args = inv.getArguments();
if (args != null)
for (int i = 0; i < args.length; i++){
out.writeObject(encodeInvocationArgument(channel, inv, i));
}
out.writeObject(inv.getAttachments());
}
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!