Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Learn why MongoDB was selected as a leader in the 2024 Gartner® Magic Quadrant™
Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Produtoschevron-right
Conectoreschevron-right

Vá para o MongoDB usando conectores Kafka - Guia final do agente

Pavel Duchovny7 min read • Published Feb 08, 2022 • Updated Sep 17, 2024
KafkaConectoresGo
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Go é uma linguagem moderna baseada em conceitos de compilação de código nativo e digitado, ao mesmo tempo em que sente e utiliza alguns benefícios das linguagens dinâmicas. É bastante simples de instalar e usar, pois fornece código legível e robusto para muitos casos de uso de aplicativos.
Um desses casos de uso é a criação de agentes que se reportam a uma plataforma de dados centralizada por meio de streaming. Uma abordagem amplamente aceita é comunicar os dados do agente por meio da inscrição de filas distribuídas como o Kafka. Os tópicos do Kafka podem então propagar os dados para muitas fontes diferentes, como um clusterdo MongoDB Atlas .
Ter um agente Go nos permite utilizar a mesma base de código para vários sistemas operacionais, e o fato de ele ter uma boa integração com dados e pacotes JSON, como um driver MongoDB e oConfluent Go Kafka Client, o torna um candidato tentador para o caso de uso apresentado .
Este artigo demonstrará como os dados de tamanho de arquivo em um host são monitorados a partir de um agente multiplataforma escrito em Go por meio de um cluster Kafka usando um connector de coletor hospedado pelo Confluent para o MongoDB Atlas. O MongoDB Atlas armazena os dados em uma coleção de séries temporais. O produtoMongoDB Charts é uma maneira conveniente de mostrar os dados coletados para o usuário. Visão geral da arquitetura

Preparando o projeto Golang, cluster Kafka e MongoDB Atlas

Configurando um projeto Go

Nosso agente vai executar o Go. Portanto, você precisará instalar o software de idioma Go no seu host.
Quando esta etapa for concluída, criaremos um módulo Go para iniciar nosso projeto em nosso diretório de trabalho:
1go mod init example/main
Agora precisaremos adicionar a dependência do Confluent Kafka ao nosso projeto Golang:
1go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka

Configurar um cluster Kafka

A criação de um Confluent Kafka Cluster é feita por meio da UI do Confluent. Comece criando um cluster básico do Kafka no Confluent Cloud. Quando estiver pronto, crie um tópico para ser usado no cluster Kafka. Criei um chamado "files. "
Gere uma chave de API e um segredo de API para interagir com esse cluster Kafka. Para simplificar este tutorial, selecionei a api-key “Global Access. Para produção, recomenda-se fornecer o mínimo de permissões possível para a chave de API usada. Obtenha as chaves geradas para uso futuro.
Obtenha a connection string do cluster Kafka por meio de Visão Geral do Cluster > Configurações do Cluster > Identificação > Servidor Bootstrap para uso futuro. Os clusters básicos estão abertos à Internet e, em produção, você precisará alterar a lista de acesso para que seus hosts específicos se conectem ao cluster por meio de ACLs de cluster avançadas.
Importante: o connector Confluent exige que o Kafka cluster e o Atlas cluster sejam implantados na mesma região.

Configurando o projeto e cluster do Atlas

Crie um projeto e cluster ou utilize um Atlas cluster existente em seu projeto. Atlas Cluster Como estamos usando uma coleção de séries temporais, os clusters devem usar um 5.0versão +. Prepare seu cluster Atlas para uma conexãoAtlas de coletor Confluent. Dentro da lista de acesso do seu projeto, habilite o usuário e endereços IP relevantes dos seus IPs do conector. Os IPs da lista de acesso devem estar associados ao Atlas Sink Connector, que configuraremos em uma seção seguinte.Por fim, controle aconnection stringdo Atlas e o DNS do cluster principal. Para obter mais informações sobre a melhor proteção e a obtenção dos IPs relevantes do seu conector Confluent, leia o seguinte artigo: Conector do MongoDB Atlas Sink para a Confluent Cloud.

Adicionando lógica principal do agente

Agora que temos nosso cluster Kafka e clusters Atlas criados e preparados, podemos inicializar nosso código de agente criando um pequeno arquivo principal que monitorará meu diretório./files e capturará os nomes e tamanhos dos arquivos.Adicionei um arquivo chamadotest.txt com alguns dados para trazê-lo para ~200MB.
Vamos criar um arquivo chamado main.go e escrever uma pequena lógica que executa um loop constante com um 1 min de sono para percorrer os arquivos na pastafiles :
1package main
2
3import (
4 "fmt"
5 "encoding/json"
6 "time"
7 "os"
8 "path/filepath"
9)
10
11type Message struct {
12 Name string
13 Size float64
14 Time int64
15}
16
17func samplePath (startPath string) error {
18 err := filepath.Walk(startPath,
19 func(path string, info os.FileInfo, err error) error {
20
21 var bytes int64
22 bytes = info.Size()
23
24
25 var kilobytes int64
26 kilobytes = (bytes / 1024)
27
28 var megabytes float64
29 megabytes = (float64)(kilobytes / 1024) // cast to type float64
30
31 var gigabytes float64
32 gigabytes = (megabytes / 1024)
33
34 now := time.Now().Unix()*1000
35
36
37
38
39 m := Message{info.Name(), gigabytes, now}
40 value, err := json.Marshal(m)
41
42
43 if err != nil {
44 panic(fmt.Sprintf("Failed to parse JSON: %s", err))
45 }
46
47
48 fmt.Printf("value: %v\n", string(value))
49 return nil;
50 })
51 if err != nil {
52 return err
53 }
54 return nil;
55}
56
57func main() {
58 for {
59 err := samplePath("./files");
60 if err != nil {
61 panic(fmt.Sprintf("Failed to run sample : %s", err))
62 }
63 time.Sleep(time.Minute)
64 }
65
66}
O código acima simplesmente importa módulos auxiliares para percorrer os diretórios e documentos JSON a partir dos arquivos encontrados.
Como precisamos que os dados sejam marcados com o tempo da amostra, eles são uma ótima opção para dados de séries temporais e, portanto, devem ser armazenados em uma coleção de séries temporais no Atlas. Se quiser saber mais sobre a coleta de dados de séries temporais, leia nosso artigoDados de séries temporais do MongoDB.
Podemos testar este agente executando o seguinte comando:
1go run main.go
O agente produzirá documentos JSON semelhantes ao seguinte formato:
1value: {"Name":"files","Size":0,"Time":1643881924000}
2value: {"Name":"test.txt","Size":0.185546875,"Time":1643881924000}

Criando um connector Confluent MongoDB para Kafka

Agora vamos criar um Kafka Sink connector para escrever os dados que chegam no tópico "files " para a coleção de séries temporais do nosso Atlas Cluster.
O Confluent Cloud tem uma integração muito popular que executa o Kafka Connector do MongoDB como uma solução hospedada integrada a seus clusters Kafka. Siga estas etapas para iniciar um sistema de connector.
A seguir estão as entradas fornecidas ao connector: configuração connector 1 configuração connector 2 configuração connector 3 Depois de configurá-lo, seguindo o guia, você eventualmente terá uma página de resumo de lançamento semelhante: Resumo final do connector Após o provisionamento, cada documento preenchido na filafiles será enviado para uma coleção de séries temporais hostMonitor.files onde o campo de data é Time e o campo de metadados é Name.

Enviando dados para o Kafka

Agora vamos editar o arquivomain.go para usar um cliente Kafka e enviar cada medida de arquivo para a fila "files ".
Adicione a biblioteca do cliente como um módulo importado:
1import (
2 "fmt"
3 "encoding/json"
4 "time"
5 "os"
6 "path/filepath"
7 "github.com/confluentinc/confluent-kafka-go/kafka"
8)
Adicione as credenciais de cloud Confluent e as informações de DNS do cluster. Substitua o<CONFLUENT-SERVER>:<CONFLUENT-PORT> encontrado na página de detalhes do Kafka Cluster e o <ACCESS-KEY> , <SECRET-KEY> gerado no Kafka Cluster:
1const (
2 bootstrapServers = “<CONFLUENT-SERVER>:<CONFLUENT-PORT>"
3 ccloudAPIKey = "<ACCESS-KEY>"
4 ccloudAPISecret = "<SECRET-KEY>"
5)
O código a seguir iniciará o produtor e produzirá uma mensagem a partir do JSON document ordenado:
1 topic := "files"
2 // Produce a new record to the topic...
3 producer, err := kafka.NewProducer(&kafka.ConfigMap{
4 "bootstrap.servers": bootstrapServers,
5 "sasl.mechanisms": "PLAIN",
6 "security.protocol": "SASL_SSL",
7 "sasl.username": ccloudAPIKey,
8 "sasl.password": ccloudAPISecret})
9
10 if err != nil {
11 panic(fmt.Sprintf("Failed to create producer: %s", err))
12 }
13
14 producer.Produce(&kafka.Message{
15 TopicPartition: kafka.TopicPartition{Topic: &topic,
16 Partition: kafka.PartitionAny},
17 Value: []byte(value)}, nil)
18
19 // Wait for delivery report
20 e := <-producer.Events()
21
22 message := e.(*kafka.Message)
23 if message.TopicPartition.Error != nil {
24 fmt.Printf("failed to deliver message: %v\n",
25 message.TopicPartition)
26 } else {
27 fmt.Printf("delivered to topic %s [%d] at offset %v\n",
28 *message.TopicPartition.Topic,
29 message.TopicPartition.Partition,
30 message.TopicPartition.Offset)
31 }
32
33 producer.Close()
O arquivomain.gointeiro terá a seguinte aparência:
1package main
2
3
4import (
5 "fmt"
6 "encoding/json"
7 "time"
8 "os"
9 "path/filepath"
10 "github.com/confluentinc/confluent-kafka-go/kafka")
11
12type Message struct {
13 Name string
14 Size float64
15 Time int64
16}
17
18
19const (
20 bootstrapServers = "<SERVER-DNS>:<SERVER-PORT>"
21 ccloudAPIKey = "<ACCESS-KEY>"
22 ccloudAPISecret = "<SECRET-KEY>"
23)
24
25func samplePath (startPath string) error {
26
27 err := filepath.Walk(startPath,
28 func(path string, info os.FileInfo, err error) error {
29 if err != nil {
30 return err
31 }
32 fmt.Println(path, info.Size())
33
34 var bytes int64
35 bytes = info.Size()
36
37
38 var kilobytes int64
39 kilobytes = (bytes / 1024)
40
41 var megabytes float64
42 megabytes = (float64)(kilobytes / 1024) // cast to type float64
43
44 var gigabytes float64
45 gigabytes = (megabytes / 1024)
46
47 now := time.Now().Unix()*1000
48
49
50
51
52 m := Message{info.Name(), gigabytes, now}
53 value, err := json.Marshal(m)
54
55
56 if err != nil {
57 panic(fmt.Sprintf("Failed to parse JSON: %s", err))
58 }
59
60
61 fmt.Printf("value: %v\n", string(value))
62
63 topic := "files"
64 // Produce a new record to the topic...
65 producer, err := kafka.NewProducer(&kafka.ConfigMap{
66 "bootstrap.servers": bootstrapServers,
67 "sasl.mechanisms": "PLAIN",
68 "security.protocol": "SASL_SSL",
69 "sasl.username": ccloudAPIKey,
70 "sasl.password": ccloudAPISecret})
71
72 if err != nil {
73 panic(fmt.Sprintf("Failed to create producer: %s", err))
74 }
75
76 producer.Produce(&kafka.Message{
77 TopicPartition: kafka.TopicPartition{Topic: &topic,
78 Partition: kafka.PartitionAny},
79 Value: []byte(value)}, nil)
80
81 // Wait for delivery report
82 e := <-producer.Events()
83
84 message := e.(*kafka.Message)
85 if message.TopicPartition.Error != nil {
86 fmt.Printf("failed to deliver message: %v\n",
87 message.TopicPartition)
88 } else {
89 fmt.Printf("delivered to topic %s [%d] at offset %v\n",
90 *message.TopicPartition.Topic,
91 message.TopicPartition.Partition,
92 message.TopicPartition.Offset)
93 }
94
95 producer.Close()
96
97 return nil;
98})
99if err != nil {
100 return err
101}
102 return nil;
103}
104
105func main() {
106 for {
107 err := samplePath("./files");
108 if err != nil {
109 panic(fmt.Sprintf("Failed to run sample : %s", err))
110 }
111 time.Sleep(time.Minute)
112
113 }
114
115}
Agora, quando executarmos o agente enquanto o connector do Confluent Atlas estiver totalmente provisionado, veremos as mensagens produzidas na coleção de séries temporais hostMonitor.files:Atlas Data

Analisando os dados usando o MongoDB Charts

Para colocar nossos dados em uso, podemos criar alguns gráficos bonito sobre os dados de séries temporais. Em um gráfico de linha, configuramos o eixo X para usar o campo Time, o eixo Y para usar o campo Size e a série para usar o campo Name. O gráfico a seguir mostra as linhas coloridas representadas como a evolução dos diferentes tamanhos de arquivo ao longo do tempo. Gráfico de linhas Agora temos um agente Go e um painel de Atlas Charts totalmente funcional para analisar as tendências de arquivos em crescimento. Essa arquitetura permite um grande espaço para extensibilidade, pois o agente Go pode ter outras funcionalidades, mais assinantes podem consumir os dados monitorados e agir sobre eles e, finalmente, o MongoDB Atlas e Atlas Charts podem ser usados por vários aplicativos e incorporados a diferentes plataformas.

Embrulhar

A criação de aplicativos Go é simples, mas oferece grandes benefícios em termos de desempenho, código entre plataformas e um grande número de bibliotecas e clientes suportados. A adição MongoDB Atlas por meio de um serviço Confluent Cloud Kafka torna a implementação de uma pilha robusta e extensível, transmite dados e armazena-os e apresenta-os de forma eficiente ao usuário final por meio de Charts.
Neste tutorial, abordamos todos os fundamentos que você precisa saber para começar a usar o Go, Kafka e o MongoDB Atlas em seus próximos projetos de streaming.
Experimente o MongoDB Atlas and Go hoje mesmo!

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Artigo

Saiba como aproveitar os dados do MongoDB dentro do Kafka com novos tutoriais!


Sep 17, 2024 | 1 min read
Tutorial

Como começar a usar o MongoDB Atlas Stream Processing e o provedor HashiCorp Terraform MongoDB Atlas


May 20, 2024 | 5 min read
Tutorial

Usando a autenticação AWS IAM com o conector MongoDB para Apache Kafka


Jul 01, 2024 | 4 min read
Tutorial

Ative seu MongoDB e BigQuery usando procedimentos armazenados do BigQuery Spark


Aug 12, 2024 | 5 min read
Sumário