Асинхронный python

Процессы (process) и потоки (thread)

Процесс – экземпляр программы, которому выделено определенное адресное пространство в памяти. В общем случае, процессы не имеют прямого доступ к переменным и структурам данных смежных процессов. Ценность изолированности процессов в том, что один зависший процесс не повредит работе прочих процессов.

Потоки порождаются процессом и делят одно адресное пространство памяти, тем самым могут получить доступ к ресурсам смежных потоков. У каждого потока есть свой собственный стек исполнения. Они более легковесны в части создания и переключения контекста по сравнению с процессами.

GIL (Global Interpreter Lock)

Для синхронизации потоков и предотвращения одновременного доступа к одному участку памяти в python реализован механизм, который накладывает некоторое ограничение на выполнение программы. В любой момент времени возможно выполнение только одного потока. Это называется глобальная блокировка интерпретатора.

Таким образом работа многопоточной программы выглядит следующим образом:

  1. Запускается несколько параллельных потоков
  2. GIL «выбирает» один поток, а остальные приостанавливает
  3. Через некоторое время активный поток «замораживается», происходит выбор следующего потока и переключение контекста выполнения на него
  4. П.3 повторяется до тех пор, пока родительский процесс не завершится

Один из путей решения проблемы GIL – это использование процессов, т.к. на каждый процесс создается отдельный интерпретатор. Однако важно помнить, что переключение контекста между процессами более сложен и накладывает дополнительные расходы на управление.

Многопроцессорная обработка

Многопроцессорная обработка задач подразумевает запуск одного процесса на задачу и переключение контекста между запущенными процессами. Исходя из определения процесса, для каждой задачи выделяется своя область памяти, и обмен данными между задачами затруднен.

Данный механизм реализован в модуле multiprocessing. Ниже приведен небольшой пример работы с этим модулем:


      import os
      from multiprocessing import Process


      def square(i):
          j = i * i
          print('parent process:', os.getppid())
          print('process id:', os.getpid())
          print(j)
          return j


      if __name__ == '__main__':
          process_list = []

          for number in range(5):
              process = Process(target=square, args=(number,))
              process_list.append(process)
              process.start()

          for process in process_list:
              process.join()
    

      parent process: 13608
      process id: 6140
      4
      parent process: 13608
      process id: 11712
      parent process:0
      13608
      process id: 15140
      1
      parent process: 13608
      process id: 14000
      9
      parent process: 13608
      process id: 7044
      16
    

Объект Process представляет активность, которая запускается в отдельном потоке. Метод join([timeout]) блокирует выполнение до тех пор, пока запущенный процесс не завершит свое выполнение, либо не истечет таймаут.

В примере мы запускаем 5 процессов, в каждом из которых выполняется функция square для расчёта квадрата числа. Однако мы видим, что вывод программы выглядит «перемешанным». Это произошло из-за того, что несколько процессов одновременно выполняли вывод в консоль. Для синхронизации выполнения потоков необходимо использовать threading.Lock - класс, реализующий примитивные блокировки. Как только поток получил блокировку, последующие попытки ее получить блокируются, пока она не будет освобождена.


      import os
      from multiprocessing import Process, Lock


      def square(i, lock):
          j = i * i

          lock.acquire()
          try:
              print('parent process:', os.getppid())
              print('process id:', os.getpid())
              print(j)
          finally:
              lock.release()

          return j


      if __name__ == '__main__':
          lock = Lock()

          process_list = []

          for number in range(5):
              process = Process(target=square, args=(number, lock))
              process_list.append(process)
              process.start()

          for process in process_list:
              process.join()
    

      parent process: 17648
      process id: 10628
      1
      parent process: 17648
      process id: 12920
      0
      parent process: 17648
      process id: 17092
      4
      parent process: 17648
      process id: 13580
      9
      parent process: 17648
      process id: 6284
      16
    

Как видим, вывод в консоль стал последовательным.

Многопоточность

Многопоточность – эта альтернатива многопроцессорной обработке, когда задачи выполняются в рамках потоков одного процесса. Однако из-за ограничений GIL в python в реальности этот подход не столь эффективен.

Данный механизм реализован в модуле threading.

Async/await и цикл событий

Асинхронность является ещё одним механизмом управления задачами. Ниже представлены основные понятия, связанные с асинхронностью.

