基本的なSocket.IOクライアントの構築方法
このガイドでは、Socket.IOプロトコルをより深く理解するために、JavaScriptで基本的なSocket.IOクライアントを実装します。
以下の機能を実装します
- WebSocket接続の作成
- 再接続の管理
- イベントの送信
- イベントの受信
- 手動での切断
公式クライアントには、他にも多くの機能が含まれています。
しかし、これだけでもライブラリが内部でどのように動作するかの概要を掴むのに十分でしょう。
目標は、このようなものを作成することです。
import { io } from "./basic-client.js";
const socket = io();
// connection
socket.on("connect", () => {
  // ...
});
// receiving an event
socket.on("foo", (value) => {
  // ...
});
// sending an event
socket.emit("bar", "abc");
準備はいいですか?始めましょう!
イベントエミッター
Socket.IO APIは、Node.jsのEventEmitterクラスから大きく影響を受けています。
import { EventEmitter } from "node:events";
const myEmitter = new EventEmitter();
myEmitter.on("foo", () => {
  console.log("foo!");
});
myEmitter.emit("foo");
このライブラリは同様のAPIを提供しますが、サーバーとクライアント間で動作します。
- サーバー
io.on("connection", (socket) => {
  // send a "foo" event to the client
  socket.emit("foo");
  // receive a "bar" event from the client
  socket.on("bar", () => {
    // ...
  });
});
- クライアント
import { io } from "socket.io-client";
const socket = io();
// receive a "foo" event from the server
socket.on("foo", () => {
  // ...
});
// send a "bar" event to the server
socket.emit("bar");
サーバーとクライアント間の基礎となる接続(WebSocketまたはHTTPロングポーリング)は、ライブラリによって抽象化され、管理されます。
最小限のEventEmitterクラスを作成しましょう。
class EventEmitter {
  #listeners = new Map();
  on(event, listener) {
    let listeners = this.#listeners.get(event);
    if (!listeners) {
      this.#listeners.set(event, listeners = []);
    }
    listeners.push(listener);
  }
  emit(event, ...args) {
    const listeners = this.#listeners.get(event);
    if (listeners) {
      for (const listener of listeners) {
        listener.apply(null, args);
      }
    }
  }
}
その後、Socketクラスがこのクラスを拡張し、on()メソッドとemit()メソッドの両方を公開します。
class Socket extends EventEmitter {
  constructor(uri, opts) {
    super();
  }
}
コンストラクターでは、uri引数は
- ユーザーによって提供されるか
const socket = io("https://example.com");
- window.locationオブジェクトから推測されます。
const socket = io();
エントリポイントを作成しましょう。
export function io(uri, opts) {
  if (typeof uri !== "string") {
    opts = uri;
    uri = location.origin;
  }
  return new Socket(uri, opts);
}
さて、これは良いスタートです!
WebSocket接続
次に、サーバーへのWebSocket接続を作成しましょう。
class Socket extends EventEmitter {
+ #uri;
+ #opts;
+ #ws;
  constructor(uri, opts) {
    super();
+   this.#uri = uri;
+   this.#opts = Object.assign({
+     path: "/socket.io/"
+   }, opts);
+   this.#open();
  }
+ #open() {
+   this.#ws = new WebSocket(this.#createUrl());
+ }
+
+ #createUrl() {
+   const uri = this.#uri.replace(/^http/, "ws");
+   const queryParams = "?EIO=4&transport=websocket";
+   return `${uri}${this.#opts.path}${queryParams}`;
+ }
}
参照: https://developer.mozilla.org/en-US/docs/Web/API/WebSocket
createUrl()メソッドに関するいくつかの説明
- WebSocket URLはws://またはwss://で始まります。そのため、replace()呼び出しで処理します。
- Socket.IO URLには、常に特定のリクエストパスが含まれており、デフォルトは/socket.io/です。
- 2つの必須クエリパラメータがあります。- EIO=4: Engine.IOプロトコルのバージョン
- transport=websocket: 使用されるトランスポート
 
