Pipeline de Curação de Vagas — v5.19
Evolução do curate_job_postings.ts para arquitetura de taxonomia viva
O que muda da v5.18 para a v5.19
| # | Origem | Correção | Problema na v5.18 | Solução v5.19 |
|---|---|---|---|---|
| 1 | Gemini | run_monthly_cleanup() sem DDL de CREATE FUNCTION |
Sprint apresentava bloco DO $$ ... $$ mas handler chama supabase.rpc('run_monthly_cleanup') — sem o DDL a RPC não existe |
DDL CREATE OR REPLACE FUNCTION run_monthly_cleanup() adicionado ao sprint e ao pipeline; campo corrigido para created_at (não processed_at) |
| 2 | GPT | Rate limit de quarentena com janela de corrida residual | Fluxo em dois round-trips (remaining_slots → release_quarantined_jobs) deixava janela de corrida em execuções concorrentes que consultassem slots quase simultaneamente |
Nova RPC release_quarantined_jobs_limited() — calcula slots + seleciona + libera no mesmo contexto transacional; TypeScript chama uma única RPC |
| 3 | GPT + Gemini | VACUUM sem decisão concreta | Comentário no handler dizia “executar separadamente” sem definir caminho — deixava gap de governança operacional | Decisão registrada explicitamente: autovacuum do Supabase cobre o volume do lançamento; critério de revisão quando function_orchestrator_items ultrapassar 5M registros |
O que muda da v5.17 para a v5.18
| # | Origem | Correção | Problema na v5.17 | Solução v5.18 |
|---|---|---|---|---|
| 1 | Genspark | monthly_cleanup sem agendamento |
Job mensal documentado mas sem entrada em vercel.json e sem handler TypeScript — nunca rodaria automaticamente |
Entrada "0 4 1 * *" adicionada ao vercel.json; handler monthly-cleanup.ts criado com createJobRun + registerCronError + padrão de erro/sucesso estabelecido |
| 2 | Genspark | SLOW_BATCH_AUTO_PAUSE_AT hardcoded |
Threshold de 10 batches lentos consecutivos estava hardcoded no TypeScript — impedia ajuste operacional sem redeploy | Restaurado como env var SLOW_BATCH_AUTO_PAUSE_AT=10; TypeScript usa parseInt(process.env.SLOW_BATCH_AUTO_PAUSE_AT ?? '10') |
| 3 | Genspark | DELETE job_canonical_role_sources sem microbatch |
DELETE único poderia afetar centenas de milhares de registros em um statement — lock prolongado em escala | Convertido para microbatch de 5.000 registros com FOR UPDATE SKIP LOCKED + pg_sleep(0.05) — mesmo padrão de function_orchestrator_items |
| 4 | Gemini | low_quality = false ausente no rollback |
UPDATE de reversão resetava curation_status = 'pending' mas não tocava em low_quality — flag continuava true, quebrando regra de sincronia com curation_status |
Adicionado low_quality = false no UPDATE do plano de rollback |
| 5 | Gemini | Regex do prebuild sem ;? |
generate-prompt-version.ts usava regex com ; obrigatório — Prettier com semi: false quebraria o prebuild silenciosamente |
Semicolon tornado opcional: /export const SYSTEM_PROMPTs*=s*([sS]?)`s;?/` |
| 6 | GPT | check_quarantine_rate_limit() retornava booleano — teto ainda podia estourar |
RPC retornava TRUE com 90 liberações no dia; TypeScript liberava mais 100 — teto real de 100/dia não era garantido |
RPC reescrita para retornar remaining_slots INT; TypeScript usa slice(0, remaining_slots) — teto garantido atomicamente |
| 7 | GPT | Guard aborted só no CRON, não nos Fluxos A e B |
isPipelineEnabled() verificado apenas no handler do CRON — Fluxos A (manual) e B (automático por endpoint) continuavam curando mesmo com pipeline pausado |
Spec tornada explícita: todo entrypoint de curadoria segue padrão createJobRun → isPipelineEnabled() → false → status='aborted' + metadata.errors[] + encerra |
| 8 | GPT | run_auto_promotion() perdeu regra de is_emerging |
Função reescrita com p_min_sources INT DEFAULT 3 — AUTO_PROMOTE_MIN_SOURCES_EMERGING ficou órfão; canônicos normais promoviam cedo demais ou is_emerging perdia benefício |
Adicionado p_min_sources_emerging INT DEFAULT 3; condição usa CASE WHEN jcr.is_emerging THEN p_min_sources_emerging ELSE p_min_sources END |
O que muda da v5.16 para a v5.17
| # | Origem | Correção | Problema na v5.16 | Solução v5.17 |
|---|---|---|---|---|
| 1 | Claude | Governança mensal: query de auditoria de inflação por grupo empresarial | Revisão mensal de distinct_sources_count dependia de memória do operador — sem query que sinalizasse automaticamente canônicos com alta concentração de prefixo comum |
Query adicionada à seção de monitoramento: filtra canônicos com distinct_sources_count >= 5 e COUNT(DISTINCT LEFT(normalized_company, 5)) <= 2 |
| 2 | Claude | Governança semanal: protocolo quando fallback_ratio > 0.20 |
Alerta FALLBACK_RATIO_ALERT_THRESHOLD=0.20 existia mas sem ação documentada quando disparado |
Adicionado à governança semanal: verificar alterações recentes em SYSTEM_PROMPT ou arquivos externos via job_runs.metadata.dict_files |
| 3 | Genspark + GPT | AUTO_PROMOTE_MIN_DAYS: bind param inválido no PostgreSQL |
INTERVAL ':AUTO_PROMOTE_MIN_DAYS days' dentro de string literal SQL não é substituído pelo PostgreSQL — quebra em runtime |
Função de auto-promoção reescrita com parâmetro p_min_days INTEGER; corpo usa (p_min_days * INTERVAL '1 day'); chamada TypeScript passa parseInt(process.env.AUTO_PROMOTE_MIN_DAYS ?? '14') |
| 4 | GPT | Rate limit de quarentena: teto real é 100/dia, não 200 | Texto afirmava “2 execuções × 100 = 200/24h” mas a RPC check_quarantine_rate_limit() conta o total do dia-calendário — primeira execução que liberar 100 bloqueia a segunda |
Removida afirmação de 200/dia; teto oficial documentado como 100 liberações por dia-calendário de São Paulo |
| 5 | GPT | Fluxo aborted sem implementação |
job_runs.status = 'aborted' existia no CHECK mas nenhum trecho do handler o escrevia — status órfão semanticamente |
Fluxo implementado no handler: isPipelineEnabled() retorna false → status = 'aborted' + motivo em metadata.errors[] + encerra sem processar |
| 6 | Gemini | search_canonical_roles: ORDER BY multi-coluna quebra índice HNSW |
ORDER BY embedding <=> query_vector, vacancy_count DESC, id ASC força sequential scan — pgvector ignora índice HNSW quando ORDER BY contém qualquer coluna além do operador de distância |
Reescrito com CTE: inner query ordena só por vetor com LIMIT result_limit * 3; outer query reordena em memória por similarity DESC, vacancy_count DESC, id ASC |
| 7 | Gemini | PASSO 0.2 watchdog: concatenação frágil de string JSONB | "failed_at": "' || NOW()::text || '" depende do formato de saída do servidor — pode quebrar se fuso horário ou formato de data mudar |
Substituído por jsonb_build_object('step', 'watchdog', 'message', '...', 'failed_at', NOW()) — NOW() passado como TIMESTAMPTZ nativo sem conversão de string |
| 8 | Gemini | job_canonical_role_sources sem limpeza mensal |
Tabela acumula UPSERT de todas as empresas indefinidamente — registros com last_seen_at antigos nunca são removidos, ocupando disco crescente |
Adicionado ao job mensal: DELETE FROM job_canonical_role_sources WHERE last_seen_at < NOW() - INTERVAL '1 year' em microbatches |
O que muda da v5.15 para a v5.16
| # | Origem | Correção | Problema na v5.15 | Solução v5.16 |
|---|---|---|---|---|
| 1 | Gemini | Watchdog cobre job_runs órfãos |
PASSO 0 resolvia function_orchestrator_runs presos mas não o job_runs pai — hard crash da Vercel deixa registros 'running' permanentemente em job_runs, poluindo o painel de auditoria |
PASSO 0.2 em maintenance_phase_1: UPDATE em job_runs com status='running' e started_at < NOW() - INTERVAL '30 minutes', injetando erro estruturado em metadata.errors[] via jsonb_set |
| 2 | GPT | Plano de rollback referencia campo depreciado | Passo 2 do rollback dizia job_runs.metadata.error — campo singular depreciado na v5.15 quando a spec migrou para errors[] acumulativo |
Corrigido para job_runs.metadata.errors[] com referência a step, component e message |
| 3 | GPT | Comentário de CURATE_PIPELINE_ENABLED desatualizado |
Comentário dizia “Setado para ‘false’ automaticamente” — impreciso após v5.15 onde quem escreve em runtime é pipeline_config, não a env var |
Corrigido para “Valor de bootstrap. Em runtime, pipeline_config prevalece.” |
| 4 | GPT | Rate limit de quarentena: toLocaleDateString vs. regra SQL canônica |
Handler usava toLocaleDateString('en-CA', { timeZone: 'America/Sao_Paulo' }) como aproximação — spec já tinha regra oficial com AT TIME ZONE 'America/Sao_Paulo' mas as duas formas coexistiam, criando brecha de interpretação |
Check movido para RPC check_quarantine_rate_limit() que usa AT TIME ZONE 'America/Sao_Paulo' como fonte de verdade única; TypeScript apenas chama a RPC e avalia o booleano retornado |
| 5 | GPT | p_job_run_id decorativo em RPCs de manutenção |
maintenance_phase_1(p_job_run_id UUID) e maintenance_phase_2(p_job_run_id UUID) recebiam parâmetro nunca utilizado no corpo SQL — parâmetro sem uso gera confusão e sensação de logging implícito inexistente |
Parâmetro removido de ambas as funções; chamadas TypeScript correspondentes atualizadas |
O que muda da v5.14 para a v5.15
| # | Origem | Correção | Problema na v5.14 | Solução v5.15 |
|---|---|---|---|---|
| 1 | GPT | job_runs.status no CRON: adicionar partial |
CRON com erros nas FASEs 2–4 finalizava com status='success' — contradição auditável |
Adicionar 'partial' ao CHECK de job_runs.status. CRON acumula erros em phaseErrors[] e finaliza com status='partial' quando FASE 1 passa mas alguma das FASEs 2–4 falha. status='error' reservado para falha na FASE 1 |
| 2 | GPT | Auto-promoção: promoção dependente do COUNT(*) >= 5 |
HAVING COUNT(*) >= 5 na subquery de confidence_median não bloqueava o UPDATE — promoção ocorria mesmo sem amostras suficientes, com confidence_median NULL |
Reescrito com CTE: promoção só ocorre se a CTE retornar linha (exige COUNT(*) >= 5). UPDATE com FROM stats — sem linha na CTE, nenhuma promoção acontece |
| 3 | GPT + Genspark | pipeline_config: DDL + regra de precedência |
Tabela referenciada em slow_batch_streak (pausa automática) sem DDL publicado. Coexistência com CURATE_PIPELINE_ENABLED como env var sem regra explícita de qual prevalece |
DDL publicado com seed inicial. Regra: CURATE_PIPELINE_ENABLED env var = bootstrap; pipeline_config = fonte de verdade em runtime. Se linha existir no banco, banco prevalece. Função isPipelineEnabled() implementa fallback |
| 4 | GPT | Texto da FASE 1: política de aborto | Texto dizia “aborta se o watchdog falhar” mas implementação aborta se qualquer PASSO da FASE 1 falhar (PASSOs 0, 1 ou 2) | Texto corrigido: “aborta se qualquer falha ocorrer na FASE 1” |
| 5 | GPT + Genspark | supabase.raw() substituído por RPC unificada |
supabase.raw() não existe no Supabase JS v2 — causa TypeError em runtime em 2 locais (incremento do circuit breaker e slow_batch_streak) |
Nova RPC increment_circuit_breaker(p_service, p_is_failing, p_is_slow_batch) cobre ambos os casos atomicamente. Todos os usos de supabase.raw() removidos |
| 6 | GPT | Schema: action_required e índice em function_orchestrator_items |
action_required IN (..., NULL) é ambíguo em PostgreSQL (NULL não é comparável via IN). Ausência de índice em (job_posting_id, created_at DESC) — padrão de query mais usado para item mais recente por vaga |
action_required IS NULL OR action_required IN (...) no DDL. Novo índice fo_items_job_posting_id_created_at_idx ON function_orchestrator_items(job_posting_id, created_at DESC) |
| 7 | Genspark | registerCronError(): interface definida + estratégia de merge |
Função usada em 4 locais no handler do CRON sem definição de interface nem comportamento de merge em metadata. Campo oscilava entre error (singular) e errors (array) |
Interface TypeScript definida. Campo padronizado como errors: [] (array acumulativo) em toda a spec. registerCronError() faz merge profundo — não sobrescreve erros anteriores do mesmo run |
O que muda da v5.13 para a v5.14
| # | Origem | Correção | Problema na v5.13 | Solução v5.14 |
|---|---|---|---|---|
| 1 | Gemini + GPT | Orquestração do CRON em 4 fases explícitas | run_daily_pipeline_maintenance() declarada como função SQL monolítica — PostgreSQL não suporta pausas para lógica TypeScript intermediária. PASSO 3 usava bind param :ids_elegíveis_para_liberação impossível em RETURNS void; PASSO 6 dependia de TypeScript mas estava dentro do SQL |
API Route /api/cron/pipeline-maintenance orquestra 4 fases sequenciais: FASE 1 (SQL pura: PASSOs 0–2) → FASE 2 (TypeScript: PASSO 3 quarentena) → FASE 3 (SQL pura: PASSOs 4–5) → FASE 4 (TypeScript: PASSO 6 embeddings). Cada fase é RPC independente. Política de falha: continuar para próxima fase exceto FASE 1 que aborta tudo |
| 2 | Gemini | Validação de completude após Zod | z.record(z.string(), z.string().nullable()) valida {} como sucesso — arquivo JSON vazio (ex: falha de CI/CD) passa na validação e o pipeline processa milhares de vagas sem normalização |
Após validação Zod, verificar Object.keys(parsedData.data).length === 0 para cada arquivo. Abortar com step='zod_validation' e component=<arquivo> se vazio |
| 3 | GPT | job_runs criado antes de tudo |
Em alguns trechos o job_runs era criado após a validação Zod — se o Zod falha, não existe linha em job_runs para receber o erro |
job_runs criado com status='running' e metadata={} no início absoluto do run, antes do Zod, antes da pré-rotina de hash. Todos os erros subsequentes enriquecem o metadata do registro já existente |
| 4 | GPT | Exceção de db_connection no princípio transversal |
Regra “todo erro vai para job_runs.metadata” é tecnicamente impossível quando o próprio banco está inacessível |
Regra refinada: registrar em job_runs.metadata quando o banco estiver acessível; se indisponível (db_connection), registrar em log estruturado da aplicação (console.error com JSON) e observabilidade externa (ex: Vercel logs) |
| 5 | Deepseek | confidence_median gravado na auto-promoção |
Campo presente em job_canonical_roles sem mecanismo de atualização definido — nunca era preenchido em operação normal |
confidence_median calculado e persistido no momento da auto-promoção, junto com promoted_at. Usa PERCENTILE_CONT com HAVING COUNT(*) >= 5 sobre as últimas 120 vagas curadas do canônico |
| 6 | Deepseek | low_quality coexiste com curation_status |
Risco de ambiguidade sobre qual campo consultar em novas queries | Nota no DDL confirma coexistência: low_quality (booleano) e curation_status = 'low_quality' são sempre atualizados de forma síncrona. Para novas queries, usar curation_status como fonte de verdade |
| 7 | Genspark | DDL completo de job_runs |
Tabela referenciada em 8+ contextos da spec (FK em function_orchestrator_runs, job_runs.metadata, job_runs.job_name, joins de quarentena) mas DDL nunca publicado — impossível validar constraints ou índices |
DDL completo publicado com metadata JSONB, índices em (job_name, status), started_at DESC e GIN em metadata |
| 8 | Genspark | Migration job_run_id NOT NULL com backfill |
ALTER COLUMN job_run_id SET NOT NULL falha com ERROR: column contains null values se existem registros históricos com job_run_id = NULL |
Migration em 3 passos: (1) criar registro sentinela em job_runs; (2) atribuir sentinela a registros históricos; (3) adicionar NOT NULL constraint |
| 9 | Genspark | Reset atômico do circuit breaker | Duas instâncias Vercel lendo windowAge > WINDOW_MS simultaneamente podem sobrescrever contadores — falha da instância A apagada pela instância B |
Reset com filtro .lt('window_started_at', expiryTimestamp) no UPDATE — só a primeira instância que vê a janela realmente expirada executa o reset |
| 10 | Genspark | Trigger fn_update_distinct_sources_count não trata DELETE |
DELETE em job_canonical_role_sources (ex: limpeza mensal, rollback) não atualiza o contador — distinct_sources_count fica inflado até o próximo PASSO 2 do CRON |
Trigger estendido para AFTER INSERT OR UPDATE OR DELETE. Função usa COALESCE(NEW.canonical_role_id, OLD.canonical_role_id) para cobrir todos os casos |
| 11 | Genspark | generate-prompt-version.ts faz hash do arquivo inteiro |
Hash do arquivo .ts completo inclui comentários — atualizar data de um comentário muda prompt_version e aciona revisita desnecessária de todas as quarentenas |
Script extrai apenas o conteúdo entre backticks do export const SYSTEM_PROMPT = …“ via regex antes de calcular o SHA-256 |
| 12 | Genspark | Backfill PASSO A/A2 com subquery O(n²) | Subquery correlacionada aninhada (SELECT run_id ... ORDER BY created_at DESC LIMIT 1) executa um scan por linha de job_postings — lento em 100k+ registros |
Reescrito com CTE + DISTINCT ON (job_posting_id) — único scan de curate_run_items independente do volume de job_postings |
| 13 | Genspark | Query de rastreabilidade filtra curation_status atual |
WHERE jp.curation_status = 'quarantined_llm_output' torna a query inútil para diagnóstico histórico após a vaga ser liberada |
Filtro removido. Query usa cri.error_type = 'item_output_parse_error' para focar nos runs de falha — funciona independentemente do estado atual da vaga |
| 14 | Genspark | batch_output_parse_error sem limite de retries |
Spec definia retry do batch inteiro sem limite — bug sistemático no prompt causaria loop infinito dentro do run | LLM_BATCH_PARSE_MAX_RETRIES=2 (padrão). Após esgotamento: todas as vagas do batch → retryable_error; registrar em job_runs.metadata com step='batch_processing' |
| 15 | Genspark | slow_batch_streak não persiste em serverless |
Contador de batches lentos consecutivos em memória não funciona com múltiplas instâncias Vercel independentes | Campos slow_batch_streak INT e last_slow_batch_at TIMESTAMPTZ adicionados a circuit_breaker_state. Threshold: 10 batches lentos consecutivos (> 35s). Na 11ª ocorrência: pausa automática via UPDATE CURATE_PIPELINE_ENABLED=false em tabela de configuração |
| 16 | Genspark | Delete mensal sem batching | DELETE FROM function_orchestrator_items WHERE processed_at < NOW() - INTERVAL '2 years' — após 2 anos pode deletar milhões de linhas em uma única transação, gerando lock prolongado |
Reescrito com loop em microbatches de 5.000 registros com pg_sleep(0.05) entre batches via FOR UPDATE SKIP LOCKED |
| 17 | Renomeação | curate_runs → function_orchestrator_runs |
Nome anglicizado (“curate”) soa estranho e não comunica o domínio | Todas as referências atualizadas para function_orchestrator_runs |
| 18 | Renomeação | curate_run_items → function_orchestrator_items |
Idem | Todas as referências atualizadas para function_orchestrator_items |
O que muda da v5.12 para a v5.13
(preservado da v5.13)
Princípio transversal — erro detalhado em job_runs
Todo job documentado nesta spec que falhe deve registrar em job_runs.metadata com detalhe suficiente para diagnóstico sem acesso ao servidor.
Regra refinada (v5.14): registrar em job_runs.metadata quando o banco estiver acessível. Se o banco estiver indisponível (db_connection), registrar em log estruturado da aplicação (console.error com JSON) e observabilidade externa — a regra não pode ser absoluta quando o próprio banco é o ponto de falha.
Estrutura mínima de erro (v5.15 — campo errors é array acumulativo):
{
"errors": [
{
"step": "cron_phase_2",
"component": "quarantine_release",
"message": "Timeout ao chamar release_quarantined_jobs",
"failed_at": "2026-03-10T03:15:42.123Z"
},
{
"step": "cron_phase_4",
"component": "embedding_rebuild",
"message": "generateE5Embedding: connection refused",
"failed_at": "2026-03-10T03:16:01.456Z"
}
],
"dict_files": [ "..." ],
"prompt_version": "a3f9b2c1"
}
⚠️ Campo padronizado como errors (array) em toda a spec — não error (singular). Múltiplas fases podem falhar no mesmo run do CRON; o array acumula todos os erros sem sobrescrever.
Valores válidos para step:
| Valor | Quando usar |
|---|---|
zod_validation |
Falha na validação Zod ou de completude dos arquivos JSON |
db_connection |
Falha na conexão com Supabase — registrar em log externo |
circuit_breaker_open |
Pipeline pausado por circuit breaker |
hash_pre_routine |
Falha na pré-rotina de hash |
fetch_pending_jobs |
Falha na busca de vagas pendentes |
batch_processing |
Falha sistêmica durante processamento de batch |
cron_phase_N |
Falha na FASE N do CRON diário (ex: cron_phase_1) |
monthly_cleanup |
Falha no job mensal |
SYSTEM_PROMPT.ts — arquivo dedicado e versionamento automático
Regra de isolamento
lib/pipeline/SYSTEM_PROMPT.ts contém exclusivamente o texto do prompt do extrator.
// lib/pipeline/SYSTEM_PROMPT.ts
export const SYSTEM_PROMPT = `
Você é um extrator especializado em cargos do mercado de trabalho brasileiro...
`;
Script de pré-build — hash apenas do conteúdo do prompt
// scripts/generate-prompt-version.ts
import * as crypto from 'crypto';
import * as fs from 'fs';
import * as path from 'path';
const promptPath = path.resolve('./lib/pipeline/SYSTEM_PROMPT.ts');
const content = fs.readFileSync(promptPath, 'utf8');
// ⚠️ Hash apenas do conteúdo do template literal — não do arquivo inteiro
// Comentários e metadados do arquivo não devem gerar nova prompt_version
const match = content.match(/export const SYSTEM_PROMPTs*=s*`([sS]*?)`s*;?/); // ⚠️ v5.18: ;? — Prettier com semi:false não quebra o build
if (!match) {
console.error('SYSTEM_PROMPT não encontrado em SYSTEM_PROMPT.ts');
process.exit(1);
}
const promptContent = match[1]; // Apenas o texto do prompt
const hash = crypto.createHash('sha256').update(promptContent).digest('hex').slice(0, 8);
const output = `// AUTO-GERADO — não editar manualmente
// Gerado em: ${new Date().toISOString()}
export const PROMPT_VERSION = '${hash}';
`;
fs.writeFileSync('./lib/pipeline/prompt-version.generated.ts', output);
console.log(`prompt_version: ${hash}`);
package.json:
{
"scripts": {
"prebuild": "ts-node scripts/generate-prompt-version.ts",
"build": "next build"
}
}
prompt-version.generated.ts no .gitignore.
Pré-rotina de hash — versionamento automático dos arquivos externos
(inalterado da v5.13, exceto que job_runs é criado ANTES da validação Zod — ver fluxo abaixo)
Ordem obrigatória no início de cada run:
- Criar
job_runscomstatus='running'emetadata={}— antes de qualquer validação - Validar 4 arquivos JSON via Zod + completude
- Falha: enriquecer
job_runs.metadatacom erro e abortar
- Falha: enriquecer
- Comparar hashes +
prompt_versioncom run anterior - Enriquecer
job_runs.metadatacomdict_fileseprompt_version
Estrutura do metadata em job_runs: (inalterada da v5.13)
Rastreabilidade de quarentena (corrigida):
-- ⚠️ Filtro de curation_status REMOVIDO — vaga pode ter sido liberada da quarentena
-- Query funciona para diagnóstico histórico independentemente do estado atual
SELECT jr.metadata AS metadata_run_quarentena
FROM function_orchestrator_items foi
JOIN function_orchestrator_runs fo ON fo.id = foi.run_id
JOIN job_runs jr ON jr.id = fo.job_run_id
WHERE foi.job_posting_id = :vaga_id
AND foi.error_type = 'item_output_parse_error'
ORDER BY foi.created_at DESC
LIMIT 1;
-- Comparar dict_files E prompt_version com valores atuais do build
Visão geral dos dois fluxos
Fluxo A — Ingestão manual via Job Ingestor
Operador clica "Curar"
│
▼
[INÍCIO DO RUN — ANTES DE TUDO]
1. Criar job_runs com status='running', metadata={}
│
▼
[GUARD DE HABILITAÇÃO — v5.18]
⚠️ Todo entrypoint de curadoria verifica isPipelineEnabled() antes de processar
2. isPipelineEnabled()
→ false: registerCronError(runId, { step: 'pipeline_guard', message: '...' })
UPDATE job_runs SET status='aborted'
encerra sem processar
│
▼
[VALIDAÇÃO + PRÉ-ROTINA DE HASH]
3. Valida 4 arquivos JSON via Zod
→ Falha ou arquivo vazio: enriquecer job_runs.metadata + abortar
3. Compara hashes + prompt_version com run anterior
4. Enriquece job_runs.metadata com dict_files e prompt_version
│
▼
[BUSCA DE VAGAS PENDENTES]
SELECT ... FROM job_postings
WHERE curation_status = 'pending' AND is_active = true
ORDER BY posted_at DESC
FOR UPDATE SKIP LOCKED
│
▼
[PROCESSAMENTO EM BATCHES DE 40]
→ extrator LLM (LLM_CONCURRENCY_EXTRACTOR=15, LLM_BATCH_PARSE_MAX_RETRIES=2)
→ upsert job_canonical_roles (slugs ordenados alfabeticamente — previne deadlock 40P01)
→ upsert job_canonical_role_sources somente quando pipeline_stage != 'fallback'
(trigger fn_update_distinct_sources_count atualiza contador automaticamente)
→ function_orchestrator_items
→ ai_usage_logs (1 por batch, não-bloqueante)
→ eventos jobs_curate_started + jobs_curate_finished (Fluxo A apenas)
→ calcular fallback_ratio ao fim do run; alerta se > 0.20
→ monitorar slow_batch_streak; pausa automática na 11ª ocorrência
Fluxo B — Ingestão cotidiana via endpoint
(inalterado da v5.13 — aplicar guard isPipelineEnabled() idêntico ao Fluxo A: createJobRun → isPipelineEnabled() → false → status=’aborted’ + encerra)
Schema do banco — tabelas novas e alterações
job_runs — DDL completo
-- ⚠️ Criar ANTES de function_orchestrator_runs (FK depende desta tabela)
CREATE TABLE job_runs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
job_name TEXT NOT NULL,
-- 'curate_job_postings' | 'run_daily_pipeline_maintenance' | 'monthly_cleanup'
status TEXT NOT NULL DEFAULT 'running'
CHECK (status IN ('running', 'success', 'partial', 'error', 'aborted')),
-- 'success' → todas as fases concluíram sem erro
-- 'partial' → FASE 1 passou, mas alguma das FASEs 2–4 falhou (CRON diário)
-- 'error' → falha crítica na FASE 1 ou falha fatal no pipeline de curadoria
-- 'aborted' → cancelamento explícito
metadata JSONB,
-- Contém: dict_files[], prompt_version, errors[]{step,component,message,failed_at}
-- Criado com metadata={} no início absoluto do run — enriquecido ao longo da execução
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
finished_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX job_runs_job_name_status_idx ON job_runs (job_name, status);
CREATE INDEX job_runs_started_at_idx ON job_runs (started_at DESC);
-- Índice GIN para buscas por prompt_version ou dict_file.hash em metadata:
CREATE INDEX job_runs_metadata_gin_idx ON job_runs USING GIN (metadata);
function_orchestrator_runs
-- ⚠️ Antes era: curate_runs — renomeado na v5.14
CREATE TABLE function_orchestrator_runs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
job_run_id UUID NOT NULL REFERENCES job_runs(id),
-- NOT NULL obrigatório — job_runs é criado no início absoluto do run
-- job_runs.metadata contém dict_files (hashes + versões) e prompt_version
session_id TEXT NOT NULL,
source TEXT NOT NULL CHECK (source IN ('manual', 'api')),
triggered_by UUID REFERENCES users(id) ON DELETE SET NULL,
status TEXT NOT NULL DEFAULT 'running'
CHECK (status IN ('running', 'success', 'partial', 'error')),
job_ids_count INT NOT NULL,
model TEXT,
total INT DEFAULT 0,
curated INT DEFAULT 0,
curated_fallback INT DEFAULT 0,
low_quality INT DEFAULT 0,
failed INT DEFAULT 0,
pending_review INT DEFAULT 0,
canonical_created INT DEFAULT 0,
canonical_promoted INT DEFAULT 0,
fallback_ratio NUMERIC(5,4),
-- Calculado ao fim do run: curated_fallback / NULLIF(total, 0)
-- Alerta automático via TypeScript quando > 0.20 (FALLBACK_RATIO_ALERT_THRESHOLD)
input_tokens_total INT DEFAULT 0,
output_tokens_total INT DEFAULT 0,
cost_usd_total NUMERIC(10,6),
logging_failures INT DEFAULT 0,
error_message TEXT,
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
finished_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX fo_runs_job_run_id_idx ON function_orchestrator_runs(job_run_id);
CREATE INDEX fo_runs_session_id_idx ON function_orchestrator_runs(session_id);
CREATE INDEX fo_runs_source_idx ON function_orchestrator_runs(source);
CREATE INDEX fo_runs_started_at_idx ON function_orchestrator_runs(started_at DESC);
CREATE INDEX fo_runs_status_idx ON function_orchestrator_runs(status);
function_orchestrator_items
-- ⚠️ Antes era: curate_run_items — renomeado na v5.14
CREATE TABLE function_orchestrator_items (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
run_id UUID NOT NULL REFERENCES function_orchestrator_runs(id),
job_posting_id UUID NOT NULL REFERENCES job_postings(id),
title_original TEXT,
canonical_role_proposed TEXT,
secondary_role_proposed TEXT,
canonical_role_id UUID REFERENCES job_canonical_roles(id),
canonical_status TEXT CHECK (canonical_status IN ('active', 'pending', 'fallback')),
skills_count SMALLINT,
confidence NUMERIC(4,3),
similarity_score NUMERIC(5,4),
reasoning TEXT,
pipeline_stage TEXT CHECK (pipeline_stage IN (
'hard_rule', 'dict_match', 'gatekeeper',
'similarity_replace', 'deterministic', 'llm_extractor', 'fallback'
)),
-- ⚠️ 'curated_fallback' NÃO é valor de pipeline_stage — é valor de curation_status
-- pipeline_stage='fallback' → curation_status='curated_fallback' em job_postings
status TEXT NOT NULL CHECK (status IN (
'success', 'fallback', 'low_quality', 'pending_review', 'failed'
)),
action_required TEXT CHECK (
action_required IS NULL OR action_required IN (
'quarentena_output_llm', 'novo_canonico_aguarda_validacao'
)
),
-- NULL quando nenhuma ação pendente
-- Sintaxe IS NULL OR IN(...) — IN(..., NULL) não funciona em PostgreSQL
error_type TEXT CHECK (error_type IN (
'llm_timeout', 'batch_output_parse_error', 'item_output_parse_error',
'payload_parse_error', 'parser_bug', 'db_error'
)),
-- batch_output_parse_error: falha no parse do JSON do lote inteiro
-- → retry do batch (máx LLM_BATCH_PARSE_MAX_RETRIES=2)
-- → após esgotamento: todas as vagas do batch → retryable_error
-- → NÃO incrementa llm_parse_fail_count
-- → NÃO alimenta quarentena por vaga
-- item_output_parse_error: falha no parse de item individual
-- → incrementa llm_parse_fail_count desta vaga
-- → alimenta quarentena após 3 falhas cross-run
error_detail TEXT,
dict_match BOOLEAN NOT NULL DEFAULT false,
dict_match_source JSONB,
processed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Política de retenção: 2 anos
-- Job mensal remove em microbatches de 5.000 (ver seção CRON mensal)
-- seguido de: VACUUM ANALYZE function_orchestrator_items
CREATE INDEX fo_items_run_id_idx ON function_orchestrator_items(run_id);
CREATE INDEX fo_items_status_idx ON function_orchestrator_items(status);
CREATE INDEX fo_items_job_posting_id_idx ON function_orchestrator_items(job_posting_id);
CREATE INDEX fo_items_job_posting_created_idx ON function_orchestrator_items(job_posting_id, created_at DESC);
-- Cobre padrão "item mais recente por vaga" usado em backfill, rastreabilidade e diagnóstico
CREATE INDEX fo_items_canonical_role_id_idx ON function_orchestrator_items(canonical_role_id);
CREATE INDEX fo_items_canonical_proposed_idx ON function_orchestrator_items(canonical_role_proposed);
circuit_breaker_state — com campos de slow_batch_streak
CREATE TABLE IF NOT EXISTS circuit_breaker_state (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
service TEXT NOT NULL UNIQUE,
-- 'claude_haiku' — único serviço externo monitorado
status TEXT NOT NULL DEFAULT 'closed'
CHECK (status IN ('closed', 'open', 'half_open')),
window_started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- ⚠️ Resetar ANTES de incrementar se NOW() > window_started_at + 5min
window_failures INT NOT NULL DEFAULT 0,
window_total INT NOT NULL DEFAULT 0,
cooldown_until TIMESTAMPTZ,
last_transition_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
slow_batch_streak INT NOT NULL DEFAULT 0,
-- Incrementado quando latência do batch > 35s
-- Reset para 0 quando um batch completa em tempo normal (≤ 35s)
-- Threshold: 10 batches lentos consecutivos
-- Na 11ª ocorrência: pipeline pausado automaticamente
last_slow_batch_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
INSERT INTO circuit_breaker_state (service) VALUES ('claude_haiku') ON CONFLICT DO NOTHING;
pipeline_config — fonte de verdade em runtime (v5.15)
-- ⚠️ Criar ANTES de qualquer código que leia CURATE_PIPELINE_ENABLED em runtime
-- Regra de precedência:
-- CURATE_PIPELINE_ENABLED (env var) = valor de bootstrap (deploy inicial)
-- pipeline_config (banco) = fonte de verdade em runtime
-- Se linha existir no banco → banco prevalece. Env var é fallback quando banco inacessível.
CREATE TABLE pipeline_config (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_by TEXT
-- 'system' quando alterado por lógica automática (ex: slow_batch_auto_pause)
-- email do operador quando alterado manualmente via painel
);
-- Seed obrigatório: estado inicial de bootstrap
INSERT INTO pipeline_config (key, value, updated_by)
VALUES ('CURATE_PIPELINE_ENABLED', 'true', 'system')
ON CONFLICT DO NOTHING;
isPipelineEnabled() — TypeScript (v5.15):
// Banco prevalece sobre env var.
// Pausa automática (slow_batch_streak) só é possível via banco;
// env var não pode ser alterada em runtime no Vercel.
async function isPipelineEnabled(): Promise<boolean> {
const { data } = await supabase
.from('pipeline_config')
.select('value')
.eq('key', 'CURATE_PIPELINE_ENABLED')
.single();
return (data?.value ?? process.env.CURATE_PIPELINE_ENABLED ?? 'true') === 'true';
}
// Chamar no início do run, após criar job_runs e antes de qualquer processamento
Lógica de reset atômico de janela (v5.14 — previne race condition):
const WINDOW_MS = 5 * 60 * 1000;
const expiryTimestamp = new Date(Date.now() - WINDOW_MS).toISOString();
// Tentativa de reset atômico — só a primeira instância que vê a janela expirada vence
const { data: resetResult } = await supabase
.from('circuit_breaker_state')
.update({
window_failures: isFailing ? 1 : 0,
window_total: 1,
window_started_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
})
.eq('service', 'claude_haiku')
.lt('window_started_at', expiryTimestamp) // ← filtro atômico
.select('id');
if (!resetResult?.length) {
// Outra instância já resetou — incrementar via RPC atômica
// ⚠️ supabase.raw() NÃO existe no Supabase JS v2 — usar RPC (v5.15)
await supabase.rpc('increment_circuit_breaker', {
p_service: 'claude_haiku',
p_is_failing: isFailing,
p_is_slow_batch: false,
});
}
Monitoramento de slow_batch_streak:
const SLOW_BATCH_THRESHOLD_MS = 35000;
// ⚠️ v5.18: restaurado como env var — ajuste operacional sem redeploy
const SLOW_BATCH_STREAK_LIMIT = parseInt(process.env.SLOW_BATCH_AUTO_PAUSE_AT ?? '10');
const batchDuration = Date.now() - batchStart;
const isSlow = batchDuration > SLOW_BATCH_THRESHOLD_MS;
// ⚠️ supabase.raw() NÃO existe no Supabase JS v2 — usar RPC increment_circuit_breaker (v5.15)
const { data: updated } = await supabase.rpc('increment_circuit_breaker', {
p_service: 'claude_haiku',
p_is_failing: false,
p_is_slow_batch: isSlow,
});
if (isSlow && (updated?.slow_batch_streak ?? 0) > SLOW_BATCH_STREAK_LIMIT) {
// 11ª ocorrência — pausa automática via pipeline_config
await supabase.from('pipeline_config')
.update({ value: 'false' })
.eq('key', 'CURATE_PIPELINE_ENABLED');
console.error({
type: 'auto_pause_slow_batch_streak',
streak: updated?.slow_batch_streak,
message: `Pipeline pausado automaticamente após ${updated?.slow_batch_streak} batches lentos consecutivos`,
});
}
RPC increment_circuit_breaker — DDL (v5.15):
-- Cobre incremento normal do circuit breaker E rastreamento de slow_batch_streak
-- Elimina necessidade de supabase.raw() (inexistente no Supabase JS v2)
CREATE OR REPLACE FUNCTION increment_circuit_breaker(
p_service TEXT,
p_is_failing BOOLEAN,
p_is_slow_batch BOOLEAN DEFAULT false
)
RETURNS TABLE(window_failures INT, window_total INT, slow_batch_streak INT)
LANGUAGE plpgsql AS $$
BEGIN
RETURN QUERY
UPDATE circuit_breaker_state
SET
window_failures = CASE WHEN p_is_failing THEN window_failures + 1
ELSE window_failures END,
window_total = window_total + 1,
slow_batch_streak = CASE WHEN p_is_slow_batch THEN slow_batch_streak + 1
ELSE 0 END,
last_slow_batch_at = CASE WHEN p_is_slow_batch THEN NOW()
ELSE last_slow_batch_at END,
updated_at = NOW()
WHERE service = p_service
RETURNING window_failures, window_total, slow_batch_streak;
END;
$$;
Transição open → half_open atômica: (inalterada da v5.13)
job_canonical_role_sources — trigger com DELETE
CREATE TABLE job_canonical_role_sources (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
canonical_role_id UUID NOT NULL REFERENCES job_canonical_roles(id),
normalized_company TEXT NOT NULL,
first_seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (canonical_role_id, normalized_company)
);
-- Índice composto cobrindo canonical_role_id + last_seen_at (usado no PASSO 2 do CRON)
CREATE INDEX job_canonical_role_sources_canonical_lastseen_idx
ON job_canonical_role_sources (canonical_role_id, last_seen_at DESC);
CREATE INDEX job_canonical_role_sources_company_idx
ON job_canonical_role_sources(normalized_company);
-- Trigger incremental — agora inclui DELETE
CREATE OR REPLACE FUNCTION fn_update_distinct_sources_count()
RETURNS TRIGGER LANGUAGE plpgsql AS $$
DECLARE
target_id UUID;
BEGIN
-- COALESCE cobre INSERT/UPDATE (NEW) e DELETE (OLD)
target_id := COALESCE(NEW.canonical_role_id, OLD.canonical_role_id);
UPDATE job_canonical_roles
SET distinct_sources_count = (
SELECT COUNT(*)
FROM job_canonical_role_sources
WHERE canonical_role_id = target_id
AND last_seen_at >= NOW() - INTERVAL '120 days'
)
WHERE id = target_id;
RETURN COALESCE(NEW, OLD);
END;
$$;
-- Recriar trigger incluindo DELETE
DROP TRIGGER IF EXISTS trg_update_distinct_sources_count ON job_canonical_role_sources;
CREATE TRIGGER trg_update_distinct_sources_count
AFTER INSERT OR UPDATE OR DELETE ON job_canonical_role_sources
FOR EACH ROW EXECUTE FUNCTION fn_update_distinct_sources_count();
job_canonical_roles — alterações
(inalterado da v5.13, com nota de coexistência de low_quality)
-- ⚠️ Criar a função ANTES do ADD CONSTRAINT que a referencia
CREATE OR REPLACE FUNCTION is_valid_cbo_array(codes TEXT[])
RETURNS BOOLEAN LANGUAGE sql IMMUTABLE STRICT AS $$
SELECT codes IS NULL OR
NOT EXISTS (
SELECT 1 FROM unnest(codes) AS code
WHERE code !~ '^d{4}-d{2}$'
);
$$;
ALTER TABLE job_canonical_roles
ADD COLUMN IF NOT EXISTS status TEXT NOT NULL DEFAULT 'active'
CHECK (status IN ('active', 'pending', 'deprecated', 'alias_of', 'rejected')),
ADD COLUMN IF NOT EXISTS source TEXT NOT NULL DEFAULT 'llm_extractor'
CHECK (source IN ('seed', 'llm_extractor', 'manual', 'merge')),
-- 'merge': reservado para fluxo de merge admin — não implementado no Sprint 1
ADD COLUMN IF NOT EXISTS merged_into UUID REFERENCES job_canonical_roles(id),
ADD COLUMN IF NOT EXISTS alias_of_id UUID REFERENCES job_canonical_roles(id),
ADD COLUMN IF NOT EXISTS rejected_reason TEXT,
ADD COLUMN IF NOT EXISTS distinct_sources_count INTEGER DEFAULT 0,
ADD COLUMN IF NOT EXISTS confidence_median NUMERIC(4,3),
-- Calculado e persistido no momento da auto-promoção (junto com promoted_at)
-- Usa PERCENTILE_CONT com HAVING COUNT(*) >= 5 sobre as últimas 120 vagas curadas
ADD COLUMN IF NOT EXISTS promoted_at TIMESTAMPTZ,
ADD COLUMN IF NOT EXISTS vacancy_count INT DEFAULT 0,
ADD COLUMN IF NOT EXISTS latest_posted_at TIMESTAMPTZ,
ADD COLUMN IF NOT EXISTS usage_count INT NOT NULL DEFAULT 0,
ADD COLUMN IF NOT EXISTS cbo_codes TEXT[],
ADD COLUMN IF NOT EXISTS blacklist_expiry_at TIMESTAMPTZ,
ADD COLUMN IF NOT EXISTS is_emerging BOOLEAN NOT NULL DEFAULT false,
ADD COLUMN IF NOT EXISTS embedding VECTOR(384);
-- ⚠️ Nota de coexistência: job_canonical_roles não possui campo low_quality
-- O booleano low_quality existe em job_postings e coexiste com curation_status
-- Ambos são sempre atualizados de forma síncrona
-- Para novas queries, usar curation_status como fonte de verdade
ALTER TABLE job_canonical_roles
ADD CONSTRAINT chk_deprecated_has_merged_into
CHECK (status != 'deprecated' OR merged_into IS NOT NULL),
ADD CONSTRAINT chk_alias_has_alias_of_id
CHECK (status != 'alias_of' OR alias_of_id IS NOT NULL),
ADD CONSTRAINT chk_rejected_has_reason
CHECK (status != 'rejected' OR rejected_reason IS NOT NULL),
ADD CONSTRAINT chk_no_self_reference_alias
CHECK (alias_of_id IS NULL OR alias_of_id != id),
ADD CONSTRAINT chk_no_self_reference_merged
CHECK (merged_into IS NULL OR merged_into != id),
ADD CONSTRAINT chk_cbo_codes_format
CHECK (is_valid_cbo_array(cbo_codes));
ALTER TABLE job_canonical_roles
ADD CONSTRAINT job_canonical_roles_slug_key UNIQUE (slug);
CREATE INDEX IF NOT EXISTS job_canonical_roles_embedding_idx
ON job_canonical_roles
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
job_postings — alterações
(inalterado da v5.13)
Back-fill obrigatório — otimizado com CTE + DISTINCT ON:
-- ⚠️ Versão v5.14: reescrito com CTE para evitar O(n²) da subquery correlacionada aninhada
-- CTE escaneia function_orchestrator_items uma única vez independentemente do volume de job_postings
WITH latest_item AS (
SELECT DISTINCT ON (job_posting_id)
job_posting_id,
pipeline_stage
FROM function_orchestrator_items
ORDER BY job_posting_id, created_at DESC
)
-- PASSO A: vagas já curadas com sucesso normal
UPDATE job_postings jp
SET curation_status = 'curated'
FROM latest_item li
WHERE jp.id = li.job_posting_id
AND jp.canonical_role_id IS NOT NULL
AND (jp.low_quality IS NULL OR jp.low_quality = false)
AND li.pipeline_stage != 'fallback';
-- PASSO A2: vagas curadas via fallback (executar em seguida)
WITH latest_item AS (
SELECT DISTINCT ON (job_posting_id)
job_posting_id,
pipeline_stage
FROM function_orchestrator_items
ORDER BY job_posting_id, created_at DESC
)
UPDATE job_postings jp
SET curation_status = 'curated_fallback'
FROM latest_item li
WHERE jp.id = li.job_posting_id
AND jp.canonical_role_id IS NOT NULL
AND (jp.low_quality IS NULL OR jp.low_quality = false)
AND li.pipeline_stage = 'fallback';
-- PASSO B: vagas low_quality (inalterado)
UPDATE job_postings
SET curation_status = 'low_quality'
WHERE low_quality = true;
-- VERIFICAÇÃO pós-back-fill:
SELECT curation_status, COUNT(*) FROM job_postings GROUP BY curation_status;
-- 'pending' deve ter APENAS vagas nunca curadas
Migration job_run_id NOT NULL — 3 passos
-- Executar ANTES do ALTER em function_orchestrator_runs
-- PASSO 1: Registro sentinela para registros históricos sem job_run_id
INSERT INTO job_runs (id, job_name, status, metadata, started_at)
VALUES (
'00000000-0000-0000-0000-000000000002',
-- ⚠️ UUID diferente do sentinela de usuário de serviço (000...001)
'legacy_migration_sentinel',
'success',
'{"note": "Registro criado para backfill de function_orchestrator_runs históricos"}',
'2024-01-01T00:00:00Z'
) ON CONFLICT DO NOTHING;
-- PASSO 2: Atribuir sentinela aos registros sem job_run_id
UPDATE function_orchestrator_runs
SET job_run_id = '00000000-0000-0000-0000-000000000002'
WHERE job_run_id IS NULL;
-- PASSO 3: Adicionar NOT NULL constraint
ALTER TABLE function_orchestrator_runs
ALTER COLUMN job_run_id SET NOT NULL;
-- VERIFICAÇÃO:
SELECT COUNT(*) FROM function_orchestrator_runs WHERE job_run_id IS NULL;
-- Deve retornar 0
ai_usage_logs, events e índices em job_postings
(inalterados da v5.13)
CRON diário — arquitetura em 4 fases
Arquitetura única: Vercel Cron. API Route TypeScript orquestra 4 fases sequenciais. Nenhum job pg_cron independente.
// vercel.json
{
"crons": [
{ "path": "/api/cron/pipeline-maintenance", "schedule": "0 3 * * *" },
{ "path": "/api/cron/pipeline-maintenance", "schedule": "0 15 * * *" },
{ "path": "/api/cron/monthly-cleanup", "schedule": "0 4 1 * *" }
]
}
03h e 15h UTC = 00h e 12h horário de Brasília | 04h UTC dia 1 = 01h Brasília primeiro dia do mês
run_monthly_cleanup — DDL (v5.19)
-- ⚠️ v5.19: DDL explícito — necessário para supabase.rpc('run_monthly_cleanup') funcionar
-- Dois loops de microbatch independentes: job_canonical_role_sources (1 ano) e function_orchestrator_items (2 anos)
-- VACUUM não incluso — ver decisão de governança no handler abaixo
CREATE OR REPLACE FUNCTION run_monthly_cleanup()
RETURNS void LANGUAGE plpgsql AS $$
DECLARE deleted_count INT;
BEGIN
-- Limpeza de job_canonical_role_sources: registros inativos há mais de 1 ano
LOOP
DELETE FROM job_canonical_role_sources
WHERE id IN (
SELECT id FROM job_canonical_role_sources
WHERE last_seen_at < NOW() - INTERVAL '1 year'
LIMIT 5000
FOR UPDATE SKIP LOCKED
);
GET DIAGNOSTICS deleted_count = ROW_COUNT;
EXIT WHEN deleted_count = 0;
PERFORM pg_sleep(0.05);
END LOOP;
-- Limpeza de function_orchestrator_items: registros com mais de 2 anos
LOOP
DELETE FROM function_orchestrator_items
WHERE id IN (
SELECT id FROM function_orchestrator_items
WHERE created_at < NOW() - INTERVAL '2 years'
LIMIT 5000
FOR UPDATE SKIP LOCKED
);
GET DIAGNOSTICS deleted_count = ROW_COUNT;
EXIT WHEN deleted_count = 0;
PERFORM pg_sleep(0.05);
END LOOP;
END;
$$;
Handler monthly-cleanup.ts (v5.18)
// pages/api/cron/monthly-cleanup.ts
export default async function handler(req, res) {
const runId = await createJobRun('monthly_cleanup');
try {
await supabase.rpc('run_monthly_cleanup');
await supabase.from('job_runs')
.update({ status: 'success', finished_at: new Date().toISOString() })
.eq('id', runId);
return res.status(200).json({ ok: true });
} catch (err) {
await registerCronError(runId, { step: 'monthly_cleanup', message: err.message });
await supabase.from('job_runs')
.update({ status: 'error', finished_at: new Date().toISOString() })
.eq('id', runId);
return res.status(500).json({ error: err.message });
}
}
// ⚠️ VACUUM — Decisão de governança (v5.19):
// O autovacuum do Supabase cobre o volume esperado no lançamento.
// VACUUM explícito não é implementado no handler — adicionar complexidade desnecessária agora
// introduziria dependência de dblink (incompatível com Transaction Pooler do Supabase gerenciado)
// ou pg_cron externo (job fora do job_runs, sem rastreabilidade).
// Critério de revisão: quando function_orchestrator_items ultrapassar 5M registros,
// avaliar pg_cron com entrada separada no job_runs ou upgrade de plano Supabase para acesso direto.
API Route — orquestração das 4 fases
// pages/api/cron/pipeline-maintenance.ts
// ─── registerCronError — interface definida (v5.15) ──────────────────────────
// Acumula erros em metadata.errors[] — não sobrescreve erros anteriores do mesmo run
async function registerCronError(
jobRunId: string,
error: { step: string; component?: string; message: string }
): Promise<void> {
const { data: current } = await supabase
.from('job_runs')
.select('metadata')
.eq('id', jobRunId)
.single();
const existingErrors: unknown[] = current?.metadata?.errors ?? [];
const newError = { ...error, failed_at: new Date().toISOString() };
await supabase.from('job_runs').update({
metadata: {
...(current?.metadata ?? {}),
errors: [...existingErrors, newError],
},
}).eq('id', jobRunId);
}
// ─── isPipelineEnabled — fonte de verdade em runtime (v5.15) ─────────────────
// Banco prevalece sobre env var. Env var = bootstrap; pipeline_config = runtime.
async function isPipelineEnabled(): Promise<boolean> {
const { data } = await supabase
.from('pipeline_config')
.select('value')
.eq('key', 'CURATE_PIPELINE_ENABLED')
.single();
return (data?.value ?? process.env.CURATE_PIPELINE_ENABLED ?? 'true') === 'true';
}
export default async function handler(req, res) {
const cronRunId = await createJobRun('run_daily_pipeline_maintenance');
const phaseErrors: string[] = [];
// ─── Verificação de habilitação — status 'aborted' (v5.17) ───────────────
// isPipelineEnabled() lê pipeline_config (banco prevalece); fallback para env var
const enabled = await isPipelineEnabled();
if (!enabled) {
await registerCronError(cronRunId, {
step: 'pipeline_guard',
message: 'Pipeline desabilitado via pipeline_config — run abortado sem processar',
});
await supabase.from('job_runs')
.update({ status: 'aborted', finished_at: new Date().toISOString() })
.eq('id', cronRunId);
return res.status(200).json({ ok: false, status: 'aborted', reason: 'pipeline_disabled' });
}
try {
// ─── FASE 1: SQL pura (PASSOs 0, 1, 2) ───────────────────────────
// ⚠️ Política: qualquer falha na FASE 1 aborta o CRON inteiro.
// FASE 1 cobre watchdog, expiração de vagas e recálculo de fontes —
// todas são pré-condições para as fases seguintes operarem com dados corretos.
const { error: phase1Error } = await supabase.rpc('maintenance_phase_1');
if (phase1Error) {
await registerCronError(cronRunId, { step: 'cron_phase_1', message: phase1Error.message });
await supabase.from('job_runs')
.update({ status: 'error', finished_at: new Date() })
.eq('id', cronRunId);
return res.status(500).json({ error: phase1Error.message });
}
// ─── FASE 2: TypeScript — PASSO 3 (quarentena) ───────────────────
// Falha na FASE 2 não impede FASE 3 e FASE 4
try {
// ⚠️ v5.19: RPC transacional única — calcula slots + seleciona + libera no mesmo contexto
// Elimina janela de corrida do fluxo anterior (remaining_slots → release_quarantined_jobs)
// Ex: 90 liberações no dia → RPC libera no máx 10; 0 retornou = nenhuma vaga elegível ou teto atingido
const { data: releasedCount } = await supabase.rpc('release_quarantined_jobs_limited');
// releasedCount: INT — número de vagas efetivamente liberadas (0 se teto atingido)
} catch (phase2Error) {
phaseErrors.push('cron_phase_2');
await registerCronError(cronRunId, { step: 'cron_phase_2', message: phase2Error.message });
// Continua para FASE 3
}
// ─── FASE 3: SQL pura (PASSOs 4, 5) ──────────────────────────────
// Falha na FASE 3 não impede FASE 4
try {
await supabase.rpc('maintenance_phase_2');
} catch (phase3Error) {
phaseErrors.push('cron_phase_3');
await registerCronError(cronRunId, { step: 'cron_phase_3', message: phase3Error.message });
}
// ─── FASE 4: TypeScript — PASSO 6 (embeddings NULL) ──────────────
try {
const { data: nullEmbeddings } = await supabase
.from('job_canonical_roles')
.select('id, label')
.eq('status', 'active')
.is('embedding', null)
.limit(parseInt(process.env.EMBEDDING_REBUILD_BATCH_LIMIT ?? '50'));
for (const role of nullEmbeddings ?? []) {
const vector = await generateE5Embedding(role.label);
await supabase.from('job_canonical_roles')
.update({ embedding: vector })
.eq('id', role.id);
}
} catch (phase4Error) {
phaseErrors.push('cron_phase_4');
await registerCronError(cronRunId, { step: 'cron_phase_4', message: phase4Error.message });
}
// ─── Finalização com status correto (v5.15) ───────────────────────
// 'success' → nenhuma fase falhou
// 'partial' → FASE 1 passou, mas alguma das FASEs 2–4 falhou
const finalStatus = phaseErrors.length === 0 ? 'success' : 'partial';
await supabase.from('job_runs')
.update({ status: finalStatus, finished_at: new Date() })
.eq('id', cronRunId);
return res.status(200).json({ ok: true, status: finalStatus, phaseErrors });
} catch (fatalError) {
await registerCronError(cronRunId, { step: 'cron_phase_1', message: fatalError.message });
await supabase.from('job_runs')
.update({ status: 'error', finished_at: new Date() })
.eq('id', cronRunId);
return res.status(500).json({ error: fatalError.message });
}
}
maintenance_phase_1 — PASSOs 0, 1, 2
CREATE OR REPLACE FUNCTION maintenance_phase_1()
RETURNS void AS $$
BEGIN
-- PASSO 0: Watchdog — resolver runs presos
-- PASSO 0.1: Limpar function_orchestrator_runs filhos travados
UPDATE function_orchestrator_runs
SET status = 'error',
finished_at = NOW(),
error_message = 'Interrompido sem conclusão — resolvido pelo watchdog'
WHERE status = 'running'
AND started_at < NOW() - INTERVAL '30 minutes';
-- PASSO 0.2: Limpar job_runs pais travados (v5.16)
-- Hard crash da Vercel pode deixar job_runs preso em 'running' indefinidamente
-- jsonb_build_object passa NOW() como TIMESTAMPTZ nativo — sem concatenação de string (v5.17)
UPDATE job_runs
SET status = 'error',
finished_at = NOW(),
metadata = jsonb_set(
COALESCE(metadata, '{}'::jsonb),
'{errors}',
COALESCE(metadata->'errors', '[]'::jsonb)
|| jsonb_build_array(
jsonb_build_object(
'step', 'watchdog',
'message', 'Job resolvido pelo watchdog por inatividade',
'failed_at', NOW()
)
)
)
WHERE status = 'running'
AND started_at < NOW() - INTERVAL '30 minutes';
-- PASSO 1: Marcar vagas expiradas como inativas
UPDATE job_postings
SET is_active = false
WHERE posted_at < NOW() - INTERVAL '120 days'
AND is_active = true;
-- PASSO 2: Recalcular distinct_sources_count (verificação de consistência full)
-- O trigger fn_update_distinct_sources_count mantém em tempo real (v5.13).
-- Este recálculo full detecta divergências acumuladas (ex: outage do trigger).
-- ⚠️ Sem JOIN com job_postings — last_seen_at já incorpora a janela temporal.
-- Índice composto (canonical_role_id, last_seen_at DESC) cobre este filtro.
UPDATE job_canonical_roles jcr
SET distinct_sources_count = (
SELECT COUNT(*)
FROM job_canonical_role_sources jcrs
WHERE jcrs.canonical_role_id = jcr.id
AND jcrs.last_seen_at >= NOW() - INTERVAL '120 days'
);
END;
$$ LANGUAGE plpgsql;
release_quarantined_jobs — PASSO 3 como RPC independente
CREATE OR REPLACE FUNCTION release_quarantined_jobs(job_ids UUID[])
RETURNS INT LANGUAGE plpgsql AS $$
DECLARE released INT;
BEGIN
UPDATE job_postings
SET curation_status = 'pending',
llm_parse_fail_count = 0,
quarantined_at = NULL,
quarantine_reason = NULL,
quarantine_released_at = NOW(),
updated_at = NOW()
WHERE id = ANY(job_ids)
AND curation_status = 'quarantined_llm_output';
GET DIAGNOSTICS released = ROW_COUNT;
RETURN released;
END;
$$;
release_quarantined_jobs_limited — liberação transacional atômica (v5.19)
-- ⚠️ v5.19: substitui o fluxo de dois round-trips (check_quarantine_rate_limit → release_quarantined_jobs)
-- Elimina janela de corrida em execuções concorrentes — calcula slots + seleciona + libera
-- no mesmo contexto transacional com FOR UPDATE SKIP LOCKED
-- TypeScript chama uma única RPC sem precisar de resolveQuarantineEligibleIds()
CREATE OR REPLACE FUNCTION release_quarantined_jobs_limited()
RETURNS INT LANGUAGE plpgsql AS $$
DECLARE
remaining_slots INT;
released INT := 0;
BEGIN
-- Calcular slots disponíveis no dia-calendário de São Paulo
SELECT GREATEST(0, 100 - COUNT(*)::INT)
INTO remaining_slots
FROM job_postings
WHERE (quarantine_released_at AT TIME ZONE 'America/Sao_Paulo')::date
= (NOW() AT TIME ZONE 'America/Sao_Paulo')::date;
IF remaining_slots = 0 THEN
RETURN 0;
END IF;
-- Selecionar e liberar em contexto transacional único
UPDATE job_postings
SET curation_status = 'pending',
llm_parse_fail_count = 0,
quarantined_at = NULL,
quarantine_reason = NULL,
quarantine_released_at = NOW(),
updated_at = NOW()
WHERE id IN (
SELECT id FROM job_postings
WHERE curation_status = 'quarantined_llm_output'
AND (quarantined_at + INTERVAL '1 day' * (
SELECT value::INT FROM pipeline_config WHERE key = 'QUARANTINE_EXPIRY_DAYS'
)) <= NOW()
ORDER BY quarantined_at ASC
LIMIT remaining_slots
FOR UPDATE SKIP LOCKED
);
GET DIAGNOSTICS released = ROW_COUNT;
RETURN released;
END;
$$;
-- Retorna o número de vagas efetivamente liberadas (0 se teto atingido ou nenhuma elegível)
-- check_quarantine_rate_limit() mantida para uso em queries de diagnóstico/monitoramento
check_quarantine_rate_limit — consulta de diagnóstico (v5.19)
-- ⚠️ v5.19: não mais usada no fluxo principal — substituída por release_quarantined_jobs_limited()
-- Mantida para diagnóstico, monitoramento e consultas admin
CREATE OR REPLACE FUNCTION check_quarantine_rate_limit()
RETURNS INT LANGUAGE sql STABLE AS $$
SELECT GREATEST(0,
100 - COUNT(*)::INT
)
FROM job_postings
WHERE (quarantine_released_at AT TIME ZONE 'America/Sao_Paulo')::date
= (NOW() AT TIME ZONE 'America/Sao_Paulo')::date;
$$;
-- Retorna 0 quando teto atingido, N quando ainda há N slots disponíveis
search_canonical_roles — busca vetorial com índice HNSW (v5.17)
-- ⚠️ pgvector: ORDER BY só pode conter o operador de distância para usar índice HNSW.
-- Qualquer coluna adicional (vacancy_count, id) força sequential scan — 100% CPU.
-- Solução: CTE busca top N*3 usando só o vetor; outer query reordena em memória.
CREATE OR REPLACE FUNCTION search_canonical_roles(
query_vector VECTOR(384),
result_limit INT DEFAULT 5
)
RETURNS TABLE(id UUID, label TEXT, similarity NUMERIC) AS $$
BEGIN
-- set_config compatível com Transaction Pooler (porta 6543)
PERFORM set_config('hnsw.ef_search', '64', true);
RETURN QUERY
WITH knn_results AS (
SELECT jcr.id,
jcr.label,
jcr.vacancy_count,
(1 - (jcr.embedding <=> query_vector))::NUMERIC AS similarity
FROM job_canonical_roles jcr
WHERE jcr.status = 'active'
AND jcr.embedding IS NOT NULL
ORDER BY jcr.embedding <=> query_vector -- ⚠️ operador vetorial sozinho — HNSW ativado
LIMIT result_limit * 3 -- margem extra para desempate seguro
)
SELECT k.id, k.label, k.similarity
FROM knn_results k
ORDER BY k.similarity DESC,
k.vacancy_count DESC,
k.id ASC
LIMIT result_limit;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION maintenance_phase_2()
RETURNS void AS $$
BEGIN
-- PASSO 4: Recalcular vacancy_count
UPDATE job_canonical_roles jcr
SET vacancy_count = (
SELECT COUNT(*)
FROM job_postings jp
WHERE jp.canonical_role_id = jcr.id
AND jp.curation_status IN ('curated', 'curated_fallback')
AND jp.is_active = true
AND jp.posted_at >= NOW() - INTERVAL '120 days'
);
-- PASSO 5: Requeue de falhas transitórias
UPDATE job_postings
SET curation_status = 'pending', updated_at = NOW()
WHERE id IN (
SELECT id FROM job_postings
WHERE curation_status = 'retryable_error'
AND is_active = true
ORDER BY updated_at ASC
LIMIT 40
FOR UPDATE SKIP LOCKED
);
END;
$$ LANGUAGE plpgsql;
Rate limit de quarentena — RPC transacional única (v5.19):
// ⚠️ v5.19: RPC única — calcula slots + seleciona + libera no mesmo contexto transacional
// Elimina janela de corrida do fluxo anterior (remaining_slots → release_quarantined_jobs)
// check_quarantine_rate_limit() mantida apenas para diagnóstico/monitoramento
const { data: releasedCount } = await supabase.rpc('release_quarantined_jobs_limited');
// releasedCount: número de vagas liberadas (0 = teto atingido ou nenhuma elegível)
Governança mensal — delete em microbatches:
-- Job separado: job_name='monthly_cleanup'
-- ⚠️ DELETE único em tabela com 2+ anos pode deletar milhões de linhas — lock prolongado
-- Solução: microbatches de 5.000 registros com pausa entre lotes
DO $$
DECLARE
deleted_count INT;
total_deleted INT := 0;
BEGIN
LOOP
DELETE FROM function_orchestrator_items
WHERE id IN (
SELECT id FROM function_orchestrator_items
WHERE processed_at < NOW() - INTERVAL '2 years'
LIMIT 5000
FOR UPDATE SKIP LOCKED
);
GET DIAGNOSTICS deleted_count = ROW_COUNT;
total_deleted := total_deleted + deleted_count;
EXIT WHEN deleted_count = 0;
PERFORM pg_sleep(0.05); -- 50ms de pausa entre microbatches
END LOOP;
RAISE NOTICE 'Total deletado: %', total_deleted;
END;
$$;
VACUUM ANALYZE function_orchestrator_items;
VACUUM ANALYZE ai_usage_logs;
-- Limpeza de job_canonical_role_sources — registros inativos há mais de 1 ano (v5.17)
-- Seguro: distinct_sources_count só olha últimos 120 dias — registros de 1+ ano não afetam cálculos
-- ⚠️ v5.18: microbatch — mesmo padrão de function_orchestrator_items (evita lock prolongado)
DO $$
DECLARE deleted_count INT;
BEGIN
LOOP
DELETE FROM job_canonical_role_sources
WHERE id IN (
SELECT id FROM job_canonical_role_sources
WHERE last_seen_at < NOW() - INTERVAL '1 year'
LIMIT 5000
FOR UPDATE SKIP LOCKED
);
GET DIAGNOSTICS deleted_count = ROW_COUNT;
EXIT WHEN deleted_count = 0;
PERFORM pg_sleep(0.05);
END LOOP;
END;
$$;
Alertas operacionais:
-- Alerta de canônicos pendentes
SELECT COUNT(*) FROM job_canonical_roles WHERE status = 'pending';
-- Alerta quando > PENDING_CANONICALS_ALERT_THRESHOLD (padrão: 50)
Governança semanal — protocolo quando fallback_ratio > 0.20 (v5.17):
Se fallback_ratio > 0.20 em qualquer run:
1. Verificar se houve alteração recente no SYSTEM_PROMPT via job_runs.metadata.prompt_version
2. Verificar se houve alteração nos arquivos externos via job_runs.metadata.dict_files
3. Comparar com runs anteriores saudáveis — isolar o run que iniciou a regressão
4. Se não houver alteração identificada: verificar cobertura do dicionário para as funções do período
Governança mensal — query de auditoria de inflação por grupo empresarial (v5.17):
-- Detecta canônicos com distinct_sources_count >= 5 mas alta concentração de prefixo
-- Indica possível inflação por subsidiárias do mesmo grupo empresarial
SELECT jcr.label,
jcr.distinct_sources_count,
COUNT(*) AS total_sources,
COUNT(DISTINCT LEFT(jcrs.normalized_company, 5)) AS prefixos_distintos
FROM job_canonical_roles jcr
JOIN job_canonical_role_sources jcrs ON jcrs.canonical_role_id = jcr.id
WHERE jcr.status IN ('pending', 'active')
AND jcr.distinct_sources_count >= 5
GROUP BY jcr.id, jcr.label, jcr.distinct_sources_count
HAVING COUNT(DISTINCT LEFT(jcrs.normalized_company, 5)) <= 2
ORDER BY jcr.distinct_sources_count DESC;
-- Resultado com prefixos_distintos <= 2 merece revisão manual
-- Ação: avaliar se as sources são subsidiárias do mesmo grupo; corrigir distinct_sources_count manualmente
Upsert em job_canonical_roles
(inalterado da v5.13 — slugs ordenados alfabeticamente, COALESCE em latest_posted_at)
Quarentena operacional
(inalterado da v5.13)
Retry de batch com limite:
const MAX_BATCH_RETRIES = parseInt(process.env.LLM_BATCH_PARSE_MAX_RETRIES ?? '2');
let batchAttempt = 0;
while (batchAttempt <= MAX_BATCH_RETRIES) {
try {
// processar batch...
break;
} catch (err) {
if (isBatchOutputParseError(err)) {
batchAttempt++;
if (batchAttempt > MAX_BATCH_RETRIES) {
// Todas as vagas do batch → retryable_error
// NÃO incrementa llm_parse_fail_count
await markBatchItemsAsRetryable(batchItemIds, 'batch_output_parse_error');
await registerJobRunError(jobRunId, {
step: 'batch_processing',
component: `batch_${batchNumber}`,
message: `batch_output_parse_error após ${MAX_BATCH_RETRIES} tentativas`,
failed_at: new Date().toISOString(),
});
break;
}
} else throw err;
}
}
Auto-promoção — confidence_median gravado na promoção
-- Auto-promoção: CTE garante que promoção só ocorre se COUNT(*) >= 5 (v5.15)
-- ⚠️ v5.17: parâmetros INTEGER — INTERVAL ':var' é inválido no PG
-- ⚠️ v5.18: adicionado p_min_sources_emerging — respeita AUTO_PROMOTE_MIN_SOURCES_EMERGING
CREATE OR REPLACE FUNCTION run_auto_promotion(
p_min_sources INT DEFAULT 5,
p_min_sources_emerging INT DEFAULT 3,
p_min_days INT DEFAULT 14
)
RETURNS void LANGUAGE plpgsql AS $$
BEGIN
WITH stats AS (
SELECT
foi.canonical_role_id,
COUNT(*) AS n,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY foi.confidence) AS median
FROM function_orchestrator_items foi
JOIN function_orchestrator_runs fo ON fo.id = foi.run_id
WHERE foi.confidence IS NOT NULL
AND fo.started_at >= NOW() - INTERVAL '120 days'
GROUP BY foi.canonical_role_id
HAVING COUNT(*) >= 5
)
UPDATE job_canonical_roles jcr
SET status = 'active',
promoted_at = NOW(),
confidence_median = stats.median
FROM stats
WHERE jcr.id = stats.canonical_role_id
AND jcr.status = 'pending'
AND jcr.distinct_sources_count >= CASE
WHEN jcr.is_emerging THEN p_min_sources_emerging
ELSE p_min_sources
END
AND jcr.promoted_at IS NULL
AND (NOW() - jcr.created_at) >= (p_min_days * INTERVAL '1 day');
-- Se stats não tiver linha para o canonical_role_id: nenhuma atualização ocorre
END;
$$;
-- Chamada TypeScript:
-- await supabase.rpc('run_auto_promotion', {
-- p_min_sources: parseInt(process.env.AUTO_PROMOTE_MIN_SOURCES ?? '5'),
-- p_min_sources_emerging: parseInt(process.env.AUTO_PROMOTE_MIN_SOURCES_EMERGING ?? '3'),
-- p_min_days: parseInt(process.env.AUTO_PROMOTE_MIN_DAYS ?? '14')
-- })
Validação de arquivos JSON externos — completude
// Após validação Zod, verificar completude
function validateFileCompleteness(parsedData: any, fileName: string): void {
if (Object.keys(parsedData.data ?? {}).length === 0) {
throw new Error(`Arquivo ${fileName} passou na validação Zod mas está vazio (data: {}). Abortando.`);
}
}
// Sequência obrigatória no startup:
// 1. Criar job_runs com status='running', metadata={}
// 2. Validar Zod
// 3. Validar completude
// Falha em qualquer etapa → enriquecer job_runs.metadata + abortar
Arquivos externos — repositório
(inalterado da v5.13)
Pipeline de curadoria — fluxo detalhado
(inalterado da v5.13)
Embeddings — arquitetura completa
(inalterado da v5.13)
Variáveis de ambiente
# LLM
LLM_MODEL=claude-haiku-4-5-20251001
LLM_CONCURRENCY_EXTRACTOR=15
LLM_BATCH_SIZE=40
LLM_BATCH_PARSE_MAX_RETRIES=2
# Máximo de retries para batch_output_parse_error antes de marcar vagas como retryable_error
# Embeddings
EMBEDDING_MODEL=intfloat/multilingual-e5-small
EMBEDDING_SIMILARITY_REPLACE_THRESHOLD=0.975
EMBEDDING_REBUILD_BATCH_LIMIT=50
# Máximo de embeddings NULL recalculados por execução do CRON (FASE 4)
# Pipeline
TEXT_TRUNCATION_LIMIT=5000
CURATE_PIPELINE_ENABLED=true
# Valor de bootstrap. Em runtime, pipeline_config prevalece.
SLOW_BATCH_THRESHOLD_MS=35000
# Batch acima deste tempo (ms) incrementa slow_batch_streak
SLOW_BATCH_AUTO_PAUSE_AT=10
# Na Nª ocorrência consecutiva de slow_batch, pipeline é pausado automaticamente via pipeline_config
# Auto-promoção
AUTO_PROMOTE_MIN_SOURCES=5
AUTO_PROMOTE_MIN_SOURCES_EMERGING=3
AUTO_PROMOTE_MIN_DAYS=14
AUTO_PROMOTE_MIN_CONFIDENCE=0.80
AUTO_PROMOTE_MAX_SIMILARITY=0.90
# Quarentena
QUARANTINE_FAIL_THRESHOLD=3
BLACKLIST_EXPIRY_DAYS=90
# Alertas
FALLBACK_RATIO_ALERT_THRESHOLD=0.20
PENDING_CANONICALS_ALERT_THRESHOLD=50
# Connection
SUPABASE_URL=...
SUPABASE_SERVICE_ROLE_KEY=...
# Connection string deve apontar para porta 6543 (Transaction Pooler)
Ciclo de vida dos cargos em job_canonical_roles
(inalterado da v5.13)
Quarentena operacional
(inalterado da v5.13)
Governança da taxonomia — eventos
(inalterado da v5.13)
Golden set — critérios de qualidade
(inalterado da v5.13)
Passo a passo de implementação
Sprint 1 — Sequência obrigatória
(inalterado da v5.13 — referências a curate_runs → function_orchestrator_runs, curate_run_items → function_orchestrator_items)
Sprint 1 — Esforço
(inalterado da v5.13)
Checklist de deploy
| Item | Verificação |
|---|---|
job_runs DDL |
Criado antes de function_orchestrator_runs (FK depende) |
job_runs GIN |
Índice GIN em metadata criado |
job_runs.status CHECK |
Inclui 'partial' e 'aborted' — CHECK (status IN ('running', 'success', 'partial', 'error', 'aborted')) |
job_runs criado primeiro |
status='running', metadata={} antes do Zod e da pré-rotina de hash |
pipeline_config DDL |
Tabela criada + seed ('CURATE_PIPELINE_ENABLED', 'true') |
isPipelineEnabled() |
Lê banco com fallback para env var — banco prevalece |
Fluxo aborted — CRON + Fluxos A e B |
Todo entrypoint de curadoria: isPipelineEnabled() → false → aborted + encerra |
monthly_cleanup agendado |
Entrada "0 4 1 * *" em vercel.json + handler monthly-cleanup.ts |
SLOW_BATCH_AUTO_PAUSE_AT env var |
parseInt(process.env.SLOW_BATCH_AUTO_PAUSE_AT ?? '10') no TypeScript |
increment_circuit_breaker() RPC |
DDL executado — sem supabase.raw() no código |
check_quarantine_rate_limit() RPC |
Retorna remaining_slots INT — teto 100/dia garantido atomicamente |
search_canonical_roles() RPC |
ORDER BY só com operador vetorial; desempate via CTE com result_limit * 3 |
run_auto_promotion() RPC |
p_min_sources, p_min_sources_emerging, p_min_days — CASE WHEN is_emerging |
| Rollback | low_quality = false incluído no UPDATE de reversão |
generate-prompt-version.ts regex |
;? — semicolon opcional |
DELETE job_canonical_role_sources |
Microbatch 5.000 + FOR UPDATE SKIP LOCKED |
registerCronError() |
Interface TypeScript definida — acumula em errors[], não sobrescreve |
metadata.errors[] |
Campo é array em toda a spec — sem uso de metadata.error (singular) |
| Watchdog PASSO 0.2 | job_runs presos resolvidos com jsonb_build_object — sem concatenação de string |
maintenance_phase_1() sem p_job_run_id |
Parâmetro removido — chamada sem argumentos |
maintenance_phase_2() sem p_job_run_id |
Parâmetro removido — chamada sem argumentos |
CRON status partial |
Handler acumula phaseErrors[]; finaliza com 'partial' quando FASE 1 passa mas FASEs 2–4 falham |
| Texto FASE 1 | Documenta “qualquer falha na FASE 1 aborta” (não apenas watchdog) |
| Auto-promoção com CTE | UPDATE FROM stats — promoção só ocorre se COUNT(*) >= 5 |
action_required CHECK |
IS NULL OR IN (...) — sem IN (..., NULL) |
Índice (job_posting_id, created_at DESC) |
Criado em function_orchestrator_items |
| Migration sentinela | UUID 000...002 criado; function_orchestrator_runs históricos com job_run_id preenchido |
function_orchestrator_runs |
job_run_id NOT NULL — toda execução gera job_runs antes de qualquer processamento |
function_orchestrator_items |
retry_count ausente — nenhuma referência no código |
| Renomeação | Nenhuma referência a curate_runs ou curate_run_items no código |
SYSTEM_PROMPT.ts isolado |
Contém apenas texto do prompt |
generate-prompt-version.ts |
Hash apenas do conteúdo do template literal (não do arquivo inteiro) |
prompt-version.generated.ts |
No .gitignore, gerado pelo prebuild |
| Validação de completude | Object.keys(data).length === 0 após Zod para cada arquivo |
db_connection error |
Registrado em log externo quando banco inacessível — não apenas em job_runs |
confidence_median |
Calculado via CTE — promoção bloqueada se COUNT(*) < 5 |
low_quality coexistência |
curation_status e low_quality atualizados de forma síncrona |
| Trigger DELETE | fn_update_distinct_sources_count trata INSERT, UPDATE e DELETE |
| Índice composto sources | (canonical_role_id, last_seen_at DESC) criado |
| CRON 4 fases | FASE 1 aborta tudo se falhar; FASEs 2–4 continuam independentemente |
maintenance_phase_1 |
PASSOs 0, 1, 2 como RPC separada |
release_quarantined_jobs |
PASSO 3 como RPC com parâmetro UUID[] |
maintenance_phase_2 |
PASSOs 4, 5 como RPC separada |
| PASSO 6 (FASE 4) | TypeScript com LIMIT EMBEDDING_REBUILD_BATCH_LIMIT |
slow_batch_streak |
Campos criados em circuit_breaker_state; pausa automática na 11ª ocorrência |
| Reset atômico circuit breaker | UPDATE com .lt('window_started_at', expiryTimestamp) |
LLM_BATCH_PARSE_MAX_RETRIES |
Batch retenta 2x; após esgotamento vagas → retryable_error |
| Backfill com CTE | DISTINCT ON em vez de subquery aninhada |
| Rastreabilidade quarentena | Query sem filtro curation_status — usa error_type = 'item_output_parse_error' |
| Delete mensal em microbatches | Loop 5.000 registros + pg_sleep(0.05) + VACUUM ANALYZE |
| Timezone quarentena | AT TIME ZONE 'America/Sao_Paulo' na query de rate limit |
| Ordenação de slugs | Upsert em lote ordena alfabeticamente |
fallback_ratio |
Calculado ao fim do run; alerta se > 20% |
Alerta pending_review |
job_canonical_roles WHERE status='pending' |
Erro detalhado job_runs |
Toda falha registra step, component, message, failed_at em errors[] |
| Golden set — Crítico | 100% passando |
| Golden set — Alto | ≥ 90% passando |
| Connection Pooler | Porta 6543, set_config() via RPC testado |
Sprint 2 — Validação em shadow mode
(preservado da v5.13)
Plano de rollback
1. IMEDIATO (< 5 minutos):
- CURATE_PIPELINE_ENABLED=false (ou já pausado automaticamente por slow_batch_streak)
- Parar runs manuais via painel
2. DIAGNÓSTICO (30 minutos):
- Verificar `job_runs.metadata.errors[]` para identificar `step`, `component` e `message` da falha
- Verificar function_orchestrator_runs.error_message
- Recuperar dict_files e prompt_version via job_run_id → job_runs.metadata
- Rodar golden set completo em staging
3. ROLLBACK COMPLETO:
- Reverter SYSTEM_PROMPT.ts via git revert
→ próximo build regenera prompt_version automaticamente (hash do conteúdo)
- Reverter arquivos JSON externos via git revert
- Vagas curadas erroneamente: filtrar pelo run_id específico
UPDATE job_postings jp
SET skills = NULL, canonical_role_id = NULL, secondary_role = NULL,
curation_status = 'pending', llm_parse_fail_count = 0,
low_quality = false -- ⚠️ v5.18: obrigatório — sincronia com curation_status
FROM function_orchestrator_items foi
JOIN function_orchestrator_runs fo ON fo.id = foi.run_id
WHERE foi.job_posting_id = jp.id
AND fo.id = :run_id_com_regressao;
4. RERUN após rollback:
- Confirmar golden set 100% Crítico na versão revertida
- Rodar 100 vagas em staging
- Liberar produção
Sprint 3 — Painel em tempo real
(preservado da v5.13)