Docs 菜单

接收连接器后处理程序

在此页面上,您可以了解如何在 MongoDB Kafka Sink 连接器中配置后处理器。后处理器修改连接器从 Kafka 主题读取的 Sink 记录,然后将其存储在 MongoDB 集合中。后处理器可以进行的数据修改的一些示例包括:

  • 将文档 _id 字段设置为自定义值

  • 包含或排除消息键或值字段

  • 重命名字段

您可以使用连接器中预构建的后处理器,也可以实现自己的后处理器。

有关后处理器的详情,请参阅以下部分:

后处理器会修改从 Kafka 主题读取的数据。此连接器会将消息存储在 SinkDocument 类中,其中包含 Kafka SinkRecord 键与值字段的表示形式。此连接器会按顺序应用配置中指定的所有后处理器,并将结果存储在 MongoDB 集合中。

后处理器执行数据修改任务,例如生成文档_id字段、投影消息键或值字段以及重命名字段。 您可以使用 中包含的预构建后处理器,也可以通过扩展后处理器connector { 4} 类。

重要

后处理器和变更数据捕获 (CDC) 处理程序

不能应用帖子处理器应用于 CDC 处理程序事件数据。 如果同时指定两者,Connector会记录警告。

您可以在 post.processor.chain 配置设置中以逗号分隔的列表形式指定一个或多个后处理器。如果您指定多个后处理器,连接器会按顺序应用后处理器,其中每个后处理器都会修改前一个后处理器输出的数据。

为确保连接器写入 MongoDB 的文档包含唯一的 _id 字段,如果您没有包含 DocumentIdAdder 后处理器,它会自动将 DocumentIdAdder 后处理器添加到链的第一个位置。

以下示例设置规定,连接器应首先运行 KafkaMetaAdder 后处理器,然后在输出上运行 AllowListValueProjector 后处理器。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector

下表列出了接收器连接器所包含的所有后处理器的列表。

后处理器名称
说明
DocumentIdAdder
Full Path:
com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
Inserts an _id field determined by the configured strategy.
The default strategy is BsonOidStrategy.
For information on strategy options and configuration, see the Configure the Document Id Adder Post Processor section.
BlockListKeyProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
Removes matching key fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
BlockListValueProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
Removes matching value fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
AllowListKeyProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector
Includes only matching key fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
AllowListValueProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.AllowListValueProjector``
Includes only matching value fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
KafkaMetaAdder
Full Path:
com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
Adds a field named "topic-partition-offset" and sets the value to the concatenation of Kafka topic, partition, and offset to the document.
RenameByMapping
Full Path:
com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping
Renames fields that are an exact match to a specified field name in the key or value document.
For information on configuration, see the Renaming by Mapping Example.
RenameByRegex
Full Path:
com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex
Renames fields that match a regular expression in the key or value document.
For information on configuration, see the Renaming by Regular Expression Example.
NullFieldValueRemover
Full Path:
com.mongodb.kafka.connect.sink.processor.NullFieldValueRemover``
Removes all document fields that contain null values from the sink record.

DocumentIdAdder 后处理器使用策略来确定应如何格式化 MongoDB 文档中的 _id 字段。策略定义了您可以根据使用案例进行自定义的预设行为。

如以下示例所示,您可以在 document.id.strategy 设置中为该后处理器指定策略:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy

下表列出了配置 DocumentIdAdder 后处理器时可以使用的策略:

策略名称
说明
BsonOidStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
Generates a MongoDB BSON ObjectId.
Default strategy for the DocumentIdAdder post processor.
KafkaMetaDataStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy
Builds a string composed of the concatenation of Kafka topic, partition, and offset.
FullKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy
Uses the complete key structure of the sink document to generate the value for the _id field.
Defaults to a blank document if no key exists.
ProvidedInKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy
Uses the _id field specified in the key structure of the sink document.
Throws an exception if the field is missing from the sink document.
ProvidedInValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
Uses the _id field specified in the value structure of the sink document.
Throws an exception if the field is missing from the sink document.
PartialKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
Uses a block list or allow list projection of the sink document key structure.
Defaults to a blank document if no key exists.
PartialValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
Uses a block list or allow list projection of the sink document value structure.
Defaults to a blank document if no value exists.
UuidProvidedInKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInKeyStrategy
Converts the _id key field to a UUID. The value must be either a string or binary type and must conform to the UUID format.
UuidProvidedInValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInValueStrategy
Converts the _id value field to a UUID. The value must be either a string or binary type and must conform to the UUID format.
UuidStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy``
Uses a randomly generated UUID in string format.

