Spark Connector 시작하기
전제 조건
MongoDB 및 Apache Spark에 대한 기본 실무 지식. MongoDB 문서, Spark 문서 를 참조하세요. 자세한 내용은 이 MongoDB 백서 를 참조하세요.
MongoDB 버전 4.0 이상
Spark 버전 3.1 부터 3.5 까지
Java 8 이상
시작하기
중요
Connector의 10.0.0 버전 이상에서는 mongodb
형식을 사용하여 MongoDB 에서 읽고 쓰기 (write) 수 있습니다.
df = spark.read.format("mongodb").load()
종속성 관리
Spark 코어, Spark SQL 및 MongoDB Spark Connector 종속성을 종속성 관리 도구에 제공하십시오.
버전 3.2.0 부터 시작됩니다. Apache Spark 는 Scala 2.12 과 2.13을 모두 지원합니다. Spark 3.1.3 및 이전 버전은 Scala 2 만 지원합니다.12. Scala 의 10.4. 버전은 두 버전 모두에 대한 지원을 제공하기0 Spark Connector 위해 두 개의 아티팩트를 생성합니다.
org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
Scala 2.12에 대해 컴파일되고 Spark 3.1.x 이상을 지원합니다.org.mongodb.spark:mongo-spark-connector_2.13:10.4.0
Scala 2.13에 대해 컴파일되고 Spark 3.2.x 이상을 지원합니다.
중요
Spark Connector 사용 중인 Scala 및 버전과 호환되는 아티팩트를 Spark 사용하세요.
다음 Maven pom.xml
파일 발췌문은 Scala 2.12과 호환되는 종속성을 포함하는 방법을 보여줍니다.
<dependencies> <dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.12</artifactId> <version>10.4.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.3.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.3.1</version> </dependency> </dependencies>
구성
SparkSession
를 통해 Connector 구성을 지정할 때는 설정 앞에 적절히 접두사를 붙여야 합니다. 자세한 내용 및 기타 사용 가능한 MongoDB Spark Connector 옵션은 Spark 구성 가이드 를 참조하세요.
package com.mongodb.spark_examples; import org.apache.spark.sql.SparkSession; public final class GettingStarted { public static void main(final String[] args) throws InterruptedException { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ SparkSession spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate(); // Application logic } }
spark.mongodb.read.connection.uri
은 MongoDB 서버 주소(127.0.0.1
), 연결할 데이터베이스 (test
), 데이터를 읽을 컬렉션 (myCollection
) 및 읽기 설정 (read preference) 을 지정합니다.spark.mongodb.write.connection.uri
은 MongoDB 서버 주소(127.0.0.1
), 연결할 데이터베이스(test
) 및 데이터를 쓸 컬렉션(myCollection
)을 지정합니다.
SparkSession
객체를 사용하여 MongoDB에 데이터를 쓰고, MongoDB에서 데이터를 읽고, 데이터 세트를 만들고, SQL 작업을 수행할 수 있습니다.
중요
Connector 버전 10.0.0 이상에서는 mongodb
형식을 사용하여 MongoDB에서 읽고 씁니다.
df = spark.read.format("mongodb").load()
Python Spark 셸
이 튜토리얼에서는 pyspark
셸을 사용하지만 코드는 독립형 Python 애플리케이션에서도 작동합니다.
pyspark
셸을 시작할 때 다음을 지정할 수 있습니다.
--packages
옵션을 클릭하여 MongoDB Spark Connector 패키지를 다운로드합니다. 다음 패키지를 이용할 수 있습니다.mongo-spark-connector
--conf
옵션을 사용하여 MongoDB Spark Connector를 구성합니다. 이 설정은SparkConf
객체를 구성합니다.참고
SparkConf
를 사용하여 Spark Connector 를 구성하는 경우 설정 접두사를 적절히 붙여야 합니다. 자세한 내용 및 기타 사용 가능한 MongoDB Spark Connector 옵션은 Spark 구성 가이드 를 참조하세요.
다음은 명령줄에서 pyspark
셸을 시작하는 예시입니다.
./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
spark.mongodb.read.connection.uri
은 MongoDB 서버 주소(127.0.0.1
), 연결할 데이터베이스(test
), 데이터를 읽을 컬렉션(myCollection
) 및 읽기 설정을 지정합니다.spark.mongodb.write.connection.uri
은 MongoDB 서버 주소 (127.0.0.1
), 연결할 데이터베이스 (test
) 및 데이터를 쓰기 (write) 컬렉션 (myCollection
)을 지정합니다. 기본값 으로 포트27017
에 연결합니다.packages
옵션은 Spark Connector의 Maven 좌표를groupId:artifactId:version
형식으로 지정합니다.
이 튜토리얼의 예시에서는 이 데이터베이스와 컬렉션을 사용합니다.
객체만들기 SparkSession
참고
pyspark
을 시작하면 기본적으로 spark
라는 SparkSession
객체가 생성됩니다. 독립형 Python 애플리케이션에서는 SparkSession
객체를 아래와 같이 명시적으로 생성해야 합니다.
spark.mongodb.read.connection.uri
및 spark.mongodb.write.connection.uri
구성 옵션을 지정하여 pyspark
를 시작하는 경우, 기본 SparkSession
객체가 해당 옵션을 사용합니다. pyspark
내에서 고유한 SparkSession
객체를 만들려는 경우 SparkSession.builder
를 사용하고 다른 구성 옵션을 지정할 수 있습니다.
from pyspark.sql import SparkSession my_spark = SparkSession \ .builder \ .appName("myApp") \ .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \ .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \ .getOrCreate()
SparkSession
객체를 사용하여 MongoDB에 데이터를 쓰고, MongoDB에서 데이터를 읽고, DataFrame을 생성하고, SQL 작업을 수행할 수 있습니다.
중요
Connector 버전 10.0.0 이상에서는 mongodb
형식을 사용하여 MongoDB에서 읽고 씁니다.
df = spark.read.format("mongodb").load()
Spark 셸
Spark 셸을 시작할 때 다음을 지정합니다:
--packages
옵션을 클릭하여 MongoDB Spark Connector 패키지를 다운로드합니다. 다음 패키지를 이용할 수 있습니다.mongo-spark-connector
--conf
옵션을 사용하여 MongoDB Spark Connector를 구성합니다. 이 설정은SparkConf
객체를 구성합니다.참고
SparkConf
를 사용하여 Spark Connector 를 구성하는 경우 설정 접두사를 적절히 붙여야 합니다. 자세한 내용 및 기타 사용 가능한 MongoDB Spark Connector 옵션은 Spark 구성 가이드 를 참조하세요.
예를 들면 다음과 같습니다.
./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
spark.mongodb.read.connection.uri
은 MongoDB 서버 주소(127.0.0.1
), 연결할 데이터베이스(test
), 데이터를 읽을 컬렉션(myCollection
) 및 읽기 설정을 지정합니다.spark.mongodb.write.connection.uri
은 MongoDB 서버 주소 (127.0.0.1
), 연결할 데이터베이스 (test
) 및 데이터를 쓰기 (write) 컬렉션 (myCollection
)을 지정합니다. 기본값 으로 포트27017
에 연결합니다.packages
옵션은 Spark Connector의 Maven 좌표를groupId:artifactId:version
형식으로 지정합니다.
MongoDB Connector 패키지 가져오기
에서 다음 패키지를 가져와서 및 객체에 대한 Connector 관련 함수 및 암시적 기능을 활성화합니다.MongoDB SparkSession
Dataset
Spark shell
import com.mongodb.spark._
MongoDB에 연결
데이터 세트 작업에 MongoDB에서 읽기 또는 MongoDB에 쓰기가 필요한 경우 MongoDB에 대한 연결이 자동으로 발생합니다.
독립형 Scala 애플리케이션
종속성 관리
Spark 코어, Spark SQL 및 MongoDB Spark Connector 종속성을 종속성 관리 도구에 제공하십시오.
다음 발췌문은 이러한 종속성을 SBT 에 포함하는 방법을 build.scala
보여줍니다. 파일:
scalaVersion := "2.12", libraryDependencies ++= Seq( "org.mongodb.spark" %% "mongo-spark-connector_2.12" % "10.4.0", "org.apache.spark" %% "spark-core" % "3.3.1", "org.apache.spark" %% "spark-sql" % "3.3.1" )
구성
SparkSession
를 통해 connector 구성을 지정할 때는 설정 접두사를 적절히 붙여야 합니다. 자세한 내용 및 기타 사용 가능한 MongoDB Spark Connector 옵션은 Spark 구성 가이드를 참조하세요.
package com.mongodb object GettingStarted { def main(args: Array[String]): Unit = { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate() } }
문제 해결
java.net.BindException: Can't assign requested address
을 받을 경우
다른 Spark 셸이 이미 실행 중이 아닌지 확인합니다.
SPARK_LOCAL_IP
환경 변수를 설정합니다. 예:export SPARK_LOCAL_IP=127.0.0.1 Spark 셸을 시작할 때 다음 옵션을 포함합니다:
--driver-java-options "-Djava.net.preferIPv4Stack=true"
이 튜토리얼의 예시를 실행하는 중에 오류가 발생하는 경우 로컬 Ivy 캐시(~/.ivy2/cache/org.mongodb.spark
및 ~/.ivy2/jars
)를 지워야 할 수 있습니다.