/* * Copyright (c) 2006, Stefan Walter * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * * Redistributions of source code must retain the above * copyright notice, this list of conditions and the * following disclaimer. * * Redistributions in binary form must reproduce the * above copyright notice, this list of conditions and * the following disclaimer in the documentation and/or * other materials provided with the distribution. * * The names of contributors to this software may not be * used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH * DAMAGE. */ #include "config.h" #include "server-mainloop.h" #include #include #include #include #include #include #include #include #ifdef HAVE_PTHREAD #include #endif typedef struct _socket_callback { int fd; server_socket_callback callback; void *arg; int closed; struct _socket_callback *next; } socket_callback; typedef struct _timer_callback { struct timeval at; struct timeval interval; server_timer_callback callback; void* arg; int closed; struct _timer_callback *next; } timer_callback; typedef struct _server_context { int stopped; fd_set read_fds; fd_set write_fds; int max_fd; int revision; socket_callback *callbacks; timer_callback *timers; int wakeup[2]; #ifdef HAVE_PTHREAD pthread_mutex_t mutex; #endif } server_context; #ifdef HAVE_PTHREAD #define LOCK() pthread_mutex_lock (&ctx.mutex) #define UNLOCK() pthread_mutex_unlock (&ctx.mutex) #else #define LOCK() #define UNLOCK() #endif /* Global context */ static server_context ctx; static void timeval_add(struct timeval *t1, struct timeval *t2) { assert (t1->tv_usec < 1000000); assert (t2->tv_usec < 1000000); t1->tv_sec += t2->tv_sec; t1->tv_usec += t2->tv_usec; if (t1->tv_usec >= 1000000) { t1->tv_usec -= 1000000; t1->tv_sec += 1; } } static void timeval_subtract (struct timeval *t1, struct timeval *t2) { assert (t1->tv_usec < 1000000); assert (t2->tv_usec < 1000000); t1->tv_sec -= t2->tv_sec; if (t1->tv_usec < t2->tv_usec) { t1->tv_usec += 1000000; t1->tv_sec -= 1; } t1->tv_usec -= t2->tv_usec; } static int timeval_compare(struct timeval *t1, struct timeval *t2) { assert (t1->tv_usec < 1000000); assert (t2->tv_usec < 1000000); if (t1->tv_sec > t2->tv_sec) return 1; else if (t1->tv_sec < t2->tv_sec) return -1; else { if (t1->tv_usec > t2->tv_usec) return 1; else if (t1->tv_usec < t2->tv_usec) return -1; else return 0; } } #define timeval_empty(tv) \ ((tv)->tv_sec == 0 && (tv)->tv_usec == 0) #define timeval_to_ms(tv) \ ((((uint64_t)(tv).tv_sec) * 1000L) + (((uint64_t)(tv).tv_usec) / 1000L)) #define timeval_dump(tv) \ (fprintf(stderr, "{ %d:%d }", (uint)((tv).tv_sec), (uint)((tv).tv_usec / 1000))) static int add_timer (int ms, int oneshot, server_timer_callback callback, void *arg) { struct timeval interval; timer_callback *cb; assert (ms || oneshot); assert (callback != NULL); interval.tv_sec = ms / 1000; interval.tv_usec = (ms % 1000) * 1000; /* into micro seconds */ cb = (timer_callback*)calloc (1, sizeof (*cb)); if(!cb) { errno = ENOMEM; return -1; } if (gettimeofday (&(cb->at), NULL) == -1) { free(cb); return -1; } timeval_add (&(cb->at), &interval); if (oneshot) memset (&(cb->interval), 0, sizeof (cb->interval)); else memcpy (&(cb->interval), &interval, sizeof(cb->interval)); cb->callback = callback; cb->arg = arg; cb->next = ctx.timers; ctx.timers = cb; ++ctx.revision; server_wakeup (); return 0; } static void remove_closed (void) { timer_callback **tcb, *timcb; socket_callback **scb, *sockcb; for (tcb = &ctx.timers; *tcb; ) { timcb = *tcb; if (timcb->closed) { *tcb = timcb->next; free (timcb); continue; } else { tcb = &timcb->next; } } for (scb = &ctx.callbacks; *scb; ) { sockcb = *scb; if (sockcb->closed) { *scb = sockcb->next; free (sockcb); continue; } else { scb = &sockcb->next; } } } int server_init (void) { memset (&ctx, 0, sizeof (ctx)); if (pipe (ctx.wakeup) < 0) return -1; FD_ZERO (&ctx.read_fds); FD_ZERO (&ctx.write_fds); /* The wakeup descriptor */ FD_SET (ctx.wakeup[0], &ctx.read_fds); fcntl (ctx.wakeup[0], F_SETFL, fcntl (ctx.wakeup[0], F_GETFL, 0) | O_NONBLOCK); ctx.max_fd = ctx.wakeup[0] + 1; ctx.stopped = 1; ctx.callbacks = NULL; ctx.timers = NULL; #ifdef HAVE_PTHREAD { int ret = pthread_mutex_init (&ctx.mutex, NULL); if (ret != 0) { errno = ret; close (ctx.wakeup[0]); close (ctx.wakeup[1]); return -1; } } #endif return 0; } void server_uninit (void) { timer_callback *timcb; timer_callback *timn; socket_callback *sockcb; socket_callback *sockn; LOCK (); for(timcb = ctx.timers; timcb; timcb = timn) { timn = timcb->next; free (timcb); } ctx.timers = NULL; for(sockcb = ctx.callbacks; sockcb; sockcb = sockn) { sockn = sockcb->next; free (sockcb); } ctx.callbacks = NULL; UNLOCK (); #ifdef HAVE_PTHREAD pthread_mutex_destroy (&ctx.mutex); #endif } uint64_t server_get_time (void) { struct timeval tv; if (gettimeofday (&tv, NULL) == -1) return 0L; return timeval_to_ms (tv); } int server_run (void) { char buffer[16]; struct timeval *timeout; struct timeval tv, current; timer_callback *timcb; socket_callback *sockcb; fd_set rfds, wfds; int r, revision, max_fd; int ret = 0; /* For doing lock safe callbacks */ int type, fd; server_socket_callback scb; server_timer_callback tcb; void *arg; LOCK (); /* No watches have been set */ assert (ctx.max_fd > -1); ctx.stopped = 0; ctx.revision = 0; while (!ctx.stopped) { /* Drain the wakeup if necessary */ while (read (ctx.wakeup[0], buffer, sizeof (buffer)) > 0); /* Watch for the various fds */ memcpy (&rfds, &ctx.read_fds, sizeof (rfds)); memcpy (&wfds, &ctx.write_fds, sizeof (wfds)); /* Prepare for timers */ timeout = NULL; if (gettimeofday (¤t, NULL) == -1) { ret = -1; break; } /* Cycle through timers */ for (timcb = ctx.timers; timcb; timcb = timcb->next) { assert (timcb->callback); if (timcb->closed) continue; /* Call any timers that have already passed */ if (timeval_compare (¤t, &timcb->at) >= 0) { /* Do a lock safe callback */ tcb = timcb->callback; arg = timcb->arg; /* Convert to milliseconds, and make the call */ UNLOCK (); r = (tcb) (timeval_to_ms (current), arg); LOCK (); /* Reset timer if so desired */ if (r == 1 && !timeval_empty (&timcb->interval)) { timeval_add (&timcb->at, &timcb->interval); /* If the time has already passed, just use current time */ if (timeval_compare (&(timcb->at), ¤t) <= 0) memcpy (&(timcb->at), ¤t, sizeof (timcb->at)); /* Otherwise remove it. Either one shot, or returned 0 */ } else { timcb->closed = 1; continue; } } /* Get soonest timer */ if (!timeout || timeval_compare (&timcb->at, timeout) < 0) timeout = &timcb->at; } /* Convert to an offset */ if (timeout) { memcpy (&tv, timeout, sizeof (tv)); timeout = &tv; timeval_subtract (timeout, ¤t); } if (ctx.stopped) continue; remove_closed (); /* No callbacks? No use continuing */ if (!ctx.callbacks && !ctx.timers) break; /* fprintf(stderr, "selecting with timeout: "); timeval_dump(timeout); fprintf(stderr, "\n"); */ /* We can see if changes happen while in select */ revision = ctx.revision; max_fd = ctx.max_fd; UNLOCK (); r = select (max_fd, &rfds, &wfds, NULL, timeout); LOCK (); if (r < 0) { /* Interrupted so try again, and possibly exit */ if (errno == EINTR) continue; if (errno == EBADF) { if (ctx.revision != revision) continue; assert (0 && "bad file descriptor being watched"); } /* Programmer errors */ assert (errno != EINVAL); ret = -1; break; } /* Timeout, just jump to timeout processing */ if(r == 0) continue; /* Process ready file descriptors */ for (sockcb = ctx.callbacks; sockcb; sockcb = sockcb->next) { assert (sockcb->fd != -1); if (sockcb->closed) continue; /* Call any that are set */ type = 0; if (FD_ISSET (sockcb->fd, &rfds)) type = SERVER_READ; if (FD_ISSET (sockcb->fd, &wfds)) type = SERVER_WRITE; /* Lock safe callback */ if (type != 0) { scb = sockcb->callback; fd = sockcb->fd; arg = sockcb->arg; UNLOCK (); (scb) (fd, type, arg); LOCK (); } } } UNLOCK (); return ret; } void server_wakeup (void) { /* * This can be called from within arbitrary * locks, so be careful. */ int r = write (ctx.wakeup[1], "a", 1); fprintf (stderr, "wakeup: %d\n", r); } void server_stop (void) { /* * This could be called from a signal handler, * so it's not worth trying to lock. */ ctx.stopped = 1; server_wakeup (); } int server_stopped (void) { return ctx.stopped; } int server_watch (int fd, int type, server_socket_callback callback, void *arg) { socket_callback *cb; assert (type != 0); assert (fd != -1); assert (callback != NULL); cb = (socket_callback*)calloc (sizeof(*cb), 1); if(!cb) { errno = ENOMEM; return -1; } LOCK (); cb->fd = fd; cb->callback = callback; cb->arg = arg; cb->next = ctx.callbacks; ctx.callbacks = cb; if (type & SERVER_READ) FD_SET (fd, &ctx.read_fds); if (type & SERVER_WRITE) FD_SET (fd, &ctx.write_fds); if (fd >= ctx.max_fd) ctx.max_fd = fd + 1; ++ctx.revision; server_wakeup (); UNLOCK (); return 0; } void server_unwatch (int fd) { socket_callback* cb; assert (fd != -1); LOCK (); FD_CLR (fd, &ctx.read_fds); FD_CLR (fd, &ctx.write_fds); if (!ctx.callbacks) goto done; /* First in list */; if (ctx.callbacks->fd == fd) { ctx.callbacks->closed = 1; goto done; } /* One ahead processing of rest */ for (cb = ctx.callbacks; cb->next; cb = cb->next) { if (cb->next->fd == fd) { cb->next->closed = 1; goto done; } } done: server_wakeup (); ++ctx.revision; UNLOCK (); } int server_timer (int ms, server_timer_callback callback, void *arg) { int ret; LOCK (); ret = add_timer (ms, 0, callback, arg); UNLOCK (); return ret; } int server_oneshot (int ms, server_timer_callback callback, void *arg) { int ret; LOCK (); ret = add_timer (ms, 1, callback, arg); UNLOCK (); return ret; }