Menu Docs
Página inicial do Docs
/
Conector do Spark

Primeiros passos com o Spark Connector

Nesta página

  • Pré-requisitos
  • Começar
  • Tutorials
  • Conhecimento básico de trabalho do MongoDB e Apache Spark. Consulte a documentação do MongoDB , Documentação do Spark, e este whitepaper do MongoDB para obter mais detalhes.

  • MongoDB versão 4.0 ou posterior

  • Versão 3.1 a 3.5 do Spark

  • Java 8 ou posterior

Importante

Na versão 10.0.0 e posterior do Connector, use o formato mongodb para ler e escrever no MongoDB:

df = spark.read.format("mongodb").load()

Forneça as dependências Spark Core, Spark SQL e MongoDB Spark Connector à sua ferramenta de gerenciamento de dependência.

A partir da versão 3.2.0, O Apache Spark é compatível com Scala 2.12 e 2.13. O Spark 3.1.3 e as versões anteriores são compatíveis apenas com Scala 2.12. Para fornecer suporte para ambas as versões Scala , a versão 10.4.0 do Spark Connector produz dois artefatos:

  • org.mongodb.spark:mongo-spark-connector_2.12:10.4.0 é compilado em relação ao Scala 2.12 e é compatível com o Spark 3.1.x e superior.

  • org.mongodb.spark:mongo-spark-connector_2.13:10.4.0 é compilado em relação ao Scala 2.13 e é compatível com o Spark 3.2.x e superior.

Importante

Use o artefato Spark Connector compatível com suas versões do Scala e Spark.

O seguinte trecho de um arquivo Maven pom.xml mostra como incluir dependências compatíveis com Scala 2.12:

<dependencies>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.12</artifactId>
<version>10.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>

Ao especificar a configuração do Connector via SparkSession, você deve prefixar as configurações adequadamente. MongoDB Spark Connector Para obter detalhes e outras opções disponíveis do Spark , consulte o guia Configuração do .

package com.mongodb.spark_examples;
import org.apache.spark.sql.SparkSession;
public final class GettingStarted {
public static void main(final String[] args) throws InterruptedException {
/* Create the SparkSession.
* If config arguments are passed from the command line using --conf,
* parse args for the values to set.
*/
SparkSession spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate();
// Application logic
}
}
  • O spark.mongodb.read.connection.uri especifica o endereço do servidor MongoDB (127.0.0.1), o banco de dados de dados para conectar (test) e a collection (myCollection) da qual ler os dados e a preferência de leitura.

  • O spark.mongodb.write.connection.uri especifica o endereço do servidor MongoDB(127.0.0.1), o banco de dados para conectar (test) e a collection (myCollection) na qual escrever dados.

Você pode utilizar um objeto SparkSession para escrever dados no MongoDB, ler dados do MongoDB, criar conjuntos de dados e executar operações SQL.

Importante

Na versão 10.0.0 e posterior do conector, use o formato mongodb para ler e escrever no MongoDB:

df = spark.read.format("mongodb").load()

Este tutorial usa o shell pyspark, mas o código também funciona com aplicativos Python independentes.

Ao iniciar o shell pyspark, você pode especificar:

  • a opção --packages para baixar o pacote do conector Spark do MongoDB. O seguinte pacote está disponível:

    • mongo-spark-connector

  • a opção --conf para configurar o conector Spark do MongoDB. Estas configurações definem o objeto SparkConf.

    Observação

    Se você usar SparkConf para configurar o Spark Connector, deverá prefixar as configurações adequadamente. Para obter detalhes e outras opções MongoDB Spark Connector disponíveis do , consulte o guia Configurar .Spark

O exemplo a seguir inicia o shell pyspark na linha de comando:

./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
--conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
--packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
  • O spark.mongodb.read.connection.uri especifica o endereço do servidor MongoDB (127.0.0.1), o banco de dados para conectar (test) e a collection (myCollection) da qual ler os dados e a read preference.

  • O spark.mongodb.write.connection.uri especifica o endereço do servidor MongoDB (127.0.0.1), o banco de dados de dados para conectar (test) e a collection (myCollection) na qual escrever dados. Conecta à porta 27017 por padrão.

  • A opção packages especifica as coordenadas Maven do conector Spark, no formato groupId:artifactId:version.

Os exemplos neste tutorial usarão esse banco de dados e coleção.

Observação

Ao iniciar pyspark você obtém um objeto SparkSession chamado spark por padrão. Em um aplicativo Python autônomo, você precisa criar seu objeto SparkSession explicitamente, conforme mostrado abaixo.

Se você especificou as opções de configuração spark.mongodb.read.connection.uri e spark.mongodb.write.connection.uri ao iniciar pyspark, o objeto SparkSession padrão as utilizará. Se preferir criar seu próprio objeto SparkSession de dentro do pyspark, você pode usar SparkSession.builder e especificar diferentes opções de configuração.

from pyspark.sql import SparkSession
my_spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.coll") \
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.coll") \
.getOrCreate()

Você pode utilizar um objeto SparkSession para gravar dados no MongoDB, ler dados do MongoDB, criar DataFrames e executar operações SQL.

Importante

Na versão 10.0.0 e posterior do conector, use o formato mongodb para ler e escrever no MongoDB:

df = spark.read.format("mongodb").load()

Ao iniciar a concha Spark, especifique:

  • a opção --packages para baixar o pacote do conector Spark do MongoDB. O seguinte pacote está disponível:

    • mongo-spark-connector

  • a opção --conf para configurar o conector Spark do MongoDB. Estas configurações definem o objeto SparkConf.

    Observação

    Se você usar SparkConf para configurar o Spark Connector, deverá prefixar as configurações adequadamente. Para obter detalhes e outras opções MongoDB Spark Connector disponíveis do , consulte o guia Configurar .Spark

Por exemplo,

./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
--conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \
--packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0
  • O spark.mongodb.read.connection.uri especifica o endereço do servidor MongoDB (127.0.0.1), o banco de dados para conectar (test) e a collection (myCollection) da qual ler os dados e a read preference.

  • O spark.mongodb.write.connection.uri especifica o endereço do servidor MongoDB (127.0.0.1), o banco de dados de dados para conectar (test) e a collection (myCollection) na qual escrever dados. Conecta à porta 27017 por padrão.

  • A opção packages especifica as coordenadas Maven do conector Spark, no formato groupId:artifactId:version.

Habilite as funções e implicações específicas do MongoDB Connector para seus objetos SparkSession e Dataset importando o seguinte pacote no shell do Spark:

import com.mongodb.spark._

A conexão com o MongoDB acontece automaticamente quando uma ação do Dataset requer uma leitura do MongoDB ou uma gravação no MongoDB.

Forneça as dependências Spark Core, Spark SQL e MongoDB Spark Connector à sua ferramenta de gerenciamento de dependência.

O seguinte trecho demonstra como incluir essas dependências em um SBT build.scala Arquivo :

scalaVersion := "2.12",
libraryDependencies ++= Seq(
"org.mongodb.spark" %% "mongo-spark-connector_2.12" % "10.4.0",
"org.apache.spark" %% "spark-core" % "3.3.1",
"org.apache.spark" %% "spark-sql" % "3.3.1"
)

Ao especificar a configuração do conector via SparkSession, você deve prefixar as configurações adequadamente. Para informações e outras opções disponíveis do Conector MongoDB Spark, consulte o guia Configurando o Spark.

package com.mongodb
object GettingStarted {
def main(args: Array[String]): Unit = {
/* Create the SparkSession.
* If config arguments are passed from the command line using --conf,
* parse args for the values to set.
*/
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate()
}
}

Se você receber um java.net.BindException: Can't assign requested address,

  • Verifique se você não tem outro shell do Spark já em execução.

  • Tente definir a variável de ambiente SPARK_LOCAL_IP ; por exemplo

    export SPARK_LOCAL_IP=127.0.0.1
  • Tente incluir a seguinte opção ao iniciar a concha Spark:

    --driver-java-options "-Djava.net.preferIPv4Stack=true"

Se você tiver erros ao executar os exemplos neste tutorial, você poderá precisar limpar seu cache ívy local (~/.ivy2/cache/org.mongodb.spark e ~/.ivy2/jars).

Voltar

MongoDB Connector para Spark