ストリーミング読み取り構成オプション
Overview
ストリーミング モードで MongoDB からデータを読み取るときに、次のプロパティを構成できます。
注意
SparkConf
を使用してコネクタの読み取り構成を設定する場合は、各プロパティの前にspark.mongodb.read.
を付けます。
プロパティ名 | 説明 | ||
---|---|---|---|
connection.uri | Required. The connection string configuration key. Default: mongodb://localhost:27017/ | ||
database | Required. The database name configuration. | ||
collection | Required. The collection name configuration. You can specify multiple collections by separating the collection names
with a comma. To learn more about specifying multiple collections, see Specifying Multiple Collections in the collection Property. | ||
comment | The comment to append to the read operation. Comments appear in the
output of the Database Profiler. Default: None | ||
mode | The parsing strategy to use when handling documents that don't match the
expected schema. This option accepts the following values:
Default: ReadConfig.ParseMode.FAILFAST | ||
columnNameOfCorruptRecord | If you set the mode option to ReadConfig.ParseMode.PERMISSIVE ,
this option specifies the name of the new column that stores the invalid
document as extended JSON. If you're using an explicit schema, it must
include the name of the new column. If you're
using an inferred schema, the Spark Connector adds the new column to the
end of the schema.Default: None | ||
mongoClientFactory | MongoClientFactory configuration key. You can specify a custom implementation, which must implement the
com.mongodb.spark.sql.connector.connection.MongoClientFactory
interface.Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory | ||
aggregation.pipeline | Specifies a custom aggregation pipeline to apply to the collection
before sending data to Spark. The value must be either an extended JSON single document or list
of documents. A single document resembles the following:
A list of documents resembles the following:
カスタム集計パイプラインは、パーティショニング戦略と互換性がある必要があります。 たとえば、 | ||
aggregation.allowDiskUse | Specifies whether to allow storage to disk when running the
aggregation. Default: true | ||
change.stream. | Change stream configuration prefix. See the
Change Stream Configuration section for more
information about change streams. | ||
outputExtendedJson | When true , the connector converts BSON types not supported by Spark into
extended JSON strings.
When false , the connector uses the original relaxed JSON format for
unsupported types.Default: false | ||
schemaHint | Specifies a partial schema of known field types to use when inferring
the schema for the collection. To learn more about the schemaHint
option, see the Specify Known Fields with Schema Hints section.Default: None |
ストリーム構成の変更
MongoDB から変更ストリームを読み取るときに、次のプロパティを構成できます。
プロパティ名 | 説明 |
---|---|
change.stream.lookup.full.document | アップデート操作時に変更ストリームが返す値を決定します。 デフォルト設定では、元のドキュメントと更新されたドキュメントの差が返されます。
この変更ストリーム オプションの機能の詳細については、MongoDB サーバー マニュアル ガイド「 更新操作のための完全なドキュメントの検索 」を参照してください。 デフォルト: "default" |
change.stream.micro.batch.max.partition.count | The maximum number of partitions the Spark Connector divides each
micro-batch into. Spark workers can process these partitions in parallel. This setting applies only when using micro-batch streams. Default: 1 警告: |
change.stream.publish.full.document.only | Specifies whether to publish the changed document or the full
change stream document. When this setting is false , you must specify a schema. The schema
must include all fields that you want to read from the change stream. You can
use optional fields to ensure that the schema is valid for all change-stream
events.When this setting is true , the connector exhibits the following behavior:
この設定は デフォルト: |
change.stream.startup.mode | Specifies how the connector starts up when no offset is available. This setting accepts the following values:
|
でのプロパティの指定 connection.uri
SparkConfを使用して以前の設定のいずれかを指定する場合は、それらをconnection.uri
設定に含めるか、個別に一覧表示できます。
次のコード例は、 connection.uri
設定の一部としてデータベース、コレクション、読み込み設定(read preference)を指定する方法を示しています。
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
connection.uri
を短くして設定を読みやすくするには、代わりにこれらを個別に指定します。
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ spark.mongodb.read.database=myDB spark.mongodb.read.collection=myCollection spark.mongodb.read.readPreference.name=primaryPreferred
重要
connection.uri
とその行の両方に 設定を指定すると、 connection.uri
の設定が優先されます。 たとえば、次の構成では、接続データベースはfoobar
です。これはconnection.uri
設定の 値であるためです。
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar spark.mongodb.read.database=bar
プロパティで複数のコレクションを指定collection
コレクション名をカンマで区切って、 collection
変更ストリーム構成プロパティで複数のコレクションを指定できます。 スペースがコレクション名の一部でない限り、コレクション間にスペースを入れないでください。
次の例に示すように、複数のコレクションを指定します。
... .option("spark.mongodb.collection", "collectionOne,collectionTwo")
コレクション名が「*」の場合、または名前にコンマまたはバックスラッシュ(\)が含まれている場合は、次のように文字をエスケープする必要があります。
collection
構成オプションで使用されるコレクションの名前にカンマが含まれている場合、 Spark Connectorはそれを 2 つの異なるコレクションとして扱います。 これを回避するには、カンマの前にバックスラッシュ(\)を付けてコンマをエスケープする必要があります。 「my,collection」という名前のコレクションを次のようにエスケープします。"my\,collection" collection
構成オプションで使用されるコレクションの名前が「*」の場合、 Spark Connectorはそれをすべてのコレクションをスキャンするための仕様と解釈します。 これを回避するには、アスタリスクの前にバックスラッシュ(\)を付けてアスタリスクをエスケープする必要があります。 「*」という名前のコレクションを次のようにエスケープします。"\*" collection
構成オプションで使用されるコレクションの名前にバックスラッシュ(\)が含まれている場合、 Spark Connectorはバックスラッシュをエスケープ文字として扱い、 値の解釈方法が変更される可能性があります。 これを回避するには、バックスラッシュの前に別のバックスラッシュを付けて、バックスラッシュをエスケープする必要があります。 "\collection" という名前のコレクションを次のようにエスケープします。"\\collection" 注意
Java でコレクション名を string リテラルとして指定する場合は、それぞれのバックスラッシュを別のバックスラッシュでさらにエスケープする必要があります。 たとえば、"\collection" という名前のコレクションは、次のようにエスケープします。
"\\\\collection"
コレクション名のstringとしてアスタリスク(*)を渡すことで、 データベース内のすべてのコレクションからストリーミングできます。
次の例に示すように、すべてのコレクションを指定します。
... .option("spark.mongodb.collection", "*")
すべてのコレクションからストリーミング中にコレクションを作成すると、新しいコレクションは自動的にストリームに含まれます。
複数のコレクションからストリーミングしているときに、いつでもコレクションを削除できます。
重要
複数のコレクションを使用したスキーマの推論
change.stream.publish.full.document.only
オプションをtrue
に設定すると、Spark Connector はスキャンされたドキュメントのスキーマを使用してDataFrame
のスキーマを推論します。
スキーマ推論はストリーミングの開始時に行われ、ストリーミング中に作成されたコレクションは考慮されません。
複数のコレクションからストリーミングしてスキーマを推論する場合、コネクタは各コレクションを順番にサンプリングします。 多数のコレクションからストリーミングすると、スキーマ推論のパフォーマンスが大幅に低下する可能性があります。 このパフォーマンスへの影響は、スキーマを推論している間のみ発生します。