并行计算中如何提高处理效率——来自Dask的提示

标签:#Dask##Python##分布式数据处理# 时间:2020/03/31 15:43:31 作者:小木

当数据量达到一定程度,单机的处理能力会无法达到性能的要求,采用并行计算,并利用多台服务器进行分布式处理可能会提升数据处理的速度,达到性能要求。然而如果使用不当,并行处理可能并不会提升处理的速度。

这篇博客介绍了Dask中关于并行处理的一些效率方面的建议,尽管是针对Dask的说明,但对于所有的并行处理来说都是适用的。

一、让数据停留在集群的节点中

并行计算的场景中,通常数据量都很大,将数据在节点中移动是不合适的,它会花费大量的时间和资源进行数据的移动,因此,将处理的函数或者方法传给每个节点,让节点基于自己的数据进行运算,并将结果返回是一种更加明智的方法。

例如,如果你需要知道一个numpy.array的shape,如下两种方法会带来截然不同的效果:

1、通过本地进程获取数据的shape,例如直接访问数据的shape属性
2、向集群中的每一个节点发送一个lambda表达式,计算各个节点的shape之后返回结果汇总

显然第二种方法要快很多。

代码示例:

首先,我们向集群中提交一个numpy数组,注意这是Dask的代码示例,想了解Dask的童鞋可以参考系列博客。

>>> x = client.submit(np.random.random, (1000, 1000))
>>> type(x)
Future

如下两种方法会带来完全不同的结果:

# 以下方法速度会很慢,它会让数据在节点之间移动
x.result().shape

# 以下方法速度很快,因为它会在各个节点计算节点内部数据后汇总
client.submit(lambda a: a.shape, x).result()

二、使用较大的任务

正常来说,分布式数据处理都需要有一个调度器来负责多个任务之间的调度。

调度器会为每个任务或Future对象增加大约一毫秒的开销。尽管这听起来可能很快,但是如果您运行十亿个任务,则相当慢。如果您的函数运行速度超过100毫秒左右,则使用分布式计算可能不会带来任何提速。

也就是说如果单个任务的运行速度很快,那么并行调度带来的开销将会显著影响系统的处理速度,使得实际的并行处理并不会十分有效。

一种常见的解决方案是将您的输入分成更大的块。也就是说让单个任务处理一个较多一点的数据。

代码示例:

# 这个程序执行的任务数已经达到1000000000个,太多了,而单个任务执行的速度非常快,那么这种处理方法将不会有并行计算的性能提升
futures = client.map(f, seq)
print(len(futures))  # avoid large numbers of futures
1000000000
def f_many(chunk):
...return [f(x) for x in chunk]

from tlz import partition_all
# 这个方法将原来的1000000000数据划分成1000个
chunks = partition_all(1000000, seq)  # Collect into groups of size 1000

# 然后单个任务直接处理1000000行,这种方法会显著提升处理速度
futures = client.map(f_many, chunks)
print(len(futures))  # Compute on larger pieces of your data at once
1000

三、原生Python代码使用多进程而非多线程

默认情况下,单个Worker使用与您的计算节点具有核心数量相同的线程数并行运行许多计算。 当使用纯Python函数时,这可能不是最佳选择,您可能想在每个节点上运行几个单独的工作进程,每个进程使用一个线程。 在配置集群时,您可能希望使用dask-worker可执行文件的选项,如下所示:

$ dask-worker ip:port --nprocs 8 --nthreads 1

请注意,如果您主要使用NumPy,Pandas,SciPy,Scikit Learn,Numba或其他C / Fortran / LLVM / Cython加速库,那么这对您来说不是问题。您的代码可能最适合与多线程一起使用。

这个问题主要是Python有一个全局解释器锁(GIL),原生的Python代码是不能进行多线程任务的,因为GIL的限制,多线程的Python实际还是单线程。但是,使用多进程则不受这个限制,所以尽量使用多进程来处理Python原生代码的任务。但是像Numpy或者Pandas等底层是C语言的(当然也包含了部分其他语言),那就不受GIL限制了,也就无所谓了。

四、不要优先考虑分布式

优先考虑通过并行以外的其他方式来加速代码。例如更好的算法、数据结构和存储格式等。甚至一点点C / Fortran / Numba代码可能足以为您提供所需的10倍速度提升。 并行和分布式计算是加速应用程序的昂贵方法。因为它有调度的开销,也有相对复杂的门槛。

欢迎大家关注DataLearner官方微信,接受最新的AI技术推送