Usando MongoDB com Apache Airflow
Avalie esse Tutorial
Embora escrever trabalhos cron para executar scripts seja uma maneira de realizar a movimentação de dados, à medida que os fluxos de trabalho se tornam mais complexos, o gerenciamento do agendamento de tarefas se torna muito difícil e propenso a erros. É aqui que o Apache Airflow entra. O Airflow é um sistema de gerenciamento de fluxo de trabalho de código aberto, originalmente projetado pelo Airbnb em 2015. Com o Airflow, você pode programaticamente criar, programar e monitorar pipelines de dados complexos. O Airflow é usado em muitos casos de uso com o MongoDB, incluindo:
- Pipelines de aprendizado de máquina.
- Automação de operações de administração de banco de dados.
- Movimentação de dados em lote.
Neste post, você aprenderá os fundamentos de como aproveitar o MongoDB dentro de um pipeline do Airflow.
O Apache Airflow consiste em várias etapas de instalação, incluindo a instalação de um banco de dados e de um servidor da Web. Embora seja possível seguir o script de instalação e configurar o banco de dados e os serviços, a maneira mais fácil de começar a usar o Airflow é usar a CLI do Astronomer. Essa CLI monta um ambiente completo de docker do Airflow a partir de uma única linha de comando.
Da mesma forma, a maneira mais fácil de criar um cluster do MongoDB é com o MongoDB Atlas. O Atlas não é apenas um cluster hospedado do MongoDB. Em vez disso, é um conjunto integrado de banco de dados e serviços de dados na nuvem que permitem criar aplicativos rapidamente. O Atlas Data Federation é um serviço de processamento de queries nativo na nuvem que permite aos usuários criar uma coleção virtual a partir de fontes de dados heterogêneas, como buckets do Amazon S3 , clusters do MongoDB e endpoints da API HTTP. Uma vez definido, o usuário simplesmente emite uma query para obter dados combinados a partir dessas fontes.
Por exemplo, considere um cenário em que você estava movendo dados com um Airflow DAG para o MongoDB e queria ingressar no armazenamento de objeto na nuvem - dados do Amazon S3 ou do Microsoft Azure Blob Storage com o MongoDB como parte de uma aplicação de análise de dados. Usando o MongoDB Atlas Data Federation, você cria uma coleção que contém um cluster do MongoDB e uma coleção de armazenamento de objeto na nuvem. Agora, tudo o que sua aplicação precisa fazer é emitir uma única query e o Atlas se encarrega de unir os dados heterogêneos. Esse recurso e outros como o MongoDB Charts, que veremos mais adiante neste post, aumentarão sua produtividade e aprimorarão sua solução de Airflow. Para saber mais sobre o MongoDB Atlas Data Federation, confira o webinar MongoDB.live no Youtube, "Help You Data Flow with Atlas Data Lake". Para uma visão geral do MongoDB Atlas, confiraIntrodução ao MongoDB Atlas em 10 minutos | Jumpstart, disponível no Youtube.
Nesta publicação, criaremos um fluxo de trabalho do Airflow que consulta um endpoint HTTP para obter uma lista histórica de valores de moedas em relação ao dólar. Os dados serão então inseridos no MongoDB usando o MongoHook e um gráfico será criado usando os MongoDB Charts. No Airflow, um hook é uma interface para uma plataforma externa ou banco de dados como o MongoDB. O MongoHook envolve o PyMongo Python Driver para MongoDB, desbloqueando todos os recursos do driver em um fluxo de trabalho do Airflow.
Se você ainda não tiver um ambiente Airflow disponível, instale o Astro CLI. Depois de instalado, crie um diretório para o projeto chamado “currency.”
mkdir currency && cd currency
Em seguida, crie o ambiente do Airflow usando a CLI do Astro.
astro dev init
Este comando criará uma estrutura de pasta que inclui uma pasta para DAGs, um Dockerfile e outros arquivos de suporte usados para personalizações.
Os provedores ajudam o Airflow a interagir com sistemas externos. Para adicionar um provedor, modifique o arquivo requirements.txt e adicione o provedor MongoDB.
echo “apache-airflow-providers-mongo==3.0.0” > > requirements.txt
Finalmente, inicie o projeto Airflow.
astro dev start
Esse comando simples iniciará e configurará os quatro contêineres do docker necessários para o Airflow: um servidor web, um agendador, um trigger e um banco de dados Postgres, respectivamente.
Astro dev restart
Nota: você também pode instalar manualmente o provedor do MongoDB usando PyPi se não estiver usando o Astro CLI.
Observação: o provedor HTTP já está instalado como parte do tempo de execução do Astro. Se você não tiver usado o Astro, precisará instalar o provedor HTTP.
Um dos componentes instalados com o Airflow é um servidor web. Isso é usado como o principal portal operacional para fluxos de trabalho do Airflow. Para acessar, abra um navegador e navegue até http://localhost:8080. Dependendo de como você instalou o Airflow, você poderá ver exemplos de DAGs já preenchidos. Os fluxos de trabalho do Airflow são chamados de DAGs (Directed Acycle Graphs) e podem ser qualquer coisa, desde os pipelines de agendamento de tarefas mais básicos até ETL mais complexos, aprendizado de máquina ou fluxos de trabalho preditivos do pipeline de dados, como detecção de fraudes. Esses DAGs são scripts Python que oferecem aos desenvolvedores controle total do fluxo de trabalho. Os DAGs podem ser acionados manualmente por meio de uma chamada de API ou da interface do usuário da Web. Os DAGs também podem ser agendados para execução única, recorrente ou em qualquer configuração semelhante ao cron.
Vamos começar a explorar o Airflow criando um arquivo Python “currency.py, " dentro da pasta dags usando seu editor favorito.
Veja a seguir o código-fonte completo do DAG.
1 import os 2 import json 3 from airflow import DAG 4 from airflow.operators.python import PythonOperator 5 from airflow.operators.bash import BashOperator 6 from airflow.providers.http.operators.http import SimpleHttpOperator 7 from airflow.providers.mongo.hooks.mongo import MongoHook 8 from datetime import datetime,timedelta 9 10 def on_failure_callback(**context): 11 print(f"Task {context['task_instance_key_str']} failed.") 12 13 def uploadtomongo(ti, **context): 14 try: 15 hook = MongoHook(mongo_conn_id='mongoid') 16 client = hook.get_conn() 17 db = client.MyDB 18 currency_collection=db.currency_collection 19 print(f"Connected to MongoDB - {client.server_info()}") 20 d=json.loads(context["result"]) 21 currency_collection.insert_one(d) 22 except Exception as e: 23 printf("Error connecting to MongoDB -- {e}") 24 25 with DAG( 26 dag_id="load_currency_data", 27 schedule_interval=None, 28 start_date=datetime(2022,10,28), 29 catchup=False, 30 tags= ["currency"], 31 default_args={ 32 "owner": "Rob", 33 "retries": 2, 34 "retry_delay": timedelta(minutes=5), 35 'on_failure_callback': on_failure_callback 36 } 37 ) as dag: 38 39 t1 = SimpleHttpOperator( 40 task_id='get_currency', 41 method='GET', 42 endpoint='2022-01-01..2022-06-30', 43 headers={"Content-Type": "application/json"}, 44 do_xcom_push=True, 45 dag=dag) 46 47 t2 = PythonOperator( 48 task_id='upload-mongodb', 49 python_callable=uploadtomongo, 50 op_kwargs={"result": t1.output}, 51 dag=dag 52 ) 53 54 t1 >> t2
Ao observar o código, observe que não há strings de conexão no arquivo Python. Os identificadores de conexão, conforme mostrado no trecho de código abaixo, são espaços reservados para strings de conexão.
hook = MongoHook(mongo_conn_id='mongoid')
Os identificadores de conexão e as configurações de conexão que eles representam são definidos na guia Conexões do menu Admin na interface do usuário do Airflow.
Neste exemplo, como estamos nos conectando ao MongoDB e a uma API HTTP, precisamos definir duas conexões. Primeiro, vamos criar a conexão MongoDB clicando no botão "Add a new record ".
Isso apresentará uma página na qual você poderá preencher as informações de conexão. Selecione “MongoDB” no menu suspenso Tipo de conexão e preencha os seguintes campos:
ID de conexão | Mongoid |
Tipo de conexão | MongoDB |
Anfitrião | XXXX..mongodb.net(Coloque seu nome de host do MongoDB Atlas aqui) |
Esquema | MeuDB (por exemplo, o banco de dados no MongoDB) |
login | (Coloque seu nome de usuário do banco de dados aqui) |
Senha | (Coloque aqui a senha do seu banco de dados) |
Extra | {"srv": true} |
Clique em “Save” e “Add a new record” para criar a conexão HTTP API.
Selecione "HTTP " para o Tipo de Conexão e preencha os seguintes campos:
ID de conexão | http_default |
Tipo de conexão | HTTP |
Anfitrião | api.frankfurter.app |
Observação: as strings de conexão também podem ser armazenadas em variáveis de ambiente ou armazenadas com segurança usando um back-end de segredos externo, como o HashiCorp Vault ou o AWS SSM Parameter Store.
Clique no menu DAGs e depois em “load_currency_data.” Você verá vários subitens que abordam o fluxo de trabalho, como o menu Código que mostra o código Python que compõe o DAG.
Clicar em Grafo mostrará uma representação visual do DAG analisado a partir do código Python.
In our example, “get_currency” uses the SimpleHttpOperator to obtain a historical list of currency values versus the Euro.
1 t1 = SimpleHttpOperator( 2 task_id='get_currency', 3 method='GET', 4 endpoint='2022-01-01..2022-06-30', 5 headers={"Content-Type": "application/json"}, 6 do_xcom_push=True, 7 dag=dag)
O Airflow passa informações entre tarefas usando XComs. Neste exemplo, armazenamos os dados de retorno da chamada da API para o XCom. O próximo operador, "upload-mongodb,", usa o PythonOperator para chamar uma função python, "uploadtomongo."
1 t2 = PythonOperator( 2 task_id='upload-mongodb', 3 python_callable=uploadtomongo, 4 op_kwargs={"result": t1.output}, 5 dag=dag 6 )
Essa função acessa os dados armazenados no XCom e usa o MongoHook para inserir os dados obtidos da chamada de API em um MongoDB cluster.
1 def uploadtomongo(ti, **context): 2 try: 3 hook = MongoHook(mongo_conn_id='mongoid') 4 client = hook.get_conn() 5 db = client.MyDB 6 currency_collection=db.currency_collection 7 print(f"Connected to MongoDB - {client.server_info()}") 8 d=json.loads(context["result"]) 9 currency_collection.insert_one(d) 10 except Exception as e: 11 printf("Error connecting to MongoDB -- {e}")
Embora nosso fluxo de trabalho de exemplo seja simples, execute uma tarefa e, em seguida, outra tarefa.
1 t1 >> t2
O Airflow sobrecarregou o operador bit a bit “>>” para descrever o fluxo de tarefas. Para obter mais informações, consulte “Bitshift Composition.”
O Airflow pode permitir fluxos de trabalho mais complexos, como os seguintes:
A execução da tarefa pode ser condicional com vários caminhos de execução.
O Airflow é mais conhecido por seus recursos de agendamento de fluxo de trabalho, e eles são definidos como parte da definição do DAG.
1 with DAG( 2 dag_id="load_currency_data", 3 schedule=None, 4 start_date=datetime(2022,10,28), 5 catchup=False, 6 tags= ["currency"], 7 default_args={ 8 "owner": "Rob", 9 "retries": 2, 10 "retry_delay": timedelta(minutes=5), 11 'on_failure_callback': on_failure_callback 12 } 13 ) as dag:
O intervalo de agendamento pode ser definido usando uma expressão cron, um timedelta ou uma das predefinições do AIrflow, como a usada neste exemplo, "None."
Os DAGs podem ser agendados para começar em uma data no passado. Se você quiser que o Airflow atualize e execute o DAG tantas vezes quanto teriam sido feitas no horário de início e agora, você pode definir a propriedade “catchup”. Observação: “Catchup” é definido como “True,” por padrão, então certifique-se de definir o valor adequadamente.
No nosso exemplo, você pode ver apenas algumas das opções de configuração disponíveis.
Após a execução, você pode clicar no item de menu DAG e Grid para exibir o status do tempo de execução do DAG.
No exemplo acima, o DAG foi executado quatro vezes, todas com sucesso. Você pode visualizar o log de cada etapa clicando na tarefa e, em seguida, em "Log" no menu.
O log é útil para solucionar problemas da tarefa. Aqui podemos ver nossa saída do comando
print(f"Connected to MongoDB - {client.server_info()}")
dentro do PythonOperator.Depois de executar o DAG, os dados estarão no MongoDB Atlas cluster. Navegando para o cluster, podemos ver que o "currency_collection " foi criado e preenchido com dados monetários.
Em seguida, podemos visualizar os dados usando o MongoDB Charts.
Observe que os dados foram armazenados no MongoDB a partir da API com um subdocumento para cada dia do período determinado. Uma amostra desses dados é a seguinte:
1 { 2 _id: ObjectId("635b25bdcef2d967af053e2c"), 3 amount: 1, 4 base: 'EUR', 5 start_date: '2022-01-03', 6 end_date: '2022-06-30', 7 rates: { 8 '2022-01-03': { 9 AUD: 1.5691, 10 BGN: 1.9558, 11 BRL: 6.3539, 12 … }, 13 }, 14 '2022-01-04': { 15 AUD: 1.5682, 16 BGN: 1.9558, 17 BRL: 6.4174, 18 … }
Com o MongoDB Charts, podemos definir um filtro de pipeline de agregação para transformar os dados em um formato que será otimizado para a criação de gráficos. Por exemplo, considere o seguinte filtro de pipeline de agregação:
1 [{$project:{ 2 rates:{ 3 $objectToArray:"$rates"}}},{ 4 $unwind:"$rates" 5 } 6 ,{ 7 $project:{ 8 _id:0,"date":"$rates.k","Value":"$rates.v"}}]
Isso transforma os dados em subdocumentos que têm dois pares de valores de chave da data e dos valores, respectivamente.
1 { 2 date: '2022-01-03', 3 Value: { 4 AUD: 1.5691, 5 BGN: 1.9558, 6 BRL: 6.3539, 7 … }, 8 { 9 date: '2022-01-04', 10 Value: { 11 AUD: 1.5682, 12 BGN: 1.9558, 13 BRL: 6.4174, 14 ..} 15 }
Podemos adicionar esse filtro de pipeline de agregação ao Charts e criar um gráfico comparando o dólar americano (USD) ao euro (EUR) durante esse período.
For more information on MongoDB Charts, check out the YouTube video “Intro to MongoDB Charts (demo)” for a walkthrough of the feature.
O Airflow é um agendador de fluxo de trabalho de código aberto usado por muitas empresas em todo o mundo. A integração do MongoDB com o Airflow é simples usando o MongoHook. O Astronomer facilita a criação rápida de uma implantação local do Airflow. O Astronomer também tem um registro que fornece um local central para os operadores do Airflow, incluindo o MongoHook e o MongoSensor.