Docs Menu
Docs Home
/
MongoDB Atlas
/ /

$emit

On this page

  • Definition
  • Syntax
  • Apache Kafka Broker
  • Atlas Time Series Collection
  • Behavior

The $emit stage specifies a connection in the Connection Registry to emit messages to. The connection must be either an Apache Kafka broker or a time series collection.

To write processed data to an Apache Kafka broker, use the $emit pipeline stage with the following prototype form:

{
"$emit": {
"connectionName": "<registered-connection>",
"topic" : "<target-topic>" | <expression>,
"config": {
"key": "<key-string>" | { key-document },
"keyFormat": "<deserialization-type>",
"outputFormat": "<json-format>"
}
}
}

The $emit stage takes a document with the following fields:

Field
Type
Necessity
Description
connectionName
string
Required
Name, as it appears in the Connection Registry, of the connection to ingest data from.
topic
string | expression
Required
Name of the Apache Kafka topic to emit messages to.
config
document
Optional
Document containing fields that override various default values.
config.key
object | string
Optional

Expression that evaluates to a Apache Kafka message key.

If you specify config.key, you must specify config.keyFormat.

config.keyFormat
string
Conditional

Data type used to deserialize Apache Kafka key data. Must be one of the following values:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

Defaults to binData. If you specify config.key, you must specify config.keyFormat. If the config.key of a document does not deserialize successfully to the specified data type, Atlas Stream Processing sends it to your dead letter queue.

config.outputFormat
string
Optional

JSON format to use when emitting messages to Apache Kafka. Must be one of the following values:

  • "relaxedJson"

  • "canonicalJson"

Defaults to "relaxedJson".

To write processed data to an Atlas time series collection, use the $emit pipeline stage with the following prototype form:

{
"$emit": {
"connectionName": "<registered-connection>",
"db" : "<target-db>",
"coll" : "<target-coll>",
"timeseries" : {
<options>
}
}
}

The $emit stage takes a document with the following fields:

Field
Type
Necessity
Description
connectionName
string
Required
Name, as it appears in the Connection Registry, of the connection to ingest data from.
db
string
Required
Name of the Atlas database that contains the target time series collection.
coll
string
Required
Name of the Atlas time series collection to write to.
timeseries
document
Required
Document defining the time series fields for the collection.

Note

The maximum size for documents within a time series collection is 4 MB. To learn more, see Time Series Collection Limitations.

$emit must be the last stage of any pipeline it appears in. You can use only one $emit stage per pipeline.

You can only write to a single Atlas time series collection per stream processor. If you specify a collection that doesn't exist, Atlas creates the collection with the time series fields you specified. You must specify an existing database.

You can use a dynamic expression as the value of the topic field to enable your stream processor to write to different target Apache Kafka topics on a message-by-message basis.

Example

You have a stream of transaction events that generates messages of the following form:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

To sort each of these into a distinct Apache Kafka topic, you can write the following $emit stage:

$emit: {
connectionName: "kafka1",
topic: "$customerStatus"
}

This $emit stage:

  • Writes the Very Important Industries message to a topic named VIP.

  • Writes the N. E. Buddy message to a topic named employee.

  • Writes the Khan Traktor message to a topic named contractor.

You can use only dynamic expressions that evaluate to strings. For more information on dynamic expressions, see expression operators.

If you specify a topic that doesn't already exist, Apache Kafka automatically creates the topic when it receives the first message that targets it.

If you specify a topic with a dynamic expression, but Atlas Stream Processing cannot evaluate the expression for a given message, Atlas Stream Processing sends that message to the dead letter queue if configured and processes subsequent messages. If there is no dead letter queue configured, then Atlas Stream Processing skips the message completely and processes subsequent messages.

← $tumblingWindow
$merge →