Docs 菜单
Docs 主页
/
MongoDB Kafka Connector
/ /

写入模型策略

在此页面上

  • Overview
  • 批量写入操作
  • 如何指定写入模型策略
  • 指定业务密钥
  • 示例
  • 更新一个时间戳策略
  • 替换一个业务键策略
  • 删除一个业务键策略
  • 自定义写模型策略
  • 示例写模型策略
  • 如何安装策略

本指南将向您介绍如何变更 MongoDB Kafka Sink 连接器将数据写入 MongoDB 的方式。

您可以针对以下使用案例,变更连接器将数据写入 MongoDB 的方式:

  • 插入文档而不是更新或插入文档

  • 替换或更新与 _id 字段以外的筛选器匹配的文档

  • 删除与筛选器匹配的文档

您可以通过指定写模型策略,配置连接器如何将数据写入 MongoDB 的方式。写模型策略是一个类,用于定义 Sink 连接器应如何使用写模型写入数据。写模型是 MongoDB Java 驱动程序接口,定义了写操作的结构。

要学习;了解如何在连接器将Connector接收的接收Connector记录写入MongoDB之前对其进行修改,请阅读有关接收器连接器后处理器的指南。

要查看写入模型策略实施,请参阅 InsertOneDefaultStrategy 类的源代码。

Sink 连接器使用批量写入操作将数据写入 MongoDB。批量写入将多个写入操作(例如插入、更新或删除)组合在一起。

默认情况下,接收器连接器执行有序的批量写入,这保证了数据更改的顺序。在有序批量写入中,如果任何写入操作导致错误,连接器都将跳过该批次中的剩余写入操作。

如果您不需要保证数据更改的顺序,可以将 bulk.write.ordered 设置设置为 false,以便连接器执行无序批量写入。接收器连接器并行执行无序批量写入,这可以提高性能。

此外,当您启用无序批量写入并将 errors.tolerance 设置设置为 all 时,即使批量写入中的任何写入操作失败,Sink 连接器也会继续执行该批次中不返回错误的剩余写入操作。

提示

要了解有关 bulk.write.ordered 设置的更多信息,请参阅《连接器消息处理属性》

要了解有关批量写入操作的详情,请参阅以下文档:

要指定写模型策略,请使用以下设置:

writemodel.strategy=<write model strategy classname>

有关连接器中包含的预建写入模型策略的列表,请参阅写入模型策略配置指南。

业务键是由 Sink 记录中的一个或多个字段组成的值,用于标识其唯一性。默认情况下,Sink 连接器使用 Sink 记录的 _id 字段来检索业务键。要指定不同的业务键,请将文档 Id Adder 后处理器配置为使用自定义值。

您可以将文档t Id Adder 配置为从 Sink 记录键设置 _id 字段,如以下示例属性所示:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
document.id.strategy.partial.key.projection.list=<comma-separated field names>
document.id.strategy.partial.key.projection.type=AllowList

或者,您可以将其配置为从 Sink 记录值设置 _id 字段,如以下示例属性所示:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=<comma-separated field names>
document.id.strategy.partial.value.projection.type=AllowList

重要

提高写性能

在目标集合中创建与业务键的字段对应的唯一索引。这可以提高从 Sink 连接器执行写操作的性能。有关更多信息,请参阅唯一索引指南。

以下写模型策略需要业务键:

  • ReplaceOneBusinessKeyStrategy

  • DeleteOneBusinessKeyStrategy

  • UpdateOneBusinessKeyTimestampStrategy

有关文档 ID 添加器后处理器的更多信息,请参阅配置文档 ID 添加器后处理器

本部分展示以下写入模型策略的配置和输出示例:

您可以配置更新一个时间戳策略,以便在将文档写入 MongoDB 时添加和更新时间戳。此策略执行以下操作:

  • 当连接器插入新的 MongoDB 文档时,它会将 _insertedTS_modifiedTS 字段设置为连接器服务器上的当前时间。

  • 当连接器更新现有的 MongoDB 文档时,它会将 _modifiedTS 字段更新为连接器服务器上的当前时间。

假设您想追踪列车沿线的位置,而 Sink 连接器接收具有以下结构的消息:

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40, -73 ]
}

使用 ProvidedInValueStrategy,指定连接器应使用消息的 _id 值来分配 MongoDB 文档中的 _id 字段。指定 ID 并写入模型策略属性如下:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

接收器连接器处理上述示例记录后,会插入一个包含 _insertedTS_modifiedTS 字段的文档,如以下文档所示:

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:000Z"),
"_modifiedTS": ISODate("2021-09-20T15:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40, -73 ]
}

一小时后,列车沿其路线报告其新位置,该新位置如以下记录所示:

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 42, -75 ]
}

Sink 连接器处理前面的记录后,将插入包含以下数据的文档:

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:000Z"),
"_modifiedTS": ISODate("2021-09-20T16:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 42, -75 ]
}

