From 75e8c4493a697c85b1a707a233f34c7453602818 Mon Sep 17 00:00:00 2001 From: Zebediah Figura Date: Sat, 11 Apr 2020 00:16:26 -0500 Subject: [PATCH] ntdll: Implement threadpool I/O queues. Signed-off-by: Zebediah Figura Signed-off-by: Alexandre Julliard --- dlls/ntdll/ntdll.spec | 4 + dlls/ntdll/ntdll_misc.h | 7 + dlls/ntdll/tests/threadpool.c | 179 ++++++++++++++++++ dlls/ntdll/threadpool.c | 335 +++++++++++++++++++++++++++++++++- include/winternl.h | 8 +- 5 files changed, 523 insertions(+), 10 deletions(-) diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec index e61a2578da5..4f5fa8c21d4 100644 --- a/dlls/ntdll/ntdll.spec +++ b/dlls/ntdll/ntdll.spec @@ -1060,6 +1060,7 @@ @ stdcall RtlxUnicodeStringToAnsiSize(ptr) RtlUnicodeStringToAnsiSize @ stdcall RtlxUnicodeStringToOemSize(ptr) RtlUnicodeStringToOemSize @ stdcall TpAllocCleanupGroup(ptr) +@ stdcall TpAllocIoCompletion(ptr ptr ptr ptr ptr) @ stdcall TpAllocPool(ptr ptr) @ stdcall TpAllocTimer(ptr ptr ptr ptr) @ stdcall TpAllocWait(ptr ptr ptr ptr) @@ -1070,12 +1071,14 @@ @ stdcall TpCallbackReleaseSemaphoreOnCompletion(ptr long long) @ stdcall TpCallbackSetEventOnCompletion(ptr long) @ stdcall TpCallbackUnloadDllOnCompletion(ptr ptr) +@ stdcall TpCancelAsyncIoOperation(ptr) @ stdcall TpDisassociateCallback(ptr) @ stdcall TpIsTimerSet(ptr) @ stdcall TpPostWork(ptr) @ stdcall TpQueryPoolStackInformation(ptr ptr) @ stdcall TpReleaseCleanupGroup(ptr) @ stdcall TpReleaseCleanupGroupMembers(ptr long ptr) +@ stdcall TpReleaseIoCompletion(ptr) @ stdcall TpReleasePool(ptr) @ stdcall TpReleaseTimer(ptr) @ stdcall TpReleaseWait(ptr) @@ -1087,6 +1090,7 @@ @ stdcall TpSetWait(ptr long ptr) @ stdcall TpSimpleTryPost(ptr ptr ptr) @ stdcall TpStartAsyncIoOperation(ptr) +@ stdcall TpWaitForIoCompletion(ptr long) @ stdcall TpWaitForTimer(ptr long) @ stdcall TpWaitForWait(ptr long) @ stdcall TpWaitForWork(ptr long) diff --git a/dlls/ntdll/ntdll_misc.h b/dlls/ntdll/ntdll_misc.h index b62239de5b4..4601e490345 100644 --- a/dlls/ntdll/ntdll_misc.h +++ b/dlls/ntdll/ntdll_misc.h @@ -30,6 +30,13 @@ #include "wine/server.h" #include "wine/asm.h" +#define DECLARE_CRITICAL_SECTION(cs) \ + static RTL_CRITICAL_SECTION cs; \ + static RTL_CRITICAL_SECTION_DEBUG cs##_debug = \ + { 0, 0, &cs, { &cs##_debug.ProcessLocksList, &cs##_debug.ProcessLocksList }, \ + 0, 0, { (DWORD_PTR)(__FILE__ ": " # cs) }}; \ + static RTL_CRITICAL_SECTION cs = { &cs##_debug, -1, 0, 0, 0, 0 }; + #define MAX_NT_PATH_LENGTH 277 #define MAX_DOS_DRIVES 26 diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c index bf5493cac0c..32d4c3eac2e 100644 --- a/dlls/ntdll/tests/threadpool.c +++ b/dlls/ntdll/tests/threadpool.c @@ -18,22 +18,27 @@ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA */ +#define NONAMELESSSTRUCT +#define NONAMELESSUNION #include "ntdll_test.h" static HMODULE hntdll = 0; static NTSTATUS (WINAPI *pTpAllocCleanupGroup)(TP_CLEANUP_GROUP **); +static NTSTATUS (WINAPI *pTpAllocIoCompletion)(TP_IO **,HANDLE,PTP_IO_CALLBACK,void *,TP_CALLBACK_ENVIRON *); static NTSTATUS (WINAPI *pTpAllocPool)(TP_POOL **,PVOID); static NTSTATUS (WINAPI *pTpAllocTimer)(TP_TIMER **,PTP_TIMER_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *); static NTSTATUS (WINAPI *pTpAllocWait)(TP_WAIT **,PTP_WAIT_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *); static NTSTATUS (WINAPI *pTpAllocWork)(TP_WORK **,PTP_WORK_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *); static NTSTATUS (WINAPI *pTpCallbackMayRunLong)(TP_CALLBACK_INSTANCE *); static VOID (WINAPI *pTpCallbackReleaseSemaphoreOnCompletion)(TP_CALLBACK_INSTANCE *,HANDLE,DWORD); +static void (WINAPI *pTpCancelAsyncIoOperation)(TP_IO *); static VOID (WINAPI *pTpDisassociateCallback)(TP_CALLBACK_INSTANCE *); static BOOL (WINAPI *pTpIsTimerSet)(TP_TIMER *); static VOID (WINAPI *pTpReleaseWait)(TP_WAIT *); static VOID (WINAPI *pTpPostWork)(TP_WORK *); static VOID (WINAPI *pTpReleaseCleanupGroup)(TP_CLEANUP_GROUP *); static VOID (WINAPI *pTpReleaseCleanupGroupMembers)(TP_CLEANUP_GROUP *,BOOL,PVOID); +static void (WINAPI *pTpReleaseIoCompletion)(TP_IO *); static VOID (WINAPI *pTpReleasePool)(TP_POOL *); static VOID (WINAPI *pTpReleaseTimer)(TP_TIMER *); static VOID (WINAPI *pTpReleaseWork)(TP_WORK *); @@ -41,6 +46,8 @@ static VOID (WINAPI *pTpSetPoolMaxThreads)(TP_POOL *,DWORD); static VOID (WINAPI *pTpSetTimer)(TP_TIMER *,LARGE_INTEGER *,LONG,LONG); static VOID (WINAPI *pTpSetWait)(TP_WAIT *,HANDLE,LARGE_INTEGER *); static NTSTATUS (WINAPI *pTpSimpleTryPost)(PTP_SIMPLE_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *); +static void (WINAPI *pTpStartAsyncIoOperation)(TP_IO *); +static void (WINAPI *pTpWaitForIoCompletion)(TP_IO *,BOOL); static VOID (WINAPI *pTpWaitForTimer)(TP_TIMER *,BOOL); static VOID (WINAPI *pTpWaitForWait)(TP_WAIT *,BOOL); static VOID (WINAPI *pTpWaitForWork)(TP_WORK *,BOOL); @@ -63,10 +70,12 @@ static BOOL init_threadpool(void) } NTDLL_GET_PROC(TpAllocCleanupGroup); + NTDLL_GET_PROC(TpAllocIoCompletion); NTDLL_GET_PROC(TpAllocPool); NTDLL_GET_PROC(TpAllocTimer); NTDLL_GET_PROC(TpAllocWait); NTDLL_GET_PROC(TpAllocWork); + NTDLL_GET_PROC(TpCancelAsyncIoOperation); NTDLL_GET_PROC(TpCallbackMayRunLong); NTDLL_GET_PROC(TpCallbackReleaseSemaphoreOnCompletion); NTDLL_GET_PROC(TpDisassociateCallback); @@ -74,6 +83,7 @@ static BOOL init_threadpool(void) NTDLL_GET_PROC(TpPostWork); NTDLL_GET_PROC(TpReleaseCleanupGroup); NTDLL_GET_PROC(TpReleaseCleanupGroupMembers); + NTDLL_GET_PROC(TpReleaseIoCompletion); NTDLL_GET_PROC(TpReleasePool); NTDLL_GET_PROC(TpReleaseTimer); NTDLL_GET_PROC(TpReleaseWait); @@ -82,6 +92,8 @@ static BOOL init_threadpool(void) NTDLL_GET_PROC(TpSetTimer); NTDLL_GET_PROC(TpSetWait); NTDLL_GET_PROC(TpSimpleTryPost); + NTDLL_GET_PROC(TpStartAsyncIoOperation); + NTDLL_GET_PROC(TpWaitForIoCompletion); NTDLL_GET_PROC(TpWaitForTimer); NTDLL_GET_PROC(TpWaitForWait); NTDLL_GET_PROC(TpWaitForWork); @@ -1906,6 +1918,172 @@ static void test_tp_multi_wait(void) CloseHandle(semaphore); } +struct io_cb_ctx +{ + unsigned int count; + void *ovl; + NTSTATUS ret; + ULONG_PTR length; + TP_IO *io; +}; + +static void CALLBACK io_cb(TP_CALLBACK_INSTANCE *instance, void *userdata, + void *cvalue, IO_STATUS_BLOCK *iosb, TP_IO *io) +{ + struct io_cb_ctx *ctx = userdata; + ++ctx->count; + ctx->ovl = cvalue; + ctx->ret = iosb->u.Status; + ctx->length = iosb->Information; + ctx->io = io; +} + +static DWORD WINAPI io_wait_thread(void *arg) +{ + TP_IO *io = arg; + pTpWaitForIoCompletion(io, FALSE); + return 0; +} + +static void test_tp_io(void) +{ + TP_CALLBACK_ENVIRON environment = {.Version = 1}; + OVERLAPPED ovl = {}, ovl2 = {}; + HANDLE client, server, thread; + struct io_cb_ctx userdata; + char in[1], in2[1]; + const char out[1]; + NTSTATUS status; + DWORD ret_size; + TP_POOL *pool; + TP_IO *io; + BOOL ret; + + ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL); + + status = pTpAllocPool(&pool, NULL); + ok(!status, "failed to allocate pool, status %#x\n", status); + + server = CreateNamedPipeA("\\\\.\\pipe\\wine_tp_test", + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, 0, 1, 1024, 1024, 0, NULL); + ok(server != INVALID_HANDLE_VALUE, "Failed to create server pipe, error %u.\n", GetLastError()); + client = CreateFileA("\\\\.\\pipe\\wine_tp_test", GENERIC_READ | GENERIC_WRITE, + 0, NULL, OPEN_EXISTING, 0, 0); + ok(client != INVALID_HANDLE_VALUE, "Failed to create client pipe, error %u.\n", GetLastError()); + + environment.Pool = pool; + io = NULL; + status = pTpAllocIoCompletion(&io, server, io_cb, &userdata, &environment); + ok(!status, "got %#x\n", status); + ok(!!io, "expected non-NULL TP_IO\n"); + + pTpWaitForIoCompletion(io, FALSE); + + userdata.count = 0; + pTpStartAsyncIoOperation(io); + + thread = CreateThread(NULL, 0, io_wait_thread, io, 0, NULL); + ok(WaitForSingleObject(thread, 100) == WAIT_TIMEOUT, "TpWaitForIoCompletion() should not return\n"); + + ret = ReadFile(server, in, sizeof(in), NULL, &ovl); + ok(!ret, "wrong ret %d\n", ret); + ok(GetLastError() == ERROR_IO_PENDING, "wrong error %u\n", GetLastError()); + + ret = WriteFile(client, out, sizeof(out), &ret_size, NULL); + ok(ret, "WriteFile() failed, error %u\n", GetLastError()); + + pTpWaitForIoCompletion(io, FALSE); + ok(userdata.count == 1, "callback ran %u times\n", userdata.count); + ok(userdata.ovl == &ovl, "expected %p, got %p\n", &ovl, userdata.ovl); + ok(userdata.ret == STATUS_SUCCESS, "got status %#x\n", userdata.ret); + ok(userdata.length == 1, "got length %lu\n", userdata.length); + ok(userdata.io == io, "expected %p, got %p\n", io, userdata.io); + + ok(!WaitForSingleObject(thread, 1000), "wait timed out\n"); + CloseHandle(thread); + + userdata.count = 0; + pTpStartAsyncIoOperation(io); + pTpStartAsyncIoOperation(io); + + ret = ReadFile(server, in, sizeof(in), NULL, &ovl); + ok(!ret, "wrong ret %d\n", ret); + ok(GetLastError() == ERROR_IO_PENDING, "wrong error %u\n", GetLastError()); + ret = ReadFile(server, in2, sizeof(in2), NULL, &ovl2); + ok(!ret, "wrong ret %d\n", ret); + ok(GetLastError() == ERROR_IO_PENDING, "wrong error %u\n", GetLastError()); + + ret = WriteFile(client, out, sizeof(out), &ret_size, NULL); + ok(ret, "WriteFile() failed, error %u\n", GetLastError()); + ret = WriteFile(client, out, sizeof(out), &ret_size, NULL); + ok(ret, "WriteFile() failed, error %u\n", GetLastError()); + + pTpWaitForIoCompletion(io, FALSE); + ok(userdata.count == 2, "callback ran %u times\n", userdata.count); + ok(userdata.ret == STATUS_SUCCESS, "got status %#x\n", userdata.ret); + ok(userdata.length == 1, "got length %lu\n", userdata.length); + ok(userdata.io == io, "expected %p, got %p\n", io, userdata.io); + + /* The documentation is a bit unclear about passing TRUE to + * WaitForThreadpoolIoCallbacks()—"pending I/O requests are not canceled" + * [as with CancelIoEx()], but pending threadpool callbacks are, even those + * which have not yet reached the completion port [as with + * TpCancelAsyncIoOperation()]. */ + userdata.count = 0; + pTpStartAsyncIoOperation(io); + + pTpWaitForIoCompletion(io, TRUE); + ok(!userdata.count, "callback ran %u times\n", userdata.count); + + pTpStartAsyncIoOperation(io); + + ret = WriteFile(client, out, sizeof(out), &ret_size, NULL); + ok(ret, "WriteFile() failed, error %u\n", GetLastError()); + + ret = ReadFile(server, in, sizeof(in), NULL, &ovl); + ok(ret, "wrong ret %d\n", ret); + + pTpWaitForIoCompletion(io, FALSE); + ok(userdata.count == 1, "callback ran %u times\n", userdata.count); + ok(userdata.ovl == &ovl, "expected %p, got %p\n", &ovl, userdata.ovl); + ok(userdata.ret == STATUS_SUCCESS, "got status %#x\n", userdata.ret); + ok(userdata.length == 1, "got length %lu\n", userdata.length); + ok(userdata.io == io, "expected %p, got %p\n", io, userdata.io); + + userdata.count = 0; + pTpStartAsyncIoOperation(io); + + ret = ReadFile(server, NULL, 1, NULL, &ovl); + ok(!ret, "wrong ret %d\n", ret); + ok(GetLastError() == ERROR_NOACCESS, "wrong error %u\n", GetLastError()); + + pTpCancelAsyncIoOperation(io); + pTpWaitForIoCompletion(io, FALSE); + ok(!userdata.count, "callback ran %u times\n", userdata.count); + + userdata.count = 0; + pTpStartAsyncIoOperation(io); + + ret = ReadFile(server, in, sizeof(in), NULL, &ovl); + ok(!ret, "wrong ret %d\n", ret); + ok(GetLastError() == ERROR_IO_PENDING, "wrong error %u\n", GetLastError()); + ret = CancelIo(server); + ok(ret, "CancelIo() failed, error %u\n", GetLastError()); + + pTpWaitForIoCompletion(io, FALSE); + ok(userdata.count == 1, "callback ran %u times\n", userdata.count); + ok(userdata.ovl == &ovl, "expected %p, got %p\n", &ovl, userdata.ovl); + ok(userdata.ret == STATUS_CANCELLED, "got status %#x\n", userdata.ret); + ok(!userdata.length, "got length %lu\n", userdata.length); + ok(userdata.io == io, "expected %p, got %p\n", io, userdata.io); + + CloseHandle(ovl.hEvent); + CloseHandle(client); + CloseHandle(server); + pTpReleaseIoCompletion(io); + pTpReleasePool(pool); +} + START_TEST(threadpool) { test_RtlQueueWorkItem(); @@ -1925,4 +2103,5 @@ START_TEST(threadpool) test_tp_window_length(); test_tp_wait(); test_tp_multi_wait(); + test_tp_io(); } diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c index a6e4749e10f..215a5e9c53a 100644 --- a/dlls/ntdll/threadpool.c +++ b/dlls/ntdll/threadpool.c @@ -131,6 +131,7 @@ struct threadpool int min_workers; int num_workers; int num_busy_workers; + HANDLE compl_port; TP_POOL_STACK_INFORMATION stack_info; }; @@ -139,7 +140,14 @@ enum threadpool_objtype TP_OBJECT_TYPE_SIMPLE, TP_OBJECT_TYPE_WORK, TP_OBJECT_TYPE_TIMER, - TP_OBJECT_TYPE_WAIT + TP_OBJECT_TYPE_WAIT, + TP_OBJECT_TYPE_IO, +}; + +struct io_completion +{ + IO_STATUS_BLOCK iosb; + ULONG_PTR cvalue; }; /* internal threadpool object representation */ @@ -201,6 +209,13 @@ struct threadpool_object ULONGLONG timeout; HANDLE handle; } wait; + struct + { + PTP_IO_CALLBACK callback; + /* locked via .pool->cs */ + unsigned int pending_count, completion_count, completion_max; + struct io_completion *completions; + } io; } u; }; @@ -291,6 +306,29 @@ struct waitqueue_bucket HANDLE update_event; }; +/* global I/O completion queue object */ +static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug; + +static struct +{ + CRITICAL_SECTION cs; + LONG objcount; + BOOL thread_running; + HANDLE port; + RTL_CONDITION_VARIABLE update_event; +} +ioqueue = +{ + .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 }, +}; + +static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug = +{ + 0, 0, &ioqueue.cs, + { &ioqueue_debug.ProcessLocksList, &ioqueue_debug.ProcessLocksList }, + 0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") } +}; + static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool ) { return (struct threadpool *)pool; @@ -317,6 +355,13 @@ static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait ) return object; } +static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io ) +{ + struct threadpool_object *object = (struct threadpool_object *)io; + assert( object->type == TP_OBJECT_TYPE_IO ); + return object; +} + static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group ) { return (struct threadpool_group *)group; @@ -343,6 +388,33 @@ static inline LONG interlocked_dec( PLONG dest ) return interlocked_xchg_add( dest, -1 ) - 1; } +static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size) +{ + unsigned int new_capacity, max_capacity; + void *new_elements; + + if (count <= *capacity) + return TRUE; + + max_capacity = ~(SIZE_T)0 / size; + if (count > max_capacity) + return FALSE; + + new_capacity = max(4, *capacity); + while (new_capacity < count && new_capacity <= max_capacity / 2) + new_capacity *= 2; + if (new_capacity < count) + new_capacity = max_capacity; + + if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size ))) + return FALSE; + + *elements = new_elements; + *capacity = new_capacity; + + return TRUE; +} + static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata ) { struct rtl_work_item *item = userdata; @@ -1642,6 +1714,127 @@ static void tp_waitqueue_unlock( struct threadpool_object *wait ) RtlLeaveCriticalSection( &waitqueue.cs ); } +static void CALLBACK ioqueue_thread_proc( void *param ) +{ + struct io_completion *completion; + struct threadpool_object *io; + IO_STATUS_BLOCK iosb; + ULONG_PTR key, value; + NTSTATUS status; + + TRACE( "starting I/O completion thread\n" ); + + RtlEnterCriticalSection( &ioqueue.cs ); + + for (;;) + { + RtlLeaveCriticalSection( &ioqueue.cs ); + if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL ))) + ERR("NtRemoveIoCompletion failed, status %#x.\n", status); + RtlEnterCriticalSection( &ioqueue.cs ); + + if (key) + { + io = (struct threadpool_object *)key; + + RtlEnterCriticalSection( &io->pool->cs ); + + if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max, + io->u.io.completion_count + 1, sizeof(*io->u.io.completions))) + { + ERR("Failed to allocate memory.\n"); + RtlLeaveCriticalSection( &io->pool->cs ); + continue; + } + + completion = &io->u.io.completions[io->u.io.completion_count++]; + completion->iosb = iosb; + completion->cvalue = value; + + tp_object_submit( io, FALSE ); + + RtlLeaveCriticalSection( &io->pool->cs ); + } + + if (!ioqueue.objcount) + { + /* All I/O objects have been destroyed; if no new objects are + * created within some amount of time, then we can shutdown this + * thread. */ + LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000}; + if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs, + &timeout) == STATUS_TIMEOUT && !ioqueue.objcount) + break; + } + } + + RtlLeaveCriticalSection( &ioqueue.cs ); + + TRACE( "terminating I/O completion thread\n" ); + + RtlExitUserThread( 0 ); +} + +static NTSTATUS tp_ioqueue_lock( struct threadpool_object *io, HANDLE file ) +{ + NTSTATUS status = STATUS_SUCCESS; + + assert( io->type == TP_OBJECT_TYPE_IO ); + + RtlEnterCriticalSection( &ioqueue.cs ); + + if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port, + IO_COMPLETION_ALL_ACCESS, NULL, 0 ))) + { + RtlLeaveCriticalSection( &ioqueue.cs ); + return status; + } + + if (!ioqueue.thread_running) + { + HANDLE thread; + + if (!(status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, + NULL, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL ))) + { + ioqueue.thread_running = TRUE; + NtClose( thread ); + } + } + + if (status == STATUS_SUCCESS) + { + FILE_COMPLETION_INFORMATION info; + IO_STATUS_BLOCK iosb; + + info.CompletionPort = ioqueue.port; + info.CompletionKey = (ULONG_PTR)io; + + status = NtSetInformationFile( file, &iosb, &info, sizeof(info), FileCompletionInformation ); + } + + if (status == STATUS_SUCCESS) + { + if (!ioqueue.objcount++) + RtlWakeConditionVariable( &ioqueue.update_event ); + } + + RtlLeaveCriticalSection( &ioqueue.cs ); + return status; +} + +static void tp_ioqueue_unlock( struct threadpool_object *io ) +{ + assert( io->type == TP_OBJECT_TYPE_IO ); + + RtlEnterCriticalSection( &ioqueue.cs ); + + if (!--ioqueue.objcount) + NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 ); + + RtlLeaveCriticalSection( &ioqueue.cs ); +} + /*********************************************************************** * tp_threadpool_alloc (internal) * @@ -2017,6 +2210,8 @@ static void tp_object_cancel( struct threadpool_object *object ) if (object->type == TP_OBJECT_TYPE_WAIT) object->u.wait.signaled = 0; } + if (object->type == TP_OBJECT_TYPE_IO) + object->u.io.pending_count = 0; RtlLeaveCriticalSection( &pool->cs ); while (pending_callbacks--) @@ -2027,6 +2222,8 @@ static BOOL object_is_finished( struct threadpool_object *object, BOOL group ) { if (object->num_pending_callbacks) return FALSE; + if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count) + return FALSE; if (group) return !object->num_running_callbacks; @@ -2066,6 +2263,8 @@ static void tp_object_prepare_shutdown( struct threadpool_object *object ) tp_timerqueue_unlock( object ); else if (object->type == TP_OBJECT_TYPE_WAIT) tp_waitqueue_unlock( object ); + else if (object->type == TP_OBJECT_TYPE_IO) + tp_ioqueue_unlock( object ); } /*********************************************************************** @@ -2131,6 +2330,7 @@ static void CALLBACK threadpool_worker_proc( void *param ) { TP_CALLBACK_INSTANCE *callback_instance; struct threadpool_instance instance; + struct io_completion completion; struct threadpool *pool = param; TP_WAIT_RESULT wait_result = 0; LARGE_INTEGER timeout; @@ -2160,6 +2360,12 @@ static void CALLBACK threadpool_worker_proc( void *param ) wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT; if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--; } + else if (object->type == TP_OBJECT_TYPE_IO) + { + assert( object->u.io.completion_count ); + completion = object->u.io.completions[--object->u.io.completion_count]; + object->u.io.pending_count--; + } /* Leave critical section and do the actual callback. */ object->num_associated_callbacks++; @@ -2218,6 +2424,17 @@ static void CALLBACK threadpool_worker_proc( void *param ) break; } + case TP_OBJECT_TYPE_IO: + { + TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n", + object->u.io.callback, callback_instance, object->userdata, + completion.cvalue, &completion.iosb, (TP_IO *)object ); + object->u.io.callback( callback_instance, object->userdata, + (void *)completion.cvalue, &completion.iosb, (TP_IO *)object ); + TRACE( "callback %p returned\n", object->u.io.callback ); + break; + } + default: assert(0); break; @@ -2317,6 +2534,50 @@ NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out ) return tp_group_alloc( (struct threadpool_group **)out ); } +/*********************************************************************** + * TpAllocIoCompletion (NTDLL.@) + */ +NTSTATUS WINAPI TpAllocIoCompletion( TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback, + void *userdata, TP_CALLBACK_ENVIRON *environment ) +{ + struct threadpool_object *object; + struct threadpool *pool; + NTSTATUS status; + + TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment ); + + if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) ))) + return STATUS_NO_MEMORY; + + if ((status = tp_threadpool_lock( &pool, environment ))) + { + RtlFreeHeap( GetProcessHeap(), 0, object ); + return status; + } + + object->type = TP_OBJECT_TYPE_IO; + object->u.io.callback = callback; + if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) ))) + { + tp_threadpool_unlock( pool ); + RtlFreeHeap( GetProcessHeap(), 0, object ); + return status; + } + + if ((status = tp_ioqueue_lock( object, file ))) + { + tp_threadpool_unlock( pool ); + RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions ); + RtlFreeHeap( GetProcessHeap(), 0, object ); + return status; + } + + tp_object_initialize( object, pool, userdata, environment ); + + *out = (TP_IO *)object; + return STATUS_SUCCESS; +} + /*********************************************************************** * TpAllocPool (NTDLL.@) */ @@ -2441,6 +2702,26 @@ NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID us return STATUS_SUCCESS; } +/*********************************************************************** + * TpCancelAsyncIoOperation (NTDLL.@) + */ +void WINAPI TpCancelAsyncIoOperation( TP_IO *io ) +{ + struct threadpool_object *this = impl_from_TP_IO( io ); + + TRACE( "%p\n", io ); + + RtlEnterCriticalSection( &this->pool->cs ); + + this->u.io.pending_count--; + if (object_is_finished( this, TRUE )) + RtlWakeAllConditionVariable( &this->group_finished_event ); + if (object_is_finished( this, FALSE )) + RtlWakeAllConditionVariable( &this->finished_event ); + + RtlLeaveCriticalSection( &this->pool->cs ); +} + /*********************************************************************** * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@) */ @@ -2692,6 +2973,20 @@ VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_p } } +/*********************************************************************** + * TpReleaseIoCompletion (NTDLL.@) + */ +void WINAPI TpReleaseIoCompletion( TP_IO *io ) +{ + struct threadpool_object *this = impl_from_TP_IO( io ); + + TRACE( "%p\n", io ); + + tp_object_prepare_shutdown( this ); + this->shutdown = TRUE; + tp_object_release( this ); +} + /*********************************************************************** * TpReleasePool (NTDLL.@) */ @@ -2960,6 +3255,36 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata, return STATUS_SUCCESS; } +/*********************************************************************** + * TpStartAsyncIoOperation (NTDLL.@) + */ +void WINAPI TpStartAsyncIoOperation( TP_IO *io ) +{ + struct threadpool_object *this = impl_from_TP_IO( io ); + + TRACE( "%p\n", io ); + + RtlEnterCriticalSection( &this->pool->cs ); + + this->u.io.pending_count++; + + RtlLeaveCriticalSection( &this->pool->cs ); +} + +/*********************************************************************** + * TpWaitForIoCompletion (NTDLL.@) + */ +void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending ) +{ + struct threadpool_object *this = impl_from_TP_IO( io ); + + TRACE( "%p %d\n", io, cancel_pending ); + + if (cancel_pending) + tp_object_cancel( this ); + tp_object_wait( this, FALSE ); +} + /*********************************************************************** * TpWaitForTimer (NTDLL.@) */ @@ -3039,11 +3364,3 @@ NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORM return STATUS_SUCCESS; } - -/*********************************************************************** - * TpStartAsyncIoOperation (NTDLL.@) - */ -void WINAPI TpStartAsyncIoOperation( TP_IO *io ) -{ - FIXME( "%p\n", io ); -} diff --git a/include/winternl.h b/include/winternl.h index e4c611e4af9..ac045c17097 100644 --- a/include/winternl.h +++ b/include/winternl.h @@ -2351,6 +2351,8 @@ typedef struct _SYSTEM_MODULE_INFORMATION typedef LONG (CALLBACK *PRTL_EXCEPTION_FILTER)(PEXCEPTION_POINTERS); +typedef void (CALLBACK *PTP_IO_CALLBACK)(PTP_CALLBACK_INSTANCE,void*,void*,IO_STATUS_BLOCK*,PTP_IO); + /*********************************************************************** * Function declarations */ @@ -3013,6 +3015,7 @@ NTSYSAPI NTSTATUS WINAPI RtlLargeIntegerToChar(const ULONGLONG *,ULONG,ULONG,PC /* Threadpool functions */ NTSYSAPI NTSTATUS WINAPI TpAllocCleanupGroup(TP_CLEANUP_GROUP **); +NTSYSAPI NTSTATUS WINAPI TpAllocIoCompletion(TP_IO **,HANDLE,PTP_IO_CALLBACK,void *,TP_CALLBACK_ENVIRON *); NTSYSAPI NTSTATUS WINAPI TpAllocPool(TP_POOL **,PVOID); NTSYSAPI NTSTATUS WINAPI TpAllocTimer(TP_TIMER **,PTP_TIMER_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *); NTSYSAPI NTSTATUS WINAPI TpAllocWait(TP_WAIT **,PTP_WAIT_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *); @@ -3023,23 +3026,26 @@ NTSYSAPI void WINAPI TpCallbackReleaseMutexOnCompletion(TP_CALLBACK_INSTANC NTSYSAPI void WINAPI TpCallbackReleaseSemaphoreOnCompletion(TP_CALLBACK_INSTANCE *,HANDLE,DWORD); NTSYSAPI void WINAPI TpCallbackSetEventOnCompletion(TP_CALLBACK_INSTANCE *,HANDLE); NTSYSAPI void WINAPI TpCallbackUnloadDllOnCompletion(TP_CALLBACK_INSTANCE *,HMODULE); +NTSYSAPI void WINAPI TpCancelAsyncIoOperation(TP_IO *); NTSYSAPI void WINAPI TpDisassociateCallback(TP_CALLBACK_INSTANCE *); NTSYSAPI BOOL WINAPI TpIsTimerSet(TP_TIMER *); NTSYSAPI void WINAPI TpPostWork(TP_WORK *); NTSYSAPI NTSTATUS WINAPI TpQueryPoolStackInformation(TP_POOL *, TP_POOL_STACK_INFORMATION *stack_info); NTSYSAPI void WINAPI TpReleaseCleanupGroup(TP_CLEANUP_GROUP *); NTSYSAPI void WINAPI TpReleaseCleanupGroupMembers(TP_CLEANUP_GROUP *,BOOL,PVOID); +NTSYSAPI void WINAPI TpReleaseIoCompletion(TP_IO *); NTSYSAPI void WINAPI TpReleasePool(TP_POOL *); NTSYSAPI void WINAPI TpReleaseTimer(TP_TIMER *); NTSYSAPI void WINAPI TpReleaseWait(TP_WAIT *); NTSYSAPI void WINAPI TpReleaseWork(TP_WORK *); -NTSYSAPI void WINAPI TpStartAsyncIoOperation(TP_IO *); NTSYSAPI void WINAPI TpSetPoolMaxThreads(TP_POOL *,DWORD); NTSYSAPI BOOL WINAPI TpSetPoolMinThreads(TP_POOL *,DWORD); NTSYSAPI NTSTATUS WINAPI TpSetPoolStackInformation(TP_POOL *, TP_POOL_STACK_INFORMATION *stack_info); NTSYSAPI void WINAPI TpSetTimer(TP_TIMER *, LARGE_INTEGER *,LONG,LONG); NTSYSAPI void WINAPI TpSetWait(TP_WAIT *,HANDLE,LARGE_INTEGER *); NTSYSAPI NTSTATUS WINAPI TpSimpleTryPost(PTP_SIMPLE_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *); +NTSYSAPI void WINAPI TpStartAsyncIoOperation(TP_IO *); +NTSYSAPI void WINAPI TpWaitForIoCompletion(TP_IO *,BOOL); NTSYSAPI void WINAPI TpWaitForTimer(TP_TIMER *,BOOL); NTSYSAPI void WINAPI TpWaitForWait(TP_WAIT *,BOOL); NTSYSAPI void WINAPI TpWaitForWork(TP_WORK *,BOOL);