mirror of
https://github.com/SerenityOS/serenity
synced 2024-10-04 23:19:27 +00:00
LibIPC: Allow sync messages to completely block the event loop
The expectation with these messages is that no other code is allowed to run while we're waiting for the response to arrive. Not meeting this requirement did not degrade any (observable) functionality, however.
This commit is contained in:
parent
3aa1bd520b
commit
3b05c81070
|
@ -36,19 +36,19 @@ void ConnectionBase::set_deferred_invoker(NonnullOwnPtr<DeferredInvoker> deferre
|
|||
m_deferred_invoker = move(deferred_invoker);
|
||||
}
|
||||
|
||||
ErrorOr<void> ConnectionBase::post_message(Message const& message)
|
||||
ErrorOr<void> ConnectionBase::post_message(Message const& message, MessageKind kind)
|
||||
{
|
||||
return post_message(TRY(message.encode()));
|
||||
return post_message(TRY(message.encode()), kind);
|
||||
}
|
||||
|
||||
ErrorOr<void> ConnectionBase::post_message(MessageBuffer buffer)
|
||||
ErrorOr<void> ConnectionBase::post_message(MessageBuffer buffer, MessageKind kind)
|
||||
{
|
||||
// NOTE: If this connection is being shut down, but has not yet been destroyed,
|
||||
// the socket will be closed. Don't try to send more messages.
|
||||
if (!m_socket->is_open())
|
||||
return Error::from_string_literal("Trying to post_message during IPC shutdown");
|
||||
|
||||
if (auto result = buffer.transfer_message(*m_socket); result.is_error()) {
|
||||
if (auto result = buffer.transfer_message(*m_socket, kind == MessageKind::Sync); result.is_error()) {
|
||||
shutdown_with_error(result.error());
|
||||
return result.release_error();
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ void ConnectionBase::handle_messages()
|
|||
}
|
||||
|
||||
if (auto response = handler_result.release_value()) {
|
||||
if (auto post_result = post_message(*response); post_result.is_error()) {
|
||||
if (auto post_result = post_message(*response, MessageKind::Async); post_result.is_error()) {
|
||||
dbgln("IPC::ConnectionBase::handle_messages: {}", post_result.error());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,7 +44,11 @@ public:
|
|||
DeferredInvoker& deferred_invoker() { return *m_deferred_invoker; }
|
||||
|
||||
bool is_open() const { return m_socket->is_open(); }
|
||||
ErrorOr<void> post_message(Message const&);
|
||||
enum class MessageKind {
|
||||
Async,
|
||||
Sync,
|
||||
};
|
||||
ErrorOr<void> post_message(Message const&, MessageKind = MessageKind::Async);
|
||||
|
||||
void shutdown();
|
||||
virtual void die() { }
|
||||
|
@ -64,7 +68,7 @@ protected:
|
|||
ErrorOr<Vector<u8>> read_as_much_as_possible_from_socket_without_blocking();
|
||||
ErrorOr<void> drain_messages_from_peer();
|
||||
|
||||
ErrorOr<void> post_message(MessageBuffer);
|
||||
ErrorOr<void> post_message(MessageBuffer, MessageKind);
|
||||
void handle_messages();
|
||||
|
||||
IPC::Stub& m_local_stub;
|
||||
|
@ -105,7 +109,7 @@ public:
|
|||
template<typename RequestType, typename... Args>
|
||||
NonnullOwnPtr<typename RequestType::ResponseType> send_sync(Args&&... args)
|
||||
{
|
||||
MUST(post_message(RequestType(forward<Args>(args)...)));
|
||||
MUST(post_message(RequestType(forward<Args>(args)...), MessageKind::Sync));
|
||||
auto response = wait_for_specific_endpoint_message<typename RequestType::ResponseType, PeerEndpoint>();
|
||||
VERIFY(response);
|
||||
return response.release_nonnull();
|
||||
|
@ -114,7 +118,7 @@ public:
|
|||
template<typename RequestType, typename... Args>
|
||||
OwnPtr<typename RequestType::ResponseType> send_sync_but_allow_failure(Args&&... args)
|
||||
{
|
||||
if (post_message(RequestType(forward<Args>(args)...)).is_error())
|
||||
if (post_message(RequestType(forward<Args>(args)...), MessageKind::Sync).is_error())
|
||||
return nullptr;
|
||||
return wait_for_specific_endpoint_message<typename RequestType::ResponseType, PeerEndpoint>();
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ ErrorOr<void> MessageBuffer::append_file_descriptor(int fd)
|
|||
return {};
|
||||
}
|
||||
|
||||
ErrorOr<void> MessageBuffer::transfer_message(Core::LocalSocket& socket)
|
||||
ErrorOr<void> MessageBuffer::transfer_message(Core::LocalSocket& socket, bool block_event_loop)
|
||||
{
|
||||
Checked<MessageSizeType> checked_message_size { m_data.size() };
|
||||
checked_message_size -= sizeof(MessageSizeType);
|
||||
|
@ -77,7 +77,10 @@ ErrorOr<void> MessageBuffer::transfer_message(Core::LocalSocket& socket)
|
|||
// FIXME: This is a hacky way to at least not crash on large messages
|
||||
// The limit of 100 writes is arbitrary, and there to prevent indefinite spinning on the EventLoop
|
||||
if ((error.code() == EAGAIN || error.code() == EMSGSIZE) && writes_done < 100) {
|
||||
Core::EventLoop::current().pump();
|
||||
if (block_event_loop)
|
||||
sched_yield();
|
||||
else
|
||||
Core::EventLoop::current().pump();
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -44,7 +44,7 @@ public:
|
|||
|
||||
ErrorOr<void> append_file_descriptor(int fd);
|
||||
|
||||
ErrorOr<void> transfer_message(Core::LocalSocket& socket);
|
||||
ErrorOr<void> transfer_message(Core::LocalSocket& socket, bool block_event_loop = false);
|
||||
|
||||
private:
|
||||
Vector<u8, 1024> m_data;
|
||||
|
|
Loading…
Reference in a new issue