summaryrefslogtreecommitdiff
path: root/common/server-mainloop.c
diff options
context:
space:
mode:
authorStef Walter <stef@memberwebs.com>2008-07-21 19:35:56 +0000
committerStef Walter <stef@memberwebs.com>2008-07-21 19:35:56 +0000
commit4c4bfb64b62ff5b7b7fa21ec0185db797f434386 (patch)
tree531eaed845b2997a3d71edaf2a522a00ea9307da /common/server-mainloop.c
parent56805d33c1ed477f6839074748bfa373db01c431 (diff)
- Rework event handling system so we don't use a full thread per
connection, but instead only use threads for active requests.
Diffstat (limited to 'common/server-mainloop.c')
-rw-r--r--common/server-mainloop.c583
1 files changed, 583 insertions, 0 deletions
diff --git a/common/server-mainloop.c b/common/server-mainloop.c
new file mode 100644
index 0000000..c102b88
--- /dev/null
+++ b/common/server-mainloop.c
@@ -0,0 +1,583 @@
+/*
+ * Copyright (c) 2006, Stefan Walter
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ * * Redistributions in binary form must reproduce the
+ * above copyright notice, this list of conditions and
+ * the following disclaimer in the documentation and/or
+ * other materials provided with the distribution.
+ * * The names of contributors to this software may not be
+ * used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+ * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+ * DAMAGE.
+ */
+
+#include "config.h"
+#include "server-mainloop.h"
+
+#include <sys/time.h>
+
+#include <assert.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#ifdef HAVE_PTHREAD
+#include <pthread.h>
+#endif
+
+typedef struct _socket_callback {
+ int fd;
+ server_socket_callback callback;
+ void *arg;
+ int closed;
+
+ struct _socket_callback *next;
+} socket_callback;
+
+typedef struct _timer_callback {
+ struct timeval at;
+ struct timeval interval;
+ server_timer_callback callback;
+ void* arg;
+ int closed;
+
+ struct _timer_callback *next;
+} timer_callback;
+
+typedef struct _server_context {
+ int stopped;
+ fd_set read_fds;
+ fd_set write_fds;
+ int max_fd;
+ int revision;
+ socket_callback *callbacks;
+ timer_callback *timers;
+ int wakeup[2];
+#ifdef HAVE_PTHREAD
+ pthread_mutex_t mutex;
+#endif
+} server_context;
+
+#ifdef HAVE_PTHREAD
+#define LOCK() pthread_mutex_lock (&ctx.mutex)
+#define UNLOCK() pthread_mutex_unlock (&ctx.mutex)
+#else
+#define LOCK()
+#define UNLOCK()
+#endif
+
+/* Global context */
+static server_context ctx;
+
+static void
+timeval_add(struct timeval *t1, struct timeval *t2)
+{
+ assert (t1->tv_usec < 1000000);
+ assert (t2->tv_usec < 1000000);
+
+ t1->tv_sec += t2->tv_sec;
+ t1->tv_usec += t2->tv_usec;
+ if (t1->tv_usec >= 1000000) {
+ t1->tv_usec -= 1000000;
+ t1->tv_sec += 1;
+ }
+}
+
+static void
+timeval_subtract (struct timeval *t1, struct timeval *t2)
+{
+ assert (t1->tv_usec < 1000000);
+ assert (t2->tv_usec < 1000000);
+
+ t1->tv_sec -= t2->tv_sec;
+ if (t1->tv_usec < t2->tv_usec) {
+ t1->tv_usec += 1000000;
+ t1->tv_sec -= 1;
+ }
+ t1->tv_usec -= t2->tv_usec;
+}
+
+static int
+timeval_compare(struct timeval *t1, struct timeval *t2)
+{
+ assert (t1->tv_usec < 1000000);
+ assert (t2->tv_usec < 1000000);
+
+ if (t1->tv_sec > t2->tv_sec)
+ return 1;
+ else if (t1->tv_sec < t2->tv_sec)
+ return -1;
+ else {
+ if (t1->tv_usec > t2->tv_usec)
+ return 1;
+ else if (t1->tv_usec < t2->tv_usec)
+ return -1;
+ else
+ return 0;
+ }
+}
+
+#define timeval_empty(tv) \
+ ((tv)->tv_sec == 0 && (tv)->tv_usec == 0)
+
+#define timeval_to_ms(tv) \
+ ((((uint64_t)(tv).tv_sec) * 1000L) + (((uint64_t)(tv).tv_usec) / 1000L))
+
+#define timeval_dump(tv) \
+ (fprintf(stderr, "{ %d:%d }", (uint)((tv).tv_sec), (uint)((tv).tv_usec / 1000)))
+
+static int
+add_timer (int ms, int oneshot, server_timer_callback callback, void *arg)
+{
+ struct timeval interval;
+ timer_callback *cb;
+
+ assert (ms || oneshot);
+ assert (callback != NULL);
+
+ interval.tv_sec = ms / 1000;
+ interval.tv_usec = (ms % 1000) * 1000; /* into micro seconds */
+
+ cb = (timer_callback*)calloc (1, sizeof (*cb));
+ if(!cb) {
+ errno = ENOMEM;
+ return -1;
+ }
+
+ if (gettimeofday (&(cb->at), NULL) == -1) {
+ free(cb);
+ return -1;
+ }
+
+ timeval_add (&(cb->at), &interval);
+
+ if (oneshot)
+ memset (&(cb->interval), 0, sizeof (cb->interval));
+ else
+ memcpy (&(cb->interval), &interval, sizeof(cb->interval));
+
+ cb->callback = callback;
+ cb->arg = arg;
+
+ cb->next = ctx.timers;
+ ctx.timers = cb;
+ ++ctx.revision;
+ server_wakeup ();
+
+ return 0;
+}
+
+static void
+remove_closed (void)
+{
+ timer_callback **tcb, *timcb;
+ socket_callback **scb, *sockcb;
+
+ for (tcb = &ctx.timers; *tcb; ) {
+ timcb = *tcb;
+ if (timcb->closed) {
+ *tcb = timcb->next;
+ free (timcb);
+ continue;
+ } else {
+ tcb = &timcb->next;
+ }
+ }
+
+ for (scb = &ctx.callbacks; *scb; ) {
+ sockcb = *scb;
+ if (sockcb->closed) {
+ *scb = sockcb->next;
+ free (sockcb);
+ continue;
+ } else {
+ scb = &sockcb->next;
+ }
+ }
+}
+
+int
+server_init (void)
+{
+ memset (&ctx, 0, sizeof (ctx));
+
+ if (pipe (ctx.wakeup) < 0)
+ return -1;
+
+ FD_ZERO (&ctx.read_fds);
+ FD_ZERO (&ctx.write_fds);
+
+ /* The wakeup descriptor */
+ FD_SET (ctx.wakeup[0], &ctx.read_fds);
+ fcntl (ctx.wakeup[0], F_SETFL, fcntl (ctx.wakeup[0], F_GETFL, 0) | O_NONBLOCK);
+
+ ctx.max_fd = ctx.wakeup[0] + 1;
+
+ ctx.stopped = 1;
+ ctx.callbacks = NULL;
+ ctx.timers = NULL;
+
+
+#ifdef HAVE_PTHREAD
+ {
+ int ret = pthread_mutex_init (&ctx.mutex, NULL);
+ if (ret != 0) {
+ errno = ret;
+ close (ctx.wakeup[0]);
+ close (ctx.wakeup[1]);
+ return -1;
+ }
+ }
+#endif
+
+ return 0;
+}
+
+void
+server_uninit (void)
+{
+ timer_callback *timcb;
+ timer_callback *timn;
+ socket_callback *sockcb;
+ socket_callback *sockn;
+
+ LOCK ();
+
+ for(timcb = ctx.timers; timcb; timcb = timn) {
+ timn = timcb->next;
+ free (timcb);
+ }
+
+ ctx.timers = NULL;
+
+ for(sockcb = ctx.callbacks; sockcb; sockcb = sockn) {
+ sockn = sockcb->next;
+ free (sockcb);
+ }
+
+ ctx.callbacks = NULL;
+
+ UNLOCK ();
+
+#ifdef HAVE_PTHREAD
+ pthread_mutex_destroy (&ctx.mutex);
+#endif
+}
+
+uint64_t
+server_get_time (void)
+{
+ struct timeval tv;
+ if (gettimeofday (&tv, NULL) == -1)
+ return 0L;
+ return timeval_to_ms (tv);
+}
+
+int
+server_run (void)
+{
+ char buffer[16];
+ struct timeval *timeout;
+ struct timeval tv, current;
+ timer_callback *timcb;
+ socket_callback *sockcb;
+ fd_set rfds, wfds;
+ int r, revision, max_fd;
+ int ret = 0;
+
+ /* For doing lock safe callbacks */
+ int type, fd;
+ server_socket_callback scb;
+ server_timer_callback tcb;
+ void *arg;
+
+ LOCK ();
+
+ /* No watches have been set */
+ assert (ctx.max_fd > -1);
+
+ ctx.stopped = 0;
+ ctx.revision = 0;
+
+ while (!ctx.stopped) {
+
+ /* Drain the wakeup if necessary */
+ while (read (ctx.wakeup[0], buffer, sizeof (buffer)) > 0);
+
+ /* Watch for the various fds */
+ memcpy (&rfds, &ctx.read_fds, sizeof (rfds));
+ memcpy (&wfds, &ctx.write_fds, sizeof (wfds));
+
+ /* Prepare for timers */
+ timeout = NULL;
+ if (gettimeofday (&current, NULL) == -1) {
+ ret = -1;
+ break;
+ }
+
+ /* Cycle through timers */
+ for (timcb = ctx.timers; timcb; timcb = timcb->next) {
+ assert (timcb->callback);
+
+ if (timcb->closed)
+ continue;
+
+ /* Call any timers that have already passed */
+ if (timeval_compare (&current, &timcb->at) >= 0) {
+
+ /* Do a lock safe callback */
+ tcb = timcb->callback;
+ arg = timcb->arg;
+
+ /* Convert to milliseconds, and make the call */
+ UNLOCK ();
+ r = (tcb) (timeval_to_ms (current), arg);
+ LOCK ();
+
+ /* Reset timer if so desired */
+ if (r == 1 && !timeval_empty (&timcb->interval)) {
+ timeval_add (&timcb->at, &timcb->interval);
+
+ /* If the time has already passed, just use current time */
+ if (timeval_compare (&(timcb->at), &current) <= 0)
+ memcpy (&(timcb->at), &current, sizeof (timcb->at));
+
+ /* Otherwise remove it. Either one shot, or returned 0 */
+ } else {
+ timcb->closed = 1;
+ continue;
+ }
+ }
+
+ /* Get soonest timer */
+ if (!timeout || timeval_compare (&timcb->at, timeout) < 0)
+ timeout = &timcb->at;
+ }
+
+ /* Convert to an offset */
+ if (timeout) {
+ memcpy (&tv, timeout, sizeof (tv));
+ timeout = &tv;
+ timeval_subtract (timeout, &current);
+ }
+
+ if (ctx.stopped)
+ continue;
+
+ remove_closed ();
+
+ /* No callbacks? No use continuing */
+ if (!ctx.callbacks && !ctx.timers)
+ break;
+
+ /* fprintf(stderr, "selecting with timeout: ");
+ timeval_dump(timeout);
+ fprintf(stderr, "\n"); */
+
+ /* We can see if changes happen while in select */
+ revision = ctx.revision;
+ max_fd = ctx.max_fd;
+
+ UNLOCK ();
+ r = select (max_fd, &rfds, &wfds, NULL, timeout);
+ LOCK ();
+
+ if (r < 0) {
+ /* Interrupted so try again, and possibly exit */
+ if (errno == EINTR)
+ continue;
+
+ if (errno == EBADF) {
+ if (ctx.revision != revision)
+ continue;
+ assert (0 && "bad file descriptor being watched");
+ }
+
+ /* Programmer errors */
+ assert (errno != EINVAL);
+ ret = -1;
+ break;
+ }
+
+ /* Timeout, just jump to timeout processing */
+ if(r == 0)
+ continue;
+
+ /* Process ready file descriptors */
+ for (sockcb = ctx.callbacks; sockcb; sockcb = sockcb->next) {
+ assert (sockcb->fd != -1);
+ if (sockcb->closed)
+ continue;
+
+ /* Call any that are set */
+ type = 0;
+ if (FD_ISSET (sockcb->fd, &rfds))
+ type = SERVER_READ;
+ if (FD_ISSET (sockcb->fd, &wfds))
+ type = SERVER_WRITE;
+
+ /* Lock safe callback */
+ if (type != 0) {
+ scb = sockcb->callback;
+ fd = sockcb->fd;
+ arg = sockcb->arg;
+
+ UNLOCK ();
+ (scb) (fd, type, arg);
+ LOCK ();
+ }
+ }
+ }
+
+ UNLOCK ();
+ return ret;
+}
+
+void
+server_wakeup (void)
+{
+ /*
+ * This can be called from within arbitrary
+ * locks, so be careful.
+ */
+ int r = write (ctx.wakeup[1], "a", 1);
+ fprintf (stderr, "wakeup: %d\n", r);
+}
+
+void
+server_stop (void)
+{
+ /*
+ * This could be called from a signal handler,
+ * so it's not worth trying to lock.
+ */
+ ctx.stopped = 1;
+ server_wakeup ();
+}
+
+int
+server_stopped (void)
+{
+ return ctx.stopped;
+}
+
+int
+server_watch (int fd, int type, server_socket_callback callback, void *arg)
+{
+ socket_callback *cb;
+ assert (type != 0);
+ assert (fd != -1);
+ assert (callback != NULL);
+
+ cb = (socket_callback*)calloc (sizeof(*cb), 1);
+ if(!cb) {
+ errno = ENOMEM;
+ return -1;
+ }
+
+ LOCK ();
+
+ cb->fd = fd;
+ cb->callback = callback;
+ cb->arg = arg;
+
+ cb->next = ctx.callbacks;
+ ctx.callbacks = cb;
+
+ if (type & SERVER_READ)
+ FD_SET (fd, &ctx.read_fds);
+ if (type & SERVER_WRITE)
+ FD_SET (fd, &ctx.write_fds);
+
+ if (fd >= ctx.max_fd)
+ ctx.max_fd = fd + 1;
+
+ ++ctx.revision;
+ server_wakeup ();
+
+ UNLOCK ();
+
+ return 0;
+}
+
+void
+server_unwatch (int fd)
+{
+ socket_callback* cb;
+
+ assert (fd != -1);
+
+ LOCK ();
+
+ FD_CLR (fd, &ctx.read_fds);
+ FD_CLR (fd, &ctx.write_fds);
+
+ if (!ctx.callbacks)
+ goto done;
+
+ /* First in list */;
+ if (ctx.callbacks->fd == fd) {
+ ctx.callbacks->closed = 1;
+ goto done;
+ }
+
+ /* One ahead processing of rest */
+ for (cb = ctx.callbacks; cb->next; cb = cb->next) {
+ if (cb->next->fd == fd) {
+ cb->next->closed = 1;
+ goto done;
+ }
+ }
+
+done:
+ server_wakeup ();
+ ++ctx.revision;
+ UNLOCK ();
+}
+
+int
+server_timer (int ms, server_timer_callback callback, void *arg)
+{
+ int ret;
+
+ LOCK ();
+ ret = add_timer (ms, 0, callback, arg);
+ UNLOCK ();
+
+ return ret;
+}
+
+int
+server_oneshot (int ms, server_timer_callback callback, void *arg)
+{
+ int ret;
+
+ LOCK ();
+ ret = add_timer (ms, 1, callback, arg);
+ UNLOCK ();
+
+ return ret;
+}