Startup Properties

Use the following configuration settings to control how the MongoDB Kafka source connector starts up and converts MongoDB collections into change stream events.

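Tip

For an example that uses the copy existing feature, see the Copy Existing Data usage example.

For a list of source connector configuration settings organized by category, see the Source Connector Configuration Properties guide.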

Name
Description
startup.mode
Type: string

Description:
Specifies how the connector should start up when there is no source offset available. Resuming a change stream requires a resume token, which the connector gets from the source offset. If no source offset is available, the connector may either ignore all or some of the existing source data, or may at first copy all existing source data and then continue with processing new data.

If startup.mode=latest, the connector ignores all existing source data.

If startup.mode=timestamp, the connector applies the startup.mode.timestamp.* properties. If none of these properties are configured, timestamp is equivalent to latest.

If startup.mode=copy_existing, the connector copies all existing source data to Change Stream events. This setting is equivalent to the deprecated setting copy.existing=true.

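If any system changes the data in the database while the source connector is converting existing data from it, MongoDB may produce duplicate change stream events to reflect the latest changes. Because the change stream events that the data copy relies on are idempotent, the copied data is eventually consistent.

Default: latest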
Accepted Values: latest, timestamp, copy_existing
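
As an illustration of the startup.mode description above, here is a minimal sketch of a source connector configuration that copies existing data when no source offset is stored. The connector.class, connection.uri, and database lines are not part of this page; they are assumed here only to give the fragment context, and the URI and database name are placeholders.

```properties
# Illustrative fragment; the connection values below are placeholders (assumptions).
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://localhost:27017
database=stats

# With no stored source offset, copy existing data first,
# then continue with new change stream events.
startup.mode=copy_existing
```

Changing the last line to startup.mode=latest would instead skip the existing data, as described above.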
startup.mode.timestamp.start.at.operation.time
Type: string

Description:
Applied only if startup.mode=timestamp. Specifies the starting point for the change stream.

To learn more about Change Stream parameters, see $changeStream (aggregation) in the MongoDB manual.

Default: ""
Accepted Values:
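  • An integer number of seconds since the epoch, in decimal format (for example, 30)

  • An instant in ISO-8601 format with one-second precision (for example, 1970-01-01T00:00:30Z)

  • A BSON Timestamp in canonical extended JSON (v2) format (for example, {"$timestamp": {"t": 30, "i": 0}})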
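
The following sketch shows how the three accepted formats might appear in a configuration. It reuses only the example values from the list above, which appear to denote the same point in time (30 seconds after the epoch). Only one form should be active at a time, so the alternatives are commented out.

```properties
# Start the change stream from a given operation time instead of "now".
startup.mode=timestamp

# Form 1: integer seconds since the epoch.
startup.mode.timestamp.start.at.operation.time=30

# Form 2: ISO-8601 instant with one-second precision.
# startup.mode.timestamp.start.at.operation.time=1970-01-01T00:00:30Z

# Form 3: BSON Timestamp in canonical extended JSON (v2).
# startup.mode.timestamp.start.at.operation.time={"$timestamp": {"t": 30, "i": 0}}
```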

startup.mode.copy.existing.namespace.regex
Type: string

Description:
Regular expression the connector uses to match namespaces from which to copy data. A namespace describes the MongoDB database name and collection separated by a period (for example, databaseName.collectionName).

For example, the following regular-expression setting matches collections that start with "page" in the stats database:
startup.mode.copy.existing.namespace.regex=stats\.page.*
The \ character in the example above escapes the . character that follows it in the regular expression. For more information on how to build regular expressions, see the Pattern class in the Java API documentation.

Default: ""
Accepted Values: A valid regular expression
startup.mode.copy.existing.pipeline
Type: string

Description:
An inline array of pipeline operations the connector runs when copying existing data. You can use this setting to filter the source collection and improve the use of indexes in the copying process.

For example, the following setting uses the $match aggregation operator to instruct the connector to copy only documents that contain a closed field with a value of "false":
startup.mode.copy.existing.pipeline=[ { "$match": { "closed": "false" } } ]
Default: ""
Accepted Values: Valid aggregation pipeline stages
startup.mode.copy.existing.max.threads
Type: int

Description:
The maximum number of threads the connector can use to copy data.

Default: number of processors available in the environment
Accepted Values: An integer
startup.mode.copy.existing.queue.size
Type: int

Description:
The size of the queue the connector can use when copying data.

Default: 16000
Accepted Values: An integer
startup.mode.copy.existing.allow.disk.use
Type: boolean

Description:
When set to true, the connector can use temporary disk storage for the aggregation that copies existing data.

Default: true
Accepted Values: true or false
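
The copy-related settings described above can be combined in one configuration. The fragment below is a sketch only: the thread count of 4 is an arbitrary illustrative value, not a recommendation from this page, while the queue size and disk-use values simply restate the documented defaults.

```properties
# Copy existing data on first start (no source offset available).
startup.mode=copy_existing

# Filter the documents that are copied (pipeline taken from the example above).
startup.mode.copy.existing.pipeline=[ { "$match": { "closed": "false" } } ]

# Illustrative value; by default the connector uses the number of available processors.
startup.mode.copy.existing.max.threads=4

# Documented defaults, written out explicitly.
startup.mode.copy.existing.queue.size=16000
startup.mode.copy.existing.allow.disk.use=true
```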
