An intro to Websocket and SSE

最近在看LLM的流式输出如何反映到json输出上,现有的解决方案包括 WebSocket 或 Server-Sent Events (SSE) 实时通信技术.

WebSocket

WebSocket API 可在用户浏览器和服务器之间开启双向交互式通信会话。利用该 API,可以向服务器发送信息,并接收事件驱动的响应,而无需轮询服务器以获得回复。

img

(1)建立在 TCP 协议之上,服务器端的实现比较容易。

(2)与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。

(3)数据格式比较轻量,性能开销小,通信高效。

(4)可以发送文本,也可以发送二进制数据。

(5)没有同源限制,客户端可以与任意服务器通信。

(6)协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
let webSocket = new WebSocket(url, protocols);
ws.onopen = function(evt) {
console.log("Connection open ...");
ws.send("Hello WebSockets!");
};

ws.onmessage = function(evt) {
console.log( "Received Message: " + evt.data);
ws.close();
};

ws.onclose = function(evt) {
console.log("Connection closed.");
};


发送JSON

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 服务器向所有用户发送文本
function sendText() {
// 构造一个 msg 对象,包含了服务器处理所需的数据
var msg = {
type: "message",
text: document.getElementById("text").value,
id: clientID,
date: Date.now(),
};

// 把 msg 对象作为 JSON 格式字符串发送
exampleSocket.send(JSON.stringify(msg));

// 清空文本输入元素,为接收下一条消息做好准备。
document.getElementById("text").value = "";
}

服务端

可以使用Socket.IO,

1
2
3
4
5
6
7
8
9
const { Server } = require("socket.io");

const io = new Server({ /* options */ });

io.on("connection", (socket) => {
// ...
});

io.listen(3000);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const { readFileSync } = require("fs");
const { createServer } = require("https");
const { Server } = require("socket.io");

const httpsServer = createServer({
key: readFileSync("/path/to/my/key.pem"),
cert: readFileSync("/path/to/my/cert.pem")
});

const io = new Server(httpsServer, { /* options */ });

io.on("connection", (socket) => {
// ...
});

httpsServer.listen(3000);

SSE

HTTP 协议本身不允许服务器主动给客户端发送信息,但有一种方法可以让服务器持续向客户端发送数据流。这种方法不是发送一次性数据包,而是保持连接开放,像视频播放那样连续发送数据。这种长时间的下载过程实际上是以数据流的形式进行的。

服务器发送事件(SSE)就是利用了这个特性,通过HTTP协议让服务器可以向浏览器推送实时更新的信息。传统上,网页需要先请求服务器才能获取新数据,但是使用SSE,服务器可以在任何时候主动向网页推送新的数据和消息,这些消息可以在网页内作为事件来处理。

服务端推送的数据是单向的,只从服务器到客户端流动。当不需要从客户端向服务器发送信息时,比如更新社交媒体状态、新闻推送或把数据传送到客户端存储(如IndexedDB或Web Storage),SSE就非常适合。

与SSE不同的是,WebSocket提供了一个更强大的双向通信通道,允许客户端和服务器之间互相发送信息。SSE则是单向的,主要用于服务器向浏览器发送信息。如果浏览器需要向服务器发送信息,它必须发起一个新的HTTP请求。

这里有几个关于SSE和WebSocket的区别:

  • SSE基于HTTP,因此所有现有的服务器软件都能支持它;而WebSocket是一个独立的协议。
  • SSE设置起来简单得多,适合轻量级应用;WebSocket则更为复杂。
  • SSE自带断线重连功能,而WebSocket需要开发者自己实现这一功能。
  • SSE主要用于传输文本数据,若要发送二进制数据则需编码;WebSocket直接支持二进制数据传输。
  • SSE允许自定义消息类型,增加了灵活性。

事件流格式

事件流是一个简单的文本数据流,文本应该使用UTF-8格式的编码。事件流中的消息由一对换行符分开。以冒号开头的行为注释行,会被忽略。

备注: 注释行可以用来防止连接超时,服务器可以定期发送一条消息注释行,以保持连接不断。

每条消息由一行或多行文字组成,列出该消息的字段。每个字段由字段名表示,后面是冒号,然后是该字段值的文本数据。

规范中规定了下面这些字段:

  • event

    一个用于标识事件类型的字符串。如果指定了这个字符串,浏览器会将具有指定事件名称的事件分派给相应的监听器;网站源代码应该使用 addEventListener() 来监听指定的事件。如果一个消息没有指定事件名称,那么 onmessage 处理程序就会被调用。

  • data

    消息的数据字段。当 EventSource 接收到多个以 data: 开头的连续行时,会将它们连接起来,在它们之间插入一个换行符。末尾的换行符会被删除。

  • id

    事件 ID,会成为当前 EventSource 对象的内部属性“最后一个事件 ID”的属性值。

  • retry

    重新连接的时间。如果与服务器的连接丢失,浏览器将等待指定的时间,然后尝试重新连接。这必须是一个整数,以毫秒为单位指定重新连接的时间。如果指定了一个非整数值,该字段将被忽略。

所有其他的字段名都会被忽略

事件流具体例子

1
2
3
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

每一次发送的信息,由若干个message组成,每个message之间用\n\n分隔。每个message内部由若干行组成,每一行都是如下格式。

1
[field]: value\n
1
2
3
4
5
6
7
: comment
id: 2025\n
event: foo\n
retry: 100\n \\指定浏览器重新发起连接的时间间隔。
data: This is the mesage\n
data: test\n
data: this is the end\n\n

在浏览器上lastEventId属性读取这个值。一旦连接断线,浏览器会发送一个 HTTP 头,里面包含一个特殊的Last-Event-ID头信息,将这个值发送回来,用来帮助服务器端重建连接。因此,这个头信息可以被视为一种同步机制。

命名事件

1
2
3
4
5
6
7
8
9
10
11
12
event: userconnect
data: {"username": "bobby", "time": "02:33:48"}

event: usermessage
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}

event: userdisconnect
data: {"username": "bobby", "time": "02:34:23"}

event: usermessage
data: {"username": "sean", "time": "02:34:36", "text": "Bye, bobby."}

每个事件都有一个由 event 字段指定的事件名称和一个 data 字段,其值是一个适当的 JSON 字符串,包含客户端对该事件采取行动所需的数据。data 字段可以包含任何字符串数据,它不一定是 JSON。

混合两种事件

可以在一个事件流中同时使用命名事件和未命名事件。

1
2
3
4
5
6
7
8
event: userconnect
data: {"username": "bobby", "time": "02:33:48"}

data: Here's a system message of some kind that will get used
data: to accomplish some task.

event: usermessage
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}

客户端

默认情况下,如果客户端和服务器之间的连接关闭,则连接将重新启动。可以使用 .close() 方法终止连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const evtSource = new EventSource("xxx");//url可以与当前网址同域,也可以跨域。
evtSource.onmessage = function(event) {
xxx
}
evtSource.onerror = (err) => {
console.error("EventSource failed:", err);
};

//自定义事件
source.addEventListener('foo', function (event) {
var data = event.data;
// handle message
}, false);

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
date_default_timezone_set("America/New_York");
header("Cache-Control: no-store");
header("Content-Type: text/event-stream");

$counter = rand(1, 10);
while (true) {
// Every second, send a "ping" event.
echo "event: ping\n"; # 声明事件
$curDate = date(DATE_ISO8601);
echo 'data: {"time": "' . $curDate . '"}';
echo "\n\n"; # 一个事件结束
// Send a simple message at random intervals.
$counter--;
if (!$counter) {
echo 'data: This is a message at time ' . $curDate . "\n\n"; # 默认onmessage事件处理
$counter = rand(1, 10);
}
ob_end_flush();
flush();
// Break the loop if the client aborted the connection (closed the page)
if (connection_aborted()) break;
sleep(1);
}

相关资料

  1. Using server-sent events - Web APIs | MDN
  2. WebSocket - Web APIs | MDN
  3. WebSocket 教程 - 阮一峰的网络日志
  4. Server-Sent Events 教程 - 阮一峰的网络日志
-------------本文结束感谢您的阅读-------------
感谢阅读.

欢迎关注我的其它发布渠道