Sistema OmniSearch
Sistema OmniSearch - Documentação Completa
O Sistema OmniSearch é um repositório central de busca híbrida que oferece indexação e consulta avançada sobre as tabelas conversations
, leads
e tickets
. Utiliza PostgreSQL Full-Text Search (FTS), trigram similarity e busca numérica em uma arquitetura otimizada com pg_notify
.
Características Principais
- Busca Híbrida: FTS, trigram, e busca numérica
- Indexação Automática: Triggers SQL com
pg_notify
- Processamento em Batches: Alta vazão e baixa latência
- Reconciliação Automática: Sistema de fallback para garantir consistência
- Normalização Inteligente: Suporte a português com unaccent e stemming
Endpoints da API
Eventos de CRUD
POST /search-index/events
Recebe eventos de modificação das tabelas origem.
Request Body:
{
"origin_table": "conversations|leads|tickets",
"origin_id": "uuid",
"operation": "INSERT|UPDATE|DELETE"
}
GET /search-index/events/health
Verifica saúde do serviço de eventos.
Endpoints de Busca
POST /search-index/search/fts
Busca Full-Text Search simples.
Request Body:
{
"query": "texto para buscar",
"limit": 20,
"offset": 0
}
POST /search-index/search/hybrid
Busca híbrida (FTS + trigram + numérica).
Request Body:
{
"query": "texto ou número",
"limit": 30,
"offset": 0
}
POST /search-index/search/unified
Busca unificada com múltiplos tipos.
Request Body:
{
"query": "texto",
"types": ["fts", "hybrid", "similarity"],
"options": {
"fts": { "limit": 10 },
"hybrid": { "limit": 15 },
"similarity": { "threshold": 0.4, "limit": 10 }
}
}
POST /search-index/search/phone
Busca específica por telefone.
Request Body:
{
"phone": "11999999999",
"search_type": "exact|suffix|partial"
}
Endpoints de Administração
GET /search-index/admin/stats
Estatísticas do search index.
POST /search-index/admin/reindex
Reindexação de tabelas.
GET /search-index/admin/health
Health check completo do sistema.
Componentes do Sistema
1. PostgreSQL Notify Listener (pgNotifyListener.js
)
Características:
- Escuta canal
omni_search_events
- Batching configurável (padrão: 200 eventos ou 1000ms)
- Reconexão automática em caso de falha
- Estatísticas detalhadas de performance
Configurações:
{
BATCH_SIZE: 200, // eventos por batch
FLUSH_PERIOD: 1000, // ms para flush automático
MAX_RECONNECT_ATTEMPTS: 5 // tentativas de reconexão
}
2. Search Index Consumer (searchIndexConsumer.js
)
Melhorias:
- Processa batches ou eventos individuais
- Processamento paralelo em chunks de 50 eventos
- Taxa de falha configurável (80% max para retry)
- Logs detalhados de performance
3. Serviço de Reconciliação (reconciliationService.js
)
Funcionalidades:
- Execução periódica (padrão: 5 minutos)
- Reconciliação por tabela
- Limpeza de registros órfãos
- Health check com score de saúde
- Lock distribuído para evitar sobreposição
Pipeline de Atualização
Fluxo Automático
- Evento CRUD na tabela origem (
conversations
,leads
,tickets
) - Trigger SQL envia
pg_notify
com payload JSON - Listener Node.js agrupa eventos em batches
- RabbitMQ recebe batches para processamento
- Worker processa batches em paralelo
- Upsert na tabela
omni_search
Normalização de Dados
Texto
- Conversão para lowercase
- Remoção de caracteres especiais (mantém acentos)
- Normalização de espaços
- Cálculo automático do TSVector:
to_tsvector('portuguese_unaccent', content)
Telefones
- Extração apenas de dígitos
- Validação de comprimento mínimo (8 dígitos)
- Geração automática do
reversed_phone
CPF
- Extração apenas de dígitos
- Validação de comprimento (11 dígitos)
Exemplo de Concatenação (conversations)
content = [
user_name,
main_topic,
dialogue_summary,
conversation_text,
categories::text,
]
.filter(Boolean)
.join(" │ ");
Performance e Otimizações
Configurações por Ambiente
Ambiente | Batch Size | Flush Period | Reconciliação |
---|---|---|---|
Desenvolvimento | 50 | 2000ms | 10 min |
Staging | 100 | 1500ms | 5 min |
Produção | 200 | 1000ms | 5 min |
Métricas Esperadas
- Eventos por segundo: 1000-5000 (dependendo do hardware)
- Latência média: 1-3 segundos (da notificação ao índice)
- Taxa de sucesso: >99% (com reconciliação)
- Overhead: <1% CPU para o listener
Exemplos de Consulta
Busca FTS Simples
SELECT origin_table, origin_id,
ts_rank(tsv, query) as rank
FROM public.omni_search,
to_tsquery('portuguese_unaccent', 'problema & pagamento') query
WHERE tsv @@ query
ORDER BY rank DESC
LIMIT 20;
Busca Híbrida
WITH q AS (
SELECT to_tsquery('portuguese_unaccent', plainto_tsquery('portuguese_unaccent','telefone')) as ft_query
)
SELECT s.origin_table, s.origin_id,
ts_rank(s.tsv, q.ft_query) * 0.7
+ greatest(similarity(s.content,'telefone'), similarity(s.reversed_phone, reverse('5199'))) * 0.3
as score
FROM public.omni_search s, q
WHERE (s.tsv @@ q.ft_query)
OR (s.content % 'telefone')
OR (s.reversed_phone like reverse('5199') || '%')
ORDER BY score DESC
LIMIT 30;
Monitoramento
Estatísticas do Sistema
Listener PostgreSQL
const stats = PgNotifyListener.getStats();
// Retorna: conexão, buffer, eventos processados, erros, uptime
Consumer de Batches
const stats = SearchIndexConsumer.getStats();
// Retorna: status, configurações, fila
Serviço de Reconciliação
const health = await ReconciliationService.getHealthStatus();
// Retorna: score de saúde, estatísticas de triggers, tabela
Funções SQL de Monitoramento
-- Estatísticas da tabela omni_search
SELECT * FROM omni_search_stats();
-- Status dos triggers
SELECT * FROM omni_search_trigger_stats();
Operações Manuais
// Reconciliar tabela específica
const result = await ReconciliationService.reconcileTable("conversations", 500);
// Reconciliação completa
const result = await ReconciliationService.runReconciliation();
// Flush manual do buffer
await PgNotifyListener.forceFlush();
// Configuração dinâmica
PgNotifyListener.updateConfig({
batchSize: 300,
flushPeriod: 500,
});
Troubleshooting
Problemas Comuns
Listener não conecta
- Verificar credenciais PostgreSQL
- Confirmar firewall/rede
- Checar logs de erro
Eventos perdidos
- Verificar triggers ativos:
SELECT * FROM omni_search_trigger_stats()
- Executar reconciliação manual
- Verificar logs do listener
Performance baixa
- Ajustar
BATCH_SIZE
eFLUSH_PERIOD
- Verificar índices da tabela
omni_search
- Analisar logs de processamento de batches
Health Check
const health = await ReconciliationService.getHealthStatus();
if (health.health_score < 70) {
console.log("Sistema com problemas:", health);
}
Logs Importantes
[PG_NOTIFY_LISTENER]
: Eventos do listener PostgreSQL[BATCH_PROCESSED]
: Batches processados com sucesso[RECONCILIATION]
: Execuções de reconciliação[OMNI_SEARCH_SUCCESS/FAILURE]
: Eventos individuais
Integração com Triggers
Para automatizar a alimentação do search index:
-- Exemplo para tabela conversations
CREATE OR REPLACE FUNCTION notify_omni_search()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('omni_search_events', json_build_object(
'origin_table', TG_TABLE_NAME,
'origin_id', COALESCE(NEW.id, OLD.id),
'operation', TG_OP
)::text);
RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER conversations_omni_search_trigger
AFTER INSERT OR UPDATE OR DELETE ON conversations
FOR EACH ROW EXECUTE FUNCTION notify_omni_search();
Conclusão
O Sistema OmniSearch oferece:
- Latência baixa: pg_notify instantâneo
- Alta vazão: Batching eficiente
- Escalabilidade: Processamento paralelo
- Resiliência: Reconciliação automática
- Flexibilidade: Múltiplos tipos de busca
A arquitetura mantém o banco PostgreSQL leve enquanto fornece processamento robusto e escalável para busca omnidirecional com suporte completo ao português brasileiro.