$source
Definition
The $source stage specifies a connection in the Connection Registry to stream data from. The following connection types are supported:
- Apache Kafka broker
- MongoDB collection change stream
- MongoDB database change stream
- MongoDB cluster-wide change stream
- Document array
Note
You can't use Atlas serverless instances as a $source.
Syntax
Apache Kafka Broker
To operate on streaming data from an Apache Kafka broker, the $source stage has the following prototype form:
{ "$source": { "connectionName": "<registered-connection>", "topic" : ["<source-topic>", ...], "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "partitionIdleTimeout": { "size": <duration-number>, "unit": "<duration-unit>" }, "config": { "auto_offset_reset": "<start-event>", "group_id": "<group-id>", "keyFormat": "<deserialization-type>", "keyFormatError": "<error-handling>" }, } }
The $source stage takes a document with the following fields:
| Field | Type | Necessity | Description |
| --- | --- | --- | --- |
| connectionName | string | Required | Label that identifies the connection in the Connection Registry to ingest data from. |
| topic | string or array of strings | Required | Name of one or more Apache Kafka topics to stream messages from. To stream messages from more than one topic, specify the topic names in an array. |
| timeField | document | Optional | Document that defines an authoritative timestamp for incoming messages. If you use timeField, you must define it as a $toDate or $dateFromString expression that takes a source message field as an argument. If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source. |
| tsFieldName | string | Optional | Name that overrides the name of the timestamp field projected by the $source stage. The $source stage in an Atlas Stream Processing pipeline projects a field called _ts containing the assigned timestamp of each message; tsFieldName renames this field. |
| partitionIdleTimeout | document | Optional | Document specifying the amount of time that a partition is allowed to be idle before it is ignored in watermark calculations. |
| partitionIdleTimeout.size | integer | Optional | Number specifying the duration of the partition idle timeout. |
| partitionIdleTimeout.unit | string | Optional | Unit of time for the duration of the partition idle timeout. |
| config | document | Optional | Document containing fields that override various default values. |
| config.auto_offset_reset | string | Optional | Specifies which event in the Apache Kafka source topic to begin ingestion with. Defaults to latest. |
| config.group_id | string | Optional | ID of the Kafka consumer group to associate with the stream processor. If omitted, Atlas Stream Processing associates the stream processing instance with an auto-generated ID. Atlas Stream Processing commits partition offsets to the Apache Kafka broker for the specified consumer group ID after a checkpoint is committed. It commits an offset when messages up through that offset are durably recorded in a checkpoint. This allows you to track the offset lag and progress of the stream processor directly from the Kafka broker consumer group metadata. |
| config.keyFormat | string | Optional | Data type used to deserialize Apache Kafka key data. Must be one of the following values: binData, string, json, int, or long. Defaults to binData. |
| config.keyFormatError | string | Optional | How to handle errors encountered when deserializing Apache Kafka key data. Must be one of the following values: dlq, which writes the message to your dead letter queue, or passThrough, which sends the message to the next stage with the key as binary data. |
Note
Atlas Stream Processing requires that documents in the source data stream be
valid json
or ejson
. Atlas Stream Processing sets the documents that
don't meet this requirement to your dead letter queue if you have configured one.
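For example, a $source stage that streams from two topics, derives its timestamp from a message field, and overrides several consumer defaults might look like the following sketch. The connection name, topic names, message field, consumer group ID, and the specific option values shown are illustrative assumptions, not defaults; check the field descriptions above for the supported values:

{
  '$source': {
    connectionName: 'kafkaProd',                        // hypothetical Connection Registry entry
    topic: ['orders', 'returns'],                       // hypothetical topic names
    timeField: { '$toDate': '$orderTimestamp' },        // assign timestamps from a message field
    tsFieldName: 'eventTime',                           // project the timestamp as 'eventTime'
    partitionIdleTimeout: { size: 2, unit: 'minute' },  // ignore partitions idle for 2 minutes
    config: {
      auto_offset_reset: 'earliest',                    // begin ingestion from the earliest available event
      group_id: 'orders-stream-consumers',              // hypothetical consumer group ID
      keyFormat: 'string'                               // deserialize Kafka keys as strings
    }
  }
}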
MongoDB Collection Change Stream
To operate on streaming data from an Atlas collection change stream, the $source stage has the following prototype form:
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "db" : "<source-db>", "coll" : ["<source-coll>",...], "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
The $source stage takes a document with the following fields:
| Field | Type | Necessity | Description |
| --- | --- | --- | --- |
| connectionName | string | Conditional | Label that identifies the connection in the Connection Registry to ingest data from. |
| timeField | document | Optional | Document that defines an authoritative timestamp for incoming messages. If you use timeField, you must define it as a $toDate or $dateFromString expression that takes a source message field as an argument. If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source. |
| tsFieldName | string | Optional | Name that overrides the name of default timestamp fields declared by the source. Atlas Stream Processing pipelines internally add a field called _ts to incoming messages to hold timestamp information; tsFieldName renames this field. |
| db | string | Required | Name of a MongoDB database hosted on the Atlas instance specified by connectionName. |
| coll | string or array of strings | Required | Name of one or more MongoDB collections hosted on the Atlas instance specified by connectionName. |
| config | document | Optional | Document containing fields that override various default values. |
| config.startAfter | token | Conditional | The change event after which the source begins reporting. This takes the form of a resume token. You can use only one of either startAfter or startAtOperationTime. |
| config.startAtOperationTime | timestamp | Conditional | The operation time after which the source should begin reporting. You can use only one of either startAfter or startAtOperationTime. |
| config.fullDocument | string | Conditional | Setting that controls whether a change stream source should return a full document, or only the changes when an update occurs. Must be one of the following: default, in which the source returns only the deltas of changes on an update; updateLookup, in which the source returns the most recent majority-committed version of the changed document; whenAvailable, in which the source returns a post-image of the changed document when one is available; required, in which the source always returns a post-image of the changed document. If you do not specify a value for fullDocument, it defaults to default. To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection. |
| config.fullDocumentOnly | boolean | Conditional | Setting that controls whether a change stream source returns the entire change event document, including all metadata, or only the contents of fullDocument. If set to true, the source returns only the contents of fullDocument. To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection. |
| config.fullDocumentBeforeChange | string | Optional | Specifies whether a change stream source should include the full document in its original "before changes" state in the output. Must be one of the following: off, in which the source omits the pre-image; whenAvailable, in which the source includes a pre-image of the changed document when one is available; required, in which the source always includes a pre-image of the changed document. If you do not specify a value for fullDocumentBeforeChange, it defaults to off. To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection. |
| config.pipeline | document | Optional | Specifies an aggregation pipeline to filter change stream output at the point of origin. This pipeline must conform to the parameters described in change-stream-modify-output. |
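As a sketch of how these fields combine, the following stage watches a single collection, returns post-images when available, strips change event metadata, and filters for insert events at the source. The connection, database, and collection names are illustrative assumptions:

{
  '$source': {
    connectionName: 'atlasCluster',        // hypothetical Connection Registry entry
    db: 'sales',                           // hypothetical database name
    coll: 'orders',                        // hypothetical collection name
    config: {
      fullDocument: 'whenAvailable',       // include the post-image when one is available
      fullDocumentOnly: true,              // emit only the document, not change event metadata
      pipeline: [
        { '$match': { operationType: 'insert' } }   // filter the change stream at its origin
      ]
    }
  }
}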
MongoDB Database Change Stream
To operate on streaming data from an Atlas database change stream, the $source stage has the following prototype form:
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "db" : "<source-db>", "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
The $source stage takes a document with the following fields:
| Field | Type | Necessity | Description |
| --- | --- | --- | --- |
| connectionName | string | Conditional | Label that identifies the connection in the Connection Registry to ingest data from. |
| timeField | document | Optional | Document that defines an authoritative timestamp for incoming messages. If you use timeField, you must define it as a $toDate or $dateFromString expression that takes a source message field as an argument. If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source. |
| tsFieldName | string | Optional | Name that overrides the name of default timestamp fields declared by the source. Atlas Stream Processing pipelines internally add a field called _ts to incoming messages to hold timestamp information; tsFieldName renames this field. |
| db | string | Required | Name of a MongoDB database hosted on the Atlas instance specified by connectionName. |
| config | document | Optional | Document containing fields that override various default values. |
| config.startAfter | token | Conditional | The change event after which the source begins reporting. This takes the form of a resume token. You can use only one of either startAfter or startAtOperationTime. |
| config.startAtOperationTime | timestamp | Conditional | The operation time after which the source should begin reporting. You can use only one of either startAfter or startAtOperationTime. |
| config.fullDocument | string | Conditional | Setting that controls whether a change stream source should return a full document, or only the changes when an update occurs. Must be one of the following: default, in which the source returns only the deltas of changes on an update; updateLookup, in which the source returns the most recent majority-committed version of the changed document; whenAvailable, in which the source returns a post-image of the changed document when one is available; required, in which the source always returns a post-image of the changed document. If you do not specify a value for fullDocument, it defaults to default. To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database. |
| config.fullDocumentOnly | boolean | Conditional | Setting that controls whether a change stream source returns the entire change event document, including all metadata, or only the contents of fullDocument. If set to true, the source returns only the contents of fullDocument. To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database. |
| config.fullDocumentBeforeChange | string | Optional | Specifies whether a change stream source should include the full document in its original "before changes" state in the output. Must be one of the following: off, in which the source omits the pre-image; whenAvailable, in which the source includes a pre-image of the changed document when one is available; required, in which the source always includes a pre-image of the changed document. If you do not specify a value for fullDocumentBeforeChange, it defaults to off. To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database. |
| config.pipeline | document | Optional | Specifies an aggregation pipeline to filter change stream output at the point of origin. This pipeline must conform to the parameters described in change-stream-modify-output. |
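For instance, a stage that watches every collection in one database but narrows the change stream to two collections of interest might look like this sketch. The connection, database, and collection names are illustrative assumptions:

{
  '$source': {
    connectionName: 'atlasCluster',      // hypothetical Connection Registry entry
    db: 'sales',                         // hypothetical database name
    config: {
      fullDocument: 'updateLookup',      // return the current majority-committed version on updates
      pipeline: [
        { '$match': { 'ns.coll': { '$in': ['orders', 'returns'] } } }   // only these collections
      ]
    }
  }
}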
MongoDB Cluster-wide Change Stream Source
To operate on streaming data from an entire Atlas cluster change stream, the $source stage has the following prototype form:
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
The $source stage takes a document with the following fields:
| Field | Type | Necessity | Description |
| --- | --- | --- | --- |
| connectionName | string | Conditional | Label that identifies the connection in the Connection Registry to ingest data from. |
| timeField | document | Optional | Document that defines an authoritative timestamp for incoming messages. If you use timeField, you must define it as a $toDate or $dateFromString expression that takes a source message field as an argument. If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source. |
| tsFieldName | string | Optional | Name that overrides the name of default timestamp fields declared by the source. Atlas Stream Processing pipelines internally add a field called _ts to incoming messages to hold timestamp information; tsFieldName renames this field. |
| config | document | Optional | Document containing fields that override various default values. |
| config.startAfter | token | Conditional | The change event after which the source begins reporting. This takes the form of a resume token. You can use only one of either startAfter or startAtOperationTime. |
| config.startAtOperationTime | timestamp | Conditional | The operation time after which the source should begin reporting. You can use only one of either startAfter or startAtOperationTime. |
| config.fullDocument | string | Conditional | Setting that controls whether a change stream source should return a full document, or only the changes when an update occurs. Must be one of the following: default, in which the source returns only the deltas of changes on an update; updateLookup, in which the source returns the most recent majority-committed version of the changed document; whenAvailable, in which the source returns a post-image of the changed document when one is available; required, in which the source always returns a post-image of the changed document. If you do not specify a value for fullDocument, it defaults to default. To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database. |
| config.fullDocumentOnly | boolean | Conditional | Setting that controls whether a change stream source returns the entire change event document, including all metadata, or only the contents of fullDocument. If set to true, the source returns only the contents of fullDocument. To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database. |
| config.fullDocumentBeforeChange | string | Optional | Specifies whether a change stream source should include the full document in its original "before changes" state in the output. Must be one of the following: off, in which the source omits the pre-image; whenAvailable, in which the source includes a pre-image of the changed document when one is available; required, in which the source always includes a pre-image of the changed document. If you do not specify a value for fullDocumentBeforeChange, it defaults to off. To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database. |
| config.pipeline | document | Optional | Specifies an aggregation pipeline to filter change stream output at the point of origin. This pipeline must conform to the parameters described in change-stream-modify-output. |
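A cluster-wide source is useful for cross-database concerns such as auditing. The following sketch watches delete events across every database and collection on the cluster; the connection name is an illustrative assumption:

{
  '$source': {
    connectionName: 'atlasCluster',      // hypothetical Connection Registry entry
    config: {
      pipeline: [
        { '$match': { operationType: 'delete' } }   // report only deletions, cluster-wide
      ]
    }
  }
}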
Document Array
To operate on an array of documents, the $source stage has the following prototype form:
{ "$source": { "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "documents" : [{source-doc},...] | <expression> } }
The $source stage takes a document with the following fields:
| Field | Type | Necessity | Description |
| --- | --- | --- | --- |
| timeField | document | Optional | Document that defines an authoritative timestamp for incoming messages. If you use timeField, you must define it as a $toDate or $dateFromString expression that takes a source message field as an argument. If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source. |
| tsFieldName | string | Optional | Name that overrides the name of default timestamp fields declared by the source. Atlas Stream Processing pipelines internally add a field called _ts to incoming messages to hold timestamp information; tsFieldName renames this field. |
| documents | array | Conditional | Array of documents to use as a streaming data source. The value of this field can be either an array of objects or an expression that evaluates to an array of objects. Do not use this field when using the connectionName field. |
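The document array form is convenient for testing a pipeline without connecting to a live source. The following sketch streams two inline documents and assigns each a timestamp from its recordedAt field; the document shape and field names are illustrative assumptions:

{
  '$source': {
    documents: [
      { sensorId: 1, temperature: 21.4, recordedAt: '2024-08-02T19:00:00Z' },
      { sensorId: 2, temperature: 23.1, recordedAt: '2024-08-02T19:00:05Z' }
    ],
    timeField: { '$toDate': '$recordedAt' }   // derive each document's timestamp from a field
  }
}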
Behavior
$source must be the first stage of any pipeline it appears in. You can use only one $source stage per pipeline.
Examples
A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. The following aggregation has three stages:
- The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, exposing each record as it is ingested to the subsequent aggregation stages. This stage also overrides the name of the timestamp field it projects, setting it to ingestionTime.
- The $match stage excludes documents that have a dewPoint.value of less than or equal to 5.0 and passes along the documents with a dewPoint.value greater than 5.0 to the next stage.
- The $merge stage writes the output to an Atlas collection named stream in the sample_weatherstream database. If no such database or collection exists, Atlas creates them.
{
  '$source': {
    connectionName: 'sample_weatherdata',
    topic: 'my_weatherdata',
    tsFieldName: 'ingestionTime'
  }
},
{
  '$match': {
    'dewPoint.value': {
      '$gt': 5
    }
  }
},
{
  '$merge': {
    into: {
      connectionName: 'weatherStreamOutput',
      db: 'sample_weatherstream',
      coll: 'stream'
    }
  }
}
To view the documents in the resulting sample_weatherstream.stream collection, connect to your Atlas cluster and run the following command:
db.getSiblingDB("sample_weatherstream").stream.find()
{
  _id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
  _stream_meta: {
    source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('165235') }
  },
  airTemperature: { quality: '1', value: 27.7 },
  atmosphericPressureChange: {
    quantity24Hours: { quality: '9', value: 99.9 },
    quantity3Hours: { quality: '1' },
    tendency: { code: '1', quality: '1' }
  },
  atmosphericPressureObservation: {
    altimeterSetting: { quality: '1', value: 1015.9 },
    stationPressure: { quality: '1', value: 1021.9 }
  },
  callLetters: 'CGDS',
  dataSource: '4',
  dewPoint: { quality: '9', value: 25.7 },
  elevation: 9999,
  extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
  ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
  liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
  pastWeatherObservationManual: {
    atmosphericCondition: { quality: '1', value: '8' },
    period: { quality: '9', value: 3 }
  },
  position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
  precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
  presentWeatherObservationManual: { condition: '53', quality: '1' },
  pressure: { quality: '1', value: 1016.3 },
  qualityControlProcess: 'V020',
  seaSurfaceTemperature: { quality: '9', value: 27.6 },
  sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
  skyCondition: {
    cavok: 'N',
    ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
  },
  skyConditionObservation: {
    highCloudGenus: { quality: '1', value: '05' },
    lowCloudGenus: { quality: '9', value: '03' },
    lowestCloudBaseHeight: { quality: '9', value: 150 },
    lowestCloudCoverage: { quality: '1', value: '05' },
    midCloudGenus: { quality: '9', value: '08' },
    totalCoverage: { opaque: '99', quality: '1', value: '06' }
  },
  skyCoverLayer: {
    baseHeight: { quality: '9', value: 99999 },
    cloudType: { quality: '9', value: '05' },
    coverage: { quality: '1', value: '04' }
  },
  st: 'x+35700-027900',
  type: 'SAO',
  visibility: {
    distance: { quality: '1', value: 4000 },
    variability: { quality: '1', value: 'N' }
  },
  waveMeasurement: {
    method: 'I',
    seaState: { code: '99', quality: '9' },
    waves: { height: 99.9, period: 14, quality: '9' }
  },
  wind: {
    direction: { angle: 280, quality: '9' },
    speed: { quality: '1', rate: 30.3 },
    type: '9'
  }
}
Note
The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.