Быстрый в изучении - мощный в программировании
Скрипт ИНВЕСТОР на Python

Попробуйте себя в качестве инвестора имея в помощники мощный алгоритм советника на Python...

Все уроки по PyQt5

PyQt5 реализован как комплект Python-модулей. Он включает в себя около 620 классов и 6000 функций и методов...

Скрипт отправки SMS через Python

Была задача отправить SMS-ки большому списку номеров телефона с уточнением цены за всю рассылку "До" ее отправки...

Модуль threading на примерах

Модуль threading впервые был представлен в Python 1.5.2 как продолжение низкоуровневого модуля потоков. Модуль threading значительно упрощает работу с потоками и позволяет программировать запуск нескольких операций одновременно. Обратите внимание на то, что потоки в Python лучше всего работают с операциями I/O, такими как загрузка ресурсов из интернета или чтение файлов и папок на вашем компьютере.

Если вам нужно сделать что-то, для чего нужен интенсивный CPU, тогда вам, возможно, захочется взглянуть на модуль multiprocessing, вместо threading. Причина заключается в том, что Python содержит Global Interpreter Lock (GIL), который запускает все потоки внутри главного потока. По этой причине, когда вам нужно запустить несколько интенсивных операций с потоками, вы заметите, что все работает достаточно медленно. Так что мы сфокусируемся на том, в чем потоки являются лучшими: операции I/O.

Небольшое интро

Поток позволяет вам запустить часть длинного кода так, как если бы он был отдельной программой. Это своего рода вызов наследуемого процесса, за исключением того, что вы вызываете функцию или класс, вместо отдельной программы. Я всегда находил конкретные примеры крайне полезными. Давайте взглянем на нечто совершенно простое:

import threading
 
def doubler(number):
    """
    A function that can be used by a thread
    """
    print(threading.currentThread().getName() + '\n')
    print(number * 2)
    print()
 
if __name__ == '__main__':
    for i in range(5):
        my_thread = threading.Thread(target=doubler, args=(i,))
        my_thread.start()

Здесь мы импортируем модуль threading и создаем обычную функцию под названием doubler. Наша функция принимает значение и удваивает его. Также она выводи название потока, который вызывает функцию и выводит бланк-строчку в конце. Далее, в последнем блоке кода, мы создаем пять потоков, и запускаем каждый из них по очереди.

Используя многопоточность можно решить много рутинных моментов. Например загрузка видео или другого материала в социальные сети, такие как Youtube или Facebook. Для развития своего Youtube канала можно использовать https://publbox.com/ru/youtube который возьмет на себя администрирование вашего канала. Youtube отличный источник заработка и чем больше каналов тем лучше. Без Publbox вам не обойтись.

Обратите внимание на то, что когда мы определяем поток, мы устанавливаем его целью на нашу функцию doubler, и мы также передаем аргумент функции. Причина, по которой параметр args выглядит немного непривычно, заключается в том, что нам нужно передать sequence функции doubler, и она принимает только один аргумент, так что нужно добавить запятую в конце, чтобы создать sequence одной из них. Обратите внимание на то, что если вы хотите подождать, пока поток определится, вы можете вызвать его метод join(). Когда вы запустите этот код, вы получите следующую выдачу:

Thread-1
 
0
 
Thread-2
 
2
 
Thread-3
 
4
 
Thread-4
 
6
 
Thread-5
 
8

Конечно, вам скорее всего не захочется выводить вашу выдачу в stdout. Это может закончиться сильным беспорядком. Вместо этого, вам нужно использовать модуль Python под названием logging. Это защищенный от потоков модуль и он прекрасно выполняет свою работу. Давайте немного обновим указанный ранее пример и добавим модуль logging, и заодно назовем наши потоки:

import logging
import threading
 
def get_logger():
    logger = logging.getLogger("threading_example")
    logger.setLevel(logging.DEBUG)
 
    fh = logging.FileHandler("threading.log")
    fmt = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
    formatter = logging.Formatter(fmt)
    fh.setFormatter(formatter)
 
    logger.addHandler(fh)
    return logger
 
 
def doubler(number, logger):
    """
    A function that can be used by a thread
    """
    logger.debug('doubler function executing')
    result = number * 2
    logger.debug('doubler function ended with: {}'.format(result))
 
 
if __name__ == '__main__':
    logger = get_logger()
    thread_names = ['Mike', 'George', 'Wanda', 'Dingbat', 'Nina']
    for i in range(5):
        my_thread = threading.Thread(
            target=doubler, name=thread_names[i], args=(i,logger))
        my_thread.start()

Самое большое изменение в этом коде – это добавление функции get_logger. Эта часть кода создаст логгер, который настроен на дебаг. Это сохранит log в нынешнюю рабочую папку (другими словами, туда, откуда запускается скрипт) и затем мы настраиваем формат каждой линии на логированный. Формат включает временной штамп, название потока, уровень логгирования и логгированое сообщение. В функции doubler мы меняем наши операторы вывода на операторы логгирования.

