$merge

The $merge stage specifies a connection in the Connection Registry to write messages to. The connection must be an Atlas connection.

A $merge pipeline stage has the following prototype form:

{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
},
"on": "<identifier field>" | [ "<identifier field1>", ...],
"let": {
<var_1>: <expression>,
<var_2>: <expression>,
…,
<var_n>: <expression>
},
"whenMatched": "replace | keepExisting | merge",
"whenNotMatched": "insert | discard",
"parallelism": <integer>
}
}

The Atlas Stream Processing version of $merge uses most of the same fields as the Atlas Data Federation version. Atlas Stream Processing also uses the following fields that are either unique to its implementation of $merge, or modified to suit it. To learn more about the fields shared with Atlas Data Federation $merge, see $merge syntax.

Field | Necessity | Description
into | Required | Simplified to reflect Atlas Stream Processing supporting $merge only into Atlas connections. To learn more, see this description of the Atlas Data Federation $merge fields.
parallelism | Conditional | Number of threads to which to distribute write operations. Must be an integer value between 1 and 16. Higher parallelism values increase throughput. However, higher values also require the stream processor and the cluster to which it writes to use more computational resources. If you use a dynamic expression value for into.coll or into.db, you cannot set this value greater than 1.
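
As a concrete illustration of these fields, the following is a minimal sketch of a filled-in $merge stage, written as a plain JavaScript object. The connection, database, and collection names are hypothetical. The isValidParallelism helper is not part of Atlas Stream Processing; it only mirrors the documented range of integers from 1 to 16.

```javascript
// Hypothetical names: "atlasCluster", "sales", "orders".
const mergeStage = {
  $merge: {
    into: {
      connectionName: "atlasCluster",
      db: "sales",
      coll: "orders"
    },
    on: "_id",
    whenMatched: "merge",
    whenNotMatched: "insert",
    parallelism: 4
  }
};

// Illustrative helper: mirrors the documented constraint that
// parallelism is an integer between 1 and 16.
function isValidParallelism(value) {
  return Number.isInteger(value) && value >= 1 && value <= 16;
}

console.log(isValidParallelism(mergeStage.$merge.parallelism)); // true
```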

$merge must be the last stage of any pipeline it appears in. You can use only one $merge stage per pipeline.

The on field has special requirements for $merge against sharded collections. To learn more, see $merge syntax.

If you use a dynamic expression value for into.coll or into.db, you cannot set a parallelism value greater than 1.
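
The limitations above can be checked locally before you create a stream processor. The following is a sketch only: checkMergeLimitations and isDynamic are hypothetical helpers, not Atlas Stream Processing functions, and the test for a dynamic expression (an object, or a string beginning with "$") is a simplifying assumption. The connection and field names are illustrative.

```javascript
// Heuristic (an assumption): treat a value as a dynamic expression when it
// is an object or a string that starts with "$".
function isDynamic(value) {
  return (typeof value === "object" && value !== null) ||
    (typeof value === "string" && value.startsWith("$"));
}

// Returns a list of human-readable problems; an empty list means the
// pipeline satisfies the three limitations described above.
function checkMergeLimitations(pipeline) {
  const problems = [];
  const mergeIndexes = pipeline
    .map((stage, i) => ("$merge" in stage ? i : -1))
    .filter((i) => i !== -1);

  if (mergeIndexes.length > 1) {
    problems.push("only one $merge stage is allowed per pipeline");
  }
  for (const i of mergeIndexes) {
    if (i !== pipeline.length - 1) {
      problems.push("$merge must be the last stage");
    }
    // An omitted parallelism is treated as 1 for the purposes of this check.
    const { into = {}, parallelism = 1 } = pipeline[i].$merge;
    if ((isDynamic(into.db) || isDynamic(into.coll)) && parallelism > 1) {
      problems.push("parallelism must be 1 with a dynamic into.db or into.coll");
    }
  }
  return problems;
}

// Hypothetical pipeline: a dynamic db name combined with parallelism 4.
const problems = checkMergeLimitations([
  { $source: { connectionName: "kafkaProd", topic: "events" } },
  {
    $merge: {
      into: { connectionName: "atlasCluster", db: "$customerStatus", coll: "events" },
      parallelism: 4
    }
  }
]);
console.log(problems);
```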

You can use a dynamic expression as the value of the following fields:

  • into.db

  • into.coll

This enables your stream processor to write messages to different target Atlas collections on a message-by-message basis.

Example

You have a stream of transaction events that generates messages of the following form:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

To sort each of these into a distinct Atlas database and collection, you can write the following $merge stage:

{
$merge: {
into: {
connectionName: "db1",
db: "$customerStatus",
coll: "$transactionType"
}
}
}

This $merge stage:

  • Writes the Very Important Industries message to an Atlas collection named VIP.subscription.

  • Writes the N. E. Buddy message to an Atlas collection named employee.requisition.

  • Writes the Khan Traktor message to an Atlas collection named contractor.billableHours.

You can only use dynamic expressions that evaluate to strings. For more information on dynamic expressions, see expression operators.
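
Because only string results are accepted, a numeric field such as tenantId would need to be converted before it can name a collection. The following sketch assumes that the $concat and $toString expression operators are available to your stream processor. The transactions database name and the tenant_ prefix are illustrative and do not come from the example above.

```javascript
const tenantMergeStage = {
  $merge: {
    into: {
      connectionName: "db1",
      db: "transactions", // hypothetical database name
      // Intended to build names such as "tenant_5" from the numeric tenantId field.
      coll: { $concat: ["tenant_", { $toString: "$tenantId" }] }
    }
  }
};
```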

If you specify a database or collection with a dynamic expression, but Atlas Stream Processing cannot evaluate the expression for a given message, Atlas Stream Processing sends that message to the dead letter queue, if one is configured, and continues processing subsequent messages. If no dead letter queue is configured, Atlas Stream Processing skips the message entirely and continues processing subsequent messages.
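
To make the routing and fallback behavior concrete, the following sketch simulates it locally in plain JavaScript. It is not the Atlas Stream Processing evaluator: resolveName and route are hypothetical helpers that handle only top-level "$field" paths, and the dlq array merely stands in for a dead letter queue. The third message is invented to show the failure case.

```javascript
// Resolves a simple "$field" path against a message. Expression operators
// are not modeled here.
function resolveName(value, message) {
  if (typeof value !== "string") return undefined;
  if (!value.startsWith("$")) return value; // literal name
  const resolved = message[value.slice(1)];
  return typeof resolved === "string" ? resolved : undefined;
}

// Returns the target namespace, or null when either name fails to resolve
// to a string. Unresolvable messages go to the stand-in DLQ if one is given;
// otherwise they are simply skipped.
function route(message, into, dlq) {
  const db = resolveName(into.db, message);
  const coll = resolveName(into.coll, message);
  if (db === undefined || coll === undefined) {
    if (dlq) dlq.push(message);
    return null;
  }
  return `${db}.${coll}`;
}

const into = { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" };
const dlq = [];
const messages = [
  { customer: "Very Important Industries", customerStatus: "VIP", tenantId: 1, transactionType: "subscription" },
  { customer: "N. E. Buddy", customerStatus: "employee", tenantId: 5, transactionType: "requisition" },
  { customer: "Unknown", tenantId: 7 } // hypothetical message missing both fields
];

const targets = messages.map((m) => route(m, into, dlq));
console.log(targets);    // [ 'VIP.subscription', 'employee.requisition', null ]
console.log(dlq.length); // 1
```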

To save streaming data from multiple Apache Kafka topics into collections in your Atlas cluster, use the $merge stage with the $source stage. The $source stage specifies the topics from which to read data. The $merge stage writes the data to the target collection.

Use the following syntax:

{
"$source": {
"connectionName": "<registered-kafka-connection>",
"topic": [ "<topic-name-1>", "<topic-name-2>", ... ]
}
},
{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
}
},
...
}
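
As a filled-in illustration of this syntax, the following sketch defines a two-stage pipeline that reads from two topics and writes every message to a single collection. All connection, topic, database, and collection names are hypothetical.

```javascript
// Hypothetical names: "kafkaProd", "orders", "refunds", "atlasCluster",
// "sales", "transactions".
const pipeline = [
  {
    $source: {
      connectionName: "kafkaProd",
      topic: ["orders", "refunds"] // read from both topics
    }
  },
  {
    $merge: {
      into: {
        connectionName: "atlasCluster",
        db: "sales",
        coll: "transactions"
      }
    }
  }
];
```

In mongosh connected to a stream processing instance, you would typically pass an array like this to sp.createStreamProcessor() along with a processor name of your choice.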

A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. The following aggregation has three stages:

  1. The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, exposing each record as it is ingested to the subsequent aggregation stages. This stage also overrides the name of the timestamp field it projects, setting it to ingestionTime.

  2. The $match stage excludes documents that have a dewPoint.value of less than or equal to 5.0 and passes along the documents with dewPoint.value greater than 5.0 to the next stage.

  3. The $merge stage writes the output to an Atlas collection named stream in the sample_weatherstream database. If no such database or collection exists, Atlas creates it.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}
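
To see which documents the $match stage would let through, you can approximate its predicate locally. This sketch is a stand-in for the filter { 'dewPoint.value': { '$gt': 5 } }, not MongoDB's own matcher. The passesDewPointFilter helper and the second and third sample documents are hypothetical.

```javascript
// Local approximation of { 'dewPoint.value': { $gt: 5 } }: the value must be
// a number strictly greater than 5. Documents without the field do not match.
function passesDewPointFilter(doc) {
  const value = doc.dewPoint ? doc.dewPoint.value : undefined;
  return typeof value === "number" && value > 5;
}

const samples = [
  { callLetters: "CGDS", dewPoint: { quality: "9", value: 25.7 } },
  { callLetters: "TEST", dewPoint: { quality: "1", value: 5 } }, // hypothetical
  { callLetters: "NODP" } // hypothetical: no dewPoint field at all
];

const kept = samples.filter(passesDewPointFilter).map((d) => d.callLetters);
console.log(kept); // [ 'CGDS' ]
```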

To view the documents in the resulting sample_weatherstream.stream collection, connect to your Atlas cluster and run the following command:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

Note

The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.
