轻松使用多线程

PythonPythonIntermediate
立即练习

💡 本教程由 AI 辅助翻译自英文原版。如需查看原文,您可以 切换至英文原版

介绍

在本教程中,我们将学习如何使用 Python 的 threading 模块来并发运行多个执行线程。

Python 的 threading 模块提供了一种简单的方式来创建和管理 Python 程序中的线程。线程是程序中的一个独立执行流。通过并发运行多个线程,我们可以利用多核 CPU 并提高程序的性能。

threading 模块提供了两个类来创建和管理线程:

  1. Thread 类:该类表示一个单独的执行线程。
  2. Lock 类:该类允许在线程之间同步对共享资源的访问。


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL python(("Python")) -.-> python/ModulesandPackagesGroup(["Modules and Packages"]) python(("Python")) -.-> python/ObjectOrientedProgrammingGroup(["Object-Oriented Programming"]) python(("Python")) -.-> python/AdvancedTopicsGroup(["Advanced Topics"]) python(("Python")) -.-> python/FunctionsGroup(["Functions"]) python/FunctionsGroup -.-> python/function_definition("Function Definition") python/ModulesandPackagesGroup -.-> python/creating_modules("Creating Modules") python/ModulesandPackagesGroup -.-> python/standard_libraries("Common Standard Libraries") python/ObjectOrientedProgrammingGroup -.-> python/classes_objects("Classes and Objects") python/AdvancedTopicsGroup -.-> python/threading_multiprocessing("Multithreading and Multiprocessing") subgraph Lab Skills python/function_definition -.-> lab-7839{{"轻松使用多线程"}} python/creating_modules -.-> lab-7839{{"轻松使用多线程"}} python/standard_libraries -.-> lab-7839{{"轻松使用多线程"}} python/classes_objects -.-> lab-7839{{"轻松使用多线程"}} python/threading_multiprocessing -.-> lab-7839{{"轻松使用多线程"}} end

创建线程

在 Python 中创建一个新线程,我们需要创建一个 Thread 类的实例,并传递一个要执行的函数给它。

在 WebIDE 中创建一个名为 create_thread.py 的项目,并输入以下内容。

import threading

## 定义一个在线程中运行的函数
def my_function():
    print("Hello from thread")

## 创建一个新线程
thread = threading.Thread(target=my_function)

## 启动线程
thread.start()

## 等待线程完成
thread.join()

## 打印 "Done" 表示程序已完成
print("Done")

这个示例定义了一个函数 my_function,它会打印一条消息。然后我们创建了一个 Thread 类的新实例,并将 my_function 作为目标函数传递给它。最后,我们使用 start 方法启动线程,并使用 join 方法等待线程完成。

使用以下命令运行脚本。

python create_thread.py

同步

如果多个线程访问同一个共享资源(例如变量或文件),我们必须同步对该资源的访问,以避免竞态条件(race conditions)。Python 的 threading 模块为此提供了 Lock 类。

以下是一个使用 Lock 的示例。在 WebIDE 中创建一个名为 sync.py 的项目,并输入以下内容。

import threading

## 创建一个锁以保护对共享资源的访问
lock = threading.Lock()

## 共享资源,将被多个线程修改
counter = 0

## 定义每个线程将运行的函数
def my_function():
    global counter

    ## 在访问共享资源之前获取锁
    lock.acquire()
    try:
        ## 访问共享资源
        counter += 1
    finally:
        ## 在修改共享资源后释放锁
        lock.release()

## 创建多个线程以访问共享资源
threads = []

## 创建并启动 10 个执行相同函数的线程
for i in range(10):
    thread = threading.Thread(target=my_function)
    threads.append(thread)
    thread.start()

## 等待所有线程完成执行
for thread in threads:
    thread.join()

## 打印计数器的最终值
print(counter) ## 输出:10

在这个示例中,我们创建了一个 Lock 对象和一个共享资源 countermy_function 函数通过使用 acquire 方法获取锁来访问共享资源,并在修改共享资源后使用 release 方法释放锁。我们创建了多个线程并启动它们,然后使用 join 方法等待它们完成。最后,我们打印计数器的最终值。

使用以下命令运行脚本。

python sync.py

带参数的线程

在 Python 中,你可以通过两种方式向线程传递参数:一种是在创建新线程时使用 args 参数,另一种是通过继承 Thread 类并定义接受参数的构造函数。以下是这两种方法的示例:

  1. 我们创建一个 Thread 类的子类,并重写 run 方法来定义线程的行为。在 WebIDE 中创建一个名为 thread_subclass.py 的项目,并输入以下内容。
import threading

## 定义一个继承 Thread 类的自定义线程类
class MyThread(threading.Thread):
    ## 重写 run() 方法以实现线程的行为
    def run(self):
        print("Hello from thread")

