Voyage AI joins MongoDB to power more accurate and trustworthy AI applications on Atlas.

Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Reagindo às mudanças do banco de dados com o MongoDB Change Streams e Go

Nic Raboy5 min read • Published Feb 01, 2022 • Updated Feb 03, 2023
Facebook Icontwitter iconlinkedin icon
Classifique este início rápido
star-empty
star-empty
star-empty
star-empty
star-empty
QuickStart Golang Logo
Se você tem acompanhado a minha série de tutoriais de introdução ao Go e ao MongoDB, deve se lembrar que já fizemos muitas coisas até agora. Demos uma olhada em tudo, desde a interação do CRUD com o banco de dados até a modelagem de dados e muito mais. Para acompanhar tudo o que fizemos, você pode dar uma olhada nos seguintes tutoriais da série:
In this tutorial we're going to explore change streams in MongoDB and how they might be useful, all with the Go programming language (Golang).
Antes de dar uma olhada no código, vamos dar um passo para trás e entender o que são change streams e por que muitas vezes são necessários.
Imagine este cenário, um dos muitos possíveis:
Você tem um aplicativo que envolve clientes de internet das coisas (IoT). Digamos que esse seja um aplicativo de cerca geográfica e que os clientes de IoT sejam algo que possa trigger a cerca geográfica à medida que entram e saem do alcance. Em vez de fazer com que seu aplicativo execute consultas constantemente para ver se os clientes estão ao alcance, não faria mais sentido observar em tempo real e React quando isso acontece?
Com os MongoDB change stream, você pode criar um pipeline para observar as alterações em um nível de collection, banco de dados ou sistema e escrever lógica em seu aplicativo para fazer algo à medida que os dados chegam com base no pipeline.

Criando um change stream do MongoDB em tempo real com Golang

Embora haja muitos casos de uso possíveis para change streams, continuaremos com o exemplo que estamos usando em todo o escopo desta série de primeiros passos. Vamos continuar trabalhando com dados de programas e episódios de podcast.
Vamos supor que temos o seguinte código para começar:
1package main
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "sync"
8
9 "go.mongodb.org/mongo-driver/bson"
10 "go.mongodb.org/mongo-driver/mongo"
11 "go.mongodb.org/mongo-driver/mongo/options"
12)
13
14func main() {
15 client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(os.Getenv("ATLAS_URI")))
16 if err != nil {
17 panic(err)
18 }
19 defer client.Disconnect(context.TODO())
20
21 database := client.Database("quickstart")
22 episodesCollection := database.Collection("episodes")
23}
The above code is a very basic connection to a MongoDB cluster, something that we explored in the Como se conectar ao seu cluster MongoDB com Go, tutorial.
Para observar as mudanças, podemos fazer algo como o seguinte:
1episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{})
2if err != nil {
3 panic(err)
4}
The above code will watch for any and all changes to documents within the episodes collection. The result is a cursor that we can iterate over indefinitely for data as it comes in.
Podemos iterar no cursor e dar sentido aos nossos dados usando o código a seguir:
1episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{})
2if err != nil {
3 panic(err)
4}
5
6defer episodesStream.Close(context.TODO())
7
8for episodesStream.Next(context.TODO()) {
9 var data bson.M
10 if err := episodesStream.Decode(&data); err != nil {
11 panic(err)
12 }
13 fmt.Printf("%v\n", data)
14}
Se os dados chegassem, eles poderiam se parecer com o seguinte:
1map[_id:map[_data:825E4EFCB9000000012B022C0100296E5A1004D960EAE47DBE4DC8AC61034AE145240146645F696400645E3B38511C9D4400004117E80004] clusterTime:{1582234809 1} documentKey:map[_id:ObjectID("5e3b38511c9d
24400004117e8")] fullDocument:map[_id:ObjectID("5e3b38511c9d4400004117e8") description:The second episode duration:30 podcast:ObjectID("5e3b37e51c9d4400004117e6") title:Episode #3] ns:map[coll:episodes
3db:quickstart] operationType:replace]
In the above example, I've done a Replace on a particular document in the collection. In addition to information about the data, I also receive the full document that includes the change. The results will vary depending on the operationType that takes place.
While the code that we used would work fine, it is currently a blocking operation. If we wanted to watch for changes and continue to do other things, we'd want to use a goroutine for iterating over our change stream cursor.
Poderíamos fazer algumas alterações como esta:
1package main
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "sync"
8
9 "go.mongodb.org/mongo-driver/bson"
10 "go.mongodb.org/mongo-driver/mongo"
11 "go.mongodb.org/mongo-driver/mongo/options"
12)
13
14func iterateChangeStream(routineCtx context.Context, waitGroup sync.WaitGroup, stream *mongo.ChangeStream) {
15 defer stream.Close(routineCtx)
16 defer waitGroup.Done()
17 for stream.Next(routineCtx) {
18 var data bson.M
19 if err := stream.Decode(&data); err != nil {
20 panic(err)
21 }
22 fmt.Printf("%v\n", data)
23 }
24}
25
26func main() {
27 client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(os.Getenv("ATLAS_URI")))
28 if err != nil {
29 panic(err)
30 }
31 defer client.Disconnect(context.TODO())
32
33 database := client.Database("quickstart")
34 episodesCollection := database.Collection("episodes")
35
36 var waitGroup sync.WaitGroup
37
38 episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{})
39 if err != nil {
40 panic(err)
41 }
42 waitGroup.Add(1)
43 routineCtx, cancelFn := context.WithCancel(context.Background())
44 go iterateChangeStream(routineCtx, waitGroup, episodesStream)
45
46 waitGroup.Wait()
47}
A few things are happening in the above code. We've moved the stream iteration into a separate function to be used in a goroutine. However, running the application would result in it terminating quite quickly because the main function will terminate not too longer after creating the goroutine. To resolve this, we are making use of a WaitGroup. In our example, the main function will wait until the WaitGroup is empty and the WaitGroup only becomes empty when the goroutine terminates.
Making use of the WaitGroup isn't an absolute requirement as there are other ways to keep the application running while watching for changes. However, given the simplicity of this example, it made sense in order to see any changes in the stream.
To keep the iterateChangeStream function from running indefinitely, we are creating and passing a context that can be canceled. While we don't demonstrate canceling the function, at least we know it can be done.

Complicando o Change Stream com o Pipeline de Agregação

No exemplo anterior, o pipeline de agregação que usamos era o mais básico possível. Em outras palavras, estávamos procurando por toda e qualquer mudança que estivesse acontecendo em nossa coleção específica. Embora isso possa ser bom em muitos cenários, você provavelmente aproveitará mais o uso de um pipeline de agregação mais bem definido.
Veja o exemplo a seguir:
1matchPipeline := bson.D{
2 {
3 "$match", bson.D{
4 {"operationType", "insert"},
5 {"fullDocument.duration", bson.D{
6 {"$gt", 30},
7 }},
8 },
9 },
10}
11
12episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{matchPipeline})
In the above example, we're still watching for changes to the episodes collection. However, this time we're only watching for new documents that have a duration field greater than 30. Any other insert or other change stream operation won't be detected.
Os resultados do código acima, quando uma correspondência é encontrada, podem ter a seguinte aparência:
1map[_id:map[_data:825E4F03CF000000012B022C0100296E5A1004D960EAE47DBE4DC8AC61034AE145240146645F696400645E4F03A01C9D44000063CCBD0004] clusterTime:{1582236623 1} documentKey:map[_id:ObjectID("5e4f03a01c9d
244000063ccbd")] fullDocument:map[_id:ObjectID("5e4f03a01c9d44000063ccbd") description:a quick start into mongodb duration:35 podcast:1234 title:getting started with mongodb] ns:map[coll:episodes db:qui
3ckstart] operationType:insert]
With change streams, you'll have access to a subset of the MongoDB aggregation pipeline and its operators. You can learn more about what's available in the official documentation.

Conclusão

Você acabou de ver como usar o change stream do MongoDB em um aplicativo Go usando o driver MongoDB Go. Como apontado anteriormente, o change stream facilita o React a alterações no banco de dados, na collection e no sistema sem precisar consultar constantemente o cluster. Isso permite que você planeje com eficiência os pipelines de agregação para responder à medida que acontecem em tempo real.
Se você deseja acompanhar os outros tutoriais da série de início rápido do MongoDB com Go, você os encontra abaixo:
Para encerrar a série, o próximo tutorial se concentrará em transações com o driver Go do MongoDB.

Facebook Icontwitter iconlinkedin icon
Classifique este início rápido
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Tutorial

MongoDB Atlas Authentication Using Service Accounts (OAuth)


Jan 23, 2025 | 9 min read
Tutorial

Como construir um aplicativo web Go com Gi, MongoDB e AI


Aug 30, 2024 | 11 min read
Tutorial

Vá para o MongoDB usando conectores Kafka - Guia final do agente


Sep 17, 2024 | 7 min read
Tutorial

Noções básicas de HTTP com Go 1.22


Apr 23, 2024 | 7 min read