Introduction to Atlas Stream Processing Development

Hubert Nguyen • 7 min read • Published Aug 28, 2024 • Updated Aug 28, 2024
Stream Processing, Atlas
Welcome to this MongoDB Stream Processing tutorial! In this guide, we will quickly set up a coding workflow and have you write and run your first stream processor in no time. We'll learn how to create a new stream processing instance, conveniently code and execute stream processors from Visual Studio Code, and aggregate stream data, opening the door to a whole new field of the MongoDB Atlas developer data platform.

What we'll cover

  • Prerequisites
  • Setup
  • Create an Atlas stream processing instance
  • Set up Visual Studio Code
  • The anatomy of a stream processor
  • Let's execute a stream processor!
  • Hard-coded data in $source declaration
  • Simplest stream processor
  • Stream processing aggregation
  • Add timestamps to the data

Prerequisites

Setup

Create an Atlas stream processing instance

We need to have an Atlas Stream Processing Instance (SPI) ready. Follow the steps in the tutorial Get Started with Atlas Stream Processing: Creating Your First Stream Processor until we have our connection string and username/password, then come back here.
Don't forget to add your IP address to the Network Access list in Atlas so that the client can reach the instance.

Set up Visual Studio Code for MongoDB Atlas Stream Processing

Thanks to the MongoDB for VS Code extension, we can rapidly develop stream processing (SP) aggregation pipelines and run them directly from inside a VS Code MongoDB playground, without leaving the editor. In the rest of this article, we'll be using VS Code.
Such a playground is a Node.js environment where we can execute JS code interacting with a live stream processor on MongoDB Atlas. To get started, install VS Code and the MongoDB for VS Code extension.
The tutorial "Connecting an Atlas Stream Processor in Visual Studio Code" walks through installing the extension. It also lists some shell commands we'll need later.

The anatomy of a stream processor 

Within a stream processing instance (=cluster), it is possible to create and run multiple stream processors (=stream processing pipelines) simultaneously.
A single stream processor (SP) is very similar to a MongoDB aggregation pipeline. It is described by an array of processing stages. However, there are some differences. The most basic SP can be created using only its data source (we'll have executable examples next).
// our array of stages
// source is defined earlier
sp_aggregation_pipeline = [source]
sp.createStreamProcessor("SP_NAME", sp_aggregation_pipeline, <OPTIONS>)
A more realistic stream processor would contain at least one aggregation stage, and there can be a large number of stages performing various operations on the incoming data stream. There's a generous limit of 16MB for the total processor size.
sp_aggregation_pipeline = [source, stage_1, stage_2...]
sp.createStreamProcessor("SP_NAME", sp_aggregation_pipeline, <OPTIONS>)
To speed up the development loop, there's an sp.process() function, which starts an ephemeral stream processor that won't persist in your stream processing instance.
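As a rough sketch of the difference between the two approaches, the snippet below wraps both in one helper. The helper name runProcessor and its persist flag are hypothetical (they are not part of the sp API), and the sketch assumes that a saved processor is started with sp.<SP_NAME>.start(). The guard at the end lets the code load outside a playground, where the sp global does not exist.

```javascript
// Hard-coded source with a single event, just to have something to run.
const source = {
  $source: { documents: [{ id: 'entity_1', value: 1 }] },
};
const pipeline = [source];

// Hypothetical helper (not part of the sp API): run the stages either as a
// throwaway processor or as a named processor saved in the instance.
function runProcessor(spApi, name, stages, persist = false) {
  if (!persist) {
    spApi.process(stages); // ephemeral: nothing is saved in the instance
    return 'ephemeral';
  }
  spApi.createStreamProcessor(name, stages); // saved under `name`
  spApi[name].start(); // assumption: a saved processor is started explicitly
  return 'persistent';
}

// `sp` only exists in a playground connected to a stream processing instance.
if (typeof sp !== 'undefined') {
  runProcessor(sp, 'SP_NAME', pipeline);
}
```

While iterating on a pipeline, the ephemeral path avoids having to drop and recreate the processor after every change.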

Let's execute a stream processor!

Let's create basic stream processors and build our way up. First, we need to have some data! Atlas Stream Processing supports several data sources for incoming streaming events. These sources include:
  • Hard-coded data declaration in $source.
  • Kafka streams.
  • MongoDB Atlas databases.

Hard-coded data in $source declaration

For quick testing or self-contained examples, having a small set of hard-coded data is a very convenient way to produce events. We can declare an array of events directly in $source. The next section shows an extremely simple example, which we'll tweak later to cover different use cases.

Simplest stream processor

In VS Code, we run an ephemeral stream processor with sp.process(). This way, we don't have to use sp.createStreamProcessor() and sp.<SP_NAME>.drop() constantly as we would for SPs meant to be saved permanently in the instance.
src_hard_coded = {
  $source: {
    // our hard-coded dataset
    documents: [
      {'id': 'entity_1', 'value': 1},
      {'id': 'entity_1', 'value': 3},
      {'id': 'entity_2', 'value': 7},
      {'id': 'entity_1', 'value': 4},
      {'id': 'entity_2', 'value': 1}
    ]
  }
}
sp.process( [src_hard_coded] );
Upon running this playground, we should see data coming out in the VS Code "OUTPUT" tab (CTRL+SHIFT+U to make it appear).
Note: It can take a few seconds for the SP to be uploaded and executed, so don't expect an immediate output.
{
  id: 'entity_1',
  value: 1,
  _ts: 2024-02-14T18:52:33.704Z,
  _stream_meta: { timestamp: 2024-02-14T18:52:33.704Z }
}
{
  id: 'entity_1',
  value: 3,
  _ts: 2024-02-14T18:52:33.704Z,
  _stream_meta: { timestamp: 2024-02-14T18:52:33.704Z }
}
...
This simple SP can be used to ensure that data is coming into the SP and there are no problems upstream with our source. The timestamps were generated at ingestion time.

Stream processing aggregation

Building on what we have, adding a simple aggregation pipeline to our SP is easy. Below, we're adding a $group stage to aggregate/accumulate incoming messages' "value" field into an array for the requested interval.
Note that the "w" stage (w stands for "Window") of the SP pipeline contains an aggregation pipeline inside. With Stream Processing, we have aggregation pipelines in the stream processing pipeline.
This stage features a $tumblingWindow, which defines the span of time each aggregation runs against. Remember that streams are supposed to be continuous, so a window is similar to a buffer.
interval defines the time length of a window. Since the stream is continuous, we can only aggregate on one slice at a time.
idleTimeout defines how long the $source can remain idle before closing the window. This is useful if the stream is not sustained.
src_hard_coded = {
  $source: {
    // our hard-coded dataset
    documents: [
      {'id': 'entity_1', 'value': 1},
      {'id': 'entity_1', 'value': 3},
      {'id': 'entity_2', 'value': 7},
      {'id': 'entity_1', 'value': 4},
      {'id': 'entity_2', 'value': 1}
    ]
  }
}

w = {
  $tumblingWindow: {
    // This is the slice of time we want to look at every iteration
    interval: {size: NumberInt(2), unit: "second"},
    // If no additional data is coming in, idleTimeout defines when the window is forced to close
    idleTimeout : {size: NumberInt(2), unit: "second"},
    "pipeline": [
      {
        '$group': {
          '_id': '$id',
          'values': { '$push': "$value" }
        }
      }
    ]
  }
}
sp_pipeline = [src_hard_coded, w];
sp.process( sp_pipeline );
Let it run for a few seconds, and we should get an output similar to the following. $group will create one document per incoming "id" field and aggregate the relevant values into a new array field, "values."
{
  _id: 'entity_2',
  values: [ 7, 1 ],
  _stream_meta: {
    windowStartTimestamp: 2024-02-14T19:29:46.000Z,
    windowEndTimestamp: 2024-02-14T19:29:48.000Z
  }
}
{
  _id: 'entity_1',
  values: [ 1, 3, 4 ],
  _stream_meta: {
    windowStartTimestamp: 2024-02-14T19:29:46.000Z,
    windowEndTimestamp: 2024-02-14T19:29:48.000Z
  }
}
Depending on the $tumblingWindow settings, the aggregation can output several documents per "id", one for each window that the events' timestamps fall into. For example, these settings...
...
$tumblingWindow: {
    interval: {size: NumberInt(10), unit: "second"},
    idleTimeout : {size: NumberInt(10), unit: "second"},
...
...will yield the following aggregation output:
{
  _id: 'entity_1',
  values: [ 1 ],
  _stream_meta: {
    windowStartTimestamp: 2024-02-13T14:51:30.000Z,
    windowEndTimestamp: 2024-02-13T14:51:40.000Z
  }
}
{
  _id: 'entity_1',
  values: [ 3, 4 ],
  _stream_meta: {
    windowStartTimestamp: 2024-02-13T14:51:40.000Z,
    windowEndTimestamp: 2024-02-13T14:51:50.000Z
  }
}
{
  _id: 'entity_2',
  values: [ 7, 1 ],
  _stream_meta: {
    windowStartTimestamp: 2024-02-13T14:51:40.000Z,
    windowEndTimestamp: 2024-02-13T14:51:50.000Z
  }
}
See how the windowStartTimestamp and windowEndTimestamp fields show the 10-second intervals as requested (14:51:30 to 14:51:40 etc.).
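Since only the interval and idleTimeout sizes change between the 2-second and 10-second runs, a small builder function can make that kind of experiment quicker. The sketch below is one possible way to do it. The helper name tumblingWindow is hypothetical, and the NumberInt fallback is only there so the snippet also loads outside the playground, where that global is not defined.

```javascript
// NumberInt is a shell/playground global; fall back to a plain number elsewhere.
const toInt = typeof NumberInt === 'function' ? NumberInt : (n) => n;

// Hypothetical helper: builds the same kind of stage as the "w" stage above,
// with the window length (in seconds) as a parameter.
function tumblingWindow(seconds, pipeline) {
  return {
    $tumblingWindow: {
      interval: { size: toInt(seconds), unit: 'second' },
      idleTimeout: { size: toInt(seconds), unit: 'second' },
      pipeline,
    },
  };
}

// Same $group stage as in the article's example.
const groupById = { $group: { _id: '$id', values: { $push: '$value' } } };

const w2 = tumblingWindow(2, [groupById]);   // 2-second windows
const w10 = tumblingWindow(10, [groupById]); // 10-second windows
```

Either stage can then take the place of w in the sp_pipeline array.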

Additional learning resources: building aggregations

Atlas Stream Processing uses the MongoDB Query API. You can learn more about the MongoDB Query API with the official Query API documentation, a free interactive course, and a tutorial.
Important: Stream Processing aggregation pipelines do not support all database aggregation operations and have additional operators specific to streaming, like $tumblingWindow. Check the official Stream Processing aggregation documentation.

Add timestamps to the data

Even when we hard-code data, there's an opportunity to provide a timestamp in case we want to perform $sort operations and better mimic a real use case. This would be the equivalent of an event-time timestamp embedded in the message.
There are many other types of timestamps if we use a live Kafka stream (producer-assigned, server-side, ingestion-time, and more). Add a timestamp to our messages and use the $source "timeField" property to make it the authoritative stream timestamp.
src_hard_coded = {
  $source: {
    // define our event "timestamp_msg" as the _ts
    timeField: { '$dateFromString': { dateString: '$timestamp_msg' } },
    // our hard-coded dataset
    documents: [
      {'id': 'entity_1', 'value': 1, 'timestamp_msg': '2024-02-13T14:51:39.402336'},
      {'id': 'entity_1', 'value': 3, 'timestamp_msg': '2024-02-13T14:51:41.402674'},
      {'id': 'entity_2', 'value': 7, 'timestamp_msg': '2024-02-13T14:51:43.402933'},
      {'id': 'entity_1', 'value': 4, 'timestamp_msg': '2024-02-13T14:51:45.403352'},
      {'id': 'entity_2', 'value': 1, 'timestamp_msg': '2024-02-13T14:51:47.403752'}
    ]
  }
}
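To build some intuition for how event-time timestamps interact with a 10-second $tumblingWindow, here is a plain JavaScript simulation that needs no Atlas connection. It is only an illustration, not how Atlas implements windows. It assumes that windows are aligned to multiples of the interval since the Unix epoch and that timestamps without a time zone are read as UTC. The helper name tumblingWindowGroup is hypothetical.

```javascript
// Same events as the hard-coded source above.
const events = [
  { id: 'entity_1', value: 1, timestamp_msg: '2024-02-13T14:51:39.402336' },
  { id: 'entity_1', value: 3, timestamp_msg: '2024-02-13T14:51:41.402674' },
  { id: 'entity_2', value: 7, timestamp_msg: '2024-02-13T14:51:43.402933' },
  { id: 'entity_1', value: 4, timestamp_msg: '2024-02-13T14:51:45.403352' },
  { id: 'entity_2', value: 1, timestamp_msg: '2024-02-13T14:51:47.403752' },
];

// Hypothetical helper: assign each event to a fixed-size window, then mimic
// { $group: { _id: '$id', values: { $push: '$value' } } } inside each window.
function tumblingWindowGroup(docs, sizeMs) {
  const results = new Map();
  for (const doc of docs) {
    // Keep millisecond precision and read the string as UTC (assumption).
    const t = Date.parse(doc.timestamp_msg.slice(0, 23) + 'Z');
    // Start of the epoch-aligned window containing this event (assumption).
    const start = Math.floor(t / sizeMs) * sizeMs;
    const key = `${start}|${doc.id}`;
    if (!results.has(key)) {
      results.set(key, {
        _id: doc.id,
        values: [],
        windowStartTimestamp: new Date(start).toISOString(),
        windowEndTimestamp: new Date(start + sizeMs).toISOString(),
      });
    }
    results.get(key).values.push(doc.value);
  }
  return [...results.values()];
}

const windowed = tumblingWindowGroup(events, 10 * 1000);
console.log(windowed);
```

Under these assumptions, the first event (14:51:39) lands in the 14:51:30 to 14:51:40 window and the other four land in the 14:51:40 to 14:51:50 window, which gives the same three groups as the 10-second output shown earlier: entity_1 with [ 1 ], entity_1 with [ 3, 4 ], and entity_2 with [ 7, 1 ].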
At this point, we have everything we need to test new pipelines and create proofs of concept in a convenient and self-contained form. In a subsequent article, we will demonstrate how to connect to various streaming sources.

Tips and tricks

At the time of publishing, Atlas Stream Processing is in public preview and there are a number of known Stream Processing limitations that you should be aware of, such as regional data center availability, connectivity with other Atlas projects, and user privileges.
When running an ephemeral stream processor via sp.process(), many errors (JSON serialization issues, late data, divide by zero, $validate errors) that might have gone to a dead letter queue (DLQ) are sent to the default output to help you debug.
For SPs created with sp.createStreamProcessor(), you'll have to configure your DLQ manually. Consult the documentation for this. On the "Manage Stream Processor" documentation page, search for "Define a DLQ."
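As a minimal sketch, a DLQ is declared in the options argument of sp.createStreamProcessor(). The shape below (dlq with connectionName, db, and coll) follows the documentation page mentioned above as far as we know, so check it against the current docs. The connection, database, and collection names are hypothetical placeholders, and the guard simply skips the call outside a playground.

```javascript
// Placeholder names: replace them with a connection from your own
// connection registry and a database/collection of your choice.
const options = {
  dlq: {
    connectionName: 'myAtlasCluster', // hypothetical connection name
    db: 'streams_debug',              // hypothetical database
    coll: 'dlq',                      // hypothetical collection
  },
};

// Minimal hard-coded source, in the same style as the article's examples.
const pipeline = [
  { $source: { documents: [{ id: 'entity_1', value: 1 }] } },
];

// `sp` only exists in a playground connected to a stream processing instance.
if (typeof sp !== 'undefined') {
  sp.createStreamProcessor('SP_NAME', pipeline, options);
}
```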
After merging data into an Atlas database, it is possible to use existing aggregation pipeline building tools, such as the Atlas GUI's builder or MongoDB Compass, to create and debug pipelines. Since these tools are meant for the core database API, remember that some operators are not supported by stream processors, and streaming features like windowing are not currently available.
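For reference, merging into an Atlas database is done with a $merge stage at the end of the stream processing pipeline. The sketch below only builds such a pipeline and does not run it. The connection, database, and collection names are hypothetical, and the connection is assumed to already exist in the instance's connection registry.

```javascript
// Minimal hard-coded source, in the same style as the article's examples.
const source = {
  $source: { documents: [{ id: 'entity_1', value: 1 }] },
};

// Sink stage: writes the processor's output to an Atlas collection.
const mergeToAtlas = {
  $merge: {
    into: {
      connectionName: 'myAtlasCluster', // hypothetical connection name
      db: 'streams_demo',               // hypothetical database
      coll: 'values',                   // hypothetical collection
    },
  },
};

// The sink goes last; other stages (such as a window) would sit in between.
const pipelineWithSink = [source, mergeToAtlas];
```

Once documents land in that collection, the regular database tooling can be pointed at it.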

Conclusion

With that, you should have everything you need to get your first stream processor up and running. In a future post, we will dive deeper into connecting to different sources of data for your stream processors.
If you have any questions, share them in our community forum, meet us during local MongoDB User Groups (MUGs), or come check out one of our MongoDB .local events.
