Menu Docs
Página inicial do Docs
/ / /
Scala
/

Fluxos de alterações

Nesta página

  • Pré-requisitos
  • Conecte-se a um MongoDB deployment
  • Fique atento às mudanças em uma coleção
  • Fique atento às mudanças em um banco de dados
  • Fique atento às alterações em todos os bancos de dados
  • Filtrando conteúdo

MongoDB Server versão 3.6 introduz o operador de pipeline de agregação do $changeStream.

Os change streams fornecem uma maneira de observar as alterações nos documentos de uma collection. Para melhorar a usabilidade desta nova etapa, o tipo MongoCollection inclui o método watch() . A instância ChangeStreamObservable configura o change stream e tenta retomar automaticamente se encontrar um erro potencialmente recuperável.

Você deve configurar os seguintes componentes para executar os exemplos de código neste guia:

  • Uma test.restaurants coleção preenchida com documentos do restaurants.json arquivo nos ativos de documentação do Github.

  • As seguintes declarações de importação:

import java.util.concurrent.CountDownLatch
import org.mongodb.scala._
import org.mongodb.scala.model.Aggregates._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.changestream._

Observação

Este guia usa as implicações do Observable como abordadas no Quick Start Primary.

Primeiro, conecte a um MongoDB deployment e, em seguida, declare e defina as instâncias MongoDatabase e MongoCollection .

O código a seguir se conecta a uma MongoDB deployment standalone em execução em localhost na porta 27017. Em seguida, define a variável database para fazer referência ao banco de dados test e a variável collection para fazer referência à coleção restaurants :

val mongoClient: MongoClient = MongoClient()
val database: MongoDatabase = mongoClient.getDatabase("test")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

Para saber mais sobre como se conectar a sistemas do MongoDB, consulte o tutorial Conectar ao MongoDB .

Para criar um change stream, use um dos métodos MongoCollection.watch() .

No exemplo a seguir , o fluxo de alterações imprime todas as alterações observadas:

case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] {
val latch = new CountDownLatch(1)
override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data
override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument)
override def onError(throwable: Throwable): Unit = {
println(s"Error: '$throwable")
latch.countDown()
}
override def onComplete(): Unit = latch.countDown()
def await(): Unit = latch.await()
}
val observer = LatchedObserver()
collection.watch().subscribe(observer)
observer.await() // Block waiting for the latch

Os aplicativos podem abrir um único change stream para observar todas as collections que não são do sistema de um banco de dados de dados. Para criar esse change stream, use um dos métodos MongoDatabase.watch() .

No exemplo a seguir, o fluxo de alterações imprime todas as alterações que ele observa no banco de dados fornecido:

val observer = LatchedObserver()
database.watch().subscribe(observer)
observer.await() // Block waiting for the latch

Os aplicativos podem abrir um único change stream para observar todas as collections que não são do sistema de todos os bancos de dados em um MongoDB deployment. Para criar esse change stream, use um dos métodos MongoClient.watch() .

No exemplo a seguir, o change stream imprime todas as alterações que observa no sistema ao qual o MongoClient está conectado:

val observer = LatchedObserver()
client.watch().subscribe(observer)
observer.await() // Block waiting for the latch

Você pode passar uma lista de estágios de agregação para o método watch() para modificar os dados retornados pelo operador $changeStream .

Observação

Nem todos os operadores de agregação são suportados. Consulte Change Streams no manual do servidor MongoDB para saber mais.

No exemplo a seguir , o change stream imprime todas as alterações que observa correspondentes às operações insert, update, replace e delete .

Primeiro, o pipeline inclui um estágio $match para filtrar documentos em que o operationType é um insert, update, replace ou delete. Em seguida, ele define fullDocument como FullDocument.UPDATE_LOOKUP, para que o documento após a atualização seja incluído nos resultados:

val observer = LatchedObserver()
collection.watch(Seq(Aggregates.filter(Filters.in("operationType", Seq("insert", "update", "replace", "delete")))))
.fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(observer)
observer.await() // Block waiting for the latch

Voltar

Framework de aggregation