【RSocket】使用 RSocket (一)——建立连接()

  本篇文章为你整理了【RSocket】使用 RSocket (一)——建立连接()的详细内容,包含有 【RSocket】使用 RSocket (一)——建立连接,希望能帮助你了解 【RSocket】使用 RSocket (一)——建立连接。

  哥们可能是个混子,但没兴趣的话也混不下去,就结果来说可能算是有兴趣的吧,但是以后混不混得下去就是另一回事了

  
 

  目录0. RSocket 简介1. 服务端1.1 SETUP阶段 - 处理客户端发起的连接请求1.2 保存客户端的 Requester2. 客户端

  0. RSocket 简介

  采用二进制点对点数据传输,主要应用于分布式架构之中,是一种基于Reactive Stream规范标准实现的新的通信协议。

  参考阿里云开发者社区的介绍

  相关文档和资料:

  RSocket By Example

  rsocket-java 原生库例子

  Spring RSocket 支持文档

  在这里我们在客户端使用 rsocket-java 原生库,在服务端使用 spring-boot-starter-rsocket。

  1. 服务端

  1.1 SETUP阶段 - 处理客户端发起的连接请求

  点击查看源代码

  新建一个 RSocketController 类来处理 RSocket 相关的请求。

  

@Controller

 

  public class RSocketController {

   private static Logger logger = LoggerFactory.getLogger(RSocketController.class);

   // 对到来的连接做一些处理

   @ConnectMapping("connect.setup")

   public Mono Void setup(String data, RSocketRequester rSocketRequester) {

   logger.info("[connect.setup]Client connection: {}\n", data);

   return Mono.empty();

  

 

  RSocket 的 metadata 中可以包含路由(Routing)信息,这和 一般 WEB 框架通过解析 URL 将请求导向不同的处理函数是一样的。在连接建立时,客户端会发送一个 SETUP Payload,@ConnectMapping 可以通过解析 SETUP Payload 的 metadata 中的路由信息来使用不同的连接建立阶段的处理函数。在这里,只要 SETUP Payload 的 metadata 中的路由信息是 connect.setup ,该函数就会处理建立连接后客户端发送的 SETUP Payload。

  1.2 保存客户端的 Requester

  RSocket 协议支持双方主动调用对方的函数。如果服务端想要主动向客户端发送请求,他就可以在连接建立时保存 RSocketRequester 对象以便服务端在需要时向客户端发起请求。

  首先在这里我们假设客户端建立连接时会将 UUID 放在 SETUP Payload 的 data 中。然后我们声明一个类来保存 RSocketRequester,代码如下:

  

public class ConnectedClient {

 

   public RSocketRequester requester;

   public Date connectedTime;

   ConnectedClient(RSocketRequester requester) {

   this.requester = requester;

   this.connectedTime = new Date();

  

 

  然后我们建立一个 Service 来管理客户端的 RSocketRequester。在这里使用 ConcurrentHashMap 来存储 Requester,键是客户端的 UUID,值是 ConnectedClient 对象。

  

@Service

 

  public class ConnectedClientsManager {

   private static Logger logger = LoggerFactory.getLogger(ConnectedClientsManager.class);

   public final ConcurrentHashMap String, ConnectedClient clients;

   public ConnectedClientsManager() {

   this.clients = new ConcurrentHashMap ();

   public Set String getAllClientIdentifier() {

   return this.clients.keySet();

   public RSocketRequester getClientRequester(String clientIdentifier) {

   return this.clients.get(clientIdentifier).requester;

   public void putClientRequester(String clientIdentifier, RSocketRequester requester) {

   requester.rsocket()

   .onClose()

   .doFirst(() - this.clients.put(clientIdentifier, new ConnectedClient(requester)))

   .doFinally(sig - {

   logger.info("Client closed, uuid is {}. signal is {}.", clientIdentifier, sig.toString());

   this.clients.remove(clientIdentifier);

   }).subscribe();

   public void removeClientRequester(String clientIdentifier) {

   this.clients.remove(clientIdentifier);

  

 

  然后我们就可以在 RSocketController 中引入 ConnectedClientsManager 了。

  

@Controller

 

  public class RSocketController {

   private static Logger logger = LoggerFactory.getLogger(RSocketController.class);

   public static ConnectedClientsManager clientsManager;

   @Autowired

   private void initializeClientsManager() {

   clientsManager = new ConnectedClientsManager();

  

 

  最后我们编写连接处理函数,将 Requester 保存起来:

  

@ConnectMapping("connect.setup")

 

   public Mono Void setup(String data, RSocketRequester rSocketRequester) {

   logger.info("[connect.setup]Client connection: {}\n", data);

   clientsManager.putClientRequester(data, rSocketRequester);

   return Mono.empty();

  

 

  下面是 spring application 配置 application.yaml:

  

spring:

 

   rsocket:

   server:

   port: 8099

   transport: tcp

  

 

  2. 客户端

  点击查看源代码

  第一步:随机生成标识客户端身份的 UUID

  

public class ConnectionSetup {

 

   public static void main(String[] args) {

   final Logger logger = LoggerFactory.getLogger(RSocketClientRaw.class);

   UUID uuid = UUID.randomUUID();

  ......

  

 

  第二步:生成 SETUP Payload 使用的 routing 信息

  

ByteBuf setupRouteMetadata = TaggingMetadataCodec.createTaggingContent(

 

   ByteBufAllocator.DEFAULT,

   Collections.singletonList("connect.setup"));

  

 

  第三步:使用 RSocketConnector 建立 RSocket:

  在这里首先需要设置元数据的 MIME 类型,方便服务端根据 MIME 类型确定 metadata 的内容

  然后生成 SETUP Payload,data 中存放 UUID 字符串,metadata 中存放路由信息

  设置重连策略

  最后指定 ClientTransport 和服务端建立连接

  使用 block() 在连接建立真正之前阻塞进程

  


RSocket socket = RSocketConnector.create()

 

   // 设置 metadata MIME Type,方便服务端根据 MIME 类型确定 metadata 内容

   .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())

   // SETUP 阶段的 Payload,data 里面存放 UUID

   .setupPayload(ByteBufPayload.create(

   ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, uuid.toString()),

   setupRouteMetadata))

   // 设置重连策略

   .reconnect(Retry.backoff(2, Duration.ofMillis(500)))

   .connect(

   TcpClientTransport.create(

   TcpClient.create()

   .host("127.0.0.1")

   .port(8099)))

   .block();

  

 

  然后可以使用 socket.onClose().block(); 保持连接。此时如果我们运行客户端,然后再关闭客户端的话,会在服务端看到输出:

  表明客户端和服务端建立了连接之后又关闭了连接。

  以上就是【RSocket】使用 RSocket (一)——建立连接()的详细内容,想要了解更多 【RSocket】使用 RSocket (一)——建立连接的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: