remove Read/Write futures, use poll_fn instead (#4150)

This commit is contained in:
Bartek Iwańczuk 2020-02-26 22:06:20 +01:00 committed by GitHub
parent 1a8ef36b71
commit 7fff2d2d1e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -4,9 +4,9 @@ use crate::op_error::OpError;
use crate::ops::minimal_op; use crate::ops::minimal_op;
use crate::state::State; use crate::state::State;
use deno_core::*; use deno_core::*;
use futures::future::poll_fn;
use futures::future::FutureExt; use futures::future::FutureExt;
use futures::ready; use futures::ready;
use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
@ -69,6 +69,10 @@ pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) {
(stdin, stdout, stderr) (stdin, stdout, stderr)
} }
fn no_buffer_specified() -> OpError {
OpError::type_error("no buffer specified".to_string())
}
#[cfg(unix)] #[cfg(unix)]
use nix::sys::termios; use nix::sys::termios;
@ -131,51 +135,6 @@ impl DenoAsyncRead for StreamResource {
} }
} }
#[derive(Debug, PartialEq)]
enum IoState {
Pending,
Flush,
Done,
}
/// A future which can be used to easily read available number of bytes to fill
/// a buffer.
///
/// Created by the [`read`] function.
pub struct Read<T> {
rid: ResourceId,
buf: T,
io_state: IoState,
state: State,
}
impl<T> Future for Read<T>
where
T: AsMut<[u8]> + Unpin,
{
type Output = Result<i32, OpError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
if inner.io_state == IoState::Done {
panic!("poll a Read after it's done");
}
let mut state = inner.state.borrow_mut();
let resource = state
.resource_table
.get_mut::<StreamResource>(inner.rid)
.ok_or_else(OpError::bad_resource)?;
let nread = ready!(resource.poll_read(cx, &mut inner.buf.as_mut()[..]))?;
inner.io_state = IoState::Done;
Poll::Ready(Ok(nread as i32))
}
}
fn no_buffer_specified() -> OpError {
OpError::type_error("no buffer specified".to_string())
}
pub fn op_read( pub fn op_read(
state: &State, state: &State,
rid: i32, rid: i32,
@ -185,13 +144,18 @@ pub fn op_read(
if zero_copy.is_none() { if zero_copy.is_none() {
return futures::future::err(no_buffer_specified()).boxed_local(); return futures::future::err(no_buffer_specified()).boxed_local();
} }
// TODO(ry) Probably poll_fn can be used here and the Read struct eliminated.
Read { let state = state.clone();
rid: rid as u32, let mut buf = zero_copy.unwrap();
buf: zero_copy.unwrap(),
io_state: IoState::Pending, poll_fn(move |cx| {
state: state.clone(), let resource_table = &mut state.borrow_mut().resource_table;
} let resource = resource_table
.get_mut::<StreamResource>(rid as u32)
.ok_or_else(OpError::bad_resource)?;
let nread = ready!(resource.poll_read(cx, &mut buf.as_mut()[..]))?;
Poll::Ready(Ok(nread as i32))
})
.boxed_local() .boxed_local()
} }
@ -253,59 +217,6 @@ impl DenoAsyncWrite for StreamResource {
} }
} }
/// A future used to write some data to a stream.
pub struct Write<T> {
rid: ResourceId,
buf: T,
io_state: IoState,
state: State,
nwritten: i32,
}
/// This is almost the same implementation as in tokio, difference is
/// that error type is `OpError` instead of `std::io::Error`.
impl<T> Future for Write<T>
where
T: AsRef<[u8]> + Unpin,
{
type Output = Result<i32, OpError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
if inner.io_state == IoState::Done {
panic!("poll a Read after it's done");
}
if inner.io_state == IoState::Pending {
let mut state = inner.state.borrow_mut();
let resource = state
.resource_table
.get_mut::<StreamResource>(inner.rid)
.ok_or_else(OpError::bad_resource)?;
let nwritten = ready!(resource.poll_write(cx, inner.buf.as_ref()))?;
inner.io_state = IoState::Flush;
inner.nwritten = nwritten as i32;
}
// TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
// and the reasons for the need to explicitly flush are not fully known.
// Figure out why it's needed and preferably remove it.
// https://github.com/denoland/deno/issues/3565
if inner.io_state == IoState::Flush {
let mut state = inner.state.borrow_mut();
let resource = state
.resource_table
.get_mut::<StreamResource>(inner.rid)
.ok_or_else(OpError::bad_resource)?;
ready!(resource.poll_flush(cx))?;
inner.io_state = IoState::Done;
}
Poll::Ready(Ok(inner.nwritten))
}
}
pub fn op_write( pub fn op_write(
state: &State, state: &State,
rid: i32, rid: i32,
@ -315,13 +226,34 @@ pub fn op_write(
if zero_copy.is_none() { if zero_copy.is_none() {
return futures::future::err(no_buffer_specified()).boxed_local(); return futures::future::err(no_buffer_specified()).boxed_local();
} }
// TODO(ry) Probably poll_fn can be used here and the Write struct eliminated.
Write { let state = state.clone();
rid: rid as u32, let buf = zero_copy.unwrap();
buf: zero_copy.unwrap(),
io_state: IoState::Pending, async move {
state: state.clone(), let nwritten = poll_fn(|cx| {
nwritten: 0, let resource_table = &mut state.borrow_mut().resource_table;
let resource = resource_table
.get_mut::<StreamResource>(rid as u32)
.ok_or_else(OpError::bad_resource)?;
resource.poll_write(cx, &buf.as_ref()[..])
})
.await?;
// TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
// and the reasons for the need to explicitly flush are not fully known.
// Figure out why it's needed and preferably remove it.
// https://github.com/denoland/deno/issues/3565
poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
let resource = resource_table
.get_mut::<StreamResource>(rid as u32)
.ok_or_else(OpError::bad_resource)?;
resource.poll_flush(cx)
})
.await?;
Ok(nwritten as i32)
} }
.boxed_local() .boxed_local()
} }