mirror of
https://github.com/RPCS3/rpcs3.git
synced 2024-11-22 18:53:28 +01:00
1139 lines
29 KiB
C
1139 lines
29 KiB
C
/* MtDec.c -- Multi-thread Decoder
|
|
2019-02-02 : Igor Pavlov : Public domain */
|
|
|
|
#include "Precomp.h"
|
|
|
|
// #define SHOW_DEBUG_INFO
|
|
|
|
// #include <stdio.h>
|
|
|
|
#ifdef SHOW_DEBUG_INFO
|
|
#include <stdio.h>
|
|
#endif
|
|
|
|
#ifdef SHOW_DEBUG_INFO
|
|
#define PRF(x) x
|
|
#else
|
|
#define PRF(x)
|
|
#endif
|
|
|
|
#define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))
|
|
|
|
#include "MtDec.h"
|
|
|
|
#ifndef _7ZIP_ST
|
|
|
|
void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
|
|
{
|
|
p->progress = progress;
|
|
p->res = SZ_OK;
|
|
p->totalInSize = 0;
|
|
p->totalOutSize = 0;
|
|
}
|
|
|
|
|
|
SRes MtProgress_Progress_ST(CMtProgress *p)
|
|
{
|
|
if (p->res == SZ_OK && p->progress)
|
|
if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
|
|
p->res = SZ_ERROR_PROGRESS;
|
|
return p->res;
|
|
}
|
|
|
|
|
|
SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
|
|
{
|
|
SRes res;
|
|
CriticalSection_Enter(&p->cs);
|
|
|
|
p->totalInSize += inSize;
|
|
p->totalOutSize += outSize;
|
|
if (p->res == SZ_OK && p->progress)
|
|
if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
|
|
p->res = SZ_ERROR_PROGRESS;
|
|
res = p->res;
|
|
|
|
CriticalSection_Leave(&p->cs);
|
|
return res;
|
|
}
|
|
|
|
|
|
SRes MtProgress_GetError(CMtProgress *p)
|
|
{
|
|
SRes res;
|
|
CriticalSection_Enter(&p->cs);
|
|
res = p->res;
|
|
CriticalSection_Leave(&p->cs);
|
|
return res;
|
|
}
|
|
|
|
|
|
void MtProgress_SetError(CMtProgress *p, SRes res)
|
|
{
|
|
CriticalSection_Enter(&p->cs);
|
|
if (p->res == SZ_OK)
|
|
p->res = res;
|
|
CriticalSection_Leave(&p->cs);
|
|
}
|
|
|
|
|
|
#define RINOK_THREAD(x) RINOK(x)
|
|
|
|
|
|
static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
|
|
{
|
|
if (Event_IsCreated(p))
|
|
return Event_Reset(p);
|
|
return AutoResetEvent_CreateNotSignaled(p);
|
|
}
|
|
|
|
|
|
struct __CMtDecBufLink
|
|
{
|
|
struct __CMtDecBufLink *next;
|
|
void *pad[3];
|
|
};
|
|
|
|
typedef struct __CMtDecBufLink CMtDecBufLink;
|
|
|
|
#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
|
|
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
|
|
|
|
|
|
|
|
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);
|
|
|
|
|
|
static WRes MtDecThread_CreateEvents(CMtDecThread *t)
|
|
{
|
|
WRes wres = ArEvent_OptCreate_And_Reset(&t->canWrite);
|
|
if (wres == 0)
|
|
{
|
|
wres = ArEvent_OptCreate_And_Reset(&t->canRead);
|
|
if (wres == 0)
|
|
return SZ_OK;
|
|
}
|
|
return wres;
|
|
}
|
|
|
|
|
|
static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
|
|
{
|
|
WRes wres = MtDecThread_CreateEvents(t);
|
|
// wres = 17; // for test
|
|
if (wres == 0)
|
|
{
|
|
if (Thread_WasCreated(&t->thread))
|
|
return SZ_OK;
|
|
wres = Thread_Create(&t->thread, ThreadFunc, t);
|
|
if (wres == 0)
|
|
return SZ_OK;
|
|
}
|
|
return MY_SRes_HRESULT_FROM_WRes(wres);
|
|
}
|
|
|
|
|
|
void MtDecThread_FreeInBufs(CMtDecThread *t)
|
|
{
|
|
if (t->inBuf)
|
|
{
|
|
void *link = t->inBuf;
|
|
t->inBuf = NULL;
|
|
do
|
|
{
|
|
void *next = ((CMtDecBufLink *)link)->next;
|
|
ISzAlloc_Free(t->mtDec->alloc, link);
|
|
link = next;
|
|
}
|
|
while (link);
|
|
}
|
|
}
|
|
|
|
|
|
static void MtDecThread_CloseThread(CMtDecThread *t)
|
|
{
|
|
if (Thread_WasCreated(&t->thread))
|
|
{
|
|
Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */
|
|
Event_Set(&t->canRead);
|
|
Thread_Wait(&t->thread);
|
|
Thread_Close(&t->thread);
|
|
}
|
|
|
|
Event_Close(&t->canRead);
|
|
Event_Close(&t->canWrite);
|
|
}
|
|
|
|
static void MtDec_CloseThreads(CMtDec *p)
|
|
{
|
|
unsigned i;
|
|
for (i = 0; i < MTDEC__THREADS_MAX; i++)
|
|
MtDecThread_CloseThread(&p->threads[i]);
|
|
}
|
|
|
|
static void MtDecThread_Destruct(CMtDecThread *t)
|
|
{
|
|
MtDecThread_CloseThread(t);
|
|
MtDecThread_FreeInBufs(t);
|
|
}
|
|
|
|
|
|
|
|
static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
|
|
{
|
|
size_t size = *processedSize;
|
|
*processedSize = 0;
|
|
while (size != 0)
|
|
{
|
|
size_t cur = size;
|
|
SRes res = ISeqInStream_Read(stream, data, &cur);
|
|
*processedSize += cur;
|
|
data += cur;
|
|
size -= cur;
|
|
RINOK(res);
|
|
if (cur == 0)
|
|
return SZ_OK;
|
|
}
|
|
return SZ_OK;
|
|
}
|
|
|
|
|
|
static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
|
|
{
|
|
SRes res;
|
|
CriticalSection_Enter(&p->mtProgress.cs);
|
|
*wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
|
|
res = p->mtProgress.res;
|
|
CriticalSection_Leave(&p->mtProgress.cs);
|
|
return res;
|
|
}
|
|
|
|
static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)
|
|
{
|
|
SRes res;
|
|
CriticalSection_Enter(&p->mtProgress.cs);
|
|
|
|
p->mtProgress.totalInSize += inSize;
|
|
p->mtProgress.totalOutSize += outSize;
|
|
if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)
|
|
if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)
|
|
p->mtProgress.res = SZ_ERROR_PROGRESS;
|
|
|
|
*wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
|
|
res = p->mtProgress.res;
|
|
|
|
CriticalSection_Leave(&p->mtProgress.cs);
|
|
|
|
return res;
|
|
}
|
|
|
|
static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
|
|
{
|
|
CriticalSection_Enter(&p->mtProgress.cs);
|
|
if (!p->needInterrupt || interruptIndex < p->interruptIndex)
|
|
{
|
|
p->interruptIndex = interruptIndex;
|
|
p->needInterrupt = True;
|
|
}
|
|
CriticalSection_Leave(&p->mtProgress.cs);
|
|
}
|
|
|
|
Byte *MtDec_GetCrossBuff(CMtDec *p)
|
|
{
|
|
Byte *cr = p->crossBlock;
|
|
if (!cr)
|
|
{
|
|
cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
|
|
if (!cr)
|
|
return NULL;
|
|
p->crossBlock = cr;
|
|
}
|
|
return MTDEC__DATA_PTR_FROM_LINK(cr);
|
|
}
|
|
|
|
|
|
/*
|
|
ThreadFunc2() returns:
|
|
0 - in all normal cases (even for stream error or memory allocation error)
|
|
(!= 0) - WRes error return by system threading function
|
|
*/
|
|
|
|
// #define MTDEC_ProgessStep (1 << 22)
|
|
#define MTDEC_ProgessStep (1 << 0)
|
|
|
|
static WRes ThreadFunc2(CMtDecThread *t)
|
|
{
|
|
CMtDec *p = t->mtDec;
|
|
|
|
PRF_STR_INT("ThreadFunc2", t->index);
|
|
|
|
// SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);
|
|
|
|
for (;;)
|
|
{
|
|
SRes res, codeRes;
|
|
BoolInt wasInterrupted, isAllocError, overflow, finish;
|
|
SRes threadingErrorSRes;
|
|
BoolInt needCode, needWrite, needContinue;
|
|
|
|
size_t inDataSize_Start;
|
|
UInt64 inDataSize;
|
|
// UInt64 inDataSize_Full;
|
|
|
|
UInt64 blockIndex;
|
|
|
|
UInt64 inPrev = 0;
|
|
UInt64 outPrev = 0;
|
|
UInt64 inCodePos;
|
|
UInt64 outCodePos;
|
|
|
|
Byte *afterEndData = NULL;
|
|
size_t afterEndData_Size = 0;
|
|
|
|
BoolInt canCreateNewThread = False;
|
|
// CMtDecCallbackInfo parse;
|
|
CMtDecThread *nextThread;
|
|
|
|
PRF_STR_INT("Event_Wait(&t->canRead)", t->index);
|
|
|
|
RINOK_THREAD(Event_Wait(&t->canRead));
|
|
if (p->exitThread)
|
|
return 0;
|
|
|
|
PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);
|
|
|
|
// if (t->index == 3) return 19; // for test
|
|
|
|
blockIndex = p->blockIndex++;
|
|
|
|
// PRF(printf("\ncanRead\n"))
|
|
|
|
res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
|
|
|
|
finish = p->readWasFinished;
|
|
needCode = False;
|
|
needWrite = False;
|
|
isAllocError = False;
|
|
overflow = False;
|
|
|
|
inDataSize_Start = 0;
|
|
inDataSize = 0;
|
|
// inDataSize_Full = 0;
|
|
|
|
if (res == SZ_OK && !wasInterrupted)
|
|
{
|
|
// if (p->inStream)
|
|
{
|
|
CMtDecBufLink *prev = NULL;
|
|
CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
|
|
size_t crossSize = p->crossEnd - p->crossStart;
|
|
|
|
PRF(printf("\ncrossSize = %d\n", crossSize));
|
|
|
|
for (;;)
|
|
{
|
|
if (!link)
|
|
{
|
|
link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
|
|
if (!link)
|
|
{
|
|
finish = True;
|
|
// p->allocError_for_Read_BlockIndex = blockIndex;
|
|
isAllocError = True;
|
|
break;
|
|
}
|
|
link->next = NULL;
|
|
if (prev)
|
|
{
|
|
// static unsigned g_num = 0;
|
|
// printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
|
|
prev->next = link;
|
|
}
|
|
else
|
|
t->inBuf = (void *)link;
|
|
}
|
|
|
|
{
|
|
Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
|
|
Byte *parseData = data;
|
|
size_t size;
|
|
|
|
if (crossSize != 0)
|
|
{
|
|
inDataSize = crossSize;
|
|
// inDataSize_Full = inDataSize;
|
|
inDataSize_Start = crossSize;
|
|
size = crossSize;
|
|
parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
|
|
PRF(printf("\ncross : crossStart = %7d crossEnd = %7d finish = %1d",
|
|
(int)p->crossStart, (int)p->crossEnd, (int)finish));
|
|
}
|
|
else
|
|
{
|
|
size = p->inBufSize;
|
|
|
|
res = FullRead(p->inStream, data, &size);
|
|
|
|
// size = 10; // test
|
|
|
|
inDataSize += size;
|
|
// inDataSize_Full = inDataSize;
|
|
if (!prev)
|
|
inDataSize_Start = size;
|
|
|
|
p->readProcessed += size;
|
|
finish = (size != p->inBufSize);
|
|
if (finish)
|
|
p->readWasFinished = True;
|
|
|
|
// res = E_INVALIDARG; // test
|
|
|
|
if (res != SZ_OK)
|
|
{
|
|
// PRF(printf("\nRead error = %d\n", res))
|
|
// we want to decode all data before error
|
|
p->readRes = res;
|
|
// p->readError_BlockIndex = blockIndex;
|
|
p->readWasFinished = True;
|
|
finish = True;
|
|
res = SZ_OK;
|
|
// break;
|
|
}
|
|
|
|
if (inDataSize - inPrev >= MTDEC_ProgessStep)
|
|
{
|
|
res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
|
|
if (res != SZ_OK || wasInterrupted)
|
|
break;
|
|
inPrev = inDataSize;
|
|
}
|
|
}
|
|
|
|
{
|
|
CMtDecCallbackInfo parse;
|
|
|
|
parse.startCall = (prev == NULL);
|
|
parse.src = parseData;
|
|
parse.srcSize = size;
|
|
parse.srcFinished = finish;
|
|
parse.canCreateNewThread = True;
|
|
|
|
// PRF(printf("\nParse size = %d\n", (unsigned)size))
|
|
|
|
p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);
|
|
|
|
needWrite = True;
|
|
canCreateNewThread = parse.canCreateNewThread;
|
|
|
|
// printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
|
|
|
|
if (
|
|
// parseRes != SZ_OK ||
|
|
// inDataSize - (size - parse.srcSize) > p->inBlockMax
|
|
// ||
|
|
parse.state == MTDEC_PARSE_OVERFLOW
|
|
// || wasInterrupted
|
|
)
|
|
{
|
|
// Overflow or Parse error - switch from MT decoding to ST decoding
|
|
finish = True;
|
|
overflow = True;
|
|
|
|
{
|
|
PRF(printf("\n Overflow"));
|
|
// PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
|
|
PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
|
|
}
|
|
|
|
if (crossSize != 0)
|
|
memcpy(data, parseData, size);
|
|
p->crossStart = 0;
|
|
p->crossEnd = 0;
|
|
break;
|
|
}
|
|
|
|
if (crossSize != 0)
|
|
{
|
|
memcpy(data, parseData, parse.srcSize);
|
|
p->crossStart += parse.srcSize;
|
|
}
|
|
|
|
if (parse.state != MTDEC_PARSE_CONTINUE || finish)
|
|
{
|
|
// we don't need to parse in current thread anymore
|
|
|
|
if (parse.state == MTDEC_PARSE_END)
|
|
finish = True;
|
|
|
|
needCode = True;
|
|
// p->crossFinished = finish;
|
|
|
|
if (parse.srcSize == size)
|
|
{
|
|
// full parsed - no cross transfer
|
|
p->crossStart = 0;
|
|
p->crossEnd = 0;
|
|
break;
|
|
}
|
|
|
|
if (parse.state == MTDEC_PARSE_END)
|
|
{
|
|
p->crossStart = 0;
|
|
p->crossEnd = 0;
|
|
|
|
if (crossSize != 0)
|
|
memcpy(data + parse.srcSize, parseData + parse.srcSize, size - parse.srcSize); // we need all data
|
|
afterEndData_Size = size - parse.srcSize;
|
|
afterEndData = parseData + parse.srcSize;
|
|
|
|
// we reduce data size to required bytes (parsed only)
|
|
inDataSize -= (size - parse.srcSize);
|
|
if (!prev)
|
|
inDataSize_Start = parse.srcSize;
|
|
break;
|
|
}
|
|
|
|
{
|
|
// partial parsed - need cross transfer
|
|
if (crossSize != 0)
|
|
inDataSize = parse.srcSize; // it's only parsed now
|
|
else
|
|
{
|
|
// partial parsed - is not in initial cross block - we need to copy new data to cross block
|
|
Byte *cr = MtDec_GetCrossBuff(p);
|
|
if (!cr)
|
|
{
|
|
{
|
|
PRF(printf("\ncross alloc error error\n"));
|
|
// res = SZ_ERROR_MEM;
|
|
finish = True;
|
|
// p->allocError_for_Read_BlockIndex = blockIndex;
|
|
isAllocError = True;
|
|
break;
|
|
}
|
|
}
|
|
|
|
{
|
|
size_t crSize = size - parse.srcSize;
|
|
inDataSize -= crSize;
|
|
p->crossEnd = crSize;
|
|
p->crossStart = 0;
|
|
memcpy(cr, parseData + parse.srcSize, crSize);
|
|
}
|
|
}
|
|
|
|
// inDataSize_Full = inDataSize;
|
|
if (!prev)
|
|
inDataSize_Start = parse.srcSize; // it's partial size (parsed only)
|
|
|
|
finish = False;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (parse.srcSize != size)
|
|
{
|
|
res = SZ_ERROR_FAIL;
|
|
PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
prev = link;
|
|
link = link->next;
|
|
|
|
if (crossSize != 0)
|
|
{
|
|
crossSize = 0;
|
|
p->crossStart = 0;
|
|
p->crossEnd = 0;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (res == SZ_OK)
|
|
res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
|
|
}
|
|
|
|
codeRes = SZ_OK;
|
|
|
|
if (res == SZ_OK && needCode && !wasInterrupted)
|
|
{
|
|
codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
|
|
if (codeRes != SZ_OK)
|
|
{
|
|
needCode = False;
|
|
finish = True;
|
|
// SZ_ERROR_MEM is expected error here.
|
|
// if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
|
|
// if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
|
|
}
|
|
}
|
|
|
|
if (res != SZ_OK || wasInterrupted)
|
|
finish = True;
|
|
|
|
nextThread = NULL;
|
|
threadingErrorSRes = SZ_OK;
|
|
|
|
if (!finish)
|
|
{
|
|
if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
|
|
{
|
|
SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
|
|
if (res2 == SZ_OK)
|
|
{
|
|
// if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
|
|
p->numStartedThreads++;
|
|
}
|
|
else
|
|
{
|
|
PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
|
|
if (p->numStartedThreads == 1)
|
|
{
|
|
// if only one thread is possible, we leave muti-threading code
|
|
finish = True;
|
|
needCode = False;
|
|
threadingErrorSRes = res2;
|
|
}
|
|
else
|
|
p->numStartedThreads_Limit = p->numStartedThreads;
|
|
}
|
|
}
|
|
|
|
if (!finish)
|
|
{
|
|
unsigned nextIndex = t->index + 1;
|
|
nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
|
|
RINOK_THREAD(Event_Set(&nextThread->canRead))
|
|
// We have started executing for new iteration (with next thread)
|
|
// And that next thread now is responsible for possible exit from decoding (threading_code)
|
|
}
|
|
}
|
|
|
|
// each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
|
|
// if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
|
|
// if ( finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
|
|
// - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
|
|
// - otherwise we stop decoding and exit from ThreadFunc2()
|
|
|
|
// Don't change (finish) variable in the further code
|
|
|
|
|
|
// ---------- CODE ----------
|
|
|
|
inPrev = 0;
|
|
outPrev = 0;
|
|
inCodePos = 0;
|
|
outCodePos = 0;
|
|
|
|
if (res == SZ_OK && needCode && codeRes == SZ_OK)
|
|
{
|
|
BoolInt isStartBlock = True;
|
|
CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
|
|
|
|
for (;;)
|
|
{
|
|
size_t inSize;
|
|
int stop;
|
|
|
|
if (isStartBlock)
|
|
inSize = inDataSize_Start;
|
|
else
|
|
{
|
|
UInt64 rem = inDataSize - inCodePos;
|
|
inSize = p->inBufSize;
|
|
if (inSize > rem)
|
|
inSize = (size_t)rem;
|
|
}
|
|
|
|
inCodePos += inSize;
|
|
stop = True;
|
|
|
|
codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
|
|
(const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
|
|
(inCodePos == inDataSize), // srcFinished
|
|
&inCodePos, &outCodePos, &stop);
|
|
|
|
if (codeRes != SZ_OK)
|
|
{
|
|
PRF(printf("\nCode Interrupt error = %x\n", codeRes));
|
|
// we interrupt only later blocks
|
|
MtDec_Interrupt(p, blockIndex);
|
|
break;
|
|
}
|
|
|
|
if (stop || inCodePos == inDataSize)
|
|
break;
|
|
|
|
{
|
|
const UInt64 inDelta = inCodePos - inPrev;
|
|
const UInt64 outDelta = outCodePos - outPrev;
|
|
if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
|
|
{
|
|
// Sleep(1);
|
|
res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
|
|
if (res != SZ_OK || wasInterrupted)
|
|
break;
|
|
inPrev = inCodePos;
|
|
outPrev = outCodePos;
|
|
}
|
|
}
|
|
|
|
link = link->next;
|
|
isStartBlock = False;
|
|
}
|
|
}
|
|
|
|
|
|
// ---------- WRITE ----------
|
|
|
|
RINOK_THREAD(Event_Wait(&t->canWrite));
|
|
|
|
{
|
|
BoolInt isErrorMode = False;
|
|
BoolInt canRecode = True;
|
|
BoolInt needWriteToStream = needWrite;
|
|
|
|
if (p->exitThread) return 0; // it's never executed in normal cases
|
|
|
|
if (p->wasInterrupted)
|
|
wasInterrupted = True;
|
|
else
|
|
{
|
|
if (codeRes != SZ_OK) // || !needCode // check it !!!
|
|
{
|
|
p->wasInterrupted = True;
|
|
p->codeRes = codeRes;
|
|
if (codeRes == SZ_ERROR_MEM)
|
|
isAllocError = True;
|
|
}
|
|
|
|
if (threadingErrorSRes)
|
|
{
|
|
p->wasInterrupted = True;
|
|
p->threadingErrorSRes = threadingErrorSRes;
|
|
needWriteToStream = False;
|
|
}
|
|
if (isAllocError)
|
|
{
|
|
p->wasInterrupted = True;
|
|
p->isAllocError = True;
|
|
needWriteToStream = False;
|
|
}
|
|
if (overflow)
|
|
{
|
|
p->wasInterrupted = True;
|
|
p->overflow = True;
|
|
needWriteToStream = False;
|
|
}
|
|
}
|
|
|
|
if (needCode)
|
|
{
|
|
if (wasInterrupted)
|
|
{
|
|
inCodePos = 0;
|
|
outCodePos = 0;
|
|
}
|
|
{
|
|
const UInt64 inDelta = inCodePos - inPrev;
|
|
const UInt64 outDelta = outCodePos - outPrev;
|
|
// if (inDelta != 0 || outDelta != 0)
|
|
res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
|
|
}
|
|
}
|
|
|
|
needContinue = (!finish);
|
|
|
|
// if (res == SZ_OK && needWrite && !wasInterrupted)
|
|
if (needWrite)
|
|
{
|
|
// p->inProcessed += inCodePos;
|
|
|
|
res = p->mtCallback->Write(p->mtCallbackObject, t->index,
|
|
res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
|
|
afterEndData, afterEndData_Size,
|
|
&needContinue,
|
|
&canRecode);
|
|
|
|
// res= E_INVALIDARG; // for test
|
|
|
|
PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
|
|
PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));
|
|
|
|
if (res != SZ_OK)
|
|
{
|
|
PRF(printf("\nWrite error = %d\n", res));
|
|
isErrorMode = True;
|
|
p->wasInterrupted = True;
|
|
}
|
|
if (res != SZ_OK
|
|
|| (!needContinue && !finish))
|
|
{
|
|
PRF(printf("\nWrite Interrupt error = %x\n", res));
|
|
MtDec_Interrupt(p, blockIndex);
|
|
}
|
|
}
|
|
|
|
if (canRecode)
|
|
if (!needCode
|
|
|| res != SZ_OK
|
|
|| p->wasInterrupted
|
|
|| codeRes != SZ_OK
|
|
|| wasInterrupted
|
|
|| p->numFilledThreads != 0
|
|
|| isErrorMode)
|
|
{
|
|
if (p->numFilledThreads == 0)
|
|
p->filledThreadStart = t->index;
|
|
if (inDataSize != 0 || !finish)
|
|
{
|
|
t->inDataSize_Start = inDataSize_Start;
|
|
t->inDataSize = inDataSize;
|
|
p->numFilledThreads++;
|
|
}
|
|
PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
|
|
PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
|
|
}
|
|
|
|
if (!finish)
|
|
{
|
|
RINOK_THREAD(Event_Set(&nextThread->canWrite));
|
|
}
|
|
else
|
|
{
|
|
if (needContinue)
|
|
{
|
|
// we restore decoding with new iteration
|
|
RINOK_THREAD(Event_Set(&p->threads[0].canWrite));
|
|
}
|
|
else
|
|
{
|
|
// we exit from decoding
|
|
if (t->index == 0)
|
|
return SZ_OK;
|
|
p->exitThread = True;
|
|
}
|
|
RINOK_THREAD(Event_Set(&p->threads[0].canRead));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#ifdef _WIN32
|
|
#define USE_ALLOCA
|
|
#endif
|
|
|
|
#ifdef USE_ALLOCA
|
|
#ifdef _WIN32
|
|
#include <malloc.h>
|
|
#else
|
|
#include <stdlib.h>
|
|
#endif
|
|
#endif
|
|
|
|
|
|
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void *pp)
|
|
{
|
|
WRes res;
|
|
|
|
CMtDecThread *t = (CMtDecThread *)pp;
|
|
CMtDec *p;
|
|
|
|
// fprintf(stdout, "\n%d = %p\n", t->index, &t);
|
|
|
|
res = ThreadFunc2(t);
|
|
p = t->mtDec;
|
|
if (res == 0)
|
|
return p->exitThreadWRes;
|
|
{
|
|
// it's unexpected situation for some threading function error
|
|
if (p->exitThreadWRes == 0)
|
|
p->exitThreadWRes = res;
|
|
PRF(printf("\nthread exit error = %d\n", res));
|
|
p->exitThread = True;
|
|
Event_Set(&p->threads[0].canRead);
|
|
Event_Set(&p->threads[0].canWrite);
|
|
MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
|
|
}
|
|
return res;
|
|
}
|
|
|
|
static MY_NO_INLINE THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
|
|
{
|
|
CMtDecThread *t = (CMtDecThread *)pp;
|
|
|
|
// fprintf(stderr, "\n%d = %p - before", t->index, &t);
|
|
#ifdef USE_ALLOCA
|
|
t->allocaPtr = alloca(t->index * 128);
|
|
#endif
|
|
return ThreadFunc1(pp);
|
|
}
|
|
|
|
|
|
int MtDec_PrepareRead(CMtDec *p)
|
|
{
|
|
if (p->crossBlock && p->crossStart == p->crossEnd)
|
|
{
|
|
ISzAlloc_Free(p->alloc, p->crossBlock);
|
|
p->crossBlock = NULL;
|
|
}
|
|
|
|
{
|
|
unsigned i;
|
|
for (i = 0; i < MTDEC__THREADS_MAX; i++)
|
|
if (i > p->numStartedThreads
|
|
|| p->numFilledThreads <=
|
|
(i >= p->filledThreadStart ?
|
|
i - p->filledThreadStart :
|
|
i + p->numStartedThreads - p->filledThreadStart))
|
|
MtDecThread_FreeInBufs(&p->threads[i]);
|
|
}
|
|
|
|
return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
|
|
}
|
|
|
|
|
|
const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
|
|
{
|
|
while (p->numFilledThreads != 0)
|
|
{
|
|
CMtDecThread *t = &p->threads[p->filledThreadStart];
|
|
|
|
if (*inLim != 0)
|
|
{
|
|
{
|
|
void *link = t->inBuf;
|
|
void *next = ((CMtDecBufLink *)link)->next;
|
|
ISzAlloc_Free(p->alloc, link);
|
|
t->inBuf = next;
|
|
}
|
|
|
|
if (t->inDataSize == 0)
|
|
{
|
|
MtDecThread_FreeInBufs(t);
|
|
if (--p->numFilledThreads == 0)
|
|
break;
|
|
if (++p->filledThreadStart == p->numStartedThreads)
|
|
p->filledThreadStart = 0;
|
|
t = &p->threads[p->filledThreadStart];
|
|
}
|
|
}
|
|
|
|
{
|
|
size_t lim = t->inDataSize_Start;
|
|
if (lim != 0)
|
|
t->inDataSize_Start = 0;
|
|
else
|
|
{
|
|
UInt64 rem = t->inDataSize;
|
|
lim = p->inBufSize;
|
|
if (lim > rem)
|
|
lim = (size_t)rem;
|
|
}
|
|
t->inDataSize -= lim;
|
|
*inLim = lim;
|
|
return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
|
|
}
|
|
}
|
|
|
|
{
|
|
size_t crossSize = p->crossEnd - p->crossStart;
|
|
if (crossSize != 0)
|
|
{
|
|
const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
|
|
*inLim = crossSize;
|
|
p->crossStart = 0;
|
|
p->crossEnd = 0;
|
|
return data;
|
|
}
|
|
*inLim = 0;
|
|
if (p->crossBlock)
|
|
{
|
|
ISzAlloc_Free(p->alloc, p->crossBlock);
|
|
p->crossBlock = NULL;
|
|
}
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
|
|
void MtDec_Construct(CMtDec *p)
|
|
{
|
|
unsigned i;
|
|
|
|
p->inBufSize = (size_t)1 << 18;
|
|
|
|
p->numThreadsMax = 0;
|
|
|
|
p->inStream = NULL;
|
|
|
|
// p->inData = NULL;
|
|
// p->inDataSize = 0;
|
|
|
|
p->crossBlock = NULL;
|
|
p->crossStart = 0;
|
|
p->crossEnd = 0;
|
|
|
|
p->numFilledThreads = 0;
|
|
|
|
p->progress = NULL;
|
|
p->alloc = NULL;
|
|
|
|
p->mtCallback = NULL;
|
|
p->mtCallbackObject = NULL;
|
|
|
|
p->allocatedBufsSize = 0;
|
|
|
|
for (i = 0; i < MTDEC__THREADS_MAX; i++)
|
|
{
|
|
CMtDecThread *t = &p->threads[i];
|
|
t->mtDec = p;
|
|
t->index = i;
|
|
t->inBuf = NULL;
|
|
Event_Construct(&t->canRead);
|
|
Event_Construct(&t->canWrite);
|
|
Thread_Construct(&t->thread);
|
|
}
|
|
|
|
// Event_Construct(&p->finishedEvent);
|
|
|
|
CriticalSection_Init(&p->mtProgress.cs);
|
|
}
|
|
|
|
|
|
static void MtDec_Free(CMtDec *p)
|
|
{
|
|
unsigned i;
|
|
|
|
p->exitThread = True;
|
|
|
|
for (i = 0; i < MTDEC__THREADS_MAX; i++)
|
|
MtDecThread_Destruct(&p->threads[i]);
|
|
|
|
// Event_Close(&p->finishedEvent);
|
|
|
|
if (p->crossBlock)
|
|
{
|
|
ISzAlloc_Free(p->alloc, p->crossBlock);
|
|
p->crossBlock = NULL;
|
|
}
|
|
}
|
|
|
|
|
|
void MtDec_Destruct(CMtDec *p)
|
|
{
|
|
MtDec_Free(p);
|
|
|
|
CriticalSection_Delete(&p->mtProgress.cs);
|
|
}
|
|
|
|
|
|
SRes MtDec_Code(CMtDec *p)
|
|
{
|
|
unsigned i;
|
|
|
|
p->inProcessed = 0;
|
|
|
|
p->blockIndex = 1; // it must be larger than not_defined index (0)
|
|
p->isAllocError = False;
|
|
p->overflow = False;
|
|
p->threadingErrorSRes = SZ_OK;
|
|
|
|
p->needContinue = True;
|
|
|
|
p->readWasFinished = False;
|
|
p->needInterrupt = False;
|
|
p->interruptIndex = (UInt64)(Int64)-1;
|
|
|
|
p->readProcessed = 0;
|
|
p->readRes = SZ_OK;
|
|
p->codeRes = SZ_OK;
|
|
p->wasInterrupted = False;
|
|
|
|
p->crossStart = 0;
|
|
p->crossEnd = 0;
|
|
|
|
p->filledThreadStart = 0;
|
|
p->numFilledThreads = 0;
|
|
|
|
{
|
|
unsigned numThreads = p->numThreadsMax;
|
|
if (numThreads > MTDEC__THREADS_MAX)
|
|
numThreads = MTDEC__THREADS_MAX;
|
|
p->numStartedThreads_Limit = numThreads;
|
|
p->numStartedThreads = 0;
|
|
}
|
|
|
|
if (p->inBufSize != p->allocatedBufsSize)
|
|
{
|
|
for (i = 0; i < MTDEC__THREADS_MAX; i++)
|
|
{
|
|
CMtDecThread *t = &p->threads[i];
|
|
if (t->inBuf)
|
|
MtDecThread_FreeInBufs(t);
|
|
}
|
|
if (p->crossBlock)
|
|
{
|
|
ISzAlloc_Free(p->alloc, p->crossBlock);
|
|
p->crossBlock = NULL;
|
|
}
|
|
|
|
p->allocatedBufsSize = p->inBufSize;
|
|
}
|
|
|
|
MtProgress_Init(&p->mtProgress, p->progress);
|
|
|
|
// RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
|
|
p->exitThread = False;
|
|
p->exitThreadWRes = 0;
|
|
|
|
{
|
|
WRes wres;
|
|
WRes sres;
|
|
CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];
|
|
// wres = MtDecThread_CreateAndStart(nextThread);
|
|
wres = MtDecThread_CreateEvents(nextThread);
|
|
if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
|
|
if (wres == 0) { wres = Event_Set(&nextThread->canRead);
|
|
if (wres == 0) { wres = ThreadFunc(nextThread);
|
|
if (wres != 0)
|
|
{
|
|
p->needContinue = False;
|
|
MtDec_CloseThreads(p);
|
|
}}}}
|
|
|
|
// wres = 17; // for test
|
|
// wres = Event_Wait(&p->finishedEvent);
|
|
|
|
sres = MY_SRes_HRESULT_FROM_WRes(wres);
|
|
|
|
if (sres != 0)
|
|
p->threadingErrorSRes = sres;
|
|
|
|
if (
|
|
// wres == 0
|
|
// wres != 0
|
|
// || p->mtc.codeRes == SZ_ERROR_MEM
|
|
p->isAllocError
|
|
|| p->threadingErrorSRes != SZ_OK
|
|
|| p->overflow)
|
|
{
|
|
// p->needContinue = True;
|
|
}
|
|
else
|
|
p->needContinue = False;
|
|
|
|
if (p->needContinue)
|
|
return SZ_OK;
|
|
|
|
// if (sres != SZ_OK)
|
|
return sres;
|
|
// return E_FAIL;
|
|
}
|
|
}
|
|
|
|
#endif
|