Обратите внимание на то, что мы передаем логгер в функцию doubler, когда создаем поток. Это связанно с тем, что если вы определяет объект логгирования в каждом потоке, вы получите несколько singletons и ваш журнал будет содержать множество повторяющихся строк. В конце мы называем наши потоки, создав список наименований, и затем устанавливаем каждый поток на особое наименование, использую параметр name. Когда вы запустите этот код, вы должны получить лог файл со следующим содержимым:

Эта выдача достаточно понятная, так что давайте пойдем дальше. Я хочу разобрать еще один вопрос в этой статье. Мы поговорим о наследовании класса под названием threading.Thread. Давайте снова рассмотрим предыдущий пример, только вместо вызова потока напрямую, мы создадим свой собственный подкласс. Вот обновленный код:

import logging
import threading
 
class MyThread(threading.Thread):
 
    def __init__(self, number, logger):
        threading.Thread.__init__(self)
        self.number = number
        self.logger = logger
 
 
    def run(self):
        """
        Run the thread
        """
        logger.debug('Calling doubler')
        doubler(self.number, self.logger)
 
 
    def get_logger():
        logger = logging.getLogger("threading_example")
        logger.setLevel(logging.DEBUG)
 
        fh = logging.FileHandler("threading_class.log")
        fmt = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
        formatter = logging.Formatter(fmt)
        fh.setFormatter(formatter)
 
        logger.addHandler(fh)
        return logger
 
 
def doubler(number, logger):
    """
    A function that can be used by a thread
    """
    logger.debug('doubler function executing')
    result = number * 2
    logger.debug('doubler function ended with: {}'.format(result))
 
 
if __name__ == '__main__':
    logger = get_logger()
    thread_names = ['Mike', 'George', 'Wanda', 'Dingbat', 'Nina']
    for i in range(5):
        thread = MyThread(i, logger)
        thread.setName(thread_names[i])
        thread.start()

В этом примере мы только что унаследовали класс threading.Thread. Мы передали число, которое хотим удвоить, а также передали объект логгированмя, как делали это ранее. Но на этот раз, мы настроим название потока по-другому, вызвав функцию setName в объекте потока. Нам все еще нужно вызвать старт в каждом потоке, но запомните, что нам не нужно определять это в наследуемом классе. Когда вы вызываете старт, он запускает ваш поток, вызывая метод run. В нашем классе мы вызываем функцию doubler для выполнения наших вычислений. Выдача сильно похожа на ту, что была в примере ранее, за исключением того, что я добавил дополнительную строку в выдаче. Попробуйте сами и посмотрите, что получится.

Замки и Синхронизация

Когда у вас в распоряжении более одного потока, тогда вам, возможно, понадобится понять, как избежать конфликтов. Под этим я имею ввиду то, что вы можете использовать случай, где более одного потока нуждаются в доступе к одном и тому же ресурсу в одно и то же время. Если вы не думаете о таких проблемах и соответственном планировании, тогда вы можете столкнуться с самыми худшими проблемами в крайне неудобное время, и, как правило, в момент выпуска кода.

Решение проблемы – это использовать замки. Замок предоставлен модулем Python threading и может держать один поток, или не держать поток вообще. Если поток пытается acquire замок на ресурсе, который уже закрыт, этот поток будет ожидать до тех пор, пока замок не откроется. Давайте посмотрим на практичный пример одного кода, который не имеет никакого замочного функционала, но мы попробуем его добавить:

import threading
 
total = 0
 
def update_total(amount):
    """
    Updates the total by the given amount
    """
    global total
    total += amount
    print (total)
if __name__ == '__main__':
    for i in range(10):
        my_thread = threading.Thread(target=update_total, args=(5,))
        my_thread.start()

Мы можем сделать этот пример еще интереснее, добавив вызов time.sleep. Следовательно, проблема здесь в том, что один поток может вызывать update_total и перед тем, как он обновится, другой поток может вызвать его и тоже попытается обновить его. В зависимости от порядка операций, значение может быть добавлено единожды. Давайте добавим замок к функции. Существует два способа сделать эта. Первый – это использование try/finally, если мы хотим убедиться, что замок снят. Вот пример:

import threading
 
total = 0
lock = threading.Lock()
 
def update_total(amount):
    """
    Updates the total by the given amount
    """
    global total
    lock.acquire()
 
    try:
        total += amount
    finally:
        lock.release()
 
    print (total)
 
 
if __name__ == '__main__':
    for i in range(10):
        my_thread = threading.Thread(target=update_total, args=(5,))
        my_thread.start()

