refactor: finish test_util server cleanup, simplify dependencies (#21714)

Closes https://github.com/denoland/deno/issues/21578
This commit is contained in:
Bartek Iwańczuk 2023-12-27 14:38:44 +01:00 committed by GitHub
parent d5f6e271ff
commit 33acd437f5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 298 additions and 257 deletions

1
Cargo.lock generated
View file

@ -965,7 +965,6 @@ dependencies = [
"fwdansi",
"glibc_version",
"glob",
"h2 0.4.0",
"hex",
"http 0.2.11",
"http 1.0.0",

View file

@ -107,6 +107,7 @@ http = "0.2.9"
h2 = { version = "0.3.17", features = ["unstable"] }
httparse = "1.8.0"
hyper-util = { version = "=0.1.2", features = ["tokio", "server", "server-auto"] }
http-body-util = "0.1"
hyper = { version = "0.14.26", features = ["runtime", "http1"] }
hyper1 = { package = "hyper", version = "=1.1.0", features = ["full"] }
indexmap = { version = "2", features = ["serde"] }

View file

@ -152,8 +152,7 @@ nix.workspace = true
deno_bench_util.workspace = true
fastwebsockets_06 = { package = "fastwebsockets", version = "0.6", features = ["upgrade", "unstable-split"] }
flaky_test = "=0.1.0"
h2_04 = { package = "h2", version = "0.4" }
http-body-util = "0.1"
http-body-util.workspace = true
http_1 = { package = "http", version = "1.0" }
hyper-util.workspace = true
hyper1.workspace = true

View file

@ -54,5 +54,5 @@ tokio-util = { workspace = true, features = ["io"] }
[dev-dependencies]
bencher.workspace = true
http-body-util = "0.1"
http-body-util.workspace = true
rand.workspace = true

View file

@ -19,9 +19,9 @@ deno_core.workspace = true
deno_net.workspace = true
deno_tls.workspace = true
fastwebsockets.workspace = true
h2_04 = { package = "h2", version = "0.4" }
http-body-util = "0.1"
http_1 = { package = "http", version = "1.0" }
h2 = "0.4"
http = "1.0"
http-body-util.workspace = true
hyper-util.workspace = true
hyper1.workspace = true
once_cell.workspace = true

View file

@ -25,14 +25,14 @@ use deno_tls::create_client_config;
use deno_tls::rustls::ClientConfig;
use deno_tls::RootCertStoreProvider;
use deno_tls::SocketUse;
use http_1::header::CONNECTION;
use http_1::header::UPGRADE;
use http_1::HeaderName;
use http_1::HeaderValue;
use http_1::Method;
use http_1::Request;
use http_1::StatusCode;
use http_1::Uri;
use http::header::CONNECTION;
use http::header::UPGRADE;
use http::HeaderName;
use http::HeaderValue;
use http::Method;
use http::Request;
use http::StatusCode;
use http::Uri;
use once_cell::sync::Lazy;
use rustls_tokio_stream::rustls::RootCertStore;
use rustls_tokio_stream::rustls::ServerName;
@ -156,7 +156,7 @@ async fn handshake_websocket(
uri: &Uri,
protocols: &str,
headers: Option<Vec<(ByteString, ByteString)>>,
) -> Result<(WebSocket<WebSocketStream>, http_1::HeaderMap), AnyError> {
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
let mut request = Request::builder().method(Method::GET).uri(
uri
.path_and_query()
@ -219,7 +219,7 @@ async fn handshake_websocket(
async fn handshake_http1_ws(
request: Request<http_body_util::Empty<Bytes>>,
addr: &String,
) -> Result<(WebSocket<WebSocketStream>, http_1::HeaderMap), AnyError> {
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
let tcp_socket = TcpStream::connect(addr).await?;
handshake_connection(request, tcp_socket).await
}
@ -229,7 +229,7 @@ async fn handshake_http1_wss(
request: Request<http_body_util::Empty<Bytes>>,
domain: &str,
addr: &str,
) -> Result<(WebSocket<WebSocketStream>, http_1::HeaderMap), AnyError> {
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
let tcp_socket = TcpStream::connect(addr).await?;
let tls_config = create_ws_client_config(state, SocketUse::Http1Only)?;
let dnsname =
@ -255,7 +255,7 @@ async fn handshake_http2_wss(
domain: &str,
headers: &Option<Vec<(ByteString, ByteString)>>,
addr: &str,
) -> Result<(WebSocket<WebSocketStream>, http_1::HeaderMap), AnyError> {
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
let tcp_socket = TcpStream::connect(addr).await?;
let tls_config = create_ws_client_config(state, SocketUse::Http2Only)?;
let dnsname =
@ -267,7 +267,7 @@ async fn handshake_http2_wss(
if handshake.alpn.is_none() {
bail!("Didn't receive h2 alpn, aborting connection");
}
let h2 = h2_04::client::Builder::new();
let h2 = h2::client::Builder::new();
let (mut send, conn) = h2.handshake::<_, Bytes>(tls_connector).await?;
spawn(conn);
let mut request = Request::builder();
@ -280,13 +280,13 @@ async fn handshake_http2_wss(
request = request.uri(uri);
request =
populate_common_request_headers(request, user_agent, protocols, headers)?;
request = request.extension(h2_04::ext::Protocol::from("websocket"));
request = request.extension(h2::ext::Protocol::from("websocket"));
let (resp, send) = send.send_request(request.body(())?, false)?;
let resp = resp.await?;
if resp.status() != StatusCode::OK {
bail!("Invalid status code: {}", resp.status());
}
let (http_1::response::Parts { headers, .. }, recv) = resp.into_parts();
let (http::response::Parts { headers, .. }, recv) = resp.into_parts();
let mut stream = WebSocket::after_handshake(
WebSocketStream::new(stream::WsStreamKind::H2(send, recv), None),
Role::Client,
@ -303,7 +303,7 @@ async fn handshake_connection<
>(
request: Request<http_body_util::Empty<Bytes>>,
socket: S,
) -> Result<(WebSocket<WebSocketStream>, http_1::HeaderMap), AnyError> {
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
let (upgraded, response) =
fastwebsockets::handshake::client(&LocalExecutor, request, socket).await?;
@ -339,11 +339,11 @@ pub fn create_ws_client_config(
/// Headers common to both http/1.1 and h2 requests.
fn populate_common_request_headers(
mut request: http_1::request::Builder,
mut request: http::request::Builder,
user_agent: &str,
protocols: &str,
headers: &Option<Vec<(ByteString, ByteString)>>,
) -> Result<http_1::request::Builder, AnyError> {
) -> Result<http::request::Builder, AnyError> {
request = request
.header("User-Agent", user_agent)
.header("Sec-WebSocket-Version", "13");
@ -361,14 +361,14 @@ fn populate_common_request_headers(
let is_disallowed_header = matches!(
name,
http_1::header::HOST
| http_1::header::SEC_WEBSOCKET_ACCEPT
| http_1::header::SEC_WEBSOCKET_EXTENSIONS
| http_1::header::SEC_WEBSOCKET_KEY
| http_1::header::SEC_WEBSOCKET_PROTOCOL
| http_1::header::SEC_WEBSOCKET_VERSION
| http_1::header::UPGRADE
| http_1::header::CONNECTION
http::header::HOST
| http::header::SEC_WEBSOCKET_ACCEPT
| http::header::SEC_WEBSOCKET_EXTENSIONS
| http::header::SEC_WEBSOCKET_KEY
| http::header::SEC_WEBSOCKET_PROTOCOL
| http::header::SEC_WEBSOCKET_VERSION
| http::header::UPGRADE
| http::header::CONNECTION
);
if !is_disallowed_header {
request = request.header(name, v);

View file

@ -2,8 +2,8 @@
use bytes::Buf;
use bytes::Bytes;
use deno_net::raw::NetworkStream;
use h2_04::RecvStream;
use h2_04::SendStream;
use h2::RecvStream;
use h2::SendStream;
use hyper1::upgrade::Upgraded;
use hyper_util::rt::TokioIo;
use std::io::ErrorKind;

View file

@ -101,7 +101,7 @@ fastwebsockets.workspace = true
filetime = "0.2.16"
fs3.workspace = true
http.workspace = true
http-body-util = "0.1"
http-body-util.workspace = true
http_1 = { package = "http", version = "1.0" }
hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "runtime"] }
hyper-util.workspace = true

View file

@ -25,9 +25,9 @@ fastwebsockets.workspace = true
flate2 = { workspace = true, features = ["default"] }
futures.workspace = true
glob.workspace = true
h2_04 = { package = "h2", version = "0.4" }
http-body-util = "0.1"
http_1 = { package = "http", version = "1.0" }
h2 = "0.4"
http = "1.0"
http-body-util.workspace = true
hyper-util.workspace = true
hyper1.workspace = true
lazy-regex.workspace = true

View file

@ -1,7 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use futures::StreamExt;
use h2_04 as h2;
use h2;
use hyper1::header::HeaderName;
use hyper1::header::HeaderValue;
use rustls_tokio_stream::TlsStream;

View file

@ -0,0 +1,146 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use bytes::Bytes;
use futures::Future;
use futures::FutureExt;
use futures::Stream;
use futures::StreamExt;
use http;
use http::Request;
use http::Response;
use http_body_util::combinators::UnsyncBoxBody;
use hyper1 as hyper;
use hyper_util::rt::TokioIo;
use std::convert::Infallible;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::result::Result;
use tokio::net::TcpListener;
#[derive(Debug, Clone, Copy)]
pub enum ServerKind {
Auto,
OnlyHttp1,
OnlyHttp2,
}
#[derive(Debug, Clone, Copy)]
pub struct ServerOptions {
pub error_msg: &'static str,
pub addr: SocketAddr,
pub kind: ServerKind,
}
type HandlerOutput =
Result<Response<UnsyncBoxBody<Bytes, Infallible>>, anyhow::Error>;
pub async fn run_server<F, S>(options: ServerOptions, handler: F)
where
F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
S: Future<Output = HandlerOutput> + 'static,
{
let fut: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>> =
async move {
let listener = TcpListener::bind(options.addr).await?;
loop {
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);
deno_unsync::spawn(hyper1_serve_connection(
io,
handler,
options.error_msg,
options.kind,
));
}
}
.boxed_local();
if let Err(e) = fut.await {
eprintln!("{}: {:?}", options.error_msg, e);
}
}
pub async fn run_server_with_acceptor<'a, A, F, S>(
mut acceptor: Pin<Box<A>>,
handler: F,
error_msg: &'static str,
kind: ServerKind,
) where
A: Stream<Item = io::Result<rustls_tokio_stream::TlsStream>> + ?Sized,
F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
S: Future<Output = HandlerOutput> + 'static,
{
let fut: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>> =
async move {
while let Some(result) = acceptor.next().await {
let stream = result?;
let io = TokioIo::new(stream);
deno_unsync::spawn(hyper1_serve_connection(
io, handler, error_msg, kind,
));
}
Ok(())
}
.boxed_local();
if let Err(e) = fut.await {
eprintln!("{}: {:?}", error_msg, e);
}
}
async fn hyper1_serve_connection<I, F, S>(
io: I,
handler: F,
error_msg: &'static str,
kind: ServerKind,
) where
I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
S: Future<Output = HandlerOutput> + 'static,
{
let service = hyper1::service::service_fn(handler);
let result: Result<(), anyhow::Error> = match kind {
ServerKind::Auto => {
let builder =
hyper_util::server::conn::auto::Builder::new(DenoUnsyncExecutor);
builder
.serve_connection(io, service)
.await
.map_err(|e| anyhow::anyhow!("{}", e))
}
ServerKind::OnlyHttp1 => {
let builder = hyper1::server::conn::http1::Builder::new();
builder
.serve_connection(io, service)
.await
.map_err(|e| e.into())
}
ServerKind::OnlyHttp2 => {
let builder =
hyper1::server::conn::http2::Builder::new(DenoUnsyncExecutor);
builder
.serve_connection(io, service)
.await
.map_err(|e| e.into())
}
};
if let Err(e) = result {
eprintln!("{}: {:?}", error_msg, e);
}
}
#[derive(Clone)]
struct DenoUnsyncExecutor;
impl<Fut> hyper::rt::Executor<Fut> for DenoUnsyncExecutor
where
Fut: Future + 'static,
Fut::Output: 'static,
{
fn execute(&self, fut: Fut) {
deno_unsync::spawn(fut);
}
}

View file

@ -11,43 +11,44 @@ use denokv_proto::datapath::ReadRangeOutput;
use denokv_proto::datapath::SnapshotRead;
use denokv_proto::datapath::SnapshotReadOutput;
use denokv_proto::datapath::SnapshotReadStatus;
use futures::Future;
use futures::FutureExt;
use futures::Stream;
use futures::StreamExt;
use http;
use http::HeaderValue;
use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use http_1 as http;
use http_body_util::combinators::UnsyncBoxBody;
use http_body_util::BodyExt;
use http_body_util::Empty;
use http_body_util::Full;
use hyper1 as hyper;
use hyper_util::rt::TokioIo;
use pretty_assertions::assert_eq;
use prost::Message;
use std::collections::HashMap;
use std::convert::Infallible;
use std::env;
use std::io;
use std::net::Ipv6Addr;
use std::net::SocketAddr;
use std::net::SocketAddrV6;
use std::path::PathBuf;
use std::pin::Pin;
use std::result::Result;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
mod grpc;
mod hyper_utils;
mod registry;
mod ws;
use hyper_utils::run_server;
use hyper_utils::run_server_with_acceptor;
use hyper_utils::ServerKind;
use hyper_utils::ServerOptions;
use super::https::get_tls_listener_stream;
use super::https::SupportedHttpVersions;
use super::npm::CUSTOM_NPM_PACKAGE_CACHE;
@ -94,8 +95,6 @@ pub async fn run_all_servers() {
return hyper_hello(port.parse::<u16>().unwrap()).await;
}
// TODO(bartlomieju): in a follow up all these `wrap_` handlers could be removed
// in favor of spawning a hyper server directly with a config object
let redirect_server_fut = wrap_redirect_server(REDIRECT_PORT);
let double_redirects_server_fut =
wrap_double_redirect_server(DOUBLE_REDIRECTS_PORT);
@ -185,7 +184,15 @@ async fn hyper_hello(port: u16) {
http_body_util::Full::new(Bytes::from("Hello World!")),
)))
};
run_hyper1_server(addr, handler, "server error").await;
run_server(
ServerOptions {
addr,
error_msg: "server error",
kind: ServerKind::Auto,
},
handler,
)
.await;
}
fn redirect_resp(url: String) -> Response<UnsyncBoxBody<Bytes, Infallible>> {
@ -1159,221 +1166,93 @@ async fn download_npm_registry_file(
Ok(())
}
#[derive(Clone)]
struct DenoUnsyncExecutor;
impl<Fut> hyper::rt::Executor<Fut> for DenoUnsyncExecutor
where
Fut: Future + 'static,
Fut::Output: 'static,
{
fn execute(&self, fut: Fut) {
deno_unsync::spawn(fut);
}
}
#[derive(Debug, Clone, Copy)]
enum Hyper1ServerKind {
Auto,
OnlyHttp1,
OnlyHttp2,
}
async fn hyper1_serve_connection<I, F, S>(
io: I,
service_fn_handler: F,
error_msg: &'static str,
kind: Hyper1ServerKind,
) where
I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
S: Future<
Output = Result<
Response<UnsyncBoxBody<Bytes, Infallible>>,
anyhow::Error,
>,
> + 'static,
{
let service = hyper1::service::service_fn(service_fn_handler);
let result: Result<(), anyhow::Error> = match kind {
Hyper1ServerKind::Auto => {
let builder =
hyper_util::server::conn::auto::Builder::new(DenoUnsyncExecutor);
builder
.serve_connection(io, service)
.await
.map_err(|e| anyhow::anyhow!("{}", e))
}
Hyper1ServerKind::OnlyHttp1 => {
let builder = hyper1::server::conn::http1::Builder::new();
builder
.serve_connection(io, service)
.await
.map_err(|e| e.into())
}
Hyper1ServerKind::OnlyHttp2 => {
let builder =
hyper1::server::conn::http2::Builder::new(DenoUnsyncExecutor);
builder
.serve_connection(io, service)
.await
.map_err(|e| e.into())
}
};
if let Err(e) = result {
eprintln!("{}: {:?}", error_msg, e);
}
}
async fn run_hyper1_server_inner<F, S>(
addr: SocketAddr,
service_fn_handler: F,
error_msg: &'static str,
kind: Hyper1ServerKind,
) where
F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
S: Future<
Output = Result<
Response<UnsyncBoxBody<Bytes, Infallible>>,
anyhow::Error,
>,
> + 'static,
{
let fut: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>> =
async move {
let listener = TcpListener::bind(addr).await?;
loop {
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);
deno_unsync::spawn(hyper1_serve_connection(
io,
service_fn_handler,
error_msg,
kind,
));
}
}
.boxed_local();
if let Err(e) = fut.await {
eprintln!("{}: {:?}", error_msg, e);
}
}
async fn run_hyper1_server<F, S>(
addr: SocketAddr,
service_fn_handler: F,
error_msg: &'static str,
) where
F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
S: Future<
Output = Result<
Response<UnsyncBoxBody<Bytes, Infallible>>,
anyhow::Error,
>,
> + 'static,
{
run_hyper1_server_inner(
addr,
service_fn_handler,
error_msg,
Hyper1ServerKind::Auto,
)
.await
}
// TODO(bartlomieju): dedup with `run_hyper1_server_inner`
async fn run_hyper1_server_with_acceptor<'a, A, F, S>(
mut acceptor: Pin<Box<A>>,
service_fn_handler: F,
error_msg: &'static str,
kind: Hyper1ServerKind,
) where
A: Stream<Item = io::Result<rustls_tokio_stream::TlsStream>> + ?Sized,
F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
S: Future<
Output = Result<
Response<UnsyncBoxBody<Bytes, Infallible>>,
anyhow::Error,
>,
> + 'static,
{
let fut: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>> =
async move {
while let Some(result) = acceptor.next().await {
let stream = result?;
let io = TokioIo::new(stream);
deno_unsync::spawn(hyper1_serve_connection(
io,
service_fn_handler,
error_msg,
kind,
));
}
Ok(())
}
.boxed_local();
if let Err(e) = fut.await {
eprintln!("{}: {:?}", error_msg, e);
}
}
async fn wrap_redirect_server(port: u16) {
let redirect_addr = SocketAddr::from(([127, 0, 0, 1], port));
run_hyper1_server(redirect_addr, redirect, "Redirect error").await;
run_server(
ServerOptions {
addr: redirect_addr,
error_msg: "Redirect error",
kind: ServerKind::Auto,
},
redirect,
)
.await;
}
async fn wrap_double_redirect_server(port: u16) {
let double_redirects_addr = SocketAddr::from(([127, 0, 0, 1], port));
run_hyper1_server(
double_redirects_addr,
run_server(
ServerOptions {
addr: double_redirects_addr,
error_msg: "Double redirect error",
kind: ServerKind::Auto,
},
double_redirects,
"Double redirect error",
)
.await;
}
async fn wrap_inf_redirect_server(port: u16) {
let inf_redirects_addr = SocketAddr::from(([127, 0, 0, 1], port));
run_hyper1_server(inf_redirects_addr, inf_redirects, "Inf redirect error")
.await;
run_server(
ServerOptions {
addr: inf_redirects_addr,
error_msg: "Inf redirect error",
kind: ServerKind::Auto,
},
inf_redirects,
)
.await;
}
async fn wrap_another_redirect_server(port: u16) {
let another_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port));
run_hyper1_server(
another_redirect_addr,
run_server(
ServerOptions {
addr: another_redirect_addr,
error_msg: "Another redirect error",
kind: ServerKind::Auto,
},
another_redirect,
"Another redirect error",
)
.await;
}
async fn wrap_auth_redirect_server(port: u16) {
let auth_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port));
run_hyper1_server(auth_redirect_addr, auth_redirect, "Auth redirect error")
.await;
run_server(
ServerOptions {
addr: auth_redirect_addr,
error_msg: "Auth redirect error",
kind: ServerKind::Auto,
},
auth_redirect,
)
.await;
}
async fn wrap_basic_auth_redirect_server(port: u16) {
let basic_auth_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port));
run_hyper1_server(
basic_auth_redirect_addr,
run_server(
ServerOptions {
addr: basic_auth_redirect_addr,
error_msg: "Basic auth redirect error",
kind: ServerKind::Auto,
},
basic_auth_redirect,
"Basic auth redirect error",
)
.await;
}
async fn wrap_abs_redirect_server(port: u16) {
let abs_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port));
run_hyper1_server(
abs_redirect_addr,
run_server(
ServerOptions {
addr: abs_redirect_addr,
error_msg: "Absolute redirect error",
kind: ServerKind::Auto,
},
absolute_redirect,
"Absolute redirect error",
)
.await;
}
@ -1392,17 +1271,25 @@ async fn wrap_main_ipv6_server(port: u16) {
}
async fn wrap_main_server_for_addr(main_server_addr: &SocketAddr) {
run_hyper1_server(*main_server_addr, main_server, "HTTP server error").await;
run_server(
ServerOptions {
addr: *main_server_addr,
kind: ServerKind::Auto,
error_msg: "HTTP server error",
},
main_server,
)
.await;
}
async fn wrap_main_https_server(port: u16) {
let tls = get_tls_listener_stream("https", port, Default::default()).await;
let tls_acceptor = tls.boxed_local();
run_hyper1_server_with_acceptor(
run_server_with_acceptor(
tls_acceptor,
main_server,
"HTTPS server error",
Hyper1ServerKind::Auto,
ServerKind::Auto,
)
.await
}
@ -1415,11 +1302,11 @@ async fn wrap_https_h1_only_tls_server(port: u16) {
)
.await;
run_hyper1_server_with_acceptor(
run_server_with_acceptor(
tls.boxed_local(),
main_server,
"HTTP1 only TLS server error",
Hyper1ServerKind::OnlyHttp1,
ServerKind::OnlyHttp1,
)
.await
}
@ -1432,33 +1319,37 @@ async fn wrap_https_h2_only_tls_server(port: u16) {
)
.await;
run_hyper1_server_with_acceptor(
run_server_with_acceptor(
tls.boxed_local(),
main_server,
"HTTP2 only TLS server error",
Hyper1ServerKind::OnlyHttp2,
ServerKind::OnlyHttp2,
)
.await
}
async fn wrap_http_h1_only_server(port: u16) {
let main_server_http_addr = SocketAddr::from(([127, 0, 0, 1], port));
run_hyper1_server_inner(
main_server_http_addr,
run_server(
ServerOptions {
addr: main_server_http_addr,
error_msg: "HTTP1 only server error:",
kind: ServerKind::OnlyHttp1,
},
main_server,
"HTTP1 only server error:",
Hyper1ServerKind::OnlyHttp1,
)
.await;
}
async fn wrap_http_h2_only_server(port: u16) {
let main_server_http_addr = SocketAddr::from(([127, 0, 0, 1], port));
run_hyper1_server_inner(
main_server_http_addr,
run_server(
ServerOptions {
addr: main_server_http_addr,
error_msg: "HTTP1 only server error:",
kind: ServerKind::OnlyHttp2,
},
main_server,
"HTTP1 only server error:",
Hyper1ServerKind::OnlyHttp2,
)
.await;
}
@ -1480,11 +1371,11 @@ async fn wrap_client_auth_https_server(port: u16) {
}
};
run_hyper1_server_with_acceptor(
run_server_with_acceptor(
tls.boxed_local(),
main_server,
"Auth TLS server error",
Hyper1ServerKind::Auto,
ServerKind::Auto,
)
.await
}

View file

@ -1,6 +1,8 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use super::run_hyper1_server;
use super::run_server;
use super::ServerKind;
use super::ServerOptions;
use bytes::Bytes;
use http_body_util::combinators::UnsyncBoxBody;
use http_body_util::Empty;
@ -16,10 +18,13 @@ use std::net::SocketAddr;
pub async fn registry_server(port: u16) {
let registry_server_addr = SocketAddr::from(([127, 0, 0, 1], port));
run_hyper1_server(
registry_server_addr,
run_server(
ServerOptions {
addr: registry_server_addr,
error_msg: "Registry server error",
kind: ServerKind::Auto,
},
registry_server_handler,
"Registry server error",
)
.await
}

View file

@ -11,10 +11,10 @@ use futures::future::join3;
use futures::future::poll_fn;
use futures::Future;
use futures::StreamExt;
use h2_04::server::Handshake;
use h2_04::server::SendResponse;
use h2_04::Reason;
use h2_04::RecvStream;
use h2::server::Handshake;
use h2::server::SendResponse;
use h2::Reason;
use h2::RecvStream;
use hyper1::upgrade::Upgraded;
use hyper1::Method;
use hyper1::Request;
@ -70,7 +70,7 @@ pub async fn run_wss2_server(port: u16) {
.await;
while let Some(Ok(tls)) = tls.next().await {
tokio::spawn(async move {
let mut h2 = h2_04::server::Builder::new();
let mut h2 = h2::server::Builder::new();
h2.enable_connect_protocol();
// Using Bytes is pretty alloc-heavy but this is a test server
let server: Handshake<_, Bytes> = h2.handshake(tls);
@ -127,7 +127,7 @@ where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
let service = hyper1::service::service_fn(
move |mut req: http_1::Request<hyper1::body::Incoming>| async move {
move |mut req: http::Request<hyper1::body::Incoming>| async move {
let (response, upgrade_fut) = fastwebsockets::upgrade::upgrade(&mut req)
.map_err(|e| anyhow!("Error upgrading websocket connection: {}", e))?;
@ -161,13 +161,13 @@ where
async fn handle_wss_stream(
recv: Request<RecvStream>,
mut send: SendResponse<Bytes>,
) -> Result<(), h2_04::Error> {
) -> Result<(), h2::Error> {
if recv.method() != Method::CONNECT {
eprintln!("wss2: refusing non-CONNECT stream");
send.send_reset(Reason::REFUSED_STREAM);
return Ok(());
}
let Some(protocol) = recv.extensions().get::<h2_04::ext::Protocol>() else {
let Some(protocol) = recv.extensions().get::<h2::ext::Protocol>() else {
eprintln!("wss2: refusing no-:protocol stream");
send.send_reset(Reason::REFUSED_STREAM);
return Ok(());