netty对多协议进行编解码(netty 多个解码器)

  本篇文章为你整理了netty对多协议进行编解码(netty 多个解码器)的详细内容,包含有netty多协议处理 netty 多个解码器 netty编码解码工作流程 netty handler和编解码顺序 netty对多协议进行编解码,希望能帮助你了解 netty对多协议进行编解码。

  项目地址:https://gitee.com/q529075990qqcom/NB-IOT.git

  我们需要创建一个 Maven 项目,这个项目是我已经写好的项目,项目结构图如下:

  

  

  

  创建公共模块

  创建子模块,准备好依赖Netty4.1版本

  

 <dependencies>
   <dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>4.1.72.Final</version>
   </dependency>
   <dependency>
     <groupId>org.slf4j</groupId>
     <artifactId>slf4j-api</artifactId>
     <version>1.7.28</version>
   </dependency>
   <dependency>
     <groupId>org.slf4j</groupId>
     <artifactId>slf4j-simple</artifactId>
     <version>1.7.28</version>
   </dependency>
   <dependency>
     <groupId>org.projectlombok</groupId>
     <artifactId>lombok</artifactId>
     <version>RELEASE</version>
     <scope>compile</scope>
   </dependency>
   <dependency>
     <groupId>com.esotericsoftware</groupId>
     <artifactId>kryo</artifactId>
     <version>5.3.0</version>
   </dependency>
 </dependencies>

 

  maven依赖

  

  序列化的定义是:将一个对象编码成一个字节流(I/O);而与之相反的操作被称为反序列化。

  

package serializer;

 

   /**
    * Strategy interface for turning objects into byte arrays and back.
    *
    * <p>Created 2022-10-20 15:16.
    *
    * @author quliang
    */
   public interface Serializer {

       /**
        * Serializes an object into a byte array.
        *
        * @param obj the object to serialize
        * @return the serialized bytes
        * @throws Exception if the object cannot be serialized
        */
       byte[] serialize(Object obj) throws Exception;

       /**
        * Deserializes a byte array into an instance of the requested class.
        *
        * @param bytes the serialized bytes
        * @param clazz the class of the object to rebuild
        * @param <T>   the type of the object to rebuild
        * @return the deserialized object
        * @throws Exception if the bytes cannot be deserialized
        */
       <T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception;
   }

 

  自定义序列化接口

  

