简介
本全面教程探讨了 Python 中的并发任务实现,为开发者提供了增强应用程序性能和效率的关键技术。通过理解并发基础并利用 Python 强大的并发编程工具,读者将学习如何编写可扩展且响应迅速的代码,以有效地同时管理多个任务。
本全面教程探讨了 Python 中的并发任务实现,为开发者提供了增强应用程序性能和效率的关键技术。通过理解并发基础并利用 Python 强大的并发编程工具,读者将学习如何编写可扩展且响应迅速的代码,以有效地同时管理多个任务。
并发是一种编程范式,它允许多个任务同时取得进展。在 Python 中,并发使开发者能够通过并发执行多个操作而非顺序执行,来编写更高效且响应迅速的应用程序。
| 类型 | 描述 | 特点 |
|---|---|---|
| 并行 | 同时执行 | 多个任务同时运行 |
| 并发 | 在多个任务上取得进展 | 任务可以在重叠的时间段内开始、运行并完成 |
当多个任务同时访问共享资源时,会导致不可预测的结果。
两个或多个任务由于彼此等待对方释放资源而无法继续进行的情况。
import concurrent.futures
import time
def worker(task_id):
print(f"任务 {task_id} 开始")
time.sleep(2)
print(f"任务 {task_id} 完成")
return task_id
def main():
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(worker, i) for i in range(5)]
for future in concurrent.futures.as_completed(futures):
result = future.result()
print(f"结果: {result}")
if __name__ == "__main__":
start_time = time.time()
main()
print(f"总执行时间: {time.time() - start_time:.2f} 秒")
在 LabEx,我们建议在深入学习高级并发编程技术之前,先理解这些基本概念。
Python 提供了多种用于实现并发编程的工具,每个工具都有其独特的特性和用例。
import threading
import time
def worker(thread_id):
print(f"线程 {thread_id} 已启动")
time.sleep(2)
print(f"线程 {thread_id} 已完成")
def main():
threads = []
for i in range(3):
thread = threading.Thread(target=worker, args=(i,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
if __name__ == "__main__":
main()
import multiprocessing
import time
def worker(process_id):
print(f"进程 {process_id} 已启动")
time.sleep(2)
print(f"进程 {process_id} 已完成")
def main():
processes = []
for i in range(3):
process = multiprocessing.Process(target=worker, args=(i,))
processes.append(process)
process.start()
for process in processes:
process.join()
if __name__ == "__main__":
main()
import asyncio
async def worker(task_id):
print(f"任务 {task_id} 已启动")
await asyncio.sleep(2)
print(f"任务 {task_id} 已完成")
async def main():
tasks = [worker(i) for i in range(3)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
from concurrent.futures import ThreadPoolExecutor, as_completed
def worker(task_id):
print(f"任务 {task_id} 正在处理")
return task_id * task_id
def main():
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(worker, i) for i in range(5)]
for future in as_completed(futures):
result = future.result()
print(f"结果: {result}")
if __name__ == "__main__":
main()
| 工具 | 用例 | 优点 | 缺点 |
|---|---|---|---|
| 线程模块 | I/O 密集型任务 | 轻量级 | GIL 限制 |
| 多进程模块 | CPU 密集型任务 | 真正的并行 | 更高的内存开销 |
| 异步 I/O 模块 | 网络 I/O | 高效、非阻塞 | 复杂的编程模型 |
| 并发执行模块 | 简单的并行执行 | 易于使用 | 灵活性有限 |
在 LabEx,我们建议了解每个工具的优缺点,以便做出明智的设计决策。
import queue
import threading
import time
import random
class ProducerConsumer:
def __init__(self, queue_size=10):
self.task_queue = queue.Queue(maxsize=queue_size)
self.producers_done = False
def producer(self, producer_id):
for i in range(5):
item = random.randint(1, 100)
self.task_queue.put(item)
print(f"生产者 {producer_id} 生产了: {item}")
time.sleep(random.random())
print(f"生产者 {producer_id} 完成")
def consumer(self, consumer_id):
while not (self.producers_done and self.task_queue.empty()):
try:
item = self.task_queue.get(timeout=2)
print(f"消费者 {consumer_id} 消费了: {item}")
self.task_queue.task_done()
time.sleep(random.random())
except queue.Empty:
break
def run(self):
producers = [threading.Thread(target=self.producer, args=(i,))
for i in range(3)]
consumers = [threading.Thread(target=self.consumer, args=(i,))
for i in range(2)]
for p in producers:
p.start()
for c in consumers:
c.start()
for p in producers:
p.join()
self.producers_done = True
for c in consumers:
c.join()
if __name__ == "__main__":
pc = ProducerConsumer()
pc.run()
from concurrent.futures import ThreadPoolExecutor
import time
def task_executor(task_id):
print(f"正在执行任务 {task_id}")
time.sleep(1)
return f"任务 {task_id} 完成"
def thread_pool_example():
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task_executor, i) for i in range(10)]
for future in futures:
print(future.result())
if __name__ == "__main__":
thread_pool_example()
import threading
class Counter:
def __init__(self):
self.value = 0
self.lock = threading.Lock()
def increment(self):
with self.lock:
current_value = self.value
time.sleep(0.1) ## 模拟复杂操作
self.value = current_value + 1
def worker(counter, n):
for _ in range(n):
counter.increment()
def mutex_example():
counter = Counter()
threads = [threading.Thread(target=worker, args=(counter, 100))
for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"最终计数器值: {counter.value}")
if __name__ == "__main__":
mutex_example()
import threading
import time
class LimitedResourcePool:
def __init__(self, max_connections=3):
self.semaphore = threading.Semaphore(max_connections)
def access_resource(self, thread_id):
with self.semaphore:
print(f"线程 {thread_id} 正在访问资源")
time.sleep(2)
print(f"线程 {thread_id} 正在释放资源")
def semaphore_example():
resource_pool = LimitedResourcePool()
threads = [threading.Thread(target=resource_pool.access_resource,
args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
if __name__ == "__main__":
semaphore_example()
| 模式 | 用例 | 优点 | 缺点 |
|---|---|---|---|
| 生产者 - 消费者 | 任务分发 | 解耦生产和消费 | 需要仔细同步 |
| 线程池 | 并行任务执行 | 限制线程创建开销 | 固定线程数 |
| 互斥锁/锁 | 共享资源保护 | 防止竞态条件 | 可能导致性能瓶颈 |
| 信号量 | 资源限制 | 控制并发访问 | 存在死锁可能性 |
在 LabEx,我们建议通过实践这些模式来开发健壮的并发应用程序。
在本教程中,我们研究了 Python 的并发编程领域,涵盖了用于实现并行任务的关键工具、模式和策略。通过掌握这些技术,开发者可以创建更具响应性、高效的应用程序,从而在各种编程场景中最大限度地利用计算资源并提高整体系统性能。