python动态加载模块热更新,python加载配置文件
所谓配置热加载,就是服务收到配置更新消息后,我们可以使用最新的配置来执行任务,而不需要重启服务。本文将介绍如何用Python配置热加载,有需要的可以参考一下。
00-1010背景:如何使用多处理配置热加载;用信号量实现热加载;使用多重处理。事件来实现热加载;结论
目录
由于近期工作需要,需要在现有项目上增加一项新功能,实现热负荷配置功能。所谓配置热加载,就是服务收到配置更新消息后,我们可以使用最新的配置来执行任务,而不需要重启服务。
背景
下面,我使用多进程、多线程和协同进程方法来配置热加载。
如何实现
如果我们的代码实现使用多进程,主进程1更新配置和发送指令,任务调用是进程2,如何实现配置热加载?
使用多进程实现配置热加载
当主进程收到配置更新的消息时(配置读取如何接收配置更新的消息?这里暂且不讨论),主进程向进度进程1发送kill信号,进度进程1收到kill信号后退出。之后,信号处理功能启动一个新的进程,并使用最新的配置文件继续执行任务。
main函数
定义主():
#启动流程以执行任务
p1=进程(目标=运行,参数=(p1 ,))
p1.start()
monitor(p1,run) #寄存器信号
进程[case100]=p1 #保存进程pid
数量=0
而True: #模拟获取配置更新
打印(
f { multi processing . active _ children()=},count={ len(multi processing . active _ children())} \ n
print(f“{ processes=} \ n )
睡眠(2)
如果数量==4:
kill _ process(processes[ case 100 ])#终止当前进程
如果数量==8:
kill _ process(processes[ case 100 ])#终止当前进程
如果数量==12:
kill _ process(processes[ case 100 ])#终止当前进程
数量=1
signal_handler函数
def信号处理程序(process: Process,func,signum,frame):
# print(f“{ signum=}”)
全局计数
如果signum==17: # 17是SIGCHILD
#这个循环是为了忽略SIGTERM发送的信号,避免抢占主进程发送的SIGCHILD。
对于[SIGTERM,SIGCHLD,SIGQUIT]:中的signame
signal.signal(signame,SIG_DFL)
打印(“启动新流程”)
p=多重处理。Process(target=func,args=(fp{counts} ,))
开始()
监视器(p,运行)
进程[case100]=p
计数=1
如果signum==2:
if process.is_alive():
校长
t(f"Kill {process} process")
process.terminate()
signal.signal(SIGCHLD, SIG_IGN)
sys.exit("kill parent process")
完整代码如下
#! /usr/local/bin/python3.8from multiprocessing import Process
from typing import Dict
import signal
from signal import SIGCHLD, SIGTERM, SIGINT, SIGQUIT, SIG_DFL, SIG_IGN
import multiprocessing
from multiprocessing import Process
from typing import Callable
from data import processes
import sys
from functools import partial
import time
processes: Dict[str, Process] = {}
counts = 2
def run(process: Process):
while True:
print(f"{process} running...")
time.sleep(1)
def kill_process(process: Process):
print(f"kill {process}")
process.terminate()
def monitor(process: Process, func: Callable):
for signame in [SIGTERM, SIGCHLD, SIGINT, SIGQUIT]:
# SIGTERM is kill signal.
# No SIGCHILD is not trigger singnal_handler,
# No SIGINT is not handler ctrl+c,
# No SIGQUIT is RuntimeError: reentrant call inside <_io.BufferedWriter name=<stdout>>
signal.signal(signame, partial(signal_handler, process, func))
def signal_handler(process: Process, func, signum, frame):
print(f"{signum=}")
global counts
if signum == 17: # 17 is SIGTERM
for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
signal.signal(signame, SIG_DFL)
print("Launch a new process")
p = multiprocessing.Process(target=func, args=(f"p{counts}",))
p.start()
monitor(p, run)
processes["case100"] = p
counts += 1
if signum == 2:
if process.is_alive():
print(f"Kill {process} process")
process.terminate()
signal.signal(SIGCHLD, SIG_IGN)
sys.exit("kill parent process")
def main():
p1 = Process(target=run, args=("p1",))
p1.start()
monitor(p1, run)
processes["case100"] = p1
num = 0
while True:
print(
f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
print(f"{processes=}\n")
time.sleep(2)
if num == 4:
kill_process(processes["case100"])
if num == 8:
kill_process(processes["case100"])
if num == 12:
kill_process(processes["case100"])
num += 1
if __name__ == __main__:
main()
执行结果如下
multiprocessing.active_children()=[<Process name=Process-1 pid=2533 parent=2532 started>], count=1processes={case100: <Process name=Process-1 pid=2533 parent=2532 started>}
p1 running...
p1 running...
kill <Process name=Process-1 pid=2533 parent=2532 started>
multiprocessing.active_children()=[<Process name=Process-1 pid=2533 parent=2532 started>], count=1
processes={case100: <Process name=Process-1 pid=2533 parent=2532 started>}
signum=17
Launch a new process
p2 running...
p2 running...
multiprocessing.active_children()=[<Process name=Process-2 pid=2577 parent=2532 started>], count=1
processes={case100: <Process name=Process-2 pid=2577 parent=2532 started>}
p2 running...
p2 running...
multiprocessing.active_children()=[<Process name=Process-2 pid=2577 parent=2532 started>], count=1
processes={case100: <Process name=Process-2 pid=2577 parent=2532 started>}
p2 running...
p2 running...
multiprocessing.active_children()=[<Process name=Process-2 pid=2577 parent=2532 started>], count=1
processes={case100: <Process name=Process-2 pid=2577 parent=2532 started>}
p2 running...
p2 running...
kill <Process name=Process-2 pid=2577 parent=2532 started>
signum=17
Launch a new process
multiprocessing.active_children()=[<Process name=Process-2 pid=2577 parent=2532 stopped exitcode=-SIGTERM>], count=1
processes={case100: <Process name=Process-3 pid=2675 parent=2532 started>}
p3 running...
p3 running...
multiprocessing.active_children()=[<Process name=Process-3 pid=2675 parent=2532 started>], count=1
总结
好处:使用信号量可以处理多进程之间通信的问题。
坏处:代码不好写,写出来代码不好理解。信号量使用必须要很熟悉,不然很容易自己给自己写了一个bug.(所有初学者慎用,老司机除外。)
还有一点不是特别理解的就是process.terminate()
发送出信号是SIGTERM
number是15,但是第一次signal_handler
收到信号却是number=17,如果我要去处理15的信号,就会导致前一个进程不能kill掉的问题。欢迎有对信号量比较熟悉的大佬,前来指点迷津,不甚感谢。
采用multiprocessing.Event来实现配置热加载
实现逻辑是主进程1 更新配置并发送指令。进程2启动调度任务。
这时候当主进程1更新好配置之后,发送指令给进程2,这时候的指令就是用Event一个异步事件通知。
直接上代码
scheduler函数
def scheduler():while True:
print(wait message...)
case_configurations = scheduler_notify_queue.get()
print(f"Got case configurations {case_configurations=}...")
task_schedule_event.set() # 设置set之后, is_set 为True
print(f"Schedule will start ...")
while task_schedule_event.is_set(): # is_set 为True的话,那么任务就会一直执行
run(case_configurations)
print("Clearing all scheduling job ...")
event_scheduler函数
def event_scheduler(case_config):scheduler_notify_queue.put(case_config)
print(f"Put cases config to the Queue ...")
task_schedule_event.clear() # clear之后,is_set 为False
print(f"Clear scheduler jobs ...")
print(f"Schedule job ...")
完整代码如下
import multiprocessingimport time
scheduler_notify_queue = multiprocessing.Queue()
task_schedule_event = multiprocessing.Event()
def run(case_configurations: str):
print(f{case_configurations} running...)
time.sleep(3)
def scheduler():
while True:
print(wait message...)
case_configurations = scheduler_notify_queue.get()
print(f"Got case configurations {case_configurations=}...")
task_schedule_event.set()
print(f"Schedule will start ...")
while task_schedule_event.is_set():
run(case_configurations)
print("Clearing all scheduling job ...")
def event_scheduler(case_config: str):
scheduler_notify_queue.put(case_config)
print(f"Put cases config to the Queue ...")
task_schedule_event.clear()
print(f"Clear scheduler jobs ...")
print(f"Schedule job ...")
def main():
scheduler_notify_queue.put(1)
p = multiprocessing.Process(target=scheduler)
p.start()
count = 1
print(f{count=})
while True:
if count == 5:
event_scheduler(100)
if count == 10:
event_scheduler(200)
count += 1
time.sleep(1)
if __name__ == __main__:
main()
执行结果如下
wait message...Got case configurations case_configurations=1...
Schedule will start ...
1 running...
1 running...
Put cases config to the Queue ...
Clear scheduler jobs ...
Schedule job ...
Clearing all scheduling job ...
wait message...
Got case configurations case_configurations=100...
Schedule will start ...
100 running...
Put cases config to the Queue ...
Clear scheduler jobs ...
Schedule job ...
Clearing all scheduling job ...
wait message...
Got case configurations case_configurations=200...
Schedule will start ...
200 running...
200 running...
总结
使用Event事件通知,代码不易出错,代码编写少,易读。相比之前信号量的方法,推荐大家多使用这种方式。
使用多线程或协程的方式,其实和上述实现方式一致。唯一区别就是调用了不同库中,queue
和event
.
# threadingscheduler_notify_queue = queue.Queue()
task_schedule_event = threading.Event()
# async
scheduler_notify_queue = asyncio.Queue()
task_schedule_event = asyncio.Event()
结语
具体的实现的方式有很多,也各自有各自的优劣势。我们需要去深刻理解到需求本身,才去做技术选型。
以上就是基于Python实现配置热加载的方法详解的详细内容,更多关于Python配置热加载的资料请关注盛行IT软件开发工作室其它相关文章!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。