
$source

On this page

  • Definition
  • Syntax
  • Apache Kafka Broker
  • MongoDB Collection Change Stream
  • MongoDB Database Change Stream
  • MongoDB Cluster-wide Change Stream Source
  • Document Array
  • Behavior
  • Examples
Definition

The $source stage specifies a connection in the Connection Registry to stream data from. The following connection types are supported:

  • Apache Kafka broker

  • MongoDB collection change stream

  • MongoDB database change stream

  • MongoDB cluster-wide change stream

  • Document array

Note

You can't use Atlas serverless instances as a $source.

Syntax

Apache Kafka Broker

To operate on streaming data from an Apache Kafka broker, the $source stage has the following prototype form:

{
  "$source": {
    "connectionName": "<registered-connection>",
    "topic": ["<source-topic>", ...],
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "tsFieldName": "<timestamp>",
    "partitionIdleTimeout": {
      "size": <duration-number>,
      "unit": "<duration-unit>"
    },
    "config": {
      "auto_offset_reset": "<start-event>",
      "group_id": "<group-id>",
      "keyFormat": "<deserialization-type>",
      "keyFormatError": "<error-handling>"
    }
  }
}

The $source stage takes a document with the following fields:

connectionName (string, Required)

Label that identifies the connection in the Connection Registry to ingest data from.

topic (string or array of strings, Required)

Name of one or more Apache Kafka topics to stream messages from. To stream messages from more than one topic, specify them in an array.

timeField (document, Optional)

Document that defines an authoritative timestamp for incoming messages.

If you use timeField, you must define it as one of the following:

  • a $toDate expression that takes a source message field as an argument

  • a $dateFromString expression that takes a source message field as an argument

If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.

tsFieldName (string, Optional)

Name that overrides the name of the timestamp field projected by $source.

The $source stage in an Atlas Stream Processing pipeline projects a field called _ts with the assigned timestamp of the document. Sources of streaming data might also use a field named _ts to store the timestamp of each message. To prevent a conflict between these fields, use tsFieldName to rename any source-provided field named _ts before additional processing takes place.

partitionIdleTimeout (document, Optional)

Document specifying the amount of time that a partition is allowed to be idle before it is ignored in watermark calculations.

partitionIdleTimeout.size (integer, Optional)

Number specifying the duration of the partition idle timeout.

partitionIdleTimeout.unit (string, Optional)

Unit of time for the duration of the partition idle timeout. The value of unit can be one of the following:

  • "ms" (millisecond)

  • "second"

  • "minute"

  • "hour"

  • "day"

config (document, Optional)

Document containing fields that override various default values.

config.auto_offset_reset (string, Optional)

Specifies which event in the Apache Kafka source topic to begin ingestion with. auto_offset_reset takes the following values:

  • end, latest, or largest: begin ingestion from the latest event in the topic at the time the aggregation is initialized.

  • earliest, beginning, or smallest: begin ingestion from the earliest event in the topic.

Defaults to latest.

config.group_id (string, Optional)

ID of the Kafka consumer group to associate with the stream processor. If omitted, Atlas Stream Processing associates the stream processing instance with an auto-generated ID in the following format:

asp-${streamProcessorId}-consumer

Atlas Stream Processing commits partition offsets to the Apache Kafka broker for the specified consumer group ID after a checkpoint is committed. It commits an offset when messages up through that offset are durably recorded in a checkpoint. This lets you track the offset lag and progress of the stream processor directly from the Kafka broker consumer group metadata.

config.keyFormat (string, Optional)

Data type used to deserialize Apache Kafka key data. Must be one of the following values:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

Defaults to binData.

config.keyFormatError (string, Optional)

How to handle errors encountered when deserializing Apache Kafka key data. Must be one of the following values:

  • dlq, which writes the document to your Dead Letter Queue.

  • passThrough, which sends the document to the next stage without key data.

Note

Atlas Stream Processing requires that documents in the source data stream be valid JSON or EJSON. Atlas Stream Processing sends documents that don't meet this requirement to your dead letter queue if you have configured one.
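For example, a $source stage reading from a hypothetical registered Kafka connection named kafkaProd might look like the following sketch. The connection, topic, and field names are illustrative; the stage parses a sourceTime string field into the authoritative timestamp and begins from the earliest event in each topic:

{
  '$source': {
    connectionName: 'kafkaProd',                  // registered Kafka connection (hypothetical)
    topic: ['sensors.raw', 'sensors.late'],       // stream from two topics
    timeField: { $dateFromString: { dateString: '$sourceTime' } },  // parse a message field into the timestamp
    tsFieldName: 'eventTime',                     // project the timestamp as eventTime instead of _ts
    partitionIdleTimeout: { size: 2, unit: 'minute' },  // exclude partitions idle for 2 minutes from watermarking
    config: {
      auto_offset_reset: 'earliest',              // begin from the earliest event in each topic
      group_id: 'sensor-processor',               // commit offsets under this consumer group ID
      keyFormat: 'string',                        // deserialize Kafka keys as strings
      keyFormatError: 'passThrough'               // on key deserialization errors, continue without key data
    }
  }
}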

MongoDB Collection Change Stream

To operate on streaming data from an Atlas collection change stream, the $source stage has the following prototype form:

{
  "$source": {
    "connectionName": "<registered-connection>",
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "tsFieldName": "<timestamp>",
    "db": "<source-db>",
    "coll": ["<source-coll>", ...],
    "config": {
      "startAfter": <start-token> | "startAtOperationTime": <timestamp>,
      "fullDocument": "<full-doc-condition>",
      "fullDocumentOnly": <boolean>,
      "fullDocumentBeforeChange": "<before-change-condition>",
      "pipeline": [{
        "<aggregation-stage>": {
          <stage-input>,
          ...
        },
        ...
      }]
    }
  }
}

The $source stage takes a document with the following fields:

connectionName (string, Conditional)

Label that identifies the connection in the Connection Registry to ingest data from.

timeField (document, Optional)

Document that defines an authoritative timestamp for incoming messages.

If you use timeField, you must define it as one of the following:

  • a $toDate expression that takes a source message field as an argument

  • a $dateFromString expression that takes a source message field as an argument

If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.

tsFieldName (string, Optional)

Name that overrides the name of default timestamp fields declared by the source.

Atlas Stream Processing pipelines internally add a field called _ts to incoming messages to store timestamp information. Sources of streaming data might also use a field named _ts to store the timestamp of each message. To prevent a conflict between these fields, use tsFieldName to rename any source-provided field named _ts before additional processing takes place.

db (string, Required)

Name of a MongoDB database hosted on the Atlas instance specified by connectionName. The change stream of this database acts as the streaming data source.

coll (string or array of strings, Required)

Name of one or more MongoDB collections hosted on the Atlas instance specified by connectionName. The change streams of these collections act as the streaming data source. If you omit this field, your stream processor sources from a MongoDB Database Change Stream instead.

config (document, Optional)

Document containing fields that override various default values.

config.startAfter (token, Conditional)

The change event after which the source begins reporting. This takes the form of a resume token.

You can use only one of config.startAfter or config.startAtOperationTime.

config.startAtOperationTime (timestamp, Conditional)

The operation time after which the source should begin reporting.

You can use only one of config.startAfter or config.startAtOperationTime.

config.fullDocument (string, Conditional)

Setting that controls whether a change stream source should return a full document, or only the changes when an update occurs. Must be one of the following:

  • updateLookup: Returns only changes on update.

  • required: Must return a full document. If a full document is unavailable, returns nothing.

  • whenAvailable: Returns a full document whenever one is available, otherwise returns changes.

If you do not specify a value for fullDocument, it defaults to updateLookup.

To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection.

config.fullDocumentOnly (boolean, Conditional)

Setting that controls whether a change stream source returns the entire change event document, including all metadata, or only the contents of fullDocument. If set to true, the source returns only the contents of fullDocument.

To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection.

config.fullDocumentBeforeChange (string, Optional)

Specifies whether a change stream source should include the full document in its original "before changes" state in the output. Must be one of the following:

  • off: Omits the fullDocumentBeforeChange field.

  • required: Must return a full document in its before-changes state. If a full document in its before-changes state is unavailable, the stream processor fails.

  • whenAvailable: Returns a full document in its before-changes state whenever one is available, otherwise omits the fullDocumentBeforeChange field.

If you do not specify a value for fullDocumentBeforeChange, it defaults to off.

To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection.

config.pipeline (document, Optional)

Specifies an aggregation pipeline to filter change stream output at the point of origin. This pipeline must conform to the parameters described in change-stream-modify-output.
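As an illustration, the following sketch (with hypothetical connection, database, and collection names) sources the change streams of two collections, emits only the full post-image of each change, and filters to insert and update events at the point of origin. Note that fullDocument and fullDocumentOnly require Pre- and Post-Images to be enabled on the watched collections:

{
  '$source': {
    connectionName: 'atlasProd',          // registered Atlas connection (hypothetical)
    db: 'inventory',
    coll: ['orders', 'returns'],          // watch the change streams of both collections
    config: {
      fullDocument: 'whenAvailable',      // include the post-image when one is available
      fullDocumentOnly: true,             // emit only the fullDocument contents, not change event metadata
      pipeline: [
        { $match: { operationType: { $in: ['insert', 'update'] } } }  // filter change events at the source
      ]
    }
  }
}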

MongoDB Database Change Stream

To operate on streaming data from an Atlas database change stream, the $source stage has the following prototype form:

{
  "$source": {
    "connectionName": "<registered-connection>",
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "tsFieldName": "<timestamp>",
    "db": "<source-db>",
    "config": {
      "startAfter": <start-token> | "startAtOperationTime": <timestamp>,
      "fullDocument": "<full-doc-condition>",
      "fullDocumentOnly": <boolean>,
      "fullDocumentBeforeChange": "<before-change-condition>",
      "pipeline": [{
        "<aggregation-stage>": {
          <stage-input>,
          ...
        },
        ...
      }]
    }
  }
}

The $source stage takes a document with the following fields:

connectionName (string, Conditional)

Label that identifies the connection in the Connection Registry to ingest data from.

timeField (document, Optional)

Document that defines an authoritative timestamp for incoming messages.

If you use timeField, you must define it as one of the following:

  • a $toDate expression that takes a source message field as an argument

  • a $dateFromString expression that takes a source message field as an argument

If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.

tsFieldName (string, Optional)

Name that overrides the name of default timestamp fields declared by the source.

Atlas Stream Processing pipelines internally add a field called _ts to incoming messages to store timestamp information. Sources of streaming data might also use a field named _ts to store the timestamp of each message. To prevent a conflict between these fields, use tsFieldName to rename any source-provided field named _ts before additional processing takes place.

db (string, Required)

Name of a MongoDB database hosted on the Atlas instance specified by connectionName. The change stream of this database acts as the streaming data source.

config (document, Optional)

Document containing fields that override various default values.

config.startAfter (token, Conditional)

The change event after which the source begins reporting. This takes the form of a resume token.

You can use only one of config.startAfter or config.startAtOperationTime.

config.startAtOperationTime (timestamp, Conditional)

The operation time after which the source should begin reporting.

You can use only one of config.startAfter or config.startAtOperationTime.

config.fullDocument (string, Conditional)

Setting that controls whether a change stream source should return a full document, or only the changes when an update occurs. Must be one of the following:

  • updateLookup: Returns only changes on update.

  • required: Must return a full document. If a full document is unavailable, returns nothing.

  • whenAvailable: Returns a full document whenever one is available, otherwise returns changes.

If you do not specify a value for fullDocument, it defaults to updateLookup.

To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database.

config.fullDocumentOnly (boolean, Conditional)

Setting that controls whether a change stream source returns the entire change event document, including all metadata, or only the contents of fullDocument. If set to true, the source returns only the contents of fullDocument.

To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database.

config.fullDocumentBeforeChange (string, Optional)

Specifies whether a change stream source should include the full document in its original "before changes" state in the output. Must be one of the following:

  • off: Omits the fullDocumentBeforeChange field.

  • required: Must return a full document in its before-changes state. If a full document in its before-changes state is unavailable, the stream processor fails.

  • whenAvailable: Returns a full document in its before-changes state whenever one is available, otherwise omits the fullDocumentBeforeChange field.

If you do not specify a value for fullDocumentBeforeChange, it defaults to off.

To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database.

config.pipeline (document, Optional)

Specifies an aggregation pipeline to filter change stream output at the point of origin. This pipeline must conform to the parameters described in change-stream-modify-output.
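A database change stream source often needs to resume from a known point. In the following sketch, the connection and database names are hypothetical, and resumeToken stands for a resume token you captured earlier (for example, the _id of a previously processed change event):

{
  '$source': {
    connectionName: 'atlasProd',                 // registered Atlas connection (hypothetical)
    db: 'inventory',                             // watch every collection in this database
    config: {
      startAfter: resumeToken,                   // resume token saved from a prior change event
      fullDocumentBeforeChange: 'whenAvailable'  // include the pre-image when one is available
    }
  }
}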

MongoDB Cluster-wide Change Stream Source

To operate on streaming data from the change stream of an entire Atlas cluster, the $source stage has the following prototype form:

{
  "$source": {
    "connectionName": "<registered-connection>",
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "tsFieldName": "<timestamp>",
    "config": {
      "startAfter": <start-token> | "startAtOperationTime": <timestamp>,
      "fullDocument": "<full-doc-condition>",
      "fullDocumentOnly": <boolean>,
      "fullDocumentBeforeChange": "<before-change-condition>",
      "pipeline": [{
        "<aggregation-stage>": {
          <stage-input>,
          ...
        },
        ...
      }]
    }
  }
}

The $source stage takes a document with the following fields:

connectionName (string, Conditional)

Label that identifies the connection in the Connection Registry to ingest data from.

timeField (document, Optional)

Document that defines an authoritative timestamp for incoming messages.

If you use timeField, you must define it as one of the following:

  • a $toDate expression that takes a source message field as an argument

  • a $dateFromString expression that takes a source message field as an argument

If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.

tsFieldName (string, Optional)

Name that overrides the name of default timestamp fields declared by the source.

Atlas Stream Processing pipelines internally add a field called _ts to incoming messages to store timestamp information. Sources of streaming data might also use a field named _ts to store the timestamp of each message. To prevent a conflict between these fields, use tsFieldName to rename any source-provided field named _ts before additional processing takes place.

config (document, Optional)

Document containing fields that override various default values.

config.startAfter (token, Conditional)

The change event after which the source begins reporting. This takes the form of a resume token.

You can use only one of config.startAfter or config.startAtOperationTime.

config.startAtOperationTime (timestamp, Conditional)

The operation time after which the source should begin reporting.

You can use only one of config.startAfter or config.startAtOperationTime.

config.fullDocument (string, Conditional)

Setting that controls whether a change stream source should return a full document, or only the changes when an update occurs. Must be one of the following:

  • updateLookup: Returns only changes on update.

  • required: Must return a full document. If a full document is unavailable, returns nothing.

  • whenAvailable: Returns a full document whenever one is available, otherwise returns changes.

If you do not specify a value for fullDocument, it defaults to updateLookup.

To use this field with a cluster-wide change stream, you must enable change stream Pre- and Post-Images on every collection in the cluster.

config.fullDocumentOnly (boolean, Conditional)

Setting that controls whether a change stream source returns the entire change event document, including all metadata, or only the contents of fullDocument. If set to true, the source returns only the contents of fullDocument.

To use this field with a cluster-wide change stream, you must enable change stream Pre- and Post-Images on every collection in the cluster.

config.fullDocumentBeforeChange (string, Optional)

Specifies whether a change stream source should include the full document in its original "before changes" state in the output. Must be one of the following:

  • off: Omits the fullDocumentBeforeChange field.

  • required: Must return a full document in its before-changes state. If a full document in its before-changes state is unavailable, the stream processor fails.

  • whenAvailable: Returns a full document in its before-changes state whenever one is available, otherwise omits the fullDocumentBeforeChange field.

If you do not specify a value for fullDocumentBeforeChange, it defaults to off.

To use this field with a cluster-wide change stream, you must enable change stream Pre- and Post-Images on every collection in the cluster.

config.pipeline (document, Optional)

Specifies an aggregation pipeline to filter change stream output at the point of origin. This pipeline must conform to the parameters described in change-stream-modify-output.
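To replay a cluster-wide change stream from a point in time, you can pass an operation time instead of a resume token. A minimal sketch, assuming a hypothetical registered connection named atlasProd and an arbitrary example timestamp:

{
  '$source': {
    connectionName: 'atlasProd',   // registered Atlas connection (hypothetical)
    config: {
      // begin reporting changes that occur after this cluster timestamp
      startAtOperationTime: Timestamp({ t: 1717200000, i: 0 })
    }
  }
}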

Document Array

To operate on an array of documents, the $source stage has the following prototype form:

{
  "$source": {
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "tsFieldName": "<timestamp>",
    "documents": [{source-doc}, ...] | <expression>
  }
}

The $source stage takes a document with the following fields:

timeField (document, Optional)

Document that defines an authoritative timestamp for incoming messages.

If you use timeField, you must define it as one of the following:

  • a $toDate expression that takes a source message field as an argument

  • a $dateFromString expression that takes a source message field as an argument

If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.

tsFieldName (string, Optional)

Name that overrides the name of default timestamp fields declared by the source.

Atlas Stream Processing pipelines internally add a field called _ts to incoming messages to store timestamp information. Sources of streaming data might also use a field named _ts to store the timestamp of each message. To prevent a conflict between these fields, use tsFieldName to rename any source-provided field named _ts before additional processing takes place.

documents (array, Conditional)

Array of documents to use as a streaming data source. The value of this field can be either an array of objects or an expression that evaluates to an array of objects. Do not use this field when using the connectionName field.
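The documents source is useful for testing a pipeline without a registered connection. The following sketch (with made-up field values) streams three inline documents and derives the authoritative timestamp from each document's ts field:

{
  '$source': {
    documents: [
      { sensor: 'a', temp: 21.4, ts: '2024-08-02T19:09:18Z' },
      { sensor: 'b', temp: 22.1, ts: '2024-08-02T19:09:19Z' },
      { sensor: 'a', temp: 21.9, ts: '2024-08-02T19:09:20Z' }
    ],
    timeField: { $toDate: '$ts' }   // convert each document's ts string into the assigned timestamp
  }
}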

Behavior

$source must be the first stage of any pipeline it appears in. You can use only one $source stage per pipeline.
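For instance, when you register a stream processor with mongosh, the $source stage leads the pipeline array. A minimal sketch, assuming you connect to a stream processing instance with the sample_stream_solar sample connection registered (the processor name and filter are illustrative):

sp.createStreamProcessor('solarDemo', [
  { $source: { connectionName: 'sample_stream_solar' } },  // $source must come first
  { $match: { device_id: 'device_1' } }                    // subsequent stages consume its output
])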

Examples

A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. The following aggregation has three stages:

  1. The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, exposing each record to the subsequent aggregation stages as it is ingested. This stage also overrides the name of the timestamp field it projects, setting it to ingestionTime.

  2. The $match stage excludes documents that have a dewPoint.value of less than or equal to 5.0 and passes along the documents with dewPoint.value greater than 5.0 to the next stage.

  3. The $merge stage writes the output to an Atlas collection named stream in the sample_weatherstream database. If no such database or collection exists, Atlas creates them.

{
  '$source': {
    connectionName: 'sample_weatherdata',
    topic: 'my_weatherdata',
    tsFieldName: 'ingestionTime'
  }
},
{
  '$match': { 'dewPoint.value': { '$gt': 5 } }
},
{
  '$merge': {
    into: {
      connectionName: 'weatherStreamOutput',
      db: 'sample_weatherstream',
      coll: 'stream'
    }
  }
}
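Before persisting this aggregation as a named stream processor, you can test it interactively with sp.process(), which runs an ephemeral stream processor and prints its output to the shell. A sketch, assuming the same connections are registered:

sp.process([
  { '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } },
  { '$match': { 'dewPoint.value': { '$gt': 5 } } },
  { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
])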

To view the documents in the resulting sample_weatherstream.stream collection, connect to your Atlas cluster and run the following command:

db.getSiblingDB("sample_weatherstream").stream.find()
{
  _id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
  _stream_meta: {
    source: {
      type: 'kafka',
      topic: 'my_weatherdata',
      partition: 0,
      offset: Long('165235')
    }
  },
  airTemperature: { quality: '1', value: 27.7 },
  atmosphericPressureChange: {
    quantity24Hours: { quality: '9', value: 99.9 },
    quantity3Hours: { quality: '1' },
    tendency: { code: '1', quality: '1' }
  },
  atmosphericPressureObservation: {
    altimeterSetting: { quality: '1', value: 1015.9 },
    stationPressure: { quality: '1', value: 1021.9 }
  },
  callLetters: 'CGDS',
  dataSource: '4',
  dewPoint: { quality: '9', value: 25.7 },
  elevation: 9999,
  extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
  ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
  liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
  pastWeatherObservationManual: {
    atmosphericCondition: { quality: '1', value: '8' },
    period: { quality: '9', value: 3 }
  },
  position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
  precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
  presentWeatherObservationManual: { condition: '53', quality: '1' },
  pressure: { quality: '1', value: 1016.3 },
  qualityControlProcess: 'V020',
  seaSurfaceTemperature: { quality: '9', value: 27.6 },
  sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
  skyCondition: {
    cavok: 'N',
    ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
  },
  skyConditionObservation: {
    highCloudGenus: { quality: '1', value: '05' },
    lowCloudGenus: { quality: '9', value: '03' },
    lowestCloudBaseHeight: { quality: '9', value: 150 },
    lowestCloudCoverage: { quality: '1', value: '05' },
    midCloudGenus: { quality: '9', value: '08' },
    totalCoverage: { opaque: '99', quality: '1', value: '06' }
  },
  skyCoverLayer: {
    baseHeight: { quality: '9', value: 99999 },
    cloudType: { quality: '9', value: '05' },
    coverage: { quality: '1', value: '04' }
  },
  st: 'x+35700-027900',
  type: 'SAO',
  visibility: {
    distance: { quality: '1', value: 4000 },
    variability: { quality: '1', value: 'N' }
  },
  waveMeasurement: {
    method: 'I',
    seaState: { code: '99', quality: '9' },
    waves: { height: 99.9, period: 14, quality: '9' }
  },
  wind: {
    direction: { angle: 280, quality: '9' },
    speed: { quality: '1', rate: 30.3 },
    type: '9'
  }
}

Note

The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.