そのため、最終的なURLは次のようになります: wss://example.com/socket.io/?EIO=4&transport=websocket
Engine.IOプロトコル
Socket.IOコードベースは、2つの異なるレイヤーに分かれています。
- 低レベルの配管: Socket.IO内部のエンジンであるEngine.IOと呼びます。
- 高レベルAPI: Socket.IO自体
参照
WebSocketを使用する場合、ワイヤーを介して送信されるメッセージの形式は、単に<パケットタイプ><ペイロード>です。
プロトコルのバージョン4(上記のEIO=4)の異なるパケットタイプを以下に示します。
| 名前 | 表現 | 説明 | 
|---|---|---|
| OPEN | 0 | ハンドシェイク中に使用されます。 | 
| CLOSE | 1 | トランスポートを閉じることができることを示すために使用されます。 | 
| PING | 2 | ハートビートメカニズムで使用されます。 | 
| PONG | 3 | ハートビートメカニズムで使用されます。 | 
| MESSAGE | 4 | ペイロードを相手に送信するために使用されます。 | 
| UPGRADE | 5 | アップグレードプロセス中に使用されます(ここでは使用されません)。 | 
| NOOP | 6 | アップグレードプロセス中に使用されます(ここでは使用されません)。 | 
例
4hello
with:
4      => MESSAGE packet type
hello  => message payload (UTF-8 encoded)
WebSocketメッセージを処理しましょう。
+const EIOPacketType = {
+  OPEN: "0",
+  CLOSE: "1",
+  PING: "2",
+  PONG: "3",
+  MESSAGE: "4",
+};
+function noop() {}
class Socket extends EventEmitter {
  [...]
  #open() {
    this.#ws = new WebSocket(this.#createUrl());
+   this.#ws.onmessage = ({ data }) => this.#onMessage(data);
+   this.#ws.onclose = () => this.#onClose("transport close");
  }
+ #onMessage(data) {
+   if (typeof data !== "string") {
+     // TODO handle binary payloads
+     return;
+   }
+
+   switch (data[0]) {
+     case EIOPacketType.CLOSE:
+       this.#onClose("transport close");
+       break;
+
+     default:
+       this.#onClose("parse error");
+       break;
+   }
+ }
+
+ #onClose(reason) {
+   if (this.#ws) {
+     this.#ws.onclose = noop;
+     this.#ws.close();
+   }
+ }
+}
ハートビート
サーバーとクライアント間の接続が正常であることを確認するために、ハートビートメカニズムが実装されています。
サーバーは、最初のハンドシェイク中にpingIntervalとpingTimeoutの2つの値を送信します。
その後、pingIntervalミリ秒ごとにPINGパケットを送信し、クライアントからPONGパケットが返されるのを待ちます。これをやってみましょう。
class Socket extends EventEmitter {
+ #pingTimeoutTimer;
+ #pingTimeoutDelay;
  [...]
  #onMessage(data) {
    if (typeof data !== "string") {
      // TODO handle binary payloads
      return;
    }
    switch (data[0]) {
+     case EIOPacketType.OPEN:
+       this.#onOpen(data);
+       break;
+
      case EIOPacketType.CLOSE:
        this.#onClose("transport close");
        break;
+     case EIOPacketType.PING:
+       this.#resetPingTimeout();
+       this.#send(EIOPacketType.PONG);
+       break;
      default:
        this.#onClose("parse error");
        break;
    }
  }
+ #onOpen(data) {
+   let handshake;
+   try {
+     handshake = JSON.parse(data.substring(1));
+   } catch (e) {
+     return this.#onClose("parse error");
+   }
+   this.#pingTimeoutDelay = handshake.pingInterval + handshake.pingTimeout;
+   this.#resetPingTimeout();
+ }
+
+ #resetPingTimeout() {
+   clearTimeout(this.#pingTimeoutTimer);
+   this.#pingTimeoutTimer = setTimeout(() => {
+     this.#onClose("ping timeout");
+   }, this.#pingTimeoutDelay);
+ }
+
+ #send(data) {
+   if (this.#ws.readyState === WebSocket.OPEN) {
+     this.#ws.send(data);
+   }
+ }
  #onClose(reason) {
    if (this.#ws) {
      this.#ws.onclose = noop;
      this.#ws.close();
    }
