Sistema OmniSearch - Documentação Completa

O Sistema OmniSearch é um repositório central de busca híbrida que oferece indexação e consulta avançada sobre as tabelas conversations, leads e tickets. Utiliza PostgreSQL Full-Text Search (FTS), trigram similarity e busca numérica em uma arquitetura otimizada com pg_notify.

Características Principais

  • Busca Híbrida: FTS, trigram, e busca numérica
  • Indexação Automática: Triggers SQL com pg_notify
  • Processamento em Batches: Alta vazão e baixa latência
  • Reconciliação Automática: Sistema de fallback para garantir consistência
  • Normalização Inteligente: Suporte a português com unaccent e stemming

Endpoints da API

Eventos de CRUD

POST /search-index/events

Recebe eventos de modificação das tabelas origem.

Request Body:

{
  "origin_table": "conversations|leads|tickets",
  "origin_id": "uuid",
  "operation": "INSERT|UPDATE|DELETE"
}

GET /search-index/events/health

Verifica saúde do serviço de eventos.

Endpoints de Busca

POST /search-index/search/fts

Busca Full-Text Search simples.

Request Body:

{
  "query": "texto para buscar",
  "limit": 20,
  "offset": 0
}

POST /search-index/search/hybrid

Busca híbrida (FTS + trigram + numérica).

Request Body:

{
  "query": "texto ou número",
  "limit": 30,
  "offset": 0
}

POST /search-index/search/unified

Busca unificada com múltiplos tipos.

Request Body:

{
  "query": "texto",
  "types": ["fts", "hybrid", "similarity"],
  "options": {
    "fts": { "limit": 10 },
    "hybrid": { "limit": 15 },
    "similarity": { "threshold": 0.4, "limit": 10 }
  }
}

POST /search-index/search/phone

Busca específica por telefone.

Request Body:

{
  "phone": "11999999999",
  "search_type": "exact|suffix|partial"
}

Endpoints de Administração

GET /search-index/admin/stats

Estatísticas do search index.

POST /search-index/admin/reindex

Reindexação de tabelas.

GET /search-index/admin/health

Health check completo do sistema.

Componentes do Sistema

1. PostgreSQL Notify Listener (pgNotifyListener.js)

Características:

  • Escuta canal omni_search_events
  • Batching configurável (padrão: 200 eventos ou 1000ms)
  • Reconexão automática em caso de falha
  • Estatísticas detalhadas de performance

Configurações:

{
  BATCH_SIZE: 200,           // eventos por batch
  FLUSH_PERIOD: 1000,        // ms para flush automático
  MAX_RECONNECT_ATTEMPTS: 5  // tentativas de reconexão
}

2. Search Index Consumer (searchIndexConsumer.js)

Melhorias:

  • Processa batches ou eventos individuais
  • Processamento paralelo em chunks de 50 eventos
  • Taxa de falha configurável (80% max para retry)
  • Logs detalhados de performance

3. Serviço de Reconciliação (reconciliationService.js)

Funcionalidades:

  • Execução periódica (padrão: 5 minutos)
  • Reconciliação por tabela
  • Limpeza de registros órfãos
  • Health check com score de saúde
  • Lock distribuído para evitar sobreposição

Pipeline de Atualização

Fluxo Automático

  1. Evento CRUD na tabela origem (conversations, leads, tickets)
  2. Trigger SQL envia pg_notify com payload JSON
  3. Listener Node.js agrupa eventos em batches
  4. RabbitMQ recebe batches para processamento
  5. Worker processa batches em paralelo
  6. Upsert na tabela omni_search

Normalização de Dados

Texto

  • Conversão para lowercase
  • Remoção de caracteres especiais (mantém acentos)
  • Normalização de espaços
  • Cálculo automático do TSVector:
to_tsvector('portuguese_unaccent', content)

Telefones

  • Extração apenas de dígitos
  • Validação de comprimento mínimo (8 dígitos)
  • Geração automática do reversed_phone

CPF

  • Extração apenas de dígitos
  • Validação de comprimento (11 dígitos)

Exemplo de Concatenação (conversations)

content = [
  user_name,
  main_topic,
  dialogue_summary,
  conversation_text,
  categories::text,
]
  .filter(Boolean)
  .join(" │ ");

Performance e Otimizações

Configurações por Ambiente

AmbienteBatch SizeFlush PeriodReconciliação
Desenvolvimento502000ms10 min
Staging1001500ms5 min
Produção2001000ms5 min

