Docs Menu

Docs HomeMongoDB Kafka Connector

Copy Existing Properties

On this page

  • Overview
  • Settings

Use the following configuration settings to enable the copy existing feature which converts MongoDB collections into Change Stream events.

Tip

See also:

For an example of the copy existing feature, see the Copy Existing Data Usage Example.

For a list of source connector configuration settings organized by category, see the guide on Source Connector Configuration Properties.

Name
Description
copy.existing
Type: boolean

Description:
Whether to enable the copy existing feature which converts all data in a MongoDB collection to Change Stream events and publishes them on Kafka topics. If MongoDB changes the source collection data after the connector starts the copy process, the connector creates events for the changes after it completes the copy process.

Note

Data Copy Can Produce Duplicate Events

If any system changes the data in the database while the source connector converts existing data from it, MongoDB may produce duplicate change stream events to reflect the latest changes. Since the change stream events on which the data copy relies are idempotent, the copied data is eventually consistent.

Default:false
Accepted Values: true or false
copy.existing.namespace.regex
Type: string

Description:
Regular expression the connector uses to match namespaces from which to copy data. A namespace describes the MongoDB database name and collection separated by a period, e.g. databaseName.collectionName.

Example

In the following example, the regular expression setting matches collections that start with "page" in the "stats" database.

copy.existing.namespace.regex=stats\.page.*

The "\" character in the example above escapes the "." character that follows it in the regular expression. For more information on how to build regular expressions, see the Java API documentation on Patterns.

Default: ""
Accepted Values: A valid regular expression
copy.existing.pipeline
Type: string

Description:
An array of pipeline operations the connector runs when copying existing data. You can use this setting to filter the source collection and improve the use of indexes in the copying process.

Example

The following example shows how you can use the $match aggregation operator to instruct the connector to copy only documents that contain a closed field with a value of false.

copy.existing.pipeline=[ { "$match": { "closed": "false" } } ]
Default: []
Accepted Values: Valid aggregation pipeline stages
copy.existing.max.threads
Type: int

Description:
The maximum number of threads the connector can use to copy data.
Default: number of processors available in the environment
Accepted Values: An integer
copy.existing.queue.size
Type: int

Description:
The size of the queue the connector can use when copying data.
Default: 16000
Accepted Values: An integer
copy.existing.allow.disk.use
Type: boolean

Description:
When set to true, the connector sets the copy existing aggregation to use temporary disk storage.
Default: true
←  Output Format PropertiesError Handling and Resuming from Interruption Properties →

On this page