+   clearTimeout(this.#pingTimeoutTimer);
  }
}
再接続
ついでに、再接続も処理します。WebSocketは素晴らしいですが、切断される可能性があります(そして現実の世界では切断されます)。そのため、注意する必要があります。
class Socket extends EventEmitter {
  [...]
  constructor(uri, opts) {
    super();
    this.#uri = uri;
    this.#opts = Object.assign(
      {
        path: "/socket.io/",
+       reconnectionDelay: 2000,
      },
      opts
    );
    this.#open();
  }
  #onClose(reason) {
    if (this.#ws) {
      this.#ws.onclose = noop;
      this.#ws.close();
    }
    clearTimeout(this.#pingTimeoutTimer);
+   setTimeout(() => this.#open(), this.#opts.reconnectionDelay);
  }
}
公式のSocket.IOクライアントは、多くのクライアントが同時に再接続するときの負荷のスパイクを防ぐために、少しランダム性のある高度な指数関数的遅延を使用していますが、ここでは単純化して定数値を使用します。
さて、要約すると、次のことができるクライアントができました。
- サーバーへのWebSocket接続を開く
- PINGパケットに応答することでハートビートメカニズムを尊重する
- 障害発生時に自動的に再接続する
Engine.IOプロトコルはこれで終わりです!今度はSocket.IOプロトコルを見ていきましょう。
Socket.IOプロトコル
Socket.IOプロトコルは、前述のEngine.IOプロトコル上に構築されているため、すべてのSocket.IOパケットは、ワイヤーを介して送信されるときに「4」(Engine.IO MESSAGEパケットタイプ)が接頭辞として付けられます。
参照: Socket.IOプロトコル
バイナリ要素がない場合、形式は次のとおりです。
<packet type>[JSON-stringified payload]
利用可能なパケットタイプのリストを以下に示します。
| タイプ | ID | 使用方法 | 
|---|---|---|
| CONNECT | 0 | 名前空間への接続中に使用されます。 | 
| DISCONNECT | 1 | 名前空間から切断するときに使用されます。 | 
| EVENT | 2 | 相手にデータを送信するために使用されます。 | 
| ACK | 3 | イベントを確認するために使用されます(ここでは使用されません)。 | 
| CONNECT_ERROR | 4 | 名前空間への接続中に使用されます(ここでは使用されません)。 | 
| BINARY_EVENT | 5 | 相手にバイナリデータを送信するために使用されます(ここでは使用されません)。 | 
| BINARY_ACK | 6 | イベントを確認するために使用されます(応答にはバイナリデータが含まれます)(ここでは使用されません)。 | 
例
2["hello","world"]
with:
2                   => EVENT packet type
["hello","world"]   => JSON.stringified() payload
接続
クライアントは、Socket.IOセッションの開始時にCONNECTパケットを送信する必要があります。
+const SIOPacketType = {
+  CONNECT: 0,
+  DISCONNECT: 1,
+  EVENT: 2,
+};
class Socket extends EventEmitter {
  [...]
  #onOpen(data) {
    let handshake;
    try {
      handshake = JSON.parse(data.substring(1));
    } catch (e) {
      return this.#onClose("parse error");
    }
    this.#pingTimeoutDelay = handshake.pingInterval + handshake.pingTimeout;
    this.#resetPingTimeout();
+   this.#doConnect();
  }
+ #doConnect() {
+   this.#sendPacket({ type: SIOPacketType.CONNECT });
+ }
+
+ #sendPacket(packet) {
+   this.#send(EIOPacketType.MESSAGE + encode(packet));
+ }
}
+function encode(packet) {
+  let output = "" + packet.type;
+
+  return output;
+}
接続が許可されると、サーバーはCONNECTパケットを返します。
class Socket extends EventEmitter {
+ id;
  [...]
  #onMessage(data) {
    switch (data[0]) {
      [...]
+     case EIOPacketType.MESSAGE:
+       let packet;
+       try {
+         packet = decode(data);
+       } catch (e) {
+         return this.#onClose("parse error");
+       }
+       this.#onPacket(packet);
+       break;
    }
  }