Цикл событий – это планировщик задач, который выбирает активную задачу для выполнения и переключает контекст на неё программно. По умолчанию, все сопрограммы выполняются в одном потоке по очереди. Описанный подход называется concurrency. Он отличается от многопоточной обработки (parallelism), где задачи выполняются в разных потоках и управление контекстом происходит на уровне операционной системы. Из-за ограничений, накладываемых GIL, ОС очень часто переключает контекст между потоками, что в итоге является не эффективным. Цикл событий также поддерживает вариант выполнения задач в разных потоках.

На изображении ниже представлено наглядное сравнение двух подходов выполнения программ. В случае с concurrency задача 1 стартует свое выполнение, предположим выполняет запрос к базе данных. Как только запрос отправлен, цикл событий на основе инструкции await понимает, что можно переключиться на выполнение задачи 2. Задача 2 выполняет полезную нагрузку, например делает запрос курсов валют к удаленному серверу. В этот момент, от базы данных пришел ответ и цикл может переключиться на выполнение задачи 1. И так пока все задачи не будут выполнены.

Как видим, в случае с parallelism каждая задача блокируется, ожидая сетевой ответ на свой запрос.

Сравнение concurrency & parallelism

Корутина (coroutine) – это асинхронная функция, которая определяется синтаксисом async/await и основана на генераторах. Генератор – это функция, которая имеет в своем теле ключевое слово yield и сохраняет свое состояние между вызовами. Именно корутины выполняются в цикле событий. Обратите внимание, что обычный вызов корутины без инструкции await не запланирует её выполнение.

Пример создания и выполнения асинхронной корутины:


      import asyncio


      async def async_main(name):
          print(f"Hello, {name}")


      if __name__ == '__main__':
          loop = asyncio.get_event_loop()

          loop.run_until_complete(async_main("world"))
    

      Hello, world
    

Задача (task) используется для отправки корутины в цикл событий. В общем случае задача встает в очередь и ждет своего выполнения.

Пример создания и выполнения асинхронной задачи:


      import asyncio


      async def async_task(name):
          print(f"Hello, {name}")


      async def async_main(name):
          asyncio.create_task(async_task(name))

      if __name__ == '__main__':
          loop = asyncio.get_event_loop()

          loop.run_until_complete(async_main("task"))
    

      Hello, task
    

Итак, в чем отличие корутины от задачи? В том что asyncio.create_task не блокирует выполнение функции, её вызвавшей. Инструкция await передает контекст выполнения обратно в цикл событий, который переключится на выполнение другой сопрограммы, но родительская функция, в теле которой выполнили await приостанавливается. Поясню на примере. Выполним сопрограмму async_main, а в ней выполнение async_sub оформим как задачу:


      import asyncio


      async def async_sub():
          print(f"Hello from async_sub")


      async def async_main():
          asyncio.create_task(async_sub())
          print(f"Hello from async_main")


      if __name__ == '__main__':
          loop = asyncio.get_event_loop()

          loop.run_until_complete(async_main())
    

      Hello from async_main
      Hello from async_sub
    

Теперь переделаем пример на вызов корутин с использованием await:


      import asyncio


      async def async_sub():
          print(f"Hello from async_sub")


      async def async_main():
          await async_sub()
          print(f"Hello from async_main")


      if __name__ == '__main__':
          loop = asyncio.get_event_loop()

          loop.run_until_complete(async_main())
    

      Hello from async_sub
      Hello from async_main
    

Наглядно видно, что инструкция await во втором случае приостанавливает выполнение async_main до завершения async_sub.

Теперь зная, что такое задачи, встает вопрос: «а как получить результат её выполнения?». Future – это ещё не рассчитанный результат асинхронной операции или задачи, поставленной в очередь. Объект этого типа имеет инструменты для получения статуса выполнения операции, установки результата, а также возможность подписки на определенные события, например, завершения задачи. Ну и, конечно, мы можем дождаться выполнения задачи, используя await.


      import asyncio


      async def async_sub(future):
          print(f"Hello from async_sub")
          future.set_result(True)


      async def async_main():
          running_loop = asyncio.get_running_loop()
          future = running_loop.create_future()

          task = asyncio.create_task(async_sub(future))
          task.add_done_callback(lambda result: print("Hello from async_main"))

          await asyncio.wait_for(future, timeout=None)


      if __name__ == '__main__':
          loop = asyncio.get_event_loop()

          loop.run_until_complete(async_main())
    

      Hello from async_sub
      Hello from async_main