Fluxos de alterações
Nesta página
- Disponibilidade
- Conecte
- Assista a uma coleta, banco de dados ou implementação
- Considerações sobre o desempenho do change stream
- Abrir um change stream
- Modificar o resultado do change stream
- Consulte o documento completo para saber sobre operações de atualização
- Retomar um change stream
- Casos de uso
- Controle de acesso
- Notificação de evento
- Agrupamentos
- Alterar fluxos e documentos órfãos
- Altere fluxos com pré e pós-imagens de documentos
Os fluxos de alterações permitem que os aplicativos acessem alterações de dados em tempo real sem a complexidade prévia e o risco de afetar manualmente o oplog. Os aplicativos podem usar fluxos de alterações para se inscrever em todas as alterações de dados em uma única coleção, um banco de dados ou uma implantação inteira e reagir imediatamente a elas. Como os fluxos de alterações usam o framework de agregação, os aplicativos também podem filtrar por alterações específicas ou transformar as notificações à vontade.
A partir do MongoDB 5.1, os change streams são otimizados, proporcionando uma utilização mais eficiente dos recursos e uma execução mais rápida de alguns estágios do aggregation pipeline.
Disponibilidade
Os change streams estão disponíveis para conjuntos de réplicas e clusters fragmentados:
Mecanismo de armazenamento.
Os conjuntos de réplicas e clusters fragmentados devem usar o mecanismo de armazenamento WiredTiger. Os change streams também podem ser usados em sistemas que empregam o recurso de encryption at rest do MongoDB.
Versão do protocolo de conjunto de réplicas.
Os conjuntos de réplicas e clusters fragmentados devem usar o protocolo de conjunto de réplicas versão 1 (
pv1
).Habilitar Read Concern "maioria".
Os change streams estão disponíveis independentemente do suporte à read concern
"majority"
; ou seja, o suporte a read concernmajority
pode ser habilitado (padrão) ou desabilitado para usar change streams.
Suporte à API Estável
Os change streams estão incluídos na Stable API V1. No entanto, a opção showExpandedEvents não está incluída na Stable API V1.
Conecte
As conexões para um change stream podem usar listas de sementes de DNS com a opção de conexão +srv
ou listar os servidores individualmente na connection string.
Se o driver perder a conexão com um change stream ou a conexão ficar inativa, ele tentará restabelecer uma conexão com o change stream por meio de outro nó no cluster que tenha uma read preference correspondente. Se o driver não conseguir encontrar um nó com a read preference correta, ele lançará uma exceção.
Para obter mais informações, consulte Formato URI da connection string.
Assista a uma coleta, banco de dados ou implementação
Você pode abrir change stream em:
Alvo | Descrição |
---|---|
Uma collection | Você pode abrir um cursor de change stream para uma única collection (exceto collections Os exemplos nesta página usam os drivers MongoDB para abrir e trabalhar com um cursor de fluxo de alterações para uma única coleção. Consulte também o método |
Um banco de dados | Você pode abrir um cursor de fluxo de alterações para um único banco de dados de dados (exceto Para saber sobre o método do driver MongoDB, consulte a documentação do driver. Consulte também o método |
Uma implantação | Você pode abrir um cursor de fluxo de alterações para uma implantação (um conjunto de réplicas ou um cluster fragmentado) para observar alterações em todas as coleções que não são do sistema em todos os bancos de dados, exceto Para saber sobre o método do driver MongoDB, consulte a documentação do driver. Consulte também o método |
Observação
Alterar exemplos de fluxo
Os exemplos nesta página usam os drivers MongoDB para ilustrar como abrir um cursor de fluxo de alteração para uma coleta e trabalhar com o cursor de fluxo de alteração.
Considerações sobre o desempenho do change stream
Se a quantidade de change streams ativos abertos em um banco de dados exceder o tamanho do pool de conexões, você poderá enfrentar latência de notificação. Cada change stream usa uma conexão e uma operação getMore no change stream pelo período de tempo que aguarda o próximo evento. Para evitar problemas de latência, você deve garantir que o tamanho do pool seja maior do que o número de change stream abertos. Para obter detalhes, consulte a configuração maxPoolSize.
Considerações sobre cluster fragmentado
Quando um change stream é aberto em um cluster fragmentado:
O
mongos
cria change streams individuais em cada shard. Esse comportamento ocorre independentemente de o change stream ter como alvo um determinado intervalo de chaves de shard.Quando o
mongos
recebe resultados do fluxo de mudança, ele classifica e filtra esses resultados. Se necessário, omongos
também executa uma consultafullDocument
.
Para obter o melhor desempenho, limite o uso de queries $lookup
em change streams.
Abrir um change stream
Para abrir um fluxo de alteração:
Para um conjunto de réplicas, você pode emitir a operação de change stream a partir de qualquer membro portador de dados.
Para um cluster fragmentado, você deve emitir a operação de change stream a partir do
mongos
.
O exemplo a seguir abre um change stream para uma collection e itera sobre o cursor para recuperar os documentos do change stream. [1]
➤ Use o menu suspenso Selecione a linguagem no canto superior direito para definir a linguagem dos exemplos nesta página.
Os exemplos de C abaixo presumem que você se conectou a um conjunto de réplicas do MongoDB e acessou um banco de dados que contém uma coleção inventory
.
mongoc_collection_t *collection; bson_t *pipeline = bson_new (); bson_t opts = BSON_INITIALIZER; mongoc_change_stream_t *stream; const bson_t *change; const bson_t *resume_token; bson_error_t error; collection = mongoc_database_get_collection (db, "inventory"); stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
Os exemplos C# abaixo pressupõem que você conectou a um conjunto de réplicas do MongoDB e acessou um banco de dados que contém uma coleção inventory
.
var cursor = inventory.Watch(); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
Os exemplos Go abaixo pressupõem que você conectou a um conjunto de réplicas do MongoDB e acessou um banco de dados que contém uma coleção inventory
.
cs, err := coll.Watch(ctx, mongo.Pipeline{}) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
Os exemplos Java abaixo pressupõem que você conectou a um conjunto de réplicas do MongoDB e acessou um banco de dados que contém uma coleção inventory
.
MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator(); ChangeStreamDocument<Document> next = cursor.next();
Os exemplos de Kotlin abaixo pressupõem que você está conectado a um conjunto de réplica MongoDB e pode acessar um banco de dados de dados que contém a coleção inventory
. Para saber mais sobre como concluir essas tarefas, consulte o guia Bancos de dados e coleções de drivers do Kotlin Coroutine .
val job = launch { val changeStream = collection.watch() changeStream.collect { println("Received a change event: $it") } }
Os exemplos abaixo pressupõem que você se conectou a um conjunto de réplicas do MongoDB e acessou um banco de dados que contém uma coleção inventory
.
cursor = db.inventory.watch() document = await cursor.next()
Os exemplos de Node.js abaixo pressupõem que você conectou a um conjunto de réplicas do MongoDB e acessou um banco de dados que contém uma coleção inventory
.
O exemplo a seguir usa stream para processar os eventos de mudança.
const collection = db.collection('inventory'); const changeStream = collection.watch(); changeStream.on('change', next => { // process next document });
Alternativamente, você também pode usar o iterador para processar os eventos de mudança:
const collection = db.collection('inventory'); const changeStream = collection.watch(); const next = await changeStream.next();
ChangeStream estende EventEmitter.
Os exemplos abaixo pressupõem que você se conectou a um conjunto de réplica MongoDB e acessou um banco de dados que contém uma coleção do inventory
.
$changeStream = $db->inventory->watch(); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
Os exemplos Python abaixo pressupõem que você se conectou a um conjunto de réplicas MongoDB e acessou um banco de dados que contém uma coleção inventory
.
cursor = db.inventory.watch() next(cursor)
Os exemplos abaixo pressupõem que você se conectou a um conjunto de réplica MongoDB e acessou um banco de dados que contém uma coleção do inventory
.
cursor = inventory.watch.to_enum next_change = cursor.next
Os exemplos Swift (Async) abaixo pressupõem que você se conectou a um conjunto de réplicas do MongoDB e acessou um banco de dados que contém uma coleção inventory
.
let inventory = db.collection("inventory") // Option 1: retrieve next document via next() let next = inventory.watch().flatMap { cursor in cursor.next() } // Option 2: register a callback to execute for each document let result = inventory.watch().flatMap { cursor in cursor.forEach { event in // process event print(event) } }
Os exemplos Swift (Sync) abaixo pressupõem que você se conectou a um conjunto de réplicas do MongoDB e acessou um banco de dados que contém uma coleção inventory
.
let inventory = db.collection("inventory") let changeStream = try inventory.watch() let next = changeStream.next()
Para recuperar o evento de alteração de dados do cursor, repita o cursor do change stream. Para obter informações sobre o change stream, consulte Eventos de alteração.
O change stream permanece aberto até que ocorra uma das seguintes situações:
O cursor está explicitamente fechado.
Ocorre um evento de invalidar; por exemplo, um drop de coleção ou renomear.
A conexão com a implantação do MongoDB fecha ou expira. Consulte Comportamentos do cursor para obter mais informações.
Se a implantação for um cluster fragmentado, uma remoção de fragmento pode fazer com que um cursor de fluxo de alteração aberto seja fechado. O cursor do fluxo de alterações fechado pode não ser totalmente retomável.
Observação
O ciclo de vida de um cursor não fechado depende da linguagem.
[1] | Você pode especificar um startAtOperationTime para abrir o cursor em um determinado momento. Se o ponto de partida especificado estiver no passado, ele deverá estar no intervalo de tempo do oplog. |
Modificar o resultado do change stream
➤ Use o menu suspenso Selecione a linguagem no canto superior direito para definir a linguagem dos exemplos nesta página.
Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:
pipeline = BCON_NEW ("pipeline", "[", "{", "$match", "{", "fullDocument.username", BCON_UTF8 ("alice"), "}", "}", "{", "$addFields", "{", "newField", BCON_UTF8 ("this is an added field!"), "}", "}", "]"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>() .Match(change => change.FullDocument["username"] == "alice" || change.OperationType == ChangeStreamOperationType.Delete) .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>( "{ $addFields : { newField : 'this is an added field!' } }"); var collection = database.GetCollection<BsonDocument>("inventory"); using (var cursor = collection.Watch(pipeline)) { while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); }
Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or", bson.A{ bson.D{{"fullDocument.username", "alice"}}, bson.D{{"operationType", "delete"}}}}}, }}} cs, err := coll.Watch(ctx, pipeline) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:
MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>"); // Select the MongoDB database and collection to open the change stream against MongoDatabase db = mongoClient.getDatabase("myTargetDatabase"); MongoCollection<Document> collection = db.getCollection("myTargetCollection"); // Create $match pipeline stage. List<Bson> pipeline = singletonList(Aggregates.match(Filters.or( Document.parse("{'fullDocument.username': 'alice'}"), Filters.in("operationType", asList("delete"))))); // Create the change stream cursor, passing the pipeline to the // collection.watch() method MongoCursor<Document> cursor = collection.watch(pipeline).iterator();
A lista pipeline
inclui um único estágio $match
que filtra quaisquer operações que atendam a um ou ambos os critérios a seguir:
username
o valor éalice
operationType
o valor édelete
Passar o pipeline
para o método watch()
direciona o fluxo de mudança para retornar notificações depois de passá-las pelo pipeline
especificado.
Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:
val pipeline = listOf( Aggregates.match( or( eq("fullDocument.username", "alice"), `in`("operationType", listOf("delete")) ) )) val job = launch { val changeStream = collection.watch(pipeline) changeStream.collect { println("Received a change event: $it") } }
A lista pipeline
inclui um único estágio $match
que filtra quaisquer operações que atendam a um ou ambos os critérios a seguir:
username
o valor éalice
operationType
o valor édelete
Passar o pipeline
para o método watch()
direciona o fluxo de mudança para retornar notificações depois de passá-las pelo pipeline
especificado.
Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) document = await cursor.next()
Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:
O exemplo a seguir usa stream para processar os eventos de mudança.
const pipeline = [ { $match: { 'fullDocument.username': 'alice' } }, { $addFields: { newField: 'this is an added field!' } } ]; const collection = db.collection('inventory'); const changeStream = collection.watch(pipeline); changeStream.on('change', next => { // process next document });
Alternativamente, você também pode usar o iterador para processar os eventos de mudança:
const changeStreamIterator = collection.watch(pipeline); const next = await changeStreamIterator.next();
Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:
$pipeline = [ ['$match' => ['fullDocument.username' => 'alice']], ['$addFields' => ['newField' => 'this is an added field!']], ]; $changeStream = $db->inventory->watch($pipeline); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) next(cursor)
Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:
Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self) let next = changeStream.next()
Dica
O campo _id do documento de evento de fluxo de alterações atua como o token de retomada. Não use o pipeline para modificar ou remover o campo _id
do evento de fluxo de alterações.
A partir do MongoDB 4.2, os change streams lançarão uma exceção se o pipeline de agregação do fluxo de alteração modificar o campo _id de um evento.
Consulte Mudar eventos para obter mais informações sobre o formato do documento de resposta do change stream.
Consulte o documento completo para saber sobre operações de atualização
Por padrão, os change streams retornam apenas o delta dos campos durante a operação de atualização. No entanto, você pode configurar o change stream para retornar a versão mais atual comprometida por maioria do documento atualizado.
➤ Use o menu suspenso Selecione a linguagem no canto superior direito para definir a linguagem dos exemplos nesta página.
Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe a opção "fullDocument"
com o valor "updateLookup"
para o método mongoc_collection_watch
.
No exemplo abaixo, todas as notificações de operações de atualização incluem um campo fullDocument
que representa a versão atual do documento afetado pela operação de atualização.
BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup"
para o método db.collection.watch()
.
No exemplo abaixo, todas as notificações de operações de atualização incluem um campo FullDocument
que representa a versão atual do documento afetado pela operação de atualização.
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup }; var cursor = inventory.Watch(options); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
Para retornar a versão mais atual do documento atualizado, SetFullDocument(options.UpdateLookup)
altere a opção de fluxo.
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup)) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe FullDocument.UPDATE_LOOKUP
para o método db.collection.watch.fullDocument()
.
No exemplo abaixo, todas as notificações de operações de atualização incluem um campo FullDocument
que representa a versão atual do documento afetado pela operação de atualização.
cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator(); next = cursor.next();
Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe FullDocument.UPDATE_LOOKUP
para o ChangeStreamFlow.fullDocument() método.
No exemplo abaixo, todas as notificações de operações de atualização incluem um campo FullDocument
que representa a versão atual do documento afetado pela operação de atualização.
val job = launch { val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) changeStream.collect { println(it) } }
Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe full_document='updateLookup'
para o método db.collection.watch()
.
No exemplo abaixo, todas as notificações de operações de atualização incluem um campo `full_document
que representa a versão atual do documento afetado pela operação de atualização.
cursor = db.inventory.watch(full_document="updateLookup") document = await cursor.next()
Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe { fullDocument: 'updateLookup' }
para o método db.collection.watch()
.
No exemplo abaixo, todas as notificações de operações de atualização incluem um campo fullDocument
que representa a versão atual do documento afetado pela operação de atualização.
O exemplo a seguir usa stream para processar os eventos de mudança.
const collection = db.collection('inventory'); const changeStream = collection.watch([], { fullDocument: 'updateLookup' }); changeStream.on('change', next => { // process next document });
Alternativamente, você também pode usar o iterador para processar os eventos de mudança:
const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' }); const next = await changeStreamIterator.next();
Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"
para o método db.watch()
.
No exemplo abaixo, todas as notificações de operações de atualização incluem um campo fullDocument
que representa a versão atual do documento afetado pela operação de atualização.
$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe full_document='updateLookup'
para o método db.collection.watch()
.
No exemplo abaixo, todas as notificações de operações de atualização incluem um campo full_document
que representa a versão atual do documento afetado pela operação de atualização.
cursor = db.inventory.watch(full_document="updateLookup") next(cursor)
Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe full_document: 'updateLookup'
para o método db.watch()
.
No exemplo abaixo, todas as notificações de operações de atualização incluem um campo full_document
que representa a versão atual do documento afetado pela operação de atualização.
cursor = inventory.watch([], full_document: 'updateLookup').to_enum next_change = cursor.next
Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe options:
ChangeStreamOptions(fullDocument: .updateLookup)
para o método watch()
.
let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe options:
ChangeStreamOptions(fullDocument: .updateLookup)
para o método watch()
.
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next()
Observação
Se houver uma ou mais operações confirmadas pela maioria que modificaram o documento atualizado após a operação de atualização, mas antes da pesquisa, o documento completo retornado pode diferir significativamente do documento no momento da operação de atualização.
No entanto, os deltas incluídos no documento do change stream sempre descrevem corretamente as alterações da collection monitorada que se aplicaram a esse evento do change stream.
O campo fullDocument
de um evento de atualização poderá estar ausente se uma das seguintes opções for verdadeira:
Se o documento for excluído ou se a coleção for descartada entre a atualização e a pesquisa.
Se a atualização alterar os valores de pelo menos um dos campos na chave de fragmento dessa coleção.
Consulte Mudar eventos para obter mais informações sobre o formato do documento de resposta do change stream.
Retomar um change stream
Os change streams são retomáveis especificando um token de retomada para resumeAfter ou startAfter ao abrir o cursor.
resumeAfter
para Change Streams
Você pode retomar uma transmissão de alteração após um evento específico passando um token de currículo para resumeAfter
ao abrir o cursor.
Consulte Tokens de resumo para saber mais.
Importante
O oplog deve ter histórico suficiente para localizar a operação associada ao token ou ao registro de data e hora, se o registro de data e hora estiver no passado.
Não é possível usar
resumeAfter
para retomar um change stream depois que um evento de invalidação (por exemplo, um descarte ou renomeação de coleção) fechar o change stream. Em vez disso, você pode usar startAfter para iniciar um novo change stream após um evento de invalidação.
No exemplo abaixo, a opção resumeAfter
é anexada às opções de stream para recriar a stream após ter sido destruída. Passar o _id
para o change stream tenta retomar as notificações a partir da operação especificada.
stream = mongoc_collection_watch (collection, pipeline, NULL); if (mongoc_change_stream_next (stream, &change)) { resume_token = mongoc_change_stream_get_resume_token (stream); BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token); mongoc_change_stream_destroy (stream); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); mongoc_change_stream_destroy (stream); } else { if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream); }
No exemplo abaixo, o resumeToken
é obtido do último documento de fluxo de alterações e transmitido para o método Watch()
como uma opção. Transmitir resumeToken
para o método Watch()
direciona o fluxo de alterações para tentar retomar as notificações iniciadas após a operação especificada no token de retomada.
var resumeToken = previousCursor.GetResumeToken(); var options = new ChangeStreamOptions { ResumeAfter = resumeToken }; var cursor = inventory.Watch(options); cursor.MoveNext(); var next = cursor.Current.First(); cursor.Dispose();
Você pode usar ChangeStreamOptions.SetResumeAfter para especificar o token de retomada para o fluxo de alterações. Se a opção resumeAfter estiver configurada, o fluxo de alterações retomará as notificações após a operação especificada no token de retomada. O SetResumeAfter
assume um valor que deve ser resolvido para um token de currículo, por exemplo resumeToken
no exemplo abaixo.
resumeToken := original.ResumeToken() cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken)) assert.NoError(t, err) defer cs.Close(ctx) ok = cs.Next(ctx) result := cs.Current
Você pode usar o método resumeAfter()
para retomar as notificações após a operação especificada no token de retomada. O método resumeAfter()
recebe um valor que deve ser resolvido em um token de resumo, por exemplo resumeToken
no exemplo abaixo.
BsonDocument resumeToken = next.getResumeToken(); cursor = inventory.watch().resumeAfter(resumeToken).iterator(); next = cursor.next();
Você pode usar o ChangeStreamFlow.resumeAfter() para retomar as notificações após a operação especificada no token de retomada. O método resumeAfter()
obtém um valor que deve ser resolvido para um token de resumo, como a variável resumeToken
no exemplo abaixo.
val resumeToken = BsonDocument() val job = launch { val changeStream = collection.watch() .resumeAfter(resumeToken) changeStream.collect { println(it) } }
Use o modificador resume_after
para retomar as notificações após a operação especificada no token de retomada. O modificador resume_after
recebe um valor que deve ser resolvido em um token de resumo, como resume_token
no exemplo abaixo.
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) document = await cursor.next()
Você pode usar a opção resumeAfter
para retomar notificações após a operação especificada no token de retomada. A opção resumeAfter
assume um valor que deve ser resolvido para um token de currículo, por exemplo resumeToken
no exemplo abaixo.
const collection = db.collection('inventory'); const changeStream = collection.watch(); let newChangeStream; changeStream.once('change', next => { const resumeToken = changeStream.resumeToken; changeStream.close(); newChangeStream = collection.watch([], { resumeAfter: resumeToken }); newChangeStream.on('change', next => { processChange(next); }); });
Você pode usar a opção resumeAfter
para retomar notificações após a operação especificada no token de retomada. A opção resumeAfter
assume um valor que deve ser resolvido para um token de currículo, por exemplo $resumeToken
no exemplo abaixo.
$resumeToken = $changeStream->getResumeToken(); if ($resumeToken === null) { throw new \Exception('Resume token was not found'); } $changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]); $changeStream->rewind(); $firstChange = $changeStream->current();
Use o modificador resume_after
para retomar as notificações após a operação especificada no token de retomada. O modificador resume_after
recebe um valor que deve ser resolvido em um token de resumo, como resume_token
no exemplo abaixo.
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) next(cursor)
Use o modificador resume_after
para retomar as notificações após a operação especificada no token de retomada. O modificador resume_after
recebe um valor que deve ser resolvido em um token de resumo, como resume_token
no exemplo abaixo.
change_stream = inventory.watch cursor = change_stream.to_enum next_change = cursor.next resume_token = change_stream.resume_token new_cursor = inventory.watch([], resume_after: resume_token).to_enum resumed_change = new_cursor.next
Você pode usar a opção resumeAfter
para retomar notificações após a operação especificada no token de retomada. A opção resumeAfter
assume um valor que deve ser resolvido para um token de currículo, por exemplo resumeToken
no exemplo abaixo.
let inventory = db.collection("inventory") inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next().map { _ in changeStream.resumeToken }.always { _ in _ = changeStream.kill() } }.flatMap { resumeToken in inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in newStream.forEach { event in // process event print(event) } } }
Você pode usar a opção resumeAfter
para retomar notificações após a operação especificada no token de retomada. A opção resumeAfter
assume um valor que deve ser resolvido para um token de currículo, por exemplo resumeToken
no exemplo abaixo.
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next() let resumeToken = changeStream.resumeToken let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)) let nextAfterResume = resumedChangeStream.next()
startAfter
para Change Streams
Você pode iniciar um novo change stream após um evento específico passando um token de resumo para startAfter
ao abrir o cursor. Ao contrário de resumeAfter, startAfter
pode retomar notificações após um evento de invalidação criando um novo change stream.
Consulte Tokens de resumo para saber mais.
Importante
O oplog deve ter histórico suficiente para localizar a operação associada ao token ou ao registro de data e hora, se o registro de data e hora estiver no passado.
Retomar tokens
O token de resumo está disponível em várias fontes:
Fonte | Descrição |
---|---|
Cada notificação de change stream inclui um token de resumo no campo _id . | |
O estágio de aggregation Este campo aparece somente ao utilizar o comando | |
O comando getMore inclui um token de currículo no campo cursor.postBatchResumeToken . |
A partir do MongoDB 4.2, os change streams lançarão uma exceção se o pipeline de agregação do fluxo de alteração modificar o campo _id de um evento.
Dica
O MongoDB fornece um "snippet", uma extensão para mongosh
, que decodifica tokens de currículo codificados por hex.
Você pode instalar e executar o trecho resumetoken do mongosh
:
snippet install resumetoken decodeResumeToken('<RESUME TOKEN>')
Você também pode executar resumetoken a partir da linha de comando (sem usar o mongosh
) se o npm
estiver instalado em seu sistema:
npx mongodb-resumetoken-decoder <RESUME TOKEN>
Consulte o seguinte para obter mais detalhes sobre:
Retomar tokens de eventos de mudança
As notificações de alteração de evento incluem um token de currículo no campo _id
:
{ "_id": { "_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004" }, "operationType": "insert", "clusterTime": Timestamp({ "t": 1666193824, "i": 1 }), "collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"), "wallTime": ISODate("2022-10-19T15:37:04.604Z"), "fullDocument": { "_id": ObjectId("635019a078be67426d7cf4d2"'), "name": "Giovanni Verga" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { "_id": ObjectId("635019a078be67426d7cf4d2") } }
Retomar tokens de aggregate
Ao utilizar o comando aggregate
, o estágio de agregação do $changeStream
inclui um token de currículo no campo cursor.postBatchResumeToken
:
{ "cursor": { "firstBatch": [], "postBatchResumeToken": { "_data": "8263515EAC000000022B0429296E1404" }, "id": Long("4309380460777152828"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp({ "t": 1666277036, "i": 1 }), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666277036, "i": 1 }) }
Retomar tokens de getMore
O comando getMore
também inclui um token de resumo no campo cursor.postBatchResumeToken
:
{ "cursor": { "nextBatch": [], "postBatchResumeToken": { "_data": "8263515979000000022B0429296E1404" }, "id": Long("7049907285270685005"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666275705, "i": 1 }) }
Casos de uso
Os change streams podem beneficiar arquiteturas com sistemas comerciais dependentes, informando os sistemas downstream quando as mudanças nos dados forem duráveis. Por exemplo, os change streams podem economizar tempo dos desenvolvedores ao implementar serviços de extração, transformação e carga (ETL), sincronização entre plataformas, funcionalidade de colaboração e serviços de notificação.
Controle de acesso
Para implantações que impõem Autenticação em Implantações Autogerenciadas e autorização:
Para abrir um fluxo de alteração em relação a uma coleta específica, os aplicativos devem ter privilégios que concedam ações do
changeStream
efind
na coleta correspondente.{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] } Para abrir um change stream em um único banco de dados, os aplicativos devem ter privilégios que concedam ações do
changeStream
efind
em todas as coleções não-system
no banco de dados.{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] } Para abrir um change stream em um sistema inteiro, os aplicativos devem ter privilégios que concedem ações do
changeStream
efind
em todas as coleções não-system
para todos os bancos de dados na sistema.{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
Notificação de evento
Os fluxos de alterações notificam apenas as alterações de dados que persistiram para a maioria dos membros portadores de dados no conjunto de réplicas. Isso garante que as notificações sejam acionadas somente por alterações confirmadas pela maioria que sejam duráveis em cenários de falha.
Por exemplo, considere um conjunto de réplicas de 3 membros com um cursor de change stream aberto em relação ao primary. Se um cliente emitir uma operação de inserção, o change stream só notificará o aplicativo da alteração de dados depois que essa inserção persistir para a maioria dos membros portadores de dados.
Se uma operação estiver associada a uma transação, o documento do evento de alteração incluirá o txnNumber
e o lsid
.
Agrupamentos
Os fluxos de alterações usam comparações binárias simple
, a menos que uma coleção explícita seja fornecida.
Alterar fluxos e documentos órfãos
A partir do MongoDB 5.3, durante a migração de intervalo, os eventos de fluxo de alterações não são gerados para atualizações de documentos órfãos.
Altere fluxos com pré e pós-imagens de documentos
A partir do MongoDB 6.0, você pode usar eventos change stream para produzir a versão de um documento antes e depois das alterações (pré e pós-imagens do documento):
A pré-imagem é o documento antes de ser substituído, atualizado ou excluído. Não há pré-imagem para um documento inserido.
A pós-imagem é o documento após ter sido inserido, substituído ou atualizado. Não há pós-imagem para um documento excluído.
Habilite
changeStreamPreAndPostImages
para uma coleção usandodb.createCollection()
,create
oucollMod
.
As imagens pré e pós não estarão disponíveis para um change stream se as imagens forem:
Não habilitadas na coleção no momento de uma operação de atualização ou exclusão de documento.
Removido após o tempo de retenção pré e pós-imagem definido em
expireAfterSeconds
.O exemplo a seguir define
expireAfterSeconds
para100
segundos em um cluster inteiro:use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) O exemplo a seguir retorna as configurações atuais do
changeStreamOptions
, incluindoexpireAfterSeconds
:db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) Se um change stream for removido do oplog, as imagens pré e pós correspondentes também serão excluídas, independentemente do tempo de retenção pré e pós-imagem
expireAfterSeconds
.
Considerações adicionais:
Habilitar pré e pós-imagens consome espaço de armazenamento e adiciona tempo de processamento. Ative as imagens anteriores e posteriores somente se precisar delas.
Limite o tamanho do evento de transmissão da alteração para menos de 16 megabytes. Para limitar o tamanho do evento, você pode:
Limite o tamanho do documento a 8 megabytes. Você pode solicitar imagens pré e pós simultaneamente na saída do change stream se outros campos de evento de change stream, como
updateDescription
não forem grandes.Solicite apenas pós-imagens na saída de fluxo de alteração para documentos de até 16 megabytes se outros campos de evento de fluxo de alteração como
updateDescription
não forem grandes.Solicite somente pré-imagens na saída do change stream para documentos de até 16 megabytes se:
as atualizações do documento afetam apenas uma pequena fração da estrutura ou conteúdo do documento, e
não causa um evento de alteração
replace
. Um eventoreplace
sempre inclui o pós-imagem.
Para solicitar uma pré-imagem, defina
fullDocumentBeforeChange
comorequired
ouwhenAvailable
emdb.collection.watch()
. Para solicitar uma pós-imagem, definafullDocument
usando o mesmo método.As pré-imagens são gravadas na coleção
config.system.preimages
.A coleção
config.system.preimages
pode ficar grande. Para limitar o tamanho da coleção, você pode definirexpireAfterSeconds
tempo para as pré-imagens, conforme mostrado anteriormente.As pré-imagens são removidas de forma assíncrona por um processo de plano de fundo.
Importante
Funcionalidade incompatível com versões anteriores
A partir do MongoDB 6.0, se você estiver usando imagens anteriores e posteriores de documentos para change streams, deverá desabilitar changeStreamPreandPostImages para cada coleção usando o collMod
comando antes de poder fazer o downgrade para uma versão anterior do MongoDB.
Dica
Veja também:
Para alterar eventos de transmissão e saída, consulte Alterar eventos.
Para observar alterações em uma collection, consulte
db.collection.watch()
.Para obter exemplos completos com a saída do fluxo de alterações, consulte Fluxos de alterações com imagens pré e pós-documento.
Para obter exemplos completos com a saída do fluxo de alterações, consulte Fluxos de alterações com imagens pré e pós-documento.