В части 1 мы рассмотрели базовые основы работы с потоками. Перейдем к примитивам синхронизации.
В мультипоточной программе доступ к объектам иногда нужно синхронизировать.
С этой точки зрения все объекты (переменные) разделяются на:
- Неизменяемые. Мои самые любимые. Если объект никто не меняет, то синхронизация доступа ему не нужна. К сожалению, таких не очень много.
- Локальные. Если объект не виден остальным потокам, то доступ к нему синхронизировать тоже не требуется.
- Разделяемые и изменяемые. Синхронизация необходима.
Синхронизация доступа к объектам осуществляется с помощью объектов синхронизации (простите за тавтологию).
Объектов синхронизации существует множество - и постоянно придумывают новые. Это не то чтобы плохо - но избыточно (по крайней мере на начальном этапе изучения).
Каждый объект синхронизации имеет свои плюсы и минусы (обычно это баланс между быстродействием и областью применения). Ошибка ведет к тому, что
- код будет выполнятся не так быстро, как мог бы (меньшее зло)
- или к тому, что он будет выполнятся неправильно (тушите свет).
Давным-давно Дейкстра работал не только над алгоритмами на графах, но и над проблемами многопоточного кода. Ему удалось в одной из своих работ доказать, что минимальный базис, необходимый для любой синхронизации, сводится к двум объектам:
- блокировки (они же lock, mutex).
- условные переменные (condition variable).
Давайте на них и остановимся.
Ремарка о скорости. Питон - не первый спринтер на деревне. Если для программы на С выигрыш от правильного использования специфичного объекта синхронизации может быть существенным, то для Питона в целом они все примерно одинаковы.
Выбирать нужно более безопасный способ, не обращая внимания на быстродействие.
Кстати, это не относится к реализации CPython как такового - в коде интерпретатора присутствуют довольно экстравагантные блокировки, дающие выигрыш в пару процентов. До тех пор, пока их использует небольшая очень квалифицированная команда разработчиков - это оправдано.
Вторая ремарка: рассматриваемые объекты блокировки очень примитивны.
Откровенно говоря, не существует высокоуровневой
теории мультипоточных программ.
Если сравнивать с развитием языков, то уровень
мультипоточного программирования сейчас находится где-то там,
где были процедурные языки программирования.
Объекты синхронизации схожи с оператором goto
- использовать приходится,
но всегда следует внимательно смотреть, что вы делаете.
Более высокоуровневые концепты, такие как monitor и active object хороши, но не решают всех проблем.
Языки вроде erlang
, прекрасно справляющиеся с распараллеливанием задач
на оптимальное количество потоков, останутся в своей узкой нише -
примерно как lisp
и все семейство языков функционального программирования
навсегда остануться инструментом для маргиналов.
Парадигма многопоточных языков общего употребления все еще не создана.
Блокировки
Это - основа всего.
Простейший пример использования:
import threading
class Point(object):
def __init__(self):
self._mutex = threading.RLock()
self._x = 0
self._y = 0
def get(self):
with self._mutex:
return (self._x, self._y)
def set(self, x, y):
with self._mutex:
self._x = x
self._x = y
Итак, имеем тривиальный класс. Это точка в двухмерном пространстве.
Пусть координаты _x
и _y
начинаются
с подчеркивания - чтобы программист, пожелавший менять их непосредственно
немного задумался.
Еще есть _mutex
типа RLock
.
Публичная часть - два метода get
и set
.
Работает все это так:
- при вызове метода берем блокировку через with self._mutex:
- весь код внутри with
блока будет выполнятся только в одном потоке.
Другими словами, если два разных потока вызовут .get
то пока первый поток
не выйдет из блока второй будет его ждать - и только потом продолжит выполнение.
Зачем это все нужно? Координаты нужно менять одновременно - ведь точка это цельный объект. Если позволить одному потоку поменять x, а другой в это же время поправит y логика алгоритма может поломаться.
Есть и другой вопрос: зачем методу get
блокировка?
В приведенном примере она действительно не нужна.
Но я всё же настоятельно рекомендую использовать блокировки даже для методов,
вроде бы не изменяющих содержимое.
Во первых, это просто хорошая привычка. Иногда блокировки можно опускать для повышения производительности - но только после тщательного изучения побочных эффектов.
Во вторых (и это важнее) - блокировки при чтении позволяют корректно работать с объектами, части которых могут изменятся независимо.
Для примера возьмет цветную точку.
class ColoredPoint(Point):
def __init__(self):
super(ColoredPoint, self).__init__()
self._color = 'green'
@property
def color(self):
with self._mutex:
return self._color
@color.setter
def color(self, val):
with self._mutex:
self._color = val
def do(self, observer):
with self._mutex:
if self._color == 'red':
observer(self.get())
Без блокировки в методе .do
возможна ситуация, при которой один поток
поменяет цвет, в второй в это же время изменит координаты.
И тогда observer
будет вызван с неправильными значениями.
В модуле threading существует еще и Lock - никогда его не используйте. Дело в том, что RLock (recursive lock) допускает повторную блокировку.
В примере для цветной точки это хорошо видно - .do
берет блокировку, а
затем вызывает метод .get
, который берет эту блокировку еще раз.
В случае с Lock
метод будет намертво повешен. RLock
позволяет повторно
брать блокировку тому потоку, который уже ее получил.
Говоря о блокировках, нужно упомянуть и две проблемы, которые возникают при их использовании:
- Race condition: неправильное поведение из за отсутствия блокировки.
- Dead lock: взаимная блокировка.
Первый случай я уже описал: при отсутствии блокировки один поток меняет часть объекта, временно приводя его в некорректное состояние - а второй поток именно в это время пытается работать с этим некорректным на данный момент объектом.
Пример для взаимных блокировок:
a = threading.RLock()
b = threading.RLock()
def f():
with a:
# do something
with b:
# do something also
def g():
with b:
# do something
with a:
# do something also
При одновременно вызове f
из одного потока и g
из другого оба потока
навсегда повиснут: первый будет ждать захвата b
а второй, захватив b
остановится на ожидании блокировки a
.
Обратите внимание: эта проблема может проявляться не сразу и нелегко диагностируется/воспроизводится.
Современные программы бывают большими и сложными с очень запутанными потоками исполнения, отследить которые нелегко.
Еще один пример:
def h():
try:
a.acquire()
b.acquire()
# do something
finally:
a.release()
b.release()
При одновременном вызове h
возможна взаимная блокировка.
Общее правило, немного помогающее при проектировании многопоточных
систем: блокировки всегда должны браться в одном и том же порядке, а
освобождаться в противоположном. Использование оператора with
помогает решить эту проблему.
Условные переменные
С блокировками все более или менее ясно. Нужен еще один объект синхронизации.
Рассмотрим очередь на фиксированное количество элементов. Когда очередь пуста - поток, желающий получить новый элемент должен ждать. Аналогично с переполненной очередью.
Если решать все только на блокировках, придется проверять - а не пуста ли очередь? И если таки пуста - подождать еще немного. Сколько времени ждать - сложный вопрос.
Очевидно, нужен какой-то сигнал от заполняющего очередь потока - ожидающему.
Чтобы последний, выйдя из вечной блокировки мог продолжить работу.
Несмотря на то, что Питон непосредственно поддерживает сигналы (threading.Event
) -
я настоятельно не рекомендую их использовать.
Проблема в следующем: поток послал сигнал о поступлении элемента в очередь.
Кто его получит, если получение нового элемента ожидают несколько потоков?
Один из них забрал данные и очередь снова пуста. Как нужно дожидаться следующего поступления?
Условные переменные (condition variables) совмещают сигналы с блокировками.
Рассмотрим пример:
class Queue(object):
def __init__(self, size=5):
self._size = size
self._queue = []
self._mutex = threading.RLock()
self._empty = threading.Condition(self._mutex)
self._full = threading.Condition(self._mutex)
def put(self, val):
with self._full:
while len(self._queue) >= self._size:
self._full.wait()
self._queue.append(val)
self._empty.notify()
def get(self):
with self._empty:
while len(self._queue) == 0:
self._empty.wait()
ret = self._queue.pop(0)
self._full.notify()
return ret
У нас два почти симметричных метода - положить в очередь и взять из нее.
Разберем .get
. Сначала берем блокировку.
Обратите внимание - блокиратор один на обе условные переменные. Это важно
Дело в том, что _full
и _empty
взаимозависимы.
Хотя threading.Condition
позволяет не указывать блокиратор, создавая
новый RLock
автоматически - не поступайте так.
Можно поймать race condition, о котором я говорил раньше.
Гораздо надежней и наглядней делать все явно.
В нашем случае race condition был бы в явном виде - пусть и скрытый из за GIL.
- Входим в блокиратор.
- Проверяем наше условие - обычно это всегда цикл
while
. - Если условие не выполнилось - вызываем
.wait()
. Этот метод освободит блокировку и будет ждать извещения.notify
. После получения сигнала выполнение продолжится при снова взятой блокировке. Еще раз, простыми словами.- Проверка условия всегда выполняется в блокировке. Никто другой в это же время условие поменять не сможет (если все сделано правильно, конечно).
- Если условие не выполнилось - ждем опять, отдав управление операционной системе.
- При повторном вхождении у нас снова есть блокировка.
- Условие выполнилось - очередь не пуста. Как минимум один элемент в ней есть. Может быть и больше - нас сейчас это не волнует. Блокировка все еще есть.
- Получаем этот элемент и извещаем очередь, что она стала не до конца заполненной -
т.е. в нее можно еще что-то положить (
self._full.notify()
). - Возвращаем полученное. Выходя из
with
освобождаем блокировку - другие потоки могут работать с очередью дальше.
Итак, условные переменные - это способ синхронизировать доступ к объектам при помощи блокировки и при этом возможность послать сигнал ожидающим потокам.
Альтернатива оператору with
.
Вместо
with lock:
# do something
можно использовать более традиционную форму:
try:
lock.acquire()
# do domething
finally:
lock.release()
Эти два подхода абсолютно эквивалентны, но первый на три строчки короче.
Выбросить try/finally
блок нельзя - при возникновении исключения блокировка
должна быть всё равно отпущена. К слову, неснятая блокировка при выходе из
функции - одна из самых распространенных ошибок. Будьте внимательны - а еще
лучше научитесь писать так, чтобы всё получалось просто, ясно и правильно.
Нужно отметить, что .acquire
позволяет указывать параметры: blocking
и,
начиная с Python 3.2 - timeout
. Подробности их использования неплохо описаны
в документации.
Продолжение - в третьей части.