diff options
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 111 |
1 files changed, 107 insertions, 4 deletions
@@ -39,6 +39,7 @@ #include <fcntl.h> #include <unistd.h> #include <sys/stat.h> /* required for HP UX */ +#include <time.h> #include <errno.h> #include "rsyslog.h" @@ -56,6 +57,7 @@ DEFobjStaticHelpers /* forward-definitions */ rsRetVal queueChkPersist(queue_t *pThis); static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex); +static rsRetVal queueRateLimiter(queue_t *pThis); static int queueChkStopWrkrDA(queue_t *pThis); static int queueIsIdleDA(queue_t *pThis); static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave); @@ -272,6 +274,8 @@ queueStartDA(queue_t *pThis) CHKiRet(queueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(queueSettoEnq(pThis->pqDA, pThis->toEnq)); CHKiRet(queueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); + CHKiRet(queueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr)); + CHKiRet(queueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); CHKiRet(queueSetiHighWtrMrk(pThis->pqDA, 0)); CHKiRet(queueSetiDiscardMrk(pThis->pqDA, 0)); if(pThis->toQShutdown == 0) { @@ -1270,6 +1274,7 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, pThis->iMaxQueueSize = iMaxQueueSize; pThis->pConsumer = pConsumer; pThis->iNumWorkerThreads = iWorkerThreads; + pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */ pThis->pszFilePrefix = NULL; pThis->qType = qType; @@ -1414,12 +1419,10 @@ queueDequeueConsumable(queue_t *pThis, wti_t *pWti, int iCancelStateSave) * on the nail [exact value]) -- rgerhards, 2008-03-14 */ if(iQueueSize < pThis->iFullDlyMrk) { -dbgoprint((obj_t*) pThis, "queue size %d below FullDlyMrk %d\n", iQueueSize, pThis->iFullDlyMrk); pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk); } if(iQueueSize < pThis->iLightDlyMrk) { -dbgoprint((obj_t*) pThis, "queue size %d below LightDlyMrk %d\n", iQueueSize, pThis->iLightDlyMrk); pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk); } @@ -1453,6 +1456,104 @@ finalize_it: } +/* The rate limiter + * + * Here we may wait if a dequeue time window is defined or if we are + * rate-limited. TODO: If we do so, we should also look into the + * way new worker threads are spawned. Obviously, it doesn't make much + * sense to spawn additional worker threads when none of them can do any + * processing. However, it is deemed acceptable to allow this for an initial + * implementation of the timeframe/rate limiting feature. + * Please also note that these feature could also be implemented at the action + * level. However, that would limit them to be used together with actions. We have + * taken the broader approach, moving it right into the queue. This is even + * necessary if we want to prevent spawning of multiple unnecessary worker + * threads as described above. -- rgerhards, 2008-04-02 + * + * + * time window: tCurr is current time; tFrom is start time, tTo is end time (in mil 24h format). + * We may have tFrom = 4, tTo = 10 --> run from 4 to 10 hrs. nice and happy + * we may also have tFrom= 22, tTo = 4 -> run from 10pm to 4am, which is actually two + * windows: 0-4; 22-23:59 + * so when to run? Let's assume we have 3am + * + * if(tTo < tFrom) { + * if(tCurr < tTo [3 < 4] || tCurr > tFrom [3 > 22]) + * do work + * else + * sleep for tFrom - tCurr "hours" [22 - 5 --> 17] + * } else { + * if(tCurr >= tFrom [3 >= 4] && tCurr < tTo [3 < 10]) + * do work + * else + * sleep for tTo - tCurr "hours" [4 - 3 --> 1] + * } + * + * Bottom line: we need to check which type of window we have and need to adjust our + * logic accordingly. Of course, sleep calculations need to be done up to the minute, + * but you get the idea from the code above. + */ +static rsRetVal +queueRateLimiter(queue_t *pThis) +{ + DEFiRet; + int iDelay; + int iHrCurr; + time_t tCurr; + struct tm m; + + ISOBJ_TYPE_assert(pThis, queue); + + dbgoprint((obj_t*) pThis, "entering rate limiter\n"); + + iDelay = 0; + if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */ + /* time calls are expensive, so only do them when needed */ + time(&tCurr); + localtime_r(&tCurr, &m); + iHrCurr = m.tm_hour; + + if(pThis->iDeqtWinToHr < pThis->iDeqtWinFromHr) { + if(iHrCurr < pThis->iDeqtWinToHr || iHrCurr > pThis->iDeqtWinFromHr) { + ; /* do not delay */ + } else { + iDelay = (pThis->iDeqtWinFromHr - iHrCurr) * 3600; + /* this time, we are already into the next hour, so we need + * to subtract our current minute and seconds. + */ + iDelay -= m.tm_min * 60; + iDelay -= m.tm_sec; + } + } else { + if(iHrCurr >= pThis->iDeqtWinFromHr && iHrCurr < pThis->iDeqtWinToHr) { + ; /* do not delay */ + } else { + if(iHrCurr < pThis->iDeqtWinFromHr) { + iDelay = (pThis->iDeqtWinFromHr - iHrCurr - 1) * 3600; /* -1 as we are already in the hour */ + iDelay += (60 - m.tm_min) * 60; + iDelay += 60 - m.tm_sec; + } else { + iDelay = (24 - iHrCurr + pThis->iDeqtWinFromHr) * 3600; + /* this time, we are already into the next hour, so we need + * to subtract our current minute and seconds. + */ + iDelay -= m.tm_min * 60; + iDelay -= m.tm_sec; + } + } + } + } + + if(iDelay > 0) { + dbgoprint((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay); + srSleep(iDelay, 0); + } + + RETiRet; +} + + + /* This is the queue consumer in the regular (non-DA) case. It is * protected by the queue mutex, but MUST release it as soon as possible. * rgerhards, 2008-01-21 @@ -1693,6 +1794,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpReg)); CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); + CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRateLimiter)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg)); @@ -2154,6 +2256,8 @@ finalize_it: /* some simple object access methods */ DEFpropSetMeth(queue, iPersistUpdCnt, int); +DEFpropSetMeth(queue, iDeqtWinFromHr, int); +DEFpropSetMeth(queue, iDeqtWinToHr, int); DEFpropSetMeth(queue, toQShutdown, long); DEFpropSetMeth(queue, toActShutdown, long); DEFpropSetMeth(queue, toWrkShutdown, long); @@ -2216,6 +2320,5 @@ BEGINObjClassInit(queue, 1, OBJ_IS_CORE_MODULE) OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty); ENDObjClassInit(queue) -/* - * vi:set ai: +/* vi:set ai: */ |