Transmitindo dados com Apache Spark e MongoDB
Avalie esse Artigo
O MongoDB lançou a versão 10 do MongoDB Connector para Apache Spark que aproveita a nova API Spark Data Sources V2 com suporte para o Spark Structured Streaming.
A versão atual do MongoDB Spark Connector foi originalmente escrita em 2016 e é baseada no V1 da API de fontes de dados do Spark. Embora essa versão da API ainda seja suportada, a Databricks lança uma versão atualizada da API, facilitando o trabalho de fontes de dados como o MongoDB com o Spark. O fato de o MongoDB Spark Connector usar o V2 da API, um benefício imediato é uma integração mais estreita com o Spark Structured Streaming.
Nota: Com relação à versão anterior do MongoDB Spark Connector que suportava a API V1, o MongoDB continuará a oferecer suporte a esta versão até que o Databricks deprecie o V1 da API Data Source. Embora nenhum novo recurso seja implementado, as atualizações do connector incluirão correções de bugs e suporte apenas para as versões atuais do Spark.
A nova versão do MongoDB Spark Connector (Versão 10).1) não pretende substituir diretamente seus aplicativos que usam a versão anterior do MongoDB Spark Connector.
O novo connector usa um namespace diferente com um nome curto, "mongodb " (o caminho completo é "com.mongodb.spark.sql.connector.MongoTableProvider "), versus "mongo " (o caminho completo de "com.mongodb.spark.DefaultSource "). Ter um namespace diferente torna possível usar ambas as versões do connector no mesmo aplicativo Spark! Isso é útil para testar a unidade de seu aplicativo com o novo Connector e fazer a transição em seu cronograma.
Além disso, estamos mudando a forma como versionamos o MongoDB Spark Connector. As versões anteriores do MongoDB Spark Connector alinhadas com a versão do Spark que foi suportada — por exemplo, Versão 2.4 do MongoDB Spark Connector funciona com Spark 2.4. Lembre-se de que, no futuro, esse não será o caso. A documentação do MongoDB deixará claro quais versões do Spark são compatíveis com o connector.
O Apache Spark vem com um mecanismo de processamento de fluxo chamado Structured Streaming, baseado no mecanismo SQL do Spark e nas APIs DataFrame. O Spark Structured Streaming trata cada fluxo de dados recebido como um microlote, anexando continuamente cada microlote ao conjunto de dados de destino. Isso facilita a conversão de tarefas em lote existentes do Spark em uma tarefa de streaming. A transmissão estruturada desenvolveu-se com as versões do Spark e no Spark 2.3 introduziu o modo de Processamento Contínuo, que levou a latência de microlote de mais de 100ms para cerca 1ms. Observe que esse recurso ainda está em modo experimental, de acordo com a documentação oficial do Spark. No exemplo a seguir, mostraremos como transmitir dados entre o MongoDB e o Spark usando fluxos estruturados e processamento contínuo. Primeiro, vamos ver como ler dados do MongoDB.
Você pode transmitir dados do MongoDB para o Spark usando o novo Spark Connector. Considere o exemplo a seguir que transmite dados de estoque de um cluster MongoDB Atlas. Um exemplo de documento no MongoDB é o seguinte:
1 { 2 _id: ObjectId("624767546df0f7dd8783f300"), 3 company_symbol: 'HSL', 4 company_name: 'HUNGRY SYNDROME LLC', 5 price: 45.74, 6 tx_time: '2022-04-01T16:57:56Z' 7 }
Neste exemplo de código, usaremos o novo MongoDB Spark Connector e leremos a partir da coleção EstoqueData. Quando o MongoDB Spark Connector abre uma conexão de leitura de streaming com o MongoDB, ele abre a conexão e cria um MongoDB Change Stream para o banco de dados e a coleção fornecidos. Um change stream é usado para assinar as alterações no MongoDB. À medida que os dados são inseridos, atualizados e excluídos, os eventos de fluxo de alterações são criados. São esses eventos de mudança que são passados de volta para o cliente, neste caso, o Spark. Existem opções de configuração que podem alterar a estrutura dessa mensagem de evento. Por exemplo, se você quiser retornar apenas o próprio documento e não incluir os metadados do evento de fluxo de alterações, defina "spark.mongodb.change.stream.publish.full.document.only " como verdadeiro.
1 from pyspark import SparkContext 2 from pyspark.streaming import StreamingContext 3 from pyspark.sql import SparkSession 4 from pyspark.sql.functions import * 5 6 spark = SparkSession.\ 7 builder.\ 8 appName("streamingExampleRead").\ 9 config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12::10.1.1').\ 10 getOrCreate() 11 12 query=(spark.readStream.format("mongodb") 13 .option('spark.mongodb.connection.uri', '<CONNECTION STRING>') 14 .option('spark.mongodb.database', 'Stocks') \ 15 .option('spark.mongodb.collection', 'StockData') \ 16 .option('spark.mongodb.change.stream.publish.full.document.only','true') \ 17 .option("forceDeleteTempCheckpointLocation", "true") \ 18 .load()) 19 20 query.printSchema()
O esquema é inferido a partir da coleção MongoDB. Você pode ver no comando printSchema que a estrutura do nosso documento é a seguinte:
raiz: | |||
---|---|---|---|
_id | string | (nullable=true) | |
company_name | string | (nullable=true) | |
company_symbol | string | (nullable=true) | |
Preço | em dobro | (nullable=true) | |
tx_time | string | (nullable=true) |
Podemos verificar se o conjunto de dados está sendo transmitido com o comando isStreaming.
1 query.isStreaming
Em seguida, vamos ler os dados no console à medida que eles são inseridos no MongoDB.
1 query2=(query.writeStream \ 2 .outputMode("append") \ 3 .option("forceDeleteTempCheckpointLocation", "true") \ 4 .format("console") \ 5 .trigger(continuous="1 second") 6 .start().awaitTermination());
Quando o código acima foi executado por meio do spark-submit, a saída foi semelhante à seguinte:
... removido por brevidade ...
+--------------------+--------------------+--------------+-----+-------------------+ | _id| company_name|company_symbol|price| tx_time| +--------------------+--------------------+--------------+-----+-------------------+ |62476caa6df0f7dd8...| HUNGRY SYNDROME LLC| HSL|45.99|2022-04-01 17:20:42| |62476caa6df0f7dd8...|APPETIZING MARGIN...| AMP|12.81|2022-04-01 17:20:42| |62476caa6df0f7dd8...|EMBARRASSED COCKT...| ECC|38.18|2022-04-01 17:20:42| |62476caa6df0f7dd8...|PERFECT INJURY CO...| PIC|86.85|2022-04-01 17:20:42| |62476caa6df0f7dd8...|GIDDY INNOVATIONS...| GMI|84.46|2022-04-01 17:20:42| +--------------------+--------------------+--------------+-----+-------------------+
... removido por brevidade ...
+--------------------+--------------------+--------------+-----+-------------------+ | _id| company_name|company_symbol|price| tx_time| +--------------------+--------------------+--------------+-----+-------------------+ |62476cab6df0f7dd8...| HUNGRY SYNDROME LLC| HSL|46.04|2022-04-01 17:20:43| |62476cab6df0f7dd8...|APPETIZING MARGIN...| AMP| 12.8|2022-04-01 17:20:43| |62476cab6df0f7dd8...|EMBARRASSED COCKT...| ECC| 38.2|2022-04-01 17:20:43| |62476cab6df0f7dd8...|PERFECT INJURY CO...| PIC|86.85|2022-04-01 17:20:43| |62476cab6df0f7dd8...|GIDDY INNOVATIONS...| GMI|84.46|2022-04-01 17:20:43| +--------------------+--------------------+--------------+-----+-------------------+
Em seguida, vamos considerar um exemplo em que transmitimos dados do Apache Kafka para o MongoDB. Aqui, a origem é um tópico do kafka “stockdata.Stocks.StockData.] À medida que os dados chegam a esse tópico, eles são executados pelo Spark com o conteúdo da mensagem sendo analisado, transformado e gravado no MongoDB. Esta é a lista de código com comentários in-line:
1 from pyspark import SparkContext 2 from pyspark.streaming import StreamingContext 3 from pyspark.sql import SparkSession 4 from pyspark.sql import functions as F 5 from pyspark.sql.functions import * 6 from pyspark.sql.types import StructType,TimestampType, DoubleType, StringType, StructField 7 8 spark = SparkSession.\ 9 builder.\ 10 appName("streamingExampleWrite").\ 11 config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector:10.1.1').\ 12 config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0').\ 13 getOrCreate() 14 15 df = spark \ 16 .readStream \ 17 .format("kafka") \ 18 .option("startingOffsets", "earliest") \ 19 .option("kafka.bootstrap.servers", "KAFKA BROKER HOST HERE") \ 20 .option("subscribe", "stockdata.Stocks.StockData") \ 21 .load() 22 23 schemaStock = StructType([ \ 24 StructField("_id",StringType(),True), \ 25 StructField("company_name",StringType(), True), \ 26 StructField("company_symbol",StringType(), True), \ 27 StructField("price",StringType(), True), \ 28 StructField("tx_time",StringType(), True)]) 29 30 schemaKafka = StructType([ \ 31 StructField("payload",StringType(),True)])
Observe que a mensagem de tópico do Kafka chega neste formato -> chave (binário), valor (binário), tópico (string), partição (int), offset (long), timestamp (long), timestamptype (int). Consulte o Guia de integração do Streaming estruturado + Kafka (corretor do Kafka versão 0.10.0 ou superior) para obter mais informações sobre a integração do Kafka e Spark.
Para processar a mensagem para consumo no MongoDB, queremos escolher o valor que está em formato binário e convertê-lo em JSON.
1 stockDF=df.selectExpr("CAST(value AS STRING)")
Para referência, aqui está um exemplo de um evento (o valor convertido em uma string) que está no tópico Kafka:
1 { 2 "schema": { 3 "type": "string", 4 "optional": false 5 }, 6 "payload": "{\"_id\": {\"$oid\": \"6249f8096df0f7dd8785d70a\"}, \"company_symbol\": \"GMI\", \"company_name\": \"GIDDY INNOVATIONS\", \"price\": 87.57, \"tx_time\": \"2022-04-03T15:39:53Z\"}" 7 }
Queremos isolar o campo de carga e convertê-lo em uma representação JSON aproveitando o shcemaStock definido acima. Para maior clareza, dividimos a operação em várias etapas para explicar o processo. Primeiro, queremos converter o valor em JSON.
1 stockDF=stockDF.select(from_json(col('value'),schemaKafka).alias("json_data")).selectExpr('json_data.*')
O conjunto de dados agora contém dados que se assemelham
1 … 2 { 3 _id: ObjectId("624c6206e152b632f88a8ee2"), 4 payload: '{"_id": {"$oid": "6249f8046df0f7dd8785d6f1"}, "company_symbol": "GMI", "company_name": "GIDDY MONASTICISM INNOVATIONS", "price": 87.62, "tx_time": "2022-04-03T15:39:48Z"}' 5 }, …
Em seguida, queremos capturar apenas o valor do campo de carga útil e convertê-lo em JSON, já que está armazenado como uma string.
1 stockDF=stockDF.select(from_json(col('payload'),schemaStock).alias("json_data2")).selectExpr('json_data2.*')
Agora podemos fazer quaisquer transformações que quisermos fazer nos dados. Nesse caso, vamos converter tx_time em um carimbo de data/hora.
1 stockDF=stockDF.withColumn("tx_time",col("tx_time").cast("timestamp"))
O conjunto de dados está em um formato pronto para consumo no MongoDB, então vamos transmiti-lo para o MongoDB. Para fazer isso, use o método writeStream. Lembre-se de que existem várias opções para definir. Por exemplo, quando presente, a opção “trigger” processa os resultados em lotes. Neste exemplo, é a cada 10 segundos. Remover o campo de trigger resultará em escrita contínua. Para obter mais informações sobre opções e parâmetros, consulte o Guia de Streaming Estruturado.
1 dsw = ( 2 stockDF.writeStream 3 .format("mongodb") 4 .queryName("ToMDB") 5 .option("checkpointLocation", "/tmp/pyspark7/") 6 .option("forceDeleteTempCheckpointLocation", "true") 7 .option('spark.mongodb.connection.uri', ‘<CONNECTION STRING>') 8 .option('spark.mongodb.database', 'Stocks') 9 .option('spark.mongodb.collection', 'Sink') 10 .trigger(continuous="10 seconds") 11 .outputMode("append") 12 .start().awaitTermination());
Embora o modo contínuo ofereça muitas promessas em termos de características de latência e desempenho, o suporte para vários conectores populares, como o AWS S3, por exemplo, é inexistente. Assim, você pode acabar usando o modo de microlote em sua solução. A principal diferença entre os dois é como o Spark lida com a obtenção dos dados do fluxo. Conforme mencionado anteriormente, os dados são agrupados e processados em vez de usar um acréscimo contínuo a uma tabela. A diferença notável é a latência anunciada do microbatch em torno de 100ms, o que, para a maioria das cargas de trabalho, pode não ser um problema.
Ao contrário de quando especificamos uma gravação, quando lemos do MongoDB, não há nenhuma configuração especial para dizer ao Spark que use microbatch ou contínuo. Esse comportamento é determinado somente quando você escreve. Assim, em nosso exemplo de código, ler do MongoDB é o mesmo em ambos os casos, por exemplo:
1 query=(spark.readStream.format("mongodb").\ 2 option('spark.mongodb.connection.uri', '<<MONGODB CONNECTION STRING>>').\ 3 option('spark.mongodb.database', 'Stocks').\ 4 option('spark.mongodb.collection', 'StockData').\ 5 option('spark.mongodb.change.stream.publish.full.document.only','true').\ 6 option("forceDeleteTempCheckpointLocation", "true").\ 7 load())
Lembre-se da discussão anterior sobre a leitura de dados do MongoDB, ao usar
spark.readStream.format("mongodb")
, o MongoDB abre um fluxo de alterações e assina as alterações à medida que ocorrem no banco de dados. Com o microlote, cada evento de microlote abre um novo cursor de change stream, tornando essa forma de streaming de microlote menos eficiente do que os fluxos contínuos. Dito isto, alguns consumidores de dados de streaming, como o AWS S3 , suportam apenas dados de streams de microlote.Considere o código de exemplo do writeStream anterior:
1 dsw = ( 2 stockDF.writeStream 3 .format("mongodb") 4 .queryName("ToMDB") 5 .option("checkpointLocation", "/tmp/pyspark7/") 6 .option("forceDeleteTempCheckpointLocation", "true") 7 .option('spark.mongodb.connection.uri', '<<MONGODB CONNECTION STRING>>') 8 .option('spark.mongodb.database', 'Stocks') 9 .option('spark.mongodb.collection', 'Sink') 10 .trigger(continuous="10 seconds") 11 .outputMode("append") 12 .start().awaitTermination());
Aqui o .trigger o parâmetro foi usado para dizer ao Spark que usasse o streaming no modo contínuo; para usar o microbatch, basta remover o .trigger Parâmetro.
O streaming de dados é um componente fundamental de muitos tipos de aplicativos. O MongoDB tem evoluído ao longo dos anos, adicionando continuamente recursos e funcionalidades para dar suporte a esses tipos de volumes de trabalho. Com o MongoDB Spark Connector versão 10.1, você pode transmitir dados rapidamente de e para o MongoDB com algumas linhas de código.
Para obter mais informações e exemplos sobre a nova versão do Spark Connector 10.1, confira a documentação on -line. Tem dúvidas sobre o connector ou MongoDB? Poste uma pergunta no fórumConectores e integrações da comunidade de desenvolvedores MongoDB .