Multiprocessing: O que É e Como Funciona | Python Brasil
Aprenda multiprocessing em Python: processos paralelos, Pool, comunicacao entre processos, shared memory e boas praticas para computacao paralela.
O que e Multiprocessing?
Multiprocessing e o modulo da biblioteca padrao do Python que permite executar codigo em multiplos processos simultaneamente, aproveitando todos os nucleos da CPU. Diferente de threading, onde o GIL (Global Interpreter Lock) impede a execucao paralela real de codigo Python, cada processo no multiprocessing tem seu proprio interpretador e sua propria memoria, eliminando completamente a limitacao do GIL.
O multiprocessing e a solucao ideal para tarefas CPU-bound — calculos matematicos pesados, processamento de imagens, compressao de dados e qualquer operacao que exige uso intensivo do processador.
Criando Processos
import multiprocessing
import os
import time
def calcular(nome: str, n: int):
"""Funcao que sera executada em outro processo."""
print(f'Processo {nome} (PID: {os.getpid()}) iniciado')
resultado = sum(i * i for i in range(n))
print(f'Processo {nome} concluido: {resultado}')
return resultado
if __name__ == '__main__':
# Criar e iniciar processos
p1 = multiprocessing.Process(target=calcular, args=('A', 10_000_000))
p2 = multiprocessing.Process(target=calcular, args=('B', 10_000_000))
p1.start()
p2.start()
# Aguardar conclusao
p1.join()
p2.join()
print('Todos os processos concluidos')
Pool de Processos
O Pool e a forma mais pratica de distribuir trabalho entre multiplos processos.
import multiprocessing
import time
def processar_item(item: int) -> dict:
"""Processa um unico item."""
resultado = item ** 2 + sum(range(item))
return {'item': item, 'resultado': resultado}
if __name__ == '__main__':
itens = list(range(100))
# Usando Pool
with multiprocessing.Pool(processes=4) as pool:
# map — aplica funcao a cada item (preserva ordem)
resultados = pool.map(processar_item, itens)
print(f'Processados: {len(resultados)} itens')
# map com chunks para melhor performance
resultados = pool.map(processar_item, itens, chunksize=25)
# imap — retorna iterador (menor uso de memoria)
for resultado in pool.imap(processar_item, itens):
pass # processar um por um
# imap_unordered — mais rapido, sem garantia de ordem
for resultado in pool.imap_unordered(processar_item, itens):
pass
# apply_async — submeter tarefas individualmente
future = pool.apply_async(processar_item, (42,))
resultado = future.get(timeout=10)
print(resultado)
# starmap — para funcoes com multiplos argumentos
pares = [(1, 2), (3, 4), (5, 6)]
# resultados = pool.starmap(funcao_com_dois_args, pares)
ProcessPoolExecutor
A interface concurrent.futures oferece uma API consistente entre threads e processos.
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def tarefa_pesada(n: int) -> int:
"""Calculo CPU-intensive."""
return sum(i * i for i in range(n))
if __name__ == '__main__':
tamanhos = [5_000_000, 8_000_000, 3_000_000, 10_000_000]
inicio = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(tarefa_pesada, n): n for n in tamanhos}
for future in as_completed(futures):
n = futures[future]
try:
resultado = future.result()
print(f'n={n}: resultado={resultado}')
except Exception as e:
print(f'Erro para n={n}: {e}')
print(f'Tempo total: {time.time() - inicio:.2f}s')
Comunicacao entre Processos
import multiprocessing
# Queue — fila thread/process-safe
def produtor(fila: multiprocessing.Queue):
for i in range(10):
fila.put(f'item_{i}')
fila.put(None) # sentinela
def consumidor(fila: multiprocessing.Queue):
while True:
item = fila.get()
if item is None:
break
print(f'Processado: {item}')
if __name__ == '__main__':
fila = multiprocessing.Queue()
p1 = multiprocessing.Process(target=produtor, args=(fila,))
p2 = multiprocessing.Process(target=consumidor, args=(fila,))
p1.start()
p2.start()
p1.join()
p2.join()
# Pipe — comunicacao bidirecional entre dois processos
def enviar(conn):
conn.send({'mensagem': 'ola', 'valor': 42})
conn.close()
def receber(conn):
dados = conn.recv()
print(f'Recebido: {dados}')
conn.close()
if __name__ == '__main__':
conn_pai, conn_filho = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=enviar, args=(conn_pai,))
p2 = multiprocessing.Process(target=receber, args=(conn_filho,))
p1.start()
p2.start()
p1.join()
p2.join()
Memoria Compartilhada
import multiprocessing
from multiprocessing import shared_memory
import numpy as np
# Value e Array — tipos simples compartilhados
def incrementar(valor_compartilhado, lock):
for _ in range(100000):
with lock:
valor_compartilhado.value += 1
if __name__ == '__main__':
contador = multiprocessing.Value('i', 0) # inteiro compartilhado
lock = multiprocessing.Lock()
processos = [
multiprocessing.Process(target=incrementar, args=(contador, lock))
for _ in range(4)
]
for p in processos:
p.start()
for p in processos:
p.join()
print(f'Contador: {contador.value}') # 400000
# Shared memory com numpy (Python 3.8+)
dados = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
shm = shared_memory.SharedMemory(create=True, size=dados.nbytes)
compartilhado = np.ndarray(dados.shape, dtype=dados.dtype, buffer=shm.buf)
compartilhado[:] = dados[:]
print(f'Nome da memoria: {shm.name}')
shm.close()
shm.unlink()
Comparacao com Threading
import time
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def calcular_pesado(n):
return sum(i * i for i in range(n))
if __name__ == '__main__':
n = 10_000_000
repeticoes = 4
# Sequencial
inicio = time.time()
for _ in range(repeticoes):
calcular_pesado(n)
print(f'Sequencial: {time.time() - inicio:.2f}s')
# Threads (limitado pelo GIL)
inicio = time.time()
with ThreadPoolExecutor(max_workers=4) as ex:
list(ex.map(calcular_pesado, [n] * repeticoes))
print(f'Threads: {time.time() - inicio:.2f}s')
# Processos (verdadeiro paralelismo)
inicio = time.time()
with ProcessPoolExecutor(max_workers=4) as ex:
list(ex.map(calcular_pesado, [n] * repeticoes))
print(f'Processos: {time.time() - inicio:.2f}s')
Erros Comuns
O erro mais frequente e esquecer a guarda if __name__ == '__main__':, que e obrigatoria no Windows e macOS para evitar recursao infinita ao criar processos. Outro erro e tentar compartilhar objetos mutaveis complexos entre processos sem mecanismos apropriados de sincronizacao. Passar objetos nao serializaveis (como conexoes de banco ou sockets) como argumentos para processos tambem gera erros, pois os dados sao serializados com pickle para transmissao entre processos.
Boas Praticas
Sempre use if __name__ == '__main__':. Prefira ProcessPoolExecutor ou Pool em vez de gerenciar processos manualmente. Minimize a comunicacao entre processos — o overhead de serializar dados e significativo. Use chunksize em Pool.map para reduzir overhead. Evite compartilhar estado mutavel entre processos. Para tarefas I/O-bound, use threading ou asyncio em vez de multiprocessing.
Quando Usar
Multiprocessing e ideal para tarefas CPU-bound que se beneficiam de paralelismo real: processamento de imagens, calculos numericos, compressao, criptografia e analise de dados. Para I/O-bound, threading ou asyncio sao mais adequados. Para tarefas distribuidas em multiplas maquinas, considere Celery ou Dask.