Voltar ao Glossario
Glossario Python

Multiprocessing: O que É e Como Funciona | Python Brasil

Aprenda multiprocessing em Python: processos paralelos, Pool, comunicacao entre processos, shared memory e boas praticas para computacao paralela.

O que e Multiprocessing?

Multiprocessing e o modulo da biblioteca padrao do Python que permite executar codigo em multiplos processos simultaneamente, aproveitando todos os nucleos da CPU. Diferente de threading, onde o GIL (Global Interpreter Lock) impede a execucao paralela real de codigo Python, cada processo no multiprocessing tem seu proprio interpretador e sua propria memoria, eliminando completamente a limitacao do GIL.

O multiprocessing e a solucao ideal para tarefas CPU-bound — calculos matematicos pesados, processamento de imagens, compressao de dados e qualquer operacao que exige uso intensivo do processador.

Criando Processos

import multiprocessing
import os
import time

def calcular(nome: str, n: int):
    """Funcao que sera executada em outro processo."""
    print(f'Processo {nome} (PID: {os.getpid()}) iniciado')
    resultado = sum(i * i for i in range(n))
    print(f'Processo {nome} concluido: {resultado}')
    return resultado

if __name__ == '__main__':
    # Criar e iniciar processos
    p1 = multiprocessing.Process(target=calcular, args=('A', 10_000_000))
    p2 = multiprocessing.Process(target=calcular, args=('B', 10_000_000))

    p1.start()
    p2.start()

    # Aguardar conclusao
    p1.join()
    p2.join()
    print('Todos os processos concluidos')

Pool de Processos

O Pool e a forma mais pratica de distribuir trabalho entre multiplos processos.

import multiprocessing
import time

def processar_item(item: int) -> dict:
    """Processa um unico item."""
    resultado = item ** 2 + sum(range(item))
    return {'item': item, 'resultado': resultado}

if __name__ == '__main__':
    itens = list(range(100))

    # Usando Pool
    with multiprocessing.Pool(processes=4) as pool:
        # map — aplica funcao a cada item (preserva ordem)
        resultados = pool.map(processar_item, itens)
        print(f'Processados: {len(resultados)} itens')

        # map com chunks para melhor performance
        resultados = pool.map(processar_item, itens, chunksize=25)

        # imap — retorna iterador (menor uso de memoria)
        for resultado in pool.imap(processar_item, itens):
            pass  # processar um por um

        # imap_unordered — mais rapido, sem garantia de ordem
        for resultado in pool.imap_unordered(processar_item, itens):
            pass

        # apply_async — submeter tarefas individualmente
        future = pool.apply_async(processar_item, (42,))
        resultado = future.get(timeout=10)
        print(resultado)

        # starmap — para funcoes com multiplos argumentos
        pares = [(1, 2), (3, 4), (5, 6)]
        # resultados = pool.starmap(funcao_com_dois_args, pares)

ProcessPoolExecutor

A interface concurrent.futures oferece uma API consistente entre threads e processos.

from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def tarefa_pesada(n: int) -> int:
    """Calculo CPU-intensive."""
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    tamanhos = [5_000_000, 8_000_000, 3_000_000, 10_000_000]

    inicio = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(tarefa_pesada, n): n for n in tamanhos}

        for future in as_completed(futures):
            n = futures[future]
            try:
                resultado = future.result()
                print(f'n={n}: resultado={resultado}')
            except Exception as e:
                print(f'Erro para n={n}: {e}')

    print(f'Tempo total: {time.time() - inicio:.2f}s')

Comunicacao entre Processos

import multiprocessing

# Queue — fila thread/process-safe
def produtor(fila: multiprocessing.Queue):
    for i in range(10):
        fila.put(f'item_{i}')
    fila.put(None)  # sentinela

def consumidor(fila: multiprocessing.Queue):
    while True:
        item = fila.get()
        if item is None:
            break
        print(f'Processado: {item}')

if __name__ == '__main__':
    fila = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=produtor, args=(fila,))
    p2 = multiprocessing.Process(target=consumidor, args=(fila,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

# Pipe — comunicacao bidirecional entre dois processos
def enviar(conn):
    conn.send({'mensagem': 'ola', 'valor': 42})
    conn.close()

def receber(conn):
    dados = conn.recv()
    print(f'Recebido: {dados}')
    conn.close()

if __name__ == '__main__':
    conn_pai, conn_filho = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=enviar, args=(conn_pai,))
    p2 = multiprocessing.Process(target=receber, args=(conn_filho,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

Memoria Compartilhada

import multiprocessing
from multiprocessing import shared_memory
import numpy as np

# Value e Array — tipos simples compartilhados
def incrementar(valor_compartilhado, lock):
    for _ in range(100000):
        with lock:
            valor_compartilhado.value += 1

if __name__ == '__main__':
    contador = multiprocessing.Value('i', 0)  # inteiro compartilhado
    lock = multiprocessing.Lock()

    processos = [
        multiprocessing.Process(target=incrementar, args=(contador, lock))
        for _ in range(4)
    ]
    for p in processos:
        p.start()
    for p in processos:
        p.join()

    print(f'Contador: {contador.value}')  # 400000

    # Shared memory com numpy (Python 3.8+)
    dados = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
    shm = shared_memory.SharedMemory(create=True, size=dados.nbytes)
    compartilhado = np.ndarray(dados.shape, dtype=dados.dtype, buffer=shm.buf)
    compartilhado[:] = dados[:]
    print(f'Nome da memoria: {shm.name}')
    shm.close()
    shm.unlink()

Comparacao com Threading

import time
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def calcular_pesado(n):
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    n = 10_000_000
    repeticoes = 4

    # Sequencial
    inicio = time.time()
    for _ in range(repeticoes):
        calcular_pesado(n)
    print(f'Sequencial: {time.time() - inicio:.2f}s')

    # Threads (limitado pelo GIL)
    inicio = time.time()
    with ThreadPoolExecutor(max_workers=4) as ex:
        list(ex.map(calcular_pesado, [n] * repeticoes))
    print(f'Threads: {time.time() - inicio:.2f}s')

    # Processos (verdadeiro paralelismo)
    inicio = time.time()
    with ProcessPoolExecutor(max_workers=4) as ex:
        list(ex.map(calcular_pesado, [n] * repeticoes))
    print(f'Processos: {time.time() - inicio:.2f}s')

Erros Comuns

O erro mais frequente e esquecer a guarda if __name__ == '__main__':, que e obrigatoria no Windows e macOS para evitar recursao infinita ao criar processos. Outro erro e tentar compartilhar objetos mutaveis complexos entre processos sem mecanismos apropriados de sincronizacao. Passar objetos nao serializaveis (como conexoes de banco ou sockets) como argumentos para processos tambem gera erros, pois os dados sao serializados com pickle para transmissao entre processos.

Boas Praticas

Sempre use if __name__ == '__main__':. Prefira ProcessPoolExecutor ou Pool em vez de gerenciar processos manualmente. Minimize a comunicacao entre processos — o overhead de serializar dados e significativo. Use chunksize em Pool.map para reduzir overhead. Evite compartilhar estado mutavel entre processos. Para tarefas I/O-bound, use threading ou asyncio em vez de multiprocessing.

Quando Usar

Multiprocessing e ideal para tarefas CPU-bound que se beneficiam de paralelismo real: processamento de imagens, calculos numericos, compressao, criptografia e analise de dados. Para I/O-bound, threading ou asyncio sao mais adequados. Para tarefas distribuidas em multiplas maquinas, considere Celery ou Dask.