本篇文章为你整理了9、手写基于Netty的RPC框架(自己动手写rpc框架)的详细内容,包含有netty4核心原理与手写rpc 自己动手写rpc框架 java 手写rpc net rpc框架 9、手写基于Netty的RPC框架,希望能帮助你了解 9、手写基于Netty的RPC框架。
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.junit.Test;
import java.io.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
/*
 * 假设一个需求:写一个 RPC 框架。
 * 需要考虑的问题:来回通信、连接数量、拆包。
 */
public class MyRPCTest {
@Test
public void startServer() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = boss;
ServerBootstrap sbs = new ServerBootstrap();
ChannelFuture bind = sbs.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer NioSocketChannel () {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
System.out.println("server accept client port: " + nioSocketChannel.remoteAddress().getPort());
ChannelPipeline pipeline = nioSocketChannel.pipeline();
pipeline.addLast(new ServerRequestHandler());
}).bind(new InetSocketAddress("localhost", 8888));
try {
bind.sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
// 模拟客户端
@Test
public void get() {
new Thread(() - startServer()).start();
System.out.println("server started......");
Thread[] threads = new Thread[20];
for (int i = 0; i threads.length; i++) {
threads[i] = new Thread(() - {
Car car = proxyGet( Car.class); // 动态代理
car.ooxx("hello");
for (Thread thread : threads) {
thread.start();
try {
// 阻塞住
System.in.read();
} catch (IOException e) {
e.printStackTrace();
// Car car = proxyGet(Car.class); // 动态代理
// car.ooxx("hello");
// Fly fly = proxyGet(Fly.class); // 动态代理
// fly.xxoo("hello");
public T T proxyGet(Class T interfaceInfo) {
// 实现动态代理
ClassLoader classLoader = interfaceInfo.getClassLoader();
Class ? [] methodInfo = {interfaceInfo};
return (T) Proxy.newProxyInstance(classLoader, methodInfo, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 客户端对提供者的调用过程
// 1, 调用服务,方法,参数 封装成message
String name = interfaceInfo.getName();
String methodName = method.getName();
Class ? [] parameterTypes = method.getParameterTypes();
MyContent content = new MyContent();
content.setArgs(args);
content.setName(name);
content.setMethodName(methodName);
content.setParameterTypes(parameterTypes);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream);
outputStream.writeObject(content);
byte[] msgBody = byteArrayOutputStream.toByteArray();
// requestID + message, 本地要缓存
// 协议: header msgBody
MyHeader header = createHeader(msgBody);
byteArrayOutputStream.reset();
outputStream = new ObjectOutputStream(byteArrayOutputStream);
outputStream.writeObject(header);
byte[] msgHeader = byteArrayOutputStream.toByteArray();
// 连接池中取得连接
ClientFactory factory = ClientFactory.getFactory();
NioSocketChannel clientChannel = factory.getClient(new InetSocketAddress("localhost", 8888));
// 发送走io, out走netty(event驱动)
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeader.length + msgBody.length);
CountDownLatch countDownLatch = new CountDownLatch(1);
long requestID = header.getRequestID();
ResponseHandler.addCallBack(requestID, new Runnable() {
@Override
public void run() {
countDownLatch.countDown();
byteBuf.writeBytes(msgHeader);
byteBuf.writeBytes(msgBody);
ChannelFuture channelFuture = clientChannel.writeAndFlush(byteBuf);
channelFuture.sync();
countDownLatch.await();
int f = 0x14141414;
long requestID = Math.abs(UUID.randomUUID().getLeastSignificantBits());
header.setFlag(f);
header.setDataLen(length);
header.setRequestID(requestID);
return header;
class ServerRequestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
ByteBuf sendBuf = buf.copy();
if (buf.readableBytes() = 110) {
byte[] bytes = new byte[110];
buf.readBytes(bytes);
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream oin = new ObjectInputStream(in);
MyHeader header = (MyHeader) oin.readObject();
System.out.println("server response @ id: " + header.requestID);
if (buf.readableBytes() = header.getDataLen()) {
byte[] data = new byte[(int) header.getDataLen()];
buf.readBytes(data);
ByteArrayInputStream din = new ByteArrayInputStream(data);
ObjectInputStream doin = new ObjectInputStream(din);
MyContent content = ( MyContent) doin.readObject();
System.out.println(content.getName());
ChannelFuture channelFuture = ctx.writeAndFlush(sendBuf);
channelFuture.sync();
// 源于spark源码
class ClientFactory {
int poolSize = 1;
Random rand = new Random();
NioEventLoopGroup clientWorker;
private static final ClientFactory factory; // 单例
private ClientFactory() {
static {
factory = new ClientFactory();
public static ClientFactory getFactory() {
return factory;
// 一个客户端可以连接很多提供者,每个提供者都有自己的pool
ConcurrentHashMap InetSocketAddress, ClientPool outboxs = new ConcurrentHashMap ();
public synchronized NioSocketChannel getClient(InetSocketAddress address) {
ClientPool clientPool = outboxs.get(address);
if (clientPool == null) {
outboxs.putIfAbsent(address, new ClientPool(poolSize));
clientPool = outboxs.get(address);
int i = rand.nextInt(poolSize);
if (clientPool.clients[i] != null clientPool.clients[i].isActive()) {
return clientPool.clients[i];
synchronized (clientPool.lock[i]) {
return clientPool.clients[i] = create(address);
private NioSocketChannel create(InetSocketAddress address) {
// 基于netty客户端创建
clientWorker = new NioEventLoopGroup(1);
Bootstrap bootstrap = new Bootstrap();
ChannelFuture connect = bootstrap.group(clientWorker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer NioSocketChannel () {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
ChannelPipeline pipeline = nioSocketChannel.pipeline();
pipeline.addLast(new ClientResponses()); // 解决给谁的
}).connect(address);
try {
NioSocketChannel client = (NioSocketChannel) connect.sync().channel();
return client;
} catch (InterruptedException e) {
e.printStackTrace();
return null;
class ClientResponses extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
if (buf.readableBytes() = 110) {
byte[] bytes = new byte[110];
buf.readBytes(bytes);
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream oin = new ObjectInputStream(in);
MyHeader header = (MyHeader) oin.readObject();
System.out.println("client response @ id: " + header.requestID);
ResponseHandler.runCallBack(header.requestID);
// if (buf.readableBytes() = header.getDataLen()) {
// byte[] data = new byte[(int) header.getDataLen()];
// buf.readBytes(data);
// ByteArrayInputStream din = new ByteArrayInputStream(data);
// ObjectInputStream doin = new ObjectInputStream(din);
// MyContent content = (MyContent) doin.readObject();
// System.out.println(content.getName());
super.channelRead(ctx, msg);
/**
 * Registry that maps a request id to the callback to run when the response
 * for that request arrives.
 */
class ResponseHandler {
    static ConcurrentHashMap<Long, Runnable> mapping = new ConcurrentHashMap<>();

    /**
     * Registers {@code cb} for {@code requestID}. An already registered
     * callback for the same id is kept and {@code cb} is ignored.
     *
     * @param requestID the id carried in the request header
     * @param cb        the action to run when the response arrives
     */
    public static void addCallBack(long requestID, Runnable cb) {
        mapping.putIfAbsent(requestID, cb);
    }

    /**
     * Runs and unregisters the callback for {@code requestID}. Does nothing
     * when no callback is registered, e.g. for an unknown or duplicate response.
     *
     * @param requestID the id carried in the response header
     */
    public static void runCallBack(long requestID) {
        // remove() claims the callback atomically, so it can run at most once
        // and a missing entry no longer causes a NullPointerException.
        Runnable runnable = mapping.remove(requestID);
        if (runnable != null) {
            runnable.run();
        }
    }
}
class ClientPool {
NioSocketChannel[] clients;
Object[] lock;
ClientPool(int size) {
this.clients = new NioSocketChannel[size]; // init 连接是空的
this.lock = new Object[size]; // 锁是初始化的
for (int i = 0; i size; i++) {
lock[i] = new Object();
/**
 * Protocol header that is serialized and sent ahead of the message body.
 */
class MyHeader implements Serializable {
    /*
     * Communication protocol:
     * 1. flag value ("ooxx")
     * 2. UUID-derived request id
     * 3. DATA_LEN, the length of the body
     */
    int flag; // 32 bits leave room to encode a lot of information
    long requestID;
    long dataLen;

    /** Returns the protocol flag. */
    public int getFlag() {
        return flag;
    }

    /** Sets the protocol flag. */
    public void setFlag(int flag) {
        this.flag = flag;
    }

    /** Returns the id that pairs a response with its request. */
    public long getRequestID() {
        return requestID;
    }

    /** Sets the id that pairs a response with its request. */
    public void setRequestID(long requestID) {
        this.requestID = requestID;
    }

    /** Returns the length in bytes of the body that follows the header. */
    public long getDataLen() {
        return dataLen;
    }

    /** Sets the length in bytes of the body that follows the header. */
    public void setDataLen(long dataLen) {
        this.dataLen = dataLen;
    }
}
/**
 * Message body describing one remote call: the service name, the method name,
 * the parameter types and the arguments.
 */
class MyContent implements Serializable {
    String name;
    String methodName;
    Class<?>[] parameterTypes;
    Object[] args;

    /** Returns the name of the service interface being called. */
    public String getName() {
        return name;
    }

    /** Sets the name of the service interface being called. */
    public void setName(String name) {
        this.name = name;
    }

    /** Returns the name of the method being called. */
    public String getMethodName() {
        return methodName;
    }

    /** Sets the name of the method being called. */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /** Returns the declared parameter types of the method being called. */
    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    /** Sets the declared parameter types of the method being called. */
    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    /** Returns the call arguments. */
    public Object[] getArgs() {
        return args;
    }

    /** Sets the call arguments. */
    public void setArgs(Object[] args) {
        this.args = args;
    }
}
/** Sample service interface used to exercise the RPC proxy. */
interface Car {
    /**
     * Sample remote operation.
     *
     * @param msg the message to pass to the provider
     */
    public void ooxx(String msg);
}
/** Second sample service interface used to exercise the RPC proxy. */
interface Fly {
    /**
     * Sample remote operation.
     *
     * @param msg the message to pass to the provider
     */
    public void xxoo(String msg);
}
以上就是9、手写基于Netty的RPC框架(自己动手写rpc框架)的详细内容,想要了解更多 9、手写基于Netty的RPC框架的内容,请持续关注盛行IT软件开发工作室。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。