$merge

The $merge stage specifies a connection in the Connection Registry to write messages to. The connection must be an Atlas connection.

A $merge pipeline stage has the following prototype form:

{
  "$merge": {
    "into": {
      "connectionName": "<registered-atlas-connection>",
      "db": "<registered-database-name>" | <expression>,
      "coll": "<atlas-collection-name>" | <expression>
    },
    "on": "<identifier field>" | [ "<identifier field1>", ...],
    "let": {
      <var_1>: <expression>,
      <var_2>: <expression>,
      ...,
      <var_n>: <expression>
    },
    "whenMatched": "replace | keepExisting | merge",
    "whenNotMatched": "insert | discard"
  }
}
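
As a rough illustration of the prototype, the following sketch fills in the fields with literal values. The connection name atlasCluster1, the sales.orders namespace, and the orderId field are placeholders invented for this example. It also assumes, as with standard MongoDB $merge, that a unique index on the target collection covers the on field.

```javascript
// Placeholder values only: "atlasCluster1", "sales", "orders", and "orderId"
// are assumptions made for this sketch, not names from the documentation.
const mergeStage = {
  $merge: {
    into: {
      connectionName: "atlasCluster1", // a connection registered in the Connection Registry
      db: "sales",
      coll: "orders",
    },
    on: "orderId",            // field used to match incoming messages to existing documents
    whenMatched: "merge",     // one of: replace | keepExisting | merge
    whenNotMatched: "insert", // one of: insert | discard
  },
};

console.log(JSON.stringify(mergeStage, null, 2));
```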

The Atlas Stream Processing version of $merge uses most of the same fields as the Atlas Data Federation version. However, because Atlas Stream Processing only supports merging into an Atlas connection, the syntax of into is simplified. For more information, see this description of the Atlas Data Federation $merge fields.

$merge must be the last stage of any pipeline it appears in. You can use only one $merge stage per pipeline.
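
The placement rules can be sketched with a small check over a pipeline array. The checkMergePlacement helper is hypothetical and not part of any Atlas API. The connection names kafkaProd and atlasCluster1, the transactions topic, and the sales.transactions namespace are placeholders, and the sketch assumes a Kafka source connection.

```javascript
// Hypothetical helper (not an Atlas API): checks that a pipeline has at most
// one $merge stage and that, if present, it is the final stage.
function checkMergePlacement(pipeline) {
  const mergeIndexes = pipeline
    .map((stage, i) => (Object.keys(stage)[0] === "$merge" ? i : -1))
    .filter((i) => i !== -1);

  if (mergeIndexes.length > 1) return "only one $merge stage is allowed";
  if (mergeIndexes.length === 1 && mergeIndexes[0] !== pipeline.length - 1) {
    return "$merge must be the last stage";
  }
  return "ok";
}

// Placeholder connection, topic, database, and collection names.
const pipeline = [
  { $source: { connectionName: "kafkaProd", topic: "transactions" } },
  { $match: { tenantId: { $gt: 0 } } },
  { $merge: { into: { connectionName: "atlasCluster1", db: "sales", coll: "transactions" } } },
];

console.log(checkMergePlacement(pipeline)); // prints "ok"
```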

The on field has special requirements for $merge against sharded collections. To learn more, see $merge syntax.

You can use a dynamic expression as the value of the following fields:

  • into.db

  • into.coll

This enables your stream processor to write messages to different target Atlas collections on a message-by-message basis.

Example

You have a stream of transaction events that generates messages of the following form:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

To sort each of these into a distinct Atlas database and collection, you can write the following $merge stage:

$merge: {
  into: {
    connectionName: "db1",
    db: "$customerStatus",
    coll: "$transactionType"
  }
}

This $merge stage:

  • Writes the Very Important Industries message to the subscription collection in the VIP database (VIP.subscription).

  • Writes the N. E. Buddy message to the requisition collection in the employee database (employee.requisition).

  • Writes the Khan Traktor message to the billableHours collection in the contractor database (contractor.billableHours).
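
The per-message routing in this example can be mimicked locally. This is an illustrative simulation only: Atlas Stream Processing evaluates the expressions itself, and the resolveField and targetNamespace helpers are hypothetical. They handle only simple "$field" paths, not the full expression language.

```javascript
// Illustrative only: treats "$name" as a top-level field path and anything
// else as a literal string. Real expression evaluation happens in Atlas.
function resolveField(message, expr) {
  return typeof expr === "string" && expr.startsWith("$")
    ? message[expr.slice(1)]
    : expr;
}

// Builds the "<db>.<coll>" namespace a message would be written to.
function targetNamespace(message, into) {
  return `${resolveField(message, into.db)}.${resolveField(message, into.coll)}`;
}

const into = { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" };

const messages = [
  { customer: "Very Important Industries", customerStatus: "VIP", tenantId: 1, transactionType: "subscription" },
  { customer: "N. E. Buddy", customerStatus: "employee", tenantId: 5, transactionType: "requisition" },
  { customer: "Khan Traktor", customerStatus: "contractor", tenantId: 11, transactionType: "billableHours" },
];

for (const m of messages) {
  console.log(`${m.customer} -> ${targetNamespace(m, into)}`);
}
// Very Important Industries -> VIP.subscription
// N. E. Buddy -> employee.requisition
// Khan Traktor -> contractor.billableHours
```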

You can only use dynamic expressions that evaluate to strings. For more information on dynamic expressions, see expression operators.

If you specify a database or collection with a dynamic expression and Atlas Stream Processing cannot evaluate the expression for a given message, it sends that message to the dead letter queue, if one is configured, and continues processing subsequent messages. If no dead letter queue is configured, Atlas Stream Processing skips the message entirely and continues processing subsequent messages.
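
The fallback behavior described above can be sketched as a simple decision function. The routeMessage helper is a hypothetical local simulation, not Atlas code. It assumes that an expression "cannot be evaluated" when the referenced field is missing or does not produce a string.

```javascript
// Hypothetical simulation of the routing rule; not how Atlas is implemented.
function routeMessage(message, into, dlqConfigured) {
  // Treat "$name" as a top-level field path, anything else as a literal.
  const resolve = (expr) => (expr.startsWith("$") ? message[expr.slice(1)] : expr);
  const db = resolve(into.db);
  const coll = resolve(into.coll);

  // Dynamic expressions must evaluate to strings.
  if (typeof db !== "string" || typeof coll !== "string") {
    return dlqConfigured ? "dead letter queue" : "skipped";
  }
  return `${db}.${coll}`;
}

const into = { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" };
const incomplete = { customer: "N. E. Buddy", customerStatus: "employee", tenantId: 5 };

console.log(routeMessage(incomplete, into, true));  // prints "dead letter queue"
console.log(routeMessage(incomplete, into, false)); // prints "skipped"
```

In both cases the processor moves on to the next message. The only difference is whether the failed message is retained for later inspection.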
