ntdll: Implement threadpool timer queues.

This commit is contained in:
Sebastian Lackner 2015-07-03 01:58:29 +02:00 committed by Alexandre Julliard
parent bd7cb07358
commit cb2aae1af8
2 changed files with 321 additions and 3 deletions

View file

@ -981,6 +981,7 @@
@ stdcall TpCallbackSetEventOnCompletion(ptr long)
@ stdcall TpCallbackUnloadDllOnCompletion(ptr ptr)
@ stdcall TpDisassociateCallback(ptr)
@ stdcall TpIsTimerSet(ptr)
@ stdcall TpPostWork(ptr)
@ stdcall TpReleaseCleanupGroup(ptr)
@ stdcall TpReleaseCleanupGroupMembers(ptr long ptr)
@ -989,7 +990,9 @@
@ stdcall TpReleaseWork(ptr)
@ stdcall TpSetPoolMaxThreads(ptr long)
@ stdcall TpSetPoolMinThreads(ptr long)
@ stdcall TpSetTimer(ptr ptr long long)
@ stdcall TpSimpleTryPost(ptr ptr ptr)
@ stdcall TpWaitForTimer(ptr long)
@ stdcall TpWaitForWork(ptr long)
@ stdcall -ret64 VerSetConditionMask(int64 long long)
@ stdcall WinSqmIsOptedIn()

View file

@ -77,14 +77,14 @@ static RTL_CRITICAL_SECTION_DEBUG critsect_debug =
{
0, 0, &old_threadpool.threadpool_cs,
{ &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
};
static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
{
0, 0, &old_threadpool.threadpool_compl_cs,
{ &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
};
struct work_item
@ -200,6 +200,14 @@ struct threadpool_object
struct
{
PTP_TIMER_CALLBACK callback;
/* information about the timer, locked via timerqueue.cs */
BOOL timer_initialized;
BOOL timer_pending;
struct list timer_entry;
BOOL timer_set;
ULONGLONG timeout;
LONG period;
LONG window_length;
} timer;
} u;
};
@ -232,6 +240,33 @@ struct threadpool_group
struct list members;
};
/* global timerqueue object */
static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
static struct
{
CRITICAL_SECTION cs;
LONG objcount;
BOOL thread_running;
struct list pending_timers;
RTL_CONDITION_VARIABLE update_event;
}
timerqueue =
{
{ &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
0, /* objcount */
FALSE, /* thread_running */
LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
RTL_CONDITION_VARIABLE_INIT /* update_event */
};
static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
{
0, 0, &timerqueue.cs,
{ &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
};
static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
{
return (struct threadpool *)pool;
@ -1185,6 +1220,171 @@ NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
return status;
}
/***********************************************************************
* timerqueue_thread_proc (internal)
*/
static void CALLBACK timerqueue_thread_proc( void *param )
{
ULONGLONG timeout_lower, timeout_upper, new_timeout;
struct threadpool_object *other_timer;
LARGE_INTEGER now, timeout;
struct list *ptr;
TRACE( "starting timer queue thread\n" );
RtlEnterCriticalSection( &timerqueue.cs );
for (;;)
{
NtQuerySystemTime( &now );
/* Check for expired timers. */
while ((ptr = list_head( &timerqueue.pending_timers )))
{
struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
assert( timer->type == TP_OBJECT_TYPE_TIMER );
assert( timer->u.timer.timer_pending );
if (timer->u.timer.timeout > now.QuadPart)
break;
/* Queue a new callback in one of the worker threads. */
list_remove( &timer->u.timer.timer_entry );
timer->u.timer.timer_pending = FALSE;
tp_object_submit( timer );
/* Insert the timer back into the queue, except its marked for shutdown. */
if (timer->u.timer.period && !timer->shutdown)
{
timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
if (timer->u.timer.timeout <= now.QuadPart)
timer->u.timer.timeout = now.QuadPart + 1;
LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
struct threadpool_object, u.timer.timer_entry )
{
assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
if (timer->u.timer.timeout < other_timer->u.timer.timeout)
break;
}
list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
timer->u.timer.timer_pending = TRUE;
}
}
timeout_lower = TIMEOUT_INFINITE;
timeout_upper = TIMEOUT_INFINITE;
/* Determine next timeout and use the window length to optimize wakeup times. */
LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
struct threadpool_object, u.timer.timer_entry )
{
assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
if (other_timer->u.timer.timeout >= timeout_upper)
break;
timeout_lower = other_timer->u.timer.timeout;
new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
if (new_timeout < timeout_upper)
timeout_upper = new_timeout;
}
/* Wait for timer update events or until the next timer expires. */
if (timerqueue.objcount)
{
timeout.QuadPart = timeout_lower;
RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
continue;
}
/* All timers have been destroyed, if no new timers are created
* within some amount of time, then we can shutdown this thread. */
timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
&timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
{
break;
}
}
timerqueue.thread_running = FALSE;
RtlLeaveCriticalSection( &timerqueue.cs );
TRACE( "terminating timer queue thread\n" );
}
/***********************************************************************
* tp_timerqueue_lock (internal)
*
* Acquires a lock on the global timerqueue. When the lock is acquired
* successfully, it is guaranteed that the timer thread is running.
*/
static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
{
NTSTATUS status = STATUS_SUCCESS;
assert( timer->type == TP_OBJECT_TYPE_TIMER );
timer->u.timer.timer_initialized = FALSE;
timer->u.timer.timer_pending = FALSE;
timer->u.timer.timer_set = FALSE;
timer->u.timer.timeout = 0;
timer->u.timer.period = 0;
timer->u.timer.window_length = 0;
RtlEnterCriticalSection( &timerqueue.cs );
/* Make sure that the timerqueue thread is running. */
if (!timerqueue.thread_running)
{
HANDLE thread;
status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
timerqueue_thread_proc, NULL, &thread, NULL );
if (status == STATUS_SUCCESS)
{
timerqueue.thread_running = TRUE;
NtClose( thread );
}
}
if (status == STATUS_SUCCESS)
{
timer->u.timer.timer_initialized = TRUE;
timerqueue.objcount++;
}
RtlLeaveCriticalSection( &timerqueue.cs );
return status;
}
/***********************************************************************
* tp_timerqueue_unlock (internal)
*
* Releases a lock on the global timerqueue.
*/
static void tp_timerqueue_unlock( struct threadpool_object *timer )
{
assert( timer->type == TP_OBJECT_TYPE_TIMER );
RtlEnterCriticalSection( &timerqueue.cs );
if (timer->u.timer.timer_initialized)
{
/* If timer was pending, remove it. */
if (timer->u.timer.timer_pending)
{
list_remove( &timer->u.timer.timer_entry );
timer->u.timer.timer_pending = FALSE;
}
/* If the last timer object was destroyed, then wake up the thread. */
if (!--timerqueue.objcount)
{
assert( list_empty( &timerqueue.pending_timers ) );
RtlWakeAllConditionVariable( &timerqueue.update_event );
}
timer->u.timer.timer_initialized = FALSE;
}
RtlLeaveCriticalSection( &timerqueue.cs );
}
/***********************************************************************
* tp_threadpool_alloc (internal)
*
@ -1583,6 +1783,9 @@ static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
*/
static void tp_object_shutdown( struct threadpool_object *object )
{
if (object->type == TP_OBJECT_TYPE_TIMER)
tp_timerqueue_unlock( object );
object->shutdown = TRUE;
}
@ -1785,7 +1988,6 @@ static void CALLBACK threadpool_worker_proc( void *param )
tp_threadpool_release( pool );
}
/***********************************************************************
* TpAllocCleanupGroup (NTDLL.@)
*/
@ -1834,6 +2036,15 @@ NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID
object->type = TP_OBJECT_TYPE_TIMER;
object->u.timer.callback = callback;
status = tp_timerqueue_lock( object );
if (status)
{
tp_threadpool_unlock( pool );
RtlFreeHeap( GetProcessHeap(), 0, object );
return status;
}
tp_object_initialize( object, pool, userdata, environment );
*out = (TP_TIMER *)object;
@ -2020,6 +2231,18 @@ VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
this->associated = FALSE;
}
/***********************************************************************
* TpIsTimerSet (NTDLL.@)
*/
BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
{
struct threadpool_object *this = impl_from_TP_TIMER( timer );
TRACE( "%p\n", timer );
return this->u.timer.timer_set;
}
/***********************************************************************
* TpPostWork (NTDLL.@)
*/
@ -2195,6 +2418,84 @@ BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
return !status;
}
/***********************************************************************
* TpSetTimer (NTDLL.@)
*/
VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
{
struct threadpool_object *this = impl_from_TP_TIMER( timer );
struct threadpool_object *other_timer;
BOOL submit_timer = FALSE;
ULONGLONG timestamp;
TRACE( "%p %p %u %u\n", timer, timeout, period, window_length );
RtlEnterCriticalSection( &timerqueue.cs );
assert( this->u.timer.timer_initialized );
this->u.timer.timer_set = timeout != NULL;
/* Convert relative timeout to absolute timestamp and handle a timeout
* of zero, which means that the timer is submitted immediately. */
if (timeout)
{
timestamp = timeout->QuadPart;
if ((LONGLONG)timestamp < 0)
{
LARGE_INTEGER now;
NtQuerySystemTime( &now );
timestamp = now.QuadPart - timestamp;
}
else if (!timestamp)
{
if (!period)
timeout = NULL;
else
{
LARGE_INTEGER now;
NtQuerySystemTime( &now );
timestamp = now.QuadPart + (ULONGLONG)period * 10000;
}
submit_timer = TRUE;
}
}
/* First remove existing timeout. */
if (this->u.timer.timer_pending)
{
list_remove( &this->u.timer.timer_entry );
this->u.timer.timer_pending = FALSE;
}
/* If the timer was enabled, then add it back to the queue. */
if (timeout)
{
this->u.timer.timeout = timestamp;
this->u.timer.period = period;
this->u.timer.window_length = window_length;
LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
struct threadpool_object, u.timer.timer_entry )
{
assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
if (this->u.timer.timeout < other_timer->u.timer.timeout)
break;
}
list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
/* Wake up the timer thread when the timeout has to be updated. */
if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
RtlWakeAllConditionVariable( &timerqueue.update_event );
this->u.timer.timer_pending = TRUE;
}
RtlLeaveCriticalSection( &timerqueue.cs );
if (submit_timer)
tp_object_submit( this );
}
/***********************************************************************
* TpSimpleTryPost (NTDLL.@)
*/
@ -2225,6 +2526,20 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
return STATUS_SUCCESS;
}
/***********************************************************************
* TpWaitForTimer (NTDLL.@)
*/
VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
{
struct threadpool_object *this = impl_from_TP_TIMER( timer );
TRACE( "%p %d\n", timer, cancel_pending );
if (cancel_pending)
tp_object_cancel( this, FALSE, NULL );
tp_object_wait( this, FALSE );
}
/***********************************************************************
* TpWaitForWork (NTDLL.@)
*/