From 4c4bfb64b62ff5b7b7fa21ec0185db797f434386 Mon Sep 17 00:00:00 2001 From: Stef Walter Date: Mon, 21 Jul 2008 19:35:56 +0000 Subject: - Rework event handling system so we don't use a full thread per connection, but instead only use threads for active requests. --- common/server-mainloop.c | 583 +++++++++++++++++++++++++++++++++++++++++++++++ common/server-mainloop.h | 57 +++++ common/tpool.c | 398 ++++++++++++++++++++++++++++++++ common/tpool.h | 36 +++ 4 files changed, 1074 insertions(+) create mode 100644 common/server-mainloop.c create mode 100644 common/server-mainloop.h create mode 100644 common/tpool.c create mode 100644 common/tpool.h (limited to 'common') diff --git a/common/server-mainloop.c b/common/server-mainloop.c new file mode 100644 index 0000000..c102b88 --- /dev/null +++ b/common/server-mainloop.c @@ -0,0 +1,583 @@ +/* + * Copyright (c) 2006, Stefan Walter + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * * Redistributions in binary form must reproduce the + * above copyright notice, this list of conditions and + * the following disclaimer in the documentation and/or + * other materials provided with the distribution. + * * The names of contributors to this software may not be + * used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH + * DAMAGE. + */ + +#include "config.h" +#include "server-mainloop.h" + +#include + +#include +#include +#include +#include +#include +#include +#include + +#ifdef HAVE_PTHREAD +#include +#endif + +typedef struct _socket_callback { + int fd; + server_socket_callback callback; + void *arg; + int closed; + + struct _socket_callback *next; +} socket_callback; + +typedef struct _timer_callback { + struct timeval at; + struct timeval interval; + server_timer_callback callback; + void* arg; + int closed; + + struct _timer_callback *next; +} timer_callback; + +typedef struct _server_context { + int stopped; + fd_set read_fds; + fd_set write_fds; + int max_fd; + int revision; + socket_callback *callbacks; + timer_callback *timers; + int wakeup[2]; +#ifdef HAVE_PTHREAD + pthread_mutex_t mutex; +#endif +} server_context; + +#ifdef HAVE_PTHREAD +#define LOCK() pthread_mutex_lock (&ctx.mutex) +#define UNLOCK() pthread_mutex_unlock (&ctx.mutex) +#else +#define LOCK() +#define UNLOCK() +#endif + +/* Global context */ +static server_context ctx; + +static void +timeval_add(struct timeval *t1, struct timeval *t2) +{ + assert (t1->tv_usec < 1000000); + assert (t2->tv_usec < 1000000); + + t1->tv_sec += t2->tv_sec; + t1->tv_usec += t2->tv_usec; + if (t1->tv_usec >= 1000000) { + t1->tv_usec -= 1000000; + t1->tv_sec += 1; + } +} + +static void +timeval_subtract (struct timeval *t1, struct timeval *t2) +{ + assert (t1->tv_usec < 1000000); + assert (t2->tv_usec < 1000000); + + t1->tv_sec -= t2->tv_sec; + if (t1->tv_usec < t2->tv_usec) { + t1->tv_usec += 1000000; + t1->tv_sec -= 1; + } + t1->tv_usec -= t2->tv_usec; +} + +static int +timeval_compare(struct timeval *t1, struct timeval *t2) +{ + assert (t1->tv_usec < 1000000); + assert (t2->tv_usec < 1000000); + + if (t1->tv_sec > t2->tv_sec) + return 1; + else if (t1->tv_sec < t2->tv_sec) + return -1; + else { + if (t1->tv_usec > t2->tv_usec) + return 1; + else if (t1->tv_usec < t2->tv_usec) + return -1; + else + return 0; + } +} + +#define timeval_empty(tv) \ + ((tv)->tv_sec == 0 && (tv)->tv_usec == 0) + +#define timeval_to_ms(tv) \ + ((((uint64_t)(tv).tv_sec) * 1000L) + (((uint64_t)(tv).tv_usec) / 1000L)) + +#define timeval_dump(tv) \ + (fprintf(stderr, "{ %d:%d }", (uint)((tv).tv_sec), (uint)((tv).tv_usec / 1000))) + +static int +add_timer (int ms, int oneshot, server_timer_callback callback, void *arg) +{ + struct timeval interval; + timer_callback *cb; + + assert (ms || oneshot); + assert (callback != NULL); + + interval.tv_sec = ms / 1000; + interval.tv_usec = (ms % 1000) * 1000; /* into micro seconds */ + + cb = (timer_callback*)calloc (1, sizeof (*cb)); + if(!cb) { + errno = ENOMEM; + return -1; + } + + if (gettimeofday (&(cb->at), NULL) == -1) { + free(cb); + return -1; + } + + timeval_add (&(cb->at), &interval); + + if (oneshot) + memset (&(cb->interval), 0, sizeof (cb->interval)); + else + memcpy (&(cb->interval), &interval, sizeof(cb->interval)); + + cb->callback = callback; + cb->arg = arg; + + cb->next = ctx.timers; + ctx.timers = cb; + ++ctx.revision; + server_wakeup (); + + return 0; +} + +static void +remove_closed (void) +{ + timer_callback **tcb, *timcb; + socket_callback **scb, *sockcb; + + for (tcb = &ctx.timers; *tcb; ) { + timcb = *tcb; + if (timcb->closed) { + *tcb = timcb->next; + free (timcb); + continue; + } else { + tcb = &timcb->next; + } + } + + for (scb = &ctx.callbacks; *scb; ) { + sockcb = *scb; + if (sockcb->closed) { + *scb = sockcb->next; + free (sockcb); + continue; + } else { + scb = &sockcb->next; + } + } +} + +int +server_init (void) +{ + memset (&ctx, 0, sizeof (ctx)); + + if (pipe (ctx.wakeup) < 0) + return -1; + + FD_ZERO (&ctx.read_fds); + FD_ZERO (&ctx.write_fds); + + /* The wakeup descriptor */ + FD_SET (ctx.wakeup[0], &ctx.read_fds); + fcntl (ctx.wakeup[0], F_SETFL, fcntl (ctx.wakeup[0], F_GETFL, 0) | O_NONBLOCK); + + ctx.max_fd = ctx.wakeup[0] + 1; + + ctx.stopped = 1; + ctx.callbacks = NULL; + ctx.timers = NULL; + + +#ifdef HAVE_PTHREAD + { + int ret = pthread_mutex_init (&ctx.mutex, NULL); + if (ret != 0) { + errno = ret; + close (ctx.wakeup[0]); + close (ctx.wakeup[1]); + return -1; + } + } +#endif + + return 0; +} + +void +server_uninit (void) +{ + timer_callback *timcb; + timer_callback *timn; + socket_callback *sockcb; + socket_callback *sockn; + + LOCK (); + + for(timcb = ctx.timers; timcb; timcb = timn) { + timn = timcb->next; + free (timcb); + } + + ctx.timers = NULL; + + for(sockcb = ctx.callbacks; sockcb; sockcb = sockn) { + sockn = sockcb->next; + free (sockcb); + } + + ctx.callbacks = NULL; + + UNLOCK (); + +#ifdef HAVE_PTHREAD + pthread_mutex_destroy (&ctx.mutex); +#endif +} + +uint64_t +server_get_time (void) +{ + struct timeval tv; + if (gettimeofday (&tv, NULL) == -1) + return 0L; + return timeval_to_ms (tv); +} + +int +server_run (void) +{ + char buffer[16]; + struct timeval *timeout; + struct timeval tv, current; + timer_callback *timcb; + socket_callback *sockcb; + fd_set rfds, wfds; + int r, revision, max_fd; + int ret = 0; + + /* For doing lock safe callbacks */ + int type, fd; + server_socket_callback scb; + server_timer_callback tcb; + void *arg; + + LOCK (); + + /* No watches have been set */ + assert (ctx.max_fd > -1); + + ctx.stopped = 0; + ctx.revision = 0; + + while (!ctx.stopped) { + + /* Drain the wakeup if necessary */ + while (read (ctx.wakeup[0], buffer, sizeof (buffer)) > 0); + + /* Watch for the various fds */ + memcpy (&rfds, &ctx.read_fds, sizeof (rfds)); + memcpy (&wfds, &ctx.write_fds, sizeof (wfds)); + + /* Prepare for timers */ + timeout = NULL; + if (gettimeofday (¤t, NULL) == -1) { + ret = -1; + break; + } + + /* Cycle through timers */ + for (timcb = ctx.timers; timcb; timcb = timcb->next) { + assert (timcb->callback); + + if (timcb->closed) + continue; + + /* Call any timers that have already passed */ + if (timeval_compare (¤t, &timcb->at) >= 0) { + + /* Do a lock safe callback */ + tcb = timcb->callback; + arg = timcb->arg; + + /* Convert to milliseconds, and make the call */ + UNLOCK (); + r = (tcb) (timeval_to_ms (current), arg); + LOCK (); + + /* Reset timer if so desired */ + if (r == 1 && !timeval_empty (&timcb->interval)) { + timeval_add (&timcb->at, &timcb->interval); + + /* If the time has already passed, just use current time */ + if (timeval_compare (&(timcb->at), ¤t) <= 0) + memcpy (&(timcb->at), ¤t, sizeof (timcb->at)); + + /* Otherwise remove it. Either one shot, or returned 0 */ + } else { + timcb->closed = 1; + continue; + } + } + + /* Get soonest timer */ + if (!timeout || timeval_compare (&timcb->at, timeout) < 0) + timeout = &timcb->at; + } + + /* Convert to an offset */ + if (timeout) { + memcpy (&tv, timeout, sizeof (tv)); + timeout = &tv; + timeval_subtract (timeout, ¤t); + } + + if (ctx.stopped) + continue; + + remove_closed (); + + /* No callbacks? No use continuing */ + if (!ctx.callbacks && !ctx.timers) + break; + + /* fprintf(stderr, "selecting with timeout: "); + timeval_dump(timeout); + fprintf(stderr, "\n"); */ + + /* We can see if changes happen while in select */ + revision = ctx.revision; + max_fd = ctx.max_fd; + + UNLOCK (); + r = select (max_fd, &rfds, &wfds, NULL, timeout); + LOCK (); + + if (r < 0) { + /* Interrupted so try again, and possibly exit */ + if (errno == EINTR) + continue; + + if (errno == EBADF) { + if (ctx.revision != revision) + continue; + assert (0 && "bad file descriptor being watched"); + } + + /* Programmer errors */ + assert (errno != EINVAL); + ret = -1; + break; + } + + /* Timeout, just jump to timeout processing */ + if(r == 0) + continue; + + /* Process ready file descriptors */ + for (sockcb = ctx.callbacks; sockcb; sockcb = sockcb->next) { + assert (sockcb->fd != -1); + if (sockcb->closed) + continue; + + /* Call any that are set */ + type = 0; + if (FD_ISSET (sockcb->fd, &rfds)) + type = SERVER_READ; + if (FD_ISSET (sockcb->fd, &wfds)) + type = SERVER_WRITE; + + /* Lock safe callback */ + if (type != 0) { + scb = sockcb->callback; + fd = sockcb->fd; + arg = sockcb->arg; + + UNLOCK (); + (scb) (fd, type, arg); + LOCK (); + } + } + } + + UNLOCK (); + return ret; +} + +void +server_wakeup (void) +{ + /* + * This can be called from within arbitrary + * locks, so be careful. + */ + int r = write (ctx.wakeup[1], "a", 1); + fprintf (stderr, "wakeup: %d\n", r); +} + +void +server_stop (void) +{ + /* + * This could be called from a signal handler, + * so it's not worth trying to lock. + */ + ctx.stopped = 1; + server_wakeup (); +} + +int +server_stopped (void) +{ + return ctx.stopped; +} + +int +server_watch (int fd, int type, server_socket_callback callback, void *arg) +{ + socket_callback *cb; + assert (type != 0); + assert (fd != -1); + assert (callback != NULL); + + cb = (socket_callback*)calloc (sizeof(*cb), 1); + if(!cb) { + errno = ENOMEM; + return -1; + } + + LOCK (); + + cb->fd = fd; + cb->callback = callback; + cb->arg = arg; + + cb->next = ctx.callbacks; + ctx.callbacks = cb; + + if (type & SERVER_READ) + FD_SET (fd, &ctx.read_fds); + if (type & SERVER_WRITE) + FD_SET (fd, &ctx.write_fds); + + if (fd >= ctx.max_fd) + ctx.max_fd = fd + 1; + + ++ctx.revision; + server_wakeup (); + + UNLOCK (); + + return 0; +} + +void +server_unwatch (int fd) +{ + socket_callback* cb; + + assert (fd != -1); + + LOCK (); + + FD_CLR (fd, &ctx.read_fds); + FD_CLR (fd, &ctx.write_fds); + + if (!ctx.callbacks) + goto done; + + /* First in list */; + if (ctx.callbacks->fd == fd) { + ctx.callbacks->closed = 1; + goto done; + } + + /* One ahead processing of rest */ + for (cb = ctx.callbacks; cb->next; cb = cb->next) { + if (cb->next->fd == fd) { + cb->next->closed = 1; + goto done; + } + } + +done: + server_wakeup (); + ++ctx.revision; + UNLOCK (); +} + +int +server_timer (int ms, server_timer_callback callback, void *arg) +{ + int ret; + + LOCK (); + ret = add_timer (ms, 0, callback, arg); + UNLOCK (); + + return ret; +} + +int +server_oneshot (int ms, server_timer_callback callback, void *arg) +{ + int ret; + + LOCK (); + ret = add_timer (ms, 1, callback, arg); + UNLOCK (); + + return ret; +} diff --git a/common/server-mainloop.h b/common/server-mainloop.h new file mode 100644 index 0000000..a749653 --- /dev/null +++ b/common/server-mainloop.h @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2006, Stefan Walter + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * * Redistributions in binary form must reproduce the + * above copyright notice, this list of conditions and + * the following disclaimer in the documentation and/or + * other materials provided with the distribution. + * * The names of contributors to this software may not be + * used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH + * DAMAGE. + */ + +#ifndef __SERVER_MAINLOOP_H__ +#define __SERVER_MAINLOOP_H__ + +#include + +#define SERVER_READ 0x01 +#define SERVER_WRITE 0x02 + +typedef void (*server_socket_callback) (int fd, int type, void* arg); +typedef int (*server_timer_callback) (uint64_t when, void* arg); + +int server_init (void); +void server_uninit (void); +int server_run (void); +void server_stop (void); +int server_stopped (void); +int server_watch (int fd, int type, server_socket_callback callback, void* arg); +void server_unwatch (int fd); +void server_wakeup (void); +int server_timer (int length, server_timer_callback callback, void* arg); +int server_oneshot (int length, server_timer_callback callback, void* arg); +uint64_t server_get_time (void); + +#endif /* __SERVER_MAINLOOP_H__ */ diff --git a/common/tpool.c b/common/tpool.c new file mode 100644 index 0000000..f980b86 --- /dev/null +++ b/common/tpool.c @@ -0,0 +1,398 @@ +/* + * HttpAuth + * + * Copyright (C) 2008 Stefan Walter + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, + * Boston, MA 02111-1307, USA. + */ + +/* + * ------------------------------------------------------------------------- + * Ideas from tpool.c - htun thread pool functions + * Copyright (C) 2002 Moshe Jacobson , + * Ola Nordström + * ------------------------------------------------------------------------- + */ + +/* + * most of the theory and implementation of the thread pool was taken + * from the o'reilly pthreads programming book. + */ + +#include +#include +#include +#include +#include +#include +#include + +#define __USE_GNU +#include + +#include "tpool.h" + +typedef struct _tpool_work { + void (*handler_routine)(void*); + void *arg; + struct _tpool_work *next; +} tpool_work; + +typedef struct _tpool_thread { + pthread_t handle; + int is_active; + struct _tpool_thread *next; +} tpool_thread; + +static int tpool_max_threads = 0; +static int tpool_min_threads = 0; +static int tpool_running_threads = 0; +static int tpool_idle_threads = 0; +static int tpool_zombie_threads = 0; +static tpool_thread *tpool_thread_list = NULL; + +static int tpool_max_queue_size = 0; +static int tpool_cur_queue_size = 0; +static tpool_work *tpool_queue_head = NULL; +static tpool_work *tpool_queue_tail = NULL; +static pthread_mutex_t tpool_queue_lock; +static pthread_cond_t tpool_queue_not_full; +static pthread_cond_t tpool_queue_not_empty; +static pthread_cond_t tpool_queue_empty; + +static int tpool_queue_closed = 0; +static int tpool_shutdown = 0; +static int tpool_initialized = 0; + +#define PTHREQ(x) do { \ + int r = (x); \ + if (r != 0) { \ + fprintf (stderr, "fatal: '%s' failed: %s", #x, strerror (r)); \ + assert (0 && "" #x); \ + } } while (0) + +static void* +tpool_thread_func (void *data) +{ + tpool_thread *thread = (tpool_thread*)data; + tpool_work *work; + int stop = 0; + sigset_t set; + + /* We handle these signals on the main thread */ + sigemptyset (&set); + sigaddset (&set, SIGINT); + sigaddset (&set, SIGTERM); + pthread_sigmask (SIG_BLOCK, &set, NULL); + + assert (thread); + assert (tpool_initialized); + + PTHREQ (pthread_mutex_lock (&tpool_queue_lock)); + + assert (thread->is_active); + ++tpool_running_threads; + ++tpool_idle_threads; + + while (!tpool_shutdown && !stop) { + + /* No work to do? */ + while (tpool_cur_queue_size == 0 && !tpool_shutdown && !stop) { + + /* Too many idle threads? */ + if (tpool_running_threads > tpool_min_threads) { + stop = 1; + break; + } + + /* + * Sleep until there is work, while asleep the queue_lock + * is relinquished + */ + PTHREQ (pthread_cond_wait (&tpool_queue_not_empty, &tpool_queue_lock)); + } + + /* Are we shutting down? */ + if (stop || tpool_shutdown) + break; + + /* Pop the work off the queue */ + work = tpool_queue_head; + tpool_cur_queue_size--; + if (tpool_cur_queue_size == 0) + tpool_queue_head = tpool_queue_tail = NULL; + else + tpool_queue_head = work->next; + + /* Broadcast queue state to anyone who's waiting */ + if (tpool_cur_queue_size < tpool_max_queue_size) + PTHREQ (pthread_cond_broadcast (&tpool_queue_not_full)); + if (tpool_cur_queue_size == 0) + PTHREQ (pthread_cond_signal (&tpool_queue_empty)); + + --tpool_idle_threads; + assert (tpool_idle_threads >= 0); + + /* Unlock the queue lock while processing */ + PTHREQ (pthread_mutex_unlock (&tpool_queue_lock)); + + /* Perform the work */ + (work->handler_routine) (work->arg); + free (work); + + PTHREQ (pthread_mutex_lock (&tpool_queue_lock)); + + ++tpool_idle_threads; + } + + --tpool_idle_threads; + --tpool_running_threads; + ++tpool_zombie_threads; + thread->is_active = 0; + assert (tpool_running_threads >= 0); + assert (tpool_idle_threads >= 0); + + PTHREQ (pthread_mutex_unlock (&tpool_queue_lock)); + + /* So we can double check the result code */ + return thread; +} + +static void +free_pool_stuff (void) +{ + tpool_work *work; + + if (!tpool_initialized) + return; + + /* No threads should be running at this point */ + assert (!tpool_thread_list); + + /* Clean up memory */ + while (tpool_queue_head != NULL) { + work = tpool_queue_head; + tpool_queue_head = tpool_queue_head->next; + free (work); + } + + pthread_cond_destroy (&tpool_queue_not_full); + pthread_cond_destroy (&tpool_queue_not_empty); + pthread_cond_destroy (&tpool_queue_empty); + pthread_mutex_destroy (&tpool_queue_lock); + + tpool_initialized = 0; +} + +int +tpool_init (int max_threads, int min_threads, int max_queued) +{ + assert (!tpool_initialized); + + /* set the desired thread pool values */ + tpool_max_threads = max_threads; + tpool_running_threads = 0; + tpool_idle_threads = 0; + tpool_zombie_threads = 0; + tpool_min_threads = min_threads; + + /* initialize the work queue */ + tpool_max_queue_size = max_queued; + tpool_cur_queue_size = 0; + tpool_queue_head = NULL; + tpool_queue_tail = NULL; + tpool_queue_closed = 0; + tpool_shutdown = 0; + + /* create the mutexs and cond vars */ + PTHREQ (pthread_mutex_init (&tpool_queue_lock, NULL)); + PTHREQ (pthread_cond_init (&tpool_queue_not_empty, NULL)); + PTHREQ (pthread_cond_init (&tpool_queue_not_full, NULL)); + PTHREQ (pthread_cond_init (&tpool_queue_empty, NULL)); + + tpool_initialized = 1; + return 0; +} + +static void +join_pool_threads (int all) +{ + tpool_thread *thread, **spot; + void *exit_code; + + /* Should be called from within the lock */ + + for (spot = &tpool_thread_list, thread = *spot; + thread && (all || tpool_zombie_threads > 0); ) { + + assert (thread->handle); + + if (all || !thread->is_active) { + if (!thread->is_active) + --tpool_zombie_threads; + + /* Join the thread */ + PTHREQ (pthread_join (thread->handle, &exit_code)); + assert (exit_code == thread); + + /* Remove and get the next in list */ + *spot = thread->next; + free (thread); + + } else { + /* Just get the next in list */ + spot = &thread->next; + } + + thread = *spot; + } +} + +int +tpool_add_work (void (*routine)(void*), void *arg) +{ + tpool_work *work; + tpool_thread *thread; + int r, ret = -1; + + assert (tpool_initialized); + + PTHREQ (pthread_mutex_lock (&tpool_queue_lock)); + + /* Join any old threads */ + join_pool_threads (0); + + /* Should have found all of these above */ + assert (tpool_zombie_threads == 0); + + /* Are we closed for business? */ + if (tpool_shutdown || tpool_queue_closed) { + errno = ECANCELED; + goto done; + } + + /* No idle threads to run this work? */ + if (!tpool_idle_threads) { + + /* See if we can start another thread */ + if (tpool_max_threads <= 0 || + tpool_running_threads < tpool_max_threads) { + + /* The thread context */ + thread = calloc (1, sizeof (tpool_thread)); + if (thread == NULL) { + errno = ENOMEM; + goto done; + } + + /* Start the thread */ + thread->is_active = 1; + r = pthread_create (&thread->handle, NULL, tpool_thread_func, (void*)thread); + if (r != 0) { + errno = r; + free (thread); + goto done; + } + + /* Add the new thread to the list */ + thread->next = tpool_thread_list; + tpool_thread_list = thread; + } + } + + /* + * If the queue is full, yield this thread, and see if + * another processes something out of the queue. + */ + if (tpool_max_queue_size > 0 && tpool_cur_queue_size >= tpool_max_queue_size) { + PTHREQ (pthread_mutex_unlock (&tpool_queue_lock)); + pthread_yield(); + PTHREQ (pthread_mutex_lock (&tpool_queue_lock)); + } + + /* Are we closed for business? */ + if (tpool_shutdown || tpool_queue_closed) { + errno = ECANCELED; + goto done; + } + + /* Is the queue still full? */ + if (tpool_max_queue_size > 0 && tpool_cur_queue_size >= tpool_max_queue_size) { + errno = EAGAIN; + goto done; + } + + /* Allocate the work structure */ + work = calloc (1, sizeof (tpool_work)); + if (work == NULL) { + errno = ENOMEM; + goto done; + } + + work->handler_routine = routine; + work->arg = arg; + work->next = NULL; + + if (tpool_cur_queue_size == 0) { + tpool_queue_tail = tpool_queue_head = work; + } else { + tpool_queue_tail->next = work; + tpool_queue_tail = work; + } + + PTHREQ (pthread_cond_broadcast (&tpool_queue_not_empty)); + tpool_cur_queue_size++; + ret = 0; + +done: + PTHREQ (pthread_mutex_unlock (&tpool_queue_lock)); + return ret; +} + +int +tpool_destroy (int finish) +{ + assert (tpool_initialized); + + PTHREQ (pthread_mutex_lock (&tpool_queue_lock)); + + /* Can't have two places calling shutdown */ + assert (!tpool_queue_closed && !tpool_shutdown); + + /* close the queue to any new work */ + tpool_queue_closed = 1; + + /* if the finish flag is set, drain the queue */ + if (finish) { + while (tpool_cur_queue_size != 0) + PTHREQ (pthread_cond_wait (&tpool_queue_empty, &tpool_queue_lock)); + } + + /* Set the shutdown flag */ + tpool_shutdown = 1; + + PTHREQ (pthread_mutex_unlock (&tpool_queue_lock)); + + /* wake up all workers to rechedk the shutdown flag */ + PTHREQ (pthread_cond_broadcast (&tpool_queue_not_empty)); + PTHREQ (pthread_cond_broadcast (&tpool_queue_not_full)); + + join_pool_threads (1); + free_pool_stuff (); + + return 0; +} + diff --git a/common/tpool.h b/common/tpool.h new file mode 100644 index 0000000..ac5a3f4 --- /dev/null +++ b/common/tpool.h @@ -0,0 +1,36 @@ +/* + * HttpAuth + * + * Copyright (C) 2008 Stefan Walter + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the + * Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef _TPOOL_H_ +#define _TPOOL_H_ + +typedef void (*tpool_routine) (void*); + +/* returns -1 if pool creation failed, 0 if succeeded */ +int tpool_init (int max_threads, int min_threads, int max_queued); + +/* returns -1 if failed (ie busy, closing etc...), 0 if succeeded */ +int tpool_add_work (tpool_routine routine, void *arg); + +/* cleanup and close. Must only be called once. Specify |finish| to drain queue */ +int tpool_destroy (int finish); + +#endif /* _TPOOL_H_ */ -- cgit v1.2.3