diff --git a/Tests/LibWeb/Text/expected/Streams/ReadableStream-default-tee.txt b/Tests/LibWeb/Text/expected/Streams/ReadableStream-default-tee.txt new file mode 100644 index 0000000000..daeb8e4bb1 --- /dev/null +++ b/Tests/LibWeb/Text/expected/Streams/ReadableStream-default-tee.txt @@ -0,0 +1,8 @@ +stream1: abcdefghijklmnopqrstuvwxyz +stream1: ABCDEFGHIJKLMNOPQRSTUVWXYZ +stream1: 0123456789!@#$%^&*()-=_+,< +stream1: Done! +stream2: abcdefghijklmnopqrstuvwxyz +stream2: ABCDEFGHIJKLMNOPQRSTUVWXYZ +stream2: 0123456789!@#$%^&*()-=_+,< +stream2: Done! diff --git a/Tests/LibWeb/Text/input/Streams/ReadableStream-default-tee.html b/Tests/LibWeb/Text/input/Streams/ReadableStream-default-tee.html new file mode 100644 index 0000000000..01eca132f2 --- /dev/null +++ b/Tests/LibWeb/Text/input/Streams/ReadableStream-default-tee.html @@ -0,0 +1,56 @@ + + diff --git a/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp b/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp index 38d5e96954..fcc9f29f65 100644 --- a/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp +++ b/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp @@ -7,6 +7,7 @@ * SPDX-License-Identifier: BSD-2-Clause */ +#include #include #include #include @@ -16,6 +17,8 @@ #include #include #include +#include +#include #include #include #include @@ -239,7 +242,7 @@ bool readable_stream_has_default_reader(ReadableStream const& stream) } // https://streams.spec.whatwg.org/#readable-stream-tee -WebIDL::ExceptionOr readable_stream_tee(JS::Realm& realm, ReadableStream& stream, bool) +WebIDL::ExceptionOr readable_stream_tee(JS::Realm& realm, ReadableStream& stream, bool clone_for_branch2) { // 1. Assert: stream implements ReadableStream. // 2. Assert: cloneForBranch2 is a boolean. @@ -250,7 +253,308 @@ WebIDL::ExceptionOr readable_stream_tee(JS::Realm& realm, Re } // 4. Return ? ReadableStreamDefaultTee(stream, cloneForBranch2). - return realm.vm().throw_completion(JS::ErrorType::NotImplemented, "Default stream teeing"); + return TRY(readable_stream_default_tee(realm, stream, clone_for_branch2)); +} + +struct DefaultStreamTeeParams final : JS::Cell { + JS_CELL(TeeParams, JS::Cell); + JS_DECLARE_ALLOCATOR(DefaultStreamTeeParams); + + virtual void visit_edges(Visitor& visitor) override + { + Base::visit_edges(visitor); + visitor.visit(reason1); + visitor.visit(reason2); + visitor.visit(branch1); + visitor.visit(branch2); + visitor.visit(pull_algorithm); + } + + bool reading { false }; + bool read_again { false }; + bool canceled1 { false }; + bool canceled2 { false }; + JS::Value reason1 { JS::js_undefined() }; + JS::Value reason2 { JS::js_undefined() }; + JS::GCPtr branch1; + JS::GCPtr branch2; + JS::GCPtr pull_algorithm; +}; + +JS_DEFINE_ALLOCATOR(DefaultStreamTeeParams); + +// https://streams.spec.whatwg.org/#ref-for-read-request③ +class DefaultStreamTeeReadRequest final : public ReadRequest { + JS_CELL(DefaultStreamTeeReadRequest, Cell); + JS_DECLARE_ALLOCATOR(DefaultStreamTeeReadRequest); + +public: + DefaultStreamTeeReadRequest( + JS::Realm& realm, + JS::NonnullGCPtr stream, + JS::NonnullGCPtr params, + JS::NonnullGCPtr cancel_promise, + bool clone_for_branch2) + : m_realm(realm) + , m_stream(stream) + , m_params(params) + , m_cancel_promise(cancel_promise) + , m_clone_for_branch2(clone_for_branch2) + { + } + + // https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps③ + virtual void on_chunk(JS::Value chunk) override + { + // 1. Queue a microtask to perform the following steps: + HTML::queue_a_microtask(nullptr, [this, chunk]() { + HTML::TemporaryExecutionContext execution_context { Bindings::host_defined_environment_settings_object(m_realm) }; + + auto controller1 = m_params->branch1->controller()->get>(); + auto controller2 = m_params->branch2->controller()->get>(); + + // 1. Set readAgain to false. + m_params->read_again = false; + + // 2. Let chunk1 and chunk2 be chunk. + auto chunk1 = chunk; + auto chunk2 = chunk; + + // 3. If canceled2 is false and cloneForBranch2 is true, + if (!m_params->canceled2 && m_clone_for_branch2) { + // 1. Let cloneResult be StructuredClone(chunk2). + auto clone_result = structured_clone(m_realm, chunk2); + + // 2. If cloneResult is an abrupt completion, + if (clone_result.is_exception()) { + auto completion = Bindings::dom_exception_to_throw_completion(m_realm->vm(), clone_result.release_error()); + + // 1. Perform ! ReadableStreamDefaultControllerError(branch1.[[controller]], cloneResult.[[Value]]). + readable_stream_default_controller_error(controller1, completion.value().value()); + + // 2. Perform ! ReadableStreamDefaultControllerError(branch2.[[controller]], cloneResult.[[Value]]). + readable_stream_default_controller_error(controller2, completion.value().value()); + + // 3. Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]). + auto cancel_result = MUST(readable_stream_cancel(m_stream, completion.value().value())); + JS::NonnullGCPtr cancel_value = verify_cast(*cancel_result->promise().ptr()); + + WebIDL::resolve_promise(m_realm, m_cancel_promise, cancel_value); + + // 4. Return. + return; + } + + // 3. Otherwise, set chunk2 to cloneResult.[[Value]]. + chunk2 = clone_result.release_value(); + } + + // 4. If canceled1 is false, perform ! ReadableStreamDefaultControllerEnqueue(branch1.[[controller]], chunk1). + if (!m_params->canceled1) { + MUST(readable_stream_default_controller_enqueue(controller1, chunk1)); + } + + // 5. If canceled2 is false, perform ! ReadableStreamDefaultControllerEnqueue(branch2.[[controller]], chunk2). + if (!m_params->canceled2) { + MUST(readable_stream_default_controller_enqueue(controller2, chunk2)); + } + + // 6. Set reading to false. + m_params->reading = false; + + // 7. If readAgain is true, perform pullAlgorithm. + if (m_params->read_again) { + MUST(m_params->pull_algorithm->function()()); + } + }); + + // NOTE: The microtask delay here is necessary because it takes at least a microtask to detect errors, when we + // use reader.[[closedPromise]] below. We want errors in stream to error both branches immediately, so we + // cannot let successful synchronously-available reads happen ahead of asynchronously-available errors. + } + + // https://streams.spec.whatwg.org/#ref-for-read-request-close-steps② + virtual void on_close() override + { + auto controller1 = m_params->branch1->controller()->get>(); + auto controller2 = m_params->branch2->controller()->get>(); + + // 1. Set reading to false. + m_params->reading = false; + + // 2. If canceled1 is false, perform ! ReadableStreamDefaultControllerClose(branch1.[[controller]]). + if (!m_params->canceled1) { + readable_stream_default_controller_close(controller1); + } + + // 3. If canceled2 is false, perform ! ReadableStreamDefaultControllerClose(branch2.[[controller]]). + if (!m_params->canceled2) { + readable_stream_default_controller_close(controller2); + } + + // 4. If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined. + if (!m_params->canceled1 || !m_params->canceled2) { + WebIDL::resolve_promise(m_realm, m_cancel_promise, JS::js_undefined()); + } + } + + // https://streams.spec.whatwg.org/#ref-for-read-request-error-steps③ + virtual void on_error(JS::Value) override + { + // 1. Set reading to false. + m_params->reading = false; + } + +private: + virtual void visit_edges(Visitor& visitor) override + { + Base::visit_edges(visitor); + visitor.visit(m_realm); + visitor.visit(m_stream); + visitor.visit(m_params); + visitor.visit(m_cancel_promise); + } + + JS::NonnullGCPtr m_realm; + JS::NonnullGCPtr m_stream; + JS::NonnullGCPtr m_params; + JS::NonnullGCPtr m_cancel_promise; + bool m_clone_for_branch2 { false }; +}; + +JS_DEFINE_ALLOCATOR(DefaultStreamTeeReadRequest); + +// https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee +WebIDL::ExceptionOr readable_stream_default_tee(JS::Realm& realm, ReadableStream& stream, bool clone_for_branch2) +{ + // 1. Assert: stream implements ReadableStream. + // 2. Assert: cloneForBranch2 is a boolean. + + // 3. Let reader be ? AcquireReadableStreamDefaultReader(stream). + auto reader = TRY(acquire_readable_stream_default_reader(stream)); + + // 4. Let reading be false. + // 5. Let readAgain be false. + // 6. Let canceled1 be false. + // 7. Let canceled2 be false. + // 8. Let reason1 be undefined. + // 9. Let reason2 be undefined. + // 10. Let branch1 be undefined. + // 11. Let branch2 be undefined. + auto params = realm.heap().allocate(realm); + + // 12. Let cancelPromise be a new promise. + auto cancel_promise = WebIDL::create_promise(realm); + + // 13. Let pullAlgorithm be the following steps: + auto pull_algorithm = JS::create_heap_function(realm.heap(), [&realm, &stream, reader, params, cancel_promise, clone_for_branch2]() -> WebIDL::ExceptionOr> { + // 1. If reading is true, + if (params->reading) { + // 1. Set readAgain to true. + params->read_again = true; + + // 2. Return a promise resolved with undefined. + return WebIDL::create_resolved_promise(realm, JS::js_undefined()); + } + + // 2. Set reading to true. + params->reading = true; + + // 3. Let readRequest be a read request with the following items: + auto read_request = realm.heap().allocate_without_realm(realm, stream, params, cancel_promise, clone_for_branch2); + + // 4. Perform ! ReadableStreamDefaultReaderRead(reader, readRequest). + MUST(readable_stream_default_reader_read(reader, read_request)); + + // 5. Return a promise resolved with undefined. + return WebIDL::create_resolved_promise(realm, JS::js_undefined()); + }); + + // AD-HOC: The read request within the pull algorithm must be able to re-invoke the pull algorithm, so cache it here. + params->pull_algorithm = pull_algorithm; + + // 14. Let cancel1Algorithm be the following steps, taking a reason argument: + auto cancel1_algorithm = JS::create_heap_function(realm.heap(), [&realm, &stream, params, cancel_promise](JS::Value reason) -> WebIDL::ExceptionOr> { + // 1. Set canceled1 to true. + params->canceled1 = true; + + // 2. Set reason1 to reason. + params->reason1 = reason; + + // 3. If canceled2 is true, + if (params->canceled2) { + // 1. Let compositeReason be ! CreateArrayFromList(« reason1, reason2 »). + auto composite_reason = JS::Array::create_from(realm, AK::Array { params->reason1, params->reason2 }); + + // 2. Let cancelResult be ! ReadableStreamCancel(stream, compositeReason). + auto cancel_result = MUST(readable_stream_cancel(stream, composite_reason)); + + // 3. Resolve cancelPromise with cancelResult. + JS::NonnullGCPtr cancel_value = verify_cast(*cancel_result->promise().ptr()); + WebIDL::resolve_promise(realm, cancel_promise, cancel_value); + } + + // 4. Return cancelPromise. + return cancel_promise; + }); + + // 15. Let cancel2Algorithm be the following steps, taking a reason argument: + auto cancel2_algorithm = JS::create_heap_function(realm.heap(), [&realm, &stream, params, cancel_promise](JS::Value reason) -> WebIDL::ExceptionOr> { + // 1. Set canceled2 to true. + params->canceled2 = true; + + // 2. Set reason2 to reason. + params->reason2 = reason; + + // 3. If canceled1 is true, + if (params->canceled1) { + // 1. Let compositeReason be ! CreateArrayFromList(« reason1, reason2 »). + auto composite_reason = JS::Array::create_from(realm, AK::Array { params->reason1, params->reason2 }); + + // 2. Let cancelResult be ! ReadableStreamCancel(stream, compositeReason). + auto cancel_result = MUST(readable_stream_cancel(stream, composite_reason)); + + // 3. Resolve cancelPromise with cancelResult. + JS::NonnullGCPtr cancel_value = verify_cast(*cancel_result->promise().ptr()); + WebIDL::resolve_promise(realm, cancel_promise, cancel_value); + } + + // 4. Return cancelPromise. + return cancel_promise; + }); + + // 16. Let startAlgorithm be an algorithm that returns undefined. + auto start_algorithm = JS::create_heap_function(realm.heap(), []() -> WebIDL::ExceptionOr { + return JS::js_undefined(); + }); + + // 17. Set branch1 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm). + params->branch1 = MUST(create_readable_stream(realm, start_algorithm, pull_algorithm, cancel1_algorithm)); + + // 18. Set branch2 to ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm). + params->branch2 = MUST(create_readable_stream(realm, start_algorithm, pull_algorithm, cancel2_algorithm)); + + // 19. Upon rejection of reader.[[closedPromise]] with reason r, + WebIDL::upon_rejection(*reader->closed_promise_capability(), [&realm, params, cancel_promise](auto reason) -> WebIDL::ExceptionOr { + auto controller1 = params->branch1->controller()->get>(); + auto controller2 = params->branch2->controller()->get>(); + + // 1. Perform ! ReadableStreamDefaultControllerError(branch1.[[controller]], r). + readable_stream_default_controller_error(controller1, reason); + + // 2. Perform ! ReadableStreamDefaultControllerError(branch2.[[controller]], r). + readable_stream_default_controller_error(controller2, reason); + + // 3. If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined. + if (!params->canceled1 || !params->canceled2) { + WebIDL::resolve_promise(realm, cancel_promise, JS::js_undefined()); + } + + return JS::js_undefined(); + }); + + // 20. Return « branch1, branch2 ». + return ReadableStreamPair { *params->branch1, *params->branch2 }; } // https://streams.spec.whatwg.org/#make-size-algorithm-from-size-function diff --git a/Userland/Libraries/LibWeb/Streams/AbstractOperations.h b/Userland/Libraries/LibWeb/Streams/AbstractOperations.h index f24c0b1bb1..ab63a65104 100644 --- a/Userland/Libraries/LibWeb/Streams/AbstractOperations.h +++ b/Userland/Libraries/LibWeb/Streams/AbstractOperations.h @@ -48,6 +48,7 @@ bool readable_stream_has_byob_reader(ReadableStream const&); bool readable_stream_has_default_reader(ReadableStream const&); WebIDL::ExceptionOr readable_stream_tee(JS::Realm&, ReadableStream&, bool clone_for_branch2); +WebIDL::ExceptionOr readable_stream_default_tee(JS::Realm& realm, ReadableStream& stream, bool clone_for_branch2); WebIDL::ExceptionOr> readable_stream_reader_generic_cancel(ReadableStreamGenericReaderMixin&, JS::Value reason); void readable_stream_reader_generic_initialize(ReadableStreamReader, ReadableStream&);