Когда вы запускаете программу на своем компьютере, она запускается в своем собственном «пузыре», который полностью отделен от других программ, которые активны в то же время. Этот «пузырь» называется процессом и включает в себя все, что необходимо для управления этим вызовом программы.
Эта так называемая среда процесса включает в себя страницы памяти, которые использует процесс, обрабатываемый этим процессом файл, права доступа пользователей и групп, а также весь вызов командной строки, включая заданные параметры.
Эта информация хранится в файловой системе процесса вашей системы UNIX/Linux, которая является виртуальной файловой системой и доступна через каталог proc. Записи сортируются по идентификатору процесса, который уникален для каждого процесса. Пример 1 показывает это для произвольно выбранного процесса с идентификатором # 177.
Пример 1: информация, доступная процессу
[email protected]:/proc/177# ls attr cpuset limits net projid_map statm autogroup cwd loginuid ns root status auxv environ map_files numa_maps sched syscall cgroup exe maps oom_adj sessionid task clear_refs fd mem oom_score setgroups timers cmdline fdinfo mountinfo oom_score_adj smaps uid_map comm gid_map mounts pagemap stack wchan coredump_filter io mountstats personality stat
Структурирование программного кода и данных
Чем сложнее становится программа в Python, тем чаще бывает удобно разбивать ее на более мелкие части. Это относится не только к исходному коду, но и к коду, который выполняется на вашем компьютере. Одним из решений этого является использование подпроцессов в сочетании с параллельным выполнением. За этим стоят следующие мысли:
- Один процесс охватывает фрагмент кода, который можно запускать отдельно.
- Некоторые участки кода могут выполняться одновременно, что в принципе допускает распараллеливание.
- Использование возможностей современных процессоров и операционных систем, например каждого доступного ядра процессора, для сокращения общего времени выполнения программы.
- Чтобы снизить сложность вашей программы или кода и передать часть работы специализированным агентам, выступающим в качестве подпроцессов.
Использование подпроцессов требует от вас переосмысления способа выполнения вашей программы, от линейного к параллельному. Это похоже на изменение вашей рабочей точки зрения в компании с обычного работника на менеджера – вам нужно будет следить за тем, кто что делает, сколько времени занимает один шаг и каковы зависимости между промежуточными результатами.
Это поможет вам разделить код на более мелкие части, которые могут быть выполнены агентом, специализирующимся только на этой задаче. Если это еще не сделано, подумайте о том, как структурирован ваш набор данных, чтобы его могли эффективно обрабатывать отдельные агенты. Это приводит к следующим вопросам:
- Почему вы хотите распараллеливать код? Имеет ли смысл думать об этом в вашем конкретном случае и с точки зрения усилий?
- Ваша программа предназначена для запуска только один раз или она будет запускаться регулярно с аналогичным набором данных?
- Можете ли вы разделить свой алгоритм на несколько этапов выполнения?
- Допускают ли ваши данные вообще распараллеливание? Если еще нет, то каким образом нужно адаптировать организацию ваших данных?
- Какие промежуточные результаты ваших вычислений зависят друг от друга?
- Какое изменение оборудования необходимо для этого?
- Есть ли узкое место в аппаратном обеспечении или алгоритме, и как можно избежать или минимизировать влияние этих факторов?
- Какие еще побочные эффекты распараллеливания могут возникнуть?
Возможный вариант использования – это основной процесс и демон, работающий в фоновом режиме (ведущий или ведомый), ожидающий активации. Кроме того, это может быть главный процесс, запускающий рабочие процессы, выполняемые по запросу. На практике основной процесс – это процесс подачи, который управляет двумя или более агентами, которым подаются части данных, и которые выполняют вычисления для данной части.
Имейте в виду, что распараллеливание требует больших затрат и времени из-за накладных расходов на подпроцессы, которые необходимы вашей операционной системе. По сравнению с выполнением двух или более задач линейным способом, выполняя это параллельно, вы можете сэкономить от 25 до 30 процентов времени на подпроцесс, в зависимости от вашего варианта использования.
Например, две задачи, каждая из которых занимает 5 секунд, требуют в сумме 10 секунд при последовательном выполнении и могут потребовать в среднем около 8 секунд на многоядерной машине при распараллеливании. 3 из этих 8 секунд могут быть потеряны из-за накладных расходов, что ограничит ваши улучшения скорости.
Запуск функции параллельно с Python
Python предлагает четыре возможных способа справиться с этим. Во-первых, вы можете выполнять функции параллельно, используя модуль многопроцессорности. Во-вторых, альтернативой процессам являются потоки. Технически это легкие процессы, выходящие за рамки данной статьи.
Для дальнейшего чтения вы можете взглянуть на модуль потоковой передачи Python. В-третьих, вы можете вызывать внешние программы с помощью метода system() модуля os или методов, предоставляемых модулем подпроцесса, и впоследствии собирать результаты.
Модуль многопроцессорности предлагает хороший набор методов для параллельного выполнения подпрограмм. Сюда входят процессы, пулы агентов, очереди и каналы.
Пример 1 работает с пулом из пяти агентов, которые обрабатывают блок из трех значений одновременно. Значения количества агентов и размера фрагмента выбраны произвольно в демонстрационных целях. Отрегулируйте эти значения в соответствии с количеством ядер в вашем процессоре.
Метод Pool.map() требует трех параметров – функции, вызываемой для каждого элемента набора данных, самого набора данных и размера фрагмента. В примере 1 мы используем функцию с именем square, которая вычисляет квадрат заданного целочисленного значения. Кроме того, можно не указывать размер фрагмента. Если не задан явно, размер блока по умолчанию равен 1.
Обратите внимание, что порядок выполнения агентов не гарантируется, но набор результатов находится в правильном порядке. Он содержит квадратные значения в соответствии с порядком элементов исходного набора данных.
Пример 1
from multiprocessing import Pool
def square(x):
# calculate the square of the value of x
return x*x
if __name__ == '__main__':
# Define the dataset
dataset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
# Output the dataset
print ('Dataset: ' + str(dataset))
# Run this with a pool of 5 agents having a chunksize of 3 until finished
agents = 5
chunksize = 3
with Pool(processes=agents) as pool:
result = pool.map(square, dataset, chunksize)
# Output the result
print ('Result: ' + str(result))
Выполнение этого кода должно привести к следующему результату:
$ python3 pool_multiprocessing.py Dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14] Result: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]
Примечание. Для этих примеров мы будем использовать Python 3.
Запуск нескольких функций с использованием очереди
Как структура данных, очередь очень распространена и существует несколькими способами. Он организован как «первым пришел – первым обслужен» (FIFO) или «последним пришел – первым обслужен» (LIFO), а также с приоритетами и без них (очередь с приоритетами). Структура данных реализована, как массив с фиксированным количеством записей или как список, содержащий переменное количество отдельных элементов.
В примерах 2.1–2.7 мы используем очередь FIFO. Он реализован в виде списка, который уже предоставлен соответствующим классом из модуля многопроцессорности. Кроме того, загружается модуль time, который используется для имитации рабочей нагрузки.
Листинг 2.1: используемые модули
import multiprocessing from time import sleep
Затем определяется рабочая функция (пример 2.2). Эта функция фактически представляет агента и требует трех аргументов. Имя процесса указывает, что это за процесс, а задачи и результаты относятся к соответствующей очереди.
Внутри рабочей функции находится бесконечный цикл while. И задачи, и результаты – это очереди, которые определены в основной программе. tasks.get() возвращает текущую задачу из очереди задач для обработки. Значение задачи меньше 0 выходит из цикла while и возвращает значение -1. Любое другое значение задачи выполнит вычисление (квадрат) и вернет это значение. Возврат значения в основную программу реализован, как results.put(). Это добавляет вычисленное значение в конец очереди результатов.
Листинг 2.2: рабочая функция
# define worker function
def calculate(process_name, tasks, results):
print('[%s] evaluation routine starts' % process_name)
while True:
new_value = tasks.get()
if new_value < 0:
print('[%s] evaluation routine quits' % process_name)
# Indicate finished
results.put(-1)
break
else:
# Compute result and mimic a long-running task
compute = new_value * new_value
sleep(0.02*new_value)
# Output which process received the value
# and the calculation result
print('[%s] received value: %i' % (process_name, new_value))
print('[%s] calculated value: %i' % (process_name, compute))
# Add result to the queue
results.put(compute)
return
Следующим шагом является основной цикл. Сначала определяется менеджер для межпроцессного взаимодействия (IPC). Затем добавляются две очереди – одна для задач, а другая для результатов.
Пример 2.3: IPC и очереди
if __name__ == "__main__":
# Define IPC manager
manager = multiprocessing.Manager()
# Define a list (queue) for tasks and computation results
tasks = manager.Queue()
results = manager.Queue()
После такой настройки мы определяем пул процессов с четырьмя рабочими процессами (агентами). Мы используем класс multiprocessing.Pool() и создаем его экземпляр. Затем мы определяем пустой список процессов.
Пример 2.4: определение пула процессов
# Create process pool with four processes num_processes = 4 pool = multiprocessing.Pool(processes=num_processes) processes = []
На следующем шаге мы запускаем четыре рабочих процесса (агента). Для простоты они называются от «P0» до «P3». Создание четырех рабочих процессов выполняется с помощью multiprocessing.Process(). Это связывает каждый из них с рабочей функцией, а также с задачей и очередью результатов. Наконец, мы добавляем вновь инициализированный процесс в конец списка процессов и запускаем новый процесс с помощью new_process.start().
Пример 2.5: подготовка рабочих процессов
# Initiate the worker processes
for i in range(num_processes):
# Set process name
process_name = 'P%i' % i
# Create the process, and connect it to the worker function
new_process = multiprocessing.Process(target=calculate, args=(process_name,tasks,results))
# Add new process to the list of processes
processes.append(new_process)
# Start the process
new_process.start()
Наши рабочие процессы ждут работы. Мы определяем список задач, которые в нашем случае являются произвольно выбранными целыми числами. Эти значения добавляются в список задач с помощью tasks.put(). Каждый рабочий процесс ожидает выполнения задач и выбирает следующую доступную задачу из списка задач. Этим занимается сама очередь.
Пример 2.6: подготовка очереди задач
# Fill task queue
task_list = [43, 1, 780, 256, 142, 68, 183, 334, 325, 3]
for single_task in task_list:
tasks.put(single_task)
# Wait while the workers process
sleep(5)
Через некоторое время мы хотим, чтобы наши агенты закончили. Каждый рабочий процесс реагирует на задачу со значением -1. Он интерпретирует это значение, как сигнал завершения и после этого завершается. Вот почему мы помещаем в очередь задач столько -1, сколько у нас запущенных процессов. Перед завершением процесс помещает -1 в очередь результатов. Это должно быть сигналом подтверждения для основного цикла, что агент завершает работу.
В основном цикле мы читаем из этой очереди и подсчитываем число -1. Основной цикл завершается, как только мы посчитаем столько подтверждений завершения, сколько у нас есть процессов. В противном случае выводим результат расчета из очереди.
Пример 2.7: завершение и вывод результатов
# Quit the worker processes by sending them -1
for i in range(num_processes):
tasks.put(-1)
# Read calculation results
num_finished_processes = 0
while True:
# Read result
new_result = results.get()
# Have a look at the results
if new_result == -1:
# Process has finished
num_finished_processes += 1
if num_finished_processes == num_processes:
break
else:
# Output result
print('Result:' + str(new_result))
В примере 2 показан вывод программы Python. Запуская программу несколько раз, вы можете заметить, что порядок, в котором запускаются рабочие процессы, столь же непредсказуем, как и сам процесс, выбирающий задачу из очереди. Однако после завершения порядок элементов очереди результатов совпадает с порядком элементов очереди задач.
Пример 3
$ python3 queue_multiprocessing.py [P0] evaluation routine starts [P1] evaluation routine starts [P2] evaluation routine starts [P3] evaluation routine starts [P1] received value: 1 [P1] calculated value: 1 [P0] received value: 43 [P0] calculated value: 1849 [P0] received value: 68 [P0] calculated value: 4624 [P1] received value: 142 [P1] calculated value: 20164 result: 1 result: 1849 result: 4624 result: 20164 [P3] received value: 256 [P3] calculated value: 65536 result: 65536 [P0] received value: 183 [P0] calculated value: 33489 result: 33489 [P0] received value: 3 [P0] calculated value: 9 result: 9 [P0] evaluation routine quits [P1] received value: 334 [P1] calculated value: 111556 result: 111556 [P1] evaluation routine quits [P3] received value: 325 [P3] calculated value: 105625 result: 105625 [P3] evaluation routine quits [P2] received value: 780 [P2] calculated value: 608400 result: 608400 [P2] evaluation routine quits
Примечание. Как упоминалось ранее, ваш вывод может не точно совпадать с показанным выше, поскольку порядок выполнения непредсказуем.
Использование метода os.system()
Метод system() является частью модуля os, который позволяет выполнять внешние программы командной строки в отдельном процессе от вашей программы Python. Метод system() – это блокирующий вызов, и вам нужно дождаться завершения вызова и возврата. Как вы знаете, что команду можно запускать в фоновом режиме и записывать вычисленный результат в выходной поток, который перенаправляется в такой файл.
Пример 4: команда с перенаправлением вывода
$ ./program >> outputfile
В программе Python вы просто инкапсулируете этот вызов, как показано ниже.
Пример 4: системный вызов с использованием модуля os
import os
os.system("./program >> outputfile ")
Этот системный вызов создает процесс, который выполняется параллельно вашей текущей программе Python. Получение результата может стать немного сложным, потому что этот вызов может завершиться после отключения программы Python – вы никогда не узнаете.
Использование этого метода намного дороже, чем предыдущие методы, которые я описал. Во-первых, накладные расходы намного больше (переключение процесса), а во-вторых, данные записываются в физическую память, например на диск, что занимает больше времени. Хотя это лучший вариант, у вас ограниченная память (например, с ОЗУ), и вместо этого вы можете записывать большие выходные данные на твердотельный диск.
Использование модуля subprocess
Этот модуль предназначен для замены вызовов os.system() и os.spawn(). Идея подпроцесса состоит в том, чтобы упростить процессы порождения, общаться с ними через каналы и сигналы и собирать вывод, который они производят, включая сообщения об ошибках.
Начиная с Python 3.5, подпроцесс содержит метод subprocess.run() для запуска внешней команды, которая является оболочкой для базового класса subprocess.Popen(). В качестве примера мы запускаем команду U или NIX Linux df -h, чтобы узнать, сколько дискового пространства все еще доступно в разделе home. В программе Python вы выполняете этот вызов, как показано ниже.
Пример 5: базовый пример запуска внешней команды
import subprocess ret = subprocess.run(["df", "-h", "/home"]) print(ret)
Это основной вызов, очень похожий на команду df -h или home, выполняемую в терминале. Обратите внимание, что параметры разделены списком, а не одной строкой. Вывод будет аналогичен примеру 5. По сравнению с официальной документацией Python для этого модуля, он выводит результат вызова stdout в дополнение к возвращаемому значению вызова.
В примере 5 показан результат нашего вызова. Последняя строка вывода показывает успешное выполнение команды. Вызов subprocess.run() возвращает экземпляр класса CompletedProcess, который имеет два атрибута с именами args (аргументы командной строки) и код возврата (возвращаемое значение команды).
Пример 6: запуск скрипта
$ python3 diskfree.py Filesystem Size Used Avail Capacity iused ifree %iused Mounted on /dev/sda3 233Gi 203Gi 30Gi 88% 53160407 7818407 87% /home CompletedProcess(args=['df', '-h', '/home'], returncode=0)
Чтобы подавить вывод на стандартный вывод и перехватить вывод и возвращаемое значение для дальнейшей оценки, вызов subprocess.run() должен быть немного изменен. Без дальнейших изменений subprocess.run() отправляет вывод выполненной команды на stdout, который является каналом вывода базового процесса в Python. Чтобы получить вывод, мы должны изменить это и установить канал вывода на предварительно определенное значение subprocess.PIPE.
Пример 7: захват вывода в конвейер
import subprocess
# Call the command
output = subprocess.run(["df", "-h", "/home"], stdout=subprocess.PIPE)
# Read the return code and the output data
print ("Return code: %i" % output.returncode)
print ("Output data: %s" % output.stdout)
Как объяснялось ранее, subprocess.run() возвращает экземпляр класса CompletedProcess. В примере этот экземпляр представляет собой переменную с простым именем output. Код возврата команды сохраняется в атрибуте output.returncode, а вывод, выводимый на стандартный вывод, можно найти в атрибуте output.stdout. Имейте в виду, что это не касается обработки сообщений об ошибках, потому что мы не меняли для этого выходной канал.
Заключение
Параллельная обработка данных – прекрасная возможность использовать мощь современного оборудования. Python предоставляет вам доступ к этим методам на очень сложном уровне. Как вы уже видели ранее, модуль multiprocessing и subprocess позволяет легко погрузиться в эту тему.