Как нагрузить кодом на Python все ядра и не разбить монитор

Кажется скоро будет круглых 10 лет как я перешел на Python в качестве основного языка программирования. Переход на Linux в качестве основной ОС случился позже, но общее с переходом на Python в том, что я так же ни разу не обернулся. Тема статической vs динамической типизации – холиварная и кажется текущий виток спиральной динамики в пользу статики. Пардоньте некоторый каламбур в предыдущем предложении =) Лично для меня доводы адептов статики слабоваты и переход C# => Python был одним из лучших решений.

Python logo

Как мы знаем любой выбор это компромисс, а значит не бывает добра без худа. Я давно знал в теории, что Питончик плохо параллелится под CPU-нагрузку (всему виной Global Interpreter Lock aka GIL), на на практие пободался с этим относительно недавно. О этом собственно пост.

GIL problems? Just use multiprocessing!

Итак, задача преобразования массива в Питончике выглядит просто как 2х2:

processed_records = map(processor_func, raw_records)

Ежели processor_func это CPU-intensive функция, то хочется раскидать выполнение по всем доступным ядрам. Для организации мнопоточного выполнения в батарейках Питончика водится модуль threading, но для CPU-intensive нагрузок от него будет мало толку ибо GIL. И тут на помощь к нам вроде как стремится другой модуль из батареек – multiprocessing !

Основная идея понятная и простая: раз GIL мешает нормально распараллелить выполнение, то мы наплодим процессов. Банкет за счет более сложной координации и большего footprint по памяти. Насчет координации конечно оговорочка. Кто занимался низкоуровневым многопоточным кодом знает, что координация потоков внутри одного процесса может быть очень нетривиальной =) Что-то не работает и надо дебажить? GLHF!

Магия multiprocessing в том, что код становится не сильно хуже:

from multiprocessing import Pool

with Pool() as p:
    processed_records = p.map(processor_func, raw_records)

Вроде бы PROFIT, так ведь? По моей практике пиджак черный не очень! В частности у меня периодически основной процесс начинал кипятить процессор из-за сбоев IPC. На это указал поверхностный анализ с помощью gdb py-bt.

Не все контексты одинаково полезны

Способы, которыми multiprocessing создает worker-процессы (которые выполняют processor_func) бывают разные. На Убунте где я с этим возился по умолчанию используется fork за скорость и простоту (общие дескрипторы и т.п.). К сожалению простота так же чревата проблемами вроде унаследованных блокировок. С использованием spawn код становится попечальнее:

import logging

from logging.handlers import QueueHandler, QueueListener
from multiprocessing import get_context

logger = logging.getLogger(__name__)

def pool_worker_setup(log_queue, log_level):
    log_queue_handler = QueueHandler(log_queue)
    logging.basicConfig(level=log_level, handlers=[log_queue_handler])

mp_ctx = get_context("spawn")
log_queue = mp_ctx.Queue()
log_queue_listener = QueueListener(log_queue, logging.getLogger().handlers[0])
log_queue_listener.start()
log_level = logger.getEffectiveLevel()

with mp_ctx.Pool(initializer=pool_worker_setup, initargs=(log_queue, log_level)) as p:
    processed_records = p.map(processor_func, raw_records)

log_queue_listener.stop()

Зачем так много буков?

  • В основном все свистопляски просто для получения логов от worker-процессов. В случае fork worker просто использует те же file descriptor-ы и все валится в ту же консоль/файл и т.п.
  • Функция pool_worker_setup выполняется перед запуском каждого из worker-процессов. worker начинает складывать свои логи в общую очередь.
  • Функция get_context позволяет создать нужный контекст multiprocessing. Без явного использования контекста используется контекст по-умолчанию. На Убунту этот контекст инициализируется на использование fork. Чтобы не заниматься monkey patching-м глобальных переменных лучше явно создать и использовать нужный контекст.
  • Через log_queue_listener сообщения от worker-процессов направляются обработчику логов основного процесса.

Увы в моем случае и spawn оказался не самым стабильным (но явно получше чем fork). Вместо хардкорного дебага multiprocessing я подумал про то, что мне может помочь Docker Compose.

Параллелим с помощью Docker Compose

В этом случае код на Питончике возвращается в свою исходную форму (что само по себе большой плюс). docker-compose.yml выглядит примерно так:

version: '3'

services:
  process-1:
    image: local-images/my-img

    build:
      context: .

    command: ["whatever commands needed", "start", "length of block"]
 
    # the rest of settings - volumes etc

  process-2:
    image: local-images/my-img

    command: ["whatever commands needed", "start + 1 * length of block", "length of block"]

  # more of process defs up to number of cores = N

Что тут происходит?

  • Сервис process-1 объявляет локальный тег Docker-образа (local-images/my-img) и указывает, что мы его собираем из исходников.
  • Команда process-1 содержит в параметрах индекс в массиве для начала обработки и длину блока / количество элементов для обработки.
  • Сервисы process-2+ используют локальный тег, собранный а рамках process-1.
  • Сервисы process-2+ содержат в параметрах соответствующие индексы и длины для распределения элементов по процессам.
  • Количество сервисов равно количеству ядер процессора.

Это не самый красивый способ. Тем не менее этот способ показал свою непревзойденную стабильность по сравнению с использованием multiprocessing.

Плюс подобное статическое распределение нагрузки предполагает примерно равные вычислительные затраты на каждый элемент. В противном случае ожидание самого запаздывающего процесса может затянуться. При этом в целом возможно создание динамического распределения нагрузки за счет усложнения кода.