Auto merge of #23820 - sfackler:fast_read_to_end, r=alexcrichton

with_end_to_cap is enormously expensive now that it's initializing
memory since it involves 64k allocation + memset on every call. This is
most noticable when calling read_to_end on very small readers, where the
new version if **4 orders of magnitude** faster.

BufReader also depended on with_end_to_cap so I've rewritten it in its
original form.

As a bonus, converted the buffered IO struct Debug impls to use the
debug builders.

I first came across this in sfackler/rust-postgres#106 where a user reported a 10x performance regression. A call to read_to_end turned out to be the culprit: 9cd413d42c.

The new version differs from the old in a couple of ways. The buffer size used is now adaptive. It starts at 32 bytes and doubles each time EOF hasn't been reached up to a limit of 64k. In addition, the buffer is only truncated when EOF or an error has been reached, rather than after every call to read as was the case for the old implementation.

I wrote up a benchmark to compare the old version and new version: https://gist.github.com/sfackler/e979711b0ee2f2063462

It tests a couple of different cases: a high bandwidth reader, a low bandwidth reader, and a low bandwidth reader that won't return more than 10k per call to `read`. The high bandwidth reader should be analagous to use cases when reading from e.g. a `BufReader` or `Vec`, and the low bandwidth readers should be analogous to reading from something like a `TcpStream`.

Of special note, reads from a high bandwith reader containing 4 bytes are now *4,495 times faster*. 
```
~/foo ❯ cargo bench
   Compiling foo v0.0.1 (file:///home/sfackler/foo)
     Running target/release/foo-7498d7dd7faecf5c

running 13 tests
test test_new ... ignored
test new_delay_4      ... bench:    230768 ns/iter (+/- 14812)
test new_delay_4_cap  ... bench:    231421 ns/iter (+/- 7211)
test new_delay_5m     ... bench:  14495370 ns/iter (+/- 4008648)
test new_delay_5m_cap ... bench:  73127954 ns/iter (+/- 59908587)
test new_nodelay_4    ... bench:        83 ns/iter (+/- 2)
test new_nodelay_5m   ... bench:  12527237 ns/iter (+/- 335243)
test std_delay_4      ... bench:    373095 ns/iter (+/- 12613)
test std_delay_4_cap  ... bench:    374190 ns/iter (+/- 19611)
test std_delay_5m     ... bench:  17356012 ns/iter (+/- 15906588)
test std_delay_5m_cap ... bench: 883555035 ns/iter (+/- 205559857)
test std_nodelay_4    ... bench:    144937 ns/iter (+/- 2448)
test std_nodelay_5m   ... bench:  16095893 ns/iter (+/- 3315116)

test result: ok. 0 passed; 0 failed; 1 ignored; 12 measured
```

r? @alexcrichton
This commit is contained in:
bors 2015-03-29 19:47:18 +00:00
commit 92f3d9a6b4
2 changed files with 76 additions and 59 deletions

View file

@ -18,8 +18,9 @@
use cmp;
use error::{self, FromError};
use fmt;
use io::{self, Cursor, DEFAULT_BUF_SIZE, Error, ErrorKind};
use io::{self, DEFAULT_BUF_SIZE, Error, ErrorKind};
use ptr;
use iter;
/// Wraps a `Read` and buffers input from it
///
@ -30,7 +31,9 @@
#[stable(feature = "rust1", since = "1.0.0")]
pub struct BufReader<R> {
inner: R,
buf: Cursor<Vec<u8>>,
buf: Vec<u8>,
pos: usize,
cap: usize,
}
impl<R: Read> BufReader<R> {
@ -43,9 +46,13 @@ pub fn new(inner: R) -> BufReader<R> {
/// Creates a new `BufReader` with the specified buffer capacity
#[stable(feature = "rust1", since = "1.0.0")]
pub fn with_capacity(cap: usize, inner: R) -> BufReader<R> {
let mut buf = Vec::with_capacity(cap);
buf.extend(iter::repeat(0).take(cap));
BufReader {
inner: inner,
buf: Cursor::new(Vec::with_capacity(cap)),
buf: buf,
pos: 0,
cap: 0,
}
}
@ -74,12 +81,15 @@ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// If we don't have any buffered data and we're doing a massive read
// (larger than our internal buffer), bypass our internal buffer
// entirely.
if self.buf.get_ref().len() == self.buf.position() as usize &&
buf.len() >= self.buf.get_ref().capacity() {
if self.pos == self.cap && buf.len() >= self.buf.len() {
return self.inner.read(buf);
}
try!(self.fill_buf());
self.buf.read(buf)
let nread = {
let mut rem = try!(self.fill_buf());
try!(rem.read(buf))
};
self.consume(nread);
Ok(nread)
}
}
@ -88,26 +98,25 @@ impl<R: Read> BufRead for BufReader<R> {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
// If we've reached the end of our internal buffer then we need to fetch
// some more data from the underlying reader.
if self.buf.position() as usize == self.buf.get_ref().len() {
self.buf.set_position(0);
let v = self.buf.get_mut();
v.truncate(0);
let inner = &mut self.inner;
try!(super::with_end_to_cap(v, |b| inner.read(b)));
if self.pos == self.cap {
self.cap = try!(self.inner.read(&mut self.buf));
self.pos = 0;
}
self.buf.fill_buf()
Ok(&self.buf[self.pos..self.cap])
}
fn consume(&mut self, amt: usize) {
self.buf.consume(amt)
self.pos = cmp::min(self.pos + amt, self.cap);
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<R> fmt::Debug for BufReader<R> where R: fmt::Debug {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "BufReader {{ reader: {:?}, buffer: {}/{} }}",
self.inner, self.buf.position(), self.buf.get_ref().len())
fmt.debug_struct("BufReader")
.field("reader", &self.inner)
.field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buf.len()))
.finish()
}
}
@ -222,8 +231,10 @@ fn flush(&mut self) -> io::Result<()> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<W: Write> fmt::Debug for BufWriter<W> where W: fmt::Debug {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "BufWriter {{ writer: {:?}, buffer: {}/{} }}",
self.inner.as_ref().unwrap(), self.buf.len(), self.buf.capacity())
fmt.debug_struct("BufWriter")
.field("writer", &self.inner.as_ref().unwrap())
.field("buffer", &format_args!("{}/{}", self.buf.len(), self.buf.capacity()))
.finish()
}
}
@ -337,9 +348,11 @@ fn flush(&mut self) -> io::Result<()> { self.inner.flush() }
#[stable(feature = "rust1", since = "1.0.0")]
impl<W: Write> fmt::Debug for LineWriter<W> where W: fmt::Debug {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "LineWriter {{ writer: {:?}, buffer: {}/{} }}",
self.inner.inner, self.inner.buf.len(),
self.inner.buf.capacity())
fmt.debug_struct("LineWriter")
.field("writer", &self.inner.inner)
.field("buffer",
&format_args!("{}/{}", self.inner.buf.len(), self.inner.buf.capacity()))
.finish()
}
}
@ -415,10 +428,10 @@ pub fn get_mut(&mut self) -> &mut S {
/// Any leftover data in the read buffer is lost.
#[stable(feature = "rust1", since = "1.0.0")]
pub fn into_inner(self) -> Result<S, IntoInnerError<BufStream<S>>> {
let BufReader { inner: InternalBufWriter(w), buf } = self.inner;
let BufReader { inner: InternalBufWriter(w), buf, pos, cap } = self.inner;
w.into_inner().map_err(|IntoInnerError(w, e)| {
IntoInnerError(BufStream {
inner: BufReader { inner: InternalBufWriter(w), buf: buf },
inner: BufReader { inner: InternalBufWriter(w), buf: buf, pos: pos, cap: cap },
}, e)
})
}
@ -452,10 +465,12 @@ impl<S: Write> fmt::Debug for BufStream<S> where S: fmt::Debug {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let reader = &self.inner;
let writer = &self.inner.inner.0;
write!(fmt, "BufStream {{ stream: {:?}, write_buffer: {}/{}, read_buffer: {}/{} }}",
writer.inner,
writer.buf.len(), writer.buf.capacity(),
reader.buf.position(), reader.buf.get_ref().len())
fmt.debug_struct("BufStream")
.field("stream", &writer.inner)
.field("write_buffer", &format_args!("{}/{}", writer.buf.len(), writer.buf.capacity()))
.field("read_buffer",
&format_args!("{}/{}", reader.cap - reader.pos, reader.buf.len()))
.finish()
}
}

View file

@ -48,30 +48,6 @@
const DEFAULT_BUF_SIZE: usize = 64 * 1024;
// Acquires a slice of the vector `v` from its length to its capacity
// (after initializing the data), reads into it, and then updates the length.
//
// This function is leveraged to efficiently read some bytes into a destination
// vector without extra copying and taking advantage of the space that's already
// in `v`.
fn with_end_to_cap<F>(v: &mut Vec<u8>, f: F) -> Result<usize>
where F: FnOnce(&mut [u8]) -> Result<usize>
{
let len = v.len();
let new_area = v.capacity() - len;
v.extend(iter::repeat(0).take(new_area));
match f(&mut v[len..]) {
Ok(n) => {
v.truncate(len + n);
Ok(n)
}
Err(e) => {
v.truncate(len);
Err(e)
}
}
}
// A few methods below (read_to_string, read_line) will append data into a
// `String` buffer, but we need to be pretty careful when doing this. The
// implementation will just call `.as_mut_vec()` and then delegate to a
@ -116,19 +92,45 @@ fn drop(&mut self) {
}
}
// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
fn read_to_end<R: Read + ?Sized>(r: &mut R, buf: &mut Vec<u8>) -> Result<usize> {
let mut read = 0;
let start_len = buf.len();
let mut len = start_len;
let mut cap_bump = 16;
let ret;
loop {
if buf.capacity() == buf.len() {
buf.reserve(DEFAULT_BUF_SIZE);
if len == buf.len() {
if buf.capacity() == buf.len() {
if cap_bump < DEFAULT_BUF_SIZE {
cap_bump *= 2;
}
buf.reserve(cap_bump);
}
let new_area = buf.capacity() - buf.len();
buf.extend(iter::repeat(0).take(new_area));
}
match with_end_to_cap(buf, |b| r.read(b)) {
Ok(0) => return Ok(read),
Ok(n) => read += n,
match r.read(&mut buf[len..]) {
Ok(0) => {
ret = Ok(len - start_len);
break;
}
Ok(n) => len += n,
Err(ref e) if e.kind() == ErrorKind::Interrupted => {}
Err(e) => return Err(e),
Err(e) => {
ret = Err(e);
break;
}
}
}
buf.truncate(len);
ret
}
/// A trait for objects which are byte-oriented sources.