Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Apresentando MongoDB 8.0, o MongoDB mais rápido de todos os tempos!
Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Produtoschevron-right
MongoDBchevron-right

Usando MongoDB com Apache Airflow

Robert Walters8 min read • Published Nov 15, 2022 • Updated Jun 12, 2023
MongoDB
Ícone do FacebookÍcone do Twitterícone do linkedin
Usando MongoDB com Apache Airflow
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Embora escrever trabalhos cron para executar scripts seja uma maneira de realizar a movimentação de dados, à medida que os fluxos de trabalho se tornam mais complexos, o gerenciamento do agendamento de tarefas se torna muito difícil e propenso a erros. É aqui que o Apache Airflow entra. O Airflow é um sistema de gerenciamento de fluxo de trabalho de código aberto, originalmente projetado pelo Airbnb em 2015. Com o Airflow, você pode programaticamente criar, programar e monitorar pipelines de dados complexos. O Airflow é usado em muitos casos de uso com o MongoDB, incluindo:
  • Pipelines de aprendizado de máquina.
  • Automação de operações de administração de banco de dados.
  • Movimentação de dados em lote.
Neste post, você aprenderá os fundamentos de como aproveitar o MongoDB dentro de um pipeline do Airflow.

Começar

O Apache Airflow consiste em várias etapas de instalação, incluindo a instalação de um banco de dados e de um servidor da Web. Embora seja possível seguir o script de instalação e configurar o banco de dados e os serviços, a maneira mais fácil de começar a usar o Airflow é usar a CLI do Astronomer. Essa CLI monta um ambiente completo de docker do Airflow a partir de uma única linha de comando.
Da mesma forma, a maneira mais fácil de criar um cluster do MongoDB é com o MongoDB Atlas. O Atlas não é apenas um cluster hospedado do MongoDB. Em vez disso, é um conjunto integrado de banco de dados e serviços de dados na nuvem que permitem criar aplicativos rapidamente. O Atlas Data Federation é um serviço de processamento de queries nativo na nuvem que permite aos usuários criar uma coleção virtual a partir de fontes de dados heterogêneas, como buckets do Amazon S3 , clusters do MongoDB e endpoints da API HTTP. Uma vez definido, o usuário simplesmente emite uma query para obter dados combinados a partir dessas fontes.
Por exemplo, considere um cenário em que você estava movendo dados com um Airflow DAG para o MongoDB e queria ingressar no armazenamento de objeto na nuvem - dados do Amazon S3 ou do Microsoft Azure Blob Storage com o MongoDB como parte de uma aplicação de análise de dados.  Usando o MongoDB Atlas Data Federation, você cria uma coleção que contém um cluster do MongoDB e uma coleção de armazenamento de objeto na nuvem. Agora, tudo o que sua aplicação precisa fazer é emitir uma única query e o Atlas se encarrega de unir os dados heterogêneos. Esse recurso e outros como o MongoDB Charts, que veremos mais adiante neste post, aumentarão sua produtividade e aprimorarão sua solução de Airflow. Para saber mais sobre o MongoDB Atlas Data Federation, confira o webinar MongoDB.live no Youtube, "Help You Data Flow with Atlas Data Lake".  Para uma visão geral do MongoDB Atlas, confiraIntrodução ao MongoDB Atlas em 10 minutos | Jumpstart, disponível no Youtube.

Moeda ao longo do tempo

Nesta publicação, criaremos um fluxo de trabalho do Airflow que consulta um endpoint HTTP para obter uma lista histórica de valores de moedas em relação ao dólar. Os dados serão então inseridos no MongoDB usando o MongoHook e um gráfico será criado usando os MongoDB Charts. No Airflow, um hook é uma interface para uma plataforma externa ou banco de dados como o MongoDB. O MongoHook envolve o PyMongo Python Driver para MongoDB, desbloqueando todos os recursos do driver em um fluxo de trabalho do Airflow.

Etapa 1: iniciar o ambiente do Airflow

Se você ainda não tiver um ambiente Airflow disponível, instale o Astro CLI. Depois de instalado, crie um diretório para o projeto chamado “currency.”
mkdir currency && cd currency
Em seguida, crie o ambiente do Airflow usando a CLI do Astro.
astro dev init
Este comando criará uma estrutura de pasta que inclui uma pasta para DAGs, um Dockerfile e outros arquivos de suporte usados para personalizações.

Etapa 2: instalar o provedor MongoDB Airflow

Os provedores ajudam o Airflow a interagir com sistemas externos. Para adicionar um provedor, modifique o arquivo requirements.txt e adicione o provedor MongoDB.
echo “apache-airflow-providers-mongo==3.0.0” > > requirements.txt
Finalmente, inicie o projeto Airflow.
astro dev start
Esse comando simples iniciará e configurará os quatro contêineres do docker necessários para o Airflow: um servidor web, um agendador, um trigger e um banco de dados Postgres, respectivamente.
Astro dev restart
Nota: você também pode instalar manualmente o provedor do MongoDB usando PyPi se não estiver usando o Astro CLI.
Observação: o provedor HTTP já está instalado como parte do tempo de execução do Astro. Se você não tiver usado o Astro, precisará instalar o provedor HTTP.

Etapa 3: criação o fluxo de trabalho DAG

Um dos componentes instalados com o Airflow é um servidor web. Isso é usado como o principal portal operacional para fluxos de trabalho do Airflow. Para acessar, abra um navegador e navegue até http://localhost:8080. Dependendo de como você instalou o Airflow, você poderá ver exemplos de DAGs já preenchidos. Os fluxos de trabalho do Airflow são chamados de DAGs (Directed Acycle Graphs) e podem ser qualquer coisa, desde os pipelines de agendamento de tarefas mais básicos até ETL mais complexos, aprendizado de máquina ou fluxos de trabalho preditivos do pipeline de dados, como detecção de fraudes. Esses DAGs são scripts Python que oferecem aos desenvolvedores controle total do fluxo de trabalho. Os DAGs podem ser acionados manualmente por meio de uma chamada de API ou da interface do usuário da Web. Os DAGs também podem ser agendados para execução única, recorrente ou em qualquer configuração semelhante ao cron.
Vamos começar a explorar o Airflow criando um arquivo Python “currency.py, " dentro da pasta dags usando seu editor favorito.
Veja a seguir o código-fonte completo do DAG.
1import os
2import json
3from airflow import DAG
4from airflow.operators.python import PythonOperator
5from airflow.operators.bash import BashOperator
6from airflow.providers.http.operators.http import SimpleHttpOperator
7from airflow.providers.mongo.hooks.mongo import MongoHook
8from datetime import datetime,timedelta
9
10def on_failure_callback(**context):
11 print(f"Task {context['task_instance_key_str']} failed.")
12
13def uploadtomongo(ti, **context):
14 try:
15 hook = MongoHook(mongo_conn_id='mongoid')
16 client = hook.get_conn()
17 db = client.MyDB
18 currency_collection=db.currency_collection
19 print(f"Connected to MongoDB - {client.server_info()}")
20 d=json.loads(context["result"])
21 currency_collection.insert_one(d)
22 except Exception as e:
23 printf("Error connecting to MongoDB -- {e}")
24
25with DAG(
26 dag_id="load_currency_data",
27 schedule_interval=None,
28 start_date=datetime(2022,10,28),
29 catchup=False,
30 tags= ["currency"],
31 default_args={
32 "owner": "Rob",
33 "retries": 2,
34 "retry_delay": timedelta(minutes=5),
35 'on_failure_callback': on_failure_callback
36 }
37) as dag:
38
39 t1 = SimpleHttpOperator(
40 task_id='get_currency',
41 method='GET',
42 endpoint='2022-01-01..2022-06-30',
43 headers={"Content-Type": "application/json"},
44 do_xcom_push=True,
45 dag=dag)
46
47 t2 = PythonOperator(
48 task_id='upload-mongodb',
49 python_callable=uploadtomongo,
50 op_kwargs={"result": t1.output},
51 dag=dag
52 )
53
54 t1 >> t2

Etapa 4: configurar conexões

Ao observar o código, observe que não há strings de conexão no arquivo Python. Os identificadores de conexão, conforme mostrado no trecho de código abaixo, são espaços reservados para strings de conexão.
hook = MongoHook(mongo_conn_id='mongoid')
Os identificadores de conexão e as configurações de conexão que eles representam são definidos na guia Conexões do menu Admin na interface do usuário do Airflow.
lista suspensa de itens do menu admin
Neste exemplo, como estamos nos conectando ao MongoDB e a uma API HTTP, precisamos definir duas conexões. Primeiro, vamos criar a conexão MongoDB clicando no botão "Add a new record ".
Adicionar um botão de nova gravação
Isso apresentará uma página na qual você poderá preencher as informações de conexão. Selecione “MongoDB” no menu suspenso Tipo de conexão e preencha os seguintes campos:
ID de conexãoMongoid
Tipo de conexãoMongoDB
AnfitriãoXXXX..mongodb.net(Coloque seu nome de host do MongoDB Atlas aqui)
EsquemaMeuDB

(por exemplo, o banco de dados no MongoDB)
login(Coloque seu nome de usuário do banco de dados aqui)
Senha(Coloque aqui a senha do seu banco de dados)
Extra{"srv": true}
Clique em “Save” e “Add a new record” para criar a conexão HTTP API.
Selecione "HTTP " para o Tipo de Conexão e preencha os seguintes campos:
ID de conexãohttp_default
Tipo de conexãoHTTP
Anfitriãoapi.frankfurter.app
Observação: as strings de conexão também podem ser armazenadas em variáveis de ambiente ou armazenadas com segurança usando um back-end de segredos externo, como o HashiCorp Vault ou o AWS SSM Parameter Store.

Etapa 5: o fluxo de trabalho DAG

Clique no menu DAGs e depois em “load_currency_data.”  Você verá vários subitens que abordam o fluxo de trabalho, como o menu Código que mostra o código Python que compõe o DAG.
Clicar em Grafo mostrará uma representação visual do DAG analisado a partir do código Python.
visualização de gráfico do fluxo de trabalho DAG Em nosso exemplo, "get_currency " usa o SimpleHttpOperator para obter uma lista histórica de valores de moeda em relação ao real.
1t1 = SimpleHttpOperator(
2 task_id='get_currency',
3 method='GET',
4 endpoint='2022-01-01..2022-06-30',
5 headers={"Content-Type": "application/json"},
6 do_xcom_push=True,
7 dag=dag)
O Airflow passa informações entre tarefas usando XComs. Neste exemplo, armazenamos os dados de retorno da chamada da API para o XCom. O próximo operador, "upload-mongodb,", usa o PythonOperator para chamar uma função python, "uploadtomongo."
1t2 = PythonOperator(
2 task_id='upload-mongodb',
3 python_callable=uploadtomongo,
4 op_kwargs={"result": t1.output},
5 dag=dag
6 )
Essa função acessa os dados armazenados no XCom e usa o MongoHook para inserir os dados obtidos da chamada de API em um MongoDB cluster.
1def uploadtomongo(ti, **context):
2 try:
3 hook = MongoHook(mongo_conn_id='mongoid')
4 client = hook.get_conn()
5 db = client.MyDB
6 currency_collection=db.currency_collection
7 print(f"Connected to MongoDB - {client.server_info()}")
8 d=json.loads(context["result"])
9 currency_collection.insert_one(d)
10 except Exception as e:
11 printf("Error connecting to MongoDB -- {e}")
Embora nosso fluxo de trabalho de exemplo seja simples, execute uma tarefa e, em seguida, outra tarefa.
1t1 >> t2
O Airflow sobrecarregou o operador bit a bit “>>” para descrever o fluxo de tarefas. Para obter mais informações, consulte “Bitshift Composition.”
O Airflow pode permitir fluxos de trabalho mais complexos, como os seguintes:
Visualização de gráfico do fluxo de trabalho DAG
A execução da tarefa pode ser condicional com vários caminhos de execução.

Etapa 6: agendamento do DAG

O Airflow é mais conhecido por seus recursos de agendamento de fluxo de trabalho, e eles são definidos como parte da definição do DAG.
1with DAG(
2 dag_id="load_currency_data",
3 schedule=None,
4 start_date=datetime(2022,10,28),
5 catchup=False,
6 tags= ["currency"],
7 default_args={
8 "owner": "Rob",
9 "retries": 2,
10 "retry_delay": timedelta(minutes=5),
11 'on_failure_callback': on_failure_callback
12 }
13) as dag:
intervalo de agendamento pode ser definido usando uma expressão cron, um timedelta ou uma das predefinições do AIrflow, como a usada neste exemplo, "None."
Os DAGs podem ser agendados para começar em uma data no passado. Se você quiser que o Airflow atualize e execute o DAG tantas vezes quanto teriam sido feitas no horário de início e agora, você pode definir a propriedade “catchup”. Observação: “Catchup” é definido como “True,” por padrão, então certifique-se de definir o valor adequadamente.
No nosso exemplo, você pode ver apenas algumas das opções de configuração disponíveis.

Etapa 7: executando o DAG

Você pode executar um DAG ad-hoc pela Web usando o botão "play" na coluna de ação.
opções do menu de ações
Após a execução, você pode clicar no item de menu DAG e Grid para exibir o status do tempo de execução do DAG.
visão da execução de dag
No exemplo acima, o DAG foi executado quatro vezes, todas com sucesso. Você pode visualizar o log de cada etapa clicando na tarefa e, em seguida, em "Log" no menu.
mostrando a visualização de detalhes do log
O log é útil para solucionar problemas da tarefa. Aqui podemos ver nossa saída do comando print(f"Connected to MongoDB - {client.server_info()}") dentro do PythonOperator.
log por tentativas

Etapa 8: explorar os dados no MongoDB Atlas

Depois de executar o DAG, os dados estarão no MongoDB Atlas cluster. Navegando para o cluster, podemos ver que o "currency_collection " foi criado e preenchido com dados monetários.
Exibição de coleção do MongoDB Atlas

Etapa 9: Visualizando os dados usando MongoDB Charts

Em seguida, podemos visualizar os dados usando o MongoDB Charts.
Observe que os dados foram armazenados no MongoDB a partir da API com um subdocumento para cada dia do período determinado. Uma amostra desses dados é a seguinte:
1{
2 _id: ObjectId("635b25bdcef2d967af053e2c"),
3 amount: 1,
4 base: 'EUR',
5 start_date: '2022-01-03',
6 end_date: '2022-06-30',
7 rates: {
8 '2022-01-03': {
9 AUD: 1.5691,
10 BGN: 1.9558,
11 BRL: 6.3539,
12… },
13},
14 '2022-01-04': {
15 AUD: 1.5682,
16 BGN: 1.9558,
17 BRL: 6.4174,
18… }
Com o MongoDB Charts, podemos definir um filtro de pipeline de agregação para transformar os dados em um formato que será otimizado para a criação de gráficos. Por exemplo, considere o seguinte filtro de pipeline de agregação:
1[{$project:{
2 rates:{
3 $objectToArray:"$rates"}}},{
4 $unwind:"$rates"
5 }
6 ,{
7 $project:{
8 _id:0,"date":"$rates.k","Value":"$rates.v"}}]
Isso transforma os dados em subdocumentos que têm dois pares de valores de chave da data e dos valores, respectivamente.
1{
2 date: '2022-01-03',
3 Value: {
4 AUD: 1.5691,
5 BGN: 1.9558,
6 BRL: 6.3539,
7… },
8 {
9 date: '2022-01-04',
10 Value: {
11 AUD: 1.5682,
12 BGN: 1.9558,
13 BRL: 6.4174,
14..}
15}
Podemos adicionar esse filtro de pipeline de agregação ao Charts e criar um gráfico comparando o dólar americano (USD) ao euro (EUR) durante esse período.
Podemos adicionar esse filtro de pipeline de agregação ao Charts e criar um gráfico comparando o dólar americano (USD) ao euro (EUR) durante esse período. Para obter mais informações sobre o MongoDB Charts, assista ao vídeo do Youtube "Intro to MongoDB Charts (demo)" para ver uma explicação passo a passo do recurso.

Resumo

O Airflow é um agendador de fluxo de trabalho de código aberto usado por muitas empresas em todo o mundo. A integração do MongoDB com o Airflow é simples usando o MongoHook. O Astronomer facilita a criação rápida de uma implantação local do Airflow. O Astronomer também tem um registro que fornece um local central para os operadores do Airflow, incluindo o MongoHook e o MongoSensor. 

Recursos úteis

Saiba mais sobre o Astronomer e consulte a documentação do MongoHook.

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Tutorial

Explorando os recursos avançados do Atlas Search com o MongoDB Atlas Atlas Search


Aug 20, 2024 | 6 min read
Tutorial

Introdução ao MongoDB e ao AWS Codewhisperer


Sep 26, 2024 | 3 min read
Tutorial

Série temporal MongoDB com C++


Sep 17, 2024 | 6 min read
Tutorial

Interaja com o MongoDB Atlas em uma função do AWS Lambda usando C#


Jan 23, 2024 | 5 min read
Sumário
  • Começar