Skip to content

Commit

Permalink
dubbo包头 发送请求/接受响应代码阅读
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyang committed May 7, 2020
1 parent 93874c7 commit 3f21fd0
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 89 deletions.
Expand Up @@ -21,8 +21,6 @@
/**
* Channel. (API/SPI, Prototype, ThreadSafe)
*
*
*
* @see com.alibaba.dubbo.remoting.Client
* @see com.alibaba.dubbo.remoting.Server#getChannels()
* @see com.alibaba.dubbo.remoting.Server#getChannel(InetSocketAddress)
Expand Down
Expand Up @@ -36,8 +36,8 @@ public interface ExchangeChannel extends Channel {
/**
* send request.
*
* @param request
* @param timeout
* @param request 请求对象 实际上是一个RpcInvocation对象
* @param timeout 超市时间
* @return response future
* @throws RemotingException
*/
Expand Down
Expand Up @@ -43,23 +43,20 @@

/**
* ExchangeCodec.
*
*
*
*/
public class ExchangeCodec extends TelnetCodec {

// header length.
// header length. 消息头长度128位
protected static final int HEADER_LENGTH = 16;
// magic header.
// magic header. 高低位魔数 标识协议版本号 各占8位
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
// message flag.
protected static final byte FLAG_REQUEST = (byte) 0x80;
protected static final byte FLAG_TWOWAY = (byte) 0x40;
protected static final byte FLAG_EVENT = (byte) 0x20;
protected static final int SERIALIZATION_MASK = 0x1f;
protected static final byte FLAG_REQUEST = (byte) 0x80; // 标识是请求或响应 请求:1 响应:0
protected static final byte FLAG_TWOWAY = (byte) 0x40; // 仅在FLAG_REQUEST为1(请求)时才有用 标记是否期望从服务器返回值
protected static final byte FLAG_EVENT = (byte) 0x20; // 标识是否是事件消息 如果这是一个事件设置为1
protected static final int SERIALIZATION_MASK = 0x1f; // 标识序列化类型
private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);

public Short getMagicCode() {
Expand All @@ -68,26 +65,26 @@ public Short getMagicCode() {

@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
if (msg instanceof Request) { // 处理请求
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
} else if (msg instanceof Response) { // 处理响应
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
super.encode(channel, buffer, msg); // 其他的交给父类处理 用于telnet模式
}
}

@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
int readable = buffer.readableBytes(); // 可读字节数
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)]; // 选取可读字节数和HEADER_LENGTH中小的
buffer.readBytes(header);
return decode(channel, buffer, readable, header);
}

@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// check magic number.
// check magic number. 检查魔数
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
Expand All @@ -105,24 +102,24 @@ protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byt
return super.decode(channel, buffer, readable, header);
}
// check length.
if (readable < HEADER_LENGTH) {
if (readable < HEADER_LENGTH) { // 如果是一个不完整的包 发出NEED_MORE_INPUT事件 需要继续读取
return DecodeResult.NEED_MORE_INPUT;
}

// get data length.
// get data length. 获取数据长度
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);

int tt = len + HEADER_LENGTH;
if (readable < tt) {
if (readable < tt) { // 需要继续读取
return DecodeResult.NEED_MORE_INPUT;
}

// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

try {
return decodeBody(channel, is, header);
return decodeBody(channel, is, header); // 解码数据
} finally {
if (is.available() > 0) {
try {
Expand All @@ -140,20 +137,20 @@ protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byt
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// decode response.
long id = Bytes.bytes2long(header, 4); // 获取请求id
if ((flag & FLAG_REQUEST) == 0) { // 判断是请求还是响应
// decode response. 响应
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
if ((flag & FLAG_EVENT) != 0) { // 是否是event事件
res.setEvent(Response.HEARTBEAT_EVENT);
}
// get status.
byte status = header[3];
byte status = header[3]; // 获取请求的状态码
res.setStatus(status);
try {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); // 进行数据内容解析
if (status == Response.OK) {
Object data;
Object data; // 根据不同的类型来进行解析
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
} else if (res.isEvent()) {
Expand All @@ -171,7 +168,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
}
return res;
} else {
// decode request.
// decode request. 请求
Request req = new Request(id);
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
Expand All @@ -180,7 +177,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
}
try {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
Object data;
Object data; // 与响应相同进行内容解析
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
} else if (req.isEvent()) {
Expand Down Expand Up @@ -208,27 +205,37 @@ protected Object getRequestData(long id) {
return req.getData();
}

/**
* 请求的encode
* 记录header --> 写body --> 写header中的data length --> 最终写buffer
*
* @param channel
* @param buffer
* @param req
* @throws IOException
*/
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
// header.
Serialization serialization = getSerialization(channel); // 请求的序列化类型
// header. 写入header信息
byte[] header = new byte[HEADER_LENGTH];
// set magic number.
// set magic number. 设置魔数
Bytes.short2bytes(MAGIC, header);

// set request and serialization flag.
// set request and serialization flag. 标记这是一个请求
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
if (req.isEvent()) header[2] |= FLAG_EVENT;
if (req.isTwoWay()) header[2] |= FLAG_TWOWAY; // 是否是双路
if (req.isEvent()) header[2] |= FLAG_EVENT; // 是否为事件

// set request id.
Bytes.long2bytes(req.getId(), header, 4);
Bytes.long2bytes(req.getId(), header, 4); // 写入当前请求的request id

// encode request data.
int savedWriteIndex = buffer.writerIndex();
int savedWriteIndex = buffer.writerIndex(); // 记录当前写入的位置 将其往后偏移 保留出要写入内容大小的位置(预留出header的位置) 先写入body内容
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
// 根据数据内容的不同 写入不同的内容
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
Expand All @@ -240,21 +247,21 @@ protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req)
}
bos.flush();
bos.close();
int len = bos.writtenBytes();
int len = bos.writtenBytes(); // 记录写入的body长度
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
Bytes.int2bytes(len, header, 12); // 偏移96位写入body长度信息

// write
// write. 写入buffer
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writeBytes(header); // write header. 将header信息写入
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}

protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
int savedWriteIndex = buffer.writerIndex();
try {
Serialization serialization = getSerialization(channel);
// header.
// header. 和编码request的参数一致
byte[] header = new byte[HEADER_LENGTH];
// set magic number.
Bytes.short2bytes(MAGIC, header);
Expand All @@ -263,11 +270,11 @@ protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response re
if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
// set response status.
byte status = res.getStatus();
header[3] = status;
header[3] = status; // 写入状态码
// set request id.
Bytes.long2bytes(res.getId(), header, 4);
Bytes.long2bytes(res.getId(), header, 4); // 写入response id

buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); // 和request一样的内容写入方式 先写入内容再写入长度
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
// encode response data or error message.
Expand All @@ -277,30 +284,30 @@ protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response re
} else {
encodeResponseData(channel, out, res.getResult(), res.getVersion());
}
} else out.writeUTF(res.getErrorMessage());
} else out.writeUTF(res.getErrorMessage()); // 这里不太一样的地方在于 如果错误的时候则直接将错误信息写入 不需要再交由序列化
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();

int len = bos.writtenBytes();
int len = bos.writtenBytes(); // 一样的写入模式
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
} catch (Throwable t) {
// clear buffer
// clear buffer. 写入出现异常
buffer.writerIndex(savedWriteIndex);
// send error message to Consumer, otherwise, Consumer will wait till timeout.
if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
Response r = new Response(res.getId(), res.getVersion());
r.setStatus(Response.BAD_RESPONSE);

if (t instanceof ExceedPayloadLimitException) {
if (t instanceof ExceedPayloadLimitException) { // 如果是超过内容长度则重新设置内容大小并写入
logger.warn(t.getMessage(), t);
try {
r.setErrorMessage(t.getMessage());
Expand All @@ -322,7 +329,7 @@ protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response re
}
}

// Rethrow exception
// Rethrow exception. 其他的则抛出异常
if (t instanceof IOException) {
throw (IOException) t;
} else if (t instanceof RuntimeException) {
Expand Down Expand Up @@ -450,6 +457,4 @@ protected void encodeRequestData(Channel channel, ObjectOutput out, Object data,
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
encodeResponseData(out, data);
}


}
Expand Up @@ -66,6 +66,7 @@ static HeaderExchangeChannel getOrAddChannel(Channel ch) {
}
return ret;
}


static void removeChannelIfDisconnected(Channel ch) {
if (ch != null && !ch.isConnected()) {
Expand Down Expand Up @@ -106,16 +107,16 @@ public ResponseFuture request(Object request, int timeout) throws RemotingExcept
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
// create request. 创建请求对象
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
DefaultFuture future = new DefaultFuture(channel, req, timeout); // 创建执行结果的回调信息
try {
channel.send(req);
channel.send(req); // 交给业务渠道处理 实际上是交给Transporter这个SPI进行创建 NettyChannel就是在这里产生的
} catch (RemotingException e) {
future.cancel();
future.cancel(); // 请求出现异常则取消当前的请求封装
throw e;
}
return future;
Expand Down
Expand Up @@ -20,7 +20,6 @@
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.remoting.Codec2;
import com.alibaba.dubbo.remoting.buffer.DynamicChannelBuffer;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
Expand Down Expand Up @@ -136,7 +135,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throw
throw e;
}
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
message.readerIndex(saveReaderIndex); // 读索引回滚
break;
} else {
if (saveReaderIndex == message.readerIndex()) {
Expand All @@ -149,7 +148,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throw
}
} while (message.readable());
} finally {
if (message.readable()) {
if (message.readable()) { // 判断是否可读
message.discardReadBytes();
buffer = message;
} else {
Expand Down
Expand Up @@ -19,7 +19,6 @@
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.remoting.Codec2;
import com.alibaba.dubbo.remoting.buffer.ChannelBuffer;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
Expand Down
Expand Up @@ -29,13 +29,15 @@ public interface Invoker<T> extends Node {

/**
* get service interface.
* 获取当前调用者的服务接口
*
* @return service interface.
*/
Class<T> getInterface();

/**
* invoke.
* 执行请求
*
* @param invocation
* @return result
Expand Down

0 comments on commit 3f21fd0

Please sign in to comment.