## 创建自定义线程类的实例
thread = MyThread()

## 启动线程
thread.start()

## 等待线程完成执行
thread.join()

使用以下命令运行脚本。

python thread_subclass.py
  1. 我们创建一个线程,并使用 args 参数向目标函数传递参数。在 WebIDE 中创建一个名为 thread_with_args.py 的项目,并输入以下内容。
import threading

## 定义一个接受参数的函数
def my_function(name):
    print("Hello from", name)

## 创建一个带有目标函数和参数的新线程
thread = threading.Thread(target=my_function, args=("Thread 1",))

## 启动线程
thread.start()

## 等待线程完成执行
thread.join()

使用以下命令运行脚本。

python thread_with_args.py

线程池

在 Python 中,你可以使用线程池来使用一组预定义的线程并发执行任务。使用线程池的好处是避免了为每个任务创建和销毁线程的开销,从而提高性能。

Python 的 concurrent.futures 模块提供了 ThreadPoolExecutor 类,允许你创建一个线程池并提交任务。以下是一个示例:

在 WebIDE 中创建一个名为 thread_pool_range.py 的项目,并输入以下内容。

import concurrent.futures

## 定义一个函数,该函数将在多个线程中执行,并接受两个参数
def my_func(arg1, arg2):
    ## 在此定义线程执行的任务
    print(f"Hello from my thread with args {arg1} and {arg2}")

## 创建一个最多包含 5 个工作线程的 ThreadPoolExecutor 对象
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    ## 将每个任务(函数调用及其参数)提交给执行器,以便在单独的线程中处理
    ## submit() 方法返回一个 Future 对象,表示异步计算的结果
    for i in range(10):
        executor.submit(my_func, i, i+1)

在这个示例中,我们定义了一个接受两个参数的函数 my_func。我们创建了一个最多包含 5 个工作线程的 ThreadPoolExecutor。然后,我们遍历一个数字范围,并使用 executor.submit() 方法将任务提交到线程池。每个提交的任务都会在可用的工作线程之一上执行。

提示:ThreadPoolExecutor 对象被用作上下文管理器。这确保了当 with 块中的代码执行完毕时,所有线程都会被正确清理。

使用以下命令运行脚本。

python thread_pool_range.py

submit() 方法会立即返回一个 Future 对象,表示提交任务的结果。你可以使用 Future 对象的 result() 方法来获取任务的返回值。如果任务引发异常,调用 result() 将会抛出该异常。

你还可以使用 ThreadPoolExecutor 类的 map() 方法将同一个函数应用于一组数据。例如,在 WebIDE 中创建一个名为 thread_pool_map.py 的项目,并输入以下内容:

import concurrent.futures

## 定义一个函数,该函数将在多个线程中执行
def my_func(item):
    ## 在此定义线程执行的任务
    print(f"Hello from my thread with arg {item}")

## 创建一个由线程处理的项列表
items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

## 创建一个最多包含 5 个工作线程的 ThreadPoolExecutor 对象
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    ## 将每个项提交给执行器,以便在单独的线程中处理
    ## map() 方法会自动按顺序返回结果
    executor.map(my_func, items)

在这个示例中,我们定义了一个接受一个参数的函数 my_func。我们创建了一个项列表,并使用 executor.map() 方法将它们提交到线程池。列表中的每个项都会作为参数传递给 my_func,并且每个项都会在可用的工作线程之一上执行。

使用以下命令运行脚本。

python thread_pool_map.py

thread_pool_range.pythread_pool_map.py 的结果是相同的。

守护线程(Daemon Threads)

在 Python 中,守护线程是一种在后台运行的线程,它不会阻止程序退出。当所有非守护线程执行完毕后,Python 解释器会退出,无论是否有守护线程仍在运行。

在 WebIDE 中创建一个名为 daemon_thread_with_args.py 的项目,并输入以下内容。

import threading
import time

## 定义一个无限循环运行的函数,并定期打印消息
def my_function():
    while True:
        print("Hello from thread")
        time.sleep(1)

## 创建一个守护线程,运行目标函数
thread = threading.Thread(target=my_function, daemon=True)

## 启动线程
thread.start()

## 主程序继续执行并打印消息
print("Main program")

## 等待几秒后退出程序
time.sleep(5)

## 程序退出,守护线程自动终止
print("Main thread exiting...")

在这个示例中,我们创建了一个线程,该线程运行一个无限循环,并使用 time.sleep() 函数每秒打印一条消息。我们通过 daemon 参数将线程标记为守护线程,以便在主程序退出时自动终止。主程序继续运行并打印一条消息。我们等待几秒后,程序退出,守护线程也随之终止。

然后使用以下命令运行脚本。

python daemon_thread_with_args.py

