// Source: deno/ext/http/response_body.rs
// 2023-09-25 17:23:55 +02:00 · 1028 lines · 29 KiB · Rust

// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::cell::RefCell;
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Waker;
use brotli::enc::encode::BrotliEncoderParameter;
use brotli::ffi::compressor::BrotliEncoderState;
use bytes::Bytes;
use bytes::BytesMut;
use deno_core::error::AnyError;
use deno_core::futures::ready;
use deno_core::futures::FutureExt;
use deno_core::AsyncResult;
use deno_core::BufView;
use deno_core::Resource;
use flate2::write::GzEncoder;
use http::HeaderMap;
use hyper1::body::Body;
use hyper1::body::Frame;
use hyper1::body::SizeHint;
use pin_project::pin_project;
use crate::slab::HttpRequestBodyAutocloser;
/// Simplification for nested types we use for our streams. We provide a way to convert from
/// this type into Hyper's body [`Frame`].
///
/// Every variant except [`ResponseStreamResult::NoData`] has a direct conversion; `NoData` must
/// be handled by polling again before converting.
enum ResponseStreamResult {
/// Stream is over.
EndOfStream,
/// Stream provided non-empty data.
NonEmptyBuf(BufView),
/// Stream is ready, but provided no data. Retry. This result is like `Pending`, but it does
/// not register a waker, so the stream should be polled again at the lowest level of this code.
/// Generally this will only be returned from compression streams that require additional
/// buffering.
NoData,
/// Stream provided trailers.
// TODO(mmastrac): We are threading trailers through the response system to eventually support Grpc.
#[allow(unused)]
Trailers(HeaderMap),
/// Stream failed.
Error(AnyError),
}
impl From<ResponseStreamResult> for Option<Result<Frame<BufView>, AnyError>> {
fn from(value: ResponseStreamResult) -> Self {
match value {
ResponseStreamResult::EndOfStream => None,
ResponseStreamResult::NonEmptyBuf(buf) => Some(Ok(Frame::data(buf))),
ResponseStreamResult::Error(err) => Some(Err(err)),
ResponseStreamResult::Trailers(map) => Some(Ok(Frame::trailers(map))),
// This result should be handled by retrying
ResponseStreamResult::NoData => unimplemented!(),
}
}
}
/// A cloneable, single-threaded completion flag that can also be awaited.
///
/// All clones share the same state. Awaiting the handle resolves to the `success` value that
/// was passed to [`CompletionHandle::complete`].
#[derive(Clone, Debug, Default)]
pub struct CompletionHandle {
  inner: Rc<RefCell<CompletionHandleInner>>,
}

/// Shared state behind a [`CompletionHandle`].
#[derive(Debug, Default)]
struct CompletionHandleInner {
  /// Set once `complete` has been called.
  complete: bool,
  /// The success flag recorded by the most recent `complete` call.
  success: bool,
  /// Waker of the task that last polled the handle while it was still pending.
  waker: Option<Waker>,
}

impl CompletionHandle {
  /// Records the outcome and wakes the task that is waiting on this handle, if any.
  pub fn complete(&self, success: bool) {
    // Release the `RefCell` borrow before waking so that the woken task may touch the handle.
    let parked = {
      let mut state = self.inner.borrow_mut();
      state.complete = true;
      state.success = success;
      state.waker.take()
    };
    if let Some(waker) = parked {
      waker.wake();
    }
  }

  /// Returns `true` once `complete` has been called on this handle or any of its clones.
  pub fn is_completed(&self) -> bool {
    self.inner.borrow().complete
  }
}

impl Future for CompletionHandle {
  type Output = bool;

  /// Resolves to the recorded success flag, or stores the current waker and stays pending.
  fn poll(
    self: Pin<&mut Self>,
    cx: &mut std::task::Context<'_>,
  ) -> std::task::Poll<Self::Output> {
    use std::task::Poll;

    let mut state = self.inner.borrow_mut();
    if !state.complete {
      state.waker = Some(cx.waker().clone());
      return Poll::Pending;
    }
    Poll::Ready(state.success)
  }
}
/// Internal polling interface implemented by every stream type in this file. It yields a
/// [`ResponseStreamResult`] rather than a hyper frame so that "no data yet, retry" can be
/// expressed.
trait PollFrame: Unpin {
/// Polls the stream for its next result.
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<ResponseStreamResult>;
/// Returns a size hint for the data this stream will produce.
fn size_hint(&self) -> SizeHint;
}
/// The content encoding to apply when a response body is built.
#[derive(PartialEq, Eq)]
pub enum Compression {
/// Send the body as-is.
None,
/// Compress the body with GZip.
GZip,
/// Compress the body with Brotli.
Brotli,
}
/// The uncompressed source of a streaming response body.
pub enum ResponseStream {
/// A resource stream, piped in fast mode.
Resource(ResourceBodyAdapter),
/// A channel-backed stream, only available in tests.
#[cfg(test)]
TestChannel(tokio::sync::mpsc::Receiver<BufView>),
}
impl ResponseStream {
pub fn abort(self) {
match self {
ResponseStream::Resource(resource) => resource.stm.close(),
#[cfg(test)]
ResponseStream::TestChannel(..) => {}
}
}
}
/// The concrete body behind a [`ResponseBytes`]. It starts out as `Empty` and becomes `Done`
/// once the body has been completed.
#[derive(Default)]
pub enum ResponseBytesInner {
/// An empty stream.
#[default]
Empty,
/// A completed stream.
Done,
/// A static buffer of bytes, sent in one fell swoop.
Bytes(BufView),
/// An uncompressed stream.
UncompressedStream(ResponseStream),
/// A GZip stream.
GZipStream(GZipResponseStream),
/// A Brotli stream.
BrotliStream(BrotliResponseStream),
}
/// Debug output is only the variant label; payloads are never printed.
impl std::fmt::Debug for ResponseBytesInner {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    let label = match self {
      Self::Empty => "Empty",
      Self::Done => "Done",
      Self::Bytes(..) => "Bytes",
      Self::UncompressedStream(..) => "Uncompressed",
      Self::GZipStream(..) => "GZip",
      Self::BrotliStream(..) => "Brotli",
    };
    f.write_str(label)
  }
}
/// This represents the union of possible response types in Deno with the stream-style [`Body`] interface
/// required by hyper. As the API requires information about request completion (including a success/fail
/// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on.
#[derive(Default)]
pub struct ResponseBytes {
// The body that is currently being served.
inner: ResponseBytesInner,
// Completed with the success flag when the body finishes or is dropped.
completion_handle: CompletionHandle,
// Shared slot for trailers; taken and emitted as a trailers frame by `poll_frame`.
headers: Rc<RefCell<Option<HeaderMap>>>,
// Request-body autocloser stored by `initialize`; it is only held here, never read in this file.
res: Option<HttpRequestBodyAutocloser>,
}
impl ResponseBytes {
pub fn initialize(
&mut self,
inner: ResponseBytesInner,
req_body_resource: Option<HttpRequestBodyAutocloser>,
) {
debug_assert!(matches!(self.inner, ResponseBytesInner::Empty));
self.inner = inner;
self.res = req_body_resource;
}
pub fn completion_handle(&self) -> CompletionHandle {
self.completion_handle.clone()
}
pub fn trailers(&self) -> Rc<RefCell<Option<HeaderMap>>> {
self.headers.clone()
}
fn complete(&mut self, success: bool) -> ResponseBytesInner {
if matches!(self.inner, ResponseBytesInner::Done) {
return ResponseBytesInner::Done;
}
let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done);
self.completion_handle.complete(success);
if success {
current
} else {
current.abort();
ResponseBytesInner::Done
}
}
}
impl ResponseBytesInner {
  /// Aborts whatever stream backs this body. Non-streaming bodies have nothing to abort.
  pub fn abort(self) {
    match self {
      Self::UncompressedStream(stm) => stm.abort(),
      Self::GZipStream(stm) => stm.abort(),
      Self::BrotliStream(stm) => stm.abort(),
      Self::Done | Self::Empty | Self::Bytes(..) => {}
    }
  }

  /// Returns the size hint for this body: exact for static bodies, delegated to the stream
  /// for uncompressed streams, and unknown for compressed streams.
  pub fn size_hint(&self) -> SizeHint {
    match self {
      Self::Done | Self::Empty => SizeHint::with_exact(0),
      Self::Bytes(bytes) => SizeHint::with_exact(bytes.len() as u64),
      Self::UncompressedStream(res) => res.size_hint(),
      Self::GZipStream(..) | Self::BrotliStream(..) => SizeHint::default(),
    }
  }

  /// Wraps `stream` in the streaming compressor selected by `compression`.
  fn from_stream(compression: Compression, stream: ResponseStream) -> Self {
    match compression {
      Compression::None => Self::UncompressedStream(stream),
      Compression::GZip => Self::GZipStream(GZipResponseStream::new(stream)),
      Compression::Brotli => {
        Self::BrotliStream(BrotliResponseStream::new(stream))
      }
    }
  }

  /// Builds a streaming body that reads from the resource `stm`.
  pub fn from_resource(
    compression: Compression,
    stm: Rc<dyn Resource>,
    auto_close: bool,
  ) -> Self {
    let adapter = ResourceBodyAdapter::new(stm, auto_close);
    Self::from_stream(compression, ResponseStream::Resource(adapter))
  }

  /// Builds a static body from `buf`, compressing it up front if requested.
  pub fn from_bufview(compression: Compression, buf: BufView) -> Self {
    let body = match compression {
      Compression::None => buf,
      Compression::GZip => BufView::from(Self::gzip_oneshot(&buf)),
      Compression::Brotli => BufView::from(Self::brotli_oneshot(&buf)),
    };
    Self::Bytes(body)
  }

  /// Builds a static body from `vec`, compressing it up front if requested.
  pub fn from_vec(compression: Compression, vec: Vec<u8>) -> Self {
    let body = match compression {
      Compression::None => BufView::from(vec),
      Compression::GZip => BufView::from(Self::gzip_oneshot(&vec)),
      Compression::Brotli => BufView::from(Self::brotli_oneshot(&vec)),
    };
    Self::Bytes(body)
  }

