python如何调取数据库数据,python 提取数据
本文主要介绍Python探针完成调用库的数据提取。在Python中,importhook的功能可以通过sys.meta_path来实现。下面详细介绍,需要的朋友可以参考一下。
目录
1.简单粗暴的方法——打包mysql库2。2的探针。Python 3。制作探针模块4。直接替换法5。摘要
1.简单粗暴的方法--对mysql库进行封装
要统计一个执行过程,需要知道执行过程的开始和结束位置,所以最简单最残酷的方法就是根据要调用的方法进行封装,在MySQL库和框架调用的MySQL库之间实现一个中间层,在中间层完成耗时的统计,如:.
#伪代码
def my_execute(连接,sql,参数):
MySql库的统计封装组件
使用MyTracer(连接、sql、参数):
#以下是正常使用MySql库的代码
用连接光标作为光标:
游标. execute(sql,param)
.
看起来实现的很好,改动很方便,但实际上因为是在顶级API上做的,所以很不灵活。同时在cursor.execute中还会进行一些前置操作,比如拼接sql和param,调用nextset清除当前游标的数据等等。我们最终得到的数据如果花时间的话是不准确的,也没有办法得到一些详细的元数据,比如错误码等。
如果想得到最直接最有用的数据,只能修改源代码,然后调用源代码。但是如果每个库都需要修改源代码来做统计,那就太麻烦了。好在Python还提供了一些类似于探针的接口,通过这些接口你可以替换库的源代码来完成我们的代码。
2.Python的探针
在Python中,可以通过sys.meta_path实现导入钩子的功能。执行导入相关操作时,导入相关库会根据sys.meta_path定义的对象而改变。sys.meta _ path中的对象需要实现find_module方法。此find_module方法返回None或实现load_module方法的对象。我们可以在导入一些库的时候用这个对象来替换相关的方法。简单用法如下。hooktime.sleep允许他打印睡觉时消耗的时间。
导入导入库
导入系统
从functools导入包装
定义函数_包装器(函数):
在这里,使用一个装饰装置来实现将灵猫变成王子并获取数据的效果
@wraps(func)
def包装(*args,**kwargs):
#记录开始时间
start=time.time()
result=func(*args,**kwargs)
#计算消耗时间
end=time.time()
打印(fspeed time:{end - start} )
回送结果
返回包装
类元路径查找器:
def find_module(self,全名,路径=无):
#执行时,您可以看到正在导入哪些模块
打印(f find module : { path } : { full name } )
返回元路径加载器()
类元路径加载器:
def load_module(自身,全名):
# import的模块会存储在sys.modules中,通过判断可以减少重复导入。
如果全名在sys.modules:中
返回sys.modules[fullname]
#防止递归调用
finder=sys.meta_path.pop(0)
#导入模块
module=importlib.import_module(全名)
if fullname==time:
# 替换函数
module.sleep = func_wrapper(module.sleep)
sys.meta_path.insert(0, finder)
return module
sys.meta_path.insert(0, MetaPathFinder())
if __name__ == __main__:
import time
time.sleep(1)
# 输出示例:
# find module:datetime
# find module:time
# load module:time
# find module:math
# find module:_datetime
# speed time:1.00073385238647468
3.制作探针模块
了解完了主要流程后, 可以开始制作自己的探针模块了, 由于示例只涉及到aiomysql模块, 那么在MetaPathFinder.find_module中需要只对aiomysql模块进行处理, 其他的先忽略. 然后我们需要确定我们要把aiomysql的哪个功能给替换, 从业务上来说, 一般情况下我们只要cursor.execute, cursor.fetchone, cursor.fetchall, cursor.executemany这几个主要的操作,所以需要深入cursor看看如何去更改代码, 后者重载哪个函数.
先cursor.execute的源码(cursor.executemanay也类似), 发现会先调用self.nextset的方法, 把上个请求的数据先拿完, 再合并sql语句, 最后通过self._query进行查询:
async def execute(self, query, args=None):"""Executes the given operation
Executes the given operation substituting any markers with
the given parameters.
For example, getting all rows where id is 5:
cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,))
:param query: ``str`` sql statement
:param args: ``tuple`` or ``list`` of arguments for sql query
:returns: ``int``, number of rows that has been produced of affected
"""
conn = self._get_db()
while (await self.nextset()):
pass
if args is not None:
query = query % self._escape_args(args, conn)
await self._query(query)
self._executed = query
if self._echo:
logger.info(query)
logger.info("%r", args)
return self._rowcount
再看cursor.fetchone的源码(cursor.fetchall也类似), 发现其实是从缓存中获取数据,
这些数据在执行cursor.execute中就已经获取了:
def fetchone(self):"""Fetch the next row """
self._check_executed()
fut = self._loop.create_future()
if self._rows is None or self._rownumber >= len(self._rows):
fut.set_result(None)
return fut
result = self._rows[self._rownumber]
self._rownumber += 1
fut = self._loop.create_future()
fut.set_result(result)
return fut
综合上面的分析, 我们只要对核心的方法self._query进行重载即可拿到我们要的数据, 从源码中我们可以知道, 我们能获取到传入self._query的self和sql参数, 根据self又能获取到查询的结果, 同时我们通过装饰器能获取到运行的时间, 要的数据基本都到齐了,
按照思路修改后的代码如下:
import importlibimport time
import sys
from functools import wraps
from typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKING
from types import ModuleType
if TYPE_CHECKING:
import aiomysql
def func_wrapper(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs) -> Any:
start: float = time.time()
func_result: Any = await func(*args, **kwargs)
end: float = time.time()
# 根据_query可以知道, 第一格参数是self, 第二个参数是sql
self: aiomysql.Cursor = args[0]
sql: str = args[1]
# 通过self,我们可以拿到其他的数据
db: str = self._connection.db
user: str = self._connection.user
host: str = self._connection.host
port: str = self._connection.port
execute_result: Tuple[Tuple] = self._rows
# 可以根据自己定义的agent把数据发送到指定的平台, 然后我们就可以在平台上看到对应的数据或进行监控了,
# 这里只是打印一部分数据出来
print({
"sql": sql,
"db": db,
"user": user,
"host": host,
"port": port,
"result": execute_result,
"speed time": end - start
})
return func_result
return cast(Callable, wrapper)
class MetaPathFinder:
@staticmethod
def find_module(fullname: str, path: Optional[str] = None) -> Optional["MetaPathLoader"]:
if fullname == aiomysql:
# 只有aiomysql才进行hook
return MetaPathLoader()
else:
return None
class MetaPathLoader:
@staticmethod
def load_module(fullname: str):
if fullname in sys.modules:
return sys.modules[fullname]
# 防止递归调用
finder: "MetaPathFinder" = sys.meta_path.pop(0)
# 导入 module
module: ModuleType = importlib.import_module(fullname)
# 针对_query进行hook
module.Cursor._query = func_wrapper(module.Cursor._query)
sys.meta_path.insert(0, finder)
return module
async def test_mysql() -> None:
import aiomysql
pool: aiomysql.Pool = await aiomysql.create_pool(
host=127.0.0.1, port=3306, user=root, password=123123, db=mysql
)
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 42;")
(r,) = await cur.fetchone()
assert r == 42
pool.close()
await pool.wait_closed()
if __name__ == __main__:
sys.meta_path.insert(0, MetaPathFinder())
import asyncio
asyncio.run(test_mysql())
# 输出示例:
# 可以看出sql语句与我们输入的一样, db, user, host, port等参数也是, 还能知道执行的结果和运行时间
# {sql: SELECT 42;, db: mysql, user: root, host: 127.0.0.1, port: 3306, result: ((42,),), speed time: 0.00045609474182128906}
这个例子看来很不错, 但是需要在调用的入口处显式调用该逻辑, 通常一个项目可能有几个入口, 每个入口都显示调用该逻辑会非常麻烦, 而且必须先调用我们的hook逻辑后才能import, 这样就得订好引入规范, 不然就可能出现部分地方hook不成功, 如果能把引入hook这个逻辑安排在解析器启动后马上执行, 就可以完美地解决这个问题了. 查阅了一翻资料后发现,python解释器初始化的时候会自动import PYTHONPATH下存在的sitecustomize和usercustomize模块, 我们只要创建该模块, 并在模块里面写入我们的 替换函数即可。
.├── __init__.py
├── hook_aiomysql.py
├── sitecustomize.py
└── test_auto_hook.py
hook_aiomysql.py是我们制作探针的代码为例子, 而sitecustomize.py存放的代码如下, 非常简单, 就是引入我们的探针代码, 并插入到sys.meta_path:
import sysfrom hook_aiomysql import MetaPathFinder
sys.meta_path.insert(0, MetaPathFinder())
test_auto_hook.py则是测试代码:
import asynciofrom hook_aiomysql import test_mysql
asyncio.run(test_mysql())
接下来只要设置PYTHONPATH并运行我们的代码即可(如果是项目的话一般交由superisor启动,则可以在配置文件中设置好PYTHONPATH):
(.venv) ➜ python_hook git:(master) ✗ export PYTHONPATH=.(.venv) ➜ python_hook git:(master) ✗ python test_auto_hook.py
{sql: SELECT 42;, db: mysql, user: root, host: 127.0.0.1, port: 3306, result: ((42,),), speed time: 0.000213623046875}
4.直接替换方法
可以看到上面的方法很好的运行了, 而且可以很方便的嵌入到我们的项目中, 但是依赖与sitecustomize.py文件很难让他抽离成一个第三方的库, 如果要抽离成第三方的库就得考虑看看有没有其他的方法。在上面介绍MetaPathLoader时说到了sys.module, 在里面通过sys.modules来减少重复引入:
class MetaPathLoader:def load_module(self, fullname):
# import的模块都会存放在sys.modules里面, 通过判断可以减少重复import
if fullname in sys.modules:
return sys.modules[fullname]
# 防止递归调用
finder = sys.meta_path.pop(0)
# 导入 module
module = importlib.import_module(fullname)
if fullname == time:
# 替换函数
module.sleep = func_wrapper(module.sleep)
sys.meta_path.insert(0, finder)
return module
这个减少重复引入的原理是, 每次引入一个模块后, 他就会存放在sys.modules, 如果是重复引入, 就会直接刷新成最新引入的模块。上面之所以会考虑到减少重复import是因为我们不会在程序运行时升级第三方库的依赖。利用到我们可以不考虑重复引入同名不同实现的模块, 以及sys.modules会缓存引入模块的特点, 我们可以把上面的逻辑简化成引入模块->替换当前模块方法为我们修改的hook方法。
import timefrom functools import wraps
from typing import Any, Callable, Tuple, cast
import aiomysql
def func_wrapper(func: Callable):
"""和上面一样的封装函数, 这里简单略过"""
# 判断是否hook过
_IS_HOOK: bool = False
# 存放原来的_query
_query: Callable = aiomysql.Cursor._query
# hook函数
def install_hook() -> None:
_IS_HOOK = False
if _IS_HOOK:
return
aiomysql.Cursor._query = func_wrapper(aiomysql.Cursor._query)
_IS_HOOK = True
# 还原到原来的函数方法
def reset_hook() -> None:
aiomysql.Cursor._query = _query
_IS_HOOK = False
代码简单明了,接下来跑一跑刚才的测试:
import asyncioimport aiomysql
from demo import install_hook, reset_hook
async def test_mysql() -> None:
pool: aiomysql.Pool = await aiomysql.create_pool(
host=127.0.0.1, port=3306, user=root, password=, db=mysql
)
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 42;")
(r,) = await cur.fetchone()
assert r == 42
pool.close()
await pool.wait_closed()
print("install hook")
install_hook()
asyncio.run(test_mysql())
print("reset hook")
reset_hook()
asyncio.run(test_mysql())
print("end")
通过测试输出可以发现我们的逻辑的正确的, install hook后能出现我们提取的元信息, 而reset后则不会打印原信息
install hook{sql: SELECT 42;, db: mysql, user: root, host: 127.0.0.1, port: 3306, result: ((42,),), speed time: 0.000347137451171875}
reset hook
end
5.总结
得益于Python动态语言的特性, 我们可以很容易的为第三方库实现钩子方法,上面说的两种方法中, 第二种方法非常简单, 但在自己项目中最好还是采用第一种方法, 因为Python是通过一行一行代码进行扫描执行的, 第二种方法只能放在入口代码中, 并且要在被hook的对象实例化之前执行, 不然就会实现hook失败的现象, 而第一种方法除了麻烦外,基本上能躲避所有坑。
到此这篇关于Python探针完成调用库的数据提取的文章就介绍到这了,更多相关 Python探针 内容请搜索盛行IT软件开发工作室以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT软件开发工作室!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。