Глобальное состояние и корутины в питоне

23/12/24
Технические статьи
Глобальное состояние и корутины в питоне

Введение

Однажды, листая Хабр, я наткнулся на статью, где разобрано внутреннее устройство корутин в питоне. Эта тема увлекла меня, и я стал изучать практические возможности, которые оно дает.

Я вспомнил проект, в котором мне приходилось использовать библиотеку, не приспособленную для многозадачности. Очень жаль, что к тому моменту я не знал того, о чем расскажу здесь. Это могло бы сохранить мне кучу времени и нервов.

Преимущества корутин перед потоками

Корутины, или сопрограммы – это многозадачность, реализуемая без использования соответствующей функциональности операционной системы. Исключение накладных расходов на переключение потоков увеличивает быстродействие приложения. К тому же, в питоне GIL сводит на нет весь смысл использования потоков.

Кроме того, кооперативная многозадачность корутин не имеет проблем типа race condition и необходимости использовать мьютексы или критические секции.

При написании корутины желательно использовать API, которые также являются корутинами. К сожалению, иногда приходится использовать библиотеки, написанные не так, как нам хочется.

Проблема глобальных состояний

Давайте представим себе, что мы используем библиотеку, которую можно вызывать только из одного потока. Назовем ее библиотекой X. И эта библиотека имеет глобальное состояние. Т.е. какие-то данные выставляются одним методом, а используются другим. Выставляются не для какого-то объекта, а для библиотеки в целом.

При этом мы также должны использовать ее внутри корутин и между вызовами библиотеки X вызывать еще какие-то корутины.

Пока одна корутина чего-то ждет, управление может быть передано другой корутине, а она вполне может тоже работать с библиотекой Х. Если не принимать никаких мер, может получиться, что глобальное состояние библиотеки Х будет испорчено.

Мой кейс

Мы занимались автоматическим тестированием и использовали для этого Squish. Мы использовали его как библиотеку для питона. Тестируемое приложение имело два процесса, реализующих GUI. Для каждого из них в Squish создавался Application Context. Большинство функций Squish не принимают Application Context как параметр, а используют текущий контекст.

Иногда надо было проверить, что какое-то текстовое поле мигает. Это требует читать его цвет с определенными промежутками времени. В это время может потребоваться проверить какое-то условие в другом процессе. Это уже требует внимательно следить за тем, где переключается Application Context.

Простое решение

Самое очевидное, что можно сделать – это восстанавливать состояние после каждого await’а. Пусть мы выставляем в библиотеке какие-то переменные вызовами x_set_a(), x_set_b()x_set_z() и затем делаем x_get_result(). Корутина отдает управление только в тот момент, когда выполняется await. Если у нас сначала идут вызовы x_set_*(), затем await, а где-то после него x_get_result(), то нам надо переделать код так, чтобы сеттеры вызывались после await’а, и проблема решена.

Но это не всегда так просто. Бывает, состояние меняется неочевидным образом или оно меняется один раз, а затем много раз используется.

Что я предлагаю

Копипастить вызовы сеттеров после каждого вызова корутины – занятие неблагодарное. Мы попробуем автоматизировать жонглирование состояниями при передаче управления между протопотоками. Для этого нам и понадобится изучить особенности реализации корутин в питоне.

Наш обработчик будет сохранять и восстанавливать глобальное состояние. Предполагается, что библиотека Х предоставляет API для чтения и записи всех своих внутренних переменных.

Похожие проблемы

Не надо путать описанную выше проблему с той, которую решает модуль contextvars. Этот модуль пригодится в случае, когда мы сами пишем код, который должен выполняться одновременно в нескольких потоках или протопотоках, но по каким-то причинам не можем передать в него данные через обычный параметр.

Устройство корутин в питоне

То, что описано ниже, можно найти в документации. Так что можно обновлять питон, не беспокоясь о том, что описанный здесь подход перестанет работать.

Корутина – это awaitable-объект. Awaitable-объект – это объект, у которого есть метод await(). Писать такие объекты можем и мы.

