вторник, 30 ноября 2010 г.

Мультипоточность в питоне. Часть 2 - синхронизация

В части 1 мы рассмотрели базовые основы работы с потоками. Перейдем к примитивам синхронизации.

В мультипоточной программе доступ к объектам иногда нужно синхронизировать.

С этой точки зрения все объекты (переменные) разделяются на:

  • Неизменяемые. Мои самые любимые. Если объект никто не меняет, то синхронизация доступа ему не нужна. К сожалению, таких не очень много.
  • Локальные. Если объект не виден остальным потокам, то доступ к нему синхронизировать тоже не требуется.
  • Разделяемые и изменяемые. Синхронизация необходима.

Синхронизация доступа к объектам осуществляется с помощью объектов синхронизации (простите за тавтологию).

Объектов синхронизации существует множество - и постоянно придумывают новые. Это не то чтобы плохо - но избыточно (по крайней мере на начальном этапе изучения).

Каждый объект синхронизации имеет свои плюсы и минусы (обычно это баланс между быстродействием и областью применения). Ошибка ведет к тому, что

  • код будет выполнятся не так быстро, как мог бы (меньшее зло)
  • или к тому, что он будет выполнятся неправильно (тушите свет).

Давным-давно Дейкстра работал не только над алгоритмами на графах, но и над проблемами многопоточного кода. Ему удалось в одной из своих работ доказать, что минимальный базис, необходимый для любой синхронизации, сводится к двум объектам:

  • блокировки (они же lock, mutex).
  • условные переменные (condition variable).

Давайте на них и остановимся.

Ремарка о скорости. Питон - не первый спринтер на деревне. Если для программы на С выигрыш от правильного использования специфичного объекта синхронизации может быть существенным, то для Питона в целом они все примерно одинаковы.

Выбирать нужно более безопасный способ, не обращая внимания на быстродействие.

Кстати, это не относится к реализации CPython как такового - в коде интерпретатора присутствуют довольно экстравагантные блокировки, дающие выигрыш в пару процентов. До тех пор, пока их использует небольшая очень квалифицированная команда разработчиков - это оправдано.

Вторая ремарка: рассматриваемые объекты блокировки очень примитивны.

Откровенно говоря, не существует высокоуровневой теории мультипоточных программ. Если сравнивать с развитием языков, то уровень мультипоточного программирования сейчас находится где-то там, где были процедурные языки программирования. Объекты синхронизации схожи с оператором goto - использовать приходится, но всегда следует внимательно смотреть, что вы делаете.

Более высокоуровневые концепты, такие как monitor и active object хороши, но не решают всех проблем.

Языки вроде erlang, прекрасно справляющиеся с распараллеливанием задач на оптимальное количество потоков, останутся в своей узкой нише - примерно как lisp и все семейство языков функционального программирования навсегда остануться инструментом для маргиналов.

Парадигма многопоточных языков общего употребления все еще не создана.

Блокировки

Это - основа всего.

Простейший пример использования:

import threading

class Point(object):
    def __init__(self):
        self._mutex = threading.RLock()
        self._x = 0
        self._y = 0

    def get(self):
        with self._mutex:
            return (self._x, self._y)

    def set(self, x, y):
        with self._mutex:
            self._x = x
            self._x = y

Итак, имеем тривиальный класс. Это точка в двухмерном пространстве. Пусть координаты _x и _y начинаются с подчеркивания - чтобы программист, пожелавший менять их непосредственно немного задумался.

Еще есть _mutex типа RLock.

Публичная часть - два метода get и set.

Работает все это так: - при вызове метода берем блокировку через with self._mutex: - весь код внутри with блока будет выполнятся только в одном потоке. Другими словами, если два разных потока вызовут .get то пока первый поток не выйдет из блока второй будет его ждать - и только потом продолжит выполнение.

Зачем это все нужно? Координаты нужно менять одновременно - ведь точка это цельный объект. Если позволить одному потоку поменять x, а другой в это же время поправит y логика алгоритма может поломаться.

Есть и другой вопрос: зачем методу get блокировка? В приведенном примере она действительно не нужна. Но я всё же настоятельно рекомендую использовать блокировки даже для методов, вроде бы не изменяющих содержимое.

Во первых, это просто хорошая привычка. Иногда блокировки можно опускать для повышения производительности - но только после тщательного изучения побочных эффектов.

Во вторых (и это важнее) - блокировки при чтении позволяют корректно работать с объектами, части которых могут изменятся независимо.

Для примера возьмет цветную точку.

class ColoredPoint(Point):
    def __init__(self):
        super(ColoredPoint, self).__init__()
        self._color = 'green'

    @property
    def color(self):
        with self._mutex:
            return self._color

    @color.setter
    def color(self, val):
        with self._mutex:
            self._color = val

    def do(self, observer):
        with self._mutex:
            if self._color == 'red':
                observer(self.get())

Без блокировки в методе .do возможна ситуация, при которой один поток поменяет цвет, в второй в это же время изменит координаты. И тогда observer будет вызван с неправильными значениями.

В модуле threading существует еще и Lock - никогда его не используйте. Дело в том, что RLock (recursive lock) допускает повторную блокировку.

В примере для цветной точки это хорошо видно - .do берет блокировку, а затем вызывает метод .get, который берет эту блокировку еще раз.

В случае с Lock метод будет намертво повешен. RLock позволяет повторно брать блокировку тому потоку, который уже ее получил.

Говоря о блокировках, нужно упомянуть и две проблемы, которые возникают при их использовании:

  • Race condition: неправильное поведение из за отсутствия блокировки.
  • Dead lock: взаимная блокировка.

Первый случай я уже описал: при отсутствии блокировки один поток меняет часть объекта, временно приводя его в некорректное состояние - а второй поток именно в это время пытается работать с этим некорректным на данный момент объектом.

Пример для взаимных блокировок:

a = threading.RLock()
b = threading.RLock()

def f():
    with a:
        # do something
        with b:
            # do something also

def g():
    with b:
        # do something
        with a:
            # do something also

При одновременно вызове f из одного потока и g из другого оба потока навсегда повиснут: первый будет ждать захвата b а второй, захватив b остановится на ожидании блокировки a.

Обратите внимание: эта проблема может проявляться не сразу и нелегко диагностируется/воспроизводится.

Современные программы бывают большими и сложными с очень запутанными потоками исполнения, отследить которые нелегко.

Еще один пример:

def h():
    try:
        a.acquire()
        b.acquire()
    # do something
    finally:
        a.release()
        b.release()

При одновременном вызове h возможна взаимная блокировка.

Общее правило, немного помогающее при проектировании многопоточных систем: блокировки всегда должны браться в одном и том же порядке, а освобождаться в противоположном. Использование оператора with помогает решить эту проблему.

Условные переменные

С блокировками все более или менее ясно. Нужен еще один объект синхронизации.

Рассмотрим очередь на фиксированное количество элементов. Когда очередь пуста - поток, желающий получить новый элемент должен ждать. Аналогично с переполненной очередью.

Если решать все только на блокировках, придется проверять - а не пуста ли очередь? И если таки пуста - подождать еще немного. Сколько времени ждать - сложный вопрос.

Очевидно, нужен какой-то сигнал от заполняющего очередь потока - ожидающему.

Чтобы последний, выйдя из вечной блокировки мог продолжить работу.

Несмотря на то, что Питон непосредственно поддерживает сигналы (threading.Event) - я настоятельно не рекомендую их использовать.

Проблема в следующем: поток послал сигнал о поступлении элемента в очередь.

Кто его получит, если получение нового элемента ожидают несколько потоков?

Один из них забрал данные и очередь снова пуста. Как нужно дожидаться следующего поступления?

Условные переменные (condition variables) совмещают сигналы с блокировками.

Рассмотрим пример:

class Queue(object):
    def __init__(self, size=5):
        self._size = size
        self._queue = []
        self._mutex = threading.RLock()
        self._empty = threading.Condition(self._mutex)
        self._full = threading.Condition(self._mutex)

    def put(self, val):
        with self._full:
            while len(self._queue) >= self._size:
                self._full.wait()
            self._queue.append(val)
            self._empty.notify()

    def get(self):
        with self._empty:
            while len(self._queue) == 0:
                self._empty.wait()
            ret = self._queue.pop(0)
            self._full.notify()
            return ret

