Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Learn why MongoDB was selected as a leader in the 2024 Gartner® Magic Quadrant™
Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Produtoschevron-right
Conectoreschevron-right

Medindo o desempenho do MongoDB Kafka Connector

Juan Soto, Robert Walters6 min read • Published Feb 15, 2022 • Updated May 09, 2022
Conectores
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Artigo
star-empty
star-empty
star-empty
star-empty
star-empty
Com a necessidade atual de arquiteturas flexíveis orientadas a eventos, empresas de todo o mundo escolhem as melhores tecnologias, como MongoDB e Apache Kafka, para ajudar a resolver esses desafios. Embora essas duas tecnologias complementares forneçam o poder e a flexibilidade para resolver esses desafios de grande escala, o desempenho sempre esteve na vanguarda das preocupações. Neste blog, abordaremos como medir o desempenho do connector MongoDB para Apache Kafka em uma configuração de origem e coletor.

Medição do desempenho do coletor

Lembre-se de que o MongoDB connector grava dados de um tópico do Kafka no MongoDB. Por padrão, as gravações usam o ReplaceOneModel em que os dados são atualizados se estiverem presentes no cluster de destino ou criados como um novo documento se não estiverem presentes. Você não está limitado a esse comportamento upsert. Na verdade, você pode alterar o connector para executar apenas exclusões ou inserções. Esses comportamentos de gravação são definidos pela configuração Estratégia de Modelo de Gravação na configuração do connector.
Para determinar o desempenho do connector do coletor, precisamos de um carimbo de data/hora de quando o documento foi gravado no MongoDB. Atualmente, a única estratégia de modelo de gravação que grava um campo de carimbo de data/hora em nome do usuário é UpdateOneTimestampsStrategy e UpdateOneBusinessKeyTimestampStrategy. Esses dois modelos de gravação inserem um novo campo chamado _insertedTS, que pode ser usado para consultar o atraso entre o Kafka e o MongoDB.
Neste exemplo, usaremos o MongoDB Atlas. O MongoDB Atlas é uma plataforma de dados do MongoDB de nuvem pública que fornece recursos prontos para uso, como o MongoDB Charts, uma ferramenta para criar representações visuais de seus dados do MongoDB. Se quiser acompanhar, você pode criar uma camada grátis para sempre.

Gerar dados de amostra

Geraremos dados de amostra usando o Kafka Connectordatagen fornecido pela Confluent. O Datagen é uma maneira conveniente de criar dados de teste no ecossistema Kafka. Existem algumas especificações de esquema de início rápido agrupadas com esse connector. Usaremos um início rápido chamado users.
1curl -X POST -H "Content-Type: application/json" --data '
2 {"name": "datagen-users",
3 "config": { "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
4 "kafka.topic": "topic333",
5 "quickstart": "users",
6 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
7 "value.converter": "org.apache.kafka.connect.json.JsonConverter",
8 "value.converter.schemas.enable": "false",
9 "max.interval": 50,
10 "iterations": 5000,
11 "tasks.max": "2"
12}}' http://localhost:8083/connectors -w "\n"

Configurar o conector do coletor

Agora que os dados foram gerados e gravados no tópico do Kafka, "topic333, ", vamos criar nosso connector de sink do MongoDB para escrever os dados desse tópico no MongoDB Atlas. Conforme mencionado anteriormente, adicionaremos um campo _insertedTS para uso no cálculo do atraso entre o carimbo de data/hora da mensagem e esse valor. Para executar a inserção, vamos usar a estratégia de modo de gravaçãoUpdateOneTimestampsStrategy.
1curl -X POST -H "Content-Type: application/json" --data '
2{"name": "kafkametadata3",
3 "config": {
4 "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
5 "topics": "topic333",
6 "connection.uri": "MONGODB CONNECTION STRING GOES HERE",
7 "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy",
8 "database": "kafka",
9 "collection": "datagen",
10 "errors.log.include.messages": true,
11 "errors.deadletterqueue.context.headers.enable": true,
12 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
13 "value.converter": "org.apache.kafka.connect.json.JsonConverter",
14 "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy",
15 "tasks.max": 2,
16 "value.converter.schemas.enable":false,
17 "transforms": "InsertField",
18 "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
19 "transforms.InsertField.offset.field": "offsetColumn",
20 "transforms": "InsertField",
21 "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
22 "transforms.InsertField.timestamp.field": "timestampColumn"
23}}' http://localhost:8083/connectors -w "\n"
Observação: o campo _insertedTS é preenchido com o valor de tempo do servidor de conexão Kafka.

Visualizando resultados com o MongoDB Charts

Dê uma olhada na coleção do MongoDB Atlas "datagen " e familiarize-se com os campos adicionados.
Figura 1: collection Datagen conforme vista na página MongoDB Atlas Collections
Neste blog, usaremos MongoDB Charts para exibir um gráfico de desempenho. Para facilitar a construção do gráfico, criaremos uma visualização.
1use kafka
2db.createView("SinkView","datagen",
3[
4 {
5 "$sort" : {
6 "_insertedTS" : 1,
7 "timestampColumn" : 1
8 }
9 },
10 {
11 "$project" : {
12 "_insertedTS" : 1,
13 "timestampColumn" : 1,
14 "_id" : 0
15 }
16 },
17 {
18 "$addFields" : {
19 "diff" : {
20 "$subtract" : [
21 "$_insertedTS",
22 {
23 "$convert" : {
24 "input" : "$timestampColumn",
25 "to" : "date"
26 }
27 }
28 ]
29 }
30 }
31 }
32])
Para criar um gráfico, clique na aba Charts no MongoDB Atlas:
Clique em Fontes de dados e em “Add Data Source.”. A caixa de diálogo mostrará a visualização que foi criada.
Selecione o SinkView e clique em Concluir.
1curl https://gist.githubusercontent.com/RWaltersMA/555b5f17791ecb58e6e683c54bafd381/raw/748301bcb7ae725af4051d40b2e17a8882ef2631/sink-chart-performance.charts -o sink-performance.charts
Escolha Import Dashbaord no menu suspenso Add Dashboard e selecione o arquivo baixado.
Carregue o arquivo sink-perfromance.chart.
Selecione o kafka.SinkView como fonte de dados no destino e clique em Salvar.
Agora o gráfico KafkaPerformance está pronto para ser visualizado. Ao clicar no gráfico, você verá algo parecido com o seguinte:
Este gráfico mostra estatísticas sobre as diferenças entre o timestamp no tópico Kafka e o Kafka Connector. No exemplo acima, o delta de tempo máximo é de aproximadamente um segundo (997ms) a partir da inserção de documentos 40 000 .

Medindo o desempenho da origem

Para medir a origem, adotaremos uma abordagem diferente usando o KSQL para criar um fluxo do carimbo de data/hora clusterTime a partir do fluxo de alterações do MongoDB e da hora em que a linha foi gravada no tópico Kafka. A partir daqui, podemos enviar esses dados para um coletor do MongoDB e exibir os resultados em um gráfico do MongoDB.

Configurar connectorde origem

A primeira etapa será criar o MongoDB connector que será usado para enviar dados para o tópico Kafka.
1curl -X POST -H "Content-Type: application/json" --data '
2{"name": "mongo-source-perf",
3 "config": {
4 "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
5 "errors.log.enable": "true",
6 "errors.log.include.messages": "true",
7 "connection.uri": "mongodb+srv://MONGODB CONNECTION STRING HERE",
8 "database": "kafka",
9 "collection": "source-perf-test",
10 "mongo.errors.log.enable": "true",
11 "topic.prefix":"mdb",
12 "output.json.formatter" : "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
13 "output.format.value":"schema",
14 "output.schema.infer.value":true,
15 "output.format.key":"json",
16 "publish.full.document.only": "false",
17 "change.stream.full.document": "updateLookup"
18}}' http://localhost:8083/connectors -w "\n"

Gerar dados de amostra

Há muitas maneiras de gerar dados de amostra no MongoDB. Nesta postagem do blog, usaremos a ferramentadoc-gen(repositório do Github) para criar rapidamente documentos de exemplo com base no esquema do usuário, que é definido da seguinte forma:
1{
2 "_id" : ObjectId("59b99db4cfa9a34dcd7885b6"),
3 "name" : "Ned Stark",
4 "email" : "sean_bean@gameofthron.es",
5 "password" : "$2b$12$UREFwsRUoyF0CRqGNK0LzO0HM/jLhgUCNNIJ9RJAqMUQ74crlJ1Vu"
6}
Para gerar dados em seu cluster MongoDB, emita o seguinte:
1docker run robwma/doc-gen:1.0 python doc-gen.py -s '{"name":"string","email":"string","password":"string"}' -c "MONGODB CONNECTION STRING GOES HERE" -t 1000 -db "kafka" -col "source-perf-test"

Crie consultas KSQL

Inicie o KSQL e crie um fluxo do clusterTime dentro da mensagem.
Observação: se você não tiver o KSQL, poderá executá-lo como parte da Plataforma Confluent no Docker usando as instruções a seguir.
Se estiver usando o Centro de Controle, clique em ksQLDB, clique em Editor e cole o seguinte KSQL:
1CREATE STREAM stats (
2 clusterTime BIGINT
3 ) WITH (
4 KAFKA_TOPIC='kafka.source-perf-test',
5 VALUE_FORMAT='AVRO'
6 );
A única informação que precisamos da mensagem é o clusterTime. Esse valor é fornecido dentro do evento dechange stream. Para referência, este é um evento de amostra de change stream.
1{
2 _id: { <BSON Object> },
3 "operationType": "<operation>",
4 "fullDocument": { <document> },
5 "ns": {
6 "db": <database>,
7 "coll": <collection>
8 },
9 "to": {
10 "db": <database>,
11 "coll": <collection>
12 },
13 "documentKey": {
14 _id: <value>
15 },
16 "updateDescription": {
17 "updatedFields": { <document> },
18 "removedFields": [ <field>, ... ]
19 },
20 "clusterTime": <Timestamp>,
21 "txnNumber": <NumberLong>,
22 "lsid": {
23 "id": <UUID>,
24 "uid": <BinData>
25 }
26}
Etapa 3
Em seguida, criaremos um fluxo ksql que calcula a diferença entre o tempo de cluster (tempo em que foi criado no MongoDB) e o tempo em que foi inserido no corretor.
1CREATE STREAM STATS2 AS
2 select ROWTIME - CLUSTERTIME as diff, 1 AS ROW from STATS EMIT CHANGES;
Conforme indicado anteriormente, esse valor de diferença pode não ser totalmente preciso se os relógios no Kafka e no MongoDB forem diferentes.
Etapa 4
Para ver como os valores mudam com o tempo, podemos usar uma função de janela e escrever os resultados em uma tabela que pode ser gravada no MongoDB por meio de um connector de pia.
1SET 'ksql.suppress.enabled' = 'true';
2
3CREATE TABLE STATSWINDOW2 AS
4 SELECT AVG( DIFF ) AS AVG, MAX(DIFF) AS MAX, count(*) AS COUNT, ROW FROM STATS2
5 WINDOW TUMBLING (SIZE 10 SECONDS)
6 GROUP BY ROW
7 EMIT FINAL;
O janelamento permite controlar como agrupar registros que têm a mesma chave para operações com estado, como agregações ou junções nas chamadas janelas. Há três maneiras de definir Windows de tempo no ksqlDB: Windows de salto, Windows de queda e Windows de sessão. Neste exemplo, usaremos o tumbling, pois é uma janela de duração fixa, não sobreposta e sem lacunas.

Configurar o conector do coletor

A etapa final é criar um connector para inserir todos esses dados agregados no MongoDB.
1curl -X POST -H "Content-Type: application/json" --data '
2{
3 "name": "MongoSource-SinkPerf",
4 "config": {
5 "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
6 "tasks.max": "1",
7 "errors.log.enable": true,
8 "errors.log.include.messages": true,
9 "topics": "STATSWINDOW2",
10 "errors.deadletterqueue.context.headers.enable": true,
11 "connection.uri": "MONGODB CONNECTION STRING GOES HERE",
12 "database": "kafka",
13 "collection": "sourceStats",
14 "mongo.errors.log.enable": true,
15 "transforms": "InsertField",
16 "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
17 "transforms.InsertField.timestamp.field": "timestampColumn"
18}}' http://localhost:8083/connectors -w "\n"

Visualizando resultados com o MongoDB Charts

1curl https://gist.githubusercontent.com/RWaltersMA/011f1473cf937badc61b752a6ab769d4/raw/bc180b9c2db533536e6c65f34c30b2d2145872f9/mongodb-source-performance.chart -o source-performance.charts
Escolha Import Dashboard no menu suspenso Add Dashboard e selecione o arquivo baixado.
Você precisará criar uma fonte de dados para a nova coleção de coletores, "kafka.sourceStats. "
Clique no gráfico Fonte de Desempenho do Kafka para ver as estatísticas.
No exemplo acima, você pode visualizar as estatísticas de desempenho da janela móvel de 10segundos para 1.5Documentos M. A diferença média foi 252s, com a diferença máxima sendo 480s. Observe que parte desse delta pode ser diferenças em relógios entre MongoDB e Kafka. Embora não tomemos esses números como absolutos, simplesmente usar essa técnica é suficiente para determinar tendências e se o desempenho está piorando ou melhorando.
Se você tiver alguma dúvida sobre os recursos ou aprimoramentos de funcionalidade que gostaria de ver em relação ao monitoramento do desempenho ou do monitoramento do MongoDB Connector for Apache Kafka em geral, adicione um comentário a KAFka-64.
Tem alguma dúvida? Confira nosso MongoDB fórum da comunidadede conectores e integrações .

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Artigo
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Tutorial

Usando a autenticação AWS IAM com o conector MongoDB para Apache Kafka


Jul 01, 2024 | 4 min read
Tutorial

Ative seu MongoDB e BigQuery usando procedimentos armazenados do BigQuery Spark


Aug 12, 2024 | 5 min read
Artigo

Transmitindo dados com Apache Spark e MongoDB


Aug 28, 2024 | 7 min read
Podcast

Pesquisa do Podcast do MongoDB com a equipe de conectores e tradutores


Sep 11, 2024 | 16 min
Sumário
  • Medição do desempenho do coletor