Ajustando o MongoDB Connector para Apache Kafka
Robert Walters, Diego Rodriguez10 min read • Published Mar 01, 2022 • Updated Sep 17, 2024
Avalie esse Tutorial
O MongoDB Connector para Apache Kafka (MongoDB Connector) é um aplicativo Java de código aberto que funciona com o Apache Kafka Connect, permitindo a integração perfeita de dados do MongoDB com o ecossistema do Apache Kafka. Ao trabalhar com o MongoDB Connector, os valores padrão cobrem uma grande variedade de cenários, mas há alguns cenários que exigem um ajuste mais refinado. Neste artigo, percorreremos propriedades de configuração importantes que afetam o desempenho dos MongoDB Kafka Source e Sink Connectors e compartilharemos recomendações gerais.
Vamos primeiro dar uma olhada no connector quando ele é configurado para ler dados do MongoDB e escrevê-los em um tópico do Kafka. Quando você configura o connector dessa forma, ele é conhecido como um "source connector. "
Quando o connector é configurado como uma origem, um change stream é aberto dentro do cluster MongoDB com base em qualquer configuração especificada, como pipeline. Esses eventos de fluxo de alterações são lidos no connector e, em seguida, gravados no tópico Kafka, e eles se assemelham ao seguinte:
1 { 2 _id : { <BSON Object> }, 3 "operationType" : "<operation>", 4 "fullDocument" : { <document> }, 5 "ns" : { 6 "db" : "<database>", 7 "coll" : "<collection>" 8 }, 9 "to" : { 10 "db" : "<database>", 11 "coll" : "<collection>" 12 }, 13 "documentKey" : { "_id" : <value> }, 14 "updateDescription" : { 15 "updatedFields" : { <document> }, 16 "removedFields" : [ "<field>", ... ], 17 "truncatedArrays" : [ 18 { "field" : <field>, "newSize" : <integer> }, 19 ... 20 ] 21 }, 22 "clusterTime" : <Timestamp>, 23 "txnNumber" : <NumberLong>, 24 "lsid" : { 25 "id" : <UUID>, 26 "uid" : <BinData> 27 } 28 }
As propriedades de configuração do connector ajudam a definir quais dados são gravados no Kafka. Por exemplo, considere o cenário em que inserimos no MongoDB o seguinte:
1 Use Stocks 2 db.StockData.insertOne({'symbol':'MDB','price':441.67,'tx_time':Date.now()})
Quando publish.full.document.only é definido como false (a configuração padrão), o connector grava todo o evento, conforme mostrado abaixo:
1 {"_id": 2 {"_data": "826205217F000000022B022C0100296E5A1004AA1707081AA1414BB9F647FD49855EE846645F696400646205217FC26C3DE022E9488E0004"}, 3 "operationType": "insert", 4 "clusterTime": 5 {"$timestamp": 6 {"t": 1644503423, "i": 2}}, 7 "fullDocument": 8 {"_id": 9 {"$oid": "6205217fc26c3de022e9488e"}, 10 "symbol": "MDB", 11 "price": 441.67, 12 "tx_time": 1.644503423267E12}, 13 "ns": 14 {"db": "Stocks", "coll": "StockData"}, 15 "documentKey": 16 {"_id": {"$oid": "6205217fc26c3de022e9488e"}}}} 17 }
Quando publish.full.document.only é definido como true e emitimos uma declaração semelhante, ela tem a seguinte aparência:
1 use Stocks 2 db.StockData.insertOne({'symbol':'TSLA','price':920.00,'tx_time':Date.now()})
Podemos ver que os dados gravados no tópico Kafka são apenas o próprio documento alterado, que, neste exemplo, é um documento inserido.
1 {"_id": {"$oid": "620524b89d2c7fb2a606aa16"}, "symbol": "TSLA", 2 "price": 920, 3 "tx_time": 1.644504248732E12}"}
importação Outro conceito a ser aceito com connector de origem é retomar tokens. Os tokens de retomada possibilitam que o connector falhe, seja reiniciado e retome de onde parou de ler o change stream do MongoDB. Os tokens de retomada por padrão são armazenados em um tópico do Kafka definido pelo parâmetrooffset.storage.topic(configurável no nível do Kafka Connect Worker para ambientes distribuídos) ou no sistema de arquivos em um arquivo definido pelo parâmetrooffset.storage.file.filename(configurável no nível do Kafka Connect Worker para ambientes standalone). Caso o connector tenha estado offline e o oplog subjacente do MongoDB tenha sido acumulado, você poderá receber um erro quando o connector for reiniciado. Leia a seçãoToken de retomada inválido da documentação online para saber mais sobre essa condição.
O conjunto completo de propriedades do Kafka Source Connector pode ser encontrado na documentação. As propriedades que devem ser consideradas com relação ao ajuste de desempenho são as seguintes:
- batch.size: o tamanho do lote do cursor que define quantos documentos do change stream são recuperados em cada operaçãogetMore. O padrão é 1,000.
- poll.await.time.ms: A quantidade máxima de tempo em milissegundos que o servidor aguarda novas alterações de dados para relatar ao cursor do fluxo de alterações antes de retornar um lote vazio. O padrão é 5,000.
- poll.max.batch.size: número máximo de registros de origem a serem enviados para o Kafka de uma só vez. Essa configuração pode ser usada para limitar a quantidade de dados armazenados em buffer internamente no connector. O padrão é 1,000.
- pipeline: uma matriz de estágios de pipeline de agregação a serem executados em seu fluxo de alterações. O padrão é um pipeline vazio que não fornece filtragem.
- copy.existing.max.threads: o número de threadsa serem usados ao realizar a cópia de dados. O padrão é o número de processadores.
- copy.existing.queue.size: o tamanho máximo da fila a ser usado ao copiar dados. Isso é armazenado em buffer internamente pelo connector. O padrão é 16,000.
Veja a seguir algumas recomendações e considerações gerais ao configurar o connector de origem:
Uma das perguntas mais comuns é como dimensionar o connector de origem. Para cenários em que você tem uma grande quantidade de dados a serem copiados por meio decopy.existing, lembre-se de que usar o connector de origem dessa forma pode não ser a melhor maneira de mover essa grande quantidade de dados. Considere o processo de copy.existing:
- Armazene o token de retomada do change stream mais recente.
- Crie um thread (até copy.existing.max.threads) para cada namespace que está sendo copiado.
- Quando todos os threads são concluídos, os tokens de retomada são lidos, gravados e capturados até a hora atual.
Embora tecnicamente os dados sejam copiados, esse processo é relativamente lento. E se o tamanho dos dados for grande e os dados recebidos forem mais rápidos do que o processo de cópia, o connector poderá nunca entrar em um estado em que as novas alterações de dados sejam tratadas pelo connector.
Para conjuntos de dados de alta taxa de transferência que tentam ser copiados com copy.existing, uma situação típica é substituir o token de retomada armazenado em (1) devido à alta atividade de gravação. Isso interrompe a funcionalidade copy.existing e precisará ser reiniciada, além de lidar com as mensagens que já foram processadas para o tópico Kafka. Quando isso acontece, as alternativas são:
- Aumente o tamanho do oplog para garantir que a fase copy.existing possa ser concluída.
- Acelere a atividade de gravação no cluster de origem até que a fase copy.existing seja concluída.
Outra opção para lidar com a alta taxa de transferência de dados de alteração é configurar vários connector de origem. Cada connector de origem deve usar um pipeline e capturar alterações de um subconjunto do total de dados. Lembre-se de que cada vez que você cria um connector de origem apontado para o mesmo cluster do MongoDB, ele cria um fluxo de alterações separado. Cada fluxo de alterações requer recursos do cluster do MongoDB e adicioná-los continuamente diminuirá o desempenho do servidor. Dito isso, essa degradação pode não se tornar perceptível até que a quantidade de connector atinja o intervalo 100+, portanto, dividir suas coleções em cinco a 10 pipelines de connector é a melhor maneira de aumentar o desempenho da origem. Além disso, o uso de vários connector de origem diferentes no mesmo namespace altera a ordenação total dos dados no coletor em comparação com a ordem original dos dados no cluster de origem.
Ao criar sua configuração do Kafka Source Connector, certifique-se de ajustar adequadamente o "pipeline " para que apenas os eventos desejados fluam do MongoDB para o Kafka Connect, o que ajuda a reduzir o tráfego da rede e o tempo de processamento. Para obter um exemplo detalhado de pipeline, consulte a seção Personalizar um pipeline para filtrar eventos de alteração da documentação online.
Seu Kafka connector pode estar assistindo a um conjunto de collection com um baixo volume de eventos ou o oposto, um conjunto de collection com um volume muito alto de eventos.
Além disso, talvez você queira ajustar seu Kafka Source Connector para reagir mais rapidamente a alterações, reduzir as viagens de ida e volta ao MongoDB ou Kafka e alterações semelhantes.
Com isso em mente, considere ajustar as seguintes propriedades para o connector de Origem Kafka:
- Ajuste o valor de batch.size:
- Valores mais altos significam tempos de processamento mais longos no cluster de origem, mas menos viagens de ida e volta para ele. Isso também pode aumentar as chances de encontrar eventos de mudança relevantes quando o volume de eventos que está sendo observado é pequeno.
- Valores mais baixos significam tempos de processamento mais curtos no cluster de origem, mas mais viagens de ida e volta até ele. Isso pode reduzir as chances de encontrar eventos de alteração relevantes quando o volume de eventos observados é pequeno.
- Ajuste o valor de poll.max.batch.size:
- Valores mais altos exigem mais memória para armazenar em buffer os registros de origem com menos idas e voltas ao Kafka. Isso ocorre às custas dos requisitos de memória e do aumento da latência a partir do momento em que uma alteração ocorre no MongoDB até o ponto em que a mensagem Kafka associada a essa alteração atinge o tópico de destino.
- Valores mais baixos exigem menos memória para armazenar em buffer os registros de origem com mais viagens de ida e volta para o Kafka. Também pode ajudar a reduzir a latência a partir do momento em que uma alteração ocorre no MongoDB até o ponto em que a mensagem Kafka associada a essa alteração chegue ao tópico de destino.
- Ajuste o valor de poll.await.time.ms:
- Valores mais altos podem permitir que clusters de origem com um baixo volume de eventos tenham qualquer informação a ser enviada para o Kafka às custas do aumento da latência a partir do momento em que uma alteração ocorre no MongoDB até o ponto em que a mensagem do Kafka associada a essa alteração chega ao tópico de destino .
- Valores mais baixos reduzem a latência a partir do momento em que uma alteração ocorre no MongoDB até o ponto em que a mensagem Kafka associada a essa alteração atinge o tópico de destino. Mas para clusters de origem com um baixo volume de eventos, isso pode impedir que eles tenham qualquer informação a ser enviada ao Kafka.
Essas informações são uma visão geral do que esperar ao alterar esses valores, mas lembre-se de que eles estão profundamente interconectados, e o volume de eventos de alteração no cluster de origem também tem um impacto importante:
- O Kafka Source Connector emite comandos getMore para o cluster de origem usando batch.size.
- O Kafka Source Connector recebe os resultados da etapa 1 e aguarda até que poll.max.batch.size ou poll.await.time.ms seja atingido. Enquanto isso não acontece, o Kafka Source Connector mantém "feeding " ele mesmo com mais resultados getMore.
- Quando poll.max.batch.size ou poll.await.time.ms é atingido, os registros de origem são enviados para o Kafka.
Ao executar com a propriedadecopy.existing definida como true, considere estas propriedades adicionais:
- copy.existing.queue.size: a quantidade de registros que o Kafka Source Connector armazena internamente. Essa fila e seu tamanho incluem todos os namespaces a serem copiados pelo recurso "Copy Existing". Se essa fila estiver cheia, o Kafka Source Connector bloqueará até que haja espaço disponível.
- copy.existing.max.threads: a quantidade de threads simultâneas usadas para copiar os diferentes namespaces. Há um mapeamento de um namespace para um thread, portanto, é comum aumentar esse valor até o número máximo de namespaces que estão sendo copiados. Se o número exceder o número de núcleos disponíveis no sistema, os ganhos de desempenho poderão ser reduzidos.
- copy.existing.allow.disk.use: permite que a agregação copy existing use o armazenamento temporário em disco, se necessário. O padrão é definido como true, mas deve ser definido como false se o usuário não tiver as permissões de acesso ao disco.
Se você tiver problemas com a JVM “out of memory” no processo do Kafka Connect Worker, tente reduzir as duas propriedades a seguir que controlam a quantidade de dados armazenados internamente em buffer:
- poll.max.batch.size
- copy.existing.queue.size: aplicável se a propriedade "copy.existing " estiver definida como true.
É importante observar que reduzir esses valores pode resultar em um impacto indesejado. Recomenda-se ajustar o tamanho do heap da JVM às necessidades do ambiente, desde que você tenha recursos disponíveis e as necessidades de memória não sejam resultado de vazamentos de memória.
Quando o MongoDB Connector é configurado como um coletor, ele lê de um tópico Kafka e grava em uma coleção MongoDB.
Assim como na origem, existe um mecanismo para garantir que os offsets sejam armazenados no caso de uma falha no sink. O Kafka Connect gerencia isso e as informações são armazenadas no tópico __consumer_offsets. O MongoDB Connector tem propriedades de configuração que afetam o desempenho. São os seguintes:
- max.batch.size: o número máximo de registros de sink a serem agrupados para processamento. Um número maior resultará no envio de mais documentos como parte de um único comando em massa. O valor padrão é 0.
- rate.limiting.every.n: número de lotes processados que trigger o limite de taxa. Um valor de 0 significa que não há limitação de taxa. O valor padrão é 0. Na prática, essa configuração raramente é usada.
- rate.limiting.timeout: quanto tempo (em milissegundos) deve ser aguardado antes de continuar a processar os dados quando o limite de taxa for atingido. O valor padrão é 0. Essa configuração raramente é usada.
- tasks.max: o número máximo de tarefas. O valor padrão é 1.
As gravações realizadas pelo connector do coletor levam mais tempo para serem concluídas à medida que o tamanho da collection MongoDB subjacente aumenta. Para evitar a deterioração do desempenho, use um índice para dar suporte a essas consultas de gravação.
O Kafka Sink Connector (KSC) pode tirar proveito da execução paralela graças à propriedadetasks.max. O número especificado de tarefas só será criado se o tópico de origem tiver o mesmo número de partições. Observação: uma partição deve ser considerada como um grupo lógico de registros ordenados, e o produtor dos dados determina o que cada partição contém. Aqui está o detalhamento das diferentes combinações do número de partições no tópico de origem e dos valores tasks.max:
Se estiver trabalhando com mais de uma partição, mas uma tarefa:
- A tarefa processa as partições uma a uma: depois que um lote de uma partição é processado, ele passa para outro para que a ordem em cada partição ainda seja garantida.
- A ordem entre todas as partições não é garantida.
Se estiver trabalhando com mais de uma partição e um número igual de tarefas:
- Cada tarefa recebe uma partição e a ordem é garantida dentro de cada partição.
- A ordem entre todas as partições não é garantida.
Se estiver trabalhando com mais de uma partição e um número menor de tarefas:
- As tarefas atribuídas a mais de uma partição processam as partições uma por uma: depois que um lote de uma partição é processado, ele passa para outro para que a ordem em cada partição ainda seja garantida.
- A ordem entre todas as partições não é garantida.
Se estiver trabalhando com mais de uma partição e um número maior de tarefas:
- Cada tarefa recebe uma partição e a ordem é garantida dentro de cada partição.
- O KSC não gerará um número excessivo de tarefas.
- A ordem entre todas as partições não é garantida.
O processamento das partições pode não estar em ordem, o que significa que a Partição B pode ser processada antes da Partição A. Todas as mensagens dentro da partição conservam uma ordem estrita.
Observação: ao usar o MongoDB para gravar dados de CDC, a ordem dos dados é importante porque, por exemplo, você não deseja processar uma exclusão antes de uma atualização nos mesmos dados. Se você especificar mais de uma partição para os dados do CDC, correrá o risco de os dados estarem fora de ordem na coleção de coletores.
O Kafka Sink Connector (KSC) funciona emitindo operações de gravação em massa. Todas as operações em massa que o KSC executa são, por padrão, ordenadas e, como tal, a ordem das mensagens é garantida dentro de uma partição. Consulte Operações ordenadas versus não ordenadas para obter mais informações. Observação: a partir de 1.7, bulk.write.ordered, se definido como falso, processará o volume fora de ordem, permitindo que mais documentos dentro do lote sejam gravados no caso de falha de uma parte do lote.
A quantidade de operações enviadas em um único comando em massa pode ter um impacto direto no desempenho. Você pode modificar isso ajustando max.batch.size:
- Um número maior resultará em mais operações sendo enviadas como parte de um único comando em massa. Isso ajuda a melhorar a taxa de transferência às custas de alguma latência adicional. No entanto, um número muito grande pode resultar em pressão de cache no cluster de destino.
- Um pequeno número facilitará os possíveis problemas de pressão de cache, que podem ser úteis para clusters de destino com menos recursos. No entanto, a taxa de transferência diminui e você pode sentir um atraso do consumidor nos tópicos de origem, pois o produtor pode publicar mensagens no tópico mais rapidamente do que o KSC as processa.
- Este valor afeta o processamento dentro de cada uma das tarefas do KSC.
Caso o cluster do MongoDB de destino não seja capaz de lidar com uma taxa de transferência consistente, você poderá configurar um mecanismo de limitação. Você pode fazer isso com duas propriedades:
- rate.limiting.every.n: número de lotes processados que devem trigger o limite de taxa. Um valor de 0 significa que não há limitação de taxa.
- rate.limiting.timeout: quanto tempo (em milissegundos) esperar antes de continuar a processar dados quando o limite de taxa for atingido.
O resultado final é que, sempre que o KSC grava rate.limiting.every.n número de lotes, ele espera rate.limiting.timeout milissegundos antes de gravar o próximo lote. Isso permite que um cluster MongoDB de destino que não consegue lidar com uma taxa de transferência consistente se recupere antes de receber uma nova carga do KSC.