Docs Menu
Docs Home
/
Spark 커넥터
/ /

스트리밍 읽기 구성 옵션

이 페이지의 내용

  • 개요
  • 변경 스트림 구성
  • 다음에서 속성 지정 connection.uri
  • collection 속성에 여러 컬렉션 지정

스트리밍 모드에서 MongoDB의 데이터를 읽을 때 다음 속성을 구성할 수 있습니다.

참고

SparkConf를 사용하여 커넥터의 읽기 구성을 설정하는 경우 각 속성 앞에 spark.mongodb.read.를 접두사로 붙입니다.

속성 이름
설명
connection.uri
Required.
The connection string configuration key.

Default: mongodb://localhost:27017/
database
Required.
The database name configuration.
collection
Required.
The collection name configuration.
You can specify multiple collections by separating the collection names with a comma.

To learn more about specifying multiple collections, see Specifying Multiple Collections in the collection Property.
comment
The comment to append to the read operation. Comments appear in the output of the Database Profiler.

Default: None
mode
The parsing strategy to use when handling documents that don't match the expected schema. This option accepts the following values:
  • ReadConfig.ParseMode.FAILFAST: 스키마 와 일치하지 않는 문서 를 구문 분석할 때 예외가 발생합니다.

  • ReadConfig.ParseMode.PERMISSIVE: 데이터 유형이 스키마 와 일치하지 않는 경우 필드를 null 으)로 설정합니다. 유효하지 않은 각 문서 를 확장 JSON string 로 저장 하려면 이 값을 columnNameOfCorruptRecord 옵션과 결합합니다.

  • ReadConfig.ParseMode.DROPMALFORMED: 스키마 와 일치하지 않는 모든 문서 를 무시합니다.


Default: ReadConfig.ParseMode.FAILFAST
columnNameOfCorruptRecord
If you set the mode option to ReadConfig.ParseMode.PERMISSIVE, this option specifies the name of the new column that stores the invalid document as extended JSON. If you're using an explicit schema, it must include the name of the new column. If you're using an inferred schema, the Spark Connector adds the new column to the end of the schema.

Default: None
mongoClientFactory
MongoClientFactory configuration key.
You can specify a custom implementation, which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory
aggregation.pipeline
Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark.
The value must be either an extended JSON single document or list of documents.
A single document resembles the following:
{"$match": {"closed": false}}
A list of documents resembles the following:
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

사용자 지정 집계 파이프라인은 파티셔너 전략과 호환되어야 합니다. 예를 들어 $group 같은 집계 단계는 파티션을 두 개 이상 생성하는 파티셔너에서는 작동하지 않습니다.

aggregation.allowDiskUse
Specifies whether to allow storage to disk when running the aggregation.

Default: true
change.stream.
Change stream configuration prefix.
See the Change Stream Configuration section for more information about change streams.
outputExtendedJson
When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types.

Default: false
schemaHint
Specifies a partial schema of known field types to use when inferring the schema for the collection. To learn more about the schemaHint option, see the Specify Known Fields with Schema Hints section.

Default: None

MongoDB에서 변경 스트림을 읽을 때 다음 속성을 구성할 수 있습니다.

속성 이름
설명
change.stream.lookup.full.document

업데이트 작업 시 변경 스트림이 반환하는 값을 결정합니다.

기본 설정은 소스 문서와 업데이트된 문서 간의 차이점을 반환합니다.

updateLookup 설정은 원본 문서와 업데이트된 문서 간의 차이점도 반환하지만 업데이트된 문서 전체의 사본도 포함합니다.

이 변경 스트림 옵션의 작동 방식에 대한 자세한 내용은 MongoDB 서버 매뉴얼 가이드 업데이트 작업을 위한 전체 문서 조회를 참조하세요.

Default(기본값): "default"

change.stream.micro.batch.max.partition.count
The maximum number of partitions the Spark Connector divides each micro-batch into. Spark workers can process these partitions in parallel.

This setting applies only when using micro-batch streams.

Default: 1

경고: 1 보다 큰 값을 지정하면 Spark Connector 가 변경 이벤트를 처리하는 순서가 변경될 수 있습니다. 순서가 맞지 않는 처리 로 인해 다운스트림에 데이터 불일치가 발생할 수 있는 경우 이 설정을 피하세요.

change.stream.publish.full.document.only
Specifies whether to publish the changed document or the full change stream document.

When this setting is false, you must specify a schema. The schema must include all fields that you want to read from the change stream. You can use optional fields to ensure that the schema is valid for all change-stream events.

When this setting is true, the connector exhibits the following behavior:
  • connector 는 fullDocument 필드가 생략된 메시지를 필터링하고 필드 값만 게시합니다.

  • 스키마를 지정하지 않으면 connector 는 변경 스트림 문서에서 스키마를 유추합니다.

이 설정은 change.stream.lookup.full.document 설정을 재정의합니다.

기본값: false

change.stream.startup.mode
Specifies how the connector starts up when no offset is available.
This setting accepts the following values:

SparkConf를 사용하여 이전 설정을 지정하는 경우 해당 설정을 connection.uri 설정에 포함하거나 개별적으로 나열할 수 있습니다.

다음 코드 예시에서는 connection.uri 설정의 일부로 데이터베이스, 컬렉션 및 읽기 설정을 지정하는 방법을 보여 줍니다.

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

connection.uri를 더 짧게 유지하고 설정을 더 읽기 쉽게 만들려면 대신 개별적으로 지정할 수 있습니다.

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

중요

connection.uri 및 해당 줄 모두에 설정을 지정하면 connection.uri 설정이 우선 적용됩니다. 예를 들어, 다음 구성에서 연결 데이터베이스는 foobar입니다. 이는 connection.uri 설정의 값이기 때문입니다.

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

컬렉션 이름을 쉼표로 구분하여 collection 변경 스트림 구성 속성에서 여러 컬렉션을 지정할 수 있습니다. 컬렉션 이름에 공백이 포함된 경우가 아니라면 컬렉션 사이에 공백을 추가하지 마세요.

다음 예제와 같이 여러 컬렉션을 지정합니다.

...
.option("spark.mongodb.collection", "collectionOne,collectionTwo")

컬렉션 이름이 '*'이거나 이름에 쉼표 또는 백슬래시(\)가 포함된 경우 다음과 같이 문자를 이스케이프 처리해야 합니다.

  • collection 구성 옵션에 사용된 컬렉션의 이름에 쉼표가 포함되어 있으면 Spark Connector는 이를 두 개의 서로 다른 컬렉션으로 취급합니다. 이를 방지하려면 앞에 백슬래시(\)를 붙여 쉼표를 이스케이프해야 합니다. 이름이 'my,collection'인 컬렉션을 다음과 같이 이스케이프 처리합니다.

    "my\,collection"
  • collection 구성 옵션에 사용된 컬렉션의 이름이 '*'인 경우 Spark Connector는 이를 모든 컬렉션을 스캔하는 사양으로 해석합니다. 이를 방지하려면 앞에 백슬래시(\)를 붙여 별표를 이스케이프 처리해야 합니다. 이름이 '*'인 컬렉션을 다음과 같이 이스케이프 처리합니다.

    "\*"
  • collection 구성 옵션에 사용된 컬렉션 이름에 백슬래시(\)가 포함된 경우 Spark Connector는 백슬래시를 이스케이프 문자로 처리하므로 값을 해석하는 방식이 변경될 수 있습니다. 이를 방지하려면 백슬래시 앞에 다른 백슬래시를 붙여 이스케이프 처리해야 합니다. 다음과 같이 '\collection'이라는 이름의 컬렉션을 이스케이프 처리합니다.

    "\\collection"

    참고

    에서 컬렉션 이름을 리터럴로 지정할 string Java때는 각 백슬래시를 다른 백슬래시로 추가 이스케이프 처리해야 합니다. 예를 들어 '\collection'이라는 이름의 컬렉션을 다음과 같이 이스케이프 처리합니다.

    "\\\\collection"

컬렉션 이름에 별표(*)를 string 로 전달하여 데이터베이스의 모든 컬렉션에서 스트리밍할 수 있습니다.

다음 예제와 같이 모든 컬렉션을 지정합니다.

...
.option("spark.mongodb.collection", "*")

모든 컬렉션에서 스트리밍하는 동안 컬렉션을 생성하면 새 컬렉션이 자동으로 스트림에 포함됩니다.

여러 컬렉션에서 스트리밍하는 동안 언제든지 컬렉션을 삭제할 수 있습니다.

중요

여러 컬렉션으로 스키마 추론하기

change.stream.publish.full.document.only 옵션을 true 로 설정하면 Spark Connector는 스캔한 문서의 스키마를 사용하여 DataFrame 의 스키마를 추론합니다.

스키마 추론은 스트리밍 시작 시 발생하며 스트리밍 중에 생성된 컬렉션은 고려하지 않습니다.

여러 컬렉션에서 스트리밍하고 스키마를 추론할 때 connector 는 각 컬렉션을 순차적으로 샘플링합니다. 많은 수의 컬렉션에서 스트리밍하면 스키마 추론 성능이 눈에 띄게 느려질 수 있습니다. 이러한 성능 영향은 스키마를 추론하는 동안에만 발생합니다.

돌아가기

스트리밍 모드로 MongoDB에서 읽기