git/compat/simple-ipc/ipc-unix-socket.c
Johannes Schindelin 974ef7ced2 simple-ipc: work around issues with Cygwin's Unix socket emulation
Cygwin emulates Unix sockets by writing files with custom contents and
then marking them as system files.

The tricky problem is that while the file is written and its `system`
bit is set, it is still identified as a file. This caused test failures
when Git is too fast looking for the Unix sockets and then complains
that there is a plain file in the way.

Let's work around this by adding a delayed retry loop, specifically for
Cygwin.

Signed-off-by: Johannes Schindelin <johannes.schindelin@gmx.de>
Tested-by: Ramsay Jones <ramsay@ramsayjones.plus.com>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
2021-11-10 09:12:19 -08:00

1029 lines
26 KiB
C

#include "cache.h"
#include "simple-ipc.h"
#include "strbuf.h"
#include "pkt-line.h"
#include "thread-utils.h"
#include "unix-socket.h"
#include "unix-stream-server.h"
#ifndef SUPPORTS_SIMPLE_IPC
/*
* This source file should only be compiled when Simple IPC is supported.
* See the top-level Makefile.
*/
#error SUPPORTS_SIMPLE_IPC not defined
#endif
enum ipc_active_state ipc_get_active_state(const char *path)
{
enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
struct ipc_client_connect_options options
= IPC_CLIENT_CONNECT_OPTIONS_INIT;
struct stat st;
struct ipc_client_connection *connection_test = NULL;
options.wait_if_busy = 0;
options.wait_if_not_found = 0;
if (lstat(path, &st) == -1) {
switch (errno) {
case ENOENT:
case ENOTDIR:
return IPC_STATE__NOT_LISTENING;
default:
return IPC_STATE__INVALID_PATH;
}
}
#ifdef __CYGWIN__
/*
* Cygwin emulates Unix sockets by writing special-crafted files whose
* `system` bit is set.
*
* If we are too fast, Cygwin might still be in the process of marking
* the underlying file as a system file. Until then, we will not see a
* Unix socket here, but a plain file instead. Just in case that this
* is happening, wait a little and try again.
*/
{
static const int delay[] = { 1, 10, 20, 40, -1 };
int i;
for (i = 0; S_ISREG(st.st_mode) && delay[i] > 0; i++) {
sleep_millisec(delay[i]);
if (lstat(path, &st) == -1)
return IPC_STATE__INVALID_PATH;
}
}
#endif
/* also complain if a plain file is in the way */
if ((st.st_mode & S_IFMT) != S_IFSOCK)
return IPC_STATE__INVALID_PATH;
/*
* Just because the filesystem has a S_IFSOCK type inode
* at `path`, doesn't mean it that there is a server listening.
* Ping it to be sure.
*/
state = ipc_client_try_connect(path, &options, &connection_test);
ipc_client_close_connection(connection_test);
return state;
}
/*
* Retry frequency when trying to connect to a server.
*
* This value should be short enough that we don't seriously delay our
* caller, but not fast enough that our spinning puts pressure on the
* system.
*/
#define WAIT_STEP_MS (50)
/*
* Try to connect to the server. If the server is just starting up or
* is very busy, we may not get a connection the first time.
*/
static enum ipc_active_state connect_to_server(
const char *path,
int timeout_ms,
const struct ipc_client_connect_options *options,
int *pfd)
{
int k;
*pfd = -1;
for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
int fd = unix_stream_connect(path, options->uds_disallow_chdir);
if (fd != -1) {
*pfd = fd;
return IPC_STATE__LISTENING;
}
if (errno == ENOENT) {
if (!options->wait_if_not_found)
return IPC_STATE__PATH_NOT_FOUND;
goto sleep_and_try_again;
}
if (errno == ETIMEDOUT) {
if (!options->wait_if_busy)
return IPC_STATE__NOT_LISTENING;
goto sleep_and_try_again;
}
if (errno == ECONNREFUSED) {
if (!options->wait_if_busy)
return IPC_STATE__NOT_LISTENING;
goto sleep_and_try_again;
}
return IPC_STATE__OTHER_ERROR;
sleep_and_try_again:
sleep_millisec(WAIT_STEP_MS);
}
return IPC_STATE__NOT_LISTENING;
}
/*
* The total amount of time that we are willing to wait when trying to
* connect to a server.
*
* When the server is first started, it might take a little while for
* it to become ready to service requests. Likewise, the server may
* be very (temporarily) busy and not respond to our connections.
*
* We should gracefully and silently handle those conditions and try
* again for a reasonable time period.
*
* The value chosen here should be long enough for the server
* to reliably heal from the above conditions.
*/
#define MY_CONNECTION_TIMEOUT_MS (1000)
enum ipc_active_state ipc_client_try_connect(
const char *path,
const struct ipc_client_connect_options *options,
struct ipc_client_connection **p_connection)
{
enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
int fd = -1;
*p_connection = NULL;
trace2_region_enter("ipc-client", "try-connect", NULL);
trace2_data_string("ipc-client", NULL, "try-connect/path", path);
state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
options, &fd);
trace2_data_intmax("ipc-client", NULL, "try-connect/state",
(intmax_t)state);
trace2_region_leave("ipc-client", "try-connect", NULL);
if (state == IPC_STATE__LISTENING) {
(*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
(*p_connection)->fd = fd;
}
return state;
}
void ipc_client_close_connection(struct ipc_client_connection *connection)
{
if (!connection)
return;
if (connection->fd != -1)
close(connection->fd);
free(connection);
}
int ipc_client_send_command_to_connection(
struct ipc_client_connection *connection,
const char *message, size_t message_len,
struct strbuf *answer)
{
int ret = 0;
strbuf_setlen(answer, 0);
trace2_region_enter("ipc-client", "send-command", NULL);
if (write_packetized_from_buf_no_flush(message, message_len,
connection->fd) < 0 ||
packet_flush_gently(connection->fd) < 0) {
ret = error(_("could not send IPC command"));
goto done;
}
if (read_packetized_to_strbuf(
connection->fd, answer,
PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
ret = error(_("could not read IPC response"));
goto done;
}
done:
trace2_region_leave("ipc-client", "send-command", NULL);
return ret;
}
int ipc_client_send_command(const char *path,
const struct ipc_client_connect_options *options,
const char *message, size_t message_len,
struct strbuf *answer)
{
int ret = -1;
enum ipc_active_state state;
struct ipc_client_connection *connection = NULL;
state = ipc_client_try_connect(path, options, &connection);
if (state != IPC_STATE__LISTENING)
return ret;
ret = ipc_client_send_command_to_connection(connection,
message, message_len,
answer);
ipc_client_close_connection(connection);
return ret;
}
static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
int flags;
flags = fcntl(fd, F_GETFL, NULL);
if (flags < 0)
return -1;
if (make_nonblocking)
flags |= O_NONBLOCK;
else
flags &= ~O_NONBLOCK;
return fcntl(fd, F_SETFL, flags);
}
/*
* Magic numbers used to annotate callback instance data.
* These are used to help guard against accidentally passing the
* wrong instance data across multiple levels of callbacks (which
* is easy to do if there are `void*` arguments).
*/
enum magic {
MAGIC_SERVER_REPLY_DATA,
MAGIC_WORKER_THREAD_DATA,
MAGIC_ACCEPT_THREAD_DATA,
MAGIC_SERVER_DATA,
};
struct ipc_server_reply_data {
enum magic magic;
int fd;
struct ipc_worker_thread_data *worker_thread_data;
};
struct ipc_worker_thread_data {
enum magic magic;
struct ipc_worker_thread_data *next_thread;
struct ipc_server_data *server_data;
pthread_t pthread_id;
};
struct ipc_accept_thread_data {
enum magic magic;
struct ipc_server_data *server_data;
struct unix_ss_socket *server_socket;
int fd_send_shutdown;
int fd_wait_shutdown;
pthread_t pthread_id;
};
/*
* With unix-sockets, the conceptual "ipc-server" is implemented as a single
* controller "accept-thread" thread and a pool of "worker-thread" threads.
* The former does the usual `accept()` loop and dispatches connections
* to an idle worker thread. The worker threads wait in an idle loop for
* a new connection, communicate with the client and relay data to/from
* the `application_cb` and then wait for another connection from the
* server thread. This avoids the overhead of constantly creating and
* destroying threads.
*/
struct ipc_server_data {
enum magic magic;
ipc_server_application_cb *application_cb;
void *application_data;
struct strbuf buf_path;
struct ipc_accept_thread_data *accept_thread;
struct ipc_worker_thread_data *worker_thread_list;
pthread_mutex_t work_available_mutex;
pthread_cond_t work_available_cond;
/*
* Accepted but not yet processed client connections are kept
* in a circular buffer FIFO. The queue is empty when the
* positions are equal.
*/
int *fifo_fds;
int queue_size;
int back_pos;
int front_pos;
int shutdown_requested;
int is_stopped;
};
/*
* Remove and return the oldest queued connection.
*
* Returns -1 if empty.
*/
static int fifo_dequeue(struct ipc_server_data *server_data)
{
/* ASSERT holding mutex */
int fd;
if (server_data->back_pos == server_data->front_pos)
return -1;
fd = server_data->fifo_fds[server_data->front_pos];
server_data->fifo_fds[server_data->front_pos] = -1;
server_data->front_pos++;
if (server_data->front_pos == server_data->queue_size)
server_data->front_pos = 0;
return fd;
}
/*
* Push a new fd onto the back of the queue.
*
* Drop it and return -1 if queue is already full.
*/
static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
{
/* ASSERT holding mutex */
int next_back_pos;
next_back_pos = server_data->back_pos + 1;
if (next_back_pos == server_data->queue_size)
next_back_pos = 0;
if (next_back_pos == server_data->front_pos) {
/* Queue is full. Just drop it. */
close(fd);
return -1;
}
server_data->fifo_fds[server_data->back_pos] = fd;
server_data->back_pos = next_back_pos;
return fd;
}
/*
* Wait for a connection to be queued to the FIFO and return it.
*
* Returns -1 if someone has already requested a shutdown.
*/
static int worker_thread__wait_for_connection(
struct ipc_worker_thread_data *worker_thread_data)
{
/* ASSERT NOT holding mutex */
struct ipc_server_data *server_data = worker_thread_data->server_data;
int fd = -1;
pthread_mutex_lock(&server_data->work_available_mutex);
for (;;) {
if (server_data->shutdown_requested)
break;
fd = fifo_dequeue(server_data);
if (fd >= 0)
break;
pthread_cond_wait(&server_data->work_available_cond,
&server_data->work_available_mutex);
}
pthread_mutex_unlock(&server_data->work_available_mutex);
return fd;
}
/*
* Forward declare our reply callback function so that any compiler
* errors are reported when we actually define the function (in addition
* to any errors reported when we try to pass this callback function as
* a parameter in a function call). The former are easier to understand.
*/
static ipc_server_reply_cb do_io_reply_callback;
/*
* Relay application's response message to the client process.
* (We do not flush at this point because we allow the caller
* to chunk data to the client thru us.)
*/
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
const char *response, size_t response_len)
{
if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
BUG("reply_cb called with wrong instance data");
return write_packetized_from_buf_no_flush(response, response_len,
reply_data->fd);
}
/* A randomly chosen value. */
#define MY_WAIT_POLL_TIMEOUT_MS (10)
/*
* If the client hangs up without sending any data on the wire, just
* quietly close the socket and ignore this client.
*
* This worker thread is committed to reading the IPC request data
* from the client at the other end of this fd. Wait here for the
* client to actually put something on the wire -- because if the
* client just does a ping (connect and hangup without sending any
* data), our use of the pkt-line read routines will spew an error
* message.
*
* Return -1 if the client hung up.
* Return 0 if data (possibly incomplete) is ready.
*/
static int worker_thread__wait_for_io_start(
struct ipc_worker_thread_data *worker_thread_data,
int fd)
{
struct ipc_server_data *server_data = worker_thread_data->server_data;
struct pollfd pollfd[1];
int result;
for (;;) {
pollfd[0].fd = fd;
pollfd[0].events = POLLIN;
result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
if (result < 0) {
if (errno == EINTR)
continue;
goto cleanup;
}
if (result == 0) {
/* a timeout */
int in_shutdown;
pthread_mutex_lock(&server_data->work_available_mutex);
in_shutdown = server_data->shutdown_requested;
pthread_mutex_unlock(&server_data->work_available_mutex);
/*
* If a shutdown is already in progress and this
* client has not started talking yet, just drop it.
*/
if (in_shutdown)
goto cleanup;
continue;
}
if (pollfd[0].revents & POLLHUP)
goto cleanup;
if (pollfd[0].revents & POLLIN)
return 0;
goto cleanup;
}
cleanup:
close(fd);
return -1;
}
/*
* Receive the request/command from the client and pass it to the
* registered request-callback. The request-callback will compose
* a response and call our reply-callback to send it to the client.
*/
static int worker_thread__do_io(
struct ipc_worker_thread_data *worker_thread_data,
int fd)
{
/* ASSERT NOT holding lock */
struct strbuf buf = STRBUF_INIT;
struct ipc_server_reply_data reply_data;
int ret = 0;
reply_data.magic = MAGIC_SERVER_REPLY_DATA;
reply_data.worker_thread_data = worker_thread_data;
reply_data.fd = fd;
ret = read_packetized_to_strbuf(
reply_data.fd, &buf,
PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
if (ret >= 0) {
ret = worker_thread_data->server_data->application_cb(
worker_thread_data->server_data->application_data,
buf.buf, buf.len, do_io_reply_callback, &reply_data);
packet_flush_gently(reply_data.fd);
}
else {
/*
* The client probably disconnected/shutdown before it
* could send a well-formed message. Ignore it.
*/
}
strbuf_release(&buf);
close(reply_data.fd);
return ret;
}
/*
* Block SIGPIPE on the current thread (so that we get EPIPE from
* write() rather than an actual signal).
*
* Note that using sigchain_push() and _pop() to control SIGPIPE
* around our IO calls is not thread safe:
* [] It uses a global stack of handler frames.
* [] It uses ALLOC_GROW() to resize it.
* [] Finally, according to the `signal(2)` man-page:
* "The effects of `signal()` in a multithreaded process are unspecified."
*/
static void thread_block_sigpipe(sigset_t *old_set)
{
sigset_t new_set;
sigemptyset(&new_set);
sigaddset(&new_set, SIGPIPE);
sigemptyset(old_set);
pthread_sigmask(SIG_BLOCK, &new_set, old_set);
}
/*
* Thread proc for an IPC worker thread. It handles a series of
* connections from clients. It pulls the next fd from the queue
* processes it, and then waits for the next client.
*
* Block SIGPIPE in this worker thread for the life of the thread.
* This avoids stray (and sometimes delayed) SIGPIPE signals caused
* by client errors and/or when we are under extremely heavy IO load.
*
* This means that the application callback will have SIGPIPE blocked.
* The callback should not change it.
*/
static void *worker_thread_proc(void *_worker_thread_data)
{
struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
struct ipc_server_data *server_data = worker_thread_data->server_data;
sigset_t old_set;
int fd, io;
int ret;
trace2_thread_start("ipc-worker");
thread_block_sigpipe(&old_set);
for (;;) {
fd = worker_thread__wait_for_connection(worker_thread_data);
if (fd == -1)
break; /* in shutdown */
io = worker_thread__wait_for_io_start(worker_thread_data, fd);
if (io == -1)
continue; /* client hung up without sending anything */
ret = worker_thread__do_io(worker_thread_data, fd);
if (ret == SIMPLE_IPC_QUIT) {
trace2_data_string("ipc-worker", NULL, "queue_stop_async",
"application_quit");
/*
* The application layer is telling the ipc-server
* layer to shutdown.
*
* We DO NOT have a response to send to the client.
*
* Queue an async stop (to stop the other threads) and
* allow this worker thread to exit now (no sense waiting
* for the thread-pool shutdown signal).
*
* Other non-idle worker threads are allowed to finish
* responding to their current clients.
*/
ipc_server_stop_async(server_data);
break;
}
}
trace2_thread_exit();
return NULL;
}
/* A randomly chosen value. */
#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)
/*
* Accept a new client connection on our socket. This uses non-blocking
* IO so that we can also wait for shutdown requests on our socket-pair
* without actually spinning on a fast timeout.
*/
static int accept_thread__wait_for_connection(
struct ipc_accept_thread_data *accept_thread_data)
{
struct pollfd pollfd[2];
int result;
for (;;) {
pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
pollfd[0].events = POLLIN;
pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
pollfd[1].events = POLLIN;
result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
if (result < 0) {
if (errno == EINTR)
continue;
return result;
}
if (result == 0) {
/* a timeout */
/*
* If someone deletes or force-creates a new unix
* domain socket at our path, all future clients
* will be routed elsewhere and we silently starve.
* If that happens, just queue a shutdown.
*/
if (unix_ss_was_stolen(
accept_thread_data->server_socket)) {
trace2_data_string("ipc-accept", NULL,
"queue_stop_async",
"socket_stolen");
ipc_server_stop_async(
accept_thread_data->server_data);
}
continue;
}
if (pollfd[0].revents & POLLIN) {
/* shutdown message queued to socketpair */
return -1;
}
if (pollfd[1].revents & POLLIN) {
/* a connection is available on server_socket */
int client_fd =
accept(accept_thread_data->server_socket->fd_socket,
NULL, NULL);
if (client_fd >= 0)
return client_fd;
/*
* An error here is unlikely -- it probably
* indicates that the connecting process has
* already dropped the connection.
*/
continue;
}
BUG("unandled poll result errno=%d r[0]=%d r[1]=%d",
errno, pollfd[0].revents, pollfd[1].revents);
}
}
/*
* Thread proc for the IPC server "accept thread". This waits for
* an incoming socket connection, appends it to the queue of available
* connections, and notifies a worker thread to process it.
*
* Block SIGPIPE in this thread for the life of the thread. This
* avoids any stray SIGPIPE signals when closing pipe fds under
* extremely heavy loads (such as when the fifo queue is full and we
* drop incomming connections).
*/
static void *accept_thread_proc(void *_accept_thread_data)
{
struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
struct ipc_server_data *server_data = accept_thread_data->server_data;
sigset_t old_set;
trace2_thread_start("ipc-accept");
thread_block_sigpipe(&old_set);
for (;;) {
int client_fd = accept_thread__wait_for_connection(
accept_thread_data);
pthread_mutex_lock(&server_data->work_available_mutex);
if (server_data->shutdown_requested) {
pthread_mutex_unlock(&server_data->work_available_mutex);
if (client_fd >= 0)
close(client_fd);
break;
}
if (client_fd < 0) {
/* ignore transient accept() errors */
}
else {
fifo_enqueue(server_data, client_fd);
pthread_cond_broadcast(&server_data->work_available_cond);
}
pthread_mutex_unlock(&server_data->work_available_mutex);
}
trace2_thread_exit();
return NULL;
}
/*
* We can't predict the connection arrival rate relative to the worker
* processing rate, therefore we allow the "accept-thread" to queue up
* a generous number of connections, since we'd rather have the client
* not unnecessarily timeout if we can avoid it. (The assumption is
* that this will be used for FSMonitor and a few second wait on a
* connection is better than having the client timeout and do the full
* computation itself.)
*
* The FIFO queue size is set to a multiple of the worker pool size.
* This value chosen at random.
*/
#define FIFO_SCALE (100)
/*
* The backlog value for `listen(2)`. This doesn't need to huge,
* rather just large enough for our "accept-thread" to wake up and
* queue incoming connections onto the FIFO without the kernel
* dropping any.
*
* This value chosen at random.
*/
#define LISTEN_BACKLOG (50)
static int create_listener_socket(
const char *path,
const struct ipc_server_opts *ipc_opts,
struct unix_ss_socket **new_server_socket)
{
struct unix_ss_socket *server_socket = NULL;
struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
int ret;
uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;
ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
if (ret)
return ret;
if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
int saved_errno = errno;
unix_ss_free(server_socket);
errno = saved_errno;
return -1;
}
*new_server_socket = server_socket;
trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
return 0;
}
static int setup_listener_socket(
const char *path,
const struct ipc_server_opts *ipc_opts,
struct unix_ss_socket **new_server_socket)
{
int ret, saved_errno;
trace2_region_enter("ipc-server", "create-listener_socket", NULL);
ret = create_listener_socket(path, ipc_opts, new_server_socket);
saved_errno = errno;
trace2_region_leave("ipc-server", "create-listener_socket", NULL);
errno = saved_errno;
return ret;
}
/*
* Start IPC server in a pool of background threads.
*/
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
const char *path, const struct ipc_server_opts *opts,
ipc_server_application_cb *application_cb,
void *application_data)
{
struct unix_ss_socket *server_socket = NULL;
struct ipc_server_data *server_data;
int sv[2];
int k;
int ret;
int nr_threads = opts->nr_threads;
*returned_server_data = NULL;
/*
* Create a socketpair and set sv[1] to non-blocking. This
* will used to send a shutdown message to the accept-thread
* and allows the accept-thread to wait on EITHER a client
* connection or a shutdown request without spinning.
*/
if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
return -1;
if (set_socket_blocking_flag(sv[1], 1)) {
int saved_errno = errno;
close(sv[0]);
close(sv[1]);
errno = saved_errno;
return -1;
}
ret = setup_listener_socket(path, opts, &server_socket);
if (ret) {
int saved_errno = errno;
close(sv[0]);
close(sv[1]);
errno = saved_errno;
return ret;
}
server_data = xcalloc(1, sizeof(*server_data));
server_data->magic = MAGIC_SERVER_DATA;
server_data->application_cb = application_cb;
server_data->application_data = application_data;
strbuf_init(&server_data->buf_path, 0);
strbuf_addstr(&server_data->buf_path, path);
if (nr_threads < 1)
nr_threads = 1;
pthread_mutex_init(&server_data->work_available_mutex, NULL);
pthread_cond_init(&server_data->work_available_cond, NULL);
server_data->queue_size = nr_threads * FIFO_SCALE;
CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);
server_data->accept_thread =
xcalloc(1, sizeof(*server_data->accept_thread));
server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
server_data->accept_thread->server_data = server_data;
server_data->accept_thread->server_socket = server_socket;
server_data->accept_thread->fd_send_shutdown = sv[0];
server_data->accept_thread->fd_wait_shutdown = sv[1];
if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
accept_thread_proc, server_data->accept_thread))
die_errno(_("could not start accept_thread '%s'"), path);
for (k = 0; k < nr_threads; k++) {
struct ipc_worker_thread_data *wtd;
wtd = xcalloc(1, sizeof(*wtd));
wtd->magic = MAGIC_WORKER_THREAD_DATA;
wtd->server_data = server_data;
if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
wtd)) {
if (k == 0)
die(_("could not start worker[0] for '%s'"),
path);
/*
* Limp along with the thread pool that we have.
*/
break;
}
wtd->next_thread = server_data->worker_thread_list;
server_data->worker_thread_list = wtd;
}
*returned_server_data = server_data;
return 0;
}
/*
* Gently tell the IPC server treads to shutdown.
* Can be run on any thread.
*/
int ipc_server_stop_async(struct ipc_server_data *server_data)
{
/* ASSERT NOT holding mutex */
int fd;
if (!server_data)
return 0;
trace2_region_enter("ipc-server", "server-stop-async", NULL);
pthread_mutex_lock(&server_data->work_available_mutex);
server_data->shutdown_requested = 1;
/*
* Write a byte to the shutdown socket pair to wake up the
* accept-thread.
*/
if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
error_errno("could not write to fd_send_shutdown");
/*
* Drain the queue of existing connections.
*/
while ((fd = fifo_dequeue(server_data)) != -1)
close(fd);
/*
* Gently tell worker threads to stop processing new connections
* and exit. (This does not abort in-process conversations.)
*/
pthread_cond_broadcast(&server_data->work_available_cond);
pthread_mutex_unlock(&server_data->work_available_mutex);
trace2_region_leave("ipc-server", "server-stop-async", NULL);
return 0;
}
/*
* Wait for all IPC server threads to stop.
*/
int ipc_server_await(struct ipc_server_data *server_data)
{
pthread_join(server_data->accept_thread->pthread_id, NULL);
if (!server_data->shutdown_requested)
BUG("ipc-server: accept-thread stopped for '%s'",
server_data->buf_path.buf);
while (server_data->worker_thread_list) {
struct ipc_worker_thread_data *wtd =
server_data->worker_thread_list;
pthread_join(wtd->pthread_id, NULL);
server_data->worker_thread_list = wtd->next_thread;
free(wtd);
}
server_data->is_stopped = 1;
return 0;
}
void ipc_server_free(struct ipc_server_data *server_data)
{
struct ipc_accept_thread_data * accept_thread_data;
if (!server_data)
return;
if (!server_data->is_stopped)
BUG("cannot free ipc-server while running for '%s'",
server_data->buf_path.buf);
accept_thread_data = server_data->accept_thread;
if (accept_thread_data) {
unix_ss_free(accept_thread_data->server_socket);
if (accept_thread_data->fd_send_shutdown != -1)
close(accept_thread_data->fd_send_shutdown);
if (accept_thread_data->fd_wait_shutdown != -1)
close(accept_thread_data->fd_wait_shutdown);
free(server_data->accept_thread);
}
while (server_data->worker_thread_list) {
struct ipc_worker_thread_data *wtd =
server_data->worker_thread_list;
server_data->worker_thread_list = wtd->next_thread;
free(wtd);
}
pthread_cond_destroy(&server_data->work_available_cond);
pthread_mutex_destroy(&server_data->work_available_mutex);
strbuf_release(&server_data->buf_path);
free(server_data->fifo_fds);
free(server_data);
}