Кажется скоро будет круглых 10 лет как я перешел на Python в качестве основного языка программирования. Переход на Linux в качестве основной ОС случился позже, но общее с переходом на Python в том, что я так же ни разу не обернулся. Тема статической vs динамической типизации – холиварная и кажется текущий виток спиральной динамики в пользу статики. Пардоньте некоторый каламбур в предыдущем предложении =) Лично для меня доводы адептов статики слабоваты и переход C# => Python был одним из лучших решений.
Как мы знаем любой выбор это компромисс, а значит не бывает добра без худа. Я давно знал в теории, что Питончик плохо параллелится под CPU-нагрузку (всему виной Global Interpreter Lock aka GIL), на на практие пободался с этим относительно недавно. О этом собственно пост.
GIL problems? Just use multiprocessing!
Итак, задача преобразования массива в Питончике выглядит просто как 2х2:
processed_records = map(processor_func, raw_records)
Ежели processor_func это CPU-intensive функция, то хочется раскидать выполнение по всем доступным ядрам. Для организации мнопоточного выполнения в батарейках Питончика водится модуль threading, но для CPU-intensive нагрузок от него будет мало толку ибо GIL. И тут на помощь к нам вроде как стремится другой модуль из батареек – multiprocessing !
Основная идея понятная и простая: раз GIL мешает нормально распараллелить выполнение, то мы наплодим процессов. Банкет за счет более сложной координации и большего footprint по памяти. Насчет координации конечно оговорочка. Кто занимался низкоуровневым многопоточным кодом знает, что координация потоков внутри одного процесса может быть очень нетривиальной =) Что-то не работает и надо дебажить? GLHF!
Магия multiprocessing в том, что код становится не сильно хуже:
from multiprocessing import Pool
with Pool() as p:
processed_records = p.map(processor_func, raw_records)
Вроде бы PROFIT, так ведь? По моей практике пиджак черный не очень! В частности у меня периодически основной процесс начинал кипятить процессор из-за сбоев IPC. На это указал поверхностный анализ с помощью gdb py-bt.
Не все контексты одинаково полезны
Способы, которыми multiprocessing создает worker-процессы (которые выполняют processor_func) бывают разные. На Убунте где я с этим возился по умолчанию используется fork за скорость и простоту (общие дескрипторы и т.п.). К сожалению простота так же чревата проблемами вроде унаследованных блокировок. С использованием spawn код становится попечальнее:
import logging
from logging.handlers import QueueHandler, QueueListener
from multiprocessing import get_context
logger = logging.getLogger(__name__)
def pool_worker_setup(log_queue, log_level):
log_queue_handler = QueueHandler(log_queue)
logging.basicConfig(level=log_level, handlers=[log_queue_handler])
mp_ctx = get_context("spawn")
log_queue = mp_ctx.Queue()
log_queue_listener = QueueListener(log_queue, logging.getLogger().handlers[0])
log_queue_listener.start()
log_level = logger.getEffectiveLevel()
with mp_ctx.Pool(initializer=pool_worker_setup, initargs=(log_queue, log_level)) as p:
processed_records = p.map(processor_func, raw_records)
log_queue_listener.stop()
Зачем так много буков?
- В основном все свистопляски просто для получения логов от worker-процессов. В случае fork worker просто использует те же file descriptor-ы и все валится в ту же консоль/файл и т.п.
- Функция pool_worker_setup выполняется перед запуском каждого из worker-процессов. worker начинает складывать свои логи в общую очередь.
- Функция get_context позволяет создать нужный контекст multiprocessing. Без явного использования контекста используется контекст по-умолчанию. На Убунту этот контекст инициализируется на использование fork. Чтобы не заниматься monkey patching-м глобальных переменных лучше явно создать и использовать нужный контекст.
- Через log_queue_listener сообщения от worker-процессов направляются обработчику логов основного процесса.
Увы в моем случае и spawn оказался не самым стабильным (но явно получше чем fork). Вместо хардкорного дебага multiprocessing я подумал про то, что мне может помочь Docker Compose.
Параллелим с помощью Docker Compose
В этом случае код на Питончике возвращается в свою исходную форму (что само по себе большой плюс). docker-compose.yml выглядит примерно так:
version: '3'
services:
process-1:
image: local-images/my-img
build:
context: .
command: ["whatever commands needed", "start", "length of block"]
# the rest of settings - volumes etc
process-2:
image: local-images/my-img
command: ["whatever commands needed", "start + 1 * length of block", "length of block"]
# more of process defs up to number of cores = N
Что тут происходит?
- Сервис process-1 объявляет локальный тег Docker-образа (local-images/my-img) и указывает, что мы его собираем из исходников.
- Команда process-1 содержит в параметрах индекс в массиве для начала обработки и длину блока / количество элементов для обработки.
- Сервисы process-2+ используют локальный тег, собранный а рамках process-1.
- Сервисы process-2+ содержат в параметрах соответствующие индексы и длины для распределения элементов по процессам.
- Количество сервисов равно количеству ядер процессора.
Это не самый красивый способ. Тем не менее этот способ показал свою непревзойденную стабильность по сравнению с использованием multiprocessing.
Плюс подобное статическое распределение нагрузки предполагает примерно равные вычислительные затраты на каждый элемент. В противном случае ожидание самого запаздывающего процесса может затянуться. При этом в целом возможно создание динамического распределения нагрузки за счет усложнения кода.