如何等待 Python 线程完成

PythonBeginner
立即练习

介绍

掌握如何等待 Python 线程完成对于构建稳健且可靠的应用程序至关重要。在多线程程序中,适当的同步可以确保操作以正确的顺序完成,并有效地利用资源。

在这个实验(Lab)中,你将学习如何创建 Python 线程,等待它们完成,以及处理多个线程。这些技能是开发并发应用程序的基础,这些应用程序可以同时执行多个任务,同时保持适当的同步。

创建你的第一个 Python 线程

Python 的 threading 模块提供了一种创建和管理线程的简单方法。在这一步中,你将学习如何创建一个基本线程并观察它的行为。

理解线程

线程是程序中一个独立的执行流程。当你运行一个 Python 脚本时,它会从一个称为主线程的单一线程开始。通过创建额外的线程,你的程序可以并发地执行多个任务。

线程对于以下情况很有用:

  • 运行耗时的操作,而不会阻塞主程序
  • 并行处理任务以提高性能
  • 在服务器应用程序中处理多个客户端连接

创建一个简单的线程

让我们从创建一个简单的 Python 脚本开始,它演示了如何创建和启动一个线程。

  1. 通过单击“File”菜单,选择“New File”,然后将其保存为 /home/labex/project 目录下的 simple_thread.py,在编辑器中打开一个新文件。

  2. 将以下代码添加到文件中:

import threading
import time

def print_numbers():
    """Function that prints numbers from 1 to 5 with a delay."""
    for i in range(1, 6):
        print(f"Number {i} from thread")
        time.sleep(1)  ## Sleep for 1 second

## Create a thread that targets the print_numbers function
number_thread = threading.Thread(target=print_numbers)

## Start the thread
print("Starting the thread...")
number_thread.start()

## Main thread continues execution
print("Main thread continues to run...")
print("Main thread is doing other work...")

## Sleep for 2 seconds to demonstrate both threads running concurrently
time.sleep(2)
print("Main thread finished its work!")
  1. Ctrl+S 或单击“File” > “Save”保存文件。

  2. 通过打开一个终端(如果尚未打开)并执行以下命令来运行脚本:

python3 /home/labex/project/simple_thread.py

你应该看到类似于这样的输出:

Starting the thread...
Main thread continues to run...
Main thread is doing other work...
Number 1 from thread
Number 2 from thread
Main thread finished its work!
Number 3 from thread
Number 4 from thread
Number 5 from thread

分析发生了什么

在这个例子中:

  1. 我们导入了 threadingtime 模块。
  2. 我们定义了一个函数 print_numbers(),它打印从 1 到 5 的数字,每个数字之间有 1 秒的延迟。
  3. 我们创建了一个线程对象,使用 target 参数指定要运行的函数。
  4. 我们使用 start() 方法启动了线程。
  5. 主线程继续执行,打印消息并休眠 2 秒。
  6. 主线程和我们的数字线程同时运行,这就是输出交错的原因。

请注意,主线程在数字线程打印完所有数字之前就完成了。这是因为线程独立运行,并且默认情况下,当主线程完成时,Python 程序将退出,即使其他线程仍在运行。

在下一步中,你将学习如何使用 join() 方法等待线程完成。

使用 join() 等待线程完成

在上一步中,你创建了一个独立于主线程运行的线程。然而,在许多情况下,你需要等待一个线程完成其工作,然后才能继续程序的其余部分。这就是 join() 方法变得有用的地方。

理解 join() 方法

线程对象的 join() 方法会阻塞调用线程(通常是主线程),直到调用其 join() 方法的线程终止。这在以下情况下至关重要:

  • 主线程需要来自工作线程的结果
  • 你需要确保所有线程在退出程序之前完成
  • 操作顺序对你的应用程序逻辑很重要

创建一个线程并等待其完成

