Menu Docs
Página inicial do Docs
/
MongoDB Atlas
/ /

$source

Nesta página

  • Definição
  • Sintaxe
  • Corretora Apache Kafka
  • Fluxo de alterações da coleção do MongoDB
  • change stream do MongoDB database
  • Array do documento
  • Comportamento
$source

O estágio $source especifica uma conexão no Registro de conexão para transmitir dados. Os seguintes tipos de conexão são suportados:

Observação

Você não pode utilizar instâncias sem servidor do Atlas como um $source.

Para operar na transmissão de dados de um Apache Kafka corretor, o $source estágio tem o seguinte formato de protótipo:

{
"$source": {
"connectionName": "<registered-connection>",
"topic" : "<source-topic>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"partitionIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"auto_offset_reset": "<start-event>",
"group_id": "<group-id>",
"keyFormat": "<deserialization-type>",
"keyFormatError": "<error-handling>"
},
}
}

O estágio $source recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição
connectionName
string
Obrigatório
Etiqueta que identifica a conexão no Registro de conexão, para ingestão de dados.
topic
string
Obrigatório
Nome do Apache Kafka tópico do qual transmitir mensagens.
timeField
documento
Opcional

documento que define um carimbo de data/hora oficial para mensagens recebidas.

Se você usar timeField, deverá defini-lo como um dos seguintes:

  • uma expressão $toDate que usa um campo de mensagem de origem como argumento

  • uma expressão $dateFromString que usa um campo de mensagem de origem como argumento.

Se você não declarar um timeField, o Atlas Stream Processing criará um carimbo de data/hora a partir do carimbo de data/hora da mensagem fornecido pela origem.

tsFieldName
string
Opcional

Nome que substitui o nome do campo de carimbo de data/hora projetado pelo $source.

O estágio $source em um pipeline do Atlas Stream Processing projeta um campo chamado _ts com o carimbo de data/hora atribuído do documento. Fontes de dados de streaming também podem usar um campo chamado _ts para armazenar os registros de data/hora de cada mensagem. Para evitar um conflito entre esses campos, use tsFieldName para renomear qualquer campo fornecido pela fonte chamado _ts antes que ocorra processamento adicional.

partitionIdleTimeout
documento
Opcional
documento que especifica a quantidade de tempo que uma partição pode ficar ociosa antes de ser ignorada nos cálculos de marca d'agua.
partitionIdleTimeout.size
inteiro
Opcional
Número que especifica a duração do tempo limite de inatividade da partição.
partitionIdleTimeout.unit
string
Opcional

Unidade de tempo para a duração do tempo limite de inatividade da partição.

O valor de unit pode ser um dos seguintes:

  • "ms" (milésimo de segundo)

  • "second"

  • "minute"

  • "hour"

  • "day"

config
documento
Opcional
documento que contém campo que substituem vários valores padrão.
config.auto_offset_reset
string
Opcional

Especifica qual evento no Apache Kafka tópico de origem pelo qual iniciar a ingestão. auto_offset_reset usa os seguintes valores:

  • end, latest ou largest : para iniciar a ingestão a partir do evento mais recente no tópico no momento em que a agregação é inicializada.

  • earliest, beginning ou smallest : para iniciar a ingestão a partir do evento mais antigo no tópico.

Padrão é latest.

config.group_id
string
Opcional

ID do grupo de consumidores Kafka a ser associado ao processador de stream. Se omitido, o Atlas Stream Processing associa a instância de processamento de stream a um ID gerado automaticamente no seguinte formato:

asp-${streamProcessorId}-consumer

O Atlas Stream Processing confirma deslocamentos de partição para o Apache Kafka corretor para o ID do grupo de consumidores especificado após a confirmação de um checkpoint. Ele confirma um deslocamento quando as mensagens acima desse deslocamento são registradas de forma duradoura em um ponto de verificação. Isso permite que você acompanhe o atraso de compensação e o progresso do processador de fluxo diretamente dos metadados do grupo de consumidores do corretor Kafka.

config.keyFormat
string
Opcional

Tipo de dados usado para desserializar o Apache Kafka dados principais. Deve ser um dos seguintes valores:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

Padrão é binData.

config.keyFormatError
string
Opcional

Como lidar com erros encontrados ao desserializar o Apache Kafka dados principais. Deve ser um dos seguintes valores:

  • dlq, que escreve o documento na sua dead letter queue (DLQ).

  • passThrough, que envia o documento para o próximo estágio sem dados principais.

Observação

O Atlas Stream Processing exige que os documentos no stream de dados de origem sejam json ou ejson válidos. O Atlas Stream Processing define os documentos que não atendem a esse requisito para sua fila de letras mortas , se você tiver configurado uma.

Para operar na transmissão de dados de um change stream do Atlas, o estágio $source tem o seguinte formato de protótipo:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"db" : "<source-db>",
"coll" : ["<source-coll>",...],
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

O estágio $source recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição
connectionName
string
Condicional
Etiqueta que identifica a conexão no Registro de conexão, para ingestão de dados.
timeField
documento
Opcional

documento que define um carimbo de data/hora oficial para mensagens recebidas.

Se você usar timeField, deverá defini-lo como um dos seguintes:

  • uma expressão $toDate que usa um campo de mensagem de origem como argumento

  • uma expressão $dateFromString que usa um campo de mensagem de origem como argumento.

Se você não declarar um timeField, o Atlas Stream Processing criará um carimbo de data/hora a partir do carimbo de data/hora da mensagem fornecido pela origem.

tsFieldName
string
Opcional

Nome que substitui o nome dos campos padrão de carimbo de data/hora declarados pela origem.

Os pipelines do Atlas Stream Processing adicionam internamente um campo às mensagens recebidas chamadas _ts para armazenar informações de checkpoint. Fontes de dados de streaming também podem usar um campo chamado _ts para armazenar os carimbos de data/hora de cada mensagem. Para evitar um conflito entre esses campos, use tsFieldName para renomear qualquer campo fornecido pela origem chamado _ts antes que ocorra um processamento adicional.

db
string
Obrigatório
Nome de um banco de dados MongoDB hospedado na instância do Atlas especificado por connectionName. O change stream desse banco de dados atua como a fonte de dados de streaming.
coll
cadeia de caracteres ou matriz de cadeias de caracteres
Obrigatório
Nome de uma ou mais collections MongoDB hospedadas na instância do Atlas especificada por connectionName. O change stream dessas collections atua como a fonte de dados de streaming. Se você omitir este campo, seu processador de stream obterá a partir de um MongoDB Database Change Stream.
config
documento
Opcional
documento que contém campo que substituem vários valores padrão.
config.startAfter
símbolo
Condicional

O evento de mudança após o qual a fonte começa a reportar. Isso assume a forma de um token de currículo.

Você pode usar apenas um entre config.startAfter ou config.StartAtOperationTime.

config.startAtOperationTime
timestamp
Condicional

O tempo de operação após o qual a fonte deve começar a relatar.

Você pode usar apenas um entre config.startAfter ou config.StartAtOperationTime.

config.fullDocument
string
Condicional

Configuração que controla se uma fonte de fluxo de alterações deve retornar um documento completo ou apenas as alterações quando ocorrer uma atualização. Deve ser um dos seguintes:

  • updateLookup : retorna apenas as alterações na atualização.

  • required : Deve retornar um documento completo. Se um documento completo não estiver disponível, não retornará nada.

  • whenAvailable : retorna um documento completo sempre que houver um disponível, caso contrário, retorna alterações.

Se você não especificar um valor para fullDocument, o padrão será updateLookup.

Para usar esse campo com um fluxo de alteração de coleção, você deve ativar a alteração do fluxo de pré e pós-imagens nessa coleção.

config.fullDocumentOnly
booleano
Condicional

Configuração que controla se uma change stream retorna todo o documento do evento, incluindo todos os metadados, ou somente o conteúdo de fullDocument. Se definido como true, a origem retornará somente o conteúdo de fullDocument.

Para usar esse campo com um fluxo de alteração de coleção, você deve ativar a alteração do fluxo de pré e pós-imagens nessa coleção.

config.fullDocumentBeforeChange
string
Opcional

Especifica se uma fonte de change stream deve incluir o documento completo em seu estado original "antes das alterações" na saída. Deve ser um dos seguintes:

  • off : omite o campo fullDocumentBeforeChange .

  • required : deve retornar um documento completo em seu estado anterior às alterações. Se um documento completo em seu estado anterior às alterações não estiver disponível, o processador de fluxo falhará.

  • whenAvailable : retorna um documento completo em seu estado anterior às alterações sempre que um estiver disponível, caso contrário, omite o campo fullDocumentBeforeChange .

Se você não especificar um valor para fullDocumentBeforeChange, o padrão será off.

Para usar esse campo com um fluxo de alteração de coleção, você deve ativar a alteração do fluxo de pré e pós-imagens nessa coleção.

config.pipeline
documento
Opcional
Especifica um aggregation pipeline para filtrar a saída do change stream no ponto de origem. Esse pipeline deve estar em conformidade com os parâmetros descritos em change-stream-modify-output.

Para operar na transmissão de dados de um fluxo de alteração de banco de dados do Atlas, o estágio $source tem o seguinte formato de protótipo:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"db" : "<source-db>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

O estágio $source recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição
connectionName
string
Condicional
Etiqueta que identifica a conexão no Registro de conexão, para ingestão de dados.
timeField
documento
Opcional

documento que define um carimbo de data/hora oficial para mensagens recebidas.

Se você usar timeField, deverá defini-lo como um dos seguintes:

  • uma expressão $toDate que usa um campo de mensagem de origem como argumento

  • uma expressão $dateFromString que usa um campo de mensagem de origem como argumento.

Se você não declarar um timeField, o Atlas Stream Processing criará um carimbo de data/hora a partir do carimbo de data/hora da mensagem fornecido pela origem.

tsFieldName
string
Opcional

Nome que substitui o nome dos campos padrão de carimbo de data/hora declarados pela origem.

Os pipelines do Atlas Stream Processing adicionam internamente um campo às mensagens recebidas chamadas _ts para armazenar informações de checkpoint. Fontes de dados de streaming também podem usar um campo chamado _ts para armazenar os carimbos de data/hora de cada mensagem. Para evitar um conflito entre esses campos, use tsFieldName para renomear qualquer campo fornecido pela origem chamado _ts antes que ocorra um processamento adicional.

db
string
Obrigatório
Nome de um banco de dados MongoDB hospedado na instância do Atlas especificado por connectionName. O change stream desse banco de dados atua como a fonte de dados de streaming.
config
documento
Opcional
documento que contém campo que substituem vários valores padrão.
config.startAfter
símbolo
Condicional

O evento de mudança após o qual a fonte começa a reportar. Isso assume a forma de um token de currículo.

Você pode usar apenas um entre config.startAfter ou config.StartAtOperationTime.

config.startAtOperationTime
timestamp
Condicional

O tempo de operação após o qual a fonte deve começar a relatar.

Você pode usar apenas um entre config.startAfter ou config.StartAtOperationTime.

config.fullDocument
string
Condicional

Configuração que controla se uma fonte de fluxo de alterações deve retornar um documento completo ou apenas as alterações quando ocorrer uma atualização. Deve ser um dos seguintes:

  • updateLookup : retorna apenas as alterações na atualização.

  • required : Deve retornar um documento completo. Se um documento completo não estiver disponível, não retornará nada.

  • whenAvailable : retorna um documento completo sempre que houver um disponível, caso contrário, retorna alterações.

Se você não especificar um valor para fullDocument, o padrão será updateLookup.

Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados.

config.fullDocumentOnly
booleano
Condicional

Configuração que controla se uma change stream retorna todo o documento do evento, incluindo todos os metadados, ou somente o conteúdo de fullDocument. Se definido como true, a origem retornará somente o conteúdo de fullDocument.

Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados.

config.fullDocumentBeforeChange
string
Opcional

Especifica se uma fonte de change stream deve incluir o documento completo em seu estado original "antes das alterações" na saída. Deve ser um dos seguintes:

  • off : omite o campo fullDocumentBeforeChange .

  • required : deve retornar um documento completo em seu estado anterior às alterações. Se um documento completo em seu estado anterior às alterações não estiver disponível, o processador de fluxo falhará.

  • whenAvailable : retorna um documento completo em seu estado anterior às alterações sempre que um estiver disponível, caso contrário, omite o campo fullDocumentBeforeChange .

Se você não especificar um valor para fullDocumentBeforeChange, o padrão será off.

Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados.

config.pipeline
documento
Opcional
Especifica um aggregation pipeline para filtrar a saída do change stream no ponto de origem. Esse pipeline deve estar em conformidade com os parâmetros descritos em change-stream-modify-output.

Para operar em uma array de documentos, o estágio $source tem o seguinte formato de protótipo:

{
"$source": {
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"documents" : [{source-doc},...] | <expression>
}
}

O estágio $source recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição
timeField
documento
Opcional

documento que define um carimbo de data/hora oficial para mensagens recebidas.

Se você usar timeField, deverá defini-lo como um dos seguintes:

  • uma expressão $toDate que usa um campo de mensagem de origem como argumento

  • uma expressão $dateFromString que usa um campo de mensagem de origem como argumento.

Se você não declarar um timeField, o Atlas Stream Processing criará um carimbo de data/hora a partir do carimbo de data/hora da mensagem fornecido pela origem.

tsFieldName
string
Opcional

Nome que substitui o nome dos campos padrão de carimbo de data/hora declarados pela origem.

Os pipelines do Atlas Stream Processing adicionam internamente um campo às mensagens recebidas chamadas _ts para armazenar informações de checkpoint. Fontes de dados de streaming também podem usar um campo chamado _ts para armazenar os carimbos de data/hora de cada mensagem. Para evitar um conflito entre esses campos, use tsFieldName para renomear qualquer campo fornecido pela origem chamado _ts antes que ocorra um processamento adicional.

documents
array
Condicional
Array de documentos para usar como fonte de dados de streaming. O valor deste campo pode ser uma matriz de objetos ou uma expressão que avalia para uma matriz de objetos. Não utilize este campo ao utilizar o campo connectionName .

$source deve ser o primeiro estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $source por pipeline.

← Aggregation Pipelines