From 4f4dcf52916b5e9d200d223d046cef332b5a598e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 27 Dec 2023 11:45:12 +0100 Subject: [PATCH] refactor: rewrite remaining test server to Hyper 1.1 (#21708) Ref https://github.com/denoland/deno/issues/21578 --- Cargo.lock | 4 +- test_util/Cargo.toml | 4 +- test_util/src/servers/mod.rs | 687 ++++++++++++++++++------------ test_util/src/servers/registry.rs | 47 +- 4 files changed, 442 insertions(+), 300 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2a7d188de3..3772c739df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5969,15 +5969,15 @@ dependencies = [ "base64 0.21.5", "bytes", "console_static_text", + "deno_unsync 0.3.0", "denokv_proto", "fastwebsockets", "flate2", "futures", "glob", - "h2 0.3.22", "h2 0.4.0", "http 1.0.0", - "hyper 0.14.27", + "http-body-util", "hyper 1.1.0", "hyper-util", "lazy-regex", diff --git a/test_util/Cargo.toml b/test_util/Cargo.toml index bb0f84d764..d51252a3b3 100644 --- a/test_util/Cargo.toml +++ b/test_util/Cargo.toml @@ -19,15 +19,15 @@ async-stream = "0.3.3" base64.workspace = true bytes.workspace = true console_static_text.workspace = true +deno_unsync = "0.3.0" denokv_proto.workspace = true fastwebsockets.workspace = true flate2 = { workspace = true, features = ["default"] } futures.workspace = true glob.workspace = true -h2.workspace = true h2_04 = { package = "h2", version = "0.4" } +http-body-util = "0.1" http_1 = { package = "http", version = "1.0" } -hyper = { workspace = true, features = ["server", "http1", "http2", "runtime"] } hyper-util.workspace = true hyper1.workspace = true lazy-regex.workspace = true diff --git a/test_util/src/servers/mod.rs b/test_util/src/servers/mod.rs index bf85aaed08..18953b7f04 100644 --- a/test_util/src/servers/mod.rs +++ b/test_util/src/servers/mod.rs @@ -3,6 +3,7 @@ // otherwise this starts multiple servers on many ports for test endpoints. use base64::prelude::BASE64_STANDARD; use base64::Engine; +use bytes::Bytes; use denokv_proto::datapath::AtomicWrite; use denokv_proto::datapath::AtomicWriteOutput; use denokv_proto::datapath::AtomicWriteStatus; @@ -10,17 +11,22 @@ use denokv_proto::datapath::ReadRangeOutput; use denokv_proto::datapath::SnapshotRead; use denokv_proto::datapath::SnapshotReadOutput; use denokv_proto::datapath::SnapshotReadStatus; +use futures::Future; use futures::FutureExt; use futures::Stream; use futures::StreamExt; -use hyper::header::HeaderValue; -use hyper::server::Server; -use hyper::service::make_service_fn; -use hyper::service::service_fn; -use hyper::Body; -use hyper::Request; -use hyper::Response; -use hyper::StatusCode; +use http::HeaderValue; +use http::Method; +use http::Request; +use http::Response; +use http::StatusCode; +use http_1 as http; +use http_body_util::combinators::UnsyncBoxBody; +use http_body_util::BodyExt; +use http_body_util::Empty; +use http_body_util::Full; +use hyper1 as hyper; +use hyper_util::rt::TokioIo; use pretty_assertions::assert_eq; use prost::Message; use std::collections::HashMap; @@ -33,10 +39,9 @@ use std::net::SocketAddrV6; use std::path::PathBuf; use std::pin::Pin; use std::result::Result; -use std::task::Context; -use std::task::Poll; use std::time::Duration; use tokio::io::AsyncWriteExt; +use tokio::net::TcpListener; use tokio::net::TcpStream; mod grpc; @@ -89,6 +94,8 @@ pub async fn run_all_servers() { return hyper_hello(port.parse::().unwrap()).await; } + // TODO(bartlomieju): in a follow up all these `wrap_` handlers could be removed + // in favor of spawning a hyper server directly with a config object let redirect_server_fut = wrap_redirect_server(REDIRECT_PORT); let double_redirects_server_fut = wrap_double_redirect_server(DOUBLE_REDIRECTS_PORT); @@ -156,34 +163,45 @@ pub async fn run_all_servers() { server_fut.await; } +fn empty_body() -> UnsyncBoxBody { + UnsyncBoxBody::new(Empty::new()) +} + +fn string_body(str_: &str) -> UnsyncBoxBody { + UnsyncBoxBody::new(Full::new(Bytes::from(str_.to_string()))) +} + +fn json_body(value: serde_json::Value) -> UnsyncBoxBody { + let str_ = value.to_string(); + string_body(&str_) +} + /// Benchmark server that just serves "hello world" responses. async fn hyper_hello(port: u16) { println!("hyper hello"); let addr = SocketAddr::from(([127, 0, 0, 1], port)); - let hello_svc = make_service_fn(|_| async move { - Ok::<_, Infallible>(service_fn(move |_: Request| async move { - Ok::<_, Infallible>(Response::new(Body::from("Hello World!"))) - })) - }); - - let server = Server::bind(&addr).serve(hello_svc); - if let Err(e) = server.await { - eprintln!("server error: {e}"); - } + let handler = move |_: Request| async move { + Ok::<_, anyhow::Error>(Response::new(UnsyncBoxBody::new( + http_body_util::Full::new(Bytes::from("Hello World!")), + ))) + }; + run_hyper1_server(addr, handler, "server error").await; } -fn redirect_resp(url: String) -> Response { - let mut redirect_resp = Response::new(Body::empty()); +fn redirect_resp(url: String) -> Response> { + let mut redirect_resp = Response::new(UnsyncBoxBody::new(Empty::new())); *redirect_resp.status_mut() = StatusCode::MOVED_PERMANENTLY; redirect_resp.headers_mut().insert( - hyper::header::LOCATION, + http::header::LOCATION, HeaderValue::from_str(&url[..]).unwrap(), ); redirect_resp } -async fn redirect(req: Request) -> hyper::Result> { +async fn redirect( + req: Request, +) -> Result>, anyhow::Error> { let p = req.uri().path(); assert_eq!(&p[0..1], "/"); let url = format!("http://localhost:{PORT}{p}"); @@ -191,7 +209,9 @@ async fn redirect(req: Request) -> hyper::Result> { Ok(redirect_resp(url)) } -async fn double_redirects(req: Request) -> hyper::Result> { +async fn double_redirects( + req: Request, +) -> Result>, anyhow::Error> { let p = req.uri().path(); assert_eq!(&p[0..1], "/"); let url = format!("http://localhost:{REDIRECT_PORT}{p}"); @@ -199,7 +219,9 @@ async fn double_redirects(req: Request) -> hyper::Result> { Ok(redirect_resp(url)) } -async fn inf_redirects(req: Request) -> hyper::Result> { +async fn inf_redirects( + req: Request, +) -> Result>, anyhow::Error> { let p = req.uri().path(); assert_eq!(&p[0..1], "/"); let url = format!("http://localhost:{INF_REDIRECTS_PORT}{p}"); @@ -207,7 +229,9 @@ async fn inf_redirects(req: Request) -> hyper::Result> { Ok(redirect_resp(url)) } -async fn another_redirect(req: Request) -> hyper::Result> { +async fn another_redirect( + req: Request, +) -> Result>, anyhow::Error> { let p = req.uri().path(); assert_eq!(&p[0..1], "/"); let url = format!("http://localhost:{PORT}/subdir{p}"); @@ -215,7 +239,9 @@ async fn another_redirect(req: Request) -> hyper::Result> { Ok(redirect_resp(url)) } -async fn auth_redirect(req: Request) -> hyper::Result> { +async fn auth_redirect( + req: Request, +) -> Result>, anyhow::Error> { if let Some(auth) = req .headers() .get("authorization") @@ -229,14 +255,14 @@ async fn auth_redirect(req: Request) -> hyper::Result> { } } - let mut resp = Response::new(Body::empty()); + let mut resp = Response::new(UnsyncBoxBody::new(Empty::new())); *resp.status_mut() = StatusCode::NOT_FOUND; Ok(resp) } async fn basic_auth_redirect( - req: Request, -) -> hyper::Result> { + req: Request, +) -> Result>, anyhow::Error> { if let Some(auth) = req .headers() .get("authorization") @@ -252,7 +278,7 @@ async fn basic_auth_redirect( } } - let mut resp = Response::new(Body::empty()); + let mut resp = Response::new(UnsyncBoxBody::new(Empty::new())); *resp.status_mut() = StatusCode::NOT_FOUND; Ok(resp) } @@ -331,8 +357,8 @@ async fn run_tls_server(port: u16) { } async fn absolute_redirect( - req: Request, -) -> hyper::Result> { + req: Request, +) -> Result>, anyhow::Error> { let path = req.uri().path(); if path == "/" { @@ -373,7 +399,7 @@ async fn absolute_redirect( let file_path = testdata_path().join(&req.uri().path()[1..]); if file_path.is_dir() || !file_path.exists() { - let mut not_found_resp = Response::new(Body::empty()); + let mut not_found_resp = Response::new(UnsyncBoxBody::new(Empty::new())); *not_found_resp.status_mut() = StatusCode::NOT_FOUND; return Ok(not_found_resp); } @@ -384,12 +410,14 @@ async fn absolute_redirect( } async fn main_server( - req: Request, -) -> Result, hyper::http::Error> { + req: Request, +) -> Result>, anyhow::Error> { return match (req.method(), req.uri().path()) { (_, "/echo_server") => { let (parts, body) = req.into_parts(); - let mut response = Response::new(body); + let mut response = Response::new(UnsyncBoxBody::new(Full::new( + body.collect().await?.to_bytes(), + ))); if let Some(status) = parts.headers.get("x-status") { *response.status_mut() = @@ -398,9 +426,9 @@ async fn main_server( response.headers_mut().extend(parts.headers); Ok(response) } - (&hyper::Method::POST, "/echo_multipart_file") => { + (&Method::POST, "/echo_multipart_file") => { let body = req.into_body(); - let bytes = &hyper::body::to_bytes(body).await.unwrap()[0..]; + let bytes = &body.collect().await.unwrap().to_bytes()[0..]; let start = b"--boundary\t \r\n\ Content-Disposition: form-data; name=\"field_1\"\r\n\ \r\n\ @@ -413,7 +441,8 @@ async fn main_server( let end = b"\r\n--boundary--\r\n"; let b = [start as &[u8], bytes, end].concat(); - let mut response = Response::new(Body::from(b)); + let mut response = + Response::new(UnsyncBoxBody::new(Full::new(Bytes::from(b)))); response.headers_mut().insert( "content-type", HeaderValue::from_static("multipart/form-data;boundary=boundary"), @@ -434,7 +463,7 @@ async fn main_server( console.log(\"Hi\")\ \r\n--boundary--\r\n\ Epilogue"; - let mut res = Response::new(Body::from(b)); + let mut res = Response::new(string_body(b)); res.headers_mut().insert( "content-type", HeaderValue::from_static("multipart/form-data;boundary=boundary"), @@ -455,7 +484,7 @@ async fn main_server( console.log(\"Hi\")\ \r\n--boundary--\r\n\ Epilogue"; - let mut res = Response::new(Body::from(b)); + let mut res = Response::new(string_body(b)); res.headers_mut().insert( "content-type", HeaderValue::from_static("multipart/form-datatststs;boundary=boundary"), @@ -463,17 +492,17 @@ async fn main_server( Ok(res) } (_, "/bad_redirect") => { - let mut res = Response::new(Body::empty()); + let mut res = Response::new(empty_body()); *res.status_mut() = StatusCode::FOUND; Ok(res) } (_, "/server_error") => { - let mut res = Response::new(Body::empty()); + let mut res = Response::new(empty_body()); *res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; Ok(res) } (_, "/x_deno_warning.js") => { - let mut res = Response::new(Body::empty()); + let mut res = Response::new(empty_body()); *res.status_mut() = StatusCode::MOVED_PERMANENTLY; res .headers_mut() @@ -485,7 +514,7 @@ async fn main_server( Ok(res) } (_, "/non_ascii_redirect") => { - let mut res = Response::new(Body::empty()); + let mut res = Response::new(empty_body()); *res.status_mut() = StatusCode::MOVED_PERMANENTLY; res.headers_mut().insert( "location", @@ -497,7 +526,7 @@ async fn main_server( let if_none_match = req.headers().get("if-none-match"); if if_none_match == Some(&HeaderValue::from_static("33a64df551425fcc55e")) { - let mut resp = Response::new(Body::empty()); + let mut resp = Response::new(empty_body()); *resp.status_mut() = StatusCode::NOT_MODIFIED; resp.headers_mut().insert( "Content-type", @@ -509,7 +538,7 @@ async fn main_server( Ok(resp) } else { - let mut resp = Response::new(Body::from("console.log('etag')")); + let mut resp = Response::new(string_body("console.log('etag')")); resp.headers_mut().insert( "Content-type", HeaderValue::from_static("application/typescript"), @@ -521,7 +550,7 @@ async fn main_server( } } (_, "/xTypeScriptTypes.js") => { - let mut res = Response::new(Body::from("export const foo = 'foo';")); + let mut res = Response::new(string_body("export const foo = 'foo';")); res.headers_mut().insert( "Content-type", HeaderValue::from_static("application/javascript"), @@ -533,7 +562,7 @@ async fn main_server( Ok(res) } (_, "/xTypeScriptTypes.jsx") => { - let mut res = Response::new(Body::from("export const foo = 'foo';")); + let mut res = Response::new(string_body("export const foo = 'foo';")); res .headers_mut() .insert("Content-type", HeaderValue::from_static("text/jsx")); @@ -545,7 +574,7 @@ async fn main_server( } (_, "/xTypeScriptTypes.ts") => { let mut res = - Response::new(Body::from("export const foo: string = 'foo';")); + Response::new(string_body("export const foo: string = 'foo';")); res.headers_mut().insert( "Content-type", HeaderValue::from_static("application/typescript"), @@ -557,7 +586,7 @@ async fn main_server( Ok(res) } (_, "/xTypeScriptTypes.d.ts") => { - let mut res = Response::new(Body::from("export const foo: 'foo';")); + let mut res = Response::new(string_body("export const foo: 'foo';")); res.headers_mut().insert( "Content-type", HeaderValue::from_static("application/typescript"), @@ -565,7 +594,7 @@ async fn main_server( Ok(res) } (_, "/run/type_directives_redirect.js") => { - let mut res = Response::new(Body::from("export const foo = 'foo';")); + let mut res = Response::new(string_body("export const foo = 'foo';")); res.headers_mut().insert( "Content-type", HeaderValue::from_static("application/javascript"), @@ -579,7 +608,7 @@ async fn main_server( Ok(res) } (_, "/run/type_headers_deno_types.foo.js") => { - let mut res = Response::new(Body::from( + let mut res = Response::new(string_body( "export function foo(text) { console.log(text); }", )); res.headers_mut().insert( @@ -596,7 +625,7 @@ async fn main_server( } (_, "/run/type_headers_deno_types.d.ts") => { let mut res = - Response::new(Body::from("export function foo(text: number): void;")); + Response::new(string_body("export function foo(text: number): void;")); res.headers_mut().insert( "Content-type", HeaderValue::from_static("application/typescript"), @@ -605,7 +634,7 @@ async fn main_server( } (_, "/run/type_headers_deno_types.foo.d.ts") => { let mut res = - Response::new(Body::from("export function foo(text: string): void;")); + Response::new(string_body("export function foo(text: string): void;")); res.headers_mut().insert( "Content-type", HeaderValue::from_static("application/typescript"), @@ -613,7 +642,7 @@ async fn main_server( Ok(res) } (_, "/subdir/xTypeScriptTypesRedirect.d.ts") => { - let mut res = Response::new(Body::from( + let mut res = Response::new(string_body( "import './xTypeScriptTypesRedirected.d.ts';", )); res.headers_mut().insert( @@ -623,7 +652,7 @@ async fn main_server( Ok(res) } (_, "/subdir/xTypeScriptTypesRedirected.d.ts") => { - let mut res = Response::new(Body::from("export const foo: 'foo';")); + let mut res = Response::new(string_body("export const foo: 'foo';")); res.headers_mut().insert( "Content-type", HeaderValue::from_static("application/typescript"), @@ -631,7 +660,7 @@ async fn main_server( Ok(res) } (_, "/referenceTypes.js") => { - let mut res = Response::new(Body::from("/// \r\nexport const foo = \"foo\";\r\n")); + let mut res = Response::new(string_body("/// \r\nexport const foo = \"foo\";\r\n")); res.headers_mut().insert( "Content-type", HeaderValue::from_static("application/javascript"), @@ -639,7 +668,7 @@ async fn main_server( Ok(res) } (_, "/subdir/file_with_:_in_name.ts") => { - let mut res = Response::new(Body::from( + let mut res = Response::new(string_body( "console.log('Hello from file_with_:_in_name.ts');", )); res.headers_mut().insert( @@ -650,7 +679,7 @@ async fn main_server( } (_, "/v1/extensionless") => { let mut res = - Response::new(Body::from(r#"export * from "/subdir/mod1.ts";"#)); + Response::new(string_body(r#"export * from "/subdir/mod1.ts";"#)); res.headers_mut().insert( "content-type", HeaderValue::from_static("application/typescript"), @@ -658,7 +687,7 @@ async fn main_server( Ok(res) } (_, "/subdir/no_js_ext@1.0.0") => { - let mut res = Response::new(Body::from( + let mut res = Response::new(string_body( r#"import { printHello } from "./mod2.ts"; printHello(); "#, @@ -678,19 +707,19 @@ async fn main_server( body, )) } else { - Ok(Response::new(Body::empty())) + Ok(Response::new(empty_body())) } } (_, "/http_version") => { let version = format!("{:?}", req.version()); - Ok(Response::new(version.into())) + Ok(Response::new(string_body(&version))) } (_, "/content_length") => { let content_length = format!("{:?}", req.headers().get("content-length")); - Ok(Response::new(content_length.into())) + Ok(Response::new(string_body(&content_length))) } (_, "/jsx/jsx-runtime") | (_, "/jsx/jsx-dev-runtime") => { - let mut res = Response::new(Body::from( + let mut res = Response::new(string_body( r#"export function jsx( _type, _props, @@ -711,8 +740,8 @@ async fn main_server( Ok(res) } (_, "/dynamic") => { - let mut res = Response::new(Body::from( - serde_json::to_string_pretty(&std::time::SystemTime::now()).unwrap(), + let mut res = Response::new(string_body( + &serde_json::to_string_pretty(&std::time::SystemTime::now()).unwrap(), )); res .headers_mut() @@ -720,8 +749,8 @@ async fn main_server( Ok(res) } (_, "/dynamic_cache") => { - let mut res = Response::new(Body::from( - serde_json::to_string_pretty(&std::time::SystemTime::now()).unwrap(), + let mut res = Response::new(string_body( + &serde_json::to_string_pretty(&std::time::SystemTime::now()).unwrap(), )); res.headers_mut().insert( "cache-control", @@ -730,7 +759,7 @@ async fn main_server( Ok(res) } (_, "/dynamic_module.ts") => { - let mut res = Response::new(Body::from(format!( + let mut res = Response::new(string_body(&format!( r#"export const time = {};"#, std::time::SystemTime::now().elapsed().unwrap().as_nanos() ))); @@ -742,17 +771,16 @@ async fn main_server( } (_, "/echo_accept") => { let accept = req.headers().get("accept").map(|v| v.to_str().unwrap()); - let res = Response::new(Body::from( - serde_json::json!({ "accept": accept }).to_string(), - )); + let res = + Response::new(json_body(serde_json::json!({ "accept": accept }))); Ok(res) } (_, "/search_params") => { let query = req.uri().query().map(|s| s.to_string()); - let res = Response::new(Body::from(query.unwrap_or_default())); + let res = Response::new(string_body(&query.unwrap_or_default())); Ok(res) } - (&hyper::Method::POST, "/kv_remote_authorize") => { + (&Method::POST, "/kv_remote_authorize") => { if req .headers() .get("authorization") @@ -763,7 +791,7 @@ async fn main_server( return Ok( Response::builder() .status(StatusCode::UNAUTHORIZED) - .body(Body::empty()) + .body(empty_body()) .unwrap(), ); } @@ -771,25 +799,22 @@ async fn main_server( Ok( Response::builder() .header("content-type", "application/json") - .body(Body::from( - serde_json::json!({ - "version": 1, - "databaseId": KV_DATABASE_ID, - "endpoints": [ - { - "url": format!("http://localhost:{}/kv_blackhole", PORT), - "consistency": "strong", - } - ], - "token": KV_DATABASE_TOKEN, - "expiresAt": "2099-01-01T00:00:00Z", - }) - .to_string(), - )) + .body(json_body(serde_json::json!({ + "version": 1, + "databaseId": KV_DATABASE_ID, + "endpoints": [ + { + "url": format!("http://localhost:{}/kv_blackhole", PORT), + "consistency": "strong", + } + ], + "token": KV_DATABASE_TOKEN, + "expiresAt": "2099-01-01T00:00:00Z", + }))) .unwrap(), ) } - (&hyper::Method::POST, "/kv_remote_authorize_invalid_format") => { + (&Method::POST, "/kv_remote_authorize_invalid_format") => { if req .headers() .get("authorization") @@ -800,7 +825,7 @@ async fn main_server( return Ok( Response::builder() .status(StatusCode::UNAUTHORIZED) - .body(Body::empty()) + .body(empty_body()) .unwrap(), ); } @@ -808,17 +833,14 @@ async fn main_server( Ok( Response::builder() .header("content-type", "application/json") - .body(Body::from( - serde_json::json!({ - "version": 1, - "databaseId": KV_DATABASE_ID, - }) - .to_string(), - )) + .body(json_body(serde_json::json!({ + "version": 1, + "databaseId": KV_DATABASE_ID, + }))) .unwrap(), ) } - (&hyper::Method::POST, "/kv_remote_authorize_invalid_version") => { + (&Method::POST, "/kv_remote_authorize_invalid_version") => { if req .headers() .get("authorization") @@ -829,7 +851,7 @@ async fn main_server( return Ok( Response::builder() .status(StatusCode::UNAUTHORIZED) - .body(Body::empty()) + .body(empty_body()) .unwrap(), ); } @@ -837,25 +859,22 @@ async fn main_server( Ok( Response::builder() .header("content-type", "application/json") - .body(Body::from( - serde_json::json!({ - "version": 1000, - "databaseId": KV_DATABASE_ID, - "endpoints": [ - { - "url": format!("http://localhost:{}/kv_blackhole", PORT), - "consistency": "strong", - } - ], - "token": KV_DATABASE_TOKEN, - "expiresAt": "2099-01-01T00:00:00Z", - }) - .to_string(), - )) + .body(json_body(serde_json::json!({ + "version": 1000, + "databaseId": KV_DATABASE_ID, + "endpoints": [ + { + "url": format!("http://localhost:{}/kv_blackhole", PORT), + "consistency": "strong", + } + ], + "token": KV_DATABASE_TOKEN, + "expiresAt": "2099-01-01T00:00:00Z", + }))) .unwrap(), ) } - (&hyper::Method::POST, "/kv_blackhole/snapshot_read") => { + (&Method::POST, "/kv_blackhole/snapshot_read") => { if req .headers() .get("authorization") @@ -866,20 +885,23 @@ async fn main_server( return Ok( Response::builder() .status(StatusCode::UNAUTHORIZED) - .body(Body::empty()) + .body(empty_body()) .unwrap(), ); } - let body = hyper::body::to_bytes(req.into_body()) + let body = req + .into_body() + .collect() .await - .unwrap_or_default(); + .unwrap_or_default() + .to_bytes(); let Ok(body): Result = prost::Message::decode(&body[..]) else { return Ok( Response::builder() .status(StatusCode::BAD_REQUEST) - .body(Body::empty()) + .body(empty_body()) .unwrap(), ); }; @@ -887,13 +909,13 @@ async fn main_server( return Ok( Response::builder() .status(StatusCode::BAD_REQUEST) - .body(Body::empty()) + .body(empty_body()) .unwrap(), ); } Ok( Response::builder() - .body(Body::from( + .body(UnsyncBoxBody::new(Full::new(Bytes::from( SnapshotReadOutput { ranges: body .ranges @@ -905,11 +927,11 @@ async fn main_server( status: SnapshotReadStatus::SrSuccess.into(), } .encode_to_vec(), - )) + )))) .unwrap(), ) } - (&hyper::Method::POST, "/kv_blackhole/atomic_write") => { + (&Method::POST, "/kv_blackhole/atomic_write") => { if req .headers() .get("authorization") @@ -920,68 +942,71 @@ async fn main_server( return Ok( Response::builder() .status(StatusCode::UNAUTHORIZED) - .body(Body::empty()) + .body(empty_body()) .unwrap(), ); } - let body = hyper::body::to_bytes(req.into_body()) + let body = req + .into_body() + .collect() .await - .unwrap_or_default(); + .unwrap_or_default() + .to_bytes(); let Ok(_body): Result = prost::Message::decode(&body[..]) else { return Ok( Response::builder() .status(StatusCode::BAD_REQUEST) - .body(Body::empty()) + .body(empty_body()) .unwrap(), ); }; Ok( Response::builder() - .body(Body::from( + .body(UnsyncBoxBody::new(Full::new(Bytes::from( AtomicWriteOutput { status: AtomicWriteStatus::AwSuccess.into(), versionstamp: vec![0u8; 10], failed_checks: vec![], } .encode_to_vec(), - )) + )))) .unwrap(), ) } - (&hyper::Method::GET, "/upgrade/sleep/release-latest.txt") => { + (&Method::GET, "/upgrade/sleep/release-latest.txt") => { tokio::time::sleep(Duration::from_secs(95)).await; return Ok( Response::builder() .status(StatusCode::OK) - .body(Body::from("99999.99.99")) + .body(string_body("99999.99.99")) .unwrap(), ); } - (&hyper::Method::GET, "/upgrade/sleep/canary-latest.txt") => { + (&Method::GET, "/upgrade/sleep/canary-latest.txt") => { tokio::time::sleep(Duration::from_secs(95)).await; return Ok( Response::builder() .status(StatusCode::OK) - .body(Body::from("bda3850f84f24b71e02512c1ba2d6bf2e3daa2fd")) + .body(string_body("bda3850f84f24b71e02512c1ba2d6bf2e3daa2fd")) .unwrap(), ); } - (&hyper::Method::GET, "/release-latest.txt") => { + (&Method::GET, "/release-latest.txt") => { return Ok( Response::builder() .status(StatusCode::OK) // use a deno version that will never happen - .body(Body::from("99999.99.99")) + .body(string_body("99999.99.99")) .unwrap(), ); } - (&hyper::Method::GET, "/canary-latest.txt") => { + (&Method::GET, "/canary-latest.txt") => { return Ok( Response::builder() .status(StatusCode::OK) - .body(Body::from("bda3850f84f24b71e02512c1ba2d6bf2e3daa2fd")) + .body(string_body("bda3850f84f24b71e02512c1ba2d6bf2e3daa2fd")) .unwrap(), ); } @@ -1008,7 +1033,8 @@ async fn main_server( Err(err) => { return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(format!("{err:#}").into()); + .body(string_body(&format!("{err:#}"))) + .map_err(|e| e.into()); } } } else if req.uri().path().starts_with("/npm/registry/") { @@ -1026,7 +1052,8 @@ async fn main_server( { return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(format!("{err:#}").into()); + .body(string_body(&format!("{err:#}"))) + .map_err(|e| e.into()); }; // serve the file @@ -1047,19 +1074,21 @@ async fn main_server( return Response::builder() .status(StatusCode::OK) .header("content-type", "application/typescript") - .body(Body::empty()); + .body(empty_body()) + .map_err(|e| e.into()); } Response::builder() .status(StatusCode::NOT_FOUND) - .body(Body::empty()) + .body(empty_body()) + .map_err(|e| e.into()) } }; } fn handle_custom_npm_registry_path( path: &str, -) -> Result>, anyhow::Error> { +) -> Result>>, anyhow::Error> { let parts = path .split('/') .filter(|p| !p.is_empty()) @@ -1130,109 +1159,223 @@ async fn download_npm_registry_file( Ok(()) } -/// Taken from example in https://github.com/ctz/hyper-rustls/blob/a02ef72a227dcdf102f86e905baa7415c992e8b3/examples/server.rs -struct HyperAcceptor<'a> { - acceptor: Pin< - Box> + 'a>, - >, -} +#[derive(Clone)] +struct DenoUnsyncExecutor; -impl hyper::server::accept::Accept for HyperAcceptor<'_> { - type Conn = rustls_tokio_stream::TlsStream; - type Error = io::Error; - - fn poll_accept( - mut self: Pin<&mut Self>, - cx: &mut Context, - ) -> Poll>> { - Pin::new(&mut self.acceptor).poll_next(cx) +impl hyper::rt::Executor for DenoUnsyncExecutor +where + Fut: Future + 'static, + Fut::Output: 'static, +{ + fn execute(&self, fut: Fut) { + deno_unsync::spawn(fut); } } -#[allow(clippy::non_send_fields_in_send_ty)] -// SAFETY: unsafe trait must have unsafe implementation -unsafe impl std::marker::Send for HyperAcceptor<'_> {} +#[derive(Debug, Clone, Copy)] +enum Hyper1ServerKind { + Auto, + OnlyHttp1, + OnlyHttp2, +} + +async fn hyper1_serve_connection( + io: I, + service_fn_handler: F, + error_msg: &'static str, + kind: Hyper1ServerKind, +) where + I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static, + F: Fn(Request) -> S + Copy + 'static, + S: Future< + Output = Result< + Response>, + anyhow::Error, + >, + > + 'static, +{ + let service = hyper1::service::service_fn(service_fn_handler); + + let result: Result<(), anyhow::Error> = match kind { + Hyper1ServerKind::Auto => { + let builder = + hyper_util::server::conn::auto::Builder::new(DenoUnsyncExecutor); + builder + .serve_connection(io, service) + .await + .map_err(|e| anyhow::anyhow!("{}", e)) + } + Hyper1ServerKind::OnlyHttp1 => { + let builder = hyper1::server::conn::http1::Builder::new(); + builder + .serve_connection(io, service) + .await + .map_err(|e| e.into()) + } + Hyper1ServerKind::OnlyHttp2 => { + let builder = + hyper1::server::conn::http2::Builder::new(DenoUnsyncExecutor); + builder + .serve_connection(io, service) + .await + .map_err(|e| e.into()) + } + }; + + if let Err(e) = result { + eprintln!("{}: {:?}", error_msg, e); + } +} + +async fn run_hyper1_server_inner( + addr: SocketAddr, + service_fn_handler: F, + error_msg: &'static str, + kind: Hyper1ServerKind, +) where + F: Fn(Request) -> S + Copy + 'static, + S: Future< + Output = Result< + Response>, + anyhow::Error, + >, + > + 'static, +{ + let fut: Pin>>> = + async move { + let listener = TcpListener::bind(addr).await?; + loop { + let (stream, _) = listener.accept().await?; + let io = TokioIo::new(stream); + deno_unsync::spawn(hyper1_serve_connection( + io, + service_fn_handler, + error_msg, + kind, + )); + } + } + .boxed_local(); + + if let Err(e) = fut.await { + eprintln!("{}: {:?}", error_msg, e); + } +} + +async fn run_hyper1_server( + addr: SocketAddr, + service_fn_handler: F, + error_msg: &'static str, +) where + F: Fn(Request) -> S + Copy + 'static, + S: Future< + Output = Result< + Response>, + anyhow::Error, + >, + > + 'static, +{ + run_hyper1_server_inner( + addr, + service_fn_handler, + error_msg, + Hyper1ServerKind::Auto, + ) + .await +} + +// TODO(bartlomieju): dedup with `run_hyper1_server_inner` +async fn run_hyper1_server_with_acceptor<'a, A, F, S>( + mut acceptor: Pin>, + service_fn_handler: F, + error_msg: &'static str, + kind: Hyper1ServerKind, +) where + A: Stream> + ?Sized, + F: Fn(Request) -> S + Copy + 'static, + S: Future< + Output = Result< + Response>, + anyhow::Error, + >, + > + 'static, +{ + let fut: Pin>>> = + async move { + while let Some(result) = acceptor.next().await { + let stream = result?; + let io = TokioIo::new(stream); + deno_unsync::spawn(hyper1_serve_connection( + io, + service_fn_handler, + error_msg, + kind, + )); + } + Ok(()) + } + .boxed_local(); + + if let Err(e) = fut.await { + eprintln!("{}: {:?}", error_msg, e); + } +} async fn wrap_redirect_server(port: u16) { - let redirect_svc = - make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(redirect)) }); let redirect_addr = SocketAddr::from(([127, 0, 0, 1], port)); - let redirect_server = Server::bind(&redirect_addr).serve(redirect_svc); - if let Err(e) = redirect_server.await { - eprintln!("Redirect error: {e:?}"); - } + run_hyper1_server(redirect_addr, redirect, "Redirect error").await; } async fn wrap_double_redirect_server(port: u16) { - let double_redirects_svc = make_service_fn(|_| async { - Ok::<_, Infallible>(service_fn(double_redirects)) - }); let double_redirects_addr = SocketAddr::from(([127, 0, 0, 1], port)); - let double_redirects_server = - Server::bind(&double_redirects_addr).serve(double_redirects_svc); - if let Err(e) = double_redirects_server.await { - eprintln!("Double redirect error: {e:?}"); - } + run_hyper1_server( + double_redirects_addr, + double_redirects, + "Double redirect error", + ) + .await; } async fn wrap_inf_redirect_server(port: u16) { - let inf_redirects_svc = make_service_fn(|_| async { - Ok::<_, Infallible>(service_fn(inf_redirects)) - }); let inf_redirects_addr = SocketAddr::from(([127, 0, 0, 1], port)); - let inf_redirects_server = - Server::bind(&inf_redirects_addr).serve(inf_redirects_svc); - if let Err(e) = inf_redirects_server.await { - eprintln!("Inf redirect error: {e:?}"); - } + run_hyper1_server(inf_redirects_addr, inf_redirects, "Inf redirect error") + .await; } async fn wrap_another_redirect_server(port: u16) { - let another_redirect_svc = make_service_fn(|_| async { - Ok::<_, Infallible>(service_fn(another_redirect)) - }); let another_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port)); - let another_redirect_server = - Server::bind(&another_redirect_addr).serve(another_redirect_svc); - if let Err(e) = another_redirect_server.await { - eprintln!("Another redirect error: {e:?}"); - } + run_hyper1_server( + another_redirect_addr, + another_redirect, + "Another redirect error", + ) + .await; } async fn wrap_auth_redirect_server(port: u16) { - let auth_redirect_svc = make_service_fn(|_| async { - Ok::<_, Infallible>(service_fn(auth_redirect)) - }); let auth_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port)); - let auth_redirect_server = - Server::bind(&auth_redirect_addr).serve(auth_redirect_svc); - if let Err(e) = auth_redirect_server.await { - eprintln!("Auth redirect error: {e:?}"); - } + run_hyper1_server(auth_redirect_addr, auth_redirect, "Auth redirect error") + .await; } async fn wrap_basic_auth_redirect_server(port: u16) { - let basic_auth_redirect_svc = make_service_fn(|_| async { - Ok::<_, Infallible>(service_fn(basic_auth_redirect)) - }); let basic_auth_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port)); - let basic_auth_redirect_server = - Server::bind(&basic_auth_redirect_addr).serve(basic_auth_redirect_svc); - if let Err(e) = basic_auth_redirect_server.await { - eprintln!("Basic auth redirect error: {e:?}"); - } + run_hyper1_server( + basic_auth_redirect_addr, + basic_auth_redirect, + "Basic auth redirect error", + ) + .await; } async fn wrap_abs_redirect_server(port: u16) { - let abs_redirect_svc = make_service_fn(|_| async { - Ok::<_, Infallible>(service_fn(absolute_redirect)) - }); let abs_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port)); - let abs_redirect_server = - Server::bind(&abs_redirect_addr).serve(abs_redirect_svc); - if let Err(e) = abs_redirect_server.await { - eprintln!("Absolute redirect error: {e:?}"); - } + run_hyper1_server( + abs_redirect_addr, + absolute_redirect, + "Absolute redirect error", + ) + .await; } async fn wrap_main_server(port: u16) { @@ -1249,23 +1392,19 @@ async fn wrap_main_ipv6_server(port: u16) { } async fn wrap_main_server_for_addr(main_server_addr: &SocketAddr) { - let main_server_svc = - make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(main_server)) }); - let main_server = Server::bind(main_server_addr).serve(main_server_svc); - if let Err(e) = main_server.await { - eprintln!("HTTP server error: {e:?}"); - } + run_hyper1_server(*main_server_addr, main_server, "HTTP server error").await; } async fn wrap_main_https_server(port: u16) { let tls = get_tls_listener_stream("https", port, Default::default()).await; - let main_server_https_svc = - make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(main_server)) }); - let main_server_https = Server::builder(HyperAcceptor { - acceptor: tls.boxed_local(), - }) - .serve(main_server_https_svc); - let _ = main_server_https.await; + let tls_acceptor = tls.boxed_local(); + run_hyper1_server_with_acceptor( + tls_acceptor, + main_server, + "HTTPS server error", + Hyper1ServerKind::Auto, + ) + .await } async fn wrap_https_h1_only_tls_server(port: u16) { @@ -1276,15 +1415,13 @@ async fn wrap_https_h1_only_tls_server(port: u16) { ) .await; - let main_server_https_svc = - make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(main_server)) }); - let main_server_https = Server::builder(HyperAcceptor { - acceptor: tls.boxed_local(), - }) - .http1_only(true) - .serve(main_server_https_svc); - - let _ = main_server_https.await; + run_hyper1_server_with_acceptor( + tls.boxed_local(), + main_server, + "HTTP1 only TLS server error", + Hyper1ServerKind::OnlyHttp1, + ) + .await } async fn wrap_https_h2_only_tls_server(port: u16) { @@ -1295,37 +1432,35 @@ async fn wrap_https_h2_only_tls_server(port: u16) { ) .await; - let main_server_https_svc = - make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(main_server)) }); - let main_server_https = Server::builder(HyperAcceptor { - acceptor: tls.boxed_local(), - }) - .http2_only(true) - .serve(main_server_https_svc); - - let _ = main_server_https.await; + run_hyper1_server_with_acceptor( + tls.boxed_local(), + main_server, + "HTTP2 only TLS server error", + Hyper1ServerKind::OnlyHttp2, + ) + .await } async fn wrap_http_h1_only_server(port: u16) { let main_server_http_addr = SocketAddr::from(([127, 0, 0, 1], port)); - - let main_server_http_svc = - make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(main_server)) }); - let main_server_http = Server::bind(&main_server_http_addr) - .http1_only(true) - .serve(main_server_http_svc); - let _ = main_server_http.await; + run_hyper1_server_inner( + main_server_http_addr, + main_server, + "HTTP1 only server error:", + Hyper1ServerKind::OnlyHttp1, + ) + .await; } async fn wrap_http_h2_only_server(port: u16) { let main_server_http_addr = SocketAddr::from(([127, 0, 0, 1], port)); - - let main_server_http_svc = - make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(main_server)) }); - let main_server_http = Server::bind(&main_server_http_addr) - .http2_only(true) - .serve(main_server_http_svc); - let _ = main_server_http.await; + run_hyper1_server_inner( + main_server_http_addr, + main_server, + "HTTP1 only server error:", + Hyper1ServerKind::OnlyHttp2, + ) + .await; } async fn wrap_client_auth_https_server(port: u16) { @@ -1345,18 +1480,22 @@ async fn wrap_client_auth_https_server(port: u16) { } }; - let main_server_https_svc = - make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(main_server)) }); - let main_server_https = Server::builder(HyperAcceptor { - acceptor: tls.boxed_local(), - }) - .serve(main_server_https_svc); - - let _ = main_server_https.await; + run_hyper1_server_with_acceptor( + tls.boxed_local(), + main_server, + "Auth TLS server error", + Hyper1ServerKind::Auto, + ) + .await } -fn custom_headers(p: &str, body: Vec) -> Response { - let mut response = Response::new(Body::from(body)); +fn custom_headers( + p: &str, + body: Vec, +) -> Response> { + let mut response = Response::new(UnsyncBoxBody::new( + http_body_util::Full::new(Bytes::from(body)), + )); if p.ends_with("/run/import_compression/brotli") { response diff --git a/test_util/src/servers/registry.rs b/test_util/src/servers/registry.rs index 56a9e31b0e..8f6fd32aa4 100644 --- a/test_util/src/servers/registry.rs +++ b/test_util/src/servers/registry.rs @@ -1,37 +1,38 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use hyper::server::Server; -use hyper::service::make_service_fn; -use hyper::service::service_fn; -use hyper::Body; -use hyper::Request; -use hyper::Response; -use hyper::StatusCode; +use super::run_hyper1_server; +use bytes::Bytes; +use http_body_util::combinators::UnsyncBoxBody; +use http_body_util::Empty; +use http_body_util::Full; +use hyper1::body::Incoming; +use hyper1::Request; +use hyper1::Response; +use hyper1::StatusCode; use serde_json::json; use std::convert::Infallible; use std::net::SocketAddr; pub async fn registry_server(port: u16) { let registry_server_addr = SocketAddr::from(([127, 0, 0, 1], port)); - let registry_server_svc = make_service_fn(|_| async { - Ok::<_, Infallible>(service_fn(registry_server_handler)) - }); - let registry_server = - Server::bind(®istry_server_addr).serve(registry_server_svc); - if let Err(e) = registry_server.await { - eprintln!("Registry server error: {:?}", e); - } + + run_hyper1_server( + registry_server_addr, + registry_server_handler, + "Registry server error", + ) + .await } async fn registry_server_handler( - req: Request, -) -> Result, hyper::http::Error> { + req: Request, +) -> Result>, anyhow::Error> { let path = req.uri().path(); // TODO(bartlomieju): add a proper router here if path.starts_with("/api/scope/") { let body = serde_json::to_string_pretty(&json!({})).unwrap(); - let res = Response::new(Body::from(body)); + let res = Response::new(UnsyncBoxBody::new(Full::from(body))); return Ok(res); } else if path.starts_with("/api/scopes/") { let body = serde_json::to_string_pretty(&json!({ @@ -40,7 +41,7 @@ async fn registry_server_handler( "error": null })) .unwrap(); - let res = Response::new(Body::from(body)); + let res = Response::new(UnsyncBoxBody::new(Full::from(body))); return Ok(res); } else if path.starts_with("/api/publish_status/") { let body = serde_json::to_string_pretty(&json!({ @@ -49,11 +50,13 @@ async fn registry_server_handler( "error": null })) .unwrap(); - let res = Response::new(Body::from(body)); + let res = Response::new(UnsyncBoxBody::new(Full::from(body))); return Ok(res); } - Response::builder() + let empty_body = UnsyncBoxBody::new(Empty::new()); + let res = Response::builder() .status(StatusCode::NOT_FOUND) - .body(Body::empty()) + .body(empty_body)?; + Ok(res) }