Метод await() возвращает итерируемый объект. Процесс итерирования по нему приводит к выполнению корутины. Вызов next() у итератора выполняет корутину до того момента, когда она блокируется. Если мы извлекли очередной объект, значит, корутина чего-то ждет. Если итерация закончилась, значит корутина вернула управление. Исключение StopIteration в атрибуте value содержит возвращаемое значение корутины.

Для нас важно, что, если одна корутина вызывает другую, происходит сквозная итерация. Когда метод в самой глубине иерархии вызовов чего-то ждет, итератор на самом верхнем уровне возвращает объект. Другими словами, последовательность объектов, генерируемых вызываемой корутиной, становится подпоследовательностью объектов, генерируемых вызывающей корутиной.

Объекты, извлекаемые из итератора, определяются планировщиком корутин. Asyncio – это только одна из возможных его реализаций.

В случае asyncio итерируемые объекты – это future-like объекты. Объект класса Future из модуля futures сам является awaitable-объектом. Его итератор определяет последовательность из одного объекта. Этим объектом является сам Future. Если попытаться получить из этого итератора следующий объект до того, как Future перейдет в состояние Done, выбрасывается RuntimeError. Т.е. планировщик должен ставить протопоток в очередь на выполнение не раньше, чем он разблокируется.

Протопотоки

Термин «протопоток» не употребляется в документации по питону. Однако я считаю это понятие полезным. Предлагаю пользоваться вот таким простым определением.

Определение: если одна корутина await’ит другую, то мы далее в этой статье будем говорить, что эти корутины вызваны в одном протопотоке.

Под словом «вызов» по умолчанию будем подразумевать await.

Когда корутина генерирует (не знаю, как адекватно перевести «yield». Но что делают генераторы в питоне? Генерируют, наверное) очередной Future, она блокирует протопоток. Настоящий поток при этом не блокируется. Для этого и нужны корутины.

Мы хотим обрабатывать события передачи управления в протопоток и из него, чтобы сохранять и восстанавливать глобальное состояние.

Каждый раз, когда протопоток блокируется, объект Future всплывает через весь стек вызовов и обрабатывается планировщиком корутин. Если где-то на этом пути будет наша обертка, она тоже сможет его обработать. Вот только такую обертку нельзя написать, как обычную корутину, используя слово async. Придется явно реализовать awaitable-объект. Этот объект будет для другого awaitable-объекта декоратором, для начала в смысле паттерна проектирования, а уже потом и в смысле синтаксиса питона.

Реализация

Обертка для итератора

Для начала реализуем обертку для итератора и протестируем её на обычном итераторе, никак не связанном с корутинами.

class IterWrap:
    def __init__(self, inner):
        # Сохраняем только то, что вернет __iter__(). Ничего другого нам не нужно
        self._inner = inner.__iter__()
    
    def __iter__(self):
        return self
    
    # __next__() обязательно должен вызвать __next__() у внутреннего итератора
    # Мы же будем кроме этого печатать символ “>” перед и “<” после этого вызова
    # Важно обрабатывать исключения. __next__() бросает StopIteration,
    # а также любые исключения, которые может бросать сама корутина
    def __next__(self):
        print('>')
        try:
            item = self._inner.__next__()
        finally:
            print('<')
        return item

def main():
    [print(f'{x}') for x in IterWrap(range(5, 10))]
    
if __name__ == '__main__':
    main()

Запустив этот пример, получим:

>
<
5
>
<
6
>
<
7
>
<
8
>
<
9
>
<

В промежутках между «>» и «<» ничего не выводится, что и не удивительно – range() сам по себе ничего не выводит. Обратите внимание, что на каждый «>» выводится ровно один «<», в том числе и в самом конце, что доказывает, что мы успешно обрабатываем исключение StopIteration.

Добавляем коллбэки

Доработаем нашу обертку, чтобы вместо вывода символов происходили вызовы обработчиков, установленных пользователем.

# Чтобы не проверять обработчики на None, выставим
# по умолчанию обработчик, который ничего не делает
def do_nothing():
    pass

class IterWrap:
    def __init__(self, inner, on_enter = do_nothing, on_exit = do_nothing):
        self._inner = inner.__iter__()
        self._on_enter = on_enter
        self._on_exit = on_exit

    def __iter__(self):
        return self
    
    def __next__(self):
        self._on_enter()
        try:
            item = self._inner.__next__()
        finally:
            self._on_exit()
        return item

