mirror of
https://github.com/denoland/deno
synced 2024-09-13 13:21:50 +00:00
refactor(ext/fetch): simplify fetch ops (#19494)
Addresses feedback from https://github.com/denoland/deno/pull/19412#discussion_r1227912676
This commit is contained in:
parent
3d71c36888
commit
f145cbfacc
|
@ -228,7 +228,7 @@ function prettyResourceNames(name) {
|
||||||
return ["A fetch request", "started", "finished"];
|
return ["A fetch request", "started", "finished"];
|
||||||
case "fetchRequestBody":
|
case "fetchRequestBody":
|
||||||
return ["A fetch request body", "created", "closed"];
|
return ["A fetch request body", "created", "closed"];
|
||||||
case "fetchResponseBody":
|
case "fetchResponse":
|
||||||
return ["A fetch response body", "created", "consumed"];
|
return ["A fetch response body", "created", "consumed"];
|
||||||
case "httpClient":
|
case "httpClient":
|
||||||
return ["An HTTP client", "created", "closed"];
|
return ["An HTTP client", "created", "closed"];
|
||||||
|
@ -295,7 +295,7 @@ function resourceCloseHint(name) {
|
||||||
return "Await the promise returned from `fetch()` or abort the fetch with an abort signal.";
|
return "Await the promise returned from `fetch()` or abort the fetch with an abort signal.";
|
||||||
case "fetchRequestBody":
|
case "fetchRequestBody":
|
||||||
return "Terminate the request body `ReadableStream` by closing or erroring it.";
|
return "Terminate the request body `ReadableStream` by closing or erroring it.";
|
||||||
case "fetchResponseBody":
|
case "fetchResponse":
|
||||||
return "Consume or close the response body `ReadableStream`, e.g `await resp.text()` or `await resp.body.cancel()`.";
|
return "Consume or close the response body `ReadableStream`, e.g `await resp.text()` or `await resp.body.cancel()`.";
|
||||||
case "httpClient":
|
case "httpClient":
|
||||||
return "Close the HTTP client by calling `httpClient.close()`.";
|
return "Close the HTTP client by calling `httpClient.close()`.";
|
||||||
|
|
|
@ -1,7 +1,2 @@
|
||||||
{
|
{ "0": "stdin", "1": "stdout", "2": "stderr", "5": "fetchResponse" }
|
||||||
"0": "stdin",
|
|
||||||
"1": "stdout",
|
|
||||||
"2": "stderr",
|
|
||||||
"5": "fetchResponseBody"
|
|
||||||
}
|
|
||||||
{ "0": "stdin", "1": "stdout", "2": "stderr" }
|
{ "0": "stdin", "1": "stdout", "2": "stderr" }
|
||||||
|
|
|
@ -86,7 +86,7 @@ function opFetch(method, url, headers, clientRid, hasBody, bodyLength, body) {
|
||||||
* @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number }>}
|
* @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number }>}
|
||||||
*/
|
*/
|
||||||
function opFetchSend(rid) {
|
function opFetchSend(rid) {
|
||||||
return core.opAsync("op_fetch_send", rid, true);
|
return core.opAsync("op_fetch_send", rid);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
118
ext/fetch/lib.rs
118
ext/fetch/lib.rs
|
@ -112,7 +112,6 @@ deno_core::extension!(deno_fetch,
|
||||||
ops = [
|
ops = [
|
||||||
op_fetch<FP>,
|
op_fetch<FP>,
|
||||||
op_fetch_send,
|
op_fetch_send,
|
||||||
op_fetch_response_into_byte_stream,
|
|
||||||
op_fetch_response_upgrade,
|
op_fetch_response_upgrade,
|
||||||
op_fetch_custom_client<FP>,
|
op_fetch_custom_client<FP>,
|
||||||
],
|
],
|
||||||
|
@ -427,7 +426,6 @@ pub struct FetchResponse {
|
||||||
pub async fn op_fetch_send(
|
pub async fn op_fetch_send(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
rid: ResourceId,
|
rid: ResourceId,
|
||||||
into_byte_stream: bool,
|
|
||||||
) -> Result<FetchResponse, AnyError> {
|
) -> Result<FetchResponse, AnyError> {
|
||||||
let request = state
|
let request = state
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
|
@ -459,27 +457,10 @@ pub async fn op_fetch_send(
|
||||||
(None, None)
|
(None, None)
|
||||||
};
|
};
|
||||||
|
|
||||||
let response_rid = if !into_byte_stream {
|
let response_rid = state
|
||||||
state
|
.borrow_mut()
|
||||||
.borrow_mut()
|
.resource_table
|
||||||
.resource_table
|
.add(FetchResponseResource::new(res, content_length));
|
||||||
.add(FetchResponseResource {
|
|
||||||
response: res,
|
|
||||||
size: content_length,
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| {
|
|
||||||
r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
|
|
||||||
}));
|
|
||||||
state
|
|
||||||
.borrow_mut()
|
|
||||||
.resource_table
|
|
||||||
.add(FetchResponseBodyResource {
|
|
||||||
reader: AsyncRefCell::new(stream.peekable()),
|
|
||||||
cancel: CancelHandle::default(),
|
|
||||||
size: content_length,
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(FetchResponse {
|
Ok(FetchResponse {
|
||||||
status: status.as_u16(),
|
status: status.as_u16(),
|
||||||
|
@ -493,28 +474,6 @@ pub async fn op_fetch_send(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
|
||||||
pub fn op_fetch_response_into_byte_stream(
|
|
||||||
state: &mut OpState,
|
|
||||||
rid: ResourceId,
|
|
||||||
) -> Result<ResourceId, AnyError> {
|
|
||||||
let raw_response = state.resource_table.take::<FetchResponseResource>(rid)?;
|
|
||||||
let raw_response = Rc::try_unwrap(raw_response)
|
|
||||||
.expect("Someone is holding onto FetchResponseResource");
|
|
||||||
let stream: BytesStream =
|
|
||||||
Box::pin(raw_response.response.bytes_stream().map(|r| {
|
|
||||||
r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
|
|
||||||
}));
|
|
||||||
|
|
||||||
let rid = state.resource_table.add(FetchResponseBodyResource {
|
|
||||||
reader: AsyncRefCell::new(stream.peekable()),
|
|
||||||
cancel: CancelHandle::default(),
|
|
||||||
size: raw_response.size,
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(rid)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[op]
|
#[op]
|
||||||
pub async fn op_fetch_response_upgrade(
|
pub async fn op_fetch_response_upgrade(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
|
@ -530,7 +489,7 @@ pub async fn op_fetch_response_upgrade(
|
||||||
let (read, write) = tokio::io::duplex(1024);
|
let (read, write) = tokio::io::duplex(1024);
|
||||||
let (read_rx, write_tx) = tokio::io::split(read);
|
let (read_rx, write_tx) = tokio::io::split(read);
|
||||||
let (mut write_rx, mut read_tx) = tokio::io::split(write);
|
let (mut write_rx, mut read_tx) = tokio::io::split(write);
|
||||||
let upgraded = raw_response.response.upgrade().await?;
|
let upgraded = raw_response.upgrade().await?;
|
||||||
{
|
{
|
||||||
// Stage 3: Pump the data
|
// Stage 3: Pump the data
|
||||||
let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded);
|
let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded);
|
||||||
|
@ -698,35 +657,72 @@ impl Resource for FetchRequestBodyResource {
|
||||||
type BytesStream =
|
type BytesStream =
|
||||||
Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
|
Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
|
||||||
|
|
||||||
|
pub enum FetchResponseReader {
|
||||||
|
Start(Response),
|
||||||
|
BodyReader(Peekable<BytesStream>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for FetchResponseReader {
|
||||||
|
fn default() -> Self {
|
||||||
|
let stream: BytesStream = Box::pin(deno_core::futures::stream::empty());
|
||||||
|
Self::BodyReader(stream.peekable())
|
||||||
|
}
|
||||||
|
}
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct FetchResponseResource {
|
pub struct FetchResponseResource {
|
||||||
pub response: Response,
|
pub response_reader: AsyncRefCell<FetchResponseReader>,
|
||||||
|
pub cancel: CancelHandle,
|
||||||
pub size: Option<u64>,
|
pub size: Option<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl FetchResponseResource {
|
||||||
|
pub fn new(response: Response, size: Option<u64>) -> Self {
|
||||||
|
Self {
|
||||||
|
response_reader: AsyncRefCell::new(FetchResponseReader::Start(response)),
|
||||||
|
cancel: CancelHandle::default(),
|
||||||
|
size,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn upgrade(self) -> Result<reqwest::Upgraded, AnyError> {
|
||||||
|
let reader = self.response_reader.into_inner();
|
||||||
|
match reader {
|
||||||
|
FetchResponseReader::Start(resp) => Ok(resp.upgrade().await?),
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Resource for FetchResponseResource {
|
impl Resource for FetchResponseResource {
|
||||||
fn name(&self) -> Cow<str> {
|
fn name(&self) -> Cow<str> {
|
||||||
"fetchResponse".into()
|
"fetchResponse".into()
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
pub struct FetchResponseBodyResource {
|
|
||||||
pub reader: AsyncRefCell<Peekable<BytesStream>>,
|
|
||||||
pub cancel: CancelHandle,
|
|
||||||
pub size: Option<u64>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Resource for FetchResponseBodyResource {
|
|
||||||
fn name(&self) -> Cow<str> {
|
|
||||||
"fetchResponseBody".into()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
|
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
|
let mut reader =
|
||||||
|
RcRef::map(&self, |r| &r.response_reader).borrow_mut().await;
|
||||||
|
|
||||||
|
let body = loop {
|
||||||
|
match &mut *reader {
|
||||||
|
FetchResponseReader::BodyReader(reader) => break reader,
|
||||||
|
FetchResponseReader::Start(_) => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
match std::mem::take(&mut *reader) {
|
||||||
|
FetchResponseReader::Start(resp) => {
|
||||||
|
let stream: BytesStream = Box::pin(resp.bytes_stream().map(|r| {
|
||||||
|
r.map_err(|err| {
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, err)
|
||||||
|
})
|
||||||
|
}));
|
||||||
|
*reader = FetchResponseReader::BodyReader(stream.peekable());
|
||||||
|
}
|
||||||
|
FetchResponseReader::BodyReader(_) => unreachable!(),
|
||||||
|
}
|
||||||
|
};
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
let mut reader = Pin::new(reader);
|
let mut reader = Pin::new(body);
|
||||||
loop {
|
loop {
|
||||||
match reader.as_mut().peek_mut().await {
|
match reader.as_mut().peek_mut().await {
|
||||||
Some(Ok(chunk)) if !chunk.is_empty() => {
|
Some(Ok(chunk)) if !chunk.is_empty() => {
|
||||||
|
|
|
@ -595,13 +595,7 @@ class ClientRequest extends OutgoingMessage {
|
||||||
(async () => {
|
(async () => {
|
||||||
try {
|
try {
|
||||||
const [res, _] = await Promise.all([
|
const [res, _] = await Promise.all([
|
||||||
core.opAsync(
|
core.opAsync("op_fetch_send", this._req.requestRid),
|
||||||
"op_fetch_send",
|
|
||||||
this._req.requestRid,
|
|
||||||
/* false because we want to have access to actual Response,
|
|
||||||
not the bytes stream of response (because we need to handle upgrades) */
|
|
||||||
false,
|
|
||||||
),
|
|
||||||
(async () => {
|
(async () => {
|
||||||
if (this._bodyWriteRid) {
|
if (this._bodyWriteRid) {
|
||||||
try {
|
try {
|
||||||
|
@ -700,10 +694,7 @@ class ClientRequest extends OutgoingMessage {
|
||||||
this.emit("close");
|
this.emit("close");
|
||||||
} else {
|
} else {
|
||||||
{
|
{
|
||||||
const responseRid = core.ops.op_fetch_response_into_byte_stream(
|
incoming._bodyRid = res.responseRid;
|
||||||
res.responseRid,
|
|
||||||
);
|
|
||||||
incoming._bodyRid = responseRid;
|
|
||||||
}
|
}
|
||||||
this.emit("response", incoming);
|
this.emit("response", incoming);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue