Usando Change Streams do MongoDB em Java
Rajesh S Nair6 min read • Published Apr 04, 2023 • Updated Aug 28, 2024
Avalie esse Artigo
O MongoDB percorreu um longo caminho desde que era um mecanismo de banco de dados desenvolvido na empresa de Internet DoubleClick até se tornar o principal armazenamento de dados NoSQL que atende a grandes clientes de vários domínios.
Com o crescimento do mecanismo de banco de dados, o MongoDB continua adicionando novos recursos e melhorias em seu produto de banco de dados, o que o torna o banco de dados NoSQL preferido para novos requisitos e desenvolvimentos de produtos.
Um desses recursos adicionados ao kit de ferramentas do MongoDB são os fluxos de alterações, que foram adicionados com o MongoDB 3.6 lançamento. Antes da versão 3.6, manter um cursor de cauda aberto foi usado para executar uma funcionalidade semelhante. Change streams são um recurso que permite o streaming em tempo real de alterações de eventos de dados no banco de dados.
O streaming de dados orientado a eventos é um requisito crítico em muitos casos de uso de desenvolvimentos de produtos/recursos implementados atualmente. Muitos aplicativos desenvolvidos hoje exigem que as alterações nos dados de uma fonte de dados precisem se propagar para outra fonte em tempo real. Eles também podem exigir que o aplicativo execute determinadas ações quando ocorre uma alteração nos dados da fonte de dados. O registro em log é um desses casos de uso em que o aplicativo pode precisar coletar, processar e transmitir logs em tempo real e, portanto, exigiria uma ferramenta ou plataforma de streaming, como change stream, para implementá-lo.
Como a palavra indica, os change streams são o recurso do MongoDB que captura a "alteração" e a "transmite" para a fonte de dados de destino desejada.
É uma API que permite ao usuário assinar seu aplicativo para qualquer alteração na collection, banco de dados ou até mesmo em todo o sistema. Não há nenhuma ação de middleware ou pesquisa de dados a ser iniciada pelo usuário para aproveitar esse recurso de captura de dados em tempo real e orientada por eventos.
O MongoDB usa a replicação como tecnologia subjacente para change streams usando os registros de operações gerados para a replicação de dados entre os nós de réplicas.
O oplog é uma coleção especial limitada que registra todas as operações que modificam os dados armazenados nos bancos de dados. Quanto maior o oplog, mais operações podem ser registradas nele. Usar o oplog para change stream garante que o change stream seja acionado na mesma ordem em que foi aplicado ao banco de dados.
Conforme o fluxo acima, quando há uma operação CRUD no MongoDB database, o oplog a captura, e esses arquivos oplog são usados pelo MongoDB para transmitir essas alterações para aplicativos/recipientes de dados em tempo real.
Se compararmos as tecnologias MongoDB e Kafka, ambas se enquadrariam em blocos completamente separados. O MongoDB é classificado como um banco de dados NoSQL, que pode armazenar estruturas de documentos semelhantes a JSON. O Kafka é uma plataforma de streaming de eventos para feeds de dados em tempo real. Ele é usado principalmente como um serviço de mensagens no modelo editor-assinante que fornece um sistema de registro de mensagens replicado para transmitir dados de uma fonte para outra.
O Kafka ajuda a consumir enormes conjuntos de dados de fontes de dados desejadas, filtrar/agregar esses dados e enviá-los para a fonte de dados destinada de forma confiável e eficiente. Embora o MongoDB seja um sistema de banco de dados e seu caso de uso esteja a quilômetros de distância de um sistema de mensagens como o Kafka, o recurso Change Streams fornece funcionalidades semelhantes às do Kafka.
Fundamentalmente, os change stream atuam como um serviço de mensagens para transmitir dados em tempo real de qualquer collection do seu MongoDB database. Ele ajuda você a agregar/filtrar esses dados e armazená-los de volta na mesma fonte de dados do MongoDB database. Em resumo, se você tiver um caso de uso limitado que não exija uma solução geral, mas seja limitado à sua fonte de dados (MongoDB), poderá prosseguir com o change stream como sua solução de streaming. Ainda assim, se você quiser envolver diferentes fontes de dados fora do MongoDB e quiser uma solução generalizada para conjuntos de dados de mensagens, o Kafka faria mais sentido.
Ao usar change stream, você não precisa de uma licença ou servidor separado para hospedar seu serviço de mensagens. Ao contrário do Kafka, você obteria o melhor dos dois mundos, que é um ótimo banco de dados e um sistema de mensagens eficiente.
O MongoDB fornece conectores Kafka que podem ser usados para ler dados dentro e fora de tópicos do Kafka em tempo real, mas se seu caso de uso não for grande o suficiente para gastar no Kafka, os change streams podem ser a substituição perfeita para o streaming de seus dados.
Além disso, os conectores Kafka usam change streams ocultos, portanto, você teria que criar sua configuração Kafka configurando serviços de connector e iniciando conectores de origem e coletor para o MongoDB. No caso de change streams, você simplesmente observa as alterações que deseja na coleção, sem nenhuma configuração de pré-requisito.
Change stream, uma vez abertos para uma collection, atuam como um mecanismo de monitoramento de eventos em seu banco de dados/collection ou, em alguns casos, documentos em seu banco de dados.
A funcionalidade principal está em ajudá-lo a "observar" as alterações em uma entidade. O trabalho de background necessário para esse mecanismo de streaming de alterações é implementado por uma funcionalidade já disponível no MongoDB, que é o oplog.
Embora tenha a desvantagem de bloquear os recursos do sistema, esse monitoramento de eventos para sua collection de fontes tem casos de uso em muitos cenários críticos para os negócios, como a captura de entradas de log de dados de aplicativos ou o monitoramento de alterações de inventário para uma loja virtual de comércio eletrônico, e assim por diante. Portanto, é importante ajustar o change stream com o caso de uso correto.
Como o oplog é o driver de todo o mecanismo de change stream, um ambiente replicado de pelo menos um único nó é o primeiro pré-requisito para o uso de change streams. Você também precisará do seguinte:
- Inicie o fluxo de alteração para a coleção/banco de dados pretendido.
- Ter os recursos de CPU necessários para o cluster.
Em vez de configurar um cluster auto-hospedado para atender à lista de verificação acima, há sempre uma opção para usar a solução hospedada baseada na nuvem, MongoDB Atlas. Usando o Atlas, você pode obter uma configuração pronta para uso com apenas alguns cliques. Como os change streams consomem muitos recursos, o fator custo deve ser lembrado ao disparar uma instância no Atlas para o streaming de dados.
No mundo atual do desenvolvimento de back-end, os streams são um tópico importante, pois ajudam os desenvolvedores a ter um pipeline sistemático para processar os dados persistentes usados em seus aplicativos. O streaming de dados ajuda a gerar relatórios, ter um mecanismo de notificação para determinados critérios ou, em alguns casos, alterar algum esquema com base nos eventos recebidos por meio de fluxos.
Aqui, demonstrarei como implementar um change stream para um aplicativo Java Spring.
Depois que o pré-requisito para habilitar o change stream for concluído, as etapas no nível do banco de dados estarão quase concluídas. Agora você precisará escolher a collection na qual deseja habilitar os change stream.
Vamos considerar que você tem um aplicativo Java Spring para um site de e-commerce e tem uma coleção chamada
e_products
, que contém informações sobre o produto que está sendo vendido no site.Para simplificar, os campos da collection podem ser:
1 {"_id" , "productName", "productDescription" , "price" , "colors" , "sizes"}
Agora, esses campos são preenchidos a partir da sua coleção por meio da API Java para mostrar as informações do produto em seu site quando um produto é pesquisado ou clicado.
Agora, digamos que existe outra coleção,
vendor_products
, que contém dados de outra fonte (por exemplo, outro fornecedor de produtos). Nesse caso, ele contém alguns dos produtos em seu e_products
, mas com mais tamanhos e opções de cores.Você deseja que seu aplicativo seja sincronizado com o tamanho e a cor disponíveis mais recentes para cada produto. O change stream podem ajudá-lo a fazer exatamente isso. Eles podem ser habilitados em sua collection
vendor_products
para observar qualquer novo produto inserido e, em seguida, para cada um dos eventos de inserção, você pode ter alguma lógica para adicionar as cores/tamanhos à sua collectione_products
usada pelo seu aplicativo.Você pode criar um aplicativo de microsserviço especificamente para esse caso de uso. Ao usar um microsserviço dedicado, você pode alocar CPU/memória suficiente para que o aplicativo tenha um thread para assistir em sua
vendor_products
collection. A classe de configuração em seu aplicativo Spring teria o seguinte código para iniciar o relógio:1 2 public void runChangeStreamConfig() throws InterruptedException { 3 CodecRegistry pojoCodecRegistry = fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), 4 fromProviders(PojoCodecProvider.builder().automatic(true).build())); 5 MongoCollection<VendorProducts> vendorCollection = mongoTemplate.getDb().withCodecRegistry(pojoCodecRegistry).getCollection("vendor_products", VendorProducts.class); 6 List<Bson> pipeline = singletonList(match(eq("operationType", "insert"))); 7 oldEcomFieldsCollection.watch(pipeline).forEach(s -> 8 mergeFieldsVendorToProducts(s.getDocumentKey().get("_id").asString().getValue()) 9 ); 10 }
No trecho de código acima, você pode ver como a coleção é selecionada para ser monitorada e que o tipo de operação monitorada é "inserir". Isso só verificará novos produtos adicionados a esta collection. Se necessário, também podemos fazer o monitoramento para "atualizar" ou "excluir".
Uma vez que isso esteja em vigor, sempre que um novo produto for adicionado ao
vendor_products
, esse método será invocado e o _id
desse produto será passado para mergeFieldsVendorToProducts()
método em que você pode escrever sua lógica para mesclar as várias propriedades de vendor_products
para a coleçãoe_products
.1 forEach(s -> 2 { 3 Query query = new Query(); 4 query.addCriteria(Criteria.where("_id").is(s.get("_id"))); 5 Update update = new Update(); 6 update.set(field, s.get(field)); 7 mongoTemplate.updateFirst(query, update, EProducts.class); 8 })
Este é um caso de uso pequeno para change streams; há muitos exemplos em que os change streams podem ser úteis. É uma questão de usar essa ferramenta para o caso de uso correto.
Concluindo, o change stream no MongoDB fornece uma maneira poderosa e flexível de monitorar alterações em seu banco de dados em tempo real. Se você precisa React às alterações à medida que elas acontecem, sincronizar dados em vários sistemas ou criar fluxos de trabalho personalizados orientados a eventos, os change stream podem ajudá-lo a atingir esses objetivos com facilidade.
Ao aproveitar o poder dos fluxos de mudança, você pode melhorar a capacidade de resposta e a eficiência de seus aplicativos, reduzir o risco de inconsistências de dados e obter insights mais profundos sobre o comportamento do seu banco de dados.
Embora haja uma curva de aprendizado ao trabalhar com change streams, o MongoDB fornece documentaçãoabrangente e uma série de exemplos para ajudá-lo a começar. Com um pouco de prática, você pode aproveitar todo o potencial dos change streams e criar aplicativos mais robustos, escaláveis e resilientes.