Text Share Online

55555

Pipeline de Curação de Vagas — v5.19

Evolução do curate_job_postings.ts para arquitetura de taxonomia viva


O que muda da v5.18 para a v5.19

# Origem Correção Problema na v5.18 Solução v5.19
1 Gemini run_monthly_cleanup() sem DDL de CREATE FUNCTION Sprint apresentava bloco DO $$ ... $$ mas handler chama supabase.rpc('run_monthly_cleanup') — sem o DDL a RPC não existe DDL CREATE OR REPLACE FUNCTION run_monthly_cleanup() adicionado ao sprint e ao pipeline; campo corrigido para created_at (não processed_at)
2 GPT Rate limit de quarentena com janela de corrida residual Fluxo em dois round-trips (remaining_slotsrelease_quarantined_jobs) deixava janela de corrida em execuções concorrentes que consultassem slots quase simultaneamente Nova RPC release_quarantined_jobs_limited() — calcula slots + seleciona + libera no mesmo contexto transacional; TypeScript chama uma única RPC
3 GPT + Gemini VACUUM sem decisão concreta Comentário no handler dizia “executar separadamente” sem definir caminho — deixava gap de governança operacional Decisão registrada explicitamente: autovacuum do Supabase cobre o volume do lançamento; critério de revisão quando function_orchestrator_items ultrapassar 5M registros

O que muda da v5.17 para a v5.18

# Origem Correção Problema na v5.17 Solução v5.18
1 Genspark monthly_cleanup sem agendamento Job mensal documentado mas sem entrada em vercel.json e sem handler TypeScript — nunca rodaria automaticamente Entrada "0 4 1 * *" adicionada ao vercel.json; handler monthly-cleanup.ts criado com createJobRun + registerCronError + padrão de erro/sucesso estabelecido
2 Genspark SLOW_BATCH_AUTO_PAUSE_AT hardcoded Threshold de 10 batches lentos consecutivos estava hardcoded no TypeScript — impedia ajuste operacional sem redeploy Restaurado como env var SLOW_BATCH_AUTO_PAUSE_AT=10; TypeScript usa parseInt(process.env.SLOW_BATCH_AUTO_PAUSE_AT ?? '10')
3 Genspark DELETE job_canonical_role_sources sem microbatch DELETE único poderia afetar centenas de milhares de registros em um statement — lock prolongado em escala Convertido para microbatch de 5.000 registros com FOR UPDATE SKIP LOCKED + pg_sleep(0.05) — mesmo padrão de function_orchestrator_items
4 Gemini low_quality = false ausente no rollback UPDATE de reversão resetava curation_status = 'pending' mas não tocava em low_quality — flag continuava true, quebrando regra de sincronia com curation_status Adicionado low_quality = false no UPDATE do plano de rollback
5 Gemini Regex do prebuild sem ;? generate-prompt-version.ts usava regex com ; obrigatório — Prettier com semi: false quebraria o prebuild silenciosamente Semicolon tornado opcional: /export const SYSTEM_PROMPTs*=s*([sS]?)`s;?/`
6 GPT check_quarantine_rate_limit() retornava booleano — teto ainda podia estourar RPC retornava TRUE com 90 liberações no dia; TypeScript liberava mais 100 — teto real de 100/dia não era garantido RPC reescrita para retornar remaining_slots INT; TypeScript usa slice(0, remaining_slots) — teto garantido atomicamente
7 GPT Guard aborted só no CRON, não nos Fluxos A e B isPipelineEnabled() verificado apenas no handler do CRON — Fluxos A (manual) e B (automático por endpoint) continuavam curando mesmo com pipeline pausado Spec tornada explícita: todo entrypoint de curadoria segue padrão createJobRun → isPipelineEnabled() → false → status='aborted' + metadata.errors[] + encerra
8 GPT run_auto_promotion() perdeu regra de is_emerging Função reescrita com p_min_sources INT DEFAULT 3AUTO_PROMOTE_MIN_SOURCES_EMERGING ficou órfão; canônicos normais promoviam cedo demais ou is_emerging perdia benefício Adicionado p_min_sources_emerging INT DEFAULT 3; condição usa CASE WHEN jcr.is_emerging THEN p_min_sources_emerging ELSE p_min_sources END

O que muda da v5.16 para a v5.17

# Origem Correção Problema na v5.16 Solução v5.17
1 Claude Governança mensal: query de auditoria de inflação por grupo empresarial Revisão mensal de distinct_sources_count dependia de memória do operador — sem query que sinalizasse automaticamente canônicos com alta concentração de prefixo comum Query adicionada à seção de monitoramento: filtra canônicos com distinct_sources_count >= 5 e COUNT(DISTINCT LEFT(normalized_company, 5)) <= 2
2 Claude Governança semanal: protocolo quando fallback_ratio > 0.20 Alerta FALLBACK_RATIO_ALERT_THRESHOLD=0.20 existia mas sem ação documentada quando disparado Adicionado à governança semanal: verificar alterações recentes em SYSTEM_PROMPT ou arquivos externos via job_runs.metadata.dict_files
3 Genspark + GPT AUTO_PROMOTE_MIN_DAYS: bind param inválido no PostgreSQL INTERVAL ':AUTO_PROMOTE_MIN_DAYS days' dentro de string literal SQL não é substituído pelo PostgreSQL — quebra em runtime Função de auto-promoção reescrita com parâmetro p_min_days INTEGER; corpo usa (p_min_days * INTERVAL '1 day'); chamada TypeScript passa parseInt(process.env.AUTO_PROMOTE_MIN_DAYS ?? '14')
4 GPT Rate limit de quarentena: teto real é 100/dia, não 200 Texto afirmava “2 execuções × 100 = 200/24h” mas a RPC check_quarantine_rate_limit() conta o total do dia-calendário — primeira execução que liberar 100 bloqueia a segunda Removida afirmação de 200/dia; teto oficial documentado como 100 liberações por dia-calendário de São Paulo
5 GPT Fluxo aborted sem implementação job_runs.status = 'aborted' existia no CHECK mas nenhum trecho do handler o escrevia — status órfão semanticamente Fluxo implementado no handler: isPipelineEnabled() retorna falsestatus = 'aborted' + motivo em metadata.errors[] + encerra sem processar
6 Gemini search_canonical_roles: ORDER BY multi-coluna quebra índice HNSW ORDER BY embedding <=> query_vector, vacancy_count DESC, id ASC força sequential scan — pgvector ignora índice HNSW quando ORDER BY contém qualquer coluna além do operador de distância Reescrito com CTE: inner query ordena só por vetor com LIMIT result_limit * 3; outer query reordena em memória por similarity DESC, vacancy_count DESC, id ASC
7 Gemini PASSO 0.2 watchdog: concatenação frágil de string JSONB "failed_at": "' || NOW()::text || '" depende do formato de saída do servidor — pode quebrar se fuso horário ou formato de data mudar Substituído por jsonb_build_object('step', 'watchdog', 'message', '...', 'failed_at', NOW()) — NOW() passado como TIMESTAMPTZ nativo sem conversão de string
8 Gemini job_canonical_role_sources sem limpeza mensal Tabela acumula UPSERT de todas as empresas indefinidamente — registros com last_seen_at antigos nunca são removidos, ocupando disco crescente Adicionado ao job mensal: DELETE FROM job_canonical_role_sources WHERE last_seen_at < NOW() - INTERVAL '1 year' em microbatches

O que muda da v5.15 para a v5.16

# Origem Correção Problema na v5.15 Solução v5.16
1 Gemini Watchdog cobre job_runs órfãos PASSO 0 resolvia function_orchestrator_runs presos mas não o job_runs pai — hard crash da Vercel deixa registros 'running' permanentemente em job_runs, poluindo o painel de auditoria PASSO 0.2 em maintenance_phase_1: UPDATE em job_runs com status='running' e started_at < NOW() - INTERVAL '30 minutes', injetando erro estruturado em metadata.errors[] via jsonb_set
2 GPT Plano de rollback referencia campo depreciado Passo 2 do rollback dizia job_runs.metadata.error — campo singular depreciado na v5.15 quando a spec migrou para errors[] acumulativo Corrigido para job_runs.metadata.errors[] com referência a step, component e message
3 GPT Comentário de CURATE_PIPELINE_ENABLED desatualizado Comentário dizia “Setado para ‘false’ automaticamente” — impreciso após v5.15 onde quem escreve em runtime é pipeline_config, não a env var Corrigido para “Valor de bootstrap. Em runtime, pipeline_config prevalece.”
4 GPT Rate limit de quarentena: toLocaleDateString vs. regra SQL canônica Handler usava toLocaleDateString('en-CA', { timeZone: 'America/Sao_Paulo' }) como aproximação — spec já tinha regra oficial com AT TIME ZONE 'America/Sao_Paulo' mas as duas formas coexistiam, criando brecha de interpretação Check movido para RPC check_quarantine_rate_limit() que usa AT TIME ZONE 'America/Sao_Paulo' como fonte de verdade única; TypeScript apenas chama a RPC e avalia o booleano retornado
5 GPT p_job_run_id decorativo em RPCs de manutenção maintenance_phase_1(p_job_run_id UUID) e maintenance_phase_2(p_job_run_id UUID) recebiam parâmetro nunca utilizado no corpo SQL — parâmetro sem uso gera confusão e sensação de logging implícito inexistente Parâmetro removido de ambas as funções; chamadas TypeScript correspondentes atualizadas

O que muda da v5.14 para a v5.15

# Origem Correção Problema na v5.14 Solução v5.15
1 GPT job_runs.status no CRON: adicionar partial CRON com erros nas FASEs 2–4 finalizava com status='success' — contradição auditável Adicionar 'partial' ao CHECK de job_runs.status. CRON acumula erros em phaseErrors[] e finaliza com status='partial' quando FASE 1 passa mas alguma das FASEs 2–4 falha. status='error' reservado para falha na FASE 1
2 GPT Auto-promoção: promoção dependente do COUNT(*) >= 5 HAVING COUNT(*) >= 5 na subquery de confidence_median não bloqueava o UPDATE — promoção ocorria mesmo sem amostras suficientes, com confidence_median NULL Reescrito com CTE: promoção só ocorre se a CTE retornar linha (exige COUNT(*) >= 5). UPDATE com FROM stats — sem linha na CTE, nenhuma promoção acontece
3 GPT + Genspark pipeline_config: DDL + regra de precedência Tabela referenciada em slow_batch_streak (pausa automática) sem DDL publicado. Coexistência com CURATE_PIPELINE_ENABLED como env var sem regra explícita de qual prevalece DDL publicado com seed inicial. Regra: CURATE_PIPELINE_ENABLED env var = bootstrap; pipeline_config = fonte de verdade em runtime. Se linha existir no banco, banco prevalece. Função isPipelineEnabled() implementa fallback
4 GPT Texto da FASE 1: política de aborto Texto dizia “aborta se o watchdog falhar” mas implementação aborta se qualquer PASSO da FASE 1 falhar (PASSOs 0, 1 ou 2) Texto corrigido: “aborta se qualquer falha ocorrer na FASE 1”
5 GPT + Genspark supabase.raw() substituído por RPC unificada supabase.raw() não existe no Supabase JS v2 — causa TypeError em runtime em 2 locais (incremento do circuit breaker e slow_batch_streak) Nova RPC increment_circuit_breaker(p_service, p_is_failing, p_is_slow_batch) cobre ambos os casos atomicamente. Todos os usos de supabase.raw() removidos
6 GPT Schema: action_required e índice em function_orchestrator_items action_required IN (..., NULL) é ambíguo em PostgreSQL (NULL não é comparável via IN). Ausência de índice em (job_posting_id, created_at DESC) — padrão de query mais usado para item mais recente por vaga action_required IS NULL OR action_required IN (...) no DDL. Novo índice fo_items_job_posting_id_created_at_idx ON function_orchestrator_items(job_posting_id, created_at DESC)
7 Genspark registerCronError(): interface definida + estratégia de merge Função usada em 4 locais no handler do CRON sem definição de interface nem comportamento de merge em metadata. Campo oscilava entre error (singular) e errors (array) Interface TypeScript definida. Campo padronizado como errors: [] (array acumulativo) em toda a spec. registerCronError() faz merge profundo — não sobrescreve erros anteriores do mesmo run

O que muda da v5.13 para a v5.14

# Origem Correção Problema na v5.13 Solução v5.14
1 Gemini + GPT Orquestração do CRON em 4 fases explícitas run_daily_pipeline_maintenance() declarada como função SQL monolítica — PostgreSQL não suporta pausas para lógica TypeScript intermediária. PASSO 3 usava bind param :ids_elegíveis_para_liberação impossível em RETURNS void; PASSO 6 dependia de TypeScript mas estava dentro do SQL API Route /api/cron/pipeline-maintenance orquestra 4 fases sequenciais: FASE 1 (SQL pura: PASSOs 0–2) → FASE 2 (TypeScript: PASSO 3 quarentena) → FASE 3 (SQL pura: PASSOs 4–5) → FASE 4 (TypeScript: PASSO 6 embeddings). Cada fase é RPC independente. Política de falha: continuar para próxima fase exceto FASE 1 que aborta tudo
2 Gemini Validação de completude após Zod z.record(z.string(), z.string().nullable()) valida {} como sucesso — arquivo JSON vazio (ex: falha de CI/CD) passa na validação e o pipeline processa milhares de vagas sem normalização Após validação Zod, verificar Object.keys(parsedData.data).length === 0 para cada arquivo. Abortar com step='zod_validation' e component=<arquivo> se vazio
3 GPT job_runs criado antes de tudo Em alguns trechos o job_runs era criado após a validação Zod — se o Zod falha, não existe linha em job_runs para receber o erro job_runs criado com status='running' e metadata={} no início absoluto do run, antes do Zod, antes da pré-rotina de hash. Todos os erros subsequentes enriquecem o metadata do registro já existente
4 GPT Exceção de db_connection no princípio transversal Regra “todo erro vai para job_runs.metadata” é tecnicamente impossível quando o próprio banco está inacessível Regra refinada: registrar em job_runs.metadata quando o banco estiver acessível; se indisponível (db_connection), registrar em log estruturado da aplicação (console.error com JSON) e observabilidade externa (ex: Vercel logs)
5 Deepseek confidence_median gravado na auto-promoção Campo presente em job_canonical_roles sem mecanismo de atualização definido — nunca era preenchido em operação normal confidence_median calculado e persistido no momento da auto-promoção, junto com promoted_at. Usa PERCENTILE_CONT com HAVING COUNT(*) >= 5 sobre as últimas 120 vagas curadas do canônico
6 Deepseek low_quality coexiste com curation_status Risco de ambiguidade sobre qual campo consultar em novas queries Nota no DDL confirma coexistência: low_quality (booleano) e curation_status = 'low_quality' são sempre atualizados de forma síncrona. Para novas queries, usar curation_status como fonte de verdade
7 Genspark DDL completo de job_runs Tabela referenciada em 8+ contextos da spec (FK em function_orchestrator_runs, job_runs.metadata, job_runs.job_name, joins de quarentena) mas DDL nunca publicado — impossível validar constraints ou índices DDL completo publicado com metadata JSONB, índices em (job_name, status), started_at DESC e GIN em metadata
8 Genspark Migration job_run_id NOT NULL com backfill ALTER COLUMN job_run_id SET NOT NULL falha com ERROR: column contains null values se existem registros históricos com job_run_id = NULL Migration em 3 passos: (1) criar registro sentinela em job_runs; (2) atribuir sentinela a registros históricos; (3) adicionar NOT NULL constraint
9 Genspark Reset atômico do circuit breaker Duas instâncias Vercel lendo windowAge > WINDOW_MS simultaneamente podem sobrescrever contadores — falha da instância A apagada pela instância B Reset com filtro .lt('window_started_at', expiryTimestamp) no UPDATE — só a primeira instância que vê a janela realmente expirada executa o reset
10 Genspark Trigger fn_update_distinct_sources_count não trata DELETE DELETE em job_canonical_role_sources (ex: limpeza mensal, rollback) não atualiza o contador — distinct_sources_count fica inflado até o próximo PASSO 2 do CRON Trigger estendido para AFTER INSERT OR UPDATE OR DELETE. Função usa COALESCE(NEW.canonical_role_id, OLD.canonical_role_id) para cobrir todos os casos
11 Genspark generate-prompt-version.ts faz hash do arquivo inteiro Hash do arquivo .ts completo inclui comentários — atualizar data de um comentário muda prompt_version e aciona revisita desnecessária de todas as quarentenas Script extrai apenas o conteúdo entre backticks do export const SYSTEM_PROMPT = …“ via regex antes de calcular o SHA-256
12 Genspark Backfill PASSO A/A2 com subquery O(n²) Subquery correlacionada aninhada (SELECT run_id ... ORDER BY created_at DESC LIMIT 1) executa um scan por linha de job_postings — lento em 100k+ registros Reescrito com CTE + DISTINCT ON (job_posting_id) — único scan de curate_run_items independente do volume de job_postings
13 Genspark Query de rastreabilidade filtra curation_status atual WHERE jp.curation_status = 'quarantined_llm_output' torna a query inútil para diagnóstico histórico após a vaga ser liberada Filtro removido. Query usa cri.error_type = 'item_output_parse_error' para focar nos runs de falha — funciona independentemente do estado atual da vaga
14 Genspark batch_output_parse_error sem limite de retries Spec definia retry do batch inteiro sem limite — bug sistemático no prompt causaria loop infinito dentro do run LLM_BATCH_PARSE_MAX_RETRIES=2 (padrão). Após esgotamento: todas as vagas do batch → retryable_error; registrar em job_runs.metadata com step='batch_processing'
15 Genspark slow_batch_streak não persiste em serverless Contador de batches lentos consecutivos em memória não funciona com múltiplas instâncias Vercel independentes Campos slow_batch_streak INT e last_slow_batch_at TIMESTAMPTZ adicionados a circuit_breaker_state. Threshold: 10 batches lentos consecutivos (> 35s). Na 11ª ocorrência: pausa automática via UPDATE CURATE_PIPELINE_ENABLED=false em tabela de configuração
16 Genspark Delete mensal sem batching DELETE FROM function_orchestrator_items WHERE processed_at < NOW() - INTERVAL '2 years' — após 2 anos pode deletar milhões de linhas em uma única transação, gerando lock prolongado Reescrito com loop em microbatches de 5.000 registros com pg_sleep(0.05) entre batches via FOR UPDATE SKIP LOCKED
17 Renomeação curate_runsfunction_orchestrator_runs Nome anglicizado (“curate”) soa estranho e não comunica o domínio Todas as referências atualizadas para function_orchestrator_runs
18 Renomeação curate_run_itemsfunction_orchestrator_items Idem Todas as referências atualizadas para function_orchestrator_items

O que muda da v5.12 para a v5.13

(preservado da v5.13)


Princípio transversal — erro detalhado em job_runs

Todo job documentado nesta spec que falhe deve registrar em job_runs.metadata com detalhe suficiente para diagnóstico sem acesso ao servidor.

Regra refinada (v5.14): registrar em job_runs.metadata quando o banco estiver acessível. Se o banco estiver indisponível (db_connection), registrar em log estruturado da aplicação (console.error com JSON) e observabilidade externa — a regra não pode ser absoluta quando o próprio banco é o ponto de falha.

Estrutura mínima de erro (v5.15 — campo errors é array acumulativo):

{
  "errors": [
    {
      "step": "cron_phase_2",
      "component": "quarantine_release",
      "message": "Timeout ao chamar release_quarantined_jobs",
      "failed_at": "2026-03-10T03:15:42.123Z"
    },
    {
      "step": "cron_phase_4",
      "component": "embedding_rebuild",
      "message": "generateE5Embedding: connection refused",
      "failed_at": "2026-03-10T03:16:01.456Z"
    }
  ],
  "dict_files": [ "..." ],
  "prompt_version": "a3f9b2c1"
}

⚠️ Campo padronizado como errors (array) em toda a spec — não error (singular). Múltiplas fases podem falhar no mesmo run do CRON; o array acumula todos os erros sem sobrescrever.

Valores válidos para step:

Valor Quando usar
zod_validation Falha na validação Zod ou de completude dos arquivos JSON
db_connection Falha na conexão com Supabase — registrar em log externo
circuit_breaker_open Pipeline pausado por circuit breaker
hash_pre_routine Falha na pré-rotina de hash
fetch_pending_jobs Falha na busca de vagas pendentes
batch_processing Falha sistêmica durante processamento de batch
cron_phase_N Falha na FASE N do CRON diário (ex: cron_phase_1)
monthly_cleanup Falha no job mensal

SYSTEM_PROMPT.ts — arquivo dedicado e versionamento automático

Regra de isolamento

lib/pipeline/SYSTEM_PROMPT.ts contém exclusivamente o texto do prompt do extrator.

// lib/pipeline/SYSTEM_PROMPT.ts
export const SYSTEM_PROMPT = `
Você é um extrator especializado em cargos do mercado de trabalho brasileiro...
`;

Script de pré-build — hash apenas do conteúdo do prompt

// scripts/generate-prompt-version.ts
import * as crypto from 'crypto';
import * as fs from 'fs';
import * as path from 'path';

const promptPath = path.resolve('./lib/pipeline/SYSTEM_PROMPT.ts');
const content = fs.readFileSync(promptPath, 'utf8');

// ⚠️ Hash apenas do conteúdo do template literal — não do arquivo inteiro
// Comentários e metadados do arquivo não devem gerar nova prompt_version
const match = content.match(/export const SYSTEM_PROMPTs*=s*`([sS]*?)`s*;?/); // ⚠️ v5.18: ;? — Prettier com semi:false não quebra o build
if (!match) {
  console.error('SYSTEM_PROMPT não encontrado em SYSTEM_PROMPT.ts');
  process.exit(1);
}

const promptContent = match[1]; // Apenas o texto do prompt
const hash = crypto.createHash('sha256').update(promptContent).digest('hex').slice(0, 8);

const output = `// AUTO-GERADO — não editar manualmente
// Gerado em: ${new Date().toISOString()}
export const PROMPT_VERSION = '${hash}';
`;

fs.writeFileSync('./lib/pipeline/prompt-version.generated.ts', output);
console.log(`prompt_version: ${hash}`);

package.json:

{
  "scripts": {
    "prebuild": "ts-node scripts/generate-prompt-version.ts",
    "build": "next build"
  }
}

prompt-version.generated.ts no .gitignore.


Pré-rotina de hash — versionamento automático dos arquivos externos

(inalterado da v5.13, exceto que job_runs é criado ANTES da validação Zod — ver fluxo abaixo)

Ordem obrigatória no início de cada run:

  1. Criar job_runs com status='running' e metadata={}antes de qualquer validação
  2. Validar 4 arquivos JSON via Zod + completude
    • Falha: enriquecer job_runs.metadata com erro e abortar
  3. Comparar hashes + prompt_version com run anterior
  4. Enriquecer job_runs.metadata com dict_files e prompt_version

Estrutura do metadata em job_runs: (inalterada da v5.13)

Rastreabilidade de quarentena (corrigida):

-- ⚠️ Filtro de curation_status REMOVIDO — vaga pode ter sido liberada da quarentena
-- Query funciona para diagnóstico histórico independentemente do estado atual
SELECT jr.metadata AS metadata_run_quarentena
FROM function_orchestrator_items foi
JOIN function_orchestrator_runs fo ON fo.id = foi.run_id
JOIN job_runs jr                   ON jr.id = fo.job_run_id
WHERE foi.job_posting_id = :vaga_id
  AND foi.error_type = 'item_output_parse_error'
ORDER BY foi.created_at DESC
LIMIT 1;
-- Comparar dict_files E prompt_version com valores atuais do build

Visão geral dos dois fluxos

Fluxo A — Ingestão manual via Job Ingestor

Operador clica "Curar"
    │
    ▼
[INÍCIO DO RUN — ANTES DE TUDO]
  1. Criar job_runs com status='running', metadata={}
    │
    ▼
[GUARD DE HABILITAÇÃO — v5.18]
  ⚠️ Todo entrypoint de curadoria verifica isPipelineEnabled() antes de processar
  2. isPipelineEnabled()
     → false: registerCronError(runId, { step: 'pipeline_guard', message: '...' })
              UPDATE job_runs SET status='aborted'
              encerra sem processar
    │
    ▼
[VALIDAÇÃO + PRÉ-ROTINA DE HASH]
  3. Valida 4 arquivos JSON via Zod
     → Falha ou arquivo vazio: enriquecer job_runs.metadata + abortar
  3. Compara hashes + prompt_version com run anterior
  4. Enriquece job_runs.metadata com dict_files e prompt_version
    │
    ▼
[BUSCA DE VAGAS PENDENTES]
SELECT ... FROM job_postings
WHERE curation_status = 'pending' AND is_active = true
ORDER BY posted_at DESC
FOR UPDATE SKIP LOCKED
    │
    ▼
[PROCESSAMENTO EM BATCHES DE 40]
→ extrator LLM (LLM_CONCURRENCY_EXTRACTOR=15, LLM_BATCH_PARSE_MAX_RETRIES=2)
→ upsert job_canonical_roles (slugs ordenados alfabeticamente — previne deadlock 40P01)
→ upsert job_canonical_role_sources somente quando pipeline_stage != 'fallback'
  (trigger fn_update_distinct_sources_count atualiza contador automaticamente)
→ function_orchestrator_items
→ ai_usage_logs (1 por batch, não-bloqueante)
→ eventos jobs_curate_started + jobs_curate_finished (Fluxo A apenas)
→ calcular fallback_ratio ao fim do run; alerta se > 0.20
→ monitorar slow_batch_streak; pausa automática na 11ª ocorrência

Fluxo B — Ingestão cotidiana via endpoint

(inalterado da v5.13 — aplicar guard isPipelineEnabled() idêntico ao Fluxo A: createJobRun → isPipelineEnabled() → false → status=’aborted’ + encerra)


Schema do banco — tabelas novas e alterações

job_runs — DDL completo

-- ⚠️ Criar ANTES de function_orchestrator_runs (FK depende desta tabela)
CREATE TABLE job_runs (
  id          UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
  job_name    TEXT        NOT NULL,
    -- 'curate_job_postings' | 'run_daily_pipeline_maintenance' | 'monthly_cleanup'
  status      TEXT        NOT NULL DEFAULT 'running'
              CHECK (status IN ('running', 'success', 'partial', 'error', 'aborted')),
    -- 'success'  → todas as fases concluíram sem erro
    -- 'partial'  → FASE 1 passou, mas alguma das FASEs 2–4 falhou (CRON diário)
    -- 'error'    → falha crítica na FASE 1 ou falha fatal no pipeline de curadoria
    -- 'aborted'  → cancelamento explícito
  metadata    JSONB,
    -- Contém: dict_files[], prompt_version, errors[]{step,component,message,failed_at}
    -- Criado com metadata={} no início absoluto do run — enriquecido ao longo da execução
  started_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  finished_at TIMESTAMPTZ,
  created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX job_runs_job_name_status_idx ON job_runs (job_name, status);
CREATE INDEX job_runs_started_at_idx      ON job_runs (started_at DESC);
-- Índice GIN para buscas por prompt_version ou dict_file.hash em metadata:
CREATE INDEX job_runs_metadata_gin_idx    ON job_runs USING GIN (metadata);

function_orchestrator_runs

-- ⚠️ Antes era: curate_runs — renomeado na v5.14
CREATE TABLE function_orchestrator_runs (
  id                  UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  job_run_id          UUID NOT NULL REFERENCES job_runs(id),
    -- NOT NULL obrigatório — job_runs é criado no início absoluto do run
    -- job_runs.metadata contém dict_files (hashes + versões) e prompt_version
  session_id          TEXT NOT NULL,
  source              TEXT NOT NULL CHECK (source IN ('manual', 'api')),
  triggered_by        UUID REFERENCES users(id) ON DELETE SET NULL,
  status              TEXT NOT NULL DEFAULT 'running'
                      CHECK (status IN ('running', 'success', 'partial', 'error')),
  job_ids_count       INT NOT NULL,
  model               TEXT,
  total               INT DEFAULT 0,
  curated             INT DEFAULT 0,
  curated_fallback    INT DEFAULT 0,
  low_quality         INT DEFAULT 0,
  failed              INT DEFAULT 0,
  pending_review      INT DEFAULT 0,
  canonical_created   INT DEFAULT 0,
  canonical_promoted  INT DEFAULT 0,
  fallback_ratio      NUMERIC(5,4),
    -- Calculado ao fim do run: curated_fallback / NULLIF(total, 0)
    -- Alerta automático via TypeScript quando > 0.20 (FALLBACK_RATIO_ALERT_THRESHOLD)
  input_tokens_total  INT DEFAULT 0,
  output_tokens_total INT DEFAULT 0,
  cost_usd_total      NUMERIC(10,6),
  logging_failures    INT DEFAULT 0,
  error_message       TEXT,
  started_at          TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  finished_at         TIMESTAMPTZ,
  created_at          TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX fo_runs_job_run_id_idx  ON function_orchestrator_runs(job_run_id);
CREATE INDEX fo_runs_session_id_idx  ON function_orchestrator_runs(session_id);
CREATE INDEX fo_runs_source_idx      ON function_orchestrator_runs(source);
CREATE INDEX fo_runs_started_at_idx  ON function_orchestrator_runs(started_at DESC);
CREATE INDEX fo_runs_status_idx      ON function_orchestrator_runs(status);

function_orchestrator_items

-- ⚠️ Antes era: curate_run_items — renomeado na v5.14
CREATE TABLE function_orchestrator_items (
  id                      UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  run_id                  UUID NOT NULL REFERENCES function_orchestrator_runs(id),
  job_posting_id          UUID NOT NULL REFERENCES job_postings(id),
  title_original          TEXT,
  canonical_role_proposed TEXT,
  secondary_role_proposed TEXT,
  canonical_role_id       UUID REFERENCES job_canonical_roles(id),
  canonical_status        TEXT CHECK (canonical_status IN ('active', 'pending', 'fallback')),
  skills_count            SMALLINT,
  confidence              NUMERIC(4,3),
  similarity_score        NUMERIC(5,4),
  reasoning               TEXT,
  pipeline_stage          TEXT CHECK (pipeline_stage IN (
    'hard_rule', 'dict_match', 'gatekeeper',
    'similarity_replace', 'deterministic', 'llm_extractor', 'fallback'
  )),
  -- ⚠️ 'curated_fallback' NÃO é valor de pipeline_stage — é valor de curation_status
  -- pipeline_stage='fallback' → curation_status='curated_fallback' em job_postings
  status                  TEXT NOT NULL CHECK (status IN (
    'success', 'fallback', 'low_quality', 'pending_review', 'failed'
  )),
  action_required         TEXT CHECK (
    action_required IS NULL OR action_required IN (
      'quarentena_output_llm', 'novo_canonico_aguarda_validacao'
    )
  ),
    -- NULL quando nenhuma ação pendente
    -- Sintaxe IS NULL OR IN(...) — IN(..., NULL) não funciona em PostgreSQL
  error_type              TEXT CHECK (error_type IN (
    'llm_timeout', 'batch_output_parse_error', 'item_output_parse_error',
    'payload_parse_error', 'parser_bug', 'db_error'
  )),
  -- batch_output_parse_error: falha no parse do JSON do lote inteiro
  --   → retry do batch (máx LLM_BATCH_PARSE_MAX_RETRIES=2)
  --   → após esgotamento: todas as vagas do batch → retryable_error
  --   → NÃO incrementa llm_parse_fail_count
  --   → NÃO alimenta quarentena por vaga
  -- item_output_parse_error: falha no parse de item individual
  --   → incrementa llm_parse_fail_count desta vaga
  --   → alimenta quarentena após 3 falhas cross-run
  error_detail            TEXT,
  dict_match              BOOLEAN NOT NULL DEFAULT false,
  dict_match_source       JSONB,
  processed_at            TIMESTAMPTZ,
  created_at              TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Política de retenção: 2 anos
-- Job mensal remove em microbatches de 5.000 (ver seção CRON mensal)
-- seguido de: VACUUM ANALYZE function_orchestrator_items

CREATE INDEX fo_items_run_id_idx             ON function_orchestrator_items(run_id);
CREATE INDEX fo_items_status_idx             ON function_orchestrator_items(status);
CREATE INDEX fo_items_job_posting_id_idx     ON function_orchestrator_items(job_posting_id);
CREATE INDEX fo_items_job_posting_created_idx ON function_orchestrator_items(job_posting_id, created_at DESC);
  -- Cobre padrão "item mais recente por vaga" usado em backfill, rastreabilidade e diagnóstico
CREATE INDEX fo_items_canonical_role_id_idx  ON function_orchestrator_items(canonical_role_id);
CREATE INDEX fo_items_canonical_proposed_idx ON function_orchestrator_items(canonical_role_proposed);

circuit_breaker_state — com campos de slow_batch_streak

CREATE TABLE IF NOT EXISTS circuit_breaker_state (
  id                  UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  service             TEXT NOT NULL UNIQUE,
    -- 'claude_haiku' — único serviço externo monitorado
  status              TEXT NOT NULL DEFAULT 'closed'
                      CHECK (status IN ('closed', 'open', 'half_open')),
  window_started_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- ⚠️ Resetar ANTES de incrementar se NOW() > window_started_at + 5min
  window_failures     INT NOT NULL DEFAULT 0,
  window_total        INT NOT NULL DEFAULT 0,
  cooldown_until      TIMESTAMPTZ,
  last_transition_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  slow_batch_streak   INT NOT NULL DEFAULT 0,
    -- Incrementado quando latência do batch > 35s
    -- Reset para 0 quando um batch completa em tempo normal (≤ 35s)
    -- Threshold: 10 batches lentos consecutivos
    -- Na 11ª ocorrência: pipeline pausado automaticamente
  last_slow_batch_at  TIMESTAMPTZ,
  updated_at          TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

INSERT INTO circuit_breaker_state (service) VALUES ('claude_haiku') ON CONFLICT DO NOTHING;

pipeline_config — fonte de verdade em runtime (v5.15)

-- ⚠️ Criar ANTES de qualquer código que leia CURATE_PIPELINE_ENABLED em runtime
-- Regra de precedência:
--   CURATE_PIPELINE_ENABLED (env var) = valor de bootstrap (deploy inicial)
--   pipeline_config (banco)           = fonte de verdade em runtime
--   Se linha existir no banco → banco prevalece. Env var é fallback quando banco inacessível.
CREATE TABLE pipeline_config (
  key        TEXT        PRIMARY KEY,
  value      TEXT        NOT NULL,
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_by TEXT
    -- 'system' quando alterado por lógica automática (ex: slow_batch_auto_pause)
    -- email do operador quando alterado manualmente via painel
);

-- Seed obrigatório: estado inicial de bootstrap
INSERT INTO pipeline_config (key, value, updated_by)
VALUES ('CURATE_PIPELINE_ENABLED', 'true', 'system')
ON CONFLICT DO NOTHING;

isPipelineEnabled() — TypeScript (v5.15):

// Banco prevalece sobre env var.
// Pausa automática (slow_batch_streak) só é possível via banco;
// env var não pode ser alterada em runtime no Vercel.
async function isPipelineEnabled(): Promise<boolean> {
  const { data } = await supabase
    .from('pipeline_config')
    .select('value')
    .eq('key', 'CURATE_PIPELINE_ENABLED')
    .single();
  return (data?.value ?? process.env.CURATE_PIPELINE_ENABLED ?? 'true') === 'true';
}
// Chamar no início do run, após criar job_runs e antes de qualquer processamento

Lógica de reset atômico de janela (v5.14 — previne race condition):

const WINDOW_MS = 5 * 60 * 1000;
const expiryTimestamp = new Date(Date.now() - WINDOW_MS).toISOString();

// Tentativa de reset atômico — só a primeira instância que vê a janela expirada vence
const { data: resetResult } = await supabase
  .from('circuit_breaker_state')
  .update({
    window_failures: isFailing ? 1 : 0,
    window_total: 1,
    window_started_at: new Date().toISOString(),
    updated_at: new Date().toISOString(),
  })
  .eq('service', 'claude_haiku')
  .lt('window_started_at', expiryTimestamp) // ← filtro atômico
  .select('id');

if (!resetResult?.length) {
  // Outra instância já resetou — incrementar via RPC atômica
  // ⚠️ supabase.raw() NÃO existe no Supabase JS v2 — usar RPC (v5.15)
  await supabase.rpc('increment_circuit_breaker', {
    p_service: 'claude_haiku',
    p_is_failing: isFailing,
    p_is_slow_batch: false,
  });
}

Monitoramento de slow_batch_streak:

const SLOW_BATCH_THRESHOLD_MS = 35000;
// ⚠️ v5.18: restaurado como env var — ajuste operacional sem redeploy
const SLOW_BATCH_STREAK_LIMIT = parseInt(process.env.SLOW_BATCH_AUTO_PAUSE_AT ?? '10');

const batchDuration = Date.now() - batchStart;
const isSlow = batchDuration > SLOW_BATCH_THRESHOLD_MS;

// ⚠️ supabase.raw() NÃO existe no Supabase JS v2 — usar RPC increment_circuit_breaker (v5.15)
const { data: updated } = await supabase.rpc('increment_circuit_breaker', {
  p_service: 'claude_haiku',
  p_is_failing: false,
  p_is_slow_batch: isSlow,
});

if (isSlow && (updated?.slow_batch_streak ?? 0) > SLOW_BATCH_STREAK_LIMIT) {
  // 11ª ocorrência — pausa automática via pipeline_config
  await supabase.from('pipeline_config')
    .update({ value: 'false' })
    .eq('key', 'CURATE_PIPELINE_ENABLED');

  console.error({
    type: 'auto_pause_slow_batch_streak',
    streak: updated?.slow_batch_streak,
    message: `Pipeline pausado automaticamente após ${updated?.slow_batch_streak} batches lentos consecutivos`,
  });
}

RPC increment_circuit_breaker — DDL (v5.15):

-- Cobre incremento normal do circuit breaker E rastreamento de slow_batch_streak
-- Elimina necessidade de supabase.raw() (inexistente no Supabase JS v2)
CREATE OR REPLACE FUNCTION increment_circuit_breaker(
  p_service       TEXT,
  p_is_failing    BOOLEAN,
  p_is_slow_batch BOOLEAN DEFAULT false
)
RETURNS TABLE(window_failures INT, window_total INT, slow_batch_streak INT)
LANGUAGE plpgsql AS $$
BEGIN
  RETURN QUERY
  UPDATE circuit_breaker_state
  SET
    window_failures   = CASE WHEN p_is_failing    THEN window_failures + 1
                             ELSE window_failures  END,
    window_total      = window_total + 1,
    slow_batch_streak = CASE WHEN p_is_slow_batch THEN slow_batch_streak + 1
                             ELSE 0               END,
    last_slow_batch_at = CASE WHEN p_is_slow_batch THEN NOW()
                              ELSE last_slow_batch_at END,
    updated_at        = NOW()
  WHERE service = p_service
  RETURNING window_failures, window_total, slow_batch_streak;
END;
$$;

Transição open → half_open atômica: (inalterada da v5.13)

job_canonical_role_sources — trigger com DELETE

CREATE TABLE job_canonical_role_sources (
  id                    UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  canonical_role_id     UUID NOT NULL REFERENCES job_canonical_roles(id),
  normalized_company    TEXT NOT NULL,
  first_seen_at         TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  last_seen_at          TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  UNIQUE (canonical_role_id, normalized_company)
);

-- Índice composto cobrindo canonical_role_id + last_seen_at (usado no PASSO 2 do CRON)
CREATE INDEX job_canonical_role_sources_canonical_lastseen_idx
  ON job_canonical_role_sources (canonical_role_id, last_seen_at DESC);
CREATE INDEX job_canonical_role_sources_company_idx
  ON job_canonical_role_sources(normalized_company);

-- Trigger incremental — agora inclui DELETE
CREATE OR REPLACE FUNCTION fn_update_distinct_sources_count()
RETURNS TRIGGER LANGUAGE plpgsql AS $$
DECLARE
  target_id UUID;
BEGIN
  -- COALESCE cobre INSERT/UPDATE (NEW) e DELETE (OLD)
  target_id := COALESCE(NEW.canonical_role_id, OLD.canonical_role_id);

  UPDATE job_canonical_roles
  SET distinct_sources_count = (
    SELECT COUNT(*)
    FROM job_canonical_role_sources
    WHERE canonical_role_id = target_id
      AND last_seen_at >= NOW() - INTERVAL '120 days'
  )
  WHERE id = target_id;

  RETURN COALESCE(NEW, OLD);
END;
$$;

-- Recriar trigger incluindo DELETE
DROP TRIGGER IF EXISTS trg_update_distinct_sources_count ON job_canonical_role_sources;
CREATE TRIGGER trg_update_distinct_sources_count
AFTER INSERT OR UPDATE OR DELETE ON job_canonical_role_sources
FOR EACH ROW EXECUTE FUNCTION fn_update_distinct_sources_count();

job_canonical_roles — alterações

(inalterado da v5.13, com nota de coexistência de low_quality)

-- ⚠️ Criar a função ANTES do ADD CONSTRAINT que a referencia
CREATE OR REPLACE FUNCTION is_valid_cbo_array(codes TEXT[])
RETURNS BOOLEAN LANGUAGE sql IMMUTABLE STRICT AS $$
  SELECT codes IS NULL OR
    NOT EXISTS (
      SELECT 1 FROM unnest(codes) AS code
      WHERE code !~ '^d{4}-d{2}$'
    );
$$;

ALTER TABLE job_canonical_roles
  ADD COLUMN IF NOT EXISTS status TEXT NOT NULL DEFAULT 'active'
    CHECK (status IN ('active', 'pending', 'deprecated', 'alias_of', 'rejected')),
  ADD COLUMN IF NOT EXISTS source TEXT NOT NULL DEFAULT 'llm_extractor'
    CHECK (source IN ('seed', 'llm_extractor', 'manual', 'merge')),
    -- 'merge': reservado para fluxo de merge admin — não implementado no Sprint 1
  ADD COLUMN IF NOT EXISTS merged_into UUID REFERENCES job_canonical_roles(id),
  ADD COLUMN IF NOT EXISTS alias_of_id UUID REFERENCES job_canonical_roles(id),
  ADD COLUMN IF NOT EXISTS rejected_reason TEXT,
  ADD COLUMN IF NOT EXISTS distinct_sources_count INTEGER DEFAULT 0,
  ADD COLUMN IF NOT EXISTS confidence_median NUMERIC(4,3),
    -- Calculado e persistido no momento da auto-promoção (junto com promoted_at)
    -- Usa PERCENTILE_CONT com HAVING COUNT(*) >= 5 sobre as últimas 120 vagas curadas
  ADD COLUMN IF NOT EXISTS promoted_at TIMESTAMPTZ,
  ADD COLUMN IF NOT EXISTS vacancy_count INT DEFAULT 0,
  ADD COLUMN IF NOT EXISTS latest_posted_at TIMESTAMPTZ,
  ADD COLUMN IF NOT EXISTS usage_count INT NOT NULL DEFAULT 0,
  ADD COLUMN IF NOT EXISTS cbo_codes TEXT[],
  ADD COLUMN IF NOT EXISTS blacklist_expiry_at TIMESTAMPTZ,
  ADD COLUMN IF NOT EXISTS is_emerging BOOLEAN NOT NULL DEFAULT false,
  ADD COLUMN IF NOT EXISTS embedding VECTOR(384);

-- ⚠️ Nota de coexistência: job_canonical_roles não possui campo low_quality
-- O booleano low_quality existe em job_postings e coexiste com curation_status
-- Ambos são sempre atualizados de forma síncrona
-- Para novas queries, usar curation_status como fonte de verdade

ALTER TABLE job_canonical_roles
  ADD CONSTRAINT chk_deprecated_has_merged_into
    CHECK (status != 'deprecated' OR merged_into IS NOT NULL),
  ADD CONSTRAINT chk_alias_has_alias_of_id
    CHECK (status != 'alias_of' OR alias_of_id IS NOT NULL),
  ADD CONSTRAINT chk_rejected_has_reason
    CHECK (status != 'rejected' OR rejected_reason IS NOT NULL),
  ADD CONSTRAINT chk_no_self_reference_alias
    CHECK (alias_of_id IS NULL OR alias_of_id != id),
  ADD CONSTRAINT chk_no_self_reference_merged
    CHECK (merged_into IS NULL OR merged_into != id),
  ADD CONSTRAINT chk_cbo_codes_format
    CHECK (is_valid_cbo_array(cbo_codes));

ALTER TABLE job_canonical_roles
  ADD CONSTRAINT job_canonical_roles_slug_key UNIQUE (slug);

CREATE INDEX IF NOT EXISTS job_canonical_roles_embedding_idx
  ON job_canonical_roles
  USING hnsw (embedding vector_cosine_ops)
  WITH (m = 16, ef_construction = 64);

job_postings — alterações

(inalterado da v5.13)

Back-fill obrigatório — otimizado com CTE + DISTINCT ON:

-- ⚠️ Versão v5.14: reescrito com CTE para evitar O(n²) da subquery correlacionada aninhada
-- CTE escaneia function_orchestrator_items uma única vez independentemente do volume de job_postings

WITH latest_item AS (
  SELECT DISTINCT ON (job_posting_id)
    job_posting_id,
    pipeline_stage
  FROM function_orchestrator_items
  ORDER BY job_posting_id, created_at DESC
)

-- PASSO A: vagas já curadas com sucesso normal
UPDATE job_postings jp
SET curation_status = 'curated'
FROM latest_item li
WHERE jp.id = li.job_posting_id
  AND jp.canonical_role_id IS NOT NULL
  AND (jp.low_quality IS NULL OR jp.low_quality = false)
  AND li.pipeline_stage != 'fallback';

-- PASSO A2: vagas curadas via fallback (executar em seguida)
WITH latest_item AS (
  SELECT DISTINCT ON (job_posting_id)
    job_posting_id,
    pipeline_stage
  FROM function_orchestrator_items
  ORDER BY job_posting_id, created_at DESC
)
UPDATE job_postings jp
SET curation_status = 'curated_fallback'
FROM latest_item li
WHERE jp.id = li.job_posting_id
  AND jp.canonical_role_id IS NOT NULL
  AND (jp.low_quality IS NULL OR jp.low_quality = false)
  AND li.pipeline_stage = 'fallback';

-- PASSO B: vagas low_quality (inalterado)
UPDATE job_postings
SET curation_status = 'low_quality'
WHERE low_quality = true;

-- VERIFICAÇÃO pós-back-fill:
SELECT curation_status, COUNT(*) FROM job_postings GROUP BY curation_status;
-- 'pending' deve ter APENAS vagas nunca curadas

Migration job_run_id NOT NULL — 3 passos

-- Executar ANTES do ALTER em function_orchestrator_runs
-- PASSO 1: Registro sentinela para registros históricos sem job_run_id
INSERT INTO job_runs (id, job_name, status, metadata, started_at)
VALUES (
  '00000000-0000-0000-0000-000000000002',
    -- ⚠️ UUID diferente do sentinela de usuário de serviço (000...001)
  'legacy_migration_sentinel',
  'success',
  '{"note": "Registro criado para backfill de function_orchestrator_runs históricos"}',
  '2024-01-01T00:00:00Z'
) ON CONFLICT DO NOTHING;

-- PASSO 2: Atribuir sentinela aos registros sem job_run_id
UPDATE function_orchestrator_runs
SET job_run_id = '00000000-0000-0000-0000-000000000002'
WHERE job_run_id IS NULL;

-- PASSO 3: Adicionar NOT NULL constraint
ALTER TABLE function_orchestrator_runs
  ALTER COLUMN job_run_id SET NOT NULL;

-- VERIFICAÇÃO:
SELECT COUNT(*) FROM function_orchestrator_runs WHERE job_run_id IS NULL;
-- Deve retornar 0

ai_usage_logs, events e índices em job_postings

(inalterados da v5.13)


CRON diário — arquitetura em 4 fases

Arquitetura única: Vercel Cron. API Route TypeScript orquestra 4 fases sequenciais. Nenhum job pg_cron independente.

// vercel.json
{
  "crons": [
    { "path": "/api/cron/pipeline-maintenance", "schedule": "0 3 * * *" },
    { "path": "/api/cron/pipeline-maintenance", "schedule": "0 15 * * *" },
    { "path": "/api/cron/monthly-cleanup", "schedule": "0 4 1 * *" }
  ]
}

03h e 15h UTC = 00h e 12h horário de Brasília | 04h UTC dia 1 = 01h Brasília primeiro dia do mês

run_monthly_cleanup — DDL (v5.19)

-- ⚠️ v5.19: DDL explícito — necessário para supabase.rpc('run_monthly_cleanup') funcionar
-- Dois loops de microbatch independentes: job_canonical_role_sources (1 ano) e function_orchestrator_items (2 anos)
-- VACUUM não incluso — ver decisão de governança no handler abaixo
CREATE OR REPLACE FUNCTION run_monthly_cleanup()
RETURNS void LANGUAGE plpgsql AS $$
DECLARE deleted_count INT;
BEGIN
  -- Limpeza de job_canonical_role_sources: registros inativos há mais de 1 ano
  LOOP
    DELETE FROM job_canonical_role_sources
    WHERE id IN (
      SELECT id FROM job_canonical_role_sources
      WHERE last_seen_at < NOW() - INTERVAL '1 year'
      LIMIT 5000
      FOR UPDATE SKIP LOCKED
    );
    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    EXIT WHEN deleted_count = 0;
    PERFORM pg_sleep(0.05);
  END LOOP;

  -- Limpeza de function_orchestrator_items: registros com mais de 2 anos
  LOOP
    DELETE FROM function_orchestrator_items
    WHERE id IN (
      SELECT id FROM function_orchestrator_items
      WHERE created_at < NOW() - INTERVAL '2 years'
      LIMIT 5000
      FOR UPDATE SKIP LOCKED
    );
    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    EXIT WHEN deleted_count = 0;
    PERFORM pg_sleep(0.05);
  END LOOP;
END;
$$;

Handler monthly-cleanup.ts (v5.18)

// pages/api/cron/monthly-cleanup.ts
export default async function handler(req, res) {
  const runId = await createJobRun('monthly_cleanup');
  try {
    await supabase.rpc('run_monthly_cleanup');
    await supabase.from('job_runs')
      .update({ status: 'success', finished_at: new Date().toISOString() })
      .eq('id', runId);
    return res.status(200).json({ ok: true });
  } catch (err) {
    await registerCronError(runId, { step: 'monthly_cleanup', message: err.message });
    await supabase.from('job_runs')
      .update({ status: 'error', finished_at: new Date().toISOString() })
      .eq('id', runId);
    return res.status(500).json({ error: err.message });
  }
}
// ⚠️ VACUUM — Decisão de governança (v5.19):
// O autovacuum do Supabase cobre o volume esperado no lançamento.
// VACUUM explícito não é implementado no handler — adicionar complexidade desnecessária agora
// introduziria dependência de dblink (incompatível com Transaction Pooler do Supabase gerenciado)
// ou pg_cron externo (job fora do job_runs, sem rastreabilidade).
// Critério de revisão: quando function_orchestrator_items ultrapassar 5M registros,
// avaliar pg_cron com entrada separada no job_runs ou upgrade de plano Supabase para acesso direto.

API Route — orquestração das 4 fases

// pages/api/cron/pipeline-maintenance.ts

// ─── registerCronError — interface definida (v5.15) ──────────────────────────
// Acumula erros em metadata.errors[] — não sobrescreve erros anteriores do mesmo run
async function registerCronError(
  jobRunId: string,
  error: { step: string; component?: string; message: string }
): Promise<void> {
  const { data: current } = await supabase
    .from('job_runs')
    .select('metadata')
    .eq('id', jobRunId)
    .single();

  const existingErrors: unknown[] = current?.metadata?.errors ?? [];
  const newError = { ...error, failed_at: new Date().toISOString() };

  await supabase.from('job_runs').update({
    metadata: {
      ...(current?.metadata ?? {}),
      errors: [...existingErrors, newError],
    },
  }).eq('id', jobRunId);
}

// ─── isPipelineEnabled — fonte de verdade em runtime (v5.15) ─────────────────
// Banco prevalece sobre env var. Env var = bootstrap; pipeline_config = runtime.
async function isPipelineEnabled(): Promise<boolean> {
  const { data } = await supabase
    .from('pipeline_config')
    .select('value')
    .eq('key', 'CURATE_PIPELINE_ENABLED')
    .single();
  return (data?.value ?? process.env.CURATE_PIPELINE_ENABLED ?? 'true') === 'true';
}

export default async function handler(req, res) {
  const cronRunId = await createJobRun('run_daily_pipeline_maintenance');
  const phaseErrors: string[] = [];

  // ─── Verificação de habilitação — status 'aborted' (v5.17) ───────────────
  // isPipelineEnabled() lê pipeline_config (banco prevalece); fallback para env var
  const enabled = await isPipelineEnabled();
  if (!enabled) {
    await registerCronError(cronRunId, {
      step: 'pipeline_guard',
      message: 'Pipeline desabilitado via pipeline_config — run abortado sem processar',
    });
    await supabase.from('job_runs')
      .update({ status: 'aborted', finished_at: new Date().toISOString() })
      .eq('id', cronRunId);
    return res.status(200).json({ ok: false, status: 'aborted', reason: 'pipeline_disabled' });
  }

  try {
    // ─── FASE 1: SQL pura (PASSOs 0, 1, 2) ───────────────────────────
    // ⚠️ Política: qualquer falha na FASE 1 aborta o CRON inteiro.
    // FASE 1 cobre watchdog, expiração de vagas e recálculo de fontes —
    // todas são pré-condições para as fases seguintes operarem com dados corretos.
    const { error: phase1Error } = await supabase.rpc('maintenance_phase_1');
    if (phase1Error) {
      await registerCronError(cronRunId, { step: 'cron_phase_1', message: phase1Error.message });
      await supabase.from('job_runs')
        .update({ status: 'error', finished_at: new Date() })
        .eq('id', cronRunId);
      return res.status(500).json({ error: phase1Error.message });
    }

    // ─── FASE 2: TypeScript — PASSO 3 (quarentena) ───────────────────
    // Falha na FASE 2 não impede FASE 3 e FASE 4
    try {
      // ⚠️ v5.19: RPC transacional única — calcula slots + seleciona + libera no mesmo contexto
      // Elimina janela de corrida do fluxo anterior (remaining_slots → release_quarantined_jobs)
      // Ex: 90 liberações no dia → RPC libera no máx 10; 0 retornou = nenhuma vaga elegível ou teto atingido
      const { data: releasedCount } = await supabase.rpc('release_quarantined_jobs_limited');
      // releasedCount: INT — número de vagas efetivamente liberadas (0 se teto atingido)
    } catch (phase2Error) {
      phaseErrors.push('cron_phase_2');
      await registerCronError(cronRunId, { step: 'cron_phase_2', message: phase2Error.message });
      // Continua para FASE 3
    }

    // ─── FASE 3: SQL pura (PASSOs 4, 5) ──────────────────────────────
    // Falha na FASE 3 não impede FASE 4
    try {
      await supabase.rpc('maintenance_phase_2');
    } catch (phase3Error) {
      phaseErrors.push('cron_phase_3');
      await registerCronError(cronRunId, { step: 'cron_phase_3', message: phase3Error.message });
    }

    // ─── FASE 4: TypeScript — PASSO 6 (embeddings NULL) ──────────────
    try {
      const { data: nullEmbeddings } = await supabase
        .from('job_canonical_roles')
        .select('id, label')
        .eq('status', 'active')
        .is('embedding', null)
        .limit(parseInt(process.env.EMBEDDING_REBUILD_BATCH_LIMIT ?? '50'));

      for (const role of nullEmbeddings ?? []) {
        const vector = await generateE5Embedding(role.label);
        await supabase.from('job_canonical_roles')
          .update({ embedding: vector })
          .eq('id', role.id);
      }
    } catch (phase4Error) {
      phaseErrors.push('cron_phase_4');
      await registerCronError(cronRunId, { step: 'cron_phase_4', message: phase4Error.message });
    }

    // ─── Finalização com status correto (v5.15) ───────────────────────
    // 'success'  → nenhuma fase falhou
    // 'partial'  → FASE 1 passou, mas alguma das FASEs 2–4 falhou
    const finalStatus = phaseErrors.length === 0 ? 'success' : 'partial';
    await supabase.from('job_runs')
      .update({ status: finalStatus, finished_at: new Date() })
      .eq('id', cronRunId);

    return res.status(200).json({ ok: true, status: finalStatus, phaseErrors });

  } catch (fatalError) {
    await registerCronError(cronRunId, { step: 'cron_phase_1', message: fatalError.message });
    await supabase.from('job_runs')
      .update({ status: 'error', finished_at: new Date() })
      .eq('id', cronRunId);
    return res.status(500).json({ error: fatalError.message });
  }
}

maintenance_phase_1 — PASSOs 0, 1, 2

CREATE OR REPLACE FUNCTION maintenance_phase_1()
RETURNS void AS $$
BEGIN

-- PASSO 0: Watchdog — resolver runs presos
-- PASSO 0.1: Limpar function_orchestrator_runs filhos travados
UPDATE function_orchestrator_runs
SET status = 'error',
    finished_at = NOW(),
    error_message = 'Interrompido sem conclusão — resolvido pelo watchdog'
WHERE status = 'running'
  AND started_at < NOW() - INTERVAL '30 minutes';

-- PASSO 0.2: Limpar job_runs pais travados (v5.16)
-- Hard crash da Vercel pode deixar job_runs preso em 'running' indefinidamente
-- jsonb_build_object passa NOW() como TIMESTAMPTZ nativo — sem concatenação de string (v5.17)
UPDATE job_runs
SET status     = 'error',
    finished_at = NOW(),
    metadata   = jsonb_set(
      COALESCE(metadata, '{}'::jsonb),
      '{errors}',
      COALESCE(metadata->'errors', '[]'::jsonb)
        || jsonb_build_array(
             jsonb_build_object(
               'step',      'watchdog',
               'message',   'Job resolvido pelo watchdog por inatividade',
               'failed_at', NOW()
             )
           )
    )
WHERE status = 'running'
  AND started_at < NOW() - INTERVAL '30 minutes';

-- PASSO 1: Marcar vagas expiradas como inativas
UPDATE job_postings
SET is_active = false
WHERE posted_at < NOW() - INTERVAL '120 days'
  AND is_active = true;

-- PASSO 2: Recalcular distinct_sources_count (verificação de consistência full)
-- O trigger fn_update_distinct_sources_count mantém em tempo real (v5.13).
-- Este recálculo full detecta divergências acumuladas (ex: outage do trigger).
-- ⚠️ Sem JOIN com job_postings — last_seen_at já incorpora a janela temporal.
-- Índice composto (canonical_role_id, last_seen_at DESC) cobre este filtro.
UPDATE job_canonical_roles jcr
SET distinct_sources_count = (
  SELECT COUNT(*)
  FROM job_canonical_role_sources jcrs
  WHERE jcrs.canonical_role_id = jcr.id
    AND jcrs.last_seen_at >= NOW() - INTERVAL '120 days'
);

END;
$$ LANGUAGE plpgsql;

release_quarantined_jobs — PASSO 3 como RPC independente

CREATE OR REPLACE FUNCTION release_quarantined_jobs(job_ids UUID[])
RETURNS INT LANGUAGE plpgsql AS $$
DECLARE released INT;
BEGIN
  UPDATE job_postings
  SET curation_status = 'pending',
      llm_parse_fail_count = 0,
      quarantined_at = NULL,
      quarantine_reason = NULL,
      quarantine_released_at = NOW(),
      updated_at = NOW()
  WHERE id = ANY(job_ids)
    AND curation_status = 'quarantined_llm_output';
  GET DIAGNOSTICS released = ROW_COUNT;
  RETURN released;
END;
$$;

release_quarantined_jobs_limited — liberação transacional atômica (v5.19)

-- ⚠️ v5.19: substitui o fluxo de dois round-trips (check_quarantine_rate_limit → release_quarantined_jobs)
-- Elimina janela de corrida em execuções concorrentes — calcula slots + seleciona + libera
-- no mesmo contexto transacional com FOR UPDATE SKIP LOCKED
-- TypeScript chama uma única RPC sem precisar de resolveQuarantineEligibleIds()
CREATE OR REPLACE FUNCTION release_quarantined_jobs_limited()
RETURNS INT LANGUAGE plpgsql AS $$
DECLARE
  remaining_slots INT;
  released        INT := 0;
BEGIN
  -- Calcular slots disponíveis no dia-calendário de São Paulo
  SELECT GREATEST(0, 100 - COUNT(*)::INT)
  INTO remaining_slots
  FROM job_postings
  WHERE (quarantine_released_at AT TIME ZONE 'America/Sao_Paulo')::date
      = (NOW() AT TIME ZONE 'America/Sao_Paulo')::date;

  IF remaining_slots = 0 THEN
    RETURN 0;
  END IF;

  -- Selecionar e liberar em contexto transacional único
  UPDATE job_postings
  SET curation_status       = 'pending',
      llm_parse_fail_count  = 0,
      quarantined_at        = NULL,
      quarantine_reason     = NULL,
      quarantine_released_at = NOW(),
      updated_at            = NOW()
  WHERE id IN (
    SELECT id FROM job_postings
    WHERE curation_status = 'quarantined_llm_output'
      AND (quarantined_at + INTERVAL '1 day' * (
            SELECT value::INT FROM pipeline_config WHERE key = 'QUARANTINE_EXPIRY_DAYS'
          )) <= NOW()
    ORDER BY quarantined_at ASC
    LIMIT remaining_slots
    FOR UPDATE SKIP LOCKED
  );
  GET DIAGNOSTICS released = ROW_COUNT;
  RETURN released;
END;
$$;
-- Retorna o número de vagas efetivamente liberadas (0 se teto atingido ou nenhuma elegível)
-- check_quarantine_rate_limit() mantida para uso em queries de diagnóstico/monitoramento

check_quarantine_rate_limit — consulta de diagnóstico (v5.19)

-- ⚠️ v5.19: não mais usada no fluxo principal — substituída por release_quarantined_jobs_limited()
-- Mantida para diagnóstico, monitoramento e consultas admin
CREATE OR REPLACE FUNCTION check_quarantine_rate_limit()
RETURNS INT LANGUAGE sql STABLE AS $$
  SELECT GREATEST(0,
    100 - COUNT(*)::INT
  )
  FROM job_postings
  WHERE (quarantine_released_at AT TIME ZONE 'America/Sao_Paulo')::date
      = (NOW() AT TIME ZONE 'America/Sao_Paulo')::date;
$$;
-- Retorna 0 quando teto atingido, N quando ainda há N slots disponíveis

search_canonical_roles — busca vetorial com índice HNSW (v5.17)

-- ⚠️ pgvector: ORDER BY só pode conter o operador de distância para usar índice HNSW.
-- Qualquer coluna adicional (vacancy_count, id) força sequential scan — 100% CPU.
-- Solução: CTE busca top N*3 usando só o vetor; outer query reordena em memória.
CREATE OR REPLACE FUNCTION search_canonical_roles(
  query_vector  VECTOR(384),
  result_limit  INT DEFAULT 5
)
RETURNS TABLE(id UUID, label TEXT, similarity NUMERIC) AS $$
BEGIN
  -- set_config compatível com Transaction Pooler (porta 6543)
  PERFORM set_config('hnsw.ef_search', '64', true);
  RETURN QUERY
  WITH knn_results AS (
    SELECT jcr.id,
           jcr.label,
           jcr.vacancy_count,
           (1 - (jcr.embedding <=> query_vector))::NUMERIC AS similarity
    FROM job_canonical_roles jcr
    WHERE jcr.status = 'active'
      AND jcr.embedding IS NOT NULL
    ORDER BY jcr.embedding <=> query_vector  -- ⚠️ operador vetorial sozinho — HNSW ativado
    LIMIT result_limit * 3                   -- margem extra para desempate seguro
  )
  SELECT k.id, k.label, k.similarity
  FROM knn_results k
  ORDER BY k.similarity DESC,
           k.vacancy_count DESC,
           k.id ASC
  LIMIT result_limit;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION maintenance_phase_2()
RETURNS void AS $$
BEGIN

-- PASSO 4: Recalcular vacancy_count
UPDATE job_canonical_roles jcr
SET vacancy_count = (
  SELECT COUNT(*)
  FROM job_postings jp
  WHERE jp.canonical_role_id = jcr.id
    AND jp.curation_status IN ('curated', 'curated_fallback')
    AND jp.is_active = true
    AND jp.posted_at >= NOW() - INTERVAL '120 days'
);

-- PASSO 5: Requeue de falhas transitórias
UPDATE job_postings
SET curation_status = 'pending', updated_at = NOW()
WHERE id IN (
  SELECT id FROM job_postings
  WHERE curation_status = 'retryable_error'
    AND is_active = true
  ORDER BY updated_at ASC
  LIMIT 40
  FOR UPDATE SKIP LOCKED
);

END;
$$ LANGUAGE plpgsql;

Rate limit de quarentena — RPC transacional única (v5.19):

// ⚠️ v5.19: RPC única — calcula slots + seleciona + libera no mesmo contexto transacional
// Elimina janela de corrida do fluxo anterior (remaining_slots → release_quarantined_jobs)
// check_quarantine_rate_limit() mantida apenas para diagnóstico/monitoramento
const { data: releasedCount } = await supabase.rpc('release_quarantined_jobs_limited');
// releasedCount: número de vagas liberadas (0 = teto atingido ou nenhuma elegível)

Governança mensal — delete em microbatches:

-- Job separado: job_name='monthly_cleanup'
-- ⚠️ DELETE único em tabela com 2+ anos pode deletar milhões de linhas — lock prolongado
-- Solução: microbatches de 5.000 registros com pausa entre lotes
DO $$
DECLARE
  deleted_count INT;
  total_deleted INT := 0;
BEGIN
  LOOP
    DELETE FROM function_orchestrator_items
    WHERE id IN (
      SELECT id FROM function_orchestrator_items
      WHERE processed_at < NOW() - INTERVAL '2 years'
      LIMIT 5000
      FOR UPDATE SKIP LOCKED
    );
    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    total_deleted := total_deleted + deleted_count;
    EXIT WHEN deleted_count = 0;
    PERFORM pg_sleep(0.05); -- 50ms de pausa entre microbatches
  END LOOP;
  RAISE NOTICE 'Total deletado: %', total_deleted;
END;
$$;

VACUUM ANALYZE function_orchestrator_items;
VACUUM ANALYZE ai_usage_logs;

-- Limpeza de job_canonical_role_sources — registros inativos há mais de 1 ano (v5.17)
-- Seguro: distinct_sources_count só olha últimos 120 dias — registros de 1+ ano não afetam cálculos
-- ⚠️ v5.18: microbatch — mesmo padrão de function_orchestrator_items (evita lock prolongado)
DO $$
DECLARE deleted_count INT;
BEGIN
  LOOP
    DELETE FROM job_canonical_role_sources
    WHERE id IN (
      SELECT id FROM job_canonical_role_sources
      WHERE last_seen_at < NOW() - INTERVAL '1 year'
      LIMIT 5000
      FOR UPDATE SKIP LOCKED
    );
    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    EXIT WHEN deleted_count = 0;
    PERFORM pg_sleep(0.05);
  END LOOP;
END;
$$;

Alertas operacionais:

-- Alerta de canônicos pendentes
SELECT COUNT(*) FROM job_canonical_roles WHERE status = 'pending';
-- Alerta quando > PENDING_CANONICALS_ALERT_THRESHOLD (padrão: 50)

Governança semanal — protocolo quando fallback_ratio > 0.20 (v5.17):

Se fallback_ratio > 0.20 em qualquer run:
1. Verificar se houve alteração recente no SYSTEM_PROMPT via job_runs.metadata.prompt_version
2. Verificar se houve alteração nos arquivos externos via job_runs.metadata.dict_files
3. Comparar com runs anteriores saudáveis — isolar o run que iniciou a regressão
4. Se não houver alteração identificada: verificar cobertura do dicionário para as funções do período

Governança mensal — query de auditoria de inflação por grupo empresarial (v5.17):

-- Detecta canônicos com distinct_sources_count >= 5 mas alta concentração de prefixo
-- Indica possível inflação por subsidiárias do mesmo grupo empresarial
SELECT jcr.label,
       jcr.distinct_sources_count,
       COUNT(*)                                         AS total_sources,
       COUNT(DISTINCT LEFT(jcrs.normalized_company, 5)) AS prefixos_distintos
FROM job_canonical_roles jcr
JOIN job_canonical_role_sources jcrs ON jcrs.canonical_role_id = jcr.id
WHERE jcr.status IN ('pending', 'active')
  AND jcr.distinct_sources_count >= 5
GROUP BY jcr.id, jcr.label, jcr.distinct_sources_count
HAVING COUNT(DISTINCT LEFT(jcrs.normalized_company, 5)) <= 2
ORDER BY jcr.distinct_sources_count DESC;
-- Resultado com prefixos_distintos <= 2 merece revisão manual
-- Ação: avaliar se as sources são subsidiárias do mesmo grupo; corrigir distinct_sources_count manualmente

Upsert em job_canonical_roles

(inalterado da v5.13 — slugs ordenados alfabeticamente, COALESCE em latest_posted_at)


Quarentena operacional

(inalterado da v5.13)

Retry de batch com limite:

const MAX_BATCH_RETRIES = parseInt(process.env.LLM_BATCH_PARSE_MAX_RETRIES ?? '2');
let batchAttempt = 0;

while (batchAttempt <= MAX_BATCH_RETRIES) {
  try {
    // processar batch...
    break;
  } catch (err) {
    if (isBatchOutputParseError(err)) {
      batchAttempt++;
      if (batchAttempt > MAX_BATCH_RETRIES) {
        // Todas as vagas do batch → retryable_error
        // NÃO incrementa llm_parse_fail_count
        await markBatchItemsAsRetryable(batchItemIds, 'batch_output_parse_error');
        await registerJobRunError(jobRunId, {
          step: 'batch_processing',
          component: `batch_${batchNumber}`,
          message: `batch_output_parse_error após ${MAX_BATCH_RETRIES} tentativas`,
          failed_at: new Date().toISOString(),
        });
        break;
      }
    } else throw err;
  }
}

Auto-promoção — confidence_median gravado na promoção

-- Auto-promoção: CTE garante que promoção só ocorre se COUNT(*) >= 5 (v5.15)
-- ⚠️ v5.17: parâmetros INTEGER — INTERVAL ':var' é inválido no PG
-- ⚠️ v5.18: adicionado p_min_sources_emerging — respeita AUTO_PROMOTE_MIN_SOURCES_EMERGING
CREATE OR REPLACE FUNCTION run_auto_promotion(
  p_min_sources          INT DEFAULT 5,
  p_min_sources_emerging INT DEFAULT 3,
  p_min_days             INT DEFAULT 14
)
RETURNS void LANGUAGE plpgsql AS $$
BEGIN
  WITH stats AS (
    SELECT
      foi.canonical_role_id,
      COUNT(*)                                                      AS n,
      PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY foi.confidence)  AS median
    FROM function_orchestrator_items foi
    JOIN function_orchestrator_runs fo ON fo.id = foi.run_id
    WHERE foi.confidence IS NOT NULL
      AND fo.started_at >= NOW() - INTERVAL '120 days'
    GROUP BY foi.canonical_role_id
    HAVING COUNT(*) >= 5
  )
  UPDATE job_canonical_roles jcr
  SET status            = 'active',
      promoted_at       = NOW(),
      confidence_median = stats.median
  FROM stats
  WHERE jcr.id = stats.canonical_role_id
    AND jcr.status = 'pending'
    AND jcr.distinct_sources_count >= CASE
          WHEN jcr.is_emerging THEN p_min_sources_emerging
          ELSE p_min_sources
        END
    AND jcr.promoted_at IS NULL
    AND (NOW() - jcr.created_at) >= (p_min_days * INTERVAL '1 day');
  -- Se stats não tiver linha para o canonical_role_id: nenhuma atualização ocorre
END;
$$;
-- Chamada TypeScript:
-- await supabase.rpc('run_auto_promotion', {
--   p_min_sources:          parseInt(process.env.AUTO_PROMOTE_MIN_SOURCES ?? '5'),
--   p_min_sources_emerging: parseInt(process.env.AUTO_PROMOTE_MIN_SOURCES_EMERGING ?? '3'),
--   p_min_days:             parseInt(process.env.AUTO_PROMOTE_MIN_DAYS ?? '14')
-- })

Validação de arquivos JSON externos — completude

// Após validação Zod, verificar completude
function validateFileCompleteness(parsedData: any, fileName: string): void {
  if (Object.keys(parsedData.data ?? {}).length === 0) {
    throw new Error(`Arquivo ${fileName} passou na validação Zod mas está vazio (data: {}). Abortando.`);
  }
}

// Sequência obrigatória no startup:
// 1. Criar job_runs com status='running', metadata={}
// 2. Validar Zod
// 3. Validar completude
// Falha em qualquer etapa → enriquecer job_runs.metadata + abortar

Arquivos externos — repositório

(inalterado da v5.13)


Pipeline de curadoria — fluxo detalhado

(inalterado da v5.13)


Embeddings — arquitetura completa

(inalterado da v5.13)


Variáveis de ambiente

# LLM
LLM_MODEL=claude-haiku-4-5-20251001
LLM_CONCURRENCY_EXTRACTOR=15
LLM_BATCH_SIZE=40
LLM_BATCH_PARSE_MAX_RETRIES=2
  # Máximo de retries para batch_output_parse_error antes de marcar vagas como retryable_error

# Embeddings
EMBEDDING_MODEL=intfloat/multilingual-e5-small
EMBEDDING_SIMILARITY_REPLACE_THRESHOLD=0.975
EMBEDDING_REBUILD_BATCH_LIMIT=50
  # Máximo de embeddings NULL recalculados por execução do CRON (FASE 4)

# Pipeline
TEXT_TRUNCATION_LIMIT=5000
CURATE_PIPELINE_ENABLED=true
  # Valor de bootstrap. Em runtime, pipeline_config prevalece.
SLOW_BATCH_THRESHOLD_MS=35000
  # Batch acima deste tempo (ms) incrementa slow_batch_streak
SLOW_BATCH_AUTO_PAUSE_AT=10
  # Na Nª ocorrência consecutiva de slow_batch, pipeline é pausado automaticamente via pipeline_config

# Auto-promoção
AUTO_PROMOTE_MIN_SOURCES=5
AUTO_PROMOTE_MIN_SOURCES_EMERGING=3
AUTO_PROMOTE_MIN_DAYS=14
AUTO_PROMOTE_MIN_CONFIDENCE=0.80
AUTO_PROMOTE_MAX_SIMILARITY=0.90

# Quarentena
QUARANTINE_FAIL_THRESHOLD=3
BLACKLIST_EXPIRY_DAYS=90

# Alertas
FALLBACK_RATIO_ALERT_THRESHOLD=0.20
PENDING_CANONICALS_ALERT_THRESHOLD=50

# Connection
SUPABASE_URL=...
SUPABASE_SERVICE_ROLE_KEY=...
  # Connection string deve apontar para porta 6543 (Transaction Pooler)

Ciclo de vida dos cargos em job_canonical_roles

(inalterado da v5.13)


Quarentena operacional

(inalterado da v5.13)


Governança da taxonomia — eventos

(inalterado da v5.13)


Golden set — critérios de qualidade

(inalterado da v5.13)


Passo a passo de implementação

Sprint 1 — Sequência obrigatória

(inalterado da v5.13 — referências a curate_runsfunction_orchestrator_runs, curate_run_itemsfunction_orchestrator_items)

Sprint 1 — Esforço

(inalterado da v5.13)

Checklist de deploy

Item Verificação
job_runs DDL Criado antes de function_orchestrator_runs (FK depende)
job_runs GIN Índice GIN em metadata criado
job_runs.status CHECK Inclui 'partial' e 'aborted'CHECK (status IN ('running', 'success', 'partial', 'error', 'aborted'))
job_runs criado primeiro status='running', metadata={} antes do Zod e da pré-rotina de hash
pipeline_config DDL Tabela criada + seed ('CURATE_PIPELINE_ENABLED', 'true')
isPipelineEnabled() Lê banco com fallback para env var — banco prevalece
Fluxo aborted — CRON + Fluxos A e B Todo entrypoint de curadoria: isPipelineEnabled() → false → aborted + encerra
monthly_cleanup agendado Entrada "0 4 1 * *" em vercel.json + handler monthly-cleanup.ts
SLOW_BATCH_AUTO_PAUSE_AT env var parseInt(process.env.SLOW_BATCH_AUTO_PAUSE_AT ?? '10') no TypeScript
increment_circuit_breaker() RPC DDL executado — sem supabase.raw() no código
check_quarantine_rate_limit() RPC Retorna remaining_slots INT — teto 100/dia garantido atomicamente
search_canonical_roles() RPC ORDER BY só com operador vetorial; desempate via CTE com result_limit * 3
run_auto_promotion() RPC p_min_sources, p_min_sources_emerging, p_min_daysCASE WHEN is_emerging
Rollback low_quality = false incluído no UPDATE de reversão
generate-prompt-version.ts regex ;? — semicolon opcional
DELETE job_canonical_role_sources Microbatch 5.000 + FOR UPDATE SKIP LOCKED
registerCronError() Interface TypeScript definida — acumula em errors[], não sobrescreve
metadata.errors[] Campo é array em toda a spec — sem uso de metadata.error (singular)
Watchdog PASSO 0.2 job_runs presos resolvidos com jsonb_build_object — sem concatenação de string
maintenance_phase_1() sem p_job_run_id Parâmetro removido — chamada sem argumentos
maintenance_phase_2() sem p_job_run_id Parâmetro removido — chamada sem argumentos
CRON status partial Handler acumula phaseErrors[]; finaliza com 'partial' quando FASE 1 passa mas FASEs 2–4 falham
Texto FASE 1 Documenta “qualquer falha na FASE 1 aborta” (não apenas watchdog)
Auto-promoção com CTE UPDATE FROM stats — promoção só ocorre se COUNT(*) >= 5
action_required CHECK IS NULL OR IN (...) — sem IN (..., NULL)
Índice (job_posting_id, created_at DESC) Criado em function_orchestrator_items
Migration sentinela UUID 000...002 criado; function_orchestrator_runs históricos com job_run_id preenchido
function_orchestrator_runs job_run_id NOT NULL — toda execução gera job_runs antes de qualquer processamento
function_orchestrator_items retry_count ausente — nenhuma referência no código
Renomeação Nenhuma referência a curate_runs ou curate_run_items no código
SYSTEM_PROMPT.ts isolado Contém apenas texto do prompt
generate-prompt-version.ts Hash apenas do conteúdo do template literal (não do arquivo inteiro)
prompt-version.generated.ts No .gitignore, gerado pelo prebuild
Validação de completude Object.keys(data).length === 0 após Zod para cada arquivo
db_connection error Registrado em log externo quando banco inacessível — não apenas em job_runs
confidence_median Calculado via CTE — promoção bloqueada se COUNT(*) < 5
low_quality coexistência curation_status e low_quality atualizados de forma síncrona
Trigger DELETE fn_update_distinct_sources_count trata INSERT, UPDATE e DELETE
Índice composto sources (canonical_role_id, last_seen_at DESC) criado
CRON 4 fases FASE 1 aborta tudo se falhar; FASEs 2–4 continuam independentemente
maintenance_phase_1 PASSOs 0, 1, 2 como RPC separada
release_quarantined_jobs PASSO 3 como RPC com parâmetro UUID[]
maintenance_phase_2 PASSOs 4, 5 como RPC separada
PASSO 6 (FASE 4) TypeScript com LIMIT EMBEDDING_REBUILD_BATCH_LIMIT
slow_batch_streak Campos criados em circuit_breaker_state; pausa automática na 11ª ocorrência
Reset atômico circuit breaker UPDATE com .lt('window_started_at', expiryTimestamp)
LLM_BATCH_PARSE_MAX_RETRIES Batch retenta 2x; após esgotamento vagas → retryable_error
Backfill com CTE DISTINCT ON em vez de subquery aninhada
Rastreabilidade quarentena Query sem filtro curation_status — usa error_type = 'item_output_parse_error'
Delete mensal em microbatches Loop 5.000 registros + pg_sleep(0.05) + VACUUM ANALYZE
Timezone quarentena AT TIME ZONE 'America/Sao_Paulo' na query de rate limit
Ordenação de slugs Upsert em lote ordena alfabeticamente
fallback_ratio Calculado ao fim do run; alerta se > 20%
Alerta pending_review job_canonical_roles WHERE status='pending'
Erro detalhado job_runs Toda falha registra step, component, message, failed_at em errors[]
Golden set — Crítico 100% passando
Golden set — Alto ≥ 90% passando
Connection Pooler Porta 6543, set_config() via RPC testado

Sprint 2 — Validação em shadow mode

(preservado da v5.13)

Plano de rollback

1. IMEDIATO (< 5 minutos):
   - CURATE_PIPELINE_ENABLED=false (ou já pausado automaticamente por slow_batch_streak)
   - Parar runs manuais via painel

2. DIAGNÓSTICO (30 minutos):
   - Verificar `job_runs.metadata.errors[]` para identificar `step`, `component` e `message` da falha
   - Verificar function_orchestrator_runs.error_message
   - Recuperar dict_files e prompt_version via job_run_id → job_runs.metadata
   - Rodar golden set completo em staging

3. ROLLBACK COMPLETO:
   - Reverter SYSTEM_PROMPT.ts via git revert
     → próximo build regenera prompt_version automaticamente (hash do conteúdo)
   - Reverter arquivos JSON externos via git revert
   - Vagas curadas erroneamente: filtrar pelo run_id específico
     UPDATE job_postings jp
     SET skills = NULL, canonical_role_id = NULL, secondary_role = NULL,
         curation_status = 'pending', llm_parse_fail_count = 0,
         low_quality = false  -- ⚠️ v5.18: obrigatório — sincronia com curation_status
     FROM function_orchestrator_items foi
     JOIN function_orchestrator_runs fo ON fo.id = foi.run_id
     WHERE foi.job_posting_id = jp.id
       AND fo.id = :run_id_com_regressao;

4. RERUN após rollback:
   - Confirmar golden set 100% Crítico na versão revertida
   - Rodar 100 vagas em staging
   - Liberar produção

Sprint 3 — Painel em tempo real

(preservado da v5.13)

Share This: