Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Idiomaschevron-right
Javachevron-right

Java - Change Streams

Maxime Beugnet10 min read • Published Feb 01, 2022 • Updated Oct 01, 2024
MongoDBFluxos de alteraçõesJava
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Início rápido
star-empty
star-empty
star-empty
star-empty
star-empty

Atualizações

O repositório de início rápido do MongoDB Java está disponível no GitHub.

28 de fevereiro de 2024

  • Atualizar para o Java 21
  • Atualize o driver Java para 5.0.0
  • Atualize logback-classic para 1.2.13

14 de novembro de 2023

  • Atualizar para o Java 17
  • Atualize o driver Java para 4.11.1
  • Atualize o mongodb-crypt para 1.8.0

25 de março de 2021

  • Atualize o driver Java para 4.2.2.
  • Exemplo de criptografia no nível do campo do lado do cliente adicionado.

21 de outubro de 2020

  • Atualize o driver Java para 4.1.1.
  • O registro do driver Java agora está ativado por meio da popular APISLF4J, então adicionei logback no pom.xml e um arquivo de configuração logback.xml.

Introdução

Emblema do Java
Os change stream foram introduzidos no MongoDB 3.6. Eles permitem que os aplicativos acessem alterações de dados em tempo real sem a complexidade e o risco de afetar o oplog.
Os aplicativos podem usar fluxos de alteração para assinar todas as alterações de dados em uma única coleção, um banco de dados ou uma implantação inteira e reagir imediatamente a elas. Como os fluxos de alterações usam a estrutura de agregação, um aplicativo também pode filtrar alterações específicas ou transformar as notificações à vontade.
Nesta publicação do blog, conforme comprometido na primeira publicação do blog desta série, mostrarei como aproveitar os change stream do MongoDB usando Java.

Configuração

Usarei o mesmo repositório de sempre nesta série. Se você ainda não tiver uma cópia dele, poderá cloná-lo ou apenas atualizá-lo, caso já o tenha:
1git clone https://github.com/mongodb-developer/java-quick-start
Se você ainda não configurou seu cluster gratuito no MongoDB Atlas, agora é um ótimo momento para fazê-lo. Você tem todas as instruções nesta postagem do blog.

Fluxos de alterações

Nesta publicação do blog, trabalharei no arquivo chamado ChangeStreams.java, mas é super fácil trabalhar com os Change Streams .
Mostrarei 5 exemplos diferentes para mostrar alguns recursos do change stream. Por uma questão de simplicidade, mostrarei apenas os pedaços de código relacionados diretamente ao change stream. Você pode encontrar toda a amostra de código na parte inferior desta postagem do blog ou no Github.
Para cada exemplo, você precisará iniciar 2 programas Java na ordem correta se quiser reproduzir meus exemplos.
  • O primeiro programa é sempre aquele que contém o código Change Streams.
  • O segundo será um dos programas Java que já usamos nesta série de postagens de blog Java. Você pode encontrá-los no repositório do Github. Elas gerarão operações MongoDB que observaremos na saída do change stream.

Um simples Change Streams sem filtros

Vamos começar com o change stream mais simples que podemos criar:
1MongoCollection<Grade> grades = db.getCollection("grades", Grade.class);
2ChangeStreamIterable<Grade> changeStream = grades.watch();
3changeStream.forEach((Consumer<ChangeStreamDocument<Grade>>) System.out::println);
Como você pode ver, tudo o que precisamos é de myCollection.watch()! É isso aí.
Isso retorna um ChangeStreamIterable que, conforme indicado por seu nome, pode ser iterado para retornar nossos eventos de alteração. Aqui, estou iterando sobre meu Change Stream para imprimir meus documentos de eventos de alteração na saída padrão do Java.
Também posso simplificar este código assim:
1grades.watch().forEach(printEvent());
2
3private static Consumer<ChangeStreamDocument<Grade>> printEvent() {
4 return System.out::println;
5}
Reutilizarei essa interface funcional nos meus exemplos a seguir para facilitar a leitura.
Para executar este exemplo:
  • Remova o comentário apenas do exemplo 1 do arquivoChangeStreams.java e inicie-o em seu IDE ou em um console dedicado usando Maven na raiz do seu projeto.
1mvn compile exec:java -Dexec.mainClass="com.mongodb.quickstart.ChangeStreams" -Dmongodb.uri="mongodb+srv://USERNAME:PASSWORD@cluster0-abcde.mongodb.net/test?w=majority"
  • Inicie MappingPOJO.java em outro console ou no seu IDE.
1mvn compile exec:java -Dexec.mainClass="com.mongodb.quickstart.MappingPOJO" -Dmongodb.uri="mongodb+srv://USERNAME:PASSWORD@cluster0-abcde.mongodb.net/test?w=majority"
No MappingPOJO, estamos realizando 4 operações do MongoDB:
  • Estou criando um novo documento Gradecom o métodoinsertOne(),
  • Estou procurando este documentoGrade usando o métodofind(),
  • Estou substituindo inteiramente este Grade usando o métodofindOneAndReplace(),
  • e, finalmente, estou excluindo esse Grade usando o métododeleteOne().
Isso é confirmado na saída padrão de MappingJava:
1Grade inserted.
2Grade found: Grade{id=5e2b4a28c9e9d55e3d7dbacf, student_id=10003.0, class_id=10.0, scores=[Score{type='homework', score=50.0}]}
3Grade replaced: Grade{id=5e2b4a28c9e9d55e3d7dbacf, student_id=10003.0, class_id=10.0, scores=[Score{type='homework', score=50.0}, Score{type='exam', score=42.0}]}
4Grade deleted: AcknowledgedDeleteResult{deletedCount=1}
Vamos verificar o que temos na saída padrão de ChangeStreams.java (prettified):
1ChangeStreamDocument{
2 operationType=OperationType{ value='insert' },
3 resumeToken={ "_data":"825E2F3E40000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F3E400C47CF19D59361620004" },
4 namespace=sample_training.grades,
5 destinationNamespace=null,
6 fullDocument=Grade{
7 id=5e2f3e400c47cf19d5936162,
8 student_id=10003.0,
9 class_id=10.0,
10 scores=[ Score { type='homework', score=50.0 } ]
11 },
12 documentKey={ "_id":{ "$oid":"5e2f3e400c47cf19d5936162" } },
13 clusterTime=Timestamp{
14 value=6786711608069455873,
15 seconds=1580154432,
16 inc=1
17 },
18 updateDescription=null,
19 txnNumber=null,
20 lsid=null
21}
22ChangeStreamDocument{ operationType=OperationType{ value= 'replace' },
23 resumeToken={ "_data":"825E2F3E40000000032B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F3E400C47CF19D59361620004" },
24 namespace=sample_training.grades,
25 destinationNamespace=null,
26 fullDocument=Grade{
27 id=5e2f3e400c47cf19d5936162,
28 student_id=10003.0,
29 class_id=10.0,
30 scores=[ Score{ type='homework', score=50.0 }, Score{ type='exam', score=42.0 } ]
31 },
32 documentKey={ "_id":{ "$oid":"5e2f3e400c47cf19d5936162" } },
33 clusterTime=Timestamp{
34 value=6786711608069455875,
35 seconds=1580154432,
36 inc=3
37 },
38 updateDescription=null,
39 txnNumber=null,
40 lsid=null
41}
42ChangeStreamDocument{
43 operationType=OperationType{ value='delete' },
44 resumeToken={ "_data":"825E2F3E40000000042B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F3E400C47CF19D59361620004" },
45 namespace=sample_training.grades,
46 destinationNamespace=null,
47 fullDocument=null,
48 documentKey={ "_id":{ "$oid":"5e2f3e400c47cf19d5936162" } },
49 clusterTime=Timestamp{
50 value=6786711608069455876,
51 seconds=1580154432,
52 inc=4
53 },
54 updateDescription=null,
55 txnNumber=null,
56 lsid=null
57}
Como você pode ver, somente 3 operações aparecem no Change Stream:
  • insert,
  • substituir,
  • excluir.
Isso era esperado porque a operaçãofind() é apenas um documento de leitura do MongoDB. Não está mudando nada, portanto, não gerando um evento no Change Stream.
Agora que terminamos o exemplo básico, vamos explorar algumas funcionalidades dos Change Streams.
Encerre o programa Change Stream que iniciamos anteriormente e vamos continuar.

Um simples Change Stream filtrando pelo tipo de operação

Agora vamos fazer a mesma coisa, mas vamos imaginar que estamos interessados apenas em operações de inserção e exclusão.
1List<Bson> pipeline = List.of(match(in("operationType", List.of("insert", "delete"))));
2grades.watch(pipeline).forEach(printEvent());
Como você pode ver aqui, estou usando o recurso de pipeline de agregação do change stream para filtrar os eventos de mudança que deseja processar.
Remova o comentário do exemplo 2 em ChangeStreams.java e execute o programa seguido por MappingPOJO.java, assim como fizemos anteriormente.
Aqui estão os eventos de mudança que estou recebendo.
1ChangeStreamDocument {operationType=OperationType {value= 'insert'},
2 resumeToken= {"_data": "825E2F4983000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F4983CC1D2842BFF555640004"},
3 namespace=sample_training.grades,
4 destinationNamespace=null,
5 fullDocument=Grade
6 {
7 id=5e2f4983cc1d2842bff55564,
8 student_id=10003.0,
9 class_id=10.0,
10 scores= [ Score {type= 'homework', score=50.0}]
11 },
12 documentKey= {"_id": {"$oid": "5e2f4983cc1d2842bff55564" }},
13 clusterTime=Timestamp {value=6786723990460170241, seconds=1580157315, inc=1 },
14 updateDescription=null,
15 txnNumber=null,
16 lsid=null
17}
18
19ChangeStreamDocument { operationType=OperationType {value= 'delete'},
20 resumeToken= {"_data": "825E2F4983000000042B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E2F4983CC1D2842BFF555640004"},
21 namespace=sample_training.grades,
22 destinationNamespace=null,
23 fullDocument=null,
24 documentKey= {"_id": {"$oid": "5e2f4983cc1d2842bff55564"}},
25 clusterTime=Timestamp {value=6786723990460170244, seconds=1580157315, inc=4},
26 updateDescription=null,
27 txnNumber=null,
28 lsid=null
29 }
30]
Desta vez, estou recebendo apenas os eventos 2 , insert e delete. O eventoreplace foi filtrado em comparação com o primeiro exemplo.

Alterar o comportamento padrão do Stream com operações de atualização

Igual ao anterior, desta vez estou filtrando meu change stream para manter apenas as operações de atualização.
1List<Bson> pipeline = List.of(match(eq("operationType", "update")));
2grades.watch(pipeline).forEach(printEvent());
Desta vez, siga estas etapas.
  • remova o comentário do exemplo 3 em ChangeStreams.java,
  • se você nunca executou Create.java, execute-o. Usaremos esses novos documentos na próxima etapa.
  • iniciar Update.java em outro console.
Em seu console do change stream, você deverá ver 13 eventos de atualização. Aqui está o primeiro:
1ChangeStreamDocument {operationType=OperationType {value= 'update'},
2 resumeToken= {"_data": "825E2FB83E000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCCE74AA51A0486763FE0004"},
3 namespace=sample_training.grades,
4 destinationNamespace=null,
5 fullDocument=null,
6 documentKey= {"_id": {"$oid": "5e27bcce74aa51a0486763fe"}},
7 clusterTime=Timestamp {value=6786845739898109953, seconds=1580185662, inc=1},
8 updateDescription=UpdateDescription {removedFields= [], updatedFields= {"comments.10": "You will learn a lot if you read the MongoDB blog!"}},
9 txnNumber=null,
10 lsid=null
11}
Como você pode ver, estamos recuperando nossa operação de atualização no campoupdateDescription, mas estamos obtendo apenas a diferença com a versão anterior desse documento.
O campofullDocument é null porque, por padrão, o MongoDB apenas envia a diferença para evitar sobrecarregar o change stream com informações potencialmente inúteis.
Vejamos como podemos alterar esse comportamento no próximo exemplo.

Fluxo de alterações com "Pesquisa de atualização"

Para esta parte, descomente o exemplo 4 de ChangeStreams.java e execute os programas conforme acima.
1List<Bson> pipeline = List.of(match(eq("operationType", "update")));
2grades.watch(pipeline).fullDocument(UPDATE_LOOKUP).forEach(printEvent());
Adicionei a opção UPDATE_LOOKUP desta vez, para que também possamos recuperar o documento inteiro durante uma operação de atualização.
Vamos ver novamente a primeira atualização no meu fluxo de alterações:
1ChangeStreamDocument {operationType=OperationType {value= 'update'},
2 resumeToken= {"_data": "825E2FBBC1000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCCE74AA51A0486763FE0004"},
3 namespace=sample_training.grades,
4 destinationNamespace=null,
5 fullDocument=Grade
6 {
7 id=5e27bcce74aa51a0486763fe,
8 student_id=10002.0,
9 class_id=10.0,
10 scores=null
11 },
12 documentKey= {"_id": {"$oid": "5e27bcce74aa51a0486763fe" }},
13 clusterTime=Timestamp {value=6786849601073709057, seconds=1580186561, inc=1 },
14 updateDescription=UpdateDescription {removedFields= [], updatedFields= {"comments.11": "You will learn a lot if you read the MongoDB blog!"}},
15 txnNumber=null,
16 lsid=null
17}
Observação: o programaUpdate.java atualiza um campo criado "comentários" que não existe em meu POJO Grade, que representa o esquema original desta collection. Assim, o campo não aparece na saída porque não está mapeado.
Se eu quiser ver esse campocomments, posso usar um MongoCollection não mapeado automaticamente para meu POJOGrade.java .
1MongoCollection<Document> grades = db.getCollection("grades");
2List<Bson> pipeline = List.of(match(eq("operationType", "update")));
3grades.watch(pipeline).fullDocument(UPDATE_LOOKUP).forEach((Consumer<ChangeStreamDocument<Document>>) System.out::println);
Então é isso que obtenho no meu change stream:
1ChangeStreamDocument {operationType=OperationType {value= 'update'},
2 resumeToken= {"_data": "825E2FBD89000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCCE74AA51A0486763FE0004"},
3 namespace=sample_training.grades,
4 destinationNamespace=null,
5 fullDocument=Document {
6 {
7 _id=5e27bcce74aa51a0486763fe,
8 class_id=10.0,
9 student_id=10002.0,
10 comments= [ You will learn a lot if you read the MongoDB blog!, [...], You will learn a lot if you read the MongoDB blog!]
11 }
12 },
13 documentKey= {"_id": {"$oid": "5e27bcce74aa51a0486763fe"}},
14 clusterTime=Timestamp {value=6786851559578796033, seconds=1580187017, inc=1},
15 updateDescription=UpdateDescription {removedFields= [], updatedFields= {"comments.13": "You will learn a lot if you read the MongoDB blog!"}},
16 txnNumber=null,
17 lsid=null
18}
Reduzi o campocomments para mantê-lo legível, mas ele contém 14 vezes o mesmo comentário no meu caso.
O documento completo que estamos recuperando aqui durante nossa operação de atualização é o documento após a ocorrência da atualização. Leia mais sobre isso em nossa documentação.

Change Streams são retomáveis

Neste exemplo final 5, simulei um erro e estou reiniciando meu Change Stream a partir de um resumeToken que obtive de uma operação anterior em meu Change Stream.
É importante observar que um fluxo de alterações será retomado automaticamente diante de um incidente "" . Em geral, o único motivo pelo qual um aplicativo precisa reiniciar o fluxo de alterações manualmente a partir de um token de retomada é se houver um incidente no próprio aplicativo e não no fluxo de alterações (por exemplo, um operador decidiu que o aplicativo precisa ser reiniciado).
1private static void exampleWithResumeToken(MongoCollection<Grade> grades) {
2 List<Bson> pipeline = List.of(match(eq("operationType", "update")));
3 ChangeStreamIterable<Grade> changeStream = grades.watch(pipeline);
4 MongoChangeStreamCursor<ChangeStreamDocument<Grade>> cursor = changeStream.cursor();
5 System.out.println("==> Going through the stream a first time & record a resumeToken");
6 int indexOfOperationToRestartFrom = 5;
7 int indexOfIncident = 8;
8 int counter = 0;
9 BsonDocument resumeToken = null;
10 while (cursor.hasNext() && counter != indexOfIncident) {
11 ChangeStreamDocument<Grade> event = cursor.next();
12 if (indexOfOperationToRestartFrom == counter) {
13 resumeToken = event.getResumeToken();
14 }
15 System.out.println(event);
16 counter++;
17 }
18 System.out.println("==> Let's imagine something wrong happened and I need to restart my Change Stream.");
19 System.out.println("==> Starting from resumeToken=" + resumeToken);
20 assert resumeToken != null;
21 grades.watch(pipeline).resumeAfter(resumeToken).forEach(printEvent());
22}
Para este exemplo final, o mesmo que anterior. Descomente a parte 5 (que é apenas chamar o método acima) e comece ChangeStreams.java depois Update.java.
Esta é a saída que você deve obter:
1==> Going through the stream a first time & record a resumeToken
2ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000012B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCCE74AA51A0486763FE0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcce74aa51a0486763fe"}}, clusterTime=Timestamp{value=6786856975532556289, seconds=1580188278, inc=1}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
3ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000022B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBA0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbba"}}, clusterTime=Timestamp{value=6786856975532556290, seconds=1580188278, inc=2}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.15": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
4ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000032B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBB0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbb"}}, clusterTime=Timestamp{value=6786856975532556291, seconds=1580188278, inc=3}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
5ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000042B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBC0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbc"}}, clusterTime=Timestamp{value=6786856975532556292, seconds=1580188278, inc=4}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
6ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000052B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBD0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbd"}}, clusterTime=Timestamp{value=6786856975532556293, seconds=1580188278, inc=5}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
7ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000062B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBE0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbe"}}, clusterTime=Timestamp{value=6786856975532556294, seconds=1580188278, inc=6}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
8ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000072B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBF0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbf"}}, clusterTime=Timestamp{value=6786856975532556295, seconds=1580188278, inc=7}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
9ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000082B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBC00004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbc0"}}, clusterTime=Timestamp{value=6786856975532556296, seconds=1580188278, inc=8}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
10==> Let's imagine something wrong happened and I need to restart my Change Stream.
11==> Starting from resumeToken={"_data": "825E2FC276000000062B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBE0004"}
12ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000072B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBF0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbbf"}}, clusterTime=Timestamp{value=6786856975532556295, seconds=1580188278, inc=7}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
13ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000082B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBC00004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbc0"}}, clusterTime=Timestamp{value=6786856975532556296, seconds=1580188278, inc=8}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
14ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC276000000092B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBC10004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbc1"}}, clusterTime=Timestamp{value=6786856975532556297, seconds=1580188278, inc=9}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
15ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC2760000000A2B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBC20004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbc2"}}, clusterTime=Timestamp{value=6786856975532556298, seconds=1580188278, inc=10}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
16ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC2760000000B2B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBC30004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbc3"}}, clusterTime=Timestamp{value=6786856975532556299, seconds=1580188278, inc=11}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"comments.14": "You will learn a lot if you read the MongoDB blog!"}}, txnNumber=null, lsid=null}
17ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC2760000000D2B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC8F94B5117D894CBB90004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc8f94b5117d894cbb9"}}, clusterTime=Timestamp{value=6786856975532556301, seconds=1580188278, inc=13}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"scores.0.score": 904745.0267635228, "x": 150}}, txnNumber=null, lsid=null}
18ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E2FC2760000000F2B022C0100296E5A100496C525567BB74BD28BFD504F987082C046645F696400645E27BCC9F94B5117D894CBBA0004"}, namespace=sample_training.grades, destinationNamespace=null, fullDocument=null, documentKey={"_id": {"$oid": "5e27bcc9f94b5117d894cbba"}}, clusterTime=Timestamp{value=6786856975532556303, seconds=1580188278, inc=15}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"scores.0.score": 2126144.0353088505, "x": 150}}, txnNumber=null, lsid=null}
Como você pode ver aqui, consegui parar de ler meu Change Stream e, a partir do resumeToken que coletei anteriormente, posso iniciar um novo Change Stream a partir deste ponto no tempo.

Código final

ChangeStreams.java (código: ):
1package com.mongodb.quickstart;
2
3import com.mongodb.ConnectionString;
4import com.mongodb.MongoClientSettings;
5import com.mongodb.client.*;
6import com.mongodb.client.model.changestream.ChangeStreamDocument;
7import com.mongodb.quickstart.models.Grade;
8import org.bson.BsonDocument;
9import org.bson.codecs.configuration.CodecRegistry;
10import org.bson.codecs.pojo.PojoCodecProvider;
11import org.bson.conversions.Bson;
12
13import java.util.List;
14import java.util.function.Consumer;
15
16import static com.mongodb.client.model.Aggregates.match;
17import static com.mongodb.client.model.Filters.eq;
18import static com.mongodb.client.model.Filters.in;
19import static com.mongodb.client.model.changestream.FullDocument.UPDATE_LOOKUP;
20import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
21import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
22
23public class ChangeStreams {
24
25 public static void main(String[] args) {
26 ConnectionString connectionString = new ConnectionString(System.getProperty("mongodb.uri"));
27 CodecRegistry pojoCodecRegistry = fromProviders(PojoCodecProvider.builder().automatic(true).build());
28 CodecRegistry codecRegistry = fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), pojoCodecRegistry);
29 MongoClientSettings clientSettings = MongoClientSettings.builder()
30 .applyConnectionString(connectionString)
31 .codecRegistry(codecRegistry)
32 .build();
33
34 try (MongoClient mongoClient = MongoClients.create(clientSettings)) {
35 MongoDatabase db = mongoClient.getDatabase("sample_training");
36 MongoCollection<Grade> grades = db.getCollection("grades", Grade.class);
37 List<Bson> pipeline;
38
39 // Only uncomment one example at a time. Follow instructions for each individually then kill all remaining processes.
40
41 /** => Example 1: print all the write operations.
42 * => Start "ChangeStreams" then "MappingPOJOs" to see some change events.
43 */
44 grades.watch().forEach(printEvent());
45
46 /** => Example 2: print only insert and delete operations.
47 * => Start "ChangeStreams" then "MappingPOJOs" to see some change events.
48 */
49// pipeline = List.of(match(in("operationType", List.of("insert", "delete"))));
50// grades.watch(pipeline).forEach(printEvent());
51
52 /** => Example 3: print only updates without fullDocument.
53 * => Start "ChangeStreams" then "Update" to see some change events (start "Create" before if not done earlier).
54 */
55// pipeline = List.of(match(eq("operationType", "update")));
56// grades.watch(pipeline).forEach(printEvent());
57
58 /** => Example 4: print only updates with fullDocument.
59 * => Start "ChangeStreams" then "Update" to see some change events.
60 */
61// pipeline = List.of(match(eq("operationType", "update")));
62// grades.watch(pipeline).fullDocument(UPDATE_LOOKUP).forEach(printEvent());
63
64 /**
65 * => Example 5: iterating using a cursor and a while loop + remembering a resumeToken then restart the Change Streams.
66 * => Start "ChangeStreams" then "Update" to see some change events.
67 */
68// exampleWithResumeToken(grades);
69 }
70 }
71
72 private static void exampleWithResumeToken(MongoCollection<Grade> grades) {
73 List<Bson> pipeline = List.of(match(eq("operationType", "update")));
74 ChangeStreamIterable<Grade> changeStream = grades.watch(pipeline);
75 MongoChangeStreamCursor<ChangeStreamDocument<Grade>> cursor = changeStream.cursor();
76 System.out.println("==> Going through the stream a first time & record a resumeToken");
77 int indexOfOperationToRestartFrom = 5;
78 int indexOfIncident = 8;
79 int counter = 0;
80 BsonDocument resumeToken = null;
81 while (cursor.hasNext() && counter != indexOfIncident) {
82 ChangeStreamDocument<Grade> event = cursor.next();
83 if (indexOfOperationToRestartFrom == counter) {
84 resumeToken = event.getResumeToken();
85 }
86 System.out.println(event);
87 counter++;
88 }
89 System.out.println("==> Let's imagine something wrong happened and I need to restart my Change Stream.");
90 System.out.println("==> Starting from resumeToken=" + resumeToken);
91 assert resumeToken != null;
92 grades.watch(pipeline).resumeAfter(resumeToken).forEach(printEvent());
93 }
94
95 private static Consumer<ChangeStreamDocument<Grade>> printEvent() {
96 return System.out::println;
97 }
98}
Lembre-se de descomentar apenas um exemplo de Change Stream por vez.

Encerrando

Os Change Streams são muito fáceis de usar e configurar no MongoDB. Eles são a chave para qualquer sistema de processamento em tempo real.
O único problema restante aqui é como colocar isso em produção corretamente. Change Streams são principalmente um loop infinito, processando um fluxo infinito de eventos. O multiprocessamento é, obviamente, obrigatório para esse tipo de configuração, especialmente se o tempo de processamento for maior do que o tempo que separa 2 eventos.
Se você quiser aprender mais e afundar seu conhecimento com mais rapidez, recomendamos que você confira o treinamento Caminho do programador Java do MongoDB disponível gratuitamente na MongoDB University.
Na próxima publicação no blog, mostrarei as transações ACID multidocumento em Java.

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Início rápido
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Início rápido

Introdução ao Minecraft e ao MongoDB


Jul 15, 2024 | 4 min read
Tutorial

Usando a autenticação AWS IAM com o conector MongoDB para Apache Kafka


Jul 01, 2024 | 4 min read
Início rápido

Java - Mapeamento de POJOs


Mar 01, 2024 | 5 min read
Tutorial

Explorando os recursos avançados do Atlas Search com o MongoDB Atlas Atlas Search


Aug 20, 2024 | 6 min read
Sumário