Multiprocessing em Python: Guia Pratico
Domine processamento paralelo em Python com multiprocessing. Aprenda Pool, Process, filas, memoria compartilhada e quando usar.
Python tem o famoso GIL (Global Interpreter Lock) que impede que multiplas threads executem bytecode Python simultaneamente. Para tarefas CPU-bound, a solucao e o multiprocessing, que cria processos separados, cada um com seu proprio interpretador e GIL. Neste guia, vamos explorar como usar multiprocessing para acelerar seus programas de forma significativa.
Entendendo o Problema: GIL e CPU-Bound
Antes de paralelizar, e importante entender quando o multiprocessing faz diferenca. Tarefas CPU-bound (calculos matematicos, processamento de dados, compressao) se beneficiam de multiplos processos. Tarefas I/O-bound (requisicoes HTTP, leitura de arquivos) se beneficiam mais de threading ou asyncio.
import time
def tarefa_pesada(n: int) -> int:
"""Simula processamento CPU-intensivo."""
total = 0
for i in range(n):
total += i ** 2
return total
# Execucao sequencial
inicio = time.time()
resultados = []
for _ in range(4):
resultados.append(tarefa_pesada(10_000_000))
tempo_seq = time.time() - inicio
print(f"Sequencial: {tempo_seq:.2f}s")
Process: A Base do Multiprocessing
A classe Process e a forma mais basica de criar processos:
from multiprocessing import Process, Queue
import os
def processar_dados(nome: str, dados: list, fila_resultado: Queue):
"""Funcao executada em processo separado."""
pid = os.getpid()
print(f"[{nome}] Processo {pid} iniciado com {len(dados)} itens")
resultado = sum(x ** 2 for x in dados)
print(f"[{nome}] Processo {pid} finalizado. Resultado: {resultado}")
fila_resultado.put((nome, resultado))
def executar_com_processos():
"""Distribui trabalho entre processos."""
dados_completos = list(range(1_000_000))
n_processos = 4
tamanho_fatia = len(dados_completos) // n_processos
fila = Queue()
processos = []
for i in range(n_processos):
inicio = i * tamanho_fatia
fim = inicio + tamanho_fatia if i < n_processos - 1 else len(dados_completos)
fatia = dados_completos[inicio:fim]
p = Process(
target=processar_dados,
args=(f"Worker-{i}", fatia, fila),
)
processos.append(p)
p.start()
# Coletar resultados
resultados = {}
for _ in processos:
nome, valor = fila.get()
resultados[nome] = valor
# Aguardar finalizacao
for p in processos:
p.join()
total = sum(resultados.values())
print(f"\nTotal combinado: {total}")
print(f"Resultados por worker: {resultados}")
executar_com_processos()
Pool: Paralelismo Simplificado
O Pool e a forma mais pratica de paralelizar. Ele gerencia um grupo de processos workers:
from multiprocessing import Pool, cpu_count
import time
def calcular_fatorial_soma(n: int) -> int:
"""Calculo CPU-intensivo para demonstracao."""
total = 0
for i in range(1, n + 1):
fat = 1
for j in range(1, min(i, 20) + 1):
fat *= j
total += fat
return total
def comparar_performance():
"""Compara execucao sequencial vs paralela."""
numeros = list(range(100, 5000))
n_cores = cpu_count()
print(f"Cores disponiveis: {n_cores}")
# Sequencial
inicio = time.time()
resultados_seq = [calcular_fatorial_soma(n) for n in numeros]
tempo_seq = time.time() - inicio
print(f"Sequencial: {tempo_seq:.2f}s")
# Paralelo com Pool
inicio = time.time()
with Pool(processes=n_cores) as pool:
resultados_par = pool.map(calcular_fatorial_soma, numeros)
tempo_par = time.time() - inicio
print(f"Paralelo ({n_cores} cores): {tempo_par:.2f}s")
speedup = tempo_seq / tempo_par
print(f"Speedup: {speedup:.2f}x")
# Verificar que resultados sao iguais
assert resultados_seq == resultados_par
comparar_performance()
Map, Starmap e Apply
O Pool oferece diferentes metodos para distribuir trabalho:
from multiprocessing import Pool
def processar_arquivo(caminho: str) -> dict:
"""Processa um arquivo e retorna estatisticas."""
with open(caminho) as f:
conteudo = f.read()
return {
"arquivo": caminho,
"linhas": conteudo.count("\n") + 1,
"caracteres": len(conteudo),
"palavras": len(conteudo.split()),
}
def multiplicar(a: int, b: int) -> int:
return a * b
def demonstrar_metodos():
with Pool(4) as pool:
# map: aplica funcao a cada item
arquivos = ["dados1.txt", "dados2.txt", "dados3.txt"]
stats = pool.map(processar_arquivo, arquivos)
# starmap: para funcoes com multiplos argumentos
pares = [(2, 3), (4, 5), (6, 7), (8, 9)]
produtos = pool.starmap(multiplicar, pares)
print(f"Produtos: {produtos}") # [6, 20, 42, 72]
# map_async: versao assincrona (nao bloqueante)
resultado_async = pool.map_async(calcular_fatorial_soma, range(100, 200))
print("Processando em background...")
valores = resultado_async.get(timeout=30) # Espera ate 30s
print(f"Obtidos {len(valores)} resultados")
# imap: iterador lazy (bom para muitos dados)
for resultado in pool.imap(calcular_fatorial_soma, range(1000)):
pass # Processa um por um conforme ficam prontos
# imap_unordered: como imap, mas sem garantia de ordem
for resultado in pool.imap_unordered(calcular_fatorial_soma, range(1000)):
pass # Recebe na ordem que terminar primeiro
Memoria Compartilhada
Processos nao compartilham memoria por padrao. Use Value e Array para dados compartilhados:
from multiprocessing import Process, Value, Array, Lock
def incrementar_contador(contador, lock, n_incrementos: int):
"""Incrementa contador compartilhado de forma segura."""
for _ in range(n_incrementos):
with lock:
contador.value += 1
def preencher_array(array_compartilhado, inicio: int, fim: int, valor: float):
"""Preenche uma faixa do array compartilhado."""
for i in range(inicio, fim):
array_compartilhado[i] = valor * (i + 1)
def demonstrar_memoria_compartilhada():
# Valor compartilhado (int)
contador = Value("i", 0) # 'i' = inteiro
lock = Lock()
processos = []
for _ in range(4):
p = Process(target=incrementar_contador, args=(contador, lock, 10000))
processos.append(p)
p.start()
for p in processos:
p.join()
print(f"Contador final: {contador.value}") # 40000
# Array compartilhado (double)
tamanho = 100
array = Array("d", tamanho) # 'd' = double
processos = []
fatia = tamanho // 4
for i in range(4):
inicio = i * fatia
fim = inicio + fatia
p = Process(target=preencher_array, args=(array, inicio, fim, 1.5))
processos.append(p)
p.start()
for p in processos:
p.join()
print(f"Primeiros 10 valores: {list(array[:10])}")
demonstrar_memoria_compartilhada()
ProcessPoolExecutor: A Interface Moderna
O concurrent.futures oferece uma interface mais limpa e compativel com asyncio:
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def analisar_dataset(dataset_id: int) -> dict:
"""Simula analise pesada de um dataset."""
time.sleep(0.5) # Simula processamento
registros = dataset_id * 1000
return {
"dataset": dataset_id,
"registros": registros,
"anomalias": registros // 50,
"score": round(95 - dataset_id * 0.5, 1),
}
def pipeline_analise():
"""Pipeline de analise com ProcessPoolExecutor."""
datasets = list(range(1, 21))
print(f"Analisando {len(datasets)} datasets...")
inicio = time.time()
resultados = []
with ProcessPoolExecutor(max_workers=4) as executor:
# Submeter todas as tarefas
futuros = {
executor.submit(analisar_dataset, ds): ds
for ds in datasets
}
# Coletar resultados conforme terminam
for futuro in as_completed(futuros):
dataset_id = futuros[futuro]
try:
resultado = futuro.result()
resultados.append(resultado)
print(f" Dataset {dataset_id} concluido: score {resultado['score']}")
except Exception as e:
print(f" Dataset {dataset_id} falhou: {e}")
tempo = time.time() - inicio
print(f"\nConcluido em {tempo:.2f}s")
print(f"Media de score: {sum(r['score'] for r in resultados) / len(resultados):.1f}")
pipeline_analise()
Quando Usar Cada Abordagem
A escolha entre multiprocessing, threading e asyncio depende do tipo de tarefa. Para calculos matematicos e processamento de dados pesado, use multiprocessing. Para requisicoes de rede e leitura de arquivos, use asyncio ou threading. Para tarefas mistas, combine ambas abordagens.
Cuidado com o overhead: criar processos e mais caro que criar threads. Para tarefas muito rapidas, o custo de criar processos pode anular o ganho de paralelismo. Use multiprocessing apenas quando o processamento individual for significativo.
Boas Praticas
Sempre proteja o ponto de entrada com if __name__ == "__main__" ao usar multiprocessing, especialmente no Windows. Use context managers (with Pool() as pool) para garantir que processos sejam encerrados corretamente. Evite compartilhar grandes volumes de dados entre processos; prefira dividir os dados antes de enviar. E monitore o uso de memoria, pois cada processo carrega uma copia do interpretador Python.
Conclusao
Multiprocessing e a resposta do Python para processamento verdadeiramente paralelo. Com Pool, ProcessPoolExecutor e memoria compartilhada, voce pode aproveitar todos os cores do processador para acelerar tarefas CPU-intensivas. Entenda as limitacoes, escolha a ferramenta certa para cada caso e seus programas Python alcancarao performance de outro nivel.
Equipe Python Brasil
Contribuidor do Python Brasil — Aprenda Python em Português