Обертка для awaitable

Достаем из awaitable-объекта итератор, оборачиваем и возвращаем нашу обертку в нашем методе await().

class AwWrap:
    def __init__(self, inner, **kwargs):
        self._itwr = IterWrap(inner.__await__(), **kwargs)

    def __await__(self):
        return self._itwr

Эту обертку мы уже можем применить к корутине. В качестве примера запустим три протопотока, каждый из которых будет выводить числа с определенным (для каждого протопотока своим) интервалом времени. Обработчик же будет выставлять для каждого протопотока свой цвет при помощи кодов ANSI-терминала. Коды для изменения цвета определены в модуле colorama.

Код каждого протопотока будет такого вида:

async def thread_a():
    for i in range(1,10):
        await asyncio.sleep(1)
        print(f'a: {i}')

А запустим мы их так:

async def main():
    colorama_init()
    await asyncio.gather(
        AwWrap(thread_a(), on_enter = lambda : print(Fore.GREEN, end='')),
        AwWrap(thread_b(), on_enter = lambda : print(Fore.RED, end='')),
        AwWrap(thread_c(), on_enter = lambda : print(Fore.BLUE, end='')))

Полный код примера:

import asyncio
from colorama import init as colorama_init
from colorama import Fore
from colorama import Style

def do_nothing():
    pass

class IterWrap:
    def __init__(self, inner, on_enter = do_nothing, on_exit = do_nothing):
        self._inner = inner.__iter__()
        self._on_enter = on_enter
        self._on_exit = on_exit

    def __iter__(self):
        return self
    
    def __next__(self):
        self._on_enter()
        try:
            item = self._inner.__next__()
        finally:
            self._on_exit()
        return item

class AwWrap:
    def __init__(self, inner, **kwargs):
        self._itwr = IterWrap(inner.__await__(), **kwargs)

    def __await__(self):
        return self._itwr

async def thread_a():
    for i in range(1,10):
        await asyncio.sleep(1)
        print(f'a: {i}')

async def thread_b():
    for i in range(500,515):
        await asyncio.sleep(0.7)
        print(f'b: {i}')

async def thread_c():
    for i in range(200,207):
        await asyncio.sleep(1.4)
        print(f'c: {i}')

async def main():
    colorama_init()
    await asyncio.gather(
        AwWrap(thread_a(), on_enter = lambda : print(Fore.GREEN, end='')),
        AwWrap(thread_b(), on_enter = lambda : print(Fore.RED, end='')),
        AwWrap(thread_c(), on_enter = lambda : print(Fore.BLUE, end='')))

if __name__ == '__main__':
    asyncio.run(main())

Вывод:

b: 500 a: 1 c: 200 b: 501 a: 2 b: 502 c: 201 b: 503 a: 3 b: 504 a: 4 c: 202 b: 505 b: 506 a: 5 c: 203 b: 507 a: 6 b: 508 c: 204 b: 509 a: 7 b: 510 a: 8 c: 205 b: 511 a: 9 b: 512 c: 206 b: 513 b: 514

Заметим, что я тут из корутины вызываю синхронный print(), который вполне может блокировать поток, если перенаправить stdout на какое-то медленное устройство. Мы притворимся, что это API библиотеки X, которая о корутинах не знает и всё делает синхронно.

Заметим, что IterWrap не накладывает никаких ограничений на объекты-члены последовательности. Он просто возвращает item как есть. Поэтому наш AwWrap должен работать не только с asyncio, но и с другими планировщиками корутин.

Декоратор

Мы уже дважды реализовали паттерн Декоратор: в классе IterWrap и в классе AwWrap. Давайте же теперь сделаем декоратор, который можно будет указать через собаку перед объявлением корутины.

def wrap_aw(**itwr_args):
    def create_wrapper(async_func):
        def wrapper(*args, **kwargs):
            return AwWrap(async_func(*args, **kwargs), **itwr_args)
        return wrapper
    return create_wrapper

Теперь мы можем писать так:

