From bcbbee7399d41d813e78abe63126e2a01edb5848 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 27 Sep 2018 00:56:39 -0400 Subject: [PATCH] Adds basic File I/O and FD table. Adds deno.stdin, deno.stdout, deno.stderr, deno.open(), deno.write(), deno.read(), deno.Reader, deno.Writer, deno.copy(). Fixes #721. tests/cat.ts works. --- BUILD.gn | 4 ++ js/deno.ts | 2 + js/files.ts | 89 ++++++++++++++++++++++++++++++++ js/files_test.ts | 20 ++++++++ js/io.ts | 114 +++++++++++++++++++++++++++++++++++++++++ js/unit_tests.ts | 1 + src/errors.rs | 13 ++++- src/files.rs | 119 ++++++++++++++++++++++++++++++++++++++++++ src/handlers.rs | 131 +++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 3 ++ src/msg.fbs | 48 +++++++++++++++++ tests/cat.ts | 11 ++++ 12 files changed, 553 insertions(+), 2 deletions(-) create mode 100644 js/files.ts create mode 100644 js/files_test.ts create mode 100644 js/io.ts create mode 100644 src/files.rs create mode 100644 tests/cat.ts diff --git a/BUILD.gn b/BUILD.gn index b901b1764b..e573b75041 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -54,6 +54,8 @@ main_extern = [ "$rust_build:tempfile", "$rust_build:rand", "$rust_build:tokio", + "$rust_build:tokio_io", + "$rust_build:tokio_fs", "$rust_build:tokio_executor", "$rust_build:tokio_threadpool", "$rust_build:url", @@ -73,6 +75,8 @@ ts_sources = [ "js/dom_types.ts", "js/errors.ts", "js/fetch.ts", + "js/files.ts", + "js/io.ts", "js/global-eval.ts", "js/globals.ts", "js/libdeno.ts", diff --git a/js/deno.ts b/js/deno.ts index 92ba5301d3..44fba61649 100644 --- a/js/deno.ts +++ b/js/deno.ts @@ -2,6 +2,8 @@ // Public deno module. /// export { env, exit } from "./os"; +export { File, open, stdin, stdout, stderr, read, write, close } from "./files"; +export { copy, Reader, Writer } from "./io"; export { mkdirSync, mkdir } from "./mkdir"; export { makeTempDirSync, makeTempDir } from "./make_temp_dir"; export { removeSync, remove, removeAllSync, removeAll } from "./remove"; diff --git a/js/files.ts b/js/files.ts new file mode 100644 index 0000000000..d22f1b390e --- /dev/null +++ b/js/files.ts @@ -0,0 +1,89 @@ +// Copyright 2018 the Deno authors. All rights reserved. MIT license. + +import { Reader, Writer, Closer, ReadResult } from "./io"; +import * as dispatch from "./dispatch"; +import * as fbs from "gen/msg_generated"; +import { assert } from "./util"; +import { flatbuffers } from "flatbuffers"; + +export class File implements Reader, Writer, Closer { + constructor(readonly fd: number) {} + + write(p: ArrayBufferView): Promise { + return write(this.fd, p); + } + + read(p: ArrayBufferView): Promise { + return read(this.fd, p); + } + + close(): void { + return close(this.fd); + } +} + +export const stdin = new File(0); +export const stdout = new File(1); +export const stderr = new File(2); + +// TODO This is just a placeholder - not final API. +export type OpenMode = "r" | "w" | "w+" | "x"; + +export function create(filename: string): Promise { + return open(filename, "x"); +} + +export async function open( + filename: string, + mode: OpenMode = "r" +): Promise { + const builder = new flatbuffers.Builder(); + const filename_ = builder.createString(filename); + fbs.Open.startOpen(builder); + fbs.Open.addFilename(builder, filename_); + const msg = fbs.Open.endOpen(builder); + const baseRes = await dispatch.sendAsync(builder, fbs.Any.Open, msg); + assert(baseRes != null); + assert(fbs.Any.OpenRes === baseRes!.msgType()); + const res = new fbs.OpenRes(); + assert(baseRes!.msg(res) != null); + const fd = res.fd(); + return new File(fd); +} + +export async function read( + fd: number, + p: ArrayBufferView +): Promise { + const builder = new flatbuffers.Builder(); + fbs.Read.startRead(builder); + fbs.Read.addFd(builder, fd); + const msg = fbs.Read.endRead(builder); + const baseRes = await dispatch.sendAsync(builder, fbs.Any.Read, msg, p); + assert(baseRes != null); + assert(fbs.Any.ReadRes === baseRes!.msgType()); + const res = new fbs.ReadRes(); + assert(baseRes!.msg(res) != null); + return { nread: res.nread(), eof: res.eof() }; +} + +export async function write(fd: number, p: ArrayBufferView): Promise { + const builder = new flatbuffers.Builder(); + fbs.Write.startWrite(builder); + fbs.Write.addFd(builder, fd); + const msg = fbs.Write.endWrite(builder); + const baseRes = await dispatch.sendAsync(builder, fbs.Any.Write, msg, p); + assert(baseRes != null); + assert(fbs.Any.WriteRes === baseRes!.msgType()); + const res = new fbs.WriteRes(); + assert(baseRes!.msg(res) != null); + return res.nbyte(); +} + +export function close(fd: number): void { + const builder = new flatbuffers.Builder(); + fbs.Close.startClose(builder); + fbs.Close.addFd(builder, fd); + const msg = fbs.Close.endClose(builder); + dispatch.sendSync(builder, fbs.Any.Close, msg); +} diff --git a/js/files_test.ts b/js/files_test.ts new file mode 100644 index 0000000000..82af10aa2c --- /dev/null +++ b/js/files_test.ts @@ -0,0 +1,20 @@ +// Copyright 2018 the Deno authors. All rights reserved. MIT license. + +import * as deno from "deno"; +import { test, assert, assertEqual } from "./test_util.ts"; + +test(function filesStdioFileDescriptors() { + assertEqual(deno.stdin.fd, 0); + assertEqual(deno.stdout.fd, 1); + assertEqual(deno.stderr.fd, 2); +}); + +test(async function filesCopyToStdout() { + const filename = "package.json"; + const file = await deno.open(filename); + assert(file.fd > 2); + const bytesWritten = await deno.copy(deno.stdout, file); + const fileSize = deno.statSync(filename).len; + assertEqual(bytesWritten, fileSize); + console.log("bytes written", bytesWritten); +}); diff --git a/js/io.ts b/js/io.ts new file mode 100644 index 0000000000..710722f42e --- /dev/null +++ b/js/io.ts @@ -0,0 +1,114 @@ +// Interfaces 100% copied from Go. +// Documentation liberally lifted from them too. +// Thank you! We love Go! + +// The bytes read during an I/O call and a boolean indicating EOF. +export interface ReadResult { + nread: number; + eof: boolean; +} + +// Reader is the interface that wraps the basic read() method. +// https://golang.org/pkg/io/#Reader +export interface Reader { + // read() reads up to p.byteLength bytes into p. It returns the number of + // bytes read (0 <= n <= p.byteLength) and any error encountered. Even if + // read() returns n < p.byteLength, it may use all of p as scratch space + // during the call. If some data is available but not p.byteLength bytes, + // read() conventionally returns what is available instead of waiting for + // more. + // + // When read() encounters an error or end-of-file condition after successfully + // reading n > 0 bytes, it returns the number of bytes read. It may return the + // (non-nil) error from the same call or return the error (and n == 0) from a + // subsequent call. An instance of this general case is that a Reader + // returning a non-zero number of bytes at the end of the input stream may + // return either err == EOF or err == nil. The next read() should return 0, + // EOF. + // + // Callers should always process the n > 0 bytes returned before considering + // the EOF. Doing so correctly handles I/O errors that happen after reading + // some bytes and also both of the allowed EOF behaviors. + // + // Implementations of read() are discouraged from returning a zero byte count + // with a nil error, except when p.byteLength == 0. Callers should treat a + // return of 0 and nil as indicating that nothing happened; in particular it + // does not indicate EOF. + // + // Implementations must not retain p. + read(p: ArrayBufferView): Promise; +} + +// Writer is the interface that wraps the basic write() method. +// https://golang.org/pkg/io/#Writer +export interface Writer { + // write() writes p.byteLength bytes from p to the underlying data stream. It + // returns the number of bytes written from p (0 <= n <= p.byteLength) and any + // error encountered that caused the write to stop early. write() must return + // a non-nil error if it returns n < p.byteLength. write() must not modify the + // slice data, even temporarily. + // + // Implementations must not retain p. + write(p: ArrayBufferView): Promise; +} + +// https://golang.org/pkg/io/#Closer +export interface Closer { + // The behavior of Close after the first call is undefined. Specific + // implementations may document their own behavior. + close(): void; +} + +// https://golang.org/pkg/io/#Seeker +export interface Seeker { + // Seek sets the offset for the next read() or write() to offset, interpreted + // according to whence: SeekStart means relative to the start of the file, + // SeekCurrent means relative to the current offset, and SeekEnd means + // relative to the end. Seek returns the new offset relative to the start of + // the file and an error, if any. + // + // Seeking to an offset before the start of the file is an error. Seeking to + // any positive offset is legal, but the behavior of subsequent I/O operations + // on the underlying object is implementation-dependent. + seek(offset: number, whence: number): Promise; +} + +// https://golang.org/pkg/io/#ReadCloser +export interface ReaderCloser extends Reader, Closer {} + +// https://golang.org/pkg/io/#WriteCloser +export interface WriteCloser extends Writer, Closer {} + +// https://golang.org/pkg/io/#ReadSeeker +export interface ReadSeeker extends Reader, Seeker {} + +// https://golang.org/pkg/io/#WriteSeeker +export interface WriteSeeker extends Writer, Seeker {} + +// https://golang.org/pkg/io/#ReadWriteCloser +export interface ReadWriteCloser extends Reader, Writer, Closer {} + +// https://golang.org/pkg/io/#ReadWriteSeeker +export interface ReadWriteSeeker extends Reader, Writer, Seeker {} + +// copy() copies from src to dst until either EOF is reached on src or an error +// occurs. It returns the number of bytes copied and the first error encountered +// while copying, if any. +// +// Because copy() is defined to read from src until EOF, it does not treat an +// EOF from read() as an error to be reported. +// +// https://golang.org/pkg/io/#Copy +export async function copy(dst: Writer, src: Reader): Promise { + let n = 0; + const b = new Uint8Array(1024); + let gotEOF = false; + while (gotEOF === false) { + const result = await src.read(b); + if (result.eof) { + gotEOF = true; + } + n += await dst.write(b.subarray(0, result.nread)); + } + return n; +} diff --git a/js/unit_tests.ts b/js/unit_tests.ts index 578a65c6da..3a1fdd9d1c 100644 --- a/js/unit_tests.ts +++ b/js/unit_tests.ts @@ -5,6 +5,7 @@ import "./compiler_test.ts"; import "./console_test.ts"; import "./fetch_test.ts"; import "./os_test.ts"; +import "./files_test.ts"; import "./read_file_test.ts"; import "./write_file_test.ts"; import "./mkdir_test.ts"; diff --git a/src/errors.rs b/src/errors.rs index 1bedf8d40f..872f3492e7 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -15,15 +15,22 @@ pub struct DenoError { #[derive(Debug)] enum Repr { - // Simple(ErrorKind), + Simple(ErrorKind, String), IoErr(io::Error), UrlErr(url::ParseError), HyperErr(hyper::Error), } +pub fn new(kind: ErrorKind, msg: String) -> DenoError { + DenoError { + repr: Repr::Simple(kind, msg), + } +} + impl DenoError { pub fn kind(&self) -> ErrorKind { match self.repr { + Repr::Simple(kind, ref _msg) => kind, // Repr::Simple(kind) => kind, Repr::IoErr(ref err) => { use std::io::ErrorKind::*; @@ -87,10 +94,10 @@ impl DenoError { impl fmt::Display for DenoError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self.repr { + Repr::Simple(_kind, ref _msg) => panic!("todo"), Repr::IoErr(ref err) => err.fmt(f), Repr::UrlErr(ref err) => err.fmt(f), Repr::HyperErr(ref err) => err.fmt(f), - // Repr::Simple(..) => Ok(()), } } } @@ -98,6 +105,7 @@ impl fmt::Display for DenoError { impl std::error::Error for DenoError { fn description(&self) -> &str { match self.repr { + Repr::Simple(_kind, ref msg) => msg.as_str(), Repr::IoErr(ref err) => err.description(), Repr::UrlErr(ref err) => err.description(), Repr::HyperErr(ref err) => err.description(), @@ -107,6 +115,7 @@ impl std::error::Error for DenoError { fn cause(&self) -> Option<&std::error::Error> { match self.repr { + Repr::Simple(_kind, ref _msg) => None, Repr::IoErr(ref err) => Some(err), Repr::UrlErr(ref err) => Some(err), Repr::HyperErr(ref err) => Some(err), diff --git a/src/files.rs b/src/files.rs new file mode 100644 index 0000000000..64160bb849 --- /dev/null +++ b/src/files.rs @@ -0,0 +1,119 @@ +// Copyright 2018 the Deno authors. All rights reserved. MIT license. + +use futures; +use futures::Poll; +use std; +use std::collections::HashMap; +use std::io::Error; +use std::io::{Read, Write}; +use std::sync::atomic::AtomicIsize; +use std::sync::atomic::Ordering; +use std::sync::Mutex; +use tokio; +use tokio::io::{AsyncRead, AsyncWrite}; + +// These store Deno's file descriptors. These are not necessarally the operating +// system ones. +type FdTable = HashMap; + +lazy_static! { + // Starts at 3 because stdio is [0-2]. + static ref NEXT_FD: AtomicIsize = AtomicIsize::new(3); + static ref FD_TABLE: Mutex = Mutex::new({ + let mut m = HashMap::new(); + // TODO Load these lazily during lookup? + m.insert(0, Repr::Stdin(tokio::io::stdin())); + m.insert(1, Repr::Stdout(tokio::io::stdout())); + m.insert(2, Repr::Stderr(tokio::io::stderr())); + m + }); +} + +// Internal representation of DFile. +enum Repr { + Stdin(tokio::io::Stdin), + Stdout(tokio::io::Stdout), + Stderr(tokio::io::Stderr), + FsFile(tokio::fs::File), +} + +// Abstract async file interface. +// fd does not necessarally correspond to an OS fd. +// Ideally in unix, if DFile represents an OS fd, it will be the same. +pub struct DFile { + pub fd: i32, +} + +impl Read for DFile { + fn read(&mut self, _buf: &mut [u8]) -> std::io::Result { + unimplemented!(); + } +} + +impl AsyncRead for DFile { + fn poll_read(&mut self, buf: &mut [u8]) -> Poll { + let mut table = FD_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&self.fd); + match maybe_repr { + None => panic!("bad fd"), + Some(repr) => match repr { + Repr::FsFile(ref mut f) => f.poll_read(buf), + Repr::Stdin(ref mut f) => f.poll_read(buf), + Repr::Stdout(_) | Repr::Stderr(_) => { + panic!("Cannot read from stdout/stderr") + } + }, + } + } +} + +impl Write for DFile { + fn write(&mut self, _buf: &[u8]) -> std::io::Result { + unimplemented!() + } + + fn flush(&mut self) -> std::io::Result<()> { + unimplemented!() + } +} + +impl AsyncWrite for DFile { + fn poll_write(&mut self, buf: &[u8]) -> Poll { + let mut table = FD_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&self.fd); + match maybe_repr { + None => panic!("bad fd"), + Some(repr) => match repr { + Repr::FsFile(ref mut f) => f.poll_write(buf), + Repr::Stdout(ref mut f) => f.poll_write(buf), + Repr::Stderr(ref mut f) => f.poll_write(buf), + Repr::Stdin(_) => panic!("Cannot write to stdin"), + }, + } + } + + fn shutdown(&mut self) -> futures::Poll<(), std::io::Error> { + unimplemented!() + } +} + +fn new_fd() -> i32 { + // TODO If on unix, just extract the real FD of fs_file. + // let fd = AsRawFd::as_raw_fd(fs_file.std()); + let next_fd = NEXT_FD.fetch_add(1, Ordering::SeqCst); + next_fd as i32 +} + +pub fn add_fs_file(fs_file: tokio::fs::File) -> DFile { + let fd = new_fd(); + let mut tg = FD_TABLE.lock().unwrap(); + match tg.insert(fd, Repr::FsFile(fs_file)) { + Some(_) => panic!("There is already a file with that fd"), + None => DFile { fd }, + } +} + +pub fn lookup(fd: i32) -> Option { + let table = FD_TABLE.lock().unwrap(); + table.get(&fd).map(|_| DFile { fd }) +} diff --git a/src/handlers.rs b/src/handlers.rs index 62bdabb614..b1ef67d949 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,5 +1,6 @@ // Copyright 2018 the Deno authors. All rights reserved. MIT license. +use errors; use errors::DenoError; use errors::DenoResult; use fs as deno_fs; @@ -8,6 +9,7 @@ use isolate::IsolateState; use isolate::Op; use msg; +use files; use flatbuffers::FlatBufferBuilder; use futures; use futures::future::poll_fn; @@ -26,7 +28,10 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::UNIX_EPOCH; use std::time::{Duration, Instant}; +use tokio; use tokio::timer::Delay; +use tokio_io::AsyncRead; +use tokio_io::AsyncWrite; use tokio_threadpool; type OpResult = DenoResult; @@ -62,6 +67,9 @@ pub fn msg_from_js( msg::Any::TimerClear => handle_timer_clear, msg::Any::MakeTempDir => handle_make_temp_dir, msg::Any::Mkdir => handle_mkdir, + msg::Any::Open => handle_open, + msg::Any::Read => handle_read, + msg::Any::Write => handle_write, msg::Any::Remove => handle_remove, msg::Any::ReadFile => handle_read_file, msg::Any::Rename => handle_rename, @@ -551,6 +559,129 @@ fn handle_mkdir( }) } +fn handle_open( + _state: Arc, + base: &msg::Base, + data: &'static mut [u8], +) -> Box { + assert_eq!(data.len(), 0); + let cmd_id = base.cmd_id(); + let msg = base.msg_as_open().unwrap(); + let filename = PathBuf::from(msg.filename().unwrap()); + // TODO let perm = msg.perm(); + + let op = tokio::fs::File::open(filename) + .map_err(|err| DenoError::from(err)) + .and_then(move |fs_file| -> OpResult { + let dfile = files::add_fs_file(fs_file); + let builder = &mut FlatBufferBuilder::new(); + let msg = msg::OpenRes::create( + builder, + &msg::OpenResArgs { + fd: dfile.fd, + ..Default::default() + }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + msg: Some(msg.as_union_value()), + msg_type: msg::Any::OpenRes, + ..Default::default() + }, + )) + }); + Box::new(op) +} + +fn handle_read( + _state: Arc, + base: &msg::Base, + data: &'static mut [u8], +) -> Box { + let cmd_id = base.cmd_id(); + let msg = base.msg_as_read().unwrap(); + let fd = msg.fd(); + + match files::lookup(fd) { + None => odd_future(errors::new( + errors::ErrorKind::BadFileDescriptor, + String::from("Bad File Descriptor"), + )), + Some(mut dfile) => { + let op = futures::future::poll_fn(move || { + let poll = dfile.poll_read(data); + poll + }).map_err(|err| DenoError::from(err)) + .and_then(move |nread: usize| { + let builder = &mut FlatBufferBuilder::new(); + let msg = msg::ReadRes::create( + builder, + &msg::ReadResArgs { + nread: nread as u32, + eof: nread == 0, + ..Default::default() + }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + msg: Some(msg.as_union_value()), + msg_type: msg::Any::ReadRes, + ..Default::default() + }, + )) + }); + Box::new(op) + } + } +} + +fn handle_write( + _state: Arc, + base: &msg::Base, + data: &'static mut [u8], +) -> Box { + let cmd_id = base.cmd_id(); + let msg = base.msg_as_write().unwrap(); + let fd = msg.fd(); + + match files::lookup(fd) { + None => odd_future(errors::new( + errors::ErrorKind::BadFileDescriptor, + String::from("Bad File Descriptor"), + )), + Some(mut dfile) => { + let op = futures::future::poll_fn(move || { + let poll = dfile.poll_write(data); + poll + }).map_err(|err| DenoError::from(err)) + .and_then(move |bytes_written: usize| { + let builder = &mut FlatBufferBuilder::new(); + let msg = msg::WriteRes::create( + builder, + &msg::WriteResArgs { + nbyte: bytes_written as u32, + ..Default::default() + }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + msg: Some(msg.as_union_value()), + msg_type: msg::Any::WriteRes, + ..Default::default() + }, + )) + }); + Box::new(op) + } + } +} + fn handle_remove( state: Arc, base: &msg::Base, diff --git a/src/main.rs b/src/main.rs index cc762f1aef..01be538e19 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,8 @@ extern crate rand; extern crate tempfile; extern crate tokio; extern crate tokio_executor; +extern crate tokio_fs; +extern crate tokio_io; extern crate tokio_threadpool; extern crate url; #[macro_use] @@ -21,6 +23,7 @@ extern crate ring; mod deno_dir; mod errors; +mod files; mod flags; mod fs; pub mod handlers; diff --git a/src/msg.fbs b/src/msg.fbs index 0d78395ea4..7f42cd6370 100644 --- a/src/msg.fbs +++ b/src/msg.fbs @@ -27,6 +27,13 @@ union Any { Stat, StatRes, SetEnv, + Open, + OpenRes, + Read, + ReadRes, + Write, + WriteRes, + Close, } enum ErrorKind: byte { @@ -53,6 +60,8 @@ enum ErrorKind: byte { Other, UnexpectedEof, + BadFileDescriptor, + // url errors EmptyHost, @@ -199,6 +208,7 @@ table ReadFileRes { table WriteFile { filename: string; + data: [ubyte]; perm: uint; // perm specified by https://godoc.org/os#FileMode } @@ -237,4 +247,42 @@ table StatRes { has_mode: bool; // false on windows } +table WriteFileSync { + filename: string; + data: [ubyte]; + perm: uint; + // perm specified by https://godoc.org/os#FileMode +} + +table Open { + filename: string; + perm: uint; +} + +table OpenRes { + fd: int; +} + +table Read { + fd: int; + // (ptr, len) is passed as second parameter to libdeno.send(). +} + +table ReadRes { + nread: uint; + eof: bool; +} + +table Write { + fd: int; +} + +table WriteRes { + nbyte: uint; +} + +table Close { + fd: int; +} + root_type Base; diff --git a/tests/cat.ts b/tests/cat.ts new file mode 100644 index 0000000000..bede0d4322 --- /dev/null +++ b/tests/cat.ts @@ -0,0 +1,11 @@ +import { stdout, open, copy, args } from "deno"; + +async function main() { + for (let i = 1; i < args.length; i++) { + let filename = args[i]; + let file = await open(filename); + await copy(stdout, file); + } +} + +main();