Vá para o MongoDB usando conectores Kafka - Guia final do agente
Avalie esse Tutorial
Go é uma linguagem moderna baseada em conceitos de compilação de código nativo e digitado, ao mesmo tempo em que sente e utiliza alguns benefícios das linguagens dinâmicas. É bastante simples de instalar e usar, pois fornece código legível e robusto para muitos casos de uso de aplicativos.
Um desses casos de uso é a criação de agentes que se reportam a uma plataforma de dados centralizada por meio de streaming. Uma abordagem amplamente aceita é comunicar os dados do agente por meio da inscrição de filas distribuídas como o Kafka. Os tópicos do Kafka podem então propagar os dados para muitas fontes diferentes, como um clusterdo MongoDB Atlas .
Ter um agente Go nos permite utilizar a mesma base de código para vários sistemas operacionais, e o fato de ele ter uma boa integração com dados e pacotes JSON, como um driver MongoDB e oConfluent Go Kafka Client, o torna um candidato tentador para o caso de uso apresentado .
Este artigo demonstrará como os dados de tamanho de arquivo em um host são monitorados a partir de um agente multiplataforma escrito em Go por meio de um cluster Kafka usando um connector de coletor hospedado pelo Confluent para o MongoDB Atlas. O MongoDB Atlas armazena os dados em uma coleção de séries temporais. O produtoMongoDB Charts é uma maneira conveniente de mostrar os dados coletados para o usuário.
Nosso agente vai executar o Go. Portanto, você precisará instalar o software de idioma Go no seu host.
Quando esta etapa for concluída, criaremos um módulo Go para iniciar nosso projeto em nosso diretório de trabalho:
1 go mod init example/main
Agora precisaremos adicionar a dependência do Confluent Kafka ao nosso projeto Golang:
1 go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
A criação de um Confluent Kafka Cluster é feita por meio da UI do Confluent. Comece criando um cluster básico do Kafka no Confluent Cloud. Quando estiver pronto, crie um tópico para ser usado no cluster Kafka. Criei um chamado "files. "
Gere uma chave de API e um segredo de API para interagir com esse cluster Kafka. Para simplificar este tutorial, selecionei a api-key “Global Access. Para produção, recomenda-se fornecer o mínimo de permissões possível para a chave de API usada. Obtenha as chaves geradas para uso futuro.
Obtenha a connection string do cluster Kafka por meio de Visão Geral do Cluster > Configurações do Cluster > Identificação > Servidor Bootstrap para uso futuro. Os clusters básicos estão abertos à Internet e, em produção, você precisará alterar a lista de acesso para que seus hosts específicos se conectem ao cluster por meio de ACLs de cluster avançadas.
Importante: o connector Confluent exige que o Kafka cluster e o Atlas cluster sejam implantados na mesma região.
Crie um projeto e cluster ou utilize um Atlas cluster existente em seu projeto. Como estamos usando uma coleção de séries temporais, os clusters devem usar um 5.0versão +. Prepare seu cluster Atlas para uma conexãoAtlas de coletor Confluent. Dentro da lista de acesso do seu projeto, habilite o usuário e endereços IP relevantes dos seus IPs do conector. Os IPs da lista de acesso devem estar associados ao Atlas Sink Connector, que configuraremos em uma seção seguinte.Por fim, controle aconnection stringdo Atlas e o DNS do cluster principal. Para obter mais informações sobre a melhor proteção e a obtenção dos IPs relevantes do seu conector Confluent, leia o seguinte artigo: Conector do MongoDB Atlas Sink para a Confluent Cloud.
Agora que temos nosso cluster Kafka e clusters Atlas criados e preparados, podemos inicializar nosso código de agente criando um pequeno arquivo principal que monitorará meu diretório
./files
e capturará os nomes e tamanhos dos arquivos.Adicionei um arquivo chamadotest.txt
com alguns dados para trazê-lo para ~200MB.Vamos criar um arquivo chamado
main.go
e escrever uma pequena lógica que executa um loop constante com um 1 min de sono para percorrer os arquivos na pastafiles
:1 package main 2 3 import ( 4 "fmt" 5 "encoding/json" 6 "time" 7 "os" 8 "path/filepath" 9 ) 10 11 type Message struct { 12 Name string 13 Size float64 14 Time int64 15 } 16 17 func samplePath (startPath string) error { 18 err := filepath.Walk(startPath, 19 func(path string, info os.FileInfo, err error) error { 20 21 var bytes int64 22 bytes = info.Size() 23 24 25 var kilobytes int64 26 kilobytes = (bytes / 1024) 27 28 var megabytes float64 29 megabytes = (float64)(kilobytes / 1024) // cast to type float64 30 31 var gigabytes float64 32 gigabytes = (megabytes / 1024) 33 34 now := time.Now().Unix()*1000 35 36 37 38 39 m := Message{info.Name(), gigabytes, now} 40 value, err := json.Marshal(m) 41 42 43 if err != nil { 44 panic(fmt.Sprintf("Failed to parse JSON: %s", err)) 45 } 46 47 48 fmt.Printf("value: %v\n", string(value)) 49 return nil; 50 }) 51 if err != nil { 52 return err 53 } 54 return nil; 55 } 56 57 func main() { 58 for { 59 err := samplePath("./files"); 60 if err != nil { 61 panic(fmt.Sprintf("Failed to run sample : %s", err)) 62 } 63 time.Sleep(time.Minute) 64 } 65 66 }
O código acima simplesmente importa módulos auxiliares para percorrer os diretórios e documentos JSON a partir dos arquivos encontrados.
Como precisamos que os dados sejam marcados com o tempo da amostra, eles são uma ótima opção para dados de séries temporais e, portanto, devem ser armazenados em uma coleção de séries temporais no Atlas. Se quiser saber mais sobre a coleta de dados de séries temporais, leia nosso artigoDados de séries temporais do MongoDB.
Podemos testar este agente executando o seguinte comando:
1 go run main.go
O agente produzirá documentos JSON semelhantes ao seguinte formato:
1 value: {"Name":"files","Size":0,"Time":1643881924000} 2 value: {"Name":"test.txt","Size":0.185546875,"Time":1643881924000}
Agora vamos criar um Kafka Sink connector para escrever os dados que chegam no tópico "files " para a coleção de séries temporais do nosso Atlas Cluster.
O Confluent Cloud tem uma integração muito popular que executa o Kafka Connector do MongoDB como uma solução hospedada integrada a seus clusters Kafka. Siga estas etapas para iniciar um sistema de connector.
A seguir estão as entradas fornecidas ao connector: Depois de configurá-lo, seguindo o guia, você eventualmente terá uma página de resumo de lançamento semelhante: Após o provisionamento, cada documento preenchido na fila
files
será enviado para uma coleção de séries temporais hostMonitor.files
onde o campo de data é Time
e o campo de metadados é Name
.Agora vamos editar o arquivo
main.go
para usar um cliente Kafka e enviar cada medida de arquivo para a fila "files ".Adicione a biblioteca do cliente como um módulo importado:
1 import ( 2 "fmt" 3 "encoding/json" 4 "time" 5 "os" 6 "path/filepath" 7 "github.com/confluentinc/confluent-kafka-go/kafka" 8 )
Adicione as credenciais de cloud Confluent e as informações de DNS do cluster. Substitua o
<CONFLUENT-SERVER>:<CONFLUENT-PORT>
encontrado na página de detalhes do Kafka Cluster e o <ACCESS-KEY>
, <SECRET-KEY>
gerado no Kafka Cluster:1 const ( 2 bootstrapServers = “<CONFLUENT-SERVER>:<CONFLUENT-PORT>" 3 ccloudAPIKey = "<ACCESS-KEY>" 4 ccloudAPISecret = "<SECRET-KEY>" 5 )
O código a seguir iniciará o produtor e produzirá uma mensagem a partir do JSON document ordenado:
1 topic := "files" 2 // Produce a new record to the topic... 3 producer, err := kafka.NewProducer(&kafka.ConfigMap{ 4 "bootstrap.servers": bootstrapServers, 5 "sasl.mechanisms": "PLAIN", 6 "security.protocol": "SASL_SSL", 7 "sasl.username": ccloudAPIKey, 8 "sasl.password": ccloudAPISecret}) 9 10 if err != nil { 11 panic(fmt.Sprintf("Failed to create producer: %s", err)) 12 } 13 14 producer.Produce(&kafka.Message{ 15 TopicPartition: kafka.TopicPartition{Topic: &topic, 16 Partition: kafka.PartitionAny}, 17 Value: []byte(value)}, nil) 18 19 // Wait for delivery report 20 e := <-producer.Events() 21 22 message := e.(*kafka.Message) 23 if message.TopicPartition.Error != nil { 24 fmt.Printf("failed to deliver message: %v\n", 25 message.TopicPartition) 26 } else { 27 fmt.Printf("delivered to topic %s [%d] at offset %v\n", 28 *message.TopicPartition.Topic, 29 message.TopicPartition.Partition, 30 message.TopicPartition.Offset) 31 } 32 33 producer.Close()
O arquivo
main.go
inteiro terá a seguinte aparência:1 package main 2 3 4 import ( 5 "fmt" 6 "encoding/json" 7 "time" 8 "os" 9 "path/filepath" 10 "github.com/confluentinc/confluent-kafka-go/kafka") 11 12 type Message struct { 13 Name string 14 Size float64 15 Time int64 16 } 17 18 19 const ( 20 bootstrapServers = "<SERVER-DNS>:<SERVER-PORT>" 21 ccloudAPIKey = "<ACCESS-KEY>" 22 ccloudAPISecret = "<SECRET-KEY>" 23 ) 24 25 func samplePath (startPath string) error { 26 27 err := filepath.Walk(startPath, 28 func(path string, info os.FileInfo, err error) error { 29 if err != nil { 30 return err 31 } 32 fmt.Println(path, info.Size()) 33 34 var bytes int64 35 bytes = info.Size() 36 37 38 var kilobytes int64 39 kilobytes = (bytes / 1024) 40 41 var megabytes float64 42 megabytes = (float64)(kilobytes / 1024) // cast to type float64 43 44 var gigabytes float64 45 gigabytes = (megabytes / 1024) 46 47 now := time.Now().Unix()*1000 48 49 50 51 52 m := Message{info.Name(), gigabytes, now} 53 value, err := json.Marshal(m) 54 55 56 if err != nil { 57 panic(fmt.Sprintf("Failed to parse JSON: %s", err)) 58 } 59 60 61 fmt.Printf("value: %v\n", string(value)) 62 63 topic := "files" 64 // Produce a new record to the topic... 65 producer, err := kafka.NewProducer(&kafka.ConfigMap{ 66 "bootstrap.servers": bootstrapServers, 67 "sasl.mechanisms": "PLAIN", 68 "security.protocol": "SASL_SSL", 69 "sasl.username": ccloudAPIKey, 70 "sasl.password": ccloudAPISecret}) 71 72 if err != nil { 73 panic(fmt.Sprintf("Failed to create producer: %s", err)) 74 } 75 76 producer.Produce(&kafka.Message{ 77 TopicPartition: kafka.TopicPartition{Topic: &topic, 78 Partition: kafka.PartitionAny}, 79 Value: []byte(value)}, nil) 80 81 // Wait for delivery report 82 e := <-producer.Events() 83 84 message := e.(*kafka.Message) 85 if message.TopicPartition.Error != nil { 86 fmt.Printf("failed to deliver message: %v\n", 87 message.TopicPartition) 88 } else { 89 fmt.Printf("delivered to topic %s [%d] at offset %v\n", 90 *message.TopicPartition.Topic, 91 message.TopicPartition.Partition, 92 message.TopicPartition.Offset) 93 } 94 95 producer.Close() 96 97 return nil; 98 }) 99 if err != nil { 100 return err 101 } 102 return nil; 103 } 104 105 func main() { 106 for { 107 err := samplePath("./files"); 108 if err != nil { 109 panic(fmt.Sprintf("Failed to run sample : %s", err)) 110 } 111 time.Sleep(time.Minute) 112 113 } 114 115 }
Agora, quando executarmos o agente enquanto o connector do Confluent Atlas estiver totalmente provisionado, veremos as mensagens produzidas na coleção de séries temporais
hostMonitor.files
:Para colocar nossos dados em uso, podemos criar alguns gráficos bonito sobre os dados de séries temporais. Em um gráfico de linha, configuramos o eixo X para usar o campo Time, o eixo Y para usar o campo Size e a série para usar o campo Name. O gráfico a seguir mostra as linhas coloridas representadas como a evolução dos diferentes tamanhos de arquivo ao longo do tempo. Agora temos um agente Go e um painel de Atlas Charts totalmente funcional para analisar as tendências de arquivos em crescimento. Essa arquitetura permite um grande espaço para extensibilidade, pois o agente Go pode ter outras funcionalidades, mais assinantes podem consumir os dados monitorados e agir sobre eles e, finalmente, o MongoDB Atlas e Atlas Charts podem ser usados por vários aplicativos e incorporados a diferentes plataformas.
A criação de aplicativos Go é simples, mas oferece grandes benefícios em termos de desempenho, código entre plataformas e um grande número de bibliotecas e clientes suportados. A adição MongoDB Atlas por meio de um serviço Confluent Cloud Kafka torna a implementação de uma pilha robusta e extensível, transmite dados e armazena-os e apresenta-os de forma eficiente ao usuário final por meio de Charts.
Neste tutorial, abordamos todos os fundamentos que você precisa saber para começar a usar o Go, Kafka e o MongoDB Atlas em seus próximos projetos de streaming.
Relacionado
Artigo
Saiba como aproveitar os dados do MongoDB dentro do Kafka com novos tutoriais!
Sep 17, 2024 | 1 min read
Tutorial
Como começar a usar o MongoDB Atlas Stream Processing e o provedor HashiCorp Terraform MongoDB Atlas
May 20, 2024 | 5 min read