Docs 菜单
Docs 主页
/
MongoDB Atlas
/ /

$source

在此页面上

  • 定义
  • 语法
  • Apache Kafka 代理
  • MongoDB 集合变更流
  • MongoDB database change stream
  • MongoDB 集群范围变更流源
  • 文档数组
  • 行为
  • 示例
$source

$source 阶段在 Connection Registry 中指定要从中流式传输数据的连接。支持以下连接类型:

注意

您不能使用 Atlas 无服务器实例作为 $source

操作来自 Apache Kafka 的流数据 代理中, 阶段具有以下原型形式:$source

{
"$source": {
"connectionName": "<registered-connection>",
"topic" : ["<source-topic>", ...],
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"partitionIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"auto_offset_reset": "<start-event>",
"group_id": "<group-id>",
"keyFormat": "<deserialization-type>",
"keyFormatError": "<error-handling>"
},
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

必需

用于标识连接注册表中要从中提取数据的连接的标签。

topic

字符串或字符串数组

必需

一个或多个 Apache Kafka 主题的名称,用于从这些主题流式传输信息。如果要流式传输多个主题的消息,请在数组中指定这些主题。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

tsFieldName

字符串

Optional

覆盖 $source 投影的时间戳字段名称。

Atlas Stream Processing 管道中的 $source 阶段会投射一个名为 _ts 的字段,其中包含指定的文档时间戳。流媒体数据源还可能使用名为_ts 的字段来存储每条消息的时间戳。为防止这些字段之间发生冲突,请在进行其他处理之前使用 tsFieldName 重命名任何源提供的名为 _ts 的字段。

partitionIdleTimeout

文档

Optional

指定分区在水印计算中被忽略之前允许其空闲的时间长度的文档。

partitionIdleTimeout.size

整型

Optional

指定分区空闲超时持续时间的数字。

partitionIdleTimeout.unit

字符串

Optional

分区空闲超时持续时间的时间单位。

unit的值可以是以下值之一:

  • "ms" (毫秒)

  • "second"

  • "minute"

  • "hour"

  • "day"

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.auto_offset_reset

字符串

Optional

指定从 Apache Kafka 源主题中的哪个事件开始摄取。auto_offset_reset 采用以下值:

  • endlatestlargest :在初始化聚合时从主题中的最新事件开始摄取。

  • earliestbeginningsmallest :从主题中最早的事件开始摄取。

默认值为 latest

config.group_id

字符串

Optional

与流处理器关联的 kafka 使用者群组的 ID。 如果省略,Atlas Stream Processing 会将流处理实例与以下格式的自动生成的 ID 相关联:

asp-${streamProcessorId}-consumer

Atlas Stream Processing 将分区偏移提交到 Apache Kafka 提交检查点后指定消费者组 ID 的代理。当直到偏移量之前的消息持久记录在检查点中时,它才会提交偏移量。 这样,您就可以直接从 Kafka 代理消费者组元数据跟踪流处理器的偏移延迟和进度。

config.keyFormat

字符串

Optional

用于反序列化 Apache Kafka key 数据的数据类型。必须是以下值之一:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

默认值为 binData

config.keyFormatError

字符串

Optional

如何处理在反序列化 Apache Kafka 密钥数据时遇到的错误。必须是下列值之一:

  • dlq,将文档写入您的死信队列。

  • passThrough,将文档发送到缺少关键数据的下一阶段。

注意

Atlas Stream Processing 要求源数据流中的文档在 jsonejson有效。Atlas Stream Processing 会将不满足此要求的文档设置为死信队列 (如果您已配置)。

为了操作来自 Atlas collection 的流媒体数据, $source阶段具有以下原型形式:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"db" : "<source-db>",
"coll" : ["<source-coll>",...],
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

可选的

用于标识连接注册表中要从中提取数据的连接的标签。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

tsFieldName

字符串

Optional

覆盖源声明的默认时间戳字段名称的名称。

Atlas Stream Processing 管道在内部向传入消息添加了一个名为 _ts 的字段,用于存储时间戳信息。流数据源还可能使用名为 _ts 的字段来存储每条消息的时间戳。为了防止这些字段之间发生冲突,请在进行其他处理之前使用 tsFieldName 来重命名任何源提供的名为 _ts 的字段。

db

字符串

必需

connectionName指定的 Atlas 实例上托管的 MongoDB database 的名称。此数据库的change stream充当流媒体数据源。

coll

字符串或字符串数组

必需

在 指定的Atlas实例上托管的一个或多个MongoDB集合的名称。这些集合的变更流充当流媒体数据源。如果省略此字段,流处理器将从MongoDB数据库变更流中获取数据。connectionName

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.startAfter

token

可选的

源开始报告的变更事件。这采用 resume token 的形式。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.startAtOperationTime

timestamp

可选的

源应开始报告的操作时间。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.fullDocument

字符串

可选的

用于控制变更流源是应返回完整文档还是仅在发生更新时返回变更的设置。 必须是以下之一:

  • updateLookup :仅返回更新时的更改。

  • required :必须返回完整的文档。 如果没有完整文档,则不返回任何内容。

  • whenAvailable :只要有完整文档可用,就返回完整文档,否则返回更改。

如果没有为 fullDocument 指定值,则默认为updateLookup

要将此字段与集合变更流一起使用,您必须在该集合上启用变更流前像和后像

config.fullDocumentOnly

布尔

可选的

用于控制change stream源是返回包括所有元数据的整个事件文档,还是仅返回fullDocument内容的设置。如果设置为true ,源仅返回fullDocument的内容。

要将此字段与集合变更流一起使用,您必须在该集合上启用变更流前像和后像

config.fullDocumentBeforeChange

字符串

Optional

指定change stream源是否应在输出中包含处于原始“更改之前”状态的文档。必须是以下之一:

  • off :省略fullDocumentBeforeChange字段。

  • required :必须以更改前的状态返回完整文档。 如果处于更改前状态的完整文档不可用,则流处理器将失败。

  • whenAvailable :只要有可用的文档,就返回处于更改前状态的完整文档,否则忽略fullDocumentBeforeChange字段。

如果没有为fullDocumentBeforeChange指定值,则默认为off

要将此字段与集合变更流一起使用,您必须在该集合上启用变更流前像和后像

config.pipeline

文档

Optional

指定一个聚合管道,用于过滤源点的变更流输出。该管道必须符合 change-stream-modify-output 所述的参数。

为了操作来自 Atlas 数据库变更流的流数据, $source阶段具有以下原型形式:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"db" : "<source-db>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

可选的

用于标识连接注册表中要从中提取数据的连接的标签。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

tsFieldName

字符串

Optional

覆盖源声明的默认时间戳字段名称的名称。

Atlas Stream Processing 管道在内部向传入消息添加了一个名为 _ts 的字段,用于存储时间戳信息。流数据源还可能使用名为 _ts 的字段来存储每条消息的时间戳。为了防止这些字段之间发生冲突,请在进行其他处理之前使用 tsFieldName 来重命名任何源提供的名为 _ts 的字段。

db

字符串

必需

connectionName指定的 Atlas 实例上托管的 MongoDB database 的名称。此数据库的change stream充当流媒体数据源。

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.startAfter

token

可选的

源开始报告的变更事件。这采用 resume token 的形式。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.startAtOperationTime

timestamp

可选的

源应开始报告的操作时间。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.fullDocument

字符串

可选的

用于控制变更流源是应返回完整文档还是仅在发生更新时返回变更的设置。 必须是以下之一:

  • updateLookup :仅返回更新时的更改。

  • required :必须返回完整的文档。 如果没有完整文档,则不返回任何内容。

  • whenAvailable :只要有完整文档可用,就返回完整文档,否则返回更改。

如果没有为 fullDocument 指定值,则默认为updateLookup

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.fullDocumentOnly

布尔

可选的

用于控制change stream源是返回包括所有元数据的整个事件文档,还是仅返回fullDocument内容的设置。如果设置为true ,源仅返回fullDocument的内容。

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.fullDocumentBeforeChange

字符串

Optional

指定change stream源是否应在输出中包含处于原始“更改之前”状态的文档。必须是以下之一:

  • off :省略fullDocumentBeforeChange字段。

  • required :必须以更改前的状态返回完整文档。 如果处于更改前状态的完整文档不可用,则流处理器将失败。

  • whenAvailable :只要有可用的文档,就返回处于更改前状态的完整文档,否则忽略fullDocumentBeforeChange字段。

如果没有为fullDocumentBeforeChange指定值,则默认为off

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.pipeline

文档

Optional

指定一个聚合管道,用于过滤源点的变更流输出。该管道必须符合 change-stream-modify-output 所述的参数。

要对来自整个 Atlas 集群变更流的流数据进行操作, $source 阶段具有以下原型形式:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

connectionName

字符串

可选的

用于标识连接注册表中要从中提取数据的连接的标签。

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

tsFieldName

字符串

Optional

覆盖源声明的默认时间戳字段名称的名称。

Atlas Stream Processing 管道在内部向传入消息添加了一个名为 _ts 的字段,用于存储时间戳信息。流数据源还可能使用名为 _ts 的字段来存储每条消息的时间戳。为了防止这些字段之间发生冲突,请在进行其他处理之前使用 tsFieldName 来重命名任何源提供的名为 _ts 的字段。

config

文档

Optional

包含可覆盖各种默认值的字段的文档。

config.startAfter

token

可选的

源开始报告的变更事件。这采用 resume token 的形式。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.startAtOperationTime

timestamp

可选的

源应开始报告的操作时间。

您只能使用config.startAfterconfig.StartAtOperationTime之一。

config.fullDocument

字符串

可选的

用于控制变更流源是应返回完整文档还是仅在发生更新时返回变更的设置。 必须是以下之一:

  • updateLookup :仅返回更新时的更改。

  • required :必须返回完整的文档。 如果没有完整文档,则不返回任何内容。

  • whenAvailable :只要有完整文档可用,就返回完整文档,否则返回更改。

如果没有为 fullDocument 指定值,则默认为updateLookup

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.fullDocumentOnly

布尔

可选的

用于控制change stream源是返回包括所有元数据的整个事件文档,还是仅返回fullDocument内容的设置。如果设置为true ,源仅返回fullDocument的内容。

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.fullDocumentBeforeChange

字符串

Optional

指定change stream源是否应在输出中包含处于原始“更改之前”状态的文档。必须是以下之一:

  • off :省略fullDocumentBeforeChange字段。

  • required :必须以更改前的状态返回完整文档。 如果处于更改前状态的完整文档不可用,则流处理器将失败。

  • whenAvailable :只要有可用的文档,就返回处于更改前状态的完整文档,否则忽略fullDocumentBeforeChange字段。

如果没有为fullDocumentBeforeChange指定值,则默认为off

要将此字段与数据库变更流一起使用,您必须对该数据库中的每个集合启用变更流前像和后像。

config.pipeline

文档

Optional

指定一个聚合管道,用于过滤源点的变更流输出。该管道必须符合 change-stream-modify-output 所述的参数。

为了对文档数组进行操作, $source阶段具有以下原型形式:

{
"$source": {
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"documents" : [{source-doc},...] | <expression>
}
}

$source 阶段采用包含以下字段的文档:

字段
类型
必要性
说明

timeField

文档

Optional

为传入消息定义权威时间戳的文档。

如果使用timeField ,则必须将其定义为以下之一:

  • 将源消息字段作为参数的$toDate表达式

  • 将源消息字段作为参数的$dateFromString表达式。

如果您未声明timeField ,Atlas Stream Processing 会根据源提供的消息时间戳创建一个时间戳。

tsFieldName

字符串

Optional

覆盖源声明的默认时间戳字段名称的名称。

Atlas Stream Processing 管道在内部向传入消息添加了一个名为 _ts 的字段,用于存储时间戳信息。流数据源还可能使用名为 _ts 的字段来存储每条消息的时间戳。为了防止这些字段之间发生冲突,请在进行其他处理之前使用 tsFieldName 来重命名任何源提供的名为 _ts 的字段。

documents

阵列

可选的

用作流媒体数据源的文档数组。该字段的值可以是对象数组,也可以是计算结果为对象数组的表达式。使用connectionName字段时请勿使用此字段。

$source必须是它所在的任何管道的第一阶段。 每个管道只能使用一个$source阶段。

流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:

  1. $source 阶段与在名为 my_weatherdata 的主题中收集这些报告的 Apache Kafka 代理建立连接,将每条记录摄取到后续聚合阶段。此阶段还会覆盖其投影的时间戳字段的名称,将其设置为 ingestionTime

  2. $match 阶段会排除 dewPoint.value 小于或等于 5.0 的文档,并将 dewPoint.value 大于 5.0 的文档传递到下一个阶段。

  3. $merge 阶段将输出写入 sample_weatherstream 数据库中名为 stream 的 Atlas 集合。如果不存在此类数据库或集合,Atlas 会创建它们。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

要查看生成的 sample_weatherstream.stream 集合中的文档,请连接到您的 Atlas 集群并运行以下命令:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
_stream_meta: {
source: {
type: 'kafka',
topic: 'my_weatherdata',
partition: 0,
offset: Long('165235')
}
},
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

注意

以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。

后退

聚合管道