From 9b48c4a481c64503605f25e1d0648d24f43437f1 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 2 Apr 2008 16:53:29 +0000 Subject: begun working on time-window based dequeueing (and rate limiting in general) --- queue.c | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) (limited to 'queue.c') diff --git a/queue.c b/queue.c index ed720c55..bfdac204 100644 --- a/queue.c +++ b/queue.c @@ -55,6 +55,7 @@ DEFobjStaticHelpers /* forward-definitions */ rsRetVal queueChkPersist(queue_t *pThis); static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex); +static rsRetVal queueRateLimiter(queue_t *pThis); static int queueChkStopWrkrDA(queue_t *pThis); static int queueIsIdleDA(queue_t *pThis); static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave); @@ -341,6 +342,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); + CHKiRet(wtpSetpfRateLimiter (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueRateLimiter)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA)); @@ -1450,6 +1452,60 @@ finalize_it: } +/* The rate limiter - we only need one - do we? + * +* Here we may wait if a dequeue time window is defined or if we are + * rate-limited. TODO: If we do so, we should also look into the + * way new worker threads are spawned. Obviously, it doesn't make much + * sense to spawn additional worker threads when none of them can do any + * processing. However, it is deemed acceptable to allow this for an initial + * implementation of the timeframe/rate limiting feature. + * Please also note that these feature could also be implemented at the action + * level. However, that would limit them to be used together with actions. We have + * taken the broader approach, moving it right into the queue. This is even + * necessary if we want to prevent spawning of multiple unnecessary worker + * threads as described above. -- rgerhards, 2008-04-02 + * + * + * time window: tCurr is current time; tFrom is start time, tTo is end time (in mil 24h format). + * We may have tFrom = 4, tTo = 10 --> run from 4 to 10 hrs. nice and happy + * we may also have tFrom= 22, tTo = 4 -> run from 10pm to 4am, which is actually two + * windows: 0-4; 22-23:59 + * so when to run? Let's assume we have 3am + * + * if(tTo < tFrom) { + * if(tCurr < tTo [3 < 4] || tCurr > tFrom [3 > 22]) + * do work + * else + * sleep for tFrom - tCurr "hours" [22 - 5 --> 17] + * } else { + * if(tCurr >= tFrom [3 >= 4] && tCurr < tTo [3 < 10]) + * do work + * else + * sleep for tTo - tCurr "hours" [4 - 3 --> 1] + * } + * + * Bottom line: we need to check which type of window we have and need to adjust our + * logic accordingly. Of course, sleep calculations need to be done up to the minute, + * but you get the idea from the code above. + */ +static rsRetVal +queueRateLimiter(queue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + + dbgoprint((obj_t*) pThis, "entering rate limiter\n"); + srSleep(2, 0); + +finalize_it: + dbgoprint((obj_t*) pThis, "rate limiter returns with iRet %d\n", iRet); + RETiRet; +} + + + /* This is the queue consumer in the regular (non-DA) case. It is * protected by the queue mutex, but MUST release it as soon as possible. * rgerhards, 2008-01-21 @@ -1690,6 +1746,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpReg)); CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); + CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRateLimiter)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg)); -- cgit v1.2.3 From 11461ad9c6de62556df79a35ff0c4902e2881f57 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 3 Apr 2008 08:47:35 +0000 Subject: bugfix: memory leaks in script engine --- queue.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'queue.c') diff --git a/queue.c b/queue.c index bfdac204..57484e60 100644 --- a/queue.c +++ b/queue.c @@ -2208,6 +2208,8 @@ finalize_it: /* some simple object access methods */ DEFpropSetMeth(queue, iPersistUpdCnt, int); +DEFpropSetMeth(queue, iDeqtWinFromHr, int); +DEFpropSetMeth(queue, iDeqtWinToHr, int); DEFpropSetMeth(queue, toQShutdown, long); DEFpropSetMeth(queue, toActShutdown, long); DEFpropSetMeth(queue, toWrkShutdown, long); -- cgit v1.2.3 From 46fbfee41e88034135725beb4136d44b94388ede Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 3 Apr 2008 13:19:48 +0000 Subject: added the capability to specify a processing (actually dequeue) timeframe with queues - so things can be configured to be done at off-peak hours --- queue.c | 85 +++++++++++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 67 insertions(+), 18 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 57484e60..7456f4a6 100644 --- a/queue.c +++ b/queue.c @@ -1,16 +1,16 @@ /* queue.c -* -* This file implements the queue object and its several queueing methods. -* -* File begun on 2008-01-03 by RGerhards -* -* There is some in-depth documentation available in doc/dev_queue.html -* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it -* if you are getting aquainted to the object. -* -* Copyright 2008 Rainer Gerhards and Adiscon GmbH. -* -* This file is part of rsyslog. + * + * This file implements the queue object and its several queueing methods. + * + * File begun on 2008-01-03 by RGerhards + * + * There is some in-depth documentation available in doc/dev_queue.html + * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it + * if you are getting aquainted to the object. + * + * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. * * Rsyslog is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -38,6 +38,7 @@ #include #include #include /* required for HP UX */ +#include #include #include "rsyslog.h" @@ -272,6 +273,8 @@ queueStartDA(queue_t *pThis) CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq)); CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); + CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr)); + CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0)); CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0)); if(pThis->toQShutdown == 0) { @@ -342,7 +345,6 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); - CHKiRet(wtpSetpfRateLimiter (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueRateLimiter)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA)); @@ -1269,6 +1271,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, pThis->iMaxQueueSize = iMaxQueueSize; pThis->pConsumer = pConsumer; pThis->iNumWorkerThreads = iWorkerThreads; + pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */ pThis->pszFilePrefix = NULL; pThis->qType = qType; @@ -1486,20 +1489,67 @@ finalize_it: * } * * Bottom line: we need to check which type of window we have and need to adjust our - * logic accordingly. Of course, sleep calculations need to be done up to the minute, + * logic accordingly. Of course, sleep calculations need to be done up to the minute, * but you get the idea from the code above. */ static rsRetVal queueRateLimiter(queue_t *pThis) { DEFiRet; + int iDelay; + int iHrCurr; + time_t tCurr; + struct tm m; ISOBJ_TYPE_assert(pThis, queue); dbgoprint((obj_t*) pThis, "entering rate limiter\n"); - srSleep(2, 0); -finalize_it: + iDelay = 0; +dbgprintf("deq win from %d to %d\n", pThis->iDeqtWinFromHr, pThis->iDeqtWinToHr); + if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */ + /* time calls are expensive, so only do them when needed */ + time(&tCurr); + localtime_r(&tCurr, &m); + iHrCurr = m.tm_hour; +RUNLOG_VAR("%d", iHrCurr); + + if(pThis->iDeqtWinToHr < pThis->iDeqtWinFromHr) { + if(iHrCurr < pThis->iDeqtWinToHr || iHrCurr > pThis->iDeqtWinFromHr) { + ; /* do not delay */ + } else { + iDelay = (pThis->iDeqtWinFromHr - iHrCurr) * 3600; + /* this time, we are already into the next hour, so we need + * to subtract our current minute and seconds. + */ + iDelay -= m.tm_min * 60; + iDelay -= m.tm_sec; + } + } else { + if(iHrCurr >= pThis->iDeqtWinFromHr && iHrCurr < pThis->iDeqtWinToHr) { + ; /* do not delay */ + } else { + if(iHrCurr < pThis->iDeqtWinFromHr) { + iDelay = (pThis->iDeqtWinFromHr - iHrCurr - 1) * 3600; /* -1 as we are already in the hour */ + iDelay += (60 - m.tm_min) * 60; + iDelay += 60 - m.tm_sec; + } else { + iDelay = (24 - iHrCurr + pThis->iDeqtWinFromHr) * 3600; + /* this time, we are already into the next hour, so we need + * to subtract our current minute and seconds. + */ + iDelay -= m.tm_min * 60; + iDelay -= m.tm_sec; + } + } + } + } + + if(iDelay > 0) { + dbgoprint((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay); + srSleep(iDelay, 0); + } + dbgoprint((obj_t*) pThis, "rate limiter returns with iRet %d\n", iRet); RETiRet; } @@ -2272,6 +2322,5 @@ BEGINObjClassInit(queue, 1, OBJ_IS_CORE_MODULE) OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty); ENDObjClassInit(queue) -/* - * vi:set ai: +/* vi:set ai: */ -- cgit v1.2.3 From 5ba8b2c77c3971a69bde54f3dd0ede81e641d798 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 4 Apr 2008 12:35:28 +0000 Subject: cleanup --- queue.c | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 7456f4a6..0828cc1d 100644 --- a/queue.c +++ b/queue.c @@ -1416,12 +1416,10 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave) * on the nail [exact value]) -- rgerhards, 2008-03-14 */ if(iQueueSize < pThis->iFullDlyMrk) { -dbgoprint((obj_t*) pThis, "queue size %d below FullDlyMrk %d\n", iQueueSize, pThis->iFullDlyMrk); pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk); } if(iQueueSize < pThis->iLightDlyMrk) { -dbgoprint((obj_t*) pThis, "queue size %d below LightDlyMrk %d\n", iQueueSize, pThis->iLightDlyMrk); pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk); } @@ -1455,9 +1453,9 @@ finalize_it: } -/* The rate limiter - we only need one - do we? +/* The rate limiter * -* Here we may wait if a dequeue time window is defined or if we are + * Here we may wait if a dequeue time window is defined or if we are * rate-limited. TODO: If we do so, we should also look into the * way new worker threads are spawned. Obviously, it doesn't make much * sense to spawn additional worker threads when none of them can do any @@ -1506,13 +1504,11 @@ queueRateLimiter(queue_t *pThis) dbgoprint((obj_t*) pThis, "entering rate limiter\n"); iDelay = 0; -dbgprintf("deq win from %d to %d\n", pThis->iDeqtWinFromHr, pThis->iDeqtWinToHr); if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */ /* time calls are expensive, so only do them when needed */ time(&tCurr); localtime_r(&tCurr, &m); iHrCurr = m.tm_hour; -RUNLOG_VAR("%d", iHrCurr); if(pThis->iDeqtWinToHr < pThis->iDeqtWinFromHr) { if(iHrCurr < pThis->iDeqtWinToHr || iHrCurr > pThis->iDeqtWinFromHr) { @@ -1550,7 +1546,6 @@ RUNLOG_VAR("%d", iHrCurr); srSleep(iDelay, 0); } - dbgoprint((obj_t*) pThis, "rate limiter returns with iRet %d\n", iRet); RETiRet; } -- cgit v1.2.3