feat(ext/node): http2.connect() API (#19671)

This commit improves compatibility of "node:http2" module by polyfilling
"connect" method and "ClientHttp2Session" class. Basic operations like
streaming, header and trailer handling are working correctly. 
Refing/unrefing is still a TODO and "npm:grpc-js/grpc" is not yet working
correctly.

---------

Co-authored-by: Matt Mastracci <matthew@mastracci.com>
This commit is contained in:
Bartek Iwańczuk 2023-09-15 21:51:25 +02:00 committed by GitHub
parent 11f0ccf805
commit 5a1505db67
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 1850 additions and 215 deletions

19
Cargo.lock generated
View file

@ -484,9 +484,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.13.0"
version = "3.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1"
checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec"
[[package]]
name = "byteorder"
@ -533,9 +533,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.30"
version = "0.4.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "defd4e7873dbddba6c7c91e199c7fcb946abc4a6a4ac3195400bcfb01b5de877"
checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
dependencies = [
"android-tzdata",
"iana-time-zone",
@ -1386,12 +1386,14 @@ dependencies = [
"aead-gcm-stream",
"aes",
"brotli",
"bytes",
"cbc",
"data-encoding",
"deno_core",
"deno_fetch",
"deno_fs",
"deno_media_type",
"deno_net",
"deno_npm",
"deno_semver",
"digest 0.10.7",
@ -1399,8 +1401,10 @@ dependencies = [
"ecb",
"elliptic-curve 0.13.5",
"errno 0.2.8",
"h2",
"hex",
"hkdf",
"http",
"idna 0.3.0",
"indexmap 2.0.0",
"lazy-regex",
@ -1432,6 +1436,7 @@ dependencies = [
"signature 1.6.4",
"tokio",
"typenum",
"url",
"whoami",
"winapi",
"x25519-dalek",
@ -5445,11 +5450,13 @@ dependencies = [
"anyhow",
"async-stream",
"base64 0.13.1",
"bytes",
"console_static_text",
"fastwebsockets",
"flate2",
"futures",
"glob",
"h2",
"hyper 0.14.27",
"lazy-regex",
"libc",
@ -5887,9 +5894,9 @@ checksum = "0685c84d5d54d1c26f7d3eb96cd41550adb97baed141a761cf335d3d33bcd0ae"
[[package]]
name = "typenum"
version = "1.16.0"
version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "unic-char-property"

View file

@ -96,6 +96,7 @@ futures = "0.3.21"
glob = "0.3.1"
hex = "0.4"
http = "0.2.9"
h2 = "0.3.17"
httparse = "1.8.0"
hyper = { version = "0.14.26", features = ["runtime", "http1"] }
# TODO(mmastrac): indexmap 2.0 will require multiple synchronized changes

View file

@ -7,6 +7,9 @@ import {
fail,
} from "./test_util.ts";
const servePort = 4248;
const serveUrl = `ws://localhost:${servePort}/`;
Deno.test({ permissions: "none" }, function websocketPermissionless() {
assertThrows(
() => new WebSocket("ws://localhost"),
@ -81,13 +84,13 @@ Deno.test(
signal: ac.signal,
onListen: () => listeningPromise.resolve(),
hostname: "localhost",
port: 4246,
port: servePort,
});
await listeningPromise;
const promise = deferred();
const ws = new WebSocket("ws://localhost:4246/");
assertEquals(ws.url, "ws://localhost:4246/");
const ws = new WebSocket(serveUrl);
assertEquals(ws.url, serveUrl);
ws.onerror = () => fail();
ws.onmessage = (e) => {
assertEquals(e.data, "Hello");
@ -133,13 +136,13 @@ Deno.test({
signal: ac.signal,
onListen: () => listeningPromise.resolve(),
hostname: "localhost",
port: 4247,
port: servePort,
});
await listeningPromise;
const ws = new WebSocket("ws://localhost:4247/");
assertEquals(ws.url, "ws://localhost:4247/");
const ws = new WebSocket(serveUrl);
assertEquals(ws.url, serveUrl);
ws.onerror = () => fail();
ws.onmessage = () => ws.send("bye");
ws.onclose = () => {
@ -173,13 +176,13 @@ Deno.test({
signal: ac.signal,
onListen: () => listeningPromise.resolve(),
hostname: "localhost",
port: 4247,
port: servePort,
});
await listeningPromise;
const ws = new WebSocket("ws://localhost:4247/");
assertEquals(ws.url, "ws://localhost:4247/");
const ws = new WebSocket(serveUrl);
assertEquals(ws.url, serveUrl);
let seenBye = false;
ws.onerror = () => fail();
ws.onmessage = ({ data }) => {

View file

@ -5,75 +5,66 @@ import * as net from "node:net";
import { deferred } from "../../../test_util/std/async/deferred.ts";
import { assertEquals } from "../../../test_util/std/testing/asserts.ts";
const {
HTTP2_HEADER_AUTHORITY,
HTTP2_HEADER_METHOD,
HTTP2_HEADER_PATH,
HTTP2_HEADER_STATUS,
} = http2.constants;
for (const url of ["http://127.0.0.1:4246", "https://127.0.0.1:4247"]) {
Deno.test(`[node/http2 client] ${url}`, {
ignore: Deno.build.os === "windows",
}, async () => {
// Create a server to respond to the HTTP2 requests
const client = http2.connect(url, {});
client.on("error", (err) => console.error(err));
Deno.test("[node/http2 client]", async () => {
// Create a server to respond to the HTTP2 requests
const portPromise = deferred();
const reqPromise = deferred<Request>();
const ready = deferred();
const ac = new AbortController();
const server = Deno.serve({
port: 0,
signal: ac.signal,
onListen: ({ port }: { port: number }) => portPromise.resolve(port),
handler: async (req: Request) => {
reqPromise.resolve(req);
await ready;
return new Response("body", {
status: 401,
headers: { "resp-header-name": "resp-header-value" },
});
},
const req = client.request({ ":method": "POST", ":path": "/" }, {
waitForTrailers: true,
});
let receivedTrailers;
let receivedHeaders;
let receivedData = "";
req.on("response", (headers, _flags) => {
receivedHeaders = headers;
});
req.write("hello");
req.setEncoding("utf8");
req.on("wantTrailers", () => {
req.sendTrailers({ foo: "bar" });
});
req.on("trailers", (trailers, _flags) => {
receivedTrailers = trailers;
});
req.on("data", (chunk) => {
receivedData += chunk;
});
req.end();
const endPromise = deferred();
setTimeout(() => {
try {
client.close();
} catch (_) {
// pass
}
endPromise.resolve();
}, 2000);
await endPromise;
assertEquals(receivedHeaders, { ":status": 200 });
assertEquals(receivedData, "hello world\n");
assertEquals(receivedTrailers, {
"abc": "def",
"opr": "stv",
"foo": "bar",
});
});
}
const port = await portPromise;
// Get a session
const sessionPromise = deferred();
const session = http2.connect(
`localhost:${port}`,
{},
sessionPromise.resolve.bind(sessionPromise),
);
const session2 = await sessionPromise;
assertEquals(session, session2);
// Write a request, including a body
const stream = session.request({
[HTTP2_HEADER_AUTHORITY]: `localhost:${port}`,
[HTTP2_HEADER_METHOD]: "POST",
[HTTP2_HEADER_PATH]: "/path",
"req-header-name": "req-header-value",
});
stream.write("body");
stream.end();
// Check the request
const req = await reqPromise;
assertEquals(req.headers.get("req-header-name"), "req-header-value");
assertEquals(await req.text(), "body");
ready.resolve();
// Read a response
const headerPromise = new Promise<Record<string, string | string[]>>((
resolve,
) => stream.on("headers", resolve));
const headers = await headerPromise;
assertEquals(headers["resp-header-name"], "resp-header-value");
assertEquals(headers[HTTP2_HEADER_STATUS], "401");
ac.abort();
await server.finished;
});
Deno.test("[node/http2 server]", async () => {
// TODO(bartlomieju): reenable sanitizers
Deno.test("[node/http2 server]", { sanitizeOps: false }, async () => {
const server = http2.createServer();
server.listen(0);
const port = (<net.AddressInfo> server.address()).port;

View file

@ -17,12 +17,14 @@ path = "lib.rs"
aead-gcm-stream = "0.1"
aes.workspace = true
brotli.workspace = true
bytes.workspace = true
cbc.workspace = true
data-encoding = "2.3.3"
deno_core.workspace = true
deno_fetch.workspace = true
deno_fs.workspace = true
deno_media_type.workspace = true
deno_net.workspace = true
deno_npm.workspace = true
deno_semver.workspace = true
digest = { version = "0.10.5", features = ["core-api", "std"] }
@ -30,8 +32,10 @@ dsa = "0.6.1"
ecb.workspace = true
elliptic-curve.workspace = true
errno = "0.2.8"
h2.workspace = true
hex.workspace = true
hkdf.workspace = true
http.workspace = true
idna = "0.3.0"
indexmap.workspace = true
lazy-regex.workspace = true
@ -63,6 +67,7 @@ sha2.workspace = true
signature.workspace = true
tokio.workspace = true
typenum = "1.15.0"
url.workspace = true
whoami = "1.4.0"
winapi.workspace = true
# https://github.com/dalek-cryptography/x25519-dalek/pull/89

View file

@ -243,6 +243,19 @@ deno_core::extension!(deno_node,
ops::zlib::brotli::op_brotli_decompress_stream,
ops::zlib::brotli::op_brotli_decompress_stream_end,
ops::http::op_node_http_request<P>,
ops::http2::op_http2_connect,
ops::http2::op_http2_poll_client_connection,
ops::http2::op_http2_client_request,
ops::http2::op_http2_client_get_response,
ops::http2::op_http2_client_get_response_body_chunk,
ops::http2::op_http2_client_send_data,
ops::http2::op_http2_client_end_stream,
ops::http2::op_http2_client_reset_stream,
ops::http2::op_http2_client_send_trailers,
ops::http2::op_http2_client_get_response_trailers,
ops::http2::op_http2_accept,
ops::http2::op_http2_listen,
ops::http2::op_http2_send_response,
ops::os::op_node_os_get_priority<P>,
ops::os::op_node_os_set_priority<P>,
ops::os::op_node_os_username<P>,

550
ext/node/ops/http2.rs Normal file
View file

@ -0,0 +1,550 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::task::Poll;
use bytes::Bytes;
use deno_core::error::AnyError;
use deno_core::futures::future::poll_fn;
use deno_core::op;
use deno_core::serde::Serialize;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_net::raw::take_network_stream_resource;
use deno_net::raw::NetworkStream;
use h2;
use h2::RecvStream;
use http;
use http::request::Parts;
use http::HeaderMap;
use http::Response;
use http::StatusCode;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
use url::Url;
pub struct Http2Client {
pub client: AsyncRefCell<h2::client::SendRequest<Bytes>>,
pub url: Url,
}
impl Resource for Http2Client {
fn name(&self) -> Cow<str> {
"http2Client".into()
}
}
#[derive(Debug)]
pub struct Http2ClientConn {
pub conn: AsyncRefCell<h2::client::Connection<NetworkStream>>,
cancel_handle: CancelHandle,
}
impl Resource for Http2ClientConn {
fn name(&self) -> Cow<str> {
"http2ClientConnection".into()
}
fn close(self: Rc<Self>) {
self.cancel_handle.cancel()
}
}
#[derive(Debug)]
pub struct Http2ClientStream {
pub response: AsyncRefCell<h2::client::ResponseFuture>,
pub stream: AsyncRefCell<h2::SendStream<Bytes>>,
}
impl Resource for Http2ClientStream {
fn name(&self) -> Cow<str> {
"http2ClientStream".into()
}
}
#[derive(Debug)]
pub struct Http2ClientResponseBody {
pub body: AsyncRefCell<h2::RecvStream>,
pub trailers_rx:
AsyncRefCell<Option<tokio::sync::oneshot::Receiver<Option<HeaderMap>>>>,
pub trailers_tx:
AsyncRefCell<Option<tokio::sync::oneshot::Sender<Option<HeaderMap>>>>,
}
impl Resource for Http2ClientResponseBody {
fn name(&self) -> Cow<str> {
"http2ClientResponseBody".into()
}
}
#[derive(Debug)]
pub struct Http2ServerConnection {
pub conn: AsyncRefCell<h2::server::Connection<NetworkStream, Bytes>>,
}
impl Resource for Http2ServerConnection {
fn name(&self) -> Cow<str> {
"http2ServerConnection".into()
}
}
pub struct Http2ServerSendResponse {
pub send_response: AsyncRefCell<h2::server::SendResponse<Bytes>>,
}
impl Resource for Http2ServerSendResponse {
fn name(&self) -> Cow<str> {
"http2ServerSendResponse".into()
}
}
#[op]
pub async fn op_http2_connect(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
url: String,
) -> Result<(ResourceId, ResourceId), AnyError> {
// No permission check necessary because we're using an existing connection
let network_stream = {
let mut state = state.borrow_mut();
take_network_stream_resource(&mut state.resource_table, rid)?
};
let url = Url::parse(&url)?;
let (client, conn) = h2::client::handshake(network_stream).await?;
let mut state = state.borrow_mut();
let client_rid = state.resource_table.add(Http2Client {
client: AsyncRefCell::new(client),
url,
});
let conn_rid = state.resource_table.add(Http2ClientConn {
conn: AsyncRefCell::new(conn),
cancel_handle: CancelHandle::new(),
});
Ok((client_rid, conn_rid))
}
#[op]
pub async fn op_http2_listen(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
) -> Result<ResourceId, AnyError> {
let stream =
take_network_stream_resource(&mut state.borrow_mut().resource_table, rid)?;
let conn = h2::server::handshake(stream).await?;
Ok(
state
.borrow_mut()
.resource_table
.add(Http2ServerConnection {
conn: AsyncRefCell::new(conn),
}),
)
}
#[op]
pub async fn op_http2_accept(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
) -> Result<
Option<(Vec<(ByteString, ByteString)>, ResourceId, ResourceId)>,
AnyError,
> {
let resource = state
.borrow()
.resource_table
.get::<Http2ServerConnection>(rid)?;
let mut conn = RcRef::map(&resource, |r| &r.conn).borrow_mut().await;
if let Some(res) = conn.accept().await {
let (req, resp) = res?;
let (parts, body) = req.into_parts();
let (trailers_tx, trailers_rx) = tokio::sync::oneshot::channel();
let stm = state
.borrow_mut()
.resource_table
.add(Http2ClientResponseBody {
body: AsyncRefCell::new(body),
trailers_rx: AsyncRefCell::new(Some(trailers_rx)),
trailers_tx: AsyncRefCell::new(Some(trailers_tx)),
});
let Parts {
uri,
method,
headers,
..
} = parts;
let mut req_headers = Vec::with_capacity(headers.len() + 4);
req_headers.push((
ByteString::from(":method"),
ByteString::from(method.as_str()),
));
req_headers.push((
ByteString::from(":scheme"),
ByteString::from(uri.scheme().map(|s| s.as_str()).unwrap_or("http")),
));
req_headers.push((
ByteString::from(":path"),
ByteString::from(uri.path_and_query().map(|p| p.as_str()).unwrap_or("")),
));
req_headers.push((
ByteString::from(":authority"),
ByteString::from(uri.authority().map(|a| a.as_str()).unwrap_or("")),
));
for (key, val) in headers.iter() {
req_headers.push((key.as_str().into(), val.as_bytes().into()));
}
let resp = state
.borrow_mut()
.resource_table
.add(Http2ServerSendResponse {
send_response: AsyncRefCell::new(resp),
});
Ok(Some((req_headers, stm, resp)))
} else {
Ok(None)
}
}
#[op]
pub async fn op_http2_send_response(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
status: u16,
headers: Vec<(ByteString, ByteString)>,
) -> Result<(ResourceId, u32), AnyError> {
let resource = state
.borrow()
.resource_table
.get::<Http2ServerSendResponse>(rid)?;
let mut send_response = RcRef::map(resource, |r| &r.send_response)
.borrow_mut()
.await;
let mut response = Response::new(());
if let Ok(status) = StatusCode::from_u16(status) {
*response.status_mut() = status;
}
for (name, value) in headers {
response.headers_mut().append(
HeaderName::from_lowercase(&name).unwrap(),
HeaderValue::from_bytes(&value).unwrap(),
);
}
let stream = send_response.send_response(response, false)?;
let stream_id = stream.stream_id();
Ok((rid, stream_id.into()))
}
#[op]
pub async fn op_http2_poll_client_connection(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
) -> Result<(), AnyError> {
let resource = state.borrow().resource_table.get::<Http2ClientConn>(rid)?;
let cancel_handle = RcRef::map(resource.clone(), |this| &this.cancel_handle);
let mut conn = RcRef::map(resource, |this| &this.conn).borrow_mut().await;
match (&mut *conn).or_cancel(cancel_handle).await {
Ok(result) => result?,
Err(_) => {
// TODO(bartlomieju): probably need a better mechanism for closing the connection
// cancelled
}
}
Ok(())
}
#[op]
pub async fn op_http2_client_request(
state: Rc<RefCell<OpState>>,
client_rid: ResourceId,
// TODO(bartlomieju): maybe use a vector with fixed layout to save sending
// 4 strings of keys?
mut pseudo_headers: HashMap<String, String>,
headers: Vec<(ByteString, ByteString)>,
) -> Result<(ResourceId, u32), AnyError> {
let resource = state
.borrow()
.resource_table
.get::<Http2Client>(client_rid)?;
let url = resource.url.clone();
let pseudo_path = pseudo_headers.remove(":path").unwrap_or("/".to_string());
let pseudo_method = pseudo_headers
.remove(":method")
.unwrap_or("GET".to_string());
// TODO(bartlomieju): handle all pseudo-headers (:authority, :scheme)
let _pseudo_authority = pseudo_headers
.remove(":authority")
.unwrap_or("/".to_string());
let _pseudo_scheme = pseudo_headers
.remove(":scheme")
.unwrap_or("http".to_string());
let url = url.join(&pseudo_path)?;
let mut req = http::Request::builder()
.uri(url.as_str())
.method(pseudo_method.as_str());
for (name, value) in headers {
req.headers_mut().unwrap().append(
HeaderName::from_lowercase(&name).unwrap(),
HeaderValue::from_bytes(&value).unwrap(),
);
}
let request = req.body(()).unwrap();
let resource = {
let state = state.borrow();
state.resource_table.get::<Http2Client>(client_rid)?
};
let mut client = RcRef::map(&resource, |r| &r.client).borrow_mut().await;
poll_fn(|cx| client.poll_ready(cx)).await?;
let (response, stream) = client.send_request(request, false).unwrap();
let stream_id = stream.stream_id();
let stream_rid = state.borrow_mut().resource_table.add(Http2ClientStream {
response: AsyncRefCell::new(response),
stream: AsyncRefCell::new(stream),
});
Ok((stream_rid, stream_id.into()))
}
#[op]
pub async fn op_http2_client_send_data(
state: Rc<RefCell<OpState>>,
stream_rid: ResourceId,
data: JsBuffer,
) -> Result<(), AnyError> {
let resource = state
.borrow()
.resource_table
.get::<Http2ClientStream>(stream_rid)?;
let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
// TODO(bartlomieju): handle end of stream
stream.send_data(bytes::Bytes::from(data), false)?;
Ok(())
}
#[op]
pub async fn op_http2_client_end_stream(
state: Rc<RefCell<OpState>>,
stream_rid: ResourceId,
) -> Result<(), AnyError> {
let resource = state
.borrow()
.resource_table
.get::<Http2ClientStream>(stream_rid)?;
let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
// TODO(bartlomieju): handle end of stream
stream.send_data(bytes::Bytes::from(vec![]), true)?;
Ok(())
}
#[op]
pub async fn op_http2_client_reset_stream(
state: Rc<RefCell<OpState>>,
stream_rid: ResourceId,
code: u32,
) -> Result<(), AnyError> {
let resource = state
.borrow()
.resource_table
.get::<Http2ClientStream>(stream_rid)?;
let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
stream.send_reset(h2::Reason::from(code));
Ok(())
}
#[op]
pub async fn op_http2_client_send_trailers(
state: Rc<RefCell<OpState>>,
stream_rid: ResourceId,
trailers: Vec<(ByteString, ByteString)>,
) -> Result<(), AnyError> {
let resource = state
.borrow()
.resource_table
.get::<Http2ClientStream>(stream_rid)?;
let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
let mut trailers_map = http::HeaderMap::new();
for (name, value) in trailers {
trailers_map.insert(
HeaderName::from_bytes(&name).unwrap(),
HeaderValue::from_bytes(&value).unwrap(),
);
}
stream.send_trailers(trailers_map)?;
Ok(())
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Http2ClientResponse {
headers: Vec<(ByteString, ByteString)>,
body_rid: ResourceId,
status_code: u16,
}
#[op]
pub async fn op_http2_client_get_response(
state: Rc<RefCell<OpState>>,
stream_rid: ResourceId,
) -> Result<Http2ClientResponse, AnyError> {
let resource = state
.borrow()
.resource_table
.get::<Http2ClientStream>(stream_rid)?;
let mut response_future =
RcRef::map(&resource, |r| &r.response).borrow_mut().await;
let response = (&mut *response_future).await?;
let (parts, body) = response.into_parts();
let status = parts.status;
let mut res_headers = Vec::new();
for (key, val) in parts.headers.iter() {
res_headers.push((key.as_str().into(), val.as_bytes().into()));
}
let (trailers_tx, trailers_rx) = tokio::sync::oneshot::channel();
let body_rid =
state
.borrow_mut()
.resource_table
.add(Http2ClientResponseBody {
body: AsyncRefCell::new(body),
trailers_rx: AsyncRefCell::new(Some(trailers_rx)),
trailers_tx: AsyncRefCell::new(Some(trailers_tx)),
});
Ok(Http2ClientResponse {
headers: res_headers,
body_rid,
status_code: status.into(),
})
}
enum DataOrTrailers {
Data(Bytes),
Trailers(HeaderMap),
Eof,
}
fn poll_data_or_trailers(
cx: &mut std::task::Context,
body: &mut RecvStream,
) -> Poll<Result<DataOrTrailers, h2::Error>> {
loop {
if let Poll::Ready(trailers) = body.poll_trailers(cx) {
if let Some(trailers) = trailers? {
return Poll::Ready(Ok(DataOrTrailers::Trailers(trailers)));
} else {
return Poll::Ready(Ok(DataOrTrailers::Eof));
}
}
if let Poll::Ready(data) = body.poll_data(cx) {
if let Some(data) = data {
return Poll::Ready(Ok(DataOrTrailers::Data(data?)));
}
// If data is None, loop one more time to check for trailers
continue;
}
// Return pending here as poll_data will keep the waker
return Poll::Pending;
}
}
#[op]
pub async fn op_http2_client_get_response_body_chunk(
state: Rc<RefCell<OpState>>,
body_rid: ResourceId,
) -> Result<(Option<Vec<u8>>, bool), AnyError> {
let resource = state
.borrow()
.resource_table
.get::<Http2ClientResponseBody>(body_rid)?;
let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
loop {
match poll_fn(|cx| poll_data_or_trailers(cx, &mut body)).await? {
DataOrTrailers::Data(data) => {
return Ok((Some(data.to_vec()), false));
}
DataOrTrailers::Trailers(trailers) => {
println!("{trailers:?}");
if let Some(trailers_tx) = RcRef::map(&resource, |r| &r.trailers_tx)
.borrow_mut()
.await
.take()
{
_ = trailers_tx.send(Some(trailers));
};
continue;
}
DataOrTrailers::Eof => {
RcRef::map(&resource, |r| &r.trailers_tx)
.borrow_mut()
.await
.take();
return Ok((None, true));
}
};
}
}
#[op]
pub async fn op_http2_client_get_response_trailers(
state: Rc<RefCell<OpState>>,
body_rid: ResourceId,
) -> Result<Option<Vec<(ByteString, ByteString)>>, AnyError> {
let resource = state
.borrow()
.resource_table
.get::<Http2ClientResponseBody>(body_rid)?;
let trailers = RcRef::map(&resource, |r| &r.trailers_rx)
.borrow_mut()
.await
.take();
if let Some(trailers) = trailers {
if let Ok(Some(trailers)) = trailers.await {
let mut v = Vec::with_capacity(trailers.len());
for (key, value) in trailers.iter() {
v.push((
ByteString::from(key.as_str()),
ByteString::from(value.as_bytes()),
));
}
Ok(Some(v))
} else {
Ok(None)
}
} else {
Ok(None)
}
}

View file

@ -2,6 +2,7 @@
pub mod crypto;
pub mod http;
pub mod http2;
pub mod idna;
pub mod os;
pub mod require;

File diff suppressed because it is too large Load diff

View file

@ -17,11 +17,13 @@ path = "src/test_server.rs"
anyhow.workspace = true
async-stream = "0.3.3"
base64.workspace = true
bytes.workspace = true
console_static_text.workspace = true
fastwebsockets = { workspace = true, features = ["upgrade"] }
flate2.workspace = true
futures.workspace = true
glob.workspace = true
h2.workspace = true
hyper = { workspace = true, features = ["server", "http1", "http2", "runtime"] }
lazy-regex.workspace = true
libc.workspace = true

View file

@ -7,6 +7,7 @@ use futures::FutureExt;
use futures::Stream;
use futures::StreamExt;
use hyper::header::HeaderValue;
use hyper::http;
use hyper::server::Server;
use hyper::service::make_service_fn;
use hyper::service::service_fn;
@ -57,6 +58,7 @@ use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio_rustls::rustls;
use tokio_rustls::server::TlsStream;
use tokio_rustls::TlsAcceptor;
use url::Url;
@ -102,6 +104,8 @@ const WS_PORT: u16 = 4242;
const WSS_PORT: u16 = 4243;
const WS_CLOSE_PORT: u16 = 4244;
const WS_PING_PORT: u16 = 4245;
const H2_GRPC_PORT: u16 = 4246;
const H2S_GRPC_PORT: u16 = 4247;
pub const PERMISSION_VARIANTS: [&str; 5] =
["read", "write", "env", "net", "run"];
@ -1664,17 +1668,7 @@ async fn wrap_https_h1_only_tls_server() {
async fn wrap_https_h2_only_tls_server() {
let main_server_https_addr =
SocketAddr::from(([127, 0, 0, 1], H2_ONLY_TLS_PORT));
let cert_file = "tls/localhost.crt";
let key_file = "tls/localhost.key";
let ca_cert_file = "tls/RootCA.pem";
let tls_config = get_tls_config(
cert_file,
key_file,
ca_cert_file,
SupportedHttpVersions::Http2Only,
)
.await
.unwrap();
let tls_config = create_tls_server_config().await;
loop {
let tcp = TcpListener::bind(&main_server_https_addr)
.await
@ -1707,6 +1701,20 @@ async fn wrap_https_h2_only_tls_server() {
}
}
async fn create_tls_server_config() -> Arc<rustls::ServerConfig> {
let cert_file = "tls/localhost.crt";
let key_file = "tls/localhost.key";
let ca_cert_file = "tls/RootCA.pem";
get_tls_config(
cert_file,
key_file,
ca_cert_file,
SupportedHttpVersions::Http2Only,
)
.await
.unwrap()
}
async fn wrap_https_h1_only_server() {
let main_server_http_addr = SocketAddr::from(([127, 0, 0, 1], H1_ONLY_PORT));
@ -1729,6 +1737,103 @@ async fn wrap_https_h2_only_server() {
let _ = main_server_http.await;
}
async fn h2_grpc_server() {
let addr = SocketAddr::from(([127, 0, 0, 1], H2_GRPC_PORT));
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
let addr_tls = SocketAddr::from(([127, 0, 0, 1], H2S_GRPC_PORT));
let listener_tls = tokio::net::TcpListener::bind(addr_tls).await.unwrap();
let tls_config = create_tls_server_config().await;
async fn serve(socket: TcpStream) -> Result<(), anyhow::Error> {
let mut connection = h2::server::handshake(socket).await?;
while let Some(result) = connection.accept().await {
let (request, respond) = result?;
tokio::spawn(async move {
let _ = handle_request(request, respond).await;
});
}
Ok(())
}
async fn serve_tls(
socket: TlsStream<TcpStream>,
) -> Result<(), anyhow::Error> {
let mut connection = h2::server::handshake(socket).await?;
while let Some(result) = connection.accept().await {
let (request, respond) = result?;
tokio::spawn(async move {
let _ = handle_request(request, respond).await;
});
}
Ok(())
}
async fn handle_request(
mut request: http::Request<h2::RecvStream>,
mut respond: h2::server::SendResponse<bytes::Bytes>,
) -> Result<(), anyhow::Error> {
let body = request.body_mut();
while let Some(data) = body.data().await {
let data = data?;
let _ = body.flow_control().release_capacity(data.len());
}
let maybe_recv_trailers = body.trailers().await?;
let response = http::Response::new(());
let mut send = respond.send_response(response, false)?;
send.send_data(bytes::Bytes::from_static(b"hello "), false)?;
send.send_data(bytes::Bytes::from_static(b"world\n"), false)?;
let mut trailers = http::HeaderMap::new();
trailers.insert(
http::HeaderName::from_static("abc"),
HeaderValue::from_static("def"),
);
trailers.insert(
http::HeaderName::from_static("opr"),
HeaderValue::from_static("stv"),
);
if let Some(recv_trailers) = maybe_recv_trailers {
for (key, value) in recv_trailers {
trailers.insert(key.unwrap(), value);
}
}
send.send_trailers(trailers)?;
Ok(())
}
let http = tokio::spawn(async move {
loop {
if let Ok((socket, _peer_addr)) = listener.accept().await {
tokio::spawn(async move {
let _ = serve(socket).await;
});
}
}
});
let https = tokio::spawn(async move {
loop {
if let Ok((socket, _peer_addr)) = listener_tls.accept().await {
let tls_acceptor = TlsAcceptor::from(tls_config.clone());
let tls = tls_acceptor.accept(socket).await.unwrap();
tokio::spawn(async move {
let _ = serve_tls(tls).await;
});
}
}
});
http.await.unwrap();
https.await.unwrap();
}
async fn wrap_client_auth_https_server() {
let main_server_https_addr =
SocketAddr::from(([127, 0, 0, 1], HTTPS_CLIENT_AUTH_PORT));
@ -1821,6 +1926,7 @@ pub async fn run_all_servers() {
let h2_only_server_tls_fut = wrap_https_h2_only_tls_server();
let h1_only_server_fut = wrap_https_h1_only_server();
let h2_only_server_fut = wrap_https_h2_only_server();
let h2_grpc_server_fut = h2_grpc_server();
let mut server_fut = async {
futures::join!(
@ -1843,7 +1949,8 @@ pub async fn run_all_servers() {
h1_only_server_tls_fut,
h2_only_server_tls_fut,
h1_only_server_fut,
h2_only_server_fut
h2_only_server_fut,
h2_grpc_server_fut,
)
}
.boxed();