简介
本全面教程深入探讨了Python中管理多线程执行的复杂性。该指南专为寻求提高并发编程技能的开发者设计,涵盖了基本的线程概念、同步技术和实际实现策略,以帮助你编写更高效、响应更快的Python应用程序。
本全面教程深入探讨了Python中管理多线程执行的复杂性。该指南专为寻求提高并发编程技能的开发者设计,涵盖了基本的线程概念、同步技术和实际实现策略,以帮助你编写更高效、响应更快的Python应用程序。
线程是一种编程技术,它允许程序的多个部分在单个进程中并发运行。在Python中,threading 模块提供了一种创建和管理线程的方法,从而实现代码的并行执行。
| 线程类型 | 描述 | 使用场景 |
|---|---|---|
| 守护线程 | 不阻止程序退出的后台线程 | 持续的后台任务 |
| 非守护线程 | 使程序保持运行的线程 | 关键操作 |
以下是在Python中创建和运行线程的简单示例:
import threading
import time
def worker(thread_id):
print(f"线程 {thread_id} 开始")
time.sleep(2)
print(f"线程 {thread_id} 结束")
## 创建多个线程
threads = []
for i in range(3):
thread = threading.Thread(target=worker, args=(i,))
threads.append(thread)
thread.start()
## 等待所有线程完成
for thread in threads:
thread.join()
print("所有线程已完成")
start():开始线程执行join():等待线程完成is_alive():检查线程是否正在运行在使用线程时,请注意:
线程最适合用于:
在LabEx,我们建议在深入复杂的并发编程场景之前,先理解线程基础。
当多个线程同时访问共享资源时,线程同步可防止竞态条件并确保数据完整性。
import threading
class Counter:
def __init__(self):
self.value = 0
self.lock = threading.Lock()
def increment(self):
with self.lock:
self.value += 1
def worker(counter, iterations):
for _ in range(iterations):
counter.increment()
## 锁使用示例
counter = Counter()
threads = []
for _ in range(5):
thread = threading.Thread(target=worker, args=(counter, 1000))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
print(f"最终计数器值: {counter.value}")
import threading
class RecursiveCounter:
def __init__(self):
self.value = 0
self.lock = threading.RLock()
def increment(self, depth=0):
with self.lock:
self.value += 1
if depth < 3:
self.increment(depth + 1)
| 原语 | 描述 | 使用场景 |
|---|---|---|
| 锁 | 基本的互斥 | 简单的临界区 |
| 可重入锁 | 可重入的锁 | 递归方法同步 |
| 信号量 | 限制并发访问 | 资源池 |
| 事件 | 线程间的信号传递 | 协调 |
| 条件变量 | 高级等待机制 | 复杂的同步 |
import threading
import queue
import time
class ThreadSafeQueue:
def __init__(self, max_size=10):
self.queue = queue.Queue(maxsize=max_size)
self.condition = threading.Condition()
def produce(self, item):
with self.condition:
while self.queue.full():
print("队列已满,等待中...")
self.condition.wait()
self.queue.put(item)
print(f"已生产: {item}")
self.condition.notify()
def consume(self):
with self.condition:
while self.queue.empty():
print("队列已空,等待中...")
self.condition.wait()
item = self.queue.get()
print(f"已消费: {item}")
self.condition.notify()
在LabEx,我们强调理解同步以构建健壮的多线程应用程序。
import threading
import requests
from queue import Queue
def fetch_url(url_queue, results):
while not url_queue.empty():
url = url_queue.get()
try:
response = requests.get(url, timeout=5)
results[url] = response.status_code
except Exception as e:
results[url] = str(e)
finally:
url_queue.task_done()
def parallel_web_scraping(urls, max_threads=5):
url_queue = Queue()
for url in urls:
url_queue.put(url)
results = {}
threads = []
for _ in range(min(max_threads, len(urls))):
thread = threading.Thread(target=fetch_url, args=(url_queue, results))
thread.start()
threads.append(thread)
url_queue.join()
for thread in threads:
thread.join()
return results
import threading
import time
import queue
class BackgroundTaskProcessor:
def __init__(self, num_workers=3):
self.task_queue = queue.Queue()
self.workers = []
self.stop_event = threading.Event()
for _ in range(num_workers):
worker = threading.Thread(target=self._worker)
worker.start()
self.workers.append(worker)
def _worker(self):
while not self.stop_event.is_set():
try:
task = self.task_queue.get(timeout=1)
task()
self.task_queue.task_done()
except queue.Empty:
continue
def add_task(self, task):
self.task_queue.put(task)
def shutdown(self):
self.stop_event.set()
for worker in self.workers:
worker.join()
| 模式 | 描述 | 使用场景 |
|---|---|---|
| 生产者 - 消费者模式 | 分离任务生成和处理 | 消息队列、工作分配 |
| 线程池 | 重用固定数量的线程 | 并发I/O操作 |
| 并行处理 | 分配计算任务 | 数据处理、科学计算 |
import threading
import time
import psutil
class ThreadPerformanceMonitor:
def __init__(self):
self.threads = []
self.performance_data = {}
def start_monitoring(self, thread):
thread_id = thread.ident
self.performance_data[thread_id] = {
'start_time': time.time(),
'cpu_usage': [],
'memory_usage': []
}
def monitor(self, thread):
thread_id = thread.ident
if thread_id in self.performance_data:
process = psutil.Process()
self.performance_data[thread_id]['cpu_usage'].append(
process.cpu_percent()
)
self.performance_data[thread_id]['memory_usage'].append(
process.memory_info().rss / (1024 * 1024)
)
import threading
import time
class CoordinatedTask:
def __init__(self):
self.ready_event = threading.Event()
self.complete_event = threading.Event()
def prepare_task(self):
print("准备任务")
time.sleep(2)
self.ready_event.set()
def execute_task(self):
self.ready_event.wait()
print("执行任务")
time.sleep(3)
self.complete_event.set()
在LabEx,我们建议:
通过掌握Python中的线程管理,开发者可以创建出响应更快、效率更高且能有效利用系统资源的应用程序。本教程为理解线程基础、实现同步机制以及应用实际的多线程技术来解决复杂编程挑战提供了坚实的基础。