diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-04-02 16:53:29 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-04-02 16:53:29 +0000 |
commit | 9b48c4a481c64503605f25e1d0648d24f43437f1 (patch) | |
tree | 017325a20b756a2047cb3eb7d9aacdf19a52d964 | |
parent | 38f0cd67626ce56b0014b05b513e2e573da25e6f (diff) | |
download | rsyslog-9b48c4a481c64503605f25e1d0648d24f43437f1.tar.gz rsyslog-9b48c4a481c64503605f25e1d0648d24f43437f1.tar.bz2 rsyslog-9b48c4a481c64503605f25e1d0648d24f43437f1.zip |
begun working on time-window based dequeueing (and rate limiting in
general)
-rw-r--r-- | doc/rsyslog_ng_comparison.html | 2 | ||||
-rw-r--r-- | queue.c | 57 | ||||
-rw-r--r-- | queue.h | 13 | ||||
-rw-r--r-- | wti.c | 8 | ||||
-rw-r--r-- | wtp.c | 1 | ||||
-rw-r--r-- | wtp.h | 1 |
6 files changed, 80 insertions, 2 deletions
diff --git a/doc/rsyslog_ng_comparison.html b/doc/rsyslog_ng_comparison.html index 2a1d15bd..852ab4ed 100644 --- a/doc/rsyslog_ng_comparison.html +++ b/doc/rsyslog_ng_comparison.html @@ -122,7 +122,7 @@ based framing on syslog/tcp connections</td> <td valign="top">yes</td> </tr> <tr> -<td valign="top">syslog over RELP<br>this is a truely reliable solution (plain tcp syslog can lose messages!)</td> +<td valign="top">syslog over RELP<br>truly reliable message delivery (<a href="http://rgerhards.blogspot.com/2008/04/on-unreliability-of-plain-tcp-syslog.html">Why is plain tcp syslog not reliable?</a>)</td> <td valign="top">yes</td> <td valign="top">no</td> </tr> @@ -55,6 +55,7 @@ DEFobjStaticHelpers /* forward-definitions */ rsRetVal queueChkPersist(queue_t *pThis); static rsRetVal queueSetEnqOnly(queue_t *pThis, int bEnqOnly, int bLockMutex); +static rsRetVal queueRateLimiter(queue_t *pThis); static int queueChkStopWrkrDA(queue_t *pThis); static int queueIsIdleDA(queue_t *pThis); static rsRetVal queueConsumerDA(queue_t *pThis, wti_t *pWti, int iCancelStateSave); @@ -341,6 +342,7 @@ queueInitDA(queue_t *pThis, int bEnqOnly, int bLockMutex) lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); + CHKiRet(wtpSetpfRateLimiter (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) queueRateLimiter)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrDA)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) queueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerDA)); @@ -1450,6 +1452,60 @@ finalize_it: } +/* The rate limiter - we only need one - do we? + * +* Here we may wait if a dequeue time window is defined or if we are + * rate-limited. TODO: If we do so, we should also look into the + * way new worker threads are spawned. Obviously, it doesn't make much + * sense to spawn additional worker threads when none of them can do any + * processing. However, it is deemed acceptable to allow this for an initial + * implementation of the timeframe/rate limiting feature. + * Please also note that these feature could also be implemented at the action + * level. However, that would limit them to be used together with actions. We have + * taken the broader approach, moving it right into the queue. This is even + * necessary if we want to prevent spawning of multiple unnecessary worker + * threads as described above. -- rgerhards, 2008-04-02 + * + * + * time window: tCurr is current time; tFrom is start time, tTo is end time (in mil 24h format). + * We may have tFrom = 4, tTo = 10 --> run from 4 to 10 hrs. nice and happy + * we may also have tFrom= 22, tTo = 4 -> run from 10pm to 4am, which is actually two + * windows: 0-4; 22-23:59 + * so when to run? Let's assume we have 3am + * + * if(tTo < tFrom) { + * if(tCurr < tTo [3 < 4] || tCurr > tFrom [3 > 22]) + * do work + * else + * sleep for tFrom - tCurr "hours" [22 - 5 --> 17] + * } else { + * if(tCurr >= tFrom [3 >= 4] && tCurr < tTo [3 < 10]) + * do work + * else + * sleep for tTo - tCurr "hours" [4 - 3 --> 1] + * } + * + * Bottom line: we need to check which type of window we have and need to adjust our + * logic accordingly. Of course, sleep calculations need to be done up to the minute, + * but you get the idea from the code above. + */ +static rsRetVal +queueRateLimiter(queue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, queue); + + dbgoprint((obj_t*) pThis, "entering rate limiter\n"); + srSleep(2, 0); + +finalize_it: + dbgoprint((obj_t*) pThis, "rate limiter returns with iRet %d\n", iRet); + RETiRet; +} + + + /* This is the queue consumer in the regular (non-DA) case. It is * protected by the queue mutex, but MUST release it as soon as possible. * rgerhards, 2008-01-21 @@ -1690,6 +1746,7 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpReg)); CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); + CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) queueRateLimiter)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueChkStopWrkrReg)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) queueIsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) queueConsumerReg)); @@ -82,9 +82,20 @@ typedef struct queue_s { int toActShutdown; /* timeout for long-running action shutdown in ms */ int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */ int toEnq; /* enqueue timeout */ - /* rate limiting settings (will be expanded */ + /* rate limiting settings (will be expanded) */ int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */ /* end rate limiting */ + /* dequeue time window settings (may also be expanded) */ + int iDeqtWinFromHr; /* begin of dequeue time window (hour only) */ + int iDeqtWinToHr; /* end of dequeue time window (hour only) */ + /* note that begin and end have specific semantics. It is a big difference if we have + * begin 4, end 22 or begin 22, end 4. In the later case, dequeuing will run from 10p, + * throughout the night and stop at 4 in the morning. In the first case, it will start + * at 4am, run throughout the day, and stop at 10 in the evening! So far, not logic is + * applied to detect user configuration errors (and tell me how should we detect what + * the user really wanted...). -- rgerhards, 2008-04-02 + */ + /* ane dequeue time window */ rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer @@ -370,6 +370,14 @@ wtiWorker(wti_t *pThis) pthread_yield(); /* see big comment in function header */ # endif + /* if we have a rate-limiter set for this worker pool, let's call it. Please + * keep in mind that the rate-limiter may hold us for an extended period + * of time. -- rgerhards, 2008-04-02 + */ + if(pWtp->pfRateLimiter != NULL) { + pWtp->pfRateLimiter(pWtp->pUsr); + } + wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */ BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX); @@ -545,6 +545,7 @@ DEFpropSetMeth(wtp, pUsr, void*); DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t); DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t); DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)); +DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)); DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)); DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)); DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)); @@ -68,6 +68,7 @@ typedef struct wtp_s { pthread_mutex_t *pmutUsr; pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */ rsRetVal (*pfChkStopWrkr)(void *pUsr, int); + rsRetVal (*pfRateLimiter)(void *pUsr); rsRetVal (*pfIsIdle)(void *pUsr, int); rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int); rsRetVal (*pfOnIdle)(void *pUsr, int); |