У нас два почти симметричных метода - положить в очередь и взять из нее.

Разберем .get. Сначала берем блокировку.

Обратите внимание - блокиратор один на обе условные переменные. Это важно Дело в том, что _full и _empty взаимозависимы. Хотя threading.Condition позволяет не указывать блокиратор, создавая новый RLock автоматически - не поступайте так. Можно поймать race condition, о котором я говорил раньше. Гораздо надежней и наглядней делать все явно. В нашем случае race condition был бы в явном виде - пусть и скрытый из за GIL.

  • Входим в блокиратор.
  • Проверяем наше условие - обычно это всегда цикл while.
  • Если условие не выполнилось - вызываем .wait(). Этот метод освободит блокировку и будет ждать извещения .notify. После получения сигнала выполнение продолжится при снова взятой блокировке. Еще раз, простыми словами.
    1. Проверка условия всегда выполняется в блокировке. Никто другой в это же время условие поменять не сможет (если все сделано правильно, конечно).
    2. Если условие не выполнилось - ждем опять, отдав управление операционной системе.
    3. При повторном вхождении у нас снова есть блокировка.
  • Условие выполнилось - очередь не пуста. Как минимум один элемент в ней есть. Может быть и больше - нас сейчас это не волнует. Блокировка все еще есть.
  • Получаем этот элемент и извещаем очередь, что она стала не до конца заполненной - т.е. в нее можно еще что-то положить (self._full.notify()).
  • Возвращаем полученное. Выходя из with освобождаем блокировку - другие потоки могут работать с очередью дальше.

Итак, условные переменные - это способ синхронизировать доступ к объектам при помощи блокировки и при этом возможность послать сигнал ожидающим потокам.

Альтернатива оператору with.

Вместо

with lock:
    # do something

можно использовать более традиционную форму:

try:
    lock.acquire()
    # do domething
finally:
    lock.release()

Эти два подхода абсолютно эквивалентны, но первый на три строчки короче. Выбросить try/finally блок нельзя - при возникновении исключения блокировка должна быть всё равно отпущена. К слову, неснятая блокировка при выходе из функции - одна из самых распространенных ошибок. Будьте внимательны - а еще лучше научитесь писать так, чтобы всё получалось просто, ясно и правильно.

Нужно отметить, что .acquire позволяет указывать параметры: blocking и, начиная с Python 3.2 - timeout. Подробности их использования неплохо описаны в документации.

Продолжение - в третьей части.

