$source
Nesta página
Definição
O estágio$source
especifica uma conexão noRegistro de Conexão para transmitir dados. Os seguintes tipos de conexão são suportados:
Apache Kafka corretor
Fluxo de alterações da collection do MongoDB
Fluxo de alteração do banco de dados MongoDB
array de documentos
Observação
Você não pode usar instâncias sem servidor do Atlas como um $source
.
Sintaxe
Corretora Apache Kafka
Para operar na transmissão de dados de um Apache Kafka corretor, o $source
estágio tem o seguinte formato de protótipo:
{ "$source": { "connectionName": "<registered-connection>", "topic" : ["<source-topic>", ...], "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "partitionIdleTimeout": { "size": <duration-number>, "unit": "<duration-unit>" }, "config": { "auto_offset_reset": "<start-event>", "group_id": "<group-id>", "keyFormat": "<deserialization-type>", "keyFormatError": "<error-handling>" }, } }
O estágio $source
recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição | |
---|---|---|---|---|
connectionName | string | Obrigatório | Etiqueta que identifica a conexão no Registro de conexão, para ingestão de dados. | |
topic | cadeia de caracteres ou matriz de cadeias de caracteres | Obrigatório | Nome de um ou mais tópicos do Apache Kafka dos quais transmitir mensagens. Se você quiser transmitir mensagens de mais de um tópico, especifique-os em uma array. | |
timeField | documento | Opcional | documento que define um carimbo de data/hora oficial para mensagens recebidas. Se você usar
Se você não declarar um | |
tsFieldName | string | Opcional | Nome que substitui o nome do campo do carimbo de data/hora projetado pelo $source. O estágio $source em um pipeline do Atlas Stream Processing projeta um campo chamado | |
partitionIdleTimeout | documento | Opcional | documento que especifica a quantidade de tempo que uma partição pode ficar ociosa antes de ser ignorada nos cálculos de marca d'agua. | |
partitionIdleTimeout.size | inteiro | Opcional | Número que especifica a duração do tempo limite de inatividade da partição. | |
partitionIdleTimeout.unit | string | Opcional | Unidade de tempo para a duração do tempo limite de inatividade da partição. O valor de
| |
config | documento | Opcional | documento que contém campo que substituem vários valores padrão. | |
config.auto_offset_reset | string | Opcional | Especifica qual evento no Apache Kafka tópico de origem para iniciar a ingestão.
Padrão é | |
config.group_id | string | Opcional | ID do grupo de consumidores Kafka a ser associado ao processador de stream. Se omitido, o Atlas Stream Processing associa a instância de processamento de stream a um ID gerado automaticamente no seguinte formato:
O Atlas Stream Processing confirma deslocamentos de partição para o Apache Kafka corretor para o ID do grupo de consumidores especificado após a confirmação de um checkpoint. Ele confirma um deslocamento quando as mensagens acima desse deslocamento são registradas de forma duradoura em um ponto de verificação. Isso permite que você acompanhe o atraso de compensação e o progresso do processador de fluxo diretamente dos metadados do grupo de consumidores do corretor Kafka. | |
config.keyFormat | string | Opcional | Tipo de dados usado para desserializar dados-chave do Apache Kafka. Deve ser um dos seguintes valores:
Padrão é | |
config.keyFormatError | string | Opcional | Como lidar com erros encontrados ao desserializar os dados-chave do Apache Kafka. Deve ser um dos seguintes valores:
|
Observação
O Atlas Stream Processing requer que os documentos no fluxo de dados de origem sejam json
ou ejson
válidos. O Atlas Stream Processing define os documentos que não atendem a esse requisito na sua fila de mensagens não entregues, se você tiver configurado uma.
Fluxo de alterações da coleção do MongoDB
Para operar na transmissão de dados de um change stream do Atlas, o estágio $source
tem o seguinte formato de protótipo:
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "db" : "<source-db>", "coll" : ["<source-coll>",...], "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
O estágio $source
recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
---|---|---|---|
connectionName | string | Condicional | Etiqueta que identifica a conexão no Registro de conexão, para ingestão de dados. |
timeField | documento | Opcional | documento que define um carimbo de data/hora oficial para mensagens recebidas. Se você usar
Se você não declarar um |
tsFieldName | string | Opcional | Nome que substitui o nome dos campos padrão de carimbo de data/hora declarados pela origem. Os pipelines do Atlas Stream Processing adicionam internamente um campo às mensagens recebidas chamado |
db | string | Obrigatório | Nome de um banco de dados MongoDB hospedado na instância do Atlas especificado por connectionName . O change stream desse banco de dados atua como a fonte de dados de streaming. |
coll | cadeia de caracteres ou matriz de cadeias de caracteres | Obrigatório | Nome de uma ou mais coleções do MongoDB hospedadas na instância do Atlas especificada pelo connectionName . O fluxo de alterações dessas coleções funciona como a fonte de dados de streaming. Se você omitir este campo, seu processador de fluxo terá como origem um fluxo de alterações de banco de dados MongoDB . |
config | documento | Opcional | documento que contém campo que substituem vários valores padrão. |
config.startAfter | token | Condicional | O evento de alteração após o qual a fonte começa a relatar. Isso assume a forma de um resume token. Você pode usar apenas um entre |
config.startAtOperationTime | timestamp | Condicional | O tempo de operação após o qual a fonte deve começar a relatar. Você pode usar apenas um entre |
config.fullDocument | string | Condicional | Configuração que controla se uma fonte de fluxo de alterações deve retornar um documento completo ou apenas as alterações quando ocorrer uma atualização. Deve ser um dos seguintes:
Se você não especificar um valor para fullDocument, o padrão será Para usar este campo com um fluxo de alterações de coleção, você deve habilitar o fluxo de alterações pré e pós-imagens nessa coleção. |
config.fullDocumentOnly | booleano | Condicional | Configuração que controla se uma change stream retorna todo o documento do evento, incluindo todos os metadados, ou somente o conteúdo de Para usar este campo com um fluxo de alterações de coleção, você deve habilitar o fluxo de alterações pré e pós-imagens nessa coleção. |
config.fullDocumentBeforeChange | string | Opcional | Especifica se uma fonte de change stream deve incluir o documento completo em seu estado original "antes das alterações" na saída. Deve ser um dos seguintes:
Se você não especificar um valor para Para usar este campo com um fluxo de alterações de coleção, você deve habilitar o fluxo de alterações pré e pós-imagens nessa coleção. |
config.pipeline | documento | Opcional | Especifica um pipeline de agregação para filtrar a saída do fluxo de alterações no ponto de origem. Este pipeline deve estar em conformidade com os parâmetros descritos em change-stream-modify-output. |
change stream do MongoDB database
Para operar na transmissão de dados de um fluxo de alteração de banco de dados do Atlas, o estágio $source
tem o seguinte formato de protótipo:
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "db" : "<source-db>", "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
O estágio $source
recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
---|---|---|---|
connectionName | string | Condicional | Etiqueta que identifica a conexão no Registro de conexão, para ingestão de dados. |
timeField | documento | Opcional | documento que define um carimbo de data/hora oficial para mensagens recebidas. Se você usar
Se você não declarar um |
tsFieldName | string | Opcional | Nome que substitui o nome dos campos padrão de carimbo de data/hora declarados pela origem. Os pipelines do Atlas Stream Processing adicionam internamente um campo às mensagens recebidas chamado |
db | string | Obrigatório | Nome de um banco de dados MongoDB hospedado na instância do Atlas especificado por connectionName . O change stream desse banco de dados atua como a fonte de dados de streaming. |
config | documento | Opcional | documento que contém campo que substituem vários valores padrão. |
config.startAfter | token | Condicional | O evento de alteração após o qual a fonte começa a relatar. Isso assume a forma de um resume token. Você pode usar apenas um entre |
config.startAtOperationTime | timestamp | Condicional | O tempo de operação após o qual a fonte deve começar a relatar. Você pode usar apenas um entre |
config.fullDocument | string | Condicional | Configuração que controla se uma fonte de fluxo de alterações deve retornar um documento completo ou apenas as alterações quando ocorrer uma atualização. Deve ser um dos seguintes:
Se você não especificar um valor para fullDocument, o padrão será Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados. |
config.fullDocumentOnly | booleano | Condicional | Configuração que controla se uma change stream retorna todo o documento do evento, incluindo todos os metadados, ou somente o conteúdo de Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados. |
config.fullDocumentBeforeChange | string | Opcional | Especifica se uma fonte de change stream deve incluir o documento completo em seu estado original "antes das alterações" na saída. Deve ser um dos seguintes:
Se você não especificar um valor para Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados. |
config.pipeline | documento | Opcional | Especifica um pipeline de agregação para filtrar a saída do fluxo de alterações no ponto de origem. Este pipeline deve estar em conformidade com os parâmetros descritos em change-stream-modify-output. |
Fonte de fluxo de alterações em todo o cluster do MongoDB
Para operar em dados de streaming de um fluxo de mudança de cluster inteiro do Atlas, o estágio $source
tem a seguinte forma de protótipo:
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
O estágio $source
recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
---|---|---|---|
connectionName | string | Condicional | Etiqueta que identifica a conexão no Registro de conexão, para ingestão de dados. |
timeField | documento | Opcional | documento que define um carimbo de data/hora oficial para mensagens recebidas. Se você usar
Se você não declarar um |
tsFieldName | string | Opcional | Nome que substitui o nome dos campos padrão de carimbo de data/hora declarados pela origem. Os pipelines do Atlas Stream Processing adicionam internamente um campo às mensagens recebidas chamado |
config | documento | Opcional | documento que contém campo que substituem vários valores padrão. |
config.startAfter | token | Condicional | O evento de alteração após o qual a fonte começa a relatar. Isso assume a forma de um resume token. Você pode usar apenas um entre |
config.startAtOperationTime | timestamp | Condicional | O tempo de operação após o qual a fonte deve começar a relatar. Você pode usar apenas um entre |
config.fullDocument | string | Condicional | Configuração que controla se uma fonte de fluxo de alterações deve retornar um documento completo ou apenas as alterações quando ocorrer uma atualização. Deve ser um dos seguintes:
Se você não especificar um valor para fullDocument, o padrão será Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados. |
config.fullDocumentOnly | booleano | Condicional | Configuração que controla se uma change stream retorna todo o documento do evento, incluindo todos os metadados, ou somente o conteúdo de Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados. |
config.fullDocumentBeforeChange | string | Opcional | Especifica se uma fonte de change stream deve incluir o documento completo em seu estado original "antes das alterações" na saída. Deve ser um dos seguintes:
Se você não especificar um valor para Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados. |
config.pipeline | documento | Opcional | Especifica um pipeline de agregação para filtrar a saída do fluxo de alterações no ponto de origem. Este pipeline deve estar em conformidade com os parâmetros descritos em change-stream-modify-output. |
Array do documento
Para operar em uma array de documentos, o estágio $source
tem o seguinte formato de protótipo:
{ "$source": { "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "documents" : [{source-doc},...] | <expression> } }
O estágio $source
recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
---|---|---|---|
timeField | documento | Opcional | documento que define um carimbo de data/hora oficial para mensagens recebidas. Se você usar
Se você não declarar um |
tsFieldName | string | Opcional | Nome que substitui o nome dos campos padrão de carimbo de data/hora declarados pela origem. Os pipelines do Atlas Stream Processing adicionam internamente um campo às mensagens recebidas chamado |
documents | array | Condicional | Array de documentos para usar como fonte de dados de streaming. O valor deste campo pode ser uma matriz de objetos ou uma expressão que avalia para uma matriz de objetos. Não utilize este campo ao utilizar o campo connectionName . |
Comportamento
$source
deve ser o primeiro estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $source
por pipeline.
Exemplos
Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. A seguinte agregação tem três estágios:
O estágio
$source
estabelece uma conexão com o broker do Apache Kafka que está coletando esses relatórios em um tópico chamadomy_weatherdata
, expondo cada registro à medida que é assimilado nos estágios de agregação subsequentes. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o comoingestionTime
.O estágio
$match
exclui documentos que têm umdewPoint.value
menor ou igual a5.0
e passa os documentos comdewPoint.value
maior que5.0
para o próximo estágio.O estágio
$merge
grava a saída na coleção do Atlas chamadastream
no banco de dadossample_weatherstream
. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
Para visualizar os documentos na coleção sample_weatherstream.stream
resultante, conecte-se ao cluster Atlas e execute o seguinte comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('165235') } }, airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
Observação
O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.