summaryrefslogtreecommitdiff
path: root/common/tpool.c
diff options
context:
space:
mode:
Diffstat (limited to 'common/tpool.c')
-rw-r--r--common/tpool.c398
1 files changed, 398 insertions, 0 deletions
diff --git a/common/tpool.c b/common/tpool.c
new file mode 100644
index 0000000..f980b86
--- /dev/null
+++ b/common/tpool.c
@@ -0,0 +1,398 @@
+/*
+ * HttpAuth
+ *
+ * Copyright (C) 2008 Stefan Walter
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the
+ * Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/*
+ * -------------------------------------------------------------------------
+ * Ideas from tpool.c - htun thread pool functions
+ * Copyright (C) 2002 Moshe Jacobson <moshe@runslinux.net>,
+ * Ola Nordström <ola@triblock.com>
+ * -------------------------------------------------------------------------
+ */
+
+/*
+ * most of the theory and implementation of the thread pool was taken
+ * from the o'reilly pthreads programming book.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <err.h>
+#include <string.h>
+#include <assert.h>
+#include <signal.h>
+
+#define __USE_GNU
+#include <pthread.h>
+
+#include "tpool.h"
+
+typedef struct _tpool_work {
+ void (*handler_routine)(void*);
+ void *arg;
+ struct _tpool_work *next;
+} tpool_work;
+
+typedef struct _tpool_thread {
+ pthread_t handle;
+ int is_active;
+ struct _tpool_thread *next;
+} tpool_thread;
+
+static int tpool_max_threads = 0;
+static int tpool_min_threads = 0;
+static int tpool_running_threads = 0;
+static int tpool_idle_threads = 0;
+static int tpool_zombie_threads = 0;
+static tpool_thread *tpool_thread_list = NULL;
+
+static int tpool_max_queue_size = 0;
+static int tpool_cur_queue_size = 0;
+static tpool_work *tpool_queue_head = NULL;
+static tpool_work *tpool_queue_tail = NULL;
+static pthread_mutex_t tpool_queue_lock;
+static pthread_cond_t tpool_queue_not_full;
+static pthread_cond_t tpool_queue_not_empty;
+static pthread_cond_t tpool_queue_empty;
+
+static int tpool_queue_closed = 0;
+static int tpool_shutdown = 0;
+static int tpool_initialized = 0;
+
+#define PTHREQ(x) do { \
+ int r = (x); \
+ if (r != 0) { \
+ fprintf (stderr, "fatal: '%s' failed: %s", #x, strerror (r)); \
+ assert (0 && "" #x); \
+ } } while (0)
+
+static void*
+tpool_thread_func (void *data)
+{
+ tpool_thread *thread = (tpool_thread*)data;
+ tpool_work *work;
+ int stop = 0;
+ sigset_t set;
+
+ /* We handle these signals on the main thread */
+ sigemptyset (&set);
+ sigaddset (&set, SIGINT);
+ sigaddset (&set, SIGTERM);
+ pthread_sigmask (SIG_BLOCK, &set, NULL);
+
+ assert (thread);
+ assert (tpool_initialized);
+
+ PTHREQ (pthread_mutex_lock (&tpool_queue_lock));
+
+ assert (thread->is_active);
+ ++tpool_running_threads;
+ ++tpool_idle_threads;
+
+ while (!tpool_shutdown && !stop) {
+
+ /* No work to do? */
+ while (tpool_cur_queue_size == 0 && !tpool_shutdown && !stop) {
+
+ /* Too many idle threads? */
+ if (tpool_running_threads > tpool_min_threads) {
+ stop = 1;
+ break;
+ }
+
+ /*
+ * Sleep until there is work, while asleep the queue_lock
+ * is relinquished
+ */
+ PTHREQ (pthread_cond_wait (&tpool_queue_not_empty, &tpool_queue_lock));
+ }
+
+ /* Are we shutting down? */
+ if (stop || tpool_shutdown)
+ break;
+
+ /* Pop the work off the queue */
+ work = tpool_queue_head;
+ tpool_cur_queue_size--;
+ if (tpool_cur_queue_size == 0)
+ tpool_queue_head = tpool_queue_tail = NULL;
+ else
+ tpool_queue_head = work->next;
+
+ /* Broadcast queue state to anyone who's waiting */
+ if (tpool_cur_queue_size < tpool_max_queue_size)
+ PTHREQ (pthread_cond_broadcast (&tpool_queue_not_full));
+ if (tpool_cur_queue_size == 0)
+ PTHREQ (pthread_cond_signal (&tpool_queue_empty));
+
+ --tpool_idle_threads;
+ assert (tpool_idle_threads >= 0);
+
+ /* Unlock the queue lock while processing */
+ PTHREQ (pthread_mutex_unlock (&tpool_queue_lock));
+
+ /* Perform the work */
+ (work->handler_routine) (work->arg);
+ free (work);
+
+ PTHREQ (pthread_mutex_lock (&tpool_queue_lock));
+
+ ++tpool_idle_threads;
+ }
+
+ --tpool_idle_threads;
+ --tpool_running_threads;
+ ++tpool_zombie_threads;
+ thread->is_active = 0;
+ assert (tpool_running_threads >= 0);
+ assert (tpool_idle_threads >= 0);
+
+ PTHREQ (pthread_mutex_unlock (&tpool_queue_lock));
+
+ /* So we can double check the result code */
+ return thread;
+}
+
+static void
+free_pool_stuff (void)
+{
+ tpool_work *work;
+
+ if (!tpool_initialized)
+ return;
+
+ /* No threads should be running at this point */
+ assert (!tpool_thread_list);
+
+ /* Clean up memory */
+ while (tpool_queue_head != NULL) {
+ work = tpool_queue_head;
+ tpool_queue_head = tpool_queue_head->next;
+ free (work);
+ }
+
+ pthread_cond_destroy (&tpool_queue_not_full);
+ pthread_cond_destroy (&tpool_queue_not_empty);
+ pthread_cond_destroy (&tpool_queue_empty);
+ pthread_mutex_destroy (&tpool_queue_lock);
+
+ tpool_initialized = 0;
+}
+
+int
+tpool_init (int max_threads, int min_threads, int max_queued)
+{
+ assert (!tpool_initialized);
+
+ /* set the desired thread pool values */
+ tpool_max_threads = max_threads;
+ tpool_running_threads = 0;
+ tpool_idle_threads = 0;
+ tpool_zombie_threads = 0;
+ tpool_min_threads = min_threads;
+
+ /* initialize the work queue */
+ tpool_max_queue_size = max_queued;
+ tpool_cur_queue_size = 0;
+ tpool_queue_head = NULL;
+ tpool_queue_tail = NULL;
+ tpool_queue_closed = 0;
+ tpool_shutdown = 0;
+
+ /* create the mutexs and cond vars */
+ PTHREQ (pthread_mutex_init (&tpool_queue_lock, NULL));
+ PTHREQ (pthread_cond_init (&tpool_queue_not_empty, NULL));
+ PTHREQ (pthread_cond_init (&tpool_queue_not_full, NULL));
+ PTHREQ (pthread_cond_init (&tpool_queue_empty, NULL));
+
+ tpool_initialized = 1;
+ return 0;
+}
+
+static void
+join_pool_threads (int all)
+{
+ tpool_thread *thread, **spot;
+ void *exit_code;
+
+ /* Should be called from within the lock */
+
+ for (spot = &tpool_thread_list, thread = *spot;
+ thread && (all || tpool_zombie_threads > 0); ) {
+
+ assert (thread->handle);
+
+ if (all || !thread->is_active) {
+ if (!thread->is_active)
+ --tpool_zombie_threads;
+
+ /* Join the thread */
+ PTHREQ (pthread_join (thread->handle, &exit_code));
+ assert (exit_code == thread);
+
+ /* Remove and get the next in list */
+ *spot = thread->next;
+ free (thread);
+
+ } else {
+ /* Just get the next in list */
+ spot = &thread->next;
+ }
+
+ thread = *spot;
+ }
+}
+
+int
+tpool_add_work (void (*routine)(void*), void *arg)
+{
+ tpool_work *work;
+ tpool_thread *thread;
+ int r, ret = -1;
+
+ assert (tpool_initialized);
+
+ PTHREQ (pthread_mutex_lock (&tpool_queue_lock));
+
+ /* Join any old threads */
+ join_pool_threads (0);
+
+ /* Should have found all of these above */
+ assert (tpool_zombie_threads == 0);
+
+ /* Are we closed for business? */
+ if (tpool_shutdown || tpool_queue_closed) {
+ errno = ECANCELED;
+ goto done;
+ }
+
+ /* No idle threads to run this work? */
+ if (!tpool_idle_threads) {
+
+ /* See if we can start another thread */
+ if (tpool_max_threads <= 0 ||
+ tpool_running_threads < tpool_max_threads) {
+
+ /* The thread context */
+ thread = calloc (1, sizeof (tpool_thread));
+ if (thread == NULL) {
+ errno = ENOMEM;
+ goto done;
+ }
+
+ /* Start the thread */
+ thread->is_active = 1;
+ r = pthread_create (&thread->handle, NULL, tpool_thread_func, (void*)thread);
+ if (r != 0) {
+ errno = r;
+ free (thread);
+ goto done;
+ }
+
+ /* Add the new thread to the list */
+ thread->next = tpool_thread_list;
+ tpool_thread_list = thread;
+ }
+ }
+
+ /*
+ * If the queue is full, yield this thread, and see if
+ * another processes something out of the queue.
+ */
+ if (tpool_max_queue_size > 0 && tpool_cur_queue_size >= tpool_max_queue_size) {
+ PTHREQ (pthread_mutex_unlock (&tpool_queue_lock));
+ pthread_yield();
+ PTHREQ (pthread_mutex_lock (&tpool_queue_lock));
+ }
+
+ /* Are we closed for business? */
+ if (tpool_shutdown || tpool_queue_closed) {
+ errno = ECANCELED;
+ goto done;
+ }
+
+ /* Is the queue still full? */
+ if (tpool_max_queue_size > 0 && tpool_cur_queue_size >= tpool_max_queue_size) {
+ errno = EAGAIN;
+ goto done;
+ }
+
+ /* Allocate the work structure */
+ work = calloc (1, sizeof (tpool_work));
+ if (work == NULL) {
+ errno = ENOMEM;
+ goto done;
+ }
+
+ work->handler_routine = routine;
+ work->arg = arg;
+ work->next = NULL;
+
+ if (tpool_cur_queue_size == 0) {
+ tpool_queue_tail = tpool_queue_head = work;
+ } else {
+ tpool_queue_tail->next = work;
+ tpool_queue_tail = work;
+ }
+
+ PTHREQ (pthread_cond_broadcast (&tpool_queue_not_empty));
+ tpool_cur_queue_size++;
+ ret = 0;
+
+done:
+ PTHREQ (pthread_mutex_unlock (&tpool_queue_lock));
+ return ret;
+}
+
+int
+tpool_destroy (int finish)
+{
+ assert (tpool_initialized);
+
+ PTHREQ (pthread_mutex_lock (&tpool_queue_lock));
+
+ /* Can't have two places calling shutdown */
+ assert (!tpool_queue_closed && !tpool_shutdown);
+
+ /* close the queue to any new work */
+ tpool_queue_closed = 1;
+
+ /* if the finish flag is set, drain the queue */
+ if (finish) {
+ while (tpool_cur_queue_size != 0)
+ PTHREQ (pthread_cond_wait (&tpool_queue_empty, &tpool_queue_lock));
+ }
+
+ /* Set the shutdown flag */
+ tpool_shutdown = 1;
+
+ PTHREQ (pthread_mutex_unlock (&tpool_queue_lock));
+
+ /* wake up all workers to rechedk the shutdown flag */
+ PTHREQ (pthread_cond_broadcast (&tpool_queue_not_empty));
+ PTHREQ (pthread_cond_broadcast (&tpool_queue_not_full));
+
+ join_pool_threads (1);
+ free_pool_stuff ();
+
+ return 0;
+}
+