Python 多进程并行执行

PythonBeginner
立即练习

介绍

Python 多进程处理(multiprocessing)是一个强大的工具,可以显著加速需要高处理能力的 Python 程序的执行。在本实验中,你将学习 Python 多进程处理,并了解如何使用它来并行运行进程。我们将从简单的示例开始,逐步过渡到更复杂的案例。

创建一个简单的多进程程序

学习 Python 多进程处理的第一步是创建一个简单的程序来演示其工作原理。在这个程序中,我们将创建一个函数,该函数接受一个参数并返回该数字的平方。然后,我们将使用多进程在多个进程中运行此函数。

请完成 square.py

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    results = pool.map(square, range(10))
    print(results)

这段代码创建了一个包含四个进程的进程池,并使用 map() 函数将 square() 函数应用于从 0 到 9 的每个数字。结果随后会打印到控制台。

使用多进程加速处理

现在你已经了解了 Python 中多进程的工作原理,我们可以继续学习一个更复杂的示例。在这个示例中,我们将使用多进程来加速处理一个包含大量数字的列表。

请完成 complex_square.py

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(square, range(1000000))
    print(results[:10])

这段代码创建了一个包含四个进程的进程池,并使用 map() 函数将 square() 函数应用于从 0 到 999999 的每个数字。with 语句用于确保在任务完成后正确关闭进程池。前十个结果随后会打印到控制台。

进程间通信

在某些情况下,使用 Python 多进程时,你可能需要在进程之间进行通信。在这个示例中,我们将使用 multiprocessing.Queue() 函数在进程之间创建一个共享队列。

请完成 multiprocessing_queue.py

import multiprocessing

def producer(queue):
    for i in range(10):
        queue.put(i)

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(item)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    process_producer = multiprocessing.Process(target=producer, args=(queue,))
    process_consumer = multiprocessing.Process(target=consumer, args=(queue,))
    process_producer.start()
    process_consumer.start()
    process_producer.join()
    queue.put(None)
    process_consumer.join()

这段代码使用 multiprocessing.Queue() 函数创建了一个共享队列 queue。然后我们创建了两个进程,分别调用 producer()consumer() 函数。producer() 函数将数字 0 到 9 放入队列,而 consumer() 函数从队列中获取每个项目并将其打印到控制台。

使用 Pool.apply_async() 实现异步任务

除了 map() 方法外,multiprocessing.Pool() 类还提供了另一种并行运行进程的方法,称为 apply_async()。该方法允许你将函数调用提交到进程池中,并立即继续执行,而无需等待结果。相反,你可以使用回调函数在结果准备就绪时获取函数调用的结果。

请完成 slow_square.py

import multiprocessing
import time

def slow_square(x):
    time.sleep(1)
    return x * x

def callback(result):
    print(result)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    results = [pool.apply_async(slow_square, (x,), callback=callback) for x in range(10)]
    for result in results:
        result.wait()
    print("All tasks completed.")

在这个示例中,我们定义了一个函数 slow_square(),它接受一个数字 $x$,等待一秒钟,然后返回 $x$ 的平方。我们还定义了一个回调函数 callback(),它会在结果可用时简单地打印结果。

然后,我们创建了一个包含四个进程的进程池,并使用 apply_async() 为从 0 到 9 的每个数字提交一个函数调用到进程池中。我们通过 args 参数传入 slow_square() 函数和参数 $x$,并指定在结果准备就绪时调用的回调函数。

接着,我们遍历结果列表并调用 result.wait(),以阻塞主线程直到结果准备就绪。最后,我们打印一条消息以指示所有任务已完成。

总结

在本实验中,你学习了如何使用 Python 多进程来并行运行进程。你从一个简单的程序开始,演示了多进程的工作原理,然后进一步学习了更复杂的示例,这些示例使用多进程来加速处理、在进程之间共享内存,并使用 apply_async() 执行异步任务。通过本实验的学习,你应该对如何在自己的 Python 程序中使用多进程有了很好的理解,以实现更高的处理能力和效率,并在必要时在进程之间进行通信。