【Inside AIoT】機械データモニタリングの核心、リアルタイムWeb通知SSEを実装する

目次

はじめに

こんにちは!
機械とAIが繋がるスマートな製造現場を作っているEdgeCrossです。

私たちは、実際の現場で使われるAIとIoT技術について、より多くの方に分かりやすくお伝えするためにInside AIoTを連載しています。

現場でスマートマシンソリューションを導入する際、最も満足度が高い機能の一つが、異常状況が発生した際に迅速にリアルタイムで通知することです。
今回は、EdgeCrossが機械モニタリング・遠隔制御ソリューション Machine Manager(PROGIX, SCAUTR)で、どのようにリアルタイム通知機能を実装しているのかご紹介します。


リアルタイムモニタリングがIoTシステムで持つ重要度は非常に大きいです。
人が継続的にモニタリングを実行するのは限界があるため、EdgeCrossのマシンマネージャーソリューションを利用すれば、ウェブまたはアプリを通じてリアルタイムの通知を受け取ることができます。

ユーザーは自分のマシンの状態と現場の状況に合わせて通知を個別に設定することができ、ウェブ/アプリを通じて重要な状態データの通知を受けることができます。
このような通知は、アプリプッシュ、SMS、ウェブブラウザ通知の両方でサポートされます。
また、状況に応じて様々な通知サービスを追加で選択することもできます。

本記事では、その中でもウェブブラウザを通じた通知の送信方法について詳しく説明します。
ウェブブラウザ通知は、ユーザーのブラウザを通じてリアルタイムで情報を伝達する機能です。
これを実装する方法はいくつかありますが、エッジクロスはServer-Sent Events(SSE)という技術を活用して実装しています。

Server-Sent Events(SSE)、概念から確認する

Server-Sent Events(SSE)は、クライアントがサーバーの最初の接続要求をした後、別途の要求なしにサーバーから新しいデータを受信できるようにする技術です。
この技術は主にリアルタイム通知のような機能をウェブブラウザに実装する際に活用されます。
例えば、ユーザーが新しいメッセージや更新を待つたびにサーバーにリクエストを送信する代わりに、サーバーが自動的に関連情報をクライアントに送信するようにすることができます。

SSEの動作原理

SSEはHTTPプロトコルに基づいて動作します。
クライアント側はEventSourceインターフェースを使用して、サーバーとの初期接続を開始します。
この接続はHTTP GETリクエストで行われ、このリクエストは特にtext/event-streamというMIMEタイプを使用して、クライアントがSSEをサポートしていることをサーバーに通知します。

サーバーがこの接続要求を受け付けると、接続を閉じることなく開いたままにします。
このような継続的な接続により、サーバーは新しいデータが発生するたびに、テキストデータをイベントストリーム形式でクライアントにリアルタイムで送信することができ、各データ送信は次の形式に従います。

data: Message\\n\\n

サーバーからイベントを送信するたびに、クライアントのEventSourceオブジェクトはこれを検知し、そのデータをonmessageイベントとして処理してユーザーに必要なアクションを実行します。
このプロセスは、サーバーからクライアントへの一方向の通信を可能にし、クライアントは別途のリクエストを送る必要がなく、継続的にデータを受け取ることができます。

SSEとWebSocketの特徴と違い

リアルタイム通信のためのもう一つの一般的な方法は、Websocketを使用することです。
SSEとWebsocketはそれぞれ独特な特性を持っています。
最も顕著な違いは、通信の方向性にあります。
SSEは一方向通信、つまり、サーバーからクライアントにのみデータを送信する方式ですが、Websocketは双方向通信が可能で、サーバーとクライアントがお互いにデータを送受信することができます。

この2つの技術のプロトコルの違いも重要な考慮事項です。
SSEは既存のHTTPプロトコルを利用して接続を維持し、データをストリーム形式でデータを転送します。

一方、Websocketは別のWebsocketプロトコルを使用してメッセージの交換を可能にします。

Web SocketSSE
ブラウザサポートほとんどのブラウザでサポートほとんどのブラウザでサポート(polyfills可能)
通信方向双方向単方向
リアルタイムOO
データ形式Binary、UTF-8UTF-8
自動再接続XO(3秒毎に試行)
最大同時接続数ブラウザの接続制限はないが、サーバーのセットアップによって異なるHTTPを経由する場合、ブラウザあたり6個まで可能 / HTTP2では100個が基本。
プロトコルWebsocketHTTP
バッテリー消費量大きい少ない
FirewallフレンドリーXO

EdgeCrossは、リアルタイム通知機能の実装において、サーバーからクライアントにデータを送信する一方向の通信のみが必要なため、SSEを選択しました。
SSEはHTTP接続をリサイクルしてデータを転送するため、別途のプロトコルインターフェースを実装しなくても、迅速かつ効率的にリアルタイムのWeb通知機能を実装することができます。
これにより、開発時間とコストを削減することができ、既存のウェブインフラとの互換性も高めることができます。

SSEを利用したWebブラウザ通知の実装

それでは、実際にEdgeCrossのソリューションで通知がどのように実装されたのか、サーバーアーキテクチャ、実際の実装段階、サンプルコードを参考にして説明します。

上の図は、現在のウェブ通知システムの簡単なアーキテクチャです。

STEP
AIoTデバイス

機械データを収集して処理し、特定の通知データを検出します。検出された通知データをMQTT Brokerに送信します。

STEP
MQTTブローカーサーバー

このサーバーは、AIoTデバイスからデータを受信し、事前に協議された通知トピックでデータを受け取ります。(MQTTの詳細については、こちらの記事を参照してください。)

STEP
プッシュサーバー

このサーバーは、通知トピックを購読しており、新しいデータが到着すると、それをクライアントに送信します。

STEP
クライアント

クライアントはプッシュサーバーからデータを受け取り、通知ウィンドウを通じてユーザーに表示します。

上記のアーキテクチャのうち、ウェブ通知機能を追加するためにサーバーとクライアントの両方に変更が必要で、次のような手順でウェブ通知機能を追加しました。

STEP
サーバー設定

プッシュサーバーはSpring Bootフレームワークを基盤に実装されており、SSE機能を追加してEventStream形式でデータを送信します。この過程で、サーバーはクライアントの接続要求を受け付け、購読した通知トピックを通じて入ってくるデータをリアルタイムで送信します。

STEP
クライアントの設定

クライアントはNext.jsフレームワークに基づいて実装されており、EventSourceオブジェクトを使用してサーバーからデータを受信し、これに基づいてユーザーにリアルタイム通知を提供します。

サンプルコードと例題

それでは、実際のサンプルコードで例題を作って説明します。

*サーバー設定

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.http.MediaType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.time.Duration;

@RestController
public class NotificationController {

    private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

    // 通知イベントを送る
    @CrossOrigin(origins = "<http://localhost:3000>")
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> sendNotification() {
        return sink.asFlux().delayElements(Duration.ofSeconds(1)); 各イベントの間に1秒の間隔を設定します。
    }

    // 通知発生時に呼び出されるAPI
    @GetMapping("/send")
    public void triggerEvent(@RequestParam String message) {
        sink.tryEmitNext("Received message: " + message);
    }
}

上の例題は/streamエンドポイントでSSEコネクションをした後、/send APIでメッセージを送信して接続しているクライアントにメッセージを送るサンプルコードです。

以下はサンプルコードの重要な構成要素です。

Sinks.Many sink

Sinks.many().multicast().onBackpressureBuffer()を使って複数のサブスクライバにメッセージを送信できるsinkを生成します。
これにより、クライアントからのリクエストに対して複数の応答を同時に処理することができます。

streamエンドポイント

@GetMappingアノテーションを使用して、/streamパスへのGETリクエストを処理します。
produces = MediaType.TEXT_EVENT_STREAM_VALUEに設定して、サーバーからクライアントにテキストストリームを継続的に送信します。

sink.asFlux().delayElements(Duration.ofSeconds(1))を呼び出して生成されたデータを1秒間隔でクライアントにストリーミングします。

@CrossOrigin(origins = “[http://localhost:3000](http://localhost:3000)”): クロスオリジンリソースの共有を許可し、指定されたオリジンのクライアントがリソースにアクセスできるように設定します。

sendエンドポイント

/sendパスからメッセージを受け取り、sinkを通じてサブスクライバ(クライアント)に”Received message: +メッセージ”形式で送信します。これはリアルタイム通知やイベントをトリガーするために使います。

*クライアント設定

import { useEffect, useState } from 'react';

export default function Home() {
    const [messages, setMessages] = useState([]);

    useEffect(() => {
        const eventSource = new EventSource('<http://localhost:8080/stream>');
        eventSource.onmessage = function(event) {
            setMessages(prev => [...prev, event.data]);
        };

        return () => {
            eventSource.close();
        };
    }, []);

    return (
        <div>
            <h1>Notifications</h1>
            <ul>
                {messages.map((msg, index) => (
                    <li key={index}>{msg}</li>
                ))}
            </ul>
        </div>
    );
}

上の例は、サーバーの初回接続後、継続的にサーバーからデータを受信して画面に描画する例です。

以下はサンプルコードの重要な構成要素です。

useStateを使ったメッセージ状態管理

useState[]を使ってサーバーから受信したメッセージのリストを管理します。

useEffectとEventSource

useEffectフック内でEventSourceオブジェクトを生成してサーバーの/streamエンドポイントに接続します。
onmessageハンドラを使用してサーバーから受信したメッセージをステータスに追加します。これにより、サーバーから送信されたデータがリアルタイムで画面に反映されます。

画面出力

messages配列を使って受信したメッセージをリスト形式で画面に出力します。

*結果

例題を実行させると、上記のように動作することが確認できます。
成果物を見ると、初回接続時、ネットワークフェンディングが閉じていて、終了しないことが確認できます。
その後、サーバーから継続的にデータを送ると、クライアント(ウェブブラウザ)で別途のリクエストなしでリアルタイムでデータを受信することが確認できます。

本例では、APIを通じて直接イベントを発生させてみましたが、これをMQTTブローカーを通じて特定のトピックをサブスクライブしたり、定期的なバッチ作業を通じて特定の時間ごとにイベントを発生させる方法で応用することもできます。
このように、それぞれのプロジェクト要件に合わせて幅広く適用することができます。

以上、EdgeCrossではリアルタイムWebブラウザ通知を実装する方法の一つであるServer-Sent Events(SSE)技術を中心に紹介しました。
SSEは、ユーザーエクスペリエンスを向上させ、リアルタイムデータモニタリングの効率を高めるのに大きなメリットがあります。
複雑な設定をしなくても、Webアプリケーションでサーバーからのデータストリームを簡単に実装できるため、特に動的なデータ処理が必要な場合に適しています。

私たちが提供した実装事例とサンプルコードがあなたのプロジェクトに有用なインサイトを提供し、SSEを活用した多様な方法を模索するのに役立つことを願っています。

ありがとうございました。

EdgeCross公式ホームページはこちら

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!
目次