スキーマを指定する
この使用例では、MongoDB Kafka ソース コネクタを構成して、カスタムスキーマをデータに適用する方法を示します。 スキーマは、Apache Kafka トピック内のデータに関する構造とタイプ情報を指定する定義です。 ソース コネクタによって設定されるトピックのデータが一貫した構造になっているようにする必要がある場合は、スキーマを使用します。
コネクタでスキーマを使用する方法について詳しくは、 スキーマの適用ガイドをご覧ください。
例
アプリケーションが MongoDB コレクション内のカスタマー データを追跡し、このデータを Kafka トピックに公開するとします。 カスタマー データのサブスクライブに、一貫した形式のデータを受信したい。 データに スキーマ を適用することを選択します。
要件とソリューションは次のとおりです。
要件 | 解決法 |
---|---|
MongoDB コレクションからカスタマー データを受信する | Configure a MongoDB source connector to receive updates to data
from a specific database and collection. |
カスタマー データ スキーマを提供する | Specify a schema that corresponds to the structure and data types of
the customer data. |
カスタマー データから Kafka メタデータを省略 | Include only the data from the fullDocument field. |
上記の要件を満たす完全な構成ファイルについては、「 構成の指定 」を参照してください。
コレクションからのデータの受信
MongoDB コレクションからデータを受信するようにソース コネクタを構成するには、データベース名とコレクション名を指定します。 この例では、次のように、 customers
データベース内のpurchases
コレクションから読み取るようにコネクタを構成できます。
database=customers collection=purchases
カスタム スキーマの作成
コレクションのサンプル カスタマー データ ドキュメントには、次の情報が含まれています。
{ "name": "Zola", "visits": [ { "$date": "2021-07-25T17:30:00.000Z" }, { "$date": "2021-10-03T14:06:00.000Z" } ], "goods_purchased": { "apples": 1, "bananas": 10 } }
サンプル ドキュメントから、スキーマは次のデータ型を使用してフィールドを表示する必要があると判断します。
フィールド名 | データ型 | 説明 |
---|---|---|
name | Name of the customer | |
訪問 | 配列 の タイムスタンプ | カスタマーが訪問した日付 |
product_price | 顧客が購入した商品の名前と数量 |
以下のサンプル スキーマに示すように、Apache Avro スキーマ形式を使用してデータを記述できます。
{ "type": "record", "name": "Customer", "fields": [{ "name": "name", "type": "string" },{ "name": "visits", "type": { "type": "array", "items": { "type": "long", "logicalType": "timestamp-millis" } } },{ "name": "goods_purchased", "type": { "type": "map", "values": "int" } } ] }
重要
変換子
Atlas のバイナリ エンコーディングで Apache Kafka を介してデータを送信する場合は、Avro 変換を使用する必要があります。 詳細については、変換に関するガイド を参照してください。
公開レコードからメタデータを省略する
connectorは、カスタマー データ ドキュメントと、そのドキュメントを記述するメタデータを Kafka トピックに公開します。 次の設定を使用して、レコードのfullDocument
フィールドに含まれるドキュメント データのみを含めるように connector を設定できます。
publish.full.document.only=true
fullDocument
フィールドの詳細については、 Change Streamsのガイドを参照してください。
構成を指定する
カスタム スキーマ コネクターの構成は次のようになります。
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your MongoDB connection URI> database=customers collection=purchases publish.full.document.only=true output.format.value=schema output.schema.value={\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]} value.converter.schemas.enable=true value.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter
注意
埋め込みスキーマ
上記の構成では、 Kafka Connect JSON schema 変換 は カスタム スキーマを メッセージに埋め込みます。 JSON schema 変換の詳細については、変換のガイドを参照してください。
スキーマの指定の詳細については、「スキーマの適用」ガイドを参照してください。