스트리밍 읽기 구성 옵션
개요
스트리밍 모드에서 MongoDB의 데이터를 읽을 때 다음 속성을 구성할 수 있습니다.
참고
SparkConf
를 사용하여 커넥터의 읽기 구성을 설정하는 경우 각 속성 앞에 spark.mongodb.read.
를 접두사로 붙입니다.
속성 이름 | 설명 | ||
---|---|---|---|
connection.uri | Required. The connection string configuration key. Default: mongodb://localhost:27017/ | ||
database | Required. The database name configuration. | ||
collection | Required. The collection name configuration. You can specify multiple collections by separating the collection names
with a comma. To learn more about specifying multiple collections, see Specifying Multiple Collections in the collection Property. | ||
comment | The comment to append to the read operation. Comments appear in the
output of the Database Profiler. Default: None | ||
mode | The parsing strategy to use when handling documents that don't match the
expected schema. This option accepts the following values:
Default: ReadConfig.ParseMode.FAILFAST | ||
columnNameOfCorruptRecord | If you set the mode option to ReadConfig.ParseMode.PERMISSIVE ,
this option specifies the name of the new column that stores the invalid
document as extended JSON. If you're using an explicit schema, it must
include the name of the new column. If you're
using an inferred schema, the Spark Connector adds the new column to the
end of the schema.Default: None | ||
mongoClientFactory | MongoClientFactory configuration key. You can specify a custom implementation, which must implement the
com.mongodb.spark.sql.connector.connection.MongoClientFactory
interface.Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory | ||
aggregation.pipeline | Specifies a custom aggregation pipeline to apply to the collection
before sending data to Spark. The value must be either an extended JSON single document or list
of documents. A single document resembles the following:
A list of documents resembles the following:
사용자 지정 집계 파이프라인은 파티셔너 전략과 호환되어야 합니다. 예를 들어 | ||
aggregation.allowDiskUse | Specifies whether to allow storage to disk when running the
aggregation. Default: true | ||
change.stream. | Change stream configuration prefix. See the
Change Stream Configuration section for more
information about change streams. | ||
outputExtendedJson | When true , the connector converts BSON types not supported by Spark into
extended JSON strings.
When false , the connector uses the original relaxed JSON format for
unsupported types.Default: false | ||
schemaHint | Specifies a partial schema of known field types to use when inferring
the schema for the collection. To learn more about the schemaHint
option, see the Specify Known Fields with Schema Hints section.Default: None |
변경 스트림 구성
MongoDB에서 변경 스트림을 읽을 때 다음 속성을 구성할 수 있습니다.
속성 이름 | 설명 |
---|---|
change.stream.lookup.full.document | 업데이트 작업 시 변경 스트림이 반환하는 값을 결정합니다. 기본 설정은 소스 문서와 업데이트된 문서 간의 차이점을 반환합니다.
이 변경 스트림 옵션의 작동 방식에 대한 자세한 내용은 MongoDB 서버 매뉴얼 가이드 업데이트 작업을 위한 전체 문서 조회를 참조하세요. Default(기본값): "default" |
change.stream.micro.batch.max.partition.count | The maximum number of partitions the Spark Connector divides each
micro-batch into. Spark workers can process these partitions in parallel. This setting applies only when using micro-batch streams. Default: 1 경고: |
change.stream.publish.full.document.only | Specifies whether to publish the changed document or the full
change stream document. When this setting is false , you must specify a schema. The schema
must include all fields that you want to read from the change stream. You can
use optional fields to ensure that the schema is valid for all change-stream
events.When this setting is true , the connector exhibits the following behavior:
이 설정은 기본값: |
change.stream.startup.mode | Specifies how the connector starts up when no offset is available. This setting accepts the following values:
|
다음에서 속성 지정 connection.uri
SparkConf를 사용하여 이전 설정을 지정하는 경우 해당 설정을 connection.uri
설정에 포함하거나 개별적으로 나열할 수 있습니다.
다음 코드 예시에서는 connection.uri
설정의 일부로 데이터베이스, 컬렉션 및 읽기 설정을 지정하는 방법을 보여 줍니다.
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
connection.uri
를 더 짧게 유지하고 설정을 더 읽기 쉽게 만들려면 대신 개별적으로 지정할 수 있습니다.
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ spark.mongodb.read.database=myDB spark.mongodb.read.collection=myCollection spark.mongodb.read.readPreference.name=primaryPreferred
중요
connection.uri
및 해당 줄 모두에 설정을 지정하면 connection.uri
설정이 우선 적용됩니다. 예를 들어, 다음 구성에서 연결 데이터베이스는 foobar
입니다. 이는 connection.uri
설정의 값이기 때문입니다.
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar spark.mongodb.read.database=bar
속성에 여러 컬렉션 지정 collection
컬렉션 이름을 쉼표로 구분하여 collection
변경 스트림 구성 속성에서 여러 컬렉션을 지정할 수 있습니다. 컬렉션 이름에 공백이 포함된 경우가 아니라면 컬렉션 사이에 공백을 추가하지 마세요.
다음 예제와 같이 여러 컬렉션을 지정합니다.
... .option("spark.mongodb.collection", "collectionOne,collectionTwo")
컬렉션 이름이 '*'이거나 이름에 쉼표 또는 백슬래시(\)가 포함된 경우 다음과 같이 문자를 이스케이프 처리해야 합니다.
collection
구성 옵션에 사용된 컬렉션의 이름에 쉼표가 포함되어 있으면 Spark Connector는 이를 두 개의 서로 다른 컬렉션으로 취급합니다. 이를 방지하려면 앞에 백슬래시(\)를 붙여 쉼표를 이스케이프해야 합니다. 이름이 'my,collection'인 컬렉션을 다음과 같이 이스케이프 처리합니다."my\,collection" collection
구성 옵션에 사용된 컬렉션의 이름이 '*'인 경우 Spark Connector는 이를 모든 컬렉션을 스캔하는 사양으로 해석합니다. 이를 방지하려면 앞에 백슬래시(\)를 붙여 별표를 이스케이프 처리해야 합니다. 이름이 '*'인 컬렉션을 다음과 같이 이스케이프 처리합니다."\*" collection
구성 옵션에 사용된 컬렉션 이름에 백슬래시(\)가 포함된 경우 Spark Connector는 백슬래시를 이스케이프 문자로 처리하므로 값을 해석하는 방식이 변경될 수 있습니다. 이를 방지하려면 백슬래시 앞에 다른 백슬래시를 붙여 이스케이프 처리해야 합니다. 다음과 같이 '\collection'이라는 이름의 컬렉션을 이스케이프 처리합니다."\\collection" 참고
에서 컬렉션 이름을 리터럴로 지정할 string Java때는 각 백슬래시를 다른 백슬래시로 추가 이스케이프 처리해야 합니다. 예를 들어 '\collection'이라는 이름의 컬렉션을 다음과 같이 이스케이프 처리합니다.
"\\\\collection"
컬렉션 이름에 별표(*)를 string 로 전달하여 데이터베이스의 모든 컬렉션에서 스트리밍할 수 있습니다.
다음 예제와 같이 모든 컬렉션을 지정합니다.
... .option("spark.mongodb.collection", "*")
모든 컬렉션에서 스트리밍하는 동안 컬렉션을 생성하면 새 컬렉션이 자동으로 스트림에 포함됩니다.
여러 컬렉션에서 스트리밍하는 동안 언제든지 컬렉션을 삭제할 수 있습니다.
중요
여러 컬렉션으로 스키마 추론하기
change.stream.publish.full.document.only
옵션을 true
로 설정하면 Spark Connector는 스캔한 문서의 스키마를 사용하여 DataFrame
의 스키마를 추론합니다.
스키마 추론은 스트리밍 시작 시 발생하며 스트리밍 중에 생성된 컬렉션은 고려하지 않습니다.
여러 컬렉션에서 스트리밍하고 스키마를 추론할 때 connector 는 각 컬렉션을 순차적으로 샘플링합니다. 많은 수의 컬렉션에서 스트리밍하면 스키마 추론 성능이 눈에 띄게 느려질 수 있습니다. 이러한 성능 영향은 스키마를 추론하는 동안에만 발생합니다.