LibProtocol+Userland: Support unbuffered protocol requests

LibWeb will need to use unbuffered requests to support server-sent
events. Connection for such events remain open and the remote end sends
data as HTTP bodies at its leisure. The browser needs to be able to
handle this data as it arrives, as the request essentially never
finishes.

To support this, this make Protocol::Request operate in one of two
modes: buffered or unbuffered. The existing mechanism for setting up a
buffered request was a bit awkward; you had to set specific callbacks,
but be sure not to set some others, and then set a flag. The new
mechanism is to set the mode and the callbacks that the mode needs in
one API.
This commit is contained in:
Timothy Flynn 2024-05-24 11:37:02 -04:00 committed by Andreas Kling
parent 086ddd481d
commit 168d28c15f
14 changed files with 189 additions and 129 deletions

View file

@ -97,6 +97,17 @@ RequestManagerQt::Request::Request(QNetworkReply& reply)
RequestManagerQt::Request::~Request() = default;
void RequestManagerQt::Request::set_buffered_request_finished_callback(Protocol::Request::BufferedRequestFinished on_buffered_request_finished)
{
this->on_buffered_request_finish = move(on_buffered_request_finished);
}
void RequestManagerQt::Request::set_unbuffered_request_callbacks(Protocol::Request::HeadersReceived, Protocol::Request::DataReceived, Protocol::Request::RequestFinished on_request_finished)
{
dbgln("Unbuffered requests are not yet supported with Qt networking");
on_request_finished(false, 0);
}
void RequestManagerQt::Request::did_finish()
{
auto buffer = m_reply.readAll();

View file

@ -43,9 +43,9 @@ private:
virtual ~Request() override;
virtual void set_should_buffer_all_input(bool) override { }
virtual void set_buffered_request_finished_callback(Protocol::Request::BufferedRequestFinished) override;
virtual void set_unbuffered_request_callbacks(Protocol::Request::HeadersReceived, Protocol::Request::DataReceived, Protocol::Request::RequestFinished) override;
virtual bool stop() override { return false; }
virtual void stream_into(Stream&) override { }
void did_finish();
@ -55,6 +55,8 @@ private:
Request(QNetworkReply&);
QNetworkReply& m_reply;
Protocol::Request::BufferedRequestFinished on_buffered_request_finish;
};
HashMap<QNetworkReply*, NonnullRefPtr<Request>> m_pending;

View file

@ -57,8 +57,15 @@ DownloadWidget::DownloadWidget(const URL::URL& url)
m_output_file_stream = file_or_error.release_value();
}
m_download->on_finish = [this](bool success, auto) { did_finish(success); };
m_download->stream_into(*m_output_file_stream);
auto on_data_received = [this](auto data) {
m_output_file_stream->write_until_depleted(data).release_value_but_fixme_should_propagate_errors();
};
auto on_finished = [this](bool success, auto) {
did_finish(success);
};
m_download->set_unbuffered_request_callbacks({}, move(on_data_received), move(on_finished));
set_fill_with_background_color(true);
set_layout<GUI::VerticalBoxLayout>(4);

View file

@ -323,7 +323,8 @@ void MapWidget::process_tile_queue()
VERIFY(!request.is_null());
m_active_requests.append(request);
request->on_buffered_request_finish = [this, request, url, tile_key](bool success, auto, auto&, auto, ReadonlyBytes payload) {
request->set_buffered_request_finished_callback([this, request, url, tile_key](bool success, auto, auto&, auto, ReadonlyBytes payload) {
auto was_active = m_active_requests.remove_first_matching([request](auto const& other_request) { return other_request->id() == request->id(); });
if (!was_active)
return;
@ -351,8 +352,8 @@ void MapWidget::process_tile_queue()
// FIXME: only update the part of the screen that this tile covers
update();
};
request->set_should_buffer_all_input(true);
});
request->on_certificate_requested = []() -> Protocol::Request::CertificateAndKey { return {}; };
}

View file

@ -59,7 +59,8 @@ void SearchPanel::search(StringView query)
auto request = m_request_client->start_request("GET", url, headers, {});
VERIFY(!request.is_null());
m_request = request;
request->on_buffered_request_finish = [this, request, url](bool success, auto, auto&, auto, ReadonlyBytes payload) {
request->set_buffered_request_finished_callback([this, request, url](bool success, auto, auto&, auto, ReadonlyBytes payload) {
m_request.clear();
if (!success) {
dbgln("Maps: Can't load: {}", url);
@ -111,8 +112,8 @@ void SearchPanel::search(StringView query)
m_empty_container->set_visible(false);
m_places_list->set_model(*GUI::ItemListModel<String>::create(m_places_names));
m_places_list->set_visible(true);
};
request->set_should_buffer_all_input(true);
});
request->on_certificate_requested = []() -> Protocol::Request::CertificateAndKey { return {}; };
}

View file

@ -26,7 +26,8 @@ void UsersMapWidget::get_users()
auto request = request_client()->start_request("GET", url, headers, {});
VERIFY(!request.is_null());
m_request = request;
request->on_buffered_request_finish = [this, request, url](bool success, auto, auto&, auto, ReadonlyBytes payload) {
request->set_buffered_request_finished_callback([this, request, url](bool success, auto, auto&, auto, ReadonlyBytes payload) {
m_request.clear();
if (!success) {
dbgln("Maps: Can't load: {}", url);
@ -57,8 +58,8 @@ void UsersMapWidget::get_users()
m_users.value().append(user);
}
add_users_to_map();
};
request->set_should_buffer_all_input(true);
});
request->on_certificate_requested = []() -> Protocol::Request::CertificateAndKey { return {}; };
}

View file

@ -17,6 +17,15 @@ Request::Request(RequestClient& client, i32 request_id)
bool Request::stop()
{
on_headers_received = nullptr;
on_finish = nullptr;
on_progress = nullptr;
on_certificate_requested = nullptr;
m_internal_buffered_data = nullptr;
m_internal_stream_data = nullptr;
m_mode = Mode::Unknown;
return m_client->stop_request({}, *this);
}
@ -32,79 +41,23 @@ void Request::set_request_fd(Badge<Protocol::RequestClient>, int fd)
m_internal_stream_data->read_stream = move(stream);
}
void Request::stream_into(Stream& stream)
void Request::set_buffered_request_finished_callback(BufferedRequestFinished on_buffered_request_finished)
{
VERIFY(!m_internal_stream_data);
VERIFY(m_mode == Mode::Unknown);
m_mode = Mode::Buffered;
m_internal_stream_data = make<InternalStreamData>();
m_internal_stream_data->read_notifier = Core::Notifier::construct(fd(), Core::Notifier::Type::Read);
if (fd() != -1)
m_internal_stream_data->read_stream = MUST(Core::File::adopt_fd(fd(), Core::File::OpenMode::Read));
auto user_on_finish = move(on_finish);
on_finish = [this](auto success, auto total_size) {
m_internal_stream_data->success = success;
m_internal_stream_data->total_size = total_size;
m_internal_stream_data->request_done = true;
m_internal_stream_data->on_finish();
};
m_internal_stream_data->on_finish = [this, user_on_finish = move(user_on_finish)] {
if (!m_internal_stream_data->user_finish_called && m_internal_stream_data->read_stream->is_eof()) {
m_internal_stream_data->user_finish_called = true;
user_on_finish(m_internal_stream_data->success, m_internal_stream_data->total_size);
}
};
m_internal_stream_data->read_notifier->on_activation = [this, &stream] {
constexpr size_t buffer_size = 256 * KiB;
static char buf[buffer_size];
do {
auto result = m_internal_stream_data->read_stream->read_some({ buf, buffer_size });
if (result.is_error() && (!result.error().is_errno() || (result.error().is_errno() && result.error().code() != EINTR)))
break;
if (result.is_error())
continue;
auto read_bytes = result.release_value();
if (read_bytes.is_empty())
break;
// FIXME: What do we do if this fails?
stream.write_until_depleted(read_bytes).release_value_but_fixme_should_propagate_errors();
} while (true);
if (m_internal_stream_data->read_stream->is_eof())
m_internal_stream_data->read_notifier->close();
if (m_internal_stream_data->request_done)
m_internal_stream_data->on_finish();
};
}
void Request::set_should_buffer_all_input(bool value)
{
if (m_should_buffer_all_input == value)
return;
if (m_internal_buffered_data && !value) {
m_internal_buffered_data = nullptr;
m_should_buffer_all_input = false;
return;
}
VERIFY(!m_internal_stream_data);
VERIFY(!m_internal_buffered_data);
VERIFY(on_buffered_request_finish); // Not having this set makes no sense.
m_internal_buffered_data = make<InternalBufferedData>();
m_should_buffer_all_input = true;
on_headers_received = [this](auto& headers, auto response_code) {
m_internal_buffered_data->response_headers = headers;
m_internal_buffered_data->response_code = move(response_code);
};
on_finish = [this](auto success, auto total_size) {
on_finish = [this, on_buffered_request_finished = move(on_buffered_request_finished)](auto success, auto total_size) {
auto output_buffer = ByteBuffer::create_uninitialized(m_internal_buffered_data->payload_stream.used_buffer_size()).release_value_but_fixme_should_propagate_errors();
m_internal_buffered_data->payload_stream.read_until_filled(output_buffer).release_value_but_fixme_should_propagate_errors();
on_buffered_request_finish(
on_buffered_request_finished(
success,
total_size,
m_internal_buffered_data->response_headers,
@ -112,15 +65,27 @@ void Request::set_should_buffer_all_input(bool value)
output_buffer);
};
stream_into(m_internal_buffered_data->payload_stream);
set_up_internal_stream_data([this](auto read_bytes) {
// FIXME: What do we do if this fails?
m_internal_buffered_data->payload_stream.write_until_depleted(read_bytes).release_value_but_fixme_should_propagate_errors();
});
}
void Request::set_unbuffered_request_callbacks(HeadersReceived on_headers_received, DataReceived on_data_received, RequestFinished on_finish)
{
VERIFY(m_mode == Mode::Unknown);
m_mode = Mode::Unbuffered;
this->on_headers_received = move(on_headers_received);
this->on_finish = move(on_finish);
set_up_internal_stream_data(move(on_data_received));
}
void Request::did_finish(Badge<RequestClient>, bool success, u64 total_size)
{
if (!on_finish)
return;
on_finish(success, total_size);
if (on_finish)
on_finish(success, total_size);
}
void Request::did_progress(Badge<RequestClient>, Optional<u64> total_size, u64 downloaded_size)
@ -144,4 +109,55 @@ void Request::did_request_certificates(Badge<RequestClient>)
}
}
}
void Request::set_up_internal_stream_data(DataReceived on_data_available)
{
VERIFY(!m_internal_stream_data);
m_internal_stream_data = make<InternalStreamData>();
m_internal_stream_data->read_notifier = Core::Notifier::construct(fd(), Core::Notifier::Type::Read);
if (fd() != -1)
m_internal_stream_data->read_stream = MUST(Core::File::adopt_fd(fd(), Core::File::OpenMode::Read));
auto user_on_finish = move(on_finish);
on_finish = [this](auto success, auto total_size) {
m_internal_stream_data->success = success;
m_internal_stream_data->total_size = total_size;
m_internal_stream_data->request_done = true;
m_internal_stream_data->on_finish();
};
m_internal_stream_data->on_finish = [this, user_on_finish = move(user_on_finish)]() {
if (!m_internal_stream_data->user_finish_called && m_internal_stream_data->read_stream->is_eof()) {
m_internal_stream_data->user_finish_called = true;
user_on_finish(m_internal_stream_data->success, m_internal_stream_data->total_size);
}
};
m_internal_stream_data->read_notifier->on_activation = [this, on_data_available = move(on_data_available)]() {
static constexpr size_t buffer_size = 256 * KiB;
static char buffer[buffer_size];
do {
auto result = m_internal_stream_data->read_stream->read_some({ buffer, buffer_size });
if (result.is_error() && (!result.error().is_errno() || (result.error().is_errno() && result.error().code() != EINTR)))
break;
if (result.is_error())
continue;
auto read_bytes = result.release_value();
if (read_bytes.is_empty())
break;
on_data_available(read_bytes);
} while (true);
if (m_internal_stream_data->read_stream->is_eof())
m_internal_stream_data->read_notifier->close();
if (m_internal_stream_data->request_done)
m_internal_stream_data->on_finish();
};
}
}

View file

@ -36,17 +36,21 @@ public:
int fd() const { return m_fd; }
bool stop();
void stream_into(Stream&);
using BufferedRequestFinished = Function<void(bool success, u64 total_size, HashMap<ByteString, ByteString, CaseInsensitiveStringTraits> const& response_headers, Optional<u32> response_code, ReadonlyBytes payload)>;
bool should_buffer_all_input() const { return m_should_buffer_all_input; }
/// Note: Will override `on_finish', and `on_headers_received', and expects `on_buffered_request_finish' to be set!
void set_should_buffer_all_input(bool);
// Configure the request such that the entirety of the response data is buffered. The callback receives that data and
// the response headers all at once. Using this method is mutually exclusive with `set_unbuffered_data_received_callback`.
void set_buffered_request_finished_callback(BufferedRequestFinished);
using HeadersReceived = Function<void(HashMap<ByteString, ByteString, CaseInsensitiveStringTraits> const& response_headers, Optional<u32> response_code)>;
using DataReceived = Function<void(ReadonlyBytes data)>;
using RequestFinished = Function<void(bool success, u64 total_size)>;
// Configure the request such that the response data is provided unbuffered as it is received. Using this method is
// mutually exclusive with `set_buffered_request_finished_callback`.
void set_unbuffered_request_callbacks(HeadersReceived, DataReceived, RequestFinished);
/// Note: Must be set before `set_should_buffer_all_input(true)`.
Function<void(bool success, u64 total_size, HashMap<ByteString, ByteString, CaseInsensitiveStringTraits> const& response_headers, Optional<u32> response_code, ReadonlyBytes payload)> on_buffered_request_finish;
Function<void(bool success, u64 total_size)> on_finish;
Function<void(Optional<u64> total_size, u64 downloaded_size)> on_progress;
Function<void(HashMap<ByteString, ByteString, CaseInsensitiveStringTraits> const& response_headers, Optional<u32> response_code)> on_headers_received;
Function<CertificateAndKey()> on_certificate_requested;
void did_finish(Badge<RequestClient>, bool success, u64 total_size);
@ -60,11 +64,22 @@ public:
private:
explicit Request(RequestClient&, i32 request_id);
void set_up_internal_stream_data(DataReceived on_data_available);
WeakPtr<RequestClient> m_client;
int m_request_id { -1 };
RefPtr<Core::Notifier> m_write_notifier;
int m_fd { -1 };
bool m_should_buffer_all_input { false };
enum class Mode {
Buffered,
Unbuffered,
Unknown,
};
Mode m_mode { Mode::Unknown };
HeadersReceived on_headers_received;
RequestFinished on_finish;
struct InternalBufferedData {
AllocatingMemoryStream payload_stream;

View file

@ -414,7 +414,7 @@ void ResourceLoader::load(LoadRequest& request, SuccessCallback success_callback
m_active_requests.set(*protocol_request);
protocol_request->on_buffered_request_finish = [this, success_callback = move(success_callback), error_callback = move(error_callback), log_success, log_failure, request, &protocol_request = *protocol_request](bool success, auto, auto& response_headers, auto status_code, ReadonlyBytes payload) mutable {
auto on_buffered_request_finished = [this, success_callback = move(success_callback), error_callback = move(error_callback), log_success, log_failure, request, &protocol_request = *protocol_request](bool success, auto, auto& response_headers, auto status_code, ReadonlyBytes payload) mutable {
--m_pending_loads;
if (on_load_counter_change)
on_load_counter_change();
@ -446,13 +446,17 @@ void ResourceLoader::load(LoadRequest& request, SuccessCallback success_callback
m_active_requests.remove(protocol_request);
});
};
protocol_request->set_should_buffer_all_input(true);
protocol_request->set_buffered_request_finished_callback(move(on_buffered_request_finished));
protocol_request->on_certificate_requested = []() -> ResourceLoaderConnectorRequest::CertificateAndKey {
return {};
};
++m_pending_loads;
if (on_load_counter_change)
on_load_counter_change();
return;
}

View file

@ -13,6 +13,7 @@
#include <LibCore/EventReceiver.h>
#include <LibCore/Proxy.h>
#include <LibJS/SafeFunction.h>
#include <LibProtocol/Request.h>
#include <LibURL/URL.h>
#include <LibWeb/Loader/Resource.h>
#include <LibWeb/Page/Page.h>
@ -32,13 +33,16 @@ public:
ByteString key;
};
virtual void set_should_buffer_all_input(bool) = 0;
// Configure the request such that the entirety of the response data is buffered. The callback receives that data and
// the response headers all at once. Using this method is mutually exclusive with `set_unbuffered_data_received_callback`.
virtual void set_buffered_request_finished_callback(Protocol::Request::BufferedRequestFinished) = 0;
// Configure the request such that the response data is provided unbuffered as it is received. Using this method is
// mutually exclusive with `set_buffered_request_finished_callback`.
virtual void set_unbuffered_request_callbacks(Protocol::Request::HeadersReceived, Protocol::Request::DataReceived, Protocol::Request::RequestFinished) = 0;
virtual bool stop() = 0;
virtual void stream_into(Stream&) = 0;
Function<void(bool success, u64 total_size, HashMap<ByteString, ByteString, CaseInsensitiveStringTraits> const& response_headers, Optional<u32> response_code, ReadonlyBytes payload)> on_buffered_request_finish;
Function<void(bool success, u64 total_size)> on_finish;
Function<void(Optional<u64> total_size, u64 downloaded_size)> on_progress;
Function<CertificateAndKey()> on_certificate_requested;

View file

@ -19,18 +19,6 @@ ErrorOr<NonnullRefPtr<RequestServerRequestAdapter>> RequestServerRequestAdapter:
RequestServerRequestAdapter::RequestServerRequestAdapter(NonnullRefPtr<Protocol::Request> request)
: m_request(request)
{
request->on_buffered_request_finish = [weak_this = make_weak_ptr()](auto success, auto total_size, auto const& response_headers, auto response_code, auto payload) {
if (auto strong_this = weak_this.strong_ref())
if (strong_this->on_buffered_request_finish)
strong_this->on_buffered_request_finish(success, total_size, response_headers, response_code, move(payload));
};
request->on_finish = [weak_this = make_weak_ptr()](bool success, u64 total_size) {
if (auto strong_this = weak_this.strong_ref())
if (strong_this->on_finish)
strong_this->on_finish(success, total_size);
};
request->on_progress = [weak_this = make_weak_ptr()](Optional<u64> total_size, u64 downloaded_size) {
if (auto strong_this = weak_this.strong_ref())
if (strong_this->on_progress)
@ -54,9 +42,14 @@ RequestServerRequestAdapter::RequestServerRequestAdapter(NonnullRefPtr<Protocol:
RequestServerRequestAdapter::~RequestServerRequestAdapter() = default;
void RequestServerRequestAdapter::set_should_buffer_all_input(bool should_buffer_all_input)
void RequestServerRequestAdapter::set_buffered_request_finished_callback(Protocol::Request::BufferedRequestFinished on_buffered_request_finished)
{
m_request->set_should_buffer_all_input(should_buffer_all_input);
m_request->set_buffered_request_finished_callback(move(on_buffered_request_finished));
}
void RequestServerRequestAdapter::set_unbuffered_request_callbacks(Protocol::Request::HeadersReceived on_headers_received, Protocol::Request::DataReceived on_data_received, Protocol::Request::RequestFinished on_finished)
{
m_request->set_unbuffered_request_callbacks(move(on_headers_received), move(on_data_received), move(on_finished));
}
bool RequestServerRequestAdapter::stop()
@ -64,11 +57,6 @@ bool RequestServerRequestAdapter::stop()
return m_request->stop();
}
void RequestServerRequestAdapter::stream_into(Stream& stream)
{
m_request->stream_into(stream);
}
ErrorOr<NonnullRefPtr<RequestServerAdapter>> RequestServerAdapter::try_create(NonnullRefPtr<Protocol::RequestClient> protocol_client)
{
return try_make_ref_counted<RequestServerAdapter>(move(protocol_client));

View file

@ -25,11 +25,10 @@ public:
static ErrorOr<NonnullRefPtr<RequestServerRequestAdapter>> try_create(NonnullRefPtr<Protocol::Request>);
virtual ~RequestServerRequestAdapter() override;
virtual void set_should_buffer_all_input(bool) override;
virtual void set_buffered_request_finished_callback(Protocol::Request::BufferedRequestFinished) override;
virtual void set_unbuffered_request_callbacks(Protocol::Request::HeadersReceived, Protocol::Request::DataReceived, Protocol::Request::RequestFinished) override;
virtual bool stop() override;
virtual void stream_into(Stream&) override;
private:
RequestServerRequestAdapter(NonnullRefPtr<Protocol::Request>);
NonnullRefPtr<Protocol::Request> m_request;

View file

@ -105,15 +105,23 @@ ErrorOr<int> AvailablePort::update_available_ports_list_file()
URL::URL url("https://raw.githubusercontent.com/SerenityOS/serenity/master/Ports/AvailablePorts.md");
ByteString method = "GET";
outln("pkg: Syncing packages database...");
request = protocol_client->start_request(method, url, request_headers, ReadonlyBytes {}, proxy_data);
request->on_finish = [&](bool success, auto) {
auto on_data_received = [&](auto data) {
output_stream->write_until_depleted(data).release_value_but_fixme_should_propagate_errors();
};
auto on_finished = [&](bool success, auto) {
if (!success)
outln("pkg: Syncing packages database failed.");
else
outln("pkg: Syncing packages database done.");
loop.quit(success ? 0 : 1);
};
request->stream_into(*output_stream);
request->set_unbuffered_request_callbacks({}, move(on_data_received), move(on_finished));
return loop.exec();
}

View file

@ -303,10 +303,7 @@ ErrorOr<int> serenity_main(Main::Arguments arguments)
}
}
request->on_progress = [&](Optional<u64> maybe_total_size, u64 downloaded_size) {
update_progress(move(maybe_total_size), downloaded_size, false);
};
request->on_headers_received = [&](auto& response_headers, auto status_code) {
auto on_headers_received = [&](auto& response_headers, auto status_code) {
if (received_actual_headers)
return;
dbgln("Received headers! response code = {}", status_code.value_or(0));
@ -377,9 +374,6 @@ ErrorOr<int> serenity_main(Main::Arguments arguments)
following_url = true;
received_actual_headers = false;
should_save_stream_data = false;
request->on_finish = nullptr;
request->on_headers_received = nullptr;
request->on_progress = nullptr;
request->stop();
Core::deferred_invoke([&, was_following_url, url = location.value()] {
@ -395,7 +389,12 @@ ErrorOr<int> serenity_main(Main::Arguments arguments)
warnln("Request returned error {}", status_code_value);
}
};
request->on_finish = [&](bool success, u64 total_size) {
auto on_data_received = [&](auto data) {
output_stream.write_until_depleted(data).release_value_but_fixme_should_propagate_errors();
};
auto on_finished = [&](bool success, u64 total_size) {
if (following_url)
return;
@ -409,7 +408,11 @@ ErrorOr<int> serenity_main(Main::Arguments arguments)
loop.quit(0);
};
request->stream_into(output_stream);
request->set_unbuffered_request_callbacks(move(on_headers_received), move(on_data_received), move(on_finished));
request->on_progress = [&](Optional<u64> maybe_total_size, u64 downloaded_size) {
update_progress(move(maybe_total_size), downloaded_size, false);
};
};
request = protocol_client->start_request(method, url, request_headers, data.bytes(), proxy_data);