Docs Menu
Docs Home
/
MongoDB Kafka Connector
/ /

スキーマを指定する

この使用例では、MongoDB Kafka ソース コネクタを構成して、カスタムスキーマをデータに適用する方法を示します。 スキーマは、Apache Kafka トピック内のデータに関する構造とタイプ情報を指定する定義です。 ソース コネクタによって設定されるトピックのデータが一貫した構造になっているようにする必要がある場合は、スキーマを使用します。

コネクタでスキーマを使用する方法について詳しくは、 スキーマの適用ガイドをご覧ください。

アプリケーションが MongoDB コレクション内のカスタマー データを追跡し、このデータを Kafka トピックに公開するとします。 カスタマー データのサブスクライブに、一貫した形式のデータを受信したい。 データに スキーマ を適用することを選択します。

要件とソリューションは次のとおりです。

要件
解決法

MongoDB コレクションからカスタマー データを受信する

Configure a MongoDB source connector to receive updates to data from a specific database and collection.

カスタマー データ スキーマを提供する

Specify a schema that corresponds to the structure and data types of the customer data.

カスタマー データから Kafka メタデータを省略

Include only the data from the fullDocument field.

上記の要件を満たす完全な構成ファイルについては、「 構成の指定 」を参照してください。

MongoDB コレクションからデータを受信するようにソース コネクタを構成するには、データベース名とコレクション名を指定します。 この例では、次のように、 customersデータベース内のpurchasesコレクションから読み取るようにコネクタを構成できます。

database=customers
collection=purchases

コレクションのサンプル カスタマー データ ドキュメントには、次の情報が含まれています。

{
"name": "Zola",
"visits": [
{
"$date": "2021-07-25T17:30:00.000Z"
},
{
"$date": "2021-10-03T14:06:00.000Z"
}
],
"goods_purchased": {
"apples": 1,
"bananas": 10
}
}

サンプル ドキュメントから、スキーマは次のデータ型を使用してフィールドを表示する必要があると判断します。

フィールド名
データ型
説明

name

Name of the customer

訪問

配列 の タイムスタンプ

カスタマーが訪問した日付

product_price

マップ string(想定された型)から 整数 への 値

顧客が購入した商品の名前と数量

以下のサンプル スキーマに示すように、Apache Avro スキーマ形式を使用してデータを記述できます。

{
"type": "record",
"name": "Customer",
"fields": [{
"name": "name",
"type": "string"
},{
"name": "visits",
"type": {
"type": "array",
"items": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
},{
"name": "goods_purchased",
"type": {
"type": "map",
"values": "int"
}
}
]
}

重要

変換子

Atlas のバイナリ エンコーディングで Apache Kafka を介してデータを送信する場合は、Avro 変換を使用する必要があります。 詳細については、変換に関するガイド を参照してください。

connectorは、カスタマー データ ドキュメントと、そのドキュメントを記述するメタデータを Kafka トピックに公開します。 次の設定を使用して、レコードのfullDocumentフィールドに含まれるドキュメント データのみを含めるように connector を設定できます。

publish.full.document.only=true

fullDocumentフィールドの詳細については、 Change Streamsのガイドを参照してください。

カスタム スキーマ コネクターの構成は次のようになります。

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your MongoDB connection URI>
database=customers
collection=purchases
publish.full.document.only=true
output.format.value=schema
output.schema.value={\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]}
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

注意

埋め込みスキーマ

上記の構成では、 Kafka Connect JSON schema 変換 は カスタム スキーマを メッセージに埋め込みます。 JSON schema 変換の詳細については、変換のガイドを参照してください。

スキーマの指定の詳細については、「スキーマの適用」ガイドを参照してください。

戻る

既存のデータのコピー