让我们修改之前的例子,以演示如何使用 join() 方法等待线程完成。

  1. /home/labex/project 目录中创建一个名为 join_thread.py 的新文件。

  2. 将以下代码添加到文件中:

import threading
import time

def calculate_sum(numbers):
    """Function that calculates the sum of numbers with a delay."""
    print("Starting the calculation...")
    time.sleep(3)  ## Simulate a time-consuming calculation
    result = sum(numbers)
    print(f"Calculation result: {result}")
    return result

## Create a list of numbers
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

## Create a thread that targets the calculate_sum function
calculation_thread = threading.Thread(target=calculate_sum, args=(numbers,))

## Start the thread
print("Main thread: Starting the calculation thread...")
calculation_thread.start()

## Do some other work in the main thread
print("Main thread: Doing some other work while waiting...")
for i in range(5):
    print(f"Main thread: Working... ({i+1}/5)")
    time.sleep(0.5)

## Wait for the calculation thread to complete
print("Main thread: Waiting for the calculation thread to finish...")
calculation_thread.join()
print("Main thread: Calculation thread has finished!")

## Continue with the main thread
print("Main thread: Continuing with the rest of the program...")
  1. 保存文件并使用以下命令运行它:
python3 /home/labex/project/join_thread.py

你应该看到类似于这样的输出:

Main thread: Starting the calculation thread...
Starting the calculation...
Main thread: Doing some other work while waiting...
Main thread: Working... (1/5)
Main thread: Working... (2/5)
Main thread: Working... (3/5)
Main thread: Working... (4/5)
Main thread: Working... (5/5)
Main thread: Waiting for the calculation thread to finish...
Calculation result: 55
Main thread: Calculation thread has finished!
Main thread: Continuing with the rest of the program...

join() 的重要性

在这个例子中:

  1. 我们创建了一个执行计算(求和数字)的线程。
  2. 主线程同时做了一些其他工作。
  3. 当主线程需要确保计算完成时,它调用了 calculation_thread.join()
  4. join() 方法导致主线程等待,直到计算线程完成。
  5. 在计算线程完成后,主线程继续执行。

当你需要确保所有线程任务在继续程序的其余部分之前完成时,这种模式非常有用。如果没有 join(),主线程可能会在工作线程完成其任务之前继续甚至退出。

使用带有超时的 join()

有时,你可能希望等待一个线程,但不是无限期地等待。join() 方法接受一个可选的超时参数,该参数指定要等待的最大秒数。

让我们修改我们的代码来演示这一点:

  1. /home/labex/project 目录中创建一个名为 join_timeout.py 的新文件。

  2. 添加以下代码:

import threading
import time

def long_running_task():
    """A function that simulates a very long-running task."""
    print("Long-running task started...")
    time.sleep(10)  ## Simulate a 10-second task
    print("Long-running task completed!")

## Create and start the thread
task_thread = threading.Thread(target=long_running_task)
task_thread.start()

## Wait for the thread to complete, but only for up to 3 seconds
print("Main thread: Waiting for up to 3 seconds...")
task_thread.join(timeout=3)

## Check if the thread is still running
if task_thread.is_alive():
    print("Main thread: The task is still running, but I'm continuing anyway!")
else:
    print("Main thread: The task has completed within the timeout period.")

## Continue with the main thread
print("Main thread: Continuing with other operations...")
## Let's sleep a bit to see the long-running task complete
time.sleep(8)
print("Main thread: Finished.")
  1. 保存文件并运行它:
python3 /home/labex/project/join_timeout.py

输出应该如下所示:

Long-running task started...
Main thread: Waiting for up to 3 seconds...
Main thread: The task is still running, but I'm continuing anyway!
Main thread: Continuing with other operations...
Long-running task completed!
Main thread: Finished.

在这个例子中,主线程等待最多 3 秒,等待任务线程完成。由于任务需要 10 秒,主线程在超时后继续,而任务线程在后台继续运行。

当你希望给线程一个完成的机会,但在经过一定时间后需要继续时,这种方法很有用。

