Usando MongoDB com Apache Airflow
Avaliar este tutorial
Embora escrever tarefas cron para executar scripts seja uma maneira de realizar a movimentação de dados, à medida que os fluxos de trabalho se tornam mais complexos, gerenciar o agendamento de tarefas se torna muito difícil e sujeito a erros. É aqui que o Apache Airflow se destaca. O Airflow é um sistema de gerenciamento de fluxo de trabalho originalmente projetado pelo Airbnb e passou a ser de código aberto em 2015. Com o Airflow, você pode criar, agendar e monitorar pipelines de dados complexos pelo código de programação. O Airflow é usado em muitos casos de uso com o MongoDB, como:
- Pipelines de aprendizado de máquina.
- Automação de operações de administração de banco de dados.
- Movimentação de dados em lote.
Neste post, você aprenderá os fundamentos de como aproveitar o MongoDB dentro de um pipeline do Airflow.
O Apache Airflow consiste em várias etapas de instalação, incluindo a instalação de um banco de dados e de um servidor da Web. Embora seja possível seguir o script de instalação e configurar o banco de dados e os serviços, a maneira mais fácil de começar a usar o Airflow é usar a CLI do Astronomer. Essa CLI monta um ambiente completo de docker do Airflow a partir de uma única linha de comando.
Da mesma forma, a maneira mais fácil de manter um MongoDB cluster é com o MongoDB Atlas. O Atlas não é apenas um MongoDB cluster hospedado. Na verdade, ele é um conjunto integrado de banco de dados em nuvem e serviços de dados que permitem que você crie rapidamente seus aplicativos Um serviço, o Atlas Data Federation, é um serviço de processamento de queries nativo da nuvem que permite aos usuários criar uma coleção virtual a partir de fontes de dados heterogêneas, como buckets do Amazon S3, clusters do MongoDB e endpoints de API HTTP. Uma vez definido, o usuário simplesmente emite uma query para obter dados combinados dessas fontes.
Por exemplo, considere um cenário em que você estava movendo dados com um Airflow DAG para o MongoDB e queria ingressar no armazenamento de objetos na nuvem (Amazon S3 ou dados do Microsoft Azure Blob Storage com o MongoDB como parte de um aplicativo de análise de dados). Usando o MongoDB Atlas Data Federation, você cria uma coleção virtual que contém um MongoDB cluster e uma coleção de armazenamento de objetos na nuvem. Agora, basta seu aplicativo emitir uma única query e o Atlas se encarrega de unir os dados heterogêneos. Esse recurso e outros como MongoDB Charts, que veremos mais tarde nesta publicação, aumentarão sua produtividade e aprimorarão sua solução do Airflow. Para saber mais sobre o MongoDB Atlas Data Federation, confira o webinar ao vivo no YouTube sobre o MongoDB, Help Your Data Flow with Atlas Data Lake. Para uma visão geral do MongoDB Atlas, confira Introdução ao MongoDB Atlas em 10 minutos | Início, disponível no YouTube.
Nesta publicação, criaremos um fluxo de trabalho do Airflow que executa query de um endpoint HTTP para obter uma lista histórica de valores de moeda em relação ao dólar. Os dados serão então inseridos no MongoDB usando o MongoHook e um gráfico será criado usando os MongoDB Charts. No Airflow, um hook é uma interface para uma plataforma externa ou banco de dados como o MongoDB. O MongoHook envolve o PyMongo Python Driver para MongoDB, desbloqueando todos os recursos do driver em um fluxo de trabalho do Airflow.
Se você ainda não tiver um ambiente Airflow disponível, instale o Astro CLI. Depois de instalado, crie um diretório para o projeto chamado “currency.”
mkdir currency && cd currency
Em seguida, crie o ambiente do Airflow usando a CLI do Astro.
astro dev init
Este comando criará uma estrutura de pasta que inclui uma pasta para DAGs, um Dockerfile e outros arquivos de suporte usados para personalizações.
Os provedores ajudam o Airflow a interagir com sistemas externos. Para adicionar um provedor, modifique o arquivo requirements.txt e adicione o provedor MongoDB.
echo “apache-airflow-providers-mongo==3.0.0” > > requirements.txt
Finalmente, inicie o projeto Airflow.
astro dev start
Esse comando simples iniciará e configurará os quatro contêineres do docker necessários para o Airflow: um servidor web, um agendador, um trigger e um banco de dados Postgres, respectivamente.
Astro dev restart
Observação: você também pode instalar manualmente o provedor MongoDB usando PyPi se não estiver usando o Astro CLI.
Observação: o provedor HTTP já está instalado como parte do tempo de execução do Astro. Se você não usou o Astro, será necessário instalar o provedor HTTP.
Um dos componentes que é instalado com o Airflow é um servidor web. Isso é usado como o principal portal operacional para fluxos de trabalho do Airflow. Para acessar, abra um navegador e navegue até http://localhost:8080. Dependendo de como você instalou o Airflow, você pode ver exemplos de DAGs já preenchidos. Os fluxos de trabalho do Airflow são chamados de DAGs (gráficos acíclicos direcionados) e podem ser qualquer coisa, desde os pipelines de agendamento de trabalho mais básicos até fluxos de trabalho mais complexos de ETL, aprendizado de máquina ou pipeline de dados preditivos, como detecção de fraude. Esses DAGs são scripts Python que dão aos desenvolvedores controle total do fluxo de trabalho. Os DAGs podem ser acionados manualmente por meio de uma chamada de API ou da interface do usuário da Web. Os DAGs também podem ser agendados para execução uma vez, de forma recorrente ou em qualquer configuração de tipo cron.
Vamos começar a explorar o Airflow criando um arquivo Python “currency.py, " dentro da pasta dags usando seu editor favorito.
Veja a seguir o código-fonte completo do DAG.
1 import os 2 import json 3 from airflow import DAG 4 from airflow.operators.python import PythonOperator 5 from airflow.operators.bash import BashOperator 6 from airflow.providers.http.operators.http import SimpleHttpOperator 7 from airflow.providers.mongo.hooks.mongo import MongoHook 8 from datetime import datetime,timedelta 9 10 def on_failure_callback(**context): 11 print(f"Task {context['task_instance_key_str']} failed.") 12 13 def uploadtomongo(ti, **context): 14 try: 15 hook = MongoHook(mongo_conn_id='mongoid') 16 client = hook.get_conn() 17 db = client.MyDB 18 currency_collection=db.currency_collection 19 print(f"Connected to MongoDB - {client.server_info()}") 20 d=json.loads(context["result"]) 21 currency_collection.insert_one(d) 22 except Exception as e: 23 printf("Error connecting to MongoDB -- {e}") 24 25 with DAG( 26 dag_id="load_currency_data", 27 schedule_interval=None, 28 start_date=datetime(2022,10,28), 29 catchup=False, 30 tags= ["currency"], 31 default_args={ 32 "owner": "Rob", 33 "retries": 2, 34 "retry_delay": timedelta(minutes=5), 35 'on_failure_callback': on_failure_callback 36 } 37 ) as dag: 38 39 t1 = SimpleHttpOperator( 40 task_id='get_currency', 41 method='GET', 42 endpoint='2022-01-01..2022-06-30', 43 headers={"Content-Type": "application/json"}, 44 do_xcom_push=True, 45 dag=dag) 46 47 t2 = PythonOperator( 48 task_id='upload-mongodb', 49 python_callable=uploadtomongo, 50 op_kwargs={"result": t1.output}, 51 dag=dag 52 ) 53 54 t1 >> t2
hook = MongoHook(mongo_conn_id='mongoid')
Os identificadores de conexão e as configurações de conexão que eles representam são definidos na guia Conexões do menu Admin na interface do usuário do Airflow.

Neste exemplo, como estamos nos conectando ao MongoDB e a uma API HTTP, precisamos definir duas conexões. Primeiro, vamos criar a conexão MongoDB clicando no botão "Add a new record ".

Isso apresentará uma página na qual você poderá preencher as informações de conexão. Selecione “MongoDB” no menu suspenso Tipo de conexão e preencha os seguintes campos:
ID de conexão | Mongoid |
Tipo de conexão | MongoDB |
Anfitrião | XXXX..mongodb.net (Coloque seu nome de host do MongoDB Atlas aqui) |
Esquema | MeuDB (por exemplo, o banco de dados no MongoDB) |
login | (Coloque seu nome de usuário do banco de dados aqui) |
Senha | (Coloque aqui a senha do seu banco de dados) |
Extra | {"srv":true} |
Clique em “Save” e “Add a new record” para criar a conexão HTTP API.
Selecione "HTTP " para o Tipo de Conexão e preencha os seguintes campos:
ID de conexão | http_default |
Tipo de conexão | HTTP |
Anfitrião | api.frankfurter.app |
Clique no menu DAGs e depois em “load_currency_data.” Você verá vários subitens que abordam o fluxo de trabalho, como o menu Código que mostra o código Python que compõe o DAG.
Clicar em Grafo mostrará uma representação visual do DAG analisado a partir do código Python.

1 t1 = SimpleHttpOperator( 2 task_id='get_currency', 3 method='GET', 4 endpoint='2022-01-01..2022-06-30', 5 headers={"Content-Type": "application/json"}, 6 do_xcom_push=True, 7 dag=dag)
O Airflow transmite informações entre tarefas usando XComs. Neste exemplo, armazenamos os dados de retorno da chamada de API para XCom. O próximo operador, "upload-mongodb," utiliza o PythonOperator para chamar uma função python, "uploadtomongo."
1 t2 = PythonOperator( 2 task_id='upload-mongodb', 3 python_callable=uploadtomongo, 4 op_kwargs={"result": t1.output}, 5 dag=dag 6 )
Essa função acessa os dados armazenados no XCom e usa o MongoHook para inserir os dados obtidos da chamada de API em um MongoDB cluster.
1 def uploadtomongo(ti, **context): 2 try: 3 hook = MongoHook(mongo_conn_id='mongoid') 4 client = hook.get_conn() 5 db = client.MyDB 6 currency_collection=db.currency_collection 7 print(f"Connected to MongoDB - {client.server_info()}") 8 d=json.loads(context["result"]) 9 currency_collection.insert_one(d) 10 except Exception as e: 11 printf("Error connecting to MongoDB -- {e}")
Embora nosso fluxo de trabalho de exemplo seja simples, execute uma tarefa e, em seguida, outra tarefa.
1 t1 >> t2
O Airflow sobrecarregou o operador bitwise “>>" para descrever o fluxo de tarefas. Para mais informações, consulte “Bitshift Composition.”
O Airflow pode permitir fluxos de trabalho mais complexos, como os seguintes:

A execução da tarefa pode ser condicional com vários caminhos de execução.
O Airflow é mais conhecido por seus recursos de agendamento de fluxo de trabalho, e eles são definidos como parte da definição do DAG.
1 with DAG( 2 dag_id="load_currency_data", 3 schedule=None, 4 start_date=datetime(2022,10,28), 5 catchup=False, 6 tags= ["currency"], 7 default_args={ 8 "owner": "Rob", 9 "retries": 2, 10 "retry_delay": timedelta(minutes=5), 11 'on_failure_callback': on_failure_callback 12 } 13 ) as dag:
O intervalo de agendamento pode ser definido usando uma expressão cron, um timedelta ou uma das predefinições do Airflow, como a usada neste exemplo, "None."
Os DAGs podem ser agendados para começar em uma data no passado. Se você quiser que o Airflow atualize e execute o DAG tantas vezes quanto teriam sido feitas no horário de início e agora, você pode definir a propriedade “catchup”. Observação: “Catchup” é definido como “True,” por padrão, então certifique-se de definir o valor adequadamente.
No nosso exemplo, você pode ver apenas algumas das opções de configuração disponíveis.

Após a execução, você pode clicar no item de menu DAG e Grid para exibir o status do tempo de execução do DAG.

No exemplo acima, o DAG foi executado quatro vezes, todas com sucesso. Você pode visualizar o log de cada etapa clicando na tarefa e, em seguida, em "Log" no menu.

O log é útil para solucionar problemas da tarefa. Aqui podemos ver nossa saída do comando
print(f"Connected to MongoDB - {client.server_info()}")
dentro do PythonOperator.
Depois de executar o DAG, os dados estarão no MongoDB Atlas cluster. Navegando para o cluster, podemos ver que o "currency_collection " foi criado e preenchido com dados monetários.

Em seguida, podemos visualizar os dados usando o MongoDB Charts.
Observe que os dados foram armazenados no MongoDB a partir da API com um subdocumento para cada dia do período determinado. Uma amostra desses dados é a seguinte:
1 { 2 _id: ObjectId("635b25bdcef2d967af053e2c"), 3 amount: 1, 4 base: 'EUR', 5 start_date: '2022-01-03', 6 end_date: '2022-06-30', 7 rates: { 8 '2022-01-03': { 9 AUD: 1.5691, 10 BGN: 1.9558, 11 BRL: 6.3539, 12 … }, 13 }, 14 '2022-01-04': { 15 AUD: 1.5682, 16 BGN: 1.9558, 17 BRL: 6.4174, 18 … }
Com o MongoDB Charts, podemos definir um filtro de pipeline de agregação para transformar os dados em um formato que será otimizado para a criação de gráficos. Por exemplo, considere o seguinte filtro de pipeline de agregação:
1 [{$project:{ 2 rates:{ 3 $objectToArray:"$rates"}}},{ 4 $unwind:"$rates" 5 } 6 ,{ 7 $project:{ 8 _id:0,"date":"$rates.k","Value":"$rates.v"}}]
Isso transforma os dados em subdocumentos que têm dois pares de valores de chave da data e dos valores, respectivamente.
1 { 2 date: '2022-01-03', 3 Value: { 4 AUD: 1.5691, 5 BGN: 1.9558, 6 BRL: 6.3539, 7 … }, 8 { 9 date: '2022-01-04', 10 Value: { 11 AUD: 1.5682, 12 BGN: 1.9558, 13 BRL: 6.4174, 14 ..} 15 }
Podemos adicionar esse filtro de pipeline de agregação ao Charts e criar um gráfico comparando o dólar americano (USD) ao euro (EUR) durante esse período.

O Airflow é um agendador de fluxo de trabalho de código aberto usado por muitas empresas em todo o mundo. Integrar o MongoDB com Airflow é simples usando o MongoHook. O Astronomer facilita a criação rápida de um sistema local do Airflow.O Astronomer também tem um registro que fornece um local central para os operadores do Airflow, incluindo o MongoHook e o MongoSensor.