refactor: migrate ext/websocket to hyper 1.1 (#21699)

This commit is contained in:
Bartek Iwańczuk 2023-12-26 11:20:49 +01:00 committed by GitHub
parent d1f4d81dcf
commit 753777b4af
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 58 additions and 45 deletions

6
Cargo.lock generated
View file

@ -1771,9 +1771,15 @@ dependencies = [
"deno_net",
"deno_tls",
"fastwebsockets 0.5.0",
"fastwebsockets 0.6.0",
"h2 0.3.22",
"h2 0.4.0",
"http 0.2.11",
"http 1.0.0",
"http-body-util",
"hyper 0.14.27",
"hyper 1.1.0",
"hyper-util",
"once_cell",
"rustls-tokio-stream",
"serde",

View file

@ -19,9 +19,15 @@ deno_core.workspace = true
deno_net.workspace = true
deno_tls.workspace = true
fastwebsockets = { workspace = true, features = ["upgrade", "unstable-split"] }
fastwebsockets_06 = { package = "fastwebsockets", version = "0.6", features = ["upgrade", "unstable-split"] }
h2.workspace = true
h2_04 = { package = "h2", version = "0.4" }
http.workspace = true
http-body-util = "0.1"
http_1 = { package = "http", version = "1.0" }
hyper = { workspace = true, features = ["backports"] }
hyper-util.workspace = true
hyper1.workspace = true
once_cell.workspace = true
rustls-tokio-stream.workspace = true
serde.workspace = true

View file

@ -25,15 +25,14 @@ use deno_tls::create_client_config;
use deno_tls::rustls::ClientConfig;
use deno_tls::RootCertStoreProvider;
use deno_tls::SocketUse;
use http::header::CONNECTION;
use http::header::UPGRADE;
use http::HeaderName;
use http::HeaderValue;
use http::Method;
use http::Request;
use http::StatusCode;
use http::Uri;
use hyper::Body;
use http_1::header::CONNECTION;
use http_1::header::UPGRADE;
use http_1::HeaderName;
use http_1::HeaderValue;
use http_1::Method;
use http_1::Request;
use http_1::StatusCode;
use http_1::Uri;
use once_cell::sync::Lazy;
use rustls_tokio_stream::rustls::RootCertStore;
use rustls_tokio_stream::rustls::ServerName;
@ -55,13 +54,13 @@ use tokio::io::ReadHalf;
use tokio::io::WriteHalf;
use tokio::net::TcpStream;
use fastwebsockets::CloseCode;
use fastwebsockets::FragmentCollectorRead;
use fastwebsockets::Frame;
use fastwebsockets::OpCode;
use fastwebsockets::Role;
use fastwebsockets::WebSocket;
use fastwebsockets::WebSocketWrite;
use fastwebsockets_06::CloseCode;
use fastwebsockets_06::FragmentCollectorRead;
use fastwebsockets_06::Frame;
use fastwebsockets_06::OpCode;
use fastwebsockets_06::Role;
use fastwebsockets_06::WebSocket;
use fastwebsockets_06::WebSocketWrite;
mod stream;
@ -157,7 +156,7 @@ async fn handshake_websocket(
uri: &Uri,
protocols: &str,
headers: Option<Vec<(ByteString, ByteString)>>,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
) -> Result<(WebSocket<WebSocketStream>, http_1::HeaderMap), AnyError> {
let mut request = Request::builder().method(Method::GET).uri(
uri
.path_and_query()
@ -176,14 +175,14 @@ async fn handshake_websocket(
.header(CONNECTION, "Upgrade")
.header(
"Sec-WebSocket-Key",
fastwebsockets::handshake::generate_key(),
fastwebsockets_06::handshake::generate_key(),
);
let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone();
request =
populate_common_request_headers(request, &user_agent, protocols, &headers)?;
let request = request.body(Body::empty())?;
let request = request.body(http_body_util::Empty::new())?;
let domain = &uri.host().unwrap().to_string();
let port = &uri.port_u16().unwrap_or(match uri.scheme_str() {
Some("wss") => 443,
@ -218,19 +217,19 @@ async fn handshake_websocket(
}
async fn handshake_http1_ws(
request: Request<Body>,
request: Request<http_body_util::Empty<Bytes>>,
addr: &String,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
) -> Result<(WebSocket<WebSocketStream>, http_1::HeaderMap), AnyError> {
let tcp_socket = TcpStream::connect(addr).await?;
handshake_connection(request, tcp_socket).await
}
async fn handshake_http1_wss(
state: &Rc<RefCell<OpState>>,
request: Request<Body>,
request: Request<http_body_util::Empty<Bytes>>,
domain: &str,
addr: &str,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
) -> Result<(WebSocket<WebSocketStream>, http_1::HeaderMap), AnyError> {
let tcp_socket = TcpStream::connect(addr).await?;
let tls_config = create_ws_client_config(state, SocketUse::Http1Only)?;
let dnsname =
@ -256,7 +255,7 @@ async fn handshake_http2_wss(
domain: &str,
headers: &Option<Vec<(ByteString, ByteString)>>,
addr: &str,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
) -> Result<(WebSocket<WebSocketStream>, http_1::HeaderMap), AnyError> {
let tcp_socket = TcpStream::connect(addr).await?;
let tls_config = create_ws_client_config(state, SocketUse::Http2Only)?;
let dnsname =
@ -268,7 +267,7 @@ async fn handshake_http2_wss(
if handshake.alpn.is_none() {
bail!("Didn't receive h2 alpn, aborting connection");
}
let h2 = h2::client::Builder::new();
let h2 = h2_04::client::Builder::new();
let (mut send, conn) = h2.handshake::<_, Bytes>(tls_connector).await?;
spawn(conn);
let mut request = Request::builder();
@ -281,13 +280,13 @@ async fn handshake_http2_wss(
request = request.uri(uri);
request =
populate_common_request_headers(request, user_agent, protocols, headers)?;
request = request.extension(h2::ext::Protocol::from("websocket"));
request = request.extension(h2_04::ext::Protocol::from("websocket"));
let (resp, send) = send.send_request(request.body(())?, false)?;
let resp = resp.await?;
if resp.status() != StatusCode::OK {
bail!("Invalid status code: {}", resp.status());
}
let (http::response::Parts { headers, .. }, recv) = resp.into_parts();
let (http_1::response::Parts { headers, .. }, recv) = resp.into_parts();
let mut stream = WebSocket::after_handshake(
WebSocketStream::new(stream::WsStreamKind::H2(send, recv), None),
Role::Client,
@ -302,11 +301,12 @@ async fn handshake_http2_wss(
async fn handshake_connection<
S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
>(
request: Request<Body>,
request: Request<http_body_util::Empty<Bytes>>,
socket: S,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
) -> Result<(WebSocket<WebSocketStream>, http_1::HeaderMap), AnyError> {
let (upgraded, response) =
fastwebsockets::handshake::client(&LocalExecutor, request, socket).await?;
fastwebsockets_06::handshake::client(&LocalExecutor, request, socket)
.await?;
let upgraded = upgraded.into_inner();
let stream =
@ -340,11 +340,11 @@ pub fn create_ws_client_config(
/// Headers common to both http/1.1 and h2 requests.
fn populate_common_request_headers(
mut request: http::request::Builder,
mut request: http_1::request::Builder,
user_agent: &str,
protocols: &str,
headers: &Option<Vec<(ByteString, ByteString)>>,
) -> Result<http::request::Builder, AnyError> {
) -> Result<http_1::request::Builder, AnyError> {
request = request
.header("User-Agent", user_agent)
.header("Sec-WebSocket-Version", "13");
@ -362,14 +362,14 @@ fn populate_common_request_headers(
let is_disallowed_header = matches!(
name,
http::header::HOST
| http::header::SEC_WEBSOCKET_ACCEPT
| http::header::SEC_WEBSOCKET_EXTENSIONS
| http::header::SEC_WEBSOCKET_KEY
| http::header::SEC_WEBSOCKET_PROTOCOL
| http::header::SEC_WEBSOCKET_VERSION
| http::header::UPGRADE
| http::header::CONNECTION
http_1::header::HOST
| http_1::header::SEC_WEBSOCKET_ACCEPT
| http_1::header::SEC_WEBSOCKET_EXTENSIONS
| http_1::header::SEC_WEBSOCKET_KEY
| http_1::header::SEC_WEBSOCKET_PROTOCOL
| http_1::header::SEC_WEBSOCKET_VERSION
| http_1::header::UPGRADE
| http_1::header::CONNECTION
);
if !is_disallowed_header {
request = request.header(name, v);
@ -892,7 +892,7 @@ pub fn get_network_error_class_name(e: &AnyError) -> Option<&'static str> {
#[derive(Clone)]
struct LocalExecutor;
impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
impl<Fut> hyper1::rt::Executor<Fut> for LocalExecutor
where
Fut: Future + 'static,
Fut::Output: 'static,

View file

@ -2,9 +2,10 @@
use bytes::Buf;
use bytes::Bytes;
use deno_net::raw::NetworkStream;
use h2::RecvStream;
use h2::SendStream;
use hyper::upgrade::Upgraded;
use h2_04::RecvStream;
use h2_04::SendStream;
use hyper1::upgrade::Upgraded;
use hyper_util::rt::TokioIo;
use std::io::ErrorKind;
use std::pin::Pin;
use std::task::ready;
@ -15,7 +16,7 @@ use tokio::io::ReadBuf;
// TODO(bartlomieju): remove this
pub(crate) enum WsStreamKind {
Upgraded(Upgraded),
Upgraded(TokioIo<Upgraded>),
Network(NetworkStream),
H2(SendStream<Bytes>, RecvStream),
}