This article shows how to encode and decode multiple protocols with Netty by placing several decoders in one pipeline.
Project repository: https://gitee.com/q529075990qqcom/NB-IOT.git
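We start by creating a Maven project. The project used here is already written; its layout can be browsed in the repository above.
Create the common module
Create the sub-modules and add the dependencies (Netty 4.1)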
<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.72.Final</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.28</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.28</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>RELEASE</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.esotericsoftware</groupId>
        <artifactId>kryo</artifactId>
        <version>5.3.0</version>
    </dependency>
</dependencies>
Maven dependencies
Serialization means encoding an object into a byte stream (for I/O); the reverse operation is called deserialization.
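To make the definition concrete, here is a minimal round-trip sketch. It uses the JDK's built-in `ObjectOutputStream`/`ObjectInputStream` rather than Kryo, purely so that it runs without extra dependencies; the class `RoundTripDemo` and the payload type `Greeting` are hypothetical names invented for this illustration and are not part of the project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class RoundTripDemo {

    /** Hypothetical payload type, used only for this illustration. */
    static final class Greeting implements Serializable {
        private static final long serialVersionUID = 1L;
        final String text;

        Greeting(String text) {
            this.text = text;
        }
    }

    /** Serialization: object -> byte array. */
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException("serialization failed", e);
        }
        // the stream is closed (and therefore flushed) before the bytes are read
        return baos.toByteArray();
    }

    /** Deserialization: byte array -> object of the requested type. */
    static <T> T deserialize(byte[] bytes, Class<T> clazz) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return clazz.cast(ois.readObject());
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deserialization failed", e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new Greeting("hello"));
        Greeting copy = deserialize(bytes, Greeting.class);
        System.out.println(copy.text); // prints "hello"
    }
}
```

The project itself hides this step behind a `Serializer` interface, so the byte format (Kryo in this article) can be swapped without touching the encoders and decoders.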
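package serializer;

/**
 * @description: Serialization interface
 * @author: quliang
 * @create: 2022-10-20 15:16
 */
public interface Serializer {

    /**
     * Serialize an object.
     *
     * @param obj the object to serialize
     * @return the serialized bytes
     * @throws Exception if serialization fails
     */
    byte[] serialize(Object obj) throws Exception;

    /**
     * Deserialize an object.
     *
     * @param bytes the serialized bytes
     * @param clazz the target class
     * @param <T>   the target type
     * @return the deserialized object
     * @throws Exception if deserialization fails
     */
    <T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception;
}
Custom serialization interface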
package serializer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
import org.objenesis.strategy.StdInstantiatorStrategy;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * @description: Kryo-based serializer
 * @author: quliang
 * @create: 2022-10-20 15:18
 */
public class KryoSerializer implements Serializer {

    // Kryo is not thread-safe, so each thread keeps and reuses its own instance
    private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        kryo.setReferences(true);
        kryo.setRegistrationRequired(false);
        ((DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
                .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
        return kryo;
    });

    @Override
    public byte[] serialize(Object obj) throws Exception {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             Output output = new Output(baos)) {
            Kryo kryo = kryoThreadLocal.get();
            kryo.writeObject(output, obj);
            // flush Kryo's internal buffer so that baos holds the complete payload
            output.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new Exception("Serialization failed", e);
        }
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) throws Exception {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
             Input input = new Input(bais)) {
            Kryo kryo = kryoThreadLocal.get();
            Object obj = kryo.readObject(input, clazz);
            return clazz.cast(obj);
        } catch (IOException e) {
            throw new Exception("Deserialization failed", e);
        }
    }
}
Kryo implementation of the serialization interface
We need to parse two protocols, so both are defined up front: a message protocol and a login protocol.
Message protocol
package protocol.msg;

import lombok.Data;
import lombok.Getter;

/**
 * @description: Message protocol: magic, version, data
 * @author: quliang
 * @create: 2022-12-10 20:46
 */
@Data
public class MsgProtocol {
    @Getter
    private byte magic = 0;
    @Getter
    private byte version = 1;
}
Message protocol base class
package protocol.msg.request;

import lombok.Data;
import protocol.msg.MsgProtocol;

/**
 * @description: Message request
 * @author: quliang
 * @create: 2022-12-10 20:58
 */
@Data
public class MsgRequest extends MsgProtocol {
    private String msg;
}
Message request subclass
package protocol.msg.response;

import lombok.Data;
import protocol.msg.MsgProtocol;

/**
 * @description: Message response
 * @author: quliang
 * @create: 2022-12-10 20:41
 */
@Data
public class MsgResponse extends MsgProtocol {
    private int statCode;
}
Message response subclass
package encoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import protocol.msg.MsgProtocol;
import serializer.KryoSerializer;

/**
 * @description: Message protocol encoder
 * @author: quliang
 * @create: 2022-12-10 20:53
 */
public class MsgEncoder extends MessageToByteEncoder<MsgProtocol> {

    @Override
    protected void encode(ChannelHandlerContext ctx, MsgProtocol msgProtocol, ByteBuf out) throws Exception {
        // frame layout: magic (1 byte), version (1 byte), length (2 bytes), serialized body
        out.writeByte(msgProtocol.getMagic());
        // out.writeByte(msgProtocol.code());
        out.writeByte(msgProtocol.getVersion());
        byte[] data = new KryoSerializer().serialize(msgProtocol);
        out.writeShort(data.length);
        out.writeBytes(data);
    }
}
Message protocol encoder
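The byte layout that the encoder writes (magic, version, a two-byte length, then the serialized body) can be sketched with plain `java.nio.ByteBuffer`. This is only an illustration under stated assumptions: `MsgFrameSketch` is a hypothetical class, the payload is a UTF-8 string instead of Kryo output, and `ByteBuffer` stands in for Netty's `ByteBuf` (both default to big-endian byte order).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class MsgFrameSketch {

    /** Builds one frame: magic (1 byte) | version (1 byte) | length (2 bytes) | payload. */
    static byte[] encode(byte magic, byte version, byte[] payload) {
        if (payload.length > Short.MAX_VALUE) {
            // the length is read back as a signed 16-bit value, so larger payloads cannot be described
            throw new IllegalArgumentException("payload too large: " + payload.length);
        }
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length); // big-endian by default
        buf.put(magic);
        buf.put(version);
        buf.putShort((short) payload.length);
        buf.put(payload);
        return buf.array();
    }

    /** Reads the header fields back and returns the payload bytes. */
    static byte[] decodePayload(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        buf.get();                      // magic
        buf.get();                      // version
        short length = buf.getShort();  // payload length
        byte[] payload = new byte[length];
        buf.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] frame = encode((byte) 0, (byte) 1, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(Arrays.toString(frame));
        // prints [0, 1, 0, 5, 104, 101, 108, 108, 111]
        System.out.println(new String(decodePayload(frame), StandardCharsets.UTF_8));
        // prints hello
    }
}
```

One consequence of the two-byte length field is worth keeping in mind: a serialized body longer than 32767 bytes would not survive the `writeShort`/`readShort` pair, because the decoder reads the length as a signed short.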
package decoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import protocol.msg.MsgProtocol;
import serializer.KryoSerializer;

import java.util.List;

/**
 * @description: Message protocol decoder
 * @author: quliang
 * @create: 2022-12-10 20:52
 */
@Slf4j
public class MsgDecoder extends ByteToMessageDecoder {

    private final Class<? extends MsgProtocol> msgClass;

    public MsgDecoder(Class<? extends MsgProtocol> clazz) {
        this.msgClass = clazz;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // remember where this attempt started so the catch block can rewind to it
        in.markReaderIndex();
        try {
            byte magic = in.readByte();
            byte version = in.readByte();
            short dataSize = in.readShort();
            byte[] data = new byte[dataSize];
            in.readBytes(data);
            MsgProtocol baseProtocol = new KryoSerializer().deserialize(data, msgClass);
            out.add(baseProtocol);
        } catch (Exception e) {
            // decoding failed: hand the data to the next decoder in the pipeline
            log.error("msg decoder {}", e.getMessage());
            // rewind the reader index, because the reads above already consumed bytes;
            // without this the forwarded data would be empty
            in.resetReaderIndex();
            // take a retained duplicate so that skipBytes below does not empty the forwarded message
            ByteBuf buff = in.retainedDuplicate();
            // ByteToMessageDecoder reports an error if decode() emits output without
            // consuming any bytes, so consume everything in the original buffer
            in.skipBytes(in.readableBytes());
            // pass the bytes on to the next decoder
            out.add(buff);
        }
    }
}
Message protocol decoder
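The rewind, duplicate, skip sequence in the `catch` branch is easier to follow in isolation. The sketch below mimics it with `java.nio.ByteBuffer`, used here only as a stand-in for Netty's `ByteBuf`: `reset()` plays the role of `resetReaderIndex()`, `duplicate()` that of `retainedDuplicate()` (without reference counting, which `ByteBuffer` does not have), and moving the position to the limit that of `skipBytes(readableBytes())`. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

final class PassThroughSketch {

    /**
     * Mimics the catch branch: rewind to the mark, take an independent view of
     * the unread bytes, then mark the original buffer as fully consumed.
     */
    static ByteBuffer handOff(ByteBuffer in) {
        in.reset();                        // back to the position saved by mark()
        ByteBuffer copy = in.duplicate();  // same bytes, independent position and limit
        in.position(in.limit());           // "skip" everything in the original
        return copy;
    }

    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.wrap(new byte[] {9, 9, 9, 9, 9});
        in.mark();   // remember where this decode attempt started
        in.get();    // the failed attempt already consumed some bytes
        in.get();
        ByteBuffer next = handOff(in);
        System.out.println(in.remaining());   // prints 0
        System.out.println(next.remaining()); // prints 5
    }
}
```

The original buffer ends up fully consumed while the duplicate still exposes every byte from the start of the attempt, which is what the next decoder needs.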
Login protocol
package protocol.system;
Login protocol base class
package protocol.system.request;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import protocol.system.LoginProtocol;

/**
 * @description: Login request
 * @author: quliang
 * @create: 2022-12-06 18:17
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class LoginRequest extends LoginProtocol {
    private String userId;
    private String userName;
}
Login request subclass
package protocol.system.response;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import protocol.system.LoginProtocol;

/**
 * @description: Login response
 * @author: quliang
 * @create: 2022-12-06 18:22
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class LoginResponse extends LoginProtocol {
    private String msg;
    private String data;
}
Login response subclass
package encoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import protocol.system.LoginProtocol;
import serializer.KryoSerializer;

/**
 * @description: Login protocol encoder
 * @author: quliang
 * @create: 2022-12-06 22:11
 */
public class LoginEncoder extends MessageToByteEncoder<LoginProtocol> {

    @Override
    protected void encode(ChannelHandlerContext ctx, LoginProtocol baseProtocol, ByteBuf out) throws Exception {
        // frame layout: magic (1 byte), code (1 byte), version (1 byte), length (2 bytes), serialized body
        out.writeByte(baseProtocol.getMagic());
        out.writeByte(baseProtocol.getCode());
        out.writeByte(baseProtocol.getVersion());
        byte[] data = new KryoSerializer().serialize(baseProtocol);
        out.writeShort(data.length);
        out.writeBytes(data);
    }
}
Login protocol encoder
package decoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import protocol.system.LoginProtocol;
import serializer.KryoSerializer;

import java.util.List;

/**
 * @description: Login protocol decoder
 * @author: quliang
 * @create: 2022-12-06 17:59
 */
@Slf4j
public class LoginDecoder extends ByteToMessageDecoder {

    private final Class<? extends LoginProtocol> clazz;

    public LoginDecoder(Class<? extends LoginProtocol> clazz) {
        this.clazz = clazz;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // remember where this attempt started so the catch block can rewind to it
        in.markReaderIndex();
        try {
            byte magic = in.readByte();
            byte code = in.readByte();
            byte version = in.readByte();
            short dataSize = in.readShort();
            byte[] data = new byte[dataSize];
            in.readBytes(data);
            LoginProtocol baseProtocol = new KryoSerializer().deserialize(data, clazz);
            out.add(baseProtocol);
        } catch (Exception e) {
            // decoding failed: hand the data to the next decoder in the pipeline
            log.error("login decoder {}", e.getMessage());
            // rewind the reader index, because the reads above already consumed bytes;
            // without this the forwarded data would be empty
            in.resetReaderIndex();
            // take a retained duplicate so that skipBytes below does not empty the forwarded message
            ByteBuf buff = in.retainedDuplicate();
            // ByteToMessageDecoder reports an error if decode() emits output without
            // consuming any bytes, so consume everything in the original buffer
            in.skipBytes(in.readableBytes());
            // pass the bytes on to the next decoder
            out.add(buff);
        }
    }
}
Login protocol decoder
With that, the common module is complete.
Create the server
package com.ql;

import com.ql.handler.MsgHandler;
import decoder.LoginDecoder;
import decoder.MsgDecoder;
import com.ql.handler.LoginHandler;
import encoder.LoginEncoder;
import encoder.MsgEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import protocol.system.request.LoginRequest;
import protocol.msg.request.MsgRequest;

/**
 * @author quliang
 * @description Server
 * @date 2022-12-06 17:39:14
 */
@Slf4j
public class IotServer {

    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                            // heartbeat
                            //pipeline.addLast(new IdleStateHandler(5, 10, 5, TimeUnit.SECONDS));
                            // login and message decoders
                            pipeline.addLast(new LoginDecoder(LoginRequest.class));
                            pipeline.addLast(new MsgDecoder(MsgRequest.class));
                            // message and login handlers
                            pipeline.addLast(new MsgHandler());
                            pipeline.addLast(new LoginHandler());
                            // message and login encoders
                            pipeline.addLast(new MsgEncoder());
                            pipeline.addLast(new LoginEncoder());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 1024);
            ChannelFuture cf = bootstrap.bind(8849).sync();
            log.info("Socket server started successfully {}", cf.channel().localAddress().toString());
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
Server code
package com.ql.handler;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import protocol.msg.request.MsgRequest;
import protocol.msg.response.MsgResponse;

/**
 * @description: Message handler
 * @author: quliang
 * @create: 2022-12-10 20:57
 */
@Slf4j
@ChannelHandler.Sharable
public class MsgHandler extends SimpleChannelInboundHandler<MsgRequest> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Online {}", ctx.channel().remoteAddress().toString());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MsgRequest request) throws Exception {
        log.info("Server read message body {}", request.toString());
        MsgResponse response = new MsgResponse();
        response.setStatCode(200);
        ctx.channel().writeAndFlush(response);
    }
}
Server message handler
package com.ql.handler;

import io.netty.channel.*;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import protocol.system.request.LoginRequest;
import protocol.system.response.LoginResponse;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @description: Login handler
 * @author: quliang
 * @create: 2022-12-06 18:14
 */
@Slf4j
@ChannelHandler.Sharable
public class LoginHandler extends SimpleChannelInboundHandler<LoginRequest> {

    private static final AtomicInteger READER_COUNT = new AtomicInteger(0);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Server: {} channel opened!", ctx.channel().localAddress().toString());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("Server: {} channel closed!", ctx.channel().localAddress().toString());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginRequest loginRequest) throws Exception {
        log.info("Read data {} ", loginRequest.toString());
        LoginResponse response = new LoginResponse("success", null);
        ctx.channel().writeAndFlush(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("...............data received...............");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        // pass the throwable as the last argument so the stack trace is logged
        log.error("...............business handling error...............", cause);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            Channel channel = ctx.channel();
            switch (event.state()) {
                case READER_IDLE:
                    log.info("Reader idle");
                    READER_COUNT.addAndGet(1);
                    break;
                case WRITER_IDLE:
                    log.info("Writer idle");
                    break;
                default:
                    break;
            }
            ctx.disconnect();
            if (READER_COUNT.get() > 3) {
                log.info("close this channel {}", channel.remoteAddress().toString());
            }
        }
    }
}
Server login handler
Much of the server simply reuses the common module, so its code is not complicated.
Create the message client
package com.ql;

import com.ql.handler.ClientMsgHandler;
import decoder.MsgDecoder;
import encoder.MsgEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import protocol.msg.response.MsgResponse;

import java.net.InetSocketAddress;

/**
 * @author quliang
 * @description Client
 * @date 2022-12-06 17:37:56
 */
@Slf4j
public class IotClientMsg {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup clientGroup = new NioEventLoopGroup();
        try {
            Bootstrap bs = new Bootstrap();
            bs.group(clientGroup)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress("169.254.190.154", 8849))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                            // message decoder
                            pipeline.addLast(new MsgDecoder(MsgResponse.class));
                            // client message handler
                            pipeline.addLast(new ClientMsgHandler());
                            // message encoder
                            pipeline.addLast(new MsgEncoder());
                        }
                    });
            ChannelFuture cf = bs.connect().sync();
            log.info("Started successfully {}", cf.channel().localAddress().toString());
            // Scanner scanner = new Scanner(System.in);
            cf.channel().closeFuture().sync();
        } finally {
            clientGroup.shutdownGracefully().sync();
        }
    }
}
Client code
package com.ql.handler;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import protocol.msg.request.MsgRequest;
import protocol.msg.response.MsgResponse;

/**
 * @description: Client message handler
 * @author: quliang
 * @create: 2022-12-10 20:40
 */
@Slf4j
@ChannelHandler.Sharable
public class ClientMsgHandler extends SimpleChannelInboundHandler<MsgResponse> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        MsgRequest request = new MsgRequest();
        request.setMsg("hello");
        ctx.channel().writeAndFlush(request);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MsgResponse response) throws Exception {
        int code = response.getStatCode();
        log.info("Message handler read response status code {}", code);
    }
}
Client message handler
The message client code is not complicated either.
Create the login client
package com.ql;

import com.ql.handler.ClientLoginHandler;
import decoder.LoginDecoder;
import encoder.LoginEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import protocol.system.response.LoginResponse;

import java.net.InetSocketAddress;

/**
 * @author quliang
 * @description Client
 * @date 2022-12-06 17:37:56
 */
@Slf4j
public class IotClientLogin {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup clientGroup = new NioEventLoopGroup();
        try {
            Bootstrap bs = new Bootstrap();
            bs.group(clientGroup)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress("169.254.190.154", 8849))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                            pipeline.addLast(new LoginDecoder(LoginResponse.class));
                            //pipeline.addLast(new MsgDecoder(MsgResponse.class));
                            //pipeline.addLast(new ClientMsgHandler());
                            pipeline.addLast(new ClientLoginHandler());
                            //pipeline.addLast(new MsgEncoder());
                            pipeline.addLast(new LoginEncoder());
                        }
                    });
            ChannelFuture cf = bs.connect().sync();
            log.info("Started successfully {}", cf.channel().localAddress().toString());
            // Scanner scanner = new Scanner(System.in);
            cf.channel().closeFuture().sync();
        } finally {
            clientGroup.shutdownGracefully().sync();
        }
    }
}
Client code
package com.ql.handler;

import io.netty.channel.*;
import lombok.extern.slf4j.Slf4j;
import protocol.system.request.LoginRequest;
import protocol.system.response.LoginResponse;

import java.util.Scanner;

/**
 * @description: Client login handler
 * @author: quliang
 * @create: 2022-12-06 22:16
 */
@Slf4j
@ChannelHandler.Sharable
public class ClientLoginHandler extends SimpleChannelInboundHandler<LoginResponse> {

    private Scanner scanner = new Scanner(System.in);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Client: {} channel opened!", ctx.channel().localAddress().toString());
        login(ctx);
    }

    /**
     * Send the login request.
     *
     * @param ctx the channel handler context
     */
    private void login(ChannelHandlerContext ctx) {
        LoginRequest request = new LoginRequest("123", "123");
        ctx.channel().writeAndFlush(request);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("Client: {} read data {}", ctx.channel().localAddress().toString(), msg.toString());
        // delegate to SimpleChannelInboundHandler so that channelRead0 runs and the message is released
        super.channelRead(ctx, msg);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginResponse response) throws Exception {
        log.info("Client: {} read data {}", ctx.channel().localAddress().toString(), response.toString());
        String msg = response.getMsg();
        log.info("========{}", msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("...............data received...............");
    }
}
Client login handler
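So how does this project encode and decode different protocols?
It is not difficult. Look closely at either MsgDecoder or LoginDecoder: the trick is the try-catch block.
Whenever a decoder cannot decode the data and an exception is thrown, it resets the reader index and passes the bytes to the next decoder, until they reach the decoder that can handle them.
The price of supporting several protocols this way is that each failed decode attempt costs the server some performance, so it is a trade-off.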
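The fallback idea can be sketched without Netty. In the snippet below, which is an illustration rather than the project's code, each hypothetical decoder is a plain function over a `java.nio.ByteBuffer`; a failed attempt throws, the position is rewound to the mark, and the next decoder is tried. The payloads are UTF-8 strings instead of Kryo output, and the header values in the sample frame are made up.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;

final class DecoderChainSketch {

    /** Hypothetical login frame: magic | code | version | length (2 bytes) | payload. */
    static String decodeLogin(ByteBuffer in) {
        in.get();
        in.get();
        in.get();
        return "login:" + readPayload(in);
    }

    /** Hypothetical message frame: magic | version | length (2 bytes) | payload. */
    static String decodeMsg(ByteBuffer in) {
        in.get();
        in.get();
        return "msg:" + readPayload(in);
    }

    private static String readPayload(ByteBuffer in) {
        short length = in.getShort();
        byte[] data = new byte[length]; // NegativeArraySizeException on a negative length
        in.get(data);                   // BufferUnderflowException if too few bytes remain
        return new String(data, StandardCharsets.UTF_8);
    }

    /** Tries each decoder in order; a failed attempt rewinds the buffer before the next one. */
    static String decode(ByteBuffer in, List<Function<ByteBuffer, String>> decoders) {
        for (Function<ByteBuffer, String> decoder : decoders) {
            in.mark();
            try {
                return decoder.apply(in);
            } catch (RuntimeException e) {
                in.reset(); // undo the partial reads of the failed attempt
            }
        }
        throw new IllegalArgumentException("no decoder accepted the frame");
    }

    public static void main(String[] args) {
        List<Function<ByteBuffer, String>> chain =
                List.of(DecoderChainSketch::decodeLogin, DecoderChainSketch::decodeMsg);
        // a message frame: magic=0, version=1, length=5, "hello"
        ByteBuffer frame = ByteBuffer.wrap(new byte[] {0, 1, 0, 5, 'h', 'e', 'l', 'l', 'o'});
        System.out.println(decode(frame, chain)); // prints msg:hello
    }
}
```

In this run the login decoder misreads the length field as 1384, fails with a buffer underflow, and the message decoder then succeeds. A possible refinement, not used in the project, would be to compare the magic byte before attempting a decode; that avoids the exception on a mismatch but requires each protocol to use a distinct magic value.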