メインコンテンツへスキップ

プライベートメッセージング - パート IV

このガイドは4つの異なるパートで構成されています

3番目のパートの最後にいたところです

Chat

高可用性/負荷分散のために、複数のSocket.IOサーバーにスケールする方法を見ていきます。

インストール

パート IVのブランチをチェックアウトしましょう

git checkout examples/private-messaging-part-4

現在のディレクトリに以下が表示されるはずです

├── babel.config.js
├── package.json
├── public
│ ├── favicon.ico
│ ├── fonts
│ │ └── Lato-Regular.ttf
│ └── index.html
├── README.md
├── server
│ ├── cluster.js (created)
│ ├── docker-compose.yml (created)
│ ├── index.js (updated)
│ ├── messageStore.js (updated)
│ ├── package.json (updated)
│ └── sessionStore.js (updated)
└── src
├── App.vue
├── components
│ ├── Chat.vue
│ ├── MessagePanel.vue
│ ├── SelectUsername.vue
│ ├── StatusIcon.vue
│ └── User.vue
├── main.js
└── socket.js

完全な差分はこちらで確認できます。

サーバーの更新

この最後のパートでは、サーバー側でさらに3つの依存関係が必要です

Redisインスタンスも必要です。便宜上、docker-compose.ymlファイルが提供されています

cd server
docker-compose up -d

npm install
npm start

これにより、それぞれ同じindex.jsファイルを実行する4つのNode.jsワーカーが作成されます。

クライアント側では変更は必要ありません。ここではサーバー側に焦点を当てます。

仕組み

複数のサーバーの作成

複数のSocket.IOサーバーを作成する場合、2つのことを行う必要があります

  • スティッキーセッションを有効にする必要があります(完全な説明についてはこちらを参照してください)
  • デフォルトのインメモリ アダプターを Redis アダプター(または別の互換性のあるアダプター)に置き換える必要があります

この例では、@socket.io/stickyモジュールを使用して、特定のクライアントからのリクエストが常に同じSocket.IOサーバーにルーティングされるようにしています。これが「スティッキーセッション」と呼ばれるものです

Sticky session

注: 異なるポートをリッスンする複数のプロセスを作成したり(または複数のホストを使用したり)、それらの前にリバースプロキシを追加したりすることもできました。NginXやHAProxyなどの一般的なリバースプロキシソリューションでスティッキーセッションを有効にする方法は、ドキュメントで説明されています。

クラスターはserver/cluster.jsファイルで作成されます

const cluster = require("cluster");
const http = require("http");
const { setupMaster } = require("@socket.io/sticky");

const WORKERS_COUNT = 4;

if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);

for (let i = 0; i < WORKERS_COUNT; i++) {
cluster.fork();
}

cluster.on("exit", (worker) => {
console.log(`Worker ${worker.process.pid} died`);
cluster.fork();
});

const httpServer = http.createServer();
setupMaster(httpServer, {
loadBalancingMethod: "least-connection", // either "random", "round-robin" or "least-connection"
});
const PORT = process.env.PORT || 3000;

httpServer.listen(PORT, () =>
console.log(`server listening at http://localhost:${PORT}`)
);
} else {
console.log(`Worker ${process.pid} started`);
require("./index");
}

既存のserver/index.jsファイルには、1つの変更があります。ワーカープロセスによって作成されたHTTPサーバーは、実際には特定のポートをリッスンしません。リクエストはマスタープロセスによって処理され、適切なワーカーに転送されます。

httpServer.listen(PORT, () =>
console.log(`server listening at http://localhost:${PORT}`)
);

setupWorker(io);

@socket.io/stickyによって提供されるsetupWorkerメソッドは、マスターとワーカー間の同期を処理します。

セッションとメッセージ

これでスティッキーセッションが有効になったため、Socket.IOサーバー間でセッションとメッセージを共有する必要があります。

Redisに基づいて新しいSessionStoreを作成します。各セッションをRedisハッシュにHSETコマンドで格納します

class RedisSessionStore extends SessionStore {
// ...
saveSession(id, { userID, username, connected }) {
this.redisClient
.multi()
.hset(`session:${id}`, "userID", userID, "username", username, "connected", connected)
.expire(`session:${id}`, SESSION_TTL)
.exec();
}
// ...
}

古いセッションをクリーンアップするために、キーに有効期限も設定します。

HMGETコマンドを使用すると、セッションのフェッチは非常に簡単です

const mapSession = ([userID, username, connected]) =>
userID ? { userID, username, connected: connected === "true" } : undefined;

class RedisSessionStore extends SessionStore {
// ...
findSession(id) {
return this.redisClient
.hmget(`session:${id}`, "userID", "username", "connected")
.then(mapSession);
}
// ...
}

すべてのセッションをフェッチするのは少し複雑です

class RedisSessionStore extends SessionStore {
// ...
async findAllSessions() {
// first, we fetch all the keys with the SCAN command
const keys = new Set();
let nextIndex = 0;
do {
const [nextIndexAsStr, results] = await this.redisClient.scan(
nextIndex,
"MATCH",
"session:*",
"COUNT",
"100"
);
nextIndex = parseInt(nextIndexAsStr, 10);
results.forEach((s) => keys.add(s));
} while (nextIndex !== 0);

// and then we retrieve the session details with multiple HMGET commands
const commands = [];
keys.forEach((key) => {
commands.push(["hmget", key, "userID", "username", "connected"]);
});
return this.redisClient
.multi(commands)
.exec()
.then((results) => {
return results
.map(([err, session]) => (err ? undefined : mapSession(session)))
.filter((v) => !!v);
});
}
}

同様に、Redisに基づいて新しいMessageStoreを作成します。特定のユーザーにリンクされたすべてのメッセージをRedisリストにRPUSHコマンドで格納します

class RedisMessageStore extends MessageStore {
// ...
saveMessage(message) {
const value = JSON.stringify(message);
this.redisClient
.multi()
.rpush(`messages:${message.from}`, value)
.rpush(`messages:${message.to}`, value)
.expire(`messages:${message.from}`, CONVERSATION_TTL)
.expire(`messages:${message.to}`, CONVERSATION_TTL)
.exec();
}
// ...
}

メッセージの取得は、LRANGEコマンドで行われます

class RedisMessageStore extends MessageStore {
// ...
findMessagesForUser(userID) {
return this.redisClient
.lrange(`messages:${userID}`, 0, -1)
.then((results) => {
return results.map((result) => JSON.parse(result));
});
}
}

メッセージの転送

必要な最後の変更が1つあります。受信者が同じSocket.IOサーバーに接続していない場合でも、メッセージが実際に受信者に届くようにする必要があります

Broadcasting with the Redis adapter

これはRedisアダプターの役割であり、Redis pub/subメカニズムを利用して、Socket.IOサーバー間でメッセージをブロードキャストし、最終的にすべてのクライアントに到達させます。

const httpServer = require("http").createServer();
const Redis = require("ioredis");
const redisClient = new Redis();
const io = require("socket.io")(httpServer, {
cors: {
origin: "http://localhost:8080",
},
adapter: require("socket.io-redis")({
pubClient: redisClient,
subClient: redisClient.duplicate(),
}),
});

以上です!マシンにRedis CLIがある場合は、送信されたメッセージをワイヤ上で確認できます

$ redis-cli
127.0.0.1:6379> PSUBSCRIBE socket.io*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "socket.io*"
3) (integer) 1
1) "pmessage"
2) "socket.io*"
3) "socket.io#/#"
4) "\x93\xa6XFD3OF\x83..."

ドキュメント

注:Redisアダプターを使用すると、「切断」ハンドラーで使用されるallSockets()メソッドは、すべてのSocket.IOサーバーでSocket IDを自動的に返すため、更新する必要はありません。

レビュー

わかりました、要約しましょう。完全に機能するチャット(はい、またしても!)、堅牢で水平方向に拡張する準備ができているものを作成しました。これにより、いくつかの便利なSocket.IO機能を導入することができました

お読みいただきありがとうございます!