Multiprocessing em Python: Guia Pratico

Domine processamento paralelo em Python com multiprocessing. Aprenda Pool, Process, filas, memoria compartilhada e quando usar.

6 min de leitura Equipe Python Brasil

Python tem o famoso GIL (Global Interpreter Lock) que impede que multiplas threads executem bytecode Python simultaneamente. Para tarefas CPU-bound, a solucao e o multiprocessing, que cria processos separados, cada um com seu proprio interpretador e GIL. Neste guia, vamos explorar como usar multiprocessing para acelerar seus programas de forma significativa.

Entendendo o Problema: GIL e CPU-Bound

Antes de paralelizar, e importante entender quando o multiprocessing faz diferenca. Tarefas CPU-bound (calculos matematicos, processamento de dados, compressao) se beneficiam de multiplos processos. Tarefas I/O-bound (requisicoes HTTP, leitura de arquivos) se beneficiam mais de threading ou asyncio.

import time

def tarefa_pesada(n: int) -> int:
    """Simula processamento CPU-intensivo."""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# Execucao sequencial
inicio = time.time()
resultados = []
for _ in range(4):
    resultados.append(tarefa_pesada(10_000_000))
tempo_seq = time.time() - inicio
print(f"Sequencial: {tempo_seq:.2f}s")

Process: A Base do Multiprocessing

A classe Process e a forma mais basica de criar processos:

from multiprocessing import Process, Queue
import os

def processar_dados(nome: str, dados: list, fila_resultado: Queue):
    """Funcao executada em processo separado."""
    pid = os.getpid()
    print(f"[{nome}] Processo {pid} iniciado com {len(dados)} itens")

    resultado = sum(x ** 2 for x in dados)

    print(f"[{nome}] Processo {pid} finalizado. Resultado: {resultado}")
    fila_resultado.put((nome, resultado))

def executar_com_processos():
    """Distribui trabalho entre processos."""
    dados_completos = list(range(1_000_000))
    n_processos = 4
    tamanho_fatia = len(dados_completos) // n_processos

    fila = Queue()
    processos = []

    for i in range(n_processos):
        inicio = i * tamanho_fatia
        fim = inicio + tamanho_fatia if i < n_processos - 1 else len(dados_completos)
        fatia = dados_completos[inicio:fim]

        p = Process(
            target=processar_dados,
            args=(f"Worker-{i}", fatia, fila),
        )
        processos.append(p)
        p.start()

    # Coletar resultados
    resultados = {}
    for _ in processos:
        nome, valor = fila.get()
        resultados[nome] = valor

    # Aguardar finalizacao
    for p in processos:
        p.join()

    total = sum(resultados.values())
    print(f"\nTotal combinado: {total}")
    print(f"Resultados por worker: {resultados}")

executar_com_processos()

Pool: Paralelismo Simplificado

O Pool e a forma mais pratica de paralelizar. Ele gerencia um grupo de processos workers:

from multiprocessing import Pool, cpu_count
import time

def calcular_fatorial_soma(n: int) -> int:
    """Calculo CPU-intensivo para demonstracao."""
    total = 0
    for i in range(1, n + 1):
        fat = 1
        for j in range(1, min(i, 20) + 1):
            fat *= j
        total += fat
    return total

def comparar_performance():
    """Compara execucao sequencial vs paralela."""
    numeros = list(range(100, 5000))
    n_cores = cpu_count()
    print(f"Cores disponiveis: {n_cores}")

    # Sequencial
    inicio = time.time()
    resultados_seq = [calcular_fatorial_soma(n) for n in numeros]
    tempo_seq = time.time() - inicio
    print(f"Sequencial: {tempo_seq:.2f}s")

    # Paralelo com Pool
    inicio = time.time()
    with Pool(processes=n_cores) as pool:
        resultados_par = pool.map(calcular_fatorial_soma, numeros)
    tempo_par = time.time() - inicio
    print(f"Paralelo ({n_cores} cores): {tempo_par:.2f}s")

    speedup = tempo_seq / tempo_par
    print(f"Speedup: {speedup:.2f}x")

    # Verificar que resultados sao iguais
    assert resultados_seq == resultados_par

comparar_performance()

Map, Starmap e Apply

O Pool oferece diferentes metodos para distribuir trabalho:

from multiprocessing import Pool

def processar_arquivo(caminho: str) -> dict:
    """Processa um arquivo e retorna estatisticas."""
    with open(caminho) as f:
        conteudo = f.read()
    return {
        "arquivo": caminho,
        "linhas": conteudo.count("\n") + 1,
        "caracteres": len(conteudo),
        "palavras": len(conteudo.split()),
    }

def multiplicar(a: int, b: int) -> int:
    return a * b

