Definição
O estágio$source especifica uma conexão noRegistro de Conexão para transmitir dados. Os seguintes tipos de conexão são suportados:
Fluxo de alterações da collection do MongoDB
Fluxo de alteração do banco de dados MongoDB
Fluxo de alteração do cluster MongoDB
Fluxo de dados do AWS Kinesis
array de documentos
Sintaxe
Corretora Apache Kafka
Para operar em dados de transmissão de um intermediário Apache Kafka, o estágio $source tem a seguinte forma de protótipo:
{ "$source": { "connectionName": "<registered-connection>", "topic" : ["<source-topic>", ...], "timeField": { $toDate | $dateFromString: <expression> }, "partitionIdleTimeout": { "size": <duration-number>, "unit": "<duration-unit>" }, "config": { "auto_offset_reset": "<start-event>", "group_id": "<group-id>", "keyFormat": "<deserialization-type>", "keyFormatError": "<error-handling>" }, } }
O estágio $source recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição | |
|---|---|---|---|---|
| string | Obrigatório | Etiqueta que identifica a conexão no Registro de conexão, para ingestão de dados. | |
| cadeia de caracteres ou matriz de cadeias de caracteres | Obrigatório | Nome de um ou mais tópicos do Apache Kafka a partir dos quais transmitir mensagens. Se você deseja transmitir mensagens de mais de um tópico, especifique-os em uma array. | |
| documento | Opcional | documento que define um carimbo de data/hora oficial para mensagens recebidas. Se você usar
Se você não declarar um | |
| documento | Opcional | documento que especifica a quantidade de tempo que uma partição pode ficar ociosa antes de ser ignorada nos cálculos de marca d'agua. Este campo é desabilitado por padrão. Para lidar com partições que não progridem devido à inatividade, atribua um valor a este campo. | |
| inteiro | Opcional | Número que especifica a duração do tempo limite de inatividade da partição. | |
| string | Opcional | Unidade de tempo para a duração do tempo limite de inatividade da partição. O valor de
| |
| documento | Opcional | documento que contém campo que substituem vários valores padrão. | |
| string | Opcional | Especifica qual evento no Apache Kafka tópico de origem para iniciar a ingestão.
Padrão é | |
| string | Opcional | ID do grupo de consumidores Kafka a ser associado ao processador de fluxo. Se omitido, o Atlas Stream Processing associa o espaço de trabalho do processamento de fluxos a um ID gerado automaticamente no seguinte formato: O Atlas Stream Processing gera automaticamente um valor para este parâmetro para todos os processadores de stream persistentes. Para processadores de fluxo efêmero definidos com start.process(), esse parâmetro será definido somente se você definí-lo manualmente. | |
| booleano | Condicional | Sinalizador que determina a política de confirmação para compensações de partição do Kafka Corretor. O Atlas Stream Processing suporta duas políticas de confirmação:
Você pode definir este parâmetro somente se Para um processador de fluxo efêmero definido com sp.process(), esse parâmetro é padrão como | |
| string | Opcional | Tipo de dados usado para desserializar dados-chave do Apache Kafka. Deve ser um dos seguintes valores:
Padrão é | |
| string | Opcional | Como lidar com erros encontrados ao desserializar os dados-chave do Apache Kafka. Deve ser um dos seguintes valores:
|
Observação
O Atlas Stream Processing requer que os documentos no fluxo de dados de origem sejam json ou ejson válidos. O Atlas Stream Processing define os documentos que não atendem a esse requisito na sua fila de mensagens não entregues, se você tiver configurado uma.
Fluxo de alterações da coleção do MongoDB
Um fluxo de alterações de coleção do Atlas permite que aplicativos acessem mudanças de dados em tempo real em uma única coleção. Para aprender como abrir um fluxo de alterações em uma coleção, veja Change Streams.
Para operar na transmissão de dados de um change stream do Atlas, o estágio $source tem o seguinte formato de protótipo:
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "db" : "<source-db>", "coll" : ["<source-coll>",...], "initialSync": { "enable": <boolean>, "parallelism": <integer> }, "readPreference": "<read-preference>", "readPreferenceTags": [ {"<key>": "<value>"}, . . . ] "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }], "maxAwaitTimeMS": <time-ms>, } } }
O estágio $source recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
|---|---|---|---|
| string | Condicional | Etiqueta que identifica a conexão no Registro de conexão, para ingestão de dados. |
| documento | Opcional | documento que define um carimbo de data/hora oficial para mensagens recebidas. Se você usar
Se você não declarar um |
| string | Obrigatório | Nome de um banco de dados MongoDB hospedado na instância do Atlas especificado por |
| cadeia de caracteres ou matriz de cadeias de caracteres | Obrigatório | Nome de uma ou mais collections MongoDB hospedadas na instância do Atlas especificada por |
| documento | Opcional | Documento contendo campos referentes à funcionalidade O Atlas Stream Processing Se você habilitar IMPORTANTE: você só pode usar |
| booleano | Condicional | Determina se deseja ou não habilitar o |
| inteiro | Opcional | Determina o nível de paralelismo com o qual processar a operação Cada processador de fluxo tem um valor máximo de paralelismo cumulativo determinado por seu nível. O paralelismo cumulativo de um processador de fluxo é calculado da seguinte forma:
Onde Por exemplo, se o seu estágio Se um processador de fluxo exceder o paralelismo cumulativo máximo para seu nível, o Atlas Stream Processing lançará um erro e o avisará sobre o nível mínimo de processador necessário para o nível pretendido de paralelismo. Você deve dimensionar o processador para um nível superior ou reduzir os valores de paralelismo de seus estágios para resolver o erro. Para saber mais, consulte Stream Processing. |
| documento | Opcional | Preferência de leitura para operações do Padrão é |
| documento | Opcional | Leia as tags de preferência para operações do |
| documento | Opcional | documento que contém campo que substituem vários valores padrão. |
| token | Condicional | O evento de alteração após o qual a fonte começa a relatar. Isso assume a forma de um resume token. Você pode usar apenas um entre |
| timestamp | Condicional | O tempo de operação após o qual a fonte deve começar a relatar. Você pode usar apenas um entre Aceita JSON estendido do MongoDB valores de |
| string | Condicional | Configuração que controla se uma fonte de fluxo de alterações deve retornar um documento completo ou apenas as alterações quando ocorrer uma atualização. Deve ser um dos seguintes:
Se você não especificar um valor para fullDocument, o padrão será Para usar este campo com um fluxo de alterações de coleção, você deve habilitar o fluxo de alterações pré e pós-imagens nessa coleção. |
| booleano | Condicional | Configuração que controla se uma change stream retorna todo o documento do evento, incluindo todos os metadados, ou somente o conteúdo de Para usar este campo com um fluxo de alterações de coleção, você deve habilitar o fluxo de alterações pré e pós-imagens nessa coleção. |
| string | Opcional | Especifica se uma fonte de change stream deve incluir o documento completo em seu estado original "antes das alterações" na saída. Deve ser um dos seguintes:
Se você não especificar um valor para Para usar este campo com um fluxo de alterações de coleção, você deve habilitar o fluxo de alterações pré e pós-imagens nessa coleção. |
| documento | Opcional | Especifica um pipeline de agregação para filtrar a saída do fluxo de alterações antes de passá-la para processamento adicional. Este pipeline deve estar em conformidade com os parâmetros descritos em Modificar Saída do Fluxo de Alterações. IMPORTANTE: cada Alterar evento inclui os campos |
| inteiro | Opcional | Tempo máximo, em milissegundos, para aguardar que novas alterações de dados sejam relatadas ao cursor do fluxo de alterações antes de retornar um lote vazio. Padrão é |
change stream do MongoDB database
Um fluxo de alterações do banco de dados Atlas permite que aplicativos acessem mudanças de dados em tempo real em um único banco de dados. Para saber como abrir um fluxo de alteração em um banco de dados, consulte Fluxos de alteração.
Para operar na transmissão de dados de um fluxo de alteração de banco de dados do Atlas, o estágio $source tem o seguinte formato de protótipo:
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "db" : "<source-db>", "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
O estágio $source recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
|---|---|---|---|
| string | Condicional | Etiqueta que identifica a conexão no Registro de conexão, para ingestão de dados. |
| documento | Opcional | documento que define um carimbo de data/hora oficial para mensagens recebidas. Se você usar
Se você não declarar um |
| string | Obrigatório | Nome de um banco de dados MongoDB hospedado na instância do Atlas especificado por |
| documento | Opcional | documento que contém campo que substituem vários valores padrão. |
| token | Condicional | O evento de alteração após o qual a fonte começa a relatar. Isso assume a forma de um resume token. Você pode usar apenas um entre |
| timestamp | Condicional | O tempo de operação após o qual a fonte deve começar a relatar. Você pode usar apenas um entre Aceita JSON estendido do MongoDB valores de |
| string | Condicional | Configuração que controla se uma fonte de fluxo de alterações deve retornar um documento completo ou apenas as alterações quando ocorrer uma atualização. Deve ser um dos seguintes:
Se você não especificar um valor para fullDocument, o padrão será Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados. |
| booleano | Condicional | Configuração que controla se uma change stream retorna todo o documento do evento, incluindo todos os metadados, ou somente o conteúdo de Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados. |
| string | Opcional | Especifica se uma fonte de change stream deve incluir o documento completo em seu estado original "antes das alterações" na saída. Deve ser um dos seguintes:
Se você não especificar um valor para Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados. |
| documento | Opcional | Especifica um pipeline de agregação para filtrar a saída do fluxo de alterações no ponto de origem. Este pipeline deve estar em conformidade com os parâmetros descritos em Modificar Saída do Fluxo de Alterações. IMPORTANTE: cada Alterar evento inclui os campos |
| inteiro | Opcional | Tempo máximo, em milissegundos, para aguardar que novas alterações de dados sejam relatadas ao cursor do fluxo de alterações antes de retornar um lote vazio. Padrão é |
Fonte de fluxo de alterações em todo o cluster do MongoDB
Para operar em dados de streaming de um fluxo de mudança de cluster inteiro do Atlas, o estágio $source tem a seguinte forma de protótipo:
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
O estágio $source recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
|---|---|---|---|
| string | Condicional | Etiqueta que identifica a conexão no Registro de conexão, para ingestão de dados. |
| documento | Opcional | documento que define um carimbo de data/hora oficial para mensagens recebidas. Se você usar
Se você não declarar um |
| documento | Opcional | documento que contém campo que substituem vários valores padrão. |
| token | Condicional | O evento de alteração após o qual a fonte começa a relatar. Isso assume a forma de um resume token. Você pode usar apenas um entre |
| data | registro de data e hora | Condicional | O tempo de operação após o qual a fonte deve começar a relatar. Você pode usar apenas um entre Aceita JSON estendido do MongoDB valores de |
| string | Condicional | Configuração que controla se uma fonte de fluxo de alterações deve retornar um documento completo ou apenas as alterações quando ocorrer uma atualização. Deve ser um dos seguintes:
Se você não especificar um valor para fullDocument, o padrão será Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados. |
| booleano | Condicional | Configuração que controla se uma change stream retorna todo o documento do evento, incluindo todos os metadados, ou somente o conteúdo de Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados. |
| string | Opcional | Especifica se uma fonte de change stream deve incluir o documento completo em seu estado original "antes das alterações" na saída. Deve ser um dos seguintes:
Se você não especificar um valor para Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados. |
| documento | Opcional | Especifica um pipeline de agregação para filtrar a saída do fluxo de alterações no ponto de origem. Este pipeline deve estar em conformidade com os parâmetros descritos em Modificar Saída do Fluxo de Alterações. Observe que o Atlas Stream Processing espera receber os campos |
| inteiro | Opcional | Tempo máximo, em milissegundos, para aguardar que novas alterações de dados sejam relatadas ao cursor do fluxo de alterações antes de retornar um lote vazio. Padrão é |
AWS Kinesis Data Stream
Para operar em dados de um fluxo de dados do AWS Kinesis, o estágio $source tem o seguinte formulário de protótipo:
{ "$source": { "connectionName": "<registered-connection>", "stream": "<stream-name>", "region": "<aws-region>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<field-name>", "shardIdleTimeout": { "size": <duration-number>, "unit": "<duration-unit>" }, "config": { "consumerARN": "<aws-arn>", "initialPosition": <initial-position>, reshardDetectionIntervalSecs: <interval> } } }
O estágio $source recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
|---|---|---|---|
| string | Obrigatório | Etiqueta que identifica a conexão no Registro de conexão a partir da qual consumir dados. |
| string | Obrigatório | Fluxo de dados doAWS Kinesis do qual transmitir mensagens. |
| string | Condicional | Região da AWS na qual o fluxo especificado existe. O Kinesis oferece suporte a vários fluxos de dados com o mesmo nome em diferentes regiões. Se você usar o mesmo nome para streams de dados em duas ou mais regiões dentro da mesma conexão, deverá usar esse campo para especificar qual combinação de nome e região usar. |
| documento | Opcional | documento que define um carimbo de data/hora oficial para mensagens recebidas. Se você usar
Se você não declarar um |
| documento | Opcional | Documento que especifica o tempo de espera que um fragmento pode ficar ocioso antes de ser ignorado nos cálculos de marca d'água. Este campo está desabilitado por padrão. Para lidar com fragmentos que não avançam devido à ociosidade, defina um valor para esse campo. |
| documento | Opcional | Número que especifica a duração do tempo limite ocioso do fragmento. |
| documento | Opcional | Unidade de tempo durante o tempo limite ocioso do fragmento. O valor de
|
| documento | Opcional | documento que contém campo que substituem vários valores padrão. |
| string | Opcional | ARN correspondente a um consumidor Kinesis. Se você especificar este campo, seu consumidor usará a distribuição aprimorada; caso contrário, a Kinesis usará um consumidor padrão. |
| string | Opcional | Posição no histórico do fluxo de dados do Kinesis a partir do qual iniciar a ingestão de mensagens. Deve ser um dos seguintes:
O padrão é "LATEST". |
| inteiro | Opcional | Intervalo, em segundos, entre as verificações da taxa de fluxo de dados pelo fluxo do Kinesis para fins de refragmentação. |
Array do documento
Para operar em uma array de documentos, o estágio $source tem o seguinte formato de protótipo:
{ "$source": { "timeField": { $toDate | $dateFromString: <expression> }, "documents" : [{source-doc},...] | <expression> } }
O estágio $source recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
|---|---|---|---|
| documento | Opcional | documento que define um carimbo de data/hora oficial para mensagens recebidas. Se você usar
Se você não declarar um |
| array | Condicional | Array de documentos para usar como fonte de dados de streaming. O valor deste campo pode ser uma matriz de objetos ou uma expressão que avalia para uma matriz de objetos. Não utilize este campo ao utilizar o campo |
Comportamento
$source deve ser o primeiro estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $source por pipeline.
Para os estágios Kafka $source, o Atlas Stream Processing lê em paralelo a partir de várias partições dentro do tópico de origem. O limite de partição é determinado pelo nível do processador. Para saber mais, veja a referência de faturamento do Stream Processing.
Exemplos
Exemplo do Kafka
Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. A seguinte agregação tem três estágios:
O estágio
$sourceestabelece uma conexão com o broker do Apache Kafka que está coletando esses relatórios em um tópico chamadomy_weatherdata, expondo cada registro à medida que é assimilado nos estágios de agregação subsequentes. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o comoingestionTime.O estágio
$matchexclui documentos que têm umdewPoint.valuemenor ou igual a5.0e passa os documentos comdewPoint.valuemaior que5.0para o próximo estágio.O estágio
$mergegrava a saída na coleção do Atlas chamadastreamno banco de dadossample_weatherstream. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
Para visualizar os documentos na coleção sample_weatherstream.stream resultante, conecte-se ao cluster Atlas e execute o seguinte comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
Observação
O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.
Exemplo de fluxo de alteração
A seguinte agregação ingere dados da fonte cluster0-collection, que se conecta a um cluster do Atlas carregado com o conjunto de dados de amostra. Para aprender a criar um espaço de trabalho do processamento de fluxos e adicionar uma conexão a um cluster do Atlas ao registro de conexão, veja Introdução ao Atlas Stream Processing. Esta agregação executa duas etapas para abrir um fluxo de alterações e registrar as alterações na coleção data no banco de dados sample_weatherdata:
O estágio
$sourceconecta-se à origemcluster0-collectione abre um fluxo de alterações na coleçãodatano banco de dadossample_weatherdata.O estágio
$mergegrava os documentos filtrados do fluxo de alterações em uma coleção do Atlas chamadadata_changesno banco de dadossample_weatherdata. Se não existir tal coleção, o Atlas a criará.
{ $source: { connectionName: "cluster0-connection", db : "sample_weatherdata", coll : "data" }, $merge: { into: { connectionName: "cluster0-connection", db: "sample_weatherdata", coll: "data_changes" } } }
O seguinte comando mongosh exclui um documento data:
db.getSiblingDB("sample_weatherdata").data.deleteOne( { _id: ObjectId("5553a99ae4b02cf715120e4b") } )
Após a exclusão do documento data, o processador de fluxo grava o documento de evento do fluxo de alterações na coleção sample_weatherdata.data_changes. Para visualizar os documentos na coleção sample_weatherdata.data_changes resultante, use mongosh para conectar-se ao seu cluster Atlas e execute o seguinte comando:
db.getSiblingDB("sample_weatherdata").data_changes.find()
[ { _id: { _data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004' }, clusterTime: Timestamp({ t: 1738790819, i: 1 }), documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') }, ns: { db: 'sample_weatherdata', coll: 'data' }, operationType: 'delete', wallTime: ISODate('2025-02-05T21:26:59.313Z') } ]