winhttp: Reimplement the task queue using thread pool work objects.

Simplifies cleanup.

Signed-off-by: Hans Leidekker <hans@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
This commit is contained in:
Hans Leidekker 2020-06-29 15:46:10 +02:00 committed by Alexandre Julliard
parent 7a2dc025a3
commit c7d140d34a
3 changed files with 170 additions and 211 deletions

View file

@ -122,92 +122,34 @@ static const WCHAR *attribute_table[] =
NULL /* WINHTTP_QUERY_PASSPORT_CONFIG = 78 */
};
static struct task_header *dequeue_task( struct queue *queue )
static DWORD start_queue( struct queue *queue )
{
struct task_header *task;
if (queue->pool) return ERROR_SUCCESS;
EnterCriticalSection( &queue->cs );
TRACE("%u tasks queued in %p\n", list_count(&queue->tasks), queue);
task = LIST_ENTRY( list_head( &queue->tasks ), struct task_header, entry );
if (task) list_remove( &task->entry );
LeaveCriticalSection( &queue->cs );
if (!(queue->pool = CreateThreadpool( NULL ))) return GetLastError();
SetThreadpoolThreadMinimum( queue->pool, 1 );
SetThreadpoolThreadMaximum( queue->pool, 1 );
TRACE("returning task %p\n", task);
return task;
memset( &queue->env, 0, sizeof(queue->env) );
queue->env.Version = 1;
queue->env.Pool = queue->pool;
TRACE("started %p\n", queue);
return ERROR_SUCCESS;
}
static void CALLBACK run_queue( TP_CALLBACK_INSTANCE *instance, void *ctx )
{
struct queue *queue = ctx;
HANDLE handles[] = { queue->wait, queue->cancel };
for (;;)
{
DWORD err = WaitForMultipleObjects( 2, handles, FALSE, INFINITE );
switch (err)
{
case WAIT_OBJECT_0:
{
struct task_header *task;
while ((task = dequeue_task( queue )))
{
task->proc( task );
release_object( task->object );
heap_free( task );
}
break;
}
case WAIT_OBJECT_0 + 1:
TRACE("exiting\n");
CloseHandle( queue->wait );
CloseHandle( queue->cancel );
queue->object->vtbl->destroy( queue->object );
return;
default:
ERR("wait failed %u (%u)\n", err, GetLastError());
break;
}
}
}
static DWORD start_queue( struct object_header *object, struct queue *queue )
{
DWORD ret = ERROR_OUTOFMEMORY;
if (queue->wait) return ERROR_SUCCESS;
queue->object = object;
list_init( &queue->tasks );
if (!(queue->wait = CreateEventW( NULL, FALSE, FALSE, NULL ))) goto error;
if (!(queue->cancel = CreateEventW( NULL, FALSE, FALSE, NULL ))) goto error;
if (!TrySubmitThreadpoolCallback( run_queue, queue, NULL )) ret = GetLastError();
else
{
queue->proc_running = TRUE;
TRACE("started %p\n", queue);
return ERROR_SUCCESS;
}
error:
CloseHandle( queue->wait );
queue->wait = NULL;
CloseHandle( queue->cancel );
queue->cancel = NULL;
return ret;
}
static DWORD queue_task( struct object_header *object, struct queue *queue, struct task_header *task )
static DWORD queue_task( struct queue *queue, PTP_WORK_CALLBACK task, void *ctx )
{
TP_WORK *work;
DWORD ret;
if ((ret = start_queue( object, queue ))) return ret;
EnterCriticalSection( &queue->cs );
TRACE("queueing task %p in %p\n", task, queue);
list_add_tail( &queue->tasks, &task->entry );
LeaveCriticalSection( &queue->cs );
if ((ret = start_queue( queue ))) return ret;
if (!(work = CreateThreadpoolWork( task, ctx, &queue->env ))) return GetLastError();
TRACE("queueing %p in %p\n", work, queue);
SubmitThreadpoolWork( work );
CloseThreadpoolWork( work );
SetEvent( queue->wait );
return ERROR_SUCCESS;
}
@ -2216,13 +2158,16 @@ end:
return ret;
}
static void task_send_request( struct task_header *task )
static void CALLBACK task_send_request( TP_CALLBACK_INSTANCE *instance, void *ctx, TP_WORK *work )
{
struct request *request = (struct request *)task->object;
struct send_request *s = (struct send_request *)task;
struct send_request *s = ctx;
send_request( request, s->headers, s->headers_len, s->optional, s->optional_len, s->total_len, s->context, TRUE );
TRACE("running %p\n", work);
send_request( s->request, s->headers, s->headers_len, s->optional, s->optional_len, s->total_len, s->context, TRUE );
release_object( &s->request->hdr );
heap_free( s->headers );
heap_free( s );
}
/***********************************************************************
@ -2256,8 +2201,7 @@ BOOL WINAPI WinHttpSendRequest( HINTERNET hrequest, LPCWSTR headers, DWORD heade
struct send_request *s;
if (!(s = heap_alloc( sizeof(*s) ))) return FALSE;
s->hdr.object = &request->hdr;
s->hdr.proc = task_send_request;
s->request = request;
s->headers = strdupW( headers );
s->headers_len = headers_len;
s->optional = optional;
@ -2266,7 +2210,12 @@ BOOL WINAPI WinHttpSendRequest( HINTERNET hrequest, LPCWSTR headers, DWORD heade
s->context = context;
addref_object( &request->hdr );
ret = queue_task( &request->hdr, &request->queue, (struct task_header *)s );
if ((ret = queue_task( &request->queue, task_send_request, s )))
{
release_object( &request->hdr );
heap_free( s->headers );
heap_free( s );
}
}
else ret = send_request( request, headers, headers_len, optional, optional_len, total_len, context, FALSE );
@ -2797,10 +2746,15 @@ static DWORD receive_response( struct request *request, BOOL async )
return ret;
}
static void task_receive_response( struct task_header *task )
static void CALLBACK task_receive_response( TP_CALLBACK_INSTANCE *instance, void *ctx, TP_WORK *work )
{
struct request *request = (struct request *)task->object;
receive_response( request, TRUE );
struct receive_response *r = ctx;
TRACE("running %p\n", work);
receive_response( r->request, TRUE );
release_object( &r->request->hdr );
heap_free( r );
}
/***********************************************************************
@ -2830,11 +2784,14 @@ BOOL WINAPI WinHttpReceiveResponse( HINTERNET hrequest, LPVOID reserved )
struct receive_response *r;
if (!(r = heap_alloc( sizeof(*r) ))) return FALSE;
r->hdr.object = &request->hdr;
r->hdr.proc = task_receive_response;
r->request = request;
addref_object( &request->hdr );
ret = queue_task( &request->hdr, &request->queue, (struct task_header *)r );
if ((ret = queue_task( &request->queue, task_receive_response, r )))
{
release_object( &request->hdr );
heap_free( r );
}
}
else ret = receive_response( request, FALSE );
@ -2876,12 +2833,15 @@ done:
return ret;
}
static void task_query_data_available( struct task_header *task )
static void CALLBACK task_query_data_available( TP_CALLBACK_INSTANCE *instance, void *ctx, TP_WORK *work )
{
struct request *request = (struct request *)task->object;
struct query_data *q = (struct query_data *)task;
struct query_data *q = ctx;
query_data_available( request, q->available, TRUE );
TRACE("running %p\n", work);
query_data_available( q->request, q->available, TRUE );
release_object( &q->request->hdr );
heap_free( q );
}
/***********************************************************************
@ -2911,12 +2871,15 @@ BOOL WINAPI WinHttpQueryDataAvailable( HINTERNET hrequest, LPDWORD available )
struct query_data *q;
if (!(q = heap_alloc( sizeof(*q) ))) return FALSE;
q->hdr.object = &request->hdr;
q->hdr.proc = task_query_data_available;
q->available = available;
q->request = request;
q->available = available;
addref_object( &request->hdr );
ret = queue_task( &request->hdr, &request->queue, (struct task_header *)q );
if ((ret = queue_task( &request->queue, task_query_data_available, q )))
{
release_object( &request->hdr );
heap_free( q );
}
}
else ret = query_data_available( request, available, FALSE );
@ -2925,12 +2888,15 @@ BOOL WINAPI WinHttpQueryDataAvailable( HINTERNET hrequest, LPDWORD available )
return !ret;
}
static void task_read_data( struct task_header *task )
static void CALLBACK task_read_data( TP_CALLBACK_INSTANCE *instance, void *ctx, TP_WORK *work )
{
struct request *request = (struct request *)task->object;
struct read_data *r = (struct read_data *)task;
struct read_data *r = ctx;
read_data( request, r->buffer, r->to_read, r->read, TRUE );
TRACE("running %p\n", work);
read_data( r->request, r->buffer, r->to_read, r->read, TRUE );
release_object( &r->request->hdr );
heap_free( r );
}
/***********************************************************************
@ -2960,14 +2926,17 @@ BOOL WINAPI WinHttpReadData( HINTERNET hrequest, LPVOID buffer, DWORD to_read, L
struct read_data *r;
if (!(r = heap_alloc( sizeof(*r) ))) return FALSE;
r->hdr.object = &request->hdr;
r->hdr.proc = task_read_data;
r->buffer = buffer;
r->to_read = to_read;
r->read = read;
r->request = request;
r->buffer = buffer;
r->to_read = to_read;
r->read = read;
addref_object( &request->hdr );
ret = queue_task( &request->hdr, &request->queue, (struct task_header *)r );
if ((ret = queue_task( &request->queue, task_read_data, r )))
{
release_object( &request->hdr );
heap_free( r );
}
}
else ret = read_data( request, buffer, to_read, read, FALSE );
@ -2998,12 +2967,15 @@ static DWORD write_data( struct request *request, const void *buffer, DWORD to_w
return ret;
}
static void task_write_data( struct task_header *task )
static void CALLBACK task_write_data( TP_CALLBACK_INSTANCE *instance, void *ctx, TP_WORK *work )
{
struct request *request = (struct request *)task->object;
struct write_data *w = (struct write_data *)task;
struct write_data *w = ctx;
write_data( request, w->buffer, w->to_write, w->written, TRUE );
TRACE("running %p\n", work);
write_data( w->request, w->buffer, w->to_write, w->written, TRUE );
release_object( &w->request->hdr );
heap_free( w );
}
/***********************************************************************
@ -3033,14 +3005,17 @@ BOOL WINAPI WinHttpWriteData( HINTERNET hrequest, LPCVOID buffer, DWORD to_write
struct write_data *w;
if (!(w = heap_alloc( sizeof(*w) ))) return FALSE;
w->hdr.object = &request->hdr;
w->hdr.proc = task_write_data;
w->buffer = buffer;
w->to_write = to_write;
w->written = written;
w->request = request;
w->buffer = buffer;
w->to_write = to_write;
w->written = written;
addref_object( &request->hdr );
ret = queue_task( &request->hdr, &request->queue, (struct task_header *)w );
if ((ret = queue_task( &request->queue, task_write_data, w )))
{
release_object( &request->hdr );
heap_free( w );
}
}
else ret = write_data( request, buffer, to_write, written, FALSE );
@ -3062,25 +3037,10 @@ static void socket_destroy( struct object_header *hdr )
TRACE("%p\n", socket);
if (socket->send_q.proc_running)
{
socket->send_q.proc_running = FALSE;
SetEvent( socket->send_q.cancel );
return;
}
if (socket->recv_q.proc_running)
{
socket->recv_q.proc_running = FALSE;
SetEvent( socket->recv_q.cancel );
return;
}
if (socket->send_q.pool) CloseThreadpool( socket->send_q.pool );
if (socket->recv_q.pool) CloseThreadpool( socket->recv_q.pool );
release_object( &socket->request->hdr );
socket->send_q.cs.DebugInfo->Spare[0] = 0;
DeleteCriticalSection( &socket->send_q.cs );
socket->recv_q.cs.DebugInfo->Spare[0] = 0;
DeleteCriticalSection( &socket->recv_q.cs );
heap_free( socket );
}
@ -3128,10 +3088,6 @@ HINTERNET WINAPI WinHttpWebSocketCompleteUpgrade( HINTERNET hrequest, DWORD_PTR
socket->hdr.callback = request->hdr.callback;
socket->hdr.notify_mask = request->hdr.notify_mask;
socket->hdr.context = context;
InitializeCriticalSection( &socket->send_q.cs );
socket->send_q.cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": socket.send_q.cs");
InitializeCriticalSection( &socket->recv_q.cs );
socket->recv_q.cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": socket.recv_q.cs");
addref_object( &request->hdr );
socket->request = request;
@ -3252,12 +3208,15 @@ static DWORD socket_send( struct socket *socket, WINHTTP_WEB_SOCKET_BUFFER_TYPE
return ret;
}
static void task_socket_send( struct task_header *task )
static void CALLBACK task_socket_send( TP_CALLBACK_INSTANCE *instance, void *ctx, TP_WORK *work )
{
struct socket *socket = (struct socket *)task->object;
struct socket_send *s = (struct socket_send *)task;
struct socket_send *s = ctx;
socket_send( socket, s->type, s->buf, s->len, TRUE );
TRACE("running %p\n", work);
socket_send( s->socket, s->type, s->buf, s->len, TRUE );
release_object( &s->socket->hdr );
heap_free( s );
}
DWORD WINAPI WinHttpWebSocketSend( HINTERNET hsocket, WINHTTP_WEB_SOCKET_BUFFER_TYPE type, void *buf, DWORD len )
@ -3291,14 +3250,17 @@ DWORD WINAPI WinHttpWebSocketSend( HINTERNET hsocket, WINHTTP_WEB_SOCKET_BUFFER_
struct socket_send *s;
if (!(s = heap_alloc( sizeof(*s) ))) return FALSE;
s->hdr.object = &socket->hdr;
s->hdr.proc = task_socket_send;
s->type = type;
s->buf = buf;
s->len = len;
s->socket = socket;
s->type = type;
s->buf = buf;
s->len = len;
addref_object( &socket->hdr );
ret = queue_task( &socket->hdr, &socket->send_q, (struct task_header *)s );
if ((ret = queue_task( &socket->send_q, task_socket_send, s )))
{
release_object( &socket->hdr );
heap_free( s );
}
}
else ret = socket_send( socket, type, buf, len, FALSE );
@ -3404,12 +3366,15 @@ static DWORD socket_receive( struct socket *socket, void *buf, DWORD len, DWORD
return ret;
}
static void task_socket_receive( struct task_header *task )
static void CALLBACK task_socket_receive( TP_CALLBACK_INSTANCE *instance, void *ctx, TP_WORK *work )
{
struct socket *socket = (struct socket *)task->object;
struct socket_receive *r = (struct socket_receive *)task;
struct socket_receive *r = ctx;
socket_receive( socket, r->buf, r->len, NULL, NULL, TRUE );
TRACE("running %p\n", work);
socket_receive( r->socket, r->buf, r->len, NULL, NULL, TRUE );
release_object( &r->socket->hdr );
heap_free( r );
}
DWORD WINAPI WinHttpWebSocketReceive( HINTERNET hsocket, void *buf, DWORD len, DWORD *ret_len,
@ -3439,13 +3404,16 @@ DWORD WINAPI WinHttpWebSocketReceive( HINTERNET hsocket, void *buf, DWORD len, D
struct socket_receive *r;
if (!(r = heap_alloc( sizeof(*r) ))) return FALSE;
r->hdr.object = &socket->hdr;
r->hdr.proc = task_socket_receive;
r->buf = buf;
r->len = len;
r->socket = socket;
r->buf = buf;
r->len = len;
addref_object( &socket->hdr );
ret = queue_task( &socket->hdr, &socket->recv_q, (struct task_header *)r );
if ((ret = queue_task( &socket->recv_q, task_socket_receive, r )))
{
release_object( &socket->hdr );
heap_free( r );
}
}
else ret = socket_receive( socket, buf, len, ret_len, ret_type, FALSE );
@ -3477,12 +3445,15 @@ static DWORD socket_shutdown( struct socket *socket, USHORT status, const void *
return ret;
}
static void task_socket_shutdown( struct task_header *task )
static void CALLBACK task_socket_shutdown( TP_CALLBACK_INSTANCE *instance, void *ctx, TP_WORK *work )
{
struct socket *socket = (struct socket *)task->object;
struct socket_shutdown *s = (struct socket_shutdown *)task;
struct socket_shutdown *s = ctx;
socket_shutdown( socket, s->status, s->reason, s->len, TRUE );
socket_shutdown( s->socket, s->status, s->reason, s->len, TRUE );
TRACE("running %p\n", work);
release_object( &s->socket->hdr );
heap_free( s );
}
DWORD WINAPI WinHttpWebSocketShutdown( HINTERNET hsocket, USHORT status, void *reason, DWORD len )
@ -3511,14 +3482,17 @@ DWORD WINAPI WinHttpWebSocketShutdown( HINTERNET hsocket, USHORT status, void *r
struct socket_shutdown *s;
if (!(s = heap_alloc( sizeof(*s) ))) return FALSE;
s->hdr.object = &socket->hdr;
s->hdr.proc = task_socket_shutdown;
s->status = status;
s->reason = reason;
s->len = len;
s->socket = socket;
s->status = status;
s->reason = reason;
s->len = len;
addref_object( &socket->hdr );
ret = queue_task( &socket->hdr, &socket->send_q, (struct task_header *)s );
if ((ret = queue_task( &socket->send_q, task_socket_shutdown, s )))
{
release_object( &socket->hdr );
heap_free( s );
}
}
else ret = socket_shutdown( socket, status, reason, len, FALSE );
@ -3577,12 +3551,15 @@ done:
return ret;
}
static void task_socket_close( struct task_header *task )
static void CALLBACK task_socket_close( TP_CALLBACK_INSTANCE *instance, void *ctx, TP_WORK *work )
{
struct socket *socket = (struct socket *)task->object;
struct socket_shutdown *s = (struct socket_shutdown *)task;
struct socket_shutdown *s = ctx;
socket_close( socket, s->status, s->reason, s->len, TRUE );
socket_close( s->socket, s->status, s->reason, s->len, TRUE );
TRACE("running %p\n", work);
release_object( &s->socket->hdr );
heap_free( s );
}
DWORD WINAPI WinHttpWebSocketClose( HINTERNET hsocket, USHORT status, void *reason, DWORD len )
@ -3611,14 +3588,17 @@ DWORD WINAPI WinHttpWebSocketClose( HINTERNET hsocket, USHORT status, void *reas
struct socket_shutdown *s;
if (!(s = heap_alloc( sizeof(*s) ))) return FALSE;
s->hdr.object = &socket->hdr;
s->hdr.proc = task_socket_close;
s->status = status;
s->reason = reason;
s->len = len;
s->socket = socket;
s->status = status;
s->reason = reason;
s->len = len;
addref_object( &socket->hdr );
ret = queue_task( &socket->hdr, &socket->recv_q, (struct task_header *)s );
if ((ret = queue_task( &socket->recv_q, task_socket_close, s )))
{
release_object( &socket->hdr );
heap_free( s );
}
}
else ret = socket_close( socket, status, reason, len, FALSE );

View file

@ -578,13 +578,7 @@ static void request_destroy( struct object_header *hdr )
TRACE("%p\n", request);
if (request->queue.proc_running)
{
/* Signal to the task proc to quit. It will call this again when it does. */
request->queue.proc_running = FALSE;
SetEvent( request->queue.cancel );
return;
}
if (request->queue.pool) CloseThreadpool( request->queue.pool );
release_object( &request->connect->hdr );
if (request->cred_handle_initialized) FreeCredentialsHandle( &request->cred_handle );
@ -614,8 +608,6 @@ static void request_destroy( struct object_header *hdr )
}
}
request->queue.cs.DebugInfo->Spare[0] = 0;
DeleteCriticalSection( &request->queue.cs );
heap_free( request );
}
@ -1120,8 +1112,6 @@ HINTERNET WINAPI WinHttpOpenRequest( HINTERNET hconnect, LPCWSTR verb, LPCWSTR o
request->hdr.notify_mask = connect->hdr.notify_mask;
request->hdr.context = connect->hdr.context;
request->hdr.redirect_policy = connect->hdr.redirect_policy;
InitializeCriticalSection( &request->queue.cs );
request->queue.cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": request.queue.cs");
addref_object( &connect->hdr );
request->connect = connect;

View file

@ -157,12 +157,8 @@ struct authinfo
struct queue
{
struct object_header *object;
CRITICAL_SECTION cs;
BOOL proc_running;
HANDLE wait;
HANDLE cancel;
struct list tasks;
TP_POOL *pool;
TP_CALLBACK_ENVIRON env;
};
enum request_flags
@ -252,16 +248,9 @@ struct socket
DWORD reason_len;
};
struct task_header
{
struct list entry;
struct object_header *object;
void (*proc)( struct task_header * );
};
struct send_request
{
struct task_header hdr;
struct request *request;
WCHAR *headers;
DWORD headers_len;
void *optional;
@ -272,18 +261,18 @@ struct send_request
struct receive_response
{
struct task_header hdr;
struct request *request;
};
struct query_data
{
struct task_header hdr;
struct request *request;
DWORD *available;
};
struct read_data
{
struct task_header hdr;
struct request *request;
void *buffer;
DWORD to_read;
DWORD *read;
@ -291,7 +280,7 @@ struct read_data
struct write_data
{
struct task_header hdr;
struct request *request;
const void *buffer;
DWORD to_write;
DWORD *written;
@ -299,7 +288,7 @@ struct write_data
struct socket_send
{
struct task_header hdr;
struct socket *socket;
WINHTTP_WEB_SOCKET_BUFFER_TYPE type;
const void *buf;
DWORD len;
@ -307,14 +296,14 @@ struct socket_send
struct socket_receive
{
struct task_header hdr;
struct socket *socket;
void *buf;
DWORD len;
};
struct socket_shutdown
{
struct task_header hdr;
struct socket *socket;
USHORT status;
const void *reason;
DWORD len;