server: Introduce pipe message queue.

Signed-off-by: Jacek Caban <jacek@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Jacek Caban 2017-03-21 13:03:03 +01:00 committed by Alexandre Julliard
parent 888f2c76dd
commit 9db22fdfcf

View file

@ -65,6 +65,14 @@ enum pipe_state
struct named_pipe; struct named_pipe;
struct pipe_message
{
struct list entry; /* entry in message queue */
data_size_t read_pos; /* already read bytes */
struct iosb *iosb; /* message iosb */
struct async *async; /* async of pending write */
};
struct pipe_end struct pipe_end
{ {
struct object obj; /* object header */ struct object obj; /* object header */
@ -72,6 +80,7 @@ struct pipe_end
unsigned int flags; /* pipe flags */ unsigned int flags; /* pipe flags */
struct pipe_end *connection; /* the other end of the pipe */ struct pipe_end *connection; /* the other end of the pipe */
data_size_t buffer_size;/* size of buffered data that doesn't block caller */ data_size_t buffer_size;/* size of buffered data that doesn't block caller */
struct list message_queue;
}; };
struct pipe_server struct pipe_server
@ -379,6 +388,13 @@ static void notify_empty( struct pipe_server *server )
fd_async_wake_up( server->pipe_end.fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS ); fd_async_wake_up( server->pipe_end.fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS );
} }
static void free_message( struct pipe_message *message )
{
list_remove( &message->entry );
if (message->iosb) release_object( message->iosb );
free( message );
}
static void pipe_end_disconnect( struct pipe_end *pipe_end, unsigned int status ) static void pipe_end_disconnect( struct pipe_end *pipe_end, unsigned int status )
{ {
struct pipe_end *connection = pipe_end->connection; struct pipe_end *connection = pipe_end->connection;
@ -409,6 +425,18 @@ static void do_disconnect( struct pipe_server *server )
server->pipe_end.fd = NULL; server->pipe_end.fd = NULL;
} }
static void pipe_end_destroy( struct pipe_end *pipe_end )
{
struct pipe_message *message;
while (!list_empty( &pipe_end->message_queue ))
{
message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry );
assert( !message->async );
free_message( message );
}
}
static void pipe_server_destroy( struct object *obj) static void pipe_server_destroy( struct object *obj)
{ {
struct pipe_server *server = (struct pipe_server *)obj; struct pipe_server *server = (struct pipe_server *)obj;
@ -423,6 +451,7 @@ static void pipe_server_destroy( struct object *obj)
do_disconnect( server ); do_disconnect( server );
} }
pipe_end_destroy( &server->pipe_end );
if (server->client) if (server->client)
{ {
server->client->server = NULL; server->client->server = NULL;
@ -467,6 +496,8 @@ static void pipe_client_destroy( struct object *obj)
server->client = NULL; server->client = NULL;
client->server = NULL; client->server = NULL;
} }
pipe_end_destroy( &client->pipe_end );
if (client->pipe_end.fd) release_object( client->pipe_end.fd ); if (client->pipe_end.fd) release_object( client->pipe_end.fd );
} }
@ -550,7 +581,7 @@ static int pipe_data_remaining( struct pipe_server *server )
assert( server->client ); assert( server->client );
if (use_server_io( &server->pipe_end )) if (use_server_io( &server->pipe_end ))
return 0; return !list_empty( &server->client->pipe_end.message_queue );
fd = get_unix_fd( server->client->pipe_end.fd ); fd = get_unix_fd( server->client->pipe_end.fd );
if (fd < 0) if (fd < 0)
@ -721,6 +752,7 @@ static void init_pipe_end( struct pipe_end *pipe_end, unsigned int pipe_flags, d
pipe_end->flags = pipe_flags; pipe_end->flags = pipe_flags;
pipe_end->connection = NULL; pipe_end->connection = NULL;
pipe_end->buffer_size = buffer_size; pipe_end->buffer_size = buffer_size;
list_init( &pipe_end->message_queue );
} }
static struct pipe_server *create_pipe_server( struct named_pipe *pipe, unsigned int options, static struct pipe_server *create_pipe_server( struct named_pipe *pipe, unsigned int options,