Métricas Esperadas

  • Eventos por segundo: 1000-5000 (dependendo do hardware)
  • Latência média: 1-3 segundos (da notificação ao índice)
  • Taxa de sucesso: >99% (com reconciliação)
  • Overhead: <1% CPU para o listener

Exemplos de Consulta

Busca FTS Simples

SELECT origin_table, origin_id,
       ts_rank(tsv, query) as rank
FROM public.omni_search,
     to_tsquery('portuguese_unaccent', 'problema & pagamento') query
WHERE tsv @@ query
ORDER BY rank DESC
LIMIT 20;

Busca Híbrida

WITH q AS (
  SELECT to_tsquery('portuguese_unaccent', plainto_tsquery('portuguese_unaccent','telefone')) as ft_query
)
SELECT s.origin_table, s.origin_id,
  ts_rank(s.tsv, q.ft_query) * 0.7
  + greatest(similarity(s.content,'telefone'), similarity(s.reversed_phone, reverse('5199'))) * 0.3
  as score
FROM public.omni_search s, q
WHERE (s.tsv @@ q.ft_query)
   OR (s.content % 'telefone')
   OR (s.reversed_phone like reverse('5199') || '%')
ORDER BY score DESC
LIMIT 30;

Monitoramento

Estatísticas do Sistema

Listener PostgreSQL

const stats = PgNotifyListener.getStats();
// Retorna: conexão, buffer, eventos processados, erros, uptime

Consumer de Batches

const stats = SearchIndexConsumer.getStats();
// Retorna: status, configurações, fila

Serviço de Reconciliação

const health = await ReconciliationService.getHealthStatus();
// Retorna: score de saúde, estatísticas de triggers, tabela

Funções SQL de Monitoramento

-- Estatísticas da tabela omni_search
SELECT * FROM omni_search_stats();

-- Status dos triggers
SELECT * FROM omni_search_trigger_stats();

Operações Manuais

// Reconciliar tabela específica
const result = await ReconciliationService.reconcileTable("conversations", 500);

// Reconciliação completa
const result = await ReconciliationService.runReconciliation();

// Flush manual do buffer
await PgNotifyListener.forceFlush();

// Configuração dinâmica
PgNotifyListener.updateConfig({
  batchSize: 300,
  flushPeriod: 500,
});

Troubleshooting

Problemas Comuns

Listener não conecta

  • Verificar credenciais PostgreSQL
  • Confirmar firewall/rede
  • Checar logs de erro

Eventos perdidos

  • Verificar triggers ativos: SELECT * FROM omni_search_trigger_stats()
  • Executar reconciliação manual
  • Verificar logs do listener

Performance baixa

  • Ajustar BATCH_SIZE e FLUSH_PERIOD
  • Verificar índices da tabela omni_search
  • Analisar logs de processamento de batches

Health Check

const health = await ReconciliationService.getHealthStatus();
if (health.health_score < 70) {
  console.log("Sistema com problemas:", health);
}

Logs Importantes

  • [PG_NOTIFY_LISTENER]: Eventos do listener PostgreSQL
  • [BATCH_PROCESSED]: Batches processados com sucesso
  • [RECONCILIATION]: Execuções de reconciliação
  • [OMNI_SEARCH_SUCCESS/FAILURE]: Eventos individuais

Integração com Triggers

Para automatizar a alimentação do search index:

-- Exemplo para tabela conversations
CREATE OR REPLACE FUNCTION notify_omni_search()
RETURNS TRIGGER AS $$
BEGIN
  PERFORM pg_notify('omni_search_events', json_build_object(
    'origin_table', TG_TABLE_NAME,
    'origin_id', COALESCE(NEW.id, OLD.id),
    'operation', TG_OP
  )::text);
  RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER conversations_omni_search_trigger
  AFTER INSERT OR UPDATE OR DELETE ON conversations
  FOR EACH ROW EXECUTE FUNCTION notify_omni_search();

Conclusão

O Sistema OmniSearch oferece:

  • Latência baixa: pg_notify instantâneo
  • Alta vazão: Batching eficiente
  • Escalabilidade: Processamento paralelo
  • Resiliência: Reconciliação automática
  • Flexibilidade: Múltiplos tipos de busca

A arquitetura mantém o banco PostgreSQL leve enquanto fornece processamento robusto e escalável para busca omnidirecional com suporte completo ao português brasileiro.