プライベートメッセージング - パート IV
このガイドは4つの異なるパートで構成されています
3番目のパートの最後にいたところです

高可用性/負荷分散のために、複数のSocket.IOサーバーにスケールする方法を見ていきます。
インストール
パート IVのブランチをチェックアウトしましょう
git checkout examples/private-messaging-part-4
現在のディレクトリに以下が表示されるはずです
├── babel.config.js
├── package.json
├── public
│   ├── favicon.ico
│   ├── fonts
│   │   └── Lato-Regular.ttf
│   └── index.html
├── README.md
├── server
│   ├── cluster.js (created)
│   ├── docker-compose.yml (created)
│   ├── index.js (updated)
│   ├── messageStore.js (updated)
│   ├── package.json (updated)
│   └── sessionStore.js (updated)
└── src
    ├── App.vue
    ├── components
    │   ├── Chat.vue
    │   ├── MessagePanel.vue
    │   ├── SelectUsername.vue
    │   ├── StatusIcon.vue
    │   └── User.vue
    ├── main.js
    └── socket.js
完全な差分はこちらで確認できます。
サーバーの更新
この最後のパートでは、サーバー側でさらに3つの依存関係が必要です
- ioredis: 優れたRedisクライアント
- socket.io-redis: Redis pub/subメカニズムに基づくSocket.IOアダプター
- @socket.io/sticky: Node.js クラスター内でSocket.IOを実行するためのモジュール
Redisインスタンスも必要です。便宜上、docker-compose.ymlファイルが提供されています
cd server
docker-compose up -d
npm install
npm start
これにより、それぞれ同じindex.jsファイルを実行する4つのNode.jsワーカーが作成されます。
クライアント側では変更は必要ありません。ここではサーバー側に焦点を当てます。
仕組み
複数のサーバーの作成
複数のSocket.IOサーバーを作成する場合、2つのことを行う必要があります
- スティッキーセッションを有効にする必要があります(完全な説明についてはこちらを参照してください)
- デフォルトのインメモリ アダプターを Redis アダプター(または別の互換性のあるアダプター)に置き換える必要があります
この例では、@socket.io/stickyモジュールを使用して、特定のクライアントからのリクエストが常に同じSocket.IOサーバーにルーティングされるようにしています。これが「スティッキーセッション」と呼ばれるものです

注: 異なるポートをリッスンする複数のプロセスを作成したり(または複数のホストを使用したり)、それらの前にリバースプロキシを追加したりすることもできました。NginXやHAProxyなどの一般的なリバースプロキシソリューションでスティッキーセッションを有効にする方法は、ドキュメントで説明されています。
クラスターはserver/cluster.jsファイルで作成されます
const cluster = require("cluster");
const http = require("http");
const { setupMaster } = require("@socket.io/sticky");
const WORKERS_COUNT = 4;
if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);
  for (let i = 0; i < WORKERS_COUNT; i++) {
    cluster.fork();
  }
  cluster.on("exit", (worker) => {
    console.log(`Worker ${worker.process.pid} died`);
    cluster.fork();
  });
  const httpServer = http.createServer();
  setupMaster(httpServer, {
    loadBalancingMethod: "least-connection", // either "random", "round-robin" or "least-connection"
  });
  const PORT = process.env.PORT || 3000;
  httpServer.listen(PORT, () =>
    console.log(`server listening at https://:${PORT}`)
  );
} else {
  console.log(`Worker ${process.pid} started`);
  require("./index");
}
既存のserver/index.jsファイルには、1つの変更があります。ワーカープロセスによって作成されたHTTPサーバーは、実際には特定のポートをリッスンしません。リクエストはマスタープロセスによって処理され、適切なワーカーに転送されます。
前
httpServer.listen(PORT, () =>
  console.log(`server listening at https://:${PORT}`)
);
後
setupWorker(io);
@socket.io/stickyによって提供されるsetupWorkerメソッドは、マスターとワーカー間の同期を処理します。
セッションとメッセージ
これでスティッキーセッションが有効になったため、Socket.IOサーバー間でセッションとメッセージを共有する必要があります。
Redisに基づいて新しいSessionStoreを作成します。各セッションをRedisハッシュにHSETコマンドで格納します
class RedisSessionStore extends SessionStore {
  // ...
  saveSession(id, { userID, username, connected }) {
    this.redisClient
      .multi()
      .hset(`session:${id}`, "userID", userID, "username", username, "connected", connected)
      .expire(`session:${id}`, SESSION_TTL)
      .exec();
  }
  // ...
}
古いセッションをクリーンアップするために、キーに有効期限も設定します。
HMGETコマンドを使用すると、セッションのフェッチは非常に簡単です
const mapSession = ([userID, username, connected]) =>
  userID ? { userID, username, connected: connected === "true" } : undefined;
class RedisSessionStore extends SessionStore {
  // ...
  findSession(id) {
    return this.redisClient
      .hmget(`session:${id}`, "userID", "username", "connected")
      .then(mapSession);
  }
  // ...
}
すべてのセッションをフェッチするのは少し複雑です
class RedisSessionStore extends SessionStore {
  // ...
  async findAllSessions() {
    // first, we fetch all the keys with the SCAN command
    const keys = new Set();
    let nextIndex = 0;
    do {
      const [nextIndexAsStr, results] = await this.redisClient.scan(
        nextIndex,
        "MATCH",
        "session:*",
        "COUNT",
        "100"
      );
      nextIndex = parseInt(nextIndexAsStr, 10);
      results.forEach((s) => keys.add(s));
    } while (nextIndex !== 0);
    // and then we retrieve the session details with multiple HMGET commands
    const commands = [];
    keys.forEach((key) => {
      commands.push(["hmget", key, "userID", "username", "connected"]);
    });
    return this.redisClient
      .multi(commands)
      .exec()
      .then((results) => {
        return results
          .map(([err, session]) => (err ? undefined : mapSession(session)))
          .filter((v) => !!v);
      });
  }
}
同様に、Redisに基づいて新しいMessageStoreを作成します。特定のユーザーにリンクされたすべてのメッセージをRedisリストにRPUSHコマンドで格納します
class RedisMessageStore extends MessageStore {
  // ...
  saveMessage(message) {
    const value = JSON.stringify(message);
    this.redisClient
      .multi()
      .rpush(`messages:${message.from}`, value)
      .rpush(`messages:${message.to}`, value)
      .expire(`messages:${message.from}`, CONVERSATION_TTL)
      .expire(`messages:${message.to}`, CONVERSATION_TTL)
      .exec();
  }
  // ...
}
メッセージの取得は、LRANGEコマンドで行われます
class RedisMessageStore extends MessageStore {
  // ...
  findMessagesForUser(userID) {
    return this.redisClient
      .lrange(`messages:${userID}`, 0, -1)
      .then((results) => {
        return results.map((result) => JSON.parse(result));
      });
  }
}
メッセージの転送
必要な最後の変更が1つあります。受信者が同じSocket.IOサーバーに接続していない場合でも、メッセージが実際に受信者に届くようにする必要があります

これはRedisアダプターの役割であり、Redis pub/subメカニズムを利用して、Socket.IOサーバー間でメッセージをブロードキャストし、最終的にすべてのクライアントに到達させます。
const httpServer = require("http").createServer();
const Redis = require("ioredis");
const redisClient = new Redis();
const io = require("socket.io")(httpServer, {
  cors: {
    origin: "https://:8080",
  },
  adapter: require("socket.io-redis")({
    pubClient: redisClient,
    subClient: redisClient.duplicate(),
  }),
});
以上です!マシンにRedis CLIがある場合は、送信されたメッセージをワイヤ上で確認できます
$ redis-cli
127.0.0.1:6379> PSUBSCRIBE socket.io*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "socket.io*"
3) (integer) 1
1) "pmessage"
2) "socket.io*"
3) "socket.io#/#"
4) "\x93\xa6XFD3OF\x83..."
ドキュメント
注:Redisアダプターを使用すると、「切断」ハンドラーで使用されるallSockets()メソッドは、すべてのSocket.IOサーバーでSocket IDを自動的に返すため、更新する必要はありません。
レビュー
わかりました、要約しましょう。完全に機能するチャット(はい、またしても!)、堅牢で水平方向に拡張する準備ができているものを作成しました。これにより、いくつかの便利なSocket.IO機能を導入することができました
- ミドルウェア
- ルーム
- 複数のSocket.IOサーバーへのスケーリング
お読みいただきありがとうございます!