refactor: rewrite tests to "fastwebsockets" crate (#18781)

Migrating off of `tokio-tungstenite` crate.

---------

Co-authored-by: Divy Srivastava <dj.srivastava23@gmail.com>
This commit is contained in:
Bartek Iwańczuk 2023-04-22 11:17:31 +02:00 committed by GitHub
parent a615eb3b56
commit 068228cb45
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 229 additions and 145 deletions

8
Cargo.lock generated
View file

@ -721,12 +721,14 @@ dependencies = [
"env_logger",
"eszip",
"fancy-regex",
"fastwebsockets",
"flaky_test",
"flate2",
"fs3",
"fwdansi",
"glibc_version",
"http",
"hyper",
"import_map 0.15.0",
"indexmap",
"jsonc-parser",
@ -1235,6 +1237,7 @@ dependencies = [
"deno_webstorage",
"dlopen",
"encoding_rs",
"fastwebsockets",
"filetime",
"fs3",
"fwdansi",
@ -1791,9 +1794,9 @@ dependencies = [
[[package]]
name = "fastwebsockets"
version = "0.2.4"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcf2f933f24f45831bd66580a8f9394e440f1f5a23806cf0d4d8b6649e1a01e9"
checksum = "a9e973e2bd2dbd77cc9e929ede2ce65984a35ac5481976afbfbd509cb40dc965"
dependencies = [
"base64 0.21.0",
"cc",
@ -4864,6 +4867,7 @@ dependencies = [
"atty",
"base64 0.13.1",
"console_static_text",
"fastwebsockets",
"flate2",
"futures",
"hyper",

View file

@ -91,6 +91,7 @@ data-url = "=0.2.0"
dlopen = "0.1.8"
encoding_rs = "=0.8.31"
ecb = "=0.1.1"
fastwebsockets = "=0.2.5"
flate2 = "=1.0.24"
fs3 = "0.5.0"
futures = "0.3.21"

View file

@ -72,9 +72,11 @@ encoding_rs.workspace = true
env_logger = "=0.9.0"
eszip = "=0.41.0"
fancy-regex = "=0.10.0"
fastwebsockets.workspace = true
flate2.workspace = true
fs3.workspace = true
http.workspace = true
hyper.workspace = true
import_map = "=0.15.0"
indexmap.workspace = true
jsonc-parser = { version = "=0.21.0", features = ["serde"] }

View file

@ -1,15 +1,16 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use deno_core::anyhow::anyhow;
use deno_core::error::AnyError;
use deno_core::futures::prelude::*;
use deno_core::futures::stream::SplitSink;
use deno_core::futures::stream::SplitStream;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::url;
use deno_runtime::deno_fetch::reqwest;
use deno_runtime::deno_websocket::tokio_tungstenite;
use deno_runtime::deno_websocket::tokio_tungstenite::tungstenite;
use fastwebsockets::FragmentCollector;
use fastwebsockets::Frame;
use hyper::upgrade::Upgraded;
use hyper::Request;
use std::io::BufRead;
use test_util as util;
use test_util::TempDir;
@ -17,18 +18,20 @@ use tokio::net::TcpStream;
use util::http_server;
use util::DenoChild;
struct SpawnExecutor;
impl<Fut> hyper::rt::Executor<Fut> for SpawnExecutor
where
Fut: std::future::Future + Send + 'static,
Fut::Output: Send + 'static,
{
fn execute(&self, fut: Fut) {
tokio::task::spawn(fut);
}
}
struct InspectorTester {
socket_tx: SplitSink<
tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<TcpStream>,
>,
tungstenite::Message,
>,
socket_rx: SplitStream<
tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<TcpStream>,
>,
>,
socket: FragmentCollector<Upgraded>,
notification_filter: Box<dyn FnMut(&str) -> bool + 'static>,
child: DenoChild,
stderr_lines: Box<dyn Iterator<Item = String>>,
@ -52,17 +55,42 @@ impl InspectorTester {
let mut stderr_lines =
std::io::BufReader::new(stderr).lines().map(|r| r.unwrap());
let ws_url = extract_ws_url_from_stderr(&mut stderr_lines);
let uri = extract_ws_url_from_stderr(&mut stderr_lines);
let domain = &uri.host().unwrap().to_string();
let port = &uri.port().unwrap_or(match uri.scheme() {
"wss" | "https" => 443,
_ => 80,
});
let addr = format!("{domain}:{port}");
let stream = TcpStream::connect(addr).await.unwrap();
let host = uri.host_str().unwrap();
let req = Request::builder()
.method("GET")
.uri(uri.path())
.header("Host", host)
.header(hyper::header::UPGRADE, "websocket")
.header(hyper::header::CONNECTION, "Upgrade")
.header(
"Sec-WebSocket-Key",
fastwebsockets::handshake::generate_key(),
)
.header("Sec-WebSocket-Version", "13")
.body(hyper::Body::empty())
.unwrap();
let (socket, response) =
tokio_tungstenite::connect_async(ws_url).await.unwrap();
fastwebsockets::handshake::client(&SpawnExecutor, req, stream)
.await
.unwrap();
assert_eq!(response.status(), 101); // Switching protocols.
let (socket_tx, socket_rx) = socket.split();
Self {
socket_tx,
socket_rx,
socket: FragmentCollector::new(socket),
notification_filter: Box::new(notification_filter),
child,
stderr_lines: Box::new(stderr_lines),
@ -74,10 +102,10 @@ impl InspectorTester {
// TODO(bartlomieju): add graceful error handling
for msg in messages {
let result = self
.socket_tx
.send(msg.to_string().into())
.socket
.write_frame(Frame::text(msg.to_string().into_bytes()))
.await
.map_err(|e| e.into());
.map_err(|e| anyhow!(e));
self.handle_error(result);
}
}
@ -111,8 +139,9 @@ impl InspectorTester {
async fn recv(&mut self) -> String {
loop {
let result = self.socket_rx.next().await.unwrap().map_err(|e| e.into());
let message = self.handle_error(result).to_string();
let result = self.socket.read_frame().await.map_err(|e| anyhow!(e));
let message =
String::from_utf8(self.handle_error(result).payload).unwrap();
if (self.notification_filter)(&message) {
return message;
}
@ -236,7 +265,7 @@ fn skip_check_line(
let mut line = stderr_lines.next().unwrap();
line = util::strip_ansi_codes(&line).to_string();
if line.starts_with("Check") {
if line.starts_with("Check") || line.starts_with("Download") {
continue;
}
@ -514,8 +543,11 @@ async fn inspector_does_not_hang() {
}
// Check that we can gracefully close the websocket connection.
tester.socket_tx.close().await.unwrap();
tester.socket_rx.for_each(|_| async {}).await;
tester
.socket
.write_frame(Frame::close_raw(vec![]))
.await
.unwrap();
assert_eq!(&tester.stdout_lines.next().unwrap(), "done");
assert!(tester.child.wait().unwrap().success());

View file

@ -161,7 +161,7 @@ Deno.test("websocket error", async () => {
assert(err instanceof ErrorEvent);
// Error message got changed because we don't use warp in test_util
assertEquals(err.message, "UnexpectedEof: tls handshake eof");
assertEquals(err.message, "InvalidData: received corrupt message");
promise1.resolve();
};
await promise1;

View file

@ -16,7 +16,7 @@ path = "lib.rs"
[dependencies]
deno_core.workspace = true
deno_tls.workspace = true
fastwebsockets = { version = "0.2.4", features = ["upgrade"] }
fastwebsockets = { workspace = true, features = ["upgrade"] }
http.workspace = true
hyper.workspace = true
serde.workspace = true

View file

@ -80,6 +80,7 @@ deno_web.workspace = true
deno_webidl.workspace = true
deno_websocket.workspace = true
deno_webstorage.workspace = true
fastwebsockets.workspace = true
atty.workspace = true
console_static_text.workspace = true

View file

@ -1,7 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use core::convert::Infallible as Never; // Alias for the future `!` type.
use deno_core::error::AnyError;
// Alias for the future `!` type.
use core::convert::Infallible as Never;
use deno_core::futures::channel::mpsc;
use deno_core::futures::channel::mpsc::UnboundedReceiver;
use deno_core::futures::channel::mpsc::UnboundedSender;
@ -18,8 +18,9 @@ use deno_core::serde_json::Value;
use deno_core::InspectorMsg;
use deno_core::InspectorSessionProxy;
use deno_core::JsRuntime;
use deno_websocket::tokio_tungstenite::tungstenite;
use deno_websocket::tokio_tungstenite::WebSocketStream;
use fastwebsockets::Frame;
use fastwebsockets::OpCode;
use fastwebsockets::WebSocket;
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::Infallible;
@ -145,35 +146,27 @@ fn handle_ws_request(
let info = maybe_inspector_info.unwrap();
info.new_session_tx.clone()
};
let resp = tungstenite::handshake::server::create_response(&req)
.map(|resp| resp.map(|_| hyper::Body::empty()))
.or_else(|e| match e {
tungstenite::error::Error::HttpFormat(http_error) => Err(http_error),
_ => http::Response::builder()
.status(http::StatusCode::BAD_REQUEST)
.body("Not a valid Websocket Request".into()),
})?;
let (parts, _) = req.into_parts();
let req = http::Request::from_parts(parts, body);
let mut req = http::Request::from_parts(parts, body);
let (resp, fut) = match fastwebsockets::upgrade::upgrade(&mut req) {
Ok(e) => e,
_ => {
return http::Response::builder()
.status(http::StatusCode::BAD_REQUEST)
.body("Not a valid Websocket Request".into());
}
};
// spawn a task that will wait for websocket connection and then pump messages between
// the socket and inspector proxy
tokio::task::spawn_local(async move {
let upgrade_result = hyper::upgrade::on(req).await;
let upgraded = if let Ok(u) = upgrade_result {
u
let websocket = if let Ok(w) = fut.await {
w
} else {
eprintln!("Inspector server failed to upgrade to WS connection");
return;
};
let websocket = WebSocketStream::from_raw_socket(
upgraded,
tungstenite::protocol::Role::Server,
None,
)
.await;
// The 'outbound' channel carries messages sent to the websocket.
let (outbound_tx, outbound_rx) = mpsc::unbounded();
@ -324,37 +317,36 @@ async fn server(
/// 'futures' crate, therefore they can't participate in Tokio's cooperative
/// task yielding.
async fn pump_websocket_messages(
websocket: WebSocketStream<hyper::upgrade::Upgraded>,
mut websocket: WebSocket<hyper::upgrade::Upgraded>,
inbound_tx: UnboundedSender<String>,
outbound_rx: UnboundedReceiver<InspectorMsg>,
mut outbound_rx: UnboundedReceiver<InspectorMsg>,
) {
let (websocket_tx, websocket_rx) = websocket.split();
let outbound_pump = outbound_rx
.map(|msg| tungstenite::Message::text(msg.content))
.map(Ok)
.forward(websocket_tx)
.map_err(|_| ());
let inbound_pump = async move {
let _result = websocket_rx
.map_err(AnyError::from)
.map_ok(|msg| {
// Messages that cannot be converted to strings are ignored.
if let Ok(msg_text) = msg.into_text() {
let _ = inbound_tx.unbounded_send(msg_text);
'pump: loop {
tokio::select! {
Some(msg) = outbound_rx.next() => {
let msg = Frame::text(msg.content.into_bytes());
let _ = websocket.write_frame(msg).await;
}
})
.try_collect::<()>()
.await;
// Users don't care if there was an error coming from debugger,
// just about the fact that debugger did disconnect.
eprintln!("Debugger session ended");
Ok(())
};
let _ = future::try_join(outbound_pump, inbound_pump).await;
Ok(msg) = websocket.read_frame() => {
match msg.opcode {
OpCode::Text => {
if let Ok(s) = String::from_utf8(msg.payload) {
let _ = inbound_tx.unbounded_send(s);
}
}
OpCode::Close => {
// Users don't care if there was an error coming from debugger,
// just about the fact that debugger did disconnect.
eprintln!("Debugger session ended");
break 'pump;
}
_ => {
// Ignore other messages.
}
}
}
}
}
}
/// Inspector information that is sent from the isolate thread to the server

View file

@ -19,6 +19,7 @@ async-stream = "0.3.3"
atty.workspace = true
base64.workspace = true
console_static_text.workspace = true
fastwebsockets = { workspace = true, features = ["upgrade"] }
flate2.workspace = true
futures.workspace = true
hyper = { workspace = true, features = ["server", "http1", "http2", "runtime"] }
@ -40,7 +41,7 @@ tar.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-rustls.workspace = true
tokio-tungstenite.workspace = true
tokio-tungstenite = { workspace = true, features = ["rustls-tls-webpki-roots"] }
url.workspace = true
[target.'cfg(unix)'.dependencies]

View file

@ -2,6 +2,7 @@
// Usage: provide a port as argument to run hyper_hello benchmark server
// otherwise this starts multiple servers on many ports for test endpoints.
use anyhow::anyhow;
use futures::Future;
use futures::FutureExt;
use futures::Stream;
use futures::StreamExt;
@ -9,6 +10,7 @@ use hyper::header::HeaderValue;
use hyper::server::Server;
use hyper::service::make_service_fn;
use hyper::service::service_fn;
use hyper::upgrade::Upgraded;
use hyper::Body;
use hyper::Request;
use hyper::Response;
@ -49,7 +51,6 @@ use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio_rustls::rustls;
use tokio_rustls::TlsAcceptor;
use tokio_tungstenite::accept_async;
use url::Url;
pub mod assertions;
@ -302,69 +303,128 @@ async fn basic_auth_redirect(
Ok(resp)
}
async fn echo_websocket_handler(
ws: fastwebsockets::WebSocket<Upgraded>,
) -> Result<(), anyhow::Error> {
let mut ws = fastwebsockets::FragmentCollector::new(ws);
loop {
let frame = ws.read_frame().await.unwrap();
match frame.opcode {
fastwebsockets::OpCode::Close => break,
fastwebsockets::OpCode::Text | fastwebsockets::OpCode::Binary => {
ws.write_frame(frame).await.unwrap();
}
_ => {}
}
}
Ok(())
}
type WsHandler =
fn(
fastwebsockets::WebSocket<Upgraded>,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send>>;
fn spawn_ws_server<S>(stream: S, handler: WsHandler)
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
let srv_fn = service_fn(move |mut req: Request<Body>| async move {
let (response, upgrade_fut) = fastwebsockets::upgrade::upgrade(&mut req)
.map_err(|e| anyhow!("Error upgrading websocket connection: {}", e))?;
tokio::spawn(async move {
let ws = upgrade_fut
.await
.map_err(|e| anyhow!("Error upgrading websocket connection: {}", e))
.unwrap();
if let Err(e) = handler(ws).await {
eprintln!("Error in websocket connection: {}", e);
}
});
Ok::<_, anyhow::Error>(response)
});
tokio::spawn(async move {
let conn_fut = hyper::server::conn::Http::new()
.serve_connection(stream, srv_fn)
.with_upgrades();
if let Err(e) = conn_fut.await {
eprintln!("websocket server error: {e:?}");
}
});
}
async fn run_ws_server(addr: &SocketAddr) {
let listener = TcpListener::bind(addr).await.unwrap();
println!("ready: ws"); // Eye catcher for HttpServerCount
while let Ok((stream, _addr)) = listener.accept().await {
tokio::spawn(async move {
let ws_stream_fut = accept_async(stream);
let ws_stream = ws_stream_fut.await;
if let Ok(ws_stream) = ws_stream {
let (tx, rx) = ws_stream.split();
rx.forward(tx)
.map(|result| {
if let Err(e) = result {
println!("websocket server error: {e:?}");
}
})
.await;
}
});
spawn_ws_server(stream, |ws| Box::pin(echo_websocket_handler(ws)));
}
}
async fn ping_websocket_handler(
ws: fastwebsockets::WebSocket<Upgraded>,
) -> Result<(), anyhow::Error> {
use fastwebsockets::Frame;
use fastwebsockets::OpCode;
let mut ws = fastwebsockets::FragmentCollector::new(ws);
for i in 0..9 {
ws.write_frame(Frame::new(true, OpCode::Ping, None, vec![]))
.await
.unwrap();
let frame = ws.read_frame().await.unwrap();
assert_eq!(frame.opcode, OpCode::Pong);
assert!(frame.payload.is_empty());
ws.write_frame(Frame::text(format!("hello {}", i).as_bytes().to_vec()))
.await
.unwrap();
let frame = ws.read_frame().await.unwrap();
assert_eq!(frame.opcode, OpCode::Text);
assert_eq!(frame.payload, format!("hello {}", i).as_bytes());
}
ws.write_frame(fastwebsockets::Frame::close(1000, b""))
.await
.unwrap();
Ok(())
}
async fn run_ws_ping_server(addr: &SocketAddr) {
let listener = TcpListener::bind(addr).await.unwrap();
println!("ready: ws"); // Eye catcher for HttpServerCount
while let Ok((stream, _addr)) = listener.accept().await {
tokio::spawn(async move {
let ws_stream = accept_async(stream).await;
use futures::SinkExt;
use tokio_tungstenite::tungstenite::Message;
if let Ok(mut ws_stream) = ws_stream {
for i in 0..9 {
ws_stream.send(Message::Ping(vec![])).await.unwrap();
let msg = ws_stream.next().await.unwrap().unwrap();
assert_eq!(msg, Message::Pong(vec![]));
ws_stream
.send(Message::Text(format!("hello {}", i)))
.await
.unwrap();
let msg = ws_stream.next().await.unwrap().unwrap();
assert_eq!(msg, Message::Text(format!("hello {}", i)));
}
ws_stream.close(None).await.unwrap();
}
});
spawn_ws_server(stream, |ws| Box::pin(ping_websocket_handler(ws)));
}
}
async fn close_websocket_handler(
ws: fastwebsockets::WebSocket<Upgraded>,
) -> Result<(), anyhow::Error> {
let mut ws = fastwebsockets::FragmentCollector::new(ws);
ws.write_frame(fastwebsockets::Frame::close_raw(vec![]))
.await
.unwrap();
Ok(())
}
async fn run_ws_close_server(addr: &SocketAddr) {
let listener = TcpListener::bind(addr).await.unwrap();
while let Ok((stream, _addr)) = listener.accept().await {
tokio::spawn(async move {
let ws_stream_fut = accept_async(stream);
let ws_stream = ws_stream_fut.await;
if let Ok(mut ws_stream) = ws_stream {
ws_stream.close(None).await.unwrap();
}
});
spawn_ws_server(stream, |ws| Box::pin(close_websocket_handler(ws)));
}
}
@ -471,18 +531,9 @@ async fn run_wss_server(addr: &SocketAddr) {
tokio::spawn(async move {
match acceptor.accept(stream).await {
Ok(tls_stream) => {
let ws_stream_fut = accept_async(tls_stream);
let ws_stream = ws_stream_fut.await;
if let Ok(ws_stream) = ws_stream {
let (tx, rx) = ws_stream.split();
rx.forward(tx)
.map(|result| {
if let Err(e) = result {
println!("Websocket server error: {e:?}");
}
})
.await;
}
spawn_ws_server(tls_stream, |ws| {
Box::pin(echo_websocket_handler(ws))
});
}
Err(e) => {
eprintln!("TLS accept error: {e:?}");