From 2a46702761a4348209907179830324ce91984baa Mon Sep 17 00:00:00 2001 From: Alexandre Julliard Date: Tue, 8 Feb 2005 12:55:26 +0000 Subject: [PATCH] Authors: Mike Hearn , Robert Shearman - Rework RPC dispatch layer to be simpler and not get confused by server/client duality. - Make threads shut down at the right time and not access freed memory after apartment destruction. - Rename stub_dispatch_thread to client_dispatch_thread. - Add some more tracing - Check return value of WaitNamedPipe. - Change named pipe timeouts to 0.5s, which should be enough for even the slowest machines. --- dlls/ole32/compobj.c | 4 + dlls/ole32/compobj_private.h | 8 +- dlls/ole32/marshal.c | 10 +- dlls/ole32/rpc.c | 503 +++++++++++++++++++---------------- dlls/ole32/stubmanager.c | 1 + 5 files changed, 294 insertions(+), 232 deletions(-) diff --git a/dlls/ole32/compobj.c b/dlls/ole32/compobj.c index 5bc139626b8..f08c685fe65 100644 --- a/dlls/ole32/compobj.c +++ b/dlls/ole32/compobj.c @@ -277,6 +277,8 @@ APARTMENT* COM_CreateApartment(DWORD model) MTA = apt; } + apt->shutdown_event = CreateEventA(NULL, TRUE, FALSE, NULL); + TRACE("Created apartment on OXID %s\n", wine_dbgstr_longlong(apt->oxid)); list_add_head(&apts, &apt->entry); @@ -338,6 +340,8 @@ DWORD COM_ApartmentRelease(struct apartment *apt) if (apt->filter) IUnknown_Release(apt->filter); DeleteCriticalSection(&apt->cs); + SetEvent(apt->shutdown_event); + CloseHandle(apt->shutdown_event); CloseHandle(apt->thread); HeapFree(GetProcessHeap(), 0, apt); } diff --git a/dlls/ole32/compobj_private.h b/dlls/ole32/compobj_private.h index 69af30a7226..d53d0d63b11 100644 --- a/dlls/ole32/compobj_private.h +++ b/dlls/ole32/compobj_private.h @@ -5,7 +5,7 @@ * Copyright 1999 Sylvain St-Germain * Copyright 2002 Marcus Meissner * Copyright 2003 Ove Kåven, TransGaming Technologies - * Copyright 2004 Mike Hearn, CodeWeavers Inc + * Copyright 2004 Mike Hearn, Rob Shearman, CodeWeavers Inc * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -131,8 +131,10 @@ struct apartment struct list stubmgrs; /* stub managers for exported objects (CS cs) */ BOOL remunk_exported; /* has the IRemUnknown interface for this apartment been created yet? (CS cs) */ - OID oidc; /* object ID counter, starts at 1, zero is invalid OID (CS cs). FIXME: remove me */ - DWORD listenertid; /* id of apartment_listener_thread. FIXME: remove me */ + /* FIXME: These should all be removed long term as they leak information that should be encapsulated */ + OID oidc; /* object ID counter, starts at 1, zero is invalid OID (CS cs) */ + DWORD listenertid; /* id of apartment_listener_thread */ + HANDLE shutdown_event; /* event used to tell the client_dispatch_thread to shut down */ }; /* this is what is stored in TEB->ReservedForOle */ diff --git a/dlls/ole32/marshal.c b/dlls/ole32/marshal.c index 83126b7f46b..ff169a92e53 100644 --- a/dlls/ole32/marshal.c +++ b/dlls/ole32/marshal.c @@ -868,6 +868,8 @@ static HRESULT unmarshal_object(const STDOBJREF *stdobjref, APARTMENT *apt, REFI stdobjref->oxid, stdobjref->oid, chanbuf, &proxy_manager); } + else + TRACE("proxy manager already created, using\n"); if (hr == S_OK) { @@ -961,7 +963,7 @@ StdMarshalImpl_UnmarshalInterface(LPMARSHAL iface, IStream *pStm, REFIID riid, v wine_dbgstr_longlong(stdobjref.oxid)); if (hres == S_OK) - hres = unmarshal_object(&stdobjref, apt, riid, ppv); + hres = unmarshal_object(&stdobjref, apt, riid, ppv); if (hres) WARN("Failed with error 0x%08lx\n", hres); else TRACE("Successfully created proxy %p\n", *ppv); @@ -1382,6 +1384,9 @@ cleanup: if (pMarshalStream && (objref.flags & OBJREF_CUSTOM)) IStream_Release(pMarshalStream); IMarshal_Release(pMarshal); + + TRACE("completed with hr 0x%08lx\n", hr); + return hr; } @@ -1437,6 +1442,9 @@ HRESULT WINAPI CoUnmarshalInterface(IStream *pStream, REFIID riid, LPVOID *ppv) } IMarshal_Release(pMarshal); + + TRACE("completed with hr 0x%lx\n", hr); + return hr; } diff --git a/dlls/ole32/rpc.c b/dlls/ole32/rpc.c index bff286a978d..4c8abffb696 100644 --- a/dlls/ole32/rpc.c +++ b/dlls/ole32/rpc.c @@ -89,11 +89,12 @@ struct rpc LPBYTE Buffer; }; +/* fixme: this should have a lock */ static struct rpc **reqs = NULL; static int nrofreqs = 0; /* This pipe is _thread_ based, each thread which talks to a remote - * apartment (mid) has its own pipe. The same structure is used both + * apartment (oxid) has its own pipe. The same structure is used both * for outgoing and incoming RPCs. */ struct pipe @@ -109,33 +110,37 @@ struct pipe APARTMENT *apt; /* apartment of the marshalling thread for the stub dispatch case */ }; -#define MAX_WINE_PIPES 256 - -static struct pipe pipes[MAX_WINE_PIPES]; -static int nrofpipes = 0; - typedef struct _PipeBuf { IRpcChannelBufferVtbl *lpVtbl; DWORD ref; wine_marshal_id mid; + HANDLE pipe; } PipeBuf; -static HRESULT WINAPI -read_pipe(HANDLE hf, LPVOID ptr, DWORD size) { + + +/* some helper functions */ + +static HRESULT WINAPI read_pipe(HANDLE hf, LPVOID ptr, DWORD size) +{ DWORD res; - if (!ReadFile(hf,ptr,size,&res,NULL)) { - FIXME("Failed to read from %p, le is %ld\n",hf,GetLastError()); - return E_FAIL; + + if (!ReadFile(hf,ptr,size,&res,NULL)) + { + ERR("Failed to read from %p, le is %ld\n",hf,GetLastError()); + return E_FAIL; } - if (res!=size) { - if (!res) - { - WARN("%p disconnected\n", hf); - return RPC_E_DISCONNECTED; - } - FIXME("Read only %ld of %ld bytes from %p.\n",res,size,hf); - return E_FAIL; + + if (res != size) + { + if (!res) + { + WARN("%p disconnected\n", hf); + return RPC_E_DISCONNECTED; + } + ERR("Read only %ld of %ld bytes from %p.\n",res,size,hf); + return E_FAIL; } return S_OK; } @@ -154,86 +159,49 @@ write_pipe(HANDLE hf, LPVOID ptr, DWORD size) { return S_OK; } -static DWORD WINAPI stub_dispatch_thread(LPVOID); - -static HRESULT -PIPE_RegisterPipe(wine_marshal_id *mid, HANDLE hPipe, BOOL startreader) +static HANDLE dupe_handle(HANDLE h) { - /* FIXME: this pipe caching code is commented out because it is breaks the - * tests, causing them hang due to writing to or reading from the wrong pipe. - */ -#if 0 - int i; + HANDLE h2; + + if (!DuplicateHandle(GetCurrentProcess(), h, GetCurrentProcess(), + &h2, 0, FALSE, DUPLICATE_SAME_ACCESS)) + { + ERR("could not duplicate handle: %ld\n", GetLastError()); + return INVALID_HANDLE_VALUE; + } + + return h2; +} - for (i=0;ioxid) + + + +static DWORD WINAPI client_dispatch_thread(LPVOID); + +/* FIXME: this all needs to be made thread safe */ +static HRESULT RPC_GetRequest(struct rpc **req) +{ + static int reqid = 0; + int i; + + /* try to reuse */ + for (i = 0; i < nrofreqs; i++) + { + if (reqs[i]->state == REQSTATE_DONE) + { + TRACE("reusing reqs[%d]\n", i); + + reqs[i]->reqh.reqid = reqid++; + reqs[i]->resph.reqid = reqs[i]->reqh.reqid; + reqs[i]->hPipe = INVALID_HANDLE_VALUE; + reqs[i]->state = REQSTATE_START; + *req = reqs[i]; return S_OK; -#endif - - if (nrofpipes + 1 >= MAX_WINE_PIPES) - { - FIXME("Out of pipes, please increase MAX_WINE_PIPES\n"); - return E_OUTOFMEMORY; + } } - memcpy(&(pipes[nrofpipes].mid),mid,sizeof(*mid)); - pipes[nrofpipes].hPipe = hPipe; - pipes[nrofpipes].apt = COM_CurrentApt(); - assert( pipes[nrofpipes].apt ); - InitializeCriticalSection(&(pipes[nrofpipes].crit)); - nrofpipes++; - if (startreader) { - pipes[nrofpipes-1].hThread = CreateThread(NULL,0,stub_dispatch_thread,(LPVOID)(pipes+(nrofpipes-1)),0,&(pipes[nrofpipes-1].tid)); - } else { - pipes[nrofpipes-1].tid = GetCurrentThreadId(); - } - return S_OK; -} - -static HANDLE -PIPE_FindByMID(wine_marshal_id *mid) { - int i; - for (i=0;ioxid) && - (GetCurrentThreadId()==pipes[i].tid) - ) - return pipes[i].hPipe; - return INVALID_HANDLE_VALUE; -} - -static struct pipe* -PIPE_GetFromIPID(const IPID *ipid) { - int i; - for (i=0; iData2) && - (pipes[i].mid.ipid.Data3 == ipid->Data3) && - (GetCurrentThreadId() == pipes[i].tid)) - return pipes+i; - /* special case for IRemUnknown IPID */ - else if ((pipes[i].mid.oxid == *(OXID *)ipid->Data4) && - (GetCurrentThreadId() == pipes[i].tid)) - return pipes+i; - } - return NULL; -} - -static HRESULT -RPC_GetRequest(struct rpc **req) { - static int reqid = 0xdeadbeef; - int i; - - for (i=0;istate == REQSTATE_DONE) { - reqs[i]->reqh.reqid = reqid++; - reqs[i]->resph.reqid = reqs[i]->reqh.reqid; - reqs[i]->hPipe = INVALID_HANDLE_VALUE; - *req = reqs[i]; - reqs[i]->state = REQSTATE_START; - return S_OK; - } - } - /* create new */ + + TRACE("creating new struct rpc (request)\n"); + if (reqs) reqs = (struct rpc**)HeapReAlloc( GetProcessHeap(), @@ -247,24 +215,21 @@ RPC_GetRequest(struct rpc **req) { HEAP_ZERO_MEMORY, sizeof(struct rpc*) ); - if (!reqs) - return E_OUTOFMEMORY; + + if (!reqs) return E_OUTOFMEMORY; + reqs[nrofreqs] = (struct rpc*)HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(struct rpc)); reqs[nrofreqs]->reqh.reqid = reqid++; reqs[nrofreqs]->resph.reqid = reqs[nrofreqs]->reqh.reqid; reqs[nrofreqs]->hPipe = INVALID_HANDLE_VALUE; - *req = reqs[nrofreqs]; reqs[nrofreqs]->state = REQSTATE_START; + *req = reqs[nrofreqs]; + nrofreqs++; + return S_OK; } -static void -RPC_FreeRequest(struct rpc *req) { - req->state = REQSTATE_DONE; /* Just reuse slot. */ - return; -} - static HRESULT WINAPI PipeBuf_QueryInterface( LPRPCCHANNELBUFFER iface,REFIID riid,LPVOID *ppv @@ -293,6 +258,7 @@ PipeBuf_Release(LPRPCCHANNELBUFFER iface) { if (ref) return ref; + CloseHandle(This->pipe); HeapFree(GetProcessHeap(),0,This); return 0; } @@ -339,21 +305,16 @@ COM_InvokeAndRpcSend(struct rpc *req) { return S_OK; } -static HRESULT COM_RpcReceive(struct pipe *xpipe); +static HRESULT process_incoming_rpc(HANDLE pipe); -static HRESULT -RPC_QueueRequestAndWait(struct rpc *req) { +static HRESULT RPC_QueueRequestAndWait(struct rpc *req, HANDLE pipe) +{ int i; struct rpc *xreq; HRESULT hres; DWORD reqtype; - struct pipe *xpipe = PIPE_GetFromIPID(&(req->reqh.ipid)); - if (!xpipe) { - FIXME("no pipe found.\n"); - return E_POINTER; - } - req->hPipe = xpipe->hPipe; + req->hPipe = pipe; req->state = REQSTATE_REQ_WAITING_FOR_REPLY; reqtype = REQTYPE_REQUEST; hres = write_pipe(req->hPipe,&reqtype,sizeof(reqtype)); @@ -366,7 +327,7 @@ RPC_QueueRequestAndWait(struct rpc *req) { /* This loop is about allowing re-entrancy. While waiting for the * response to one RPC we may receive a request starting another. */ while (!hres) { - hres = COM_RpcReceive(xpipe); + hres = process_incoming_rpc(pipe); if (hres) break; for (i=0;imid.oxid == COM_CurrentApt()->oxid) { ERR("Need to call directly!\n"); @@ -404,15 +363,20 @@ PipeBuf_SendReceive(LPRPCCHANNELBUFFER iface,RPCOLEMESSAGE* msg,ULONG *status) req->reqh.cbBuffer = msg->cbBuffer; req->reqh.ipid = This->mid.ipid; req->Buffer = msg->Buffer; - hres = RPC_QueueRequestAndWait(req); - if (hres) { - RPC_FreeRequest(req); + TRACE(" -> rpc ->\n"); + hres = RPC_QueueRequestAndWait(req, This->pipe); + TRACE(" <- response <-\n"); + if (hres) + { + req->state = REQSTATE_DONE; return hres; } - msg->cbBuffer = req->resph.cbBuffer; - msg->Buffer = req->Buffer; - *status = req->resph.retval; - RPC_FreeRequest(req); + + msg->cbBuffer = req->resph.cbBuffer; + msg->Buffer = req->Buffer; + *status = req->resph.retval; + req->state = REQSTATE_DONE; + return S_OK; } @@ -450,43 +414,52 @@ static IRpcChannelBufferVtbl pipebufvt = { PipeBuf_IsConnected }; +/* returns a pipebuf for proxies */ HRESULT PIPE_GetNewPipeBuf(wine_marshal_id *mid, IRpcChannelBuffer **pipebuf) { wine_marshal_id ourid; - DWORD res; - HANDLE hPipe; - HRESULT hres; + HANDLE handle; PipeBuf *pbuf; + char pipefn[200]; - hPipe = PIPE_FindByMID(mid); - if (hPipe == INVALID_HANDLE_VALUE) { - char pipefn[200]; - - sprintf(pipefn,OLESTUBMGR"_%08lx%08lx",(DWORD)(mid->oxid >> 32),(DWORD)mid->oxid); - hPipe = CreateFileA(pipefn, GENERIC_READ | GENERIC_WRITE, - 0, NULL, OPEN_EXISTING, 0, 0); - - if (hPipe == INVALID_HANDLE_VALUE) { - FIXME("Could not open named pipe %s, le is %lx\n",pipefn,GetLastError()); - return E_FAIL; + /* connect to the apartment listener thread */ + sprintf(pipefn,OLESTUBMGR"_%08lx%08lx",(DWORD)(mid->oxid >> 32),(DWORD)mid->oxid); + + TRACE("proxy pipe: connecting to apartment listener thread: %s\n", pipefn); + + while (TRUE) + { + BOOL ret = WaitNamedPipeA(pipefn, NMPWAIT_USE_DEFAULT_WAIT); + if (!ret) + { + ERR("Could not open named pipe %s, error %ld\n", pipefn, GetLastError()); + return RPC_E_SERVER_DIED; } - - hres = PIPE_RegisterPipe(mid, hPipe, FALSE); - if (hres) return hres; - - memset(&ourid,0,sizeof(ourid)); - ourid.oxid = COM_CurrentApt()->oxid; - - if (!WriteFile(hPipe,&ourid,sizeof(ourid),&res,NULL)||(res!=sizeof(ourid))) { - ERR("Failed writing startup mid!\n"); - return E_FAIL; + + handle = CreateFileA(pipefn, GENERIC_READ | GENERIC_WRITE, + 0, NULL, OPEN_EXISTING, 0, 0); + + if (handle == INVALID_HANDLE_VALUE) + { + if (GetLastError() == ERROR_PIPE_BUSY) continue; + + ERR("Could not open named pipe %s, error %ld\n", pipefn, GetLastError()); + return RPC_E_SERVER_DIED; } + + break; } + memset(&ourid,0,sizeof(ourid)); + ourid.oxid = COM_CurrentApt()->oxid; + + TRACE("constructing new pipebuf for proxy\n"); + pbuf = (PipeBuf*)HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PipeBuf)); pbuf->lpVtbl = &pipebufvt; pbuf->ref = 1; memcpy(&(pbuf->mid),mid,sizeof(*mid)); + pbuf->pipe = dupe_handle(handle); *pipebuf = (IRpcChannelBuffer*)pbuf; @@ -653,8 +626,7 @@ create_local_service(REFCLSID rclsid) } /* http://msdn.microsoft.com/library/en-us/dnmsj99/html/com0199.asp, Figure 4 */ -HRESULT -create_marshalled_proxy(REFCLSID rclsid, REFIID iid, LPVOID *ppv) +HRESULT create_marshalled_proxy(REFCLSID rclsid, REFIID iid, LPVOID *ppv) { HRESULT hres; HANDLE hPipe; @@ -685,7 +657,7 @@ create_marshalled_proxy(REFCLSID rclsid, REFIID iid, LPVOID *ppv) return hres; Sleep(1000); } else { - WARN("Could not open named pipe to broker %s, le is %lx\n",pipefn,GetLastError()); + WARN("Connecting to %s, no response yet, retrying: le is %lx\n",pipefn,GetLastError()); Sleep(1000); } continue; @@ -719,58 +691,59 @@ out: } -static void WINAPI -PIPE_StartRequestThread(HANDLE xhPipe) +/* this reads an RPC from the given pipe and places it in the global reqs array */ +static HRESULT process_incoming_rpc(HANDLE pipe) { - wine_marshal_id remoteid; - HRESULT hres; - - hres = read_pipe(xhPipe,&remoteid,sizeof(remoteid)); - if (hres) { - ERR("Failed to read remote mid!\n"); - return; - } - PIPE_RegisterPipe(&remoteid,xhPipe, TRUE); -} - -static HRESULT -COM_RpcReceive(struct pipe *xpipe) { DWORD reqtype; HRESULT hres = S_OK; - HANDLE xhPipe = xpipe->hPipe; - hres = read_pipe(xhPipe,&reqtype,sizeof(reqtype)); - if (hres) goto end; - EnterCriticalSection(&(xpipe->crit)); + hres = read_pipe(pipe,&reqtype,sizeof(reqtype)); + if (hres) return hres; /* only received by servers */ - if (reqtype == REQTYPE_REQUEST) { + if (reqtype == REQTYPE_REQUEST) + { struct rpc *xreq; - + RPC_GetRequest(&xreq); - xreq->hPipe = xhPipe; - hres = read_pipe(xhPipe,&(xreq->reqh),sizeof(xreq->reqh)); - if (hres) goto end; + + xreq->hPipe = pipe; + hres = read_pipe(pipe,&(xreq->reqh),sizeof(xreq->reqh)); + if (hres) + { + xreq->state = REQSTATE_DONE; + return hres; + } xreq->resph.reqid = xreq->reqh.reqid; xreq->Buffer = HeapAlloc(GetProcessHeap(),0, xreq->reqh.cbBuffer); - hres = read_pipe(xhPipe,xreq->Buffer,xreq->reqh.cbBuffer); + hres = read_pipe(pipe,xreq->Buffer,xreq->reqh.cbBuffer); if (hres) goto end; + + TRACE("received RPC for IPID %s\n", debugstr_guid(&xreq->reqh.ipid)); xreq->state = REQSTATE_REQ_GOT; goto end; - } else if (reqtype == REQTYPE_RESPONSE) { + } + else if (reqtype == REQTYPE_RESPONSE) + { struct response_header resph; int i; - hres = read_pipe(xhPipe,&resph,sizeof(resph)); + hres = read_pipe(pipe,&resph,sizeof(resph)); if (hres) goto end; + + TRACE("read RPC response\n"); - for (i=nrofreqs;i--;) { + for (i = nrofreqs; i--;) + { struct rpc *xreq = reqs[i]; + if (xreq->state != REQSTATE_REQ_WAITING_FOR_REPLY) continue; - if (xreq->reqh.reqid == resph.reqid) { + + if (xreq->reqh.reqid == resph.reqid) + { memcpy(&(xreq->resph),&resph,sizeof(resph)); if (xreq->Buffer) @@ -778,56 +751,82 @@ COM_RpcReceive(struct pipe *xpipe) { else xreq->Buffer = HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,xreq->resph.cbBuffer); - hres = read_pipe(xhPipe,xreq->Buffer,xreq->resph.cbBuffer); + hres = read_pipe(pipe,xreq->Buffer,xreq->resph.cbBuffer); if (hres) goto end; + + TRACE("received response for reqid 0x%lx\n", xreq->reqh.reqid); + xreq->state = REQSTATE_RESP_GOT; - /*PulseEvent(hRpcChanged);*/ goto end; } } - ERR("Did not find request for id %lx\n",resph.reqid); - hres = S_OK; + ERR("protocol error: did not find request for id %lx\n",resph.reqid); + hres = E_FAIL; goto end; } - ERR("Unknown reqtype %ld\n",reqtype); + ERR("protocol error: unknown reqtype %ld\n",reqtype); hres = E_FAIL; end: - LeaveCriticalSection(&(xpipe->crit)); return hres; } -/* This thread listens on the given pipe for requests to a particular stub manager */ -static DWORD WINAPI stub_dispatch_thread(LPVOID param) +struct stub_dispatch_params { - struct pipe *xpipe = (struct pipe*)param; - HANDLE xhPipe = xpipe->hPipe; + struct apartment *apt; + HANDLE pipe; +}; + +/* This thread listens on the given pipe for requests to any stub manager */ +static DWORD WINAPI client_dispatch_thread(LPVOID param) +{ + HANDLE pipe = ((struct stub_dispatch_params *)param)->pipe; + struct apartment *apt = ((struct stub_dispatch_params *)param)->apt; HRESULT hres = S_OK; - - TRACE("starting for apartment OXID %08lx%08lx\n", (DWORD)(xpipe->mid.oxid >> 32), (DWORD)(xpipe->mid.oxid)); - - /* join marshalling apartment. fixme: this stuff is all very wrong, threading needs to work like native */ - COM_CurrentInfo()->apt = xpipe->apt; + HANDLE shutdown_event = dupe_handle(apt->shutdown_event); - while (!hres) { - int i; - - hres = COM_RpcReceive(xpipe); - if (hres) break; + HeapFree(GetProcessHeap(), 0, param); + + /* join marshalling apartment. fixme: this stuff is all very wrong, threading needs to work like native */ + COM_CurrentInfo()->apt = apt; + + while (TRUE) + { + int i; - for (i=nrofreqs;i--;) { - struct rpc *xreq = reqs[i]; - if ((xreq->state == REQSTATE_REQ_GOT) && (xreq->hPipe == xhPipe)) { - hres = COM_InvokeAndRpcSend(xreq); - if (!hres) break; - } - } + TRACE("waiting for RPC on OXID: %08lx%08lx\n", (DWORD)(apt->oxid >> 32), (DWORD)(apt->oxid)); + + /* read a new request into the global array, block if no requests have been sent */ + hres = process_incoming_rpc(pipe); + if (hres) break; + + /* do you expect me to talk? */ + if (WaitForSingleObject(shutdown_event, 0) == WAIT_OBJECT_0) + { + /* no mr bond, i expect you to die! bwahaha */ + CloseHandle(shutdown_event); + break; + } + + TRACE("received RPC on OXID: %08lx%08lx\n", (DWORD)(apt->oxid >> 32), (DWORD)(apt->oxid)); + + /* now scan the array looking for the RPC just loaded */ + for (i=nrofreqs;i--;) + { + struct rpc *req = reqs[i]; + + if ((req->state == REQSTATE_REQ_GOT) && (req->hPipe == pipe)) + { + hres = COM_InvokeAndRpcSend(req); + if (!hres) break; + } + } } - /* fixme: this thread never quits naturally */ - WARN("exiting with hres %lx\n",hres); - CloseHandle(xhPipe); + TRACE("exiting with hres %lx\n",hres); + DisconnectNamedPipe(pipe); + CloseHandle(pipe); return 0; } @@ -846,29 +845,40 @@ struct apartment_listener_params */ static DWORD WINAPI apartment_listener_thread(LPVOID p) { - char pipefn[200]; - HANDLE listenPipe; + char pipefn[200]; + HANDLE listenPipe, thread_handle; + OVERLAPPED overlapped; + HANDLE wait[2]; + struct apartment_listener_params * params = (struct apartment_listener_params *)p; - APARTMENT *apt = params->apt; + struct apartment *apt = params->apt; HANDLE event = params->event; + HANDLE apt_shutdown_event = dupe_handle(apt->shutdown_event); + OXID this_oxid = apt->oxid; /* copy here so we can print it when we shut down */ HeapFree(GetProcessHeap(), 0, params); + overlapped.hEvent = CreateEventA(NULL, TRUE, FALSE, NULL); + /* we must join the marshalling threads apartment. we already have a ref here */ COM_CurrentInfo()->apt = apt; sprintf(pipefn,OLESTUBMGR"_%08lx%08lx", (DWORD)(apt->oxid >> 32), (DWORD)(apt->oxid)); TRACE("Apartment listener thread starting on (%s)\n",pipefn); - while (1) { + while (TRUE) + { + struct stub_dispatch_params *params; + DWORD res; + listenPipe = CreateNamedPipeA( pipefn, - PIPE_ACCESS_DUPLEX, + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, PIPE_TYPE_BYTE|PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, 4096, 4096, - NMPWAIT_USE_DEFAULT_WAIT, + 500 /* 0.5 seconds */, NULL ); @@ -881,19 +891,53 @@ static DWORD WINAPI apartment_listener_thread(LPVOID p) if (listenPipe == INVALID_HANDLE_VALUE) { FIXME("pipe creation failed for %s, error %ld\n",pipefn,GetLastError()); - return 1; /* permanent failure, so quit stubmgr thread */ + break; /* permanent failure, so quit stubmgr thread */ } + TRACE("waiting for a client ...\n"); + /* an already connected pipe is not an error */ - if (!ConnectNamedPipe(listenPipe,NULL) && - (GetLastError() != ERROR_PIPE_CONNECTED)) { - ERR("Failure during ConnectNamedPipe %ld!\n",GetLastError()); - CloseHandle(listenPipe); - continue; + if (!ConnectNamedPipe(listenPipe, &overlapped)) + { + DWORD le = GetLastError(); + + if ((le != ERROR_IO_PENDING) && (le != ERROR_PIPE_CONNECTED)) + { + ERR("Failure during ConnectNamedPipe %ld!\n",GetLastError()); + CloseHandle(listenPipe); + continue; + } } - PIPE_StartRequestThread(listenPipe); + /* wait for action */ + wait[0] = apt_shutdown_event; + wait[1] = overlapped.hEvent; + res = WaitForMultipleObjectsEx(2, wait, FALSE, INFINITE, FALSE); + if (res == WAIT_OBJECT_0) break; + + ResetEvent(overlapped.hEvent); + + /* start the stub dispatch thread for this connection */ + TRACE("starting stub dispatch thread for OXID %08lx%08lx\n", (DWORD)(apt->oxid >> 32), (DWORD)(apt->oxid)); + params = HeapAlloc(GetProcessHeap(), 0, sizeof(struct stub_dispatch_params)); + if (!params) + { + ERR("out of memory, dropping this client\n"); + CloseHandle(listenPipe); + continue; + } + params->apt = apt; + params->pipe = listenPipe; + thread_handle = CreateThread(NULL, 0, &client_dispatch_thread, params, 0, NULL); + CloseHandle(thread_handle); } + + TRACE("shutting down: %s\n", wine_dbgstr_longlong(this_oxid)); + + DisconnectNamedPipe(listenPipe); + CloseHandle(listenPipe); + CloseHandle(overlapped.hEvent); + CloseHandle(apt_shutdown_event); return 0; } @@ -958,12 +1002,15 @@ static DWORD WINAPI local_server_thread(LPVOID param) HeapFree(GetProcessHeap(), 0, lsp); hPipe = CreateNamedPipeA( pipefn, PIPE_ACCESS_DUPLEX, - PIPE_TYPE_BYTE|PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, - 4096, 4096, NMPWAIT_USE_DEFAULT_WAIT, NULL ); - if (hPipe == INVALID_HANDLE_VALUE) { + PIPE_TYPE_BYTE|PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, + 4096, 4096, 500 /* 0.5 second timeout */, NULL ); + + if (hPipe == INVALID_HANDLE_VALUE) + { FIXME("pipe creation failed for %s, le is %ld\n",pipefn,GetLastError()); return 1; } + while (1) { if (!ConnectNamedPipe(hPipe,NULL)) { ERR("Failure during ConnectNamedPipe %ld, ABORT!\n",GetLastError()); diff --git a/dlls/ole32/stubmanager.c b/dlls/ole32/stubmanager.c index bdd8c085b61..6b10ad0fe01 100644 --- a/dlls/ole32/stubmanager.c +++ b/dlls/ole32/stubmanager.c @@ -94,6 +94,7 @@ static void stub_manager_delete(struct stub_manager *m) list_remove(&m->entry); + /* release every ifstub */ while ((cursor = list_head(&m->ifstubs))) { struct ifstub *ifstub = LIST_ENTRY(cursor, struct ifstub, entry);