Здесь мы просто вешаем замок, перед тем как сделать что-либо другое. Далее, мы пытаемся обновить total и finally, мы снимаем замок и выводим нынешний total. Мы можем упростить данную задачу, используя оператор Python под названием with:

import threading
 
total = 0
lock = threading.Lock()
 
def update_total(amount):
    """
    Updates the total by the given amount
    """
    global total
    with lock:
        total += amount
 
    print (total)
 
 
if __name__ == '__main__':
    for i in range(10):
        my_thread = threading.Thread(target=update_total, args=(5,))
        my_thread.start()

Как вы видите, нам больше не нужны try/finally, так как контекстный менеджер, предоставленный оператором with, сделал все это за нас. Конечно, вы можете обнаружить, что пишите код там, где необходимы несколько потоков с доступом к нескольким функциям. Когда вы впервые начнете писать конкурентный код, вы можете сделать что-нибудь на подобии следующего:

import threading
 
total = 0
lock = threading.Lock()
 
def do_something():
    lock.acquire()
 
    try:
        print('Lock acquired in the do_something function')
    finally:
        lock.release()
        print('Lock released in the do_something function')
 
    return "Done doing something"
 
 
def do_something_else():
    lock.acquire()
 
    try:
        print('Lock acquired in the do_something_else function')
    finally:
        lock.release()
        print('Lock released in the do_something_else function')
 
    return "Finished something else"
 
 
if __name__ == '__main__':
    result_one = do_something()
    result_two = do_something_else()

Этот код хорошо работает в данном случае, но подразумевается, что у вас есть несколько потоков, вызывающих обе эти функции. Пока один поток работает над функциями, второй может, в свою очередь, обновлять данные и вы получите некорректный результат. Проблема в том, что вы сначала можете не заметить, что с результатами что-то неладное. Как найти решение этой проблеме? Давайте в ней разберемся. Первое, к чему можно прийти, это повесить замок на двух вызовах функций. Давайте попробуем обновить указанный ранее пример, чтобы получить что-то вроде следующего:

import threading
 
total = 0
lock = threading.RLock()
 
def do_something():
    with lock:
        print('Lock acquired in the do_something function')
 
    print('Lock released in the do_something function')
 
    return "Done doing something"
 
 
def do_something_else():
    with lock:
        print('Lock acquired in the do_something_else function')
 
    print('Lock released in the do_something_else function')
    return "Finished something else"
 
 
def main():
    with lock:
        result_one = do_something()
        result_two = do_something_else()
 
    print (result_one)
    print (result_two)
 
 
if __name__ == '__main__':
    main()

Когда вы запустите этот код, вы увидите, что он просто висит. Причина в том, что мы просто указываем модулю threading повесить замок. Так что когда мы вызываем первую функцию, она видит, что замок уже висит и блокируется. Это будет длиться до тех пор, пока замок не снимут, что никогда и не случится, так как это не предусмотрено в коде. Хорошее решение в данном случае – использовать re-entrant замок. Модуль threading предоставляет такой, в виде функции RLock. Просто замените строку lock = threading.Lock() на lock = threading.RLock() и попробуйте перезапустить код. Теперь он должен заработать. Если вы хотите попробовать код выше но добавить в него потоки, то мы можем заменить call на main следующим образом:

if __name__ == '__main__':
    for i in range(10):
        my_thread = threading.Thread(target=main)
        my_thread.start()

Так мы запустим основную функцию в каждом потоке, что в свою очередь приведет к вызову остальных двух функций. В конце вы получите достаточно крупную выдачу.

Таймеры

Модуль threading включает в себя один очень удобный класс, под названием Timer, который вы можете использовать запуска действия, спустя определенный отрезок времени. Данный класс запускает собственный поток и начинают работу с того же метода start(), как и обычные потоки. Вы также можете остановить таймер, используя метод cancel. Обратите внимание на то, что вы можете отменить таймер еще до того, как он стартовал. Однажды у меня был случай, когда мне нужно было наладить связь с под-процессом, который я начал, но мне нужен было обратный отсчет. Несмотря на существования ряда различных способов решения этой отдельной проблемы, моим любимым решением всегда было использование класса Timer модуля threading. Для этого примера мы взглянем на применение команды ping. В Linux, команда ping будет работать, пока вы её не убьете. Так что класс Timer становится особенно полезным для мира Linux. Вот пример:

import subprocess
 
from threading import Timer
 
kill = lambda process: process.kill()
cmd = ['ping', 'www.google.com']
ping = subprocess.Popen(
    cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
my_timer = Timer(5, kill, [ping])
 
try:
    my_timer.start()
    stdout, stderr = ping.communicate()
finally:
    my_timer.cancel()
print (str(stdout))

Здесь мы просто настраиваем лямбду, которую мы можем использовать, чтобы убить процесс. Далее мы начинаем нашу работу над ping и создаем объект Timer. Обратите внимание на то, что первый аргумент – это время ожидания в секундах, затем – функция, которую нужно вызвать и аргумент, который будет передан функции. В нашем случае, наша функция – это лямбда, и мы передаем её список аргументов, где список содержит только один элемент. Если вы запустите этот код, он будет работать примерно 5 секунд, после чего выведет результат пинга.

Другие Компоненты Потоков

Модуль threading также включает в себя поддержу других объектов. Например, вы можете создать семафор, который является одним из древнейших синхронизационных примитивов в компьютерной науке. Семафор управляет внутренним счетчиком, который будет декрементируется когда вы вызываете acquire и увеличивается, когда вы вызываете release. Счетчик разработан таким образом, что он не может падать ниже нуля. Так что если так вышло, что вы вызываете acquire когда он равен нулю, то он заблокируется.

Еще один полезный инструмент, который содержится в модуле, это Event. С его помощью вы можете получить связь между двумя потоками, используя сигналы. Мы рассмотрим примеры применения Event в следующей статье. Наконец-то, в версии Python 3.2 был добавлен объект Barrier. Это примитив, который управляет пулом потока, при этом не важно, где потоки должны ждать своей очереди. Для передачи барьера, потоку нужно вызвать метод wait(), который будет блокировать до тех пор, пока все потоки не сделают вызов. После чего все потоки пройдут дальше одновременно.

Связь потоков

Существует ряд случаев, когда вам нужно сделать так, чтобы потоки были связанны друг с другом. Как я упоминал ранее, вы можете использовать Event для этой цели. Но более удобный способ – использовать Queue. В нашем примере мы используем оба способа! Давайте посмотрим, как это будет выглядеть:

import threading
from queue import Queue
 
 
def creator(data, q):
    """
    Creates data to be consumed and waits for the consumer
    to finish processing
    """
    print('Creating data and putting it on the queue')
    for item in data:
        evt = threading.Event()
        q.put((item, evt))
        print('Waiting for data to be doubled')
        evt.wait()
 
 
def my_consumer(q):
    """
    Consumes some data and works on it
    In this case, all it does is double the input
    """
    while True:
        data, evt = q.get()
        print('data found to be processed: {}'.format(data))
        processed = data * 2
 
        print(processed)
 
        evt.set()
        q.task_done()
 
 
if __name__ == '__main__':
    q = Queue()
    data = [5, 10, 13, -1]
    thread_one = threading.Thread(target=creator, args=(data, q))
    thread_two = threading.Thread(target=my_consumer, args=(q,))
 
    thread_one.start()
    thread_two.start()
 
    q.join()

Давайте немного притормозим. Во первых, у нас есть функция creator (также известная, как producer), которую мы используем для создания данных, с которыми мы хотим работать (или использовать). Далее мы получаем еще одну функцию, которую мы используем для обработки данных, под названием my_consumer. Функция creator использует метод Queue под названием put, чтобы добавить данные в очередь, затем потребитель, в свою очередь, будет проверять, есть ли новые данные и обрабатывать их, когда такие появятся. Queue обрабатывает все закрытия и открытия замков, так что лично вам эта участь не грозит.

В данном примере мы создали список значений, которые мы хотим дублировать. Далее мы создаем два потока, один для функции creator/producer, второй для consumer (потребитель). Обратите внимание на то, что мы передаем объект Queue каждому потоку, что является прямо таки магией, учитывая то, как обрабатываются замки. Очередь начнется с первого потока, который передает данные второму. Когда первый поток передает те или иные данные в очередь, он также передает их к Event, после чего дожидается, когда произойдет события, чтобы закончить. Далее, в функции consumer, данные обрабатываются, и после этого вызывается метод настройки Event, который указывает первому потоку, что второй закончил обработку, так что он может продолжать. Последняя строка кода вызывает метод join объекта Queue, который указывает Queue подождать, пока потоки закончат обработку. Первый поток заканчивает, когда ему больше нечего передавать в Queue.

Подведем итоги

Мы рассмотрели достаточно много материала. А именно:

  1. Основы работы с модулем threading
  2. Как работают замки
  3. Что такое Event и как его можно использовать
  4. Как использовать таймер
  5. Внутрипотоковая связь с использованием Queue/Event

Теперь вы знаете, как использовать потоки, и в чем они хороши. Надеюсь, вы найдете им применение в своем собственном коде!

Оставьте комментарий!

Используйте нормальные имена.

Имя и сайт используются только при регистрации

Если вы уже зарегистрированы как комментатор или хотите зарегистрироваться, укажите пароль и свой действующий email. При регистрации на указанный адрес придет письмо с кодом активации и ссылкой на ваш персональный аккаунт, где вы сможете изменить свои данные, включая адрес сайта, ник, описание, контакты и т.д., а также подписку на новые комментарии.

(обязательно)