fixed a few access contention
passes thread sanitizer test
diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c
index 08171f5..5624b5e 100644
--- a/lib/compress/zstdmt_compress.c
+++ b/lib/compress/zstdmt_compress.c
@@ -390,15 +390,17 @@
BYTE* oend = op + dstBuff.size;
int blockNb;
DEBUGLOG(5, "ZSTDMT_compressChunk: compress %u bytes in %i blocks", (U32)job->srcSize, nbBlocks);
- job->cSize = 0;
+ assert(job->cSize == 0);
for (blockNb = 1; blockNb < nbBlocks; blockNb++) {
size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
ip += ZSTD_BLOCKSIZE_MAX;
op += cSize; assert(op < oend);
/* stats */
+ ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */
job->cSize += cSize;
job->consumed = ZSTD_BLOCKSIZE_MAX * blockNb;
+ ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
}
/* last block */
if ((nbBlocks > 0) | job->lastChunk /*need to output a "last block" flag*/ ) {
@@ -409,9 +411,11 @@
ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
/* stats */
+ ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex); /* note : it's a mtctx mutex */
job->cSize += cSize;
+ job->consumed = job->srcSize;
+ ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
}
- job->consumed = job->srcSize;
}
#endif
@@ -422,6 +426,7 @@
job->src = g_nullBuffer; job->srcStart = NULL;
/* report */
ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
+ job->consumed = job->srcSize;
job->jobCompleted = 1;
job->jobScanned = 0;
ZSTD_pthread_cond_signal(job->jobCompleted_cond);
@@ -757,6 +762,8 @@
mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
mtctx->jobs[u].prefixSize = dictSize;
mtctx->jobs[u].srcSize = chunkSize;
+ mtctx->jobs[u].consumed = 0;
+ mtctx->jobs[u].cSize = 0;
mtctx->jobs[u].cdict = (u==0) ? cdict : NULL;
mtctx->jobs[u].fullFrameSize = srcSize;
mtctx->jobs[u].params = jobParams;
@@ -996,6 +1003,7 @@
zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
zcs->jobs[jobID].srcSize = srcSize;
zcs->jobs[jobID].consumed = 0;
+ zcs->jobs[jobID].cSize = 0;
zcs->jobs[jobID].prefixSize = zcs->prefixSize;
assert(zcs->inBuff.filled >= srcSize + zcs->prefixSize);
zcs->jobs[jobID].params = zcs->params;
diff --git a/programs/fileio.c b/programs/fileio.c
index 6a7bb1b..18552aa 100644
--- a/programs/fileio.c
+++ b/programs/fileio.c
@@ -813,8 +813,8 @@
}
}
#if 1
- if (READY_FOR_UPDATE)
- { ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
+ if (READY_FOR_UPDATE) {
+ ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%",
(U32)(zfp.ingested >> 20),
(U32)(zfp.consumed >> 20),