fix: WebSocketStream ping event causes pending promises (#15235)

This commit is contained in:
Leo Kettmeir 2022-07-18 22:49:49 +02:00 committed by GitHub
parent 2bebdc9116
commit 2eb27c92db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 83 additions and 36 deletions

View file

@ -662,6 +662,46 @@ fn websocketstream() {
assert!(status.success());
}
#[test]
fn websocketstream_ping() {
use deno_runtime::deno_websocket::tokio_tungstenite::tungstenite;
let _g = util::http_server();
let script = util::testdata_path().join("websocketstream_ping_test.ts");
let root_ca = util::testdata_path().join("tls/RootCA.pem");
let mut child = util::deno_cmd()
.arg("test")
.arg("--unstable")
.arg("--allow-net")
.arg("--cert")
.arg(root_ca)
.arg(script)
.stdout(std::process::Stdio::piped())
.spawn()
.unwrap();
let server = std::net::TcpListener::bind("127.0.0.1:4513").unwrap();
let (stream, _) = server.accept().unwrap();
let mut socket = tungstenite::accept(stream).unwrap();
socket
.write_message(tungstenite::Message::Text(String::from("A")))
.unwrap();
socket
.write_message(tungstenite::Message::Ping(vec![]))
.unwrap();
socket
.write_message(tungstenite::Message::Text(String::from("B")))
.unwrap();
let message = socket.read_message().unwrap();
assert_eq!(message, tungstenite::Message::Pong(vec![]));
socket
.write_message(tungstenite::Message::Text(String::from("C")))
.unwrap();
socket.close(None).unwrap();
assert!(child.wait().unwrap().success());
}
#[test]
fn websocket_server_multi_field_connection_header() {
let script = util::testdata_path()

View file

@ -0,0 +1,5 @@
const wss = new WebSocketStream("ws://127.0.0.1:4513");
const { readable } = await wss.connection;
for await (const _ of readable) {
//
}

View file

@ -232,6 +232,43 @@
await this.closed;
},
});
const pull = async (controller) => {
const { kind, value } = await core.opAsync(
"op_ws_next_event",
this[_rid],
);
switch (kind) {
case "string": {
controller.enqueue(value);
break;
}
case "binary": {
controller.enqueue(value);
break;
}
case "ping": {
await core.opAsync("op_ws_send", this[_rid], {
kind: "pong",
});
await pull(controller);
break;
}
case "closed":
case "close": {
this[_closed].resolve(value);
core.tryClose(this[_rid]);
break;
}
case "error": {
const err = new Error(value);
this[_closed].reject(err);
controller.error(err);
core.tryClose(this[_rid]);
break;
}
}
};
const readable = new ReadableStream({
start: (controller) => {
PromisePrototypeThen(this.closed, () => {
@ -250,42 +287,7 @@
}
});
},
pull: async (controller) => {
const { kind, value } = await core.opAsync(
"op_ws_next_event",
this[_rid],
);
switch (kind) {
case "string": {
controller.enqueue(value);
break;
}
case "binary": {
controller.enqueue(value);
break;
}
case "ping": {
await core.opAsync("op_ws_send", this[_rid], {
kind: "pong",
});
break;
}
case "closed":
case "close": {
this[_closed].resolve(value);
core.tryClose(this[_rid]);
break;
}
case "error": {
const err = new Error(value);
this[_closed].reject(err);
controller.error(err);
core.tryClose(this[_rid]);
break;
}
}
},
pull,
cancel: async (reason) => {
try {
this.close(reason?.code !== undefined ? reason : {});