Docs Menu
Docs Home
/
Spark Connector
/ /

ストリーミング読み取り構成オプション

項目一覧

  • Overview
  • ストリーム構成の変更
  • でのプロパティの指定 connection.uri
  • collectionプロパティで複数のコレクションを指定

ストリーミング モードで MongoDB からデータを読み取るときに、次のプロパティを構成できます。

注意

SparkConfを使用してコネクタの読み取り構成を設定する場合は、各プロパティの前にspark.mongodb.read.を付けます。

プロパティ名
説明
connection.uri
Required.
The connection string configuration key.

Default: mongodb://localhost:27017/
database
Required.
The database name configuration.
collection
Required.
The collection name configuration.
You can specify multiple collections by separating the collection names with a comma.

To learn more about specifying multiple collections, see Specifying Multiple Collections in the collection Property.
comment
The comment to append to the read operation. Comments appear in the output of the Database Profiler.

Default: None
mode
The parsing strategy to use when handling documents that don't match the expected schema. This option accepts the following values:
  • ReadConfig.ParseMode.FAILFAST: スキーマに一致しないドキュメントを解析するときに例外をスローします。

  • ReadConfig.ParseMode.PERMISSIVE: データ型がスキーマと一致しない場合、フィールドをnullに設定します。 無効な各ドキュメントを拡張 JSON string として保存するには、この値をcolumnNameOfCorruptRecordオプションと組み合わせます。

  • ReadConfig.ParseMode.DROPMALFORMED: スキーマに一致しないドキュメントを無視します。


Default: ReadConfig.ParseMode.FAILFAST
columnNameOfCorruptRecord
If you set the mode option to ReadConfig.ParseMode.PERMISSIVE, this option specifies the name of the new column that stores the invalid document as extended JSON. If you're using an explicit schema, it must include the name of the new column. If you're using an inferred schema, the Spark Connector adds the new column to the end of the schema.

Default: None
mongoClientFactory
MongoClientFactory configuration key.
You can specify a custom implementation, which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory
aggregation.pipeline
Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark.
The value must be either an extended JSON single document or list of documents.
A single document resembles the following:
{"$match": {"closed": false}}
A list of documents resembles the following:
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

カスタム集計パイプラインは、パーティショニング戦略と互換性がある必要があります。 たとえば、 $groupなどの集計ステージは、複数のパーティションを作成するパーティショニングでは機能しません。

aggregation.allowDiskUse
Specifies whether to allow storage to disk when running the aggregation.

Default: true
change.stream.
Change stream configuration prefix.
See the Change Stream Configuration section for more information about change streams.
outputExtendedJson
When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types.

Default: false
schemaHint
Specifies a partial schema of known field types to use when inferring the schema for the collection. To learn more about the schemaHint option, see the Specify Known Fields with Schema Hints section.

Default: None

MongoDB から変更ストリームを読み取るときに、次のプロパティを構成できます。

プロパティ名
説明
change.stream.lookup.full.document

アップデート操作時に変更ストリームが返す値を決定します。

デフォルト設定では、元のドキュメントと更新されたドキュメントの差が返されます。

updateLookup設定は元のドキュメントと更新されたドキュメントの差も返しますが、更新されたドキュメント全体のコピーも含まれます。

この変更ストリーム オプションの機能の詳細については、MongoDB サーバー マニュアル ガイド「 更新操作のための完全なドキュメントの検索 」を参照してください。

デフォルト: "default"

change.stream.micro.batch.max.partition.count
The maximum number of partitions the Spark Connector divides each micro-batch into. Spark workers can process these partitions in parallel.

This setting applies only when using micro-batch streams.

Default: 1

警告: 1より大きい値を指定すると、Spark Connector が変更イベントを処理する順序が変更される可能性があります。 順序以外の処理によって下流でデータの不整合が発生する可能性がある場合は、この設定を避けます。

change.stream.publish.full.document.only
Specifies whether to publish the changed document or the full change stream document.

When this setting is false, you must specify a schema. The schema must include all fields that you want to read from the change stream. You can use optional fields to ensure that the schema is valid for all change-stream events.

When this setting is true, the connector exhibits the following behavior:
  • connectorは、 fullDocumentフィールドが省略されているメッセージをフィルタリングで除外し、 フィールドの値のみを公開します。

  • スキーマを指定しない場合、connector は 変更ストリーム ドキュメント からスキーマを推論します。

この設定はchange.stream.lookup.full.document設定を上書きします。

デフォルトfalse

change.stream.startup.mode
Specifies how the connector starts up when no offset is available.
This setting accepts the following values:
  • latest: コネクタは、最新のイベントから変更イベントの処理を開始します。 以前に処理されていないイベントは処理されません。

  • timestamp: コネクタは指定された時刻に変更イベントの処理を開始します。

    timestampオプションを使用するには、 change.stream.startup.mode.timestamp.start.at.operation.time設定を使用して時間を指定する必要があります。 この設定では、次の形式のタイムスタンプが受け入れられます。

    デフォルトlatest

SparkConfを使用して以前の設定のいずれかを指定する場合は、それらをconnection.uri設定に含めるか、個別に一覧表示できます。

次のコード例は、 connection.uri設定の一部としてデータベース、コレクション、読み込み設定(read preference)を指定する方法を示しています。

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

connection.uriを短くして設定を読みやすくするには、代わりにこれらを個別に指定します。

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

重要

connection.uriとその行の両方に 設定を指定すると、 connection.uriの設定が優先されます。 たとえば、次の構成では、接続データベースはfoobarです。これはconnection.uri設定の 値であるためです。

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

コレクション名をカンマで区切って、 collection変更ストリーム構成プロパティで複数のコレクションを指定できます。 スペースがコレクション名の一部でない限り、コレクション間にスペースを入れないでください。

次の例に示すように、複数のコレクションを指定します。

...
.option("spark.mongodb.collection", "collectionOne,collectionTwo")

コレクション名が「*」の場合、または名前にコンマまたはバックスラッシュ(\)が含まれている場合は、次のように文字をエスケープする必要があります。

  • collection 構成オプションで使用されるコレクションの名前にカンマが含まれている場合、 Spark Connectorはそれを 2 つの異なるコレクションとして扱います。 これを回避するには、カンマの前にバックスラッシュ(\)を付けてコンマをエスケープする必要があります。 「my,collection」という名前のコレクションを次のようにエスケープします。

    "my\,collection"
  • collection 構成オプションで使用されるコレクションの名前が「*」の場合、 Spark Connectorはそれをすべてのコレクションをスキャンするための仕様と解釈します。 これを回避するには、アスタリスクの前にバックスラッシュ(\)を付けてアスタリスクをエスケープする必要があります。 「*」という名前のコレクションを次のようにエスケープします。

    "\*"
  • collection 構成オプションで使用されるコレクションの名前にバックスラッシュ(\)が含まれている場合、 Spark Connectorはバックスラッシュをエスケープ文字として扱い、 値の解釈方法が変更される可能性があります。 これを回避するには、バックスラッシュの前に別のバックスラッシュを付けて、バックスラッシュをエスケープする必要があります。 "\collection" という名前のコレクションを次のようにエスケープします。

    "\\collection"

    注意

    Java でコレクション名を string リテラルとして指定する場合は、それぞれのバックスラッシュを別のバックスラッシュでさらにエスケープする必要があります。 たとえば、"\collection" という名前のコレクションは、次のようにエスケープします。

    "\\\\collection"

コレクション名のstringとしてアスタリスク(*)を渡すことで、 データベース内のすべてのコレクションからストリーミングできます。

次の例に示すように、すべてのコレクションを指定します。

...
.option("spark.mongodb.collection", "*")

すべてのコレクションからストリーミング中にコレクションを作成すると、新しいコレクションは自動的にストリームに含まれます。

複数のコレクションからストリーミングしているときに、いつでもコレクションを削除できます。

重要

複数のコレクションを使用したスキーマの推論

change.stream.publish.full.document.onlyオプションをtrueに設定すると、Spark Connector はスキャンされたドキュメントのスキーマを使用してDataFrameのスキーマを推論します。

スキーマ推論はストリーミングの開始時に行われ、ストリーミング中に作成されたコレクションは考慮されません。

複数のコレクションからストリーミングしてスキーマを推論する場合、コネクタは各コレクションを順番にサンプリングします。 多数のコレクションからストリーミングすると、スキーマ推論のパフォーマンスが大幅に低下する可能性があります。 このパフォーマンスへの影響は、スキーマを推論している間のみ発生します。

戻る

読み取り