zstd cli can now compress using multi-threading
added : command -T#
added : ZSTD_resetCStream() (zstdmt_compress)
added : FIO_setNbThreads() (fileio)
diff --git a/programs/fileio.c b/programs/fileio.c
index a112cc0..3864a5f 100644
--- a/programs/fileio.c
+++ b/programs/fileio.c
@@ -34,11 +34,14 @@
#include "fileio.h"
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_magicNumber, ZSTD_frameHeaderSize_max */
#include "zstd.h"
-#ifdef ZSTD_GZDECOMPRESS
-#include "zlib.h"
-#if !defined(z_const)
- #define z_const
+#ifdef ZSTD_MULTITHREAD
+# include "zstdmt_compress.h"
#endif
+#ifdef ZSTD_GZDECOMPRESS
+# include "zlib.h"
+# if !defined(z_const)
+# define z_const
+# endif
#endif
@@ -103,7 +106,13 @@
void FIO_setRemoveSrcFile(unsigned flag) { g_removeSrcFile = (flag>0); }
static U32 g_memLimit = 0;
void FIO_setMemLimit(unsigned memLimit) { g_memLimit = memLimit; }
-
+static U32 g_nbThreads = 1;
+void FIO_setNbThreads(unsigned nbThreads) {
+#ifndef ZSTD_MULTITHREAD
+ if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n");
+#endif
+ g_nbThreads = nbThreads;
+}
/*-*************************************
@@ -226,22 +235,30 @@
* Compression
************************************************************************/
typedef struct {
+ FILE* srcFile;
+ FILE* dstFile;
void* srcBuffer;
size_t srcBufferSize;
void* dstBuffer;
size_t dstBufferSize;
+#ifdef ZSTD_MULTITHREAD
+ ZSTDMT_CCtx* cctx;
+#else
ZSTD_CStream* cctx;
- FILE* dstFile;
- FILE* srcFile;
+#endif
} cRess_t;
-static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
+static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
U64 srcSize, ZSTD_compressionParameters* comprParams)
{
cRess_t ress;
memset(&ress, 0, sizeof(ress));
+#ifdef ZSTD_MULTITHREAD
+ ress.cctx = ZSTDMT_createCCtx(g_nbThreads);
+#else
ress.cctx = ZSTD_createCStream();
+#endif
if (ress.cctx == NULL) EXM_THROW(30, "zstd: allocation error : can't create ZSTD_CStream");
ress.srcBufferSize = ZSTD_CStreamInSize();
ress.srcBuffer = malloc(ress.srcBufferSize);
@@ -264,7 +281,11 @@
if (comprParams->searchLength) params.cParams.searchLength = comprParams->searchLength;
if (comprParams->targetLength) params.cParams.targetLength = comprParams->targetLength;
if (comprParams->strategy) params.cParams.strategy = (ZSTD_strategy)(comprParams->strategy - 1);
+#ifdef ZSTD_MULTITHREAD
+ { size_t const errorCode = ZSTDMT_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize);
+#else
{ size_t const errorCode = ZSTD_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize);
+#endif
if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode));
} }
free(dictBuffer);
@@ -277,7 +298,11 @@
{
free(ress.srcBuffer);
free(ress.dstBuffer);
+#ifdef ZSTD_MULTITHREAD
+ ZSTDMT_freeCCtx(ress.cctx);
+#else
ZSTD_freeCStream(ress.cctx); /* never fails */
+#endif
}
@@ -296,7 +321,11 @@
U64 const fileSize = UTIL_getFileSize(srcFileName);
/* init */
+#ifdef ZSTD_MULTITHREAD
+ { size_t const resetError = ZSTDMT_resetCStream(ress.cctx, fileSize);
+#else
{ size_t const resetError = ZSTD_resetCStream(ress.cctx, fileSize);
+#endif
if (ZSTD_isError(resetError)) EXM_THROW(21, "Error initializing compression : %s", ZSTD_getErrorName(resetError));
}
@@ -311,11 +340,14 @@
/* Compress using buffered streaming */
{ ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
ZSTD_outBuffer outBuff= { ress.dstBuffer, ress.dstBufferSize, 0 };
- { size_t const result = ZSTD_compressStream(ress.cctx, &outBuff, &inBuff);
- if (ZSTD_isError(result)) EXM_THROW(23, "Compression error : %s ", ZSTD_getErrorName(result)); }
- if (inBuff.pos != inBuff.size)
- /* inBuff should be entirely consumed since buffer sizes are recommended ones */
- EXM_THROW(24, "Compression error : input block not fully consumed");
+ while (inBuff.pos != inBuff.size) { /* note : is there any possibility of endless loop ? for example, if outBuff is not large enough ? */
+#ifdef ZSTD_MULTITHREAD
+ size_t const result = ZSTDMT_compressStream(ress.cctx, &outBuff, &inBuff);
+#else
+ size_t const result = ZSTD_compressStream(ress.cctx, &outBuff, &inBuff);
+#endif
+ if (ZSTD_isError(result)) EXM_THROW(23, "Compression error : %s ", ZSTD_getErrorName(result));
+ }
/* Write cBlock */
{ size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
@@ -326,13 +358,19 @@
}
/* End of Frame */
- { ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
- size_t const result = ZSTD_endStream(ress.cctx, &outBuff);
- if (result!=0) EXM_THROW(26, "Compression error : cannot create frame end");
-
- { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
- if (sizeCheck!=outBuff.pos) EXM_THROW(27, "Write error : cannot write frame end into %s", dstFileName); }
- compressedfilesize += outBuff.pos;
+ { size_t result = 1;
+ while (result!=0) { /* note : is there any possibility of endless loop ? */
+ ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
+#ifdef ZSTD_MULTITHREAD
+ result = ZSTDMT_endStream(ress.cctx, &outBuff);
+#else
+ result = ZSTD_endStream(ress.cctx, &outBuff);
+#endif
+ if (ZSTD_isError(result)) EXM_THROW(26, "Compression error during frame end : %s", ZSTD_getErrorName(result));
+ { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
+ if (sizeCheck!=outBuff.pos) EXM_THROW(27, "Write error : cannot write frame end into %s", dstFileName); }
+ compressedfilesize += outBuff.pos;
+ }
}
/* Status */
@@ -632,7 +670,7 @@
if (ZSTD_isError(readSizeHint)) EXM_THROW(36, "Decoding error : %s", ZSTD_getErrorName(readSizeHint));
/* Write block */
- storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, storedSkips);
+ storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, storedSkips);
frameSize += outBuff.pos;
DISPLAYUPDATE(2, "\rDecoded : %u MB... ", (U32)((alreadyDecoded+frameSize)>>20) );