@wrap_aw(on_enter = lambda : print(Fore.GREEN, end=''), on_exit = lambda : print(Fore.WHITE, end=''))
async def thread_a_internal():
    for i in range(1,10):
        await asyncio.sleep(1)
        print(f'a: {i}')
    return 10

Полный код:

import asyncio
from colorama import init as colorama_init
from colorama import Fore
from colorama import Style

def do_nothing():
    pass

class IterWrap:
    def __init__(self, inner, on_enter = do_nothing, on_exit = do_nothing):
        self._inner = inner.__iter__()
        self._on_enter = on_enter
        self._on_exit = on_exit

    def __iter__(self):
        return self
    
    def __next__(self):
        self._on_enter()
        try:
            item = self._inner.__next__()
        finally:
            self._on_exit()
        return item

class AwWrap:
    def __init__(self, inner, **kwargs):
        self._itwr = IterWrap(inner.__await__(), **kwargs)

    def __await__(self):
        return self._itwr

def wrap_aw(**itwr_args):
    def create_wrapper(async_func):
        def wrapper(*args, **kwargs):
            return AwWrap(async_func(*args, **kwargs), **itwr_args)
        return wrapper
    return create_wrapper

@wrap_aw(on_enter = lambda : print(Fore.GREEN, end=''), on_exit = lambda : print(Fore.WHITE, end=''))
async def thread_a_internal():
    for i in range(1,10):
        await asyncio.sleep(1)
        print(f'a: {i}')
    return 10

async def thread_a():
    rv = await thread_a_internal()
    print (f'RV={rv}')

@wrap_aw(on_enter = lambda : print(Fore.RED, end=''))
async def thread_b():
    for i in range(500,515):
        await asyncio.sleep(0.7)
        print(f'b: {i}')

@wrap_aw(on_enter = lambda : print(Fore.BLUE, end=''))
async def thread_c():
    for i in range(200,207):
        await asyncio.sleep(1.4)
        print(f'c: {i}')

async def main():
    colorama_init()
    await asyncio.gather(thread_a(), thread_b(), thread_c())

if __name__ == '__main__':
    asyncio.run(main())

Я переместил цикл из thread_a() в функцию thread_a_internal(), и именно к ней применяется декоратор. При этом декоратор устанавливает обработчик выхода для изменения цвета на белый. thread_a() выводит значение, возвращаемое функцией thread_a_internal(). Оно выводится белым цветом. Так мы проверяем работу обработчика on_exit и еще то, что передача возвращаемого значения не нарушена.

Вывод:

b: 500 a: 1 c: 200 b: 501 a: 2 b: 502 c: 201 b: 503 a: 3 b: 504 a: 4 c: 202 b: 505 b: 506 a: 5 c: 203 b: 507 a: 6 b: 508 c: 204 b: 509 a: 7 b: 510 a: 8 c: 205 b: 511 a: 9 RV=10 b: 512 c: 206 b: 513 b: 514

Сохранение и восстановление

В предыдущем примере обработчики устанавливают цвет, заданный в них явным образом. А наша цель – сохранять текущий цвет и восстанавливать сохраненный.

Создадим класс ColorState. Его метод save() будет сохранять цвет в переменную _saved_color, а метод load() будет делать сохраненный цвет текущим. У нас нет возможности читать текущий цвет из консоли, поэтому будем хранить его в статической переменной current_color. Менять текущий цвет будем статическим методом set_color(), который вызовет print() и сохранит цвет в current_color.

class ColorState:
    current_color = ''

    def __init__(self):
        self._saved_color = ColorState.current_color

    @staticmethod
    def set_color(color):
        print(color, end='')
        ColorState.current_color = color

    def save(self):
        self._saved_color = ColorState.current_color

    def load(self):
        print(self._saved_color, end='')
        ColorState.current_color = self._saved_color

Теперь поправим наш декоратор так, чтобы вместо обработчиков передавать туда класс, хранящий состояние.

def wrap_state(state_class):
    def create_wrapper(async_func):
        state_object = state_class()
        def wrapper(*args, **kwargs):
            return AwWrap(async_func(*args, **kwargs), on_enter = state_object.load, on_exit = state_object.save)
        return wrapper
    return create_wrapper

Каждому протопотоку нужен свой объект этого класса, поэтому объект создается в декораторе. Его методы load() и save() используются как обработчики on_enter и on_exit соответственно.

Классы IterWrap и AwWrap остаются без изменений.

Мы можем использовать этот декоратор следующим образом.

@wrap_state(ColorState)
async def thread_a():
    rv = await thread_a_internal()
    ColorState.set_color(Fore.WHITE)
    print (f'RV={rv}')

async def thread_a_internal():
    ColorState.set_color(Fore.GREEN)
    for i in range(1,5):
        await asyncio.sleep(1)
        print(f'a: {i}')
    ColorState.set_color(Fore.BLUE)
    for i in range(6,10):
        await asyncio.sleep(1)
        print(f'a: {i}')
    return 10

Протопоток A выводит первую половину строк зеленым цветом, вторую синим, а возвращаемое значение функции белым.

Полный код примера:

class ColorState:
    current_color = ''

    def __init__(self):
        self._saved_color = ColorState.current_color

    @staticmethod
    def set_color(color):
        print(color, end='')
        ColorState.current_color = color

    def save(self):
        self._saved_color = ColorState.current_color

    def load(self):
        print(self._saved_color, end='')
        ColorState.current_color = self._saved_color

def do_nothing():
    pass

class IterWrap:
    def __init__(self, inner, on_enter = do_nothing, on_exit = do_nothing):
        self._inner = inner.__iter__()
        self._on_enter = on_enter
        self._on_exit = on_exit

    def __iter__(self):
        return self
    
    def __next__(self):
        self._on_enter()
        try:
            item = self._inner.__next__()
        finally:
            self._on_exit()
        return item

class AwWrap:
    def __init__(self, inner, **kwargs):
        self._itwr = IterWrap(inner.__await__(), **kwargs)

    def __await__(self):
        return self._itwr

def wrap_state(state_class):
    def create_wrapper(async_func):
        state_object = state_class()
        def wrapper(*args, **kwargs):
            return AwWrap(async_func(*args, **kwargs), on_enter = state_object.load, on_exit = state_object.save)
        return wrapper
    return create_wrapper

async def thread_a_internal():
    ColorState.set_color(Fore.GREEN)
    for i in range(0,5):
        await asyncio.sleep(1)
        print(f'a: {i}')
    ColorState.set_color(Fore.BLUE)
    for i in range(5,10):
        await asyncio.sleep(1)
        print(f'a: {i}')
    return 10

@wrap_state(ColorState)
async def thread_a():
    rv = await thread_a_internal()
    ColorState.set_color(Fore.WHITE)
    print (f'RV={rv}')

@wrap_state(ColorState)
async def thread_b():
    ColorState.set_color(Fore.RED)
    for i in range(500,515):
        await asyncio.sleep(0.7)
        print(f'b: {i}')

async def main():
    colorama_init()
    await asyncio.gather(thread_a(), thread_b())

if __name__ == '__main__':
    asyncio.run(main())

Вывод:

b: 500 a: 0 b: 501 a: 1 b: 502 b: 503 a: 2 b: 504 a: 3 b: 505 b: 506 a: 4 b: 507 a: 5 b: 508 b: 509 a: 6 b: 510 a: 7 b: 511 a: 8 b: 512 b: 513 a: 9 RV=10 b: 514

Выводы

Зная внутреннее устройство механизма корутин, можно обрабатывать события передачи управления между корутинами. Благодаря уникальной архитектуре этого механизма в питоне, такая обработка распространяется на всю глубину стека вызовов, т.е. мы обрабатываем события передачи управления между протопотоками. Применив этот подход, мы можем внутри корутины использовать глобальное состояние и не беспокоиться о том, как его используют корутины, работающие параллельно. Рассмотренный код не привязан к определенной версии питона или к модулю asyncio.

Свяжитесь с нами напрямую

Офисы

Москва

117587, Варшавское ш., д. 125, стр. 16А

Ростов-на-Дону

344002, пр. Буденновский, д. 9, офис 305

Нижний Новгород

603104, ул. Нартова, д. 6, корп. 6, офис 829