From 19c9b187ab29f9304adb82d9c6005c69c92b3c17 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 16 Jan 2008 10:15:42 +0000 Subject: cleaned up queue disk startup --- queue.c | 60 ++++++++++++++++++++++++++++++++---------------------------- 1 file changed, 32 insertions(+), 28 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 88e41168..00f6e63f 100644 --- a/queue.c +++ b/queue.c @@ -127,6 +127,7 @@ queueStrtWrkThrd(queue_t *pThis, int i) ISOBJ_TYPE_assert(pThis, queue); assert(i >= 0 && i <= pThis->iNumWorkerThreads); + assert(pThis->pWrkThrds[i].tCurrCmd < eWRKTHRDCMD_RUN); queueTellWrkThrd(pThis, i, eWRKTHRDCMD_RUN); iState = pthread_create(&(pThis->pWrkThrds[i].thrdID), NULL, queueWorker, (void*) pThis); @@ -1441,50 +1442,50 @@ rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */ DEFiRet; int i; rsRetVal iRetLocal; + int bInitialized = 0; /* is queue already initialized? */ assert(pThis != NULL); /* call type-specific constructor */ - CHKiRet(pThis->qConstruct(pThis)); + CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ - dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", queueGetID(pThis), pThis->qType, - pThis->iMaxFileSize); + dbgprintf("Queue 0x%lx: type %d, disk assisted %d, maxFileSz %ld starting\n", queueGetID(pThis), pThis->qType, + pThis->bIsDA, pThis->iMaxFileSize); if(pThis->qType != QUEUETYPE_DIRECT) { if((pThis->pWrkThrds = calloc(pThis->iNumWorkerThreads + 1, sizeof(qWrkThrd_t))) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - /* worker 0 is reserved for disk-assisted mode */ - queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_NEVER_RAN); + if(pThis->bIsDA) { + /* If we are disk-assisted, we need to check if there is a QIF file + * which we need to load. -- rgerhards, 2008-01-15 + */ + iRetLocal = queueHaveQIF(pThis); + if(iRetLocal == RS_RET_OK) { + dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n", + queueGetID(pThis)); - /* fire up the worker threads */ - for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) { - queueStrtWrkThrd(pThis, i); - } - } + /* indicate we now run in DA mode - this is reset by the DA worker if it fails */ + pThis->qRunsDA = QRUNS_DA_INIT; - if(pThis->bIsDA) { - /* If we are disk-assisted, we need to check if there is a QIF file - * which we need to load. -- rgerhards, 2008-01-15 - */ - iRetLocal = queueHaveQIF(pThis); -dbgprintf("HaveQIF %d\n", iRet); - if(iRetLocal == RS_RET_OK) { -dbgprintf("need to restore disk queue\n"); - // code below to function! - /* if we reach this point, we are NOT currently running in DA mode. - * TODO: split this function, I think that would make the code easier - * to read. -- rgerhards, 2008-10-15 - */ - dbgprintf("Queue 0x%lx: on-disk queue present, needs to be reloaded\n", - queueGetID(pThis)); + /* now we must start our DA worker thread - it does the rest of the initialization */ + CHKiRet(queueStrtWrkThrd(pThis, 0)); + bInitialized = 1; + } + } - pThis->qRunsDA = QRUNS_DA_INIT; /* indicate we now run in DA mode - this is reset by the DA worker if it fails */ + if(!bInitialized) { + dbgprintf("Queue 0x%lx: queue starts up without loading any disk state\n", queueGetID(pThis)); + /* worker 0 is reserved for disk-assisted mode, so do not start */ + queueTellWrkThrd(pThis, 0, eWRKTHRDCMD_NEVER_RAN); - /* now we must start our DA worker thread - it does the rest of the initialization */ - CHKiRet(queueStrtWrkThrd(pThis, 0)); + /* fire up the worker threads */ + for(i = 1 ; i <= pThis->iNumWorkerThreads ; ++i) { + queueStrtWrkThrd(pThis, i); + } } } + finalize_it: return iRet; } @@ -1613,6 +1614,9 @@ rsRetVal queueDestruct(queue_t *pThis) /* if running DA, tell the DA workers to shut down. This saves us some CPU cycles which * we can use to persist the remaining in-memory data to disk quicker. -- rgerhads, 2008-01-16 + * TODO: we actually need to change the queue to an "input-only" mode, that also prevents + * startup of the thread again further down in the process. None of that really hurts, so we + * leave it for the time being. -- rgerhards, 2008-01-16 */ if(pThis->qRunsDA != QRUNS_REGULAR) queueWrkThrdReqTrm(pThis->pqDA, eWRKTHRDCMD_SHUTDOWN_IMMEDIATE, 0); -- cgit v1.2.3