使用多个线程

在实际应用中,你经常需要同时使用多个线程。这一步将教你如何在 Python 中创建、管理和同步多个线程。

创建多个线程

当处理多个类似的任务时,创建多个线程来并发处理它们是很常见的。这可以显著提高性能,尤其是在 I/O 密集型操作(如下载文件或发出网络请求)中。

让我们创建一个使用多个线程处理任务列表的示例:

  1. /home/labex/project 目录中创建一个名为 multiple_threads.py 的新文件。

  2. 添加以下代码:

import threading
import time
import random

def process_task(task_id):
    """Function to process a single task."""
    print(f"Starting task {task_id}...")
    ## Simulate variable processing time
    processing_time = random.uniform(1, 3)
    time.sleep(processing_time)
    print(f"Task {task_id} completed in {processing_time:.2f} seconds.")
    return task_id

## List of tasks to process
tasks = list(range(1, 6))  ## Tasks with IDs 1 through 5

## Create a list to store our threads
threads = []

## Create and start a thread for each task
for task_id in tasks:
    thread = threading.Thread(target=process_task, args=(task_id,))
    threads.append(thread)
    print(f"Created thread for task {task_id}")
    thread.start()

print(f"All {len(threads)} threads have been started")

## Wait for all threads to complete
for thread in threads:
    thread.join()

print("All tasks have been completed!")
  1. 保存文件并运行它:
python3 /home/labex/project/multiple_threads.py

由于随机处理时间,输出每次都会有所不同,但应该类似于这样:

Created thread for task 1
Starting task 1...
Created thread for task 2
Starting task 2...
Created thread for task 3
Starting task 3...
Created thread for task 4
Starting task 4...
Created thread for task 5
Starting task 5...
All 5 threads have been started
Task 1 completed in 1.23 seconds.
Task 3 completed in 1.45 seconds.
Task 2 completed in 1.97 seconds.
Task 5 completed in 1.35 seconds.
Task 4 completed in 2.12 seconds.
All tasks have been completed!

理解执行流程

在这个例子中:

  1. 我们定义了一个函数 process_task(),它模拟了具有随机持续时间的任务处理。
  2. 我们创建了一个任务 ID 列表(1 到 5)。
  3. 对于每个任务,我们创建了一个线程,将其存储在一个列表中,并启动它。
  4. 在启动所有线程后,我们使用第二个循环和 join() 来等待每个线程完成。
  5. 只有在所有线程完成后,我们才打印最终消息。

当你有一批可以并行处理的独立任务时,这种模式非常有用。

线程池执行器

对于更高级的线程管理,Python 的 concurrent.futures 模块提供了 ThreadPoolExecutor 类。这创建了一个可重用的工作线程池,这比为每个任务创建和销毁线程更有效。

让我们使用线程池重写我们的示例:

  1. /home/labex/project 目录中创建一个名为 thread_pool.py 的新文件。

  2. 添加以下代码:

import concurrent.futures
import time
import random

def process_task(task_id):
    """Function to process a single task."""
    print(f"Starting task {task_id}...")
    ## Simulate variable processing time
    processing_time = random.uniform(1, 3)
    time.sleep(processing_time)
    print(f"Task {task_id} completed in {processing_time:.2f} seconds.")
    return f"Result of task {task_id}"

## List of tasks to process
tasks = list(range(1, 6))  ## Tasks with IDs 1 through 5

## Create a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    ## Submit all tasks and store the Future objects
    print(f"Submitting {len(tasks)} tasks to the thread pool with 3 workers...")
    future_to_task = {executor.submit(process_task, task_id): task_id for task_id in tasks}

    ## As each task completes, get its result
    for future in concurrent.futures.as_completed(future_to_task):
        task_id = future_to_task[future]
        try:
            result = future.result()
            print(f"Got result from task {task_id}: {result}")
        except Exception as e:
            print(f"Task {task_id} generated an exception: {e}")

print("All tasks have been processed!")
  1. 保存文件并运行它:
python3 /home/labex/project/thread_pool.py

由于随机处理时间,输出将再次变化,但应该类似于这样:

Submitting 5 tasks to the thread pool with 3 workers...
Starting task 1...
Starting task 2...
Starting task 3...
Task 2 completed in 1.15 seconds.
Starting task 4...
Got result from task 2: Result of task 2
Task 1 completed in 1.82 seconds.
Starting task 5...
Got result from task 1: Result of task 1
Task 3 completed in 2.25 seconds.
Got result from task 3: Result of task 3
Task 4 completed in 1.45 seconds.
Got result from task 4: Result of task 4
Task 5 completed in 1.67 seconds.
Got result from task 5: Result of task 5
All tasks have been processed!

线程池的优势

线程池方法提供了几个优势:

  1. 资源管理:它限制了可以同时运行的线程数量,从而防止系统资源耗尽。
  2. 任务调度:它自动处理任务的调度,在线程可用时启动新任务。
  3. 结果收集:它提供了从已完成任务中收集结果的便捷方法。
  4. 异常处理:它使处理线程中的异常更加直接。

在我们的示例中,我们设置了 max_workers=3,这意味着即使我们有 5 个任务,也只有 3 个线程会同时运行。当线程完成其任务时,它们会被重用于剩余的任务。

当你拥有的任务数量远远多于你希望同时运行的线程数量,或者任务正在不断生成时,线程池特别有用。

线程超时和守护线程

在最后一步中,你将学习线程管理的两个重要概念:设置超时和使用守护线程。这些技术让你更好地控制线程的行为以及它们与主程序的交互。

使用线程超时

正如你在第 2 步中了解到的,join() 方法接受一个超时参数。当你希望等待线程完成,但只等待到某个特定时间点时,这很有用。

让我们创建一个更实用的例子,其中我们实现一个尝试使用超时获取数据的函数:

  1. /home/labex/project 目录中创建一个名为 thread_with_timeout.py 的新文件。

  2. 添加以下代码:

import threading
import time
import random

def fetch_data(data_id):
    """Simulate fetching data that might take varying amounts of time."""
    print(f"Fetching data #{data_id}...")

    ## Simulate different fetch times, occasionally very long
    fetch_time = random.choices([1, 8], weights=[0.8, 0.2])[0]
    time.sleep(fetch_time)

    if fetch_time > 5:  ## Simulate a slow fetch
        print(f"Data #{data_id}: Fetch took too long!")
        return None
    else:
        print(f"Data #{data_id}: Fetch completed in {fetch_time} seconds!")
        return f"Data content for #{data_id}"

def fetch_with_timeout(data_id, timeout=3):
    """Fetch data with a timeout."""
    result = [None]  ## Using a list to store result from the thread

    def target_func():
        result[0] = fetch_data(data_id)

    ## Create and start the thread
    thread = threading.Thread(target=target_func)
    thread.start()

    ## Wait for the thread with a timeout
    thread.join(timeout=timeout)

    if thread.is_alive():
        print(f"Data #{data_id}: Fetch timed out after {timeout} seconds!")
        return "TIMEOUT"
    else:
        return result[0]

## Try to fetch several pieces of data
for i in range(1, 6):
    print(f"\nAttempting to fetch data #{i}")
    result = fetch_with_timeout(i, timeout=3)
    if result == "TIMEOUT":
        print(f"Main thread: Fetch for data #{i} timed out, moving on...")
    elif result is None:
        print(f"Main thread: Fetch for data #{i} completed but returned no data.")
    else:
        print(f"Main thread: Successfully fetched: {result}")

print("\nAll fetch attempts completed!")
  1. 保存文件并运行它:
python3 /home/labex/project/thread_with_timeout.py

输出将有所不同,但应该类似于这样:

Attempting to fetch data #1
Fetching data #1...
Data #1: Fetch completed in 1 seconds!
Main thread: Successfully fetched: Data content for #1

Attempting to fetch data #2
Fetching data #2...
Data #2: Fetch completed in 1 seconds!
Main thread: Successfully fetched: Data content for #2

Attempting to fetch data #3
Fetching data #3...
Data #3: Fetch timed out after 3 seconds!
Main thread: Fetch for data #3 timed out, moving on...
Data #3: Fetch took too long!

Attempting to fetch data #4
Fetching data #4...
Data #4: Fetch completed in 1 seconds!
Main thread: Successfully fetched: Data content for #4

Attempting to fetch data #5
Fetching data #5...
Data #5: Fetch completed in 1 seconds!
Main thread: Successfully fetched: Data content for #5

All fetch attempts completed!

这个例子演示了:

  1. 一个尝试获取数据并且可能很慢的函数
  2. 一个使用带超时的线程的包装函数
  3. 如何优雅地处理超时并继续其他操作

理解守护线程

在 Python 中,守护线程是在后台运行的线程。守护线程和非守护线程之间的关键区别在于,Python 在退出之前不会等待守护线程完成。这对于执行不应阻止程序退出的后台任务的线程很有用。

让我们创建一个示例来演示守护线程:

  1. /home/labex/project 目录中创建一个名为 daemon_threads.py 的新文件。

  2. 添加以下代码:

import threading
import time

def background_task(name, interval):
    """A task that runs in the background at regular intervals."""
    count = 0
    while True:
        count += 1
        print(f"{name}: Iteration {count} at {time.strftime('%H:%M:%S')}")
        time.sleep(interval)

def main_task():
    """The main task that runs for a set amount of time."""
    print("Main task: Starting...")
    time.sleep(5)
    print("Main task: Completed!")

## Create two background threads
print("Creating background monitoring threads...")
monitor1 = threading.Thread(target=background_task, args=("Monitor-1", 1), daemon=True)
monitor2 = threading.Thread(target=background_task, args=("Monitor-2", 2), daemon=True)

## Start the background threads
monitor1.start()
monitor2.start()

print("Background monitors started, now starting main task...")

## Run the main task
main_task()

print("Main task completed, program will exit without waiting for daemon threads.")
print("Daemon threads will be terminated when the program exits.")
  1. 保存文件并运行它:
python3 /home/labex/project/daemon_threads.py

输出应该类似于这样:

Creating background monitoring threads...
Background monitors started, now starting main task...
Main task: Starting...
Monitor-1: Iteration 1 at 14:25:10
Monitor-2: Iteration 1 at 14:25:10
Monitor-1: Iteration 2 at 14:25:11
Monitor-1: Iteration 3 at 14:25:12
Monitor-2: Iteration 2 at 14:25:12
Monitor-1: Iteration 4 at 14:25:13
Monitor-1: Iteration 5 at 14:25:14
Monitor-2: Iteration 3 at 14:25:14
Main task: Completed!
Main task completed, program will exit without waiting for daemon threads.
Daemon threads will be terminated when the program exits.

在这个例子中:

  1. 我们创建了两个守护线程,它们持续运行,并定期打印消息。
  2. 我们在创建线程时设置了 daemon=True,这将其标记为守护线程。
  3. 主线程运行 5 秒钟然后退出。
  4. 当主线程退出时,程序终止,并且守护线程也会自动终止。

非守护线程与守护线程

为了更好地理解差异,让我们再创建一个示例来比较守护线程和非守护线程:

  1. /home/labex/project 目录中创建一个名为 daemon_comparison.py 的新文件。

  2. 添加以下代码:

import threading
import time

def task(name, seconds, daemon=False):
    """A task that runs for a specified amount of time."""
    print(f"{name} starting {'(daemon)' if daemon else '(non-daemon)'}")
    time.sleep(seconds)
    print(f"{name} finished after {seconds} seconds")

## Create a non-daemon thread that runs for 8 seconds
non_daemon_thread = threading.Thread(
    target=task,
    args=("Non-daemon thread", 8, False),
    daemon=False  ## This is the default, so it's not actually needed
)

## Create a daemon thread that runs for 8 seconds
daemon_thread = threading.Thread(
    target=task,
    args=("Daemon thread", 8, True),
    daemon=True
)

## Start both threads
non_daemon_thread.start()
daemon_thread.start()

## Let the main thread run for 3 seconds
print("Main thread will run for 3 seconds...")
time.sleep(3)

## Check which threads are still running
print("\nAfter 3 seconds:")
print(f"Daemon thread is alive: {daemon_thread.is_alive()}")
print(f"Non-daemon thread is alive: {non_daemon_thread.is_alive()}")

print("\nMain thread is finishing. Here's what will happen:")
print("1. The program will wait for all non-daemon threads to complete")
print("2. Daemon threads will be terminated when the program exits")

print("\nWaiting for non-daemon threads to finish...")
## We don't need to join the non-daemon thread, Python will wait for it
## But we'll explicitly join it for clarity
non_daemon_thread.join()
print("All non-daemon threads have finished, program will exit now.")
  1. 保存文件并运行它:
python3 /home/labex/project/daemon_comparison.py

输出应该如下所示:

Non-daemon thread starting (non-daemon)
Daemon thread starting (daemon)
Main thread will run for 3 seconds...

After 3 seconds:
Daemon thread is alive: True
Non-daemon thread is alive: True

Main thread is finishing. Here's what will happen:
1. The program will wait for all non-daemon threads to complete
2. Daemon threads will be terminated when the program exits

Waiting for non-daemon threads to finish...
Non-daemon thread finished after 8 seconds
All non-daemon threads have finished, program will exit now.

关键观察结果:

  1. 两个线程都启动并并发运行。
  2. 3 秒后,两个线程仍在运行。
  3. 程序等待非守护线程完成(8 秒后)。
  4. 当程序退出时,守护线程仍在运行,但它被终止。
  5. 守护线程永远不会打印其完成消息,因为它在程序退出时被终止。

何时使用守护线程

守护线程适用于:

  • 后台监控任务
  • 清理操作
  • 应该在程序持续时间内运行但不会阻止其退出的服务
  • 定时器线程,它们定期触发事件

非守护线程适用于:

  • 必须完成的关键操作
  • 不应中断的任务
  • 必须在程序退出之前干净地完成的操作

了解何时使用每种类型是设计健壮的多线程应用程序的重要组成部分。

总结

在这个实验中,你学习了使用 Python 线程和等待它们完成的基本技术。以下是涵盖的关键概念的总结:

  1. 创建和启动线程:你学习了如何创建线程对象,指定目标函数,以及使用 start() 方法启动其执行。

  2. 使用 join() 等待线程:你了解了如何使用 join() 方法来等待线程完成,然后再继续主程序,从而确保正确的同步。

  3. 使用多个线程:你练习了手动创建和管理多个线程,以及使用 ThreadPoolExecutor 类进行更有效的线程管理。

  4. 线程超时和守护线程:你探索了高级主题,包括为线程操作设置超时以及使用守护线程进行后台任务。

这些技能为在 Python 中开发多线程应用程序奠定了基础。多线程使你的程序能够并发执行多个任务,从而提高性能和响应能力,尤其是在 I/O 密集型操作中。

当你继续使用线程时,请记住这些最佳实践:

  • 将线程用于 I/O 密集型任务,而不是 CPU 密集型任务(考虑对后者使用多处理)
  • 注意共享资源并使用适当的同步机制
  • 考虑使用更高级别的抽象,例如 ThreadPoolExecutor 来管理多个线程
  • 将守护线程用于不应阻止程序退出的后台任务

有了这些技能和实践,你现在已经能够使用多线程技术构建更高效、响应更快的 Python 应用程序。