threadpoolexcutor是一款线程池组件,ThreadPoolExecutor python

  threadpoolexcutor是一款线程池组件,ThreadPoolExecutor python

  这篇文章主要为大家介绍了python3线程池ThreadPoolExecutor处理战斗支援车文件数据实现的实例过程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  

目录
背景知识点拓展库流程实现代码解释

  

背景

  由于不同乙方对服务商业务接口字段理解不一致,导致线上上千万数据量数据存在问题,为了修复数据,通过计算机编程语言脚本进行修改

  

知识点

  Python3,线程池、pymysql、CSV文件操作、请求

  

拓展

  当我们程序在使用到线程、进程或协程的时候,以下三个知识点可以先做个基本认知

  中央处理器密集型、木卫一密集型、GIL全局解释器锁

  

  pip3安装请求

  pip3安装已安装

  

流程

  

实现代码

  # -*-编码:utf-8 -*-

  # @FileName:grade_update.py

  # @Desc :在一台超级计算机上运行过的牛逼计算机编程语言代码

  导入时间

  从并行未来导入ThreadPoolExecutor,FIRST_COMPLETED,等待

  导入请求

  导入已安装

  从项目路径导入路径

  gradeId=[4303,4304,1000926,1000927]

  def writer _ MySQL():

  数据库连接

  返回pymysql。connect(host= localhost ,

  端口=3306,

  用户=管理员,

  密码=管理,

  数据库=测试

  )

  定义操作数据库(等级标识,成员标识):

  操作数据库

  db=writ _ mysql()

  尝试:

  cursor=db.cursor()

  SQL=f update ` t _ m _ member _ grade ` SET ` current _ grade _ id `={ grade _ id },` modified`=now()其中` member _ id `={ member _ id };

  游标.执行(sql)

  数据库提交()

  打印(f 提交的SQL-{sql} )

  除了pymysql .错误为e:

  db.rollback()

  打印(数据库数据库异常:,e)

  db.close()

  返回真实的

  定义接口(行,thead):

  调用第三方接口

  打印(f 处理数据行数- {thead} -数据- {rows} )

  尝试:

  URL= http://xxxx/API/XXX-data/天猫/bindQuery

  body={

  昵称:字符串(行[0]),

  卖方名称 : 测试:

  手机 : 111

  }

  heade={ Content-Type : application/x-www-form-urlencoded }

  res=requests.post(url=url,data=body,headers=heade)

  result=res.json()

  如果结果[数据][状态]在[1,2]:中

  等级=结果[数据][成员][级别]

  grade_id=gradeId[grade]

  操作数据库(等级标识=等级标识,成员标识=行[1])

  返回真实的

  返回真实的

  e:除外

  打印(f 调用异常:{e} )

  定义read_csv():

  导入战斗支援车

  # db=writ _ mysql()

  #线程数

  MAX_WORKERS=5

  ThreadPoolExecutor(MAX_WORKERS)为游泳池:

  with open(path /file/result 2 _ colu。CSV , r ,newline= ,encoding=utf-8 )作为f:

  #set()函数创建无序不重复元素集

  seq_notdone=set()

  seq_done=set()

  # 使用战斗支援车的读者()方法,创建一个读者对象

  reader=csv.reader(f)

  n=0

  对于读者:中的行

  n=1

  # 遍历读者对象的每一行

  尝试:

  序列_未完成。添加(池。submit(interface,rows=row,thead=n))

  if len(seq _ not done)=MAX _ workers :

  #FIRST_COMPLETED文档说明-当任何期货结束或取消时返回。

  done,seq_notdone=wait(seq_notdone,return_when=FIRST_COMPLETED)

  seq_done更新(完成)

  e:除外

  打印(f 解析结果出错:{e} )

  # db.close()

  返回完成

  if __name__==__main__:

  read_csv()

  

解释

  引入线程池库

  从concurrent.futures导入ThreadPoolExecutor,FIRST_COMPLETED,等待

  pool.submit(interface,rows=row,thead=n)

  提交任务,接口调用的函数,rows和thead是interface()函数的参数。

  任务不断提交,线程池不断被MAX_WORKERS定义的线程数消耗。

  说明这个I/O密集型操作脚本适合多线程。如果是CPU密集型,建议使用,根据机器核心数配置。

  以上是python3 ThreadPoolExecutor处理csv文件数据的细节。更多关于python3 ThreadPoolExecutor处理csv的信息,请关注盛行的IT软件开发工作室的其他相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: