summaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
authorStef Walter <stef@memberwebs.com>2008-07-21 19:35:56 +0000
committerStef Walter <stef@memberwebs.com>2008-07-21 19:35:56 +0000
commit4c4bfb64b62ff5b7b7fa21ec0185db797f434386 (patch)
tree531eaed845b2997a3d71edaf2a522a00ea9307da /common
parent56805d33c1ed477f6839074748bfa373db01c431 (diff)
- Rework event handling system so we don't use a full thread per
connection, but instead only use threads for active requests.
Diffstat (limited to 'common')
-rw-r--r--common/server-mainloop.c583
-rw-r--r--common/server-mainloop.h57
-rw-r--r--common/tpool.c398
-rw-r--r--common/tpool.h36
4 files changed, 1074 insertions, 0 deletions
diff --git a/common/server-mainloop.c b/common/server-mainloop.c
new file mode 100644
index 0000000..c102b88
--- /dev/null
+++ b/common/server-mainloop.c
@@ -0,0 +1,583 @@
+/*
+ * Copyright (c) 2006, Stefan Walter
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ * * Redistributions in binary form must reproduce the
+ * above copyright notice, this list of conditions and
+ * the following disclaimer in the documentation and/or
+ * other materials provided with the distribution.
+ * * The names of contributors to this software may not be
+ * used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+ * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+ * DAMAGE.
+ */
+
+#include "config.h"
+#include "server-mainloop.h"
+
+#include <sys/time.h>
+
+#include <assert.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#ifdef HAVE_PTHREAD
+#include <pthread.h>
+#endif
+
+typedef struct _socket_callback {
+ int fd;
+ server_socket_callback callback;
+ void *arg;
+ int closed;
+
+ struct _socket_callback *next;
+} socket_callback;
+
+typedef struct _timer_callback {
+ struct timeval at;
+ struct timeval interval;
+ server_timer_callback callback;
+ void* arg;
+ int closed;
+
+ struct _timer_callback *next;
+} timer_callback;
+
+typedef struct _server_context {
+ int stopped;
+ fd_set read_fds;
+ fd_set write_fds;
+ int max_fd;
+ int revision;
+ socket_callback *callbacks;
+ timer_callback *timers;
+ int wakeup[2];
+#ifdef HAVE_PTHREAD
+ pthread_mutex_t mutex;
+#endif
+} server_context;
+
+#ifdef HAVE_PTHREAD
+#define LOCK() pthread_mutex_lock (&ctx.mutex)
+#define UNLOCK() pthread_mutex_unlock (&ctx.mutex)
+#else
+#define LOCK()
+#define UNLOCK()
+#endif
+
+/* Global context */
+static server_context ctx;
+
+static void
+timeval_add(struct timeval *t1, struct timeval *t2)
+{
+ assert (t1->tv_usec < 1000000);
+ assert (t2->tv_usec < 1000000);
+
+ t1->tv_sec += t2->tv_sec;
+ t1->tv_usec += t2->tv_usec;
+ if (t1->tv_usec >= 1000000) {
+ t1->tv_usec -= 1000000;
+ t1->tv_sec += 1;
+ }
+}
+
+static void
+timeval_subtract (struct timeval *t1, struct timeval *t2)
+{
+ assert (t1->tv_usec < 1000000);
+ assert (t2->tv_usec < 1000000);
+
+ t1->tv_sec -= t2->tv_sec;
+ if (t1->tv_usec < t2->tv_usec) {
+ t1->tv_usec += 1000000;
+ t1->tv_sec -= 1;
+ }
+ t1->tv_usec -= t2->tv_usec;
+}
+
+static int
+timeval_compare(struct timeval *t1, struct timeval *t2)
+{
+ assert (t1->tv_usec < 1000000);
+ assert (t2->tv_usec < 1000000);
+
+ if (t1->tv_sec > t2->tv_sec)
+ return 1;
+ else if (t1->tv_sec < t2->tv_sec)
+ return -1;
+ else {
+ if (t1->tv_usec > t2->tv_usec)
+ return 1;
+ else if (t1->tv_usec < t2->tv_usec)
+ return -1;
+ else
+ return 0;
+ }
+}
+
+#define timeval_empty(tv) \
+ ((tv)->tv_sec == 0 && (tv)->tv_usec == 0)
+
+#define timeval_to_ms(tv) \
+ ((((uint64_t)(tv).tv_sec) * 1000L) + (((uint64_t)(tv).tv_usec) / 1000L))
+
+#define timeval_dump(tv) \
+ (fprintf(stderr, "{ %d:%d }", (uint)((tv).tv_sec), (uint)((tv).tv_usec / 1000)))
+
+static int
+add_timer (int ms, int oneshot, server_timer_callback callback, void *arg)
+{
+ struct timeval interval;
+ timer_callback *cb;
+
+ assert (ms || oneshot);
+ assert (callback != NULL);
+
+ interval.tv_sec = ms / 1000;
+ interval.tv_usec = (ms % 1000) * 1000; /* into micro seconds */
+
+ cb = (timer_callback*)calloc (1, sizeof (*cb));
+ if(!cb) {
+ errno = ENOMEM;
+ return -1;
+ }
+
+ if (gettimeofday (&(cb->at), NULL) == -1) {
+ free(cb);
+ return -1;
+ }
+
+ timeval_add (&(cb->at), &interval);
+
+ if (oneshot)
+ memset (&(cb->interval), 0, sizeof (cb->interval));
+ else
+ memcpy (&(cb->interval), &interval, sizeof(cb->interval));
+
+ cb->callback = callback;
+ cb->arg = arg;
+
+ cb->next = ctx.timers;
+ ctx.timers = cb;
+ ++ctx.revision;
+ server_wakeup ();
+
+ return 0;
+}
+
+static void
+remove_closed (void)
+{
+ timer_callback **tcb, *timcb;
+ socket_callback **scb, *sockcb;
+
+ for (tcb = &ctx.timers; *tcb; ) {
+ timcb = *tcb;
+ if (timcb->closed) {
+ *tcb = timcb->next;
+ free (timcb);
+ continue;
+ } else {
+ tcb = &timcb->next;
+ }
+ }
+
+ for (scb = &ctx.callbacks; *scb; ) {
+ sockcb = *scb;
+ if (sockcb->closed) {
+ *scb = sockcb->next;
+ free (sockcb);
+ continue;
+ } else {
+ scb = &sockcb->next;
+ }
+ }
+}
+
+int
+server_init (void)
+{
+ memset (&ctx, 0, sizeof (ctx));
+
+ if (pipe (ctx.wakeup) < 0)
+ return -1;
+
+ FD_ZERO (&ctx.read_fds);
+ FD_ZERO (&ctx.write_fds);
+
+ /* The wakeup descriptor */
+ FD_SET (ctx.wakeup[0], &ctx.read_fds);
+ fcntl (ctx.wakeup[0], F_SETFL, fcntl (ctx.wakeup[0], F_GETFL, 0) | O_NONBLOCK);
+
+ ctx.max_fd = ctx.wakeup[0] + 1;
+
+ ctx.stopped = 1;
+ ctx.callbacks = NULL;
+ ctx.timers = NULL;
+
+
+#ifdef HAVE_PTHREAD
+ {
+ int ret = pthread_mutex_init (&ctx.mutex, NULL);
+ if (ret != 0) {
+ errno = ret;
+ close (ctx.wakeup[0]);
+ close (ctx.wakeup[1]);
+ return -1;
+ }
+ }
+#endif
+
+ return 0;
+}
+
+void
+server_uninit (void)
+{
+ timer_callback *timcb;
+ timer_callback *timn;
+ socket_callback *sockcb;
+ socket_callback *sockn;
+
+ LOCK ();
+
+ for(timcb = ctx.timers; timcb; timcb = timn) {
+ timn = timcb->next;
+ free (timcb);
+ }
+
+ ctx.timers = NULL;
+
+ for(sockcb = ctx.callbacks; sockcb; sockcb = sockn) {
+ sockn = sockcb->next;
+ free (sockcb);
+ }
+
+ ctx.callbacks = NULL;
+
+ UNLOCK ();
+
+#ifdef HAVE_PTHREAD
+ pthread_mutex_destroy (&ctx.mutex);
+#endif
+}
+
+uint64_t
+server_get_time (void)
+{
+ struct timeval tv;
+ if (gettimeofday (&tv, NULL) == -1)
+ return 0L;
+ return timeval_to_ms (tv);
+}
+
+int
+server_run (void)
+{
+ char buffer[16];
+ struct timeval *timeout;
+ struct timeval tv, current;
+ timer_callback *timcb;
+ socket_callback *sockcb;
+ fd_set rfds, wfds;
+ int r, revision, max_fd;
+ int ret = 0;
+
+ /* For doing lock safe callbacks */
+ int type, fd;
+ server_socket_callback scb;
+ server_timer_callback tcb;
+ void *arg;
+
+ LOCK ();
+
+ /* No watches have been set */
+ assert (ctx.max_fd > -1);
+
+ ctx.stopped = 0;
+ ctx.revision = 0;
+
+ while (!ctx.stopped) {
+
+ /* Drain the wakeup if necessary */
+ while (read (ctx.wakeup[0], buffer, sizeof (buffer)) > 0);
+
+ /* Watch for the various fds */
+ memcpy (&rfds, &ctx.read_fds, sizeof (rfds));
+ memcpy (&wfds, &ctx.write_fds, sizeof (wfds));
+
+ /* Prepare for timers */
+ timeout = NULL;
+ if (gettimeofday (&current, NULL) == -1) {
+ ret = -1;
+ break;
+ }
+
+ /* Cycle through timers */
+ for (timcb = ctx.timers; timcb; timcb = timcb->next) {
+ assert (timcb->callback);
+
+ if (timcb->closed)
+ continue;
+
+ /* Call any timers that have already passed */
+ if (timeval_compare (&current, &timcb->at) >= 0) {
+
+ /* Do a lock safe callback */
+ tcb = timcb->callback;
+ arg = timcb->arg;
+
+ /* Convert to milliseconds, and make the call */
+ UNLOCK ();
+ r = (tcb) (timeval_to_ms (current), arg);
+ LOCK ();
+
+ /* Reset timer if so desired */
+ if (r == 1 && !timeval_empty (&timcb->interval)) {
+ timeval_add (&timcb->at, &timcb->interval);
+
+ /* If the time has already passed, just use current time */
+ if (timeval_compare (&(timcb->at), &current) <= 0)
+ memcpy (&(timcb->at), &current, sizeof (timcb->at));
+
+ /* Otherwise remove it. Either one shot, or returned 0 */
+ } else {
+ timcb->closed = 1;
+ continue;
+ }
+ }
+
+ /* Get soonest timer */
+ if (!timeout || timeval_compare (&timcb->at, timeout) < 0)
+ timeout = &timcb->at;
+ }
+
+ /* Convert to an offset */
+ if (timeout) {
+ memcpy (&tv, timeout, sizeof (tv));
+ timeout = &tv;
+ timeval_subtract (timeout, &current);
+ }
+
+ if (ctx.stopped)
+ continue;
+
+ remove_closed ();
+
+ /* No callbacks? No use continuing */
+ if (!ctx.callbacks && !ctx.timers)
+ break;
+
+ /* fprintf(stderr, "selecting with timeout: ");
+ timeval_dump(timeout);
+ fprintf(stderr, "\n"); */
+
+ /* We can see if changes happen while in select */
+ revision = ctx.revision;
+ max_fd = ctx.max_fd;
+
+ UNLOCK ();
+ r = select (max_fd, &rfds, &wfds, NULL, timeout);
+ LOCK ();
+
+ if (r < 0) {
+ /* Interrupted so try again, and possibly exit */
+ if (errno == EINTR)
+ continue;
+
+ if (errno == EBADF) {
+ if (ctx.revision != revision)
+ continue;
+ assert (0 && "bad file descriptor being watched");
+ }
+
+ /* Programmer errors */
+ assert (errno != EINVAL);
+ ret = -1;
+ break;
+ }
+
+ /* Timeout, just jump to timeout processing */
+ if(r == 0)
+ continue;
+
+ /* Process ready file descriptors */
+ for (sockcb = ctx.callbacks; sockcb; sockcb = sockcb->next) {
+ assert (sockcb->fd != -1);
+ if (sockcb->closed)
+ continue;
+
+ /* Call any that are set */
+ type = 0;
+ if (FD_ISSET (sockcb->fd, &rfds))
+ type = SERVER_READ;
+ if (FD_ISSET (sockcb->fd, &wfds))
+ type = SERVER_WRITE;
+
+ /* Lock safe callback */
+ if (type != 0) {
+ scb = sockcb->callback;
+ fd = sockcb->fd;
+ arg = sockcb->arg;
+
+ UNLOCK ();
+ (scb) (fd, type, arg);
+ LOCK ();
+ }
+ }
+ }
+
+ UNLOCK ();
+ return ret;
+}
+
+void
+server_wakeup (void)
+{
+ /*
+ * This can be called from within arbitrary
+ * locks, so be careful.
+ */
+ int r = write (ctx.wakeup[1], "a", 1);
+ fprintf (stderr, "wakeup: %d\n", r);
+}
+
+void
+server_stop (void)
+{
+ /*
+ * This could be called from a signal handler,
+ * so it's not worth trying to lock.
+ */
+ ctx.stopped = 1;
+ server_wakeup ();
+}
+
+int
+server_stopped (void)
+{
+ return ctx.stopped;
+}
+
+int
+server_watch (int fd, int type, server_socket_callback callback, void *arg)
+{
+ socket_callback *cb;
+ assert (type != 0);
+ assert (fd != -1);
+ assert (callback != NULL);
+
+ cb = (socket_callback*)calloc (sizeof(*cb), 1);
+ if(!cb) {
+ errno = ENOMEM;
+ return -1;
+ }
+
+ LOCK ();
+
+ cb->fd = fd;
+ cb->callback = callback;
+ cb->arg = arg;
+
+ cb->next = ctx.callbacks;
+ ctx.callbacks = cb;
+
+ if (type & SERVER_READ)
+ FD_SET (fd, &ctx.read_fds);
+ if (type & SERVER_WRITE)
+ FD_SET (fd, &ctx.write_fds);
+
+ if (fd >= ctx.max_fd)
+ ctx.max_fd = fd + 1;
+
+ ++ctx.revision;
+ server_wakeup ();
+
+ UNLOCK ();
+
+ return 0;
+}
+
+void
+server_unwatch (int fd)
+{
+ socket_callback* cb;
+
+ assert (fd != -1);
+
+ LOCK ();
+
+ FD_CLR (fd, &ctx.read_fds);
+ FD_CLR (fd, &ctx.write_fds);
+
+ if (!ctx.callbacks)
+ goto done;
+
+ /* First in list */;
+ if (ctx.callbacks->fd == fd) {
+ ctx.callbacks->closed = 1;
+ goto done;
+ }
+
+ /* One ahead processing of rest */
+ for (cb = ctx.callbacks; cb->next; cb = cb->next) {
+ if (cb->next->fd == fd) {
+ cb->next->closed = 1;
+ goto done;
+ }
+ }
+
+done:
+ server_wakeup ();
+ ++ctx.revision;
+ UNLOCK ();
+}
+
+int
+server_timer (int ms, server_timer_callback callback, void *arg)
+{
+ int ret;
+
+ LOCK ();
+ ret = add_timer (ms, 0, callback, arg);
+ UNLOCK ();
+
+ return ret;
+}
+
+int
+server_oneshot (int ms, server_timer_callback callback, void *arg)
+{
+ int ret;
+
+ LOCK ();
+ ret = add_timer (ms, 1, callback, arg);
+ UNLOCK ();
+
+ return ret;
+}
diff --git a/common/server-mainloop.h b/common/server-mainloop.h
new file mode 100644
index 0000000..a749653
--- /dev/null
+++ b/common/server-mainloop.h
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2006, Stefan Walter
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ * * Redistributions in binary form must reproduce the
+ * above copyright notice, this list of conditions and
+ * the following disclaimer in the documentation and/or
+ * other materials provided with the distribution.
+ * * The names of contributors to this software may not be
+ * used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+ * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+ * DAMAGE.
+ */
+
+#ifndef __SERVER_MAINLOOP_H__
+#define __SERVER_MAINLOOP_H__
+
+#include <stdint.h>
+
+#define SERVER_READ 0x01
+#define SERVER_WRITE 0x02
+
+typedef void (*server_socket_callback) (int fd, int type, void* arg);
+typedef int (*server_timer_callback) (uint64_t when, void* arg);
+
+int server_init (void);
+void server_uninit (void);
+int server_run (void);
+void server_stop (void);
+int server_stopped (void);
+int server_watch (int fd, int type, server_socket_callback callback, void* arg);
+void server_unwatch (int fd);
+void server_wakeup (void);
+int server_timer (int length, server_timer_callback callback, void* arg);
+int server_oneshot (int length, server_timer_callback callback, void* arg);
+uint64_t server_get_time (void);
+
+#endif /* __SERVER_MAINLOOP_H__ */
diff --git a/common/tpool.c b/common/tpool.c
new file mode 100644
index 0000000..f980b86
--- /dev/null
+++ b/common/tpool.c
@@ -0,0 +1,398 @@
+/*
+ * HttpAuth
+ *
+ * Copyright (C) 2008 Stefan Walter
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the
+ * Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/*
+ * -------------------------------------------------------------------------
+ * Ideas from tpool.c - htun thread pool functions
+ * Copyright (C) 2002 Moshe Jacobson <moshe@runslinux.net>,
+ * Ola Nordström <ola@triblock.com>
+ * -------------------------------------------------------------------------
+ */
+
+/*
+ * most of the theory and implementation of the thread pool was taken
+ * from the o'reilly pthreads programming book.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <err.h>
+#include <string.h>
+#include <assert.h>
+#include <signal.h>
+
+#define __USE_GNU
+#include <pthread.h>
+
+#include "tpool.h"
+
+typedef struct _tpool_work {
+ void (*handler_routine)(void*);
+ void *arg;
+ struct _tpool_work *next;
+} tpool_work;
+
+typedef struct _tpool_thread {
+ pthread_t handle;
+ int is_active;
+ struct _tpool_thread *next;
+} tpool_thread;
+
+static int tpool_max_threads = 0;
+static int tpool_min_threads = 0;
+static int tpool_running_threads = 0;
+static int tpool_idle_threads = 0;
+static int tpool_zombie_threads = 0;
+static tpool_thread *tpool_thread_list = NULL;
+
+static int tpool_max_queue_size = 0;
+static int tpool_cur_queue_size = 0;
+static tpool_work *tpool_queue_head = NULL;
+static tpool_work *tpool_queue_tail = NULL;
+static pthread_mutex_t tpool_queue_lock;
+static pthread_cond_t tpool_queue_not_full;
+static pthread_cond_t tpool_queue_not_empty;
+static pthread_cond_t tpool_queue_empty;
+
+static int tpool_queue_closed = 0;
+static int tpool_shutdown = 0;
+static int tpool_initialized = 0;
+
+#define PTHREQ(x) do { \
+ int r = (x); \
+ if (r != 0) { \
+ fprintf (stderr, "fatal: '%s' failed: %s", #x, strerror (r)); \
+ assert (0 && "" #x); \
+ } } while (0)
+
+static void*
+tpool_thread_func (void *data)
+{
+ tpool_thread *thread = (tpool_thread*)data;
+ tpool_work *work;
+ int stop = 0;
+ sigset_t set;
+
+ /* We handle these signals on the main thread */
+ sigemptyset (&set);
+ sigaddset (&set, SIGINT);
+ sigaddset (&set, SIGTERM);
+ pthread_sigmask (SIG_BLOCK, &set, NULL);
+
+ assert (thread);
+ assert (tpool_initialized);
+
+ PTHREQ (pthread_mutex_lock (&tpool_queue_lock));
+
+ assert (thread->is_active);
+ ++tpool_running_threads;
+ ++tpool_idle_threads;
+
+ while (!tpool_shutdown && !stop) {
+
+ /* No work to do? */
+ while (tpool_cur_queue_size == 0 && !tpool_shutdown && !stop) {
+
+ /* Too many idle threads? */
+ if (tpool_running_threads > tpool_min_threads) {
+ stop = 1;
+ break;
+ }
+
+ /*
+ * Sleep until there is work, while asleep the queue_lock
+ * is relinquished
+ */
+ PTHREQ (pthread_cond_wait (&tpool_queue_not_empty, &tpool_queue_lock));
+ }
+
+ /* Are we shutting down? */
+ if (stop || tpool_shutdown)
+ break;
+
+ /* Pop the work off the queue */
+ work = tpool_queue_head;
+ tpool_cur_queue_size--;
+ if (tpool_cur_queue_size == 0)
+ tpool_queue_head = tpool_queue_tail = NULL;
+ else
+ tpool_queue_head = work->next;
+
+ /* Broadcast queue state to anyone who's waiting */
+ if (tpool_cur_queue_size < tpool_max_queue_size)
+ PTHREQ (pthread_cond_broadcast (&tpool_queue_not_full));
+ if (tpool_cur_queue_size == 0)
+ PTHREQ (pthread_cond_signal (&tpool_queue_empty));
+
+ --tpool_idle_threads;
+ assert (tpool_idle_threads >= 0);
+
+ /* Unlock the queue lock while processing */
+ PTHREQ (pthread_mutex_unlock (&tpool_queue_lock));
+
+ /* Perform the work */
+ (work->handler_routine) (work->arg);
+ free (work);
+
+ PTHREQ (pthread_mutex_lock (&tpool_queue_lock));
+
+ ++tpool_idle_threads;
+ }
+
+ --tpool_idle_threads;
+ --tpool_running_threads;
+ ++tpool_zombie_threads;
+ thread->is_active = 0;
+ assert (tpool_running_threads >= 0);
+ assert (tpool_idle_threads >= 0);
+
+ PTHREQ (pthread_mutex_unlock (&tpool_queue_lock));
+
+ /* So we can double check the result code */
+ return thread;
+}
+
+static void
+free_pool_stuff (void)
+{
+ tpool_work *work;
+
+ if (!tpool_initialized)
+ return;
+
+ /* No threads should be running at this point */
+ assert (!tpool_thread_list);
+
+ /* Clean up memory */
+ while (tpool_queue_head != NULL) {
+ work = tpool_queue_head;
+ tpool_queue_head = tpool_queue_head->next;
+ free (work);
+ }
+
+ pthread_cond_destroy (&tpool_queue_not_full);
+ pthread_cond_destroy (&tpool_queue_not_empty);
+ pthread_cond_destroy (&tpool_queue_empty);
+ pthread_mutex_destroy (&tpool_queue_lock);
+
+ tpool_initialized = 0;
+}
+
+int
+tpool_init (int max_threads, int min_threads, int max_queued)
+{
+ assert (!tpool_initialized);
+
+ /* set the desired thread pool values */
+ tpool_max_threads = max_threads;
+ tpool_running_threads = 0;
+ tpool_idle_threads = 0;
+ tpool_zombie_threads = 0;
+ tpool_min_threads = min_threads;
+
+ /* initialize the work queue */
+ tpool_max_queue_size = max_queued;
+ tpool_cur_queue_size = 0;
+ tpool_queue_head = NULL;
+ tpool_queue_tail = NULL;
+ tpool_queue_closed = 0;
+ tpool_shutdown = 0;
+
+ /* create the mutexs and cond vars */
+ PTHREQ (pthread_mutex_init (&tpool_queue_lock, NULL));
+ PTHREQ (pthread_cond_init (&tpool_queue_not_empty, NULL));
+ PTHREQ (pthread_cond_init (&tpool_queue_not_full, NULL));
+ PTHREQ (pthread_cond_init (&tpool_queue_empty, NULL));
+
+ tpool_initialized = 1;
+ return 0;
+}
+
+static void
+join_pool_threads (int all)
+{
+ tpool_thread *thread, **spot;
+ void *exit_code;
+
+ /* Should be called from within the lock */
+
+ for (spot = &tpool_thread_list, thread = *spot;
+ thread && (all || tpool_zombie_threads > 0); ) {
+
+ assert (thread->handle);
+
+ if (all || !thread->is_active) {
+ if (!thread->is_active)
+ --tpool_zombie_threads;
+
+ /* Join the thread */
+ PTHREQ (pthread_join (thread->handle, &exit_code));
+ assert (exit_code == thread);
+
+ /* Remove and get the next in list */
+ *spot = thread->next;
+ free (thread);
+
+ } else {
+ /* Just get the next in list */
+ spot = &thread->next;
+ }
+
+ thread = *spot;
+ }
+}
+
+int
+tpool_add_work (void (*routine)(void*), void *arg)
+{
+ tpool_work *work;
+ tpool_thread *thread;
+ int r, ret = -1;
+
+ assert (tpool_initialized);
+
+ PTHREQ (pthread_mutex_lock (&tpool_queue_lock));
+
+ /* Join any old threads */
+ join_pool_threads (0);
+
+ /* Should have found all of these above */
+ assert (tpool_zombie_threads == 0);
+
+ /* Are we closed for business? */
+ if (tpool_shutdown || tpool_queue_closed) {
+ errno = ECANCELED;
+ goto done;
+ }
+
+ /* No idle threads to run this work? */
+ if (!tpool_idle_threads) {
+
+ /* See if we can start another thread */
+ if (tpool_max_threads <= 0 ||
+ tpool_running_threads < tpool_max_threads) {
+
+ /* The thread context */
+ thread = calloc (1, sizeof (tpool_thread));
+ if (thread == NULL) {
+ errno = ENOMEM;
+ goto done;
+ }
+
+ /* Start the thread */
+ thread->is_active = 1;
+ r = pthread_create (&thread->handle, NULL, tpool_thread_func, (void*)thread);
+ if (r != 0) {
+ errno = r;
+ free (thread);
+ goto done;
+ }
+
+ /* Add the new thread to the list */
+ thread->next = tpool_thread_list;
+ tpool_thread_list = thread;
+ }
+ }
+
+ /*
+ * If the queue is full, yield this thread, and see if
+ * another processes something out of the queue.
+ */
+ if (tpool_max_queue_size > 0 && tpool_cur_queue_size >= tpool_max_queue_size) {
+ PTHREQ (pthread_mutex_unlock (&tpool_queue_lock));
+ pthread_yield();
+ PTHREQ (pthread_mutex_lock (&tpool_queue_lock));
+ }
+
+ /* Are we closed for business? */
+ if (tpool_shutdown || tpool_queue_closed) {
+ errno = ECANCELED;
+ goto done;
+ }
+
+ /* Is the queue still full? */
+ if (tpool_max_queue_size > 0 && tpool_cur_queue_size >= tpool_max_queue_size) {
+ errno = EAGAIN;
+ goto done;
+ }
+
+ /* Allocate the work structure */
+ work = calloc (1, sizeof (tpool_work));
+ if (work == NULL) {
+ errno = ENOMEM;
+ goto done;
+ }
+
+ work->handler_routine = routine;
+ work->arg = arg;
+ work->next = NULL;
+
+ if (tpool_cur_queue_size == 0) {
+ tpool_queue_tail = tpool_queue_head = work;
+ } else {
+ tpool_queue_tail->next = work;
+ tpool_queue_tail = work;
+ }
+
+ PTHREQ (pthread_cond_broadcast (&tpool_queue_not_empty));
+ tpool_cur_queue_size++;
+ ret = 0;
+
+done:
+ PTHREQ (pthread_mutex_unlock (&tpool_queue_lock));
+ return ret;
+}
+
+int
+tpool_destroy (int finish)
+{
+ assert (tpool_initialized);
+
+ PTHREQ (pthread_mutex_lock (&tpool_queue_lock));
+
+ /* Can't have two places calling shutdown */
+ assert (!tpool_queue_closed && !tpool_shutdown);
+
+ /* close the queue to any new work */
+ tpool_queue_closed = 1;
+
+ /* if the finish flag is set, drain the queue */
+ if (finish) {
+ while (tpool_cur_queue_size != 0)
+ PTHREQ (pthread_cond_wait (&tpool_queue_empty, &tpool_queue_lock));
+ }
+
+ /* Set the shutdown flag */
+ tpool_shutdown = 1;
+
+ PTHREQ (pthread_mutex_unlock (&tpool_queue_lock));
+
+ /* wake up all workers to rechedk the shutdown flag */
+ PTHREQ (pthread_cond_broadcast (&tpool_queue_not_empty));
+ PTHREQ (pthread_cond_broadcast (&tpool_queue_not_full));
+
+ join_pool_threads (1);
+ free_pool_stuff ();
+
+ return 0;
+}
+
diff --git a/common/tpool.h b/common/tpool.h
new file mode 100644
index 0000000..ac5a3f4
--- /dev/null
+++ b/common/tpool.h
@@ -0,0 +1,36 @@
+/*
+ * HttpAuth
+ *
+ * Copyright (C) 2008 Stefan Walter
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the
+ * Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef _TPOOL_H_
+#define _TPOOL_H_
+
+typedef void (*tpool_routine) (void*);
+
+/* returns -1 if pool creation failed, 0 if succeeded */
+int tpool_init (int max_threads, int min_threads, int max_queued);
+
+/* returns -1 if failed (ie busy, closing etc...), 0 if succeeded */
+int tpool_add_work (tpool_routine routine, void *arg);
+
+/* cleanup and close. Must only be called once. Specify |finish| to drain queue */
+int tpool_destroy (int finish);
+
+#endif /* _TPOOL_H_ */