如果内置文档 ID 加法器策略未涵盖您的使用案例,则可按照以下步骤定义自定义文档 ID 策略:

  1. 创建一个实现接口 IdStrategy 并包含自定义配置逻辑的 Java 类。

  2. 将此类编译为 JAR 文件。

  3. 将已编译的 JAR 添加到所有 Kafka Worker 的类路径/插件路径中。 有关插件路径的更多信息,请参阅 Confluence 文档。

  4. document.id.strategy 设置更新为所有 Kafka Worker 中自定义类的完整类名。

注意

所选策略可能会对传递语义产生影响

BSON ObjectId 或 UUID 策略只能保证至少传递一次,因为连接器会在重试或再次处理记录时生成新的 ID。如果您能保证构成文档 ID 的字段是唯一的,则其他策略也允许一次性传递。

有关 IdStrategy 接口的示例实现,请参阅包含用此连接器打包的 ID 策略实现的源代码目录。

本部分展示以下类型的后处理器的配置和输出示例:

支持列表阻止列表投影器后处理器决定输出中包含和排除哪些字段。

使用支持列表投影器时,后处理器仅输出您指定字段的数据。

使用阻止列表投影器时,后处理器仅删除您指定字段的数据。

注意

您可以使用“.”(点)符号来引用记录中的嵌套字段。您还可以使用该符号来引用数组中文档的字段。

在后处理器链中添加投影器时,您必须指定投影器类型,以及是否将其应用于接收器文档的键或值部分。

有关投影器配置和输出的示例,请参阅以下部分。

假设 Kafka 记录值文档类似于以下用户配置文件数据:

{
"name": "Sally Kimball",
"age": 10,
"address": {
"city": "Idaville",
"country": "USA"
},
"hobbies": [
"reading",
"solving crime"
]
}

您可以配置 AllowList 值投影器,使用以下设置存储值文档的“name”、“address.city”和“hobbies”字段等选定数据:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=name,address.city,hobbies

后处理器应用投影后,将输出以下记录:

{
"name": "Sally Kimball",
"address": {
"city": "Idaville"
},
"hobbies": [
"reading",
"solving crime"
]
}

假设您的 Kafka 记录关键文档类似于以下用户识别数据:

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
"source": "mobile"
},
"authToken": {
"alg": "HS256",
"type": "JWT",
"payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk"
}
}

您可以配置 BlockList 键投影器以省略“authToken”和“registration.source”字段,然后再使用以下设置存储数据:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
key.projection.type=BlockList
key.projection.list=authToken,registration.source

后处理器应用投影后,将输出以下记录:

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
}
}

本部分介绍如何配置投影器后处理器以匹配通配符模式,从而匹配字段名称。

模式

说明

*

匹配处于当前级别的任意数量的字符。

**

匹配当前级别和所有嵌套级别中的任何字符。

关于本节中的允许列表和阻止列表通配符模式匹配示例,请参阅以下包含天气测量值的文档:

{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
},
"moisture": {
"average": 340,
"units": "mm"
}
}
}

您可以使用 * 通配符来匹配多个字段名称。以下示例配置将匹配如下字段:

  • 名为 "city" 的顶级字段

  • 名为“average”的字段,即任何以名称“wind_speed”开头的顶级字段的子文档。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=city,wind_speed*.average

后处理器应用支持列表投影后,将输出以下记录:

{
"city": "Springfield",
"wind_speed_10m": {
"average": 3,
},
"wind_speed_80m": {
"average": 8,
}
}

您可以使用 ** 通配符,匹配从您指定通配符开始的任何级别的对象。以下通配符匹配示例将投影任何包含名为“low”的字段的文档。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=**.low

