Atlas Stream Processing を使い始める
項目一覧
このチュートリアルでは、Atlas Stream Processing を設定し、最初のストリーム プロセッサを実行する手順について説明します。
前提条件
Atlas の サンプル データ セット からの映画データを含むコレクションを使用します。
Atlas プロジェクト
mongosh
バージョン2.0以上Atlas user
Project Owner
Project Stream Processing Owner
ストリーム プロセシング インスタンスと接続レジストリを管理する、 または ロールを持つ注意
Project Owner
ロールでは、データベース配置の作成、プロジェクト アクセスとプロジェクト設定の管理、IP アクセス リスト エントリの管理などを行うことができます。Project Stream Processing Owner
ロールにより、ストリーム プロセシング インスタンスの表示、作成、削除、編集などの Atlas Stream Processing アクションや、接続レジストリ内の接続の表示、追加、変更、削除などの Atlas Stream Processing アクションが可能になります。2 つのロールの違いの詳細については、「プロジェクト ロール」を参照してください。
ストリーム プロセッサを作成および実行する
atlasAdmin
ロールを持つデータベースユーザーAtlas クラスター
手順
AtlasGoStream ProcessingAtlas で、プロジェクトの ページにGoします。
まだ表示されていない場合は、プロジェクトを含む組織をナビゲーション バーの Organizations メニューで選択します。
まだ表示されていない場合は、ナビゲーション バーの Projects メニューからプロジェクトを選択します。
サイドバーで、 Services見出しの下のStream Processingをクリックします。
Atlas Stream Processingページが表示されます。
Atlas Stream Processing インスタンスの接続文字列を取得します。
Atlas Stream Processing インスタンスの概要パネルを見つけて、 Connectをクリックします。
I have the MongoDB shell installed を選択します。
Select your mongo shell versionドロップダウン メニューから、
mongosh
の最新バージョンを選択します。Run your connection string in your command line の下に提供される接続文字列をコピーします。 これは後の手順で必要になります。
[Close] をクリックします。
MongoDB Atlas 接続を接続レジストリに追加します。
この接続は、ストリーミング データ シンクとして機能します。
Atlas Stream Processing インスタンスの ペインで、 Configureをクリックします。
[ Connection Registryタブで、右上の [ + Add Connection ] をクリックします。
[ Atlas Databaseをクリックします。 Connection Nameフィールドに
mongodb1
と入力します。 Atlas Clusterドロップダウンから、データが保存されていない Atlas クラスターを選択します。[Add connection] をクリックします。
ストリーミング データソースが メッセージを発行することを確認します。
ストリーム プロセシング インスタンスには、 sample_stream_solar
というサンプル データソースへの接続が事前に構成されています。 このソースは、さまざまな ソート デバイスからのレポートのストリームを生成します。 各レポートには、特定の時点における単一のソート デバイスの測定値と温度、およびそのデバイスの最大出力サイズが記載されています。
次のドキュメントは、その一般的な例です。
{ device_id: 'device_8', group_id: 7, timestamp: '2024-08-12T21:41:01.788+00:00', max_watts: 450, event_type: 0, obs: { watts: 252, temp: 17 }, _ts: ISODate('2024-08-12T21:41:01.788Z'), _stream_meta: { source: { type: 'generated' } } }
このソースがメッセージを発行することを確認するには、ストリーム プロセッサを対話的に作成します。
希望のターミナルアプリケーションを開きます。
mongosh
を使用してストリーム プロセシング インスタンスに接続します。前の手順でコピーした
mongosh
接続文字列をターミナルに貼り付けます。ここで、<atlas-stream-processing-url>
はストリーム プロセシング インスタンスのURL 、<username>
はatlasAdmin
ロールを持つユーザーです。mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> パスワードの入力を求められたら、入力します。
ストリーム プロセッサを作成します。
次のコードを
mongosh
プロンプトにコピーします。sp.process([{"$source": { "connectionName": "sample_stream_solar" }}]) sample_stream_solar
接続からのデータがコンソールに表示されていることを確認し、プロセスを終了します。sp.process()
を使用して作成したストリーム プロセッサは、終了後に永続することはありません。
永続的なストリーム プロセッサを作成します。
集計パイプラインを使用すると、取り込まれる各ドキュメントを変換できます。 次の集計パイプラインは、1 秒間隔で各ソート デバイスの最大温度と平均、中央値、最大および最小出力を出力します。
$source
ステージを構成します。次の
$source
ステージは、sample_stream_solar
ソースからデータを取り込み、Atlas Stream Processing 時間フィールドの値をソースのtimestamp
フィールドの値と等しく設定します。let s = { source: { connectionName: "sample_stream_solar", timeField: { $dateFromString: { dateString: '$timestamp' } } } } $group
ステージを構成します。次の
$group
ステージでは、すべての受信データがgroup_id
obs.temp
obs.watts
group_id
に従って整理され、各 のすべてのドキュメントの フィールドと フィールドの値を累積して、必要なデータを生成します。let g = { group: { _id: "$group_id", max_temp: { $avg: "$obs.temp" }, avg_watts: { $min: "$obs.watts" }, median_watts: { $min: "$obs.watts" }, max_watts: { $max: "$obs.watts" }, min_watts: { $min: "$obs.watts" } } } $tumblingWindow
ステージを構成します。ストリーミング データで
$group
などのアキュムレーションを実行するために、 Atlas Stream ProcessingはWindowsを使用してデータセットをバインドします。 次の$tumblingWindow
ステージでは、ストリームを連続する10秒間隔に分割します。つまり、たとえば、
$group
ステージがmedian_watts
の値を計算する場合、過去10秒に特定のgroup_id
が取り込まれたすべてのドキュメントに対してobs.watts
値が使用されます。let t = { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, pipeline: [g] } } $mergeステージを構成します。
$merge
では、処理されたストリーミング データを Atlas データベースに書き込むことができます。let m = { merge: { into: { connectionName: "mongodb1", db: "solarDb", coll: "solarColl" } } } ストリーム プロセッサを作成します。
新しいストリーム プロセッサに名前を割り当て、各ステージを順番に一覧表示してその集計パイプラインを宣言します。
$group
ステージは$tumblingWindow
のネストされたパイプラインに属しており、プロセッサ パイプライン定義にこれを含めることはできません。sp.createStreamProcessor("solarDemo", [s, t, m])
これにより、以前に定義されたクエリを適用し、接続したクラスター上のsolarDb
データベースのsolarColl
コレクションに処理されたデータを書込むsolarDemo
という名前のストリーム プロセッサが作成されます。 ソート デバイスからの10秒間隔の観察から派生したさまざまな測定値を返します。
Atlas Stream Processing が保管中のデータベースに書き込む方法の詳細については、 $merge
を参照してください。
ストリーム プロセッサを起動します。
mongosh
で次のコマンドを実行します。
sp.solarDemo.start()
ストリーム プロセッサの出力を確認します。
プロセッサがアクティブであることを確認するには、 mongosh
で次のコマンドを実行します。
sp.solarDemo.stats()
このコマンドは、 solarDemo
ストリーム プロセッサの運用統計を報告します。
ストリーム プロセッサが Atlas クラスターにデータを書き込んだことを確認するには、次の手順を実行します。
Atlas で、プロジェクトの [Clusters] ページに移動します。
まだ表示されていない場合は、希望するプロジェクトを含む組織を選択しますナビゲーション バーのOrganizationsメニュー
まだ表示されていない場合は、ナビゲーション バーのProjectsメニューから目的のプロジェクトを選択します。
まだ表示されていない場合は、サイドバーの Clusters をクリックしてください。
[ Clusters (クラスター) ] ページが表示されます。
クラスターの [Browse Collections] ボタンをクリックします。
Data Explorerが表示されます。
MySolar
コレクションを表示します。
あるいは、 mongosh
を使用して、処理されたドキュメントのサンプルをターミナルに表示することもできます。
sp.solarDemo.sample()
{ _id: 10, max_watts: 136, min_watts: 130, avg_watts: 133, median_watts: 130, max_temp: 7, _stream_meta: { source: { type: 'generated' }, window: { start: ISODate('2024-08-12T22:49:05.000Z'), end: ISODate('2024-08-12T22:49:10.000Z') } } }
注意
前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。
ストリーム プロセッサを削除します。
mongosh
で次のコマンドを実行します。
sp.solarDemo.drop()
avgWatts
を削除したことを確認するには、使用可能なすべてのストリーム プロセッサを一覧表示します。
sp.listStreamProcessors()
次のステップ
次の方法を学習します: