Reagindo às mudanças do banco de dados com o MongoDB Change Streams e Go
Classifique este início rápido

Se você tem acompanhado a minha série de tutoriais de introdução ao Go e ao MongoDB, deve se lembrar que já fizemos muitas coisas até agora. Demos uma olhada em tudo, desde a interação do CRUD com o banco de dados até a modelagem de dados e muito mais. Para acompanhar tudo o que fizemos, você pode dar uma olhada nos seguintes tutoriais da série:
In this tutorial we're going to explore change streams in MongoDB and how they might be useful, all with the Go programming language (Golang).
Antes de dar uma olhada no código, vamos dar um passo para trás e entender o que são change streams e por que muitas vezes são necessários.
Imagine este cenário, um dos muitos possíveis:
Você tem um aplicativo que envolve clientes de internet das coisas (IoT). Digamos que esse seja um aplicativo de cerca geográfica e que os clientes de IoT sejam algo que possa trigger a cerca geográfica à medida que entram e saem do alcance. Em vez de fazer com que seu aplicativo execute consultas constantemente para ver se os clientes estão ao alcance, não faria mais sentido observar em tempo real e React quando isso acontece?
Com os MongoDB change stream, você pode criar um pipeline para observar as alterações em um nível de collection, banco de dados ou sistema e escrever lógica em seu aplicativo para fazer algo à medida que os dados chegam com base no pipeline.
Embora haja muitos casos de uso possíveis para change streams, continuaremos com o exemplo que estamos usando em todo o escopo desta série de primeiros passos. Vamos continuar trabalhando com dados de programas e episódios de podcast.
Vamos supor que temos o seguinte código para começar:
1 package main 2 3 import ( 4 "context" 5 "fmt" 6 "os" 7 "sync" 8 9 "go.mongodb.org/mongo-driver/bson" 10 "go.mongodb.org/mongo-driver/mongo" 11 "go.mongodb.org/mongo-driver/mongo/options" 12 ) 13 14 func main() { 15 client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(os.Getenv("ATLAS_URI"))) 16 if err != nil { 17 panic(err) 18 } 19 defer client.Disconnect(context.TODO()) 20 21 database := client.Database("quickstart") 22 episodesCollection := database.Collection("episodes") 23 }
The above code is a very basic connection to a MongoDB cluster, something that we explored in the Como se conectar ao seu cluster MongoDB com Go, tutorial.
Para observar as mudanças, podemos fazer algo como o seguinte:
1 episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{}) 2 if err != nil { 3 panic(err) 4 }
The above code will watch for any and all changes to documents within the
episodes
collection. The result is a cursor that we can iterate over indefinitely for data as it comes in.Podemos iterar no cursor e dar sentido aos nossos dados usando o código a seguir:
1 episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{}) 2 if err != nil { 3 panic(err) 4 } 5 6 defer episodesStream.Close(context.TODO()) 7 8 for episodesStream.Next(context.TODO()) { 9 var data bson.M 10 if err := episodesStream.Decode(&data); err != nil { 11 panic(err) 12 } 13 fmt.Printf("%v\n", data) 14 }
Se os dados chegassem, eles poderiam se parecer com o seguinte:
1 map[_id:map[_data:825E4EFCB9000000012B022C0100296E5A1004D960EAE47DBE4DC8AC61034AE145240146645F696400645E3B38511C9D4400004117E80004] clusterTime:{1582234809 1} documentKey:map[_id:ObjectID("5e3b38511c9d 2 4400004117e8")] fullDocument:map[_id:ObjectID("5e3b38511c9d4400004117e8") description:The second episode duration:30 podcast:ObjectID("5e3b37e51c9d4400004117e6") title:Episode #3] ns:map[coll:episodes 3 db:quickstart] operationType:replace]
In the above example, I've done a
Replace
on a particular document in the collection. In addition to information about the data, I also receive the full document that includes the change. The results will vary depending on the operationType
that takes place.While the code that we used would work fine, it is currently a blocking operation. If we wanted to watch for changes and continue to do other things, we'd want to use a goroutine for iterating over our change stream cursor.
Poderíamos fazer algumas alterações como esta:
1 package main 2 3 import ( 4 "context" 5 "fmt" 6 "os" 7 "sync" 8 9 "go.mongodb.org/mongo-driver/bson" 10 "go.mongodb.org/mongo-driver/mongo" 11 "go.mongodb.org/mongo-driver/mongo/options" 12 ) 13 14 func iterateChangeStream(routineCtx context.Context, waitGroup sync.WaitGroup, stream *mongo.ChangeStream) { 15 defer stream.Close(routineCtx) 16 defer waitGroup.Done() 17 for stream.Next(routineCtx) { 18 var data bson.M 19 if err := stream.Decode(&data); err != nil { 20 panic(err) 21 } 22 fmt.Printf("%v\n", data) 23 } 24 } 25 26 func main() { 27 client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(os.Getenv("ATLAS_URI"))) 28 if err != nil { 29 panic(err) 30 } 31 defer client.Disconnect(context.TODO()) 32 33 database := client.Database("quickstart") 34 episodesCollection := database.Collection("episodes") 35 36 var waitGroup sync.WaitGroup 37 38 episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{}) 39 if err != nil { 40 panic(err) 41 } 42 waitGroup.Add(1) 43 routineCtx, cancelFn := context.WithCancel(context.Background()) 44 go iterateChangeStream(routineCtx, waitGroup, episodesStream) 45 46 waitGroup.Wait() 47 }
A few things are happening in the above code. We've moved the stream iteration into a separate function to be used in a goroutine. However, running the application would result in it terminating quite quickly because the
main
function will terminate not too longer after creating the goroutine. To resolve this, we are making use of a WaitGroup
. In our example, the main
function will wait until the WaitGroup
is empty and the WaitGroup
only becomes empty when the goroutine terminates.Making use of the
WaitGroup
isn't an absolute requirement as there are other ways to keep the application running while watching for changes. However, given the simplicity of this example, it made sense in order to see any changes in the stream.To keep the
iterateChangeStream
function from running indefinitely, we are creating and passing a context that can be canceled. While we don't demonstrate canceling the function, at least we know it can be done.No exemplo anterior, o pipeline de agregação que usamos era o mais básico possível. Em outras palavras, estávamos procurando por toda e qualquer mudança que estivesse acontecendo em nossa coleção específica. Embora isso possa ser bom em muitos cenários, você provavelmente aproveitará mais o uso de um pipeline de agregação mais bem definido.
Veja o exemplo a seguir:
1 matchPipeline := bson.D{ 2 { 3 "$match", bson.D{ 4 {"operationType", "insert"}, 5 {"fullDocument.duration", bson.D{ 6 {"$gt", 30}, 7 }}, 8 }, 9 }, 10 } 11 12 episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{matchPipeline})
In the above example, we're still watching for changes to the
episodes
collection. However, this time we're only watching for new documents that have a duration
field greater than 30. Any other insert or other change stream operation won't be detected.Os resultados do código acima, quando uma correspondência é encontrada, podem ter a seguinte aparência:
1 map[_id:map[_data:825E4F03CF000000012B022C0100296E5A1004D960EAE47DBE4DC8AC61034AE145240146645F696400645E4F03A01C9D44000063CCBD0004] clusterTime:{1582236623 1} documentKey:map[_id:ObjectID("5e4f03a01c9d 2 44000063ccbd")] fullDocument:map[_id:ObjectID("5e4f03a01c9d44000063ccbd") description:a quick start into mongodb duration:35 podcast:1234 title:getting started with mongodb] ns:map[coll:episodes db:qui 3 ckstart] operationType:insert]
With change streams, you'll have access to a subset of the MongoDB aggregation pipeline and its operators. You can learn more about what's available in the official documentation.
Você acabou de ver como usar o change stream do MongoDB em um aplicativo Go usando o driver MongoDB Go. Como apontado anteriormente, o change stream facilita o React a alterações no banco de dados, na collection e no sistema sem precisar consultar constantemente o cluster. Isso permite que você planeje com eficiência os pipelines de agregação para responder à medida que acontecem em tempo real.
Se você deseja acompanhar os outros tutoriais da série de início rápido do MongoDB com Go, você os encontra abaixo:
Para encerrar a série, o próximo tutorial se concentrará em transações com o driver Go do MongoDB.