Working with Change Streams from Your Swift Application
Andrew Morgan • 4 min read • Published Jan 25, 2023 • Updated Jan 25, 2023
My day job is to work with our biggest customers to help them get the best out of MongoDB when creating new applications or migrating existing ones. Their use cases often need side effects to be run whenever data changes — one of the most common requirements is to maintain an audit trail.
When customers are using MongoDB Atlas, it's a no-brainer to recommend Atlas Triggers. With triggers, you provide the code, and Atlas makes sure that it runs whenever the data you care about changes. There's no need to stand up an app server, and triggers are very efficient as they run alongside your data.
Unfortunately, there are still some workloads that customers aren't ready to move to the public cloud. For these applications, I recommend change streams. Change streams are the underlying mechanism used by Triggers and many other MongoDB technologies — Kafka Connector, Charts, Spark Connector, Atlas Search, anything that needs to react to data changes.
Using change streams is surprisingly easy. Ask the MongoDB Driver to open a change stream and it returns a database cursor. Listen to that cursor, and your application receives an event for every change in your collection.
This post shows you how to use change streams from a Swift application. The principles are exactly the same for other languages. You can find a lot more on change streams at Developer Center.
I recently started using the MongoDB Swift Driver for the first time. I decided to build a super-simple Mac desktop app that lets you browse your collections (which MongoDB Compass does a much better job of) and displays change stream events in real time (which Compass doesn't currently do).
Provide your connection string, then browse your collections. Select the "Enable change streams" option to display change events in real time.
We need a variable to store the change stream (a database cursor):

    private var changeStream: ChangeStream<ChangeStreamEvent<BSONDocument>>?

as well as one to store the latest change event received from the change stream (this will be used to update the UI):

    private var latestChangeEvent: ChangeStreamEvent<BSONDocument>?
The registerChangeStream function is called whenever the user checks or unchecks the change stream option:

    func registerChangeStream() async {
        // If the view already has an active change stream, close it down
        if let changeStream = changeStream {
            _ = changeStream.kill()
            self.changeStream = nil
        }
        if enabledChangeStreams {
            do {
                let changeStreamOptions = ChangeStreamOptions(fullDocument: .updateLookup)
                changeStream = try await collection?.watch(options: changeStreamOptions)
                _ = changeStream?.forEach({ changeEvent in
                    withAnimation {
                        latestChangeEvent = changeEvent
                        showingChangeEvent = true
                        Task {
                            await loadDocs()
                        }
                    }
                })
            } catch {
                errorMessage = "Failed to register change stream: \(error.localizedDescription)"
            }
        }
    }
The function specifies what data it wants to see by creating a ChangeStreamOptions structure (you can see the available options in the Swift driver docs). In this app, I specify that I want to receive the complete new document (in addition to the deltas) when a document is updated. Note that the full document is always included for insert and replace operations.

The code then calls watch on the current collection. Note that you can also provide an aggregation pipeline as a parameter named pipeline when calling watch. That pipeline can filter and reshape the events your application receives.

Once the asynchronous watch function completes, the forEach loop processes each change event as it's received.

When the loop updates our latestChangeEvent variable, the change is automatically propagated to the ChangeEventView:

    ChangeEventView(event: latestChangeEvent)
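To make the pipeline option mentioned above a little more concrete, here's a minimal sketch of a filter that keeps only the operation types you care about. It isn't taken from the app: changeEventFilter and prettyJSON are hypothetical helpers, and plain Swift dictionaries stand in for the driver's document type so that the snippet runs with Foundation alone. The $match stage on operationType is standard change stream usage; how you convert the stages into whatever watch expects is left as an assumption to check against the driver docs.

```swift
import Foundation

/// Builds a one-stage aggregation pipeline that keeps only change events
/// whose `operationType` is listed in `operations`.
/// Assumption: plain dictionaries stand in for the driver's document type.
func changeEventFilter(operations: [String]) -> [[String: Any]] {
    let match: [String: Any] = [
        "$match": ["operationType": ["$in": operations]]
    ]
    return [match]
}

/// Renders the pipeline as JSON so it can be inspected or logged.
func prettyJSON(_ pipeline: [[String: Any]]) throws -> String {
    let data = try JSONSerialization.data(
        withJSONObject: pipeline,
        options: [.prettyPrinted, .sortedKeys]
    )
    return String(decoding: data, as: UTF8.self)
}

// Only inserts and updates would reach the application with this filter.
let insertsAndUpdates = changeEventFilter(operations: ["insert", "update"])
```

Filtering in the pipeline means the unwanted events never leave the database, which is usually preferable to discarding them inside the forEach closure.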
You can see all of the code to display the change event in ChangeEventView.swift. I'll show some highlights here.

The view receives the change event from the enclosing view (CollectionView):

    let event: ChangeStreamEvent<BSONDocument>
The code looks at the operationType value in the event to determine what color to use for the window:

    var color: Color {
        switch event.operationType {
        case .insert:
            return .green
        case .update:
            return .yellow
        case .replace:
            return .orange
        case .delete:
            return .red
        default:
            return .teal
        }
    }
documentKey contains the _id value for the document that was changed in the MongoDB collection:

    if let documentKey = event.documentKey {
        ...
            Text(documentKey.toPrettyJSONString())
        ...
        }
    }
If the database operation was an update, then the delta is stored in updateDescription:

    if let updateDescription = event.updateDescription {
        ...
            Text(updateDescription.updatedFields.toPrettyJSONString())
        ...
        }
    }
The complete document after the change was applied in MongoDB is stored in fullDocument:

    if let fullDocument = event.fullDocument {
        ...
            Text(fullDocument.toPrettyJSONString())
        ...
        }
    }
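If you haven't seen a raw change event before, it may help to see how documentKey, updateDescription, and fullDocument sit side by side. The sketch below parses a hand-written sample of an update event using Foundation only. The sample values (the shop.orders namespace, the field names), the EventSummary type, and the summarize function are all made up for illustration, and the sample leaves out the _id resume token and other fields that a real event carries.

```swift
import Foundation

// Hand-written sample of an update event, trimmed for illustration.
// fullDocument appears on an update only because updateLookup was requested.
let sampleUpdateEvent = """
{
  "operationType": "update",
  "ns": { "db": "shop", "coll": "orders" },
  "documentKey": { "_id": 42 },
  "updateDescription": {
    "updatedFields": { "status": "shipped" },
    "removedFields": []
  },
  "fullDocument": { "_id": 42, "status": "shipped", "total": 19.99 }
}
"""

enum EventParseError: Error {
    case notAnEvent
}

/// Hypothetical summary type; the driver exposes typed properties instead.
struct EventSummary {
    let operationType: String
    let changedFields: [String]
    let hasFullDocument: Bool
}

/// Pulls out the operation type, the names of the updated fields,
/// and whether the complete document was included in the event.
func summarize(eventJSON: String) throws -> EventSummary {
    let parsed = try JSONSerialization.jsonObject(with: Data(eventJSON.utf8))
    guard let event = parsed as? [String: Any],
          let operationType = event["operationType"] as? String else {
        throw EventParseError.notAnEvent
    }
    let update = event["updateDescription"] as? [String: Any]
    let updatedFields = update?["updatedFields"] as? [String: Any] ?? [:]
    return EventSummary(
        operationType: operationType,
        changedFields: updatedFields.keys.sorted(),
        hasFullDocument: event["fullDocument"] != nil
    )
}
```

In the app itself none of this parsing is needed, because ChangeStreamEvent already exposes these fields as properties.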
If processing the change events is critical, then you need to handle failures such as your process crashing.
The _id.resumeToken in the ChangeStreamEvent is a token that can be used when starting the process to continue from where you left off. Simply provide this token to the resumeAfter or startAfter options when opening the change stream. Note that this assumes that the events you've missed haven't rotated out of the oplog.

Use change streams (or Atlas Triggers, if you're able) to simplify your code base by decoupling the handling of side effects from each place in your code that changes data.
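Coming back to resume tokens for a moment: to survive a crash, the token has to be stored somewhere that outlives the process. Here's a minimal sketch of one way to do that, writing the latest token to a file after each event and reading it back at startup. StoredResumeToken and ResumeTokenStore are hypothetical stand-ins rather than driver types. I'm assuming the driver's own token can be encoded in a similar way, so check the Swift driver docs before relying on it.

```swift
import Foundation

/// Hypothetical stand-in for the driver's resume token.
struct StoredResumeToken: Codable, Equatable {
    let data: String
}

/// Persists the most recent token so that a restarted process can resume.
struct ResumeTokenStore {
    let fileURL: URL

    /// Call after each change event has been fully processed.
    func save(_ token: StoredResumeToken) throws {
        let encoded = try JSONEncoder().encode(token)
        // An atomic write avoids leaving a half-written file behind after a crash.
        try encoded.write(to: fileURL, options: .atomic)
    }

    /// Returns nil when nothing has been saved yet (for example, on first run)
    /// or when the file cannot be decoded.
    func load() -> StoredResumeToken? {
        guard let encoded = try? Data(contentsOf: fileURL) else { return nil }
        return try? JSONDecoder().decode(StoredResumeToken.self, from: encoded)
    }
}
```

At startup, a non-nil result from load() is what you would hand to the resumeAfter option. A nil result means opening the change stream without it and receiving only new events.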
After reading this post, you've hopefully realized just how simple it is to create applications that react to data changes using MongoDB Change Streams. Questions? Comments? Head over to our Developer Community to continue the conversation!