Invalid Resume Token
Overview
MongoDB Kafka ソース コネクタで無効な再開トークンから回復する方法を学びます。
スタック トレース
次のスタック トレースは、ソース コネクタに無効な再開トークンがあることを示しています。
... org.apache.kafka.connect.errors.ConnectException: ResumeToken not found. Cannot create a change stream cursor ... Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog ...
原因
ソース コネクタの再開トークンの ID が MongoDB 配置の oplog のどのエントリにも対応しない場合、コネクタは MongoDB 変更ストリームの処理を開始する場所を判断する方法がありません。 この問題が発生する可能性のあるシナリオを確認するには、次のタブをクリックします。
このシナリオでは、ソース コネクタを一時停止し、MongoDB 配置の oplog を入力します。
MongoDB Kafka ソース コネクタを使用して Kafka 配置を開始します。
ソース MongoDB 名前空間に変更イベントを生成し、connector はこのイベントに対応する再開トークンを保存します。
ソース コネクタを一時停止します。
connectorが一時停止している間に MongoDB oplog を埋めて、MongoDB は再開トークンに対応する oplog エントリを削除します。
ソース コネクタを再起動しても、再開トークンが MongoDB oplog に存在しないため、処理を再開できません。
このシナリオでは、ソース コネクタは、アップデートの頻度が低い MongoDB 名前空間 の変更をリッスンし、ハートビート機能が有効になっていない場合します。
MongoDB Kafka ソース コネクタを使用して Kafka 配置を開始します。
ソース MongoDB 名前空間に変更イベントを生成し、connector はこのイベントに対応する再開トークンを保存します。
MongoDB 配置が oplog から再開トークンを発行するために使用する変更イベントをローテーションするのにかかる時間は、ソース MongoDB 名前空間は更新されません。
ソース MongoDB 名前空間に変更イベントが発生しており、ソース コネクタは MongoDB oplog に再開トークンが存在しないため、処理を再開できません。
oplog の詳細については、 MongoDB マニュアルを参照してください。
変更ストリームの詳細については、 Change Streamsガイドをご覧ください。
ソリューション
次のいずれかの方法を使用して、無効な再開トークンから回復できます。
エラーを一時的に許容する
コネクタの再開トークンをアップデートする変更ストリーム イベントを生成する間、エラーを許容するようにソース コネクタを構成できます。 この回復戦略は最も単純ですが、無効な再開トークンとは無関係なエラーをコネクターが一時的に無視するリスクがあります。 配置内のエラーを一時的に許容できない場合は、代わりに保存されたオフセットを削除できます。
エラーを一時的に許容するようにソース コネクタを構成するには、次の手順に従います。
すべてのエラーを許容するには、
errors.tolerance
オプションを に設定します。errors.tolerance=all ソース コネクタが参照するコレクション内のドキュメントを挿入、アップデート、または削除して、コネクタの再開トークンをアップデートする変更ストリーム イベントを生成します。
変更ストリーム イベントを生成したら、次のように
errors.tolerance
オプションを に設定してエラーを許容されなくなります。errors.tolerance=none
errors.tolerance
オプションの詳細については、「エラー処理と中断からの再開のプロパティ」ページを参照してください。
保存されたオフセットをリセットする
再開トークンを含む Kafka Connect オフセット データをリセットして、コネクタが変更ストリームの処理を再開できるようにすることができます。
オフセット データをリセットするには、 offset.partition.name
構成プロパティの値を、 Kafka 配置に存在しないパーティション名に変更します。 offset.partition.name
プロパティは次のように設定できます。
offset.partition.name=<a string>
Tip
オフセット パーティションの名前付け
オフセット パーティションに名前を付けるには、次のパターンを使用することを検討してください。
offset.partition.name=<source connector name>.<monotonically increasing number>
このパターンには、次の利点があります。
コネクタをリセットした回数を記録
オフセット パーティションが属するコネクターのドキュメント
例、ソースコネクタに"source-values"
という名前を付け、 offset.partition.name
プロパティを初めて設定する場合は、コネクタを次のように構成します。
offset.partition.name=source-values.1
次回コネクタのオフセット データをリセットするときは、コネクタを次のように構成します。
offset.partition.name=source-values.2
offset.partition.name
構成プロパティの詳細については、「エラー処理と中断プロパティからの再開」ページを参照してください。
コネクタの名前付けの詳細については、公式の Apache Kafka を参照してください ドキュメント。
防止
更新頻度の低い 名前空間によって発生する無効な再開トークン エラーを防ぐには、ハートビートを有効にします。 ハートビートは、ソース MongoDB 名前空間の内容が変更された場合に、コネクターに定期的に再開トークンを更新させるソース コネクタの機能です。
ハートビートを有効にするには、ソース コネクタ構成で次のオプションを指定します。
heartbeat.interval.ms=<a positive integer>
ハートビートの詳細については、「エラー処理と中断からの再開」ガイドを参照してください。