From bff48ee5ed9b1ce7ce18792cdf07066ceb89da25 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 9 Jan 2008 08:58:06 +0000 Subject: implemented queue object method to set the file name prefix --- queue.c | 60 +++++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 15 deletions(-) (limited to 'queue.c') diff --git a/queue.c b/queue.c index 9b2bf7e2..ceb4e3ab 100644 --- a/queue.c +++ b/queue.c @@ -200,9 +200,13 @@ static rsRetVal qDiskOpenFile(queue_t *pThis, queueFileDescription_t *pFile, int assert(pThis != NULL); assert(pFile != NULL); + + if(pThis->pszFilePrefix == NULL) + ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); + /* now open the write file */ - CHKiRet(genFileName(&pFile->pszFileName, pThis->tVars.disk.pszSpoolDir, pThis->tVars.disk.lenSpoolDir, - (uchar*) "mainq", 5, pFile->iCurrFileNum, (uchar*) "qf", 2)); + CHKiRet(genFileName(&pFile->pszFileName, pThis->pszSpoolDir, pThis->lenSpoolDir, + pThis->pszFilePrefix, pThis->lenFilePrefix, pFile->iCurrFileNum, (uchar*) "qf", 2)); pFile->fd = open((char*)pFile->pszFileName, flags, mode); // TODO: open modes! pFile->iCurrOffs = 0; @@ -376,12 +380,6 @@ static rsRetVal qConstructDisk(queue_t *pThis) assert(pThis != NULL); - if((pThis->tVars.disk.pszSpoolDir = (uchar*) strdup((char*)pszSpoolDirectory)) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - - pThis->tVars.disk.lenSpoolDir = strlen((char*)pThis->tVars.disk.pszSpoolDir); - pThis->tVars.disk.iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ - pThis->tVars.disk.fWrite.iCurrFileNum = 1; pThis->tVars.disk.fWrite.iCurrOffs = 0; pThis->tVars.disk.fWrite.fd = -1; @@ -394,7 +392,6 @@ static rsRetVal qConstructDisk(queue_t *pThis) pThis->tVars.disk.fRead.iUngetC = -1; pThis->tVars.disk.fRead.bDeleteOnClose = 1; -finalize_it: return iRet; } @@ -410,8 +407,8 @@ static rsRetVal qDestructDisk(queue_t *pThis) if(pThis->tVars.disk.fRead.fd != -1) qDiskCloseFile(pThis, &pThis->tVars.disk.fRead); - if(pThis->tVars.disk.pszSpoolDir != NULL) - free(pThis->tVars.disk.pszSpoolDir); + if(pThis->pszSpoolDir != NULL) + free(pThis->pszSpoolDir); return iRet; } @@ -434,7 +431,7 @@ static rsRetVal qAddDisk(queue_t *pThis, void* pUsr) /* TODO: handle error case -- rgerhards, 2008-01-07 */ pThis->tVars.disk.fWrite.iCurrOffs += iWritten; - if(pThis->tVars.disk.fWrite.iCurrOffs >= pThis->tVars.disk.iMaxFileSize) + if(pThis->tVars.disk.fWrite.iCurrOffs >= pThis->iMaxFileSize) CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fWrite)); finalize_it: @@ -646,6 +643,11 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, } /* we have an object, so let's fill the properties */ + if((pThis->pszSpoolDir = (uchar*) strdup((char*)pszSpoolDirectory)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + + pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir); + pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ pThis->iQueueSize = 0; pThis->iMaxQueueSize = iMaxQueueSize; pThis->pConsumer = pConsumer; @@ -656,6 +658,8 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, pThis->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t)); pthread_cond_init (pThis->notEmpty, NULL); pThis->iNumWorkerThreads = iWorkerThreads; + + pThis->pszFilePrefix = NULL; pThis->qType = qType; /* set type-specific handlers and other very type-specific things (we can not totally hide it...) */ @@ -715,7 +719,7 @@ rsRetVal queueStart(queue_t *pThis) assert(pThis != NULL); dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", (unsigned long) pThis, pThis->qType, - pThis->tVars.disk.iMaxFileSize); + pThis->iMaxFileSize); if(pThis->qType != QUEUETYPE_DIRECT) { if((pThis->pWorkerThreads = calloc(pThis->iNumWorkerThreads, sizeof(pthread_t))) == NULL) @@ -775,6 +779,32 @@ rsRetVal queueDestruct(queue_t *pThis) } +/* set the queue's file prefix + * The passed-in string is duplicated. So if the caller does not need + * it any longer, it must free it. + * rgerhards, 2008-01-09 + */ +rsRetVal +queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) +{ + DEFiRet; + + assert(pThis != NULL); + assert(pszPrefix != NULL); + + if(iLenPrefix < 1) + ABORT_FINALIZE(RS_RET_FILE_PREFIX_MISSING); + + if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL) + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + + memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1); /* always think about the \0! */ + pThis->lenFilePrefix = iLenPrefix; + +finalize_it: + return iRet; +} + /* set the queue's maximum file size * rgerhards, 2008-01-09 */ @@ -783,13 +813,13 @@ queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize) { DEFiRet; - assert(pThis != 0); + assert(pThis != NULL); if(iMaxFileSize < 1024) { ABORT_FINALIZE(RS_RET_VALUE_TOO_LOW); } - pThis->tVars.disk.iMaxFileSize = iMaxFileSize; + pThis->iMaxFileSize = iMaxFileSize; finalize_it: return iRet; -- cgit v1.2.3