プライベートメッセージング - パート IV
このガイドは4つの異なるパートで構成されています
3番目のパートの最後にいたところです

高可用性/負荷分散のために、複数のSocket.IOサーバーにスケールする方法を見ていきます。
インストール
パート IVのブランチをチェックアウトしましょう
git checkout examples/private-messaging-part-4
現在のディレクトリに以下が表示されるはずです
├── babel.config.js
├── package.json
├── public
│ ├── favicon.ico
│ ├── fonts
│ │ └── Lato-Regular.ttf
│ └── index.html
├── README.md
├── server
│ ├── cluster.js (created)
│ ├── docker-compose.yml (created)
│ ├── index.js (updated)
│ ├── messageStore.js (updated)
│ ├── package.json (updated)
│ └── sessionStore.js (updated)
└── src
├── App.vue
├── components
│ ├── Chat.vue
│ ├── MessagePanel.vue
│ ├── SelectUsername.vue
│ ├── StatusIcon.vue
│ └── User.vue
├── main.js
└── socket.js
完全な差分はこちらで確認できます。
サーバーの更新
この最後のパートでは、サーバー側でさらに3つの依存関係が必要です
ioredis
: 優れたRedisクライアントsocket.io-redis
: Redis pub/subメカニズムに基づくSocket.IOアダプター@socket.io/sticky
: Node.js クラスター内でSocket.IOを実行するためのモジュール
Redisインスタンスも必要です。便宜上、docker-compose.yml
ファイルが提供されています
cd server
docker-compose up -d
npm install
npm start
これにより、それぞれ同じindex.js
ファイルを実行する4つのNode.jsワーカーが作成されます。
クライアント側では変更は必要ありません。ここではサーバー側に焦点を当てます。
仕組み
複数のサーバーの作成
複数のSocket.IOサーバーを作成する場合、2つのことを行う必要があります
- スティッキーセッションを有効にする必要があります(完全な説明についてはこちらを参照してください)
- デフォルトのインメモリ アダプターを Redis アダプター(または別の互換性のあるアダプター)に置き換える必要があります
この例では、@socket.io/sticky
モジュールを使用して、特定のクライアントからのリクエストが常に同じSocket.IOサーバーにルーティングされるようにしています。これが「スティッキーセッション」と呼ばれるものです

注: 異なるポートをリッスンする複数のプロセスを作成したり(または複数のホストを使用したり)、それらの前にリバースプロキシを追加したりすることもできました。NginXやHAProxyなどの一般的なリバースプロキシソリューションでスティッキーセッションを有効にする方法は、ドキュメントで説明されています。
クラスターはserver/cluster.js
ファイルで作成されます
const cluster = require("cluster");
const http = require("http");
const { setupMaster } = require("@socket.io/sticky");
const WORKERS_COUNT = 4;
if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);
for (let i = 0; i < WORKERS_COUNT; i++) {
cluster.fork();
}
cluster.on("exit", (worker) => {
console.log(`Worker ${worker.process.pid} died`);
cluster.fork();
});
const httpServer = http.createServer();
setupMaster(httpServer, {
loadBalancingMethod: "least-connection", // either "random", "round-robin" or "least-connection"
});
const PORT = process.env.PORT || 3000;
httpServer.listen(PORT, () =>
console.log(`server listening at http://localhost:${PORT}`)
);
} else {
console.log(`Worker ${process.pid} started`);
require("./index");
}
既存のserver/index.js
ファイルには、1つの変更があります。ワーカープロセスによって作成されたHTTPサーバーは、実際には特定のポートをリッスンしません。リクエストはマスタープロセスによって処理され、適切なワーカーに転送されます。
前
httpServer.listen(PORT, () =>
console.log(`server listening at http://localhost:${PORT}`)
);
後
setupWorker(io);
@socket.io/sticky
によって提供されるsetupWorker
メソッドは、マスターとワーカー間の同期を処理します。
セッションとメッセージ
これでスティッキーセッションが有効になったため、Socket.IOサーバー間でセッションとメッセージを共有する必要があります。
Redisに基づいて新しいSessionStoreを作成します。各セッションをRedisハッシュにHSETコマンドで格納します
class RedisSessionStore extends SessionStore {
// ...
saveSession(id, { userID, username, connected }) {
this.redisClient
.multi()
.hset(`session:${id}`, "userID", userID, "username", username, "connected", connected)
.expire(`session:${id}`, SESSION_TTL)
.exec();
}
// ...
}
古いセッションをクリーンアップするために、キーに有効期限も設定します。
HMGETコマンドを使用すると、セッションのフェッチは非常に簡単です
const mapSession = ([userID, username, connected]) =>
userID ? { userID, username, connected: connected === "true" } : undefined;
class RedisSessionStore extends SessionStore {
// ...
findSession(id) {
return this.redisClient
.hmget(`session:${id}`, "userID", "username", "connected")
.then(mapSession);
}
// ...
}
すべてのセッションをフェッチするのは少し複雑です
class RedisSessionStore extends SessionStore {
// ...
async findAllSessions() {
// first, we fetch all the keys with the SCAN command
const keys = new Set();
let nextIndex = 0;
do {
const [nextIndexAsStr, results] = await this.redisClient.scan(
nextIndex,
"MATCH",
"session:*",
"COUNT",
"100"
);
nextIndex = parseInt(nextIndexAsStr, 10);
results.forEach((s) => keys.add(s));
} while (nextIndex !== 0);
// and then we retrieve the session details with multiple HMGET commands
const commands = [];
keys.forEach((key) => {
commands.push(["hmget", key, "userID", "username", "connected"]);
});
return this.redisClient
.multi(commands)
.exec()
.then((results) => {
return results
.map(([err, session]) => (err ? undefined : mapSession(session)))
.filter((v) => !!v);
});
}
}
同様に、Redisに基づいて新しいMessageStoreを作成します。特定のユーザーにリンクされたすべてのメッセージをRedisリストにRPUSHコマンドで格納します
class RedisMessageStore extends MessageStore {
// ...
saveMessage(message) {
const value = JSON.stringify(message);
this.redisClient
.multi()
.rpush(`messages:${message.from}`, value)
.rpush(`messages:${message.to}`, value)
.expire(`messages:${message.from}`, CONVERSATION_TTL)
.expire(`messages:${message.to}`, CONVERSATION_TTL)
.exec();
}
// ...
}
メッセージの取得は、LRANGEコマンドで行われます
class RedisMessageStore extends MessageStore {
// ...
findMessagesForUser(userID) {
return this.redisClient
.lrange(`messages:${userID}`, 0, -1)
.then((results) => {
return results.map((result) => JSON.parse(result));
});
}
}
メッセージの転送
必要な最後の変更が1つあります。受信者が同じSocket.IOサーバーに接続していない場合でも、メッセージが実際に受信者に届くようにする必要があります

これはRedisアダプターの役割であり、Redis pub/subメカニズムを利用して、Socket.IOサーバー間でメッセージをブロードキャストし、最終的にすべてのクライアントに到達させます。
const httpServer = require("http").createServer();
const Redis = require("ioredis");
const redisClient = new Redis();
const io = require("socket.io")(httpServer, {
cors: {
origin: "http://localhost:8080",
},
adapter: require("socket.io-redis")({
pubClient: redisClient,
subClient: redisClient.duplicate(),
}),
});
以上です!マシンにRedis CLIがある場合は、送信されたメッセージをワイヤ上で確認できます
$ redis-cli
127.0.0.1:6379> PSUBSCRIBE socket.io*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "socket.io*"
3) (integer) 1
1) "pmessage"
2) "socket.io*"
3) "socket.io#/#"
4) "\x93\xa6XFD3OF\x83..."
ドキュメント
注:Redisアダプターを使用すると、「切断」ハンドラーで使用されるallSockets()
メソッドは、すべてのSocket.IOサーバーでSocket IDを自動的に返すため、更新する必要はありません。
レビュー
わかりました、要約しましょう。完全に機能するチャット(はい、またしても!)、堅牢で水平方向に拡張する準備ができているものを作成しました。これにより、いくつかの便利なSocket.IO機能を導入することができました
- ミドルウェア
- ルーム
- 複数のSocket.IOサーバーへのスケーリング
お読みいただきありがとうございます!