auto merge of #8723 : anasazi/rust/temporary-unkillable-io, r=brson

Also added a home_for_io_with_sched variant to consolidate some cases.

This is a temporary step to resolving #8674.
This commit is contained in:
bors 2013-08-25 12:26:16 -07:00
commit 05f1bbba16

View file

@ -35,6 +35,7 @@
S_IRUSR, S_IWUSR};
use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create,
CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite};
use task;
#[cfg(test)] use container::Container;
#[cfg(test)] use unstable::run_in_bare_thread;
@ -52,6 +53,42 @@ trait HomingIO {
*/
fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
use rt::sched::{PinnedTask, TaskFromFriend};
// go home
let old_home = Cell::new_empty();
let old_home_ptr = &old_home;
do task::unkillable { // FIXME(#8674)
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
// get the old home first
do task.wake().map_move |mut task| {
old_home_ptr.put_back(task.take_unwrap_home());
self.home().send(PinnedTask(task));
};
}
}
// do IO
let a = io(self);
// unhome home
do task::unkillable { // FIXME(#8674)
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |scheduler, task| {
do task.wake().map_move |mut task| {
task.give_home(old_home.take());
scheduler.make_handle().send(TaskFromFriend(task));
};
}
}
// return the result of the IO
a
}
fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
use rt::sched::{PinnedTask, TaskFromFriend};
do task::unkillable { // FIXME(#8674)
// go home
let old_home = Cell::new_empty();
let old_home_ptr = &old_home;
@ -65,7 +102,8 @@ fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
}
// do IO
let a = io(self);
let scheduler = Local::take::<Scheduler>();
let a = io_sched(self, scheduler);
// unhome home
let scheduler = Local::take::<Scheduler>();
@ -79,6 +117,7 @@ fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
// return the result of the IO
a
}
}
}
// get a handle for the current scheduler
@ -376,6 +415,7 @@ fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoEr
let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
// Block this task and take ownership, switch to scheduler context
do task::unkillable { // FIXME(#8674)
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
@ -409,6 +449,7 @@ fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoEr
}
}
}
}
assert!(!result_cell.is_empty());
return result_cell.take();
@ -422,6 +463,7 @@ fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoErr
Ok(~UvTcpListener::new(watcher, home))
}
Err(uverr) => {
do task::unkillable { // FIXME(#8674)
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
@ -434,6 +476,7 @@ fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoErr
}
}
}
}
fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> {
let mut watcher = UdpWatcher::new(self.uv_loop());
@ -443,6 +486,7 @@ fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError
Ok(~UvUdpSocket { watcher: watcher, home: home })
}
Err(uverr) => {
do task::unkillable { // FIXME(#8674)
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
@ -455,6 +499,7 @@ fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError
}
}
}
}
fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> {
let watcher = TimerWatcher::new(self.uv_loop());
@ -493,6 +538,7 @@ fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess)
let result_cell_ptr: *Cell<Result<~RtioFileStream,
IoError>> = &result_cell;
let path_cell = Cell::new(path);
do task::unkillable { // FIXME(#8674)
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
@ -517,6 +563,7 @@ fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess)
}
};
};
}
assert!(!result_cell.is_empty());
return result_cell.take();
}
@ -525,6 +572,7 @@ fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let path_cell = Cell::new(path);
do task::unkillable { // FIXME(#8674)
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
@ -539,6 +587,7 @@ fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> {
scheduler.resume_blocked_task_immediately(task_cell.take());
};
};
}
assert!(!result_cell.is_empty());
return result_cell.take();
}
@ -572,8 +621,7 @@ impl Drop for UvTcpListener {
fn drop(&self) {
// XXX need mutable finalizer
let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
do self_.home_for_io |self_| {
let scheduler = Local::take::<Scheduler>();
do self_.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.watcher().as_stream().close {
@ -665,8 +713,7 @@ impl Drop for UvTcpStream {
fn drop(&self) {
// XXX need mutable finalizer
let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) };
do this.home_for_io |self_| {
let scheduler = Local::take::<Scheduler>();
do this.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.watcher.as_stream().close {
@ -688,11 +735,10 @@ fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
impl RtioTcpStream for UvTcpStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
do self.home_for_io |self_| {
do self.home_for_io_with_sched |self_, scheduler| {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |_sched, task| {
let task_cell = Cell::new(task);
@ -730,10 +776,9 @@ fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
do self.home_for_io |self_| {
do self.home_for_io_with_sched |self_, scheduler| {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
@ -827,11 +872,10 @@ impl Drop for UvUdpSocket {
fn drop(&self) {
// XXX need mutable finalizer
let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) };
do this.home_for_io |_| {
let scheduler = Local::take::<Scheduler>();
do this.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do this.watcher.close {
do self_.watcher.close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
@ -850,11 +894,10 @@ fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
impl RtioUdpSocket for UvUdpSocket {
fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
do self.home_for_io |self_| {
do self.home_for_io_with_sched |self_, scheduler| {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
@ -885,10 +928,9 @@ fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
}
fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
do self.home_for_io |self_| {
do self.home_for_io_with_sched |self_, scheduler| {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
@ -1047,9 +1089,8 @@ fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
impl Drop for UvTimer {
fn drop(&self) {
let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
do self_.home_for_io |self_| {
do self_.home_for_io_with_sched |self_, scheduler| {
rtdebug!("closing UvTimer");
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.watcher.close {
@ -1063,8 +1104,7 @@ fn drop(&self) {
impl RtioTimer for UvTimer {
fn sleep(&mut self, msecs: u64) {
do self.home_for_io |self_| {
let scheduler = Local::take::<Scheduler>();
do self.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_sched, task| {
rtdebug!("sleep: entered scheduler context");
let task_cell = Cell::new(task);
@ -1104,8 +1144,7 @@ fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
let buf_ptr: *&mut [u8] = &buf;
do self.home_for_io |self_| {
let scheduler = Local::take::<Scheduler>();
do self.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
let task_cell = Cell::new(task);
@ -1126,8 +1165,7 @@ fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let buf_ptr: *&[u8] = &buf;
do self.home_for_io |self_| {
let scheduler = Local::take::<Scheduler>();
do self.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
let task_cell = Cell::new(task);
@ -1166,8 +1204,7 @@ impl Drop for UvFileStream {
fn drop(&self) {
let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) };
if self.close_on_drop {
do self_.home_for_io |self_| {
let scheduler = Local::take::<Scheduler>();
do self_.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.fd.close(&self.loop_) |_,_| {
@ -1273,6 +1310,7 @@ fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
assert!(maybe_socket.is_ok());
// block self on sched1
do task::unkillable { // FIXME(#8674)
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
// unblock task
@ -1282,6 +1320,7 @@ fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
};
// sched1 should now sleep since it has nothing else to do
}
}
// sched2 will wake up and get the task
// as we do nothing else, the function ends and the socket goes out of scope
// sched2 will start to run the destructor
@ -1548,6 +1587,7 @@ fn test_read_and_block() {
}
reads += 1;
do task::unkillable { // FIXME(#8674)
let scheduler = Local::take::<Scheduler>();
// Yield to the other task in hopes that it
// will trigger a read callback while we are
@ -1557,6 +1597,7 @@ fn test_read_and_block() {
sched.enqueue_blocked_task(task.take());
}
}
}
// Make sure we had multiple reads
assert!(reads > 1);