$merge
定义
$merge 阶段在 Connection Registry 中指定要将消息写入到的连接。连接必须是Atlas连接。
$merge
管道阶段采用以下原型形式:
{ "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> }, "on": "<identifier field>" | [ "<identifier field1>", ...], "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "whenMatched": "replace | keepExisting | merge", "whenNotMatched": "insert | discard" } }
语法
Atlas Stream Processing 版本的 $merge 使用与 Atlas Data Federation 版本相同的大部分字段。但是,由于 Atlas Stream Processing 仅支持合并到 Atlas 连接,因此 into
的语法经过了简化。有关更多信息,请参阅对 Atlas Data Federation $merge
字段的描述。
行为
$merge
必须是它所在的任何管道的最后一个阶段。 每个管道只能使用一个$merge
阶段。
针对分片集合,on
字段对 $merge
有特殊要求。要了解更多信息,请参阅 $merge 语法。
您可以使用动态表达式作为以下字段的值:
into.db
into.coll
这使您的流处理器能够逐条消息将消息写入不同的目标 Atlas collection。
例子
您有一个事务事件流,它会生成以下形式的消息:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
要将其中每一个分类到不同的 Atlas 数据库和collection中,您可以编写以下$merge
阶段:
$merge: { into: { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" }
此$merge
阶段:
将
Very Important Industries
消息写入名为VIP.subscription
的 Atlas collection。将
N. E. Buddy
消息写入名为employee.requisition
的 Atlas collection。将
Khan Traktor
消息写入名为contractor.billableHours
的 Atlas collection。
您只能使用计算结果为字符串的动态表达式。 有关动态表达式的更多信息,请参阅表达式操作符。
如果您使用动态表达式指定数据库或集合,但是 Atlas Stream Processing 无法计算给定消息的表达式,Atlas Stream Processing 将在配置后将该消息发送到死信队列并处理后续消息。如果未配置死信队列,则 Atlas Stream Processing 会完全跳过该消息并处理后续消息。
示例
流数据源从不同位置生成详细的天气报告,符合示例天气数据集的模式。以下聚合分为三个阶段:
$source
阶段与在名为my_weatherdata
的主题中收集这些报告的 Apache Kafka 代理建立连接,将每条记录摄取到后续聚合阶段。此阶段还会覆盖其投影的时间戳字段的名称,将其设置为ingestionTime
。$match
阶段会排除dewPoint.value
小于或等于5.0
的文档,并将dewPoint.value
大于5.0
的文档传递到下一个阶段。$merge
阶段将输出写入sample_weatherstream
数据库中名为stream
的 Atlas 集合。如果不存在此类数据库或集合,Atlas 会创建它们。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
要查看生成的 sample_weatherstream.stream
集合中的文档,请连接到您的 Atlas 集群并运行以下命令:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('165235') } }, airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
注意
以上是一个有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。