  /// GZip-compresses `data` in one go using the fast compression level.
  fn gzip_oneshot(data: &[u8]) -> Vec<u8> {
    let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::fast());
    encoder.write_all(data).unwrap();
    encoder.finish().unwrap()
  }

  /// Brotli-compresses `data` in one go.
  fn brotli_oneshot(data: &[u8]) -> Vec<u8> {
    // quality level 6 is based on google's nginx default value for
    // on-the-fly compression
    // https://github.com/google/ngx_brotli#brotli_comp_level
    // lgwin 22 is equivalent to brotli window size of (2**22)-16 bytes
    // (~4MB)
    let mut encoder =
      brotli::CompressorWriter::new(Vec::new(), 65 * 1024, 6, 22);
    encoder.write_all(data).unwrap();
    encoder.flush().unwrap();
    encoder.into_inner()
  }
}
/// Exposes [`ResponseBytes`] to hyper as a streaming body.
impl Body for ResponseBytes {
  type Data = BufView;
  type Error = AnyError;

  /// Produces the next frame of the response.
  ///
  /// * A static `Bytes` body is emitted as a single data frame and completed immediately.
  /// * Streaming bodies are polled until they yield something other than `NoData`.
  /// * When the stream ends, the body is completed successfully and any pending trailers are
  ///   emitted as the final frame.
  /// * Polling a finished (or empty) body yields pending trailers if there are any, and
  ///   `None` otherwise.
  fn poll_frame(
    mut self: Pin<&mut Self>,
    cx: &mut std::task::Context<'_>,
  ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
    use std::task::Poll;

    let res = loop {
      let res = match &mut self.inner {
        ResponseBytesInner::Done | ResponseBytesInner::Empty => {
          // Nothing is left except possibly trailers. Report end-of-body instead of
          // panicking if we are polled once more after everything has been delivered.
          let trailers = self.headers.borrow_mut().take();
          return Poll::Ready(trailers.map(|map| Ok(Frame::trailers(map))));
        }
        ResponseBytesInner::Bytes(..) => {
          let ResponseBytesInner::Bytes(data) = self.complete(true) else {
            unreachable!();
          };
          return Poll::Ready(Some(Ok(Frame::data(data))));
        }
        ResponseBytesInner::UncompressedStream(stm) => {
          ready!(Pin::new(stm).poll_frame(cx))
        }
        ResponseBytesInner::GZipStream(stm) => {
          ready!(Pin::new(stm).poll_frame(cx))
        }
        ResponseBytesInner::BrotliStream(stm) => {
          ready!(Pin::new(stm).poll_frame(cx))
        }
      };
      // This is where we retry the NoData response
      if matches!(res, ResponseStreamResult::NoData) {
        continue;
      }
      break res;
    };

    if matches!(res, ResponseStreamResult::EndOfStream) {
      // The stream is exhausted: mark the body as done *before* handing out trailers so that
      // the finished stream is never polled again and the completion handle reports success
      // even when the trailers frame is the last thing the consumer asks for.
      self.complete(true);
      if let Some(trailers) = self.headers.borrow_mut().take() {
        return Poll::Ready(Some(Ok(Frame::trailers(trailers))));
      }
    }
    Poll::Ready(res.into())
  }

  /// Returns `true` once there is neither body data nor trailers left to send.
  fn is_end_stream(&self) -> bool {
    matches!(
      self.inner,
      ResponseBytesInner::Done | ResponseBytesInner::Empty
    ) && self.headers.borrow().is_none()
  }

  fn size_hint(&self) -> SizeHint {
    // The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through
    // anyways just in case hyper needs it.
    self.inner.size_hint()
  }
}
/// Completes the body on drop. A body that is still `Empty` counts as a success because it is
/// never polled; any other unfinished body is reported as a failure.
impl Drop for ResponseBytes {
  fn drop(&mut self) {
    // We won't actually poll_frame for Empty responses so this is where we return success
    let never_polled = matches!(self.inner, ResponseBytesInner::Empty);
    self.complete(never_polled);
  }
}
/// Adapts a [`Resource`] into a pollable stream of buffers by keeping one read in flight.
pub struct ResourceBodyAdapter {
  /// Whether the resource is closed once it reports end-of-stream.
  auto_close: bool,
  /// The resource being read.
  stm: Rc<dyn Resource>,
  /// The read that is currently in flight.
  future: AsyncResult<BufView>,
}

impl ResourceBodyAdapter {
  /// Creates the adapter and immediately issues the first read of up to 64 KiB.
  pub fn new(stm: Rc<dyn Resource>, auto_close: bool) -> Self {
    Self {
      auto_close,
      future: stm.clone().read(64 * 1024),
      stm,
    }
  }
}
/// Forwards both operations to whichever concrete source backs the stream.
impl PollFrame for ResponseStream {
  fn poll_frame(
    self: Pin<&mut Self>,
    cx: &mut std::task::Context<'_>,
  ) -> std::task::Poll<ResponseStreamResult> {
    match self.get_mut() {
      Self::Resource(adapter) => Pin::new(adapter).poll_frame(cx),
      #[cfg(test)]
      Self::TestChannel(receiver) => Pin::new(receiver).poll_frame(cx),
    }
  }

