Medindo o desempenho do MongoDB Kafka Connector
Avalie esse Artigo
Com a necessidade atual de arquiteturas flexíveis orientadas a eventos, empresas de todo o mundo escolhem as melhores tecnologias, como MongoDB e Apache Kafka, para ajudar a resolver esses desafios. Embora essas duas tecnologias complementares forneçam o poder e a flexibilidade para resolver esses desafios de grande escala, o desempenho sempre esteve na vanguarda das preocupações. Neste blog, abordaremos como medir o desempenho do connector MongoDB para Apache Kafka em uma configuração de origem e coletor.
Lembre-se de que o MongoDB connector grava dados de um tópico do Kafka no MongoDB. Por padrão, as gravações usam o ReplaceOneModel em que os dados são atualizados se estiverem presentes no cluster de destino ou criados como um novo documento se não estiverem presentes. Você não está limitado a esse comportamento upsert. Na verdade, você pode alterar o connector para executar apenas exclusões ou inserções. Esses comportamentos de gravação são definidos pela configuração Estratégia de Modelo de Gravação na configuração do connector.
Para determinar o desempenho do connector do coletor, precisamos de um carimbo de data/hora de quando o documento foi gravado no MongoDB. Atualmente, a única estratégia de modelo de gravação que grava um campo de carimbo de data/hora em nome do usuário é UpdateOneTimestampsStrategy e UpdateOneBusinessKeyTimestampStrategy. Esses dois modelos de gravação inserem um novo campo chamado _insertedTS, que pode ser usado para consultar o atraso entre o Kafka e o MongoDB.
Neste exemplo, usaremos o MongoDB Atlas. O MongoDB Atlas é uma plataforma de dados do MongoDB de nuvem pública que fornece recursos prontos para uso, como o MongoDB Charts, uma ferramenta para criar representações visuais de seus dados do MongoDB. Se quiser acompanhar, você pode criar uma camada grátis para sempre.
1 curl -X POST -H "Content-Type: application/json" --data ' 2 {"name": "datagen-users", 3 "config": { "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", 4 "kafka.topic": "topic333", 5 "quickstart": "users", 6 "key.converter": "org.apache.kafka.connect.storage.StringConverter", 7 "value.converter": "org.apache.kafka.connect.json.JsonConverter", 8 "value.converter.schemas.enable": "false", 9 "max.interval": 50, 10 "iterations": 5000, 11 "tasks.max": "2" 12 }}' http://localhost:8083/connectors -w "\n"
Agora que os dados foram gerados e gravados no tópico do Kafka, "topic333, ", vamos criar nosso connector de sink do MongoDB para escrever os dados desse tópico no MongoDB Atlas. Conforme mencionado anteriormente, adicionaremos um campo _insertedTS para uso no cálculo do atraso entre o carimbo de data/hora da mensagem e esse valor. Para executar a inserção, vamos usar a estratégia de modo de gravaçãoUpdateOneTimestampsStrategy.
1 curl -X POST -H "Content-Type: application/json" --data ' 2 {"name": "kafkametadata3", 3 "config": { 4 "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", 5 "topics": "topic333", 6 "connection.uri": "MONGODB CONNECTION STRING GOES HERE", 7 "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy", 8 "database": "kafka", 9 "collection": "datagen", 10 "errors.log.include.messages": true, 11 "errors.deadletterqueue.context.headers.enable": true, 12 "key.converter": "org.apache.kafka.connect.storage.StringConverter", 13 "value.converter": "org.apache.kafka.connect.json.JsonConverter", 14 "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy", 15 "tasks.max": 2, 16 "value.converter.schemas.enable":false, 17 "transforms": "InsertField", 18 "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value", 19 "transforms.InsertField.offset.field": "offsetColumn", 20 "transforms": "InsertField", 21 "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value", 22 "transforms.InsertField.timestamp.field": "timestampColumn" 23 }}' http://localhost:8083/connectors -w "\n"
Observação: o campo _insertedTS é preenchido com o valor de tempo do servidor de conexão Kafka.
Dê uma olhada na coleção do MongoDB Atlas "datagen " e familiarize-se com os campos adicionados.
Neste blog, usaremos MongoDB Charts para exibir um gráfico de desempenho. Para facilitar a construção do gráfico, criaremos uma visualização.
1 use kafka 2 db.createView("SinkView","datagen", 3 [ 4 { 5 "$sort" : { 6 "_insertedTS" : 1, 7 "timestampColumn" : 1 8 } 9 }, 10 { 11 "$project" : { 12 "_insertedTS" : 1, 13 "timestampColumn" : 1, 14 "_id" : 0 15 } 16 }, 17 { 18 "$addFields" : { 19 "diff" : { 20 "$subtract" : [ 21 "$_insertedTS", 22 { 23 "$convert" : { 24 "input" : "$timestampColumn", 25 "to" : "date" 26 } 27 } 28 ] 29 } 30 } 31 } 32 ])
Para criar um gráfico, clique na aba Charts no MongoDB Atlas:
Clique em Fontes de dados e em “Add Data Source.”. A caixa de diálogo mostrará a visualização que foi criada.
Selecione o SinkView e clique em Concluir.
1 curl https://gist.githubusercontent.com/RWaltersMA/555b5f17791ecb58e6e683c54bafd381/raw/748301bcb7ae725af4051d40b2e17a8882ef2631/sink-chart-performance.charts -o sink-performance.charts
Escolha Import Dashbaord no menu suspenso Add Dashboard e selecione o arquivo baixado.
Carregue o arquivo sink-perfromance.chart.
Selecione o kafka.SinkView como fonte de dados no destino e clique em Salvar.
Agora o gráfico KafkaPerformance está pronto para ser visualizado. Ao clicar no gráfico, você verá algo parecido com o seguinte:
Este gráfico mostra estatísticas sobre as diferenças entre o timestamp no tópico Kafka e o Kafka Connector. No exemplo acima, o delta de tempo máximo é de aproximadamente um segundo (997ms) a partir da inserção de documentos 40 000 .
Para medir a origem, adotaremos uma abordagem diferente usando o KSQL para criar um fluxo do carimbo de data/hora clusterTime a partir do fluxo de alterações do MongoDB e da hora em que a linha foi gravada no tópico Kafka. A partir daqui, podemos enviar esses dados para um coletor do MongoDB e exibir os resultados em um gráfico do MongoDB.
A primeira etapa será criar o MongoDB connector que será usado para enviar dados para o tópico Kafka.
1 curl -X POST -H "Content-Type: application/json" --data ' 2 {"name": "mongo-source-perf", 3 "config": { 4 "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", 5 "errors.log.enable": "true", 6 "errors.log.include.messages": "true", 7 "connection.uri": "mongodb+srv://MONGODB CONNECTION STRING HERE", 8 "database": "kafka", 9 "collection": "source-perf-test", 10 "mongo.errors.log.enable": "true", 11 "topic.prefix":"mdb", 12 "output.json.formatter" : "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson", 13 "output.format.value":"schema", 14 "output.schema.infer.value":true, 15 "output.format.key":"json", 16 "publish.full.document.only": "false", 17 "change.stream.full.document": "updateLookup" 18 }}' http://localhost:8083/connectors -w "\n"
Há muitas maneiras de gerar dados de amostra no MongoDB. Nesta postagem do blog, usaremos a ferramentadoc-gen(repositório do Github) para criar rapidamente documentos de exemplo com base no esquema do usuário, que é definido da seguinte forma:
1 { 2 "_id" : ObjectId("59b99db4cfa9a34dcd7885b6"), 3 "name" : "Ned Stark", 4 "email" : "sean_bean@gameofthron.es", 5 "password" : "$2b$12$UREFwsRUoyF0CRqGNK0LzO0HM/jLhgUCNNIJ9RJAqMUQ74crlJ1Vu" 6 }
Para gerar dados em seu cluster MongoDB, emita o seguinte:
1 docker run robwma/doc-gen:1.0 python doc-gen.py -s '{"name":"string","email":"string","password":"string"}' -c "MONGODB CONNECTION STRING GOES HERE" -t 1000 -db "kafka" -col "source-perf-test"
Inicie o KSQL e crie um fluxo do clusterTime dentro da mensagem.
Observação: se você não tiver o KSQL, poderá executá-lo como parte da Plataforma Confluent no Docker usando as instruções a seguir.
Se estiver usando o Centro de Controle, clique em ksQLDB, clique em Editor e cole o seguinte KSQL:
1 CREATE STREAM stats ( 2 clusterTime BIGINT 3 ) WITH ( 4 KAFKA_TOPIC='kafka.source-perf-test', 5 VALUE_FORMAT='AVRO' 6 );
A única informação que precisamos da mensagem é o clusterTime. Esse valor é fornecido dentro do evento dechange stream. Para referência, este é um evento de amostra de change stream.
1 { 2 _id: { <BSON Object> }, 3 "operationType": "<operation>", 4 "fullDocument": { <document> }, 5 "ns": { 6 "db": <database>, 7 "coll": <collection> 8 }, 9 "to": { 10 "db": <database>, 11 "coll": <collection> 12 }, 13 "documentKey": { 14 _id: <value> 15 }, 16 "updateDescription": { 17 "updatedFields": { <document> }, 18 "removedFields": [ <field>, ... ] 19 }, 20 "clusterTime": <Timestamp>, 21 "txnNumber": <NumberLong>, 22 "lsid": { 23 "id": <UUID>, 24 "uid": <BinData> 25 } 26 }
Etapa 3
Em seguida, criaremos um fluxo ksql que calcula a diferença entre o tempo de cluster (tempo em que foi criado no MongoDB) e o tempo em que foi inserido no corretor.
1 CREATE STREAM STATS2 AS 2 select ROWTIME - CLUSTERTIME as diff, 1 AS ROW from STATS EMIT CHANGES;
Conforme indicado anteriormente, esse valor de diferença pode não ser totalmente preciso se os relógios no Kafka e no MongoDB forem diferentes.
Etapa 4
Para ver como os valores mudam com o tempo, podemos usar uma função de janela e escrever os resultados em uma tabela que pode ser gravada no MongoDB por meio de um connector de pia.
1 SET 'ksql.suppress.enabled' = 'true'; 2 3 CREATE TABLE STATSWINDOW2 AS 4 SELECT AVG( DIFF ) AS AVG, MAX(DIFF) AS MAX, count(*) AS COUNT, ROW FROM STATS2 5 WINDOW TUMBLING (SIZE 10 SECONDS) 6 GROUP BY ROW 7 EMIT FINAL;
O janelamento permite controlar como agrupar registros que têm a mesma chave para operações com estado, como agregações ou junções nas chamadas janelas. Há três maneiras de definir Windows de tempo no ksqlDB: Windows de salto, Windows de queda e Windows de sessão. Neste exemplo, usaremos o tumbling, pois é uma janela de duração fixa, não sobreposta e sem lacunas.
A etapa final é criar um connector para inserir todos esses dados agregados no MongoDB.
1 curl -X POST -H "Content-Type: application/json" --data ' 2 { 3 "name": "MongoSource-SinkPerf", 4 "config": { 5 "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", 6 "tasks.max": "1", 7 "errors.log.enable": true, 8 "errors.log.include.messages": true, 9 "topics": "STATSWINDOW2", 10 "errors.deadletterqueue.context.headers.enable": true, 11 "connection.uri": "MONGODB CONNECTION STRING GOES HERE", 12 "database": "kafka", 13 "collection": "sourceStats", 14 "mongo.errors.log.enable": true, 15 "transforms": "InsertField", 16 "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value", 17 "transforms.InsertField.timestamp.field": "timestampColumn" 18 }}' http://localhost:8083/connectors -w "\n"
1 curl https://gist.githubusercontent.com/RWaltersMA/011f1473cf937badc61b752a6ab769d4/raw/bc180b9c2db533536e6c65f34c30b2d2145872f9/mongodb-source-performance.chart -o source-performance.charts
Escolha Import Dashboard no menu suspenso Add Dashboard e selecione o arquivo baixado.
Você precisará criar uma fonte de dados para a nova coleção de coletores, "kafka.sourceStats. "
Clique no gráfico Fonte de Desempenho do Kafka para ver as estatísticas.
No exemplo acima, você pode visualizar as estatísticas de desempenho da janela móvel de 10segundos para 1.5Documentos M. A diferença média foi 252s, com a diferença máxima sendo 480s. Observe que parte desse delta pode ser diferenças em relógios entre MongoDB e Kafka. Embora não tomemos esses números como absolutos, simplesmente usar essa técnica é suficiente para determinar tendências e se o desempenho está piorando ou melhorando.
Se você tiver alguma dúvida sobre os recursos ou aprimoramentos de funcionalidade que gostaria de ver em relação ao monitoramento do desempenho ou do monitoramento do MongoDB Connector for Apache Kafka em geral, adicione um comentário a KAFka-64.