
All Source Connector Configuration Properties

On this page

  • Overview
  • MongoDB Connection
  • Kafka Topics
  • Change Streams
  • Output Format
  • Startup
  • Error Handling and Resuming from Interruption

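On this page, you can view all available configuration properties for your MongoDB Kafka source connector. This page duplicates the content of the other source connector configuration properties pages.

To view a list of all source connector configuration properties pages, see the Source Connector Configuration Properties page.

Use the following configuration settings to specify how your MongoDB Kafka source connector connects to and communicates with your MongoDB cluster.

To view only the options related to your MongoDB connection, see the MongoDB Source Connection Properties page.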

Name
Description
connection.uri
Required

Type: string

Description:
The URI connection string to connect to your MongoDB instance or cluster.

To learn more, see Connect to MongoDB.

IMPORTANT: To avoid exposing your authentication credentials in your connection.uri setting, use a ConfigProvider and set the appropriate configuration parameters.

Default: mongodb://localhost:27017,localhost:27018,localhost:27019
Accepted Values: A MongoDB URI connection string
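As a minimal sketch of the ConfigProvider approach mentioned above, the following Python snippet builds a connector configuration in which connection.uri is a placeholder rather than a literal URI. It assumes the Kafka Connect worker is configured with the FileConfigProvider that ships with Apache Kafka (config.providers=file). The secrets file path, the key name, and the build_source_config helper are hypothetical names chosen for illustration.

```python
import json

# Hypothetical location and key of the secret; adjust to your environment.
SECRETS_FILE = "/etc/kafka/secrets/mongo.properties"
SECRET_KEY = "connection.uri"


def build_source_config(name, database, collection):
    """Return a connector config whose URI is resolved by the worker at runtime."""
    return {
        "name": name,
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
            # Placeholder syntax understood by FileConfigProvider:
            # ${file:<path to properties file>:<key>}
            "connection.uri": "${file:%s:%s}" % (SECRETS_FILE, SECRET_KEY),
            "database": database,
            "collection": collection,
        },
    }


if __name__ == "__main__":
    print(json.dumps(build_source_config("mongo-source", "db", "coll"), indent=2))
```

With this layout, the credentials live only in the properties file on the worker host, and the configuration you submit contains just the placeholder.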
database
Type: string

Description:
Name of the database to watch for changes. If not set, the connector watches all databases for changes.

Default: ""
Accepted Values: A single database name
collection
Type: string

Description:
Name of the collection in the database to watch for changes. If not set, the connector watches all collections for changes.

IMPORTANT: If your database configuration is set to "", the connector ignores the collection setting.

Default: ""
Accepted Values: A single collection name
server.api.version
Type: string

Description:
The Stable API version you want to use with your MongoDB cluster. For more information on the Stable API and versions of MongoDB server that support it, see the Stable API guide.

Default: ""
Accepted Values: An empty string or a valid Stable API version.
server.api.deprecationErrors
Type: boolean

Description:
When set to true, if the connector calls a command on your MongoDB instance that's deprecated in the declared Stable API version, it raises an exception.

You can set the API version with the server.api.version configuration option. For more information on the Stable API, see the MongoDB manual entry on the Stable API.

Default: false
Accepted Values: true or false
server.api.strict
Type: boolean

Description:
When set to true, if the connector calls a command on your MongoDB instance that's not covered in the declared Stable API version, it raises an exception.

You can set the API version with the server.api.version configuration option. For more information on the Stable API, see the MongoDB manual entry on the Stable API.

Default: false
Accepted Values: true or false

Use the following configuration settings to specify which Kafka topics the MongoDB Kafka source connector publishes data to.

To view only the options related to Kafka topics, see the Kafka Topic Properties page.

Name
Description
topic.prefix
Type: string

Description:
Specifies the first part of the destination Kafka topic name to which the connector publishes change stream events. The destination topic name is composed of the topic.prefix value followed by the database and collection names, separated by the value specified in the topic.separator property.

To learn more, see the example in Topic Naming Prefix.

Default: ""
Accepted Values: A string composed of ASCII alphanumeric characters including ".", "-", and "_"
topic.suffix
Type: string

Description:
Specifies the last part of the destination Kafka topic name to which the connector publishes change stream events. The destination topic name is composed of the database and collection names followed by the topic.suffix value, separated by the value specified in the topic.separator property.

To learn more, see the example in Topic Naming Suffix.

Default: ""
Accepted Values: A string composed of ASCII alphanumeric characters including ".", "-", and "_"
topic.namespace.map
Type: string

Description:
Specifies a JSON mapping between change stream document namespaces and topic names.

You can use the topic.namespace.map property to specify complex mappings. This property supports regex and wildcard matching.

To learn more about these behaviors and view examples, see Topic Namespace Map.

Default: ""
Accepted Values: A valid JSON object
topic.separator
Type: string

Description:
Specifies the string the connector uses to concatenate the values used to create the name of your topic. The connector publishes records to a topic with a name formed by concatenating the values of the following fields in the following order:
  1. topic.prefix

  2. database

  3. collection

  4. topic.suffix

For example, the following configuration instructs the connector to publish change stream documents from the coll collection of the db database to the prefix-db-coll topic:
topic.prefix=prefix
database=db
collection=coll
topic.separator=-
IMPORTANT: When you use the topic.separator property, note that it doesn't affect how you define the topic.namespace.map property. The topic.namespace.map property uses MongoDB namespaces which you must always specify with a . character to separate the database and collection name.

Default: "."
Accepted Values: A string
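The naming rule described for topic.separator can be sketched in a few lines of Python. This is an illustration of the documented ordering, not the connector's implementation. The compose_topic_name helper is hypothetical, and skipping empty parts (such as an unset prefix or suffix) is an assumption.

```python
def compose_topic_name(database, collection, prefix="", suffix="", separator="."):
    """Join prefix, database, collection, and suffix in the documented order.

    Assumption: parts left at their default of "" are skipped, so they do not
    produce leading, trailing, or doubled separators.
    """
    parts = [prefix, database, collection, suffix]
    return separator.join(part for part in parts if part)


if __name__ == "__main__":
    # Mirrors the example configuration in the topic.separator description.
    print(compose_topic_name("db", "coll", prefix="prefix", separator="-"))
```

With the default separator and no prefix or suffix, the same helper yields db.coll for the db database and coll collection.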
topic.mapper
Type: string

Description:
The Java class that defines your custom topic mapping logic.

Default: com.mongodb.kafka.connect.source.topic.mapping.DefaultTopicMapper
Accepted Values: Valid full class name of an implementation of the TopicMapper class.

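Use the following configuration settings to specify change stream aggregation pipelines and read preferences for change stream cursors when working with the MongoDB Kafka source connector.

To view only the options related to change streams, see the Change Stream Properties page.

Name
Description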
pipeline
Type: string

Description:
An array of aggregation pipeline stages to run in your change stream. You must configure this setting for the change stream event document, not the fullDocument field.

For example:
[{"$match": { "$and": [{"operationType": "insert"}, {"fullDocument.eventId": 1404 }] } }]
For more examples, see:
Default: "[]"
Accepted Values: Valid aggregation pipeline stage
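Because the pipeline setting is a string that contains a JSON array, it can help to build the value programmatically instead of typing it by hand. The following Python sketch serializes the example pipeline shown above. The to_pipeline_setting helper and its sanity checks are hypothetical and do not reflect the connector's own validation.

```python
import json


def to_pipeline_setting(stages):
    """Serialize a list of aggregation stages into the string form of the setting.

    The checks are a local sanity test only: each stage must be a
    single-key document whose key starts with "$".
    """
    if not isinstance(stages, list):
        raise ValueError("pipeline must be a list of stages")
    for stage in stages:
        if not isinstance(stage, dict) or len(stage) != 1:
            raise ValueError("each stage must be a single-key document")
        (operator,) = stage.keys()
        if not operator.startswith("$"):
            raise ValueError("stage operator must start with '$': %r" % operator)
    return json.dumps(stages)


if __name__ == "__main__":
    stages = [
        {"$match": {"$and": [{"operationType": "insert"},
                             {"fullDocument.eventId": 1404}]}}
    ]
    print("pipeline=" + to_pipeline_setting(stages))
```

Note that the filter addresses fields of the change stream event document, such as operationType and fullDocument.eventId, as the description requires.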
change.stream.full.document
Type: string

Description:
Determines what values your change stream returns on update operations.

The default setting returns the differences between the original document and the updated document.

The updateLookup setting returns the differences between the original document and updated document as well as a copy of the entire updated document at a point in time after the update.

The whenAvailable setting returns the updated document, if available.

The required setting returns the updated document and raises an error if it is not available.

For more information on how this change stream option works, see Lookup Full Document for Update Operations in the MongoDB manual.

Default: ""
Accepted Values: "", "updateLookup", "whenAvailable", or "required"
change.stream.full.document.before.change
Type: string

Description:
Configures the document pre-image your change stream returns on update operations. The pre-image is not available for source records published while copying existing data, and the pre-image configuration has no effect on copying.

To learn how to configure a collection to enable pre-images, see Change Streams with Document Pre- and Post-Images in the MongoDB manual.

The default setting suppresses the document pre-image.

The whenAvailable setting returns the document pre-image if it's available, before it was replaced, updated, or deleted.

The required setting returns the document pre-image and raises an error if it is not available.

Default: ""
Accepted Values: "" or "whenAvailable" or "required"
publish.full.document.only
Type: boolean

Description:
Whether to return only the fullDocument field from the change stream event document produced by any update event. The fullDocument field contains the most current version of the updated document. To learn more about the fullDocument field, see the update Event in the Server manual.

When set to true, the connector overrides the change.stream.full.document setting and sets it to updateLookup so that the fullDocument field contains updated documents.

Default: false
Accepted Values: true or false
publish.full.document.only.tombstone.on.delete
Type: boolean

Description:
Whether to return tombstone events when documents are deleted. Tombstone events contain the keys of deleted documents with null values. This setting applies only when publish.full.document.only is true.

Default: false
Accepted Values: true or false
change.stream.document.key.as.key
Type: boolean

Description:
Whether to use the document key for the source record key if the document key is present.

When set to true, the connector adds keys of the deleted documents to the tombstone events. When set to false, the connector uses the resume token as the source key for the tombstone events.

Default: true
Accepted Values: true or false
collation
Type: string

Description:
A JSON collation document that specifies language-specific ordering rules that MongoDB applies to the documents returned by the change stream.

Default: ""
Accepted Values: A valid collation JSON document
batch.size
Type: int

Description:
The change stream cursor batch size.

Default: 0
Accepted Values: An integer
poll.await.time.ms
Type: long

Description:
The maximum amount of time in milliseconds that the server waits for new data changes to report to the change stream cursor before returning an empty batch.

Default: 5000
Accepted Values: An integer
poll.max.batch.size
Type: int

Description:
Maximum number of documents to read in a single batch when polling a change stream cursor for new data. You can use this setting to limit the amount of data buffered internally in the connector.

Default: 1000
Accepted Values: An integer

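Use the following configuration settings to specify the format of the data that the MongoDB Kafka source connector publishes to Kafka topics.

To view only the options related to the format of your output, see the Output Format Properties page.

Name
Description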
output.format.key
Type: string

Description:
Specifies the data format in which the source connector outputs the key document.

Default: json
Accepted Values: bson, json, schema
output.format.value
Type: string

Description:
Specifies the data format in which the source connector outputs the value document.

The connector supports Protobuf as an output data format. You can enable this format by specifying the schema value and installing and configuring the Kafka Connect Protobuf Converter.

Default: json
Accepted Values: bson, json, schema
output.json.formatter
Type: string

Description:
Class name of the JSON formatter the connector should use to output data.

Default:
com.mongodb.kafka.connect.source.json.formatter.DefaultJson
Accepted Values:
Your custom JSON formatter full class name or one of the following built-in formatter class names:
com.mongodb.kafka.connect.source.json.formatter.DefaultJson
com.mongodb.kafka.connect.source.json.formatter.ExtendedJson
com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson
To learn more about these output formats, see JSON Formatters.
output.schema.key
Type: string

Description:
Specifies an Avro schema definition for the key document of the SourceRecord.

To learn more about Avro schema, see Avro in the Data Formats guide.

Default:
{
"type": "record",
"name": "keySchema",
"fields" : [ { "name": "_id", "type": "string" } ]
}
Accepted Values: A valid Avro schema
output.schema.value
Type: string

Description:
Specifies an Avro schema definition for the value document of the SourceRecord.

To learn more about Avro schema, see Avro in the Data Formats guide.

Default:
{
"name": "ChangeStream",
"type": "record",
"fields": [
{ "name": "_id", "type": "string" },
{ "name": "operationType", "type": ["string", "null"] },
{ "name": "fullDocument", "type": ["string", "null"] },
{ "name": "ns",
"type": [{"name": "ns", "type": "record", "fields": [
{"name": "db", "type": "string"},
{"name": "coll", "type": ["string", "null"] } ]
}, "null" ] },
{ "name": "to",
"type": [{"name": "to", "type": "record", "fields": [
{"name": "db", "type": "string"},
{"name": "coll", "type": ["string", "null"] } ]
}, "null" ] },
{ "name": "documentKey", "type": ["string", "null"] },
{ "name": "updateDescription",
"type": [{"name": "updateDescription", "type": "record", "fields": [
{"name": "updatedFields", "type": ["string", "null"]},
{"name": "removedFields",
"type": [{"type": "array", "items": "string"}, "null"]
}] }, "null"] },
{ "name": "clusterTime", "type": ["string", "null"] },
{ "name": "txnNumber", "type": ["long", "null"]},
{ "name": "lsid", "type": [{"name": "lsid", "type": "record",
"fields": [ {"name": "id", "type": "string"},
{"name": "uid", "type": "string"}] }, "null"] }
]
}
Accepted Values: A valid Avro schema
output.schema.infer.value
Type: boolean

Description:
Whether the connector should infer the schema for the value document of the SourceRecord. Since the connector processes each document in isolation, the connector may generate many schemas.

IMPORTANT: The connector only reads this setting when you set your output.format.value setting to schema.

Default: false
Accepted Values: true or false

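Use the following configuration settings to configure startup of the MongoDB Kafka source connector so that it converts MongoDB collections into Change Stream events.

To view only the options related to startup, see the Startup Properties page.

Name
Description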
startup.mode
Type: string

Description:
Specifies how the connector should start up when there is no source offset available. Resuming a change stream requires a resume token, which the connector gets from the source offset. If no source offset is available, the connector may either ignore all or some of the existing source data, or may at first copy all existing source data and then continue with processing new data.

If startup.mode=latest, the connector ignores all existing source data.

If startup.mode=timestamp, the connector actuates startup.mode.timestamp.* properties. If no properties are configured, timestamp is equivalent to latest.

If startup.mode=copy_existing, the connector copies all existing source data to Change Stream events. This setting is equivalent to the deprecated setting copy.existing=true.

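If any system changes the data in the database while the source connector converts existing data from it, MongoDB may produce duplicate change stream events to reflect the latest changes. Since the change stream events on which the data copy relies are idempotent, the copied data is eventually consistent.

Default: latest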
Accepted Values: latest, timestamp, copy_existing
startup.mode.timestamp.start.at.operation.time
Type: string

Description:
Actuated only if startup.mode=timestamp. Specifies the starting point for the change stream.

To learn more about Change Stream parameters, see $changeStream (aggregation) in the MongoDB manual.

Default: ""
Accepted Values:
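  • An integer number of seconds since the UNIX epoch in decimal format (for example, 30)

  • An instant in ISO-8601 format with one-second precision (for example, 1970-01-01T00:00:30Z)

  • A BSON Timestamp in canonical extended JSON (v2) format (for example, {"$timestamp": {"t": 30, "i": 0}})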
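The three accepted formats can describe the same starting point. As a rough illustration, the following Python sketch derives all three string forms from a number of seconds since the UNIX epoch. The start_at_operation_time_forms helper is hypothetical, and the increment field "i" is fixed at 0, as in the example above.

```python
import json
from datetime import datetime, timezone


def start_at_operation_time_forms(epoch_seconds):
    """Return the three accepted string forms for one point in time."""
    instant = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return {
        # Seconds since the epoch, as a decimal string.
        "epoch_seconds": str(epoch_seconds),
        # ISO-8601 instant with one-second precision, in UTC.
        "iso_8601": instant.strftime("%Y-%m-%dT%H:%M:%SZ"),
        # BSON Timestamp in canonical extended JSON (v2).
        "bson_timestamp": json.dumps({"$timestamp": {"t": epoch_seconds, "i": 0}}),
    }


if __name__ == "__main__":
    for name, value in start_at_operation_time_forms(30).items():
        print(name, "=", value)
```

Any one of the resulting strings can be assigned to startup.mode.timestamp.start.at.operation.time.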

startup.mode.copy.existing.namespace.regex
Type: string

Description:
Regular expression the connector uses to match namespaces from which to copy data. A namespace describes the MongoDB database name and collection separated by a period (for example, databaseName.collectionName).

For example, the following regular-expression setting matches collections that start with "page" in the stats database:
startup.mode.copy.existing.namespace.regex=stats\.page.*
The \ character in the example above escapes the . character that follows it in the regular expression. For more information on how to build regular expressions, see Patterns in the Java API documentation.

Default: ""
Accepted Values: A valid regular expression
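To preview which namespaces a pattern such as stats\.page.* selects, you can test it locally before deploying. The sketch below uses Python's re module as a stand-in for Java regular expressions, which behave the same way for this simple pattern. It assumes the pattern must match the whole namespace. The matching_namespaces helper and the sample namespaces are hypothetical.

```python
import re

# Same pattern as the example setting; the backslash escapes the literal dot.
NAMESPACE_REGEX = r"stats\.page.*"


def matching_namespaces(namespaces, pattern=NAMESPACE_REGEX):
    """Return the namespaces that the pattern matches in full.

    Assumption: a namespace is selected only when the entire
    "database.collection" string matches the pattern.
    """
    compiled = re.compile(pattern)
    return [ns for ns in namespaces if compiled.fullmatch(ns)]


if __name__ == "__main__":
    candidates = ["stats.pageviews", "stats.pages", "stats.users", "other.page"]
    print(matching_namespaces(candidates))
```

Without the backslash, the . would match any character, so a pattern intended for the stats database could also select unrelated namespaces.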
startup.mode.copy.existing.pipeline
Type: string

Description:
An inline array of pipeline operations the connector runs when copying existing data. You can use this setting to filter the source collection and improve the use of indexes in the copying process.

For example, the following setting uses the $match aggregation operator to instruct the connector to copy only documents that contain a closed field with a value of "false".
startup.mode.copy.existing.pipeline=[ { "$match": { "closed": "false" } } ]
Default: ""
Accepted Values: Valid aggregation pipeline stages
startup.mode.copy.existing.max.threads
Type: int

Description:
The maximum number of threads the connector can use to copy data.

Default: number of processors available in the environment
Accepted Values: An integer
startup.mode.copy.existing.queue.size
Type: int

Description:
The size of the queue the connector can use when copying data.

Default: 16000
Accepted Values: An integer
startup.mode.copy.existing.allow.disk.use
Type: boolean

Description:
When set to true, the connector uses temporary disk storage for the copy existing aggregation.

Default: true
Accepted Values: true or false

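Use the following configuration settings to specify how the MongoDB Kafka source connector behaves when it encounters errors and to specify settings related to resuming interrupted reads.

To view only the options related to handling errors, see the Error Handling and Resuming from Interruption Properties page.

Name
Description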
mongo.errors.tolerance
Type: string

Description:
Whether to continue processing messages when the connector encounters an error.

Set this to "none" if you want the connector to stop processing messages and report the issue if it encounters an error.

Set this to "all" if you want the connector to continue processing messages and ignore any errors it encounters.

IMPORTANT: This property overrides the errors.tolerance Connect Framework property.

Default: "none"
Accepted Values: "none" or "all"
mongo.errors.log.enable
Type: boolean

Description:
Whether the connector should report errors in the log file.

Set this to true to log all errors the connector encounters.

Set this to false to log errors that are not tolerated by the connector. You can specify which errors the connector should tolerate using the errors.tolerance or mongo.errors.tolerance setting.

IMPORTANT: This property overrides the errors.log.enable Connect Framework property.

Default: false
Accepted Values: true or false
mongo.errors.deadletterqueue.topic.name
Type: string

Description:
The name of the topic to use as the dead letter queue.

If you specify a value, the connector writes invalid messages to the dead letter queue topic as extended JSON strings.

If you leave this setting blank, the connector does not write invalid messages to any topic.
IMPORTANT: You must set the errors.tolerance or mongo.errors.tolerance setting to "all" to enable this property.

Default: ""
Accepted Values: A valid Kafka topic name
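The interaction between the tolerance settings and the dead letter queue topic can be expressed as a small check. The following Python sketch is a hypothetical pre-deployment helper that encodes the rules stated above: mongo.errors.tolerance takes precedence over errors.tolerance, and the dead letter queue is active only when a topic name is set and the effective tolerance is "all". It is not part of the connector.

```python
def effective_tolerance(config):
    """Return the tolerance in effect; mongo.errors.tolerance overrides errors.tolerance."""
    if "mongo.errors.tolerance" in config:
        return config["mongo.errors.tolerance"]
    return config.get("errors.tolerance", "none")


def dead_letter_queue_enabled(config):
    """Return True when a DLQ topic is named and all errors are tolerated."""
    topic = config.get("mongo.errors.deadletterqueue.topic.name", "")
    return bool(topic) and effective_tolerance(config) == "all"


if __name__ == "__main__":
    config = {
        "mongo.errors.tolerance": "all",
        # Hypothetical topic name used for illustration.
        "mongo.errors.deadletterqueue.topic.name": "example.deadletterqueue",
    }
    print(dead_letter_queue_enabled(config))
```

Running a check like this before submitting a configuration can catch the case where a topic name is set but the tolerance is left at its default of "none".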
offset.partition.name
Type: string

Description:
The custom offset partition name to use. You can use this option to instruct the connector to start a new change stream when an existing offset contains an invalid resume token.

If you leave this setting blank, the connector uses the default partition name based on the connection details.

To view a strategy for naming offset partitions, see Reset Stored Offsets.

Default: ""
Accepted Values: A string. To learn more about naming a partition, see SourceRecord in the Apache Kafka API documentation.
heartbeat.interval.ms
Type: long

Description:
The number of milliseconds the connector waits between sending heartbeat messages. The connector sends heartbeat messages when source records are not published in the specified interval. This mechanism improves resumability of the connector for low volume namespaces.

Heartbeat messages contain a postBatchResumeToken data field. The value of this field contains the MongoDB server oplog entry that the connector last read from the change stream.

Set this to 0 to disable heartbeat messages.

To learn more, see Prevention in the Invalid Resume Token page.

Default: 0
Accepted Values: An integer
heartbeat.topic.name
Type: string

Description:
The name of the topic on which the connector should publish heartbeat messages. You must provide a positive value in the heartbeat.interval.ms setting to enable this feature.

Default: __mongodb_heartbeats
Accepted Values: A valid Kafka topic name
