Простой в использовании многопоточность

PythonBeginner
Практиковаться сейчас

Введение

В этом руководстве мы узнаем, как использовать модуль threading в Python для параллельного выполнения нескольких потоков исполнения.

Модуль threading в Python предоставляет простой способ создания и управления потоками в программе на Python. Поток - это отдельный поток исполнения внутри программы. Запуская несколько потоков параллельно, мы можем воспользоваться преимуществами многоядерных процессоров и повысить производительность наших программ.

Модуль threading предоставляет два класса для создания и управления потоками:

  1. Класс Thread: Этот класс представляет собой отдельный поток исполнения.
  2. Класс Lock: Этот класс позволяет синхронизировать доступ к общими ресурсам между потоками.

Создание потоков

Для создания нового потока в Python нам нужно создать новый экземпляр класса Thread и передать ему функцию для выполнения.

Создайте проект под названием create_thread.py в WebIDE и введите следующее содержимое.

import threading

## Определите функцию для выполнения в потоке
def my_function():
    print("Hello from thread")

## Создайте новый поток
thread = threading.Thread(target=my_function)

## Запустите поток
thread.start()

## Подождите, пока поток завершится
thread.join()

## Выведите "Done", чтобы показать, что программа завершилась
print("Done")

В этом примере определена функция my_function, которая выводит сообщение. Затем мы создаем новый экземпляр класса Thread, передав ему my_function в качестве целевой функции. Наконец, мы запускаем поток с помощью метода start и ждем его завершения с помощью метода join.

Используйте следующую команду для запуска скрипта.

python create_thread.py

Синхронизация

Если несколько потоков обращаются к одному и тому же общему ресурсу (например, переменной или файлу), мы должны синхронизировать доступ к этому ресурсу, чтобы избежать ситуаций гонки. Модуль threading в Python предоставляет класс Lock для этого целей.

Вот пример использования Lock для создания проекта под названием sync.py в WebIDE и ввода следующего содержимого.

import threading

## Создайте блокировку для защиты доступа к общему ресурсу
lock = threading.Lock()

## Общий ресурс, который будет изменяться несколькими потоками
counter = 0

## Определите функцию, которую каждый поток будет выполнять
def my_function():
    global counter

    ## Захватите блокировку перед доступом к общему ресурсу
    lock.acquire()
    try:
        ## Доступ к общему ресурсу
        counter += 1
    finally:
        ## Освободите блокировку после изменения общего ресурса
        lock.release()

## Создайте несколько потоков для доступа к общему ресурсу
threads = []

## Создайте и запустите 10 потоков, которые выполняют одну и ту же функцию
for i in range(10):
    thread = threading.Thread(target=my_function)
    threads.append(thread)
    thread.start()

## Подождите, пока все потоки завершат свое выполнение
for thread in threads:
    thread.join()

## Выведите конечное значение счетчика
print(counter) ## Output: 10

В этом примере мы создаем объект Lock и общий ресурс counter. Функция my_function обращается к общему ресурсу, захватывая блокировку с помощью метода acquire и освобождая блокировку с помощью метода release. Мы создаем несколько потоков и запускаем их, затем ждем их завершения с помощью метода join. Наконец, мы выводим конечное значение счетчика.

Используйте следующую команду для запуска скрипта.

python sync.py

Поток с аргументами

В Python вы можете передавать аргументы в потоки, либо используя параметр args при создании нового потока, либо путём наследования класса Thread и определения конструктора, который принимает аргументы. Вот примеры обоих подходов:

1; Мы создаём подкласс класса Thread и переопределяем метод run, чтобы определить поведение потока. Создайте проект под названием thread_subclass.py в WebIDE и введите следующее содержимое.

import threading

## Определите пользовательский класс Thread, который наследуется от класса Thread
class MyThread(threading.Thread):
    ## Переопределите метод run(), чтобы реализовать поведение потока
    def run(self):
        print("Hello from thread")

## Создайте экземпляр пользовательского класса потока
thread = MyThread()

## Запустите поток
thread.start()

## Подождите, пока поток завершит выполнение
thread.join()

Используйте следующую команду для запуска скрипта.

python thread_subclass.py

2; Мы создаём поток и передаём аргументы в целевую функцию с использованием параметра args. Создайте проект под названием thread_with_args.py в WebIDE и введите следующее содержимое

import threading

## Определите функцию, которая принимает параметр
def my_function(name):
    print("Hello from", name)

## Создайте новый поток с целевой функцией и аргументами
thread = threading.Thread(target=my_function, args=("Thread 1",))

## Запустите поток
thread.start()

## Подождите, пока поток завершит выполнение
thread.join()

Используйте следующую команду для запуска скрипта.

python thread_with_args.py

Пул потоков

В Python вы можете использовать пул потоков для параллельного выполнения задач с использованием заранее определённого набора потоков. Преимуществом использования пула потоков является то, что он избавляет от накладных расходов на создание и уничтожение потоков для каждой задачи, что может повысить производительность.

Модуль concurrent.futures в Python предоставляет класс ThreadPoolExecutor, который позволяет вам создать пул потоков и отправлять в него задачи. Вот пример:

Создайте проект под названием thread_pool_range.py в WebIDE и введите следующее содержимое.

import concurrent.futures

## Определите функцию, которая будет выполняться в нескольких потоках с двумя аргументами
def my_func(arg1, arg2):
    ## Определите задачи, выполняемые потоком здесь
    print(f"Hello from my thread with args {arg1} and {arg2}")

## Создайте объект ThreadPoolExecutor с максимальным количеством 5 рабочих потоков
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    ## Отправьте каждую задачу (вызов функции с её аргументами) в исполнитель для обработки в отдельном потоке
    ## Метод submit() возвращает объект Future, представляющий результат асинхронного вычисления
    for i in range(10):
        executor.submit(my_func, i, i+1)

В этом примере мы определяем функцию my_func, которая принимает два аргумента. Мы создаём ThreadPoolExecutor с максимальным количеством 5 рабочих потоков. Затем мы перебираем диапазон чисел и отправляем задачи в пул потоков с использованием метода executor.submit(). Каждая отправленная задача выполняется в одном из доступных рабочих потоков.

Советы: Объект ThreadPoolExecutor используется в качестве менеджера контекста. Это гарантирует, что все потоки будут правильно очищены, когда код внутри блока with будет завершен.

Используйте следующую команду для запуска скрипта.

python thread_pool_range.py

Метод submit() возвращает объект Future немедленно, представляющий результат отправленной задачи. Вы можете использовать метод result() объекта Future для получения возвращаемого значения задачи. Если задача вызывает исключение, вызов result() вызовет это исключение.

Вы также можете использовать метод map() класса ThreadPoolExecutor для применения одной и той же функции к коллекции элементов. Например, создайте проект под названием thread_pool_map.py в WebIDE и введите следующее содержимое.:

import concurrent.futures

## Определите функцию, которая будет выполняться в нескольких потоках
def my_func(item):
    ## Определите задачи, выполняемые потоком здесь
    print(f"Hello from my thread with arg {item}")

## Создайте список элементов, которые будут обрабатываться потоками
items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

## Создайте объект ThreadPoolExecutor с максимальным количеством 5 рабочих потоков
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    ## Отправьте каждый элемент в исполнитель для обработки в отдельном потоке
    ## Метод map() автоматически возвращает результаты в порядке
    executor.map(my_func, items)

В этом примере мы определяем функцию my_func, которая принимает один аргумент. Мы создаём список элементов и отправляем их в пул потоков с использованием метода executor.map(). Каждый элемент в списке передаётся в my_func в качестве аргумента, и каждый элемент выполняется в одном из доступных рабочих потоков.

Используйте следующую команду для запуска скрипта.

python thread_pool_map.py

Результаты, полученные из thread_pool_range.py и thread_pool_map.py, одинаковы.

Демоновые потоки

В Python демон-поток — это тип потока, который работает в фоновом режиме и не препятствует выходу программы. Когда все не-демон-потоки завершились, интерпретатор Python выходит, независимо от того, запущены ли какие-либо демон-потоки.

Создайте проект под названием daemon_thread_with_args.py в WebIDE и введите следующее содержимое.

import threading
import time

## Определите функцию, которая бесконечно выполняется и выводит сообщения с регулярным интервалом
def my_function():
    while True:
        print("Hello from thread")
        time.sleep(1)

## Создайте демон-поток, который выполняет целевую функцию
thread = threading.Thread(target=my_function, daemon=True)

## Запустите поток
thread.start()

## Основная программа продолжает выполняться и выводит сообщение
print("Main program")

## Подождите несколько секунд перед выходом из программы
time.sleep(5)

## Программа завершается, и демон-поток автоматически завершается
print("Main thread exiting...")

В этом примере мы создаём поток, который бесконечно циклит и выводит сообщение каждую секунду с использованием функции time.sleep(). Мы помечаем поток как демон с использованием параметра daemon, чтобы он завершился при автоматическом выходе из основной программы. Основная программа продолжает работать и выводит сообщение. Мы ждём несколько секунд, и программа завершается, завершая демон-поток.

Затем используйте следующую команду для запуска скрипта.

python daemon_thread_with_args.py

Конечно, мы также можем установить поток в качестве демона, вызвав метод setDaemon(True) на экземпляре потока. Например, создайте проект под названием daemon_thread_with_func.py в WebIDE и введите следующее содержимое:

import threading
import time

## Определите функцию, которая бесконечно выполняется и выводит сообщения с регулярным интервалом
def my_function():
    while True:
        print("Hello from thread")
        time.sleep(1)

## Создайте новый поток с целевой функцией
thread = threading.Thread(target=my_function)

## Установите флаг демона в True, чтобы поток работал в фоновом режиме и завершался при выходе из основной программы
thread.setDaemon(True)

## Запустите поток
thread.start()

## Основная программа продолжает работать и выводит сообщение
print("Main program")

## Подождите несколько секунд перед выходом из программы
time.sleep(5)

## Программа завершается, и демон-поток автоматически завершается
print("Main thread exiting...")

Запуск скрипта следующей командой даст тот же результат, что и предыдущий пример.

python daemon_thread_with_func.py

Объект события

В Python вы можете использовать объект threading.Event, чтобы позволить потокам ожидать наступления определённого события перед продолжением выполнения. Объект События предоставляет способ одному потоку сигнализировать о том, что событие произошло, и другим потокам можно ожидать этого сигнала.

Создайте проект под названием event_object.py в WebIDE и введите следующее содержимое.

import threading

## Создайте объект события
event = threading.Event()

## Определите функцию, которая ожидает установки события
def my_function():
    print("Waiting for event")
    ## Ожидайте установки события
    event.wait()
    print("Event received")

## Создайте новый поток с целевой функцией
thread = threading.Thread(target=my_function)

## Запустите поток
thread.start()

## Сигнализируйте событие спустя несколько секунд
## Теперь вызов wait() в целевой функции вернётся и продолжит выполнение
event.set()

## Подождите, пока поток завершит выполнение
thread.join()

В этом примере мы создаём объект Event с использованием класса Event. Мы определяем функцию, которая ожидает сигнала события с использованием метода wait и затем выводит сообщение. Мы создаём новый поток и запускаем его. Через несколько секунд мы сигнализируем событие с использованием метода set. Поток получает событие и выводит сообщение. Наконец, мы ждём завершения потока с использованием метода join.

Затем используйте следующую команду для запуска скрипта.

python event_object.py

Объект Таймера

В Python вы можете использовать объект threading.Timer, чтобы запланировать выполнение функции через определённое время. Объект Таймера создает новый поток, который ждёт указанный интервал времени перед выполнением функции.

Создайте проект под названием timer_object.py в WebIDE и введите следующее содержимое.

import threading

## Определите функцию, которая будет выполнена таймером через 5 секунд
def my_function():
    print("Hello from timer")

## Создайте таймер, который запускает целевую функцию через 5 секунд
timer = threading.Timer(5, my_function)

## Запустите таймер
timer.start()

## Подождите, пока таймер завершится
timer.join()

В этом примере мы создаём объект Timer с использованием класса Timer и передаём ему задержку времени в секундах и функцию для выполнения. Мы запускаем таймер с использованием метода start и ждём его завершения с использованием метода join. Через 5 секунд функция выполняется и выводит сообщение.

Затем используйте следующую команду для запуска скрипта.

python timer_object.py

Советы: Поток таймера работает отдельно, поэтому он может не синхронизироваться с главным потоком. Если ваша функция зависит от некоторого общего состояния или ресурсов, вам нужно будет правильно синхронизировать доступ к ним. Также помните, что поток таймера не остановит программу при выходе, если он по-прежнему работает, когда все другие не-демон-потоки завершились.

Объект Барьера

В Python вы можете использовать объект threading.Barrier, чтобы синхронизировать несколько потоков в заранее определённых точках синхронизации. Объект Барьера предоставляет способ для группы потоков ожидать друг друга, чтобы дойти до определённой точки в их выполнении, прежде чем продолжить.

Создайте проект под названием barrier_object.py в WebIDE и введите следующее содержимое.

import threading

## Создайте объект Барьера для 3 потоков
barrier = threading.Barrier(3)

## Определите функцию, которая ожидает в барьере
def my_function():
    print("Before barrier")
    ## Ожидайте, пока все три потока не достигнут барьера
    barrier.wait()
    print("After barrier")

## Создайте 3 потока с использованием цикла и запустите их
threads = []
for i in range(3):
    thread = threading.Thread(target=my_function)
    threads.append(thread)
    thread.start()

## Подождите, пока все потоки завершат выполнение
for thread in threads:
    thread.join()

В этом примере мы создаём объект Barrier с использованием класса Barrier и передаём ему количество потоков, которые нужно ожидать. С использованием метода wait мы определяем функцию, которая ожидает барьера и выводит сообщение. Мы создаём три потока и запускаем их. Каждый поток ждёт барьера, поэтому все потоки будут ждать, пока все они не достигнут барьера. Наконец, мы ждём завершения всех потоков с использованием метода join.

Затем используйте следующую команду для запуска скрипта.

python barrier_object.py

Резюме

Вот и все! Теперь вы знаете, как использовать модуль threading в Python в своем коде. Это поможет нам глубже овладеть основными принципами и методами параллельного программирования, чтобы мы могли лучше разрабатывать эффективные параллельные приложения.