Kafka Connect MongoDB からの移行
このガイドを使用して、コミュニティが作成したKafka KafkaConnectMongoDB から 配置を移行します。 sink コネクタを 公式MongoDBKafka Connector に接続します。
次のセクションでは、MongoDB Kafka Sink Connector に移行するために Kafka Connect Sink Connector の構成設定とカスタム クラスに加える必要がある変更を一覧にしています。
構成設定の更新
Kafka Connect 配置の構成設定を次のように変更してから、 MongoDB Kafka Connector配置で使用します。
パッケージ
at.grahsl.kafka.connect.mongodb
を含む値をパッケージcom.mongodb.kafka.connect
で置き換えます。connector.class
の設定を MongoDB Kafka シンク コネクタ クラスに置き換えます。connector.class=com.mongodb.kafka.connect.MongoSinkConnector Kafka Connect プロパティ名から
mongodb.
プレフィックスを削除します。 たとえば、mongodb.connection.uri
をconnection.uri
に変更します。document.id.strategies
設定が存在する場合は、削除します。 この設定の値がカスタム戦略を参照する場合は、document.id.strategy
設定に移動します。 カスタム クラスに加える必要がある変更については、「カスタム クラスのアップデート」セクションをお読みください。コネクタの Kafka トピック構成トピック プロパティで、
mongodb.collection
プレフィックスを含むトピックごとまたはコレクションごとのオーバーライドを指定するために使用するプロパティ名を同等のキーに置き換えます。
カスタム クラスの更新
Kafka Connect Sink Connector の配置でカスタム クラスを使用する場合は、MongoDB Kafka Connector の配置に追加する前に、それらに次の変更を加えてください。
at.grahsl.kafka.connect.mongodb
を含むインポートをcom.mongodb.kafka.connect
に置き換えます。MongoDbSinkConnector
クラスへの参照をMongoSinkConnector
クラスに置き換えます。カスタム シンク コネクタ戦略クラスを更新して
com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategy
インターフェースを実装します。MongoDbSinkConnectorConfig
クラスへの参照を更新します。 MongoDB Kafka Connectorでは、そのクラスのロジックは次のクラスに分割されます。
ポストプロセッサ サブクラスの更新
KafkaKafkaConnectPostProcessor
コネクタの配置にポストプロセッサをサブクラスするクラスがある場合は、 Connect クラス内のものをオーバーライドするメソッドを、MongoDBKafka Connector PostProcessor クラスのメソッド署名と一致するように更新します。