package serializer;

 

  import com.esotericsoftware.kryo.Kryo;

  import com.esotericsoftware.kryo.io.Input;

  import com.esotericsoftware.kryo.io.Output;

  import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;

  import org.objenesis.strategy.StdInstantiatorStrategy;

  import java.io.ByteArrayInputStream;

  import java.io.ByteArrayOutputStream;

  import java.io.IOException;

   * @description:

   * @author: quliang

   * @create: 2022-10-20 15:18

  public class KryoSerializer implements Serializer {

   private static final ThreadLocal Kryo kryoThreadLocal = ThreadLocal.withInitial(() - {

   Kryo kryo = new Kryo();

   kryo.setReferences(true);

   kryo.setRegistrationRequired(false);

   ((DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())

   .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());

   return kryo;

   @Override

   public byte[] serialize(Object obj) throws Exception {

   try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {

   Output output = new Output(baos);

   Kryo kryo = kryoThreadLocal.get();

   kryo.writeObject(output, obj);

   kryoThreadLocal.remove();

   return output.toBytes();

   } catch (IOException e) {

   throw new Exception("序列化失败", e);

   @Override

   public T T deserialize(byte[] bytes, Class T clazz) throws Exception {

   try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {

   Input input = new Input(bais);

   Kryo kryo = kryoThreadLocal.get();

   Object obj = kryo.readObject(input, clazz);

   kryoThreadLocal.remove();

   return clazz.cast(obj);

   } catch (IOException e) {

   throw new Exception("反序化失败");

  }

 

  Kryo实现序列化接口

  

  我们需要解析两种协议,那我们就要提前定义好两种协议,分别是消息协议、登录协议

  

  消息协议相关

  

package protocol.msg;

 

  import lombok.Data;

  import lombok.Getter;

   * @description: 消息协议: magicversiondata

   * @author: quliang

   * @create: 2022-12-10 20:46

  @Data

  public class MsgProtocol {

   @Getter

   private byte magic=0;

   @Getter

   private byte version=1;

  }

 

  消息协议基类

  

package protocol.msg.request;

 

  import lombok.Data;

  import protocol.msg.MsgProtocol;

   * @description:

   * @author: quliang

   * @create: 2022-12-10 20:58

  @Data

  public class MsgRequest extends MsgProtocol {

   private String msg;

  }

 

  消息请求子类

  

package protocol.msg.response;

 

  import lombok.Data;

  import protocol.msg.MsgProtocol;

   * @description:

   * @author: quliang

   * @create: 2022-12-10 20:41

  @Data

  public class MsgResponse extends MsgProtocol {

   private int statCode;

  }

 

  消息响应子类

  

package encoder;

 

  import io.netty.buffer.ByteBuf;

  import io.netty.channel.ChannelHandlerContext;

  import io.netty.handler.codec.MessageToByteEncoder;

  import protocol.msg.MsgProtocol;

  import serializer.KryoSerializer;

   * @description:

   * @author: quliang

   * @create: 2022-12-10 20:53

  public class MsgEncoder extends MessageToByteEncoder MsgProtocol {

   @Override

   protected void encode(ChannelHandlerContext ctx, MsgProtocol msgProtocol, ByteBuf in) throws Exception {

   in.writeByte(msgProtocol.getMagic());

  // in.writeByte(msgProtocol.code());

   in.writeByte(msgProtocol.getVersion());

   byte[] data = new KryoSerializer().serialize(msgProtocol);

   in.writeShort(data.length);

   in.writeBytes(data);

  }

 

  消息协议编码

  

package decoder;

 

  import io.netty.buffer.ByteBuf;

  import io.netty.channel.ChannelHandlerContext;

  import io.netty.handler.codec.ByteToMessageDecoder;

  import lombok.extern.slf4j.Slf4j;

  import protocol.msg.MsgProtocol;

  import serializer.KryoSerializer;

  import java.util.List;

   * @description:

   * @author: quliang

   * @create: 2022-12-10 20:52

  @Slf4j

  public class MsgDecoder extends ByteToMessageDecoder {

   private Class MsgProtocol msgClass;

   public MsgDecoder(Class clazz) {

   this.msgClass = clazz;

   @Override

   protected void decode(ChannelHandlerContext ctx, ByteBuf in, List Object out) throws Exception {

   try {

   byte magic = in.readByte();

   byte version = in.readByte();

   short dataSize = in.readShort();

   byte[] data = new byte[dataSize];

   in.readBytes(data);

   MsgProtocol baseProtocol = new KryoSerializer().deserialize(data, msgClass);

   out.add(baseProtocol);

   } catch (Exception e) {

   //如果解码错误,将数据传递到下一个解码器中

   log.error("msg decoder {}",e.getMessage());

   // 重置读取字节索引,因为上边已经读了(readBytes),不加这个会导致数据为空

   in.resetReaderIndex();

   // 这里是复制流,复制一份,防止skipBytes跳过,导致传递的消息变成空;

   ByteBuf buff = in.retainedDuplicate();

   //原因是netty不允许有字节内容不读的情况发生,所以采用下边的方法解决。

   in.skipBytes(in.readableBytes());

   //继续传递到下一个解码器中

   out.add(buff);

  }

 

  消息协议解码

  

  登录协议相关

  

package protocol.system;

 

  
登录协议基类(原文此处的代码清单缺失;从 LoginEncoder 的调用可知,该基类 LoginProtocol 至少提供 getMagic()、getCode()、getVersion() 三个取值方法)

  

package protocol.system.request;

 

  import lombok.AllArgsConstructor;

  import lombok.Data;

  import lombok.NoArgsConstructor;

  import protocol.system.LoginProtocol;

   * @description:

   * @author: quliang

   * @create: 2022-12-06 18:17

  @Data

  @NoArgsConstructor

  @AllArgsConstructor

  public class LoginRequest extends LoginProtocol {

   private String userId;

   private String userName;

  }

 

  登录请求子类

  

package protocol.system.response;

 

  import lombok.AllArgsConstructor;

  import lombok.Data;

  import lombok.NoArgsConstructor;

  import protocol.system.LoginProtocol;

   * @description:

   * @author: quliang

   * @create: 2022-12-06 18:22

  @Data

  @NoArgsConstructor

  @AllArgsConstructor

  public class LoginResponse extends LoginProtocol {

   private String msg;

   private String data;

  }

 

  登录响应子类

  

package encoder;

 

  import io.netty.buffer.ByteBuf;

  import io.netty.channel.ChannelHandlerContext;

  import io.netty.handler.codec.MessageToByteEncoder;

  import protocol.system.LoginProtocol;

  import serializer.KryoSerializer;

   * @description:

   * @author: quliang

   * @create: 2022-12-06 22:11

  public class LoginEncoder extends MessageToByteEncoder LoginProtocol {

   @Override

   protected void encode(ChannelHandlerContext ctx, LoginProtocol baseProtocol, ByteBuf in) throws Exception {

   in.writeByte(baseProtocol.getMagic());

   in.writeByte(baseProtocol.getCode());

   in.writeByte(baseProtocol.getVersion());

   byte[] data = new KryoSerializer().serialize(baseProtocol);

   in.writeShort(data.length);

   in.writeBytes(data);

  }

 

  登录协议编码

  

package decoder;

 

  import io.netty.buffer.ByteBuf;

  import io.netty.channel.ChannelHandlerContext;

  import io.netty.handler.codec.ByteToMessageDecoder;

  import lombok.extern.slf4j.Slf4j;

  import protocol.system.LoginProtocol;

  import serializer.KryoSerializer;

  import java.util.List;

   * @description:

   * @author: quliang

   * @create: 2022-12-06 17:59

  @Slf4j

  public class LoginDecoder extends ByteToMessageDecoder {

   private Class LoginProtocol clazz;

   public LoginDecoder(Class clazz) {

   this.clazz = clazz;

   @Override

   protected void decode(ChannelHandlerContext ctx, ByteBuf in, List Object out) throws Exception {

   try {

   byte magic = in.readByte();

   byte code = in.readByte();

   byte version = in.readByte();

   short dataSize = in.readShort();

   byte[] data = new byte[dataSize];

   in.readBytes(data);

   LoginProtocol baseProtocol = new KryoSerializer().deserialize(data, clazz);

   out.add(baseProtocol);

   } catch (Exception e) {

   //如果解码错误,将数据传递到下一个解码器中

   log.error("login decoder {}", e.getMessage());

   // 重置读取字节索引,因为上边已经读了(readBytes),不加这个会导致数据为空

   in.resetReaderIndex();

   // 这里是复制流,复制一份,防止skipBytes跳过,导致传递的消息变成空;

   ByteBuf buff = in.retainedDuplicate();

   //原因是netty不允许有字节内容不读的情况发生,所以采用下边的方法解决。

   in.skipBytes(in.readableBytes());

   //继续传递到下一个解码器中

   out.add(buff);

  }

 

  登录协议解码

  这样公共模块就创建完成了

  创建服务端

  

package com.ql;

 

  import com.ql.handler.MsgHandler;

  import decoder.LoginDecoder;

  import decoder.MsgDecoder;

  import com.ql.handler.LoginHandler;

  import encoder.LoginEncoder;

  import encoder.MsgEncoder;

  import io.netty.bootstrap.ServerBootstrap;

  import io.netty.channel.ChannelFuture;

  import io.netty.channel.ChannelInitializer;

  import io.netty.channel.ChannelOption;

  import io.netty.channel.ChannelPipeline;

  import io.netty.channel.nio.NioEventLoopGroup;

  import io.netty.channel.socket.SocketChannel;

  import io.netty.channel.socket.nio.NioServerSocketChannel;

  import io.netty.handler.logging.LogLevel;

  import io.netty.handler.logging.LoggingHandler;

  import lombok.extern.slf4j.Slf4j;

  import protocol.system.request.LoginRequest;

  import protocol.msg.request.MsgRequest;

   * @author quliang

   * @description 服务端

   * @date 2022-12-06 17:39:14

  @Slf4j

  public class IotServer {

   public static void main(String[] args) throws InterruptedException {

   NioEventLoopGroup bossGroup = new NioEventLoopGroup();

   NioEventLoopGroup workGroup = new NioEventLoopGroup();

   try {

   ServerBootstrap bootstrap = new ServerBootstrap().group(

   bossGroup, workGroup)

   .channel(NioServerSocketChannel.class)

   .childHandler(new ChannelInitializer SocketChannel () {

   @Override

   protected void initChannel(SocketChannel channel) throws Exception {

   ChannelPipeline pipeline = channel.pipeline();

   pipeline.addLast(new LoggingHandler(LogLevel.INFO));

   * 心跳机制

   //pipeline.addLast(new IdleStateHandler(5, 10, 5, TimeUnit.SECONDS));

   * 消息、登录解码器

   pipeline.addLast(new LoginDecoder(LoginRequest.class));

   pipeline.addLast(new MsgDecoder(MsgRequest.class));

   * 消息、登录处理器

   pipeline.addLast(new MsgHandler());

   pipeline.addLast(new LoginHandler());

   * 消息、登录编码器

   pipeline.addLast(new MsgEncoder());

   pipeline.addLast(new LoginEncoder());

   .option(ChannelOption.SO_BACKLOG, 1024);

   ChannelFuture cf = bootstrap.bind(8849).sync();

   log.info("socket服务端启动成功 {}", cf.channel().localAddress().toString());

   cf.channel().closeFuture().sync();

   } finally {

   bossGroup.shutdownGracefully();

   workGroup.shutdownGracefully();

  }

 

  服务端代码

  

package com.ql.handler;

 

  import io.netty.channel.ChannelHandler;

  import io.netty.channel.ChannelHandlerContext;

  import io.netty.channel.SimpleChannelInboundHandler;

  import lombok.extern.slf4j.Slf4j;

  import protocol.msg.request.MsgRequest;

  import protocol.msg.response.MsgResponse;

   * @description: 消息处理器

   * @author: quliang

   * @create: 2022-12-10 20:57

  @Slf4j

  @ChannelHandler.Sharable

  public class MsgHandler extends SimpleChannelInboundHandler MsgRequest {

   @Override

   public void channelActive(ChannelHandlerContext ctx) throws Exception {

   log.info("上线{}", ctx.channel().remoteAddress().toString());

   @Override

   protected void channelRead0(ChannelHandlerContext ctx, MsgRequest request) throws Exception {

   log.info("服务端读取消息体数据为{}", request.toString());

   MsgResponse response = new MsgResponse();

   response.setStatCode(200);

   ctx.channel().writeAndFlush(response);

  }

 

  服务端消息处理器

  

package com.ql.handler;

 

  import io.netty.channel.*;

  import io.netty.handler.timeout.IdleStateEvent;

  import lombok.extern.slf4j.Slf4j;

  import protocol.system.request.LoginRequest;

  import protocol.system.response.LoginResponse;

  import java.util.concurrent.atomic.AtomicInteger;

   * @description: 登录处理器

   * @author: quliang

   * @create: 2022-12-06 18:14

  @Slf4j

  @ChannelHandler.Sharable

  public class LoginHandler extends SimpleChannelInboundHandler LoginRequest {

   private static AtomicInteger READER_COUNT = new AtomicInteger(0);

   @Override

   public void channelActive(ChannelHandlerContext ctx) throws Exception {

   log.info("服务端:{} 通道开启!", ctx.channel().localAddress().toString());

   @Override

   public void channelInactive(ChannelHandlerContext ctx) throws Exception {

   log.info("服务端: {} 通道关闭!", ctx.channel().localAddress().toString());

   @Override

   protected void channelRead0(ChannelHandlerContext ctx, LoginRequest loginRequest) throws Exception {

   log.info("读取数据 {} ", loginRequest.toString());

   LoginResponse response= new LoginResponse("success", null);

   ctx.channel().writeAndFlush(response);

   @Override

   public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

   log.info("...............数据接收-完毕...............");

   @Override

   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

   ctx.close();

   log.error("...............业务处理异常...............{}", cause);

   @Override

   public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

   if (evt instanceof IdleStateEvent) {

   IdleStateEvent event = (IdleStateEvent) evt;

   Channel channel = ctx.channel();

   switch (event.state()) {

   case READER_IDLE:

   log.info("读空闲");

   READER_COUNT.addAndGet(1);

   break;

   case WRITER_IDLE:

   log.info("写空闲");

   break;

   default:

   break;

   ctx.disconnect();

   if (READER_COUNT.get() 3) {

   log.info("close this channel {}", channel.remoteAddress().toString());

  }

 

  服务端登录处理器

  

  服务端其实很多都是直接引用公共模块的,代码也并不复杂

  创建消息客户端

  

package com.ql;

 

  import com.ql.handler.ClientMsgHandler;

  import decoder.MsgDecoder;

  import encoder.MsgEncoder;

  import io.netty.bootstrap.Bootstrap;

  import io.netty.channel.ChannelFuture;

  import io.netty.channel.ChannelInitializer;

  import io.netty.channel.ChannelPipeline;

  import io.netty.channel.EventLoopGroup;

  import io.netty.channel.nio.NioEventLoopGroup;

  import io.netty.channel.socket.SocketChannel;

  import io.netty.channel.socket.nio.NioSocketChannel;

  import io.netty.handler.logging.LogLevel;

  import io.netty.handler.logging.LoggingHandler;

  import lombok.extern.slf4j.Slf4j;

  import protocol.msg.response.MsgResponse;

  import java.net.InetSocketAddress;

   * @author quliang

   * @description 客户端

   * @date 2022-12-06 17:37:56

  @Slf4j

  public class IotClientMsg {

   public static void main(String[] args) throws InterruptedException {

   EventLoopGroup clientGroup = new NioEventLoopGroup();

   try {

   Bootstrap bs = new Bootstrap();

   bs.group(clientGroup)

   .channel(NioSocketChannel.class)

   .remoteAddress(new InetSocketAddress("169.254.190.154", 8849))

   .handler(new ChannelInitializer SocketChannel () {

   @Override

   protected void initChannel(SocketChannel ch) throws Exception {

   ChannelPipeline pipeline = ch.pipeline();

   pipeline.addLast(new LoggingHandler(LogLevel.INFO));

   //消息解码器

   pipeline.addLast(new MsgDecoder(MsgResponse.class));

   //客户端消息处理器

   pipeline.addLast(new ClientMsgHandler());

   //消息编码器

   pipeline.addLast(new MsgEncoder());

   ChannelFuture cf = bs.connect().sync();

   log.info("启动成功{}", cf.channel().localAddress().toString());

  // Scanner scanner = new Scanner(System.in);

   cf.channel().closeFuture().sync();

   } finally {

   clientGroup.shutdownGracefully().sync();

  }

 

  客户端代码

  

package com.ql.handler;

 

  import io.netty.channel.ChannelHandler;

  import io.netty.channel.ChannelHandlerContext;

  import io.netty.channel.SimpleChannelInboundHandler;

  import lombok.extern.slf4j.Slf4j;

  import protocol.msg.request.MsgRequest;

  import protocol.msg.response.MsgResponse;

   * @description:

   * @author: quliang

   * @create: 2022-12-10 20:40

  @Slf4j

  @ChannelHandler.Sharable

  public class ClientMsgHandler extends SimpleChannelInboundHandler MsgResponse {

   @Override

   public void channelActive(ChannelHandlerContext ctx) throws Exception {

   MsgRequest request = new MsgRequest();

   request.setMsg("hello");

   ctx.channel().writeAndFlush(request);

   @Override

   protected void channelRead0(ChannelHandlerContext ctx, MsgResponse response) throws Exception {

   int code = response.getStatCode();

   log.info("消息处理器读取响应对象数据为{}", code);

  }

 

  客户端消息处理器

  

  消息客户端代码也并不复杂

  创建登录客户端

  

package com.ql;

 

  import com.ql.handler.ClientLoginHandler;

  import decoder.LoginDecoder;

  import encoder.LoginEncoder;

  import io.netty.bootstrap.Bootstrap;

  import io.netty.channel.ChannelFuture;

  import io.netty.channel.ChannelInitializer;

  import io.netty.channel.ChannelPipeline;

  import io.netty.channel.EventLoopGroup;

  import io.netty.channel.nio.NioEventLoopGroup;

  import io.netty.channel.socket.SocketChannel;

  import io.netty.channel.socket.nio.NioSocketChannel;

  import io.netty.handler.logging.LogLevel;

  import io.netty.handler.logging.LoggingHandler;

  import lombok.extern.slf4j.Slf4j;

  import protocol.system.response.LoginResponse;

  import java.net.InetSocketAddress;

   * @author quliang

   * @description 客户端

   * @date 2022-12-06 17:37:56

  @Slf4j

  public class IotClientLogin {

   public static void main(String[] args) throws InterruptedException {

   EventLoopGroup clientGroup = new NioEventLoopGroup();

   try {

   Bootstrap bs = new Bootstrap();

   bs.group(clientGroup)

   .channel(NioSocketChannel.class)

   .remoteAddress(new InetSocketAddress("169.254.190.154", 8849))

   .handler(new ChannelInitializer SocketChannel () {

   @Override

   protected void initChannel(SocketChannel ch) throws Exception {

   ChannelPipeline pipeline = ch.pipeline();

   pipeline.addLast(new LoggingHandler(LogLevel.INFO));

   pipeline.addLast(new LoginDecoder(LoginResponse.class));

   //pipeline.addLast(new MsgDecoder(MsgResponse.class));

   //pipeline.addLast(new ClientMsgHandler());

   pipeline.addLast(new ClientLoginHandler());

   //pipeline.addLast(new MsgEncoder());

   pipeline.addLast(new LoginEncoder());

   ChannelFuture cf = bs.connect().sync();

   log.info("启动成功{}", cf.channel().localAddress().toString());

  // Scanner scanner = new Scanner(System.in);

   cf.channel().closeFuture().sync();

   } finally {

   clientGroup.shutdownGracefully().sync();

  }

 

  客户端代码

  

package com.ql.handler;

 

  import io.netty.channel.*;

  import lombok.extern.slf4j.Slf4j;

  import protocol.system.request.LoginRequest;

  import protocol.system.response.LoginResponse;

  import java.util.Scanner;

   * @description:

   * @author: quliang

   * @create: 2022-12-06 22:16

  @Slf4j

  @ChannelHandler.Sharable

  public class ClientLoginHandler extends SimpleChannelInboundHandler LoginResponse {

   private Scanner scanner = new Scanner(System.in);

   @Override

   public void channelActive(ChannelHandlerContext ctx) throws Exception {

   log.info("客户端:{} 通道开启!", ctx.channel().localAddress().toString());

   login(ctx);

   * 登录方法

   * @param ctx

   private void login(ChannelHandlerContext ctx) {

   LoginRequest request = new LoginRequest("123", "123");

   ctx.channel().writeAndFlush(request);

   @Override

   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

   log.info("客户端: {} 读取数据 {}", ctx.channel().localAddress().toString(), msg.toString());

   @Override

   protected void channelRead0(ChannelHandlerContext ctx, LoginResponse response) throws Exception {

   log.info("客户端: {} 读取数据 {}", ctx.channel().localAddress().toString(), response.toString());

   String msg = response.getMsg();

   log.info("========{}", msg);

   @Override

   public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

   log.info("...............数据接收-完毕...............");

  }

 

  客户端登录处理器

  

  

  

  

  我们是怎么通过这个项目来实现不同协议编解码?

  其实也不难,我们仔细看MsgDecoder、LoginDecoder两个类其中一个类的代码,其中有个巧妙的操作就是使用try-catch,

  只要解码器无法解码发生异常,就重置读取字节索引传递到下一个解码器中,直到传递到正确解码器中。不过为了兼容多种协议,

  解码异常也会让服务端性能有所下降的,取舍之间必有得失。

  

  

  

  以上就是netty对多协议进行编解码(netty 多个解码器)的详细内容,想要了解更多 netty对多协议进行编解码的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: