A Voyage AI se une ao MongoDB para impulsionar aplicativos de AI mais precisos e confiáveis no Atlas.

Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Usando MongoDB com Apache Airflow

Robert Walters8 min read • Published Nov 15, 2022 • Updated Jun 12, 2023
Facebook Icontwitter iconlinkedin icon
Using MongoDB with Apache Airflow
Avaliar este tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Embora escrever tarefas cron para executar scripts seja uma maneira de realizar a movimentação de dados, à medida que os fluxos de trabalho se tornam mais complexos, gerenciar o agendamento de tarefas se torna muito difícil e sujeito a erros. É aqui que o Apache Airflow se destaca. O Airflow é um sistema de gerenciamento de fluxo de trabalho originalmente projetado pelo Airbnb e passou a ser de código aberto em 2015. Com o Airflow, você pode criar, agendar e monitorar pipelines de dados complexos pelo código de programação. O Airflow é usado em muitos casos de uso com o MongoDB, como:
  • Pipelines de aprendizado de máquina.
  • Automação de operações de administração de banco de dados.
  • Movimentação de dados em lote.
Neste post, você aprenderá os fundamentos de como aproveitar o MongoDB dentro de um pipeline do Airflow.

Começar

O Apache Airflow consiste em várias etapas de instalação, incluindo a instalação de um banco de dados e de um servidor da Web. Embora seja possível seguir o script de instalação e configurar o banco de dados e os serviços, a maneira mais fácil de começar a usar o Airflow é usar a CLI do Astronomer. Essa CLI monta um ambiente completo de docker do Airflow a partir de uma única linha de comando.
Da mesma forma, a maneira mais fácil de manter um MongoDB cluster é com o MongoDB Atlas. O Atlas não é apenas um MongoDB cluster hospedado. Na verdade, ele é um conjunto integrado de banco de dados em nuvem e serviços de dados que permitem que você crie rapidamente seus aplicativos Um serviço, o Atlas Data Federation, é um serviço de processamento de queries nativo da nuvem que permite aos usuários criar uma coleção virtual a partir de fontes de dados heterogêneas, como buckets do Amazon S3, clusters do MongoDB e endpoints de API HTTP. Uma vez definido, o usuário simplesmente emite uma query para obter dados combinados dessas fontes.
Por exemplo, considere um cenário em que você estava movendo dados com um Airflow DAG para o MongoDB e queria ingressar no armazenamento de objetos na nuvem (Amazon S3 ou dados do Microsoft Azure Blob Storage com o MongoDB como parte de um aplicativo de análise de dados). Usando o MongoDB Atlas Data Federation, você cria uma coleção virtual que contém um MongoDB cluster e uma coleção de armazenamento de objetos na nuvem. Agora, basta seu aplicativo emitir uma única query e o Atlas se encarrega de unir os dados heterogêneos. Esse recurso e outros como MongoDB Charts, que veremos mais tarde nesta publicação, aumentarão sua produtividade e aprimorarão sua solução do Airflow. Para saber mais sobre o MongoDB Atlas Data Federation, confira o webinar ao vivo no YouTube sobre o MongoDB, Help Your Data Flow with Atlas Data Lake. Para uma visão geral do MongoDB Atlas, confira Introdução ao MongoDB Atlas em 10 minutos | Início, disponível no YouTube.

Moeda ao longo do tempo

Nesta publicação, criaremos um fluxo de trabalho do Airflow que executa query de um endpoint HTTP para obter uma lista histórica de valores de moeda em relação ao dólar. Os dados serão então inseridos no MongoDB usando o MongoHook e um gráfico será criado usando os MongoDB Charts. No Airflow, um hook é uma interface para uma plataforma externa ou banco de dados como o MongoDB. O MongoHook envolve o PyMongo Python Driver para MongoDB, desbloqueando todos os recursos do driver em um fluxo de trabalho do Airflow.

Etapa 1: iniciar o ambiente do Airflow

Se você ainda não tiver um ambiente Airflow disponível, instale o Astro CLI. Depois de instalado, crie um diretório para o projeto chamado “currency.”
mkdir currency && cd currency
Em seguida, crie o ambiente do Airflow usando a CLI do Astro.
astro dev init
Este comando criará uma estrutura de pasta que inclui uma pasta para DAGs, um Dockerfile e outros arquivos de suporte usados para personalizações.

Etapa 2: instalar o provedor MongoDB Airflow

Os provedores ajudam o Airflow a interagir com sistemas externos. Para adicionar um provedor, modifique o arquivo requirements.txt e adicione o provedor MongoDB.
echo “apache-airflow-providers-mongo==3.0.0” > > requirements.txt
Finalmente, inicie o projeto Airflow.
astro dev start
Esse comando simples iniciará e configurará os quatro contêineres do docker necessários para o Airflow: um servidor web, um agendador, um trigger e um banco de dados Postgres, respectivamente.
Astro dev restart
Observação: você também pode instalar manualmente o provedor MongoDB usando PyPi se não estiver usando o Astro CLI.
Observação: o provedor HTTP já está instalado como parte do tempo de execução do Astro. Se você não usou o Astro, será necessário instalar o provedor HTTP.

Etapa 3: criação o fluxo de trabalho DAG

Um dos componentes que é instalado com o Airflow é um servidor web. Isso é usado como o principal portal operacional para fluxos de trabalho do Airflow. Para acessar, abra um navegador e navegue até http://localhost:8080. Dependendo de como você instalou o Airflow, você pode ver exemplos de DAGs já preenchidos. Os fluxos de trabalho do Airflow são chamados de DAGs (gráficos acíclicos direcionados) e podem ser qualquer coisa, desde os pipelines de agendamento de trabalho mais básicos até fluxos de trabalho mais complexos de ETL, aprendizado de máquina ou pipeline de dados preditivos, como detecção de fraude. Esses DAGs são scripts Python que dão aos desenvolvedores controle total do fluxo de trabalho. Os DAGs podem ser acionados manualmente por meio de uma chamada de API ou da interface do usuário da Web. Os DAGs também podem ser agendados para execução uma vez, de forma recorrente ou em qualquer configuração de tipo cron.
Vamos começar a explorar o Airflow criando um arquivo Python “currency.py, " dentro da pasta dags usando seu editor favorito.
Veja a seguir o código-fonte completo do DAG.
1import os
2import json
3from airflow import DAG
4from airflow.operators.python import PythonOperator
5from airflow.operators.bash import BashOperator
6from airflow.providers.http.operators.http import SimpleHttpOperator
7from airflow.providers.mongo.hooks.mongo import MongoHook
8from datetime import datetime,timedelta
9
10def on_failure_callback(**context):
11 print(f"Task {context['task_instance_key_str']} failed.")
12
13def uploadtomongo(ti, **context):
14 try:
15 hook = MongoHook(mongo_conn_id='mongoid')
16 client = hook.get_conn()
17 db = client.MyDB
18 currency_collection=db.currency_collection
19 print(f"Connected to MongoDB - {client.server_info()}")
20 d=json.loads(context["result"])
21 currency_collection.insert_one(d)
22 except Exception as e:
23 printf("Error connecting to MongoDB -- {e}")
24
25with DAG(
26 dag_id="load_currency_data",
27 schedule_interval=None,
28 start_date=datetime(2022,10,28),
29 catchup=False,
30 tags= ["currency"],
31 default_args={
32 "owner": "Rob",
33 "retries": 2,
34 "retry_delay": timedelta(minutes=5),
35 'on_failure_callback': on_failure_callback
36 }
37) as dag:
38
39 t1 = SimpleHttpOperator(
40 task_id='get_currency',
41 method='GET',
42 endpoint='2022-01-01..2022-06-30',
43 headers={"Content-Type": "application/json"},
44 do_xcom_push=True,
45 dag=dag)
46
47 t2 = PythonOperator(
48 task_id='upload-mongodb',
49 python_callable=uploadtomongo,
50 op_kwargs={"result": t1.output},
51 dag=dag
52 )
53
54 t1 >> t2

Etapa 4: configurar conexões

Ao observar o código, perceba que não há strings de conexão no arquivo Python. Os identificadores de conexão, conforme mostrado no trecho de código abaixo, são espaços reservados para strings de conexão.
hook = MongoHook(mongo_conn_id='mongoid')
Os identificadores de conexão e as configurações de conexão que eles representam são definidos na guia Conexões do menu Admin na interface do usuário do Airflow.
admin menu item drop down list
Neste exemplo, como estamos nos conectando ao MongoDB e a uma API HTTP, precisamos definir duas conexões. Primeiro, vamos criar a conexão MongoDB clicando no botão "Add a new record ".
Add a new record button
Isso apresentará uma página na qual você poderá preencher as informações de conexão. Selecione “MongoDB” no menu suspenso Tipo de conexão e preencha os seguintes campos:
ID de conexãoMongoid
Tipo de conexãoMongoDB
AnfitriãoXXXX..mongodb.net

(Coloque seu nome de host do MongoDB Atlas aqui)
EsquemaMeuDB

(por exemplo, o banco de dados no MongoDB)
login(Coloque seu nome de usuário do banco de dados aqui)
Senha(Coloque aqui a senha do seu banco de dados)
Extra{"srv":true}
Clique em “Save” e “Add a new record” para criar a conexão HTTP API.
Selecione "HTTP " para o Tipo de Conexão e preencha os seguintes campos:
ID de conexãohttp_default
Tipo de conexãoHTTP
Anfitriãoapi.frankfurter.app
Observação: as strings de conexão também podem ser armazenadas em variáveis de ambiente ou armazenadas com segurança usando um back-end de segredos externo, como o HashiCorp Vault ou o AWS SSM Parameter Store.

Etapa 5: o fluxo de trabalho DAG

Clique no menu DAGs e depois em “load_currency_data.”  Você verá vários subitens que abordam o fluxo de trabalho, como o menu Código que mostra o código Python que compõe o DAG.
Clicar em Grafo mostrará uma representação visual do DAG analisado a partir do código Python.
graph view of DAG workflow Em nosso exemplo, “get_currency” usa o SimpleHttpOperator para obter uma lista histórica de valores de moeda em relação ao Euro.
1t1 = SimpleHttpOperator(
2 task_id='get_currency',
3 method='GET',
4 endpoint='2022-01-01..2022-06-30',
5 headers={"Content-Type": "application/json"},
6 do_xcom_push=True,
7 dag=dag)
O Airflow transmite informações entre tarefas usando XComs. Neste exemplo, armazenamos os dados de retorno da chamada de API para XCom. O próximo operador, "upload-mongodb," utiliza o PythonOperator para chamar uma função python, "uploadtomongo."
1t2 = PythonOperator(
2 task_id='upload-mongodb',
3 python_callable=uploadtomongo,
4 op_kwargs={"result": t1.output},
5 dag=dag
6 )
Essa função acessa os dados armazenados no XCom e usa o MongoHook para inserir os dados obtidos da chamada de API em um MongoDB cluster.
1def uploadtomongo(ti, **context):
2 try:
3 hook = MongoHook(mongo_conn_id='mongoid')
4 client = hook.get_conn()
5 db = client.MyDB
6 currency_collection=db.currency_collection
7 print(f"Connected to MongoDB - {client.server_info()}")
8 d=json.loads(context["result"])
9 currency_collection.insert_one(d)
10 except Exception as e:
11 printf("Error connecting to MongoDB -- {e}")
Embora nosso fluxo de trabalho de exemplo seja simples, execute uma tarefa e, em seguida, outra tarefa.
1t1 >> t2
O Airflow sobrecarregou o operador bitwise “>>" para descrever o fluxo de tarefas. Para mais informações, consulte “Bitshift Composition.”
O Airflow pode permitir fluxos de trabalho mais complexos, como os seguintes:
Graph view of DAG workflow
A execução da tarefa pode ser condicional com vários caminhos de execução.

Etapa 6: agendamento do DAG

O Airflow é mais conhecido por seus recursos de agendamento de fluxo de trabalho, e eles são definidos como parte da definição do DAG.
1with DAG(
2 dag_id="load_currency_data",
3 schedule=None,
4 start_date=datetime(2022,10,28),
5 catchup=False,
6 tags= ["currency"],
7 default_args={
8 "owner": "Rob",
9 "retries": 2,
10 "retry_delay": timedelta(minutes=5),
11 'on_failure_callback': on_failure_callback
12 }
13) as dag:
intervalo de agendamento pode ser definido usando uma expressão cron, um timedelta ou uma das predefinições do Airflow, como a usada neste exemplo, "None."
Os DAGs podem ser agendados para começar em uma data no passado. Se você quiser que o Airflow atualize e execute o DAG tantas vezes quanto teriam sido feitas no horário de início e agora, você pode definir a propriedade “catchup”. Observação: “Catchup” é definido como “True,” por padrão, então certifique-se de definir o valor adequadamente.
No nosso exemplo, você pode ver apenas algumas das opções de configuração disponíveis.

Etapa 7: executando o DAG

Você pode executar um DAG ad-hoc pela web usando o botão "play" na coluna de ação.
actions menu options
Após a execução, você pode clicar no item de menu DAG e Grid para exibir o status do tempo de execução do DAG.
view of dag execution
No exemplo acima, o DAG foi executado quatro vezes, todas com sucesso. Você pode visualizar o log de cada etapa clicando na tarefa e, em seguida, em "Log" no menu.
showing log details view
O log é útil para solucionar problemas da tarefa. Aqui podemos ver nossa saída do comando print(f"Connected to MongoDB - {client.server_info()}") dentro do PythonOperator.
log by attempts

Etapa 8: explorar os dados no MongoDB Atlas

Depois de executar o DAG, os dados estarão no MongoDB Atlas cluster. Navegando para o cluster, podemos ver que o "currency_collection " foi criado e preenchido com dados monetários.
MongoDB Atlas collection view

Etapa 9: Visualizando os dados usando MongoDB Charts

Em seguida, podemos visualizar os dados usando o MongoDB Charts.
Observe que os dados foram armazenados no MongoDB a partir da API com um subdocumento para cada dia do período determinado. Uma amostra desses dados é a seguinte:
1{
2 _id: ObjectId("635b25bdcef2d967af053e2c"),
3 amount: 1,
4 base: 'EUR',
5 start_date: '2022-01-03',
6 end_date: '2022-06-30',
7 rates: {
8 '2022-01-03': {
9 AUD: 1.5691,
10 BGN: 1.9558,
11 BRL: 6.3539,
12… },
13},
14 '2022-01-04': {
15 AUD: 1.5682,
16 BGN: 1.9558,
17 BRL: 6.4174,
18… }
Com o MongoDB Charts, podemos definir um filtro de pipeline de agregação para transformar os dados em um formato que será otimizado para a criação de gráficos. Por exemplo, considere o seguinte filtro de pipeline de agregação:
1[{$project:{
2 rates:{
3 $objectToArray:"$rates"}}},{
4 $unwind:"$rates"
5 }
6 ,{
7 $project:{
8 _id:0,"date":"$rates.k","Value":"$rates.v"}}]
Isso transforma os dados em subdocumentos que têm dois pares de valores de chave da data e dos valores, respectivamente.
1{
2 date: '2022-01-03',
3 Value: {
4 AUD: 1.5691,
5 BGN: 1.9558,
6 BRL: 6.3539,
7… },
8 {
9 date: '2022-01-04',
10 Value: {
11 AUD: 1.5682,
12 BGN: 1.9558,
13 BRL: 6.4174,
14..}
15}
Podemos adicionar esse filtro de pipeline de agregação ao Charts e criar um gráfico comparando o dólar americano (USD) ao euro (EUR) durante esse período.
We can add this aggregation pipeline filter into Charts and build out a chart comparing the US dollar (USD) to the Euro (EUR) over this time period. Para obter mais informações sobre o MongoDB Charts, assista ao vídeo do YouTube "Intro to MongoDB Charts (demo)" para obter uma explicação passo a passo do recurso.

Resumo

O Airflow é um agendador de fluxo de trabalho de código aberto usado por muitas empresas em todo o mundo. Integrar o MongoDB com Airflow é simples usando o MongoHook. O Astronomer facilita a criação rápida de um sistema local do Airflow.O Astronomer também tem um registro que fornece um local central para os operadores do Airflow, incluindo o MongoHook e o MongoSensor. 

Recursos úteis

Saiba mais sobre oAstronomer e consulte a documentação do MongoHook.

Facebook Icontwitter iconlinkedin icon
Avaliar este tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Tutorial

Modernize seus modelos de dados de seguros com o MongoDB Relational Migrator


Jun 03, 2024 | 14 min read
Tutorial

Design de esquema do MongoDB: melhores práticas de modelagem de dados


Oct 01, 2024 | 11 min read
Início rápido

Como criar um aplicativo CRUD com MongoDB, Quarkus e GraalVM


Aug 29, 2024 | 7 min read
Tutorial

Spring Data Unlocked: Começando com Java e MongoDB


Nov 11, 2024 | 5 min read