Docs 菜单

$source

$source

$source 阶段在连接注册表中指定要从中流式传输数据的连接。支持以下连接类型:

  • Apache Kafka 代理

  • MongoDB collection change stream

  • MongoDB database change stream

  • 文档数组

注意

您不能使用 Atlas 无服务器实例作为 $source

操作来自 Apache Kafka 的流数据 代理中, 阶段具有以下原型形式:$source

{
"$source": {
"connectionName": "<registered-connection>",
"topic" : ["<source-topic>", ...],
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<string> (optional)",
"partitionIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"auto_offset_reset": "<start-event>",
"group_id": "<group-id>",
"keyFormat": "<deserialization-type>",
"keyFormatError": "<error-handling>"
},
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

必需

用于标识连接注册表中要从中提取数据的连接的标签。

topic

字符串或字符串数组

必需

一个或多个 Apache Kafka 主题的名称,用于从这些主题流式传输信息。如果要流式传输多个主题的消息,请在数组中指定这些主题。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

tsFieldName

字符串

Optional

将文件的指定时间戳投影到 $source 字段的名称。

如果省略,$source 不会向文档添加时间戳。

partitionIdleTimeout

文档

Optional

指定分区在水印计算中被忽略之前允许其空闲的时间长度的文档。

默认情况下,此字段为禁用。要处理因空闲而无进展的分区,请为此字段设置一个值。

partitionIdleTimeout.size

整型

Optional

指定分区空闲超时持续时间的数字。

partitionIdleTimeout.unit

字符串

Optional

分区空闲超时持续时间的时间单位。

unit的值可以是以下值之一:

  • "ms"(毫秒)

  • "second"

  • "minute"

  • "hour"

  • "day"

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.auto_offset_reset

字符串

Optional

指定从 Apache Kafka 源主题中的哪个事件开始摄取。auto_offset_reset 采用以下值:

  • endlatestlargest :在初始化聚合时从主题中的最新事件开始摄取。

  • earliestbeginningsmallest :从主题中最早的事件开始摄取。

默认值为 latest

config.group_id

字符串

Optional

与流处理器关联的 kafka 使用者群组的 ID。 如果省略,Atlas Stream Processing 会将流处理实例与以下格式的自动生成的 ID 相关联:

asp-${streamProcessorId}-consumer

Atlas Stream Processing 将分区偏移提交到 Apache Kafka 提交检查点后指定消费者组 ID 的代理。当直到偏移量之前的消息持久记录在检查点中时,它才会提交偏移量。 这样,您就可以直接从 Kafka 代理消费者组元数据跟踪流处理器的偏移延迟和进度。

config.keyFormat

字符串

Optional

用于反序列化 Apache Kafka key 数据的数据类型。必须是以下值之一:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

默认值为 binData

config.keyFormatError

字符串

Optional

如何处理在反序列化 Apache Kafka 密钥数据时遇到的错误。必须是下列值之一:

  • dlq,将文档写入死信队列。

  • passThrough,将文档发送到下一阶段,但不包含关键数据。

注意

Atlas Stream Processing 要求源数据流中的文档在 jsonejson有效。Atlas Stream Processing 会将不满足此要求的文档设置为死信队列 (如果您已配置)。

Atlas 集合变更流允许应用程序访问针对单个集合的实时数据更改。要学习如何对集合打开变更流,请参阅变更流

为了操作来自 Atlas collection 的流媒体数据, $source阶段具有以下原型形式:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<string> (optional)",
"db" : "<source-db>",
"coll" : ["<source-coll>",...],
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

可选的

用于标识连接注册表中要从中提取数据的连接的标签。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

tsFieldName

字符串

Optional

将文件的指定时间戳投影到 $source 字段的名称。

如果省略,$source 不会向文档添加时间戳。

db

字符串

必需

connectionName指定的 Atlas 实例上托管的 MongoDB database 的名称。此数据库的change stream充当流媒体数据源。

coll

字符串或字符串数组

必需

connectionName 指定的Atlas实例上托管的一个或多个MongoDB集合的名称。 这些集合的变更流充当流媒体数据源。 如果省略此字段,流处理器将从MongoDB数据库变更流中获取数据。

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.startAfter

token

可选的

源开始报告的变更事件。这采用 resume token 的形式。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.startAtOperationTime

timestamp

可选的

源应开始报告的操作时间。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.fullDocument

字符串

可选的

用于控制变更流源是应返回完整文档还是仅在发生更新时返回变更的设置。 必须是以下之一:

  • updateLookup : 仅在更新时返回更改。

  • required:必须返回完整文档。如果没有完整文档,则不返回任何内容。

  • whenAvailable:如果有完整的文档,则返回完整的文档,否则返回更改。

如果没有为 fullDocument 指定值,则默认为updateLookup

要将此字段与集合变更流一起使用,您必须在该集合上启用变更流前像和后像

config.fullDocumentOnly

布尔

可选的

用于控制change stream源是返回包括所有元数据的整个事件文档,还是仅返回fullDocument内容的设置。如果设置为true ,源仅返回fullDocument的内容。

要将此字段与集合变更流一起使用,您必须在该集合上启用变更流前像和后像

config.fullDocumentBeforeChange

字符串

Optional

指定change stream源是否应在输出中包含处于原始“更改之前”状态的文档。必须是以下之一:

  • off :省略fullDocumentBeforeChange字段。

  • required:必须返回处于更改前状态的完整文档。如果处于更改前状态的完整文档不可用,则流处理器将失败。

  • whenAvailable :只要有可用的文档,就返回处于更改前状态的完整文档,否则忽略fullDocumentBeforeChange字段。

如果没有为fullDocumentBeforeChange指定值,则默认为off

要将此字段与集合变更流一起使用,您必须在该集合上启用变更流前像和后像

config.pipeline

文档

Optional

指定一个聚合管道,用于过滤源点的变更流输出。该管道必须符合修改变更流输出中描述的参数。

Atlas 数据库变更流允许应用程序访问针对单个数据库的实时数据更改。要学习如何针对数据库打开变更流,请参阅变更流

为了操作来自 Atlas 数据库变更流的流数据, $source阶段具有以下原型形式:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<string> (optional)",
"db" : "<source-db>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

可选的

用于标识连接注册表中要从中提取数据的连接的标签。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

tsFieldName

字符串

Optional

将文件的指定时间戳投影到 $source 字段的名称。

如果省略,$source 不会向文档添加时间戳。

db

字符串

必需

connectionName指定的 Atlas 实例上托管的 MongoDB database 的名称。此数据库的change stream充当流媒体数据源。

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.startAfter

token

可选的

源开始报告的变更事件。这采用 resume token 的形式。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.startAtOperationTime

timestamp

可选的

源应开始报告的操作时间。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.fullDocument

字符串

可选的

用于控制变更流源是应返回完整文档还是仅在发生更新时返回变更的设置。 必须是以下之一:

  • updateLookup : 仅在更新时返回更改。

  • required : 必须返回完整文档。如果没有完整文档,则不返回任何内容。

  • whenAvailable:如果有完整文档可用,则会返回完整文档,否则将返回更改。

如果没有为 fullDocument 指定值,则默认为updateLookup

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.fullDocumentOnly

布尔

可选的

用于控制change stream源是返回包括所有元数据的整个事件文档,还是仅返回fullDocument内容的设置。如果设置为true ,源仅返回fullDocument的内容。

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.fullDocumentBeforeChange

字符串

Optional

指定change stream源是否应在输出中包含处于原始“更改之前”状态的文档。必须是以下之一:

  • off :省略fullDocumentBeforeChange字段。

  • required:必须返回处于更改前状态的完整文档。如果处于更改前状态的完整文档不可用,则流处理器将失败。

  • whenAvailable :只要有可用的文档,就返回处于更改前状态的完整文档,否则忽略fullDocumentBeforeChange字段。

如果没有为fullDocumentBeforeChange指定值,则默认为off

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.pipeline

文档

Optional

指定一个聚合管道,用于过滤源点的变更流输出。该管道必须符合修改变更流输出中描述的参数。

要对来自整个 Atlas 集群变更流的流数据进行操作, $source 阶段具有以下原型形式:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<string> (optional)",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

可选的

用于标识连接注册表中要从中提取数据的连接的标签。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

tsFieldName

字符串

Optional

将文件的指定时间戳投影到 $source 字段的名称。

如果省略,$source 不会向文档添加时间戳。

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.startAfter

token

可选的

源开始报告的变更事件。这采用 resume token 的形式。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.startAtOperationTime

timestamp

可选的

源应开始报告的操作时间。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.fullDocument

字符串

可选的

用于控制变更流源是应返回完整文档还是仅在发生更新时返回变更的设置。 必须是以下之一:

  • updateLookup : 仅在更新时返回更改。

  • required : 必须返回完整文档。如果没有完整文档,则不返回任何内容。

  • whenAvailable:如果有完整文档可用,则会返回完整文档,否则将返回更改。

如果没有为 fullDocument 指定值,则默认为updateLookup

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.fullDocumentOnly

布尔

可选的

用于控制change stream源是返回包括所有元数据的整个事件文档,还是仅返回fullDocument内容的设置。如果设置为true ,源仅返回fullDocument的内容。

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.fullDocumentBeforeChange

字符串

Optional

指定change stream源是否应在输出中包含处于原始“更改之前”状态的文档。必须是以下之一:

  • off :省略fullDocumentBeforeChange字段。

  • required:必须返回处于更改前状态的完整文档。如果处于更改前状态的完整文档不可用,则流处理器将失败。

  • whenAvailable :只要有可用的文档,就返回处于更改前状态的完整文档,否则忽略fullDocumentBeforeChange字段。

如果没有为fullDocumentBeforeChange指定值,则默认为off

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.pipeline

文档

Optional

指定一个聚合管道,用于过滤源点的变更流输出。该管道必须符合 change-stream-modify-output 所述的参数。

为了对文档数组进行操作, $source阶段具有以下原型形式:

{
"$source": {
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<string> (optional)",
"documents" : [{source-doc},...] | <expression>
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

tsFieldName

字符串

Optional

将文件的指定时间戳投影到 $source 字段的名称。

如果省略,$source 不会向文档添加时间戳。

documents

阵列

可选的

用作流媒体数据源的文档数组。该字段的值可以是对象数组,也可以是计算结果为对象数组的表达式。使用connectionName字段时请勿使用此字段。

$source必须是它所在的任何管道的第一阶段。 每个管道只能使用一个$source阶段。

流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:

  1. $source 阶段与在名为 my_weatherdata 的主题中收集这些报告的 Apache Kafka 代理建立连接,将每条记录摄取到后续聚合阶段。此阶段还会覆盖其投影的时间戳字段的名称,将其设置为 ingestionTime

  2. $match 阶段会排除 dewPoint.value 小于或等于 5.0 的文档,并将 dewPoint.value 大于 5.0 的文档传递到下一个阶段。

  3. $merge 阶段将输出写入 sample_weatherstream 数据库中名为 stream 的 Atlas 集合。如果不存在此类数据库或集合,Atlas 会创建它们。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

要查看生成的 sample_weatherstream.stream 集合中的文档,请连接到您的 Atlas 集群并运行以下命令:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

注意

以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。

以下聚合会从 cluster0-collection 源引入数据,而该源会连接到已加载示例数据集的 Atlas 集群。要了解如何创建流处理实例并将 Atlas 集群连接添加到连接注册表,请参阅 Atlas Stream Processing 入门。此聚合会运行两个阶段以打开变更流并记录对 sample_weatherdata 数据库中 data 集合的更改:

  1. $source 阶段会连接到 cluster0-collection 源,并针对 sample_mflix 数据库中的 movies 集合打开变更流。变更流输出会进行过滤以仅记录插入删除变更事件。

  2. $merge 阶段会将过滤后的变更流文档写入 sample_mflix 数据库中名为 movies_changes 的 Atlas 集合。如果不存在此类集合,Atlas 则会进行创建。

{
$source: {
connectionName: "cluster0-connection",
db : "sample_weatherdata",
coll : "data"
},
$merge: {
into: {
connectionName: "cluster0-connection",
db: "sample_weatherdata",
coll: "data_changes"
}
}
}

以下 mongosh 命令会删除 data 文档:

db.getSiblingDB("sample_weatherdata").data.deleteOne(
{ _id: ObjectId("5553a99ae4b02cf715120e4b") }
)

删除 data 文档后,流处理器会将此变更流事件文档写入 sample_weatherdata.data_changes 集合。要查看生成的 sample_weatherdata.data_changes 集合中的文档,请使用 mongosh 连接到您的 Atlas 集群并运行以下命令:

db.getSiblingDB("sample_weatherdata").data_changes.find()
[
{
_id: {
_data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004'
},
clusterTime: Timestamp({ t: 1738790819, i: 1 }),
documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') },
ns: { db: 'sample_weatherdata', coll: 'data' },
operationType: 'delete',
wallTime: ISODate('2025-02-05T21:26:59.313Z')
}
]