$changeStreamSplitLargeEvent(聚合)
定义
MongoDB 7.0(和 6.0.9)新增内容。
如果变更流包含超过16 MB 的大型事件,则会返回BSONObjectTooLarge
异常。 从MongoDB 7.0 (和6.0.9 )开始, 您可以使用$changeStreamSplitLargeEvent
阶段将事件分割为较小的片段。
仅应在绝对必要时使用 $changeStreamSplitLargeEvent
。例如,如果您的应用程序需要完整的文档前后图像,并且生成超过 16 MB 的大型事件,请使用 $changeStreamSplitLargeEvent
。
在决定使用$changeStreamSplitLargeEvent
之前,应首先尝试减小变更事件的大小。 示例:
除非应用程序需要,否则不要请求文档前像或后像。这在更多情况下会生成
fullDocument
和fullDocumentBeforeChange
字段,这些字段通常是变更事件中的最大对象。使用
$project
阶段以仅包含应用程序所需的字段。这样可以减小更改事件的大小,并避免将大型事件拆分为片段的额外时间。这样可以在每个批次中返回更多变更事件。
管道中只能有一个 $changeStreamSplitLargeEvent
阶段,而且必须是最后一个阶段。您只能在 $changeStream
管道中使用 $changeStreamSplitLargeEvent
。
$changeStreamSplitLargeEvent
事务语法:
{ $changeStreamSplitLargeEvent: {} }
行为
$changeStreamSplitLargeEvent
将超过16 MB 的事件拆分为片段,并使用变更流游标按顺序返回片段。
分割片段,以便在第一个片段中返回最大数量的字段。 这可确保尽快返回事件上下文。
当分割变更事件时,仅使用顶级字段的大小。$changeStreamSplitLargeEvent
不会递归处理或分割子文档。例如,如果使用 $project
阶段创建具有大小为 20 MB 的单个字段的变更事件,则不会分割该事件,并且该阶段会返回错误。
每个片段都有一个恢复令牌。使用片段令牌恢复的数据流会出现以下两种情况:
从后续片段开始一个新流。
如果从序列中的最后一个片段恢复,则从下一个事件开始继续处理。
事件的每个片段都包含一个splitEvent
文档:
splitEvent: { fragment: <int>, of: <int> }
下表描述了字段。
字段 | 说明 |
---|---|
| 片段索引,从1开始。 |
| 事件的片段总数。 |
例子
本节中的示例场景展示了如何将$changeStreamSplitLargeEvent
与名为myCollection
的新集合结合使用。
创建myCollection
并插入一个包含略低于16 MB 数据的文档:
db.myCollection.insertOne( { _id: 0, largeField: "a".repeat( 16 * 1024 * 1024 - 1024 ) } )
largeField
包含重复的字母a
。
为 myCollection
启用 ChangeStreampreandPostImages,以让变更流可以检索更新前(前像)和更新后(后像)的文档:
db.runCommand( { collMod: "myCollection", changeStreamPreAndPostImages: { enabled: true } } )
使用db.collection.watch()
创建变更流游标以监控对myCollection
的更改:
myChangeStreamCursor = db.myCollection.watch( [ { $changeStreamSplitLargeEvent: {} } ], { fullDocument: "required", fullDocumentBeforeChange: "required" } )
对于变更流事件:
fullDocument: "required"
包括文档后像。fullDocumentBeforeChange: "required"
包括文档前像。
有关详细信息,请参阅$changeStream
。
更新myCollection
中的文档,这也会生成包含文档前像和后像的变更流事件:
db.myCollection.updateOne( { _id: 0 }, { $set: { largeField: "b".repeat( 16 * 1024 * 1024 - 1024 ) } } )
largeField
现在包含重复的字母b
。
使用 next()
方法从 myChangeStreamCursor
检索片段,并将片段存储在名为 firstFragment
、secondFragment
和 thirdFragment
的对象中:
const firstFragment = myChangeStreamCursor.next() const secondFragment = myChangeStreamCursor.next() const thirdFragment = myChangeStreamCursor.next()
显示firstFragment.splitEvent
:
firstFragment.splitEvent
包含片段详细信息的输出:
splitEvent: { fragment: 1, of: 3 }
同样, secondFragment.splitEvent
和thirdFragment.splitEvent
返回:
splitEvent: { fragment: 2, of: 3 } splitEvent: { fragment: 3, of: 3 }
要检查firstFragment
的对象键:
Object.keys( firstFragment )
输出:
[ '_id', 'splitEvent', 'wallTime', 'clusterTime', 'operationType', 'documentKey', 'ns', 'fullDocument' ]
要检查firstFragment.fullDocument
的大小(以字节为单位):
bsonsize( firstFragment.fullDocument )
输出:
16776223
secondFragment
包含fullDocumentBeforeChange
前像,大小约为 16 MB。 以下示例显示了secondFragment
的对象键:
Object.keys( secondFragment )
输出:
[ '_id', 'splitEvent', 'fullDocumentBeforeChange' ]
thirdFragment
包含updateDescription
字段,大小约为 16 MB。以下示例显示了thirdFragment
的对象键:
Object.keys( thirdFragment )
输出:
[ '_id', 'splitEvent', 'updateDescription' ]
有关变更流和事件的更多信息,请参阅变更事件。