当然,我们也可以通过调用线程实例的 setDaemon(True) 方法将线程设置为守护线程。例如,在 WebIDE 中创建一个名为 daemon_thread_with_func.py 的项目,并输入以下内容:

import threading
import time

## 定义一个无限循环运行的函数,并定期打印消息
def my_function():
    while True:
        print("Hello from thread")
        time.sleep(1)

## 创建一个新线程,运行目标函数
thread = threading.Thread(target=my_function)

## 将守护线程标志设置为 True,使线程在后台运行,并在主程序退出时终止
thread.setDaemon(True)

## 启动线程
thread.start()

## 主程序继续运行并打印消息
print("Main program")

## 等待几秒后退出程序
time.sleep(5)

## 程序退出,守护线程自动终止
print("Main thread exiting...")

使用以下命令执行脚本,将实现与上述示例相同的结果。

python daemon_thread_with_func.py

事件对象(Event Object)

在 Python 中,你可以使用 threading.Event 对象来让线程等待特定事件的发生后再继续执行。Event 对象提供了一种方式,让一个线程可以发出事件发生的信号,而其他线程可以等待该信号。

在 WebIDE 中创建一个名为 event_object.py 的项目,并输入以下内容。

import threading

## 创建一个事件对象
event = threading.Event()

## 定义一个函数,等待事件被触发
def my_function():
    print("Waiting for event")
    ## 等待事件被触发
    event.wait()
    print("Event received")

## 创建一个新线程,运行目标函数
thread = threading.Thread(target=my_function)

## 启动线程
thread.start()

## 几秒后触发事件
## 目标函数中的 wait() 调用将返回并继续执行
event.set()

## 等待线程执行完毕
thread.join()

在这个示例中,我们使用 Event 类创建了一个 Event 对象。我们定义了一个函数,该函数使用 wait 方法等待事件被触发,然后打印一条消息。我们创建了一个新线程并启动它。几秒后,我们使用 set 方法触发了事件。线程接收到事件并打印一条消息。最后,我们使用 join 方法等待线程执行完毕。

然后使用以下命令运行脚本。

python event_object.py

定时器对象(Timer Object)

在 Python 中,你可以使用 threading.Timer 对象来安排一个函数在特定时间后运行。Timer 对象会创建一个新线程,该线程在指定的时间间隔后执行函数。

在 WebIDE 中创建一个名为 timer_object.py 的项目,并输入以下内容。

import threading

## 定义一个函数,该函数将在 5 秒后被 Timer 执行
def my_function():
    print("Hello from timer")

## 创建一个定时器,5 秒后运行目标函数
timer = threading.Timer(5, my_function)

## 启动定时器
timer.start()

## 等待定时器完成
timer.join()

在这个示例中,我们使用 Timer 类创建了一个 Timer 对象,并传递了一个时间延迟(以秒为单位)和一个要执行的函数。我们使用 start 方法启动定时器,并使用 join 方法等待其完成。5 秒后,函数被执行并打印一条消息。

然后使用以下命令运行脚本。

python timer_object.py

提示:定时器线程是独立运行的,因此它可能与主线程不同步。如果你的函数依赖于某些共享状态或资源,你需要适当地同步对它们的访问。此外,请记住,如果定时器线程在所有其他非守护线程完成后仍在运行,它不会阻止程序退出。

屏障对象(Barrier Object)

在 Python 中,你可以使用 threading.Barrier 对象在预定义的同步点同步多个线程。Barrier 对象提供了一种方式,让一组线程在继续执行之前等待彼此达到某个执行点。

在 WebIDE 中创建一个名为 barrier_object.py 的项目,并输入以下内容。

import threading

## 创建一个 Barrier 对象,用于 3 个线程
barrier = threading.Barrier(3)

## 定义一个函数,该函数会在屏障处等待
def my_function():
    print("Before barrier")
    ## 等待所有三个线程到达屏障
    barrier.wait()
    print("After barrier")

## 使用循环创建 3 个线程并启动它们
threads = []
for i in range(3):
    thread = threading.Thread(target=my_function)
    threads.append(thread)
    thread.start()

## 等待所有线程执行完毕
for thread in threads:
    thread.join()

在这个示例中,我们使用 Barrier 类创建了一个 Barrier 对象,并传递了需要等待的线程数量。我们使用 wait 方法定义了一个函数,该函数会在屏障处等待并打印一条消息。我们创建了三个线程并启动它们。每个线程都会在屏障处等待,因此所有线程都会等待,直到它们都到达屏障。最后,我们使用 join 方法等待所有线程执行完毕。

然后使用以下命令运行脚本。

python barrier_object.py

总结

就是这样!你现在已经学会了如何在代码中使用 Python 的 threading 模块。它可以帮助我们深入掌握并发编程的基本原理和技巧,从而更好地开发高效的并发应用程序。