有关 ProvidedInValueStrategy 的更多信息,请参阅有关如何配置文档 Id Adder 后处理器的部分。

您可以配置替换一个业务键策略,替换与业务键值匹配的文档。要在记录的多个字段上定义业务键并配置连接器,替换包含匹配业务键的文档,请执行以下任务:

  1. 在集合中创建与业务键字段对应的唯一索引

  2. 指定 PartialValueStrategy ID 策略来标识属于连接器配置中业务键的字段。

  3. 在连接器设置中指定 ReplaceOneBusinessKeyStrategy 写入模型策略。

假设您想要利用由 flight_no 表示的航班号和 airport_code 表示的机场位置来追踪飞机容量。示例消息包含以下信息:

{
"flight_no": "Z342",
"airport_code": "LAX",
"seats": {
"capacity": 180,
"occupied": 152
}
}

要实施该策略,使用 flight_noairport_code 作为业务键,首先在 MongoDB Shell 中为这些字段创建唯一索引:

db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })

接下来,在投影列表中指定 PartialValueStrategy 策略和业务键字段。指定 ID 并写入模型策略配置,如下所示:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=flight_no,airport_code
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

插入集合的示例数据包含以下内容:

{
"flight_no": "Z342"
"airport_code": "LAX",
"seats": {
"capacity": 180,
"occupied": 152
}
}

当连接器处理与现有文档的业务键匹配的 Sink 数据时,它会将文档替换为新值,而不变更业务键字段:

{
"flight_no": "Z342"
"airport_code": "LAX",
"status": "canceled"
}

在连接器处理 Sink 数据后,它会将 MongoDB 中的原始示例文档替换为前面的文档。

您可以使用“删除一个业务键”策略将连接器配置为在收到与业务键匹配的消息时删除文档。要从记录的多个字段上设置业务键并配置连接器,删除包含匹配业务键的文档,请执行以下任务:

  1. 在 MongoDB 集合中创建与业务键字段对应的唯一索引

  2. 指定 PartialValueStrategy 作为 ID 策略,标识属于连接器配置中业务键的字段。

  3. 在连接器设置中指定 DeleteOneBusinessKeyStrategy 写入模型策略。

假设要从包含类似于以下内容的文档的集合中删除特定年份的日历事件:

{
"year": 2005,
"month": 3,
"day": 15,
"event": "Dentist Appointment"
}

要实施该策略,请使用 year 作为业务键,首先在 MongoDB Shell 中对这些字段创建唯一索引:

db.collection.createIndex({ "year": 1 }, { unique: true })

接下来,在配置中指定业务键和写模型策略,如下所示:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=year
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy

如果您的连接器处理包含业务键 year 的 Sink 记录,它会删除具有 MongoDB 返回的匹配字段值的第一个文档。假设连接器处理包含以下值数据的 Sink 记录:

{
"year": 2005,
...
}

当连接器处理前面的记录时,它会从集合中删除第一个包含值为“2005”的 year 字段的文档,例如原始“牙医预约”示例文档。

如果连接器中包含的任何写入模型策略都不适合您的使用案例,则可以创建自己的模型策略。

写入模型策略是一个实现 WriteModelStrategy 接口且必须覆盖 createWriteModel() 方法的 Java 类。

有关所需的方法签名,请参阅 WriteModelStrategy 接口的源代码

以下自定义写入模型策略会返回写入操作,用接收记录中 fullDocument 字段的值替换与接收记录中 _id 字段匹配的 MongoDB 文档:

/**
* Custom write model strategy
*
* This class reads the 'fullDocument' field from a change stream and
* returns a ReplaceOne operation.
*/
public class CustomWriteModelStrategy implements WriteModelStrategy {
private static String ID = "_id";
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
BsonDocument changeStreamDocument = document.getValueDoc()
.orElseThrow(() -> new DataException("Missing value document"));
BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
if (fullDocument.isEmpty()) {
return null; // Return null to indicate no op.
}
return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
}
}

有关自定义写入模型策略的另一个示例,请参阅 GitHub 上的 UpsertAsPartOfDocumentStrategy 示例策略。

如需将接收连接器配置为使用自定义写入策略,必须完成以下操作:

  1. 将自定义写入策略类编译为 JAR 文件。

  2. 将编译好的 JAR 添加到 Kafka 工作进程的类路径/插件路径中。有关插件路径的更多信息,请参阅 Confluent 文档。

    注意

    Kafka Connect 独立加载插件。部署自定义写入策略时,连接器 JAR 和写入模型策略 JAR 必须位于同一路径上。您的路径应如下所示:

    <plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
    <plugin.path>/mongo-kafka-connect/custom-write-model-strategy.jar

    要了解有关 Kafka Connect 插件的更多信息,请参阅 Confluent 提供的指南

  3. writemodel.strategy 配置设置中指定自定义类。

如需了解如何将类编译成 JAR 文件,请参阅 Java SE 文档中的 JAR 部署指南

后退

Fundamentals