diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-08 08:45:24 +0000 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-01-08 08:45:24 +0000 |
commit | 6d4bd34517643505dab731fec16d3afeba2169ab (patch) | |
tree | 8815be565a7c07e9050d7a0de80282e884edcd18 /queue.c | |
parent | c44de2807a899521c8542321d91e3074f3c40086 (diff) | |
download | rsyslog-6d4bd34517643505dab731fec16d3afeba2169ab.tar.gz rsyslog-6d4bd34517643505dab731fec16d3afeba2169ab.tar.bz2 rsyslog-6d4bd34517643505dab731fec16d3afeba2169ab.zip |
implemented queue disk reader to switch to multiple files
Diffstat (limited to 'queue.c')
-rw-r--r-- | queue.c | 59 |
1 files changed, 30 insertions, 29 deletions
@@ -198,23 +198,20 @@ static rsRetVal qDelLinkedList(queue_t *pThis, void **ppUsr) static rsRetVal qDiskOpenFile(queue_t *pThis, queueFileDescription_t *pFile, int flags, mode_t mode) { DEFiRet; - uchar *pszFile = NULL; assert(pThis != NULL); assert(pFile != NULL); /* now open the write file */ - CHKiRet(genFileName(&pszFile, pThis->tVars.disk.pszSpoolDir, pThis->tVars.disk.lenSpoolDir, + CHKiRet(genFileName(&pFile->pszFileName, pThis->tVars.disk.pszSpoolDir, pThis->tVars.disk.lenSpoolDir, (uchar*) "mainq", 5, pFile->iCurrFileNum, (uchar*) "qf", 2)); - pFile->fd = open((char*)pszFile, flags, mode); // TODO: open modes! + pFile->fd = open((char*)pFile->pszFileName, flags, mode); // TODO: open modes! pFile->iCurrOffs = 0; - dbgprintf("Queue 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pszFile, flags, pFile->fd); + dbgprintf("Queue 0x%lx: opened file '%s' for %d as %d\n", (unsigned long) pThis, pFile->pszFileName, flags, pFile->fd); finalize_it: - if(pszFile != NULL) - free(pszFile); /* no longer needed in any case (just for open) */ - +dbgprintf("qDiskOpen iRet %d\n", iRet); return iRet; } @@ -231,6 +228,11 @@ static rsRetVal qDiskCloseFile(queue_t *pThis, queueFileDescription_t *pFile) close(pFile->fd); // TODO: error check pFile->fd = -1; + if(pFile->pszFileName != NULL) { + free(pFile->pszFileName); /* no longer needed in any case (just for open) */ + pFile->pszFileName = NULL; + } + return iRet; } @@ -271,7 +273,7 @@ static rsRetVal qDiskReadChar(queueFileDescription_t *pFile, uchar *pC) assert(pFile != NULL); assert(pC != NULL); -dbgprintf("qDiskRead index %d, max %d\n", pFile->iBufPtr, pFile->iBufPtrMax); +//dbgprintf("qDiskRead index %d, max %d\n", pFile->iBufPtr, pFile->iBufPtrMax); if(pFile->pIOBuf == NULL) { /* TODO: maybe we should move that to file open... */ if((pFile->pIOBuf = (uchar*) malloc(sizeof(uchar) * qFILE_IOBUF_SIZE )) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); @@ -437,33 +439,32 @@ static rsRetVal qDelDisk(queue_t *pThis, void **ppUsr) DEFiRet; msg_t *pMsg = NULL; serialStore_t serialStore; + int bRun; assert(pThis != NULL); - if(pThis->tVars.disk.fRead.fd == -1) - CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fRead, O_RDONLY, 0600)); // TODO: open modes! - - /* de-serialize object from file */ -retry: + /* de-serialize object from file + * We need to try at least twice because we may run into EOF and need + * to switch files. + */ serialStore.pUsr = &pThis->tVars.disk.fRead; serialStore.funcGetChar = (rsRetVal (*)(void*, uchar*)) qDiskReadChar; serialStore.funcUngetChar = (rsRetVal (*)(void*, uchar)) qDiskUnreadChar; - iRet= objDeserialize((void*) &pMsg, objMsg, &serialStore); - - if(iRet == RS_RET_OK) - ; - else if(iRet == RS_RET_EOF) { -dbgprintf("EOF!\n"); - CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fRead)); - goto retry; - } else - FINALIZE; - /* switch to next file when EOF is reached. We may also delete the last file in that case. - pThis->tVars.disk.fWrite.iCurrOffs += iWritten; - if(pThis->tVars.disk.fWrite.iCurrOffs >= pThis->tVars.disk.iMaxFileSize) - CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fWrite)); - */ -dbgprintf("got object %lx\n", (unsigned long) pMsg); + bRun = 1; + while(bRun) { + /* first check if we need to (re)open the file (we may have switched to a new one!) */ + if(pThis->tVars.disk.fRead.fd == -1) + CHKiRet(qDiskOpenFile(pThis, &pThis->tVars.disk.fRead, O_RDONLY, 0600)); // TODO: open modes! + + iRet = objDeserialize((void*) &pMsg, objMsg, &serialStore); + if(iRet == RS_RET_OK) + bRun = 0; /* we are done */ + else if(iRet == RS_RET_EOF) { + dbgprintf("Queue 0x%lx: EOF on file %d\n", (unsigned long) pThis, pThis->tVars.disk.fRead.fd); + CHKiRet(qDiskNextFile(pThis, &pThis->tVars.disk.fRead)); + } else + FINALIZE; + } *ppUsr = (void*) pMsg; |