Docs Menu
Docs Home
/
Spark Connector
/ /

バッチ読み取り構成オプション

項目一覧

  • Overview
  • パーティション構成
  • でのプロパティの指定 connection.uri

バッチ モードで MongoDB からデータを読み取るときに、次のプロパティを構成できます。

注意

SparkConfを使用してコネクタの読み取り構成を設定する場合は、各プロパティの前にspark.mongodb.read.を付けます。

プロパティ名
説明
connection.uri
Required.
The connection string configuration key.

Default: mongodb://localhost:27017/
database
Required.
The database name configuration.
collection
Required.
The collection name configuration.
comment
The comment to append to the read operation. Comments appear in the output of the Database Profiler.

Default: None
mode
The parsing strategy to use when handling documents that don't match the expected schema. This option accepts the following values:
  • ReadConfig.ParseMode.FAILFAST: スキーマに一致しないドキュメントを解析するときに例外をスローします。

  • ReadConfig.ParseMode.PERMISSIVE: データ型がスキーマと一致しない場合、フィールドをnullに設定します。 無効な各ドキュメントを拡張 JSON string として保存するには、この値をcolumnNameOfCorruptRecordオプションと組み合わせます。

  • ReadConfig.ParseMode.DROPMALFORMED: スキーマに一致しないドキュメントを無視します。


Default: ReadConfig.ParseMode.FAILFAST
columnNameOfCorruptRecord
If you set the mode option to ReadConfig.ParseMode.PERMISSIVE, this option specifies the name of the new column that stores the invalid document as extended JSON. If you're using an explicit schema, it must include the name of the new column. If you're using an inferred schema, the Spark Connector adds the new column to the end of the schema.

Default: None
mongoClientFactory
MongoClientFactory configuration key.
You can specify a custom implementation which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory
partitioner
The partitioner full class name.
You can specify a custom implementation that must implement the com.mongodb.spark.sql.connector.read.partitioner.Partitioner interface.
See the Partitioner Configuration section for more information about partitioners.

Default: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner
partitioner.options.
Partitioner configuration prefix.
See the Partitioner Configuration section for more information about partitioners.
sampleSize
The number of documents to sample from the collection when inferring
the schema.

Default: 1000
sql.inferSchema.mapTypes.enabled
Whether to enable Map types when inferring the schema.
When enabled, large compatible struct types are inferred to a MapType instead.

Default: true
sql.inferSchema.mapTypes.minimum.key.size
Minimum size of a StructType before inferring as a MapType.

Default: 250
aggregation.pipeline
Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark.
The value must be either an extended JSON single document or list of documents.
A single document resembles the following:
{"$match": {"closed": false}}
A list of documents resembles the following:
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

重要

カスタム集計パイプラインは、パーティショニング戦略と互換性がある必要があります。 たとえば、 $groupなどの集計ステージは、複数のパーティションを作成するパーティショニングでは機能しません。

aggregation.allowDiskUse
Specifies whether to allow storage to disk when running the aggregation.

Default: true
outputExtendedJson
When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types.

Default: false
schemaHint
Specifies a partial schema of known field types to use when inferring the schema for the collection. To learn more about the schemaHint option, see the Specify Known Fields with Schema Hints section.

Default: None

パーティショニングは、 Spark Connectorを使用するバッチ読み取りの読み取り動作を変更します。 データをパーティションに分割することで、変換を並列に実行できます。

このセクションには、次のパーティションの構成情報が含まれています。

注意

バッチ読み取りのみ

データストリーム処理エンジンは単一のデータストリームを生成するため、パーティショニングはストリーミング読み取りに影響しません。

SamplePartitioner は、デフォルトのパーティション分割であり、 この構成では、パーティション フィールド、パーティション サイズ、およびパーティションあたりのサンプル数を指定できます。

この構成を使用するには、 partitioner構成オプションをcom.mongodb.spark.sql.connector.read.partitioner.SamplePartitionerに設定します。

プロパティ名
説明
partitioner.options.partition.field

パーティション分割に使用するフィールド。一意のフィールドである必要があります。

デフォルト: _id

partitioner.options.partition.size

各パーティションのサイズ(MB 単位)。 パーティション サイズが小さい場合は、作成されるパーティションが多くなり、ドキュメントが少なくなります。

デフォルト: 64

partitioner.options.samples.per.partition

パーティションごとに取得するサンプルの数。 取得されたサンプルの合計数は次のとおりです。

samples per partition * ( count / number of documents per partition)

デフォルト: 10

平均ドキュメント サイズが 0.5 MB の 640 ドキュメントを含むコレクションの場合、デフォルトのSamplePartitioner構成では 5 つのパーティションが作成され、1 つのパーティションあたり 128 ドキュメントが含まれます。

Spark Connectorは 50 個のドキュメント(対象パーティションごとにデフォルトは 10)をサンプリングし、サンプリングされたドキュメントからパーティション フィールド範囲を選択して 5 つのパーティションを定義します。

ShardedPartitioner構成では、シャード構成に基づいてデータが自動的に分割されます。

この構成を使用するには、 partitioner構成オプションをcom.mongodb.spark.sql.connector.read.partitioner.ShardedPartitionerに設定します。

重要

ShardedPartitioner の制限

  1. MongoDB Server v 6.0以降では、シャーディング操作によって、すべてのシャードキー値をカバーする大きな初期チャンクが 1 件作成され、シャーディングされたパーティションが非効率的になります。 MongoDB v 6.0以降に接続する場合、シャーディングされたパーティションを使用することは推奨されません。

  2. シャーディングされたパーティションは ハッシュされたシャードキー と互換性がありません。

PaginateBySizePartitioner構成では、平均ドキュメント サイズを使用してコレクションを平均サイズのチャンクに分割し、データをページ分割します。

この構成を使用するには、 partitioner構成オプションをcom.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitionerに設定します。

プロパティ名
説明
partitioner.options.partition.field

パーティション分割に使用するフィールド。一意のフィールドである必要があります。

デフォルト: _id

partitioner.options.partition.size

各パーティションのサイズ(MB 単位)。 パーティション サイズが小さい

より少ないドキュメントを含むより多くのパーティションを作成します。

デフォルト: 64

PaginateIntoPartitionsPartitionerの構成では、コレクション内のドキュメント数を許可されるパーティションの最大数で割ったデータがページ分割されます。

この構成を使用するには、 partitioner構成オプションをcom.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitionerに設定します。

プロパティ名
説明
partitioner.options.partition.field

パーティション分割に使用するフィールド。一意のフィールドである必要があります。

デフォルト: _id

partitioner.options.max.number.of.partitions

作成するパーティションの数。

デフォルト: 64

SinglePartitionPartitioner構成では 1 つのパーティションが作成されます。

この構成を使用するには、 partitioner構成オプションをcom.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitionerに設定します。

AutoBucketPartitioner構成はSamplePartitionerの構成と似ていますが、データをページ分割するために$bucketAuto集計ステージを使用します。 この構成を使用すると、ネストされたフィールドを含む単一または複数のフィールドにわたってデータを分割できます。

この構成を使用するには、 partitioner構成オプションをcom.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitionerに設定します。

プロパティ名
説明
partitioner.options.partition.fieldList

パーティショニングに使用するフィールドのリスト。 値は、単一のフィールド名またはカンマで区切られたフィールドのリストのいずれかになります。

デフォルト: _id

partitioner.options.partition.chunkSize

各パーティションの平均サイズ(MB)。 パーティション サイズが小さい場合は、作成されるパーティションが多くなり、ドキュメントが少なくなります。 この構成では、平均ドキュメントサイズを使用してパーティションあたりのドキュメント数が決定されるため、パーティションのサイズが一致しない場合があります。

デフォルト: 64

partitioner.options.partition.samplesPerPartition

パーティションごとに取得するサンプルの数。

デフォルト: 100

partitioner.options.partition.partitionKeyProjectionField

コレクションを分割するために使用されるすべてのフィールドを含むプロジェクション フィールドに使用するフィールド名。 各ドキュメントにすでに__idxフィールドが含まれている場合にのみ、このプロパティの値を変更することをお勧めします。

デフォルト: __idx

SparkConfを使用して以前の設定のいずれかを指定する場合は、それらをconnection.uri設定に含めるか、個別に一覧表示できます。

次のコード例は、 connection.uri設定の一部としてデータベース、コレクション、読み込み設定(read preference)を指定する方法を示しています。

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

connection.uriを短くして設定を読みやすくするには、代わりにこれらを個別に指定します。

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

重要

connection.uriとその行の両方に 設定を指定すると、 connection.uriの設定が優先されます。 たとえば、次の構成では、接続データベースはfoobarです。これはconnection.uri設定の 値であるためです。

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

戻る

バッチ モードでの MongoDB からの読み取り