def demonstrar_metodos():
    with Pool(4) as pool:
        # map: aplica funcao a cada item
        arquivos = ["dados1.txt", "dados2.txt", "dados3.txt"]
        stats = pool.map(processar_arquivo, arquivos)

        # starmap: para funcoes com multiplos argumentos
        pares = [(2, 3), (4, 5), (6, 7), (8, 9)]
        produtos = pool.starmap(multiplicar, pares)
        print(f"Produtos: {produtos}")  # [6, 20, 42, 72]

        # map_async: versao assincrona (nao bloqueante)
        resultado_async = pool.map_async(calcular_fatorial_soma, range(100, 200))
        print("Processando em background...")
        valores = resultado_async.get(timeout=30)  # Espera ate 30s
        print(f"Obtidos {len(valores)} resultados")

        # imap: iterador lazy (bom para muitos dados)
        for resultado in pool.imap(calcular_fatorial_soma, range(1000)):
            pass  # Processa um por um conforme ficam prontos

        # imap_unordered: como imap, mas sem garantia de ordem
        for resultado in pool.imap_unordered(calcular_fatorial_soma, range(1000)):
            pass  # Recebe na ordem que terminar primeiro

Memoria Compartilhada

Processos nao compartilham memoria por padrao. Use Value e Array para dados compartilhados:

from multiprocessing import Process, Value, Array, Lock

def incrementar_contador(contador, lock, n_incrementos: int):
    """Incrementa contador compartilhado de forma segura."""
    for _ in range(n_incrementos):
        with lock:
            contador.value += 1

def preencher_array(array_compartilhado, inicio: int, fim: int, valor: float):
    """Preenche uma faixa do array compartilhado."""
    for i in range(inicio, fim):
        array_compartilhado[i] = valor * (i + 1)

def demonstrar_memoria_compartilhada():
    # Valor compartilhado (int)
    contador = Value("i", 0)  # 'i' = inteiro
    lock = Lock()

    processos = []
    for _ in range(4):
        p = Process(target=incrementar_contador, args=(contador, lock, 10000))
        processos.append(p)
        p.start()

    for p in processos:
        p.join()

    print(f"Contador final: {contador.value}")  # 40000

    # Array compartilhado (double)
    tamanho = 100
    array = Array("d", tamanho)  # 'd' = double

    processos = []
    fatia = tamanho // 4
    for i in range(4):
        inicio = i * fatia
        fim = inicio + fatia
        p = Process(target=preencher_array, args=(array, inicio, fim, 1.5))
        processos.append(p)
        p.start()

    for p in processos:
        p.join()

    print(f"Primeiros 10 valores: {list(array[:10])}")

demonstrar_memoria_compartilhada()

ProcessPoolExecutor: A Interface Moderna

O concurrent.futures oferece uma interface mais limpa e compativel com asyncio:

from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def analisar_dataset(dataset_id: int) -> dict:
    """Simula analise pesada de um dataset."""
    time.sleep(0.5)  # Simula processamento
    registros = dataset_id * 1000
    return {
        "dataset": dataset_id,
        "registros": registros,
        "anomalias": registros // 50,
        "score": round(95 - dataset_id * 0.5, 1),
    }

def pipeline_analise():
    """Pipeline de analise com ProcessPoolExecutor."""
    datasets = list(range(1, 21))

    print(f"Analisando {len(datasets)} datasets...")
    inicio = time.time()

    resultados = []
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Submeter todas as tarefas
        futuros = {
            executor.submit(analisar_dataset, ds): ds
            for ds in datasets
        }

        # Coletar resultados conforme terminam
        for futuro in as_completed(futuros):
            dataset_id = futuros[futuro]
            try:
                resultado = futuro.result()
                resultados.append(resultado)
                print(f"  Dataset {dataset_id} concluido: score {resultado['score']}")
            except Exception as e:
                print(f"  Dataset {dataset_id} falhou: {e}")

    tempo = time.time() - inicio
    print(f"\nConcluido em {tempo:.2f}s")
    print(f"Media de score: {sum(r['score'] for r in resultados) / len(resultados):.1f}")

pipeline_analise()

Quando Usar Cada Abordagem

A escolha entre multiprocessing, threading e asyncio depende do tipo de tarefa. Para calculos matematicos e processamento de dados pesado, use multiprocessing. Para requisicoes de rede e leitura de arquivos, use asyncio ou threading. Para tarefas mistas, combine ambas abordagens.

Cuidado com o overhead: criar processos e mais caro que criar threads. Para tarefas muito rapidas, o custo de criar processos pode anular o ganho de paralelismo. Use multiprocessing apenas quando o processamento individual for significativo.

Boas Praticas

Sempre proteja o ponto de entrada com if __name__ == "__main__" ao usar multiprocessing, especialmente no Windows. Use context managers (with Pool() as pool) para garantir que processos sejam encerrados corretamente. Evite compartilhar grandes volumes de dados entre processos; prefira dividir os dados antes de enviar. E monitore o uso de memoria, pois cada processo carrega uma copia do interpretador Python.

Conclusao

Multiprocessing e a resposta do Python para processamento verdadeiramente paralelo. Com Pool, ProcessPoolExecutor e memoria compartilhada, voce pode aproveitar todos os cores do processador para acelerar tarefas CPU-intensivas. Entenda as limitacoes, escolha a ferramenta certa para cada caso e seus programas Python alcancarao performance de outro nivel.

E

Equipe Python Brasil

Contribuidor do Python Brasil — Aprenda Python em Português