+ #onPacket(packet) {
+   switch (packet.type) {
+     case SIOPacketType.CONNECT:
+       this.#onConnect(packet);
+       break;
+   }
+ }
+ #onConnect(packet) {
+   this.id = packet.data.sid;
+
+   super.emit("connect");
+ }
}
+function decode(data) {
+  let i = 1; // skip "4" prefix
+
+  const packet = {
+    type: parseInt(data.charAt(i++), 10),
+  };
+
+  if (!isPacketValid(packet)) {
+    throw new Error("invalid format");
+  }
+
+  return packet;
+}
+
+function isPacketValid(packet) {
+  switch (packet.type) {
+    case SIOPacketType.CONNECT:
+      return typeof packet.data === "object";
+    default:
+      return false;
+  }
+}
後でイベントを送信するためにemit()メソッドをオーバーライドできるように、super.emit(...)を使用しています。
イベントの送信
サーバーにデータを送信しましょう。基礎となる接続の状態を追跡し、接続の準備が整うまでパケットをバッファリングする必要があります。
class Socket extends EventEmitter {
+ connected = false;
+ #sendBuffer = [];
  [...]
+ emit(...args) {
+   const packet = {
+     type: SIOPacketType.EVENT,
+     data: args,
+   };
+
+   if (this.connected) {
+     this.#sendPacket(packet);
+   } else {
+     this.#sendBuffer.push(packet);
+   }
+ }
  #onConnect(packet) {
    this.id = packet.data.sid;
+   this.connected = true;
+   this.#sendBuffer.forEach((packet) => this.#sendPacket(packet));
+   this.#sendBuffer.slice(0);
    super.emit("connect");
  }
}
function encode(packet) {
  let output = "" + packet.type;
+ if (packet.data) {
+   output += JSON.stringify(packet.data);
+ }
  return output;
}
イベントの受信
逆に、サーバーから送信されたEVENTパケットを処理しましょう。
class Socket extends EventEmitter {
  [...]
  #onPacket(packet) {
    switch (packet.type) {
      case SIOPacketType.CONNECT:
        this.#onConnect(packet);
        break;
+     case SIOPacketType.EVENT:
+       super.emit.apply(this, packet.data);
+       break;
    }
  }
}
function decode(data) {
  let i = 1; // skip "4" prefix
  const packet = {
    type: parseInt(data.charAt(i++), 10),
  };
+ if (data.charAt(i)) {
+   packet.data = JSON.parse(data.substring(i));
+ }
  if (!isPacketValid(packet)) {
    throw new Error("invalid format");
  }
  return packet;
}
function isPacketValid(packet) {
  switch (packet.type) {
    case SIOPacketType.CONNECT:
      return typeof packet.data === "object";
+   case SIOPacketType.EVENT: {
+     const args = packet.data;
+     return (
+       Array.isArray(args) && args.length > 0 && typeof args[0] === "string"
+     );
+   }
    default:
      return false;
  }
}
手動での切断
最後に、ソケットが再接続を試行すべきではないいくつかのケースを処理しましょう。
- クライアントがsocket.disconnect()を呼び出した場合
- サーバーがsocket.disconnect()を呼び出した場合
class Socket extends EventEmitter {
+ #reconnectTimer;
+ #shouldReconnect = true;
  [...]
  #onPacket(packet) {
    switch (packet.type) {
      case SIOPacketType.CONNECT:
        this.#onConnect(packet);
        break;
+     case SIOPacketType.DISCONNECT:
+       this.#shouldReconnect = false;
+       this.#onClose("io server disconnect");
+       break;
      case SIOPacketType.EVENT:
        super.emit.apply(this, packet.data);
        break;
    }
  }
  #onClose(reason) {
    if (this.#ws) {
      this.#ws.onclose = noop;
      this.#ws.close();
    }
    clearTimeout(this.#pingTimeoutTimer);
+   clearTimeout(this.#reconnectTimer);
+
+   if (this.#shouldReconnect) {
+     this.#reconnectTimer = setTimeout(
+       () => this.#open(),
+       this.#opts.reconnectionDelay
+     );
+   }
-   setTimeout(() => this.#open(), this.#opts.reconnectionDelay);
  }
+ disconnect() {
+   this.#shouldReconnect = false;
+   this.#onClose("io client disconnect");
+ }
}
function isPacketValid(packet) {
  switch (packet.type) {
    case SIOPacketType.CONNECT:
      return typeof packet.data === "object";
+   case SIOPacketType.DISCONNECT:
+     return packet.data === undefined;
    case SIOPacketType.EVENT: {
      const args = packet.data;
      return (
        Array.isArray(args) && args.length > 0 && typeof args[0] === "string"
      );
    }
    default:
      return false;
  }
}
結び
基本的なSocket.IOクライアントはこれで完成です!要約しましょう。
以下の機能を実装しました。
- WebSocket接続の作成
- 再接続の管理
- イベントの送信
- イベントの受信
- 手動での切断
これで、ライブラリが内部でどのように動作するかについて、より深く理解できたことを願っています。
完全なソースコードはこちらにあります。
読んでいただきありがとうございます!