RequestServer: Handle IPC requests on multiple threads concurrently

Previously RS handled all the requests in an event loop, leading to
issues with connections being started in the middle of other connections
being started (and potentially blowing up the stack), ultimately causing
requests to be delayed because of other requests.
This commit reworks the way we handle these (specifically starting
connections) by first serialising the requests, and then performing them
in multiple threads concurrently; which yields a significant loading
performance and reliability increase.
This commit is contained in:
Ali Mohammad Pur 2024-05-08 20:15:05 +02:00 committed by Andreas Kling
parent 4ef24e1c7c
commit 57714fbb38
12 changed files with 519 additions and 264 deletions

View file

@ -33,7 +33,7 @@ target_link_libraries(RequestServer PRIVATE requestserver)
target_include_directories(requestserver PRIVATE ${SERENITY_SOURCE_DIR}/Userland/Services/)
target_include_directories(requestserver PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/..)
target_link_libraries(requestserver PUBLIC LibCore LibMain LibCrypto LibFileSystem LibGemini LibHTTP LibIPC LibMain LibTLS LibWebView LibWebSocket LibURL)
target_link_libraries(requestserver PUBLIC LibCore LibMain LibCrypto LibFileSystem LibGemini LibHTTP LibIPC LibMain LibTLS LibWebView LibWebSocket LibURL LibThreading)
if (${CMAKE_SYSTEM_NAME} MATCHES "SunOS")
# Solaris has socket and networking related functions in two extra libraries
target_link_libraries(requestserver PUBLIC nsl socket)

View file

@ -10,9 +10,9 @@
namespace Core {
ElapsedTimer ElapsedTimer::start_new()
ElapsedTimer ElapsedTimer::start_new(TimerType timer_type)
{
ElapsedTimer timer;
ElapsedTimer timer(timer_type);
timer.start();
return timer;
}

View file

@ -17,7 +17,7 @@ enum class TimerType {
class ElapsedTimer {
public:
static ElapsedTimer start_new();
static ElapsedTimer start_new(TimerType timer_type = TimerType::Coarse);
ElapsedTimer(TimerType timer_type = TimerType::Coarse)
: m_timer_type(timer_type)

View file

@ -569,7 +569,7 @@ DefaultRootCACertificates::DefaultRootCACertificates()
DefaultRootCACertificates& DefaultRootCACertificates::the()
{
static DefaultRootCACertificates s_the;
static thread_local DefaultRootCACertificates s_the;
return s_the;
}

View file

@ -26,4 +26,4 @@ set(GENERATED_SOURCES
)
serenity_bin(RequestServer)
target_link_libraries(RequestServer PRIVATE LibCore LibCrypto LibIPC LibGemini LibHTTP LibMain LibTLS LibWebSocket LibURL)
target_link_libraries(RequestServer PRIVATE LibCore LibCrypto LibIPC LibGemini LibHTTP LibMain LibTLS LibWebSocket LibURL LibThreading)

View file

@ -11,9 +11,9 @@
namespace RequestServer::ConnectionCache {
HashMap<ConnectionKey, NonnullOwnPtr<Vector<NonnullOwnPtr<Connection<Core::TCPSocket, Core::Socket>>>>> g_tcp_connection_cache {};
HashMap<ConnectionKey, NonnullOwnPtr<Vector<NonnullOwnPtr<Connection<TLS::TLSv12>>>>> g_tls_connection_cache {};
HashMap<ByteString, InferredServerProperties> g_inferred_server_properties;
Threading::RWLockProtected<HashMap<ConnectionKey, NonnullOwnPtr<Vector<NonnullOwnPtr<Connection<Core::TCPSocket, Core::Socket>>>>>> g_tcp_connection_cache {};
Threading::RWLockProtected<HashMap<ConnectionKey, NonnullOwnPtr<Vector<NonnullOwnPtr<Connection<TLS::TLSv12>>>>>> g_tls_connection_cache {};
Threading::RWLockProtected<HashMap<ByteString, InferredServerProperties>> g_inferred_server_properties;
void request_did_finish(URL::URL const& url, Core::Socket const* socket)
{
@ -26,8 +26,20 @@ void request_did_finish(URL::URL const& url, Core::Socket const* socket)
ConnectionKey partial_key { url.serialized_host().release_value_but_fixme_should_propagate_errors().to_byte_string(), url.port_or_default() };
auto fire_off_next_job = [&](auto& cache) {
auto it = find_if(cache.begin(), cache.end(), [&](auto& connection) { return connection.key.hostname == partial_key.hostname && connection.key.port == partial_key.port; });
if (it == cache.end()) {
using CacheType = typename RemoveCVReference<decltype(cache)>::ProtectedType;
auto [it, end] = cache.with_read_locked([&](auto const& cache) {
struct Result {
decltype(cache.begin()) it;
decltype(cache.end()) end;
};
return Result {
find_if(cache.begin(), cache.end(), [&](auto& connection) {
return connection.key.hostname == partial_key.hostname && connection.key.port == partial_key.port;
}),
cache.end(),
};
});
if (it == end) {
dbgln("Request for URL {} finished, but we don't own that!", url);
return;
}
@ -38,11 +50,16 @@ void request_did_finish(URL::URL const& url, Core::Socket const* socket)
}
auto& connection = *connection_it;
auto& properties = g_inferred_server_properties.ensure(partial_key.hostname);
if constexpr (REQUESTSERVER_DEBUG) {
connection->job_data->timing_info.performing_request = Duration::from_milliseconds(connection->job_data->timing_info.timer.elapsed_milliseconds());
connection->job_data->timing_info.timer.start();
}
auto& properties = g_inferred_server_properties.with_write_locked([&](auto& map) -> InferredServerProperties& { return map.ensure(partial_key.hostname); });
if (!connection->socket->is_open())
properties.requests_served_per_connection = min(properties.requests_served_per_connection, connection->max_queue_length + 1);
if (connection->request_queue.is_empty()) {
if (connection->request_queue.with_read_locked([](auto const& queue) { return queue.is_empty(); })) {
// Immediately mark the connection as finished, as new jobs will never be run if they are queued
// before the deferred_invoke() below runs otherwise.
connection->has_started = false;
@ -59,30 +76,47 @@ void request_did_finish(URL::URL const& url, Core::Socket const* socket)
if (ptr->has_started)
return;
dbgln_if(REQUESTSERVER_DEBUG, "Removing no-longer-used connection {} (socket {})", ptr, ptr->socket);
auto did_remove = cache_entry.remove_first_matching([&](auto& entry) { return entry == ptr; });
VERIFY(did_remove);
if (cache_entry.is_empty())
cache.remove(key);
dbgln_if(REQUESTSERVER_DEBUG, "Removing no-longer-used connection {} (socket {})", ptr, ptr->socket.ptr());
cache.with_write_locked([&](CacheType& cache) {
auto did_remove = cache_entry.remove_first_matching([&](auto& entry) { return entry == ptr; });
VERIFY(did_remove);
if (cache_entry.is_empty())
cache.remove(key);
});
});
};
connection->removal_timer->start();
});
} else {
auto timer = Core::ElapsedTimer::start_new();
if (auto result = recreate_socket_if_needed(*connection, url); result.is_error()) {
dbgln("ConnectionCache request finish handler, reconnection failed with {}", result.error());
connection->job_data.fail(Core::NetworkJob::Error::ConnectionFailed);
if constexpr (REQUESTSERVER_DEBUG) {
connection->job_data->timing_info.starting_connection += Duration::from_milliseconds(timer.elapsed_milliseconds());
}
cache.with_read_locked([&](auto&) {
dbgln("ConnectionCache request finish handler, reconnection failed with {}", result.error());
connection->job_data->fail(Core::NetworkJob::Error::ConnectionFailed);
});
return;
}
if constexpr (REQUESTSERVER_DEBUG) {
connection->job_data->timing_info.starting_connection += Duration::from_milliseconds(timer.elapsed_milliseconds());
}
connection->has_started = true;
Core::deferred_invoke([&connection = *connection, url] {
dbgln_if(REQUESTSERVER_DEBUG, "Running next job in queue for connection {}", &connection);
connection.timer.start();
connection.current_url = url;
connection.job_data = connection.request_queue.take_first();
connection.socket->set_notifications_enabled(true);
connection.job_data.start(*connection.socket);
Core::deferred_invoke([&connection = *connection, url, &cache] {
cache.with_read_locked([&](auto&) {
dbgln_if(REQUESTSERVER_DEBUG, "Running next job in queue for connection {}", &connection);
connection.timer.start();
connection.current_url = url;
connection.job_data = connection.request_queue.with_write_locked([](auto& queue) { return queue.take_first(); });
if constexpr (REQUESTSERVER_DEBUG) {
connection.job_data->timing_info.waiting_in_queue = Duration::from_milliseconds(connection.job_data->timing_info.timer.elapsed_milliseconds() - connection.job_data->timing_info.performing_request.to_milliseconds());
connection.job_data->timing_info.timer.start();
}
connection.socket->set_notifications_enabled(true);
connection.job_data->start(*connection.socket);
});
});
}
};
@ -97,28 +131,37 @@ void request_did_finish(URL::URL const& url, Core::Socket const* socket)
void dump_jobs()
{
dbgln("=========== TLS Connection Cache ==========");
for (auto& connection : g_tls_connection_cache) {
dbgln(" - {}:{}", connection.key.hostname, connection.key.port);
for (auto& entry : *connection.value) {
dbgln(" - Connection {} (started={}) (socket={})", &entry, entry->has_started, entry->socket);
dbgln(" Currently loading {} ({} elapsed)", entry->current_url, entry->timer.is_valid() ? entry->timer.elapsed() : 0);
dbgln(" Request Queue:");
for (auto& job : entry->request_queue)
dbgln(" - {}", &job);
g_tls_connection_cache.with_read_locked([](auto& cache) {
dbgln("=========== TLS Connection Cache ==========");
for (auto& connection : cache) {
dbgln(" - {}:{}", connection.key.hostname, connection.key.port);
for (auto& entry : *connection.value) {
dbgln(" - Connection {} (started={}) (socket={})", &entry, entry->has_started, entry->socket.ptr());
dbgln(" Currently loading {} ({} elapsed)", entry->current_url, entry->timer.is_valid() ? entry->timer.elapsed() : 0);
dbgln(" Request Queue:");
entry->request_queue.for_each_locked([](auto const& job) {
dbgln(" - {}", &job);
});
}
}
}
dbgln("=========== TCP Connection Cache ==========");
for (auto& connection : g_tcp_connection_cache) {
dbgln(" - {}:{}", connection.key.hostname, connection.key.port);
for (auto& entry : *connection.value) {
dbgln(" - Connection {} (started={}) (socket={})", &entry, entry->has_started, entry->socket);
dbgln(" Currently loading {} ({} elapsed)", entry->current_url, entry->timer.is_valid() ? entry->timer.elapsed() : 0);
dbgln(" Request Queue:");
for (auto& job : entry->request_queue)
dbgln(" - {}", &job);
});
g_tcp_connection_cache.with_read_locked([](auto& cache) {
dbgln("=========== TCP Connection Cache ==========");
for (auto& connection : cache) {
dbgln(" - {}:{}", connection.key.hostname, connection.key.port);
for (auto& entry : *connection.value) {
dbgln(" - Connection {} (started={}) (socket={})", &entry, entry->has_started, entry->socket.ptr());
dbgln(" Currently loading {} ({} elapsed)", entry->current_url, entry->timer.is_valid() ? entry->timer.elapsed() : 0);
dbgln(" Request Queue:");
entry->request_queue.for_each_locked([](auto const& job) {
dbgln(" - {}", &job);
});
}
}
}
});
}
size_t hits;
size_t misses;
}

View file

@ -16,6 +16,7 @@
#include <LibCore/SOCKSProxyClient.h>
#include <LibCore/Timer.h>
#include <LibTLS/TLSv12.h>
#include <LibThreading/RWLockProtected.h>
#include <LibURL/URL.h>
namespace RequestServer {
@ -53,42 +54,95 @@ struct Proxy {
}
};
struct JobData {
Function<void(Core::BufferedSocketBase&)> start {};
Function<void(Core::NetworkJob::Error)> fail {};
Function<Vector<TLS::Certificate>()> provide_client_certificates {};
struct TimingInfo {
#if REQUESTSERVER_DEBUG
bool valid { true };
Core::ElapsedTimer timer {};
URL::URL url {};
Duration waiting_in_queue {};
Duration starting_connection {};
Duration performing_request {};
#endif
} timing_info {};
JobData(Function<void(Core::BufferedSocketBase&)> start, Function<void(Core::NetworkJob::Error)> fail, Function<Vector<TLS::Certificate>()> provide_client_certificates, TimingInfo timing_info)
: start(move(start))
, fail(move(fail))
, provide_client_certificates(move(provide_client_certificates))
, timing_info(move(timing_info))
{
}
JobData(JobData&& other)
: start(move(other.start))
, fail(move(other.fail))
, provide_client_certificates(move(other.provide_client_certificates))
, timing_info(move(other.timing_info))
{
#if REQUESTSERVER_DEBUG
other.timing_info.valid = false;
#endif
}
#if REQUESTSERVER_DEBUG
~JobData()
{
if (timing_info.valid) {
dbgln("JobData for {} timings:", timing_info.url);
dbgln(" - Waiting in queue: {}ms", timing_info.waiting_in_queue.to_milliseconds());
dbgln(" - Starting connection: {}ms", timing_info.starting_connection.to_milliseconds());
dbgln(" - Performing request: {}ms", timing_info.performing_request.to_milliseconds());
}
}
#endif
template<typename T>
static JobData create(NonnullRefPtr<T> job, [[maybe_unused]] URL::URL url)
{
return JobData {
[job](auto& socket) { job->start(socket); },
[job](auto error) { job->fail(error); },
[job] {
if constexpr (requires { job->on_certificate_requested; }) {
if (job->on_certificate_requested)
return job->on_certificate_requested();
} else {
// "use" `job`, otherwise clang gets sad.
(void)job;
}
return Vector<TLS::Certificate> {};
},
{
#if REQUESTSERVER_DEBUG
.timer = Core::ElapsedTimer::start_new(Core::TimerType::Precise),
.url = move(url),
.waiting_in_queue = {},
.starting_connection = {},
.performing_request = {},
#endif
},
};
}
};
template<typename Socket, typename SocketStorageType = Socket>
struct Connection {
struct JobData {
Function<void(Core::BufferedSocketBase&)> start {};
Function<void(Core::NetworkJob::Error)> fail {};
Function<Vector<TLS::Certificate>()> provide_client_certificates {};
template<typename T>
static JobData create(NonnullRefPtr<T> job)
{
return JobData {
.start = [job](auto& socket) { job->start(socket); },
.fail = [job](auto error) { job->fail(error); },
.provide_client_certificates = [job] {
if constexpr (requires { job->on_certificate_requested; }) {
if (job->on_certificate_requested)
return job->on_certificate_requested();
} else {
// "use" `job`, otherwise clang gets sad.
(void)job;
}
return Vector<TLS::Certificate> {}; },
};
}
};
using QueueType = Vector<JobData>;
using SocketType = Socket;
using StorageType = SocketStorageType;
NonnullOwnPtr<Core::BufferedSocket<SocketStorageType>> socket;
QueueType request_queue;
OwnPtr<Core::BufferedSocket<SocketStorageType>> socket;
Threading::RWLockProtected<QueueType> request_queue;
NonnullRefPtr<Core::Timer> removal_timer;
Atomic<bool> is_being_started { false };
bool has_started { false };
URL::URL current_url {};
Core::ElapsedTimer timer {};
JobData job_data {};
Optional<JobData> job_data {};
Proxy proxy {};
size_t max_queue_length { 0 };
};
@ -117,15 +171,16 @@ struct InferredServerProperties {
size_t requests_served_per_connection { NumericLimits<size_t>::max() };
};
extern HashMap<ConnectionKey, NonnullOwnPtr<Vector<NonnullOwnPtr<Connection<Core::TCPSocket, Core::Socket>>>>> g_tcp_connection_cache;
extern HashMap<ConnectionKey, NonnullOwnPtr<Vector<NonnullOwnPtr<Connection<TLS::TLSv12>>>>> g_tls_connection_cache;
extern HashMap<ByteString, InferredServerProperties> g_inferred_server_properties;
extern Threading::RWLockProtected<HashMap<ConnectionKey, NonnullOwnPtr<Vector<NonnullOwnPtr<Connection<Core::TCPSocket, Core::Socket>>>>>> g_tcp_connection_cache;
extern Threading::RWLockProtected<HashMap<ConnectionKey, NonnullOwnPtr<Vector<NonnullOwnPtr<Connection<TLS::TLSv12>>>>>> g_tls_connection_cache;
extern Threading::RWLockProtected<HashMap<ByteString, InferredServerProperties>> g_inferred_server_properties;
void request_did_finish(URL::URL const&, Core::Socket const*);
void dump_jobs();
constexpr static size_t MaxConcurrentConnectionsPerURL = 4;
constexpr static size_t ConnectionKeepAliveTimeMilliseconds = 10'000;
constexpr static size_t ConnectionKeepAliveTimeMilliseconds = 20'000;
constexpr static size_t ConnectionCacheQueueHighWatermark = 4;
template<typename T>
ErrorOr<void> recreate_socket_if_needed(T& connection, URL::URL const& url)
@ -134,6 +189,7 @@ ErrorOr<void> recreate_socket_if_needed(T& connection, URL::URL const& url)
using SocketStorageType = typename T::StorageType;
if (!connection.socket->is_open() || connection.socket->is_eof()) {
connection.socket = nullptr;
// Create another socket for the connection.
auto set_socket = [&](NonnullOwnPtr<SocketStorageType>&& socket) -> ErrorOr<void> {
connection.socket = TRY(Core::BufferedSocket<SocketStorageType>::create(move(socket)));
@ -151,52 +207,82 @@ ErrorOr<void> recreate_socket_if_needed(T& connection, URL::URL const& url)
else
reason = Core::NetworkJob::Error::TransmissionFailed;
if (connection.job_data.fail)
connection.job_data.fail(reason);
if (connection.job_data->fail)
connection.job_data->fail(reason);
});
options.set_certificate_provider([&connection]() -> Vector<TLS::Certificate> {
if (connection.job_data.provide_client_certificates)
return connection.job_data.provide_client_certificates();
if (connection.job_data->provide_client_certificates)
return connection.job_data->provide_client_certificates();
return {};
});
TRY(set_socket(TRY((connection.proxy.template tunnel<SocketType, SocketStorageType>(url, move(options))))));
} else {
TRY(set_socket(TRY((connection.proxy.template tunnel<SocketType, SocketStorageType>(url)))));
}
dbgln_if(REQUESTSERVER_DEBUG, "Creating a new socket for {} -> {}", url, connection.socket);
dbgln_if(REQUESTSERVER_DEBUG, "Creating a new socket for {} -> {}", url, connection.socket.ptr());
}
return {};
}
decltype(auto) get_or_create_connection(auto& cache, URL::URL const& url, auto job, Core::ProxyData proxy_data = {})
extern size_t hits;
extern size_t misses;
template<typename Cache>
void start_connection(const URL::URL& url, auto job, auto& sockets_for_url, size_t index, Duration, Cache&);
void ensure_connection(auto& cache, const URL::URL& url, auto job, Core::ProxyData proxy_data = {})
{
using CacheEntryType = RemoveCVReference<decltype(*cache.begin()->value)>;
using CacheEntryType = RemoveCVReference<decltype(*declval<typename RemoveCVReference<decltype(cache)>::ProtectedType>().begin()->value)>;
auto hostname = url.serialized_host().release_value_but_fixme_should_propagate_errors().to_byte_string();
auto& properties = g_inferred_server_properties.ensure(hostname);
auto& properties = g_inferred_server_properties.with_write_locked([&](auto& map) -> InferredServerProperties& { return map.ensure(hostname); });
auto& sockets_for_url = *cache.ensure({ move(hostname), url.port_or_default(), proxy_data }, [] { return make<CacheEntryType>(); });
auto& sockets_for_url = *cache.with_write_locked([&](auto& map) -> NonnullOwnPtr<CacheEntryType>& {
return map.ensure({ move(hostname), url.port_or_default(), proxy_data }, [] { return make<CacheEntryType>(); });
});
Proxy proxy { proxy_data };
using ReturnType = decltype(sockets_for_url[0].ptr());
// Find the connection with an empty queue; if none exist, we'll find the least backed-up connection later.
// Note that servers that are known to serve a single request per connection (e.g. HTTP/1.0) usually have
// issues with concurrent connections, so we'll only allow one connection per URL in that case to avoid issues.
// This is a bit too aggressive, but there's no way to know if the server can handle concurrent connections
// without trying it out first, and that's not worth the effort as HTTP/1.0 is a legacy protocol anyway.
auto it = sockets_for_url.find_if([&](auto& connection) { return properties.requests_served_per_connection < 2 || connection->request_queue.is_empty(); });
auto it = sockets_for_url.find_if([&](auto const& connection) {
return properties.requests_served_per_connection < 2
|| connection->request_queue.with_read_locked([](auto const& queue) { return queue.size(); }) <= ConnectionCacheQueueHighWatermark;
});
auto did_add_new_connection = false;
auto failed_to_find_a_socket = it.is_end();
if (failed_to_find_a_socket && sockets_for_url.size() < ConnectionCache::MaxConcurrentConnectionsPerURL) {
using ConnectionType = RemoveCVReference<decltype(*cache.begin()->value->at(0))>;
Proxy proxy { proxy_data };
size_t index;
auto timer = Core::ElapsedTimer::start_new();
if (failed_to_find_a_socket && sockets_for_url.size() < MaxConcurrentConnectionsPerURL) {
using ConnectionType = RemoveCVReference<decltype(*AK::Detail::declval<CacheEntryType>().at(0))>;
auto& connection = cache.with_write_locked([&](auto&) -> ConnectionType& {
index = sockets_for_url.size();
sockets_for_url.append(AK::make<ConnectionType>(
nullptr,
typename ConnectionType::QueueType {},
Core::Timer::create_single_shot(ConnectionKeepAliveTimeMilliseconds, nullptr),
true));
auto& connection = sockets_for_url.last();
connection->proxy = move(proxy);
return *connection;
});
ScopeGuard start_guard = [&] {
connection.is_being_started = false;
};
dbgln_if(REQUESTSERVER_DEBUG, "I will start a connection ({}) for URL {}", &connection, url);
auto connection_result = proxy.tunnel<typename ConnectionType::SocketType, typename ConnectionType::StorageType>(url);
misses++;
if (connection_result.is_error()) {
dbgln("ConnectionCache: Connection to {} failed: {}", url, connection_result.error());
Core::deferred_invoke([job] {
job->fail(Core::NetworkJob::Error::ConnectionFailed);
});
return ReturnType { nullptr };
return;
}
auto socket_result = Core::BufferedSocket<typename ConnectionType::StorageType>::create(connection_result.release_value());
if (socket_result.is_error()) {
@ -204,25 +290,21 @@ decltype(auto) get_or_create_connection(auto& cache, URL::URL const& url, auto j
Core::deferred_invoke([job] {
job->fail(Core::NetworkJob::Error::ConnectionFailed);
});
return ReturnType { nullptr };
return;
}
sockets_for_url.append(make<ConnectionType>(
socket_result.release_value(),
typename ConnectionType::QueueType {},
Core::Timer::create_single_shot(ConnectionKeepAliveTimeMilliseconds, nullptr)));
sockets_for_url.last()->proxy = move(proxy);
did_add_new_connection = true;
connection.socket = socket_result.release_value();
}
size_t index;
auto elapsed = Duration::from_milliseconds(timer.elapsed_milliseconds());
if (failed_to_find_a_socket) {
if (did_add_new_connection) {
index = sockets_for_url.size() - 1;
} else {
if (!did_add_new_connection) {
// Find the least backed-up connection (based on how many entries are in their request queue).
index = 0;
auto min_queue_size = (size_t)-1;
for (auto it = sockets_for_url.begin(); it != sockets_for_url.end(); ++it) {
if (auto queue_size = (*it)->request_queue.size(); min_queue_size > queue_size) {
if (auto queue_size = (*it)->request_queue.with_read_locked([](auto const& queue) { return queue.size(); }); min_queue_size > queue_size) {
index = it.index();
min_queue_size = queue_size;
}
@ -230,39 +312,76 @@ decltype(auto) get_or_create_connection(auto& cache, URL::URL const& url, auto j
}
} else {
index = it.index();
hits++;
}
dbgln_if(REQUESTSERVER_DEBUG, "ConnectionCache: Hits: {}, Misses: {}", RequestServer::ConnectionCache::hits, RequestServer::ConnectionCache::misses);
start_connection(url, job, sockets_for_url, index, elapsed, cache);
}
template<typename Cache>
void start_connection(URL::URL const& url, auto job, auto& sockets_for_url, size_t index, Duration setup_time, Cache& cache)
{
if (sockets_for_url.is_empty()) {
Core::deferred_invoke([job] {
job->fail(Core::NetworkJob::Error::ConnectionFailed);
});
return ReturnType { nullptr };
return;
}
auto& connection = *sockets_for_url[index];
if (connection.is_being_started) {
// Someone else is creating the connection, queue the job and let them handle it.
dbgln_if(REQUESTSERVER_DEBUG, "Enqueue request for URL {} in {} - {}", url, &connection, connection.socket.ptr());
auto size = connection.request_queue.with_write_locked([&](auto& queue) {
queue.append(JobData::create(job, url));
return queue.size();
});
connection.max_queue_length = max(connection.max_queue_length, size);
return;
}
if (!connection.has_started) {
connection.has_started = true;
Core::deferred_invoke([&connection, url, job] {
Core::deferred_invoke([&connection, &cache, url, job, setup_time] {
(void)setup_time;
auto job_data = JobData::create(job, url);
if constexpr (REQUESTSERVER_DEBUG) {
job_data.timing_info.waiting_in_queue = Duration::from_milliseconds(job_data.timing_info.timer.elapsed_milliseconds());
job_data.timing_info.timer.start();
}
if (auto result = recreate_socket_if_needed(connection, url); result.is_error()) {
dbgln("ConnectionCache: request failed to start, failed to make a socket: {}", result.error());
dbgln_if(REQUESTSERVER_DEBUG, "ConnectionCache: request failed to start, failed to make a socket: {}", result.error());
if constexpr (REQUESTSERVER_DEBUG) {
job_data.timing_info.starting_connection += Duration::from_milliseconds(job_data.timing_info.timer.elapsed_milliseconds()) + setup_time;
job_data.timing_info.timer.start();
}
Core::deferred_invoke([job] {
job->fail(Core::NetworkJob::Error::ConnectionFailed);
});
} else {
dbgln_if(REQUESTSERVER_DEBUG, "Immediately start request for url {} in {} - {}", url, &connection, connection.socket);
connection.removal_timer->stop();
connection.timer.start();
connection.current_url = url;
connection.job_data = decltype(connection.job_data)::create(job);
connection.socket->set_notifications_enabled(true);
connection.job_data.start(*connection.socket);
cache.with_write_locked([&](auto&) {
dbgln_if(REQUESTSERVER_DEBUG, "Immediately start request for url {} in {} - {}", url, &connection, connection.socket.ptr());
connection.job_data = move(job_data);
if constexpr (REQUESTSERVER_DEBUG) {
connection.job_data->timing_info.starting_connection += Duration::from_milliseconds(connection.job_data->timing_info.timer.elapsed_milliseconds()) + setup_time;
connection.job_data->timing_info.timer.start();
}
connection.removal_timer->stop();
connection.timer.start();
connection.current_url = url;
connection.socket->set_notifications_enabled(true);
connection.job_data->start(*connection.socket);
});
}
});
} else {
dbgln_if(REQUESTSERVER_DEBUG, "Enqueue request for URL {} in {} - {}", url, &connection, connection.socket);
connection.request_queue.append(decltype(connection.job_data)::create(job));
connection.max_queue_length = max(connection.max_queue_length, connection.request_queue.size());
dbgln_if(REQUESTSERVER_DEBUG, "Enqueue request for URL {} in {} - {}", url, &connection, connection.socket.ptr());
auto size = connection.request_queue.with_write_locked([&](auto& queue) {
queue.append(JobData::create(job, url));
return queue.size();
});
connection.max_queue_length = max(connection.max_queue_length, size);
}
return &connection;
}
}

View file

@ -26,121 +26,11 @@ static IDAllocator s_client_ids;
ConnectionFromClient::ConnectionFromClient(NonnullOwnPtr<Core::LocalSocket> socket)
: IPC::ConnectionFromClient<RequestClientEndpoint, RequestServerEndpoint>(*this, move(socket), s_client_ids.allocate())
, m_thread_pool([this](Work work) { worker_do_work(move(work)); })
{
s_connections.set(client_id(), *this);
}
void ConnectionFromClient::die()
{
auto client_id = this->client_id();
s_connections.remove(client_id);
s_client_ids.deallocate(client_id);
if (s_connections.is_empty())
Core::EventLoop::current().quit(0);
}
Messages::RequestServer::ConnectNewClientResponse ConnectionFromClient::connect_new_client()
{
int socket_fds[2] {};
if (auto err = Core::System::socketpair(AF_LOCAL, SOCK_STREAM, 0, socket_fds); err.is_error()) {
dbgln("Failed to create client socketpair: {}", err.error());
return IPC::File {};
}
auto client_socket_or_error = Core::LocalSocket::adopt_fd(socket_fds[0]);
if (client_socket_or_error.is_error()) {
close(socket_fds[0]);
close(socket_fds[1]);
dbgln("Failed to adopt client socket: {}", client_socket_or_error.error());
return IPC::File {};
}
auto client_socket = client_socket_or_error.release_value();
// Note: A ref is stored in the static s_connections map
auto client = adopt_ref(*new ConnectionFromClient(move(client_socket)));
return IPC::File::adopt_fd(socket_fds[1]);
}
Messages::RequestServer::IsSupportedProtocolResponse ConnectionFromClient::is_supported_protocol(ByteString const& protocol)
{
bool supported = Protocol::find_by_name(protocol.to_lowercase());
return supported;
}
void ConnectionFromClient::start_request(i32 request_id, ByteString const& method, URL::URL const& url, HashMap<ByteString, ByteString> const& request_headers, ByteBuffer const& request_body, Core::ProxyData const& proxy_data)
{
if (!url.is_valid()) {
dbgln("StartRequest: Invalid URL requested: '{}'", url);
(void)post_message(Messages::RequestClient::RequestFinished(request_id, false, 0));
return;
}
auto* protocol = Protocol::find_by_name(url.scheme().to_byte_string());
if (!protocol) {
dbgln("StartRequest: No protocol handler for URL: '{}'", url);
(void)post_message(Messages::RequestClient::RequestFinished(request_id, false, 0));
return;
}
auto request = protocol->start_request(request_id, *this, method, url, request_headers, request_body, proxy_data);
if (!request) {
dbgln("StartRequest: Protocol handler failed to start request: '{}'", url);
(void)post_message(Messages::RequestClient::RequestFinished(request_id, false, 0));
return;
}
auto id = request->id();
auto fd = request->request_fd();
m_requests.set(id, move(request));
(void)post_message(Messages::RequestClient::RequestStarted(request_id, IPC::File::adopt_fd(fd)));
}
Messages::RequestServer::StopRequestResponse ConnectionFromClient::stop_request(i32 request_id)
{
auto* request = const_cast<Request*>(m_requests.get(request_id).value_or(nullptr));
bool success = false;
if (request) {
request->stop();
m_requests.remove(request_id);
success = true;
}
return success;
}
void ConnectionFromClient::did_receive_headers(Badge<Request>, Request& request)
{
auto response_headers = request.response_headers().clone().release_value_but_fixme_should_propagate_errors();
async_headers_became_available(request.id(), move(response_headers), request.status_code());
}
void ConnectionFromClient::did_finish_request(Badge<Request>, Request& request, bool success)
{
if (request.total_size().has_value())
async_request_finished(request.id(), success, request.total_size().value());
m_requests.remove(request.id());
}
void ConnectionFromClient::did_progress_request(Badge<Request>, Request& request)
{
async_request_progress(request.id(), request.total_size(), request.downloaded_size());
}
void ConnectionFromClient::did_request_certificates(Badge<Request>, Request& request)
{
async_certificate_requested(request.id());
}
Messages::RequestServer::SetCertificateResponse ConnectionFromClient::set_certificate(i32 request_id, ByteString const& certificate, ByteString const& key)
{
auto* request = const_cast<Request*>(m_requests.get(request_id).value_or(nullptr));
bool success = false;
if (request) {
request->set_certificate(certificate, key);
success = true;
}
return success;
}
class Job : public RefCounted<Job>
, public Weakable<Job> {
public:
@ -183,6 +73,195 @@ private:
inline static HashMap<URL::URL, WeakPtr<Job>> s_jobs {};
};
template<typename Pool>
IterationDecision ConnectionFromClient::Looper<Pool>::next(Pool& pool, bool wait)
{
bool should_exit = false;
auto timer = Core::Timer::create_repeating(100, [&] {
if (Threading::ThreadPoolLooper<Pool>::next(pool, false) == IterationDecision::Break) {
event_loop.quit(0);
should_exit = true;
}
});
timer->start();
if (!wait) {
event_loop.deferred_invoke([&] {
event_loop.quit(0);
});
}
event_loop.exec();
if (should_exit)
return IterationDecision::Break;
return IterationDecision::Continue;
}
void ConnectionFromClient::worker_do_work(Work work)
{
work.visit(
[&](StartRequest& start_request) {
auto* protocol = Protocol::find_by_name(start_request.url.scheme().to_byte_string());
if (!protocol) {
dbgln("StartRequest: No protocol handler for URL: '{}'", start_request.url);
(void)post_message(Messages::RequestClient::RequestFinished(start_request.request_id, false, 0));
return;
}
auto request = protocol->start_request(start_request.request_id, *this, start_request.method, start_request.url, start_request.request_headers, start_request.request_body, start_request.proxy_data);
if (!request) {
dbgln("StartRequest: Protocol handler failed to start request: '{}'", start_request.url);
(void)post_message(Messages::RequestClient::RequestFinished(start_request.request_id, false, 0));
return;
}
auto id = request->id();
auto fd = request->request_fd();
m_requests.with_locked([&](auto& map) { map.set(id, move(request)); });
(void)post_message(Messages::RequestClient::RequestStarted(start_request.request_id, IPC::File::adopt_fd(fd)));
},
[&](EnsureConnection& ensure_connection) {
auto& url = ensure_connection.url;
auto& cache_level = ensure_connection.cache_level;
if (cache_level == CacheLevel::ResolveOnly) {
Core::deferred_invoke([host = url.serialized_host().release_value_but_fixme_should_propagate_errors().to_byte_string()] {
dbgln("EnsureConnection: DNS-preload for {}", host);
auto resolved_host = Core::Socket::resolve_host(host, Core::Socket::SocketType::Stream);
if (resolved_host.is_error())
dbgln("EnsureConnection: DNS-preload failed for {}", host);
});
dbgln("EnsureConnection: DNS-preload for {} done", url);
return;
}
auto job = Job::ensure(url);
dbgln("EnsureConnection: Pre-connect to {}", url);
auto do_preconnect = [&](auto& cache) {
ConnectionCache::ensure_connection(cache, url, job);
};
if (url.scheme() == "http"sv)
do_preconnect(ConnectionCache::g_tcp_connection_cache);
else if (url.scheme() == "https"sv)
do_preconnect(ConnectionCache::g_tls_connection_cache);
else
dbgln("EnsureConnection: Invalid URL scheme: '{}'", url.scheme());
},
[&](Empty) {});
}
void ConnectionFromClient::die()
{
auto client_id = this->client_id();
s_connections.remove(client_id);
s_client_ids.deallocate(client_id);
if (s_connections.is_empty())
Core::EventLoop::current().quit(0);
}
Messages::RequestServer::ConnectNewClientResponse ConnectionFromClient::connect_new_client()
{
int socket_fds[2] {};
if (auto err = Core::System::socketpair(AF_LOCAL, SOCK_STREAM, 0, socket_fds); err.is_error()) {
dbgln("Failed to create client socketpair: {}", err.error());
return IPC::File {};
}
auto client_socket_or_error = Core::LocalSocket::adopt_fd(socket_fds[0]);
if (client_socket_or_error.is_error()) {
close(socket_fds[0]);
close(socket_fds[1]);
dbgln("Failed to adopt client socket: {}", client_socket_or_error.error());
return IPC::File {};
}
auto client_socket = client_socket_or_error.release_value();
// Note: A ref is stored in the static s_connections map
auto client = adopt_ref(*new ConnectionFromClient(move(client_socket)));
return IPC::File::adopt_fd(socket_fds[1]);
}
void ConnectionFromClient::enqueue(Work work)
{
m_thread_pool.submit(move(work));
}
Messages::RequestServer::IsSupportedProtocolResponse ConnectionFromClient::is_supported_protocol(ByteString const& protocol)
{
bool supported = Protocol::find_by_name(protocol.to_lowercase());
return supported;
}
void ConnectionFromClient::start_request(i32 request_id, ByteString const& method, URL::URL const& url, HashMap<ByteString, ByteString> const& request_headers, ByteBuffer const& request_body, Core::ProxyData const& proxy_data)
{
if (!url.is_valid()) {
dbgln("StartRequest: Invalid URL requested: '{}'", url);
(void)post_message(Messages::RequestClient::RequestFinished(request_id, false, 0));
return;
}
enqueue(StartRequest {
.request_id = request_id,
.method = method,
.url = url,
.request_headers = request_headers,
.request_body = request_body,
.proxy_data = proxy_data,
});
}
Messages::RequestServer::StopRequestResponse ConnectionFromClient::stop_request(i32 request_id)
{
return m_requests.with_locked([&](auto& map) {
auto* request = const_cast<Request*>(map.get(request_id).value_or(nullptr));
bool success = false;
if (request) {
request->stop();
map.remove(request_id);
success = true;
}
return success;
});
}
void ConnectionFromClient::did_receive_headers(Badge<Request>, Request& request)
{
auto response_headers = request.response_headers().clone().release_value_but_fixme_should_propagate_errors();
async_headers_became_available(request.id(), move(response_headers), request.status_code());
}
void ConnectionFromClient::did_finish_request(Badge<Request>, Request& request, bool success)
{
if (request.total_size().has_value())
async_request_finished(request.id(), success, request.total_size().value());
m_requests.with_locked([&](auto& map) { map.remove(request.id()); });
}
void ConnectionFromClient::did_progress_request(Badge<Request>, Request& request)
{
async_request_progress(request.id(), request.total_size(), request.downloaded_size());
}
void ConnectionFromClient::did_request_certificates(Badge<Request>, Request& request)
{
async_certificate_requested(request.id());
}
Messages::RequestServer::SetCertificateResponse ConnectionFromClient::set_certificate(i32 request_id, ByteString const& certificate, ByteString const& key)
{
return m_requests.with_locked([&](auto& map) {
auto* request = const_cast<Request*>(map.get(request_id).value_or(nullptr));
bool success = false;
if (request) {
request->set_certificate(certificate, key);
success = true;
}
return success;
});
}
void ConnectionFromClient::ensure_connection(URL::URL const& url, ::RequestServer::CacheLevel const& cache_level)
{
if (!url.is_valid()) {
@ -190,30 +269,10 @@ void ConnectionFromClient::ensure_connection(URL::URL const& url, ::RequestServe
return;
}
if (cache_level == CacheLevel::ResolveOnly) {
return Core::deferred_invoke([host = url.serialized_host().release_value_but_fixme_should_propagate_errors().to_byte_string()] {
dbgln("EnsureConnection: DNS-preload for {}", host);
auto resolved_host = Core::Socket::resolve_host(host, Core::Socket::SocketType::Stream);
if (resolved_host.is_error())
dbgln("EnsureConnection: DNS-preload failed for {}", host);
});
}
auto job = Job::ensure(url);
dbgln("EnsureConnection: Pre-connect to {}", url);
auto do_preconnect = [&](auto& cache) {
auto serialized_host = url.serialized_host().release_value_but_fixme_should_propagate_errors().to_byte_string();
auto it = cache.find({ serialized_host, url.port_or_default() });
if (it == cache.end() || it->value->is_empty())
ConnectionCache::get_or_create_connection(cache, url, job);
};
if (url.scheme() == "http"sv)
do_preconnect(ConnectionCache::g_tcp_connection_cache);
else if (url.scheme() == "https"sv)
do_preconnect(ConnectionCache::g_tls_connection_cache);
else
dbgln("EnsureConnection: Invalid URL scheme: '{}'", url.scheme());
enqueue(EnsureConnection {
.url = url,
.cache_level = cache_level,
});
}
static i32 s_next_websocket_id = 1;

View file

@ -7,7 +7,10 @@
#pragma once
#include <AK/HashMap.h>
#include <LibCore/SharedCircularQueue.h>
#include <LibIPC/ConnectionFromClient.h>
#include <LibThreading/MutexProtected.h>
#include <LibThreading/ThreadPool.h>
#include <LibWebSocket/WebSocket.h>
#include <RequestServer/Forward.h>
#include <RequestServer/RequestClientEndpoint.h>
@ -46,8 +49,36 @@ private:
virtual void websocket_close(i32, u16, ByteString const&) override;
virtual Messages::RequestServer::WebsocketSetCertificateResponse websocket_set_certificate(i32, ByteString const&, ByteString const&) override;
HashMap<i32, OwnPtr<Request>> m_requests;
struct StartRequest {
i32 request_id;
ByteString method;
URL::URL url;
HashMap<ByteString, ByteString> request_headers;
ByteBuffer request_body;
Core::ProxyData proxy_data;
};
struct EnsureConnection {
URL::URL url;
CacheLevel cache_level;
};
using Work = Variant<StartRequest, EnsureConnection, Empty>;
void worker_do_work(Work);
Threading::MutexProtected<HashMap<i32, OwnPtr<Request>>> m_requests;
HashMap<i32, RefPtr<WebSocket::WebSocket>> m_websockets;
void enqueue(Work);
template<typename Pool>
struct Looper : public Threading::ThreadPoolLooper<Pool> {
IterationDecision next(Pool& pool, bool wait);
Core::EventLoop event_loop;
};
Threading::ThreadPool<Work, Looper> m_thread_pool;
};
}

View file

@ -32,7 +32,7 @@ OwnPtr<Request> GeminiProtocol::start_request(i32 request_id, ConnectionFromClie
protocol_request->set_request_fd(pipe_result.value().read_fd);
Core::EventLoop::current().deferred_invoke([=] {
ConnectionCache::get_or_create_connection(ConnectionCache::g_tls_connection_cache, url, job, proxy_data);
ConnectionCache::ensure_connection(ConnectionCache::g_tls_connection_cache, url, job, proxy_data);
});
return protocol_request;

View file

@ -102,10 +102,12 @@ OwnPtr<Request> start_request(TBadgedProtocol&& protocol, i32 request_id, Connec
auto protocol_request = TRequest::create_with_job(forward<TBadgedProtocol>(protocol), client, (TJob&)*job, move(output_stream), request_id);
protocol_request->set_request_fd(pipe_result.value().read_fd);
if constexpr (IsSame<typename TBadgedProtocol::Type, HttpsProtocol>)
ConnectionCache::get_or_create_connection(ConnectionCache::g_tls_connection_cache, url, job, proxy_data);
else
ConnectionCache::get_or_create_connection(ConnectionCache::g_tcp_connection_cache, url, job, proxy_data);
Core::deferred_invoke([=] {
if constexpr (IsSame<typename TBadgedProtocol::Type, HttpsProtocol>)
ConnectionCache::ensure_connection(ConnectionCache::g_tls_connection_cache, url, job, proxy_data);
else
ConnectionCache::ensure_connection(ConnectionCache::g_tcp_connection_cache, url, job, proxy_data);
});
return protocol_request;
}

View file

@ -20,18 +20,18 @@
ErrorOr<int> serenity_main(Main::Arguments)
{
if constexpr (TLS_SSL_KEYLOG_DEBUG)
TRY(Core::System::pledge("stdio inet accept unix cpath wpath rpath sendfd recvfd sigaction"));
TRY(Core::System::pledge("stdio inet accept thread unix cpath wpath rpath sendfd recvfd sigaction"));
else
TRY(Core::System::pledge("stdio inet accept unix rpath sendfd recvfd sigaction"));
TRY(Core::System::pledge("stdio inet accept thread unix rpath sendfd recvfd sigaction"));
#ifdef SIGINFO
signal(SIGINFO, [](int) { RequestServer::ConnectionCache::dump_jobs(); });
#endif
if constexpr (TLS_SSL_KEYLOG_DEBUG)
TRY(Core::System::pledge("stdio inet accept unix cpath wpath rpath sendfd recvfd"));
TRY(Core::System::pledge("stdio inet accept thread unix cpath wpath rpath sendfd recvfd"));
else
TRY(Core::System::pledge("stdio inet accept unix rpath sendfd recvfd"));
TRY(Core::System::pledge("stdio inet accept thread unix rpath sendfd recvfd"));
// Ensure the certificates are read out here.
// FIXME: Allow specifying extra certificates on the command line, or in other configuration.
@ -40,6 +40,7 @@ ErrorOr<int> serenity_main(Main::Arguments)
Core::EventLoop event_loop;
// FIXME: Establish a connection to LookupServer and then drop "unix"?
TRY(Core::System::unveil("/tmp/portal/lookup", "rw"));
TRY(Core::System::unveil("/etc/cacert.pem", "rw"));
TRY(Core::System::unveil("/etc/timezone", "r"));
if constexpr (TLS_SSL_KEYLOG_DEBUG)
TRY(Core::System::unveil("/home/anon", "rwc"));