Primeiros passos com o Spark Connector
Nesta página
Pré-requisitos
Conhecimento básico de trabalho do MongoDB e Apache Spark. Consulte a documentação do MongoDB , Documentação do Spark, e este whitepaper do MongoDB para obter mais detalhes.
MongoDB versão 4.0 ou posterior
Versão 3.1 a 3.5 do Spark
Java 8 ou posterior
Começar
Importante
Na versão 10.0.0 e posterior do Connector, use o formato mongodb
para ler e escrever no MongoDB:
df = spark.read.format("mongodb").load()
Gestão de dependência
Forneça as dependências Spark Core, Spark SQL e MongoDB Spark Connector à sua ferramenta de gerenciamento de dependência.
A partir da versão 3.2.0, O Apache Spark é compatível com Scala 2.12 e 2.13. O Spark 3.1.3 e as versões anteriores são compatíveis apenas com Scala 2.12. Para fornecer suporte para ambas as versões Scala , a versão 10.4.0 do Spark Connector produz dois artefatos:
org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
é compilado em relação ao Scala 2.12 e é compatível com o Spark 3.1.x e superior.org.mongodb.spark:mongo-spark-connector_2.13:10.4.0
é compilado em relação ao Scala 2.13 e é compatível com o Spark 3.2.x e superior.
Importante
Use o artefato Spark Connector compatível com suas versões do Scala e Spark.
O seguinte trecho de um arquivo Maven pom.xml
mostra como incluir dependências compatíveis com Scala 2.12:
<dependencies> <dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.12</artifactId> <version>10.4.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.3.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.3.1</version> </dependency> </dependencies>
Configuração
Ao especificar a configuração do Connector via SparkSession
, você deve prefixar as configurações adequadamente. MongoDB Spark Connector Para obter detalhes e outras opções disponíveis do Spark , consulte o guia Configuração do .
package com.mongodb.spark_examples; import org.apache.spark.sql.SparkSession; public final class GettingStarted { public static void main(final String[] args) throws InterruptedException { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ SparkSession spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate(); // Application logic } }
O
spark.mongodb.read.connection.uri
especifica o endereço do servidor MongoDB (127.0.0.1
), o banco de dados de dados para conectar (test
) e a collection (myCollection
) da qual ler os dados e a preferência de leitura.O
spark.mongodb.write.connection.uri
especifica o endereço do servidor MongoDB(127.0.0.1
), o banco de dados para conectar (test
) e a collection (myCollection
) na qual escrever dados.
Você pode utilizar um objeto SparkSession
para escrever dados no MongoDB, ler dados do MongoDB, criar conjuntos de dados e executar operações SQL.
Importante
Na versão 10.0.0 e posterior do conector, use o formato mongodb
para ler e escrever no MongoDB:
df = spark.read.format("mongodb").load()
Python Spark Shell
Este tutorial usa o shell pyspark
, mas o código também funciona com aplicativos Python independentes.
Ao iniciar o shell pyspark
, você pode especificar:
a opção
--packages
para baixar o pacote do conector Spark do MongoDB. O seguinte pacote está disponível:mongo-spark-connector
a opção
--conf
para configurar o conector Spark do MongoDB. Estas configurações definem o objetoSparkConf
.Observação
Se você usar
SparkConf
para configurar o Spark Connector, deverá prefixar as configurações adequadamente. Para obter detalhes e outras opções MongoDB Spark Connector disponíveis do , consulte o guia Configurar .Spark
O exemplo a seguir inicia o shell pyspark
na linha de comando:
./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
O
spark.mongodb.read.connection.uri
especifica o endereço do servidor MongoDB (127.0.0.1
), o banco de dados para conectar (test
) e a collection (myCollection
) da qual ler os dados e a read preference.O
spark.mongodb.write.connection.uri
especifica o endereço do servidor MongoDB (127.0.0.1
), o banco de dados de dados para conectar (test
) e a collection (myCollection
) na qual escrever dados. Conecta à porta27017
por padrão.A opção
packages
especifica as coordenadas Maven do conector Spark, no formatogroupId:artifactId:version
.
Os exemplos neste tutorial usarão esse banco de dados e coleção.
Criar um SparkSession
objeto
Observação
Ao iniciar pyspark
você obtém um objeto SparkSession
chamado spark
por padrão. Em um aplicativo Python autônomo, você precisa criar seu objeto SparkSession
explicitamente, conforme mostrado abaixo.
Se você especificou as opções de configuração spark.mongodb.read.connection.uri
e spark.mongodb.write.connection.uri
ao iniciar pyspark
, o objeto SparkSession
padrão as utilizará. Se preferir criar seu próprio objeto SparkSession
de dentro do pyspark
, você pode usar SparkSession.builder
e especificar diferentes opções de configuração.
from pyspark.sql import SparkSession my_spark = SparkSession \ .builder \ .appName("myApp") \ .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \ .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \ .getOrCreate()
Você pode utilizar um objeto SparkSession
para gravar dados no MongoDB, ler dados do MongoDB, criar DataFrames e executar operações SQL.
Importante
Na versão 10.0.0 e posterior do conector, use o formato mongodb
para ler e escrever no MongoDB:
df = spark.read.format("mongodb").load()
Shell Spark
Ao iniciar a concha Spark, especifique:
a opção
--packages
para baixar o pacote do conector Spark do MongoDB. O seguinte pacote está disponível:mongo-spark-connector
a opção
--conf
para configurar o conector Spark do MongoDB. Estas configurações definem o objetoSparkConf
.Observação
Se você usar
SparkConf
para configurar o Spark Connector, deverá prefixar as configurações adequadamente. Para obter detalhes e outras opções MongoDB Spark Connector disponíveis do , consulte o guia Configurar .Spark
Por exemplo,
./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
O
spark.mongodb.read.connection.uri
especifica o endereço do servidor MongoDB (127.0.0.1
), o banco de dados para conectar (test
) e a collection (myCollection
) da qual ler os dados e a read preference.O
spark.mongodb.write.connection.uri
especifica o endereço do servidor MongoDB (127.0.0.1
), o banco de dados de dados para conectar (test
) e a collection (myCollection
) na qual escrever dados. Conecta à porta27017
por padrão.A opção
packages
especifica as coordenadas Maven do conector Spark, no formatogroupId:artifactId:version
.
Importar o pacote do conector MongoDB
Habilite as funções e implicações específicas do MongoDB Connector para seus objetos SparkSession
e Dataset
importando o seguinte pacote no shell do Spark:
import com.mongodb.spark._
Conecte-se ao MongoDB
A conexão com o MongoDB acontece automaticamente quando uma ação do Dataset requer uma leitura do MongoDB ou uma gravação no MongoDB.
Aplicativo Scala autônomo
Gestão de dependência
Forneça as dependências Spark Core, Spark SQL e MongoDB Spark Connector à sua ferramenta de gerenciamento de dependência.
O seguinte trecho demonstra como incluir essas dependências em um SBT build.scala
Arquivo :
scalaVersion := "2.12", libraryDependencies ++= Seq( "org.mongodb.spark" %% "mongo-spark-connector_2.12" % "10.4.0", "org.apache.spark" %% "spark-core" % "3.3.1", "org.apache.spark" %% "spark-sql" % "3.3.1" )
Configuração
Ao especificar a configuração do conector via SparkSession
, você deve prefixar as configurações adequadamente. Para informações e outras opções disponíveis do Conector MongoDB Spark, consulte o guia Configurando o Spark.
package com.mongodb object GettingStarted { def main(args: Array[String]): Unit = { /* Create the SparkSession. * If config arguments are passed from the command line using --conf, * parse args for the values to set. */ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local") .appName("MongoSparkConnectorIntro") .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection") .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection") .getOrCreate() } }
Solução de problemas
Se você receber um java.net.BindException: Can't assign requested address
,
Verifique se você não tem outro shell do Spark já em execução.
Tente definir a variável de ambiente
SPARK_LOCAL_IP
; por exemploexport SPARK_LOCAL_IP=127.0.0.1 Tente incluir a seguinte opção ao iniciar a concha Spark:
--driver-java-options "-Djava.net.preferIPv4Stack=true"
Se você tiver erros ao executar os exemplos neste tutorial, você poderá precisar limpar seu cache ívy local (~/.ivy2/cache/org.mongodb.spark
e ~/.ivy2/jars
).