バッチ読み取り構成オプション
Overview
バッチ モードで MongoDB からデータを読み取るときに、次のプロパティを構成できます。
注意
SparkConf
を使用してコネクタの読み取り構成を設定する場合は、各プロパティの前にspark.mongodb.read.
を付けます。
プロパティ名 | 説明 | ||
---|---|---|---|
connection.uri | Required. The connection string configuration key. Default: mongodb://localhost:27017/ | ||
database | Required. The database name configuration. | ||
collection | Required. The collection name configuration. | ||
comment | The comment to append to the read operation. Comments appear in the
output of the Database Profiler. Default: None | ||
mode | The parsing strategy to use when handling documents that don't match the
expected schema. This option accepts the following values:
Default: ReadConfig.ParseMode.FAILFAST | ||
columnNameOfCorruptRecord | If you set the mode option to ReadConfig.ParseMode.PERMISSIVE ,
this option specifies the name of the new column that stores the invalid
document as extended JSON. If you're using an explicit schema, it must
include the name of the new column. If you're
using an inferred schema, the Spark Connector adds the new column to the
end of the schema.Default: None | ||
mongoClientFactory | MongoClientFactory configuration key. You can specify a custom implementation which must implement the
com.mongodb.spark.sql.connector.connection.MongoClientFactory
interface.Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory | ||
partitioner | The partitioner full class name. You can specify a custom implementation that must implement the
com.mongodb.spark.sql.connector.read.partitioner.Partitioner
interface.See the
Partitioner Configuration section for more
information about partitioners. Default: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner | ||
partitioner.options. | Partitioner configuration prefix. See the
Partitioner Configuration section for more
information about partitioners. | ||
sampleSize | The number of documents to sample from the collection when inferring the schema. Default: 1000 | ||
sql.inferSchema.mapTypes.enabled | Whether to enable Map types when inferring the schema. When enabled, large compatible struct types are inferred to a
MapType instead.Default: true | ||
sql.inferSchema.mapTypes.minimum.key.size | Minimum size of a StructType before inferring as a MapType .Default: 250 | ||
aggregation.pipeline | Specifies a custom aggregation pipeline to apply to the collection
before sending data to Spark. The value must be either an extended JSON single document or list
of documents. A single document resembles the following:
A list of documents resembles the following:
重要カスタム集計パイプラインは、パーティショニング戦略と互換性がある必要があります。 たとえば、 | ||
aggregation.allowDiskUse | Specifies whether to allow storage to disk when running the
aggregation. Default: true | ||
outputExtendedJson | When true , the connector converts BSON types not supported by Spark into
extended JSON strings.
When false , the connector uses the original relaxed JSON format for
unsupported types.Default: false | ||
schemaHint | Specifies a partial schema of known field types to use when inferring
the schema for the collection. To learn more about the schemaHint
option, see the Specify Known Fields with Schema Hints section.Default: None |
パーティション構成
パーティショニングは、 Spark Connectorを使用するバッチ読み取りの読み取り動作を変更します。 データをパーティションに分割することで、変換を並列に実行できます。
このセクションには、次のパーティションの構成情報が含まれています。
注意
バッチ読み取りのみ
データストリーム処理エンジンは単一のデータストリームを生成するため、パーティショニングはストリーミング読み取りに影響しません。
SamplePartitioner
構成
SamplePartitioner
は、デフォルトのパーティション分割であり、 この構成では、パーティション フィールド、パーティション サイズ、およびパーティションあたりのサンプル数を指定できます。
この構成を使用するには、 partitioner
構成オプションをcom.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner
に設定します。
プロパティ名 | 説明 | |
---|---|---|
partitioner.options.partition.field | パーティション分割に使用するフィールド。一意のフィールドである必要があります。 デフォルト: | |
partitioner.options.partition.size | 各パーティションのサイズ(MB 単位)。 パーティション サイズが小さい場合は、作成されるパーティションが多くなり、ドキュメントが少なくなります。 デフォルト: | |
partitioner.options.samples.per.partition | パーティションごとに取得するサンプルの数。 取得されたサンプルの合計数は次のとおりです。
デフォルト: |
例
平均ドキュメント サイズが 0.5 MB の 640 ドキュメントを含むコレクションの場合、デフォルトのSamplePartitioner
構成では 5 つのパーティションが作成され、1 つのパーティションあたり 128 ドキュメントが含まれます。
Spark Connectorは 50 個のドキュメント(対象パーティションごとにデフォルトは 10)をサンプリングし、サンプリングされたドキュメントからパーティション フィールド範囲を選択して 5 つのパーティションを定義します。
ShardedPartitioner
構成
ShardedPartitioner
構成では、シャード構成に基づいてデータが自動的に分割されます。
この構成を使用するには、 partitioner
構成オプションをcom.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner
に設定します。
重要
ShardedPartitioner の制限
MongoDB Server v 6.0以降では、シャーディング操作によって、すべてのシャードキー値をカバーする大きな初期チャンクが 1 件作成され、シャーディングされたパーティションが非効率的になります。 MongoDB v 6.0以降に接続する場合、シャーディングされたパーティションを使用することは推奨されません。
シャーディングされたパーティションは ハッシュされたシャードキー と互換性がありません。
PaginateBySizePartitioner
構成
PaginateBySizePartitioner
構成では、平均ドキュメント サイズを使用してコレクションを平均サイズのチャンクに分割し、データをページ分割します。
この構成を使用するには、 partitioner
構成オプションをcom.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner
に設定します。
プロパティ名 | 説明 |
---|---|
partitioner.options.partition.field | パーティション分割に使用するフィールド。一意のフィールドである必要があります。 デフォルト: |
partitioner.options.partition.size | 各パーティションのサイズ(MB 単位)。 パーティション サイズが小さい より少ないドキュメントを含むより多くのパーティションを作成します。 デフォルト: |
PaginateIntoPartitionsPartitioner
構成
PaginateIntoPartitionsPartitioner
の構成では、コレクション内のドキュメント数を許可されるパーティションの最大数で割ったデータがページ分割されます。
この構成を使用するには、 partitioner
構成オプションをcom.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner
に設定します。
プロパティ名 | 説明 |
---|---|
partitioner.options.partition.field | パーティション分割に使用するフィールド。一意のフィールドである必要があります。 デフォルト: |
partitioner.options.max.number.of.partitions | 作成するパーティションの数。 デフォルト: |
SinglePartitionPartitioner
構成
SinglePartitionPartitioner
構成では 1 つのパーティションが作成されます。
この構成を使用するには、 partitioner
構成オプションをcom.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner
に設定します。
AutoBucketPartitioner
構成
AutoBucketPartitioner
構成はSamplePartitionerの構成と似ていますが、データをページ分割するために$bucketAuto集計ステージを使用します。 この構成を使用すると、ネストされたフィールドを含む単一または複数のフィールドにわたってデータを分割できます。
この構成を使用するには、 partitioner
構成オプションをcom.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner
に設定します。
プロパティ名 | 説明 |
---|---|
partitioner.options.partition.fieldList | パーティショニングに使用するフィールドのリスト。 値は、単一のフィールド名またはカンマで区切られたフィールドのリストのいずれかになります。 デフォルト: |
partitioner.options.partition.chunkSize | 各パーティションの平均サイズ(MB)。 パーティション サイズが小さい場合は、作成されるパーティションが多くなり、ドキュメントが少なくなります。 この構成では、平均ドキュメントサイズを使用してパーティションあたりのドキュメント数が決定されるため、パーティションのサイズが一致しない場合があります。 デフォルト: |
partitioner.options.partition.samplesPerPartition | パーティションごとに取得するサンプルの数。 デフォルト: |
partitioner.options.partition.partitionKeyProjectionField | コレクションを分割するために使用されるすべてのフィールドを含むプロジェクション フィールドに使用するフィールド名。 各ドキュメントにすでに デフォルト: |
でのプロパティの指定 connection.uri
SparkConfを使用して以前の設定のいずれかを指定する場合は、それらをconnection.uri
設定に含めるか、個別に一覧表示できます。
次のコード例は、 connection.uri
設定の一部としてデータベース、コレクション、読み込み設定(read preference)を指定する方法を示しています。
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
connection.uri
を短くして設定を読みやすくするには、代わりにこれらを個別に指定します。
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ spark.mongodb.read.database=myDB spark.mongodb.read.collection=myCollection spark.mongodb.read.readPreference.name=primaryPreferred
重要
connection.uri
とその行の両方に 設定を指定すると、 connection.uri
の設定が優先されます。 たとえば、次の構成では、接続データベースはfoobar
です。これはconnection.uri
設定の 値であるためです。
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar spark.mongodb.read.database=bar