Java - Change Streams
Maxime Beugnet10 min read • Published Feb 01, 2022 • Updated Oct 01, 2024
Avalie esse Início rápido
- Atualizar para o Java 21
- Atualize o driver Java para 5.0.0
- Atualize
logback-classic
para 1.2.13
- Atualizar para o Java 17
- Atualize o driver Java para 4.11.1
- Atualize o mongodb-crypt para 1.8.0
- Atualize o driver Java para 4.2.2.
- Exemplo de criptografia no nível do campo do lado do cliente adicionado.
- Atualize o driver Java para 4.1.1.
- O registro do driver Java agora está ativado por meio da popular APISLF4J, então adicionei logback no
pom.xml
e um arquivo de configuraçãologback.xml
.
Os change stream foram introduzidos no MongoDB 3.6. Eles permitem que os aplicativos acessem alterações de dados em tempo real sem a complexidade e o risco de afetar o oplog.
Os aplicativos podem usar fluxos de alteração para assinar todas as alterações de dados em uma única coleção, um banco de dados ou uma implantação inteira e reagir imediatamente a elas. Como os fluxos de alterações usam a estrutura de agregação, um aplicativo também pode filtrar alterações específicas ou transformar as notificações à vontade.
Nesta publicação do blog, conforme comprometido na primeira publicação do blog desta série, mostrarei como aproveitar os change stream do MongoDB usando Java.
Usarei o mesmo repositório de sempre nesta série. Se você ainda não tiver uma cópia dele, poderá cloná-lo ou apenas atualizá-lo, caso já o tenha:
1 git clone https://github.com/mongodb-developer/java-quick-start
Se você ainda não configurou seu cluster gratuito no MongoDB Atlas, agora é um ótimo momento para fazê-lo. Você tem todas as instruções nesta postagem do blog.
Nesta publicação do blog, trabalharei no arquivo chamado
ChangeStreams.java
, mas é super fácil trabalhar com os Change Streams .Mostrarei 5 exemplos diferentes para mostrar alguns recursos do change stream. Por uma questão de simplicidade, mostrarei apenas os pedaços de código relacionados diretamente ao change stream. Você pode encontrar toda a amostra de código na parte inferior desta postagem do blog ou no Github.
Para cada exemplo, você precisará iniciar 2 programas Java na ordem correta se quiser reproduzir meus exemplos.
- O primeiro programa é sempre aquele que contém o código Change Streams.
- O segundo será um dos programas Java que já usamos nesta série de postagens de blog Java. Você pode encontrá-los no repositório do Github. Elas gerarão operações MongoDB que observaremos na saída do change stream.
Vamos começar com o change stream mais simples que podemos criar:
1 MongoCollection<Grade> grades = db.getCollection("grades", Grade.class); 2 ChangeStreamIterable<Grade> changeStream = grades.watch(); 3 changeStream.forEach((Consumer<ChangeStreamDocument<Grade>>) System.out::println);
Como você pode ver, tudo o que precisamos é de
myCollection.watch()
! É isso aí.Isso retorna um
ChangeStreamIterable
que, conforme indicado por seu nome, pode ser iterado para retornar nossos eventos de alteração. Aqui, estou iterando sobre meu Change Stream para imprimir meus documentos de eventos de alteração na saída padrão do Java.Também posso simplificar este código assim:
1 grades.watch().forEach(printEvent()); 2 3 private static Consumer<ChangeStreamDocument<Grade>> printEvent() { 4 return System.out::println; 5 }
Reutilizarei essa interface funcional nos meus exemplos a seguir para facilitar a leitura.
Para executar este exemplo:
- Remova o comentário apenas do exemplo 1 do arquivo
ChangeStreams.java
e inicie-o em seu IDE ou em um console dedicado usando Maven na raiz do seu projeto.
1 mvn compile exec:java -Dexec.mainClass="com.mongodb.quickstart.ChangeStreams" -Dmongodb.uri="mongodb+srv://USERNAME:PASSWORD@cluster0-abcde.mongodb.net/test?w=majority"
- Inicie
MappingPOJO.java
em outro console ou no seu IDE.
1 mvn compile exec:java -Dexec.mainClass="com.mongodb.quickstart.MappingPOJO" -Dmongodb.uri="mongodb+srv://USERNAME:PASSWORD@cluster0-abcde.mongodb.net/test?w=majority"
No MappingPOJO, estamos realizando 4 operações do MongoDB:
- Estou criando um novo documento
Grade
com o métodoinsertOne()
, - Estou procurando este documento
Grade
usando o métodofind()
, - Estou substituindo inteiramente este
Grade
usando o métodofindOneAndReplace()
, - e, finalmente, estou excluindo esse
Grade
usando o métododeleteOne()
.
Isso é confirmado na saída padrão de
MappingJava
:1 Grade inserted. 2 Grade found: Grade{id=5e2b4a28c9e9d55e3d7dbacf, student_id=10003.0, class_id=10.0, scores=[Score{type='homework', score=50.0}]} 3 Grade replaced: Grade{id=5e2b4a28c9e9d55e3d7dbacf, student_id=10003.0, class_id=10.0, scores=[Score{type='homework', score=50.0}, Score{type='exam', score=42.0}]} 4 Grade deleted: AcknowledgedDeleteResult{deletedCount=1}
Vamos verificar o que temos na saída padrão de
ChangeStreams.java
(prettified):1 ChangeStreamDocument{ 2 operationType=OperationType{ value='insert' }, 3 resumeToken={ "_data":"825E2F3E40000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F3E400C47CF19D59361620004" }, 4 namespace=sample_training.grades, 5 destinationNamespace=null, 6 fullDocument=Grade{ 7 id=5e2f3e400c47cf19d5936162, 8 student_id=10003.0, 9 class_id=10.0, 10 scores=[ Score { type='homework', score=50.0 } ] 11 }, 12 documentKey={ "_id":{ "$oid":"5e2f3e400c47cf19d5936162" } }, 13 clusterTime=Timestamp{ 14 value=6786711608069455873, 15 seconds=1580154432, 16 inc=1 17 }, 18 updateDescription=null, 19 txnNumber=null, 20 lsid=null 21 } 22 ChangeStreamDocument{ operationType=OperationType{ value= 'replace' }, 23 resumeToken={ "_data":"825E2F3E40000000032B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F3E400C47CF19D59361620004" }, 24 namespace=sample_training.grades, 25 destinationNamespace=null, 26 fullDocument=Grade{ 27 id=5e2f3e400c47cf19d5936162, 28 student_id=10003.0, 29 class_id=10.0, 30 scores=[ Score{ type='homework', score=50.0 }, Score{ type='exam', score=42.0 } ] 31 }, 32 documentKey={ "_id":{ "$oid":"5e2f3e400c47cf19d5936162" } }, 33 clusterTime=Timestamp{ 34 value=6786711608069455875, 35 seconds=1580154432, 36 inc=3 37 }, 38 updateDescription=null, 39 txnNumber=null, 40 lsid=null 41 } 42 ChangeStreamDocument{ 43 operationType=OperationType{ value='delete' }, 44 resumeToken={ "_data":"825E2F3E40000000042B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F3E400C47CF19D59361620004" }, 45 namespace=sample_training.grades, 46 destinationNamespace=null, 47 fullDocument=null, 48 documentKey={ "_id":{ "$oid":"5e2f3e400c47cf19d5936162" } }, 49 clusterTime=Timestamp{ 50 value=6786711608069455876, 51 seconds=1580154432, 52 inc=4 53 }, 54 updateDescription=null, 55 txnNumber=null, 56 lsid=null 57 }
Como você pode ver, somente 3 operações aparecem no Change Stream:
- insert,
- substituir,
- excluir.
Isso era esperado porque a operação
find()
é apenas um documento de leitura do MongoDB. Não está mudando nada, portanto, não gerando um evento no Change Stream.Agora que terminamos o exemplo básico, vamos explorar algumas funcionalidades dos Change Streams.
Encerre o programa Change Stream que iniciamos anteriormente e vamos continuar.
Agora vamos fazer a mesma coisa, mas vamos imaginar que estamos interessados apenas em operações de inserção e exclusão.
1 List<Bson> pipeline = List.of(match(in("operationType", List.of("insert", "delete")))); 2 grades.watch(pipeline).forEach(printEvent());
Como você pode ver aqui, estou usando o recurso de pipeline de agregação do change stream para filtrar os eventos de mudança que deseja processar.
Remova o comentário do exemplo 2 em
ChangeStreams.java
e execute o programa seguido por MappingPOJO.java
, assim como fizemos anteriormente.Aqui estão os eventos de mudança que estou recebendo.
1 ChangeStreamDocument {operationType=OperationType {value= 'insert'}, 2 resumeToken= {"_data": "825E2F4983000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F4983CC1D2842BFF555640004"}, 3 namespace=sample_training.grades, 4 destinationNamespace=null, 5 fullDocument=Grade 6 { 7 id=5e2f4983cc1d2842bff55564, 8 student_id=10003.0, 9 class_id=10.0, 10 scores= [ Score {type= 'homework', score=50.0}] 11 }, 12 documentKey= {"_id": {"$oid": "5e2f4983cc1d2842bff55564" }}, 13 clusterTime=Timestamp {value=6786723990460170241, seconds=1580157315, inc=1 }, 14 updateDescription=null, 15 txnNumber=null, 16 lsid=null 17 } 18 19 ChangeStreamDocument { operationType=OperationType {value= 'delete'}, 20 resumeToken= {"_data": "825E2F4983000000042B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F4983CC1D2842BFF555640004"}, 21 namespace=sample_training.grades, 22 destinationNamespace=null, 23 fullDocument=null, 24 documentKey= {"_id": {"$oid": "5e2f4983cc1d2842bff55564"}}, 25 clusterTime=Timestamp {value=6786723990460170244, seconds=1580157315, inc=4}, 26 updateDescription=null, 27 txnNumber=null, 28 lsid=null 29 } 30 ]
Desta vez, estou recebendo apenas os eventos 2 ,
insert
e delete
. O eventoreplace
foi filtrado em comparação com o primeiro exemplo.Igual ao anterior, desta vez estou filtrando meu change stream para manter apenas as operações de atualização.
1 List<Bson> pipeline = List.of(match(eq("operationType", "update"))); 2 grades.watch(pipeline).forEach(printEvent());
Desta vez, siga estas etapas.
- remova o comentário do exemplo 3 em
ChangeStreams.java
, - se você nunca executou
Create.java
, execute-o. Usaremos esses novos documentos na próxima etapa. - iniciar
Update.java
em outro console.
Em seu console do change stream, você deverá ver 13 eventos de atualização. Aqui está o primeiro:
1 ChangeStreamDocument {operationType=OperationType {value= 'update'}, 2 resumeToken= {"_data": "825E2FB83E000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCCE74AA51A0486763FE0004"}, 3 namespace=sample_training.grades, 4 destinationNamespace=null, 5 fullDocument=null, 6 documentKey= {"_id": {"$oid": "5e27bcce74aa51a0486763fe"}}, 7 clusterTime=Timestamp {value=6786845739898109953, seconds=1580185662, inc=1}, 8 updateDescription=UpdateDescription {removedFields= [], updatedFields= {"comments.10": "You will learn a lot if you read the MongoDB blog!"}}, 9 txnNumber=null, 10 lsid=null 11 }
Como você pode ver, estamos recuperando nossa operação de atualização no campo
updateDescription
, mas estamos obtendo apenas a diferença com a versão anterior desse documento.O campo
fullDocument
é null
porque, por padrão, o MongoDB apenas envia a diferença para evitar sobrecarregar o change stream com informações potencialmente inúteis.Vejamos como podemos alterar esse comportamento no próximo exemplo.
Para esta parte, descomente o exemplo 4 de
ChangeStreams.java
e execute os programas conforme acima.1 List<Bson> pipeline = List.of(match(eq("operationType", "update"))); 2 grades.watch(pipeline).fullDocument(UPDATE_LOOKUP).forEach(printEvent());
Adicionei a opção
UPDATE_LOOKUP
desta vez, para que também possamos recuperar o documento inteiro durante uma operação de atualização.Vamos ver novamente a primeira atualização no meu fluxo de alterações:
1 ChangeStreamDocument {operationType=OperationType {value= 'update'}, 2 resumeToken= {"_data": "825E2FBBC1000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCCE74AA51A0486763FE0004"}, 3 namespace=sample_training.grades, 4 destinationNamespace=null, 5 fullDocument=Grade 6 { 7 id=5e27bcce74aa51a0486763fe, 8 student_id=10002.0, 9 class_id=10.0, 10 scores=null 11 }, 12 documentKey= {"_id": {"$oid": "5e27bcce74aa51a0486763fe" }}, 13 clusterTime=Timestamp {value=6786849601073709057, seconds=1580186561, inc=1 }, 14 updateDescription=UpdateDescription {removedFields= [], updatedFields= {"comments.11": "You will learn a lot if you read the MongoDB blog!"}}, 15 txnNumber=null, 16 lsid=null 17 }
Observação: o programa
Update.java
atualiza um campo criado "comentários" que não existe em meu POJO Grade
, que representa o esquema original desta collection. Assim, o campo não aparece na saída porque não está mapeado.Se eu quiser ver esse campo
comments
, posso usar um MongoCollection
não mapeado automaticamente para meu POJOGrade.java
.1 MongoCollection<Document> grades = db.getCollection("grades"); 2 List<Bson> pipeline = List.of(match(eq("operationType", "update"))); 3 grades.watch(pipeline).fullDocument(UPDATE_LOOKUP).forEach((Consumer<ChangeStreamDocument<Document>>) System.out::println);
Então é isso que obtenho no meu change stream:
1 ChangeStreamDocument {operationType=OperationType {value= 'update'}, 2 resumeToken= {"_data": "825E2FBD89000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCCE74AA51A0486763FE0004"}, 3 namespace=sample_training.grades, 4 destinationNamespace=null, 5 fullDocument=Document { 6 { 7 _id=5e27bcce74aa51a0486763fe, 8 class_id=10.0, 9 student_id=10002.0, 10 comments= [ You will learn a lot if you read the MongoDB blog!, [...], You will learn a lot if you read the MongoDB blog!] 11 } 12 }, 13 documentKey= {"_id": {"$oid": "5e27bcce74aa51a0486763fe"}}, 14 clusterTime=Timestamp {value=6786851559578796033, seconds=1580187017, inc=1}, 15 updateDescription=UpdateDescription {removedFields= [], updatedFields= {"comments.13": "You will learn a lot if you read the MongoDB blog!"}}, 16 txnNumber=null, 17 lsid=null 18 }
Reduzi o campo
comments
para mantê-lo legível, mas ele contém 14 vezes o mesmo comentário no meu caso.O documento completo que estamos recuperando aqui durante nossa operação de atualização é o documento após a ocorrência da atualização. Leia mais sobre isso em nossa documentação.
Neste exemplo final 5, simulei um erro e estou reiniciando meu Change Stream a partir de um
resumeToken
que obtive de uma operação anterior em meu Change Stream.É importante observar que um fluxo de alterações será retomado automaticamente diante de um incidente "" . Em geral, o único motivo pelo qual um aplicativo precisa reiniciar o fluxo de alterações manualmente a partir de um token de retomada é se houver um incidente no próprio aplicativo e não no fluxo de alterações (por exemplo, um operador decidiu que o aplicativo precisa ser reiniciado).
1 private static void exampleWithResumeToken(MongoCollection<Grade> grades) { 2 List<Bson> pipeline = List.of(match(eq("operationType", "update"))); 3 ChangeStreamIterable<Grade> changeStream = grades.watch(pipeline); 4 MongoChangeStreamCursor<ChangeStreamDocument<Grade>> cursor = changeStream.cursor(); 5 System.out.println("==> Going through the stream a first time & record a resumeToken"); 6 int indexOfOperationToRestartFrom = 5; 7 int indexOfIncident = 8; 8 int counter = 0; 9 BsonDocument resumeToken = null; 10 while (cursor.hasNext() && counter != indexOfIncident) { 11 ChangeStreamDocument<Grade> event = cursor.next(); 12 if (indexOfOperationToRestartFrom == counter) { 13 resumeToken = event.getResumeToken(); 14 } 15 System.out.println(event); 16 counter++; 17 } 18 System.out.println("==> Let's imagine something wrong happened and I need to restart my Change Stream."); 19 System.out.println("==> Starting from resumeToken=" + resumeToken); 20 assert resumeToken != null; 21 grades.watch(pipeline).resumeAfter(resumeToken).forEach(printEvent()); 22 }
Para este exemplo final, o mesmo que anterior. Descomente a parte 5 (que é apenas chamar o método acima) e comece
ChangeStreams.java
depois Update.java
.Esta é a saída que você deve obter:
1 ==> Going through the stream a first time & record a resumeToken 2 ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCCE74AA51A0486763FE0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcce74aa51a0486763fe"}}, clusterTime=Timestamp{value=6786856975532556289, seconds=1580188278, inc=1}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null} 3 ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000022B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBA0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbba"}}, clusterTime=Timestamp{value=6786856975532556290, seconds=1580188278, inc=2}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.15": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null} 4 ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000032B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBB0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbb"}}, clusterTime=Timestamp{value=6786856975532556291, seconds=1580188278, inc=3}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null} 5 ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000042B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBC0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbc"}}, clusterTime=Timestamp{value=6786856975532556292, seconds=1580188278, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null} 6 ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000052B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBD0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbd"}}, clusterTime=Timestamp{value=6786856975532556293, seconds=1580188278, inc=5}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null} 7 ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000062B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBE0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbe"}}, clusterTime=Timestamp{value=6786856975532556294, seconds=1580188278, inc=6}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null} 8 ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000072B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBF0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbf"}}, clusterTime=Timestamp{value=6786856975532556295, seconds=1580188278, inc=7}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null} 9 ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000082B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBC00004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbc0"}}, clusterTime=Timestamp{value=6786856975532556296, seconds=1580188278, inc=8}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null} 10 ==> Let's imagine something wrong happened and I need to restart my Change Stream. 11 ==> Starting from resumeToken={"_data": "825E2FC276000000062B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBE0004"} 12 ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000072B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBF0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbf"}}, clusterTime=Timestamp{value=6786856975532556295, seconds=1580188278, inc=7}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null} 13 ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000082B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBC00004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbc0"}}, clusterTime=Timestamp{value=6786856975532556296, seconds=1580188278, inc=8}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null} 14 ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000092B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBC10004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbc1"}}, clusterTime=Timestamp{value=6786856975532556297, seconds=1580188278, inc=9}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null} 15 ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC2760000000A2B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBC20004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbc2"}}, clusterTime=Timestamp{value=6786856975532556298, seconds=1580188278, inc=10}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null} 16 ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC2760000000B2B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBC30004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbc3"}}, clusterTime=Timestamp{value=6786856975532556299, seconds=1580188278, inc=11}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null} 17 ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC2760000000D2B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC8F94B5117D894CBB90004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc8f94b5117d894cbb9"}}, clusterTime=Timestamp{value=6786856975532556301, seconds=1580188278, inc=13}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"scores.0.score": 904745.0267635228, "x": 150}}, txnNumber=null, lsid=null} 18 ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC2760000000F2B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBA0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbba"}}, clusterTime=Timestamp{value=6786856975532556303, seconds=1580188278, inc=15}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"scores.0.score": 2126144.0353088505, "x": 150}}, txnNumber=null, lsid=null}
Como você pode ver aqui, consegui parar de ler meu Change Stream e, a partir do
resumeToken
que coletei anteriormente, posso iniciar um novo Change Stream a partir deste ponto no tempo.1 package com.mongodb.quickstart; 2 3 import com.mongodb.ConnectionString; 4 import com.mongodb.MongoClientSettings; 5 import com.mongodb.client.*; 6 import com.mongodb.client.model.changestream.ChangeStreamDocument; 7 import com.mongodb.quickstart.models.Grade; 8 import org.bson.BsonDocument; 9 import org.bson.codecs.configuration.CodecRegistry; 10 import org.bson.codecs.pojo.PojoCodecProvider; 11 import org.bson.conversions.Bson; 12 13 import java.util.List; 14 import java.util.function.Consumer; 15 16 import static com.mongodb.client.model.Aggregates.match; 17 import static com.mongodb.client.model.Filters.eq; 18 import static com.mongodb.client.model.Filters.in; 19 import static com.mongodb.client.model.changestream.FullDocument.UPDATE_LOOKUP; 20 import static org.bson.codecs.configuration.CodecRegistries.fromProviders; 21 import static org.bson.codecs.configuration.CodecRegistries.fromRegistries; 22 23 public class ChangeStreams { 24 25 public static void main(String[] args) { 26 ConnectionString connectionString = new ConnectionString(System.getProperty("mongodb.uri")); 27 CodecRegistry pojoCodecRegistry = fromProviders(PojoCodecProvider.builder().automatic(true).build()); 28 CodecRegistry codecRegistry = fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), pojoCodecRegistry); 29 MongoClientSettings clientSettings = MongoClientSettings.builder() 30 .applyConnectionString(connectionString) 31 .codecRegistry(codecRegistry) 32 .build(); 33 34 try (MongoClient mongoClient = MongoClients.create(clientSettings)) { 35 MongoDatabase db = mongoClient.getDatabase("sample_training"); 36 MongoCollection<Grade> grades = db.getCollection("grades", Grade.class); 37 List<Bson> pipeline; 38 39 // Only uncomment one example at a time. Follow instructions for each individually then kill all remaining processes. 40 41 /** => Example 1: print all the write operations. 42 * => Start "ChangeStreams" then "MappingPOJOs" to see some change events. 43 */ 44 grades.watch().forEach(printEvent()); 45 46 /** => Example 2: print only insert and delete operations. 47 * => Start "ChangeStreams" then "MappingPOJOs" to see some change events. 48 */ 49 // pipeline = List.of(match(in("operationType", List.of("insert", "delete")))); 50 // grades.watch(pipeline).forEach(printEvent()); 51 52 /** => Example 3: print only updates without fullDocument. 53 * => Start "ChangeStreams" then "Update" to see some change events (start "Create" before if not done earlier). 54 */ 55 // pipeline = List.of(match(eq("operationType", "update"))); 56 // grades.watch(pipeline).forEach(printEvent()); 57 58 /** => Example 4: print only updates with fullDocument. 59 * => Start "ChangeStreams" then "Update" to see some change events. 60 */ 61 // pipeline = List.of(match(eq("operationType", "update"))); 62 // grades.watch(pipeline).fullDocument(UPDATE_LOOKUP).forEach(printEvent()); 63 64 /** 65 * => Example 5: iterating using a cursor and a while loop + remembering a resumeToken then restart the Change Streams. 66 * => Start "ChangeStreams" then "Update" to see some change events. 67 */ 68 // exampleWithResumeToken(grades); 69 } 70 } 71 72 private static void exampleWithResumeToken(MongoCollection<Grade> grades) { 73 List<Bson> pipeline = List.of(match(eq("operationType", "update"))); 74 ChangeStreamIterable<Grade> changeStream = grades.watch(pipeline); 75 MongoChangeStreamCursor<ChangeStreamDocument<Grade>> cursor = changeStream.cursor(); 76 System.out.println("==> Going through the stream a first time & record a resumeToken"); 77 int indexOfOperationToRestartFrom = 5; 78 int indexOfIncident = 8; 79 int counter = 0; 80 BsonDocument resumeToken = null; 81 while (cursor.hasNext() && counter != indexOfIncident) { 82 ChangeStreamDocument<Grade> event = cursor.next(); 83 if (indexOfOperationToRestartFrom == counter) { 84 resumeToken = event.getResumeToken(); 85 } 86 System.out.println(event); 87 counter++; 88 } 89 System.out.println("==> Let's imagine something wrong happened and I need to restart my Change Stream."); 90 System.out.println("==> Starting from resumeToken=" + resumeToken); 91 assert resumeToken != null; 92 grades.watch(pipeline).resumeAfter(resumeToken).forEach(printEvent()); 93 } 94 95 private static Consumer<ChangeStreamDocument<Grade>> printEvent() { 96 return System.out::println; 97 } 98 }
Lembre-se de descomentar apenas um exemplo de Change Stream por vez.
Os Change Streams são muito fáceis de usar e configurar no MongoDB. Eles são a chave para qualquer sistema de processamento em tempo real.
O único problema restante aqui é como colocar isso em produção corretamente. Change Streams são principalmente um loop infinito, processando um fluxo infinito de eventos. O multiprocessamento é, obviamente, obrigatório para esse tipo de configuração, especialmente se o tempo de processamento for maior do que o tempo que separa 2 eventos.
Se você quiser aprender mais e afundar seu conhecimento com mais rapidez, recomendamos que você confira o treinamento Caminho do programador Java do MongoDB disponível gratuitamente na MongoDB University.
Na próxima publicação no blog, mostrarei as transações ACID multidocumento em Java.