14 комментариев:

  1. А подскажите пожалуйста как реализовать такую синхронизацию:
    1. Есть 3 потока, которые берут задания из очереди Queue и заполняют другую очередь
    2. четвертый поток смотрит во вторую очередь и вынимает данные из нее, но когда все потоки выйдут, он тоже должен будет выйти


    Сейчас сделал так, что при создании каждого из трех потоков, они кладут вв общую переменную свое имя, а при выходе - удаляет. А у четвертого потока в while условие, если есть хоть 1 сервер в этой переменной, то проверять очередь

    ОтветитьУдалить
  2. Конечно, советовать в общем случае что-то сложно - нужно видеть код и знать требования.

    Тем не менее я попытаюсь :)

    Зачем вам общая переменная, если уже есть вторая очередь.
    Пусть ваши три потока при старте кладут в нее объект-маркер "начал".
    При окончании работы - маркер "закончил".

    Тогда четвертый поток может считать эти маркеры и таким образом знать, когда ему нужно завершиться.

    Дополнительный плюс - ждать можно в блокирующем режиме, а не проверять: если очередь пустая, нужно глянуть в общую переменную, и если там что-то есть то поспать немного (а вдруг в очереди что-то появится).

    ОтветитьУдалить
  3. Если ждать в блокирующем режиме, то как я узнаю что потоки умерли и выйду из потока 4? А если ждать через join(), то тогда данные из второй очереди не будут ложиться по мере поступления, а только потом, всем скопом

    Если каждый поток будет класть в очередь-2 идентификатор, а потом в нее же будет класть результат своей работы, то это получается, что в очереди разнородные данные и придется обрабатывать такие исключения.

    Для уточнения задачи: 3 потока производят кое-какие вычисления, а поток 4 по мере поступления, записывает их в Qt4 таблицу.

    ОтветитьУдалить
  4. Разнородные данные - это не страшно. Питон же, не С++.
    Я имел в виду нечто вроде такого:

    import Queue
    import threading

    in = Queue.Queue()
    out = Queue.Queue()

    class START_MARKER(int):
    pass

    STOP_MARKER = object()

    def worker_thread():
    while ...:
    i = in.get()
    out.put(process(i))
    out.put(STOP_MARKER)


    def ui_thread():
    th_count = out.get()
    assert isinstance(th_count, START_MARKER)
    stopped = 0
    while stopped < th_count:
    i = out.get()
    if i is STOP_MARKER:
    stopped += 1
    else:
    pass
    # process value

    def start_all():
    ui_th = threading.Thread(target=ui_thread)
    ui_th.start()
    out.put(START_MARKER(3))
    worker_th = []
    for i in range(3):
    th = threading.Thread(target=worker_thread)
    th.start()
    worker_th.append(th)

    ОтветитьУдалить
  5. Понял, спасибо. Получается впринципе такое же.

    ОтветитьУдалить
  6. Да, почти такое же - но с блокировкой.
    На паре слабонагруженных потоков это не видно - но в большой теории операции "проверить, поспать и проверить еще раз" - зло.

    ОтветитьУдалить
  7. Скажите, зачем все это нужно не питоне в свете следующего:
    http://boobleccie.blogspot.com/2010/02/python-gil.html

    ОтветитьУдалить
  8. Условно говоря, потоки делятся на вычислительные и ввода-вывода (по преобладающему характеру выполняемых операций).
    Для вычислительных потоков GIL является серьезным ограничением (до тех пор, пока разработчик не познает Дзен и не начнет писать масштабируемые распределенные программы. Яркий пример — Google Chrome).
    Потоки ввода-вывода большую часть времени сидят в ожидании системных ресурсов (прихода нового пакета из сокета и т.д.)
    Так вот именно для последнего типа потоков многопоточность всё ещё дает очевидные преимущества. При этом потоки всегда нужно синхронизировать — что я и описываю.

    Второй пример: система с GUI. Важно, чтобы цикл обработки сообщений имел как можно меньшее время отклика — тогда интерфейс не «тормозит». Сам этот цикл тоже почти всегда спит, ожидая пользовательских действий. Другие потоки могут делать полезную работу, не влияя на главный поток.

    Изначально целью многопоточных программ не было «максимально загрузить процессор» — на одноядерных машинах с этим лучше справлялся как раз однопоточный код.
    Наоборот, хотелось ценой некоторой потери производительности (переключение потоков небесплатное) получить лучше и быстрее реагирующую на действия пользователя систему в целом.

    ОтветитьУдалить
  9. Извиняюсь за некрокоментарий, но пройти мимо я не смог.
    Пример с функцией h() в разделе про блокировки неверен. Нет такого пути исполнения, когда одновременный вызов этой функции из двух потоков приведет к взаимной блокировке.

    И общее правило при проектировании многопоточных систем звучит приблизительно так:
    "Каждый поток должен брать мютексы в одном и том же порядке. А освобождать - в противоположном". Если приложение не использует других примитивов синхронизации, то можно утверждать, что используя это правило, мы никогда не попадем deadlock.

    ОтветитьУдалить
  10. В первую очередь спасибо за эту да и за все остальные статьи - очень интересно и познавательно.

    Вопрос - я может что-то не так понял, но зачем писать свою очередь если есть
    http://docs.python.org/2/library/queue.html

    Спасибо

    ОтветитьУдалить
    Ответы
    1. queue — pure python module. Полезно знать, как такое сделать самому. К тому же очередь — очень простая и всем понятная абстракция.

      А текст, вообще-то говоря, не об очередях а об условных переменных.

      Удалить
  11. Забавно читать в статью в 2019-м, когда есть Go, например, где, при всех иных явных и не очень недостатках языка, задача управления доступом к разделяемым ресурсам решена достаточно элегантно.

    ОтветитьУдалить
  12. Удивительно что не набежали адепты FP и Rust доказывать, про светлое будущее и что они совсем не маргиналы

    ОтветитьУдалить