From e0df42e01467dfb70498821114b581c02184c70c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 17 Jan 2008 16:30:49 +0000 Subject: fixed sync issue on shutdown process if need to persist pure memory queue to disk --- queue.c | 40 ++++++++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 8 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 2b241d82..83d65a4f 100644 --- a/queue.c +++ b/queue.c @@ -123,7 +123,6 @@ queueTellActWrkThrd(queue_t *pThis, int iIdx, qWrkCmd_t tCmd) assert(iIdx >= 0 && iIdx <= pThis->iNumWorkerThreads); if(pThis->pWrkThrds[iIdx].tCurrCmd >= eWRKTHRD_RUN_CREATED) { - dbgprintf("Queue 0x%lx: sending command %d to thread %d\n", queueGetID(pThis), tCmd, iIdx); qWrkrSetState(&pThis->pWrkThrds[iIdx], tCmd); } else { dbgprintf("Queue 0x%lx: command %d NOT sent to inactive thread %d\n", queueGetID(pThis), tCmd, iIdx); @@ -156,6 +155,34 @@ qWrkrConstructFinalize(qWrkThrd_t *pThis, queue_t *pQueue, int i) } +/* Waitis until the specified worker thread + * changed to full running state (aka have started up). This function + * MUST NOT be called while the queue mutex is locked as it does + * this itself. The wait is without timeout. + * rgerhards, 2008-01-17 + */ +static inline rsRetVal +qWrkrWaitStartup(qWrkThrd_t *pThis) +{ + int iCancelStateSave; + + assert(pThis != NULL); + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + pthread_mutex_lock(pThis->pQueue->mut); + if((pThis->tCurrCmd == eWRKTHRD_RUN_CREATED) || (pThis->tCurrCmd == eWRKTHRD_RUN_CREATED)) { + dbgprintf("Queue 0x%lx: waiting on worker thread %d startup\n", queueGetID(pThis->pQueue), + pThis->iThrd); + pthread_cond_wait(&pThis->condInitDone, pThis->pQueue->mut); +dbgprintf("startup done!\n"); + } + pthread_mutex_unlock(pThis->pQueue->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + + return RS_RET_OK; +} + + /* initialize the qWrkThrd_t structure - this MUST be called right after * startup of a worker thread. -- rgerhards, 2008-01-17 */ @@ -1188,19 +1215,14 @@ queueWrkThrdTrm(queue_t *pThis, qWrkCmd_t tShutdownCmd, long iTimeout) struct timespec t; queueTellActWrkThrds(pThis, 0, tShutdownCmd);/* first tell the workers our request */ -dbgprintf("WrkThrdTrm 0\n"); queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ /* race: must make sure all are running! */ -dbgprintf("WrkThrdTrm 1\n"); queueTimeoutComp(&t, iTimeout);/* get timeout */ -dbgprintf("WrkThrdTrm 2\n"); /* and wait for their termination */ pthread_mutex_lock(pThis->mut); bTimedOut = 0; -dbgprintf("WrkThrdTrm 3, thrds: %d\n", pThis->iCurNumWrkThrd); while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { -dbgprintf("WrkThrdTrm 4 to %d\n", bTimedOut); dbgprintf("Queue 0x%lx: waiting %ldms on worker thread termination, %d still running\n", queueGetID(pThis), iTimeout, pThis->iCurNumWrkThrd); @@ -1236,11 +1258,12 @@ queueWrkThrdCancel(queue_t *pThis) queueWakeupWrkThrds(pThis, 1); /* awake all workers including DA-worker */ /* first tell the workers our request */ - for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) + for(i = 0 ; i <= pThis->iNumWorkerThreads ; ++i) { if(pThis->pWrkThrds[i].tCurrCmd >= eWRKTHRD_TERMINATING) { dbgprintf("Queue 0x%lx: canceling worker thread %d\n", queueGetID(pThis), i); pthread_cancel(pThis->pWrkThrds[i].thrdID); } + } return iRet; } @@ -1843,8 +1866,9 @@ pThis->bSaveOnShutdown = 1; // TODO: Test remove queueGetID(pThis), pThis->iQueueSize); pThis->iLowWtrMrk = 0; /* disable low water mark algo */ queueInitDA(pThis, QUEUE_MODE_ENQONLY); /* start DA queue in enqueue-only mode */ + qWrkrWaitStartup(QUEUE_PTR_DA_WORKER(pThis)); /* wait until DA worker has actually started */ pThis->toQShutdown = QUEUE_TIMEOUT_ETERNAL; - queueShutdownWorkers(pThis); + queueShutdownWorkers(pThis); /* and tell it to shut down. The trick is it will run until q is drained */ } /* if running DA, terminate disk queue */ -- cgit v1.2.3