/* * HttpAuth * * Copyright (C) 2008 Stefan Walter * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU General Public License for more details. * You should have received a copy of the GNU General Public License * along with this program; if not, write to the * Free Software Foundation, Inc., * 59 Temple Place, Suite 330, * Boston, MA 02111-1307, USA. */ /* * ------------------------------------------------------------------------- * Ideas from tpool.c - htun thread pool functions * Copyright (C) 2002 Moshe Jacobson , * Ola Nordström * ------------------------------------------------------------------------- */ /* * most of the theory and implementation of the thread pool was taken * from the o'reilly pthreads programming book. */ #include #include #include #include #include #include #include #define __USE_GNU #include #include "tpool.h" typedef struct _tpool_work { void (*handler_routine)(void*); void *arg; struct _tpool_work *next; } tpool_work; typedef struct _tpool_thread { pthread_t handle; int is_active; struct _tpool_thread *next; } tpool_thread; static int tpool_max_threads = 0; static int tpool_min_threads = 0; static int tpool_running_threads = 0; static int tpool_idle_threads = 0; static int tpool_zombie_threads = 0; static tpool_thread *tpool_thread_list = NULL; static int tpool_max_queue_size = 0; static int tpool_cur_queue_size = 0; static tpool_work *tpool_queue_head = NULL; static tpool_work *tpool_queue_tail = NULL; static pthread_mutex_t tpool_queue_lock; static pthread_cond_t tpool_queue_not_full; static pthread_cond_t tpool_queue_not_empty; static pthread_cond_t tpool_queue_empty; static int tpool_queue_closed = 0; static int tpool_shutdown = 0; static int tpool_initialized = 0; #define PTHREQ(x) do { \ int r = (x); \ if (r != 0) { \ fprintf (stderr, "fatal: '%s' failed: %s", #x, strerror (r)); \ assert (0 && "" #x); \ } } while (0) static void* tpool_thread_func (void *data) { tpool_thread *thread = (tpool_thread*)data; tpool_work *work; int stop = 0; sigset_t set; /* We handle these signals on the main thread */ sigemptyset (&set); sigaddset (&set, SIGINT); sigaddset (&set, SIGTERM); pthread_sigmask (SIG_BLOCK, &set, NULL); assert (thread); assert (tpool_initialized); PTHREQ (pthread_mutex_lock (&tpool_queue_lock)); assert (thread->is_active); ++tpool_running_threads; ++tpool_idle_threads; while (!tpool_shutdown && !stop) { /* No work to do? */ while (tpool_cur_queue_size == 0 && !tpool_shutdown && !stop) { /* Too many idle threads? */ if (tpool_running_threads > tpool_min_threads) { stop = 1; break; } /* * Sleep until there is work, while asleep the queue_lock * is relinquished */ PTHREQ (pthread_cond_wait (&tpool_queue_not_empty, &tpool_queue_lock)); } /* Are we shutting down? */ if (stop || tpool_shutdown) break; /* Pop the work off the queue */ work = tpool_queue_head; tpool_cur_queue_size--; if (tpool_cur_queue_size == 0) tpool_queue_head = tpool_queue_tail = NULL; else tpool_queue_head = work->next; /* Broadcast queue state to anyone who's waiting */ if (tpool_cur_queue_size < tpool_max_queue_size) PTHREQ (pthread_cond_broadcast (&tpool_queue_not_full)); if (tpool_cur_queue_size == 0) PTHREQ (pthread_cond_signal (&tpool_queue_empty)); --tpool_idle_threads; assert (tpool_idle_threads >= 0); /* Unlock the queue lock while processing */ PTHREQ (pthread_mutex_unlock (&tpool_queue_lock)); /* Perform the work */ (work->handler_routine) (work->arg); free (work); PTHREQ (pthread_mutex_lock (&tpool_queue_lock)); ++tpool_idle_threads; } --tpool_idle_threads; --tpool_running_threads; ++tpool_zombie_threads; thread->is_active = 0; assert (tpool_running_threads >= 0); assert (tpool_idle_threads >= 0); PTHREQ (pthread_mutex_unlock (&tpool_queue_lock)); /* So we can double check the result code */ return thread; } static void free_pool_stuff (void) { tpool_work *work; if (!tpool_initialized) return; /* No threads should be running at this point */ assert (!tpool_thread_list); /* Clean up memory */ while (tpool_queue_head != NULL) { work = tpool_queue_head; tpool_queue_head = tpool_queue_head->next; free (work); } pthread_cond_destroy (&tpool_queue_not_full); pthread_cond_destroy (&tpool_queue_not_empty); pthread_cond_destroy (&tpool_queue_empty); pthread_mutex_destroy (&tpool_queue_lock); tpool_initialized = 0; } int tpool_init (int max_threads, int min_threads, int max_queued) { assert (!tpool_initialized); /* set the desired thread pool values */ tpool_max_threads = max_threads; tpool_running_threads = 0; tpool_idle_threads = 0; tpool_zombie_threads = 0; tpool_min_threads = min_threads; /* initialize the work queue */ tpool_max_queue_size = max_queued; tpool_cur_queue_size = 0; tpool_queue_head = NULL; tpool_queue_tail = NULL; tpool_queue_closed = 0; tpool_shutdown = 0; /* create the mutexs and cond vars */ PTHREQ (pthread_mutex_init (&tpool_queue_lock, NULL)); PTHREQ (pthread_cond_init (&tpool_queue_not_empty, NULL)); PTHREQ (pthread_cond_init (&tpool_queue_not_full, NULL)); PTHREQ (pthread_cond_init (&tpool_queue_empty, NULL)); tpool_initialized = 1; return 0; } static void join_pool_threads (int all) { tpool_thread *thread, **spot; void *exit_code; /* Should be called from within the lock */ for (spot = &tpool_thread_list, thread = *spot; thread && (all || tpool_zombie_threads > 0); ) { assert (thread->handle); if (all || !thread->is_active) { if (!thread->is_active) --tpool_zombie_threads; /* Join the thread */ PTHREQ (pthread_join (thread->handle, &exit_code)); assert (exit_code == thread); /* Remove and get the next in list */ *spot = thread->next; free (thread); } else { /* Just get the next in list */ spot = &thread->next; } thread = *spot; } } int tpool_add_work (void (*routine)(void*), void *arg) { tpool_work *work; tpool_thread *thread; int r, ret = -1; assert (tpool_initialized); PTHREQ (pthread_mutex_lock (&tpool_queue_lock)); /* Join any old threads */ join_pool_threads (0); /* Should have found all of these above */ assert (tpool_zombie_threads == 0); /* Are we closed for business? */ if (tpool_shutdown || tpool_queue_closed) { errno = ECANCELED; goto done; } /* No idle threads to run this work? */ if (!tpool_idle_threads) { /* See if we can start another thread */ if (tpool_max_threads <= 0 || tpool_running_threads < tpool_max_threads) { /* The thread context */ thread = calloc (1, sizeof (tpool_thread)); if (thread == NULL) { errno = ENOMEM; goto done; } /* Start the thread */ thread->is_active = 1; r = pthread_create (&thread->handle, NULL, tpool_thread_func, (void*)thread); if (r != 0) { errno = r; free (thread); goto done; } /* Add the new thread to the list */ thread->next = tpool_thread_list; tpool_thread_list = thread; } } /* * If the queue is full, yield this thread, and see if * another processes something out of the queue. */ if (tpool_max_queue_size > 0 && tpool_cur_queue_size >= tpool_max_queue_size) { PTHREQ (pthread_mutex_unlock (&tpool_queue_lock)); pthread_yield(); PTHREQ (pthread_mutex_lock (&tpool_queue_lock)); } /* Are we closed for business? */ if (tpool_shutdown || tpool_queue_closed) { errno = ECANCELED; goto done; } /* Is the queue still full? */ if (tpool_max_queue_size > 0 && tpool_cur_queue_size >= tpool_max_queue_size) { errno = EAGAIN; goto done; } /* Allocate the work structure */ work = calloc (1, sizeof (tpool_work)); if (work == NULL) { errno = ENOMEM; goto done; } work->handler_routine = routine; work->arg = arg; work->next = NULL; if (tpool_cur_queue_size == 0) { tpool_queue_tail = tpool_queue_head = work; } else { tpool_queue_tail->next = work; tpool_queue_tail = work; } PTHREQ (pthread_cond_broadcast (&tpool_queue_not_empty)); tpool_cur_queue_size++; ret = 0; done: PTHREQ (pthread_mutex_unlock (&tpool_queue_lock)); return ret; } int tpool_destroy (int finish) { assert (tpool_initialized); PTHREQ (pthread_mutex_lock (&tpool_queue_lock)); /* Can't have two places calling shutdown */ assert (!tpool_queue_closed && !tpool_shutdown); /* close the queue to any new work */ tpool_queue_closed = 1; /* if the finish flag is set, drain the queue */ if (finish) { while (tpool_cur_queue_size != 0) PTHREQ (pthread_cond_wait (&tpool_queue_empty, &tpool_queue_lock)); } /* Set the shutdown flag */ tpool_shutdown = 1; PTHREQ (pthread_mutex_unlock (&tpool_queue_lock)); /* wake up all workers to rechedk the shutdown flag */ PTHREQ (pthread_cond_broadcast (&tpool_queue_not_empty)); PTHREQ (pthread_cond_broadcast (&tpool_queue_not_full)); join_pool_threads (1); free_pool_stuff (); return 0; }