chore(ext/node): use BufView natively in http2 (#21688)

Node HTTP/2 was using the default h2 `Bytes` datatype when we can be
making using of `BufView` like we do in `Deno.serve`.

`fetch` and `Deno.serverHttp` can't make use of `BufView` because they
are using `reqwest` which is stuck on hyper 0.x at this time.
This commit is contained in:
Matt Mastracci 2023-12-23 08:58:20 -07:00 committed by GitHub
parent 36536c784c
commit 1297c9a8f3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 19 additions and 16 deletions

View file

@ -55,7 +55,7 @@ pub async fn op_jupyter_broadcast(
.new_message(&message_type)
.with_content(content)
.with_metadata(metadata)
.with_buffers(buffers.into_iter().map(|b| b.into()).collect())
.with_buffers(buffers.into_iter().map(|b| b.to_vec().into()).collect())
.send(&mut *iopub_socket.lock().await)
.await?;
}

View file

@ -14,6 +14,7 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use bytes::Bytes;
use deno_core::anyhow::Error;
use deno_core::error::type_error;
use deno_core::error::AnyError;
@ -233,7 +234,7 @@ unsafe impl Send for ResourceToBodyAdapter {}
unsafe impl Sync for ResourceToBodyAdapter {}
impl Stream for ResourceToBodyAdapter {
type Item = Result<BufView, Error>;
type Item = Result<Bytes, Error>;
fn poll_next(
self: Pin<&mut Self>,
@ -250,9 +251,9 @@ impl Stream for ResourceToBodyAdapter {
Ok(buf) if buf.is_empty() => Poll::Ready(None),
Ok(_) => {
this.1 = Some(this.0.clone().read(64 * 1024));
Poll::Ready(Some(res))
Poll::Ready(Some(res.map(|b| b.to_vec().into())))
}
_ => Poll::Ready(Some(res)),
_ => Poll::Ready(Some(res.map(|b| b.to_vec().into()))),
},
}
} else {

View file

@ -739,7 +739,7 @@ fn http_response(
Some(data) => {
// If a buffer was passed, but isn't compressible, we use it to
// construct a response body.
Ok((HttpResponseWriter::Closed, Bytes::from(data).into()))
Ok((HttpResponseWriter::Closed, data.to_vec().into()))
}
None if compressing => {
// Create a one way pipe that implements tokio's async io traits. To do
@ -881,7 +881,7 @@ async fn op_http_write_resource(
}
}
HttpResponseWriter::BodyUncompressed(body) => {
let bytes = Bytes::from(view);
let bytes = view.to_vec().into();
if let Err(err) = body.sender().send_data(bytes).await {
assert!(err.is_closed());
// Pull up the failure associated with the transport connection instead.
@ -930,7 +930,7 @@ async fn op_http_write(
}
}
HttpResponseWriter::BodyUncompressed(body) => {
let bytes = Bytes::from(buf);
let bytes = Bytes::from(buf.to_vec());
match body.sender().send_data(bytes).await {
Ok(_) => Ok(()),
Err(err) => {

View file

@ -12,6 +12,7 @@ use deno_core::futures::future::poll_fn;
use deno_core::op2;
use deno_core::serde::Serialize;
use deno_core::AsyncRefCell;
use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
@ -34,7 +35,7 @@ use reqwest::header::HeaderValue;
use url::Url;
pub struct Http2Client {
pub client: AsyncRefCell<h2::client::SendRequest<Bytes>>,
pub client: AsyncRefCell<h2::client::SendRequest<BufView>>,
pub url: Url,
}
@ -46,7 +47,7 @@ impl Resource for Http2Client {
#[derive(Debug)]
pub struct Http2ClientConn {
pub conn: AsyncRefCell<h2::client::Connection<NetworkStream>>,
pub conn: AsyncRefCell<h2::client::Connection<NetworkStream, BufView>>,
cancel_handle: CancelHandle,
}
@ -63,7 +64,7 @@ impl Resource for Http2ClientConn {
#[derive(Debug)]
pub struct Http2ClientStream {
pub response: AsyncRefCell<h2::client::ResponseFuture>,
pub stream: AsyncRefCell<h2::SendStream<Bytes>>,
pub stream: AsyncRefCell<h2::SendStream<BufView>>,
}
impl Resource for Http2ClientStream {
@ -89,7 +90,7 @@ impl Resource for Http2ClientResponseBody {
#[derive(Debug)]
pub struct Http2ServerConnection {
pub conn: AsyncRefCell<h2::server::Connection<NetworkStream, Bytes>>,
pub conn: AsyncRefCell<h2::server::Connection<NetworkStream, BufView>>,
}
impl Resource for Http2ServerConnection {
@ -99,7 +100,7 @@ impl Resource for Http2ServerConnection {
}
pub struct Http2ServerSendResponse {
pub send_response: AsyncRefCell<h2::server::SendResponse<Bytes>>,
pub send_response: AsyncRefCell<h2::server::SendResponse<BufView>>,
}
impl Resource for Http2ServerSendResponse {
@ -123,7 +124,8 @@ pub async fn op_http2_connect(
let url = Url::parse(&url)?;
let (client, conn) = h2::client::handshake(network_stream).await?;
let (client, conn) =
h2::client::Builder::new().handshake(network_stream).await?;
let mut state = state.borrow_mut();
let client_rid = state.resource_table.add(Http2Client {
client: AsyncRefCell::new(client),
@ -145,7 +147,7 @@ pub async fn op_http2_listen(
let stream =
take_network_stream_resource(&mut state.borrow_mut().resource_table, rid)?;
let conn = h2::server::handshake(stream).await?;
let conn = h2::server::Builder::new().handshake(stream).await?;
Ok(
state
.borrow_mut()
@ -349,7 +351,7 @@ pub async fn op_http2_client_send_data(
let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
// TODO(bartlomieju): handle end of stream
stream.send_data(bytes::Bytes::from(data), false)?;
stream.send_data(data.to_vec().into(), false)?;
Ok(())
}
@ -365,7 +367,7 @@ pub async fn op_http2_client_end_stream(
let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
// TODO(bartlomieju): handle end of stream
stream.send_data(bytes::Bytes::from(vec![]), true)?;
stream.send_data(BufView::empty(), true)?;
Ok(())
}