배치 모드로 MongoDB에서 읽기
개요
MongoDB에서 데이터를 읽으려면 SparkSession
객체에서 read()
메서드를 호출하세요. 이 메서드는 일괄 읽기 작업의 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataFrameReader
Realm 객체를 반환합니다.
MongoDB에서 읽으려면 다음 구성 설정을 지정해야 합니다.
설정 | 설명 |
---|---|
dataFrame.read.format() | 기본 입력 데이터 데이터 소스의 형식을 지정합니다. mongodb 을(를) 사용하여 MongoDB에서 읽습니다. |
dataFrame.read.option() |
배치 읽기 구성 옵션 목록은 배치 읽기 구성 옵션 가이드 를 참조하세요. |
다음 코드 예시는 이전 구성 설정을 사용하여 MongoDB의 people.contacts
데이터를 읽는 방법을 보여줍니다.
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
팁
DataFrame 유형
DataFrame
Java API 에서 클래스로 존재하지 않습니다. Dataset<Row>
를 사용하여 DataFrame을 참조합니다.
MongoDB에서 데이터를 읽으려면 SparkSession
객체에서 read
함수를 호출합니다. 이 함수는 배치 읽기 작업에 대한 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataFrameReader
객체를 반환합니다.
MongoDB에서 읽으려면 다음 구성 설정을 지정해야 합니다.
설정 | 설명 |
---|---|
dataFrame.read.format() | 기본 입력 데이터 데이터 소스의 형식을 지정합니다. mongodb 을(를) 사용하여 MongoDB에서 읽습니다. |
dataFrame.read.option() |
일괄 읽기 구성 옵션 목록은 일괄 읽기 구성 옵션 가이드를 참조하세요. |
다음 코드 예시는 이전 구성 설정을 사용하여 MongoDB의 people.contacts
데이터를 읽는 방법을 보여줍니다.
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
MongoDB에서 데이터를 읽으려면 SparkSession
객체에서 read
메서드를 호출합니다. 이 메서드는 일괄 읽기 작업의 형식 및 기타 구성 설정을 지정하는 데 사용할 수 있는 DataFrameReader
객체를 반환합니다.
MongoDB에서 읽으려면 다음 구성 설정을 지정해야 합니다.
설정 | 설명 |
---|---|
dataFrame.read.format() | 기본 입력 데이터 데이터 소스의 형식을 지정합니다. mongodb 을(를) 사용하여 MongoDB에서 읽습니다. |
dataFrame.read.option() |
일괄 읽기 구성 옵션 목록은 일괄 읽기 구성 옵션 가이드를 참조하세요. |
다음 코드 예시는 이전 구성 설정을 사용하여 MongoDB의 people.contacts
데이터를 읽는 방법을 보여줍니다.
val dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
팁
DataFrame 유형
DataFrame은 Row
객체의 Dataset
(으)로 표현됩니다. DataFrame
유형은 Dataset[Row]
의 별칭입니다.
스키마 추론
스키마 없이 데이터세트 또는 데이터프레임을 로드하면 Spark는 기록를 샘플링하여 컬렉션의 스키마를 추론합니다.
MongoDB 컬렉션 people.contacts
에 다음 문서가 포함되어 있다고 가정합니다.
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
다음 작업은 people.contacts
에서 데이터를 로드하고 데이터프레임의 스키마를 추론합니다.
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
추론된 스키마를 보려면 다음 예와 같이 Dataset<Row>
객체에서 printSchema()
메서드를 사용합니다.
dataFrame.printSchema();
데이터프레임에서 데이터를 보려면 다음 예와 같이 DataFrame
객체에서 show()
메서드를 사용합니다.
dataFrame.show();
스키마 없이 데이터세트 또는 데이터프레임을 로드하면 Spark는 기록를 샘플링하여 컬렉션의 스키마를 추론합니다.
MongoDB 컬렉션 people.contacts
에 다음 문서가 포함되어 있다고 가정합니다.
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
다음 작업은 people.contacts
에서 데이터를 로드하고 데이터프레임의 스키마를 추론합니다.
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
추론된 스키마를 보려면 다음 예시와 같이 DataFrame
객체에서 printSchema()
함수를 사용합니다.
dataFrame.printSchema()
DataFrame의 데이터를 보려면 다음 예시와 같이 DataFrame
객체에서 show()
함수를 사용합니다.
dataFrame.show()
스키마 없이 데이터세트 또는 데이터프레임을 로드하면 Spark는 기록를 샘플링하여 컬렉션의 스키마를 추론합니다.
MongoDB 컬렉션 people.contacts
에 다음 문서가 포함되어 있다고 가정합니다.
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
다음 작업은 people.contacts
에서 데이터를 로드하고 데이터프레임의 스키마를 추론합니다.
val dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
추론된 스키마를 보려면 다음 예와 같이 DataFrame
객체에서 printSchema()
메서드를 사용합니다.
dataFrame.printSchema()
데이터프레임에서 데이터를 보려면 다음 예와 같이 DataFrame
객체에서 show()
메서드를 사용합니다.
dataFrame.show()
스키마 힌트로 알려진 필드 지정
schemaHint
구성 옵션을 지정하여 스키마 추론 중에 사용할 알려진 필드 값이 포함된 스키마 를 지정할 수 있습니다. 다음 Spark 형식 중 하나로 schemaHint
옵션을 지정할 수 있습니다.
유형 | 형식 | |||
---|---|---|---|---|
DDL | <field one name> <FIELD ONE TYPE>, <field two name> <FIELD TWO TYPE> | |||
SQL DDL | STRUCT<<field one name>: <FIELD ONE TYPE>, <field two name>: <FIELD TWO TYPE> | |||
JSON |
|
다음 예시 에서는 Spark shell 을 사용하여 각 형식에서 schemaHint
옵션을 지정하는 방법을 보여 줍니다. 이 예시 에서는 "value"
이라는 문자열 값 필드 와 "count"
라는 정수 값 필드 를 지정합니다.
import org.apache.spark.sql.types._ val mySchema = StructType(Seq( StructField("value", StringType), StructField("count", IntegerType)) // Generate DDL format mySchema.toDDL // Generate SQL DDL format mySchema.sql // Generate Simple String DDL format mySchema.simpleString // Generate JSON format mySchema.json
다음 예시 와 같이 단순 string DDL 형식 또는 PySpark를 사용하여 JSON 형식으로 schemaHint
옵션을 지정할 수도 있습니다.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType mySchema = StructType([ StructField('value', StringType(), True), StructField('count', IntegerType(), True)]) # Generate Simple String DDL format mySchema.simpleString() # Generate JSON format mySchema.json()
필터
데이터프레임 또는 데이터세트와 함께 필터를 사용하는 경우, 기본 MongoDB 커넥터 코드는 집계 파이프라인을 구성하여 MongoDB에서 데이터를 필터링한 후에 Spark로 보냅니다. 이렇게 하면 필요한 데이터만 조회하고 처리하여 Spark 성능이 향상됩니다.
MongoDB Spark Connector는 다음 필터를 집계 파이프라인 단계로 전환합니다.
개인정보 정책에
EqualNullSafe
EqualTo
보다 큼
GreaterThanOrEqual
인
IsNull
보다 작은
LessThanOrEqual
not
또는
StringContains
StringEndsWith
StringStartsWith
filter()
을(를) 사용하여 MongoDB 컬렉션에서 데이터의 하위 집합을 읽습니다.
다음 문서가 포함된 fruit
컬렉션을 가정해 보겠습니다.
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
먼저 DataFrame
객체를 설정하여 기본 MongoDB 데이터 소스와 연결합니다.
df = spark.read.format("mongodb").load()
다음 예에는 qty
필드가 10
보다 크거나 같은 기록만 포함됩니다.
df.filter(df['qty'] >= 10).show()
이 작업은 다음 출력을 인쇄합니다.
+---+----+------+ |_id| qty| type| +---+----+------+ |2.0|10.0|orange| |3.0|15.0|banana| +---+----+------+
데이터프레임 또는 데이터세트와 함께 필터를 사용하는 경우, 기본 MongoDB 커넥터 코드는 집계 파이프라인을 구성하여 MongoDB에서 데이터를 필터링한 후에 Spark로 보냅니다. 이렇게 하면 필요한 데이터만 조회하고 처리하여 Spark 성능이 향상됩니다.
MongoDB Spark Connector는 다음 필터를 집계 파이프라인 단계로 전환합니다.
개인정보 정책에
EqualNullSafe
EqualTo
보다 큼
GreaterThanOrEqual
인
IsNull
보다 작은
LessThanOrEqual
not
또는
StringContains
StringEndsWith
StringStartsWith
다음 예시 에서는 연령이 100 미만인 문자를 필터링하고 출력합니다.
df.filter(df("age") < 100).show()
이 연산은 다음을 출력합니다.
+--------------------+---+-------------+ | _id|age| name| +--------------------+---+-------------+ |[5755d7b4566878c9...| 50|Bilbo Baggins| |[5755d7b4566878c9...| 82| Fíli| |[5755d7b4566878c9...| 77| Kíli| +--------------------+---+-------------+
SQL 쿼리
데이터세트에서 SQL 쿼리를 실행하기 전에 데이터세트에 대한 임시 뷰를 등록해야 합니다.
다음 작업은 characters
테이블을 등록한 다음 쿼리하여 100개 이상의 모든 문자를 찾습니다.
implicitDS.createOrReplaceTempView("characters"); Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100"); centenarians.show();
centenarians.show()
다음을 출력합니다.
+-------+----+ | name| age| +-------+----+ |Gandalf|1000| | Thorin| 195| | Balin| 178| | Dwalin| 169| | Óin| 167| | Glóin| 158| +-------+----+
DataFrame에 대해 SQL 쿼리를 실행하기 전에 임시 테이블을 등록해야 합니다.
다음 예시에서는 temp
임시 테이블을 등록한 다음 SQL을 사용하여 type
필드에 e
문자가 포함된 레코드를 쿼리합니다.
df.createOrReplaceTempView("temp") some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'") some_fruit.show()
pyspark
셸에서 작업은 다음과 같은 출력을 출력합니다.
+------+----+ | type| qty| +------+----+ | apple| 5.0| |orange|10.0| +------+----+
데이터세트에서 SQL 쿼리를 실행하기 전에 데이터세트에 대한 임시 뷰를 등록해야 합니다.
다음 작업은 characters
테이블을 등록한 다음 쿼리하여 100개 이상의 모든 문자를 찾습니다.
val characters = spark.read.format("mongodb").as[Character] characters.createOrReplaceTempView("characters") val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100") centenarians.show()
API 문서
이 예시에 사용된 유형에 대해 자세히 알아보려면 다음 Apache Spark API 문서를 참조하세요.