远程过程调用RPC,rpc远程调用框架有哪些
主要介绍了基于python的rpc远程过程调用的实现,通过示例代码进行了非常详细的介绍,对大家的学习或工作有一定的参考价值。有需要的朋友下面和边肖一起学习。
00-1010基于python 1的RPC实现演示前言。主要内容2。实施步骤1。进程间的通信。异步回调实现思想综述
目录
这是一个远程过程调用(RPC)的演示,可以实现不同python进程之间的通信,调用彼此的函数。它易于使用,易于扩展。更多的功能可以进一步完善。本文介绍了这种实现的主要思想。
基于python实现RPC的demo
我计划实现一个rpc很久了,在断断续续的推送下终于完成了。写这个demo的原因是:1)学习和思考这个部分的主要功能和实现思路;2)要在换套餐时没有心理负担,要有优越感。
这部分的实现主要是基于我自己的想法,所以可能会有bug或者更好的实现方法,仅供学习和参考。有关完整的代码,请参考Gitee链接。
实现的时候用的是Python2.7。忘记改了,下次再更新。
前言
RPC是远程过程调用的简称,网上有很多解释。简单来说,当前进程调用其他进程的函数时,体验就像调用本地编写的函数一样。
本文实现了本地调用远程类对象的接口,即本地客户端不实例化类对象,而是调用服务器端的类对象接口。
为了让调用层有丝滑的体验而不关心底层实现,需要以下几个部分:
客户端需要提取类的接口,存储调用函数的事件捕获;服务器需要使用类的公共函数作为可以远程调用的接口。将客户端调用函数的事件(被调用函数和参数)序列化并发送给服务器;服务器将反序列化客户端的调用事件,执行相应的接口,并将返回值发送给客户端。客户端通过某种方式(通常是网络套接字)与服务器通信。下面序列图的灰色部分对调用者是透明的,它的执行结果应该和本地函数一致。
一、主要内容
二、实现步骤
本文采用基于TCP的sokcet连接来实现进程间的通信。更多实现细节,请参考之前的博客。
这里应该注意:
本文使用选择模块来监控网络事件。如果服务器没有收到任何网络消息,它将始终被阻止在这里。如果服务器需要执行除了提供rpc调用服务之外的其他逻辑,应该采用非阻塞和轮询socket的方法来判断是否有新的网络事件。
# ServerBase.py
定义流程(自身):
可读、可写、异常=select.select(self.inputs,self.outputs,self.conns.values())
用于可读:中的连接器
如果连接器是自插座:
自我。_handle_conn()
else:
自我。_handle_recv(连接)
对于可写:的连接
及格
对于异常:中的连接
自我。_handle_leave(连接)
通过创建一个新线程来监控客户端的网络事件。它不会影响客户端主线程的执行,所以你可以随心所欲地阻塞它。部分代码如下:
# AsynCallback.py
类AsyncTaskManager(对象):
_asy_events=dict()
def __init__(self,loop,*args):
超级(AsyncTaskManager,self)。__init__()
自我。_loop_
fun = loop
def __call__(self, *args, **kwargs):
proc = threading.Thread(target=self._exec_loop, args=args, kwargs=kwargs)
proc.start()
def _exec_loop(self, *args, **kwargs):
while True:
net_resp = self._loop_fun(*args, **kwargs)
for resp in net_resp:
asy_event = self._asy_events.pop(resp.rid)
asy_event.set()
# Client.pyclass Client(TaskHandle, ClientBase):
@AsyncTaskManager
def process(self):
super(Client, self).process()
_events = []
while self.has_events:
event = self.get_next_event()
data = event[1]
_events.append(self.unpack_respond(data))
return _events
序列化方式,本文采用了库pickle进行序列化与反序列化,使用它的原因是可以将自定义类对象也进行序列化,非常之高级。
2. 异步回调实现思路
对于需要返回值的函数调用,处理起来比较简单,只需要将主线程阻塞等待,直至超时或者接收到了对应函数的返回值即可。本文采用了threading.Event来阻塞与唤醒调用的函数,同时采用了装饰器来实现这功能。若日后有更好的方法,可以轻易进行替换。相关示例代码如下所示:
@AsyncTaskManager.responddef _handle_response(self, tid):
""" 处理有返回值的情况
会阻塞线程直至收到返回值
"""
task = self.pop_task(tid)
if task.callback:
task.callback()
return self.pop_respond(tid)
@staticmethod
def respond(func):
@wraps(func)
def make_resp(handle, tid):
""" 需要注意的是,和装饰的函数参数含义需一致 """
event = threading.Event()
AsyncTaskManager._asy_events[tid] = event
event.wait(timeout=TIME_OUT)
return func(handle, tid) # 这儿才是真正执行_handle_response的地方
return make_resp
在实际的应用过程中,应有这样的情况,服务端与客户端都是独立的应用,通过rpc函数进行通信和交互,而并不是某方为另外一方提供服务,那么此时返回值并不必要,只需要将要做的事通知另一方即可。对于此种情况,可以采用异步回调的方式来告知调用方对应函数执行成功了。
在文中依旧采用线程来完成该功能,客户端调用函数之后创建一个新线程并阻塞住,等待服务端将执行结果发回后再唤醒,如果有回调函数就执行。示例代码如下:
@AsyncTaskManager.callbackdef _handle_call_back(self, tid):
""" 处理有回调函数的调用
callback会等tid事件调用成功之后 才会回调,且不会有返回值
"""
task = self.pop_task(tid)
if task.callback:
task.callback()
@staticmethod
def callback(func):
@wraps(func)
def make_thread(event, *args, **kwargs):
event.wait(timeout=TIME_OUT)
func(*args, **kwargs)
def make_async(handle, tid):
""" 注意点同上 """
event = threading.Event()
AsyncTaskManager._asy_events[tid] = event
_task = threading.Thread(target=lambda: make_thread(event, handle, tid))
return make_async
总结
到此这篇关于基于python实现rpc远程过程调用的文章就介绍到这了,更多相关python rpc远程调用内容请搜索盛行IT软件开发工作室以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT软件开发工作室!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。