refactor(ext/http): simpler ws server in http_next (#19133)

Merges `op_http_upgrade_next` and `op_ws_server_create`, significantly
simplifying websocket construction in ext/http (next), and removing one
JS -> Rust call. Also WS server now doesn't bypass
`HttpPropertyExtractor`.
This commit is contained in:
Luca Casonato 2023-05-16 01:24:41 +02:00 committed by GitHub
parent 1171c54952
commit 27303ef688
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 12 additions and 67 deletions

View file

@ -49,8 +49,6 @@ const {
} = primordials;
const {
op_http_wait,
op_http_upgrade_next,
op_http_get_request_headers,
op_http_get_request_method_and_url,
op_http_read_request_body,
@ -63,10 +61,9 @@ const {
op_http_set_response_header,
op_http_set_response_headers,
op_http_upgrade_raw,
op_ws_server_create,
op_http_upgrade_websocket_next,
op_http_wait,
} = core.generateAsyncOpHandler(
"op_http_wait",
"op_http_upgrade_next",
"op_http_get_request_headers",
"op_http_get_request_method_and_url",
"op_http_read_request_body",
@ -79,7 +76,8 @@ const {
"op_http_set_response_header",
"op_http_set_response_headers",
"op_http_upgrade_raw",
"op_ws_server_create",
"op_http_upgrade_websocket_next",
"op_http_wait",
);
const _upgraded = Symbol("_upgraded");
@ -208,12 +206,11 @@ class InnerRequest {
// Start the upgrade in the background.
(async () => {
try {
// Returns the connection and extra bytes, which we can pass directly to op_ws_server_create
const upgrade = await op_http_upgrade_next(
// Returns the upgraded websocket connection
const wsRid = await op_http_upgrade_websocket_next(
slabId,
response.headerList,
);
const wsRid = op_ws_server_create(upgrade[0], upgrade[1]);
// We have to wait for the go-ahead signal
await goAhead;

View file

@ -29,10 +29,9 @@ use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use deno_net::ops_tls::TlsStream;
use deno_net::raw::put_network_stream_resource;
use deno_net::raw::NetworkStream;
use deno_websocket::ws_create_server_stream;
use fly_accept_encoding::Encoding;
use http::header::ACCEPT_ENCODING;
use http::header::CACHE_CONTROL;
@ -314,11 +313,11 @@ pub fn op_http_upgrade_raw(
}
#[op]
pub async fn op_http_upgrade_next(
pub async fn op_http_upgrade_websocket_next(
state: Rc<RefCell<OpState>>,
index: u32,
headers: Vec<(ByteString, ByteString)>,
) -> Result<(ResourceId, ZeroCopyBuf), AnyError> {
) -> Result<ResourceId, AnyError> {
// Stage 1: set the respnse to 101 Switching Protocols and send it
let upgrade = with_http_mut(index, |http| {
// Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
@ -343,17 +342,9 @@ pub async fn op_http_upgrade_next(
// Stage 2: wait for the request to finish upgrading
let upgraded = upgrade.await?;
// Stage 3: return the extracted raw network stream
// Stage 3: take the extracted raw network stream and upgrade it to a websocket, then return it
let (stream, bytes) = extract_network_stream(upgraded);
// We're allocating for those extra bytes, but they are probably going to be empty most of the time
Ok((
put_network_stream_resource(
&mut state.borrow_mut().resource_table,
stream,
)?,
ZeroCopyBuf::from(bytes.to_vec()),
))
ws_create_server_stream(&mut state.borrow_mut(), stream, bytes)
}
#[op(fast)]

View file

@ -116,8 +116,8 @@ deno_core::extension!(
http_next::op_http_set_response_header,
http_next::op_http_set_response_headers,
http_next::op_http_track,
http_next::op_http_upgrade_websocket_next,
http_next::op_http_upgrade_raw,
http_next::op_http_upgrade_next,
http_next::op_http_wait,
],
esm = ["00_serve.js", "01_http.js"],

View file

@ -260,31 +260,6 @@ pub fn take_network_stream_resource(
Err(bad_resource_id())
}
/// Inserts a raw stream (back?) into the resource table and returns a resource ID. This can then be used to create raw connection
/// objects on the JS side.
pub fn put_network_stream_resource(
resource_table: &mut ResourceTable,
stream: NetworkStream,
) -> Result<ResourceId, AnyError> {
let res = match stream {
NetworkStream::Tcp(conn) => {
let (r, w) = conn.into_split();
resource_table.add(TcpStreamResource::new((r, w)))
}
NetworkStream::Tls(conn) => {
let (r, w) = conn.into_split();
resource_table.add(TlsStreamResource::new((r, w)))
}
#[cfg(unix)]
NetworkStream::Unix(conn) => {
let (r, w) = conn.into_split();
resource_table.add(UnixStreamResource::new((r, w)))
}
};
Ok(res)
}
/// In some cases it may be more efficient to extract the resource from the resource table and use it directly (for example, an HTTP server).
/// This method will extract a stream from the resource table and return it, unwrapped.
pub fn take_network_stream_listener_resource(

View file

@ -16,7 +16,6 @@ use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::StringOrBuffer;
use deno_core::ZeroCopyBuf;
use deno_net::raw::take_network_stream_resource;
use deno_net::raw::NetworkStream;
use deno_tls::create_client_config;
use deno_tls::RootCertStoreProvider;
@ -367,22 +366,6 @@ pub fn ws_create_server_stream(
Ok(rid)
}
#[op]
pub fn op_ws_server_create(
state: &mut OpState,
conn: ResourceId,
extra_bytes: &[u8],
) -> Result<ResourceId, AnyError> {
let network_stream =
take_network_stream_resource(&mut state.resource_table, conn)?;
// Copying the extra bytes, but unlikely this will account for much
ws_create_server_stream(
state,
network_stream,
Bytes::from(extra_bytes.to_vec()),
)
}
#[op]
pub async fn op_ws_send_binary(
state: Rc<RefCell<OpState>>,
@ -534,7 +517,6 @@ deno_core::extension!(deno_websocket,
op_ws_send_text,
op_ws_send_ping,
op_ws_send_pong,
op_ws_server_create,
],
esm = [ "01_websocket.js", "02_websocketstream.js" ],
options = {