  fn size_hint(&self) -> SizeHint {
    match self {
      Self::Resource(adapter) => adapter.size_hint(),
      #[cfg(test)]
      Self::TestChannel(_) => SizeHint::default(),
    }
  }
}
impl PollFrame for ResourceBodyAdapter {
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<ResponseStreamResult> {
let res = match ready!(self.future.poll_unpin(cx)) {
Err(err) => ResponseStreamResult::Error(err),
Ok(buf) => {
if buf.is_empty() {
if self.auto_close {
self.stm.clone().close();
}
ResponseStreamResult::EndOfStream
} else {
// Re-arm the future
self.future = self.stm.clone().read(64 * 1024);
ResponseStreamResult::NonEmptyBuf(buf)
}
}
};
std::task::Poll::Ready(res)
}
fn size_hint(&self) -> SizeHint {
let hint = self.stm.size_hint();
let mut size_hint = SizeHint::new();
size_hint.set_lower(hint.0);
if let Some(upper) = hint.1 {
size_hint.set_upper(upper)
}
size_hint
}
}
/// Test-only source: each received buffer becomes a data result and a closed channel ends
/// the stream.
#[cfg(test)]
impl PollFrame for tokio::sync::mpsc::Receiver<BufView> {
  fn poll_frame(
    mut self: Pin<&mut Self>,
    cx: &mut std::task::Context<'_>,
  ) -> std::task::Poll<ResponseStreamResult> {
    self.poll_recv(cx).map(|received| match received {
      None => ResponseStreamResult::EndOfStream,
      Some(buf) => ResponseStreamResult::NonEmptyBuf(buf),
    })
  }

  fn size_hint(&self) -> SizeHint {
    SizeHint::default()
  }
}
/// The phase a [`GZipResponseStream`] is currently in.
#[derive(Copy, Clone, Debug)]
enum GZipState {
/// Nothing has been emitted yet; the next poll emits the GZip header.
Header,
/// Input from the underlying stream is being compressed.
Streaming,
/// The underlying stream has ended and the compressor is being finished.
Flushing,
/// The compressor has finished; the next poll emits the 8-byte trailer.
Trailer,
/// Everything has been emitted.
EndOfStream,
}
/// A stream that GZip-compresses the data produced by an underlying [`ResponseStream`].
#[pin_project]
pub struct GZipResponseStream {
// The compressor that the input is fed through.
stm: flate2::Compress,
// Running checksum and byte count of the uncompressed input, emitted in the trailer.
crc: flate2::Crc,
// Spare output buffer, kept for reuse when a compression call produced no output.
next_buf: Option<BytesMut>,
// Remainder of an input buffer that the compressor did not fully consume.
partial: Option<BufView>,
// The uncompressed source stream.
#[pin]
underlying: ResponseStream,
// Current phase of the stream.
state: GZipState,
}
impl GZipResponseStream {
  /// Creates a GZip stream over `underlying`, starting in the header phase.
  pub fn new(underlying: ResponseStream) -> Self {
    let level = flate2::Compression::fast();
    Self {
      stm: flate2::Compress::new(level, false),
      crc: flate2::Crc::new(),
      next_buf: None,
      partial: None,
      underlying,
      state: GZipState::Header,
    }
  }

  /// Aborts the underlying stream.
  pub fn abort(self) {
    self.underlying.abort()
  }
}
/// This is a minimal GZip header suitable for serving data from a webserver. We don't need to provide
/// most of the information. We're skipping header name, CRC, etc, and providing a null timestamp.
///
/// Byte layout (field names per RFC 1952): magic `0x1f 0x8b`, compression method `0x08`
/// (deflate), flags `0`, four zero bytes of modification time, extra flags `0x01`, and
/// operating system `0xff` (unknown).
///
/// We're using compression level 1, as higher levels don't produce significant size differences. This
/// is probably the reason why nginx's default gzip compression level is also 1:
///
/// https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level
static GZIP_HEADER: Bytes =
Bytes::from_static(&[0x1f, 0x8b, 0x08, 0, 0, 0, 0, 0, 0x01, 0xff]);
impl PollFrame for GZipResponseStream {
/// Advances the GZip state machine by one step.
///
/// * `Header` emits [`GZIP_HEADER`] and moves to `Streaming`.
/// * `Streaming` compresses either the leftover `partial` input or the next buffer polled from
///   the underlying stream.
/// * `Flushing` keeps finishing the compressor until it reports `StreamEnd`.
/// * `Trailer` emits the checksum and the input length, both little-endian, and moves to
///   `EndOfStream`.
///
/// `NoData` is returned whenever a compression call produced no output; the caller is expected
/// to poll again.
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<ResponseStreamResult> {
let this = self.get_mut();
let state = &mut this.state;
// Remembered only for the diagnostic message in the `BufError` branch below.
let orig_state = *state;
let frame = match *state {
GZipState::EndOfStream => {
return std::task::Poll::Ready(ResponseStreamResult::EndOfStream)
}
GZipState::Header => {
*state = GZipState::Streaming;
return std::task::Poll::Ready(ResponseStreamResult::NonEmptyBuf(
BufView::from(GZIP_HEADER.clone()),
));
}
GZipState::Trailer => {
*state = GZipState::EndOfStream;
// Trailer: checksum of the uncompressed data followed by the amount of input seen.
let mut v = Vec::with_capacity(8);
v.extend(&this.crc.sum().to_le_bytes());
v.extend(&this.crc.amount().to_le_bytes());
return std::task::Poll::Ready(ResponseStreamResult::NonEmptyBuf(
BufView::from(v),
));
}
GZipState::Streaming => {
// Finish the input left over from the previous call before asking for more.
if let Some(partial) = this.partial.take() {
ResponseStreamResult::NonEmptyBuf(partial)
} else {
ready!(Pin::new(&mut this.underlying).poll_frame(cx))
}
}
// Pretend the underlying stream ended again so the compressor keeps finishing.
GZipState::Flushing => ResponseStreamResult::EndOfStream,
};
let stm = &mut this.stm;
// Ideally we could use MaybeUninit here, but flate2 requires &[u8]. We should also try
// to dynamically adjust this buffer.
let mut buf = this
.next_buf
.take()
.unwrap_or_else(|| BytesMut::zeroed(64 * 1024));
// Snapshot the counters so that the bytes consumed/produced by this call can be derived.
let start_in = stm.total_in();
let start_out = stm.total_out();
let res = match frame {
// Short-circuit these and just return
// NOTE(review): `buf` is dropped on this path rather than being stored back in `next_buf`.
x @ (ResponseStreamResult::NoData
| ResponseStreamResult::Error(..)
| ResponseStreamResult::Trailers(..)) => {
return std::task::Poll::Ready(x)
}
ResponseStreamResult::EndOfStream => {
*state = GZipState::Flushing;
stm.compress(&[], &mut buf, flate2::FlushCompress::Finish)
}
ResponseStreamResult::NonEmptyBuf(mut input) => {
let res = stm.compress(&input, &mut buf, flate2::FlushCompress::None);
let len_in = (stm.total_in() - start_in) as usize;
debug_assert!(len_in <= input.len());
// Only the bytes that were actually consumed contribute to the checksum.
this.crc.update(&input[..len_in]);
if len_in < input.len() {
// Keep the unconsumed tail for the next poll.
input.advance_cursor(len_in);
this.partial = Some(input);
}
res
}
};
// Number of compressed bytes written into `buf` by this call.
let len = stm.total_out() - start_out;
let res = match res {
Err(err) => ResponseStreamResult::Error(err.into()),
Ok(flate2::Status::BufError) => {
// This should not happen
unreachable!("old={orig_state:?} new={state:?} buf_len={}", buf.len());
}
Ok(flate2::Status::Ok) => {
if len == 0 {
// Nothing was produced: keep the buffer for the next call and ask to be polled again.
this.next_buf = Some(buf);
ResponseStreamResult::NoData
} else {
buf.truncate(len as usize);
ResponseStreamResult::NonEmptyBuf(BufView::from(buf.freeze()))
}
}
Ok(flate2::Status::StreamEnd) => {
// The compressor is finished; the trailer is emitted on the next poll.
*state = GZipState::Trailer;
if len == 0 {
this.next_buf = Some(buf);
ResponseStreamResult::NoData
} else {
buf.truncate(len as usize);
ResponseStreamResult::NonEmptyBuf(BufView::from(buf.freeze()))
}
}
};
std::task::Poll::Ready(res)
}
/// The compressed size is not known in advance, so the default hint is returned.
fn size_hint(&self) -> SizeHint {
SizeHint::default()
}
}
/// The phase a [`BrotliResponseStream`] is currently in.
#[derive(Copy, Clone, Debug)]
enum BrotliState {
/// Input from the underlying stream is being compressed.
Streaming,
/// The underlying stream has ended and the encoder is being finished.
Flushing,
/// Everything has been emitted.
EndOfStream,
}
/// Owns a raw Brotli encoder instance; the instance is destroyed when the wrapper is dropped.
struct BrotliEncoderStateWrapper {
// Raw pointer to the encoder instance created through the brotli FFI.
stm: *mut BrotliEncoderState,
}
/// A stream that Brotli-compresses the data produced by an underlying [`ResponseStream`].
#[pin_project]
pub struct BrotliResponseStream {
// Current phase of the stream.
state: BrotliState,
// The owned encoder instance.
stm: BrotliEncoderStateWrapper,
// NOTE(review): initialised to 0 in `new` and not read anywhere in this file.
current_cursor: usize,
// Output total recorded after the previous data frame; subtracted when sizing the next one.
output_written_so_far: usize,
// The uncompressed source stream.
#[pin]
underlying: ResponseStream,
}
/// Destroys the wrapped encoder instance when the wrapper goes away.
impl Drop for BrotliEncoderStateWrapper {
fn drop(&mut self) {
// SAFETY: since we are dropping, we can be sure that this instance will not
// be used again.
unsafe {
brotli::ffi::compressor::BrotliEncoderDestroyInstance(self.stm);
}
}
}
impl BrotliResponseStream {
  /// Creates a Brotli stream over `underlying` with a freshly configured encoder instance.
  pub fn new(underlying: ResponseStream) -> Self {
    use brotli::ffi::compressor::BrotliEncoderCreateInstance;
    use brotli::ffi::compressor::BrotliEncoderSetParameter;

    // SAFETY: creating an FFI instance should be OK with these args.
    let encoder = unsafe {
      let raw = BrotliEncoderCreateInstance(None, None, std::ptr::null_mut());
      // Quality level 6 is based on google's nginx default value for on-the-fly compression
      // https://github.com/google/ngx_brotli#brotli_comp_level
      BrotliEncoderSetParameter(
        raw,
        BrotliEncoderParameter::BROTLI_PARAM_QUALITY,
        6,
      );
      // lgwin 22 is equivalent to brotli window size of (2**22)-16 bytes (~4MB)
      BrotliEncoderSetParameter(
        raw,
        BrotliEncoderParameter::BROTLI_PARAM_LGWIN,
        22,
      );
      raw
    };

    Self {
      state: BrotliState::Streaming,
      stm: BrotliEncoderStateWrapper { stm: encoder },
      current_cursor: 0,
      output_written_so_far: 0,
      underlying,
    }
  }

  /// Aborts the underlying stream.
  pub fn abort(self) {
    self.underlying.abort()
  }
}
/// Returns an upper bound for the size of the Brotli output produced for `input_size` bytes
/// of input.
///
/// An empty input needs 2 bytes. Otherwise the bound is the input size plus a fixed overhead
/// and 4 bytes for every 16 KiB block of input. Returns 0 if the bound does not fit in a
/// `usize`.
fn max_compressed_size(input_size: usize) -> usize {
  if input_size == 0 {
    return 2;
  }
  // [window bits / empty metadata] + N * [uncompressed] + [last empty]
  let num_large_blocks = input_size >> 14;
  let overhead = 2 + (4 * num_large_blocks) + 3 + 1;
  // `checked_add` makes the overflow case explicit: a plain `+` panics in debug builds before
  // a "result < input_size" comparison could ever detect the wrap-around.
  input_size.checked_add(overhead).unwrap_or(0)
}
impl PollFrame for BrotliResponseStream {
/// Advances the Brotli stream by one step.
///
/// * `Streaming` polls the underlying stream; each non-empty buffer is pushed through the
///   encoder with a PROCESS call followed by a FLUSH call and returned as one data frame.
/// * When the underlying stream ends, and on every later poll while in `Flushing`, the encoder
///   is asked to FINISH; the stream ends once a FINISH call reports no output.
/// * Any other result from the underlying stream is passed through unchanged.
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<ResponseStreamResult> {
let this = self.get_mut();
let state = &mut this.state;
let frame = match *state {
BrotliState::Streaming => {
ready!(Pin::new(&mut this.underlying).poll_frame(cx))
}
// Pretend the underlying stream ended again so that FINISH is issued once more.
BrotliState::Flushing => ResponseStreamResult::EndOfStream,
BrotliState::EndOfStream => {
return std::task::Poll::Ready(ResponseStreamResult::EndOfStream);
}
};
let res = match frame {
ResponseStreamResult::NonEmptyBuf(buf) => {
let mut output_written = 0;
let mut total_output_written = 0;
let mut input_size = buf.len();
let input_buffer = buf.as_ref();
// Size the output buffer from the bound computed for this input length.
let mut len = max_compressed_size(input_size);
let mut output_buffer = vec![0u8; len];
let mut ob_ptr = output_buffer.as_mut_ptr();
// NOTE(review): the return values of both encoder calls below are ignored.
// NOTE(review): `&input_buffer.as_ptr()` is the address of a temporary that is cast to a
// mutable pointer for the encoder — confirm that this is sound.
// SAFETY: these are okay arguments to these FFI calls.
unsafe {
brotli::ffi::compressor::BrotliEncoderCompressStream(
this.stm.stm,
brotli::ffi::compressor::BrotliEncoderOperation::BROTLI_OPERATION_PROCESS,
&mut input_size,
&input_buffer.as_ptr() as *const *const u8 as *mut *const u8,
&mut len,
&mut ob_ptr,
&mut output_written,
);
total_output_written += output_written;
output_written = 0;
brotli::ffi::compressor::BrotliEncoderCompressStream(
this.stm.stm,
brotli::ffi::compressor::BrotliEncoderOperation::BROTLI_OPERATION_FLUSH,
&mut input_size,
&input_buffer.as_ptr() as *const *const u8 as *mut *const u8,
&mut len,
&mut ob_ptr,
&mut output_written,
);
total_output_written += output_written;
};
// Presumably the encoder reports a running output total, hence the subtraction of what
// earlier frames already accounted for — TODO confirm against the brotli FFI.
output_buffer
.truncate(total_output_written - this.output_written_so_far);
this.output_written_so_far = total_output_written;
ResponseStreamResult::NonEmptyBuf(BufView::from(output_buffer))
}
ResponseStreamResult::EndOfStream => {
// A fixed 1 KiB buffer is offered per FINISH call; `Flushing` re-enters this branch on the
// next poll until a call writes nothing.
let mut len = 1024usize;
let mut output_buffer = vec![0u8; len];
let mut input_size = 0;
let mut output_written = 0;
let ob_ptr = output_buffer.as_mut_ptr();
// NOTE(review): `ob_ptr` is an immutable binding whose address is cast to `*mut *mut u8` —
// confirm that this is sound if the encoder writes through it.
// SAFETY: these are okay arguments to these FFI calls.
unsafe {
brotli::ffi::compressor::BrotliEncoderCompressStream(
this.stm.stm,
brotli::ffi::compressor::BrotliEncoderOperation::BROTLI_OPERATION_FINISH,
&mut input_size,
std::ptr::null_mut(),
&mut len,
&ob_ptr as *const *mut u8 as *mut *mut u8,
&mut output_written,
);
};
if output_written == 0 {
this.state = BrotliState::EndOfStream;
ResponseStreamResult::EndOfStream
} else {
this.state = BrotliState::Flushing;
// NOTE(review): `output_written_so_far` is not updated on this path.
output_buffer.truncate(output_written - this.output_written_so_far);
ResponseStreamResult::NonEmptyBuf(BufView::from(output_buffer))
}
}
// NoData, trailers and errors are passed through untouched.
_ => frame,
};
std::task::Poll::Ready(res)
}
/// The compressed size is not known in advance, so the default hint is returned.
fn size_hint(&self) -> SizeHint {
SizeHint::default()
}
}
#[cfg(test)]
mod tests {
use super::*;
use deno_core::futures::future::poll_fn;
use std::hash::Hasher;
use std::io::Read;
use std::io::Write;
/// Returns 1 MiB of zero bytes.
fn zeros() -> Vec<u8> {
  const ONE_MIB: usize = 1 << 20;
  vec![0u8; ONE_MIB]
}
/// Returns 1 MiB of pseudo-random bytes: the low byte of a running hash over the byte index.
fn hard_to_gzip_data() -> Vec<u8> {
  const SIZE: usize = 1024 * 1024;
  let mut hasher = std::collections::hash_map::DefaultHasher::new();
  (0..SIZE)
    .map(|index| {
      hasher.write_usize(index);
      hasher.finish() as u8
    })
    .collect()
}
/// Returns the GZip-compressed (best level) form of [`hard_to_gzip_data`].
fn already_gzipped_data() -> Vec<u8> {
  let mut compressed = Vec::with_capacity(1024 * 1024);
  {
    let mut encoder = flate2::GzBuilder::new()
      .write(&mut compressed, flate2::Compression::best());
    encoder.write_all(&hard_to_gzip_data()).unwrap();
    encoder.finish().unwrap();
  }
  compressed
}
/// Splits `v` into consecutive pieces of at most 10 KiB each.
fn chunk(v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
  // Chunk the data into 10k
  v.chunks(10 * 1024)
    .map(|piece| piece.to_vec())
    .collect::<Vec<_>>()
    .into_iter()
}
/// Splits `v` into consecutive, non-empty pieces of random length.
fn random(mut v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
  let mut pieces = vec![];
  while !v.is_empty() {
    // Take between 1 and `v.len()` bytes off the front.
    let take = (rand::random::<usize>() % v.len()) + 1;
    let rest = v.split_off(take);
    pieces.push(v);
    v = rest;
  }
  // Print the lengths of the vectors if we actually fail this test at some point
  let lengths = pieces.iter().map(|piece| piece.len()).collect::<Vec<_>>();
  eprintln!("Lengths = {:?}", lengths);
  pieces.into_iter()
}
/// Splits `v` into two pieces: the first 90% and the remaining 10%.
fn front_load(mut v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
  // Chunk the data at 90%
  let split_point = (v.len() * 90) / 100;
  let tail = v.split_off(split_point);
  let mut parts = Vec::with_capacity(2);
  parts.push(v);
  parts.push(tail);
  parts.into_iter()
}
/// Splits `v` into two pieces: everything but the last byte, and the last byte.
fn front_load_but_one(mut v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
  let last_index = v.len() - 1;
  let last = v.split_off(last_index);
  let mut parts = Vec::with_capacity(2);
  parts.push(v);
  parts.push(last);
  parts.into_iter()
}
/// Splits `v` into two pieces: the first 10% and the remaining 90%.
fn back_load(mut v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
  // Chunk the data at 10%
  let split_point = (v.len() * 10) / 100;
  let tail = v.split_off(split_point);
  let mut parts = Vec::with_capacity(2);
  parts.push(v);
  parts.push(tail);
  parts.into_iter()
}
/// Feeds the given chunks through a [`GZipResponseStream`] and checks that decompressing
/// the collected output reproduces the concatenated input.
async fn test_gzip(i: impl Iterator<Item = Vec<u8>> + Send + 'static) {
  let chunks = i.collect::<Vec<_>>();
  let expected: Vec<u8> = chunks.concat();

  let (tx, rx) = tokio::sync::mpsc::channel(1);
  let mut resp = GZipResponseStream::new(ResponseStream::TestChannel(rx));
  let handle = tokio::task::spawn(async move {
    for chunk in chunks {
      tx.send(chunk.into()).await.ok().unwrap();
    }
  });

  // Limit how many times we'll loop
  const LIMIT: usize = 1000;
  let mut compressed: Vec<u8> = vec![];
  for i in 0..=LIMIT {
    assert_ne!(i, LIMIT);
    let frame = poll_fn(|cx| Pin::new(&mut resp).poll_frame(cx)).await;
    match frame {
      ResponseStreamResult::EndOfStream => break,
      ResponseStreamResult::NoData => continue,
      ResponseStreamResult::NonEmptyBuf(buf) => {
        assert_ne!(buf.len(), 0);
        compressed.extend(&*buf);
      }
      _ => panic!("Unexpected stream type"),
    }
  }

  let mut decoder = flate2::read::GzDecoder::new(&*compressed);
  let mut actual = vec![];
  decoder.read_to_end(&mut actual).unwrap();
  assert_eq!(actual, expected);
  handle.await.unwrap();
}
/// Feeds the given chunks through a [`BrotliResponseStream`] and checks that decompressing
/// the collected output reproduces the concatenated input.
async fn test_brotli(i: impl Iterator<Item = Vec<u8>> + Send + 'static) {
  let chunks = i.collect::<Vec<_>>();
  let expected: Vec<u8> = chunks.concat();

  let (tx, rx) = tokio::sync::mpsc::channel(1);
  let mut resp = BrotliResponseStream::new(ResponseStream::TestChannel(rx));
  let handle = tokio::task::spawn(async move {
    for chunk in chunks {
      tx.send(chunk.into()).await.ok().unwrap();
    }
  });

  // Limit how many times we'll loop
  const LIMIT: usize = 1000;
  let mut compressed: Vec<u8> = vec![];
  for i in 0..=LIMIT {
    assert_ne!(i, LIMIT);
    let frame = poll_fn(|cx| Pin::new(&mut resp).poll_frame(cx)).await;
    match frame {
      ResponseStreamResult::EndOfStream => break,
      ResponseStreamResult::NoData => continue,
      ResponseStreamResult::NonEmptyBuf(buf) => {
        assert_ne!(buf.len(), 0);
        compressed.extend(&*buf);
      }
      _ => panic!("Unexpected stream type"),
    }
  }

  let mut decoder = brotli::Decompressor::new(&*compressed, compressed.len());
  let mut actual = vec![];
  // Decompression is skipped entirely when no input was fed in.
  if !expected.is_empty() {
    decoder.read_to_end(&mut actual).unwrap();
  }
  assert_eq!(actual, expected);
  handle.await.unwrap();
}
// Round-trips a single short chunk through both compressors.
#[tokio::test]
async fn test_simple() {
test_brotli(vec![b"hello world".to_vec()].into_iter()).await;
test_gzip(vec![b"hello world".to_vec()].into_iter()).await;
}
// Runs both compressors over a stream that yields no chunks at all.
#[tokio::test]
async fn test_empty() {
test_brotli(vec![].into_iter()).await;
test_gzip(vec![].into_iter()).await;
}
// Round-trips a single 64 KiB chunk of zeros through both compressors.
#[tokio::test]
async fn test_simple_zeros() {
test_brotli(vec![vec![0; 0x10000]].into_iter()).await;
test_gzip(vec![vec![0; 0x10000]].into_iter()).await;
}
// Generates a test module named after the data generator `$vec`. The module contains one
// test per chunking strategy (`chunk`, `front_load`, `front_load_but_one`, `back_load`,
// `random`), and each test runs the generated data through both the GZip and the Brotli
// round-trip checks.
macro_rules! test {
($vec:ident) => {
mod $vec {
#[tokio::test]
async fn chunk() {
let iter = super::chunk(super::$vec());
super::test_gzip(iter).await;
let br_iter = super::chunk(super::$vec());
super::test_brotli(br_iter).await;
}
#[tokio::test]
async fn front_load() {
let iter = super::front_load(super::$vec());
super::test_gzip(iter).await;
let br_iter = super::front_load(super::$vec());
super::test_brotli(br_iter).await;
}
#[tokio::test]
async fn front_load_but_one() {
let iter = super::front_load_but_one(super::$vec());
super::test_gzip(iter).await;
let br_iter = super::front_load_but_one(super::$vec());
super::test_brotli(br_iter).await;
}
#[tokio::test]
async fn back_load() {
let iter = super::back_load(super::$vec());
super::test_gzip(iter).await;
let br_iter = super::back_load(super::$vec());
super::test_brotli(br_iter).await;
}
#[tokio::test]
async fn random() {
let iter = super::random(super::$vec());
super::test_gzip(iter).await;
let br_iter = super::random(super::$vec());
super::test_brotli(br_iter).await;
}
}
};
}
// Instantiate the chunking tests for each kind of input data.
test!(zeros);
test!(hard_to_gzip_data);
test!(already_gzipped_data);
}