Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Saiba por que o MongoDB foi selecionado como um líder no 2024 Gartner_Magic Quadrupnt()
Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Produtoschevron-right
Atlaschevron-right

Guia de configuração do Atlas Stream Processing para usuários do Kafka Connector

Robert Walters15 min read • Published Aug 23, 2024 • Updated Aug 23, 2024
KafkaProcessamento de streamAtlas
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
MongoDB O oferece duas interfaces principais — o conector MongoDB connector do Kafka (Kafka Connector) e Atlas Stream Processingo —para integrar seus dados operacionais MongoDB no com dados de streaming Apache Kafka do . Isso cria uma combinação poderosa ao criar aplicativos modernos orientados a eventos.
Esta postagem abordará:
  • Noções básicas de configuração do Atlas Stream Processing
    • Como se conectar a um tópico do Kafka e transmitir dados usando o Atlas Stream Processing
    • Como transformar dados com o Atlas Stream Processing
    • Como transmitir dados para o MongoDB Atlas
  • Uma análise detalhada do ecossistema Kafka
    • Componentes do núcleo
    • Parâmetros de configuração equivalentes do Kafka Connector disponíveis para você no Atlas Stream Processing
    • Limitações atuais do Atlas Stream Processing
Pré-requisitos:
  • Conta do MongoDB Atlas
  • Uma fonte Apache Kafka (neste exemplo, usaremos Redpanda)
  • An Atlas Database
Observação: este artigo será atualizado com frequência à medida que continuamos o desenvolvimento do Atlas Stream Processing.

Comparação do Kafka Connector e do Atlas Stream Processing

O Apache Kafka Connect é um componente de servidor opcional em um sistema do Apache Kafka. Esse componente facilita o streaming de dados entre Apache Kafka o e outros sistemas de dados, fornecendo conectores específicos da fonte de dados, como o MongoDB connector do para o Apache Kafka. Esses conectores podem se conectar às suas respectivas fontes de dados e ler/gravar dados em um tópico do Apache Kafka por meio da framework do Kafka Connect.
A implantação do Kafka Connect pode ser complicada e cara, pois envolve vários servidores com suporte a um modo distribuído que permite a escalabilidade e a disponibilidade dos trabalhadores de connector . Os fornecedores de nuvem facilitam a hospedagem dessa infraestrutura fornecendo o Kafka Connect como um serviço. Isso simplifica o gerenciamento de connector , mas também significa um alto custo por hora para executar a infraestrutura.
Em muitos cenários, com o MongoDB Atlas Stream Processing, você não precisa mais aproveitar o Kafka Connect ao mover dados de e para o Apache Kafka e MongoDB. O Atlas Stream Processing é um recurso do MongoDB Atlas que permite o processamento, a validação e a mesclagem contínuos de dados de streaming de fontes como o Apache Kafka e o MongoDB Change Streams. Atlas Stream Processing também simplifica a leitura e a gravação de dados Apache Kafka MongoDB entre Kafka o e MongoDB connector o Apache Kafka, sem precisar do Connect ou do para o .
Neste artigo, compararemos e contrastaremos os métodos para mover dados entre o Kafka e o MongoDB usando um fluxo de dados de exemplo. Começaremos com dados de origem no Apache Kafka e os processaremos usando uma função de janela. Então, vamos pousar os dados no MongoDB.
Primeiro, vamos usar o para o MongoDB connector Apache Kafka. Em seguida, definiremos as coisas no Atlas Stream Processing sem depender do Kafka Connect ou de qualquer infraestrutura extra, economizando custos de infraestrutura e tempo de gerenciamento.
Uma representação visual de um fluxo de dados no Atlas Stream Processing.

Transmissão de dados com MongoDB connector para Apache Kafka

Você precisará acessar um Apache Kafka tópico Apache Kafka do com o Connect e o MongoDB connector para o Apache Kafka instalados. Observação: se você não tiver acesso a um ambiente, poderá usar o Docker script de composição disponível no MongoDB Kafka Connector Github repositório do de origem do . Como os dados residem no Kafka tópico , definimos o MongoDB connector para Apache Kafka como um coletor com a seguinte definição:
1{
2 "name": "mongo-tutorial-sink",
3 "config": {
4 "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
5 "topics": "Stocks",
6 "connection.uri": "mongodb://mongo1",
7 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
8 "value.converter": "org.apache.kafka.connect.json.JsonConverter",
9 "value.converter.schemas.enable": false,
10 "database": "Stocks",
11 "collection": "StockData",
12 "mongo.errors.tolerance":"data",
13 "errors.tolerance":"all",
14 "errors.deadletterqueue.topic.name":"mongoerror",
15 "errors.deadletterqueue.topic.replication.factor":1
16 }
17 }
Para transformar seus dados, você precisará usar outro serviço, como o Apache Flink, para transmitir a saída da função de janela para outro tópico. Configurar isso e configurar a infraestrutura necessária está fora do escopo deste artigo. No entanto, você pode imaginar que há muitas partes móveis para fazer isso funcionar.
Em seguida, vamos dar uma olhada no Processamento de Stream do MongoDB Atlas Stream Processing.

Transmitindo dados com o Atlas Stream Processing

Esta configuração é intencionalmente de alto nível. Para obter um tutorial passo a passo que inclui detalhes adicionais, como configuração de segurança, consulte o tutorialIntrodução ao Atlas Stream Processing em nossa documentação.

Etapa 1: Criar a instância do Atlas Stream Processing

Usando a interface do usuário do MongoDB Atlas, faça login no seu projeto, clique em "Stream Processing " no menu de serviços e clique no botão "Create Instance ". Forneça um nome, "KafkaTestSPI, " e clique no botão "Create ".
Observação: para este passo a passo, você pode escolher qualquer camada disponível do Atlas Stream Processing Instance (SPI). A cobrança começa quando você executa trabalhos de processador de fluxo, não ao criar SPIs. Consulte a documentação para obter mais informações.

Etapa 2: Crie uma conexão com um serviço Kafka hospedado

Depois de criar um SPI, clique em "Configure " e depois em "Connection Registry. ". Aqui, você definirá as conexões que o Atlas Stream Processing usará, como conexões com corretors Kafka ou outros clusters Atlas dentro de seu projeto Atlas.
A visualização da Instância de Processamento de Fluxo do Atlas no Atlas.
Em seguida, clique em "Create Connection " e selecione "Kafka. "
Conectando-se a uma fonte de dados Apache Kafka a partir da interface Atlas Stream Processing no Atlas.
Atlas Stream Processing usa o Kafka Wire Protocol e pode se conectar a qualquer serviço compatível com Kafka , como Confluent Cloud, Redpanda Cloud, Amazon Web Services MSK ou Azure Event Hubs. Aceitamos configurações de segurança SASL_PLAINText e SASL_SSL.

Etapa 3: Crie uma conexão com um cluster MongoDB Atlas

Depois de adicionar a conexão ao Kafka, crie outra conexão e selecione "Atlas Database. " Dê um nome à conexão e selecione qualquer cluster do Atlas disponível em seu projeto atual.
Conectando-se a um banco de dados Atlas a partir da interface Atlas Stream Processing no Atlas.
Agora que definimos uma conexão com nossa implantação do Kafka e uma conexão com nosso Atlas cluster, podemos nos conectar à instância de Processamento de Stream do Atlas e criar processadores de fluxo.
Ao contrário do MongoDB connector para Apache Kafka, nenhum arquivo de configuração define a origem e o coletor especificamente. Atlas Stream Processing usa conexões de registro de conexões para ambos os casos. O comportamento depende se a conexão é usada em um operador de pipeline Atlas Stream Processing $source ou$merge . Se a conexão for usada como $source, ela lerá os dados da conexão para o pipeline. Se o estiver gravando dados na conexão, ele usará $emit ou $merge dependendo se estiver gravando no Kafka ou MongoDB, respectivamente. Para demonstrar isso, vamos continuar o exemplo, criando um processador de fluxo que lerá dados de ($source) nosso Kafka tópico hospedado no e gravará dados ($merge) em nosso Atlas cluster.

Etapa 4: Criar o processador do Stream

Use o botão Conectar para se conectar à SPI "KafkaTestSPI " que criamos anteriormente. Isso iniciará uma caixa de diálogo de conexão familiar, como mostrado abaixo.
A caixa de diálogo de conexão para se conectar a um SPI.
Usando uma conta de usuário de banco de dados, use o MongoDB Shell (mongosh) para se conectar ao SPI. Uma vez conectado, podemos construir o processador de fluxo definindo variáveis para cada estágio do pipeline e, por fim, combinando-as quando estivermos prontos para criar o processador de fluxo. Nosso tópico Kafka será preenchido com dados de estoque fictícios na forma do seguinte valor:
1{
2"exchange":"NASDAQ"
3"company_symbol":"AHL"
4"company_name":"AMUSED HAWK LABS"
5"price":8.41
6"tx_time":"2024-04-03T14:51:40Z"
7}
Vamos criar a variável $source para nossa conexão HostedKafka.
1s_kafka = { $source: { connectionName: "HostedKafka", "topic": "Stocks" } }
Em seguida, vamos criar uma janela em cascata a cada cinco segundos de dados.
1tw = { $tumblingWindow: { interval: { size: 5, unit: "second" }, pipeline: [{ $group: { _id: "$company_symbol", max: { $max: "$price" }, avg: { $avg: "$price" } } }] } }
Observação: para ter uma ideia de como seria o fluxo de dados, podemos usar o arquivo .process comando, passando por cada estágio do pipeline e exibindo os resultados no console.
1sp.process([s_kafka,tw])
Em seguida, vamos escrever esse fluxo de dados no Atlas cluster que definimos no registro de conexão.
1m={$merge: { into: { connectionName: "AtlasSource",db: "Stock",coll: "StockReport" }}}
Por fim, vamos criar o processador de stream usando o comando createStreamProcessor.
1sp.createStreamProcessor(‘mytestsp’,[s_kafka,tw,m])
Em seguida, iniciaremos o processador de fluxo emitindo o arquivo .start comando.
1mytestsp.start()
Você pode visualizar estatísticas sobre seu processador de stream usando o .stats comando.
1mytestsp.stats()
Aqui está um exemplo do resultado de retorno:
1{
2 ok: 1,
3 ns: '66744a49d8619c0695a7e5e9.5e98cd46cd7309236ceaab3c.testsp',
4 stats: {
5 name: 'mytestsp',
6 processorId: '66780d901f7c0b19f89f559f',
7 status: 'running',
8 scaleFactor: Long('1'),
9 inputMessageCount: Long('633972'),
10 inputMessageSize: 280913520,
11 outputMessageCount: Long('296963'),
12 outputMessageSize: 475437763,
13 dlqMessageCount: Long('0'),
14 dlqMessageSize: 0,
15 stateSize: 3328,
16 memoryTrackerBytes: 33554432,
17 watermark: ISODate('2024-08-07T20:49:43.047Z'),
18 ok: 1
19 },
20 pipeline: [
21 {
22 '$source': {
23 connectionName: ‘HostedKafka’,
24 Topic: ‘Stocks’
25 }
26 },
27 {
28 '$tumblingWindow': {
29 interval: { size: 5, unit: 'second' },
30 pipeline: [
31 {
32 '$group': {
33 _id: [Object],
34 max: [Object],
35 avg: [Object]
36 }
37 }
38 ]
39 }
40 },
41 {
42 '$merge': {
43 into: {
44 connectionName: 'AtlasSource',
45 db: 'Stock',
46 coll: 'StockReport'
47 }
48 }
49 }
50 ]
51}
É isso para mover dados do Kafka para o MongoDB! Mover dados do MongoDB para o Kafka é igualmente simples. Basta definir um $source para seu cluster do Atlas e usar $emit para sua conexão do Kafka para transmitir rapidamente dados sem nenhum sistema ou configuração complexa do Kafka Connect e reduzir os custos.
Em seguida, abordaremos considerações detalhadas e opções disponíveis para você em configurações mais avançadas do Kafka. Vamos começar com algum contexto sobre o ecossistema Apache Kafka.

Componentes do ecossistema Apache Kafka

Esta seção destaca alguns dos principais componentes do Kafka a serem lembrados ao configurar uma integração entre o Kafka e o MongoDB, mas não se destina a fornecer uma lista abrangente de componentes. Incluímos componentes mais relevantes nos casos de uso do Atlas Stream Processing, mas consultar uma lista completa está além do escopo deste artigo. Consulte a documentação doKafka e/ou Kafka Connect para qualquer coisa que não abordamos.

Registro de Esquema

O Registro de Esquema é um componente opcional do Apache Kafka que fornece um repositório centralizado para gerenciar e validar esquemas usados no processamento e serialização de dados. Como tudo no Apache Kafka é binário, os desenvolvedores usam o registro de esquema para garantir a conformidade da estrutura de dados, validação de dados, verificação de compatibilidade, controle de versão e evolução, todos os quais são úteis para aplicativos downstream. O Atlas Stream Processing não interage com o Schema Registry no momento em que este artigo foi escrito. Informe-nos por meio doUserVoice se você tiver uma grande necessidade disso.

Kafka Connect

O Kafka Connect oferece um método para integrar dados entre o Kafka e sistemas externos, como o MongoDB. O MongoDB connector para Apache Kafka é construído sobre este método de integração. Com o Atlas Stream Processing, não é necessário usar o Kafka Connect para mover dados entre o Kafka e o MongoDB.

Conversores de chave e valor

No momento em que este artigo foi escrito, o Atlas Stream Processing aceitava apenas o formato de dados JSON. Quando formatos adicionais estiverem disponíveis, este artigo será atualizado adequadamente. Se você tiver uma necessidade urgente, informe-nos por meio da UserVoice.

Tolerância ao erro

O Kafka Connector tem configurações para determinar como lidar com erros. Parâmetros de configuração, como mongo.errors.tolerance e errors.tolerance, podem ser usados junto com seus parâmetros de configuração dead letter queue (DLQ) correspondentes errors.deadletterqueue.topic.name e errors.deadletterqueue.topic.replication.factor. Isso dá aos desenvolvedores a capacidade de interromper o processamento quando ocorre um erro, tolerar erros sem interromper o processamento de mensagens ou escrever erros em um DLQ (um tópico do Kafka ou uma collection do Atlas , dependendo se é um connector de origem ou coletor). Da mesma forma, o Atlas Stream Processing suporta erros de gravação em um DLQ utilizando uma coleção MongoDB Atlas . Para fazer isso, os usuários simplesmente usam o estágio$validate para enviar eventos que não estão em conformidade com um esquema específico para o DLQ.

Transformações de mensagem única (SMTs)

Os SMTs fazem parte do Kafka Connect e transformam as mensagens de entrada depois que um connector de origem as produz e antes de serem gravadas no Kafka. Os SMTs também são usados no coletor, transformando as mensagens de saída antes de enviá-las ao Kafka Connector. Em vez de usar SMTs, o Atlas Stream Processing usa a API de query do MongoDB e a estrutura de agregação para transformar documentos. Ao configurar um processador de stream, observe que o Atlas Stream Processing ainda não oferece suporte a funções personalizadas para transformar mensagens. A tabela abaixo descreve o operador de pipeline de agregação equivalente ou procedimento a ser usado para cada uma das SMTs integradas.
transformação, transformaçãoDescriçãoSuporte ao Atlas Stream Processing
TransmitirConverter campos ou toda a chave ou valor para um tipo específicoSuporte em breve via $convert.
descartarSolte uma chave ou um valor de um registro e defina-o como nulo.Use o operador$set :
{ $set: { _stream.meta.key: null } },
DropHeadersSolte um ou mais cabeçalhos de cada registro.Use o operador$pull :
{ $pull: { _stream.meta.headers: { k: 'header2' } } }
EventRouterDirecione eventos de saída do Debezium usando uma opção de configuração regex.Não suportado.
Extrair campoExtrair o campo especificado de uma estrutura quando o esquema estiver presente ou de um mapa no caso de dados sem esquema. Quaisquer valores nulos são passados sem modificações.O Atlas Stream Processing pode extrair apenas o valor de um campo usando $project, mas ele ainda estará no formato JSON com um nome de campo.
Extrair tópicoSubstitua o tópico do registro por um novo tópico derivado de sua chave ou valor.Suporte limitado. Você pode atribuir um tópico ao escrever de volta no Kafka por meio do operador de pipeline$emit . O $emit permite um nome de tópico explícito, uma referência a um valor no documento ou uma expressão.
Filter (Apache Kafka)Solte todos os registros. Projetado para ser usado em conjunto com um predicado.Use $match e $project para filtrar valores do fluxo.
AchatarNivelar uma estrutura de dados aninhada. Isso gera nomes para cada campo concatenando os nomes de campo em cada nível com um caractere delimitador configurável.Use $addFields com referências ao campo aninhado que você deseja nivelar. Se você deseja usar um delimitador como um período, basta concatenar usando $concat da seguinte forma:
{ $addFields: { fullName: { $concat: ["$name", ".", "$lastname"] } } }
GzipDecompressDescompacte com Gzip toda a entrada de valor ou chave byteArray.Não suportado.
HeaderFromMove ou copia campos em uma chave ou valor de registro para o cabeçalho do registro.Use $emit.config.headers para especificar o cabeçalho Kafka. Observação: você pode fazer referência a chaves ou valores de registro usando o símbolo$ . Isso é usado para indicar caminhos de campo e expressões dentro de estágios e consultas do pipeline de agregação. Exemplo: $emit.config.headers = "$foo"
HoistFieldEnvolve os dados usando o nome do campo especificado em uma estrutura quando o esquema estiver presente ou em um mapa no caso de dados sem esquema.Atualmente, os dados de eventos devem estar no formato JSON.
Inserir campoInsira o campo usando atributos dos metadados do registro ou um valor estático configurado.Utilize $addFields ou $project para criar um novo campo.
InsertHeaderInsira um valor literal como cabeçalho de registro.Use $emit.config.headers. O valor deve resultar em um documento ou array estruturado como [{k': .., 'v': …}, {'k': …, 'v': …}].
MaskFieldMascarar campos especificados com um valor nulo válido para o tipo de campo.Use $set para fornecer um valor nulo ou qualquer valor arbitrário para um campo.
messageTimeStampRouterAtualize o campo de tópico do registro como uma função do valor do tópico original e o campo de carimbo de data/hora do registro.Não suportado.
RegexRouterAtualize o tópico do registro usando a expressão regular configurada e a string de substituição.Você pode modificar os nomes dos tópicos especificando o nome do tópico por string ou regex no operador$emit .
Substituir campoFiltre ou renomeie campos.Para soltar um campo, utilize $unset ou $project. Para renomear um campo, use $project.
SetSchemaMetadataDefina o nome do esquema, a versão ou ambos no esquema de chave ou valor do registro.Não suportado.
TimestampConverterConverta carimbos de data/hora entre diferentes formatos, como época do Unix, cadeias de caracteres e tipos de Connect Date e Timestamp.Converta data e hora usando operadores de agregação $dateFromString e Timestamp.
TimestampRouterAtualize o campo de tópico do registro como uma função do valor original do tópico e o carimbo de data/hora do registro.Não suportado.
TopicRegexRouterAtualize o tópico do registro usando a expressão regular configurada e a string de substituição.Você pode modificar os nomes dos tópicos especificando o nome do tópico por string ou regex no operador$emit .
ValueToKeySubstitua a chave de registro por uma nova chave formada a partir de um subconjunto de campos no valor de registro.Especifique a chave atribuindo-a em $emit. Por exemplo: $emit.config.key: "$foo"
Em seguida, vamos resumir brevemente as propriedades de origem e coletor, pós-processadores e estratégias de modelo de escrita.

Propriedades de origem

Ao configurar o Kafka Connector como uma origem, ele lê dados de um cluster MongoDB e os grava em um tópico Apache Kafka . Semelhante ao Kafka Connector, ao usar o Atlas Stream Processing, os desenvolvedores primeiro definem uma conexão de registro de conexão com um MongoDB Atlas cluster e outra conexão de registro de conexão com seu tópico Kafka . Em seguida, eles usam $source para ler os dados do cluster MongoDB e executar quaisquer transformações ou agregações opcionais nos dados, antes de usar $emit para gravar os dados em seu tópico Kafka .

Propriedades do coletor

Usando o Kafka Connector como coletor, os desenvolvedores definem especificamente a conexão como um coletor usando a classe de connector com.mongodb.kafka.connect.MongoSinkConnector. Com o Atlas Stream Processing, você define uma origem no registro de conexão para uso em fontes e sinks. Ao escrever em um tópico do Kafka “sink,” você usa $emite ao escrever em um MongoDB “sink," você usa $merge.

Pós-processadores

Os pós-processadores são usados no Kafka Connector para modificar o documento antes de gravá-lo em um coletor de cluster MongoDB . No Atlas Stream Processing, você pode modificar o documento usando os estágios do pipeline de agregação do MongoDB antes de gravá-lo em um MongoDB Atlas. Não é necessário escrever seu próprio código Java para realizar transformações específicas da empresa.
Por exemplo, suponha que o seguinte evento esteja sobre o tópico Kafka:
1{
2 device_id: 'device_0',
3 group_id: 8,
4 timestamp: '2024-04-04T14:49:52.943+00:00',
5 max_watts: 250,
6 event_type: 0,
7 obs: {
8 watts: 75,
9 temp: 5
10 },
11 _ts: ISODate('2024-04-04T14:49:52.943Z')
12}
Quando você usa o projetor de lista de permissões no Kafka Connector, o pós-processador só gera dados dos campos especificados.
1post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
2value.projection.type=AllowList
3value.projection.list=device_id,obs.watts,obs.temp
Neste exemplo, a saída seria a seguinte:
1 {
2 device_id: 'device_0',
3 obs: {
4 watts: 75,
5 temp: 5
6 }
7}
Usando o Atlas Stream Processing, basta adicionar um estágio $project em sua query para incluir/excluir campos.
1{$project:{"device_id":1,"obs.watts":1, "obs.temp":1}}
A saída seria a seguinte:
1 {
2 device_id: 'device_1',
3 obs: {
4 Watts: 50,
5 temp: 10
6 }
7}
O Kafka Connector contém pós-processadores e processadores integrados que determinam como lidar com o campo _id no MongoDB de destino. _id é determinado pelo pós-processador DocumentIdAddr.

DocumentIdAddr

com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
Este pós-processador insere um campo _id determinado pela estratégia configurada. A estratégia padrão é BsonOidStrategy. Por padrão, um _id será adicionado ao documento se ainda não existir um.
Estratégias do DocumentIdAddr
As estratégias DocumentId são definidas em document.id.strategy. Estas são as configurações possíveis e o equivalente do Atlas Stream Processing. Observe que, com o Atlas Stream Processing, para adicionar ou alterar o campo _id, basta usar o operador de pipeline $addFields para adicionar um novo campo "_id " com o valor especificado na tabela abaixo.
Nome da estratégiaDescriçãoEquivalente Atlas Stream Processing
Estratégia BSONoid
com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
PadrãoUm campo_idespecífico não precisa existir no documento, pois um novo _id com um ObjectId será incluído por padrão.
KafkaMetaDataStrategy
com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy
Constrói uma string composta pela concatenação de tópico, partição e deslocamento do Kafka.{ $addFields: { "_id": "$_stream_meta.source.topic"}}
FullKeyStrategy
com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy
Utiliza a estrutura de chave completa para gerar o valor para o campo_id.{ $addFields: { "_id": "$_stream_meta.source.key"}}
FornecidoInKeyEstratégia
com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy
Utiliza o campo_id especificado na estrutura de chave.{ $addFields: { "_id": "$_stream_meta.source.key._id"}}
ProvidedInValueStrategy
com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
Usa o campo_id especificado na estrutura de valores do documento coletor.Não aplicável, pois quando os dados do Kafka são originados no Atlas Stream Processing, o valor é o próprio documento. Assim, seria utilizadoum_iddentro do documento.
PartialKeyStrategy
com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
Usa uma lista de bloqueios ou permite a projeção da lista da estrutura da chave do documento coletor.Para usar um nome de campo diferente para o _id que faz referência originalmente à chave, use $addFields da seguinte forma:
{ $addFields: { "_id": "$_stream_meta.source.key.$your_desired_id_field"}}
PartialValueStrategy
com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
Usa uma lista de bloqueios ou uma projeção de lista de permissões da estrutura de valor do documento coletor.Para usar um nome de campo diferente para o _id, use $addFields da seguinte forma:
{ $addFields: { "_id": "$your_desired_id_field"}}
UuidProvidedInKeyStrategy
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidProvidedInKeyStrategy
Converte o campo-chave_id em um UUID. O valor deve ser do tipo string ou binário e deve estar em conformidade com o formato UUID.Não suportado.
UuidProvidedInValueStrategy
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidProvidedInValueStrategy
Converte o campo de valor_id para um UUID. O valor deve ser do tipo string ou binário e deve estar em conformidade com o formato UUID.Não suportado.
UuidStrategy
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
Utiliza um UUID gerado aleatoriamente no formato de string.Não suportado.
Estratégias de ID personalizadas definidas pelo usuário não são aceitas.

Pós-processadores integrados

Veja a seguir uma lista de pós-processadores integrados do Kafka Connector e seus equivalentes do Atlas Stream Processing:
Nome do pós-processadorDescriçãoEquivalente Atlas Stream Processing
BlockListKeyProjetor
com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
Remove campos-chave correspondentes do registro do coletor.Modifique o $_stream_meta.source.key diretamente utilizando $set.
BlockListValueProjector
com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
Remove os campos de valores correspondentes do registro do coletor.Use $project para remover quaisquer campos do documento.
AllowListKeyProjetor
com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector
Inclui somente os campos-chave correspondentes do registro do coletor.Modifique o $_stream_meta.source.key diretamente utilizando $set.
AllowListValueProjetor
com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
Inclui apenas campos de valor correspondentes do registro do coletor.Use $project para mostrar quaisquer campos do documento.
KafkaMetaAdder
com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
Adiciona um campo denominado "topic-partition-offset" e define o valor como a concatenação do tópico, da partição e do deslocamento do Kafka no documento.Use $set para adicionar o campo:
{ $set: { "topic-partition-offset": { $concat: ["$_stream_meta.source.topic", "#", "$_stream_meta.source.partition", "#", "$_stream_meta.source.offset"] } } }
RenameByMapping
com.mongodb.kafka.connect.sink.processor.field.reaming.RenameByMapping
Renomeia campos que correspondem exatamente a um nome de campo especificado na chave ou no documento de valor.Use $project para renomear campos. Faça referência aos nomes dos campos usando o operador $.
RenameByRegex
com.mongodb.kafka.connect.sink.processor.field.reaming.RenameByRegex
Renomeia campos que correspondem a uma expressão regular na chave ou no documento de valor.Use $project para renomear campos.

Gravar estratégias de modelo

Uma estratégia de modelo de gravação no MongoDB Kafka Connector é uma classe Java personalizada que define como um connector de pia grava dados usando modelos de gravação. Há três estratégias disponíveis prontas para uso com o Kafka Connector: ReplaceOne BusinessKeyStrategy, DeleteOne BusinessKeyStrategy e UpdateOne BusinessKeyTimestampStrategy. Os detalhes do uso dessas estratégias são os seguintes:
Estratégia de escritaDescriçãoEquivalente Atlas Stream Processing
UpdateOne BusinessKeyTimestampAdiciona campos_insertedTS e _modifiedTS ao coletor.Use $set para adicionar uma data a _insertedTS ou _modifiedTS
Exemplo:
{ $set: { _modifiedTS: new Date() } }
Excluir uma estratégia de chave de negóciosExclui o documento com base em um valor no evento.Não suportado.
Substituir uma estratégia-chave de negócioSubstitui documentos que correspondem ao valor da chave de negócios.Não suportado.

Resumo

Neste artigo, abordamos tudo, desde uma comparação do Kafka Connector com o Atlas Stream Processing até os fundamentos da configuração do Atlas Stream Processing usando uma fonte Kafka (neste caso, Redpanda), fechando com algumas das nuances e detalhes para manter em mente ao integrar Apache Kafka e o MongoDB. O Apache Kafka e o ecossistema ao seu redor oferecem muito, por isso esperam que este artigo o ajude a implementar a solução que funciona melhor para seu caso de uso e necessidades. O Atlas Stream Processing é um serviço totalmente gerenciado integrado ao MongoDB Atlas que criamos para priorizar a facilidade de uso, o custo e o desempenho ao processar dados entre o Kafka e o MongoDB. Embora não tenha a intenção de substituir diretamente o Kafka Connector, o Atlas Stream Processing está se mostrando uma alternativa valiosa para os clientes, e estamos investindo fortemente em mais recursos e funcionalidades para dar suporte aos seus volumes de trabalho.
Pronto para começar? Faça login hoje mesmo! Não consegue encontrar um parâmetro de configuração ou recurso de que você precisa? Informe-nos no UserVoice.
Principais comentários nos fóruns
Ainda não há comentários sobre este artigo.
Iniciar a conversa

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Evento do setor
localizaçãoGENEVA, SWITZERLAND | IN-PERSON

Developer Day Geneva 25


Jan 16, 2025 | 8:00 AM - 5:00 PM UTC
Artigo

Mapa do Coronavírus e rastreador de dados ao vivo com o MongoDB Charts


Nov 15, 2023 | 3 min read
Artigo

Integração do Atlas Data Lake SQL para formar interações de dados poderosas


Jun 12, 2023 | 3 min read
Artigo

AI Shop: o poder da LangChain, OpenAI e MongoDB Atlas trabalhando juntos


Sep 18, 2024 | 7 min read
Sumário