From 8b053b5c45fb80dd792988b0c02840bc720e8335 Mon Sep 17 00:00:00 2001 From: Renato Cron Date: Tue, 3 Feb 2026 22:19:08 -0300 Subject: [PATCH] WIP --- backend/LLM_OPPORTUNITIES_REPORT.md | 365 ++++++++++++++ backend/src/bin/run-duckdb-sidecar.ts | 226 +++++++++ backend/src/common/duckdb/README.md | 308 ++++++++++++ .../common/duckdb/duckdb-provider.service.ts | 236 ++++++++- .../common/duckdb/duckdb-search.service.ts | 397 +++++++++++++++ .../common/duckdb/duckdb-sidecar.service.ts | 453 ++++++++++++++++++ backend/src/common/duckdb/duckdb.module.ts | 9 +- backend/src/common/duckdb/example-usage.ts | 245 ++++++++++ backend/src/common/duckdb/index.ts | 9 + 9 files changed, 2223 insertions(+), 25 deletions(-) create mode 100644 backend/LLM_OPPORTUNITIES_REPORT.md create mode 100644 backend/src/bin/run-duckdb-sidecar.ts create mode 100644 backend/src/common/duckdb/README.md create mode 100644 backend/src/common/duckdb/duckdb-search.service.ts create mode 100644 backend/src/common/duckdb/duckdb-sidecar.service.ts create mode 100644 backend/src/common/duckdb/example-usage.ts create mode 100644 backend/src/common/duckdb/index.ts diff --git a/backend/LLM_OPPORTUNITIES_REPORT.md b/backend/LLM_OPPORTUNITIES_REPORT.md new file mode 100644 index 0000000000..23b1fc2d0e --- /dev/null +++ b/backend/LLM_OPPORTUNITIES_REPORT.md @@ -0,0 +1,365 @@ +# Relatório de Oportunidades para LLM (1B parâmetros) no SMAE + +## Sumário Executivo + +O SMAE (Sistema de Monitoramento e Avaliação Estratégica) é uma plataforma complexa de governança pública que gerencia planos municipais, projetos, transferências de recursos, indicadores e relatórios. Uma LLM de 1B parâmetros pode agregar valor significativo em áreas específicas que envolvem processamento de linguagem natural, classificação e geração assistida, sem necessidade de processamento massivo de dados estruturados. + +--- + +## 1. Visão Geral do Sistema + +### Domínios Principais identificados no Schema: + +| Domínio | Descrição | Volume de Dados | +|---------|-----------|-----------------| +| **PDM/Planos Setoriais** | Metas, iniciativas, atividades, indicadores | Alto | +| **Projetos (PP/MDO)** | Gestão de projetos e obras | Alto | +| **Transferências Voluntárias** | Repasse de recursos entre entes | Médio | +| **Variáveis e Indicadores** | Dados quantitativos e categóricos | Muito Alto | +| **Orçamento** | Planejado, realizado, empenhos | Alto | +| **Workflows** | Processos de aprovação e tramitação | Médio | +| **Relatórios** | Geração de documentos e análises | Alto | +| **Notas/Comunicados** | Registro de comunicações internas | Médio | + +--- + +## 2. Oportunidades de Alto Valor para LLM 1B + +### 2.1 Sumarização e Análise de Textos Governamentais + +**Contexto:** O sistema possui diversos campos textuais que acumulam informações ao longo do tempo: +- Análises qualitativas de metas (`MetaCicloFisicoAnalise.informacoes_complementares`) +- Detalhamentos de riscos (`MetaCicloFisicoRisco.detalhamento`, `ponto_de_atencao`) +- Fechamentos e comentários (`MetaCicloFisicoFechamento.comentario`) +- Notas e comunicações (`Nota.nota`) + +**Valor Proposto:** +``` +┌─────────────────────────────────────────────────────────────────┐ +│ ANTES: Coordenador lê 50+ análises mensais manualmente │ +│ DEPOIS: LLM gera resumo executivo com destaque para riscos │ +│ │ +│ ANTES: Usuário escreve análise do zero │ +│ DEPOIS: LLM sugere baseado em dados históricos similares │ +└─────────────────────────────────────────────────────────────────┘ +``` + +**Implementação Sugerida:** +- Endpoint: `POST /api/llm/analise/sumarizar` +- Input: Ciclo ID + Meta ID + contexto +- Output: Resumo estruturado + sentimento + riscos identificados + +--- + +### 2.2 Classificação Automática de Transferências de Recursos + +**Contexto:** O módulo de transferências voluntárias (`Transferencia`, `DistribuicaoRecurso`) processa repasses que precisam ser classificados por área temática, tipo de intervenção e prioridade. + +**Modelo de Valor:** +| Campo Atual | Uso de LLM | Economia | +|-------------|-----------|----------| +| `Transferencia.objeto` | Classificação automática em áreas temáticas | 70% do tempo de triagem | +| `Transferencia.detalhamento` | Extração de entidades (orgãos, valores, prazos) | 50% do tempo de cadastro | +| Anexos (PDF) | Resumo automático para análise prévia | 60% do tempo de revisão | + +**Exemplo de Fluxo:** +```typescript +// Serviço sugerido: TransferenciaClassificacaoService +async classificarTransferencia(dto: CreateTransferenciaDto) { + const sugestoes = await this.llmService.analisar({ + texto: dto.objeto + ' ' + dto.detalhamento, + tarefa: 'classificar_area_tematica', + opcoes: await this.areaTematicaService.findAll() + }); + + return { + area_tematica_sugerida: sugestoes.classificacao, + confianca: sugestoes.score, + palavras_chave: sugestoes.entidades + }; +} +``` + +--- + +### 2.3 Assistente de Preenchimento de Indicadores e Variáveis + +**Contexto:** O sistema gerencia milhares de variáveis (`Variavel`, `Indicador`) com descrições, metodologias e justificativas. Muitas são similares entre si. + +**Oportunidades:** + +1. **Sugestão de Metodologia:** + - Baseado em variáveis similares já cadastradas + - Sugere descrição e fonte de dados + +2. **Validação de Consistência:** + - Verifica se a justificativa de alteração de valor é coerente com o contexto + - Alerta sobre valores atípicos comparados ao histórico + +3. **Geração de Análise Qualitativa:** + ``` + Input: Série de valores + contexto da meta + Output: "A variável apresentou crescimento de 15% no trimestre, + consistente com a inauguração da unidade prevista no + cronograma. Recomenda-se manter o acompanhamento mensal." + ``` + +--- + +### 2.4 Geração e Revisão de Relatórios Automatizados + +**Contexto:** O módulo de relatórios (`Relatorio`, `reports/*`) gera diversos documentos em Excel/PDF. Uma LLM pode enriquecer significativamente este processo. + +**Casos de Uso:** + +| Relatório | Função da LLM | Valor | +|-----------|--------------|-------| +| Monitoramento Mensal | Gerar texto introdutório contextualizado | Padronização + velocidade | +| Indicadores | Explicar variações em linguagem natural | Clareza para gestores | +| Projetos | Resumir status de múltiplos projetos | Visão consolidada | +| Tribunal de Contas | Verificar conformidade textual | Redução de retrabalho | + +**Implementação:** +```typescript +// RelatorioService com LLM +async gerarRelatorio(dto: CreateReportDto) { + const dados = await this.coletarDados(dto); + + // Gera narrativa explicativa + const narrativa = await this.llmService.gerarNarrativa({ + tipo: dto.fonte, + dados: dados.resumo, + tom: 'formal_governamental' + }); + + // Insere no documento gerado + return this.pdfService.gerar({ + dados: dados.detalhados, + narrativa: narrativa.texto, + graficos: dados.visualizacoes + }); +} +``` + +--- + +### 2.5 Análise de Sentimento em Notas e Comunicações + +**Contexto:** O sistema possui notas (`Nota`, `NotaEnderecamento`) usadas para comunicação entre órgãos sobre projetos e transferências. + +**Aplicação:** +- **Priorização Inteligente:** Classificar urgência baseada no conteúdo, não apenas no título +- **Roteamento Automático:** Sugerir destinatários baseado no histórico de comunicações similares +- **Alerta de Conflitos:** Detectar tom negativo ou discordância que requer mediação + +**Exemplo:** +```typescript +interface AnaliseNota { + nota_id: number; + sentimento: 'positivo' | 'neutro' | 'negativo' | 'urgente'; + topicos: string[]; // Extraídos do texto + acoes_sugeridas: string[]; // Baseado em padrões históricos + orgaos_envolvidos: number[]; // Entidades mencionadas +} +``` + +--- + +### 2.6 Validação e Melhoria de Descrições de Projetos + +**Contexto:** Projetos (`Projeto`) possuem campos textuais importantes: `objeto`, `objetivo`, `publico_alvo`, `principais_etapas`. + +**Funções da LLM:** +1. **Padronização:** Sugerir formatação consistente com diretrizes de governança +2. **Completude:** Verificar se todos os elementos obrigatórios estão presentes +3. **Clareza:** Identificar jargões ou ambiguidades +4. **Coerência:** Verificar alinhamento entre objetivo, objeto e etapas + +**API Sugerida:** +```typescript +POST /api/llm/validacao/projeto +{ + "projeto_id": 123, + "campos": ["objeto", "objetivo"], + "diretrizes": "pdm_2024_2027" +} + +Response: +{ + "pontuacao_clareza": 8.5, + "sugestoes": [ + { + "campo": "objeto", + "tipo": "ambiguidade", + "trecho": "melhorar a cidade", + "sugestao": "especificar a região ou ação concreta" + } + ] +} +``` + +--- + +### 2.7 Assistente para Configuração de Workflows + +**Contexto:** O sistema possui workflows configuráveis (`Workflow`, `WorkflowFase`, `WorkflowTarefa`) para tramitação de processos. + +**Valor:** +- **Geração de Descrições:** Criar descrições claras de tarefas baseadas no tipo de workflow +- **Validação de Fluxo:** Identificar possíveis gargalos ou loops na configuração +- **Sugestão de Responsáveis:** Recomendar perfis de acesso baseados em workflows similares + +--- + +## 3. Análise de Viabilidade Técnica + +### 3.1 Modelos 1B Parametros - Capacidades Esperadas + +| Capacidade | Adequação | Observação | +|------------|-----------|------------| +| Sumarização de textos curtos (<500 tokens) | ✅ Excelente | Ideal para análises qualitativas | +| Classificação de texto | ✅ Excelente | Áreas temáticas, tipos de nota | +| Extração de entidades | ✅ Boa | Orgãos, valores, datas em textos | +| Geração de texto estruturado | ✅ Boa | Narrativas de relatórios simples | +| Tradução/parafraseamento | ✅ Boa | Padronização de descrições | +| Análise de sentimento | ⚠️ Regular | Requer fine-tuning com dados do domínio | +| Raciocínio complexo | ❌ Limitada | Não adequado para cálculos orçamentários | + +### 3.2 Requisitos de Infraestrutura + +``` +Opção 1: Modelo Local (on-premise) +├── GPU: 4-8GB VRAM (RTX 3060 ou superior) +├── RAM: 16GB+ recomendado +├── Latência: 50-200ms por inferência +└── Custo: Hardware inicial + manutenção + +Opção 2: API Externa (OpenAI, Anthropic, etc.) +├── Custo por token: ~$0.0015/1K tokens +├── Latência: 200-500ms + rede +├── Escalabilidade: Automática +└── Segurança: Dados saem da infraestrutura + +Opção 3: Modelo Híbrido (recomendado) +├── Tarefas críticas: Local +├── Tarefas genéricas: API externa +└── Fallback: Regra-based quando LLM indisponível +``` + +### 3.3 Pontos de Integração no Código Existente + +``` +src/ +├── mf/metas/ # Análise qualitativa, sugestões +├── bloco-nota/nota/ # Classificação, priorização +├── casa-civil/ # Classificação de transferências +│ ├── transferencia/ +│ └── demanda/ +├── pp/projeto/ # Validação de descrições +├── reports/ # Geração de narrativas +├── variavel/ # Sugestão de metodologias +└── workflow/ # Configuração assistida +``` + +--- + +## 4. Priorização de Implementação + +### Fase 1: Quick Wins (1-2 meses) + +1. **Sumarização de Análises Qualitativas** + - Baixo risco, alto valor percebido + - Dados já estruturados no `MetaCicloFisicoAnalise` + - Fácil rollback se necessário + +2. **Classificação de Transferências** + - Processo existente pode ser aprimorado + - ROI mensurável em tempo de triagem + +### Fase 2: Valor Estratégico (3-4 meses) + +3. **Assistente de Relatórios** + - Impacto direto na produtividade + - Requer integração com geradores de PDF/Excel + +4. **Validação de Descrições de Projetos** + - Melhora qualidade dos dados do sistema + - Previne retrabalho futuro + +### Fase 3: Diferenciação (6+ meses) + +5. **Análise Preditiva de Riscos** + - Combina dados históricos + texto + - Requer acumulação de dados de treinamento + +6. **Chatbot de Suporte ao Usuário** + - Baseado na documentação do sistema + - Integração com privilégios de acesso + +--- + +## 5. Considerações de Segurança e Governança + +### 5.1 Dados Sensíveis + +| Categoria | Restrição | Solução | +|-----------|-----------|---------| +| Orçamento detalhado | Alta | Não enviar para APIs externas | +| Dados de parlamentares | Média | Anonimização antes de processar | +| Análises qualitativas | Média | Modelo local ou pseudonimização | +| Descrições de projetos | Baixa | Pode usar API externa | + +### 5.2 Rastreabilidade + +```typescript +// Tabela sugerida para auditoria +model LLMInteracao { + id Int @id @default(autoincrement()) + pessoa_id Int + endpoint String // qual funcionalidade + tipo_operacao String // sumarizar, classificar, etc + input_tokens Int + output_tokens Int + tempo_ms Int + custo_estimado Decimal? + cache_hit Boolean // se usou cache + criado_em DateTime @default(now()) +} +``` + +--- + +## 6. Métricas de Sucesso + +| Indicador | Baseline | Meta | Como Medir | +|-----------|----------|------|------------| +| Tempo de análise de ciclo | 4 horas | 1 hora | Tempo entre abertura e fechamento | +| Taxa de aceitação de sugestões | - | >70% | Logs de interação | +| Erros em relatórios | 15% | <5% | Retrabalho identificado | +| Satisfação dos usuários | - | >4.0/5 | Pesquisa NPS | +| Custo por processamento | - | <$0.10 | Monitoramento de tokens | + +--- + +## 7. Conclusão + +Uma LLM de 1B parâmetros pode agregar **valor significativo e mensurável** ao SMAE, especialmente em áreas que envolvem: + +1. **Processamento de linguagem natural** em análises e comunicações +2. **Classificação e categorização** de transferências e demandas +3. **Geração assistida** de textos para relatórios e descrições +4. **Validação e padronização** de conteúdo inserido no sistema + +A chave para o sucesso está em: +- Começar com casos de uso de **baixo risco e alto valor** +- Manter **fallbacks baseados em regras** para garantir continuidade +- Implementar **monitoramento rigoroso** de custos e qualidade +- Respeitar as **restrições de segurança** de dados governamentais + +--- + +**Próximos Passos Recomendados:** +1. Prova de conceito com sumarização de análises qualitativas +2. Benchmark de modelos 1B (TinyLlama, Phi-2, Qwen-1.8B) +3. Definição de arquitetura de integração +4. Levantamento de dados para fine-tuning no domínio público diff --git a/backend/src/bin/run-duckdb-sidecar.ts b/backend/src/bin/run-duckdb-sidecar.ts new file mode 100644 index 0000000000..4254b9563a --- /dev/null +++ b/backend/src/bin/run-duckdb-sidecar.ts @@ -0,0 +1,226 @@ +import { Logger } from '@nestjs/common'; +import { NestFactory } from '@nestjs/core'; +import { AppModule } from '../app.module'; +import { DuckDBProviderService } from '../common/duckdb/duckdb-provider.service'; +import { Database } from 'duckdb-async'; + +// Desativa crontabs no sidecar +process.env.DISABLED_CRONTABS = 'all'; + +const logger = new Logger('DuckDBSidecar'); +let duckdb: Database | null = null; +let isShuttingDown = false; + +// Mensagens de controle do processo pai +type SidecarRequest = + | { type: 'query'; sql: string; params?: any[] } + | { type: 'attach_postgres'; connectionString: string; alias?: string } + | { type: 'install_extension'; name: string } + | { type: 'ping' }; + +type SidecarResponse = + | { event: 'query_result'; data: any[]; error?: string } + | { event: 'query_error'; error: string } + | { event: 'attached'; alias: string; error?: string } + | { event: 'extension_installed'; name: string; error?: string } + | { event: 'pong' } + | { event: 'ready' } + | { event: 'error'; error: string }; + +// Tratamento de erros +process.on('uncaughtException', (error: Error) => { + logger.error(`Exceção não capturada: ${error.message}`, error.stack); + if (process.send) process.send({ event: 'error', error: error.message } as SidecarResponse); +}); + +process.on('unhandledRejection', (reason: unknown) => { + logger.error(`Rejeição não tratada: ${reason}`); +}); + +// Sinais de desligamento +process.on('SIGTERM', async () => { + logger.log('SIGTERM recebido, encerrando...'); + await shutdown(); +}); + +process.on('SIGINT', async () => { + logger.log('SIGINT recebido, encerrando...'); + await shutdown(); +}); + +async function shutdown() { + if (isShuttingDown) return; + isShuttingDown = true; + + logger.log('Desligando DuckDB sidecar...'); + if (duckdb) { + try { + await duckdb.close(); + logger.log('Conexão DuckDB fechada'); + } catch (e) { + logger.error(`Erro ao fechar DuckDB: ${e}`); + } + } + process.exit(0); +} + +async function bootstrap() { + logger.log('Iniciando DuckDB Sidecar...'); + + try { + const app = await NestFactory.createApplicationContext(AppModule, { + logger: ['error', 'warn'], + }); + app.enableShutdownHooks(); + + // Obtém o serviço configurado + const duckdbProvider = app.get(DuckDBProviderService); + + // Cria instância do DuckDB + duckdb = await duckdbProvider.getConfiguredInstance(); + + // Instala extensões úteis para vector search e FTS + logger.log('Instalando extensões...'); + await duckdb.run('INSTALL vss;'); + await duckdb.run('LOAD vss;'); + await duckdb.run('INSTALL fts;'); + await duckdb.run('LOAD fts;'); + await duckdb.run('INSTALL postgres;'); + await duckdb.run('LOAD postgres;'); + + logger.log('DuckDB Sidecar pronto'); + + // Notifica o processo pai que está pronto + if (process.send) { + process.send({ event: 'ready' } as SidecarResponse); + } + + // Configura handler de mensagens do processo pai + process.on('message', async (msg: SidecarRequest) => { + if (isShuttingDown) return; + + try { + switch (msg.type) { + case 'ping': + if (process.send) process.send({ event: 'pong' } as SidecarResponse); + break; + + case 'query': + await handleQuery(msg.sql, msg.params); + break; + + case 'attach_postgres': + await handleAttachPostgres(msg.connectionString, msg.alias || 'postgres'); + break; + + case 'install_extension': + await handleInstallExtension(msg.name); + break; + + default: + logger.warn(`Mensagem desconhecida: ${JSON.stringify(msg)}`); + } + } catch (error) { + const errorMsg = error instanceof Error ? error.message : String(error); + logger.error(`Erro ao processar mensagem: ${errorMsg}`); + if (process.send) { + process.send({ event: 'error', error: errorMsg } as SidecarResponse); + } + } + }); + } catch (error) { + logger.error(`Falha ao iniciar sidecar: ${error}`); + process.exit(1); + } +} + +async function handleQuery(sql: string, params?: any[]) { + if (!duckdb) { + if (process.send) { + process.send({ event: 'query_error', error: 'DuckDB não inicializado' } as SidecarResponse); + } + return; + } + + try { + const startTime = Date.now(); + const rows = await duckdb.all(sql, params || []); + const duration = Date.now() - startTime; + + logger.debug(`Query executada em ${duration}ms: ${sql.substring(0, 100)}...`); + + if (process.send) { + process.send({ event: 'query_result', data: rows } as SidecarResponse); + } + } catch (error) { + const errorMsg = error instanceof Error ? error.message : String(error); + logger.error(`Erro na query: ${errorMsg}`); + if (process.send) { + process.send({ event: 'query_error', error: errorMsg } as SidecarResponse); + } + } +} + +async function handleAttachPostgres(connectionString: string, alias: string) { + if (!duckdb) { + if (process.send) { + process.send({ + event: 'attached', + alias, + error: 'DuckDB não inicializado', + } as SidecarResponse); + } + return; + } + + try { + // ATTACH PostgreSQL database + // connectionString format: "postgresql://user:pass@host:port/dbname" + await duckdb.run(`ATTACH '${connectionString}' AS ${alias} (TYPE postgres)`); + logger.log(`PostgreSQL anexado como ${alias}`); + + if (process.send) { + process.send({ event: 'attached', alias } as SidecarResponse); + } + } catch (error) { + const errorMsg = error instanceof Error ? error.message : String(error); + logger.error(`Erro ao anexar PostgreSQL: ${errorMsg}`); + if (process.send) { + process.send({ event: 'attached', alias, error: errorMsg } as SidecarResponse); + } + } +} + +async function handleInstallExtension(name: string) { + if (!duckdb) { + if (process.send) { + process.send({ + event: 'extension_installed', + name, + error: 'DuckDB não inicializado', + } as SidecarResponse); + } + return; + } + + try { + await duckdb.run(`INSTALL ${name};`); + await duckdb.run(`LOAD ${name};`); + logger.log(`Extensão ${name} instalada e carregada`); + + if (process.send) { + process.send({ event: 'extension_installed', name } as SidecarResponse); + } + } catch (error) { + const errorMsg = error instanceof Error ? error.message : String(error); + logger.error(`Erro ao instalar extensão ${name}: ${errorMsg}`); + if (process.send) { + process.send({ event: 'extension_installed', name, error: errorMsg } as SidecarResponse); + } + } +} + +bootstrap().catch((error) => { + logger.error(`Erro fatal no bootstrap: ${error}`); + process.exit(1); +}); diff --git a/backend/src/common/duckdb/README.md b/backend/src/common/duckdb/README.md new file mode 100644 index 0000000000..4e8a4206f8 --- /dev/null +++ b/backend/src/common/duckdb/README.md @@ -0,0 +1,308 @@ +# DuckDB Sidecar Module + +Módulo NestJS para executar DuckDB em um processo separado (sidecar), com suporte a: +- **Vector Similarity Search** (HNSW index) - para busca semântica com embeddings CLIP +- **Full-Text Search** (BM25) - para busca textual +- **Hybrid Search** - combinação de ambas as abordagens +- **PostgreSQL integration** - attach direto ao banco Postgres + +## Arquitetura + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Main NestJS Process │ +│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────┐ │ +│ │ DuckDBSidecarSvc │ │ DuckDBSearchSvc │ │ Other Svcs │ │ +│ │ (monitora e │ │ (API de busca) │ │ │ │ +│ │ controla) │ │ │ │ │ │ +│ └────────┬─────────┘ └────────┬─────────┘ └──────────────┘ │ +│ │ │ │ +│ │ IPC (fork) │ Usa │ +│ ▼ ▼ │ +│ ┌──────────────────────────────────────────────┐ │ +│ │ DuckDB Sidecar Process │ │ +│ │ ┌──────────────────────────────────────┐ │ │ +│ │ │ DuckDB (:memory:) │ │ │ +│ │ │ - VSS extension (HNSW index) │ │ │ +│ │ │ - FTS extension (BM25) │ │ │ +│ │ │ - Postgres extension │ │ │ +│ │ └──────────────────────────────────────┘ │ │ +│ └──────────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ + │ + │ ATTACH + ▼ + ┌─────────────────┐ + │ PostgreSQL │ + │ (dados) │ + └─────────────────┘ +``` + +## Features + +### 1. Processo Sidecar Isolado +- DuckDB roda em processo separado via `child_process.fork()` +- Comunicação via IPC (mensagens) +- Auto-restart em caso de falha (máx 1x por minuto) +- Health check periódico (ping/pong) + +### 2. Vector Similarity Search +- HNSW index para busca aproximada em alta velocidade +- Suporta métricas: `cosine`, `l2sq` (euclidiana), `ip` (inner product) +- Ideal para embeddings CLIP, sentence-transformers, etc. + +### 3. Full-Text Search (BM25) +- Índice FTS similar ao SQLite FTS5 +- Stemming em português +- Ranking BM25 configurável + +### 4. Hybrid Search +- Combina resultados de vector + BM25 +- Usa RRF (Reciprocal Rank Fusion) +- Pesos configuráveis + +## Instalação + +O módulo já está incluído em `AppModule`. Para usar em outros módulos: + +```typescript +import { DuckDBModule } from './common/duckdb'; + +@Module({ + imports: [DuckDBModule], + providers: [YourService], +}) +export class YourModule {} +``` + +## Uso + +### Básico - Executar Queries + +```typescript +import { DuckDBSidecarService } from './common/duckdb'; + +@Injectable() +export class MyService { + constructor(private readonly duckdb: DuckDBSidecarService) {} + + async doSomething() { + // Anexa PostgreSQL + await this.duckdb.attachPostgres(); + + // Sincroniza dados + await this.duckdb.syncFromPostgres('minha_tabela', { + columns: ['id', 'embedding', 'conteudo'], + limit: 10000, + }); + + // Executa query + const result = await this.duckdb.query( + 'SELECT * FROM minha_tabela WHERE id > ?', + [100] + ); + + if (result.success) { + console.log(result.data); + } + } +} +``` + +### Busca Vetorial (CLIP/Embeddings) + +```typescript +import { DuckDBSearchService } from './common/duckdb'; + +@Injectable() +export class ImageSearchService { + constructor(private readonly search: DuckDBSearchService) {} + + async searchSimilarImages(clipEmbedding: number[], topK = 10) { + // Sincroniza tabela com embeddings (se necessário) + await this.search.syncTable('imagens', { + columns: ['id', 'embedding', 'filename'], + embeddingColumn: 'embedding', + }); + + // Busca por similaridade + const results = await this.search.vectorSearch('imagens', clipEmbedding, topK, { + embeddingColumn: 'embedding', + idColumn: 'id', + metric: 'cosine', // CLIP usa cosine similarity + additionalColumns: ['filename'], + }); + + return results.map((r) => ({ + id: r.id, + score: r.score, + filename: r.metadata?.filename, + })); + } +} +``` + +### Busca BM25 (Texto) + +```typescript +async searchDocuments(query: string, topK = 10) { + // Sincroniza com índice FTS + await this.search.syncTable('documentos', { + columns: ['id', 'titulo', 'conteudo'], + idColumn: 'id', + textColumns: ['titulo', 'conteudo'], + }); + + // Busca BM25 + const results = await this.search.bm25Search('documentos', query, topK, { + idColumn: 'id', + fields: ['titulo', 'conteudo'], + conjunctive: false, // OR entre termos + }); + + return results; +} +``` + +### Busca Híbrida (Vector + BM25) + +```typescript +async hybridSearch(queryText: string, queryEmbedding: number[], topK = 10) { + // Sincroniza tabela com ambos os índices + await this.search.syncTable('documentos', { + columns: ['id', 'embedding', 'titulo', 'conteudo'], + embeddingColumn: 'embedding', + idColumn: 'id', + textColumns: ['titulo', 'conteudo'], + }); + + // Busca híbrida + const results = await this.search.hybridSearch( + 'documentos', + queryEmbedding, + queryText, + topK, + { + weights: { vector: 0.7, bm25: 0.3 }, + embeddingColumn: 'embedding', + idColumn: 'id', + textFields: ['titulo', 'conteudo'], + } + ); + + return results.map((r) => ({ + id: r.id, + combinedScore: r.combinedScore, + semanticScore: r.semanticScore, + lexicalScore: r.lexicalScore, + })); +} +``` + +## API Reference + +### DuckDBSidecarService + +| Método | Descrição | +|--------|-----------| +| `isSidecarReady()` | Verifica se o processo está pronto | +| `query(sql, params?)` | Executa SQL arbitrário | +| `attachPostgres(conn?, alias?)` | Anexa banco PostgreSQL | +| `installExtension(name)` | Instala extensão DuckDB | +| `syncFromPostgres(table, opts?)` | Copia dados do Postgres para memória | + +### DuckDBSearchService + +| Método | Descrição | +|--------|-----------| +| `syncTable(name, opts?)` | Sincroniza tabela e cria índices | +| `createVectorIndex(table, col, metric?)` | Cria índice HNSW | +| `createBM25Index(table, idCol, textCols)` | Cria índice FTS | +| `vectorSearch(table, vector, topK?, opts?)` | Busca por similaridade | +| `bm25Search(table, query, topK?, opts?)` | Busca textual BM25 | +| `hybridSearch(table, vector, text, topK?, opts?)` | Busca combinada | + +## Configuração + +Variáveis de ambiente necessárias: + +```env +# PostgreSQL (para ATTACH automático) +DATABASE_URL=postgresql://user:pass@localhost:5432/dbname + +# Ou componentes individuais +DB_HOST=localhost +DB_PORT=5432 +DB_USER=user +DB_PASS=pass +DB_NAME=dbname +``` + +## Monitoramento e Resiliência + +### Auto-restart +- Se o processo sidecar morrer, é reiniciado automaticamente +- Máximo de 1 reinício por minuto (rate limiting) +- Health check a cada 30 segundos via ping/pong + +### Logs +``` +[DuckDBSidecar] Iniciando DuckDB sidecar process... +[DuckDBSidecar] DuckDB sidecar está pronto +[DuckDBSearch] Tabela documentos sincronizada com sucesso (1000 linhas) +``` + +## Performance Tips + +1. **Memória**: O DuckDB roda em memória (`:memory:`). Ajuste `memory_limit` conforme necessário. + +2. **HNSW Index**: Crie o índice APÓS inserir os dados para bulk load mais rápido: + ```typescript + await search.syncTable('dados', { limit: 100000 }); + await search.createVectorIndex('dados', 'embedding', 'cosine'); + ``` + +3. **Batch Updates**: Para grandes volumes, sincronize em batches: + ```typescript + for (let offset = 0; offset < total; offset += 10000) { + await search.syncTable('dados', { + where: `id > ${offset} AND id <= ${offset + 10000}`, + }); + } + ``` + +4. **Colunas**: Sincronize apenas colunas necessárias: + ```typescript + await search.syncTable('dados', { + columns: ['id', 'embedding'], // sem colunas grandes + }); + ``` + +## Troubleshooting + +### Sidecar não inicia +- Verifique logs do processo filho +- Verifique se `DATABASE_URL` está configurada +- Verifique permissões de arquivo + +### Queries lentas +- Verifique se índice HNSW foi criado: `PRAGMA show_indexes` +- Verifique métrica correta (CLIP = cosine) +- Limite o número de linhas sincronizadas + +### Erro "DuckDB sidecar não está pronto" +- O processo pode estar reiniciando após falha +- Aguarde alguns segundos e tente novamente + +## Extensões Disponíveis + +O sidecar já vem com: +- `vss` - Vector Similarity Search (HNSW) +- `fts` - Full-Text Search +- `postgres` - PostgreSQL scanner +- `httpfs` - S3/HTTP access + +Para instalar outras extensões: +```typescript +await sidecar.installExtension('spatial'); +``` diff --git a/backend/src/common/duckdb/duckdb-provider.service.ts b/backend/src/common/duckdb/duckdb-provider.service.ts index c276150ac8..6ac1c0ce6a 100644 --- a/backend/src/common/duckdb/duckdb-provider.service.ts +++ b/backend/src/common/duckdb/duckdb-provider.service.ts @@ -1,41 +1,233 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { Database } from 'duckdb-async'; import { SmaeConfigService } from '../services/smae-config.service'; +export interface DuckDBConfig { + /** Caminho do banco (:memory: para memória) */ + dbPath?: string; + /** Limite de memória (ex: '2GB') */ + memoryLimit?: string; + /** Número de threads */ + threads?: number; + /** Extensões para instalar/carregar */ + extensions?: string[]; + /** Configurações S3 */ + s3Config?: { + accessKey: string; + secretKey: string; + region: string; + endpoint: string; + useSsl?: boolean; + urlStyle?: 'vhost' | 'path'; + }; +} + @Injectable() export class DuckDBProviderService { + private readonly logger = new Logger(DuckDBProviderService.name); + constructor(private readonly smaeConfigService: SmaeConfigService) {} + /** + * Cria uma instância configurada do DuckDB com suporte a S3 + */ async getConfiguredInstance(): Promise { - const accessKey = await this.smaeConfigService.getConfig('S3_ACCESS_KEY'); - const secretKey = await this.smaeConfigService.getConfig('S3_SECRET_KEY'); - const region = (await this.smaeConfigService.getConfig('S3_REGION')) ?? 'us-east-1'; - let endpoint = await this.smaeConfigService.getConfig('S3_HOST'); - const urlStyle = (await this.smaeConfigService.getConfig('S3_URL_STYLE')) ?? 'vhost'; - if (endpoint?.startsWith('http')) { - // If the endpoint starts with http, we assume it's a full URL them must remove the protocol - endpoint = endpoint.replace(/^https?:\/\//, ''); + const config = await this.buildConfigFromSmaeConfig(); + return this.createInstance(config); + } + + /** + * Cria uma instância do DuckDB com configurações específicas + */ + async createInstance(config: DuckDBConfig = {}): Promise { + const dbPath = config.dbPath ?? ':memory:'; + const memoryLimit = config.memoryLimit ?? '2GB'; + const threads = config.threads ?? 2; + const extensions = config.extensions ?? ['httpfs', 'vss', 'fts', 'postgres']; + + this.logger.log(`Criando instância DuckDB (${dbPath})...`); + + const duckdb = await Database.create(dbPath); + + // Configurações de performance + await duckdb.run(`SET memory_limit = '${memoryLimit}'`); + await duckdb.run(`SET threads = ${threads}`); + + // Instala e carrega extensões + for (const ext of extensions) { + try { + await duckdb.run(`INSTALL ${ext};`); + await duckdb.run(`LOAD ${ext};`); + this.logger.debug(`Extensão ${ext} carregada`); + } catch (e) { + this.logger.warn(`Falha ao carregar extensão ${ext}: ${e}`); + } + } + + // Configura S3 se fornecido + if (config.s3Config) { + await this.configureS3(duckdb, config.s3Config); } - const duckDB = await Database.create(':memory:'); - await duckDB.run('INSTALL httpfs;'); - await duckDB.run('LOAD httpfs;'); - await duckDB.run(` - CREATE OR REPLACE SECRET api_log_backup_s3_secret ( + this.logger.log('Instância DuckDB criada com sucesso'); + return duckdb; + } + + /** + * Cria uma instância otimizada para vector similarity search + */ + async createVectorSearchInstance( + vectorDimension: number, + estimatedRows: number = 100000 + ): Promise { + // Estima memória necessária: ~2GB base + ~1KB por vetor para HNSW index + const estimatedMemoryMB = 2048 + Math.ceil((estimatedRows * 1) / 1024); + const memoryLimit = `${Math.min(estimatedMemoryMB, 8192)}MB`; // Max 8GB + + const config: DuckDBConfig = { + dbPath: ':memory:', + memoryLimit, + threads: 4, + extensions: ['vss', 'fts', 'postgres'], + }; + + const db = await this.createInstance(config); + + // Habilita persistência experimental do índice HNSW se necessário + await db.run('SET hnsw_enable_experimental_persistence = true'); + + return db; + } + + /** + * Anexa um banco PostgreSQL à instância DuckDB + */ + async attachPostgres( + db: Database, + connectionString: string, + alias: string = 'postgres' + ): Promise { + this.logger.log(`Anexando PostgreSQL como ${alias}...`); + await db.run(`ATTACH '${connectionString}' AS ${alias} (TYPE postgres)`); + this.logger.log(`PostgreSQL anexado com sucesso como ${alias}`); + } + + /** + * Cria um índice HNSW para vector similarity search + */ + async createHNSWIndex( + db: Database, + tableName: string, + columnName: string, + metric: 'l2sq' | 'cosine' | 'ip' = 'cosine', + options: { + ef_construction?: number; + ef_search?: number; + M?: number; + } = {} + ): Promise { + const efConstruction = options.ef_construction ?? 128; + const efSearch = options.ef_search ?? 64; + const M = options.M ?? 16; + + this.logger.log(`Criando índice HNSW em ${tableName}.${columnName} (metric: ${metric})...`); + + const indexName = `idx_${tableName}_${columnName}_hnsw`; + + await db.run(` + CREATE INDEX IF NOT EXISTS ${indexName} + ON ${tableName} + USING HNSW (${columnName}) + WITH ( + metric = '${metric}', + ef_construction = ${efConstruction}, + ef_search = ${efSearch}, + M = ${M} + ) + `); + + this.logger.log(`Índice HNSW ${indexName} criado com sucesso`); + } + + /** + * Cria um índice FTS (Full-Text Search) com BM25 + */ + async createFTSIndex( + db: Database, + tableName: string, + idColumn: string, + ...textColumns: string[] + ): Promise { + this.logger.log(`Criando índice FTS em ${tableName} (${textColumns.join(', ')})...`); + + const columnsStr = textColumns.map((c) => `'${c}'`).join(', '); + + await db.run(` + PRAGMA create_fts_index( + '${tableName}', + '${idColumn}', + ${columnsStr}, + stemmer = 'portuguese', + stopwords = 'portuguese' + ) + `); + + this.logger.log(`Índice FTS criado com sucesso`); + } + + private async configureS3( + db: Database, + config: NonNullable + ): Promise { + const endpoint = config.endpoint.replace(/^https?:\/\//, ''); + const useSsl = config.useSsl ?? config.endpoint.startsWith('https'); + const urlStyle = config.urlStyle ?? 'vhost'; + + await db.run(` + CREATE OR REPLACE SECRET smaep_s3_secret ( TYPE S3, PROVIDER CONFIG, - KEY_ID '${accessKey}', - SECRET '${secretKey}', - REGION '${region}', + KEY_ID '${config.accessKey}', + SECRET '${config.secretKey}', + REGION '${config.region}', ENDPOINT '${endpoint}', - USE_SSL ${endpoint?.startsWith('https') ? 'TRUE' : 'FALSE'}, + USE_SSL ${useSsl ? 'TRUE' : 'FALSE'}, URL_STYLE '${urlStyle}' - ); + ) `); - await duckDB.run("SET memory_limit = '800MB'"); - await duckDB.run('SET threads = 1'); + this.logger.log('Configuração S3 aplicada'); + } + + private async buildConfigFromSmaeConfig(): Promise { + const accessKey = await this.smaeConfigService.getConfig('S3_ACCESS_KEY'); + const secretKey = await this.smaeConfigService.getConfig('S3_SECRET_KEY'); + const region = (await this.smaeConfigService.getConfig('S3_REGION')) ?? 'us-east-1'; + let endpoint = await this.smaeConfigService.getConfig('S3_HOST') ?? ''; + const urlStyle = (await this.smaeConfigService.getConfig('S3_URL_STYLE')) ?? 'vhost'; + + if (endpoint.startsWith('http')) { + endpoint = endpoint.replace(/^https?:\/\//, ''); + } + + const config: DuckDBConfig = { + dbPath: ':memory:', + memoryLimit: '2GB', + threads: 2, + extensions: ['httpfs', 'vss', 'fts', 'postgres'], + }; + + if (accessKey && secretKey) { + config.s3Config = { + accessKey, + secretKey, + region, + endpoint, + urlStyle: urlStyle as 'vhost' | 'path', + useSsl: endpoint.startsWith('https'), + }; + } - return duckDB; + return config; } } diff --git a/backend/src/common/duckdb/duckdb-search.service.ts b/backend/src/common/duckdb/duckdb-search.service.ts new file mode 100644 index 0000000000..b71a9049de --- /dev/null +++ b/backend/src/common/duckdb/duckdb-search.service.ts @@ -0,0 +1,397 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { DuckDBSidecarService } from './duckdb-sidecar.service'; +import { ConfigService } from '@nestjs/config'; + +export interface VectorSearchResult { + id: string | number; + score: number; + metadata?: Record; +} + +export interface BM25SearchResult { + id: string | number; + score: number; + content?: string; +} + +export interface HybridSearchResult { + id: string | number; + semanticScore: number; + lexicalScore: number; + combinedScore: number; + metadata?: Record; +} + +/** + * Serviço de busca usando DuckDB para Vector Similarity (CLIP/embeddings) e BM25. + * + * Este serviço utiliza o sidecar DuckDB para executar buscas eficientes: + * - Busca vetorial usando HNSW index (cosine similarity) + * - Busca textual usando BM25 (FTS extension) + * - Busca híbrida combinando ambas + * + * @example + * // Busca por similaridade de embeddings + * const results = await searchService.vectorSearch( + * 'documentos', + * queryEmbedding, // float[] from CLIP or other model + * 10 + * ); + * + * @example + * // Busca BM25 + * const results = await searchService.bm25Search( + * 'documentos', + * 'termos de busca', + * 10 + * ); + * + * @example + * // Busca híbrida + * const results = await searchService.hybridSearch( + * 'documentos', + * queryEmbedding, + * 'termos de busca', + * { vectorWeight: 0.7, bm25Weight: 0.3 }, + * 10 + * ); + */ +@Injectable() +export class DuckDBSearchService implements OnModuleInit { + private readonly logger = new Logger(DuckDBSearchService.name); + private postgresAttached = false; + + constructor( + private readonly sidecar: DuckDBSidecarService, + private readonly configService: ConfigService + ) {} + + async onModuleInit() { + // Anexa PostgreSQL ao iniciar + if (this.sidecar.isSidecarReady()) { + await this.attachPostgres(); + } + } + + /** + * Sincroniza dados do PostgreSQL para o DuckDB. + * Carrega embeddings e conteúdo textual para memória. + */ + async syncTable( + tableName: string, + options: { + columns?: string[]; + where?: string; + limit?: number; + embeddingColumn?: string; + textColumns?: string[]; + idColumn?: string; + } = {} + ): Promise { + if (!this.sidecar.isSidecarReady()) { + this.logger.error('Sidecar não está pronto'); + return false; + } + + // Garante que PostgreSQL está anexado + if (!this.postgresAttached) { + const attached = await this.attachPostgres(); + if (!attached) return false; + } + + const columns = options.columns || ['*']; + const result = await this.sidecar.syncFromPostgres(tableName, { + columns, + where: options.where, + limit: options.limit, + duckdbTableName: tableName, + }); + + if (!result.success) { + this.logger.error(`Falha ao sincronizar tabela ${tableName}: ${result.error}`); + return false; + } + + this.logger.log(`Tabela ${tableName} sincronizada com sucesso (${result.data.length} linhas)`); + + // Cria índices se especificado + if (options.embeddingColumn) { + await this.createVectorIndex(tableName, options.embeddingColumn); + } + + if (options.textColumns && options.textColumns.length > 0 && options.idColumn) { + await this.createBM25Index(tableName, options.idColumn, options.textColumns); + } + + return true; + } + + /** + * Cria índice HNSW para busca vetorial + */ + async createVectorIndex( + tableName: string, + columnName: string, + metric: 'l2sq' | 'cosine' | 'ip' = 'cosine' + ): Promise { + const sql = ` + CREATE INDEX IF NOT EXISTS idx_${tableName}_${columnName}_hnsw + ON ${tableName} USING HNSW (${columnName}) + WITH (metric = '${metric}') + `; + + const result = await this.sidecar.query(sql); + + if (!result.success) { + this.logger.error(`Falha ao criar índice vetorial: ${result.error}`); + return false; + } + + this.logger.log(`Índice vetorial criado em ${tableName}.${columnName}`); + return true; + } + + /** + * Cria índice FTS para busca BM25 + */ + async createBM25Index( + tableName: string, + idColumn: string, + textColumns: string[] + ): Promise { + const columnsStr = textColumns.map((c) => `'${c}'`).join(', '); + const sql = ` + PRAGMA create_fts_index( + '${tableName}', + '${idColumn}', + ${columnsStr}, + stemmer = 'portuguese', + stopwords = 'portuguese' + ) + `; + + const result = await this.sidecar.query(sql); + + if (!result.success) { + this.logger.error(`Falha ao criar índice BM25: ${result.error}`); + return false; + } + + this.logger.log(`Índice BM25 criado em ${tableName} (${textColumns.join(', ')})`); + return true; + } + + /** + * Busca por similaridade vetorial (CLIP/embeddings) + */ + async vectorSearch( + tableName: string, + queryVector: number[], + topK: number = 10, + options: { + embeddingColumn?: string; + idColumn?: string; + metric?: 'cosine' | 'l2sq' | 'ip'; + additionalColumns?: string[]; + where?: string; + } = {} + ): Promise { + const embeddingCol = options.embeddingColumn || 'embedding'; + const idCol = options.idColumn || 'id'; + const metric = options.metric || 'cosine'; + const additionalCols = options.additionalColumns || []; + const whereClause = options.where ? `WHERE ${options.where}` : ''; + + // Função de distância baseada na métrica + let distanceFunc: string; + let orderDirection: 'ASC' | 'DESC'; + + switch (metric) { + case 'cosine': + distanceFunc = 'array_cosine_similarity'; + orderDirection = 'DESC'; // Maior = mais similar + break; + case 'l2sq': + distanceFunc = 'array_distance'; + orderDirection = 'ASC'; // Menor = mais similar + break; + case 'ip': + distanceFunc = 'array_inner_product'; + orderDirection = 'DESC'; // Maior = mais similar + break; + default: + distanceFunc = 'array_cosine_similarity'; + orderDirection = 'DESC'; + } + + const vectorStr = queryVector.map((v) => v.toString()).join(', '); + const extraCols = additionalCols.length > 0 ? ', ' + additionalCols.join(', ') : ''; + + const sql = ` + SELECT + ${idCol} as id, + ${distanceFunc}(${embeddingCol}, [${vectorStr}]::FLOAT[${queryVector.length}]) as score + ${extraCols} + FROM ${tableName} + ${whereClause} + ORDER BY score ${orderDirection} + LIMIT ${topK} + `; + + const result = await this.sidecar.query(sql); + + if (!result.success) { + this.logger.error(`Erro na busca vetorial: ${result.error}`); + return []; + } + + return result.data.map((row: any) => ({ + id: row.id, + score: row.score, + metadata: additionalCols.reduce((acc, col) => { + if (col in row) acc[col] = row[col]; + return acc; + }, {} as Record), + })); + } + + /** + * Busca textual usando BM25 + */ + async bm25Search( + tableName: string, + queryText: string, + topK: number = 10, + options: { + idColumn?: string; + fields?: string[]; + additionalColumns?: string[]; + conjunctive?: boolean; + } = {} + ): Promise { + const idCol = options.idColumn || 'id'; + const fields = options.fields; + const additionalCols = options.additionalColumns || []; + const conjunctive = options.conjunctive ?? false; + + const extraCols = additionalCols.length > 0 ? ', ' + additionalCols.join(', ') : ''; + + // Escapa a query para evitar SQL injection + const escapedQuery = queryText.replace(/'/g, "''"); + const fieldsParam = fields ? `fields := '${fields.join(', ')}'` : 'fields := NULL'; + + const sql = ` + SELECT + ${idCol} as id, + fts_main_${tableName}.match_bm25( + ${idCol}, + '${escapedQuery}', + ${fieldsParam}, + conjunctive := ${conjunctive ? 1 : 0} + ) as score + ${extraCols} + FROM ${tableName} + WHERE score IS NOT NULL + ORDER BY score DESC + LIMIT ${topK} + `; + + const result = await this.sidecar.query(sql); + + if (!result.success) { + this.logger.error(`Erro na busca BM25: ${result.error}`); + return []; + } + + return result.data.map((row: any) => ({ + id: row.id, + score: row.score, + content: additionalCols.length > 0 ? row[additionalCols[0]] : undefined, + })); + } + + /** + * Busca híbrida combinando vector search + BM25 + */ + async hybridSearch( + tableName: string, + queryVector: number[], + queryText: string, + topK: number = 10, + options: { + weights?: { vector: number; bm25: number }; + embeddingColumn?: string; + idColumn?: string; + textFields?: string[]; + } = {} + ): Promise { + const weights = options.weights || { vector: 0.7, bm25: 0.3 }; + const embeddingCol = options.embeddingColumn || 'embedding'; + const idCol = options.idColumn || 'id'; + + // Busca semântica (vetorial) + const vectorResults = await this.vectorSearch(tableName, queryVector, topK * 2, { + embeddingColumn: embeddingCol, + idColumn: idCol, + metric: 'cosine', + }); + + // Busca lexical (BM25) + const bm25Results = await this.bm25Search(tableName, queryText, topK * 2, { + idColumn: idCol, + fields: options.textFields, + }); + + // Combina resultados usando RRF (Reciprocal Rank Fusion) + const k = 60; // Constante do RRF + const scores = new Map(); + + // Adiciona scores semânticos (normalizados) + const maxVectorScore = Math.max(...vectorResults.map((r) => r.score), 1); + vectorResults.forEach((r, idx) => { + const existing = scores.get(r.id) || { semantic: 0, lexical: 0 }; + existing.semantic = (r.score / maxVectorScore) * (1 / (k + idx + 1)); + scores.set(r.id, existing); + }); + + // Adiciona scores BM25 + bm25Results.forEach((r, idx) => { + const existing = scores.get(r.id) || { semantic: 0, lexical: 0 }; + existing.lexical = 1 / (k + idx + 1); // BM25 já é ranqueado + scores.set(r.id, existing); + }); + + // Calcula score combinado + const combined: HybridSearchResult[] = Array.from(scores.entries()) + .map(([id, score]) => ({ + id, + semanticScore: score.semantic, + lexicalScore: score.lexical, + combinedScore: score.semantic * weights.vector + score.lexical * weights.bm25, + })) + .sort((a, b) => b.combinedScore - a.combinedScore) + .slice(0, topK); + + return combined; + } + + /** + * Executa uma query SQL arbitrária no sidecar + */ + async executeQuery(sql: string, params?: any[]): Promise { + const result = await this.sidecar.query(sql, params); + return result.success ? result.data : null; + } + + private async attachPostgres(): Promise { + const connectionString = this.configService.get('DATABASE_URL'); + if (!connectionString) { + this.logger.error('DATABASE_URL não configurada'); + return false; + } + + const attached = await this.sidecar.attachPostgres(connectionString, 'postgres'); + this.postgresAttached = attached; + return attached; + } +} diff --git a/backend/src/common/duckdb/duckdb-sidecar.service.ts b/backend/src/common/duckdb/duckdb-sidecar.service.ts new file mode 100644 index 0000000000..f0d47302a4 --- /dev/null +++ b/backend/src/common/duckdb/duckdb-sidecar.service.ts @@ -0,0 +1,453 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { fork, ChildProcess } from 'child_process'; +import { resolve as resolvePath } from 'path'; +import { ConfigService } from '@nestjs/config'; + +// Tipos de mensagens IPC +type SidecarRequest = + | { type: 'query'; sql: string; params?: any[] } + | { type: 'attach_postgres'; connectionString: string; alias?: string } + | { type: 'install_extension'; name: string } + | { type: 'ping' }; + +type SidecarResponse = + | { event: 'query_result'; data: any[] } + | { event: 'query_error'; error: string } + | { event: 'attached'; alias: string; error?: string } + | { event: 'extension_installed'; name: string; error?: string } + | { event: 'pong' } + | { event: 'ready' } + | { event: 'error'; error: string }; + +type QueryResult = { success: true; data: any[] } | { success: false; error: string }; + +/** + * Serviço que gerencia o processo sidecar do DuckDB. + * - Mantém um processo filho rodando DuckDB + * - Monitora saúde do processo via ping/pong + * - Reinicia o processo se necessário (máx 1 vez por minuto) + * - Provê interface simples para executar queries + */ +@Injectable() +export class DuckDBSidecarService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(DuckDBSidecarService.name); + private childProcess: ChildProcess | null = null; + private isShuttingDown = false; + private isReady = false; + + // Controle de reinício + private lastRestartTime: Date | null = null; + private restartCount = 0; + private readonly MAX_RESTARTS_PER_MINUTE = 1; + private readonly RESTART_WINDOW_MS = 60 * 1000; // 1 minuto + + // Health check + private lastPongTime: Date | null = null; + private healthCheckInterval: NodeJS.Timeout | null = null; + private readonly HEALTH_CHECK_INTERVAL_MS = 30000; // 30 segundos + private readonly PONG_TIMEOUT_MS = 10000; // 10 segundos + + // Queries pendentes (para correlação request/response) + private pendingQueries = new Map< + string, + { resolve: (value: QueryResult) => void; reject: (reason: Error) => void; timer: NodeJS.Timeout } + >(); + private queryIdCounter = 0; + private readonly QUERY_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutos + + constructor(private readonly configService: ConfigService) {} + + async onModuleInit() { + this.logger.log('Inicializando DuckDB Sidecar Service...'); + await this.startSidecar(); + this.startHealthCheck(); + } + + async onModuleDestroy() { + this.logger.log('Destruindo DuckDB Sidecar Service...'); + this.isShuttingDown = true; + this.stopHealthCheck(); + await this.stopSidecar(); + } + + /** + * Verifica se o sidecar está pronto para receber queries + */ + isSidecarReady(): boolean { + return this.isReady && this.childProcess !== null && this.childProcess.connected; + } + + /** + * Executa uma query SQL no DuckDB sidecar + * @param sql - Query SQL a ser executada + * @param params - Parâmetros da query (opcional) + * @returns Resultado da query ou erro + */ + async query(sql: string, params?: any[]): Promise { + if (!this.isSidecarReady()) { + return { success: false, error: 'DuckDB sidecar não está pronto' }; + } + + const queryId = `query_${++this.queryIdCounter}_${Date.now()}`; + + return new Promise((resolve, reject) => { + // Timeout para a query + const timer = setTimeout(() => { + this.pendingQueries.delete(queryId); + resolve({ success: false, error: 'Query timeout' }); + }, this.QUERY_TIMEOUT_MS); + + this.pendingQueries.set(queryId, { resolve, reject, timer }); + + // Envia a query para o processo filho + const request: SidecarRequest = { type: 'query', sql, params }; + this.childProcess!.send(request, (err) => { + if (err) { + clearTimeout(timer); + this.pendingQueries.delete(queryId); + resolve({ success: false, error: `Falha ao enviar query: ${err.message}` }); + } + }); + }); + } + + /** + * Anexa um banco PostgreSQL ao DuckDB + * @param connectionString - Connection string do PostgreSQL + * @param alias - Alias para o banco (padrão: 'postgres') + */ + async attachPostgres(connectionString?: string, alias = 'postgres'): Promise { + if (!this.isSidecarReady()) { + this.logger.error('DuckDB sidecar não está pronto'); + return false; + } + + const pgConn = + connectionString || this.configService.get('DATABASE_URL') || this.buildConnectionStringFromEnv(); + + if (!pgConn) { + this.logger.error('Connection string do PostgreSQL não encontrada'); + return false; + } + + return new Promise((resolve) => { + const request: SidecarRequest = { type: 'attach_postgres', connectionString: pgConn, alias }; + + const timer = setTimeout(() => { + resolve(false); + }, 30000); + + const handler = (msg: SidecarResponse) => { + if (msg.event === 'attached' && msg.alias === alias) { + clearTimeout(timer); + this.childProcess!.off('message', handler); + resolve(!msg.error); + } + }; + + this.childProcess!.on('message', handler); + this.childProcess!.send(request); + }); + } + + /** + * Instala e carrega uma extensão do DuckDB + * @param name - Nome da extensão + */ + async installExtension(name: string): Promise { + if (!this.isSidecarReady()) { + this.logger.error('DuckDB sidecar não está pronto'); + return false; + } + + return new Promise((resolve) => { + const request: SidecarRequest = { type: 'install_extension', name }; + + const timer = setTimeout(() => { + resolve(false); + }, 60000); + + const handler = (msg: SidecarResponse) => { + if (msg.event === 'extension_installed' && msg.name === name) { + clearTimeout(timer); + this.childProcess!.off('message', handler); + resolve(!msg.error); + } + }; + + this.childProcess!.on('message', handler); + this.childProcess!.send(request); + }); + } + + /** + * Sincroniza dados do PostgreSQL para o DuckDB (in-memory) + * Útil para carregar embeddings e dados para BM25 search + */ + async syncFromPostgres( + tableName: string, + options?: { + columns?: string[]; + where?: string; + limit?: number; + duckdbTableName?: string; + } + ): Promise { + const duckdbTable = options?.duckdbTableName || tableName; + const columns = options?.columns?.join(', ') || '*'; + const whereClause = options?.where ? `WHERE ${options.where}` : ''; + const limitClause = options?.limit ? `LIMIT ${options.limit}` : ''; + + // Cria tabela no DuckDB a partir do PostgreSQL + const sql = ` + CREATE OR REPLACE TABLE ${duckdbTable} AS + SELECT ${columns} + FROM postgres.${tableName} + ${whereClause} + ${limitClause} + `; + + return this.query(sql); + } + + /** + * Inicia o processo sidecar + */ + private async startSidecar(): Promise { + if (this.childProcess) { + this.logger.warn('Sidecar já está rodando'); + return; + } + + // Verifica rate limit de reinício + if (this.lastRestartTime) { + const timeSinceLastRestart = Date.now() - this.lastRestartTime.getTime(); + if (timeSinceLastRestart < this.RESTART_WINDOW_MS) { + this.restartCount++; + if (this.restartCount > this.MAX_RESTARTS_PER_MINUTE) { + this.logger.error( + `Limite de reinícios por minuto excedido (${this.MAX_RESTARTS_PER_MINUTE}). Aguardando...` + ); + // Aguarda até poder reiniciar + await new Promise((resolve) => + setTimeout(resolve, this.RESTART_WINDOW_MS - timeSinceLastRestart) + ); + } + } else { + // Janela passou, reseta contador + this.restartCount = 0; + } + } + + this.lastRestartTime = new Date(); + this.logger.log('Iniciando DuckDB sidecar process...'); + + return new Promise((resolve, reject) => { + const sidecarPath = resolvePath(__dirname, './../../bin/') + '/run-duckdb-sidecar.js'; + this.childProcess = fork(sidecarPath, [], { + silent: false, // Herda stdio do processo pai para logs + env: { ...process.env, NODE_ENV: process.env.NODE_ENV }, + }); + + // Handler de mensagens + this.childProcess.on('message', (msg: SidecarResponse) => { + this.handleMessage(msg); + + if (msg.event === 'ready') { + this.isReady = true; + this.logger.log('DuckDB sidecar está pronto'); + resolve(); + } + }); + + // Handler de erro + this.childProcess.on('error', (err) => { + this.logger.error(`Erro no processo sidecar: ${err.message}`); + if (!this.isReady) { + reject(err); + } + }); + + // Handler de exit + this.childProcess.on('exit', (code, signal) => { + this.logger.warn(`Sidecar encerrado (code: ${code}, signal: ${signal})`); + this.isReady = false; + this.childProcess = null; + + if (!this.isShuttingDown) { + this.scheduleRestart(); + } + }); + + // Timeout de inicialização + setTimeout(() => { + if (!this.isReady) { + reject(new Error('Timeout ao iniciar sidecar')); + } + }, 60000); + }); + } + + /** + * Para o processo sidecar + */ + private async stopSidecar(): Promise { + if (!this.childProcess) return; + + this.logger.log('Parando DuckDB sidecar...'); + + // Rejeita todas as queries pendentes + for (const [id, pending] of this.pendingQueries.entries()) { + clearTimeout(pending.timer); + pending.resolve({ success: false, error: 'Sidecar sendo desligado' }); + this.pendingQueries.delete(id); + } + + // Tenta graceful shutdown + this.childProcess.kill('SIGTERM'); + + // Aguarda até 5 segundos para encerrar + await new Promise((resolve) => { + const timeout = setTimeout(() => { + if (this.childProcess && !this.childProcess.killed) { + this.logger.warn('Forçando kill do sidecar'); + this.childProcess.kill('SIGKILL'); + } + resolve(); + }, 5000); + + this.childProcess!.on('exit', () => { + clearTimeout(timeout); + resolve(); + }); + }); + + this.childProcess = null; + this.isReady = false; + this.logger.log('DuckDB sidecar parado'); + } + + /** + * Agenda reinício do sidecar + */ + private scheduleRestart() { + if (this.isShuttingDown) return; + + this.logger.log('Agendando reinício do sidecar...'); + setTimeout(async () => { + if (!this.isShuttingDown && !this.isSidecarReady()) { + try { + await this.startSidecar(); + } catch (error) { + this.logger.error(`Falha ao reiniciar sidecar: ${error}`); + } + } + }, 5000); // Aguarda 5 segundos antes de tentar reiniciar + } + + /** + * Inicia health check periódico + */ + private startHealthCheck() { + this.healthCheckInterval = setInterval(async () => { + if (!this.isSidecarReady()) { + this.logger.warn('Sidecar não está pronto durante health check'); + return; + } + + // Envia ping + const pingTime = Date.now(); + this.childProcess!.send({ type: 'ping' } as SidecarRequest); + + // Aguarda pong por um tempo limitado + setTimeout(() => { + if (this.lastPongTime && this.lastPongTime.getTime() < pingTime) { + this.logger.error('Pong não recebido a tempo, sidecar pode estar travado'); + // Força reinício + if (this.childProcess) { + this.childProcess.kill('SIGKILL'); + } + } + }, this.PONG_TIMEOUT_MS); + }, this.HEALTH_CHECK_INTERVAL_MS); + } + + /** + * Para health check + */ + private stopHealthCheck() { + if (this.healthCheckInterval) { + clearInterval(this.healthCheckInterval); + this.healthCheckInterval = null; + } + } + + /** + * Processa mensagens recebidas do processo filho + */ + private handleMessage(msg: SidecarResponse) { + switch (msg.event) { + case 'pong': + this.lastPongTime = new Date(); + break; + + case 'query_result': + this.resolvePendingQuery(msg.data); + break; + + case 'query_error': + this.resolvePendingQuery(undefined, msg.error); + break; + + case 'error': + this.logger.error(`Erro reportado pelo sidecar: ${msg.error}`); + break; + + case 'ready': + case 'attached': + case 'extension_installed': + // Já tratados em seus respectivos métodos + break; + + default: + this.logger.warn(`Mensagem desconhecida do sidecar: ${JSON.stringify(msg)}`); + } + } + + /** + * Resolve uma query pendente + */ + private resolvePendingQuery(data?: any[], error?: string) { + // Como não temos correlação por ID nas respostas atuais, + // resolvemos a primeira query pendente (FIFO) + const [firstEntry] = this.pendingQueries.entries(); + if (!firstEntry) return; + + const [id, pending] = firstEntry; + clearTimeout(pending.timer); + this.pendingQueries.delete(id); + + if (error) { + pending.resolve({ success: false, error }); + } else { + pending.resolve({ success: true, data: data || [] }); + } + } + + /** + * Constrói connection string a partir de variáveis de ambiente + */ + private buildConnectionStringFromEnv(): string | null { + const host = process.env.DB_HOST || 'localhost'; + const port = process.env.DB_PORT || '5432'; + const user = process.env.DB_USER || process.env.DB_USERNAME; + const pass = process.env.DB_PASS || process.env.DB_PASSWORD; + const db = process.env.DB_NAME || process.env.DB_DATABASE; + + if (!user || !pass || !db) { + return null; + } + + return `postgresql://${user}:${pass}@${host}:${port}/${db}`; + } +} diff --git a/backend/src/common/duckdb/duckdb.module.ts b/backend/src/common/duckdb/duckdb.module.ts index 6071f18987..35f8934f05 100644 --- a/backend/src/common/duckdb/duckdb.module.ts +++ b/backend/src/common/duckdb/duckdb.module.ts @@ -1,9 +1,12 @@ import { Module } from '@nestjs/common'; +import { ConfigModule } from '@nestjs/config'; import { DuckDBProviderService } from './duckdb-provider.service'; +import { DuckDBSidecarService } from './duckdb-sidecar.service'; +import { DuckDBSearchService } from './duckdb-search.service'; @Module({ - imports: [], - providers: [DuckDBProviderService], - exports: [DuckDBProviderService], + imports: [ConfigModule], + providers: [DuckDBProviderService, DuckDBSidecarService, DuckDBSearchService], + exports: [DuckDBProviderService, DuckDBSidecarService, DuckDBSearchService], }) export class DuckDBModule {} diff --git a/backend/src/common/duckdb/example-usage.ts b/backend/src/common/duckdb/example-usage.ts new file mode 100644 index 0000000000..9067af0dbd --- /dev/null +++ b/backend/src/common/duckdb/example-usage.ts @@ -0,0 +1,245 @@ +/** + * EXEMPLO DE USO - DuckDB Sidecar para Busca Vetorial e BM25 + * + * Este arquivo demonstra como usar o DuckDBSidecarService e DuckDBSearchService + * em seus próprios serviços e controllers. + */ + +import { Controller, Get, Injectable, Query } from '@nestjs/common'; +import { DuckDBSearchService, VectorSearchResult, HybridSearchResult } from './index'; + +// ============================================================================ +// EXEMPLO 1: Serviço de Busca de Documentos +// ============================================================================ + +@Injectable() +export class DocumentoSearchService { + constructor(private readonly search: DuckDBSearchService) {} + + /** + * Busca documentos por texto (BM25) + */ + async buscarPorTexto(query: string, limite: number = 10) { + // 1. Sincroniza dados do PostgreSQL (apenas na primeira vez ou quando atualizar) + await this.search.syncTable('documento', { + columns: ['id', 'titulo', 'conteudo', 'categoria'], + idColumn: 'id', + textColumns: ['titulo', 'conteudo'], + // where: "categoria = 'publico'", // opcional: filtrar dados + }); + + // 2. Executa busca BM25 + const resultados = await this.search.bm25Search('documento', query, limite, { + idColumn: 'id', + fields: ['titulo', 'conteudo'], + additionalColumns: ['titulo', 'categoria'], + }); + + return resultados; + } + + /** + * Busca documentos por embedding (busca semântica) + */ + async buscarPorEmbedding(embedding: number[], limite: number = 10) { + // Sincroniza com coluna de embedding + await this.search.syncTable('documento', { + columns: ['id', 'embedding', 'titulo', 'conteudo'], + embeddingColumn: 'embedding', + }); + + // Busca por similaridade de cosseno + const resultados = await this.search.vectorSearch('documento', embedding, limite, { + embeddingColumn: 'embedding', + metric: 'cosine', + additionalColumns: ['titulo', 'conteudo'], + }); + + return resultados; + } + + /** + * Busca híbrida (texto + embedding) + */ + async buscarHibrida(queryTexto: string, embedding: number[], limite: number = 10) { + // Sincroniza com ambos os índices + await this.search.syncTable('documento', { + columns: ['id', 'embedding', 'titulo', 'conteudo'], + embeddingColumn: 'embedding', + idColumn: 'id', + textColumns: ['titulo', 'conteudo'], + }); + + // Busca combinada + const resultados = await this.search.hybridSearch( + 'documento', + embedding, + queryTexto, + limite, + { + weights: { vector: 0.6, bm25: 0.4 }, + embeddingColumn: 'embedding', + idColumn: 'id', + textFields: ['titulo', 'conteudo'], + } + ); + + return resultados; + } +} + +// ============================================================================ +// EXEMPLO 2: Serviço de Busca de Imagens (CLIP) +// ============================================================================ + +@Injectable() +export class ImagemSearchService { + constructor(private readonly search: DuckDBSearchService) {} + + /** + * Busca imagens similares usando embeddings CLIP + */ + async buscarImagensSimilares(clipEmbedding: number[], limite: number = 10) { + // Sincroniza tabela de imagens + await this.search.syncTable('imagem', { + columns: ['id', 'embedding_clip', 'nome_arquivo', 'url'], + embeddingColumn: 'embedding_clip', + }); + + // Busca por similaridade + const resultados = await this.search.vectorSearch('imagem', clipEmbedding, limite, { + embeddingColumn: 'embedding_clip', + metric: 'cosine', // CLIP usa cosine similarity + additionalColumns: ['nome_arquivo', 'url'], + }); + + return resultados.map((r) => ({ + id: r.id, + similaridade: r.score, + nomeArquivo: r.metadata?.nome_arquivo, + url: r.metadata?.url, + })); + } +} + +// ============================================================================ +// EXEMPLO 3: Controller REST +// ============================================================================ + +@Controller('busca') +export class BuscaController { + constructor( + private readonly docSearch: DocumentoSearchService, + private readonly imgSearch: ImagemSearchService + ) {} + + @Get('documentos') + async buscarDocumentos( + @Query('q') query: string, + @Query('tipo') tipo?: 'texto' | 'semantica' | 'hibrida', + @Query('limite') limite: number = 10 + ) { + if (!query) return { error: 'Query obrigatória' }; + + switch (tipo) { + case 'semantica': + // Aqui você precisaria gerar o embedding da query + // usando um modelo como CLIP ou sentence-transformers + const embedding = await this.gerarEmbedding(query); + return this.docSearch.buscarPorEmbedding(embedding, limite); + + case 'hibrida': + const embeddingHibrido = await this.gerarEmbedding(query); + return this.docSearch.buscarHibrida(query, embeddingHibrido, limite); + + case 'texto': + default: + return this.docSearch.buscarPorTexto(query, limite); + } + } + + @Get('imagens') + async buscarImagens( + @Query('embedding') embeddingStr: string, + @Query('limite') limite: number = 10 + ) { + // Recebe embedding como JSON string: "[0.1, 0.2, ...]" + const embedding = JSON.parse(embeddingStr); + return this.imgSearch.buscarImagensSimilares(embedding, limite); + } + + private async gerarEmbedding(texto: string): Promise { + // Integração com modelo de embeddings (CLIP, etc.) + // Retorna array de floats + throw new Error('Não implementado - integrar com modelo'); + } +} + +// ============================================================================ +// EXEMPLO 4: Uso Avançado - Queries SQL Diretas +// ============================================================================ + +import { DuckDBSidecarService } from './index'; + +@Injectable() +export class AnaliseAvancadaService { + constructor(private readonly sidecar: DuckDBSidecarService) {} + + async analisePersonalizada() { + // Anexa PostgreSQL + await this.sidecar.attachPostgres(); + + // Sincroniza dados específicos + await this.sidecar.syncFromPostgres('minha_tabela', { + columns: ['id', 'valor', 'categoria'], + where: 'created_at > NOW() - INTERVAL \'30 days\'', + }); + + // Query complexa com Window Functions, CTEs, etc. + const resultado = await this.sidecar.query(` + WITH ranked AS ( + SELECT + id, + valor, + categoria, + ROW_NUMBER() OVER (PARTITION BY categoria ORDER BY valor DESC) as rank + FROM minha_tabela + ) + SELECT * FROM ranked WHERE rank <= 5 + `); + + if (resultado.success) { + return resultado.data; + } + return []; + } +} + +// ============================================================================ +// MÓDULO +// ============================================================================ + +/* +Para usar em um módulo: + +import { Module } from '@nestjs/common'; +import { DuckDBModule } from '../common/duckdb'; +import { + DocumentoSearchService, + ImagemSearchService, + BuscaController, + AnaliseAvancadaService +} from './example-usage'; + +@Module({ + imports: [DuckDBModule], + providers: [ + DocumentoSearchService, + ImagemSearchService, + AnaliseAvancadaService, + ], + controllers: [BuscaController], + exports: [DocumentoSearchService, ImagemSearchService], +}) +export class MinhaBuscaModule {} +*/ diff --git a/backend/src/common/duckdb/index.ts b/backend/src/common/duckdb/index.ts new file mode 100644 index 0000000000..47af9d825a --- /dev/null +++ b/backend/src/common/duckdb/index.ts @@ -0,0 +1,9 @@ +export { DuckDBModule } from './duckdb.module'; +export { DuckDBProviderService, DuckDBConfig } from './duckdb-provider.service'; +export { DuckDBSidecarService } from './duckdb-sidecar.service'; +export { + DuckDBSearchService, + VectorSearchResult, + BM25SearchResult, + HybridSearchResult, +} from './duckdb-search.service';