MPIRE (MultiProcessing Is Really Easy)

标准库中的 multiprocess 里有很多很多坑. MPIRE 是标准库的一个替代, 性能更好, 那些不应该由开发者考虑的坑基本都没有了.

Examples

# -*- coding: utf-8 -*-

import os
import time
from multiprocessing import Pool

print(f"you have {os.cpu_count()} CPU core")


def func(name):
    for i in range(10):
        time.sleep(1)  # Simulate that this function takes long to complete
        print(f"{i}th call, hello {name}!")
    return f"hello {name}"


# multiprocessing.Pool has to use under if __name__ == "__main__":
# it is like run this script with different parameter 8 times
if __name__ == "__main__":
    args = list("abcdefghijklmnopqrstuvwxyz")
    with Pool(processes=12) as pool:
        results = pool.map(func, args)
        print(results)
# -*- coding: utf-8 -*-

import os
import time
from mpire import WorkerPool

print(f"you have {os.cpu_count()} CPU core")


def func(name):
    for i in range(3):
        time.sleep(1)  # Simulate that this function takes long to complete
        print(f"{i}th call, hello {name}!")
    return f"hello {name}"


args = list("abcdefghijklmnopqrstuvwxyz")
kwargs = [{"name": arg} for arg in args]

with WorkerPool(n_jobs=12) as pool:
    results = pool.map(func, kwargs, enable_insights=True)
    print(results)
    print(pool.get_insights())

Shared Object

# -*- coding: utf-8 -*-

"""
Share object Example
"""

import os
import random
from typing import List
from mpire import WorkerPool

# MacOS high sierra disable multi process by defaultz
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"

cpu_count = os.cpu_count()
print(f"you have {cpu_count} CPU core")


def func(numbers: List[int]):
    i = random.randint(1, 100)
    numbers.append(i)
    return i


numbers: List[int] = list()

kwargs: List[dict] = [{} for _ in range(10)]

# from https://slimmer-ai.github.io/mpire/usage/workerpool/shared_objects.html#copy-on-write-alternatives
# it says that For ``threading`` these shared objects are readable and writable without copies being made.
# which is our use case here
with WorkerPool(
    n_jobs=cpu_count,
    shared_objects=numbers,
    start_method="threading",
) as pool:
    results = pool.map(func, kwargs)
    print(f"all returned results = {results}, sum = {sum(results)}")
    print(f"number of elements in numbers: {len(numbers)}")
    print(f"sum of the numbers: {sum(numbers)}")

扩展阅读 并行模型

两种方式利用并发提高性能: 第一, 将一个单个任务分成几部分, 且各自并行运行, 从而降低总运行时间. 这就是任务并行(task parallelism) . 虽然这听起来很直观, 但它是一个相当复杂的过程, 因为在各个部分之间可能存在着依赖. 区别可能是在过程方面——一个线程执行算法的一部分, 而另一个线程执行算法的另一个部分——或是在数据方面——每个线程在不同的数据部分上执行相同的操作(第二种方式). 后一种方法被称为数据并行 (data parallelism).

第一种并行方式影响的算法常被称为易并行 (embarrassingly parallel) 算法. 尽管易并行算法的代码会让你感觉到头痛, 但这对于你来说是一件好事: 我曾遇到过自然并行 (naturally parallel) 和便利并发 (conveniently concurrent) 的算法. 易并行算法具有良好的可扩展特性——当可用硬件线程的数量增加时, 算法的并行性也会随之增加. 这种算法能很好的体现人多力量大. 如果算法中有不易并行的部分, 你可以把算法划分成固定(不可扩展)数量的并行任务. 第8章将会再来讨论, 在线程之间划分任务的技巧.

第二种方法是使用可并行的方式, 来解决更大的问题; 与其同时处理一个文件, 不如酌情处理2个、10个或20个. 虽然, 这是数据并行的一种应用(通过对多组数据同时执行相同的操作), 但着重点不同. 处理一个数据块仍然需要同样的时间, 但在相同的时间内处理了更多的数据. 当然, 这种方法也有限制, 并非在所有情况下都是有益的. 不过, 这种方法所带来的吞吐量提升, 可以让某些新功能成为可能, 例如, 可以并行处理图片的各部分, 就能提高视频的分辨率.

总结一下, 例如你有一个任务, 其中有 20 个步骤. 但你要同时执行 100 个任务. task para 的本质上是用 8 个 CPU, 每个执行一个任务. 而 data para 的本质是 8 个 CPU, 每个执行其中的一个步骤, 例如无论是哪个任务的第 1 个步骤都交给 1 号 CPU 执行.