Docs Menu
Docs Home
/
Spark 커넥터
/

배치 모드로 MongoDB에서 읽기

이 페이지의 내용

  • 개요
  • 스키마 추론
  • 필터
  • SQL 쿼리
  • API 문서

MongoDB에서 데이터를 읽으려면 SparkSession 객체에서 read() 메서드를 호출하세요. 이 메서드는 일괄 읽기 작업의 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataFrameReader Realm 객체를 반환합니다.

MongoDB에서 읽으려면 다음 구성 설정을 지정해야 합니다.

설정
설명
dataFrame.read.format()
기본 입력 데이터 데이터 소스의 형식을 지정합니다. mongodb을(를) 사용하여 MongoDB에서 읽습니다.
dataFrame.read.option()

option 메서드를 사용하여 MongoDB deployment 연결 문자열, MongoDB database 및 컬렉션, 파티셔너 구성을 포함한 일괄 읽기 설정을 구성합니다.

배치 읽기 구성 옵션 목록은 배치 읽기 구성 옵션 가이드 를 참조하세요.

다음 코드 예시는 이전 구성 설정을 사용하여 MongoDB의 people.contacts 데이터를 읽는 방법을 보여줍니다.

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

DataFrame 유형

DataFrame Java API 에서 클래스로 존재하지 않습니다. Dataset<Row> 를 사용하여 DataFrame을 참조합니다.

MongoDB에서 데이터를 읽으려면 SparkSession 객체에서 read 함수를 호출합니다. 이 함수는 배치 읽기 작업에 대한 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataFrameReader 객체를 반환합니다.

MongoDB에서 읽으려면 다음 구성 설정을 지정해야 합니다.

설정
설명
dataFrame.read.format()
기본 입력 데이터 데이터 소스의 형식을 지정합니다. mongodb을(를) 사용하여 MongoDB에서 읽습니다.
dataFrame.read.option()

option 메서드를 사용하여 MongoDB deployment 연결 문자열, MongoDB database 및 컬렉션, 그리고 파티셔너 구성을 포함한 일괄 읽기 설정을 구성합니다.

일괄 읽기 구성 옵션 목록은 일괄 읽기 구성 옵션 가이드를 참조하세요.

다음 코드 예시는 이전 구성 설정을 사용하여 MongoDB의 people.contacts 데이터를 읽는 방법을 보여줍니다.

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

MongoDB에서 데이터를 읽으려면 SparkSession 객체에서 read 메서드를 호출합니다. 이 메서드는 일괄 읽기 작업의 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataFrameReader 객체를 반환합니다.

MongoDB에서 읽으려면 다음 구성 설정을 지정해야 합니다.

설정
설명
dataFrame.read.format()
기본 입력 데이터 데이터 소스의 형식을 지정합니다. mongodb을(를) 사용하여 MongoDB에서 읽습니다.
dataFrame.read.option()

option 메서드를 사용하여 MongoDB deployment 연결 문자열, MongoDB database 및 컬렉션, 그리고 파티셔너 구성을 포함한 일괄 읽기 설정을 구성합니다.

일괄 읽기 구성 옵션 목록은 일괄 읽기 구성 옵션 가이드를 참조하세요.

다음 코드 예시는 이전 구성 설정을 사용하여 MongoDB의 people.contacts 데이터를 읽는 방법을 보여줍니다.

val dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

DataFrame 유형

DataFrame은 Row 객체의 Dataset(으)로 표현됩니다. DataFrame 유형은 Dataset[Row]의 별칭입니다.

스키마 없이 데이터세트 또는 데이터프레임을 로드하면 Spark는 기록를 샘플링하여 컬렉션의 스키마를 추론합니다.

MongoDB 컬렉션 people.contacts에 다음 문서가 포함되어 있다고 가정합니다.

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

다음 작업은 people.contacts에서 데이터를 로드하고 데이터프레임의 스키마를 추론합니다.

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

추론된 스키마를 보려면 다음 예와 같이 Dataset<Row> 객체에서 printSchema() 메서드를 사용합니다.

dataFrame.printSchema();

데이터프레임에서 데이터를 보려면 다음 예와 같이 DataFrame 객체에서 show() 메서드를 사용합니다.

dataFrame.show();

스키마 없이 데이터세트 또는 데이터프레임을 로드하면 Spark는 기록를 샘플링하여 컬렉션의 스키마를 추론합니다.

MongoDB 컬렉션 people.contacts에 다음 문서가 포함되어 있다고 가정합니다.

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

다음 작업은 people.contacts에서 데이터를 로드하고 데이터프레임의 스키마를 추론합니다.

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

추론된 스키마를 보려면 다음 예시와 같이 DataFrame 객체에서 printSchema() 함수를 사용합니다.

dataFrame.printSchema()

DataFrame의 데이터를 보려면 다음 예시와 같이 DataFrame 객체에서 show() 함수를 사용합니다.

dataFrame.show()

스키마 없이 데이터세트 또는 데이터프레임을 로드하면 Spark는 기록를 샘플링하여 컬렉션의 스키마를 추론합니다.

MongoDB 컬렉션 people.contacts에 다음 문서가 포함되어 있다고 가정합니다.

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

다음 작업은 people.contacts에서 데이터를 로드하고 데이터프레임의 스키마를 추론합니다.

val dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

추론된 스키마를 보려면 다음 예와 같이 DataFrame 객체에서 printSchema() 메서드를 사용합니다.

dataFrame.printSchema()

데이터프레임에서 데이터를 보려면 다음 예와 같이 DataFrame 객체에서 show() 메서드를 사용합니다.

dataFrame.show()

schemaHint 구성 옵션을 지정하여 스키마 추론 중에 사용할 알려진 필드 값이 포함된 스키마 를 지정할 수 있습니다. 다음 Spark 형식 중 하나로 schemaHint 옵션을 지정할 수 있습니다.

유형
형식
DDL
<field one name> <FIELD ONE TYPE>, <field two name> <FIELD TWO TYPE>
SQL DDL
STRUCT<<field one name>: <FIELD ONE TYPE>, <field two name>: <FIELD TWO TYPE>
JSON
{ "type": "struct", "fields": [
{ "name": "<field name>", "type": "<field type>", "nullable": <true/false> },
{ "name": "<field name>", "type": "<field type>", "nullable": <true/false> }]}

다음 예시 에서는 Spark shell 을 사용하여 각 형식에서 schemaHint 옵션을 지정하는 방법을 보여 줍니다. 이 예시 에서는 "value" 이라는 문자열 값 필드 와 "count" 라는 정수 값 필드 를 지정합니다.

import org.apache.spark.sql.types._
val mySchema = StructType(Seq(
StructField("value", StringType),
StructField("count", IntegerType))
// Generate DDL format
mySchema.toDDL
// Generate SQL DDL format
mySchema.sql
// Generate Simple String DDL format
mySchema.simpleString
// Generate JSON format
mySchema.json

다음 예시 와 같이 단순 string DDL 형식 또는 PySpark를 사용하여 JSON 형식으로 schemaHint 옵션을 지정할 수도 있습니다.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
mySchema = StructType([
StructField('value', StringType(), True),
StructField('count', IntegerType(), True)])
# Generate Simple String DDL format
mySchema.simpleString()
# Generate JSON format
mySchema.json()

데이터프레임 또는 데이터세트와 함께 필터를 사용하는 경우, 기본 MongoDB 커넥터 코드는 집계 파이프라인을 구성하여 MongoDB에서 데이터를 필터링한 후에 Spark로 보냅니다. 이렇게 하면 필요한 데이터만 조회하고 처리하여 Spark 성능이 향상됩니다.

MongoDB Spark Connector는 다음 필터를 집계 파이프라인 단계로 전환합니다.

  • 개인정보 정책에

  • EqualNullSafe

  • EqualTo

  • 보다 큼

  • GreaterThanOrEqual

  • IsNull

  • 보다 작은

  • LessThanOrEqual

  • not

  • 또는

  • StringContains

  • StringEndsWith

  • StringStartsWith

filter()을(를) 사용하여 MongoDB 컬렉션에서 데이터의 하위 집합을 읽습니다.

다음 문서가 포함된 fruit 컬렉션을 가정해 보겠습니다.

{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }

먼저 DataFrame 객체를 설정하여 기본 MongoDB 데이터 소스와 연결합니다.

df = spark.read.format("mongodb").load()

다음 예에는 qty 필드가 10보다 크거나 같은 기록만 포함됩니다.

df.filter(df['qty'] >= 10).show()

이 작업은 다음 출력을 인쇄합니다.

+---+----+------+
|_id| qty| type|
+---+----+------+
|2.0|10.0|orange|
|3.0|15.0|banana|
+---+----+------+

데이터프레임 또는 데이터세트와 함께 필터를 사용하는 경우, 기본 MongoDB 커넥터 코드는 집계 파이프라인을 구성하여 MongoDB에서 데이터를 필터링한 후에 Spark로 보냅니다. 이렇게 하면 필요한 데이터만 조회하고 처리하여 Spark 성능이 향상됩니다.

MongoDB Spark Connector는 다음 필터를 집계 파이프라인 단계로 전환합니다.

  • 개인정보 정책에

  • EqualNullSafe

  • EqualTo

  • 보다 큼

  • GreaterThanOrEqual

  • IsNull

  • 보다 작은

  • LessThanOrEqual

  • not

  • 또는

  • StringContains

  • StringEndsWith

  • StringStartsWith

다음 예시 에서는 연령이 100 미만인 문자를 필터링하고 출력합니다.

df.filter(df("age") < 100).show()

이 연산은 다음을 출력합니다.

+--------------------+---+-------------+
| _id|age| name|
+--------------------+---+-------------+
|[5755d7b4566878c9...| 50|Bilbo Baggins|
|[5755d7b4566878c9...| 82| Fíli|
|[5755d7b4566878c9...| 77| Kíli|
+--------------------+---+-------------+

데이터세트에서 SQL 쿼리를 실행하기 전에 데이터세트에 대한 임시 뷰를 등록해야 합니다.

다음 작업은 characters 테이블을 등록한 다음 쿼리하여 100개 이상의 모든 문자를 찾습니다.

implicitDS.createOrReplaceTempView("characters");
Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
centenarians.show();

centenarians.show() 다음을 출력합니다.

+-------+----+
| name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
| Balin| 178|
| Dwalin| 169|
| Óin| 167|
| Glóin| 158|
+-------+----+

DataFrame에 대해 SQL 쿼리를 실행하기 전에 임시 테이블을 등록해야 합니다.

다음 예시에서는 temp 임시 테이블을 등록한 다음 SQL을 사용하여 type 필드에 e 문자가 포함된 레코드를 쿼리합니다.

df.createOrReplaceTempView("temp")
some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
some_fruit.show()

pyspark 셸에서 작업은 다음과 같은 출력을 출력합니다.

+------+----+
| type| qty|
+------+----+
| apple| 5.0|
|orange|10.0|
+------+----+

데이터세트에서 SQL 쿼리를 실행하기 전에 데이터세트에 대한 임시 뷰를 등록해야 합니다.

다음 작업은 characters 테이블을 등록한 다음 쿼리하여 100개 이상의 모든 문자를 찾습니다.

val characters = spark.read.format("mongodb").as[Character]
characters.createOrReplaceTempView("characters")
val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()

이 예시에 사용된 유형에 대해 자세히 알아보려면 다음 Apache Spark API 문서를 참조하세요.

돌아가기

배치 모드