mirror of
https://github.com/rust-lang/rust
synced 2024-09-15 22:50:55 +00:00
Added home_for_io_with_sched variant. Temporarily making IO unkillable.
This commit is contained in:
parent
2c0f9bd354
commit
66365b6378
|
@ -35,6 +35,7 @@
|
||||||
S_IRUSR, S_IWUSR};
|
S_IRUSR, S_IWUSR};
|
||||||
use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create,
|
use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create,
|
||||||
CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite};
|
CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite};
|
||||||
|
use task;
|
||||||
|
|
||||||
#[cfg(test)] use container::Container;
|
#[cfg(test)] use container::Container;
|
||||||
#[cfg(test)] use unstable::run_in_bare_thread;
|
#[cfg(test)] use unstable::run_in_bare_thread;
|
||||||
|
@ -55,30 +56,68 @@ fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
|
||||||
// go home
|
// go home
|
||||||
let old_home = Cell::new_empty();
|
let old_home = Cell::new_empty();
|
||||||
let old_home_ptr = &old_home;
|
let old_home_ptr = &old_home;
|
||||||
let scheduler = Local::take::<Scheduler>();
|
do task::unkillable { // FIXME(#8674)
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
let scheduler = Local::take::<Scheduler>();
|
||||||
// get the old home first
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
do task.wake().map_move |mut task| {
|
// get the old home first
|
||||||
old_home_ptr.put_back(task.take_unwrap_home());
|
do task.wake().map_move |mut task| {
|
||||||
self.home().send(PinnedTask(task));
|
old_home_ptr.put_back(task.take_unwrap_home());
|
||||||
};
|
self.home().send(PinnedTask(task));
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// do IO
|
// do IO
|
||||||
let a = io(self);
|
let a = io(self);
|
||||||
|
|
||||||
// unhome home
|
// unhome home
|
||||||
let scheduler = Local::take::<Scheduler>();
|
do task::unkillable { // FIXME(#8674)
|
||||||
do scheduler.deschedule_running_task_and_then |scheduler, task| {
|
let scheduler = Local::take::<Scheduler>();
|
||||||
do task.wake().map_move |mut task| {
|
do scheduler.deschedule_running_task_and_then |scheduler, task| {
|
||||||
task.give_home(old_home.take());
|
do task.wake().map_move |mut task| {
|
||||||
scheduler.make_handle().send(TaskFromFriend(task));
|
task.give_home(old_home.take());
|
||||||
};
|
scheduler.make_handle().send(TaskFromFriend(task));
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// return the result of the IO
|
// return the result of the IO
|
||||||
a
|
a
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
|
||||||
|
use rt::sched::{PinnedTask, TaskFromFriend};
|
||||||
|
|
||||||
|
do task::unkillable { // FIXME(#8674)
|
||||||
|
// go home
|
||||||
|
let old_home = Cell::new_empty();
|
||||||
|
let old_home_ptr = &old_home;
|
||||||
|
let scheduler = Local::take::<Scheduler>();
|
||||||
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
|
// get the old home first
|
||||||
|
do task.wake().map_move |mut task| {
|
||||||
|
old_home_ptr.put_back(task.take_unwrap_home());
|
||||||
|
self.home().send(PinnedTask(task));
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// do IO
|
||||||
|
let scheduler = Local::take::<Scheduler>();
|
||||||
|
let a = io_sched(self, scheduler);
|
||||||
|
|
||||||
|
// unhome home
|
||||||
|
let scheduler = Local::take::<Scheduler>();
|
||||||
|
do scheduler.deschedule_running_task_and_then |scheduler, task| {
|
||||||
|
do task.wake().map_move |mut task| {
|
||||||
|
task.give_home(old_home.take());
|
||||||
|
scheduler.make_handle().send(TaskFromFriend(task));
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// return the result of the IO
|
||||||
|
a
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// get a handle for the current scheduler
|
// get a handle for the current scheduler
|
||||||
|
@ -376,35 +415,37 @@ fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoEr
|
||||||
let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
|
let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
|
||||||
|
|
||||||
// Block this task and take ownership, switch to scheduler context
|
// Block this task and take ownership, switch to scheduler context
|
||||||
let scheduler = Local::take::<Scheduler>();
|
do task::unkillable { // FIXME(#8674)
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
let scheduler = Local::take::<Scheduler>();
|
||||||
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
|
|
||||||
let mut tcp = TcpWatcher::new(self.uv_loop());
|
let mut tcp = TcpWatcher::new(self.uv_loop());
|
||||||
let task_cell = Cell::new(task);
|
let task_cell = Cell::new(task);
|
||||||
|
|
||||||
// Wait for a connection
|
// Wait for a connection
|
||||||
do tcp.connect(addr) |stream, status| {
|
do tcp.connect(addr) |stream, status| {
|
||||||
match status {
|
match status {
|
||||||
None => {
|
None => {
|
||||||
let tcp = NativeHandle::from_native_handle(stream.native_handle());
|
let tcp = NativeHandle::from_native_handle(stream.native_handle());
|
||||||
let home = get_handle_to_current_scheduler!();
|
let home = get_handle_to_current_scheduler!();
|
||||||
let res = Ok(~UvTcpStream { watcher: tcp, home: home });
|
let res = Ok(~UvTcpStream { watcher: tcp, home: home });
|
||||||
|
|
||||||
// Store the stream in the task's stack
|
// Store the stream in the task's stack
|
||||||
unsafe { (*result_cell_ptr).put_back(res); }
|
|
||||||
|
|
||||||
// Context switch
|
|
||||||
let scheduler = Local::take::<Scheduler>();
|
|
||||||
scheduler.resume_blocked_task_immediately(task_cell.take());
|
|
||||||
}
|
|
||||||
Some(_) => {
|
|
||||||
let task_cell = Cell::new(task_cell.take());
|
|
||||||
do stream.close {
|
|
||||||
let res = Err(uv_error_to_io_error(status.unwrap()));
|
|
||||||
unsafe { (*result_cell_ptr).put_back(res); }
|
unsafe { (*result_cell_ptr).put_back(res); }
|
||||||
|
|
||||||
|
// Context switch
|
||||||
let scheduler = Local::take::<Scheduler>();
|
let scheduler = Local::take::<Scheduler>();
|
||||||
scheduler.resume_blocked_task_immediately(task_cell.take());
|
scheduler.resume_blocked_task_immediately(task_cell.take());
|
||||||
}
|
}
|
||||||
|
Some(_) => {
|
||||||
|
let task_cell = Cell::new(task_cell.take());
|
||||||
|
do stream.close {
|
||||||
|
let res = Err(uv_error_to_io_error(status.unwrap()));
|
||||||
|
unsafe { (*result_cell_ptr).put_back(res); }
|
||||||
|
let scheduler = Local::take::<Scheduler>();
|
||||||
|
scheduler.resume_blocked_task_immediately(task_cell.take());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -422,15 +463,17 @@ fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoErr
|
||||||
Ok(~UvTcpListener::new(watcher, home))
|
Ok(~UvTcpListener::new(watcher, home))
|
||||||
}
|
}
|
||||||
Err(uverr) => {
|
Err(uverr) => {
|
||||||
let scheduler = Local::take::<Scheduler>();
|
do task::unkillable { // FIXME(#8674)
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
let scheduler = Local::take::<Scheduler>();
|
||||||
let task_cell = Cell::new(task);
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
do watcher.as_stream().close {
|
let task_cell = Cell::new(task);
|
||||||
let scheduler = Local::take::<Scheduler>();
|
do watcher.as_stream().close {
|
||||||
scheduler.resume_blocked_task_immediately(task_cell.take());
|
let scheduler = Local::take::<Scheduler>();
|
||||||
|
scheduler.resume_blocked_task_immediately(task_cell.take());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Err(uv_error_to_io_error(uverr))
|
||||||
}
|
}
|
||||||
Err(uv_error_to_io_error(uverr))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -443,15 +486,17 @@ fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError
|
||||||
Ok(~UvUdpSocket { watcher: watcher, home: home })
|
Ok(~UvUdpSocket { watcher: watcher, home: home })
|
||||||
}
|
}
|
||||||
Err(uverr) => {
|
Err(uverr) => {
|
||||||
let scheduler = Local::take::<Scheduler>();
|
do task::unkillable { // FIXME(#8674)
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
let scheduler = Local::take::<Scheduler>();
|
||||||
let task_cell = Cell::new(task);
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
do watcher.close {
|
let task_cell = Cell::new(task);
|
||||||
let scheduler = Local::take::<Scheduler>();
|
do watcher.close {
|
||||||
scheduler.resume_blocked_task_immediately(task_cell.take());
|
let scheduler = Local::take::<Scheduler>();
|
||||||
|
scheduler.resume_blocked_task_immediately(task_cell.take());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Err(uv_error_to_io_error(uverr))
|
||||||
}
|
}
|
||||||
Err(uv_error_to_io_error(uverr))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -493,30 +538,32 @@ fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess)
|
||||||
let result_cell_ptr: *Cell<Result<~RtioFileStream,
|
let result_cell_ptr: *Cell<Result<~RtioFileStream,
|
||||||
IoError>> = &result_cell;
|
IoError>> = &result_cell;
|
||||||
let path_cell = Cell::new(path);
|
let path_cell = Cell::new(path);
|
||||||
let scheduler = Local::take::<Scheduler>();
|
do task::unkillable { // FIXME(#8674)
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
let scheduler = Local::take::<Scheduler>();
|
||||||
let task_cell = Cell::new(task);
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
let path = path_cell.take();
|
let task_cell = Cell::new(task);
|
||||||
do file::FsRequest::open(self.uv_loop(), path, flags as int, create_mode as int)
|
let path = path_cell.take();
|
||||||
|req,err| {
|
do file::FsRequest::open(self.uv_loop(), path, flags as int, create_mode as int)
|
||||||
if err.is_none() {
|
|req,err| {
|
||||||
let loop_ = Loop {handle: req.get_loop().native_handle()};
|
if err.is_none() {
|
||||||
let home = get_handle_to_current_scheduler!();
|
let loop_ = Loop {handle: req.get_loop().native_handle()};
|
||||||
let fd = file::FileDescriptor(req.get_result());
|
let home = get_handle_to_current_scheduler!();
|
||||||
let fs = ~UvFileStream::new(
|
let fd = file::FileDescriptor(req.get_result());
|
||||||
loop_, fd, true, home) as ~RtioFileStream;
|
let fs = ~UvFileStream::new(
|
||||||
let res = Ok(fs);
|
loop_, fd, true, home) as ~RtioFileStream;
|
||||||
unsafe { (*result_cell_ptr).put_back(res); }
|
let res = Ok(fs);
|
||||||
let scheduler = Local::take::<Scheduler>();
|
unsafe { (*result_cell_ptr).put_back(res); }
|
||||||
scheduler.resume_blocked_task_immediately(task_cell.take());
|
let scheduler = Local::take::<Scheduler>();
|
||||||
} else {
|
scheduler.resume_blocked_task_immediately(task_cell.take());
|
||||||
let res = Err(uv_error_to_io_error(err.unwrap()));
|
} else {
|
||||||
unsafe { (*result_cell_ptr).put_back(res); }
|
let res = Err(uv_error_to_io_error(err.unwrap()));
|
||||||
let scheduler = Local::take::<Scheduler>();
|
unsafe { (*result_cell_ptr).put_back(res); }
|
||||||
scheduler.resume_blocked_task_immediately(task_cell.take());
|
let scheduler = Local::take::<Scheduler>();
|
||||||
}
|
scheduler.resume_blocked_task_immediately(task_cell.take());
|
||||||
|
}
|
||||||
|
};
|
||||||
};
|
};
|
||||||
};
|
}
|
||||||
assert!(!result_cell.is_empty());
|
assert!(!result_cell.is_empty());
|
||||||
return result_cell.take();
|
return result_cell.take();
|
||||||
}
|
}
|
||||||
|
@ -525,20 +572,22 @@ fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> {
|
||||||
let result_cell = Cell::new_empty();
|
let result_cell = Cell::new_empty();
|
||||||
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
|
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
|
||||||
let path_cell = Cell::new(path);
|
let path_cell = Cell::new(path);
|
||||||
let scheduler = Local::take::<Scheduler>();
|
do task::unkillable { // FIXME(#8674)
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
let scheduler = Local::take::<Scheduler>();
|
||||||
let task_cell = Cell::new(task);
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
let path = path_cell.take();
|
let task_cell = Cell::new(task);
|
||||||
do file::FsRequest::unlink(self.uv_loop(), path) |_, err| {
|
let path = path_cell.take();
|
||||||
let res = match err {
|
do file::FsRequest::unlink(self.uv_loop(), path) |_, err| {
|
||||||
None => Ok(()),
|
let res = match err {
|
||||||
Some(err) => Err(uv_error_to_io_error(err))
|
None => Ok(()),
|
||||||
|
Some(err) => Err(uv_error_to_io_error(err))
|
||||||
|
};
|
||||||
|
unsafe { (*result_cell_ptr).put_back(res); }
|
||||||
|
let scheduler = Local::take::<Scheduler>();
|
||||||
|
scheduler.resume_blocked_task_immediately(task_cell.take());
|
||||||
};
|
};
|
||||||
unsafe { (*result_cell_ptr).put_back(res); }
|
|
||||||
let scheduler = Local::take::<Scheduler>();
|
|
||||||
scheduler.resume_blocked_task_immediately(task_cell.take());
|
|
||||||
};
|
};
|
||||||
};
|
}
|
||||||
assert!(!result_cell.is_empty());
|
assert!(!result_cell.is_empty());
|
||||||
return result_cell.take();
|
return result_cell.take();
|
||||||
}
|
}
|
||||||
|
@ -572,8 +621,7 @@ impl Drop for UvTcpListener {
|
||||||
fn drop(&self) {
|
fn drop(&self) {
|
||||||
// XXX need mutable finalizer
|
// XXX need mutable finalizer
|
||||||
let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
|
let self_ = unsafe { transmute::<&UvTcpListener, &mut UvTcpListener>(self) };
|
||||||
do self_.home_for_io |self_| {
|
do self_.home_for_io_with_sched |self_, scheduler| {
|
||||||
let scheduler = Local::take::<Scheduler>();
|
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
let task_cell = Cell::new(task);
|
let task_cell = Cell::new(task);
|
||||||
do self_.watcher().as_stream().close {
|
do self_.watcher().as_stream().close {
|
||||||
|
@ -665,8 +713,7 @@ impl Drop for UvTcpStream {
|
||||||
fn drop(&self) {
|
fn drop(&self) {
|
||||||
// XXX need mutable finalizer
|
// XXX need mutable finalizer
|
||||||
let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) };
|
let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) };
|
||||||
do this.home_for_io |self_| {
|
do this.home_for_io_with_sched |self_, scheduler| {
|
||||||
let scheduler = Local::take::<Scheduler>();
|
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
let task_cell = Cell::new(task);
|
let task_cell = Cell::new(task);
|
||||||
do self_.watcher.as_stream().close {
|
do self_.watcher.as_stream().close {
|
||||||
|
@ -688,11 +735,10 @@ fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
|
||||||
|
|
||||||
impl RtioTcpStream for UvTcpStream {
|
impl RtioTcpStream for UvTcpStream {
|
||||||
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
|
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
|
||||||
do self.home_for_io |self_| {
|
do self.home_for_io_with_sched |self_, scheduler| {
|
||||||
let result_cell = Cell::new_empty();
|
let result_cell = Cell::new_empty();
|
||||||
let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
|
let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
|
||||||
|
|
||||||
let scheduler = Local::take::<Scheduler>();
|
|
||||||
let buf_ptr: *&mut [u8] = &buf;
|
let buf_ptr: *&mut [u8] = &buf;
|
||||||
do scheduler.deschedule_running_task_and_then |_sched, task| {
|
do scheduler.deschedule_running_task_and_then |_sched, task| {
|
||||||
let task_cell = Cell::new(task);
|
let task_cell = Cell::new(task);
|
||||||
|
@ -730,10 +776,9 @@ fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
|
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
|
||||||
do self.home_for_io |self_| {
|
do self.home_for_io_with_sched |self_, scheduler| {
|
||||||
let result_cell = Cell::new_empty();
|
let result_cell = Cell::new_empty();
|
||||||
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
|
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
|
||||||
let scheduler = Local::take::<Scheduler>();
|
|
||||||
let buf_ptr: *&[u8] = &buf;
|
let buf_ptr: *&[u8] = &buf;
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
let task_cell = Cell::new(task);
|
let task_cell = Cell::new(task);
|
||||||
|
@ -827,11 +872,10 @@ impl Drop for UvUdpSocket {
|
||||||
fn drop(&self) {
|
fn drop(&self) {
|
||||||
// XXX need mutable finalizer
|
// XXX need mutable finalizer
|
||||||
let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) };
|
let this = unsafe { transmute::<&UvUdpSocket, &mut UvUdpSocket>(self) };
|
||||||
do this.home_for_io |_| {
|
do this.home_for_io_with_sched |self_, scheduler| {
|
||||||
let scheduler = Local::take::<Scheduler>();
|
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
let task_cell = Cell::new(task);
|
let task_cell = Cell::new(task);
|
||||||
do this.watcher.close {
|
do self_.watcher.close {
|
||||||
let scheduler = Local::take::<Scheduler>();
|
let scheduler = Local::take::<Scheduler>();
|
||||||
scheduler.resume_blocked_task_immediately(task_cell.take());
|
scheduler.resume_blocked_task_immediately(task_cell.take());
|
||||||
}
|
}
|
||||||
|
@ -850,11 +894,10 @@ fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
|
||||||
|
|
||||||
impl RtioUdpSocket for UvUdpSocket {
|
impl RtioUdpSocket for UvUdpSocket {
|
||||||
fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
|
fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
|
||||||
do self.home_for_io |self_| {
|
do self.home_for_io_with_sched |self_, scheduler| {
|
||||||
let result_cell = Cell::new_empty();
|
let result_cell = Cell::new_empty();
|
||||||
let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
|
let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
|
||||||
|
|
||||||
let scheduler = Local::take::<Scheduler>();
|
|
||||||
let buf_ptr: *&mut [u8] = &buf;
|
let buf_ptr: *&mut [u8] = &buf;
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
let task_cell = Cell::new(task);
|
let task_cell = Cell::new(task);
|
||||||
|
@ -885,10 +928,9 @@ fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
|
fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
|
||||||
do self.home_for_io |self_| {
|
do self.home_for_io_with_sched |self_, scheduler| {
|
||||||
let result_cell = Cell::new_empty();
|
let result_cell = Cell::new_empty();
|
||||||
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
|
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
|
||||||
let scheduler = Local::take::<Scheduler>();
|
|
||||||
let buf_ptr: *&[u8] = &buf;
|
let buf_ptr: *&[u8] = &buf;
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
let task_cell = Cell::new(task);
|
let task_cell = Cell::new(task);
|
||||||
|
@ -1047,9 +1089,8 @@ fn new(w: timer::TimerWatcher, home: SchedHandle) -> UvTimer {
|
||||||
impl Drop for UvTimer {
|
impl Drop for UvTimer {
|
||||||
fn drop(&self) {
|
fn drop(&self) {
|
||||||
let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
|
let self_ = unsafe { transmute::<&UvTimer, &mut UvTimer>(self) };
|
||||||
do self_.home_for_io |self_| {
|
do self_.home_for_io_with_sched |self_, scheduler| {
|
||||||
rtdebug!("closing UvTimer");
|
rtdebug!("closing UvTimer");
|
||||||
let scheduler = Local::take::<Scheduler>();
|
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
let task_cell = Cell::new(task);
|
let task_cell = Cell::new(task);
|
||||||
do self_.watcher.close {
|
do self_.watcher.close {
|
||||||
|
@ -1063,8 +1104,7 @@ fn drop(&self) {
|
||||||
|
|
||||||
impl RtioTimer for UvTimer {
|
impl RtioTimer for UvTimer {
|
||||||
fn sleep(&mut self, msecs: u64) {
|
fn sleep(&mut self, msecs: u64) {
|
||||||
do self.home_for_io |self_| {
|
do self.home_for_io_with_sched |self_, scheduler| {
|
||||||
let scheduler = Local::take::<Scheduler>();
|
|
||||||
do scheduler.deschedule_running_task_and_then |_sched, task| {
|
do scheduler.deschedule_running_task_and_then |_sched, task| {
|
||||||
rtdebug!("sleep: entered scheduler context");
|
rtdebug!("sleep: entered scheduler context");
|
||||||
let task_cell = Cell::new(task);
|
let task_cell = Cell::new(task);
|
||||||
|
@ -1104,8 +1144,7 @@ fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> {
|
||||||
let result_cell = Cell::new_empty();
|
let result_cell = Cell::new_empty();
|
||||||
let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
|
let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
|
||||||
let buf_ptr: *&mut [u8] = &buf;
|
let buf_ptr: *&mut [u8] = &buf;
|
||||||
do self.home_for_io |self_| {
|
do self.home_for_io_with_sched |self_, scheduler| {
|
||||||
let scheduler = Local::take::<Scheduler>();
|
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
|
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
|
||||||
let task_cell = Cell::new(task);
|
let task_cell = Cell::new(task);
|
||||||
|
@ -1126,8 +1165,7 @@ fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> {
|
||||||
let result_cell = Cell::new_empty();
|
let result_cell = Cell::new_empty();
|
||||||
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
|
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
|
||||||
let buf_ptr: *&[u8] = &buf;
|
let buf_ptr: *&[u8] = &buf;
|
||||||
do self.home_for_io |self_| {
|
do self.home_for_io_with_sched |self_, scheduler| {
|
||||||
let scheduler = Local::take::<Scheduler>();
|
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
|
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
|
||||||
let task_cell = Cell::new(task);
|
let task_cell = Cell::new(task);
|
||||||
|
@ -1166,8 +1204,7 @@ impl Drop for UvFileStream {
|
||||||
fn drop(&self) {
|
fn drop(&self) {
|
||||||
let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) };
|
let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) };
|
||||||
if self.close_on_drop {
|
if self.close_on_drop {
|
||||||
do self_.home_for_io |self_| {
|
do self_.home_for_io_with_sched |self_, scheduler| {
|
||||||
let scheduler = Local::take::<Scheduler>();
|
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
let task_cell = Cell::new(task);
|
let task_cell = Cell::new(task);
|
||||||
do self_.fd.close(&self.loop_) |_,_| {
|
do self_.fd.close(&self.loop_) |_,_| {
|
||||||
|
@ -1273,14 +1310,16 @@ fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
|
||||||
assert!(maybe_socket.is_ok());
|
assert!(maybe_socket.is_ok());
|
||||||
|
|
||||||
// block self on sched1
|
// block self on sched1
|
||||||
let scheduler = Local::take::<Scheduler>();
|
do task::unkillable { // FIXME(#8674)
|
||||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
let scheduler = Local::take::<Scheduler>();
|
||||||
// unblock task
|
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||||
do task.wake().map_move |task| {
|
// unblock task
|
||||||
// send self to sched2
|
do task.wake().map_move |task| {
|
||||||
tasksFriendHandle.take().send(TaskFromFriend(task));
|
// send self to sched2
|
||||||
};
|
tasksFriendHandle.take().send(TaskFromFriend(task));
|
||||||
// sched1 should now sleep since it has nothing else to do
|
};
|
||||||
|
// sched1 should now sleep since it has nothing else to do
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// sched2 will wake up and get the task
|
// sched2 will wake up and get the task
|
||||||
// as we do nothing else, the function ends and the socket goes out of scope
|
// as we do nothing else, the function ends and the socket goes out of scope
|
||||||
|
@ -1548,13 +1587,15 @@ fn test_read_and_block() {
|
||||||
}
|
}
|
||||||
reads += 1;
|
reads += 1;
|
||||||
|
|
||||||
let scheduler = Local::take::<Scheduler>();
|
do task::unkillable { // FIXME(#8674)
|
||||||
// Yield to the other task in hopes that it
|
let scheduler = Local::take::<Scheduler>();
|
||||||
// will trigger a read callback while we are
|
// Yield to the other task in hopes that it
|
||||||
// not ready for it
|
// will trigger a read callback while we are
|
||||||
do scheduler.deschedule_running_task_and_then |sched, task| {
|
// not ready for it
|
||||||
let task = Cell::new(task);
|
do scheduler.deschedule_running_task_and_then |sched, task| {
|
||||||
sched.enqueue_blocked_task(task.take());
|
let task = Cell::new(task);
|
||||||
|
sched.enqueue_blocked_task(task.take());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue