Introduction
Knowing how to wait for Python threads to complete is essential for building robust and reliable applications. In a multithreaded program, proper synchronization ensures that operations finish in the correct order and that resources are used efficiently.
In this lab, you will learn how to create Python threads, wait for them to complete, and work with multiple threads. These skills are fundamental to developing concurrent applications that can perform several tasks at once while maintaining proper synchronization.
Python's threading module provides a simple way to create and manage threads. In this step, you will learn how to create a basic thread and observe its behavior.
A thread is an independent flow of execution within a program. When you run a Python script, it starts with a single thread known as the main thread. By creating additional threads, your program can perform multiple tasks concurrently.
Threads are useful in situations such as:

1. Performing I/O-bound operations (network requests, file access) without blocking the rest of the program.
2. Running background tasks alongside the main work.
3. Keeping an application responsive while long-running work happens elsewhere.
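As a minimal sketch of these ideas (standard library only, with a hypothetical thread name "worker-1"), the following shows that a program always begins in the main thread and that additional threads are separate flows of execution with their own identity:

```python
import threading

def report():
    # Each thread can inspect its own identity at runtime.
    me = threading.current_thread()
    print(f"Running in: {me.name}")

# The interpreter always starts with exactly one thread, the main thread.
print(f"Program started in: {threading.main_thread().name}")

worker = threading.Thread(target=report, name="worker-1")
worker.start()
worker.join()  # wait for the worker so the output order is deterministic
```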
Let's start by creating a simple Python script that demonstrates how to create and start a thread.
Open a new file in the editor by clicking the "File" menu, selecting "New File", and saving it as simple_thread.py in the /home/labex/project directory.
Add the following code to the file:
import threading
import time

def print_numbers():
    """Function that prints numbers from 1 to 5 with a delay."""
    for i in range(1, 6):
        print(f"Number {i} from thread")
        time.sleep(1)  # Sleep for 1 second

# Create a thread that targets the print_numbers function
number_thread = threading.Thread(target=print_numbers)

# Start the thread
print("Starting the thread...")
number_thread.start()

# Main thread continues execution
print("Main thread continues to run...")
print("Main thread is doing other work...")

# Sleep for 2 seconds to demonstrate both threads running concurrently
time.sleep(2)
print("Main thread finished its work!")
Press Ctrl+S or click "File" > "Save" to save the file.
Run the script by opening a terminal (if one is not already open) and executing the following command:
python3 /home/labex/project/simple_thread.py
You should see output similar to this:
Starting the thread...
Main thread continues to run...
Main thread is doing other work...
Number 1 from thread
Number 2 from thread
Main thread finished its work!
Number 3 from thread
Number 4 from thread
Number 5 from thread
In this example:

1. You imported the threading and time modules.
2. You defined print_numbers(), which prints the numbers 1 through 5 with a one-second delay between them.
3. You created a thread, using the target parameter to specify the function to run.
4. You started the thread with the start() method.

Notice that the main thread finishes its own work before the number thread has printed all of its numbers. The two threads run independently. Because the number thread is a regular (non-daemon) thread, the interpreter still waits for it to finish before the program actually exits, which is why the remaining numbers appear after the main thread's final message.
In the next step, you will learn how to use the join() method to wait for a thread to complete.
In the previous step, you created a thread that ran independently of the main thread. In many cases, however, you need to wait for a thread to finish its work before continuing with the rest of the program. This is where the join() method becomes useful.
Calling a thread object's join() method blocks the calling thread (usually the main thread) until the thread whose join() method was called terminates. This is essential in situations such as:

1. When later steps depend on a result or side effect produced by the thread.
2. When resources must be cleaned up or released in a specific order.
3. When all threaded work must be finished before the program moves on or exits.
Let's modify the previous example to demonstrate how to use the join() method to wait for a thread to complete.
Create a new file named join_thread.py in the /home/labex/project directory.
Add the following code to the file:
import threading
import time

def calculate_sum(numbers):
    """Function that calculates the sum of numbers with a delay."""
    print("Starting the calculation...")
    time.sleep(3)  # Simulate a time-consuming calculation
    result = sum(numbers)
    print(f"Calculation result: {result}")
    return result

# Create a list of numbers
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Create a thread that targets the calculate_sum function
calculation_thread = threading.Thread(target=calculate_sum, args=(numbers,))

# Start the thread
print("Main thread: Starting the calculation thread...")
calculation_thread.start()

# Do some other work in the main thread
print("Main thread: Doing some other work while waiting...")
for i in range(5):
    print(f"Main thread: Working... ({i+1}/5)")
    time.sleep(0.5)

# Wait for the calculation thread to complete
print("Main thread: Waiting for the calculation thread to finish...")
calculation_thread.join()
print("Main thread: Calculation thread has finished!")

# Continue with the main thread
print("Main thread: Continuing with the rest of the program...")
Save the file, then run the script:
python3 /home/labex/project/join_thread.py
You should see output similar to this:
Main thread: Starting the calculation thread...
Starting the calculation...
Main thread: Doing some other work while waiting...
Main thread: Working... (1/5)
Main thread: Working... (2/5)
Main thread: Working... (3/5)
Main thread: Working... (4/5)
Main thread: Working... (5/5)
Main thread: Waiting for the calculation thread to finish...
Calculation result: 55
Main thread: Calculation thread has finished!
Main thread: Continuing with the rest of the program...
In this example:

1. After doing some of its own work, the main thread called calculation_thread.join().
2. The join() method causes the main thread to wait until the calculation thread has finished.

This pattern is useful whenever you need to be sure that all threaded work is done before continuing with the rest of the program. Without join(), the main thread could move on before the worker thread has produced its result.
Sometimes you may want to wait for a thread, but not indefinitely. The join() method accepts an optional timeout argument that specifies the maximum number of seconds to wait.
Let's modify our code to demonstrate this:
Create a new file named join_timeout.py in the /home/labex/project directory.
Add the following code:
import threading
import time

def long_running_task():
    """A function that simulates a very long-running task."""
    print("Long-running task started...")
    time.sleep(10)  # Simulate a 10-second task
    print("Long-running task completed!")

# Create and start the thread
task_thread = threading.Thread(target=long_running_task)
task_thread.start()

# Wait for the thread to complete, but only for up to 3 seconds
print("Main thread: Waiting for up to 3 seconds...")
task_thread.join(timeout=3)

# Check if the thread is still running
if task_thread.is_alive():
    print("Main thread: The task is still running, but I'm continuing anyway!")
else:
    print("Main thread: The task has completed within the timeout period.")

# Continue with the main thread
print("Main thread: Continuing with other operations...")

# Let's sleep a bit to see the long-running task complete
time.sleep(8)
print("Main thread: Finished.")
Save the file, then run the script:
python3 /home/labex/project/join_timeout.py
The output should look like this:
Long-running task started...
Main thread: Waiting for up to 3 seconds...
Main thread: The task is still running, but I'm continuing anyway!
Main thread: Continuing with other operations...
Long-running task completed!
Main thread: Finished.
In this example, the main thread waits up to 3 seconds for the task thread to finish. Because the task takes 10 seconds, the main thread continues after the timeout while the task thread keeps running in the background.
This approach is useful when you want to give a thread a chance to finish, but need to move on after a certain amount of time.
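A related pattern, sketched below with a hypothetical 2-second task, is to call join() repeatedly with a short timeout instead of blocking once for the full duration. This keeps the main thread responsive between checks (for example, to look for a shutdown request):

```python
import threading
import time

def slow_task():
    # Stand-in for real work; assume it takes about 2 seconds.
    time.sleep(2)

t = threading.Thread(target=slow_task)
t.start()

# Wait in short slices rather than with one blocking join().
elapsed = 0.0
while t.is_alive():
    t.join(timeout=0.5)  # returns after at most 0.5 s, even if t is still running
    elapsed += 0.5
    print(f"Still waiting... (about {elapsed:.1f} s elapsed)")

print("Task finished.")
```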
In real applications, you often need to work with several threads at once. This step teaches you how to create, manage, and synchronize multiple threads in Python.
When dealing with many similar tasks, it is common to create multiple threads to process them concurrently. This can significantly improve performance, especially for I/O-bound operations such as downloading files or making network requests.
Let's create an example that uses multiple threads to process a list of tasks:
Create a new file named multiple_threads.py in the /home/labex/project directory.
Add the following code:
import threading
import time
import random

def process_task(task_id):
    """Function to process a single task."""
    print(f"Starting task {task_id}...")
    # Simulate variable processing time
    processing_time = random.uniform(1, 3)
    time.sleep(processing_time)
    print(f"Task {task_id} completed in {processing_time:.2f} seconds.")
    return task_id

# List of tasks to process
tasks = list(range(1, 6))  # Tasks with IDs 1 through 5

# Create a list to store our threads
threads = []

# Create and start a thread for each task
for task_id in tasks:
    thread = threading.Thread(target=process_task, args=(task_id,))
    threads.append(thread)
    print(f"Created thread for task {task_id}")
    thread.start()

print(f"All {len(threads)} threads have been started")

# Wait for all threads to complete
for thread in threads:
    thread.join()

print("All tasks have been completed!")
Save the file, then run the script:
python3 /home/labex/project/multiple_threads.py
Because of the random processing times, the output will differ on every run, but it should look similar to this:
Created thread for task 1
Starting task 1...
Created thread for task 2
Starting task 2...
Created thread for task 3
Starting task 3...
Created thread for task 4
Starting task 4...
Created thread for task 5
Starting task 5...
All 5 threads have been started
Task 1 completed in 1.23 seconds.
Task 3 completed in 1.45 seconds.
Task 2 completed in 1.97 seconds.
Task 5 completed in 1.35 seconds.
Task 4 completed in 2.12 seconds.
All tasks have been completed!
In this example:

1. You defined process_task(), which simulates processing a task for a random amount of time.
2. You created and started one thread per task, keeping the Thread objects in a list.
3. You then called join() on each thread to wait for all of them to complete.

This pattern is useful whenever you have a batch of independent tasks that can be processed in parallel.
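This fan-out/join pattern is often combined with a shared structure for collecting results. The following sketch (not part of the lab files; the square function is a stand-in) shows worker threads appending to a lock-protected list, with join() marking the point after which the results are safe to read:

```python
import threading

results = []
results_lock = threading.Lock()

def square(n):
    value = n * n
    # A lock keeps the shared list safe if the update ever becomes
    # more complex than a single append.
    with results_lock:
        results.append(value)

# Fan out: one thread per task
threads = [threading.Thread(target=square, args=(n,)) for n in range(1, 6)]
for t in threads:
    t.start()

# Fan in: only after join() is it safe to assume all results are present
for t in threads:
    t.join()

print(sorted(results))  # completion order varies; the values do not
```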
For more advanced thread management, Python's concurrent.futures module provides the ThreadPoolExecutor class. It maintains a reusable pool of worker threads, which is more efficient than creating and destroying a thread for every task.
Let's rewrite our example using a thread pool:
Create a new file named thread_pool.py in the /home/labex/project directory.
Add the following code:
import concurrent.futures
import time
import random

def process_task(task_id):
    """Function to process a single task."""
    print(f"Starting task {task_id}...")
    # Simulate variable processing time
    processing_time = random.uniform(1, 3)
    time.sleep(processing_time)
    print(f"Task {task_id} completed in {processing_time:.2f} seconds.")
    return f"Result of task {task_id}"

# List of tasks to process
tasks = list(range(1, 6))  # Tasks with IDs 1 through 5

# Create a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Submit all tasks and store the Future objects
    print(f"Submitting {len(tasks)} tasks to the thread pool with 3 workers...")
    future_to_task = {executor.submit(process_task, task_id): task_id for task_id in tasks}

    # As each task completes, get its result
    for future in concurrent.futures.as_completed(future_to_task):
        task_id = future_to_task[future]
        try:
            result = future.result()
            print(f"Got result from task {task_id}: {result}")
        except Exception as e:
            print(f"Task {task_id} generated an exception: {e}")

print("All tasks have been processed!")
Save the file, then run the script:
python3 /home/labex/project/thread_pool.py
Again, the output will vary because of the random processing times, but it should look similar to this:
Submitting 5 tasks to the thread pool with 3 workers...
Starting task 1...
Starting task 2...
Starting task 3...
Task 2 completed in 1.15 seconds.
Starting task 4...
Got result from task 2: Result of task 2
Task 1 completed in 1.82 seconds.
Starting task 5...
Got result from task 1: Result of task 1
Task 3 completed in 2.25 seconds.
Got result from task 3: Result of task 3
Task 4 completed in 1.45 seconds.
Got result from task 4: Result of task 4
Task 5 completed in 1.67 seconds.
Got result from task 5: Result of task 5
All tasks have been processed!
The thread-pool approach offers several advantages:

1. Worker threads are reused instead of being created and destroyed for every task.
2. max_workers caps how many tasks run at the same time.
3. Future objects give you a uniform way to collect results and handle exceptions.
In our example, we set max_workers=3, so even though there are 5 tasks, only 3 threads run at the same time. As threads finish their tasks, they are reused for the remaining ones.
Thread pools are especially useful when you have far more tasks than the number of threads you want to run concurrently, or when tasks are being generated continuously.
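When every task takes a single argument and you want the results back in input order, Executor.map() is a more compact alternative to submit() plus as_completed(). A minimal sketch (the double function is a stand-in):

```python
import concurrent.futures

def double(n):
    return n * 2

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Unlike as_completed(), map() yields results in submission order,
    # regardless of which worker finishes first.
    doubled = list(executor.map(double, [1, 2, 3, 4, 5]))

print(doubled)  # [2, 4, 6, 8, 10]
```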
In this final step, you will learn two important thread-management concepts: setting timeouts and using daemon threads. These techniques give you finer control over how threads behave and how they interact with the main program.
As you learned in Step 2, the join() method accepts a timeout argument. This is useful when you want to wait for a thread to finish, but only up to a certain point in time.
Let's build a more practical example in which we implement a function that attempts to fetch data with a timeout:
Create a new file named thread_with_timeout.py in the /home/labex/project directory.
Add the following code:
import threading
import time
import random

def fetch_data(data_id):
    """Simulate fetching data that might take varying amounts of time."""
    print(f"Fetching data #{data_id}...")
    # Simulate different fetch times, occasionally very long
    fetch_time = random.choices([1, 8], weights=[0.8, 0.2])[0]
    time.sleep(fetch_time)
    if fetch_time > 5:  # Simulate a slow fetch
        print(f"Data #{data_id}: Fetch took too long!")
        return None
    else:
        print(f"Data #{data_id}: Fetch completed in {fetch_time} seconds!")
        return f"Data content for #{data_id}"

def fetch_with_timeout(data_id, timeout=3):
    """Fetch data with a timeout."""
    result = [None]  # Using a list to store the result from the thread

    def target_func():
        result[0] = fetch_data(data_id)

    # Create and start the thread
    thread = threading.Thread(target=target_func)
    thread.start()

    # Wait for the thread with a timeout
    thread.join(timeout=timeout)

    if thread.is_alive():
        print(f"Data #{data_id}: Fetch timed out after {timeout} seconds!")
        return "TIMEOUT"
    else:
        return result[0]

# Try to fetch several pieces of data
for i in range(1, 6):
    print(f"\nAttempting to fetch data #{i}")
    result = fetch_with_timeout(i, timeout=3)
    if result == "TIMEOUT":
        print(f"Main thread: Fetch for data #{i} timed out, moving on...")
    elif result is None:
        print(f"Main thread: Fetch for data #{i} completed but returned no data.")
    else:
        print(f"Main thread: Successfully fetched: {result}")

print("\nAll fetch attempts completed!")
Save the file, then run the script:
python3 /home/labex/project/thread_with_timeout.py
The output will vary, but it should look similar to this:
Attempting to fetch data #1
Fetching data #1...
Data #1: Fetch completed in 1 seconds!
Main thread: Successfully fetched: Data content for #1
Attempting to fetch data #2
Fetching data #2...
Data #2: Fetch completed in 1 seconds!
Main thread: Successfully fetched: Data content for #2
Attempting to fetch data #3
Fetching data #3...
Data #3: Fetch timed out after 3 seconds!
Main thread: Fetch for data #3 timed out, moving on...
Data #3: Fetch took too long!
Attempting to fetch data #4
Fetching data #4...
Data #4: Fetch completed in 1 seconds!
Main thread: Successfully fetched: Data content for #4
Attempting to fetch data #5
Fetching data #5...
Data #5: Fetch completed in 1 seconds!
Main thread: Successfully fetched: Data content for #5
All fetch attempts completed!
This example demonstrates:

1. Running the fetch in a separate thread so the caller is not blocked indefinitely.
2. Using join() with a timeout to limit how long the caller waits.
3. Checking is_alive() to detect whether the thread was still running when the timeout expired.

Note that a timed-out thread keeps running in the background: Python offers no way to forcibly kill a thread, so the timeout only limits how long the caller waits.
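The concurrent.futures module offers a ready-made version of this timeout pattern: future.result(timeout=...) raises TimeoutError instead of requiring a sentinel value like "TIMEOUT". A sketch with a hypothetical slow_fetch task:

```python
import concurrent.futures
import time

def slow_fetch():
    time.sleep(2)  # deliberately longer than the timeout below
    return "data"

data = None
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_fetch)
    try:
        data = future.result(timeout=0.5)  # wait at most half a second
        print(f"Got: {data}")
    except concurrent.futures.TimeoutError:
        print("Fetch timed out, moving on...")
# Note: leaving the with-block still waits for the worker to finish,
# because (as above) Python threads cannot be forcibly killed.
```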
In Python, a daemon thread is a thread that runs in the background. The key difference between daemon and non-daemon threads is that Python does not wait for daemon threads to finish before exiting. This is useful for threads that perform background tasks which should not prevent the program from exiting.
Let's create an example to demonstrate daemon threads:
Create a new file named daemon_threads.py in the /home/labex/project directory.
Add the following code:
import threading
import time

def background_task(name, interval):
    """A task that runs in the background at regular intervals."""
    count = 0
    while True:
        count += 1
        print(f"{name}: Iteration {count} at {time.strftime('%H:%M:%S')}")
        time.sleep(interval)

def main_task():
    """The main task that runs for a set amount of time."""
    print("Main task: Starting...")
    time.sleep(5)
    print("Main task: Completed!")

# Create two background threads
print("Creating background monitoring threads...")
monitor1 = threading.Thread(target=background_task, args=("Monitor-1", 1), daemon=True)
monitor2 = threading.Thread(target=background_task, args=("Monitor-2", 2), daemon=True)

# Start the background threads
monitor1.start()
monitor2.start()
print("Background monitors started, now starting main task...")

# Run the main task
main_task()

print("Main task completed, program will exit without waiting for daemon threads.")
print("Daemon threads will be terminated when the program exits.")
Save the file, then run the script:
python3 /home/labex/project/daemon_threads.py
The output should look similar to this:
Creating background monitoring threads...
Background monitors started, now starting main task...
Main task: Starting...
Monitor-1: Iteration 1 at 14:25:10
Monitor-2: Iteration 1 at 14:25:10
Monitor-1: Iteration 2 at 14:25:11
Monitor-1: Iteration 3 at 14:25:12
Monitor-2: Iteration 2 at 14:25:12
Monitor-1: Iteration 4 at 14:25:13
Monitor-1: Iteration 5 at 14:25:14
Monitor-2: Iteration 3 at 14:25:14
Main task: Completed!
Main task completed, program will exit without waiting for daemon threads.
Daemon threads will be terminated when the program exits.
In this example:

1. Each background thread was created with daemon=True, marking it as a daemon thread.
2. When the main task finished, the program exited without waiting for the daemon threads, even though they run in infinite loops.

To understand the difference better, let's create one more example that compares daemon and non-daemon threads:
Create a new file named daemon_comparison.py in the /home/labex/project directory.
Add the following code:
import threading
import time

def task(name, seconds, daemon=False):
    """A task that runs for a specified amount of time."""
    print(f"{name} starting {'(daemon)' if daemon else '(non-daemon)'}")
    time.sleep(seconds)
    print(f"{name} finished after {seconds} seconds")

# Create a non-daemon thread that runs for 8 seconds
non_daemon_thread = threading.Thread(
    target=task,
    args=("Non-daemon thread", 8, False),
    daemon=False  # This is the default, so it's not actually needed
)

# Create a daemon thread that runs for 8 seconds
daemon_thread = threading.Thread(
    target=task,
    args=("Daemon thread", 8, True),
    daemon=True
)

# Start both threads
non_daemon_thread.start()
daemon_thread.start()

# Let the main thread run for 3 seconds
print("Main thread will run for 3 seconds...")
time.sleep(3)

# Check which threads are still running
print("\nAfter 3 seconds:")
print(f"Daemon thread is alive: {daemon_thread.is_alive()}")
print(f"Non-daemon thread is alive: {non_daemon_thread.is_alive()}")

print("\nMain thread is finishing. Here's what will happen:")
print("1. The program will wait for all non-daemon threads to complete")
print("2. Daemon threads will be terminated when the program exits")

print("\nWaiting for non-daemon threads to finish...")
# We don't need to join the non-daemon thread; Python will wait for it.
# But we'll explicitly join it for clarity.
non_daemon_thread.join()

print("All non-daemon threads have finished, program will exit now.")
Save the file, then run the script:
python3 /home/labex/project/daemon_comparison.py
The output should look like this:
Non-daemon thread starting (non-daemon)
Daemon thread starting (daemon)
Main thread will run for 3 seconds...
After 3 seconds:
Daemon thread is alive: True
Non-daemon thread is alive: True
Main thread is finishing. Here's what will happen:
1. The program will wait for all non-daemon threads to complete
2. Daemon threads will be terminated when the program exits
Waiting for non-daemon threads to finish...
Non-daemon thread finished after 8 seconds
All non-daemon threads have finished, program will exit now.
Key observations:

1. After 3 seconds, both threads are still alive.
2. The program waits for the non-daemon thread to finish its full 8 seconds before exiting.
3. The daemon thread is terminated when the program exits, which is why its "finished" message may never appear.
Daemon threads are suitable for: background monitoring or logging, periodic housekeeping, and any task that can safely be abandoned when the program exits.
Non-daemon threads are suitable for: work whose results the program needs, and tasks that must complete or clean up properly, such as writing files or closing connections.
Knowing when to use each type is an important part of designing robust multithreaded applications.
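A common middle ground, sketched below, is a non-daemon background thread stopped through a threading.Event: you get a background loop like the daemon examples above, but with a clean, cooperative shutdown instead of abrupt termination at exit. (The monitor function and 0.2-second interval here are illustrative, not from the lab files.)

```python
import threading
import time

stop_event = threading.Event()
ticks = []

def monitor():
    # Event.wait() doubles as an interruptible sleep: it returns early
    # the moment stop_event is set.
    while not stop_event.is_set():
        ticks.append("tick")
        stop_event.wait(timeout=0.2)

worker = threading.Thread(target=monitor)  # non-daemon on purpose
worker.start()

time.sleep(0.5)  # the main program does its real work here

stop_event.set()  # ask the monitor to stop...
worker.join()     # ...and wait for it to exit cleanly
print(f"Monitor ran {len(ticks)} iterations, then exited cleanly.")
```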
In this lab, you learned the fundamental techniques for working with Python threads and waiting for them to complete. Here is a summary of the key concepts covered:
Creating and starting threads: You learned how to create a thread object, specify its target function, and begin its execution with the start() method.
Waiting for threads with join(): You saw how to use the join() method to wait for a thread to finish before the main program continues, ensuring proper synchronization.
Working with multiple threads: You practiced creating and managing multiple threads by hand, as well as using the ThreadPoolExecutor class for more efficient thread management.
Thread timeouts and daemon threads: You explored advanced topics, including setting timeouts on thread operations and using daemon threads for background tasks.
These skills form the foundation for developing multithreaded applications in Python. Multithreading lets your programs perform several tasks concurrently, improving performance and responsiveness, especially in I/O-bound operations.
As you continue working with threads, keep these best practices in mind:

1. Use join() when you need to be sure a thread has finished before moving on.
2. Use timeouts so you never wait indefinitely for a thread.
3. Use daemon threads for background tasks that should not block program exit.
4. Use ThreadPoolExecutor to manage multiple threads efficiently.

With these skills and practices, you are now equipped to build more efficient and responsive Python applications using multithreading techniques.