diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-03 13:28:45 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-03 13:28:45 +0000 |
commit | 8951958ff6d8241df9ffe048e2c0d0766a9d383b (patch) | |
tree | f667f9ec334d959d8afbf068553e7a1cf1bdcdde /syslogd.c | |
parent | 9e67ae041d964748755e5c9c45ebe55ff612391e (diff) | |
download | rsyslog-8951958ff6d8241df9ffe048e2c0d0766a9d383b.tar.gz rsyslog-8951958ff6d8241df9ffe048e2c0d0766a9d383b.tar.bz2 rsyslog-8951958ff6d8241df9ffe048e2c0d0766a9d383b.zip |
queue is now a full object and handles threading by itself
Diffstat (limited to 'syslogd.c')
-rw-r--r-- | syslogd.c | 230 |
1 files changed, 41 insertions, 189 deletions
@@ -7,33 +7,13 @@ * * to learn more about it and discuss any questions you may have. * - * Please note that as of now, a lot of the code in this file stems - * from the sysklogd project. To learn more over this project, please - * visit - * - * http://www.infodrom.org/projects/sysklogd/ - * + * rsyslog had initially been forked from the sysklogd project. * I would like to express my thanks to the developers of the sysklogd * package - without it, I would have had a much harder start... * - * Please note that I made quite some changes to the orignal package. - * I expect to do even more changes - up - * to a full rewrite - to meet my design goals, which among others - * contain a (at least) dual-thread design with a memory buffer for - * storing received bursts of data. This is also the reason why I - * kind of "forked" a completely new branch of the package. My intension - * is to do many changes and only this initial release will look - * similar to sysklogd (well, one never knows...). - * - * As I have made a lot of modifications, please assume that all bugs - * in this package are mine and not those of the sysklogd team. - * - * As of this writing, there already exist heavy - * modifications to the orginal sysklogd package. I suggest to no - * longer rely too much on code knowledge you eventually have with - * sysklogd - rgerhards 2005-07-05 - * The code is now almost completely different. Be careful! - * rgerhards, 2006-11-30 + * As of this writing (2008-01-03), there have been numerous changes to + * the original package. Be very careful when you apply some of your + * sysklogd knowledge to rsyslog. * * This Project was intiated and is maintained by * Rainer Gerhards <rgerhards@hq.adiscon.com>. See @@ -56,7 +36,7 @@ * to the database). * * rsyslog - An Enhanced syslogd Replacement. - * Copyright 2003-2007 Rainer Gerhards and Adiscon GmbH. + * Copyright 2003-2008 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -145,6 +125,7 @@ #include <stdarg.h> #include <time.h> #include <dlfcn.h> +#include <assert.h> #include <sys/syslog.h> #include <sys/param.h> @@ -155,7 +136,6 @@ #endif #include <sys/ioctl.h> #include <sys/wait.h> -#include <sys/socket.h> #include <sys/file.h> #include <sys/time.h> @@ -166,19 +146,10 @@ #include <sys/resource.h> #include <signal.h> -#include <netinet/in.h> -#include <netdb.h> #include <dirent.h> #include <glob.h> #include <sys/types.h> -#include <arpa/nameser.h> -#include <arpa/inet.h> -#include <resolv.h> -#include "pidfile.h" -#include <assert.h> -#include <pthread.h> - #if HAVE_PATHS_H #include <paths.h> #endif @@ -187,13 +158,13 @@ #include <zlib.h> #endif +#include "pidfile.h" #include "srUtils.h" #include "stringbuf.h" #include "syslogd-types.h" #include "template.h" #include "outchannel.h" #include "syslogd.h" -#include "sync.h" /* struct NetAddr */ #include "parse.h" #include "msg.h" @@ -319,12 +290,6 @@ static EHostnameCmpMode eDfltHostnameCmpMode; static rsCStrObj *pDfltHostnameCmp; static rsCStrObj *pDfltProgNameCmp; -/* supporting structures for multithreading */ -int bRunningMultithreaded = 0; /* Is this program running in multithreaded mode? */ -static pthread_t thrdWorker; -static int bGlblDone = 0; -/* END supporting structures for multithreading */ - static int bParseHOSTNAMEandTAG = 1; /* global config var: should the hostname and tag be * parsed inside message - rgerhards, 2006-03-13 */ static int bFinished = 0; /* used by termination signal handler, read-only except there @@ -423,7 +388,6 @@ static int logEveryMsg = 0;/* no repeat message processing - read-only after st static unsigned int Forwarding = 0; char LocalHostName[MAXHOSTNAMELEN+1];/* our hostname - read-only after startup */ char *LocalDomain; /* our local domain name - read-only after startup */ -//char *LogPort = "514"; /* port number for INET connections */ static int MarkInterval = 20 * 60; /* interval between marks in seconds - read-only after startup */ int family = PF_UNSPEC; /* protocol family (IPv4, IPv6 or both), set via cmdline */ int send_to_all = 0; /* send message to all IPv4/IPv6 addresses */ @@ -562,7 +526,6 @@ static uchar template_StdPgSQLFmt[] = "\"insert into SystemEvents (Message, Faci /* up to the next comment, prototypes that should be removed by reordering */ -static void *singleWorker(); /* REMOVEME later 2005-10-24 */ /* Function prototypes. */ static char **crunch_list(char *list); static void printline(char *hname, char *msg, int iSource); @@ -1555,7 +1518,7 @@ logmsgInternal(int pri, char *msg, int flags) getCurrTime(&(pMsg->tTIMESTAMP)); /* use the current time! */ flags |= INTERNAL_MSG; - if(bRunningMultithreaded == 0) { /* not yet in queued mode */ + if(Initialized == 0) { /* not yet in queued mode */ iminternalAddMsg(pri, pMsg, flags); } else { /* we have the queue, so we can simply provide the @@ -1877,103 +1840,26 @@ static void processMsg(msg_t *pMsg) } -/* Start Threading-Related code */ - -/* shuts down the worker process. The worker will first finish - * with the message queue. Control returns, when done. - * This function is intended to be called during syslogd shutdown - * AND restart (init()!). - * rgerhards, 2005-10-25 - */ -static void stopWorker(void) -{ - if(bRunningMultithreaded) { - /* we could run single-threaded if there was an error - * during startup. Then, we obviously do not need to - * do anything to stop the worker ;) - */ - dbgprintf("Initiating worker thread shutdown sequence...\n"); - /* We are now done with all messages, so we need to wake up the - * worker thread and then wait for it to finish. - */ - bGlblDone = 1; - /* It's actually not "not empty" below but awaking the worker. The worker - * then finds out that it shall terminate and does so. - */ - pthread_cond_signal(pMsgQueue->notEmpty); - pthread_join(thrdWorker, NULL); - bRunningMultithreaded = 0; - dbgprintf("Worker thread terminated.\n"); - } -} - - -/* starts the worker thread. It must be made sure that the queue is - * already existing and the worker is NOT already running. - * rgerhards 2005-10-25 - */ -static void startWorker(void) -{ - int i; - if(pMsgQueue != NULL) { - bGlblDone = 0; /* we are NOT done (else worker would immediately terminate) */ - i = pthread_create(&thrdWorker, NULL, singleWorker, NULL); - dbgprintf("Worker thread started with state %d.\n", i); - bRunningMultithreaded = 1; - } else { - dbgprintf("message queue not existing, remaining single-threaded.\n"); - } -} - - -/* The worker thread (so far, we have dual-threading, so only one - * worker thread. Having more than one worker requires considerable - * additional code review in regard to thread-safety. +/* The consumer of dequeued messages. This function is called by the + * queue engine on dequeueing of a message. It runs on a SEPARATE + * THREAD. + * NOTE: Having more than one worker requires guarding of some + * message object structures and potentially others - need to be checked + * before we support multiple worker threads on the message queue. */ -static void * -singleWorker() +static rsRetVal +msgConsumer(void *pUsr) { - queue_t *fifo = pMsgQueue; - msg_t *pMsg; - sigset_t sigSet; + msg_t *pMsg = (msg_t*) pUsr; - assert(fifo != NULL); - - sigfillset(&sigSet); - pthread_sigmask(SIG_BLOCK, &sigSet, NULL); + assert(pMsg != NULL); - while(!bGlblDone || !fifo->empty) { - pthread_mutex_lock(fifo->mut); - while (fifo->empty && !bGlblDone) { - dbgprintf("singleWorker: queue EMPTY, waiting for next message.\n"); - pthread_cond_wait (fifo->notEmpty, fifo->mut); - } - if(!fifo->empty) { - /* dequeue element (still protected from mutex) */ - queueDel(fifo, (void*) &pMsg); - assert(pMsg != NULL); - pthread_mutex_unlock(fifo->mut); - pthread_cond_signal (fifo->notFull); - /* do actual processing (the lengthy part, runs in parallel) */ - dbgprintf("Lone worker is running...\n"); - processMsg(pMsg); - MsgDestruct(pMsg); - /* If you need a delay for testing, here do a */ - /* sleep(1); */ - } else { /* the mutex must be unlocked in any case (important for termination) */ - pthread_mutex_unlock(fifo->mut); - } - - if(debugging_on && bGlblDone && !fifo->empty) - dbgprintf("Worker does not yet terminate because it still has messages to process.\n"); - } + processMsg(pMsg); + MsgDestruct(pMsg); - dbgprintf("Worker thread terminates\n"); - pthread_exit(0); + return RS_RET_OK; } -/* END threads-related code */ - /* This method enqueues a message into the the message buffer. It also * the worker thread, so that the message will be processed. @@ -1982,41 +1868,23 @@ singleWorker() */ static void enqueueMsg(msg_t *pMsg) { - int iRet; - queue_t *fifo = pMsgQueue; - struct timespec t; + DEFiRet; assert(pMsg != NULL); - if(bRunningMultithreaded == 0) { - /* multi-threading is not yet initialized, happens e.g. + if(Initialized == 0) { + /* queue is not yet initialized, happens e.g. * during startup and restart. rgerhards, 2005-10-25 + * TODO: check if that really still can happen! rgerhards, 2008-01-03 */ dbgprintf("enqueueMsg: not yet running on multiple threads\n"); processMsg(pMsg); } else { - /* "normal" mode, threading initialized */ - pthread_mutex_lock(fifo->mut); - - while (fifo->full) { - dbgprintf("enqueueMsg: queue FULL.\n"); - - clock_gettime (CLOCK_REALTIME, &t); - t.tv_sec += 2; - - if(pthread_cond_timedwait (fifo->notFull, - fifo->mut, &t) != 0) { - dbgprintf("enqueueMsg: cond timeout, dropping message!\n"); - MsgDestruct(pMsg); - goto unlock; - } + /* "normal" mode, queue initialized */ + CHKiRet_Hdlr(queueEnqObj(pMsgQueue, (void*) pMsg)) { + /* if we have an error return, the pMsg was not destructed */ + MsgDestruct(pMsg); } - queueAdd(fifo, pMsg); - unlock: - /* now activate the worker thread */ - pthread_mutex_unlock(fifo->mut); - iRet = pthread_cond_signal(fifo->notEmpty); - dbgprintf("EnqueueMsg signaled condition (%d)\n", iRet); } } @@ -2766,13 +2634,13 @@ die(int sig) /* close the inputs */ thrdTerminateAll(); /* TODO: inputs only, please */ + /* drain queue and stop worker thread */ + queueDestruct(pMsgQueue); + pMsgQueue = NULL; + /* Free ressources and close connections */ freeSelectors(); - /* Worker threads are stopped by freeSelectors() */ - queueDestruct(pMsgQueue); /* delete fifo here! */ - pMsgQueue = NULL; - /* rger 2005-02-22 * now clean up the in-memory structures. OK, the OS * would also take care of that, but if we do it @@ -3177,21 +3045,6 @@ static void freeSelectors(void) if(Files != NULL) { dbgprintf("Freeing log structures.\n"); - /* just in case, we flush the emergency log. If error messages occur after - * this stage, we loose them, but that's ok. With multi-threading, this can - * never happen. -- rgerhards, 2007-08-03 - */ - processImInternal(); - - /* we first wait until all messages are processed (stopWorker() does - * that. Then, we go one last time over all actions and flush any - * pending "message repeated n times" messages. We must use this sequence - * because otherwise we would flush at whatever message is currently being - * processed without draining the queue. That would lead to invalid - * results. -- rgerhards, 2007-12-12 - */ - stopWorker(); - for(f = Files ; f != NULL ; f = f->f_next) { llExecFunc(&f->llActList, freeSelectorsActions, NULL); } @@ -3448,6 +3301,13 @@ init(void) dbgprintf("rsyslog %s.\n", VERSION); dbgprintf("Called init.\n"); + /* delete the message queue, which also flushes all messages left over */ + if(pMsgQueue != NULL) { + dbgprintf("deleting main message queue\n"); + queueDestruct(pMsgQueue); /* delete pThis here! */ + pMsgQueue = NULL; + } + /* Close all open log files and free log descriptor array. This also frees * all output-modules instance data. */ @@ -3460,12 +3320,6 @@ init(void) dbgprintf("Clearing templates.\n"); tplDeleteNew(); - if(pMsgQueue != NULL) { - dbgprintf("deleting message queue\n"); - queueDestruct(pMsgQueue); /* delete fifo here! */ - pMsgQueue = NULL; - } - /* re-setting values to defaults (where applicable) */ /* TODO: once we have loadable modules, we must re-visit this code. The reason is * that config variables are not re-set, because the module is not yet loaded. On @@ -3510,14 +3364,12 @@ init(void) } /* create message queue */ - CHKiRet_Hdlr(queueConstruct(&pMsgQueue, QUEUETYPE_FIXED_ARRAY, iMainMsgQueueSize)) { + CHKiRet_Hdlr(queueConstruct(&pMsgQueue, QUEUETYPE_FIXED_ARRAY, iMainMsgQueueSize, msgConsumer)) { /* no queue is fatal, we need to give up in that case... */ fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet); exit(1); } - startWorker(); - Initialized = 1; /* the output part and the queue is now ready to run. So it is a good time |