Docs Menu
Docs Home
/
MongoDB Atlas
/

Atlas Stream Processing を使い始める

項目一覧

  • 前提条件
  • 手順
  • Atlasで、プロジェクトのGo Stream Processing{0 ページに します。
  • Atlas Stream Processing インスタンスの作成。
  • Atlas Stream Processing インスタンスの接続文字列を取得します。
  • MongoDB Atlas 接続を接続レジストリに追加します。
  • ストリーミング データソースが メッセージを発行することを確認します。
  • 永続的なストリーム プロセッサを作成します。
  • ストリーム プロセッサを起動します。
  • ストリーム プロセッサの出力を確認します。
  • ストリーム プロセッサを削除します。
  • 次のステップ

このチュートリアルでは、Atlas Stream Processing を設定し、最初のストリーム プロセッサを実行する手順について説明します。

Atlas の サンプル データ セット からの映画データを含むコレクションを使用します。

  • Atlas プロジェクト

  • mongosh バージョン2.0以上

  • Atlas userProject OwnerProject Stream Processing Ownerストリーム プロセシング インスタンスと接続レジストリを管理する、 または ロールを持つ

    注意

    Project Ownerロールでは、データベース配置の作成、プロジェクト アクセスとプロジェクト設定の管理、IP アクセス リスト エントリの管理などを行うことができます。

    Project Stream Processing Ownerロールにより、ストリーム プロセシング インスタンスの表示、作成、削除、編集などの Atlas Stream Processing アクションや、接続レジストリ内の接続の表示、追加、変更、削除などの Atlas Stream Processing アクションが可能になります。

    2 つのロールの違いの詳細については、「プロジェクト ロール」を参照してください。

  • ストリーム プロセッサを作成および実行するatlasAdminロールを持つデータベースユーザー

  • Atlas クラスター

1
  1. まだ表示されていない場合は、プロジェクトを含む組織をナビゲーション バーの Organizations メニューで選択します。

  2. まだ表示されていない場合は、ナビゲーション バーの Projects メニューからプロジェクトを選択します。

  3. サイドバーで、 Services見出しの下のStream Processingをクリックします。

    Atlas Stream Processingページが表示されます。

2
  1. 右下隅の [ Get Startedをクリックします。 Atlas では、Atlas Stream Processing の主要コンポーネントについて簡単に説明しています。

  2. Create instanceボタンをクリックします。

  3. Create a stream processing instanceページで、インスタンスを次のように構成します。

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Instance Name: tutorialInstance

  4. [Create] をクリックします。

3
  1. Atlas Stream Processing インスタンスの概要パネルを見つけて、 Connectをクリックします。

  2. I have the MongoDB shell installed を選択します。

  3. Select your mongo shell versionドロップダウン メニューから、 mongoshの最新バージョンを選択します。

  4. Run your connection string in your command line の下に提供される接続文字列をコピーします。 これは後の手順で必要になります。

  5. [Close] をクリックします。

4

この接続は、ストリーミング データ シンクとして機能します。

  1. Atlas Stream Processing インスタンスの ペインで、 Configureをクリックします。

  2. [ Connection Registryタブで、右上の [ + Add Connection ] をクリックします。

  3. [ Atlas Databaseをクリックします。 Connection Nameフィールドにmongodb1と入力します。 Atlas Clusterドロップダウンから、データが保存されていない Atlas クラスターを選択します。

  4. [Add connection] をクリックします。

5

ストリーム プロセシング インスタンスには、 sample_stream_solarというサンプル データソースへの接続が事前に構成されています。 このソースは、さまざまな ソート デバイスからのレポートのストリームを生成します。 各レポートには、特定の時点における単一のソート デバイスの測定値と温度、およびそのデバイスの最大出力サイズが記載されています。

次のドキュメントは、その一般的な例です。

{
device_id: 'device_8',
group_id: 7,
timestamp: '2024-08-12T21:41:01.788+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 252,
temp: 17
},
_ts: ISODate('2024-08-12T21:41:01.788Z'),
_stream_meta: {
source: {
type: 'generated'
}
}
}

このソースがメッセージを発行することを確認するには、ストリーム プロセッサを対話的に作成します。

  1. 希望のターミナルアプリケーションを開きます。

  2. mongoshを使用してストリーム プロセシング インスタンスに接続します。

    前の手順でコピーしたmongosh接続文字列をターミナルに貼り付けます。ここで、<atlas-stream-processing-url> はストリーム プロセシング インスタンスのURL 、<username>atlasAdminロールを持つユーザーです。

    mongosh "mongodb://<atlas-stream-processing-url>/"
    --tls --authenticationDatabase admin --username <username>

    パスワードの入力を求められたら、入力します。

  3. ストリーム プロセッサを作成します。

    次のコードをmongoshプロンプトにコピーします。

    sp.process([{"$source": {
    "connectionName": "sample_stream_solar"
    }}])

    sample_stream_solar接続からのデータがコンソールに表示されていることを確認し、プロセスを終了します。

    sp.process()を使用して作成したストリーム プロセッサは、終了後に永続することはありません。

6

集計パイプラインを使用すると、取り込まれる各ドキュメントを変換できます。 次の集計パイプラインは、1 秒間隔で各ソート デバイスの最大温度と平均、中央値、最大および最小出力を出力します。

  1. $sourceステージを構成します。

    次の $source ステージは、sample_stream_solar ソースからデータを取り込みます。

    let s = {
    $source: {
    connectionName: "sample_stream_solar"
    }
    }
  2. $groupステージを構成します。

    次の$group ステージでは、すべての受信データがgroup_id obs.tempobs.wattsgroup_idに従って整理され、各 のすべてのドキュメントの フィールドと フィールドの値を累積して、必要なデータを生成します。

    let g = {
    $group: {
    _id: "$group_id",
    max_temp: {
    $avg: "$obs.temp"
    },
    avg_watts: {
    $min: "$obs.watts"
    },
    median_watts: {
    $min: "$obs.watts"
    },
    max_watts: {
    $max: "$obs.watts"
    },
    min_watts: {
    $min: "$obs.watts"
    }
    }
    }
  3. $tumblingWindowステージを構成します。

    ストリーミング データで $group などのアキュムレーションを実行するために、 Atlas Stream ProcessingはWindowsを使用してデータセットをバインドします。 次の$tumblingWindowステージでは、ストリームを連続する10秒間隔に分割します。

    つまり、たとえば、 $groupステージがmedian_wattsの値を計算する場合、過去10秒に特定のgroup_idが取り込まれたすべてのドキュメントに対してobs.watts値が使用されます。

    let t = {
    $tumblingWindow: {
    interval: {
    size: NumberInt(10),
    unit: "second"
    },
    pipeline: [g]
    }
    }
  4. $mergeステージを構成します。

    $merge では、処理されたストリーミング データを Atlas データベースに書き込むことができます。

    let m = {
    $merge: {
    into: {
    connectionName: "mongodb1",
    db: "solarDb",
    coll: "solarColl"
    }
    }
    }
  5. ストリーム プロセッサを作成します。

    新しいストリーム プロセッサに名前を割り当て、各ステージを順番に一覧表示してその集計パイプラインを宣言します。 $groupステージは$tumblingWindowのネストされたパイプラインに属しており、プロセッサ パイプライン定義にこれを含めることはできません。

    sp.createStreamProcessor("solarDemo", [s, t, m])

これにより、以前に定義されたクエリを適用し、接続したクラスター上のsolarDbデータベースのsolarCollコレクションに処理されたデータを書込むsolarDemoという名前のストリーム プロセッサが作成されます。 ソート デバイスからの10秒間隔の観察から派生したさまざまな測定値を返します。

Atlas Stream Processing が保管中のデータベースに書き込む方法の詳細については、 $mergeを参照してください。

7

mongoshで次のコマンドを実行します。

sp.solarDemo.start()
8

プロセッサがアクティブであることを確認するには、 mongoshで次のコマンドを実行します。

sp.solarDemo.stats()

このコマンドは、 solarDemoストリーム プロセッサの運用統計を報告します。

ストリーム プロセッサが Atlas クラスターにデータを書き込んだことを確認するには、次の手順を実行します。

  1. Atlas で、プロジェクトの [Clusters] ページに移動します。

    1. まだ表示されていない場合は、希望するプロジェクトを含む組織を選択しますナビゲーション バーのOrganizationsメニュー

    2. まだ表示されていない場合は、ナビゲーション バーのProjectsメニューから目的のプロジェクトを選択します。

    3. まだ表示されていない場合は、サイドバーの [Clusters] をクリックします。

      [ Clusters (クラスター) ] ページが表示されます。

  2. クラスターの [Browse Collections] ボタンをクリックします。

    Data Explorerが表示されます。

  3. MySolarコレクションを表示します。

あるいは、 mongoshを使用して、処理されたドキュメントのサンプルをターミナルに表示することもできます。

sp.solarDemo.sample()
{
_id: 10,
max_watts: 136,
min_watts: 130,
avg_watts: 133,
median_watts: 130,
max_temp: 7,
_stream_meta: {
source: {
type: 'generated'
},
window: {
start: ISODate('2024-08-12T22:49:05.000Z'),
end: ISODate('2024-08-12T22:49:10.000Z')
}
}
}

注意

前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

9

mongoshで次のコマンドを実行します。

sp.solarDemo.drop()

avgWattsを削除したことを確認するには、使用可能なすべてのストリーム プロセッサを一覧表示します。

sp.listStreamProcessors()

次の方法を学習します:

戻る

Overview