from dawn till dusk是什么意思,dusk till dawn读音

  from dawn till dusk是什么意思,dusk till dawn读音

  飞跑

  一、Dask简介

  Dask是一个并行计算库,可以在集群中进行分布式计算,可以用更方便简洁的方式处理大量数据。相比Spark等大数据处理框架,Dask更轻。Dask更侧重于与其他框架的结合,如Numpy、Pandas、Scikit-learning,这使得它更便于分布式并行计算。

  二、Dask数据结构

  Dask有三种基本的数据结构:数组、数据帧和包。接下来详细介绍三种数据结构及其应用场景。

  2.1.数组

  Dask中的数组(位于包dask.arrays下)实际上是Numpy中ndarray的一些接口的改进,从而方便了大数据的处理。对于一个很大的数据集,特别是当它的大小大于内存的时候,如果我们要计算它,按照传统的方式,我们会把它全部塞进内存,那么它会报错内存不足。当然我们也可以一次读取一些数据,那么是否可以提前屏蔽掉大数据集呢?我们只需要控制每个数据集不超过内存,就能满足内存中的计算?达斯克就是这么做的。盗图Dask官网:

  从上图可以看出,Dask数组中的每个块实际上都是Numpy数组,所以我们很好奇Dask是如何分区的。我们从dask.array中的from_array(dask.array.core)方法来分析一下:

  我们发现,对于Numpy数组,Dask有六种方法可以阻止它。概括起来就是:对于一个多维Numpy数组,假设它的维数为:(D1,D2,…,Dn),那么chunks表示沿着不同轴划分的块数。Dask将这六种不同的方式转换成统一的方式(统一成第三种方式),如下:

  转换模式如下:

  这里需要注意的是,当组块的值像 1kib 和 1mb 时,需要保证每个组块的大小不能超过划分时组块所限定的大小,例如:

  因为1kiB=1000字节,而np.float32是4字节,也就是对shape=(2000,)的数组进行分区时,任何块的大小都不能超过1000字节。如果划分为8个块,每个块的大小为250*4=1000,刚好满足块大小限制。具体是怎么算出来的?

  我们假设chunks=1000 Byte,要分区的numpy array type=float 32,即这个数组的每个元素占用4个字节。所以我们知道一个块的元素个数不能超过1000/4=250,所以我们只需要从最大的块开始分区。

  2.2.数据帧

  Dataframe是在Pandas Dataframe基础上改进的数据结构,可以并行处理大量数据,甚至是大于内存的数据(注:dask.array不能直接处理大于内存的数据,从其源代码可以看出,从Numpy数组转换为dask数组时,需要先将Numpy数组放入内存)。

  Dask数据帧的划分如下:

  也就是说,分区是按行执行的。下面就拿Dask . data frame . read _ CSV(dask . data frame . io . CSV)来分析一下dask是如何分区的。

  然后通过read_pandas方法,再通过read_bytes方法,根据blocksize将每个文件逐行分块(注意这里返回的是延迟对象),最后通过text_blocks_to_pandas,转换成Dask Dataframe。那么为什么即使是内存不足的数据仍然可以正常导入呢?事实上,我们可以从下面的代码中看出:

  这里有一个重要的概念——Delayed,后面会解释。在这里,暂且把它当作一个指针。它并没有真正将所有分区的数据读入内存,只是在内存中存储一个指针指向这些块数据。

  通过以上步骤我们知道,当我读取一个csv文件时,其实Dask会将其转换成一个延迟列表,列表中每个延迟对象的计算大小不会超过blocksize。那么当我们有一个csv文件的延迟对象时,我们怎么把它转换成Dask数据帧呢?然后我们分析:

  在from _ delayed(dask . data frame . io . io . py)方法中,我们可以看到:

  因为我们知道这里的dfs实际上是一个延迟列表,那么我们需要知道Delayed.dask是什么?为此,我们在Delayed(dask.delayed.py)类中输入dask属性:

  只要您知道这里,您实际上构建了一个任务地图,然后创建了一个Dask数据框架:

  具体流程如下(dask.dataframe.core.py):

  我们发现,最终,我们实际上是基于构建的任务依赖图创建了一个数据帧。

  2.3.包

  Bags主要用于半结构化的大型数据集,比如日志或博客。我们分析如何从它的read_text(dask.bag.text.py)创建Dask Bag对象:

  从里面的delayed方法可以看到,方法最后返回了一个Delayed对象。稍后我们将讨论延迟对象。

  三。高级图简介

  正如我们在上一节中看到的,当创建Dask Dataframe时,我们实际上通过HighLevelGraph构建了一个任务图。那么这个任务图是什么呢?其实他本质上是一个字典结构(dict)。从构成要素来看,它由两部分组成,一部分是动作(可以看作任务图中的一个节点),一部分是依赖(可以看作任务图中的一条边)。它的定义是:

  让我们举一个来自官网的例子来详细了解一下:

  对于本例中的df,它指向最后一层的输出:

  四。Dask分布式系统简介

  Dask之所以能高效处理大量数据,是因为它能进行分布式计算。对于分布式计算,它面临两个基本问题:

  1.怎么交流或者怎么传输数据?

  2.如何调度或调度者如何分配任务给工人?

  在Dask分布式系统中(也可以是伪分布式,即本机上的线程或进程并行处理,本文不做描述),有客户端、调度器和工作器三种角色,其中客户端负责向调度器提交任务,调度器负责将提交的任务按照一定的策略分配给工作器,工作器进行实际的计算和数据存储。在此期间,调度程序始终关注工人的状态。

  接下来,我们将解释Dask如何处理这两个方面。

  1.如何沟通。

  说到通信,只有两个问题需要解决:通信节点如何找到对方,以及在Dask中如何找到对方,即客户端、调度器和工作器。首先,在Dask中,每个需要通信的节点都需要有自己的ID。在Dask中,它由一个URI表示,例如tcp://192.168.0.1:35072。在Dask中,通常通过TCP (TLS和Inproc)进行通信;也可以用);另一个问题,信息如何传输,即如何定义要传输的数据格式?我们知道,当客户端向调度器提交任务时,最终的任务计算是由Worker完成的,需要考虑网络带宽。所以节点之间传输的数据量越小越好,传输速度越快越好。因此,我们需要一个协议来指导数据的编码和解码,这里的数据不仅指Python对象,还指Python函数。

  在Dask中,要传输的原始消息由字典表示,如下所示:

  下一步是序列化它。目前通过Dask中的MsgPack与Pickle和CloudPickle进行序列化和反序列化。

  2.问题怎么排?

  当一批任务在客户端提交时,调度器如何调度它们直接影响到整个分布式计算的效率。我们先来看看下图所示的整体流程:

  当整个提交的任务完成后,通过调度器从工作人员那里获取任务运行结果。

  在整个任务调度中,对于每个工作者来说,都会经历接受新任务——等待依赖任务完成——计算任务——保存数据——垃圾回收的过程。调度程序的任务是跟踪所有的工人。在Dask中,调度程序将记录以下三个实体的状态:

  1.分解的任务图。调度程序将记录每个任务及其相应的状态。

  2.与工人的联系

  3.与客户的联系

  对于每个任务,调度器中都有许多不同的状态表示和相应的状态转换过程。调度器管理的所有任务的当前状态信息可以通过调度器中的TaskState类获得。

  对于每个Worker,调度程序管理的所有Worker都可以通过调度程序中的WorkerState获得。

  对于调度器来说,为了更好地调度任务,他会维护两个结构:一个是用来收集那些资源利用率过大(大大超过平均值)的工作者,另一个是用来收集那些资源利用率不高的工作者,让任务可以立即执行。

  对于每个客户端,调度程序管理的所有客户端都可以通过调度程序中的ClientState获得。

  在调度程序中,监控客户端和工作程序的状态,并根据以下操作更新客户端或工作程序的调度程序状态:

  对于工人:

  1.任务完成。

  2.任务失败

  3.任务缺少必要的数据

  4.添加工人

  5.移除工作人员

  对于客户:

  1.更新任务图并添加更多任务。

  2.释放结果键值,即不再要求调度器返回任务图运行结果。

  从上面的介绍中,我们知道调度器记录了所有任务、客户端和工作者的信息,那么如何基于这些信息进行有效的调度呢?或者调度器如何从任务池中选择要分发的任务,以及它如何从候选工人中选择可以接受任务的工人?

  对于工作者的选择,调度器会根据当前待分配任务的限制(如是否限制GPU等)得到一个满足基本条件的工作者列表。),然后基于worker当前的资源占用信息(比如当前内存占用率,当前运行的任务数),也会综合考虑分配给哪个worker,worker之间传输的数据最少。有关具体的工人选择策略,请参见decide_worker方法。

  对于任务的选择,Dask有自己的一套准则:

  1.确保相对公平,即多个客户端提交的任务应该按照先来先服务的原则进行服务。

  2.尽量优先运行重要的任务序列,从而减少整体运行时间和集群负载。

  3.尽量优先考虑依赖多个任务的任务,因为只有运行这样的任务,才能释放被依赖任务占用的资源。

  4.尽量优先考虑相关性高的任务。

  很难确保完全满足上述所有设计原则。最后,Dask采用了一些启发式原则,详见dask/order.py。

  5.延迟计算和实时计算

  Dask的计算方式有两种,一种是延迟计算,叫做延迟,一种是即时计算,叫做未来。这两个计算是什么意思?先说延迟。

  1.延时的

  我们只需要用dask封装一个普通的Python函数。函数来获得一个延迟的对象,如下所示:

  如图所示,Delayed只构建了一个任务图,并没有进行实际的计算。只有在调用compute时,计算才开始。这对于处理那些可以并行处理的操作非常有用,无论是对于大型数据还是CPU密集型任务。

  2.将来的

  对于延迟,不立即计算,而是构造一个任务图,而对于未来,则立即执行。可以通过提交和映射方法将函数提交给调度程序。在后台,调度程序将处理提交的任务,并将其分配给工人进行实际计算。任务提交后,会返回一个键值,指向任务的运行结果,也就是未来对象。我们可以追踪它目前的状态。当然,我们也可以通过result和gather方法等待任务完成,在本地收集结果。

  不及物动词Dask机器学习库介绍

  目前Dask-ML对Sklearn的支持比较好,Xgboost也可以直接调用使用。对于Tensorflow来说,由于TF在分布式处理方面非常优秀,所以所有的Dask-ML都没有过多的处理它的分布式进程。在通过TF训练模型的过程中,通过start_tensorflow指定集群的参数服务器和Worker服务器,这些服务器对应Dask中的Worker。对于Dask-ML,如下图所示:

  七。Dask和Tensorflow的组合

  对于Dask和TF的结合,有两个部分,一个是训练阶段,一个是预测阶段。对于培训阶段来说,相对复杂一些。请参考此链接:

  3358美丽的冬季Day.com/blog/work/2017/02/11/dask-tensorflow

  对于预测阶段,相对容易:

  总体步骤如下:

  1.将示例封装到Dask示例数据中。这里有一个问题需要注意。对于csv文本数据,我们可以通过reading _ CSV轻松得到Dask Dataframe格式的数据。但是对于图像类型的数据,由于Dask Dataframe只支持二维Numpy数组内部数据,所以图像的处理需要经过Dask数组的处理和延时。

  2.通过客户端提交预测任务。

  3.在预测任务中执行模型加载和实际预测任务。

  八。Dask分布式性能诊断

  去做

  九。Dask和K8s的组合

  目前Dask和K8s结合有几种方式:

  目前我自己测试,主要是基于Helm Repo中的stable/dask镜像,再加上一些必要的尊敬的月饼,构建一个大小在1.83G左右的镜像

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: