diff options
Diffstat (limited to 'common/tpool.c')
-rw-r--r-- | common/tpool.c | 398 |
1 files changed, 398 insertions, 0 deletions
diff --git a/common/tpool.c b/common/tpool.c new file mode 100644 index 0000000..f980b86 --- /dev/null +++ b/common/tpool.c @@ -0,0 +1,398 @@ +/* + * HttpAuth + * + * Copyright (C) 2008 Stefan Walter + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, + * Boston, MA 02111-1307, USA. + */ + +/* + * ------------------------------------------------------------------------- + * Ideas from tpool.c - htun thread pool functions + * Copyright (C) 2002 Moshe Jacobson <moshe@runslinux.net>, + * Ola Nordström <ola@triblock.com> + * ------------------------------------------------------------------------- + */ + +/* + * most of the theory and implementation of the thread pool was taken + * from the o'reilly pthreads programming book. + */ + +#include <stdio.h> +#include <stdlib.h> +#include <errno.h> +#include <err.h> +#include <string.h> +#include <assert.h> +#include <signal.h> + +#define __USE_GNU +#include <pthread.h> + +#include "tpool.h" + +typedef struct _tpool_work { + void (*handler_routine)(void*); + void *arg; + struct _tpool_work *next; +} tpool_work; + +typedef struct _tpool_thread { + pthread_t handle; + int is_active; + struct _tpool_thread *next; +} tpool_thread; + +static int tpool_max_threads = 0; +static int tpool_min_threads = 0; +static int tpool_running_threads = 0; +static int tpool_idle_threads = 0; +static int tpool_zombie_threads = 0; +static tpool_thread *tpool_thread_list = NULL; + +static int tpool_max_queue_size = 0; +static int tpool_cur_queue_size = 0; +static tpool_work *tpool_queue_head = NULL; +static tpool_work *tpool_queue_tail = NULL; +static pthread_mutex_t tpool_queue_lock; +static pthread_cond_t tpool_queue_not_full; +static pthread_cond_t tpool_queue_not_empty; +static pthread_cond_t tpool_queue_empty; + +static int tpool_queue_closed = 0; +static int tpool_shutdown = 0; +static int tpool_initialized = 0; + +#define PTHREQ(x) do { \ + int r = (x); \ + if (r != 0) { \ + fprintf (stderr, "fatal: '%s' failed: %s", #x, strerror (r)); \ + assert (0 && "" #x); \ + } } while (0) + +static void* +tpool_thread_func (void *data) +{ + tpool_thread *thread = (tpool_thread*)data; + tpool_work *work; + int stop = 0; + sigset_t set; + + /* We handle these signals on the main thread */ + sigemptyset (&set); + sigaddset (&set, SIGINT); + sigaddset (&set, SIGTERM); + pthread_sigmask (SIG_BLOCK, &set, NULL); + + assert (thread); + assert (tpool_initialized); + + PTHREQ (pthread_mutex_lock (&tpool_queue_lock)); + + assert (thread->is_active); + ++tpool_running_threads; + ++tpool_idle_threads; + + while (!tpool_shutdown && !stop) { + + /* No work to do? */ + while (tpool_cur_queue_size == 0 && !tpool_shutdown && !stop) { + + /* Too many idle threads? */ + if (tpool_running_threads > tpool_min_threads) { + stop = 1; + break; + } + + /* + * Sleep until there is work, while asleep the queue_lock + * is relinquished + */ + PTHREQ (pthread_cond_wait (&tpool_queue_not_empty, &tpool_queue_lock)); + } + + /* Are we shutting down? */ + if (stop || tpool_shutdown) + break; + + /* Pop the work off the queue */ + work = tpool_queue_head; + tpool_cur_queue_size--; + if (tpool_cur_queue_size == 0) + tpool_queue_head = tpool_queue_tail = NULL; + else + tpool_queue_head = work->next; + + /* Broadcast queue state to anyone who's waiting */ + if (tpool_cur_queue_size < tpool_max_queue_size) + PTHREQ (pthread_cond_broadcast (&tpool_queue_not_full)); + if (tpool_cur_queue_size == 0) + PTHREQ (pthread_cond_signal (&tpool_queue_empty)); + + --tpool_idle_threads; + assert (tpool_idle_threads >= 0); + + /* Unlock the queue lock while processing */ + PTHREQ (pthread_mutex_unlock (&tpool_queue_lock)); + + /* Perform the work */ + (work->handler_routine) (work->arg); + free (work); + + PTHREQ (pthread_mutex_lock (&tpool_queue_lock)); + + ++tpool_idle_threads; + } + + --tpool_idle_threads; + --tpool_running_threads; + ++tpool_zombie_threads; + thread->is_active = 0; + assert (tpool_running_threads >= 0); + assert (tpool_idle_threads >= 0); + + PTHREQ (pthread_mutex_unlock (&tpool_queue_lock)); + + /* So we can double check the result code */ + return thread; +} + +static void +free_pool_stuff (void) +{ + tpool_work *work; + + if (!tpool_initialized) + return; + + /* No threads should be running at this point */ + assert (!tpool_thread_list); + + /* Clean up memory */ + while (tpool_queue_head != NULL) { + work = tpool_queue_head; + tpool_queue_head = tpool_queue_head->next; + free (work); + } + + pthread_cond_destroy (&tpool_queue_not_full); + pthread_cond_destroy (&tpool_queue_not_empty); + pthread_cond_destroy (&tpool_queue_empty); + pthread_mutex_destroy (&tpool_queue_lock); + + tpool_initialized = 0; +} + +int +tpool_init (int max_threads, int min_threads, int max_queued) +{ + assert (!tpool_initialized); + + /* set the desired thread pool values */ + tpool_max_threads = max_threads; + tpool_running_threads = 0; + tpool_idle_threads = 0; + tpool_zombie_threads = 0; + tpool_min_threads = min_threads; + + /* initialize the work queue */ + tpool_max_queue_size = max_queued; + tpool_cur_queue_size = 0; + tpool_queue_head = NULL; + tpool_queue_tail = NULL; + tpool_queue_closed = 0; + tpool_shutdown = 0; + + /* create the mutexs and cond vars */ + PTHREQ (pthread_mutex_init (&tpool_queue_lock, NULL)); + PTHREQ (pthread_cond_init (&tpool_queue_not_empty, NULL)); + PTHREQ (pthread_cond_init (&tpool_queue_not_full, NULL)); + PTHREQ (pthread_cond_init (&tpool_queue_empty, NULL)); + + tpool_initialized = 1; + return 0; +} + +static void +join_pool_threads (int all) +{ + tpool_thread *thread, **spot; + void *exit_code; + + /* Should be called from within the lock */ + + for (spot = &tpool_thread_list, thread = *spot; + thread && (all || tpool_zombie_threads > 0); ) { + + assert (thread->handle); + + if (all || !thread->is_active) { + if (!thread->is_active) + --tpool_zombie_threads; + + /* Join the thread */ + PTHREQ (pthread_join (thread->handle, &exit_code)); + assert (exit_code == thread); + + /* Remove and get the next in list */ + *spot = thread->next; + free (thread); + + } else { + /* Just get the next in list */ + spot = &thread->next; + } + + thread = *spot; + } +} + +int +tpool_add_work (void (*routine)(void*), void *arg) +{ + tpool_work *work; + tpool_thread *thread; + int r, ret = -1; + + assert (tpool_initialized); + + PTHREQ (pthread_mutex_lock (&tpool_queue_lock)); + + /* Join any old threads */ + join_pool_threads (0); + + /* Should have found all of these above */ + assert (tpool_zombie_threads == 0); + + /* Are we closed for business? */ + if (tpool_shutdown || tpool_queue_closed) { + errno = ECANCELED; + goto done; + } + + /* No idle threads to run this work? */ + if (!tpool_idle_threads) { + + /* See if we can start another thread */ + if (tpool_max_threads <= 0 || + tpool_running_threads < tpool_max_threads) { + + /* The thread context */ + thread = calloc (1, sizeof (tpool_thread)); + if (thread == NULL) { + errno = ENOMEM; + goto done; + } + + /* Start the thread */ + thread->is_active = 1; + r = pthread_create (&thread->handle, NULL, tpool_thread_func, (void*)thread); + if (r != 0) { + errno = r; + free (thread); + goto done; + } + + /* Add the new thread to the list */ + thread->next = tpool_thread_list; + tpool_thread_list = thread; + } + } + + /* + * If the queue is full, yield this thread, and see if + * another processes something out of the queue. + */ + if (tpool_max_queue_size > 0 && tpool_cur_queue_size >= tpool_max_queue_size) { + PTHREQ (pthread_mutex_unlock (&tpool_queue_lock)); + pthread_yield(); + PTHREQ (pthread_mutex_lock (&tpool_queue_lock)); + } + + /* Are we closed for business? */ + if (tpool_shutdown || tpool_queue_closed) { + errno = ECANCELED; + goto done; + } + + /* Is the queue still full? */ + if (tpool_max_queue_size > 0 && tpool_cur_queue_size >= tpool_max_queue_size) { + errno = EAGAIN; + goto done; + } + + /* Allocate the work structure */ + work = calloc (1, sizeof (tpool_work)); + if (work == NULL) { + errno = ENOMEM; + goto done; + } + + work->handler_routine = routine; + work->arg = arg; + work->next = NULL; + + if (tpool_cur_queue_size == 0) { + tpool_queue_tail = tpool_queue_head = work; + } else { + tpool_queue_tail->next = work; + tpool_queue_tail = work; + } + + PTHREQ (pthread_cond_broadcast (&tpool_queue_not_empty)); + tpool_cur_queue_size++; + ret = 0; + +done: + PTHREQ (pthread_mutex_unlock (&tpool_queue_lock)); + return ret; +} + +int +tpool_destroy (int finish) +{ + assert (tpool_initialized); + + PTHREQ (pthread_mutex_lock (&tpool_queue_lock)); + + /* Can't have two places calling shutdown */ + assert (!tpool_queue_closed && !tpool_shutdown); + + /* close the queue to any new work */ + tpool_queue_closed = 1; + + /* if the finish flag is set, drain the queue */ + if (finish) { + while (tpool_cur_queue_size != 0) + PTHREQ (pthread_cond_wait (&tpool_queue_empty, &tpool_queue_lock)); + } + + /* Set the shutdown flag */ + tpool_shutdown = 1; + + PTHREQ (pthread_mutex_unlock (&tpool_queue_lock)); + + /* wake up all workers to rechedk the shutdown flag */ + PTHREQ (pthread_cond_broadcast (&tpool_queue_not_empty)); + PTHREQ (pthread_cond_broadcast (&tpool_queue_not_full)); + + join_pool_threads (1); + free_pool_stuff (); + + return 0; +} + |