应用投影的后处理器输出以下记录:

{
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
}
}
}

您可以使用通配符模式来匹配特定文档级别的字段,如以下区块列表配置示例所示:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
value.projection.type=BlockList
value.projection.list=*.*.temperature
{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"moisture": {
"average": 340,
"units": "mm"
}
}
}

本部分介绍如何配置 RenameByMappingRenameByRegex 字段重命名器后处理器以更新接收器记录中的字段名称。字段重命名设置具体如下:

  • 是否更新记录中的键或值文档

  • 要更新的字段名称

  • 新字段名

您必须在 JSON 数组中指定 RenameByMappingRenameByRegex 设置。您可以使用点符号或模式匹配来指定嵌套字段。

字段重命名后处理器示例使用以下示例 Sink 记录:

关键文档

{
"location": "Provence",
"date_month": "October",
"date_day": 17
}

值文档

{
"flapjacks": {
"purchased": 598,
"size": "large"
}
}

RenameByMapping 帖子处理器设置指定一个或多个 JSON 对象,这些对象将与字符串匹配的字段分配给一个新名称。每个对象都包含 oldName 元素中要匹配的文本以及 newName 元素中的替换文本,如下表所述。

密钥名称
说明

oldName

指定是否匹配键或值文档中的字段以及要替换的字段名称。该设置使用“.”字符来分隔两个值。

newName

为所有字段匹配项指定替换字段名。

以下示例属性与关键文档的“位置”字段相匹配,并将其重命名为“国家/地区”:

field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]

此设置指示 RenameByMapping 后处理器将原始密钥文档转换为以下文档:

{
"country": "Provence",
"date_month": "October",
"date_day": 17
}

您可以通过在 oldName 字段中指定具有附加字段名称的值文档,对值文档执行类似的字段名称分配,如下所示:

field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]

此设置指示 RenameByMapping 后处理器将原始值文档转换为以下文档:

{
"crepes": {
"purchased": 598,
"size": "large"
}
}

您还可以使用字符串格式的 JSON 数组在 field.renamer.mapping 属性中指定一个或多个映射,如以下设置所示:

field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]

RenameByRegex 后处理器设置指定其应匹配的字段名和文本模式,以及匹配文本的替换值。您可以在包含下表所述字段的 JSON 对象中指定一个或多个重命名表达式:

密钥名称
说明

regexp

包含匹配字段的正则表达式,用于执行替换。

模式

包含与要替换的文本匹配的正则表达式。

替换

包含在 pattern 字段所定义正则表达式的所有匹配项的替换文本。

以下示例设置指示后处理器执行以下操作:

  • 匹配密钥文档中任何以“date”开头的字段名。在匹配字段集中,用 - 字符替换所有与模式 _ 匹配的文本。

  • 匹配值文档中任何属于 crepes 的子文档的字段名。在匹配字段集中,用 quantity 替换所有与模式 purchased 匹配的文本。

field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]

当连接器将后处理器应用于示例密钥文档示例值文档时,它会输出以下内容:

关键文档

{
"location": "Provence",
"date-month": "October",
"date-day": 17
}

值文档

{
"crepes": {
"quantity": 598,
"size": "large"
}
}

警告

重命名后处理器不会覆盖现有字段名称

您在重命名后处理器中设置的目标字段名可能会导致同一文档出现重复的字段名。为避免这种情况,后处理器在文档同一级别复制现有字段名时会跳过重命名。

如果内置后处理器不适用于使用案例,则可以使用以下步骤创建自定义后处理器类:

  1. 创建一个扩展 PostProcessor 抽象类的 Java 类。

  2. 覆盖类中的 process() 方法。您可以更新 SinkDocument,即 Sink 记录键和值字段的 BSON 表示,并在方法中访问原始的 Kafka SinkRecord

  3. 将此类编译为 JAR 文件。

  4. 将已编译的 JAR 添加到所有 Kafka Worker 的类路径/插件路径中。 有关插件路径的更多信息,请参阅有关 手动安装社区连接器的 Confluence 文档。

  5. 将后处理器完整类名添加到后处理器链配置中。

例如后处理器,您可以浏览内置后处理器类的源代码。