python进程池和队列,python线程池和进程池

  python进程池和队列,python线程池和进程池

  本文主要详细介绍Python对线程池线程安全队列的实现。本文中的示例代码非常详细,具有一定的参考价值。感兴趣的朋友可以参考一下。

  00-1010一、线程池的构成二。线程安全队列III的实现。测试逻辑3.1。测试阻塞逻辑3.2。测试读取、写入和锁定逻辑。本例分享用Python实现线程池线程安全队列的具体代码,供大家参考。具体内容如下

  

目录

  一个完整的线程池由以下几部分组成:线程安全队列、任务对象、线程处理对象和线程池对象。线程安全队列之一是实现线程池和任务队列的基础。在这一节中,我们通过互斥线程实现了一个简单且读安全的线程队列。Lock()和条件变量threading。线程包中的条件()。

  

一、线程池组成

  包括put、pop、get和其他方法。为了保证线程安全,读写操作时要添加互斥体。并且pop操作可以设置等待时间来阻塞当前获取该元素的线程,当新元素被写入队列时,等待操作被条件变量取消。

  类ThreadSafeQueue(对象):

  def __init__(self,max_size=0):

  self.queue=[]

  自我。max _ size=max _ size # max _ size 0表示无穷大。

  Self.lock=线程。Lock() #互斥体

  self . condition=threading . condition()#条件变量

  定义尺寸(自身):

  获取当前队列的大小。

  :返回:队列长度

  #锁定

  self.lock.acquire()

  size=len(self.queue)

  self.lock.release()

  退货尺寸

  定义放置(自身,项目):

  将单个元素放入队列中。

  :参数项目:

  :返回:

  #队列已满。max_size为0表示无穷大。

  if self.max_size!=0且self.size()=self.max_size:

  返回ThreadSafeException()

  #锁定

  self.lock.acquire()

  self.queue.append(项目)

  self.lock.release()

  self.condition.acquire()

  #通知等待读取的线程

  self.condition.notify()

  self.condition.release()

  退货项目

  定义批投放(自身,项目列表):

  添加批量元素

  :参数item_list:

  :返回:

  如果不是isinstance(item_list,list):

  item_list=list(item_list)

  RES=[item _ list中项目的self . put(item)]

  返回资源

  def pop(self,block=False,timeout=0):

  从队列的头部获取元素。

  :param block:阻塞线程?

  :参数超时:等待时间

  :返回:

  if self.size()==0:

  如果块:

  self.condition.acquire()

          self.condition.wait(timeout)

                  self.condition.release()

              else:

                  return None

          # 加锁

          self.lock.acquire()

          item = None

          if len(self.queue):

              item = self.queue.pop()

          self.lock.release()

          return item

      def get(self, index):

          """

          获取指定位置的元素

          :param index:

          :return:

          """

          if self.size() == 0 or index >= self.size():

              return None

          # 加锁

          self.lock.acquire()

          item = self.queue[index]

          self.lock.release()

          return item

  class ThreadSafeException(Exception):

      pass

  

  

三、测试逻辑

  

  

3.1、测试阻塞逻辑

  

def thread_queue_test_1():

      thread_queue = ThreadSafeQueue(10)

      def producer():

          while True:

              thread_queue.put(random.randint(0, 10))

              time.sleep(2)

      def consumer():

          while True:

              print(current time before pop is %d % time.time())

              item = thread_queue.pop(block=True, timeout=3)

              # item = thread_queue.get(2)

              if item is not None:

                  print(get value from queue is %s % item)

              else:

                  print(item)

              print(current time after pop is %d % time.time())

      t1 = threading.Thread(target=producer)

      t2 = threading.Thread(target=consumer)

      t1.start()

      t2.start()

      t1.join()

      t2.join()

  测试结果:

  我们可以看到生产者线程每隔2s向队列写入一个元素,消费者线程当无数据时默认阻塞3s。通过执行时间发现消费者线程确实发生了阻塞,当生产者写入数据时结束当前等待操作。

  

  

  

3.2、测试读写加锁逻辑

  

def thread_queue_test_2():

      thread_queue = ThreadSafeQueue(10)

      def producer():

          while True:

              thread_queue.put(random.randint(0, 10))

              time.sleep(2)

      def consumer(name):

          while True:

              item = thread_queue.pop(block=True, timeout=1)

              # item = thread_queue.get(2)

              if item is not None:

                  print(%s get value from queue is %s % (name, item))

              else:

                  print(%s get value from queue is None % name)

      t1 = threading.Thread(target=producer)

      t2 = threading.Thread(target=consumer, args=(thread1,))

      t3 = threading.Thread(target=consumer, args=(thread2,))

      t1.start()

      t2.start()

      t3.start()

      t1.join()

      t2.join()

      t3.join()

  测试结果:

  生产者还是每2s生成一个元素写入队列,消费者开启两个线程进行消费,默认阻塞时间为1s,打印结果显示通过加锁确保每次只有一个线程能获取数据,保证了线程读写的安全。

  

  以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: