【SpringBoot + JavaScript】WebSocketを使ってサーバーからリアルタイムに通知を送る方法

こんにちは、Jeannieです。

先日、クライアント上にお知らせや重要情報などの通知を表示する方法を検討する機会があり、いろいろ調べてみましたが、ドンピシャの記事があまりなかったので備忘録を兼ねてアウトプットしておきます。

やりたいこと

やりたいことは以下の通りです。

  • ある端末で操作した内容を特定のクライアントに通知する。
  • 通知が表示されるタイミングに即時性は不要だが、なるべくリアルタイムで表示したい。

ここでの通知は、push通知ではなく、以下のようなよくあるベルアイコンで表示される通知となります。

ベルアイコンをクリックすると通知の一覧が表示され、さらにその通知をクリックすると詳細ページに遷移する想定です。

これをクライアント側=JavaScript、サーバー側=Java(SpringBoot)で実現する方法を検討しました。

考えられる実現方法

このような通知を表示するにあたって考えられる実現方法は、

  1. クライアントからサーバーに一定間隔でデータを取得する。(ポーリング)
  2. WebSocketを使ってクライアント⇔サーバー間でコネクションを張り、サーバーから通知を送信する。

ざっくり考えると、上記の2通りが考えられるかなと思います。

このうち、1.のポーリングだと、複数のクライアントからサーバーに対して通知の取得リクエストが投げられるため、クライアントが大量になるとサーバーに負荷がかかります。しかも、通知がない場合でも一定間隔でリクエストが送られることで、無駄な通信コストも発生します。

一方、2.のWebSocketを使った方法だと、一度サーバーに接続した後はコネクションが繋がりっぱなしなので、クライアント→サーバーへの無駄なリクエストが発生しません。

さらに、サーバーから通知を送る際は、送る必要があるクライアントに対してだけ送信することができるので、こちらも通信コストが下がります。

WebSocketを使う場合は、コネクションが切断された場合の挙動なども考慮する必要がありますが、無駄が少ないというメリットが大きいため、こちらを採用することにしました。

全体像

全体像をざっくりとした図で表すと以下の通りです。

①ある端末のユーザーが何かしらの操作を行います。
②それを受けたサーバーはDBに通知データを登録します。
③その後、通知を送る必要があるユーザーA、ユーザーBに対してだけ通知を送信します。

②でDBに登録しているのは、通知の未読・既読を管理するためです。
DBへの登録時は未読として登録し、ユーザーが通知をクリックして既読になるタイミングで既読フラグを立てるイメージです。

クライアント⇔サーバー間のプロトコル

これを実現するために、プロトコルはSTOMPを使用します。

STOMPは「Simple Text Orientated Messaging Protocol」の略で、WebSocket上で利用できる軽量なプロトコルです。

実装方法

サーバー側

メッセージブローカーの有効化

STOMPを利用するために、build.gradleに以下の依存関係を追加します。

	implementation 'org.springframework.boot:spring-boot-starter-websocket'
	implementation 'org.webjars:webjars-locator-core'
	implementation 'org.webjars:stomp-websocket:2.3.3'
	implementation 'org.webjars:sockjs-client:1.0.2'

続いて、STOMPメッセージングを有効にするためにconfigクラスを作成します。

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private CustomHandshakeHandler customHandshakeHandler;

    @Autowired
    private UserInterceptor userInterceptor;

    /**
     * メッセージブローカーの設定
     * @param registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // メッセージブローカーを有効化。
        // "/topic"、"/queue"から始まるパスはメッセージブローカーを経由する。
        registry.enableSimpleBroker("/topic", "/queue");

        // メッセージブローカーを経由させず、Controllerで直接受ける場合は、
        // "/app"から始まるパスで受ける。
        registry.setApplicationDestinationPrefixes("/app");
    }

    /**
     * エンドポイントの設定
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // クライアントから接続する際のエンドポイントを"/endpoint"に設定する。
        // ハンドシェイク(接続が確立する際の)ハンドラとしてcustomHandshakeHandlerを設定する。
        // customHandshakeHandlerは独自で実装しているクラスで、
        // コネクションに対してユーザーの定義を行っている。(後述)
        registry.addEndpoint("/endpoint")
                .setHandshakeHandler(customHandshakeHandler)
                .withSockJS();
    }

    /**
     * クライアントからのメッセージ受信時の処理定義
     * @param registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        // クライアントからメッセージを受信した際のインターセプタ(割り込み処理)としてuserInterceptorを設定する。
        // こちらも独自で実装しているクラスで、セッションとユーザー情報の紐づけを行っている。(後述)
        registration.interceptors(userInterceptor);
    }
}

この設定を追加することで、メッセージブローカーが有効化されます。
メッセージブローカーはメッセージやり取りの仲介役で、「/topic」「/queue」から始まるパスで送信されるメッセージは、このメッセージブローカーを経由して送信されます。

メッセージブローカーを経由した場合、そのパスをsubscribe(購読)しているユーザーに一斉送信したりすることも可能です。

例えば、「/topic/notice」宛てのメッセージを5人のユーザーがsubscribeしていたとすると、このパスにメッセージを送信するだけで、5人のユーザーに一斉送信されることになります。

上記の設定は、「/topic」を一斉送信用、「/queue」を個別送信用として想定しています。

こちらをざっくりと図に表すと以下のようになります。

ユーザー情報の設定

特定のクライアントにメッセージを送信するために、各コネクションにユーザー情報を定義します。

どのように定義するかというと、クライアント→サーバーへの接続が確立する際のイベントをハンドルして独自のユーザー情報を定義します。

  1. 独自のPrincipalクラスを定義する。
  2. DefaultHandshakeHandlerを継承したクラスを作成して、1を使用してユーザー情報を定義する。
public class StompPrincipal implements Principal {
    private final String name;

    public StompPrincipal(String name) {
        this.name = name;
    }

    @Override
    public String getName() {
        return this.name;
    }
}
@Component
public class CustomHandshakeHandler extends DefaultHandshakeHandler {
    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        String userName = request.getHeaders().get("sec-websocket-key").get(0);
        return new StompPrincipal(userName);
    }
}

DefaultHandshakeHandlerのdetermineUserメソッドをオーバーライドすることで、コネクション確立時にユーザー情報を定義することが出来ます。

上記の例では、determineUserメソッド内でServerHttpRequestのヘッダーから「sec-websocket-key」を取得して、それをユーザー名としています。

そのユーザー名を独自で作ったStompPrincipalクラスにセットすることでユーザー定義しています。

セッションごとのユーザー管理

前のセクションでユーザー情報を定義したことで、クライアント→サーバーにメッセージが送られる時にユーザー情報を取得することが出来ます。

実際には、ChannelInterceptorインターフェースを実装したクラスを定義することで、メッセージ受信処理をインターセプトし、どのユーザーからメッセージが送信されたかを判別することが出来ます。

@Component
public class UserInterceptor implements ChannelInterceptor {
    /**
     * 送信されたメッセージを処理する前のイベント
     * @param message
     * @param channel
     * @return
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        // メッセージをStompHeaderAccessorに変換
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);

        // メッセージ種別が「CONNECT」の場合
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            //
 ヘッダーの「simpUser」からユーザー情報を取得できる
            StompPrincipal principal = (StompPrincipal) accessor.getHeader("simpUser");

            // 取得したユーザー名を独自のセッションプールで管理
            StompSessionPool.put(principal.getName());
        }
        return message;
    }
}

preSendメソッドは、クライアントから送信されたメッセージを処理する前のイベントです。
送信されたメッセージをStompHeaderAccessorに変換することで、getCommandメソッドを使用してメッセージ種別を判別することが可能です。

上の例では、メッセージ種別が「CONNECT」、つまりクライアントからの接続要求の場合に処理を行うようにしています。

そして、StompHeaderAccessorでメッセージヘッダーから「simpUser」の値を取得することで、前のセクションで独自に定義したユーザー情報を取得することが出来ます。

上の例では、そのユーザー情報からユーザー名を取得し、独自で作成しているセッションプール(StompSessionPool)に追加して管理しています。(StompSessionPoolは接続されているユーザーを管理するだけの簡単なクラスです)

クライアントへのメッセージ送信

クライアントへのメッセージ送信は、複数クライアントへの一斉送信と特定ユーザーへの個別送信が出来ますが、ここでは特定ユーザーへの個別送信のやり方を記載します。

特定ユーザーにメッセージを送信するためには、前のセクションで管理しているユーザーを指定してメッセージを送信します。

@Component
public class PublishTask {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @Scheduled(fixedRate = 5000)
    public void publish() {
        NoticeMessage reply = NoticeMessage
                .builder()
                .message("重要なお知らせです。")
                .build();

        StompSessionPool.getAll().forEach(userName -> {
            simpMessagingTemplate.convertAndSendToUser(userName, "/queue/notice", reply);
        });
    }
}

上の例では、Scheduledアノテーションを使用して5秒間隔でメッセージを送信するようにしていますが、実際のシステムでは任意のタイミングでメッセージを送信することになると思いますので、この辺りは状況に応じて変わってくると思います。

上の例のNoticeMessageは独自に作成したメッセージクラスで、messageをフィールドに持つだけのデータクラスになります。

通知したいメッセージをこのクラスに設定していて、これをStompSessionPoolで管理しているすべてのユーザーに対して1件ずつ送信しています。

送信する際は、SimpMessagingTemplateクラスのconvertAndSendToUserメソッドを使用することで、ユーザーを指定してメッセージを送信することが出来ます。

上の例では、「/queue/notice」宛てにメッセージを送信することで、メッセージブローカーを経由してメッセージが送信されます。

ここで注意すべきなのは、convertAndSendToUserメソッドを使用した場合、パスの先頭に「/user」が自動的に付与されるということです。

送信先として「/queue/notice」を指定した場合、クライアント側は「/user/queue/notice」でsubscribeしているユーザーに対して送信されます。

また、「/user/queue/notice」をsubscribeしているすべてのユーザーに対してメッセージが送信される訳ではなく、指定したユーザーのみにメッセージが送信されます。(上の例ではセッションプールで管理しているすべてのユーザーに送信しているので、結果的には全ユーザーに送信されることにはなりますが・・・)

クライアント側

スクリプトの読み込み

クライアント側はSockJSを使用しますので、HTMLに以下を記述します。(SockJSについては割愛します)

  <script src="/webjars/sockjs-client/sockjs.min.js"></script>
  <script src="/webjars/stomp-websocket/stomp.min.js"></script>

サーバーへの接続とsubscribe

サーバー側で設定されているendpointに対して接続します。
接続成功後は、「/user/queue/notice」のsubscribeを開始することで、サーバーからのメッセージを受信することが出来ます。

// "/endpoint"を指定してStompClientを生成
let stompClient = Stomp.over(new SockJS('/endpoint'));

// エンドポイントに対して接続を開始
stompClient.connect({}, frame => {

  // 接続成功時のコールバック。
  // 接続に成功したら「/user/queue/notice」のsubscribeを開始。
  stompClient.subscribe('/user/queue/notice', message => {

    // メッセージ受信時のコールバック。
    // サーバーから送信したメッセージが引数のmessage.bodyで取得できる。
    // JSON形式になっているのでパースすることで、前のセクションのNoticeMessage型のオブジェクトに変換できる。
    const body = JSON.parse(message.body);
    console.log(body.message);

  });
});

まとめ

クライアント側はさらっとした内容になってしまいましたが、本来であればサーバー停止などによる切断時の自動再接続なども考慮する必要があります。

stompClientのconnectメソッドのコールバックで切断イベントを検知することが出来ますので、1分毎に再接続するなどで対応可能かなと思います。

WebSocketは使ったことがなかったので、調べるのに苦労しましたが、意外と簡単な印象でした。

これを応用するとチャットアプリなども作れると思いますので、是非応用してみたいと思いました。

コメント