/* * Copyright (c) 2004, Nate Nielsen * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * * Redistributions of source code must retain the above * copyright notice, this list of conditions and the * following disclaimer. * * Redistributions in binary form must reproduce the * above copyright notice, this list of conditions and * the following disclaimer in the documentation and/or * other materials provided with the distribution. * * The names of contributors to this software may not be * used to endorse or promote products derived from this * software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH * DAMAGE. * * * CONTRIBUTORS * Nate Nielsen */ /* * select() and stdio are basically mutually exclusive. * Hence all of this code to try to get some buffering * along with select IO multiplexing. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "usuals.h" #include "sock_any.h" #include "stringx.h" #include "sppriv.h" #define MAX_LOG_LINE 79 #define GET_IO_NAME(io) ((io)->name ? (io)->name : "??? ") #define HAS_EXTRA(io) ((io)->_ln > 0) static void close_raw(int* fd) { ASSERT(fd); shutdown(*fd, SHUT_RDWR); close(*fd); *fd = -1; } static void log_io_data(spctx_t* ctx, spio_t* io, const char* data, int read) { char buf[MAX_LOG_LINE + 1]; int pos, len; ASSERT(ctx && io && data); for(;;) { data += strspn(data, "\r\n"); if(!*data) break; pos = strcspn(data, "\r\n"); len = pos < MAX_LOG_LINE ? pos : MAX_LOG_LINE; memcpy(buf, data, len); buf[len] = 0; sp_messagex(ctx, LOG_DEBUG, "%s%s%s", GET_IO_NAME(io), read ? " < " : " > ", buf); data += pos; } } void spio_init(spio_t* io, const char* name) { ASSERT(io && name); memset(io, 0, sizeof(*io)); io->name = name; io->fd = -1; } int spio_connect(spctx_t* ctx, spio_t* io, const struct sockaddr_any* sany, const char* addrname) { int ret = 0; ASSERT(ctx && io && sany && addrname); ASSERT(io->fd == -1); if((io->fd = socket(SANY_TYPE(*sany), SOCK_STREAM, 0)) == -1) RETURN(-1); if(setsockopt(io->fd, SOL_SOCKET, SO_RCVTIMEO, &(g_state.timeout), sizeof(g_state.timeout)) == -1 || setsockopt(io->fd, SOL_SOCKET, SO_SNDTIMEO, &(g_state.timeout), sizeof(g_state.timeout)) == -1) sp_messagex(ctx, LOG_WARNING, "couldn't set timeouts on connection"); if(connect(io->fd, &SANY_ADDR(*sany), SANY_LEN(*sany)) == -1) RETURN(-1); /* As a double check */ io->line[0] = 0; io->_nx = NULL; io->_ln = 0; cleanup: if(ret < 0) { if(io->fd != -1) close(io->fd); sp_message(ctx, LOG_ERR, "couldn't connect to: %s", addrname); return -1; } ASSERT(io->fd != -1); sp_messagex(ctx, LOG_DEBUG, "%s connected to: %s", GET_IO_NAME(io), addrname); return 0; } void spio_disconnect(spctx_t* ctx, spio_t* io) { ASSERT(ctx && io); if(spio_valid(io)) { close_raw(&(io->fd)); sp_messagex(ctx, LOG_DEBUG, "%s connection closed", GET_IO_NAME(io)); } } unsigned int spio_select(spctx_t* ctx, ...) { fd_set mask; spio_t* io; int ret = 0; int i = 0; va_list ap; ASSERT(ctx); FD_ZERO(&mask); va_start(ap, ctx); while((io = va_arg(ap, spio_t*)) != NULL) { /* We can't handle more than 31 args */ if(i > (sizeof(int) * 8) - 2) break; /* Check if the buffer has something in it */ if(HAS_EXTRA(io)) ret |= (1 << i); /* Mark for select */ FD_SET(io->fd, &mask); i++; } va_end(ap); /* If any buffers had something present, then return */ if(ret != 0) return ret; /* Otherwise wait on more data */ switch(select(FD_SETSIZE, &mask, NULL, NULL, (struct timeval*)&(g_state.timeout))) { case 0: sp_messagex(ctx, LOG_ERR, "network operation timed out"); return ~0; case -1: sp_message(ctx, LOG_ERR, "couldn't select on sockets"); return ~0; }; /* See what came in */ i = 0; va_start(ap, ctx); while((io = va_arg(ap, spio_t*)) != NULL) { /* We can't handle more than 31 args */ if(i > (sizeof(int) * 8) - 2) break; /* Check if the buffer has something in it */ if(FD_ISSET(io->fd, &mask)) ret |= (1 << i); i++; } return ret; } int read_raw(spctx_t* ctx, spio_t* io, int opts) { int len, x; char* at; char* p; /* * Just a refresher: * * _nx: Extra data read on last read. * _ln: Length of that extra data. * * _nx should never be equal to line when entering this * function. And _ln should always be less than a full * buffer. */ /* Remaining data in the buffer */ if(io->_nx && io->_ln > 0) { ASSERT(!io->_nx || io->_nx > io->line); ASSERT(io->_ln < SP_LINE_LENGTH); ASSERT(io->_nx + io->_ln < io->line + SP_LINE_LENGTH); /* Check for a return in the current buffer */ if((p = (char*)memchr(io->_nx, '\n', io->_ln)) != NULL) { /* Move data to front */ x = (p - io->_nx) + 1; ASSERT(x > 0); memmove(io->line, io->_nx, x); /* Null teriminate it */ io->line[x] = 0; /* Do maintanence for next time around */ io->_ln -= x; io->_nx += x; /* A double check on the return value */ ASSERT(strlen(io->line) == x); return x; } /* Otherwise move all old data to front */ memmove(io->line, io->_nx, io->_ln); /* We always leave space for a null terminator */ len = (SP_LINE_LENGTH - io->_ln) - 1; at = io->line + io->_ln; } /* No data at front just read straight in */ else { /* We always leave space for a null terminator */ len = SP_LINE_LENGTH - 1; at = io->line; } for(;;) { /* Read a block of data */ ASSERT(io->fd != -1); x = read(io->fd, at, sizeof(char) * len); if(x == -1) { if(errno == EINTR) { /* When the application is quiting */ if(sp_is_quit()) return -1; /* For any other signal we go again */ continue; } if(errno == ECONNRESET) /* Not usually a big deal so supresse the error */ sp_messagex(ctx, LOG_DEBUG, "connection disconnected by peer: %s", GET_IO_NAME(io)); else if(errno == EAGAIN) sp_messagex(ctx, LOG_WARNING, "network read operation timed out: %s", GET_IO_NAME(io)); else sp_message(ctx, LOG_ERR, "couldn't read data from socket: %s", GET_IO_NAME(io)); /* * The basic logic here is that if we've had a fatal error * reading from the socket once then we shut it down as it's * no good trying to read from again later. */ close_raw(&(io->fd)); return -1; } /* End of data */ else if(x == 0) { /* Maintenance for remaining data */ io->_nx = NULL; io->_ln = 0; /* A double check on the return value */ ASSERT(strlen(io->line) == at - io->line); return at - io->line; } /* Check for a new line */ p = (char*)memchr(at, '\n', x); if(p != NULL) { p++; len = x - (p - at); /* Insert the null terminator */ memmove(p, p + 1, len); *p = 0; /* Do maintenence for remaining data */ io->_nx = p + 1; io->_ln = len; /* A double check on the return value */ ASSERT(strlen(io->line) == p - io->line); return p - io->line; } if(len <= 0) { /* Keep reading until we hit a new line */ if(opts & SPIO_DISCARD) { /* * K, basically the logic is that we're discarding * data ond the data will be screwed up. So overwriting * some valid data in order to flush the line and * keep the buffering simple is a price we pay gladly :) */ ASSERT(128 < SP_LINE_LENGTH); at = (io->line + SP_LINE_LENGTH) - 128; len = 128; /* Go for next read */ continue; } io->_nx = NULL; io->_ln = 0; /* Null terminate */ io->line[SP_LINE_LENGTH] = 0; /* A double check on the return value */ ASSERT(strlen(io->line) == p - io->line); return SP_LINE_LENGTH; } } } int spio_read_line(spctx_t* ctx, spio_t* io, int opts) { int x, l; char* t; ASSERT(ctx && io); if(!spio_valid(io)) { sp_messagex(ctx, LOG_WARNING, "tried to read from a closed connection"); return 0; } x = read_raw(ctx, io, opts); if(x > 0) { if(opts & SPIO_TRIM) { t = io->line; while(*t && isspace(*t)) t++; /* Bump the entire line down */ l = t - io->line; memmove(io->line, t, (x + 1) - l); x -= l; /* Now the end */ t = io->line + x; while(t > io->line && isspace(*(t - 1))) { *(--t) = 0; x--; } } if(!(opts & SPIO_QUIET)) log_io_data(ctx, io, io->line, 1); } return x; } int spio_write_data(spctx_t* ctx, spio_t* io, const char* data) { int len = strlen(data); ASSERT(ctx && io && data); if(!spio_valid(io)) { sp_message(ctx, LOG_ERR, "connection closed. can't write data."); return -1; } log_io_data(ctx, io, data, 0); return spio_write_data_raw(ctx, io, (unsigned char*)data, len); } int spio_write_data_raw(spctx_t* ctx, spio_t* io, unsigned char* buf, int len) { int r; ASSERT(ctx && io && buf); if(io->fd == -1) return 0; while(len > 0) { r = write(io->fd, buf, len); if(r > 0) { buf += r; len -= r; } else if(r == -1) { if(errno == EINTR) { /* When the application is quiting */ if(sp_is_quit()) return -1; /* For any other signal we go again */ continue; } /* * The basic logic here is that if we've had a fatal error * writing to the socket once then we shut it down as it's * no good trying to write to it again later. */ close_raw(&(io->fd)); if(errno == EAGAIN) sp_messagex(ctx, LOG_WARNING, "network write operation timed out: %s", GET_IO_NAME(io)); else sp_message(ctx, LOG_ERR, "couldn't write data to socket: %s", GET_IO_NAME(io)); return -1; } } return 0; } void spio_read_junk(spctx_t* ctx, spio_t* io) { char buf[16]; const char* t; int said = 0; int l; ASSERT(ctx); ASSERT(io); /* Truncate any data in buffer */ io->_ln = 0; io->_nx = 0; if(!spio_valid(io)) return; /* Make it non blocking */ fcntl(io->fd, F_SETFL, fcntl(io->fd, F_GETFL, 0) | O_NONBLOCK); for(;;) { l = read(io->fd, buf, sizeof(buf) - 1); if(l <= 0) break; buf[l] = 0; t = trim_start(buf); if(!said && *t) { sp_messagex(ctx, LOG_DEBUG, "received junk data from daemon"); said = 1; } } fcntl(io->fd, F_SETFL, fcntl(io->fd, F_GETFL, 0) & ~O_NONBLOCK); }