모든 소스 커넥터 구성 속성
개요
이 페이지에서는 MongoDB Kafka 소스 커넥터에 사용 가능한 모든 구성 속성을 볼 수 있습니다. 이 페이지는 다른 소스 커넥터 구성 속성 페이지의 내용을 복제합니다.
모든 소스 connector 구성 속성 페이지의 목록을 보려면 소스 Connector 구성 속성 페이지를 참조하세요.
MongoDB 연결
다음 구성 설정을 사용하여 MongoDB Kafka 소스 커넥터가 연결을 설정하고 MongoDB cluster와 통신하는 방법을 지정합니다.
MongoDB 연결과 관련된 옵션만 보려면 MongoDB 소스 연결 속성 페이지를 참조하세요.
이름 | 설명 |
---|---|
connection.uri | Required Type: string Description: The URI connection string
to connect to your MongoDB instance or cluster. To learn more, see Connect to MongoDB. 중요: 설정에서 인증 자격 증명 이 노출되지 않도록 Default: mongodb://localhost:27017,localhost:27018,localhost:27019 Accepted Values: A MongoDB URI connection string |
database | Type: string Description: Name of the database to watch for changes. If not set, the connector
watches all databases for changes. Default: "" Accepted Values: A single database name |
collection | Type: string Description: Name of the collection in the database to watch for changes. If not
set, the connector watches all collections for changes. IMPORTANT: If your database configuration is set to "" , the connector
ignores the collection setting.Default: "" Accepted Values: A single collection name |
server.api.version | Type: string Description: The Stable API version you want to use with your MongoDB
cluster. For more information on the Stable API and versions of
MongoDB Server that support it, see the Stable API
guide. Default: "" Accepted Values: An empty string or a valid Stable API version. |
server.api.deprecationErrors | Type: boolean Description: When set to true , if the connector calls a command on your
MongoDB instance that's deprecated in the declared Stable API
version, it raises an exception.You can set the API version with the server.api.version
configuration option. For more information on the Stable API, see
the MongoDB manual entry on the
Stable API.Default: false Accepted Values: true or false |
server.api.strict | Type: boolean Description: When set to true , if the connector calls a command on your
MongoDB instance that's not covered in the declared Stable API
version, it raises an exception.You can set the API version with the server.api.version
configuration option. For more information on the Stable API, see
the MongoDB manual entry on the
Stable API.Default: false Accepted Values: true or false |
Kafka 주제
다음 구성 설정을 사용하여 MongoDB Kafka 소스 커넥터가 데이터를 게시할 Kafka 주제를 지정합니다.
Kafka 주제와 관련된 옵션만 보려면 Kafka 주제 속성 페이지를 참조하세요.
이름 | 설명 | ||||
---|---|---|---|---|---|
topic.prefix | Type: string Description: Specifies the first part of the destination Kafka
topic name to which the connector publishes change stream events.
The destination topic name is composed of the topic.prefix
value followed by the database and collection names, separated by the value
specified in the topic.separator property.To learn more, see the example in Topic Naming Prefix. Default: "" Accepted Values: A string composed of ASCII alphanumeric
characters including ".", "-", and "_" | ||||
topic.suffix | Type: string Description: Specifies the last part of the destination Kafka
topic name to which the connector publishes change stream events.
The destination topic name is composed of the database and
collection names followed by the topic.suffix value,
separated by the value specified in the topic.separator property.To learn more, see the example in Topic Naming Suffix. Default: "" Accepted Values: A string composed of ASCII alphanumeric
characters including ".", "-", and "_" | ||||
topic.namespace.map | Type: string Description: Specifies a JSON mapping between change stream document
namespaces
and topic names. You can use to topic.namespace.map property to
specify complex mappings. This property supports regex
and wildcard matching.To learn more about these behaviors and
view examples, see Topic Namespace Map. Default: "" Accepted Values: A valid JSON object | ||||
topic.separator | Type: string Description: Specifies the string the connector uses to concatenate the values used
to create the name of your topic. The connector publishes records to a
topic with a name formed by concatenating the values of the following fields
in the following order:
For example, the following configuration instructs the connector to publish
change stream documents from the coll collection of the
db database to the prefix-db-coll topic:
IMPORTANT: When you use the topic.separator property, note that it
doesn't affect how you define the topic.namespace.map property.
The topic.namespace.map property uses MongoDB
namespaces
which you must always specify with a . character to separate
the database and collection name.Default: "." Accepted Values: A string | ||||
topic.mapper | Type: string Description: The Java class that defines your custom topic mapping logic. Default: com.mongodb.kafka.connect.source.topic.mapping.DefaultTopicMapper Accepted Values: Valid full class name of an implementation
of the TopicMapper
class. |
변경 스트림
다음 구성 설정을 사용하여 MongoDB Kafka 소스 커넥터로 작업할 때 변경 스트림에 대한 집계 파이프라인과 변경 스트림 커서에 대한 읽기 설정을 지정합니다.
변경 스트림과 관련된 옵션만 보려면 변경 스트림 속성 페이지를 참조하세요.
이름 | 설명 | |
---|---|---|
pipeline | Type: string Description: An array of aggregation pipelines to run in your change stream.
You must configure this setting for the change stream
event document, not the fullDocument field.For example:
For more examples, see: Default: "[]" Accepted Values: Valid aggregation pipeline stage | |
change.stream.full.document | Type: string Description: Determines what values your change stream returns on update
operations. The default setting returns the differences between the original
document and the updated document. The updateLookup setting returns the differences between the
original document and updated document as well as a copy of the
entire updated document at a point in time after the update.The whenAvailable setting returns the updated document,
if available.The required setting returns the updated document and
raises an error if it is not available.For more information on how this change stream option works, see
Lookup Full Document for Update Operations
in the MongoDB manual. Default: "" Accepted Values: "" , "updateLookup" , "whenAvailable" , or "required" | |
change.stream.full.document.before.change | Type: string Description: Configures the document pre-image your change stream returns on update
operations. The pre-image is not available for source records
published while copying existing data, and the pre-image configuration
has no effect on copying. To learn how to configure a collection to enable
pre-images, see Change Streams with Document Pre- and Post-Images in the MongoDB manual. The default setting suppresses the document pre-image. The whenAvailable setting returns the document pre-image if
it's available, before it was replaced, updated, or
deleted.The required setting returns the document pre-image and
raises an error if it is not available.Default: "" Accepted Values: "" or "whenAvailable" or "required" | |
publish.full.document.only | Type: boolean Description: Whether to return only the fullDocument field from the
change stream event document produced by any update event. The
fullDocument field contains the most current version of the
updated document. To learn more about the fullDocument
field, see the update Event in the Server manual.When set to true , the connector overrides the
change.stream.full.document setting and sets it to
updateLookup so that the fullDocument field contains
updated documents.Default: false Accepted Values: true or false | |
publish.full.document.only.tombstone.on.delete | Type: boolean Description: Whether to return tombstone events when documents are deleted.
Tombstone events contain the keys of deleted documents with
null values. This setting applies only when
publish.full.document.only is true .Default: false Accepted Values: true or false | |
change.stream.document.key.as.key | Type: boolean Description: Whether to use the document key for the source record key if
the document key is present. When set to true , the connector adds keys of the deleted
documents to the tombstone events. When set to false ,
the connector uses the resume token as the source key for
the tombstone events.Default: true Accepted Values: true or false | |
collation | Type: string Description: A JSON collation document
that specifies language-specific ordering rules that MongoDB
applies to the documents returned by the change stream. Default: "" Accepted Values: A valid collation JSON document | |
batch.size | Type: int Description: The change stream cursor batch size. Default: 0 Accepted Values: An integer | |
poll.await.time.ms | Type: long Description: The maximum amount of time in milliseconds that the server waits for new
data changes to report to the change stream cursor before returning an
empty batch. Default: 5000 Accepted Values: An integer | |
poll.max.batch.size | Type: int Description: Maximum number of documents to read in a single batch when polling
a change stream cursor for new data. You can use this setting to
limit the amount of data buffered internally in the connector. Default: 1000 Accepted Values: An integer |
출력 형식
MongoDB Kafka 소스 커넥터가 Kafka 주제에 게시하는 데이터 형식을 지정하려면 다음 구성 설정을 사용합니다.
출력 형식과 관련된 옵션만 보려면 출력 형식 속성 페이지를 참조하세요.
이름 | 설명 | |||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
output.format.key | Type: string Description: Specifies which data format the source connector outputs the key
document. Default: json Accepted Values: bson , json , schema | |||||||||||||||||||||||||||||||
output.format.value | Type: string Description: Specifies which data format the source connector outputs the value
document. The connector supports Protobuf as an
output data format. You can enable this format by specifying the
schema value and installing and configuring the Kafka Connect Protobuf
Converter.Default: json Accepted Values: bson , json , schema | |||||||||||||||||||||||||||||||
output.json.formatter | Type: string Description: Class name of the JSON formatter the connector should use to
output data. Default:
Accepted Values: Your custom JSON formatter full class name or one of the
following built-in formatter class names:
To learn more about these output formats, see JSON Formatters. | |||||||||||||||||||||||||||||||
output.schema.key | Type: string Description: Specifies an Avro schema definition for the key document of the
SourceRecord. To learn more about Avro schema, see Avro in the
Data Formats guide. Default:
Accepted Values: A valid Avro schema | |||||||||||||||||||||||||||||||
output.schema.value | Type: string Description: Specifies an Avro schema definition for the value document of the
SourceRecord. To learn more about Avro schema, see Avro in the
Data Formats guide. Default:
Accepted Values: A valid JSON schema | |||||||||||||||||||||||||||||||
output.schema.infer.value | Type: boolean Description: Whether the connector should infer the schema for the value
document of the SourceRecord.
Since the connector processes each document in isolation, the
connector may generate many schemas. IMPORTANT: The connector only reads this setting when you set your
output.format.value setting to schema .Default: false Accepted Values: true or false |
시작
다음 구성 설정을 사용하여 MongoDB 컬렉션을 변경 스트림 이벤트로 변환하기 위한 MongoDB Kafka 소스 커넥터의 시작을 구성합니다.
시작과 관련된 옵션만 보려면 시작 속성 페이지를 참조하세요.
이름 | 설명 | |
---|---|---|
startup.mode | Type: string Description: Specifies how the connector should start up when there is no
source offset available. Resuming a change stream requires a
resume token, which the connector gets from the source offset.
If no source offset is available, the connector may either
ignore all or some of the existing source data, or may at first
copy all existing source data and then continue with processing
new data. If startup.mode=latest , the connector ignores all existing
source data.If startup.mode=timestamp , the connector
actuates startup.mode.timestamp.* properties. If no
properties are configured, timestamp is equivalent to
latest .If startup.mode=copy_existing , the connector
copies all existing source data to Change Stream events. This
setting is equivalent to the deprecated setting copy.existing=true .소스 커넥터가 데이터베이스의 기존 데이터를 변환하는 동안 시스템에서 데이터베이스의 데이터를 변경하는 경우, MongoDB는 최신 변경 사항을 반영하기 위해 중복된 변경 스트림 이벤트를 생성할 수 있습니다. 데이터 복사가 의존하는 변경 스트림 이벤트는 멱등성이 있으므로 복사된 데이터는 결국 일관성을 갖습니다. Default: latest Accepted Values: latest , timestamp , copy_existing | |
startup.mode.timestamp.start.at.operation.time | Type: string Description: Actuated only if startup.mode=timestamp . Specifies the
starting point for the change stream.To learn more about Change Stream parameters, see
$changeStream (aggregation)
in the MongoDB manual. Default: "" Accepted Values:
| |
startup.mode.copy.existing.namespace.regex | Type: string Description: Regular expression the connector uses to match namespaces from
which to copy data. A namespace describes the MongoDB database name
and collection separated by a period (for example, databaseName.collectionName ).For example, the following regular-expression setting matches
collections that start with "page" in the stats database:
The \ character in the example above escapes the . character
that follows it in the regular expression. For more information on
how to build regular expressions, see
Patterns
in the Java API documentation.Default: "" Accepted Values: A valid regular expression | |
startup.mode.copy.existing.pipeline | Type: string Description: An inline array of pipeline operations
the connector runs when copying existing data. You can use this
setting to filter the source collection and improve the use of
indexes in the copying process. For example, the following setting uses the $match
aggregation operator to instruct the connector to copy only
documents that contain a closed field with a value of false .
Default: "" Accepted Values: Valid aggregation pipeline stages | |
startup.mode.copy.existing.max.threads | Type: int Description: The maximum number of threads the connector can use to copy data. Default: number of processors available in the environment Accepted Values: An integer | |
startup.mode.copy.existing.queue.size | Type: int Description: The size of the queue the connector can use when copying data. Default: 16000 Accepted Values: An integer | |
startup.mode.copy.existing.allow.disk.use | Type: boolean Description: When set to true , the connector uses temporary disk storage
for the copy existing aggregation.Default: true Accepted Values: true or false |
오류 처리 및 중단 후 재개
다음 구성 설정을 사용하여 MongoDB Kafka 소스 커넥터에서 오류가 발생할 때 작동하는 방식을 지정하고 중단된 읽기 재개와 관련된 설정을 지정할 수 있습니다.
오류 처리와 관련된 옵션만 보려면 오류 처리 및 중단 후 재개 속성 페이지를 참조하세요.
이름 | 설명 |
---|---|
mongo.errors.tolerance | Type: string Description: Whether to continue processing messages when the connector encounters
an error. Set this to "none" if you want the connector to stop
processing messages and report the issue if it encounters an
error.Set this to "all" if you want the connector to continue
processing messages and ignore any errors it encounters.IMPORTANT: This property overrides the
errors.tolerance
Connect Framework property. Default: "none" Accepted Values: "none" or "all" |
mongo.errors.log.enable | Type: boolean Description: Whether the connector should report errors in the log file. Set this to true to log all errors the connector encounters.Set this to false to log errors that are not tolerated by the
connector. You can specify which errors the connector should
tolerate using the errors.tolerance or mongo.errors.tolerance
setting.IMPORTANT: This property overrides the
errors.log.enable
Connect Framework property. Default: false Accepted Values: true or false |
mongo.errors.deadletterqueue.topic.name | Type: string Description: The name of topic to use as the dead letter queue. If you specify a value, the connector writes invalid messages to the
dead letter queue topic as extended JSON strings. If you leave this setting blank, the connector does not write
invalid messages to any topic. IMPORTANT: You must set errors.tolerance or mongo.errors.tolerance
setting to "all" to enable this property.Default: "" Accepted Values: A valid Kafka topic name |
offset.partition.name | Type: string Description: The custom offset partition name to use. You can use this option
to instruct the connector to start a new change stream when an
existing offset contains an invalid resume token. If you leave this setting blank, the connector uses the default partition name
based on the connection details. To view a strategy for naming
offset partitions, see Reset Stored Offsets. Default: "" Accepted Values: A string. To learn more about naming a partition,
see
SourceRecord
in the Apache Kafka API documentation. |
heartbeat.interval.ms | Type: long Description: The number of milliseconds the connector waits between sending
heartbeat messages. The connector sends heartbeat messages when
source records are not published in the specified interval. This mechanism improves
resumability of the connector for low volume namespaces. Heartbeat messages contain a postBatchResumeToken data field.
The value of this field contains the MongoDB server oplog entry that
the connector last read from the change stream.Set this to 0 to disable heartbeat messages.To learn more, see Prevention
in the Invalid Resume Token
page. Default: 0 Accepted Values: An integer |
heartbeat.topic.name | Type: string Description: The name of the topic on which the connector should publish
heartbeat messages. You must provide a positive value in the
heartbeat.interval.ms setting to enable this feature.Default: __mongodb_heartbeats Accepted Values: A valid Kafka topic name |