Menu Docs
Página inicial do Docs
/
Manual do MongoDB

Fluxos de alterações

Nesta página

  • Disponibilidade
  • Conecte
  • Assista a uma coleta, banco de dados ou implementação
  • Considerações sobre o desempenho do change stream
  • Abrir um change stream
  • Modificar o resultado do change stream
  • Consulte o documento completo para saber sobre operações de atualização
  • Retomar um change stream
  • Casos de uso
  • Controle de acesso
  • Notificação de evento
  • Agrupamentos
  • Alterar fluxos e documentos órfãos
  • Altere fluxos com pré e pós-imagens de documentos

Os fluxos de alterações permitem que os aplicativos acessem alterações de dados em tempo real sem a complexidade prévia e o risco de afetar manualmente o oplog. Os aplicativos podem usar fluxos de alterações para se inscrever em todas as alterações de dados em uma única coleção, um banco de dados ou uma implantação inteira e reagir imediatamente a elas. Como os fluxos de alterações usam o framework de agregação, os aplicativos também podem filtrar por alterações específicas ou transformar as notificações à vontade.

A partir do MongoDB 5.1, os change streams são otimizados, proporcionando uma utilização mais eficiente dos recursos e uma execução mais rápida de alguns estágios do aggregation pipeline.

Os change streams estão disponíveis para conjuntos de réplicas e clusters fragmentados:

Os change streams estão incluídos na Stable API V1. No entanto, a opção showExpandedEvents não está incluída na Stable API V1.

As conexões para um change stream podem usar listas de sementes de DNS com a opção de conexão +srv ou listar os servidores individualmente na connection string.

Se o driver perder a conexão com um change stream ou a conexão ficar inativa, ele tentará restabelecer uma conexão com o change stream por meio de outro nó no cluster que tenha uma read preference correspondente. Se o driver não conseguir encontrar um nó com a read preference correta, ele lançará uma exceção.

Para obter mais informações, consulte Formato URI da connection string.

Você pode abrir change stream em:

Alvo
Descrição
Uma collection

Você pode abrir um cursor de change stream para uma única collection (exceto collections system ou qualquer collection nos bancos de dados admin, local e config).

Os exemplos nesta página usam os drivers MongoDB para abrir e trabalhar com um cursor de fluxo de alterações para uma única coleção. Consulte também o método mongosh db.collection.watch().

Um banco de dados

Você pode abrir um cursor de fluxo de alterações para um único banco de dados de dados (exceto admin, local, e config banco de dados) para acompanhar as alterações em todas as suas coleções que não são do sistema.

Para saber sobre o método do driver MongoDB, consulte a documentação do driver. Consulte também o método mongosh db.watch().

Uma implantação

Você pode abrir um cursor de fluxo de alterações para uma implantação (um conjunto de réplicas ou um cluster fragmentado) para observar alterações em todas as coleções que não são do sistema em todos os bancos de dados, exceto admin, local e config.

Para saber sobre o método do driver MongoDB, consulte a documentação do driver. Consulte também o método mongosh Mongo.watch().

Observação

Alterar exemplos de fluxo

Os exemplos nesta página usam os drivers MongoDB para ilustrar como abrir um cursor de fluxo de alteração para uma coleta e trabalhar com o cursor de fluxo de alteração.

Se a quantidade de change streams ativos abertos em um banco de dados exceder o tamanho do pool de conexões, você poderá enfrentar latência de notificação. Cada change stream usa uma conexão e uma operação getMore no change stream pelo período de tempo que aguarda o próximo evento. Para evitar problemas de latência, você deve garantir que o tamanho do pool seja maior do que o número de change stream abertos. Para obter detalhes, consulte a configuração maxPoolSize.

Quando um change stream é aberto em um cluster fragmentado:

  • O mongos cria change streams individuais em cada shard. Esse comportamento ocorre independentemente de o change stream ter como alvo um determinado intervalo de chaves de shard.

  • Quando o mongos recebe resultados do fluxo de mudança, ele classifica e filtra esses resultados. Se necessário, o mongos também executa uma consulta fullDocument .

Para obter o melhor desempenho, limite o uso de queries $lookup em change streams.

Para abrir um fluxo de alteração:

  • Para um conjunto de réplicas, você pode emitir a operação de change stream a partir de qualquer membro portador de dados.

  • Para um cluster fragmentado, você deve emitir a operação de change stream a partir do mongos.

O exemplo a seguir abre um change stream para uma collection e itera sobre o cursor para recuperar os documentos do change stream. [1]


➤ Use o menu suspenso Selecione a linguagem no canto superior direito para definir a linguagem dos exemplos nesta página.


Os exemplos de C abaixo presumem que você se conectou a um conjunto de réplicas do MongoDB e acessou um banco de dados que contém uma coleção inventory.

mongoc_collection_t *collection;
bson_t *pipeline = bson_new ();
bson_t opts = BSON_INITIALIZER;
mongoc_change_stream_t *stream;
const bson_t *change;
const bson_t *resume_token;
bson_error_t error;
collection = mongoc_database_get_collection (db, "inventory");
stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

Os exemplos C# abaixo pressupõem que você conectou a um conjunto de réplicas do MongoDB e acessou um banco de dados que contém uma coleção inventory.

var cursor = inventory.Watch();
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

Os exemplos Go abaixo pressupõem que você conectou a um conjunto de réplicas do MongoDB e acessou um banco de dados que contém uma coleção inventory.

cs, err := coll.Watch(ctx, mongo.Pipeline{})
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

Os exemplos Java abaixo pressupõem que você conectou a um conjunto de réplicas do MongoDB e acessou um banco de dados que contém uma coleção inventory.

MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();

Os exemplos de Kotlin abaixo pressupõem que você está conectado a um conjunto de réplica MongoDB e pode acessar um banco de dados de dados que contém a coleção inventory . Para saber mais sobre como concluir essas tarefas, consulte o guia Bancos de dados e coleções de drivers do Kotlin Coroutine .

val job = launch {
val changeStream = collection.watch()
changeStream.collect {
println("Received a change event: $it")
}
}

Os exemplos abaixo pressupõem que você se conectou a um conjunto de réplicas do MongoDB e acessou um banco de dados que contém uma coleção inventory.

cursor = db.inventory.watch()
document = await cursor.next()

Os exemplos de Node.js abaixo pressupõem que você conectou a um conjunto de réplicas do MongoDB e acessou um banco de dados que contém uma coleção inventory.

O exemplo a seguir usa stream para processar os eventos de mudança.

const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream.on('change', next => {
// process next document
});

Alternativamente, você também pode usar o iterador para processar os eventos de mudança:

const collection = db.collection('inventory');
const changeStream = collection.watch();
const next = await changeStream.next();

ChangeStream estende EventEmitter.

Os exemplos abaixo pressupõem que você se conectou a um conjunto de réplica MongoDB e acessou um banco de dados que contém uma coleção do inventory.

$changeStream = $db->inventory->watch();
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

Os exemplos Python abaixo pressupõem que você se conectou a um conjunto de réplicas MongoDB e acessou um banco de dados que contém uma coleção inventory.

cursor = db.inventory.watch()
next(cursor)

Os exemplos abaixo pressupõem que você se conectou a um conjunto de réplica MongoDB e acessou um banco de dados que contém uma coleção do inventory.

cursor = inventory.watch.to_enum
next_change = cursor.next

Os exemplos Swift (Async) abaixo pressupõem que você se conectou a um conjunto de réplicas do MongoDB e acessou um banco de dados que contém uma coleção inventory.

let inventory = db.collection("inventory")
// Option 1: retrieve next document via next()
let next = inventory.watch().flatMap { cursor in
cursor.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch().flatMap { cursor in
cursor.forEach { event in
// process event
print(event)
}
}

Os exemplos Swift (Sync) abaixo pressupõem que você se conectou a um conjunto de réplicas do MongoDB e acessou um banco de dados que contém uma coleção inventory.

let inventory = db.collection("inventory")
let changeStream = try inventory.watch()
let next = changeStream.next()

Para recuperar o evento de alteração de dados do cursor, repita o cursor do change stream. Para obter informações sobre o change stream, consulte Eventos de alteração.

O change stream permanece aberto até que ocorra uma das seguintes situações:

  • O cursor está explicitamente fechado.

  • Ocorre um evento de invalidar; por exemplo, um drop de coleção ou renomear.

  • A conexão com a implantação do MongoDB fecha ou expira. Consulte Comportamentos do cursor para obter mais informações.

  • Se a implantação for um cluster fragmentado, uma remoção de fragmento pode fazer com que um cursor de fluxo de alteração aberto seja fechado. O cursor do fluxo de alterações fechado pode não ser totalmente retomável.

Observação

O ciclo de vida de um cursor não fechado depende da linguagem.

[1] Você pode especificar um startAtOperationTime para abrir o cursor em um determinado momento. Se o ponto de partida especificado estiver no passado, ele deverá estar no intervalo de tempo do oplog.

➤ Use o menu suspenso Selecione a linguagem no canto superior direito para definir a linguagem dos exemplos nesta página.


Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:

pipeline = BCON_NEW ("pipeline",
"[",
"{",
"$match",
"{",
"fullDocument.username",
BCON_UTF8 ("alice"),
"}",
"}",
"{",
"$addFields",
"{",
"newField",
BCON_UTF8 ("this is an added field!"),
"}",
"}",
"]");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
.Match(change =>
change.FullDocument["username"] == "alice" ||
change.OperationType == ChangeStreamOperationType.Delete)
.AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
"{ $addFields : { newField : 'this is an added field!' } }");
var collection = database.GetCollection<BsonDocument>("inventory");
using (var cursor = collection.Watch(pipeline))
{
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
}

Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:

pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or",
bson.A{
bson.D{{"fullDocument.username", "alice"}},
bson.D{{"operationType", "delete"}}}}},
}}}
cs, err := coll.Watch(ctx, pipeline)
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:

MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>");
// Select the MongoDB database and collection to open the change stream against
MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");
MongoCollection<Document> collection = db.getCollection("myTargetCollection");
// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
Document.parse("{'fullDocument.username': 'alice'}"),
Filters.in("operationType", asList("delete")))));
// Create the change stream cursor, passing the pipeline to the
// collection.watch() method
MongoCursor<Document> cursor = collection.watch(pipeline).iterator();

A lista pipeline inclui um único estágio $match que filtra quaisquer operações que atendam a um ou ambos os critérios a seguir:

  • username o valor é alice

  • operationType o valor é delete

Passar o pipeline para o método watch() direciona o fluxo de mudança para retornar notificações depois de passá-las pelo pipeline especificado.

Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:

val pipeline = listOf(
Aggregates.match(
or(
eq("fullDocument.username", "alice"),
`in`("operationType", listOf("delete"))
)
))
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}

A lista pipeline inclui um único estágio $match que filtra quaisquer operações que atendam a um ou ambos os critérios a seguir:

  • username o valor é alice

  • operationType o valor é delete

Passar o pipeline para o método watch() direciona o fluxo de mudança para retornar notificações depois de passá-las pelo pipeline especificado.

Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
document = await cursor.next()

Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:

O exemplo a seguir usa stream para processar os eventos de mudança.

const pipeline = [
{ $match: { 'fullDocument.username': 'alice' } },
{ $addFields: { newField: 'this is an added field!' } }
];
const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
// process next document
});

Alternativamente, você também pode usar o iterador para processar os eventos de mudança:

const changeStreamIterator = collection.watch(pipeline);
const next = await changeStreamIterator.next();

Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:

$pipeline = [
['$match' => ['fullDocument.username' => 'alice']],
['$addFields' => ['newField' => 'this is an added field!']],
];
$changeStream = $db->inventory->watch($pipeline);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
next(cursor)

Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:

Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

Você pode controlar a change stream fornecendo uma matriz de um ou mais dos seguintes estágios do pipeline ao configurar o fluxo de alterações:

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self)
let next = changeStream.next()

Dica

O campo _id do documento de evento de fluxo de alterações atua como o token de retomada. Não use o pipeline para modificar ou remover o campo _id do evento de fluxo de alterações.

A partir do MongoDB 4.2, os change streams lançarão uma exceção se o pipeline de agregação do fluxo de alteração modificar o campo _id de um evento.

Consulte Mudar eventos para obter mais informações sobre o formato do documento de resposta do change stream.

Por padrão, os change streams retornam apenas o delta dos campos durante a operação de atualização. No entanto, você pode configurar o change stream para retornar a versão mais atual comprometida por maioria do documento atualizado.


➤ Use o menu suspenso Selecione a linguagem no canto superior direito para definir a linguagem dos exemplos nesta página.


Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe a opção "fullDocument" com o valor "updateLookup" para o método mongoc_collection_watch.

No exemplo abaixo, todas as notificações de operações de atualização incluem um campo fullDocument que representa a versão atual do documento afetado pela operação de atualização.

BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup" para o método db.collection.watch().

No exemplo abaixo, todas as notificações de operações de atualização incluem um campo FullDocument que representa a versão atual do documento afetado pela operação de atualização.

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var cursor = inventory.Watch(options);
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

Para retornar a versão mais atual do documento atualizado, SetFullDocument(options.UpdateLookup) altere a opção de fluxo.

cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe FullDocument.UPDATE_LOOKUP para o método db.collection.watch.fullDocument().

No exemplo abaixo, todas as notificações de operações de atualização incluem um campo FullDocument que representa a versão atual do documento afetado pela operação de atualização.

cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();

Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe FullDocument.UPDATE_LOOKUP para o ChangeStreamFlow.fullDocument() método.

No exemplo abaixo, todas as notificações de operações de atualização incluem um campo FullDocument que representa a versão atual do documento afetado pela operação de atualização.

val job = launch {
val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
changeStream.collect {
println(it)
}
}

Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe full_document='updateLookup' para o método db.collection.watch().

No exemplo abaixo, todas as notificações de operações de atualização incluem um campo `full_document que representa a versão atual do documento afetado pela operação de atualização.

cursor = db.inventory.watch(full_document="updateLookup")
document = await cursor.next()

Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe { fullDocument: 'updateLookup' } para o método db.collection.watch().

No exemplo abaixo, todas as notificações de operações de atualização incluem um campo fullDocument que representa a versão atual do documento afetado pela operação de atualização.

O exemplo a seguir usa stream para processar os eventos de mudança.

const collection = db.collection('inventory');
const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
changeStream.on('change', next => {
// process next document
});

Alternativamente, você também pode usar o iterador para processar os eventos de mudança:

const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' });
const next = await changeStreamIterator.next();

Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP" para o método db.watch().

No exemplo abaixo, todas as notificações de operações de atualização incluem um campo fullDocument que representa a versão atual do documento afetado pela operação de atualização.

$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe full_document='updateLookup' para o método db.collection.watch().

No exemplo abaixo, todas as notificações de operações de atualização incluem um campo full_document que representa a versão atual do documento afetado pela operação de atualização.

cursor = db.inventory.watch(full_document="updateLookup")
next(cursor)

Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe full_document: 'updateLookup' para o método db.watch().

No exemplo abaixo, todas as notificações de operações de atualização incluem um campo full_document que representa a versão atual do documento afetado pela operação de atualização.

cursor = inventory.watch([], full_document: 'updateLookup').to_enum
next_change = cursor.next

Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe options: ChangeStreamOptions(fullDocument: .updateLookup) para o método watch().

let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}

Para retornar a versão mais atual acordada majoritariamente do documento atualizado, passe options: ChangeStreamOptions(fullDocument: .updateLookup) para o método watch().

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()

Observação

Se houver uma ou mais operações confirmadas pela maioria que modificaram o documento atualizado após a operação de atualização, mas antes da pesquisa, o documento completo retornado pode diferir significativamente do documento no momento da operação de atualização.

No entanto, os deltas incluídos no documento do change stream sempre descrevem corretamente as alterações da collection monitorada que se aplicaram a esse evento do change stream.

O campo fullDocument de um evento de atualização poderá estar ausente se uma das seguintes opções for verdadeira:

  • Se o documento for excluído ou se a coleção for descartada entre a atualização e a pesquisa.

  • Se a atualização alterar os valores de pelo menos um dos campos na chave de fragmento dessa coleção.

Consulte Mudar eventos para obter mais informações sobre o formato do documento de resposta do change stream.

Os change streams são retomáveis especificando um token de retomada para resumeAfter ou startAfter ao abrir o cursor.

Você pode retomar uma transmissão de alteração após um evento específico passando um token de currículo para resumeAfter ao abrir o cursor.

Consulte Tokens de resumo para saber mais.

Importante

  • O oplog deve ter histórico suficiente para localizar a operação associada ao token ou ao registro de data e hora, se o registro de data e hora estiver no passado.

  • Não é possível usar resumeAfter para retomar um change stream depois que um evento de invalidação (por exemplo, um descarte ou renomeação de coleção) fechar o change stream. Em vez disso, você pode usar startAfter para iniciar um novo change stream após um evento de invalidação.

No exemplo abaixo, a opção resumeAfter é anexada às opções de stream para recriar a stream após ter sido destruída. Passar o _id para o change stream tenta retomar as notificações a partir da operação especificada.

stream = mongoc_collection_watch (collection, pipeline, NULL);
if (mongoc_change_stream_next (stream, &change)) {
resume_token = mongoc_change_stream_get_resume_token (stream);
BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token);
mongoc_change_stream_destroy (stream);
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
mongoc_change_stream_destroy (stream);
} else {
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);
}

No exemplo abaixo, o resumeToken é obtido do último documento de fluxo de alterações e transmitido para o método Watch() como uma opção. Transmitir resumeToken para o método Watch() direciona o fluxo de alterações para tentar retomar as notificações iniciadas após a operação especificada no token de retomada.

var resumeToken = previousCursor.GetResumeToken();
var options = new ChangeStreamOptions { ResumeAfter = resumeToken };
var cursor = inventory.Watch(options);
cursor.MoveNext();
var next = cursor.Current.First();
cursor.Dispose();

Você pode usar ChangeStreamOptions.SetResumeAfter para especificar o token de retomada para o fluxo de alterações. Se a opção resumeAfter estiver configurada, o fluxo de alterações retomará as notificações após a operação especificada no token de retomada. O SetResumeAfter assume um valor que deve ser resolvido para um token de currículo, por exemplo resumeToken no exemplo abaixo.

resumeToken := original.ResumeToken()
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken))
assert.NoError(t, err)
defer cs.Close(ctx)
ok = cs.Next(ctx)
result := cs.Current

Você pode usar o método resumeAfter() para retomar as notificações após a operação especificada no token de retomada. O método resumeAfter() recebe um valor que deve ser resolvido em um token de resumo, por exemplo resumeToken no exemplo abaixo.

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();

Você pode usar o ChangeStreamFlow.resumeAfter() para retomar as notificações após a operação especificada no token de retomada. O método resumeAfter() obtém um valor que deve ser resolvido para um token de resumo, como a variável resumeToken no exemplo abaixo.

val resumeToken = BsonDocument()
val job = launch {
val changeStream = collection.watch()
.resumeAfter(resumeToken)
changeStream.collect {
println(it)
}
}

Use o modificador resume_after para retomar as notificações após a operação especificada no token de retomada. O modificador resume_after recebe um valor que deve ser resolvido em um token de resumo, como resume_token no exemplo abaixo.

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = await cursor.next()

Você pode usar a opção resumeAfter para retomar notificações após a operação especificada no token de retomada. A opção resumeAfter assume um valor que deve ser resolvido para um token de currículo, por exemplo resumeToken no exemplo abaixo.

const collection = db.collection('inventory');
const changeStream = collection.watch();
let newChangeStream;
changeStream.once('change', next => {
const resumeToken = changeStream.resumeToken;
changeStream.close();
newChangeStream = collection.watch([], { resumeAfter: resumeToken });
newChangeStream.on('change', next => {
processChange(next);
});
});

Você pode usar a opção resumeAfter para retomar notificações após a operação especificada no token de retomada. A opção resumeAfter assume um valor que deve ser resolvido para um token de currículo, por exemplo $resumeToken no exemplo abaixo.

$resumeToken = $changeStream->getResumeToken();
if ($resumeToken === null) {
throw new \Exception('Resume token was not found');
}
$changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$changeStream->rewind();
$firstChange = $changeStream->current();

Use o modificador resume_after para retomar as notificações após a operação especificada no token de retomada. O modificador resume_after recebe um valor que deve ser resolvido em um token de resumo, como resume_token no exemplo abaixo.

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
next(cursor)

Use o modificador resume_after para retomar as notificações após a operação especificada no token de retomada. O modificador resume_after recebe um valor que deve ser resolvido em um token de resumo, como resume_token no exemplo abaixo.

change_stream = inventory.watch
cursor = change_stream.to_enum
next_change = cursor.next
resume_token = change_stream.resume_token
new_cursor = inventory.watch([], resume_after: resume_token).to_enum
resumed_change = new_cursor.next

Você pode usar a opção resumeAfter para retomar notificações após a operação especificada no token de retomada. A opção resumeAfter assume um valor que deve ser resolvido para um token de currículo, por exemplo resumeToken no exemplo abaixo.

let inventory = db.collection("inventory")
inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next().map { _ in
changeStream.resumeToken
}.always { _ in
_ = changeStream.kill()
}
}.flatMap { resumeToken in
inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in
newStream.forEach { event in
// process event
print(event)
}
}
}

Você pode usar a opção resumeAfter para retomar notificações após a operação especificada no token de retomada. A opção resumeAfter assume um valor que deve ser resolvido para um token de currículo, por exemplo resumeToken no exemplo abaixo.

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()
let resumeToken = changeStream.resumeToken
let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
let nextAfterResume = resumedChangeStream.next()

Você pode iniciar um novo change stream após um evento específico passando um token de resumo para startAfter ao abrir o cursor. Ao contrário de resumeAfter, startAfter pode retomar notificações após um evento de invalidação criando um novo change stream.

Consulte Tokens de resumo para saber mais.

Importante

  • O oplog deve ter histórico suficiente para localizar a operação associada ao token ou ao registro de data e hora, se o registro de data e hora estiver no passado.

O token de resumo está disponível em várias fontes:

Fonte
Descrição
Cada notificação de change stream inclui um token de resumo no campo _id.

O estágio de aggregation $changeStream inclui um token de resumo no campo cursor.postBatchResumeToken.

Este campo aparece somente ao utilizar o comando aggregate.

O comando getMore inclui um token de currículo no campo cursor.postBatchResumeToken.

A partir do MongoDB 4.2, os change streams lançarão uma exceção se o pipeline de agregação do fluxo de alteração modificar o campo _id de um evento.

Dica

O MongoDB fornece um "snippet", uma extensão para mongosh, que decodifica tokens de currículo codificados por hex.

Você pode instalar e executar o trecho resumetoken do mongosh:

snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')

Você também pode executar resumetoken a partir da linha de comando (sem usar o mongosh) se o npm estiver instalado em seu sistema:

npx mongodb-resumetoken-decoder <RESUME TOKEN>

Consulte o seguinte para obter mais detalhes sobre:

As notificações de alteração de evento incluem um token de currículo no campo _id:

{
"_id": {
"_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
},
"operationType": "insert",
"clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
"collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
"wallTime": ISODate("2022-10-19T15:37:04.604Z"),
"fullDocument": {
"_id": ObjectId("635019a078be67426d7cf4d2"'),
"name": "Giovanni Verga"
},
"ns": {
"db": "test",
"coll": "names"
},
"documentKey": {
"_id": ObjectId("635019a078be67426d7cf4d2")
}
}

Ao utilizar o comando aggregate, o estágio de agregação do $changeStream inclui um token de currículo no campo cursor.postBatchResumeToken :

{
"cursor": {
"firstBatch": [],
"postBatchResumeToken": {
"_data": "8263515EAC000000022B0429296E1404"
},
"id": Long("4309380460777152828"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp({ "t": 1666277036, "i": 1 }),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666277036, "i": 1 })
}

O comando getMore também inclui um token de resumo no campo cursor.postBatchResumeToken:

{
"cursor": {
"nextBatch": [],
"postBatchResumeToken": {
"_data": "8263515979000000022B0429296E1404"
},
"id": Long("7049907285270685005"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666275705, "i": 1 })
}

Os change streams podem beneficiar arquiteturas com sistemas comerciais dependentes, informando os sistemas downstream quando as mudanças nos dados forem duráveis. Por exemplo, os change streams podem economizar tempo dos desenvolvedores ao implementar serviços de extração, transformação e carga (ETL), sincronização entre plataformas, funcionalidade de colaboração e serviços de notificação.

Para implantações que impõem Autenticação em Implantações Autogerenciadas e autorização:

  • Para abrir um fluxo de alteração em relação a uma coleta específica, os aplicativos devem ter privilégios que concedam ações do changeStream e find na coleta correspondente.

    { resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
  • Para abrir um change stream em um único banco de dados, os aplicativos devem ter privilégios que concedam ações do changeStream e find em todas as coleções não-system no banco de dados.

    { resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
  • Para abrir um change stream em um sistema inteiro, os aplicativos devem ter privilégios que concedem ações do changeStream e find em todas as coleções não-system para todos os bancos de dados na sistema.

    { resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

Os fluxos de alterações notificam apenas as alterações de dados que persistiram para a maioria dos membros portadores de dados no conjunto de réplicas. Isso garante que as notificações sejam acionadas somente por alterações confirmadas pela maioria que sejam duráveis em cenários de falha.

Por exemplo, considere um conjunto de réplicas de 3 membros com um cursor de change stream aberto em relação ao primary. Se um cliente emitir uma operação de inserção, o change stream só notificará o aplicativo da alteração de dados depois que essa inserção persistir para a maioria dos membros portadores de dados.

Se uma operação estiver associada a uma transação, o documento do evento de alteração incluirá o txnNumber e o lsid.

Os fluxos de alterações usam comparações binárias simple, a menos que uma coleção explícita seja fornecida.

A partir do MongoDB 5.3, durante a migração de intervalo, os eventos de fluxo de alterações não são gerados para atualizações de documentos órfãos.

A partir do MongoDB 6.0, você pode usar eventos change stream para produzir a versão de um documento antes e depois das alterações (pré e pós-imagens do documento):

  • A pré-imagem é o documento antes de ser substituído, atualizado ou excluído. Não há pré-imagem para um documento inserido.

  • A pós-imagem é o documento após ter sido inserido, substituído ou atualizado. Não há pós-imagem para um documento excluído.

  • Habilite changeStreamPreAndPostImages para uma coleção usando db.createCollection(), create ou collMod.

As imagens pré e pós não estarão disponíveis para um change stream se as imagens forem:

  • Não habilitadas na coleção no momento de uma operação de atualização ou exclusão de documento.

  • Removido após o tempo de retenção pré e pós-imagem definido em expireAfterSeconds.

    • O exemplo a seguir define expireAfterSeconds para 100 segundos em um cluster inteiro:

      use admin
      db.runCommand( {
      setClusterParameter:
      { changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 100 }
      } }
      } )
    • O exemplo a seguir retorna as configurações atuais do changeStreamOptions, incluindo expireAfterSeconds:

      db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
    • Definir expireAfterSeconds para off usa a política de retenção: pré-imagens e pós-imagens são retidas até os eventos de change streams serem removidos do oplog.

    • Se um change stream for removido do oplog, as imagens pré e pós correspondentes também serão excluídas, independentemente do tempo de retenção pré e pós-imagem expireAfterSeconds .

Considerações adicionais:

  • Habilitar pré e pós-imagens consome espaço de armazenamento e adiciona tempo de processamento. Ative as imagens anteriores e posteriores somente se precisar delas.

  • Limite o tamanho do evento de transmissão da alteração para menos de 16 megabytes. Para limitar o tamanho do evento, você pode:

    • Limite o tamanho do documento a 8 megabytes. Você pode solicitar imagens pré e pós simultaneamente na saída do change stream se outros campos de evento de change stream, como updateDescription não forem grandes.

    • Solicite apenas pós-imagens na saída de fluxo de alteração para documentos de até 16 megabytes se outros campos de evento de fluxo de alteração como updateDescription não forem grandes.

    • Solicite somente pré-imagens na saída do change stream para documentos de até 16 megabytes se:

      • as atualizações do documento afetam apenas uma pequena fração da estrutura ou conteúdo do documento, e

      • não causa um evento de alteração replace . Um evento replace sempre inclui o pós-imagem.

  • Para solicitar uma pré-imagem, defina fullDocumentBeforeChange como required ou whenAvailable em db.collection.watch(). Para solicitar uma pós-imagem, defina fullDocument usando o mesmo método.

  • As pré-imagens são gravadas na coleção config.system.preimages.

    • A coleção config.system.preimages pode ficar grande. Para limitar o tamanho da coleção, você pode definir expireAfterSeconds tempo para as pré-imagens, conforme mostrado anteriormente.

    • As pré-imagens são removidas de forma assíncrona por um processo de plano de fundo.

Importante

Funcionalidade incompatível com versões anteriores

A partir do MongoDB 6.0, se você estiver usando imagens anteriores e posteriores de documentos para change streams, deverá desabilitar changeStreamPreandPostImages para cada coleção usando o collMod comando antes de poder fazer o downgrade para uma versão anterior do MongoDB.

Dica

Veja também:

Para obter exemplos completos com a saída do fluxo de alterações, consulte Fluxos de alterações com imagens pré e pós-documento.

Voltar

Limitações