最近在看LLM的流式输出如何反映到json输出上,现有的解决方案包括 WebSocket 或 Server-Sent Events (SSE) 实时通信技术.
WebSocket
WebSocket API 可在用户浏览器和服务器之间开启双向交互式通信会话。利用该 API,可以向服务器发送信息,并接收事件驱动的响应,而无需轮询服务器以获得回复。
(1)建立在 TCP 协议之上,服务器端的实现比较容易。
(2)与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
(3)数据格式比较轻量,性能开销小,通信高效。
(4)可以发送文本,也可以发送二进制数据。
(5)没有同源限制,客户端可以与任意服务器通信。
(6)协议标识符是ws
(如果加密,则为wss
),服务器网址就是 URL。
客户端1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16let webSocket = new WebSocket(url, protocols);
ws.onopen = function(evt) {
console.log("Connection open ...");
ws.send("Hello WebSockets!");
};
ws.onmessage = function(evt) {
console.log( "Received Message: " + evt.data);
ws.close();
};
ws.onclose = function(evt) {
console.log("Connection closed.");
};
发送JSON1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// 服务器向所有用户发送文本
function sendText() {
// 构造一个 msg 对象,包含了服务器处理所需的数据
var msg = {
type: "message",
text: document.getElementById("text").value,
id: clientID,
date: Date.now(),
};
// 把 msg 对象作为 JSON 格式字符串发送
exampleSocket.send(JSON.stringify(msg));
// 清空文本输入元素,为接收下一条消息做好准备。
document.getElementById("text").value = "";
}
服务端
可以使用Socket.IO,1
2
3
4
5
6
7
8
9const { Server } = require("socket.io");
const io = new Server({ /* options */ });
io.on("connection", (socket) => {
// ...
});
io.listen(3000);1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16const { readFileSync } = require("fs");
const { createServer } = require("https");
const { Server } = require("socket.io");
const httpsServer = createServer({
key: readFileSync("/path/to/my/key.pem"),
cert: readFileSync("/path/to/my/cert.pem")
});
const io = new Server(httpsServer, { /* options */ });
io.on("connection", (socket) => {
// ...
});
httpsServer.listen(3000);
SSE
HTTP 协议本身不允许服务器主动给客户端发送信息,但有一种方法可以让服务器持续向客户端发送数据流。这种方法不是发送一次性数据包,而是保持连接开放,像视频播放那样连续发送数据。这种长时间的下载过程实际上是以数据流的形式进行的。
服务器发送事件(SSE)就是利用了这个特性,通过HTTP协议让服务器可以向浏览器推送实时更新的信息。传统上,网页需要先请求服务器才能获取新数据,但是使用SSE,服务器可以在任何时候主动向网页推送新的数据和消息,这些消息可以在网页内作为事件来处理。
服务端推送的数据是单向的,只从服务器到客户端流动。当不需要从客户端向服务器发送信息时,比如更新社交媒体状态、新闻推送或把数据传送到客户端存储(如IndexedDB或Web Storage),SSE就非常适合。
与SSE不同的是,WebSocket提供了一个更强大的双向通信通道,允许客户端和服务器之间互相发送信息。SSE则是单向的,主要用于服务器向浏览器发送信息。如果浏览器需要向服务器发送信息,它必须发起一个新的HTTP请求。
这里有几个关于SSE和WebSocket的区别:
- SSE基于HTTP,因此所有现有的服务器软件都能支持它;而WebSocket是一个独立的协议。
- SSE设置起来简单得多,适合轻量级应用;WebSocket则更为复杂。
- SSE自带断线重连功能,而WebSocket需要开发者自己实现这一功能。
- SSE主要用于传输文本数据,若要发送二进制数据则需编码;WebSocket直接支持二进制数据传输。
- SSE允许自定义消息类型,增加了灵活性。
事件流格式
事件流是一个简单的文本数据流,文本应该使用UTF-8格式的编码。事件流中的消息由一对换行符分开。以冒号开头的行为注释行,会被忽略。
备注: 注释行可以用来防止连接超时,服务器可以定期发送一条消息注释行,以保持连接不断。
每条消息由一行或多行文字组成,列出该消息的字段。每个字段由字段名表示,后面是冒号,然后是该字段值的文本数据。
规范中规定了下面这些字段:
event
一个用于标识事件类型的字符串。如果指定了这个字符串,浏览器会将具有指定事件名称的事件分派给相应的监听器;网站源代码应该使用
addEventListener()
来监听指定的事件。如果一个消息没有指定事件名称,那么onmessage
处理程序就会被调用。data
消息的数据字段。当
EventSource
接收到多个以data:
开头的连续行时,会将它们连接起来,在它们之间插入一个换行符。末尾的换行符会被删除。id
事件 ID,会成为当前
EventSource
对象的内部属性“最后一个事件 ID”的属性值。retry
重新连接的时间。如果与服务器的连接丢失,浏览器将等待指定的时间,然后尝试重新连接。这必须是一个整数,以毫秒为单位指定重新连接的时间。如果指定了一个非整数值,该字段将被忽略。
所有其他的字段名都会被忽略
事件流具体例子
1 | Content-Type: text/event-stream |
每一次发送的信息,由若干个message
组成,每个message
之间用\n\n
分隔。每个message
内部由若干行组成,每一行都是如下格式。1
[field]: value\n
1
2
3
4
5
6
7: comment
id: 2025\n
event: foo\n
retry: 100\n \\指定浏览器重新发起连接的时间间隔。
data: This is the mesage\n
data: test\n
data: this is the end\n\n
在浏览器上lastEventId
属性读取这个值。一旦连接断线,浏览器会发送一个 HTTP 头,里面包含一个特殊的Last-Event-ID
头信息,将这个值发送回来,用来帮助服务器端重建连接。因此,这个头信息可以被视为一种同步机制。
命名事件
1 | event: userconnect |
每个事件都有一个由 event
字段指定的事件名称和一个 data
字段,其值是一个适当的 JSON 字符串,包含客户端对该事件采取行动所需的数据。data
字段可以包含任何字符串数据,它不一定是 JSON。
混合两种事件
可以在一个事件流中同时使用命名事件和未命名事件。1
2
3
4
5
6
7
8event: userconnect
data: {"username": "bobby", "time": "02:33:48"}
data: Here's a system message of some kind that will get used
data: to accomplish some task.
event: usermessage
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}
客户端
默认情况下,如果客户端和服务器之间的连接关闭,则连接将重新启动。可以使用 .close()
方法终止连接。1
2
3
4
5
6
7
8
9
10
11
12
13
14const evtSource = new EventSource("xxx");//url可以与当前网址同域,也可以跨域。
evtSource.onmessage = function(event) {
xxx
}
evtSource.onerror = (err) => {
console.error("EventSource failed:", err);
};
//自定义事件
source.addEventListener('foo', function (event) {
var data = event.data;
// handle message
}, false);
服务端1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24date_default_timezone_set("America/New_York");
header("Cache-Control: no-store");
header("Content-Type: text/event-stream");
$counter = rand(1, 10);
while (true) {
// Every second, send a "ping" event.
echo "event: ping\n"; # 声明事件
$curDate = date(DATE_ISO8601);
echo 'data: {"time": "' . $curDate . '"}';
echo "\n\n"; # 一个事件结束
// Send a simple message at random intervals.
$counter--;
if (!$counter) {
echo 'data: This is a message at time ' . $curDate . "\n\n"; # 默认onmessage事件处理
$counter = rand(1, 10);
}
ob_end_flush();
flush();
// Break the loop if the client aborted the connection (closed the page)
if (connection_aborted()) break;
sleep(1);
}