Netty Protobuf处理粘包分析(netty解决粘包)

  本篇文章为你整理了Netty Protobuf处理粘包分析(netty解决粘包)的详细内容,包含有netty 粘包解决方案 netty解决粘包 netty粘包/拆包的解决办法 netty的粘包与拆包 Netty Protobuf处理粘包分析,希望能帮助你了解 Netty Protobuf处理粘包分析。

  最近消息中间件项目进行联调,我负责Server端,使用Java的Netty框架。同事负责Client端,使用Go的net包,消息使用Protobuf序列化。联调时Client发送的消息Server端解析出错,经过分析发现是Server与Client粘包处理方式不一致导致,Server使用的是Protobuf提供的粘包处理方式,Client使用的是消息头定义长度的处理方式,探索一下Protobuf粘包处理方式有何不同。

  

public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder ByteBuf {

 

   @Override

   protected void encode(

   ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {

   int bodyLen = msg.readableBytes();

   int headerLen = computeRawVarint32Size(bodyLen);

   out.ensureWritable(headerLen + bodyLen);

   writeRawVarint32(out, bodyLen);

   out.writeBytes(msg, msg.readerIndex(), bodyLen);

   * Writes protobuf varint32 to (@link ByteBuf).

   * @param out to be written to

   * @param value to be written

   static void writeRawVarint32(ByteBuf out, int value) {

   while (true) {

   if ((value ~0x7F) == 0) {

   out.writeByte(value);

   return;

   } else {

   out.writeByte((value 0x7F) 0x80);

   value

   * Computes size of protobuf varint32 after encoding.

   * @param value which is to be encoded.

   * @return size of value encoded as protobuf varint32.

   static int computeRawVarint32Size(final int value) {

   if ((value (0xffffffff 7)) == 0) {

   return 1;

   if ((value (0xffffffff 14)) == 0) {

   return 2;

   if ((value (0xffffffff 21)) == 0) {

   return 3;

   if ((value (0xffffffff 28)) == 0) {

   return 4;

   return 5;

  

 

  encode()方法

  

protected void encode(

 

   ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {

   // 获取消息长度

   int bodyLen = msg.readableBytes();

   // 计算表示消息体长度所需的字节数量

   int headerLen = computeRawVarint32Size(bodyLen);

   // 拿到所有需要写入的数据长度,对缓冲区进行扩容

   out.ensureWritable(headerLen + bodyLen);

   // 将表示消息体长度的字节写入缓冲区

   writeRawVarint32(out, bodyLen);

   out.writeBytes(msg, msg.readerIndex(), bodyLen);

  

 

  writeRawVarint32()方法

  先看value ~0x7F、(value 0x7F) 0x80、value = 7这几个看不懂的地方, 、、~、 =这些符号为计算机的位运算符号,分别代表与、或、非、忽略符号位右移(a =n 相当于 a = a n)

  计算value ~0x7F

  分别假设value值为100、200

  100转二进制为01100100,200转二进制为11001000

  计算100 ~0x7F

  
这里运算结果使用十进制表示二进制是不准确的,仅作参考,需要根据数据类型进行转换,比如:10000000转换为byte类型是-128,转换为int是128

  通过以上计算可以看出:

  可以使用小于7个位表示的数字即可满足条件,7个位可以表示$2^7=128$个数字,取值范围是0~127,也就是说0~127可以满足条件,这一步的目的是保证写入表示消息体长度的最后一位字节是正数,后面会说到。

  value=100满足条件,所以向bytebuf中写入字节01100100,然后return方法结束。

  value=200不满足条件,那么看(value 0x7F) 0x80这一步运算。

  计算(value 0x7F) 0x80

  
计算结果还是200,我们分析一下步骤:

  value 0x7f:取出最后七个位,0x80:将首位转为1

  即取出最后7个位,高位补1,正好一个字节的长度,将11001000写入bytebuf,再看value = 7。

  计算value = 7

  
计算机中一般首位是符号位,0表示正数,1表示负数。

  这里需要注意是 表示右移,不改变符号,最高位与原来符号保持一致。 是忽略符号位右移,最高位补0。

  200 = 7的结果为00000001,继续走if判断,满足条件,将00000001写入bytebuf。

  最终value=200写入bytebuf的字节是11001000、00000001。

  至此,三个看不懂的位运算都理解了,那么我们连起来看一下:

  如果value可以用7个字节表示(或者说是value在0~127范围内),将value转换为字节写入bytebuf,跳出循环,方法结束。

  如果value不能用7个字节表示(或者说是value不在0~127范围内),取最后7个位,高位补1,写入bytebuf中,右移7位(将刚才取出的7位删掉),再次判断是否满足if条件,不满足就继续上面的操作,直到满足条件为止。

  总结一下writeRawVarint32方法,其实是把一个整数拆分成多个字节,倒序写入bytebuf中,如果将每个字节转换为byte类型,最后一个字节总是正数,前面的字节都是负数。我们可以猜测,接收消息时以第一个正数为分割,将表示消息体长度的字节与消息体字节拆分开,再通过位运算将前者组合起来就得到了消息体的长度。

  computeRawVarint32Size()方法

  我们在writeRawVarint32方法分析中了解了位运算,再看computeRawVarint32Size方法就很简单了。

  计算机表示负数

  0xffffffff转换为二进制是11111111 1111111 11111111 11111111转换为有有符号int类型是-1。为什么是-1?

  因为计算机使用二进制可以做加法运算,但是没办法做减法运算,加上一个负数就相当于做了减法运算,现在问题是如何表示负数?

  曾经有原码表示法、反码表示法,这里不做赘述,现在使用的是补码表示法。

  补码表示法是将正数的二进制取反,然后在最后一位+1。

  通过例子看一下:

  有符号int类型的1用二进制可以表示为00000000 0000000 00000000 0000001取反得到11111111 11111111 11111111 11111110+1得到11111111 11111111 11111111 11111111转换为十六进制是0xffffffff。

  计算value (0xffffffff 7)

   表示左移,从左边挤出去7个位,在右边补7个0。

  这里仍然假设value分为为100,200。

  

// 计算(100 (0xffffffff 7))

 

   00000000 0000000 00000000 01100100 // 100

   11111111 1111111 11111111 10000000 // 0xffffffff 7

   00000000 0000000 00000000 00000000 // 结果:0

  // 计算(200 (0xffffffff 7))

   00000000 0000000 00000000 11001000 // 200

   11111111 1111111 11111111 10000000 // 0xffffffff 7

   00000000 0000000 00000000 10000000 // 结果:128

  // 计算(200 (0xffffffff 14))

   00000000 0000000 00000000 11001000 // 200

   11111111 1111111 11000000 00000000 // 0xffffffff 14

   00000000 0000000 00000000 00000000 // 结果:0

  

 

  从以上计算可以看出,如果value可以用小于7个位来表示,则左移7个位可以满足,如果value可以用8~14个位来表示,左移14个位可以满足。

  100、200计算结果分别为1、2,与writeRawVarint32方法写入的字节数量一致。

  writeRawVarint32是方法7个7个的取出位,这里按7个位来计算所需字节数量,最终返回表示消息体长度的字节数量。

  

public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder {

 

   // TODO maxFrameLength + safe skip + fail-fast option

   // (just like LengthFieldBasedFrameDecoder)

   @Override

   protected void decode(ChannelHandlerContext ctx, ByteBuf in, List Object out)

   throws Exception {

   in.markReaderIndex();

   int preIndex = in.readerIndex();

   int length = readRawVarint32(in);

   if (preIndex == in.readerIndex()) {

   return;

   if (length 0) {

   throw new CorruptedFrameException("negative length: " + length);

   if (in.readableBytes() length) {

   in.resetReaderIndex();

   } else {

   out.add(in.readRetainedSlice(length));

   * Reads variable length 32bit int from buffer

   * @return decoded int if buffers readerIndex has been forwarded else nonsense value

   private static int readRawVarint32(ByteBuf buffer) {

   if (!buffer.isReadable()) {

   return 0;

   buffer.markReaderIndex();

   byte tmp = buffer.readByte();

   if (tmp = 0) {

   return tmp;

   } else {

   int result = tmp 127;

   if (!buffer.isReadable()) {

   buffer.resetReaderIndex();

   return 0;

   if ((tmp = buffer.readByte()) = 0) {

   result = tmp 7;

   } else {

   result = (tmp 127) 7;

   if (!buffer.isReadable()) {

   buffer.resetReaderIndex();

   return 0;

   if ((tmp = buffer.readByte()) = 0) {

   result = tmp 14;

   } else {

   result = (tmp 127) 14;

   if (!buffer.isReadable()) {

   buffer.resetReaderIndex();

   return 0;

   if ((tmp = buffer.readByte()) = 0) {

   result = tmp 21;

   } else {

   result = (tmp 127) 21;

   if (!buffer.isReadable()) {

   buffer.resetReaderIndex();

   return 0;

   result = (tmp = buffer.readByte()) 28;

   if (tmp 0) {

   throw new CorruptedFrameException("malformed varint.");

   return result;

  

 

  readRawVarint32()方法

  方法就不细看了,验证一下我们之前的猜测。还是使用value=200写入的11001000、00000001字节举例看一下。

  

private static int readRawVarint32(ByteBuf buffer) {

 

   if (!buffer.isReadable()) {

   return 0;

   buffer.markReaderIndex();

   // 读取第一个字节

   byte tmp = buffer.readByte(); // tmp = 11001000 首位是1,是个负数,小于0

   // 判断是否大于等于0,

   // 大于等于0说明是最后一个表示消息体长度的字节,直接return

   if (tmp = 0) {

   return tmp;

   } else {

   // 小于0 tmp 127 取出后七位

   int result = tmp 127; // result = 11001000 01111111 = 01001000

   if (!buffer.isReadable()) {

   buffer.resetReaderIndex();

   return 0;

   // 再取第二个字节,判断是否大于等于0

   if ((tmp = buffer.readByte()) = 0) { //tmp = 00000001

   // 这一步操作相当于是把上一步取出的7个字节拿出来拼在tmp 7的后面

   result = tmp 7;

   // result = 01001000 tmp 7

   // tmp 7 = 10000000

   // 01001000 10000000 = 11001000 转换为int类型是200

   // 后面的代码与以上步骤大同小异,不再赘述了

   return result;

  

 

  涉及到的基础知识:计算机表示整数、位运算、进制转换

  一般处理粘包的方式有三种:

  
消息头定义长度:在消息体前增加消息体的长度,一般使用四个字节,读取消息时先读取四个字节,得到消息体长度,再根据长度读取消息。

  
Netty Protobuf提供的处理粘包处理方式是在消息体前加正负数,并且以第一个正数作为分割。可以说是消息头定义长度方式+特殊符号分割方式的结合版。巧妙利用二进制的位运算和计算机表示整数的特点实现动态消息长度,发送较短消息时可以比消息头定义长度的方式节省1-3个字节。

  博客小白的第一篇文档,如有错误,还望指正。

  以上就是Netty Protobuf处理粘包分析(netty解决粘包)的详细内容,想要了解更多 Netty Protobuf处理粘包分析的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: