Merge pull request #520 from iburinoc/spec
Updated format specification to be easier to understand
diff --git a/.travis.yml b/.travis.yml
index 6bf99f1..ba9f996 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,10 +8,6 @@
# Container-based Ubuntu 12.04 LTS Server Edition 64 bit (doesn't support 32-bit includes)
- - env: Ubu=12.04cont Cmd="make test && make clean && make travis-install"
- os: linux
- sudo: false
-
- env: Ubu=12.04cont Cmd="make zlibwrapper && make clean && make -C tests test-symbols && make clean && make -C tests test-zstd-nolegacy && make clean && make cmaketest && make clean && make -C contrib/pzstd googletest pzstd tests check && make -C contrib/pzstd clean"
os: linux
sudo: false
@@ -80,7 +76,6 @@
- gcc-arm-linux-gnueabi
- libc6-dev-armel-cross
- # Ubuntu 14.04 LTS Server Edition 64 bit
- env: Ubu=14.04 Cmd="make aarch64test"
dist: trusty
sudo: required
@@ -92,7 +87,7 @@
- gcc-aarch64-linux-gnu
- libc6-dev-arm64-cross
- - env: Ubu=14.04 Cmd='make ppctest && make clean && make ppc64test'
+ - env: Ubu=14.04 Cmd='make ppctest'
dist: trusty
sudo: required
addons:
@@ -101,7 +96,16 @@
- qemu-system-ppc
- qemu-user-static
- gcc-powerpc-linux-gnu
- - libc6-dev-armel-cross
+
+ - env: Ubu=14.04 Cmd='make ppc64test'
+ dist: trusty
+ sudo: required
+ addons:
+ apt:
+ packages:
+ - qemu-system-ppc
+ - qemu-user-static
+ - gcc-powerpc-linux-gnu
- env: Ubu=14.04 Cmd='make -C lib all && CFLAGS="-O1 -g" make -C zlibWrapper valgrindTest && make -C tests valgrindTest'
os: linux
@@ -112,9 +116,26 @@
packages:
- valgrind
+
+
+ # other feature branches => short tests
+ - env: Ubu=12.04cont Cmd="make test && make clean && make travis-install"
+ os: linux
+ sudo: false
+
+ - env: Ubu=14.04 Cmd="make -C tests test32"
+ os: linux
+ dist: trusty
+ sudo: required
+ addons:
+ apt:
+ packages:
+ - libc6-dev-i386
+ - gcc-multilib
+
- env: Ubu=14.04 Cmd="make gpptest && make clean && make gnu90test && make clean
&& make c99test && make clean && make gnu99test && make clean
- && make clangtest && make clean && make -C contrib/pzstd googletest32
+ && make clangtest && make clean && make -C contrib/pzstd googletest32
&& make -C contrib/pzstd all32 && make -C contrib/pzstd check && make -C contrib/pzstd clean"
os: linux
dist: trusty
@@ -131,16 +152,6 @@
- g++-4.8
- g++-4.8-multilib
- - env: Ubu=14.04 Cmd="make -C tests test32"
- os: linux
- dist: trusty
- sudo: required
- addons:
- apt:
- packages:
- - libc6-dev-i386
- - gcc-multilib
-
- env: Ubu=14.04 Cmd="make gcc5test && make clean && make gcc6test && make clean && make -C tests dll"
os: linux
dist: trusty
@@ -158,5 +169,7 @@
script:
- JOB_NUMBER=$(echo $TRAVIS_JOB_NUMBER | sed -e 's:[0-9][0-9]*\.\(.*\):\1:')
- # - if [ $JOB_NUMBER -eq 9 ] || [ $JOB_NUMBER -eq 10 ]; then sh -c "$Cmd"; fi
- - sh -c "$Cmd"
+ # pull requests and dev => normal tests; other feature branches => short tests (number > 11)
+ - if [ "$TRAVIS_PULL_REQUEST" != "false" ] || [ "$JOB_NUMBER" -gt 11 ] || [ "$TRAVIS_BRANCH" = "dev" ] && [ "$TRAVIS_BRANCH" != "master" ]; then sh -c "$Cmd"; fi
+ # master => long tests, as this is the final step towards a Release
+ - if [ "$TRAVIS_BRANCH" = "master" ]; then FUZZERTEST=-T10mn sh -c "$Cmd"; fi
diff --git a/Makefile b/Makefile
index 8569ee6..14d1510 100644
--- a/Makefile
+++ b/Makefile
@@ -50,6 +50,11 @@
@$(MAKE) -C $(PRGDIR) $@
cp $(PRGDIR)/zstd$(EXT) .
+.PHONY: zstdmt
+zstdmt:
+ @$(MAKE) -C $(PRGDIR) $@
+ cp $(PRGDIR)/zstd$(EXT) ./zstdmt$(EXT)
+
.PHONY: zlibwrapper
zlibwrapper:
$(MAKE) -C $(ZWRAPDIR) test
diff --git a/NEWS b/NEWS
index 46bdb25..6e82494 100644
--- a/NEWS
+++ b/NEWS
@@ -1,11 +1,15 @@
v1.1.3
+cli : new : experimental target `make zstdmt`, with multi-threading support
cli : new : advanced commands for detailed parameters, by Przemyslaw Skibinski
cli : fix zstdless on Mac OS-X, by Andrew Janke
+cli : fix #232 "compress non-files"
dictBuilder : improved dictionary generation quality, thanks to Nick Terrell
-API : fix : all symbols properly exposed in libzstd, by Nick Terrell
-API : fix : ZSTD_initCStream_usingCDict() properly writes dictID into frame header, by Gregory Szorc (#511)
+API : new : lib/compress/ZSTDMT_compress.h multithreading API (experimental)
API : new : ZSTD_create?Dict_byReference(), requested by Bartosz Taudul
API : new : ZDICT_finalizeDictionary()
+API : fix : ZSTD_initCStream_usingCDict() properly writes dictID into frame header, by Gregory Szorc (#511)
+API : fix : all symbols properly exposed in libzstd, by Nick Terrell
+build : support for Solaris target, by Przemyslaw Skibinski
v1.1.2
API : streaming : decompression : changed : automatic implicit reset when chain-decoding new frames without init
diff --git a/build/VS2008/zstd/zstd.vcproj b/build/VS2008/zstd/zstd.vcproj
index 3bd5ed5..0beb59d 100644
--- a/build/VS2008/zstd/zstd.vcproj
+++ b/build/VS2008/zstd/zstd.vcproj
@@ -381,6 +381,14 @@
>
</File>
<File
+ RelativePath="..\..\..\lib\common\pool.c"
+ >
+ </File>
+ <File
+ RelativePath="..\..\..\lib\common\threading.c"
+ >
+ </File>
+ <File
RelativePath="..\..\..\lib\common\xxhash.c"
>
</File>
@@ -432,6 +440,10 @@
RelativePath="..\..\..\programs\zstdcli.c"
>
</File>
+ <File
+ RelativePath="..\..\..\lib\compress\zstdmt_compress.c"
+ >
+ </File>
</Filter>
<Filter
Name="Header Files"
@@ -451,10 +463,6 @@
>
</File>
<File
- RelativePath="..\..\..\lib\common\zstd_errors.h"
- >
- </File>
- <File
RelativePath="..\..\..\lib\common\fse.h"
>
</File>
@@ -475,6 +483,14 @@
>
</File>
<File
+ RelativePath="..\..\..\lib\common\pool.h"
+ >
+ </File>
+ <File
+ RelativePath="..\..\..\lib\common\threading.h"
+ >
+ </File>
+ <File
RelativePath="..\..\..\lib\common\xxhash.h"
>
</File>
@@ -491,6 +507,10 @@
>
</File>
<File
+ RelativePath="..\..\..\lib\common\zstd_errors.h"
+ >
+ </File>
+ <File
RelativePath="..\..\..\lib\common\zstd_internal.h"
>
</File>
@@ -534,6 +554,10 @@
RelativePath="..\..\..\lib\legacy\zstd_v07.h"
>
</File>
+ <File
+ RelativePath="..\..\..\lib\compress\zstdmt_compress.h"
+ >
+ </File>
</Filter>
</Files>
<Globals>
diff --git a/build/VS2010/zstd/zstd.vcxproj b/build/VS2010/zstd/zstd.vcxproj
index 2b30966..3939c55 100644
--- a/build/VS2010/zstd/zstd.vcxproj
+++ b/build/VS2010/zstd/zstd.vcxproj
@@ -21,11 +21,14 @@
<ItemGroup>
<ClCompile Include="..\..\..\lib\common\entropy_common.c" />
<ClCompile Include="..\..\..\lib\common\error_private.c" />
+ <ClCompile Include="..\..\..\lib\common\pool.c" />
+ <ClCompile Include="..\..\..\lib\common\threading.c" />
<ClCompile Include="..\..\..\lib\common\xxhash.c" />
<ClCompile Include="..\..\..\lib\common\zstd_common.c" />
<ClCompile Include="..\..\..\lib\common\fse_decompress.c" />
<ClCompile Include="..\..\..\lib\compress\fse_compress.c" />
<ClCompile Include="..\..\..\lib\compress\huf_compress.c" />
+ <ClCompile Include="..\..\..\lib\compress\zstdmt_compress.c" />
<ClCompile Include="..\..\..\lib\compress\zstd_compress.c" />
<ClCompile Include="..\..\..\lib\decompress\huf_decompress.c" />
<ClCompile Include="..\..\..\lib\decompress\zstd_decompress.c" />
@@ -46,7 +49,10 @@
<ClCompile Include="..\..\..\programs\zstdcli.c" />
</ItemGroup>
<ItemGroup>
+ <ClInclude Include="..\..\..\lib\common\pool.h" />
+ <ClInclude Include="..\..\..\lib\common\threading.h" />
<ClInclude Include="..\..\..\lib\common\xxhash.h" />
+ <ClInclude Include="..\..\..\lib\compress\zstdmt_compress.h" />
<ClInclude Include="..\..\..\lib\dictBuilder\zdict.h" />
<ClInclude Include="..\..\..\lib\dictBuilder\divsufsort.h" />
<ClInclude Include="..\..\..\lib\common\fse.h" />
@@ -67,6 +73,7 @@
<ClInclude Include="..\..\..\programs\datagen.h" />
<ClInclude Include="..\..\..\programs\dibio.h" />
<ClInclude Include="..\..\..\programs\fileio.h" />
+ <ClInclude Include="..\..\..\programs\platform.h" />
<ClInclude Include="..\..\..\programs\util.h" />
</ItemGroup>
<ItemGroup>
@@ -138,7 +145,7 @@
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<LinkIncremental>false</LinkIncremental>
- <IncludePath>$(IncludePath);$(SolutionDir)..\..\lib;$(SolutionDir)..\..\lib\legacy;$(SolutionDir)..\..\lib\common;$(SolutionDir)..\..\lib\dictBuilder;$(UniversalCRT_IncludePath);</IncludePath>
+ <IncludePath>$(IncludePath);$(SolutionDir)..\..\lib;$(SolutionDir)..\..\lib\compress;$(SolutionDir)..\..\lib\legacy;$(SolutionDir)..\..\lib\common;$(SolutionDir)..\..\lib\dictBuilder;$(UniversalCRT_IncludePath);</IncludePath>
<RunCodeAnalysis>false</RunCodeAnalysis>
<LibraryPath>$(LibraryPath);</LibraryPath>
</PropertyGroup>
@@ -207,6 +214,7 @@
<TreatWarningAsError>false</TreatWarningAsError>
<EnablePREfast>false</EnablePREfast>
<RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+ <AdditionalOptions>/DZSTD_MULTITHREAD %(AdditionalOptions)</AdditionalOptions>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
@@ -219,4 +227,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
-</Project>
+</Project>
\ No newline at end of file
diff --git a/build/cmake/lib/CMakeLists.txt b/build/cmake/lib/CMakeLists.txt
index 34a639c..db75278 100644
--- a/build/cmake/lib/CMakeLists.txt
+++ b/build/cmake/lib/CMakeLists.txt
@@ -1,30 +1,10 @@
# ################################################################
-# zstd - Makefile
-# Copyright (C) Yann Collet 2014-2016
-# All rights reserved.
-#
-# BSD license
-#
-# Redistribution and use in source and binary forms, with or without modification,
-# are permitted provided that the following conditions are met:
-#
-# * Redistributions of source code must retain the above copyright notice, this
-# list of conditions and the following disclaimer.
-#
-# * Redistributions in binary form must reproduce the above copyright notice, this
-# list of conditions and the following disclaimer in the documentation and/or
-# other materials provided with the distribution.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
-# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
-# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+# * Copyright (c) 2014-present, Yann Collet, Facebook, Inc.
+# * All rights reserved.
+# *
+# * This source code is licensed under the BSD-style license found in the
+# * LICENSE file in the root directory of this source tree. An additional grant
+# * of patent rights can be found in the PATENTS file in the same directory.
#
# You can contact the author at :
# - zstd homepage : http://www.zstd.net/
@@ -58,13 +38,16 @@
SET(Sources
${LIBRARY_DIR}/common/entropy_common.c
+ ${LIBRARY_DIR}/common/fse_decompress.c
+ ${LIBRARY_DIR}/common/threading.c
+ ${LIBRARY_DIR}/common/pool.c
${LIBRARY_DIR}/common/zstd_common.c
${LIBRARY_DIR}/common/error_private.c
${LIBRARY_DIR}/common/xxhash.c
- ${LIBRARY_DIR}/common/fse_decompress.c
${LIBRARY_DIR}/compress/fse_compress.c
${LIBRARY_DIR}/compress/huf_compress.c
${LIBRARY_DIR}/compress/zstd_compress.c
+ ${LIBRARY_DIR}/compress/zstdmt_compress.c
${LIBRARY_DIR}/decompress/huf_decompress.c
${LIBRARY_DIR}/decompress/zstd_decompress.c
${LIBRARY_DIR}/dictBuilder/cover.c
diff --git a/build/cmake/programs/CMakeLists.txt b/build/cmake/programs/CMakeLists.txt
index c2931b0..9b3c3ac 100644
--- a/build/cmake/programs/CMakeLists.txt
+++ b/build/cmake/programs/CMakeLists.txt
@@ -1,30 +1,10 @@
# ################################################################
-# zstd - Makefile
-# Copyright (C) Yann Collet 2014-2016
-# All rights reserved.
-#
-# BSD license
-#
-# Redistribution and use in source and binary forms, with or without modification,
-# are permitted provided that the following conditions are met:
-#
-# * Redistributions of source code must retain the above copyright notice, this
-# list of conditions and the following disclaimer.
-#
-# * Redistributions in binary form must reproduce the above copyright notice, this
-# list of conditions and the following disclaimer in the documentation and/or
-# other materials provided with the distribution.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
-# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
-# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+# * Copyright (c) 2015-present, Yann Collet, Facebook, Inc.
+# * All rights reserved.
+# *
+# * This source code is licensed under the BSD-style license found in the
+# * LICENSE file in the root directory of this source tree. An additional grant
+# * of patent rights can be found in the PATENTS file in the same directory.
#
# You can contact the author at :
# - zstd homepage : http://www.zstd.net/
@@ -40,7 +20,7 @@
# Define programs directory, where sources and header files are located
SET(LIBRARY_DIR ${ROOT_DIR}/lib)
SET(PROGRAMS_DIR ${ROOT_DIR}/programs)
-INCLUDE_DIRECTORIES(${PROGRAMS_DIR} ${LIBRARY_DIR} ${LIBRARY_DIR}/common ${LIBRARY_DIR}/dictBuilder)
+INCLUDE_DIRECTORIES(${PROGRAMS_DIR} ${LIBRARY_DIR} ${LIBRARY_DIR}/common ${LIBRARY_DIR}/compress ${LIBRARY_DIR}/dictBuilder)
IF (ZSTD_LEGACY_SUPPORT)
SET(PROGRAMS_LEGACY_DIR ${PROGRAMS_DIR}/legacy)
diff --git a/lib/Makefile b/lib/Makefile
index 01b4183..c4a5ecb 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -38,6 +38,8 @@
ZSTD_FILES+= $(wildcard legacy/*.c)
endif
+ZSTD_OBJ := $(patsubst %.c,%.o,$(ZSTD_FILES))
+
# OS X linker doesn't support -soname, and use different extension
# see : https://developer.apple.com/library/mac/documentation/DeveloperTools/Conceptual/DynamicLibraries/100-Articles/DynamicLibraryDesignGuidelines.html
ifeq ($(shell uname), Darwin)
@@ -62,10 +64,9 @@
all: lib
libzstd.a: ARFLAGS = rcs
-libzstd.a: $(ZSTD_FILES)
+libzstd.a: $(ZSTD_OBJ)
@echo compiling static library
- @$(CC) $(FLAGS) -c $^
- @$(AR) $(ARFLAGS) $@ *.o
+ @$(AR) $(ARFLAGS) $@ $^
$(LIBZSTD): LDFLAGS += -shared -fPIC -fvisibility=hidden
$(LIBZSTD): $(ZSTD_FILES)
@@ -86,7 +87,7 @@
clean:
@$(RM) core *.o *.a *.gcda *.$(SHARED_EXT) *.$(SHARED_EXT).* libzstd.pc dll/libzstd.dll dll/libzstd.lib
- @$(RM) decompress/*.o
+ @$(RM) common/*.o compress/*.o decompress/*.o dictBuilder/*.o legacy/*.o deprecated/*.o
@echo Cleaning library completed
#-----------------------------------------------------------------------------
diff --git a/lib/common/pool.c b/lib/common/pool.c
new file mode 100644
index 0000000..693217f
--- /dev/null
+++ b/lib/common/pool.c
@@ -0,0 +1,194 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+
+
+/* ====== Dependencies ======= */
+#include <stddef.h> /* size_t */
+#include <stdlib.h> /* malloc, calloc, free */
+#include "pool.h"
+
+/* ====== Compiler specifics ====== */
+#if defined(_MSC_VER)
+# pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */
+#endif
+
+
+#ifdef ZSTD_MULTITHREAD
+
+#include <threading.h> /* pthread adaptation */
+
+/* A job is a function and the opaque argument it will be called with */
+typedef struct POOL_job_s {
+ POOL_function function;
+ void *opaque;
+} POOL_job;
+
+struct POOL_ctx_s {
+ /* Thread handles, and how many of them POOL_join() has to join */
+ pthread_t *threads;
+ size_t numThreads;
+
+ /* The queue is a circular buffer : it is empty when queueHead == queueTail */
+ POOL_job *queue;
+ size_t queueHead; /* index of the next job to pop */
+ size_t queueTail; /* index of the next slot to fill */
+ size_t queueSize; /* number of slots : requested capacity + 1 */
+ /* The mutex protects the queue */
+ pthread_mutex_t queueMutex;
+ /* Condition variable for pushers to wait on when the queue is full */
+ pthread_cond_t queuePushCond;
+ /* Condition variable for poppers to wait on when the queue is empty */
+ pthread_cond_t queuePopCond;
+ /* Indicates if the queue is shutting down */
+ int shutdown;
+};
+
+/* POOL_thread() :
+   Worker loop run by every thread of the pool.
+   Pops jobs off the queue and runs them until the pool shuts down.
+   @returns : NULL when given no context, else the `opaque` context pointer.
+*/
+static void* POOL_thread(void* opaque) {
+    POOL_ctx* const pool = (POOL_ctx*)opaque;
+    if (pool == NULL) return NULL;
+    for (;;) {
+        POOL_job next;
+        /* Sleep under the mutex until a job shows up or shutdown is requested */
+        pthread_mutex_lock(&pool->queueMutex);
+        while (!pool->shutdown && pool->queueTail == pool->queueHead) {
+            pthread_cond_wait(&pool->queuePopCond, &pool->queueMutex);
+        }
+        /* Still empty => woken up for shutdown : leave the loop */
+        if (pool->queueTail == pool->queueHead) {
+            pthread_mutex_unlock(&pool->queueMutex);
+            return opaque;
+        }
+        /* Take the oldest job and advance the head around the ring */
+        next = pool->queue[pool->queueHead];
+        pool->queueHead = (pool->queueHead + 1) % pool->queueSize;
+        /* Release the mutex, wake one waiting pusher, then run the job unlocked */
+        pthread_mutex_unlock(&pool->queueMutex);
+        pthread_cond_signal(&pool->queuePushCond);
+        next.function(next.opaque);
+    }
+    /* Unreachable */
+}
+
+/* POOL_create() : see pool.h. Returns NULL on bad parameters, size overflow, allocation or thread-start failure. */
+POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
+    POOL_ctx *ctx;
+    /* Check the parameters */
+    if (!numThreads || !queueSize) { return NULL; }
+    /* Reject counts whose allocation size (including the extra queue slot) would wrap around size_t */
+    if (queueSize >= (size_t)-1 / sizeof(POOL_job)) { return NULL; }
+    if (numThreads > (size_t)-1 / sizeof(pthread_t)) { return NULL; }
+    /* Allocate the context and zero initialize */
+    ctx = (POOL_ctx *)calloc(1, sizeof(POOL_ctx));
+    if (!ctx) { return NULL; }
+    /* The queue needs one extra slot : one slot stays unused to tell an empty queue from a full one */
+    ctx->queueSize = queueSize + 1;
+    ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job));
+    ctx->queueHead = 0;
+    ctx->queueTail = 0;
+    pthread_mutex_init(&ctx->queueMutex, NULL);
+    pthread_cond_init(&ctx->queuePushCond, NULL);
+    pthread_cond_init(&ctx->queuePopCond, NULL);
+    ctx->shutdown = 0;
+    /* Allocate space for the thread handles */
+    ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t));
+    ctx->numThreads = 0;   /* no thread to join yet, should POOL_free() run below */
+    if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
+    /* Start the workers ; on failure, only the ones already started get joined */
+    { size_t i;
+        for (i = 0; i < numThreads; ++i) {
+            if (pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
+                ctx->numThreads = i;
+                POOL_free(ctx);
+                return NULL;
+        }   }
+        ctx->numThreads = numThreads;
+    }
+    return ctx;
+}
+
+/*! POOL_join() :
+    Flags the pool as shutting down, wakes every waiter, then waits for each worker to exit.
+*/
+static void POOL_join(POOL_ctx *ctx) {
+    size_t threadNb;
+    /* Raise the shutdown flag while holding the queue mutex */
+    pthread_mutex_lock(&ctx->queueMutex);
+    ctx->shutdown = 1;
+    pthread_mutex_unlock(&ctx->queueMutex);
+    /* Wake everyone blocked on either condition variable */
+    pthread_cond_broadcast(&ctx->queuePushCond);
+    pthread_cond_broadcast(&ctx->queuePopCond);
+    /* Wait for every started worker to return */
+    for (threadNb = 0; threadNb < ctx->numThreads; ++threadNb) {
+        pthread_join(ctx->threads[threadNb], NULL);
+    }
+}
+
+void POOL_free(POOL_ctx *ctx) {
+    if (ctx == NULL) return;   /* freeing a NULL pool is allowed */
+    POOL_join(ctx);            /* shut down and join the workers before tearing anything down */
+    pthread_mutex_destroy(&ctx->queueMutex);
+    pthread_cond_destroy(&ctx->queuePushCond);
+    pthread_cond_destroy(&ctx->queuePopCond);
+    free(ctx->queue);          /* free(NULL) is a no-op, so no guard is needed */
+    free(ctx->threads);
+    free(ctx);
+}
+
+void POOL_add(void *ctxVoid, POOL_function function, void *opaque) {
+    POOL_ctx* const pool = (POOL_ctx *)ctxVoid;
+    size_t nextTail;
+    if (pool == NULL) return;
+
+    pthread_mutex_lock(&pool->queueMutex);
+    /* Block while the ring is full, unless the pool is shutting down */
+    for (;;) {
+        nextTail = (pool->queueTail + 1) % pool->queueSize;
+        if (pool->queueHead != nextTail || pool->shutdown) break;
+        pthread_cond_wait(&pool->queuePushCond, &pool->queueMutex);
+    }
+    /* Not shutting down => a slot is free ; during shutdown the job is not queued */
+    if (!pool->shutdown) {
+        pool->queue[pool->queueTail].function = function;
+        pool->queue[pool->queueTail].opaque = opaque;
+        pool->queueTail = nextTail;
+    }
+    pthread_mutex_unlock(&pool->queueMutex);
+    pthread_cond_signal(&pool->queuePopCond);   /* wake one thread waiting for a job */
+}
+
+#else /* ZSTD_MULTITHREAD not defined */
+/* No multi-threading support */
+
+/* The context carries no state ; the dummy member only keeps the struct (and the malloc() size) non-empty. */
+struct POOL_ctx_s {
+    int data;
+};
+
+POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) {
+    POOL_ctx* const pool = (POOL_ctx *)malloc(sizeof(*pool));
+    (void)numThreads; (void)queueSize;   /* no threads and no queue in this build */
+    return pool;
+}
+
+void POOL_free(POOL_ctx *ctx) {
+    free(ctx);   /* free(NULL) is a no-op */
+}
+
+void POOL_add(void *ctx, POOL_function function, void *opaque) {
+    (void)ctx;
+    (*function)(opaque);   /* no worker threads : the job runs right here, before POOL_add() returns */
+}
+
+#endif /* ZSTD_MULTITHREAD */
diff --git a/lib/common/pool.h b/lib/common/pool.h
new file mode 100644
index 0000000..c26f543
--- /dev/null
+++ b/lib/common/pool.h
@@ -0,0 +1,46 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+#ifndef POOL_H
+#define POOL_H
+
+#include <stddef.h> /* size_t */
+
+typedef struct POOL_ctx_s POOL_ctx;
+
+/*! POOL_create() :
+ Create a thread pool with at most `numThreads` threads.
+ `numThreads` must be at least 1.
+ The maximum number of queued jobs before blocking is `queueSize`.
+ `queueSize` must be at least 1.
+ @return : The POOL_ctx pointer on success else NULL.
+*/
+POOL_ctx *POOL_create(size_t numThreads, size_t queueSize);
+
+/*! POOL_free() :
+ Free a thread pool returned by POOL_create().
+*/
+void POOL_free(POOL_ctx *ctx);
+
+/*! POOL_function :
+ The function type that can be added to a thread pool.
+*/
+typedef void (*POOL_function)(void *);
+/*! POOL_add_function :
+ The function type for a generic thread pool add function.
+*/
+typedef void (*POOL_add_function)(void *, POOL_function, void *);
+
+/*! POOL_add() :
+ Add the job `function(opaque)` to the thread pool.
+ Possibly blocks until there is room in the queue.
+ Note : The function may be executed asynchronously, so `opaque` must live until the function has been completed.
+*/
+void POOL_add(void *ctx, POOL_function function, void *opaque);
+
+#endif
diff --git a/lib/common/threading.c b/lib/common/threading.c
new file mode 100644
index 0000000..b56e594
--- /dev/null
+++ b/lib/common/threading.c
@@ -0,0 +1,79 @@
+
+/**
+ * Copyright (c) 2016 Tino Reichardt
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ *
+ * You can contact the author at:
+ * - zstdmt source repository: https://github.com/mcmilk/zstdmt
+ */
+
+/**
+ * This file holds wrappers for systems that do not support pthreads
+ */
+
+/* ====== Compiler specifics ====== */
+#if defined(_MSC_VER)
+# pragma warning(disable : 4206) /* disable: C4206: translation unit is empty (when ZSTD_MULTITHREAD is not defined) */
+#endif
+
+
+#if defined(ZSTD_MULTITHREAD) && defined(_WIN32)
+
+/**
+ * Windows minimalist Pthread Wrapper, based on :
+ * http://www.cse.wustl.edu/~schmidt/win32-cv-1.html
+ */
+
+
+/* === Dependencies === */
+#include <process.h>
+#include <errno.h>
+#include "threading.h"
+
+
+/* === Implementation === */
+
+static unsigned __stdcall worker(void *arg) /* thread entry point handed to _beginthreadex() by pthread_create() */
+{
+ pthread_t* const thread = (pthread_t*) arg;
+ thread->arg = thread->start_routine(thread->arg); /* `arg` is reused to hold the routine's result, read back by _pthread_join() */
+ return 0;
+}
+
+int pthread_create(pthread_t* thread, const void* unused,
+            void* (*start_routine) (void*), void* arg)
+{
+    (void)unused;   /* the attributes argument is ignored by this wrapper */
+    /* Fill the descriptor before starting the thread : worker() reads it */
+    thread->start_routine = start_routine;
+    thread->arg = arg;
+    thread->handle = (HANDLE) _beginthreadex(NULL, 0, worker, thread, 0, NULL);
+
+    /* A null handle means the thread was not started : report errno */
+    if (thread->handle) return 0;
+    return errno;
+}
+
+/* Waits for `thread` to end, hands its result back through `value_ptr` (when non-NULL), and releases its handle. */
+int _pthread_join(pthread_t * thread, void **value_ptr)
+{
+    DWORD result;
+    if (!thread->handle) return 0;   /* no handle : nothing to wait for (or already joined) */
+
+    result = WaitForSingleObject(thread->handle, INFINITE);
+    switch (result) {
+    case WAIT_OBJECT_0:
+        if (value_ptr) *value_ptr = thread->arg;   /* worker() stored the routine's result there */
+        CloseHandle(thread->handle);   /* a _beginthreadex() handle stays open until its owner closes it */
+        thread->handle = NULL;         /* a repeated join becomes a no-op instead of using a stale handle */
+        return 0;
+    case WAIT_ABANDONED: return EINVAL;
+    default: return GetLastError();
+    }
+}
+
+#endif /* ZSTD_MULTITHREAD */
diff --git a/lib/common/threading.h b/lib/common/threading.h
new file mode 100644
index 0000000..74b2ec0
--- /dev/null
+++ b/lib/common/threading.h
@@ -0,0 +1,104 @@
+
+/**
+ * Copyright (c) 2016 Tino Reichardt
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ *
+ * You can contact the author at:
+ * - zstdmt source repository: https://github.com/mcmilk/zstdmt
+ */
+
+#ifndef THREADING_H_938743
+#define THREADING_H_938743
+
+#if defined (__cplusplus)
+extern "C" {
+#endif
+
+#if defined(ZSTD_MULTITHREAD) && defined(_WIN32)
+
+/**
+ * Windows minimalist Pthread Wrapper, based on :
+ * http://www.cse.wustl.edu/~schmidt/win32-cv-1.html
+ */
+#ifdef WINVER
+# undef WINVER
+#endif
+#define WINVER 0x0600
+
+#ifdef _WIN32_WINNT
+# undef _WIN32_WINNT
+#endif
+#define _WIN32_WINNT 0x0600
+
+#ifndef WIN32_LEAN_AND_MEAN
+# define WIN32_LEAN_AND_MEAN
+#endif
+
+#include <windows.h>
+
+/* mutex */
+#define pthread_mutex_t CRITICAL_SECTION
+#define pthread_mutex_init(a,b) InitializeCriticalSection((a))
+#define pthread_mutex_destroy(a) DeleteCriticalSection((a))
+#define pthread_mutex_lock(a) EnterCriticalSection((a))
+#define pthread_mutex_unlock(a) LeaveCriticalSection((a))
+
+/* condition variable */
+#define pthread_cond_t CONDITION_VARIABLE
+#define pthread_cond_init(a, b) InitializeConditionVariable((a))
+#define pthread_cond_destroy(a) /* No delete */
+#define pthread_cond_wait(a, b) SleepConditionVariableCS((a), (b), INFINITE)
+#define pthread_cond_signal(a) WakeConditionVariable((a))
+#define pthread_cond_broadcast(a) WakeAllConditionVariable((a))
+
+/* pthread_create() and pthread_join() */
+typedef struct {
+ HANDLE handle;
+ void* (*start_routine)(void*);
+ void* arg;
+} pthread_t;
+
+int pthread_create(pthread_t* thread, const void* unused,
+ void* (*start_routine) (void*), void* arg);
+
+#define pthread_join(a, b) _pthread_join(&(a), (b))
+int _pthread_join(pthread_t* thread, void** value_ptr);
+
+/**
+ * add here more wrappers as required
+ */
+
+
+#elif defined(ZSTD_MULTITHREAD) /* posix assumed ; need a better detection method */
+/* === POSIX Systems === */
+# include <pthread.h>
+
+#else /* ZSTD_MULTITHREAD not defined */
+/* No multithreading support */
+
+#define pthread_mutex_t int /* #define rather than typedef, as sometimes pthread support is implicit, resulting in duplicated symbols */
+#define pthread_mutex_init(a,b)
+#define pthread_mutex_destroy(a)
+#define pthread_mutex_lock(a)
+#define pthread_mutex_unlock(a)
+
+#define pthread_cond_t int
+#define pthread_cond_init(a,b)
+#define pthread_cond_destroy(a)
+#define pthread_cond_wait(a,b)
+#define pthread_cond_signal(a)
+#define pthread_cond_broadcast(a)
+
+/* do not use pthread_t */
+
+#endif /* ZSTD_MULTITHREAD */
+
+#if defined (__cplusplus)
+}
+#endif
+
+#endif /* THREADING_H_938743 */
diff --git a/lib/common/zstd_internal.h b/lib/common/zstd_internal.h
index 96e0577..4b56ce1 100644
--- a/lib/common/zstd_internal.h
+++ b/lib/common/zstd_internal.h
@@ -267,4 +267,13 @@
}
+/* hidden functions */
+
+/* ZSTD_invalidateRepCodes() :
+ * ensures next compression will not use repcodes from previous block.
+ * Note : only works with regular variant;
+ * do not use with extDict variant ! */
+void ZSTD_invalidateRepCodes(ZSTD_CCtx* cctx);
+
+
#endif /* ZSTD_CCOMMON_H_MODULE */
diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c
index df53db4..b6cf376 100644
--- a/lib/compress/zstd_compress.c
+++ b/lib/compress/zstd_compress.c
@@ -60,10 +60,11 @@
U32 nextToUpdate; /* index from which to continue dictionary update */
U32 nextToUpdate3; /* index from which to continue dictionary update */
U32 hashLog3; /* dispatch table : larger == faster, more memory */
- U32 loadedDictEnd;
+ U32 loadedDictEnd; /* index of end of dictionary */
+ U32 forceWindow; /* force back-references to respect limit of 1<<wLog, even for dictionary */
ZSTD_compressionStage_e stage;
U32 rep[ZSTD_REP_NUM];
- U32 savedRep[ZSTD_REP_NUM];
+ U32 repToConfirm[ZSTD_REP_NUM];
U32 dictID;
ZSTD_parameters params;
void* workSpace;
@@ -100,7 +101,7 @@
cctx = (ZSTD_CCtx*) ZSTD_malloc(sizeof(ZSTD_CCtx), customMem);
if (!cctx) return NULL;
memset(cctx, 0, sizeof(ZSTD_CCtx));
- memcpy(&(cctx->customMem), &customMem, sizeof(customMem));
+ cctx->customMem = customMem;
return cctx;
}
@@ -118,6 +119,15 @@
return sizeof(*cctx) + cctx->workSpaceSize;
}
+size_t ZSTD_setCCtxParameter(ZSTD_CCtx* cctx, ZSTD_CCtxParameter param, unsigned value)
+{ /* Sets one context parameter ; returns 0 on success, or ERROR(parameter_unknown) for any `param` not handled below */
+ switch(param)
+ {
+ case ZSTD_p_forceWindow : cctx->forceWindow = value>0; cctx->loadedDictEnd = 0; return 0; /* any non-zero value enables the flag ; loadedDictEnd is reset either way */
+ default: return ERROR(parameter_unknown);
+ }
+}
+
const seqStore_t* ZSTD_getSeqStore(const ZSTD_CCtx* ctx) /* hidden interface */
{
return &(ctx->seqStore);
@@ -317,6 +327,14 @@
}
}
+/* ZSTD_invalidateRepCodes() :
+ * ensures next compression will not use repcodes from previous block.
+ * Note : only works with regular variant;
+ * do not use with extDict variant ! */
+void ZSTD_invalidateRepCodes(ZSTD_CCtx* cctx) {
+    /* rep[] is a plain U32 array : zeroing it wholesale clears every repcode */
+    memset(cctx->rep, 0, sizeof(cctx->rep));
+}
/*! ZSTD_copyCCtx() :
* Duplicate an existing context `srcCCtx` into another one `dstCCtx`.
@@ -734,12 +752,19 @@
if ((size_t)(op-ostart) >= maxCSize) return 0; }
/* confirm repcodes */
- { int i; for (i=0; i<ZSTD_REP_NUM; i++) zc->rep[i] = zc->savedRep[i]; }
+ { int i; for (i=0; i<ZSTD_REP_NUM; i++) zc->rep[i] = zc->repToConfirm[i]; }
return op - ostart;
}
+#if 0 /* for debug */
+# define STORESEQ_DEBUG
+#include <stdio.h> /* fprintf */
+U32 g_startDebug = 0;
+const BYTE* g_start = NULL;
+#endif
+
/*! ZSTD_storeSeq() :
Store a sequence (literal length, literals, offset code and match length code) into seqStore_t.
`offsetCode` : distance to match, or 0 == repCode.
@@ -747,13 +772,14 @@
*/
MEM_STATIC void ZSTD_storeSeq(seqStore_t* seqStorePtr, size_t litLength, const void* literals, U32 offsetCode, size_t matchCode)
{
-#if 0 /* for debug */
- static const BYTE* g_start = NULL;
- const U32 pos = (U32)((const BYTE*)literals - g_start);
- if (g_start==NULL) g_start = (const BYTE*)literals;
- //if ((pos > 1) && (pos < 50000))
- printf("Cpos %6u :%5u literals & match %3u bytes at distance %6u \n",
- pos, (U32)litLength, (U32)matchCode+MINMATCH, (U32)offsetCode);
+#ifdef STORESEQ_DEBUG
+ if (g_startDebug) {
+ const U32 pos = (U32)((const BYTE*)literals - g_start);
+ if (g_start==NULL) g_start = (const BYTE*)literals;
+ if ((pos > 1895000) && (pos < 1895300))
+ fprintf(stderr, "Cpos %6u :%5u literals & match %3u bytes at distance %6u \n",
+ pos, (U32)litLength, (U32)matchCode+MINMATCH, (U32)offsetCode);
+ }
#endif
/* copy Literals */
ZSTD_wildcopy(seqStorePtr->lit, literals, litLength);
@@ -1003,8 +1029,8 @@
} } }
/* save reps for next block */
- cctx->savedRep[0] = offset_1 ? offset_1 : offsetSaved;
- cctx->savedRep[1] = offset_2 ? offset_2 : offsetSaved;
+ cctx->repToConfirm[0] = offset_1 ? offset_1 : offsetSaved;
+ cctx->repToConfirm[1] = offset_2 ? offset_2 : offsetSaved;
/* Last Literals */
{ size_t const lastLLSize = iend - anchor;
@@ -1118,7 +1144,7 @@
} } }
/* save reps for next block */
- ctx->savedRep[0] = offset_1; ctx->savedRep[1] = offset_2;
+ ctx->repToConfirm[0] = offset_1; ctx->repToConfirm[1] = offset_2;
/* Last Literals */
{ size_t const lastLLSize = iend - anchor;
@@ -1272,8 +1298,8 @@
} } }
/* save reps for next block */
- cctx->savedRep[0] = offset_1 ? offset_1 : offsetSaved;
- cctx->savedRep[1] = offset_2 ? offset_2 : offsetSaved;
+ cctx->repToConfirm[0] = offset_1 ? offset_1 : offsetSaved;
+ cctx->repToConfirm[1] = offset_2 ? offset_2 : offsetSaved;
/* Last Literals */
{ size_t const lastLLSize = iend - anchor;
@@ -1422,7 +1448,7 @@
} } }
/* save reps for next block */
- ctx->savedRep[0] = offset_1; ctx->savedRep[1] = offset_2;
+ ctx->repToConfirm[0] = offset_1; ctx->repToConfirm[1] = offset_2;
/* Last Literals */
{ size_t const lastLLSize = iend - anchor;
@@ -1954,8 +1980,8 @@
} }
/* Save reps for next block */
- ctx->savedRep[0] = offset_1 ? offset_1 : savedOffset;
- ctx->savedRep[1] = offset_2 ? offset_2 : savedOffset;
+ ctx->repToConfirm[0] = offset_1 ? offset_1 : savedOffset;
+ ctx->repToConfirm[1] = offset_2 ? offset_2 : savedOffset;
/* Last Literals */
{ size_t const lastLLSize = iend - anchor;
@@ -2149,7 +2175,7 @@
} }
/* Save reps for next block */
- ctx->savedRep[0] = offset_1; ctx->savedRep[1] = offset_2;
+ ctx->repToConfirm[0] = offset_1; ctx->repToConfirm[1] = offset_2;
/* Last Literals */
{ size_t const lastLLSize = iend - anchor;
@@ -2408,12 +2434,14 @@
cctx->nextSrc = ip + srcSize;
- { size_t const cSize = frame ?
+ if (srcSize) {
+ size_t const cSize = frame ?
ZSTD_compress_generic (cctx, dst, dstCapacity, src, srcSize, lastFrameChunk) :
ZSTD_compressBlock_internal (cctx, dst, dstCapacity, src, srcSize);
if (ZSTD_isError(cSize)) return cSize;
return cSize + fhSize;
- }
+ } else
+ return fhSize;
}
@@ -2449,7 +2477,7 @@
zc->dictBase = zc->base;
zc->base += ip - zc->nextSrc;
zc->nextToUpdate = zc->dictLimit;
- zc->loadedDictEnd = (U32)(iend - zc->base);
+ zc->loadedDictEnd = zc->forceWindow ? 0 : (U32)(iend - zc->base);
zc->nextSrc = iend;
if (srcSize <= HASH_READ_SIZE) return 0;
@@ -2593,7 +2621,6 @@
}
}
-
/*! ZSTD_compressBegin_internal() :
* @return : 0, or an error code */
static size_t ZSTD_compressBegin_internal(ZSTD_CCtx* cctx,
@@ -2625,9 +2652,9 @@
}
-size_t ZSTD_compressBegin(ZSTD_CCtx* zc, int compressionLevel)
+size_t ZSTD_compressBegin(ZSTD_CCtx* cctx, int compressionLevel)
{
- return ZSTD_compressBegin_usingDict(zc, NULL, 0, compressionLevel);
+ return ZSTD_compressBegin_usingDict(cctx, NULL, 0, compressionLevel);
}
@@ -2815,7 +2842,7 @@
return ZSTD_getParamsFromCCtx(cdict->refContext);
}
-size_t ZSTD_compressBegin_usingCDict(ZSTD_CCtx* cctx, const ZSTD_CDict* cdict, U64 pledgedSrcSize)
+size_t ZSTD_compressBegin_usingCDict(ZSTD_CCtx* cctx, const ZSTD_CDict* cdict, unsigned long long pledgedSrcSize)
{
if (cdict->dictContentSize) CHECK_F(ZSTD_copyCCtx(cctx, cdict->refContext, pledgedSrcSize))
else CHECK_F(ZSTD_compressBegin_advanced(cctx, NULL, 0, cdict->refContext->params, pledgedSrcSize));
diff --git a/lib/compress/zstd_opt.h b/lib/compress/zstd_opt.h
index f071c4f..8862bbd 100644
--- a/lib/compress/zstd_opt.h
+++ b/lib/compress/zstd_opt.h
@@ -38,7 +38,7 @@
ssPtr->cachedLiterals = NULL;
ssPtr->cachedPrice = ssPtr->cachedLitLength = 0;
- ssPtr->staticPrices = 0;
+ ssPtr->staticPrices = 0;
if (ssPtr->litLengthSum == 0) {
if (srcSize <= 1024) ssPtr->staticPrices = 1;
@@ -56,7 +56,7 @@
for (u=0; u<=MaxLit; u++) {
ssPtr->litFreq[u] = 1 + (ssPtr->litFreq[u]>>ZSTD_FREQ_DIV);
- ssPtr->litSum += ssPtr->litFreq[u];
+ ssPtr->litSum += ssPtr->litFreq[u];
}
for (u=0; u<=MaxLL; u++)
ssPtr->litLengthFreq[u] = 1;
@@ -634,7 +634,7 @@
} } /* for (cur=0; cur < last_pos; ) */
/* Save reps for next block */
- { int i; for (i=0; i<ZSTD_REP_NUM; i++) ctx->savedRep[i] = rep[i]; }
+ { int i; for (i=0; i<ZSTD_REP_NUM; i++) ctx->repToConfirm[i] = rep[i]; }
/* Last Literals */
{ size_t const lastLLSize = iend - anchor;
@@ -825,7 +825,7 @@
match_num = ZSTD_BtGetAllMatches_selectMLS_extDict(ctx, inr, iend, maxSearches, mls, matches, minMatch);
- if (match_num > 0 && matches[match_num-1].len > sufficient_len) {
+ if (match_num > 0 && (matches[match_num-1].len > sufficient_len || cur + matches[match_num-1].len >= ZSTD_OPT_NUM)) {
best_mlen = matches[match_num-1].len;
best_off = matches[match_num-1].off;
last_pos = cur + 1;
@@ -835,7 +835,7 @@
/* set prices using matches at position = cur */
for (u = 0; u < match_num; u++) {
mlen = (u>0) ? matches[u-1].len+1 : best_mlen;
- best_mlen = (cur + matches[u].len < ZSTD_OPT_NUM) ? matches[u].len : ZSTD_OPT_NUM - cur;
+ best_mlen = matches[u].len;
while (mlen <= best_mlen) {
if (opt[cur].mlen == 1) {
@@ -907,7 +907,7 @@
} } /* for (cur=0; cur < last_pos; ) */
/* Save reps for next block */
- { int i; for (i=0; i<ZSTD_REP_NUM; i++) ctx->savedRep[i] = rep[i]; }
+ { int i; for (i=0; i<ZSTD_REP_NUM; i++) ctx->repToConfirm[i] = rep[i]; }
/* Last Literals */
{ size_t lastLLSize = iend - anchor;
diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c
new file mode 100644
index 0000000..5f0bf2a
--- /dev/null
+++ b/lib/compress/zstdmt_compress.c
@@ -0,0 +1,732 @@
+/**
+ * Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+
+
+/* ====== Tuning parameters ====== */
+#define ZSTDMT_NBTHREADS_MAX 128
+
+
+/* ====== Compiler specifics ====== */
+#if defined(_MSC_VER)
+# pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */
+#endif
+
+
+/* ====== Dependencies ====== */
+#include <stdlib.h> /* malloc */
+#include <string.h> /* memcpy */
+#include <pool.h> /* threadpool */
+#include "threading.h" /* mutex */
+#include "zstd_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
+#include "zstdmt_compress.h"
+#define XXH_STATIC_LINKING_ONLY /* XXH64_state_t */
+#include "xxhash.h"
+
+
+/* ====== Debug ====== */
+#if 0
+
+# include <stdio.h>
+# include <unistd.h>
+# include <sys/times.h>
+ static unsigned g_debugLevel = 3;
+# define DEBUGLOGRAW(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __VA_ARGS__); }
+# define DEBUGLOG(l, ...) if (l<=g_debugLevel) { fprintf(stderr, __FILE__ ": "); fprintf(stderr, __VA_ARGS__); fprintf(stderr, " \n"); }
+
+# define DEBUG_PRINTHEX(l,p,n) { \
+ unsigned debug_u; \
+ for (debug_u=0; debug_u<(n); debug_u++) \
+ DEBUGLOGRAW(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \
+ DEBUGLOGRAW(l, " \n"); \
+}
+
+/* GetCurrentClockTimeMicroseconds() :
+ * debug helper : converts the tick count returned by times() into microseconds,
+ * using the tick rate reported by sysconf(_SC_CLK_TCK) (queried once, then cached). */
+static unsigned long long GetCurrentClockTimeMicroseconds()
+{
+    static clock_t ticksPerSecond = 0;
+    struct tms unusedTimes;
+    clock_t nowTicks;
+
+    if (ticksPerSecond <= 0) ticksPerSecond = sysconf(_SC_CLK_TCK);
+    nowTicks = (clock_t) times(&unusedTimes);
+    return ((unsigned long long)nowTicks * 1000000) / ticksPerSecond;
+}
+
+#define MUTEX_WAIT_TIME_DLEVEL 5
+#define PTHREAD_MUTEX_LOCK(mutex) \
+if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \
+ unsigned long long beforeTime = GetCurrentClockTimeMicroseconds(); \
+ pthread_mutex_lock(mutex); \
+ unsigned long long afterTime = GetCurrentClockTimeMicroseconds(); \
+ unsigned long long elapsedTime = (afterTime-beforeTime); \
+ if (elapsedTime > 1000) { /* or whatever threshold you like; I'm using 1 millisecond here */ \
+ DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread took %llu microseconds to acquire mutex %s \n", \
+ elapsedTime, #mutex); \
+ } \
+} else pthread_mutex_lock(mutex);
+
+#else
+
+# define DEBUGLOG(l, ...) {} /* disabled */
+# define PTHREAD_MUTEX_LOCK(m) pthread_mutex_lock(m)
+# define DEBUG_PRINTHEX(l,p,n) {}
+
+#endif
+
+
+/* ===== Buffer Pool ===== */
+
+/* buffer_t :
+ * Describes one heap buffer : its base address and its capacity.
+ * "No buffer" (including a failed allocation) is represented as { NULL, 0 }. */
+typedef struct buffer_s {
+ void* start; /* base address, as returned by malloc() ; NULL when there is no buffer */
+ size_t size; /* capacity, in bytes */
+} buffer_t;
+
+static const buffer_t g_nullBuffer = { NULL, 0 };
+
+/* ZSTDMT_bufferPool :
+ * Stack of released buffers kept for later re-use.
+ * Filled by ZSTDMT_releaseBuffer(), emptied by ZSTDMT_getBuffer(). */
+typedef struct ZSTDMT_bufferPool_s {
+ unsigned totalBuffers; /* capacity of bTable, in entries */
+ unsigned nbBuffers; /* nb of buffers currently stored ; also index of the next free slot */
+ buffer_t bTable[1]; /* variable size */
+} ZSTDMT_bufferPool;
+
+/* ZSTDMT_createBufferPool() :
+ * Allocates an empty, zero-initialized pool with room for (2*nbThreads + 2) buffers.
+ * @return : the new pool, or NULL if the allocation fails */
+static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads)
+{
+    unsigned const capacity = 2*nbThreads + 2;
+    /* bTable[1] already accounts for one entry of the table */
+    size_t const poolSize = sizeof(ZSTDMT_bufferPool) + (capacity-1) * sizeof(buffer_t);
+    ZSTDMT_bufferPool* const pool = (ZSTDMT_bufferPool*)calloc(1, poolSize);
+    if (pool != NULL) {
+        pool->totalBuffers = capacity;
+        pool->nbBuffers = 0;
+    }
+    return pool;
+}
+
+/* ZSTDMT_freeBufferPool() :
+ * Frees every buffer currently stored in the pool, then the pool itself.
+ * Only the first nbBuffers entries are owned by the pool : slots at or beyond that index
+ * may still hold the address of a buffer which has been handed out by ZSTDMT_getBuffer()
+ * (and possibly freed since), so walking the whole table could free a buffer twice.
+ * Compatible with NULL. */
+static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)
+{
+    unsigned u;
+    if (!bufPool) return;   /* compatibility with free on NULL */
+    for (u=0; u<bufPool->nbBuffers; u++)
+        free(bufPool->bTable[u].start);
+    free(bufPool);
+}
+
+/* ZSTDMT_getBuffer() :
+ * assumption : invocation from main thread only !
+ * Provides a buffer of at least bSize bytes.
+ * The most recently released buffer is re-used when its size is within [bSize, 10*bSize] ;
+ * otherwise it is freed and a new buffer is allocated.
+ * @return : the buffer ; { NULL, 0 } if malloc fails */
+static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
+{
+    if (pool->nbBuffers) {   /* try to use an existing buffer */
+        buffer_t const buf = pool->bTable[--(pool->nbBuffers)];
+        size_t const availBufferSize = buf.size;
+        /* the pool no longer owns this buffer : erase the slot,
+         * so that ZSTDMT_freeBufferPool() can never free it a second time */
+        pool->bTable[pool->nbBuffers] = g_nullBuffer;
+        if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize))   /* large enough, but not too much */
+            return buf;
+        free(buf.start);   /* size conditions not respected : scratch this buffer and create a new one */
+    }
+    /* create new buffer */
+    {   buffer_t buffer;
+        void* const start = malloc(bSize);
+        if (start==NULL) bSize = 0;
+        buffer.start = start;   /* note : start can be NULL if malloc fails ! */
+        buffer.size = bSize;
+        return buffer;
+    }
+}
+
+/* ZSTDMT_releaseBuffer() :
+ * Gives `buf` back to the pool, for later re-use.
+ * A NULL buffer is ignored.
+ * When the pool is already full (should not happen), the buffer is freed instead. */
+static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf)
+{
+    if (buf.start != NULL) {
+        if (pool->nbBuffers >= pool->totalBuffers) {
+            free(buf.start);   /* no room left in the pool */
+        } else {
+            pool->bTable[pool->nbBuffers] = buf;
+            pool->nbBuffers++;
+        }
+    }
+}
+
+
+/* ===== CCtx Pool ===== */
+
+/* ZSTDMT_CCtxPool :
+ * Stack of compression contexts available for re-use.
+ * Filled by ZSTDMT_releaseCCtx(), emptied by ZSTDMT_getCCtx(). */
+typedef struct {
+ unsigned totalCCtx; /* capacity of the cctx table, in entries (== nbThreads) */
+ unsigned availCCtx; /* nb of contexts currently available ; also index of the next free slot */
+ ZSTD_CCtx* cctx[1]; /* variable size */
+} ZSTDMT_CCtxPool;
+
+/* assumption : CCtxPool invocation only from main thread */
+
+/* ZSTDMT_freeCCtxPool() :
+ * Frees every context referenced by the pool, then the pool itself.
+ * note : all CCtx borrowed from the pool should be released back to the pool _before_ freeing the pool
+ * Compatible with NULL : ZSTDMT_freeCCtx() can be invoked on a context whose CCtx pool failed to be created. */
+static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)
+{
+    unsigned u;
+    if (pool == NULL) return;   /* compatibility with free on NULL */
+    for (u=0; u<pool->totalCCtx; u++)
+        ZSTD_freeCCtx(pool->cctx[u]);  /* note : compatible with free on NULL */
+    free(pool);
+}
+
+/* ZSTDMT_createCCtxPool() :
+ * implies nbThreads >= 1 , checked by caller ZSTDMT_createCCtx()
+ * Allocates a pool able to reference nbThreads contexts, and creates the first one immediately.
+ * @return : the new pool, or NULL if any allocation fails */
+static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads)
+{
+    /* cctx[1] already accounts for one entry of the table */
+    size_t const poolSize = sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*);
+    ZSTDMT_CCtxPool* const pool = (ZSTDMT_CCtxPool*) calloc(1, poolSize);
+    if (pool == NULL) return NULL;
+    pool->totalCCtx = nbThreads;
+    pool->cctx[0] = ZSTD_createCCtx();   /* at least one cctx for single-thread mode */
+    pool->availCCtx = 1;
+    if (pool->cctx[0] == NULL) {
+        ZSTDMT_freeCCtxPool(pool);
+        return NULL;
+    }
+    DEBUGLOG(1, "cctxPool created, with %u threads", nbThreads);
+    return pool;
+}
+
+/* ZSTDMT_getCCtx() :
+ * Borrows a context : the last one released when the pool is not empty, a newly created one otherwise.
+ * @return : the context ; can be NULL when creation fails */
+static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* pool)
+{
+    if (pool->availCCtx == 0)
+        return ZSTD_createCCtx();   /* note : can be NULL, when creation fails ! */
+    return pool->cctx[--(pool->availCCtx)];
+}
+
+/* ZSTDMT_releaseCCtx() :
+ * Gives `cctx` back to the pool. NULL is ignored.
+ * When the pool is already full, the context is freed instead. */
+static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
+{
+    if (cctx==NULL) return;   /* compatibility with release on NULL */
+    if (pool->availCCtx >= pool->totalCCtx) {
+        /* pool overflow : should not happen, since totalCCtx==nbThreads */
+        ZSTD_freeCCtx(cctx);
+        return;
+    }
+    pool->cctx[pool->availCCtx] = cctx;
+    pool->availCCtx++;
+}
+
+
+/* ===== Thread worker ===== */
+
+/* inBuff_t :
+ * Input staging buffer used by the streaming API. */
+typedef struct {
+ buffer_t buffer; /* storage, obtained from the buffer pool */
+ size_t filled; /* nb of bytes currently held in `buffer` (overlap prefix included) */
+} inBuff_t;
+
+/* ZSTDMT_jobDescription :
+ * Everything a worker needs to compress one chunk, plus the result it reports back.
+ * Prepared by the main thread, consumed by ZSTDMT_compressChunk(). */
+typedef struct {
+ ZSTD_CCtx* cctx; /* compression context borrowed from the CCtx pool */
+ buffer_t src; /* streaming mode : input buffer owning the data, released once the job has been scanned */
+ const void* srcStart; /* start of input, prefix included : data to compress begins at srcStart + dictSize */
+ size_t srcSize; /* nb of bytes to compress (prefix excluded) */
+ size_t dictSize; /* nb of bytes at srcStart loaded as dictionary content */
+ buffer_t dstBuff; /* destination of compressed data */
+ size_t cSize; /* result : compressed size, or an error code */
+ size_t dstFlushed; /* streaming mode : nb of bytes of dstBuff already copied to the user's output */
+ unsigned firstChunk; /* 1 for the first chunk of a frame (the only one keeping its frame header) */
+ unsigned lastChunk; /* 1 when the chunk ends the frame (compressed with ZSTD_compressEnd()) */
+ unsigned jobCompleted; /* set to 1 by the worker, under jobCompleted_mutex, when the job is finished */
+ unsigned jobScanned; /* reset to 0 by the worker ; set to 1 by ZSTDMT_flushNextJob() once post-processing is done */
+ pthread_mutex_t* jobCompleted_mutex; /* protects jobCompleted */
+ pthread_cond_t* jobCompleted_cond; /* signalled by the worker on completion */
+ ZSTD_parameters params; /* compression parameters, used when no cdict is provided */
+ ZSTD_CDict* cdict; /* optional digested dictionary ; when non-NULL, takes precedence over params */
+ unsigned long long fullFrameSize; /* forwarded as pledgedSrcSize when starting compression */
+} ZSTDMT_jobDescription;
+
+/* ZSTDMT_compressChunk() : POOL_function type
+ * Worker entry point : compresses one chunk, as described by `jobDescription`
+ * (a ZSTDMT_jobDescription*), into job->dstBuff.
+ * The result (compressed size or error code) is stored into job->cSize,
+ * then completion is published under job->jobCompleted_mutex and signalled on job->jobCompleted_cond. */
+void ZSTDMT_compressChunk(void* jobDescription)
+{
+ ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
+ const void* const src = (const char*)job->srcStart + job->dictSize; /* data to compress starts after the prefix */
+ buffer_t const dstBuff = job->dstBuff;
+ DEBUGLOG(3, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize);
+ /* start compression : from the digested dictionary when provided, otherwise using the prefix as raw dictionary content */
+ if (job->cdict) {
+ size_t const initError = ZSTD_compressBegin_usingCDict(job->cctx, job->cdict, job->fullFrameSize);
+ if (job->cdict) DEBUGLOG(3, "using CDict ");
+ if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
+ } else {
+ size_t const initError = ZSTD_compressBegin_advanced(job->cctx, job->srcStart, job->dictSize, job->params, job->fullFrameSize);
+ if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
+ ZSTD_setCCtxParameter(job->cctx, ZSTD_p_forceWindow, 1); /* note : return value is not checked */
+ }
+ if (!job->firstChunk) { /* flush frame header */
+ /* compressing 0 bytes emits only the frame header ; the following compression call writes from dstBuff.start again, overwriting it */
+ size_t const hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, 0);
+ if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
+ ZSTD_invalidateRepCodes(job->cctx);
+ }
+
+ DEBUGLOG(4, "Compressing : ");
+ DEBUG_PRINTHEX(4, job->srcStart, 12);
+ job->cSize = (job->lastChunk) ? /* last chunk signal */
+ ZSTD_compressEnd (job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
+ ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
+ DEBUGLOG(3, "compressed %u bytes into %u bytes (first:%u) (last:%u)", (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
+
+_endJob:
+ /* publish completion (success or error) to the thread waiting on the condition */
+ PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
+ job->jobCompleted = 1;
+ job->jobScanned = 0;
+ pthread_cond_signal(job->jobCompleted_cond);
+ pthread_mutex_unlock(job->jobCompleted_mutex);
+}
+
+
+/* ------------------------------------------ */
+/* ===== Multi-threaded compression ===== */
+/* ------------------------------------------ */
+
+/* ZSTDMT_CCtx_s :
+ * State of a multi-threaded compression context.
+ * Allocated by ZSTDMT_createCCtx() together with its trailing job table. */
+struct ZSTDMT_CCtx_s {
+ POOL_ctx* factory; /* thread pool executing the jobs posted with POOL_add() */
+ ZSTDMT_bufferPool* buffPool; /* re-usable input / output buffers */
+ ZSTDMT_CCtxPool* cctxPool; /* re-usable compression contexts */
+ pthread_mutex_t jobCompleted_mutex; /* protects the jobCompleted flag of every job */
+ pthread_cond_t jobCompleted_cond; /* signalled by workers whenever a job completes */
+ size_t targetSectionSize; /* streaming : nb of input bytes compressed by each job */
+ size_t inBuffSize; /* streaming : capacity requested for the input buffer ; a job is created once it is full */
+ size_t dictSize; /* streaming : nb of bytes at the start of inBuff carried over from the previous section */
+ size_t targetDictSize; /* streaming : desired size of that carried-over prefix */
+ inBuff_t inBuff; /* streaming : input being accumulated for the next job */
+ ZSTD_parameters params; /* parameters of the current frame */
+ XXH64_state_t xxhState; /* streaming : running checksum of the frame content, when checksumFlag is set */
+ unsigned nbThreads;
+ unsigned jobIDMask; /* nb of entries in jobs[] minus 1 ; wraps job IDs into table indexes */
+ unsigned doneJobID; /* ID of the next job to flush */
+ unsigned nextJobID; /* ID of the next job to create */
+ unsigned frameEnded; /* set once the last job of the frame has been created */
+ unsigned allJobsCompleted; /* 1 when no job resource is held ; 0 while a streaming session is in progress */
+ unsigned overlapWrLog; /* targetDictSize == 1 << (windowLog - overlapWrLog) ; overlap disabled when >= 10 */
+ unsigned long long frameContentSize; /* pledged source size of the current frame */
+ size_t sectionSize; /* user-requested section size ; 0 selects the default */
+ ZSTD_CDict* cdict; /* digested dictionary of the streaming session, if any */
+ ZSTD_CStream* cstream; /* only created when nbThreads==1 : streaming calls are forwarded to it */
+ ZSTDMT_jobDescription jobs[1]; /* variable size (must lie at the end) */
+};
+
+/* ZSTDMT_createCCtx() :
+ * Creates a multi-threaded compression context driving `nbThreads` workers.
+ * The job table holds a power of 2 number of entries, strictly greater than nbThreads+2,
+ * so that job IDs can be wrapped into table indexes with jobIDMask.
+ * @return : the new context,
+ *           or NULL if nbThreads is outside [1, ZSTDMT_NBTHREADS_MAX] or if any allocation fails */
+ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
+{
+    ZSTDMT_CCtx* cctx;
+    U32 const minNbJobs = nbThreads + 2;
+    U32 nbJobsLog2;
+    U32 nbJobs;
+    /* validate first : guarantees that the shift below stays within the range of U32 */
+    if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL;
+    nbJobsLog2 = ZSTD_highbit32(minNbJobs) + 1;
+    nbJobs = (U32)1 << nbJobsLog2;
+    DEBUGLOG(5, "nbThreads : %u ; minNbJobs : %u ; nbJobsLog2 : %u ; nbJobs : %u \n",
+            nbThreads, minNbJobs, nbJobsLog2, nbJobs);
+    cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbJobs*sizeof(ZSTDMT_jobDescription));
+    if (!cctx) return NULL;
+    /* initialize synchronization objects before any failure path,
+     * so that they are never destroyed while still uninitialized */
+    pthread_mutex_init(&cctx->jobCompleted_mutex, NULL);   /* Todo : check init function return */
+    pthread_cond_init(&cctx->jobCompleted_cond, NULL);
+    cctx->nbThreads = nbThreads;
+    cctx->jobIDMask = nbJobs - 1;
+    cctx->allJobsCompleted = 1;
+    cctx->sectionSize = 0;
+    cctx->overlapWrLog = 3;
+    cctx->factory = POOL_create(nbThreads, 1);
+    cctx->buffPool = ZSTDMT_createBufferPool(nbThreads);
+    cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads);
+    if (!cctx->factory | !cctx->buffPool | !cctx->cctxPool) {  /* one object was not created */
+        /* release piece by piece : the CCtx pool is only released when it was actually created */
+        POOL_free(cctx->factory);
+        ZSTDMT_freeBufferPool(cctx->buffPool);
+        if (cctx->cctxPool) ZSTDMT_freeCCtxPool(cctx->cctxPool);
+        pthread_mutex_destroy(&cctx->jobCompleted_mutex);
+        pthread_cond_destroy(&cctx->jobCompleted_cond);
+        free(cctx);
+        return NULL;
+    }
+    if (nbThreads==1) {   /* single-thread mode : streaming calls are forwarded to a regular CStream */
+        cctx->cstream = ZSTD_createCStream();
+        if (!cctx->cstream) {
+            ZSTDMT_freeCCtx(cctx);   /* safe here : all pools exist, mutex and cond are initialized */
+            return NULL;
+    }   }
+    DEBUGLOG(4, "mt_cctx created, for %u threads \n", nbThreads);
+    return cctx;
+}
+
+/* ZSTDMT_releaseAllJobResources() :
+ * Ensure all workers are killed first.
+ * Returns the buffers and the context of every job to their pools, wipes the job table,
+ * releases the input buffer, and marks the context as holding no job resource. */
+static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
+{
+    unsigned const nbJobs = mtctx->jobIDMask + 1;
+    unsigned jobID;
+    for (jobID=0; jobID<nbJobs; jobID++) {
+        ZSTDMT_jobDescription* const job = &mtctx->jobs[jobID];
+        ZSTDMT_releaseBuffer(mtctx->buffPool, job->dstBuff);
+        job->dstBuff = g_nullBuffer;
+        ZSTDMT_releaseBuffer(mtctx->buffPool, job->src);
+        job->src = g_nullBuffer;
+        ZSTDMT_releaseCCtx(mtctx->cctxPool, job->cctx);
+        job->cctx = NULL;
+    }
+    memset(mtctx->jobs, 0, nbJobs*sizeof(ZSTDMT_jobDescription));
+    ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->inBuff.buffer);
+    mtctx->inBuff.buffer = g_nullBuffer;
+    mtctx->allJobsCompleted = 1;
+}
+
+/* ZSTDMT_freeCCtx() :
+ * Releases the context and everything it owns : thread pool, job resources,
+ * buffer and CCtx pools, dictionary, single-thread stream, mutex and condition.
+ * Compatible with NULL.
+ * @return : always 0 */
+size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
+{
+ if (mtctx==NULL) return 0; /* compatible with free on NULL */
+ POOL_free(mtctx->factory); /* stop workers first */
+ if (!mtctx->allJobsCompleted) ZSTDMT_releaseAllJobResources(mtctx); /* release job resources into pools, before freeing the pools */
+ ZSTDMT_freeBufferPool(mtctx->buffPool);
+ ZSTDMT_freeCCtxPool(mtctx->cctxPool);
+ ZSTD_freeCDict(mtctx->cdict);
+ ZSTD_freeCStream(mtctx->cstream);
+ pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
+ pthread_cond_destroy(&mtctx->jobCompleted_cond);
+ free(mtctx);
+ return 0;
+}
+
+/* ZSTDMT_setMTCtxParameter() :
+ * Sets one multi-threading parameter :
+ * - ZSTDMT_p_sectionSize : stored into sectionSize
+ * - ZSTDMT_p_overlapSectionRLog : stored into overlapWrLog
+ * @return : 0, or an error code if the parameter is not supported */
+size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value)
+{
+    if (parameter == ZSTDMT_p_sectionSize) {
+        mtctx->sectionSize = value;
+        return 0;
+    }
+    if (parameter == ZSTDMT_p_overlapSectionRLog) {
+        mtctx->overlapWrLog = value;
+        return 0;
+    }
+    return ERROR(compressionParameter_unsupported);
+}
+
+
+/* ------------------------------------------ */
+/* ===== Multi-threaded compression ===== */
+/* ------------------------------------------ */
+
+/* ZSTDMT_compressCCtx() :
+ * One-pass compression of `src` into `dst`.
+ * The input is split into up to nbThreads chunks, each one compressed by a worker.
+ * Chunk 0 is compressed directly into `dst` ; following chunks are compressed into pool buffers,
+ * then appended to `dst` in order once completed.
+ * Falls back to ZSTD_compressCCtx() when a single chunk is enough.
+ * @return : compressed size written into `dst`, or an error code (ZSTD_isError()) */
+size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
+                           void* dst, size_t dstCapacity,
+                     const void* src, size_t srcSize,
+                           int compressionLevel)
+{
+    ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0);
+    size_t const chunkTargetSize = (size_t)1 << (params.cParams.windowLog + 2);
+    unsigned const nbChunksMax = (unsigned)(srcSize / chunkTargetSize) + (srcSize < chunkTargetSize) /* min 1 */;
+    unsigned nbChunks = MIN(nbChunksMax, mtctx->nbThreads);
+    size_t const proposedChunkSize = (srcSize + (nbChunks-1)) / nbChunks;
+    size_t const avgChunkSize = ((proposedChunkSize & 0x1FFFF) < 0xFFFF) ? proposedChunkSize + 0xFFFF : proposedChunkSize;   /* avoid too small last block */
+    size_t remainingSrcSize = srcSize;
+    const char* const srcStart = (const char*)src;
+    size_t frameStartPos = 0;
+
+    DEBUGLOG(3, "windowLog : %2u => chunkTargetSize : %u bytes ", params.cParams.windowLog, (U32)chunkTargetSize);
+    DEBUGLOG(2, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize);
+    params.fParams.contentSizeFlag = 1;
+
+    if (nbChunks==1) {   /* fallback to single-thread mode */
+        ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
+        return ZSTD_compressCCtx(cctx, dst, dstCapacity, src, srcSize, compressionLevel);
+    }
+
+    /* post one job per chunk */
+    {   unsigned u;
+        for (u=0; u<nbChunks; u++) {
+            size_t const chunkSize = MIN(remainingSrcSize, avgChunkSize);
+            size_t const dstBufferCapacity = u ? ZSTD_compressBound(chunkSize) : dstCapacity;
+            buffer_t const dstAsBuffer = { dst, dstCapacity };
+            buffer_t const dstBuffer = u ? ZSTDMT_getBuffer(mtctx->buffPool, dstBufferCapacity) : dstAsBuffer;
+            ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(mtctx->cctxPool);
+
+            if ((cctx==NULL) || (dstBuffer.start==NULL)) {
+                /* record whichever resource was obtained, so that the collection loop below releases it.
+                 * note : for chunk 0, dstBuffer is the caller's `dst`, which must never enter the pool */
+                mtctx->jobs[u].cctx = cctx;
+                mtctx->jobs[u].dstBuff = u ? dstBuffer : g_nullBuffer;
+                mtctx->jobs[u].cSize = ERROR(memory_allocation);   /* job result */
+                mtctx->jobs[u].jobCompleted = 1;
+                nbChunks = u+1;
+                break;   /* let's wait for previous jobs to complete, but don't start new ones */
+            }
+
+            mtctx->jobs[u].srcStart = srcStart + frameStartPos;
+            mtctx->jobs[u].srcSize = chunkSize;
+            /* the job table is shared with the streaming API, which sets dictSize and cdict :
+             * reset both, since the worker reads them and this entry point uses no dictionary */
+            mtctx->jobs[u].dictSize = 0;
+            mtctx->jobs[u].cdict = NULL;
+            mtctx->jobs[u].fullFrameSize = srcSize;
+            mtctx->jobs[u].params = params;
+            mtctx->jobs[u].dstBuff = dstBuffer;
+            mtctx->jobs[u].cctx = cctx;
+            mtctx->jobs[u].firstChunk = (u==0);
+            mtctx->jobs[u].lastChunk = (u==nbChunks-1);
+            mtctx->jobs[u].jobCompleted = 0;
+            mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex;
+            mtctx->jobs[u].jobCompleted_cond = &mtctx->jobCompleted_cond;
+
+            DEBUGLOG(3, "posting job %u (%u bytes)", u, (U32)chunkSize);
+            DEBUG_PRINTHEX(3, mtctx->jobs[u].srcStart, 12);
+            POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]);
+
+            frameStartPos += chunkSize;
+            remainingSrcSize -= chunkSize;
+    }   }
+    /* note : since nbChunks <= nbThreads, all jobs should be running immediately in parallel */
+
+    /* collect results, in order */
+    {   unsigned chunkID;
+        size_t error = 0, dstPos = 0;
+        for (chunkID=0; chunkID<nbChunks; chunkID++) {
+            DEBUGLOG(3, "waiting for chunk %u ", chunkID);
+            PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
+            while (mtctx->jobs[chunkID].jobCompleted==0) {
+                DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", chunkID);
+                pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex);
+            }
+            pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
+            DEBUGLOG(3, "ready to write chunk %u ", chunkID);
+
+            ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[chunkID].cctx);
+            mtctx->jobs[chunkID].cctx = NULL;
+            mtctx->jobs[chunkID].srcStart = NULL;
+            {   size_t const cSize = mtctx->jobs[chunkID].cSize;
+                if (ZSTD_isError(cSize)) error = cSize;
+                if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall);
+                if (chunkID) {   /* note : chunk 0 is already written directly into dst */
+                    if (!error) memcpy((char*)dst + dstPos, mtctx->jobs[chunkID].dstBuff.start, cSize);
+                    ZSTDMT_releaseBuffer(mtctx->buffPool, mtctx->jobs[chunkID].dstBuff);
+                }
+                /* always forget the destination, chunk 0 included : it aliases the caller's `dst`,
+                 * which ZSTDMT_releaseAllJobResources() would otherwise hand over to the buffer pool */
+                mtctx->jobs[chunkID].dstBuff = g_nullBuffer;
+                dstPos += cSize ;
+            }
+        }
+        if (!error) DEBUGLOG(3, "compressed size : %u ", (U32)dstPos);
+        return error ? error : dstPos;
+    }
+
+}
+
+
+/* ====================================== */
+/* ======= Streaming API ======= */
+/* ====================================== */
+
+/* ZSTDMT_waitForAllJobsCompleted() :
+ * Blocks until every job in [doneJobID, nextJobID) has been reported completed by its worker.
+ * On return, doneJobID == nextJobID. Nothing is flushed nor released. */
+static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs)
+{
+    for ( ; zcs->doneJobID < zcs->nextJobID ; zcs->doneJobID++) {
+        ZSTDMT_jobDescription* const job = &zcs->jobs[zcs->doneJobID & zcs->jobIDMask];
+        PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
+        while (job->jobCompleted == 0) {
+            DEBUGLOG(4, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID);   /* we want to block when waiting for data to flush */
+            pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
+        }
+        pthread_mutex_unlock(&zcs->jobCompleted_mutex);
+    }
+}
+
+
+/* ZSTDMT_initCStream_internal() :
+ * Starts a new streaming compression session.
+ * With nbThreads==1, the call is forwarded to the regular CStream.
+ * Otherwise : finishes and discards any unfinished previous session, optionally replaces the dictionary,
+ * derives section / overlap / input buffer sizes from `params`, and resets all streaming counters.
+ * updateDict : when non-zero, the current cdict is freed and rebuilt from (dict, dictSize), if any ;
+ *              when zero, the current cdict is kept as is.
+ * pledgedSrcSize : stored as frameContentSize, forwarded to the jobs.
+ * @return : 0, or an error code */
+static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
+ const void* dict, size_t dictSize, unsigned updateDict,
+ ZSTD_parameters params, unsigned long long pledgedSrcSize)
+{
+ ZSTD_customMem const cmem = { NULL, NULL, NULL };
+ DEBUGLOG(3, "Started new compression, with windowLog : %u", params.cParams.windowLog);
+ if (zcs->nbThreads==1) return ZSTD_initCStream_advanced(zcs->cstream, dict, dictSize, params, pledgedSrcSize);
+ if (zcs->allJobsCompleted == 0) { /* previous job not correctly finished */
+ ZSTDMT_waitForAllJobsCompleted(zcs);
+ ZSTDMT_releaseAllJobResources(zcs);
+ zcs->allJobsCompleted = 1;
+ }
+ zcs->params = params;
+ if (updateDict) {
+ ZSTD_freeCDict(zcs->cdict); zcs->cdict = NULL;
+ if (dict && dictSize) {
+ zcs->cdict = ZSTD_createCDict_advanced(dict, dictSize, 0, params, cmem);
+ if (zcs->cdict == NULL) return ERROR(memory_allocation);
+ } }
+ zcs->frameContentSize = pledgedSrcSize;
+ /* section size : user-provided value when set, 4 x window size otherwise ; never below ZSTDMT_SECTION_SIZE_MIN */
+ zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2);
+ zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize);
+ /* overlap : window size >> overlapWrLog ; an overlapWrLog of 10 or more disables overlap */
+ zcs->targetDictSize = zcs->overlapWrLog < 10 ? (size_t)1 << (zcs->params.cParams.windowLog - zcs->overlapWrLog) : 0;
+ zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog) /* margin */ + zcs->targetDictSize;
+ zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
+ if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation);
+ zcs->inBuff.filled = 0;
+ zcs->dictSize = 0;
+ zcs->doneJobID = 0;
+ zcs->nextJobID = 0;
+ zcs->frameEnded = 0;
+ zcs->allJobsCompleted = 0; /* session in progress : resources must be released before re-use or free */
+ if (params.fParams.checksumFlag) XXH64_reset(&zcs->xxhState, 0);
+ return 0;
+}
+
+/* ZSTDMT_initCStream_advanced() :
+ * Starts a streaming session with explicit parameters and an optional dictionary.
+ * Any previously loaded dictionary is replaced.
+ * @return : 0, or an error code */
+size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs,
+                                const void* dict, size_t dictSize,
+                                ZSTD_parameters params, unsigned long long pledgedSrcSize)
+{
+    unsigned const updateDict = 1;
+    return ZSTDMT_initCStream_internal(zcs, dict, dictSize, updateDict, params, pledgedSrcSize);
+}
+
+/* ZSTDMT_resetCStream() :
+ * Starts a new frame, re-using the parameters and the dictionary of the previous session.
+ * pledgedSrcSize is optional and can be zero == unknown
+ * @return : 0, or an error code */
+size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* zcs, unsigned long long pledgedSrcSize)
+{
+    if (zcs->nbThreads != 1)
+        return ZSTDMT_initCStream_internal(zcs, NULL, 0, 0 /* keep current dictionary */, zcs->params, pledgedSrcSize);
+    return ZSTD_resetCStream(zcs->cstream, pledgedSrcSize);
+}
+
+/* ZSTDMT_initCStream() :
+ * Starts a streaming session at `compressionLevel`, without dictionary, for a source of unknown size.
+ * @return : 0, or an error code */
+size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel)
+{
+    unsigned long long const unknownSrcSize = 0;
+    ZSTD_parameters const levelParams = ZSTD_getParams(compressionLevel, 0, 0);
+    return ZSTDMT_initCStream_internal(zcs, NULL, 0, 1, levelParams, unknownSrcSize);
+}
+
+
+/* ZSTDMT_createCompressionJob() :
+ * Turns the current input buffer into a job compressing `srcSize` bytes, and posts it to the thread pool.
+ * The input buffer is handed over to the job. Unless `endFrame` is set, a new input buffer is obtained
+ * and pre-filled with the overlap prefix followed by the bytes which are not part of this job.
+ * endFrame : when non-zero, the job is the last one of the frame.
+ * @return : 0, or an error code. On allocation failure, all pending jobs are waited for
+ *           and all job resources are released. */
+static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsigned endFrame)
+{
+    size_t const dstBufferCapacity = ZSTD_compressBound(srcSize);
+    buffer_t const dstBuffer = ZSTDMT_getBuffer(zcs->buffPool, dstBufferCapacity);
+    ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(zcs->cctxPool);
+    unsigned const jobID = zcs->nextJobID & zcs->jobIDMask;
+
+    if ((cctx==NULL) || (dstBuffer.start==NULL)) {
+        /* only one of the two may have failed : give the other one back to its pool, instead of leaking it.
+         * note : both release functions ignore NULL */
+        ZSTDMT_releaseBuffer(zcs->buffPool, dstBuffer);
+        ZSTDMT_releaseCCtx(zcs->cctxPool, cctx);
+        zcs->jobs[jobID].jobCompleted = 1;   /* nothing was posted for this slot : do not wait for it */
+        zcs->nextJobID++;
+        ZSTDMT_waitForAllJobsCompleted(zcs);
+        ZSTDMT_releaseAllJobResources(zcs);
+        return ERROR(memory_allocation);
+    }
+
+    DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ", zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize);
+    zcs->jobs[jobID].src = zcs->inBuff.buffer;
+    zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
+    zcs->jobs[jobID].srcSize = srcSize;
+    zcs->jobs[jobID].dictSize = zcs->dictSize;  /* note : zcs->inBuff.filled is presumed >= srcSize + dictSize */
+    zcs->jobs[jobID].params = zcs->params;
+    if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;  /* do not calculate checksum within sections, just keep it in header for first section */
+    zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
+    zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
+    zcs->jobs[jobID].dstBuff = dstBuffer;
+    zcs->jobs[jobID].cctx = cctx;
+    zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
+    zcs->jobs[jobID].lastChunk = endFrame;
+    zcs->jobs[jobID].jobCompleted = 0;
+    zcs->jobs[jobID].dstFlushed = 0;
+    zcs->jobs[jobID].jobCompleted_mutex = &zcs->jobCompleted_mutex;
+    zcs->jobs[jobID].jobCompleted_cond = &zcs->jobCompleted_cond;
+
+    /* get a new buffer for next input */
+    if (!endFrame) {
+        size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize);
+        zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
+        if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
+            /* the job slot already references its buffers and cctx : they get released with all job resources */
+            zcs->jobs[jobID].jobCompleted = 1;
+            zcs->nextJobID++;
+            ZSTDMT_waitForAllJobsCompleted(zcs);
+            ZSTDMT_releaseAllJobResources(zcs);
+            return ERROR(memory_allocation);
+        }
+        DEBUGLOG(5, "inBuff filled to %u", (U32)zcs->inBuff.filled);
+        zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize;
+        DEBUGLOG(5, "new job : filled to %u, with %u dict and %u src", (U32)zcs->inBuff.filled, (U32)newDictSize, (U32)(zcs->inBuff.filled - newDictSize));
+        /* carry over : last newDictSize bytes of this section (future prefix), followed by not yet compressed input */
+        memmove(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize, zcs->inBuff.filled);
+        DEBUGLOG(5, "new inBuff pre-filled");
+        zcs->dictSize = newDictSize;
+    } else {
+        zcs->inBuff.buffer = g_nullBuffer;
+        zcs->inBuff.filled = 0;
+        zcs->dictSize = 0;
+        zcs->frameEnded = 1;
+        if (zcs->nextJobID == 0)
+            zcs->params.fParams.checksumFlag = 0;   /* single chunk : checksum is calculated directly within worker thread */
+    }
+
+    DEBUGLOG(3, "posting job %u : %u bytes (end:%u) (note : doneJob = %u=>%u)", zcs->nextJobID, (U32)zcs->jobs[jobID].srcSize, zcs->jobs[jobID].lastChunk, zcs->doneJobID, zcs->doneJobID & zcs->jobIDMask);
+    POOL_add(zcs->factory, ZSTDMT_compressChunk, &zcs->jobs[jobID]);   /* this call is blocking when thread worker pool is exhausted */
+    zcs->nextJobID++;
+    return 0;
+}
+
+
+/* ZSTDMT_flushNextJob() :
+ * Copies as much as possible of the oldest unflushed job's compressed data into `output`.
+ * output : will be updated with amount of data flushed .
+ * blockToFlush : if >0, the function will block and wait if there is no data available to flush .
+ * note : works on a local copy of the job description ; every change which must persist
+ * is explicitly written back into zcs->jobs[wJobID].
+ * @return : amount of data remaining within internal buffer, 1 if unknown but > 0, 0 if no more, or an error code */
+static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush)
+{
+ unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
+ if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */
+ PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
+ while (zcs->jobs[wJobID].jobCompleted==0) {
+ DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);
+ if (!blockToFlush) { pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
+ pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush */
+ }
+ pthread_mutex_unlock(&zcs->jobCompleted_mutex);
+ /* compression job completed : output can be flushed */
+ { ZSTDMT_jobDescription job = zcs->jobs[wJobID];
+ if (!job.jobScanned) { /* first visit of this job : check result, release its cctx and input, update checksum */
+ if (ZSTD_isError(job.cSize)) {
+ DEBUGLOG(5, "compression error detected ");
+ ZSTDMT_waitForAllJobsCompleted(zcs);
+ ZSTDMT_releaseAllJobResources(zcs);
+ return job.cSize;
+ }
+ ZSTDMT_releaseCCtx(zcs->cctxPool, job.cctx);
+ zcs->jobs[wJobID].cctx = NULL;
+ DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag);
+ if (zcs->params.fParams.checksumFlag) {
+ XXH64_update(&zcs->xxhState, (const char*)job.srcStart + job.dictSize, job.srcSize);
+ if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) { /* write checksum at end of last section */
+ U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
+ DEBUGLOG(4, "writing checksum : %08X \n", checksum);
+ /* NOTE(review): writes 4 bytes past cSize ; assumes dstBuff (sized with ZSTD_compressBound()) has room for them - confirm */
+ MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum);
+ job.cSize += 4;
+ zcs->jobs[wJobID].cSize += 4;
+ } }
+ ZSTDMT_releaseBuffer(zcs->buffPool, job.src);
+ zcs->jobs[wJobID].srcStart = NULL;
+ zcs->jobs[wJobID].src = g_nullBuffer;
+ zcs->jobs[wJobID].jobScanned = 1;
+ }
+ { size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
+ DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
+ memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
+ output->pos += toWrite;
+ job.dstFlushed += toWrite;
+ }
+ if (job.dstFlushed == job.cSize) { /* output buffer fully flushed => move to next one */
+ ZSTDMT_releaseBuffer(zcs->buffPool, job.dstBuff);
+ zcs->jobs[wJobID].dstBuff = g_nullBuffer;
+ zcs->jobs[wJobID].jobCompleted = 0;
+ zcs->doneJobID++;
+ } else {
+ zcs->jobs[wJobID].dstFlushed = job.dstFlushed; /* partial flush : remember progress for the next call */
+ }
+ /* return value : how many bytes left in buffer ; fake it to 1 if unknown but >0 */
+ if (job.cSize > job.dstFlushed) return (job.cSize - job.dstFlushed);
+ if (zcs->doneJobID < zcs->nextJobID) return 1; /* still some buffer to flush */
+ zcs->allJobsCompleted = zcs->frameEnded; /* frame completed and entirely flushed */
+ return 0; /* everything flushed */
+} }
+
+
+/* ZSTDMT_compressStream() :
+ * Consumes input from `input` (starting at input->pos) into the internal input buffer,
+ * creates a compression job whenever that buffer is full and a job slot is available,
+ * then flushes whatever compressed data is ready into `output`.
+ * With nbThreads==1, the call is forwarded to the regular CStream.
+ * @return : recommended size of next input (room left in the internal input buffer),
+ *           or an error code. stage_wrong is returned once the frame has been ended. */
+size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
+{
+    if (zcs->frameEnded) return ERROR(stage_wrong);   /* current frame being ended. Only flush is allowed. Restart with init */
+    if (zcs->nbThreads==1) return ZSTD_compressStream(zcs->cstream, output, input);
+
+    /* fill input buffer */
+    {   size_t const toLoad = MIN(input->size - input->pos, zcs->inBuffSize - zcs->inBuff.filled);
+        /* read from the current position : bytes before input->pos have already been consumed */
+        memcpy((char*)zcs->inBuff.buffer.start + zcs->inBuff.filled, (const char*)input->src + input->pos, toLoad);
+        input->pos += toLoad;
+        zcs->inBuff.filled += toLoad;
+    }
+
+    if ( (zcs->inBuff.filled == zcs->inBuffSize)   /* filled enough : let's compress */
+        && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {   /* avoid overwriting job round buffer */
+        CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0) );
+    }
+
+    /* check for data to flush */
+    CHECK_F( ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize)) );  /* block if it wasn't possible to create new job due to saturation */
+
+    /* recommended next input size : fill current input buffer */
+    return zcs->inBuffSize - zcs->inBuff.filled;   /* note : could be zero when input buffer is fully filled and no more availability to create new job */
+}
+
+
+/* ZSTDMT_flushStream_internal() :
+ * Creates a job for the input not yet compressed (or an ending job when `endFrame` is set
+ * and the frame is not ended yet), provided a job slot is available,
+ * then flushes the oldest job into `output`, blocking until it is completed.
+ * @return : same as ZSTDMT_flushNextJob(), or an error code from job creation */
+static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned endFrame)
+{
+    size_t const toCompress = zcs->inBuff.filled - zcs->dictSize;
+    int const jobNeeded = (toCompress > 0) || (endFrame && !zcs->frameEnded);
+    int const slotAvailable = (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask);
+
+    if (toCompress) DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)toCompress);
+    if (jobNeeded && slotAvailable) {
+        CHECK_F( ZSTDMT_createCompressionJob(zcs, toCompress, endFrame) );
+    }
+
+    /* check if there is any data available to flush */
+    DEBUGLOG(5, "zcs->doneJobID : %u ; zcs->nextJobID : %u ", zcs->doneJobID, zcs->nextJobID);
+    return ZSTDMT_flushNextJob(zcs, output, 1);
+}
+
+
+/* ZSTDMT_flushStream() :
+ * Compresses the input accumulated so far and flushes available compressed data, without ending the frame.
+ * @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
+size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
+{
+    if (zcs->nbThreads != 1)
+        return ZSTDMT_flushStream_internal(zcs, output, 0 /* endFrame */);
+    return ZSTD_flushStream(zcs->cstream, output);
+}
+
+/* ZSTDMT_endStream() :
+ * Compresses the input accumulated so far as the end of the frame, and flushes available compressed data.
+ * @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
+size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
+{
+    if (zcs->nbThreads != 1)
+        return ZSTDMT_flushStream_internal(zcs, output, 1 /* endFrame */);
+    return ZSTD_endStream(zcs->cstream, output);
+}
diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h
new file mode 100644
index 0000000..92de52d
--- /dev/null
+++ b/lib/compress/zstdmt_compress.h
@@ -0,0 +1,81 @@
+/**
+ * Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+
+/* Include guard : this header declares typedefs (ZSTDMT_CCtx, ZSDTMT_parameter),
+ * which must not be repeated when the file is included twice in one translation unit */
+#ifndef ZSTDMT_COMPRESS_H
+#define ZSTDMT_COMPRESS_H
+
+
+/* Note : All prototypes defined in this file shall be considered experimental.
+ * There is no guarantee of API continuity (yet) on any of these prototypes */
+
+/* === Dependencies === */
+#include <stddef.h> /* size_t */
+#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_parameters */
+#include "zstd.h" /* ZSTD_inBuffer, ZSTD_outBuffer, ZSTDLIB_API */
+
+#if defined (__cplusplus)
+extern "C" {
+#endif
+
+
+/* === Simple one-pass functions === */
+
+typedef struct ZSTDMT_CCtx_s ZSTDMT_CCtx;
+ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads);
+ZSTDLIB_API size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* cctx);
+
+ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx,
+                           void* dst, size_t dstCapacity,
+                     const void* src, size_t srcSize,
+                           int compressionLevel);
+
+
+/* === Streaming functions === */
+
+ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel);
+ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */
+
+ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
+
+ZSTDLIB_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
+ZSTDLIB_API size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
+
+
+/* === Advanced functions and parameters === */
+
+#ifndef ZSTDMT_SECTION_SIZE_MIN
+# define ZSTDMT_SECTION_SIZE_MIN (1U << 20) /* 1 MB - Minimum size of each compression job */
+#endif
+
+ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, const void* dict, size_t dictSize, /**< dict can be released after init, a local copy is preserved within zcs */
+                                          ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown */
+
+/* ZSDTMT_parameter :
+ * List of parameters that can be set using ZSTDMT_setMTCtxParameter().
+ * Note : the type name really is spelled "ZSDTMT" (not "ZSTDMT") ; it is kept as-is since it is part of the prototype below */
+typedef enum {
+    ZSTDMT_p_sectionSize, /* size of input "section". Each section is compressed in parallel. 0 means default, which is dynamically determined within compression functions */
+    ZSTDMT_p_overlapSectionRLog /* reverse log of overlapped section; 0 == use a complete window, 3(default) == use 1/8th of window, values >=10 means no overlap */
+} ZSDTMT_parameter;
+
+/* ZSTDMT_setMTCtxParameter() :
+ * allow setting individual parameters, one at a time, among the list of enum values defined in ZSDTMT_parameter.
+ * The function must be called typically after ZSTDMT_createCCtx().
+ * Parameters not explicitly reset by ZSTDMT_init*() remain the same in consecutive compression sessions.
+ * @return : 0, or an error code (which can be tested using ZSTD_isError()) */
+ZSTDLIB_API size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value);
+
+
+#if defined (__cplusplus)
+}
+#endif
+
+#endif /* ZSTDMT_COMPRESS_H */
diff --git a/lib/decompress/zstd_decompress.c b/lib/decompress/zstd_decompress.c
index c53f3c3..9c04503 100644
--- a/lib/decompress/zstd_decompress.c
+++ b/lib/decompress/zstd_decompress.c
@@ -1973,7 +1973,7 @@
switch(paramType)
{
default : return ERROR(parameter_unknown);
- case ZSTDdsp_maxWindowSize : zds->maxWindowSize = paramValue ? paramValue : (U32)(-1); break;
+ case DStream_p_maxWindowSize : zds->maxWindowSize = paramValue ? paramValue : (U32)(-1); break;
}
return 0;
}
diff --git a/lib/legacy/zstd_v04.c b/lib/legacy/zstd_v04.c
index bd01131..e950907 100644
--- a/lib/legacy/zstd_v04.c
+++ b/lib/legacy/zstd_v04.c
@@ -3016,12 +3016,11 @@
{
U32 add = *dumps++;
if (add < 255) litLength += add;
- else
- {
+ else {
litLength = MEM_readLE32(dumps) & 0xFFFFFF; /* no pb : dumps is always followed by seq tables > 1 byte */
dumps += 3;
}
- if (dumps >= de) dumps = de-1; /* late correction, to avoid read overflow (data is now corrupted anyway) */
+ if (dumps >= de) { dumps = de-1; litLength = MaxLL+255; } /* late correction, to avoid read overflow (data is now corrupted anyway) */
}
/* Offset */
@@ -3043,16 +3042,14 @@
/* MatchLength */
matchLength = FSE_decodeSymbol(&(seqState->stateML), &(seqState->DStream));
- if (matchLength == MaxML)
- {
+ if (matchLength == MaxML) {
U32 add = *dumps++;
if (add < 255) matchLength += add;
- else
- {
+ else {
matchLength = MEM_readLE32(dumps) & 0xFFFFFF; /* no pb : dumps is always followed by seq tables > 1 byte */
dumps += 3;
}
- if (dumps >= de) dumps = de-1; /* late correction, to avoid read overflow (data is now corrupted anyway) */
+ if (dumps >= de) { dumps = de-1; matchLength = MaxML+255; } /* late correction, to avoid read overflow (data is now corrupted anyway) */
}
matchLength += MINMATCH;
@@ -3116,8 +3113,7 @@
/* Requirement: op <= oend_8 */
/* match within prefix */
- if (sequence.offset < 8)
- {
+ if (sequence.offset < 8) {
/* close range match, overlap */
const int sub2 = dec64table[sequence.offset];
op[0] = match[0];
@@ -3127,9 +3123,7 @@
match += dec32table[sequence.offset];
ZSTD_copy4(op+4, match);
match -= sub2;
- }
- else
- {
+ } else {
ZSTD_copy8(op, match);
}
op += 8; match += 8;
diff --git a/lib/legacy/zstd_v05.c b/lib/legacy/zstd_v05.c
index 3dd740e..43943d8 100644
--- a/lib/legacy/zstd_v05.c
+++ b/lib/legacy/zstd_v05.c
@@ -3230,7 +3230,7 @@
if (litLength&1) litLength>>=1, dumps += 3;
else litLength = (U16)(litLength)>>1, dumps += 2;
}
- if (dumps >= de) dumps = de-1; /* late correction, to avoid read overflow (data is now corrupted anyway) */
+ if (dumps >= de) { dumps = de-1; litLength = MaxLL+255; } /* late correction, to avoid read overflow (data is now corrupted anyway) */
}
/* Offset */
@@ -3263,7 +3263,7 @@
if (matchLength&1) matchLength>>=1, dumps += 3;
else matchLength = (U16)(matchLength)>>1, dumps += 2;
}
- if (dumps >= de) dumps = de-1; /* late correction, to avoid read overflow (data is now corrupted anyway) */
+ if (dumps >= de) { dumps = de-1; matchLength = MaxML+255; } /* late correction, to avoid read overflow (data is now corrupted anyway) */
}
matchLength += MINMATCH;
diff --git a/lib/zstd.h b/lib/zstd.h
index 5e4420b..f5cbf4b 100644
--- a/lib/zstd.h
+++ b/lib/zstd.h
@@ -369,9 +369,9 @@
} ZSTD_compressionParameters;
typedef struct {
- unsigned contentSizeFlag; /**< 1: content size will be in frame header (if known). */
- unsigned checksumFlag; /**< 1: will generate a 32-bits checksum at end of frame, to be used for error detection by decompressor */
- unsigned noDictIDFlag; /**< 1: no dict ID will be saved into frame header (if dictionary compression) */
+ unsigned contentSizeFlag; /**< 1: content size will be in frame header (when known) */
+ unsigned checksumFlag; /**< 1: generate a 32-bits checksum at end of frame, for error detection */
+ unsigned noDictIDFlag; /**< 1: no dictID will be saved into frame header (if dictionary compression) */
} ZSTD_frameParameters;
typedef struct {
@@ -401,6 +401,14 @@
* Gives the amount of memory used by a given ZSTD_CCtx */
ZSTDLIB_API size_t ZSTD_sizeof_CCtx(const ZSTD_CCtx* cctx);
+typedef enum {
+ ZSTD_p_forceWindow /* Force back-references to remain < windowSize, even when referencing Dictionary content (default:0)*/
+} ZSTD_CCtxParameter;
+/*! ZSTD_setCCtxParameter() :
+ * Set advanced parameters, selected through enum ZSTD_CCtxParameter
+ * @result : 0, or an error code (which can be tested with ZSTD_isError()) */
+ZSTDLIB_API size_t ZSTD_setCCtxParameter(ZSTD_CCtx* cctx, ZSTD_CCtxParameter param, unsigned value);
+
/*! ZSTD_createCDict_byReference() :
* Create a digested dictionary for compression
* Dictionary content is simply referenced, and therefore stays in dictBuffer.
@@ -519,7 +527,7 @@
/*===== Advanced Streaming decompression functions =====*/
-typedef enum { ZSTDdsp_maxWindowSize } ZSTD_DStreamParameter_e;
+typedef enum { DStream_p_maxWindowSize } ZSTD_DStreamParameter_e;
ZSTDLIB_API ZSTD_DStream* ZSTD_createDStream_advanced(ZSTD_customMem customMem);
ZSTDLIB_API size_t ZSTD_initDStream_usingDict(ZSTD_DStream* zds, const void* dict, size_t dictSize); /**< note: a dict will not be used if dict == NULL or dictSize < 8 */
ZSTDLIB_API size_t ZSTD_setDStreamParameter(ZSTD_DStream* zds, ZSTD_DStreamParameter_e paramType, unsigned paramValue);
@@ -561,10 +569,10 @@
In which case, it will "discard" the relevant memory section from its history.
Finish a frame with ZSTD_compressEnd(), which will write the last block(s) and optional checksum.
- It's possible to use a NULL,0 src content, in which case, it will write a final empty block to end the frame,
- Without last block mark, frames will be considered unfinished (broken) by decoders.
+ It's possible to use srcSize==0, in which case, it will write a final empty block to end the frame.
+ Without last block mark, frames will be considered unfinished (corrupted) by decoders.
- You can then reuse `ZSTD_CCtx` (ZSTD_compressBegin()) to compress some new frame.
+ `ZSTD_CCtx` object can be re-used (ZSTD_compressBegin()) to compress some new frame.
*/
/*===== Buffer-less streaming compression functions =====*/
@@ -572,6 +580,7 @@
ZSTDLIB_API size_t ZSTD_compressBegin_usingDict(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, int compressionLevel);
ZSTDLIB_API size_t ZSTD_compressBegin_advanced(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, ZSTD_parameters params, unsigned long long pledgedSrcSize);
ZSTDLIB_API size_t ZSTD_copyCCtx(ZSTD_CCtx* cctx, const ZSTD_CCtx* preparedCCtx, unsigned long long pledgedSrcSize);
+ZSTDLIB_API size_t ZSTD_compressBegin_usingCDict(ZSTD_CCtx* cctx, const ZSTD_CDict* cdict, unsigned long long pledgedSrcSize);
ZSTDLIB_API size_t ZSTD_compressContinue(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize);
ZSTDLIB_API size_t ZSTD_compressEnd(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize);
diff --git a/programs/Makefile b/programs/Makefile
index a4c149a..4392939 100644
--- a/programs/Makefile
+++ b/programs/Makefile
@@ -1,5 +1,5 @@
# ##########################################################################
-# Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
+# Copyright (c) 2015-present, Yann Collet, Facebook, Inc.
# All rights reserved.
#
# This Makefile is validated for Linux, macOS, *BSD, Hurd, Solaris, MSYS2 targets
@@ -24,7 +24,7 @@
ALIGN_LOOP =
endif
-CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/dictBuilder
+CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/compress -I$(ZSTDDIR)/dictBuilder
CFLAGS ?= -O3
CFLAGS += -Wall -Wextra -Wcast-qual -Wcast-align -Wshadow -Wstrict-aliasing=1 \
-Wswitch-enum -Wdeclaration-after-statement -Wstrict-prototypes -Wundef \
@@ -34,8 +34,8 @@
ZSTDCOMMON_FILES := $(ZSTDDIR)/common/*.c
-ZSTDCOMP_FILES := $(ZSTDDIR)/compress/zstd_compress.c $(ZSTDDIR)/compress/fse_compress.c $(ZSTDDIR)/compress/huf_compress.c
-ZSTDDECOMP_FILES := $(ZSTDDIR)/decompress/huf_decompress.c
+ZSTDCOMP_FILES := $(ZSTDDIR)/compress/*.c
+ZSTDDECOMP_FILES := $(ZSTDDIR)/decompress/*.c
ZSTD_FILES := $(ZSTDDECOMP_FILES) $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES)
ZDICT_FILES := $(ZSTDDIR)/dictBuilder/*.c
ZSTDDECOMP_O = $(ZSTDDIR)/decompress/zstd_decompress.o
@@ -49,6 +49,8 @@
ZSTDLEGACY_FILES:= $(ZSTDDIR)/legacy/*.c
endif
+ZSTDLIB_FILES := $(wildcard $(ZSTD_FILES)) $(wildcard $(ZSTDLEGACY_FILES)) $(wildcard $(ZDICT_FILES))
+ZSTDLIB_OBJ := $(patsubst %.c,%.o,$(ZSTDLIB_FILES))
# Define *.exe as extension for Windows systems
ifneq (,$(filter Windows%,$(OS)))
@@ -74,8 +76,7 @@
$(ZSTDDECOMP_O): CFLAGS += $(ALIGN_LOOP)
zstd : CPPFLAGS += -DZSTD_LEGACY_SUPPORT=$(ZSTD_LEGACY_SUPPORT)
-zstd : $(ZSTDDECOMP_O) $(ZSTD_FILES) $(ZSTDLEGACY_FILES) $(ZDICT_FILES) \
- zstdcli.c fileio.c bench.c datagen.c dibio.c
+zstd : $(ZSTDLIB_OBJ) zstdcli.o fileio.o bench.o datagen.o dibio.o
ifneq (,$(filter Windows%,$(OS)))
windres/generate_res.bat
endif
@@ -83,8 +84,7 @@
zstd32 : CPPFLAGS += -DZSTD_LEGACY_SUPPORT=$(ZSTD_LEGACY_SUPPORT)
-zstd32 : $(ZSTDDIR)/decompress/zstd_decompress.c $(ZSTD_FILES) $(ZSTDLEGACY_FILES) $(ZDICT_FILES) \
- zstdcli.c fileio.c bench.c datagen.c dibio.c
+zstd32 : $(ZSTDLIB_FILES) zstdcli.c fileio.c bench.c datagen.c dibio.c
ifneq (,$(filter Windows%,$(OS)))
windres/generate_res.bat
endif
@@ -106,32 +106,33 @@
$(RM) $(ZSTDDECOMP_O)
$(MAKE) zstd MOREFLAGS=-fprofile-use
-zstd-frugal: $(ZSTDDECOMP_O) $(ZSTD_FILES) zstdcli.c fileio.c
+zstd-frugal: $(ZSTD_FILES) zstdcli.c fileio.c
$(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT $^ -o zstd$(EXT)
-zstd-small: clean_decomp_o
- ZSTD_LEGACY_SUPPORT=0 CFLAGS="-Os -s" $(MAKE) zstd-frugal
+zstd-small:
+ CFLAGS="-Os -s" $(MAKE) zstd-frugal
-zstd-decompress-clean: $(ZSTDDECOMP_O) $(ZSTDCOMMON_FILES) $(ZSTDDECOMP_FILES) zstdcli.c fileio.c
- $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NOCOMPRESS $^ -o zstd-decompress$(EXT)
-
-zstd-decompress: clean_decomp_o
- ZSTD_LEGACY_SUPPORT=0 $(MAKE) zstd-decompress-clean
+zstd-decompress: $(ZSTDCOMMON_FILES) $(ZSTDDECOMP_FILES) zstdcli.c fileio.c
+ $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NOCOMPRESS $^ -o $@$(EXT)
zstd-compress: $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES) zstdcli.c fileio.c
$(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NODECOMPRESS $^ -o $@$(EXT)
-gzstd: clean_decomp_o
+gzstd:
@echo "int main(){}" | $(CC) -o have_zlib -x c - -lz && echo found zlib || echo did not found zlib
@if [ -s have_zlib ]; then \
echo building gzstd with .gz decompression support \
- && rm have_zlib$(EXT) \
+ && $(RM) have_zlib$(EXT) fileio.o \
&& CPPFLAGS=-DZSTD_GZDECOMPRESS LDFLAGS="-lz" $(MAKE) zstd; \
else \
echo "WARNING : no zlib, building gzstd with only .zst files support : NO .gz SUPPORT !!!" \
&& $(MAKE) zstd; \
fi
+zstdmt: CPPFLAGS += -DZSTD_MULTITHREAD
+zstdmt: LDFLAGS += -lpthread
+zstdmt: zstd
+
generate_res:
windres/generate_res.bat
@@ -166,7 +167,7 @@
MANDIR ?= $(PREFIX)/man/man1
else
MANDIR ?= $(PREFIX)/share/man/man1
-endif
+endif
INSTALL_PROGRAM ?= $(INSTALL) -m 755
INSTALL_SCRIPT ?= $(INSTALL) -m 755
diff --git a/programs/bench.c b/programs/bench.c
index 9a4732a..1ca40d6 100644
--- a/programs/bench.c
+++ b/programs/bench.c
@@ -10,6 +10,14 @@
/* **************************************
+* Tuning parameters
+****************************************/
+#ifndef BMK_TIMETEST_DEFAULT_S /* default minimum time per test */
+#define BMK_TIMETEST_DEFAULT_S 3
+#endif
+
+
+/* **************************************
* Compiler Warnings
****************************************/
#ifdef _MSC_VER
@@ -24,7 +32,7 @@
#include "util.h" /* UTIL_getFileSize, UTIL_sleep */
#include <stdlib.h> /* malloc, free */
#include <string.h> /* memset */
-#include <stdio.h> /* fprintf, fopen, ftello64 */
+#include <stdio.h> /* fprintf, fopen */
#include <time.h> /* clock_t, clock, CLOCKS_PER_SEC */
#include "mem.h"
@@ -43,7 +51,6 @@
# define ZSTD_GIT_COMMIT_STRING ZSTD_EXPAND_AND_QUOTE(ZSTD_GIT_COMMIT)
#endif
-#define NBSECONDS 3
#define TIMELOOP_MICROSEC 1*1000000ULL /* 1 second */
#define ACTIVEPERIOD_MICROSEC 70*1000000ULL /* 70 seconds */
#define COOLPERIOD_SEC 10
@@ -92,8 +99,6 @@
/* *************************************
* Benchmark Parameters
***************************************/
-static U32 g_nbSeconds = NBSECONDS;
-static size_t g_blockSize = 0;
static int g_additionalParam = 0;
static U32 g_decodeOnly = 0;
@@ -101,19 +106,30 @@
void BMK_setAdditionalParam(int additionalParam) { g_additionalParam=additionalParam; }
-void BMK_SetNbSeconds(unsigned nbSeconds)
+static U32 g_nbSeconds = BMK_TIMETEST_DEFAULT_S;
+void BMK_setNbSeconds(unsigned nbSeconds)
{
g_nbSeconds = nbSeconds;
- DISPLAYLEVEL(3, "- test >= %u seconds per compression / decompression -\n", g_nbSeconds);
+ DISPLAYLEVEL(3, "- test >= %u seconds per compression / decompression - \n", g_nbSeconds);
}
-void BMK_SetBlockSize(size_t blockSize)
+static size_t g_blockSize = 0;
+void BMK_setBlockSize(size_t blockSize)
{
g_blockSize = blockSize;
- DISPLAYLEVEL(2, "using blocks of size %u KB \n", (U32)(blockSize>>10));
+ if (g_blockSize) DISPLAYLEVEL(2, "using blocks of size %u KB \n", (U32)(blockSize>>10));
}
-void BMK_setDecodeOnly(unsigned decodeFlag) { g_decodeOnly = (decodeFlag>0); }
+void BMK_setDecodeOnlyMode(unsigned decodeFlag) { g_decodeOnly = (decodeFlag>0); }
+
+static U32 g_nbThreads = 1;
+void BMK_setNbThreads(unsigned nbThreads) {
+    g_nbThreads = nbThreads;   /* stored unconditionally, even when the build lacks ZSTD_MULTITHREAD */
+#ifndef ZSTD_MULTITHREAD
+    if (nbThreads >= 2) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n");
+#endif
+}
+
/* ********************************************************
* Bench functions
@@ -132,6 +148,8 @@
#define MIN(a,b) ((a)<(b) ? (a) : (b))
#define MAX(a,b) ((a)>(b) ? (a) : (b))
+#include "compress/zstdmt_compress.h"
+
static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
const char* displayName, int cLevel,
const size_t* fileSizes, U32 nbFiles,
@@ -145,6 +163,7 @@
size_t const maxCompressedSize = ZSTD_compressBound(srcSize) + (maxNbBlocks * 1024); /* add some room for safety */
void* const compressedBuffer = malloc(maxCompressedSize);
void* resultBuffer = malloc(srcSize);
+ ZSTDMT_CCtx* const mtctx = ZSTDMT_createCCtx(g_nbThreads);
ZSTD_CCtx* const ctx = ZSTD_createCCtx();
ZSTD_DCtx* const dctx = ZSTD_createDCtx();
size_t const loadedCompressedSize = srcSize;
@@ -212,7 +231,7 @@
/* Bench */
{ U64 fastestC = (U64)(-1LL), fastestD = (U64)(-1LL);
U64 const crcOrig = g_decodeOnly ? 0 : XXH64(srcBuffer, srcSize, 0);
- UTIL_time_t coolTime;
+ UTIL_time_t coolTime, coolTick;
U64 const maxTime = (g_nbSeconds * TIMELOOP_MICROSEC) + 1;
U64 totalCTime=0, totalDTime=0;
U32 cCompleted=g_decodeOnly, dCompleted=0;
@@ -220,25 +239,27 @@
const char* const marks[NB_MARKS] = { " |", " /", " =", "\\" };
U32 markNb = 0;
+ UTIL_initTimer(&coolTick);
UTIL_getTime(&coolTime);
DISPLAYLEVEL(2, "\r%79s\r", "");
while (!cCompleted || !dCompleted) {
- UTIL_time_t clockStart;
/* overheat protection */
- if (UTIL_clockSpanMicro(coolTime, ticksPerSecond) > ACTIVEPERIOD_MICROSEC) {
+ if (UTIL_clockSpanMicro(coolTime, coolTick) > ACTIVEPERIOD_MICROSEC) {
DISPLAYLEVEL(2, "\rcooling down ... \r");
UTIL_sleep(COOLPERIOD_SEC);
UTIL_getTime(&coolTime);
}
if (!g_decodeOnly) {
+ UTIL_time_t clockTick, clockStart;
/* Compression */
DISPLAYLEVEL(2, "%2s-%-17.17s :%10u ->\r", marks[markNb], displayName, (U32)srcSize);
if (!cCompleted) memset(compressedBuffer, 0xE5, maxCompressedSize); /* warm up and erase result buffer */
UTIL_sleepMilli(1); /* give processor time to other processes */
UTIL_waitForNextTick(ticksPerSecond);
+ UTIL_initTimer(&clockTick);
UTIL_getTime(&clockStart);
if (!cCompleted) { /* still some time to do compression tests */
@@ -265,19 +286,26 @@
blockTable[blockNb].srcPtr,blockTable[blockNb].srcSize,
cdict);
} else {
+#ifdef ZSTD_MULTITHREAD /* note : limitation : MT single-pass does not support compression with dictionary */
+ rSize = ZSTDMT_compressCCtx(mtctx,
+ blockTable[blockNb].cPtr, blockTable[blockNb].cRoom,
+ blockTable[blockNb].srcPtr,blockTable[blockNb].srcSize,
+ cLevel);
+#else
rSize = ZSTD_compress_advanced (ctx,
blockTable[blockNb].cPtr, blockTable[blockNb].cRoom,
blockTable[blockNb].srcPtr,blockTable[blockNb].srcSize, NULL, 0, zparams);
+#endif
}
if (ZSTD_isError(rSize)) EXM_THROW(1, "ZSTD_compress_usingCDict() failed : %s", ZSTD_getErrorName(rSize));
blockTable[blockNb].cSize = rSize;
}
nbLoops++;
- } while (UTIL_clockSpanMicro(clockStart, ticksPerSecond) < clockLoop);
+ } while (UTIL_clockSpanMicro(clockStart, clockTick) < clockLoop);
ZSTD_freeCDict(cdict);
- { U64 const clockSpan = UTIL_clockSpanMicro(clockStart, ticksPerSecond);
- if (clockSpan < fastestC*nbLoops) fastestC = clockSpan / nbLoops;
- totalCTime += clockSpan;
+ { U64 const clockSpanMicro = UTIL_clockSpanMicro(clockStart, clockTick);
+ if (clockSpanMicro < fastestC*nbLoops) fastestC = clockSpanMicro / nbLoops;
+ totalCTime += clockSpanMicro;
cCompleted = (totalCTime >= maxTime);
} }
@@ -292,20 +320,24 @@
memcpy(compressedBuffer, srcBuffer, loadedCompressedSize);
}
- (void)fastestD; (void)crcOrig; /* unused when decompression disabled */
-#if 1
+#if 0 /* disable decompression test */
+ dCompleted=1;
+ (void)totalDTime; (void)fastestD; (void)crcOrig; /* unused when decompression disabled */
+#else
/* Decompression */
if (!dCompleted) memset(resultBuffer, 0xD6, srcSize); /* warm result buffer */
UTIL_sleepMilli(1); /* give processor time to other processes */
UTIL_waitForNextTick(ticksPerSecond);
- UTIL_getTime(&clockStart);
if (!dCompleted) {
U64 clockLoop = g_nbSeconds ? TIMELOOP_MICROSEC : 1;
U32 nbLoops = 0;
+ UTIL_time_t clockStart, clockTick;
ZSTD_DDict* const ddict = ZSTD_createDDict(dictBuffer, dictBufferSize);
if (!ddict) EXM_THROW(2, "ZSTD_createDDict() allocation failure");
+ UTIL_initTimer(&clockTick);
+ UTIL_getTime(&clockStart);
do {
U32 blockNb;
for (blockNb=0; blockNb<nbBlocks; blockNb++) {
@@ -314,19 +346,19 @@
blockTable[blockNb].cPtr, blockTable[blockNb].cSize,
ddict);
if (ZSTD_isError(regenSize)) {
- DISPLAY("ZSTD_decompress_usingDDict() failed on block %u : %s \n",
- blockNb, ZSTD_getErrorName(regenSize));
+ DISPLAY("ZSTD_decompress_usingDDict() failed on block %u of size %u : %s \n",
+ blockNb, (U32)blockTable[blockNb].cSize, ZSTD_getErrorName(regenSize));
clockLoop = 0; /* force immediate test end */
break;
}
blockTable[blockNb].resSize = regenSize;
}
nbLoops++;
- } while (UTIL_clockSpanMicro(clockStart, ticksPerSecond) < clockLoop);
+ } while (UTIL_clockSpanMicro(clockStart, clockTick) < clockLoop);
ZSTD_freeDDict(ddict);
- { U64 const clockSpan = UTIL_clockSpanMicro(clockStart, ticksPerSecond);
- if (clockSpan < fastestD*nbLoops) fastestD = clockSpan / nbLoops;
- totalDTime += clockSpan;
+ { U64 const clockSpanMicro = UTIL_clockSpanMicro(clockStart, clockTick);
+ if (clockSpanMicro < fastestD*nbLoops) fastestD = clockSpanMicro / nbLoops;
+ totalDTime += clockSpanMicro;
dCompleted = (totalDTime >= maxTime);
} }
@@ -353,6 +385,17 @@
pos = (U32)(u - bacc);
bNb = pos / (128 KB);
DISPLAY("(block %u, sub %u, pos %u) \n", segNb, bNb, pos);
+                            if (u>5) {   /* dump bytes around the first mismatch : 5 before, the byte itself, up to 2 after */
+                                int n;
+                                for (n=-5; n<0; n++) DISPLAY("%02X ", ((const BYTE*)srcBuffer)[u+n]);
+                                DISPLAY(" :%02X: ", ((const BYTE*)srcBuffer)[u]);
+                                for (n=1; (n<3) && (u+n<srcSize); n++) DISPLAY("%02X ", ((const BYTE*)srcBuffer)[u+n]);   /* stay within srcSize */
+                                DISPLAY(" \n");
+                                for (n=-5; n<0; n++) DISPLAY("%02X ", ((const BYTE*)resultBuffer)[u+n]);
+                                DISPLAY(" :%02X: ", ((const BYTE*)resultBuffer)[u]);
+                                for (n=1; (n<3) && (u+n<srcSize); n++) DISPLAY("%02X ", ((const BYTE*)resultBuffer)[u+n]);   /* resultBuffer holds srcSize bytes */
+                                DISPLAY(" \n");
+                            }
break;
}
if (u==srcSize-1) { /* should never happen */
@@ -378,6 +421,7 @@
free(blockTable);
free(compressedBuffer);
free(resultBuffer);
+ ZSTDMT_freeCCtx(mtctx);
ZSTD_freeCCtx(ctx);
ZSTD_freeDCtx(dctx);
return 0;
diff --git a/programs/bench.h b/programs/bench.h
index 314f346..2918c02 100644
--- a/programs/bench.h
+++ b/programs/bench.h
@@ -15,14 +15,15 @@
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_compressionParameters */
#include "zstd.h" /* ZSTD_compressionParameters */
-int BMK_benchFiles(const char** fileNamesTable, unsigned nbFiles,const char* dictFileName,
+int BMK_benchFiles(const char** fileNamesTable, unsigned nbFiles,const char* dictFileName,
int cLevel, int cLevelLast, ZSTD_compressionParameters* compressionParams);
/* Set Parameters */
-void BMK_SetNbSeconds(unsigned nbLoops);
-void BMK_SetBlockSize(size_t blockSize);
-void BMK_setAdditionalParam(int additionalParam);
+void BMK_setNbSeconds(unsigned nbLoops);
+void BMK_setBlockSize(size_t blockSize);
+void BMK_setNbThreads(unsigned nbThreads);
void BMK_setNotificationLevel(unsigned level);
-void BMK_setDecodeOnly(unsigned decodeFlag);
+void BMK_setAdditionalParam(int additionalParam);
+void BMK_setDecodeOnlyMode(unsigned decodeFlag);
#endif /* BENCH_H_121279284357 */
diff --git a/programs/fileio.c b/programs/fileio.c
index a112cc0..86da001 100644
--- a/programs/fileio.c
+++ b/programs/fileio.c
@@ -7,6 +7,7 @@
* of patent rights can be found in the PATENTS file in the same directory.
*/
+
/* *************************************
* Compiler Options
***************************************/
@@ -34,11 +35,14 @@
#include "fileio.h"
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_magicNumber, ZSTD_frameHeaderSize_max */
#include "zstd.h"
-#ifdef ZSTD_GZDECOMPRESS
-#include "zlib.h"
-#if !defined(z_const)
- #define z_const
+#ifdef ZSTD_MULTITHREAD
+# include "zstdmt_compress.h"
#endif
+#ifdef ZSTD_GZDECOMPRESS
+# include "zlib.h"
+# if !defined(z_const)
+# define z_const
+# endif
#endif
@@ -103,7 +107,23 @@
void FIO_setRemoveSrcFile(unsigned flag) { g_removeSrcFile = (flag>0); }
static U32 g_memLimit = 0;
void FIO_setMemLimit(unsigned memLimit) { g_memLimit = memLimit; }
-
+static U32 g_nbThreads = 1;
+void FIO_setNbThreads(unsigned nbThreads) {
+    g_nbThreads = nbThreads;   /* stored unconditionally, even when the build lacks ZSTD_MULTITHREAD */
+#ifndef ZSTD_MULTITHREAD
+    if (nbThreads >= 2) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n");
+#endif
+}
+static U32 g_blockSize = 0;
+void FIO_setBlockSize(unsigned blockSize) {
+    unsigned const explicitSize = (blockSize != 0);   /* a zero value never triggers either notice below */
+    g_blockSize = blockSize;
+    if (explicitSize && (g_nbThreads == 1)) DISPLAYLEVEL(2, "Setting block size is useless in single-thread mode \n");
+#ifdef ZSTD_MULTITHREAD
+    /* same outcome as the unsigned wrap-around test (blockSize-1 < ZSTDMT_SECTION_SIZE_MIN-1) */
+    if (explicitSize && (blockSize < ZSTDMT_SECTION_SIZE_MIN)) DISPLAYLEVEL(2, "Note : minimum block size is %u KB \n", (ZSTDMT_SECTION_SIZE_MIN>>10));
+#endif
+}
/*-*************************************
@@ -138,6 +158,10 @@
f = stdin;
SET_BINARY_MODE(stdin);
} else {
+ if (!UTIL_doesFileExists(srcFileName)) {
+ DISPLAYLEVEL(1, "zstd: %s is not a regular file -- ignored \n", srcFileName);
+ return NULL;
+ }
f = fopen(srcFileName, "rb");
if ( f==NULL ) DISPLAYLEVEL(1, "zstd: %s: %s \n", srcFileName, strerror(errno));
}
@@ -226,23 +250,34 @@
* Compression
************************************************************************/
typedef struct {
+ FILE* srcFile;
+ FILE* dstFile;
void* srcBuffer;
size_t srcBufferSize;
void* dstBuffer;
size_t dstBufferSize;
+#ifdef ZSTD_MULTITHREAD
+ ZSTDMT_CCtx* cctx;
+#else
ZSTD_CStream* cctx;
- FILE* dstFile;
- FILE* srcFile;
+#endif
} cRess_t;
-static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
+static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
U64 srcSize, ZSTD_compressionParameters* comprParams)
{
cRess_t ress;
memset(&ress, 0, sizeof(ress));
+#ifdef ZSTD_MULTITHREAD
+ ress.cctx = ZSTDMT_createCCtx(g_nbThreads);
+ if (ress.cctx == NULL) EXM_THROW(30, "zstd: allocation error : can't create ZSTD_CStream");
+ if (cLevel==ZSTD_maxCLevel())
+ ZSTDMT_setMTCtxParameter(ress.cctx, ZSTDMT_p_overlapSectionRLog, 0); /* use complete window for overlap */
+#else
ress.cctx = ZSTD_createCStream();
if (ress.cctx == NULL) EXM_THROW(30, "zstd: allocation error : can't create ZSTD_CStream");
+#endif
ress.srcBufferSize = ZSTD_CStreamInSize();
ress.srcBuffer = malloc(ress.srcBufferSize);
ress.dstBufferSize = ZSTD_CStreamOutSize();
@@ -264,8 +299,14 @@
if (comprParams->searchLength) params.cParams.searchLength = comprParams->searchLength;
if (comprParams->targetLength) params.cParams.targetLength = comprParams->targetLength;
if (comprParams->strategy) params.cParams.strategy = (ZSTD_strategy)(comprParams->strategy - 1);
+#ifdef ZSTD_MULTITHREAD
+ { size_t const errorCode = ZSTDMT_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize);
+ if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode));
+ ZSTDMT_setMTCtxParameter(ress.cctx, ZSTDMT_p_sectionSize, g_blockSize);
+#else
{ size_t const errorCode = ZSTD_initCStream_advanced(ress.cctx, dictBuffer, dictBuffSize, params, srcSize);
if (ZSTD_isError(errorCode)) EXM_THROW(33, "Error initializing CStream : %s", ZSTD_getErrorName(errorCode));
+#endif
} }
free(dictBuffer);
}
@@ -277,7 +318,11 @@
{
free(ress.srcBuffer);
free(ress.dstBuffer);
+#ifdef ZSTD_MULTITHREAD
+ ZSTDMT_freeCCtx(ress.cctx);
+#else
ZSTD_freeCStream(ress.cctx); /* never fails */
+#endif
}
@@ -296,7 +341,11 @@
U64 const fileSize = UTIL_getFileSize(srcFileName);
/* init */
+#ifdef ZSTD_MULTITHREAD
+ { size_t const resetError = ZSTDMT_resetCStream(ress.cctx, fileSize);
+#else
{ size_t const resetError = ZSTD_resetCStream(ress.cctx, fileSize);
+#endif
if (ZSTD_isError(resetError)) EXM_THROW(21, "Error initializing compression : %s", ZSTD_getErrorName(resetError));
}
@@ -308,31 +357,39 @@
readsize += inSize;
DISPLAYUPDATE(2, "\rRead : %u MB ", (U32)(readsize>>20));
- /* Compress using buffered streaming */
{ ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
- ZSTD_outBuffer outBuff= { ress.dstBuffer, ress.dstBufferSize, 0 };
- { size_t const result = ZSTD_compressStream(ress.cctx, &outBuff, &inBuff);
- if (ZSTD_isError(result)) EXM_THROW(23, "Compression error : %s ", ZSTD_getErrorName(result)); }
- if (inBuff.pos != inBuff.size)
- /* inBuff should be entirely consumed since buffer sizes are recommended ones */
- EXM_THROW(24, "Compression error : input block not fully consumed");
+ while (inBuff.pos != inBuff.size) { /* note : is there any possibility of endless loop ? for example, if outBuff is not large enough ? */
+ ZSTD_outBuffer outBuff= { ress.dstBuffer, ress.dstBufferSize, 0 };
+#ifdef ZSTD_MULTITHREAD
+ size_t const result = ZSTDMT_compressStream(ress.cctx, &outBuff, &inBuff);
+#else
+ size_t const result = ZSTD_compressStream(ress.cctx, &outBuff, &inBuff);
+#endif
+ if (ZSTD_isError(result)) EXM_THROW(23, "Compression error : %s ", ZSTD_getErrorName(result));
- /* Write cBlock */
- { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
- if (sizeCheck!=outBuff.pos) EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName); }
- compressedfilesize += outBuff.pos;
- }
+ /* Write compressed stream */
+ if (outBuff.pos) {
+ size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
+ if (sizeCheck!=outBuff.pos) EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName);
+ compressedfilesize += outBuff.pos;
+ } } }
DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%% ", (U32)(readsize>>20), (double)compressedfilesize/readsize*100);
}
/* End of Frame */
- { ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
- size_t const result = ZSTD_endStream(ress.cctx, &outBuff);
- if (result!=0) EXM_THROW(26, "Compression error : cannot create frame end");
-
- { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
- if (sizeCheck!=outBuff.pos) EXM_THROW(27, "Write error : cannot write frame end into %s", dstFileName); }
- compressedfilesize += outBuff.pos;
+ { size_t result = 1;
+ while (result!=0) { /* note : is there any possibility of endless loop ? */
+ ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
+#ifdef ZSTD_MULTITHREAD
+ result = ZSTDMT_endStream(ress.cctx, &outBuff);
+#else
+ result = ZSTD_endStream(ress.cctx, &outBuff);
+#endif
+ if (ZSTD_isError(result)) EXM_THROW(26, "Compression error during frame end : %s", ZSTD_getErrorName(result));
+ { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
+ if (sizeCheck!=outBuff.pos) EXM_THROW(27, "Write error : cannot write frame end into %s", dstFileName); }
+ compressedfilesize += outBuff.pos;
+ }
}
/* Status */
@@ -361,6 +418,7 @@
DISPLAYLEVEL(1, "zstd: %s is a directory -- ignored \n", srcFileName);
return 1;
}
+
ress.srcFile = FIO_openSrcFile(srcFileName);
if (!ress.srcFile) return 1; /* srcFile could not be opened */
@@ -481,7 +539,7 @@
/* Allocation */
ress.dctx = ZSTD_createDStream();
if (ress.dctx==NULL) EXM_THROW(60, "Can't create ZSTD_DStream");
- ZSTD_setDStreamParameter(ress.dctx, ZSTDdsp_maxWindowSize, g_memLimit);
+ ZSTD_setDStreamParameter(ress.dctx, DStream_p_maxWindowSize, g_memLimit);
ress.srcBufferSize = ZSTD_DStreamInSize();
ress.srcBuffer = malloc(ress.srcBufferSize);
ress.dstBufferSize = ZSTD_DStreamOutSize();
@@ -632,7 +690,7 @@
if (ZSTD_isError(readSizeHint)) EXM_THROW(36, "Decoding error : %s", ZSTD_getErrorName(readSizeHint));
/* Write block */
- storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, storedSkips);
+ storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, storedSkips);
frameSize += outBuff.pos;
DISPLAYUPDATE(2, "\rDecoded : %u MB... ", (U32)((alreadyDecoded+frameSize)>>20) );
diff --git a/programs/fileio.h b/programs/fileio.h
index b716583..11178bc 100644
--- a/programs/fileio.h
+++ b/programs/fileio.h
@@ -12,12 +12,13 @@
#define FILEIO_H_23981798732
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_compressionParameters */
-#include "zstd.h" /* ZSTD_compressionParameters */
+#include "zstd.h" /* ZSTD_* */
#if defined (__cplusplus)
extern "C" {
#endif
+
/* *************************************
* Special i/o constants
**************************************/
@@ -40,6 +41,8 @@
void FIO_setChecksumFlag(unsigned checksumFlag);
void FIO_setRemoveSrcFile(unsigned flag);
void FIO_setMemLimit(unsigned memLimit);
+void FIO_setNbThreads(unsigned nbThreads);
+void FIO_setBlockSize(unsigned blockSize);
/*-*************************************
diff --git a/programs/util.h b/programs/util.h
index aaa4b7c..651027b 100644
--- a/programs/util.h
+++ b/programs/util.h
@@ -95,18 +95,26 @@
/*-****************************************
* Time functions
******************************************/
-#if !defined(_WIN32)
- typedef clock_t UTIL_time_t;
- UTIL_STATIC void UTIL_initTimer(UTIL_time_t* ticksPerSecond) { *ticksPerSecond=0; }
- UTIL_STATIC void UTIL_getTime(UTIL_time_t* x) { *x = clock(); }
- UTIL_STATIC U64 UTIL_getSpanTimeMicro(UTIL_time_t ticksPerSecond, UTIL_time_t clockStart, UTIL_time_t clockEnd) { (void)ticksPerSecond; return 1000000ULL * (clockEnd - clockStart) / CLOCKS_PER_SEC; }
- UTIL_STATIC U64 UTIL_getSpanTimeNano(UTIL_time_t ticksPerSecond, UTIL_time_t clockStart, UTIL_time_t clockEnd) { (void)ticksPerSecond; return 1000000000ULL * (clockEnd - clockStart) / CLOCKS_PER_SEC; }
-#else
+#if (PLATFORM_POSIX_VERSION >= 1)
+#include <unistd.h>
+#include <sys/times.h> /* times */
+ typedef U64 UTIL_time_t;
+ UTIL_STATIC void UTIL_initTimer(UTIL_time_t* ticksPerSecond) { *ticksPerSecond=sysconf(_SC_CLK_TCK); }
+ UTIL_STATIC void UTIL_getTime(UTIL_time_t* x) { struct tms junk; clock_t newTicks = (clock_t) times(&junk); (void)junk; *x = (UTIL_time_t)newTicks; }
+ UTIL_STATIC U64 UTIL_getSpanTimeMicro(UTIL_time_t ticksPerSecond, UTIL_time_t clockStart, UTIL_time_t clockEnd) { return 1000000ULL * (clockEnd - clockStart) / ticksPerSecond; }
+ UTIL_STATIC U64 UTIL_getSpanTimeNano(UTIL_time_t ticksPerSecond, UTIL_time_t clockStart, UTIL_time_t clockEnd) { return 1000000000ULL * (clockEnd - clockStart) / ticksPerSecond; }
+#elif defined(_WIN32) /* Windows */
typedef LARGE_INTEGER UTIL_time_t;
UTIL_STATIC void UTIL_initTimer(UTIL_time_t* ticksPerSecond) { if (!QueryPerformanceFrequency(ticksPerSecond)) fprintf(stderr, "ERROR: QueryPerformance not present\n"); }
UTIL_STATIC void UTIL_getTime(UTIL_time_t* x) { QueryPerformanceCounter(x); }
UTIL_STATIC U64 UTIL_getSpanTimeMicro(UTIL_time_t ticksPerSecond, UTIL_time_t clockStart, UTIL_time_t clockEnd) { return 1000000ULL*(clockEnd.QuadPart - clockStart.QuadPart)/ticksPerSecond.QuadPart; }
UTIL_STATIC U64 UTIL_getSpanTimeNano(UTIL_time_t ticksPerSecond, UTIL_time_t clockStart, UTIL_time_t clockEnd) { return 1000000000ULL*(clockEnd.QuadPart - clockStart.QuadPart)/ticksPerSecond.QuadPart; }
+#else /* relies on standard C (note : clock_t measurements can be wrong when using multi-threading) */
+ typedef clock_t UTIL_time_t;
+ UTIL_STATIC void UTIL_initTimer(UTIL_time_t* ticksPerSecond) { *ticksPerSecond=0; }
+ UTIL_STATIC void UTIL_getTime(UTIL_time_t* x) { *x = clock(); }
+ UTIL_STATIC U64 UTIL_getSpanTimeMicro(UTIL_time_t ticksPerSecond, UTIL_time_t clockStart, UTIL_time_t clockEnd) { (void)ticksPerSecond; return 1000000ULL * (clockEnd - clockStart) / CLOCKS_PER_SEC; }
+ UTIL_STATIC U64 UTIL_getSpanTimeNano(UTIL_time_t ticksPerSecond, UTIL_time_t clockStart, UTIL_time_t clockEnd) { (void)ticksPerSecond; return 1000000000ULL * (clockEnd - clockStart) / CLOCKS_PER_SEC; }
#endif
diff --git a/programs/zstdcli.c b/programs/zstdcli.c
index 54c66a3..64f2c91 100644
--- a/programs/zstdcli.c
+++ b/programs/zstdcli.c
@@ -20,6 +20,7 @@
#endif
+
/*-************************************
* Dependencies
**************************************/
@@ -110,12 +111,16 @@
DISPLAY( " -q : suppress warnings; specify twice to suppress errors too\n");
DISPLAY( " -c : force write to standard output, even if it is the console\n");
#ifdef UTIL_HAS_CREATEFILELIST
- DISPLAY( " -r : operate recursively on directories\n");
+ DISPLAY( " -r : operate recursively on directories \n");
#endif
#ifndef ZSTD_NOCOMPRESS
DISPLAY( "--ultra : enable levels beyond %i, up to %i (requires more memory)\n", ZSTDCLI_CLEVEL_MAX, ZSTD_maxCLevel());
DISPLAY( "--no-dictID : don't write dictID into header (dictionary compression)\n");
- DISPLAY( "--[no-]check : integrity check (default:enabled)\n");
+ DISPLAY( "--[no-]check : integrity check (default:enabled) \n");
+#ifdef ZSTD_MULTITHREAD
+ DISPLAY( " -T# : use # threads for compression (default:1) \n");
+ DISPLAY( " -B# : select size of independent sections (default:0==automatic) \n");
+#endif
#endif
#ifndef ZSTD_NODECOMPRESS
DISPLAY( "--test : test compressed file integrity \n");
@@ -256,7 +261,10 @@
nextArgumentIsDictID=0,
nextArgumentsAreFiles=0,
ultra=0,
- lastCommand = 0;
+ lastCommand = 0,
+ nbThreads = 1;
+ unsigned bench_nbSeconds = 3; /* would be better if this value was synchronized from bench */
+ size_t blockSize = 0;
zstd_operation_mode operation = zom_compress;
ZSTD_compressionParameters compressionParams;
int cLevel = ZSTDCLI_CLEVEL_DEFAULT;
@@ -393,7 +401,7 @@
/* Decoding */
case 'd':
#ifndef ZSTD_NOBENCH
- if (operation==zom_bench) { BMK_setDecodeOnly(1); argument++; break; } /* benchmark decode (hidden option) */
+ if (operation==zom_bench) { BMK_setDecodeOnlyMode(1); argument++; break; } /* benchmark decode (hidden option) */
#endif
operation=zom_decompress; argument++; break;
@@ -437,34 +445,38 @@
#ifndef ZSTD_NOBENCH
/* Benchmark */
- case 'b': operation=zom_bench; argument++; break;
+ case 'b':
+ operation=zom_bench;
+ argument++;
+ break;
/* range bench (benchmark only) */
case 'e':
- /* compression Level */
- argument++;
- cLevelLast = readU32FromChar(&argument);
- break;
+ /* compression Level */
+ argument++;
+ cLevelLast = readU32FromChar(&argument);
+ break;
/* Modify Nb Iterations (benchmark only) */
case 'i':
argument++;
- { U32 const iters = readU32FromChar(&argument);
- BMK_setNotificationLevel(displayLevel);
- BMK_SetNbSeconds(iters);
- }
+ bench_nbSeconds = readU32FromChar(&argument);
break;
/* cut input into blocks (benchmark only) */
case 'B':
argument++;
- { size_t const bSize = readU32FromChar(&argument);
- BMK_setNotificationLevel(displayLevel);
- BMK_SetBlockSize(bSize);
- }
+ blockSize = readU32FromChar(&argument);
break;
+
#endif /* ZSTD_NOBENCH */
+ /* nb of threads (hidden option) */
+ case 'T':
+ argument++;
+ nbThreads = readU32FromChar(&argument);
+ break;
+
/* Dictionary Selection level */
case 's':
argument++;
@@ -553,6 +565,9 @@
if (operation==zom_bench) {
#ifndef ZSTD_NOBENCH
BMK_setNotificationLevel(displayLevel);
+ BMK_setBlockSize(blockSize);
+ BMK_setNbThreads(nbThreads);
+ BMK_setNbSeconds(bench_nbSeconds);
BMK_benchFiles(filenameTable, filenameIdx, dictFileName, cLevel, cLevelLast, &compressionParams);
#endif
goto _end;
@@ -603,7 +618,7 @@
} }
#endif
- /* No warning message in pipe mode (stdin + stdout) or multi-files mode */
+ /* No status message in pipe mode (stdin - stdout) or multi-files mode */
if (!strcmp(filenameTable[0], stdinmark) && outFileName && !strcmp(outFileName,stdoutmark) && (displayLevel==2)) displayLevel=1;
if ((filenameIdx>1) & (displayLevel==2)) displayLevel=1;
@@ -611,6 +626,8 @@
FIO_setNotificationLevel(displayLevel);
if (operation==zom_compress) {
#ifndef ZSTD_NOCOMPRESS
+ FIO_setNbThreads(nbThreads);
+ FIO_setBlockSize((U32)blockSize);
if ((filenameIdx==1) && outFileName)
operationResult = FIO_compressFilename(outFileName, filenameTable[0], dictFileName, cLevel, &compressionParams);
else
diff --git a/tests/.gitignore b/tests/.gitignore
index e932ad9..5352023 100644
--- a/tests/.gitignore
+++ b/tests/.gitignore
@@ -6,6 +6,7 @@
fuzzer-dll
zbufftest
zbufftest32
+zbufftest-dll
zstreamtest
zstreamtest32
zstreamtest-dll
@@ -15,6 +16,7 @@
roundTripCrash
longmatch
symbols
+pool
invalidDictionaries
# Tmp test directory
diff --git a/tests/Makefile b/tests/Makefile
index 15fdc77..f49d230 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -26,10 +26,10 @@
TESTARTEFACT := versionsTest namespaceTest
-CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/dictBuilder -I$(ZSTDDIR)/deprecated -I$(PRGDIR)
+CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/compress -I$(ZSTDDIR)/dictBuilder -I$(ZSTDDIR)/deprecated -I$(PRGDIR)
CFLAGS ?= -O3
-CFLAGS += -Wall -Wextra -Wcast-qual -Wcast-align -Wshadow -Wstrict-aliasing=1 \
- -Wswitch-enum -Wdeclaration-after-statement -Wstrict-prototypes -Wundef
+CFLAGS += -g -Wall -Wextra -Wcast-qual -Wcast-align -Wshadow -Wstrict-aliasing=1 \
+ -Wswitch-enum -Wdeclaration-after-statement -Wstrict-prototypes -Wundef
CFLAGS += $(MOREFLAGS)
FLAGS = $(CPPFLAGS) $(CFLAGS) $(LDFLAGS)
@@ -46,14 +46,16 @@
# Define *.exe as extension for Windows systems
ifneq (,$(filter Windows%,$(OS)))
EXT =.exe
+MULTITHREAD = -DZSTD_MULTITHREAD
else
EXT =
+MULTITHREAD = -pthread -DZSTD_MULTITHREAD
endif
VOID = /dev/null
ZSTREAM_TESTTIME = -T2mn
-FUZZERTEST= -T5mn
-ZSTDRTTEST= --test-large-data
+FUZZERTEST ?= -T5mn
+ZSTDRTTEST = --test-large-data
.PHONY: default all all32 dll clean test test32 test-all namespaceTest versionsTest
@@ -122,10 +124,10 @@
$(CC) $(CPPFLAGS) $(CFLAGS) $^ $(LDFLAGS) -o $@$(EXT)
zstreamtest : $(ZSTD_FILES) $(ZDICT_FILES) $(PRGDIR)/datagen.c zstreamtest.c
- $(CC) $(FLAGS) $^ -o $@$(EXT)
+ $(CC) $(FLAGS) $(MULTITHREAD) $^ -o $@$(EXT)
zstreamtest32 : $(ZSTD_FILES) $(ZDICT_FILES) $(PRGDIR)/datagen.c zstreamtest.c
- $(CC) -m32 $(FLAGS) $^ -o $@$(EXT)
+ $(CC) -m32 $(FLAGS) $(MULTITHREAD) $^ -o $@$(EXT)
zstreamtest-dll : LDFLAGS+= -L$(ZSTDDIR) -lzstd
zstreamtest-dll : $(ZSTDDIR)/common/xxhash.c $(PRGDIR)/datagen.c zstreamtest.c
@@ -156,6 +158,9 @@
$(CC) $(FLAGS) $^ -o $@$(EXT) -Wl,-rpath=$(ZSTDDIR) $(ZSTDDIR)/libzstd.so
endif
+pool : pool.c $(ZSTDDIR)/common/pool.c $(ZSTDDIR)/common/threading.c
+ $(CC) $(FLAGS) $(MULTITHREAD) $^ -o $@$(EXT)
+
namespaceTest:
if $(CC) namespaceTest.c ../lib/common/xxhash.c -o $@ ; then echo compilation should fail; exit 1 ; fi
$(RM) $@
@@ -174,7 +179,7 @@
fuzzer-dll$(EXT) zstreamtest-dll$(EXT) zbufftest-dll$(EXT)\
zstreamtest$(EXT) zstreamtest32$(EXT) \
datagen$(EXT) paramgrill$(EXT) roundTripCrash$(EXT) longmatch$(EXT) \
- symbols$(EXT) invalidDictionaries$(EXT)
+ symbols$(EXT) invalidDictionaries$(EXT) pool$(EXT)
@echo Cleaning completed
@@ -220,7 +225,7 @@
file $(ZSTD)
ZSTD="$(QEMU_SYS) $(ZSTD)" ./playTests.sh $(ZSTDRTTEST)
-test: test-zstd test-fullbench test-fuzzer test-zstream test-longmatch test-invalidDictionaries
+test: test-zstd test-fullbench test-fuzzer test-zstream test-longmatch test-invalidDictionaries test-pool
test32: test-zstd32 test-fullbench32 test-fuzzer32 test-zstream32
@@ -286,4 +291,7 @@
test-symbols: symbols
$(QEMU_SYS) ./symbols
+test-pool: pool
+ $(QEMU_SYS) ./pool
+
endif
diff --git a/tests/fuzzer.c b/tests/fuzzer.c
index 00cfb05..60546c0 100644
--- a/tests/fuzzer.c
+++ b/tests/fuzzer.c
@@ -755,6 +755,7 @@
CHECK (ZSTD_isError(errorCode), "ZSTD_copyCCtx error : %s", ZSTD_getErrorName(errorCode));
} }
XXH64_reset(&xxhState, 0);
+ ZSTD_setCCtxParameter(ctx, ZSTD_p_forceWindow, FUZ_rand(&lseed) & 1);
{ U32 const nbChunks = (FUZ_rand(&lseed) & 127) + 2;
U32 n;
for (totalTestSize=0, cSize=0, n=0 ; n<nbChunks ; n++) {
diff --git a/tests/playTests.sh b/tests/playTests.sh
index 5bb882a..35731f9 100755
--- a/tests/playTests.sh
+++ b/tests/playTests.sh
@@ -135,14 +135,14 @@
cat hello.zstd world.zstd > helloworld.zstd
$ZSTD -dc helloworld.zstd > result.tmp
cat result.tmp
-sdiff helloworld.tmp result.tmp
+$DIFF helloworld.tmp result.tmp
$ECHO "frame concatenation without checksum"
$ZSTD -c hello.tmp > hello.zstd --no-check
$ZSTD -c world.tmp > world.zstd --no-check
cat hello.zstd world.zstd > helloworld.zstd
$ZSTD -dc helloworld.zstd > result.tmp
cat result.tmp
-sdiff helloworld.tmp result.tmp
+$DIFF helloworld.tmp result.tmp
rm ./*.tmp ./*.zstd
$ECHO "frame concatenation tests completed"
diff --git a/tests/pool.c b/tests/pool.c
new file mode 100644
index 0000000..adc5947
--- /dev/null
+++ b/tests/pool.c
@@ -0,0 +1,70 @@
+#include "pool.h"
+#include "threading.h"
+#include <stddef.h>
+#include <stdio.h>
+
+#define ASSERT_TRUE(p) /* makes the enclosing function return 1 when p is false */ \
+ do { /* do/while(0) wrapper : the macro expands to a single statement */ \
+ if (!(p)) { \
+ return 1; \
+ } \
+ } while (0)
+#define ASSERT_FALSE(p) ASSERT_TRUE(!(p)) /* enclosing function returns 1 when p is true */
+#define ASSERT_EQ(lhs, rhs) ASSERT_TRUE((lhs) == (rhs)) /* enclosing function returns 1 when lhs != rhs */
+
+struct data { /* state shared by every job that testOrder queues */
+ pthread_mutex_t mutex; /* held by fn while it updates data[] and i */
+ unsigned data[16]; /* fn stores the current value of i into slot i */
+ size_t i; /* number of jobs that have run so far == next slot to fill */
+};
+
+void fn(void *opaque) {  /* pool job : stores its run index into the shared array */
+  struct data *const shared = (struct data *)opaque;
+  pthread_mutex_lock(&shared->mutex);  /* every queued job updates the same counter and array */
+  shared->data[shared->i] = shared->i;
+  shared->i += 1;
+  pthread_mutex_unlock(&shared->mutex);
+}
+
+/* Queues 16 `fn` jobs on a freshly created pool, frees the pool, then checks
+ * that exactly 16 jobs ran and that slot k of the shared array holds k.
+ * Returns 0 when every check passes, 1 as soon as one fails
+ * (including when the pool cannot be created). */
+int testOrder(size_t numThreads, size_t queueSize) {
+  struct data shared;
+  size_t job, slot;
+  POOL_ctx *const pool = POOL_create(numThreads, queueSize);
+  if (!pool) return 1;
+  shared.i = 0;
+  pthread_mutex_init(&shared.mutex, NULL);
+  for (job = 0; job != 16; ++job) {
+    POOL_add(pool, &fn, &shared);
+  }
+  /* NOTE(review): presumably POOL_free waits for queued jobs; the count check relies on it */
+  POOL_free(pool);
+  if (shared.i != 16) return 1;
+  for (slot = 0; slot < shared.i; ++slot) {
+    if (shared.data[slot] != slot) return 1;
+  }
+  pthread_mutex_destroy(&shared.mutex);
+  return 0;
+}
+
+int main(int argc, const char **argv) {
+  size_t numThreads;
+  (void)argc;
+  (void)argv;
+  /* Run testOrder for 1-4 threads and queue sizes 1-2; exit 1 at the first failure. */
+  for (numThreads = 1; numThreads <= 4; ++numThreads) {
+    size_t queueSize;
+    for (queueSize = 1; queueSize <= 2; ++queueSize) {
+      if (testOrder(numThreads, queueSize)) { printf("FAIL: testOrder\n"); return 1; }
+    }
+  }
+  printf("PASS: testOrder\n");
+  /* testInvalid : POOL_create must return NULL for 0 threads or a 0-sized queue.
+   * Plain if : `c ? (p,1) : p, 0` parses as `(c ? (p,1) : p), 0`, i.e. always exit 0. */
+  if (POOL_create(0, 1) || POOL_create(1, 0)) { printf("FAIL: testInvalid\n"); return 1; }
+  printf("PASS: testInvalid\n");
+  return 0;
+}
diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c
index 4024e5e..bef8734 100644
--- a/tests/zstreamtest.c
+++ b/tests/zstreamtest.c
@@ -29,6 +29,7 @@
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_maxCLevel, ZSTD_customMem, ZSTD_getDictID_fromFrame */
#include "zstd.h" /* ZSTD_compressBound */
#include "zstd_errors.h" /* ZSTD_error_srcSize_wrong */
+#include "zstdmt_compress.h"
#include "zdict.h" /* ZDICT_trainFromBuffer */
#include "datagen.h" /* RDG_genBuffer */
#define XXH_STATIC_LINKING_ONLY /* XXH64_state_t */
@@ -184,7 +185,7 @@
cSize = skippableFrameSize + 8;
/* Basic compression test */
- DISPLAYLEVEL(4, "test%3i : compress %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
+ DISPLAYLEVEL(3, "test%3i : compress %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
ZSTD_initCStream_usingDict(zc, CNBuffer, 128 KB, 1);
outBuff.dst = (char*)(compressedBuffer)+cSize;
outBuff.size = compressedBufferSize;
@@ -198,16 +199,16 @@
{ size_t const r = ZSTD_endStream(zc, &outBuff);
if (r != 0) goto _output_error; } /* error, or some data not flushed */
cSize += outBuff.pos;
- DISPLAYLEVEL(4, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/COMPRESSIBLE_NOISE_LENGTH*100);
+ DISPLAYLEVEL(3, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/COMPRESSIBLE_NOISE_LENGTH*100);
- DISPLAYLEVEL(4, "test%3i : check CStream size : ", testNb++);
+ DISPLAYLEVEL(3, "test%3i : check CStream size : ", testNb++);
{ size_t const s = ZSTD_sizeof_CStream(zc);
if (ZSTD_isError(s)) goto _output_error;
- DISPLAYLEVEL(4, "OK (%u bytes) \n", (U32)s);
+ DISPLAYLEVEL(3, "OK (%u bytes) \n", (U32)s);
}
/* skippable frame test */
- DISPLAYLEVEL(4, "test%3i : decompress skippable frame : ", testNb++);
+ DISPLAYLEVEL(3, "test%3i : decompress skippable frame : ", testNb++);
ZSTD_initDStream_usingDict(zd, CNBuffer, 128 KB);
inBuff.src = compressedBuffer;
inBuff.size = cSize;
@@ -218,45 +219,45 @@
{ size_t const r = ZSTD_decompressStream(zd, &outBuff, &inBuff);
if (r != 0) goto _output_error; }
if (outBuff.pos != 0) goto _output_error; /* skippable frame len is 0 */
- DISPLAYLEVEL(4, "OK \n");
+ DISPLAYLEVEL(3, "OK \n");
/* Basic decompression test */
inBuff2 = inBuff;
- DISPLAYLEVEL(4, "test%3i : decompress %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
+ DISPLAYLEVEL(3, "test%3i : decompress %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
ZSTD_initDStream_usingDict(zd, CNBuffer, 128 KB);
- { size_t const r = ZSTD_setDStreamParameter(zd, ZSTDdsp_maxWindowSize, 1000000000); /* large limit */
+ { size_t const r = ZSTD_setDStreamParameter(zd, DStream_p_maxWindowSize, 1000000000); /* large limit */
if (ZSTD_isError(r)) goto _output_error; }
{ size_t const remaining = ZSTD_decompressStream(zd, &outBuff, &inBuff);
if (remaining != 0) goto _output_error; } /* should reach end of frame == 0; otherwise, some data left, or an error */
if (outBuff.pos != CNBufferSize) goto _output_error; /* should regenerate the same amount */
if (inBuff.pos != inBuff.size) goto _output_error; /* should have read the entire frame */
- DISPLAYLEVEL(4, "OK \n");
+ DISPLAYLEVEL(3, "OK \n");
/* Re-use without init */
- DISPLAYLEVEL(4, "test%3i : decompress again without init (re-use previous settings): ", testNb++);
+ DISPLAYLEVEL(3, "test%3i : decompress again without init (re-use previous settings): ", testNb++);
outBuff.pos = 0;
{ size_t const remaining = ZSTD_decompressStream(zd, &outBuff, &inBuff2);
if (remaining != 0) goto _output_error; } /* should reach end of frame == 0; otherwise, some data left, or an error */
if (outBuff.pos != CNBufferSize) goto _output_error; /* should regenerate the same amount */
if (inBuff.pos != inBuff.size) goto _output_error; /* should have read the entire frame */
- DISPLAYLEVEL(4, "OK \n");
+ DISPLAYLEVEL(3, "OK \n");
/* check regenerated data is byte exact */
- DISPLAYLEVEL(4, "test%3i : check decompressed result : ", testNb++);
+ DISPLAYLEVEL(3, "test%3i : check decompressed result : ", testNb++);
{ size_t i;
for (i=0; i<CNBufferSize; i++) {
if (((BYTE*)decodedBuffer)[i] != ((BYTE*)CNBuffer)[i]) goto _output_error;
} }
- DISPLAYLEVEL(4, "OK \n");
+ DISPLAYLEVEL(3, "OK \n");
- DISPLAYLEVEL(4, "test%3i : check DStream size : ", testNb++);
+ DISPLAYLEVEL(3, "test%3i : check DStream size : ", testNb++);
{ size_t const s = ZSTD_sizeof_DStream(zd);
if (ZSTD_isError(s)) goto _output_error;
- DISPLAYLEVEL(4, "OK (%u bytes) \n", (U32)s);
+ DISPLAYLEVEL(3, "OK (%u bytes) \n", (U32)s);
}
/* Byte-by-byte decompression test */
- DISPLAYLEVEL(4, "test%3i : decompress byte-by-byte : ", testNb++);
+ DISPLAYLEVEL(3, "test%3i : decompress byte-by-byte : ", testNb++);
{ /* skippable frame */
size_t r = 1;
ZSTD_initDStream_usingDict(zd, CNBuffer, 128 KB);
@@ -282,18 +283,18 @@
}
if (outBuff.pos != CNBufferSize) goto _output_error; /* should regenerate the same amount */
if (inBuff.pos != cSize) goto _output_error; /* should have read the entire frame */
- DISPLAYLEVEL(4, "OK \n");
+ DISPLAYLEVEL(3, "OK \n");
/* check regenerated data is byte exact */
- DISPLAYLEVEL(4, "test%3i : check decompressed result : ", testNb++);
+ DISPLAYLEVEL(3, "test%3i : check decompressed result : ", testNb++);
{ size_t i;
for (i=0; i<CNBufferSize; i++) {
if (((BYTE*)decodedBuffer)[i] != ((BYTE*)CNBuffer)[i]) goto _output_error;;
} }
- DISPLAYLEVEL(4, "OK \n");
+ DISPLAYLEVEL(3, "OK \n");
/* _srcSize compression test */
- DISPLAYLEVEL(4, "test%3i : compress_srcSize %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
+ DISPLAYLEVEL(3, "test%3i : compress_srcSize %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH);
ZSTD_initCStream_srcSize(zc, 1, CNBufferSize);
outBuff.dst = (char*)(compressedBuffer);
outBuff.size = compressedBufferSize;
@@ -308,10 +309,10 @@
if (r != 0) goto _output_error; } /* error, or some data not flushed */
{ unsigned long long origSize = ZSTD_getDecompressedSize(outBuff.dst, outBuff.pos);
if ((size_t)origSize != CNBufferSize) goto _output_error; } /* exact original size must be present */
- DISPLAYLEVEL(4, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/COMPRESSIBLE_NOISE_LENGTH*100);
+ DISPLAYLEVEL(3, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/COMPRESSIBLE_NOISE_LENGTH*100);
/* wrong _srcSize compression test */
- DISPLAYLEVEL(4, "test%3i : wrong srcSize : %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH-1);
+ DISPLAYLEVEL(3, "test%3i : wrong srcSize : %u bytes : ", testNb++, COMPRESSIBLE_NOISE_LENGTH-1);
ZSTD_initCStream_srcSize(zc, 1, CNBufferSize-1);
outBuff.dst = (char*)(compressedBuffer);
outBuff.size = compressedBufferSize;
@@ -324,10 +325,10 @@
if (inBuff.pos != inBuff.size) goto _output_error; /* entire input should be consumed */
{ size_t const r = ZSTD_endStream(zc, &outBuff);
if (ZSTD_getErrorCode(r) != ZSTD_error_srcSize_wrong) goto _output_error; /* must fail : wrong srcSize */
- DISPLAYLEVEL(4, "OK (error detected : %s) \n", ZSTD_getErrorName(r)); }
+ DISPLAYLEVEL(3, "OK (error detected : %s) \n", ZSTD_getErrorName(r)); }
/* Complex context re-use scenario */
- DISPLAYLEVEL(4, "test%3i : context re-use : ", testNb++);
+ DISPLAYLEVEL(3, "test%3i : context re-use : ", testNb++);
ZSTD_freeCStream(zc);
zc = ZSTD_createCStream_advanced(customMem);
if (zc==NULL) goto _output_error; /* memory allocation issue */
@@ -361,10 +362,10 @@
{ size_t const r = ZSTD_endStream(zc, &outBuff);
if (r != 0) goto _output_error; } /* error, or some data not flushed */
}
- DISPLAYLEVEL(4, "OK \n");
+ DISPLAYLEVEL(3, "OK \n");
/* CDict scenario */
- DISPLAYLEVEL(4, "test%3i : digested dictionary : ", testNb++);
+ DISPLAYLEVEL(3, "test%3i : digested dictionary : ", testNb++);
{ ZSTD_CDict* const cdict = ZSTD_createCDict(dictionary.start, dictionary.filled, 1);
size_t const initError = ZSTD_initCStream_usingCDict(zc, cdict);
if (ZSTD_isError(initError)) goto _output_error;
@@ -382,13 +383,13 @@
if (r != 0) goto _output_error; } /* error, or some data not flushed */
cSize = outBuff.pos;
ZSTD_freeCDict(cdict);
- DISPLAYLEVEL(4, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/CNBufferSize*100);
+ DISPLAYLEVEL(3, "OK (%u bytes : %.2f%%)\n", (U32)cSize, (double)cSize/CNBufferSize*100);
}
- DISPLAYLEVEL(4, "test%3i : check CStream size : ", testNb++);
+ DISPLAYLEVEL(3, "test%3i : check CStream size : ", testNb++);
{ size_t const s = ZSTD_sizeof_CStream(zc);
if (ZSTD_isError(s)) goto _output_error;
- DISPLAYLEVEL(4, "OK (%u bytes) \n", (U32)s);
+ DISPLAYLEVEL(3, "OK (%u bytes) \n", (U32)s);
}
DISPLAYLEVEL(4, "test%3i : check Dictionary ID : ", testNb++);
@@ -398,7 +399,7 @@
}
/* DDict scenario */
- DISPLAYLEVEL(4, "test%3i : decompress %u bytes with digested dictionary : ", testNb++, (U32)CNBufferSize);
+ DISPLAYLEVEL(3, "test%3i : decompress %u bytes with digested dictionary : ", testNb++, (U32)CNBufferSize);
{ ZSTD_DDict* const ddict = ZSTD_createDDict(dictionary.start, dictionary.filled);
size_t const initError = ZSTD_initDStream_usingDDict(zd, ddict);
if (ZSTD_isError(initError)) goto _output_error;
@@ -413,19 +414,19 @@
if (outBuff.pos != CNBufferSize) goto _output_error; /* should regenerate the same amount */
if (inBuff.pos != inBuff.size) goto _output_error; /* should have read the entire frame */
ZSTD_freeDDict(ddict);
- DISPLAYLEVEL(4, "OK \n");
+ DISPLAYLEVEL(3, "OK \n");
}
/* test ZSTD_setDStreamParameter() resilience */
- DISPLAYLEVEL(4, "test%3i : wrong parameter for ZSTD_setDStreamParameter(): ", testNb++);
+ DISPLAYLEVEL(3, "test%3i : wrong parameter for ZSTD_setDStreamParameter(): ", testNb++);
{ size_t const r = ZSTD_setDStreamParameter(zd, (ZSTD_DStreamParameter_e)999, 1); /* large limit */
if (!ZSTD_isError(r)) goto _output_error; }
- DISPLAYLEVEL(4, "OK \n");
+ DISPLAYLEVEL(3, "OK \n");
/* Memory restriction */
- DISPLAYLEVEL(4, "test%3i : maxWindowSize < frame requirement : ", testNb++);
+ DISPLAYLEVEL(3, "test%3i : maxWindowSize < frame requirement : ", testNb++);
ZSTD_initDStream_usingDict(zd, CNBuffer, 128 KB);
- { size_t const r = ZSTD_setDStreamParameter(zd, ZSTDdsp_maxWindowSize, 1000); /* too small limit */
+ { size_t const r = ZSTD_setDStreamParameter(zd, DStream_p_maxWindowSize, 1000); /* too small limit */
if (ZSTD_isError(r)) goto _output_error; }
inBuff.src = compressedBuffer;
inBuff.size = cSize;
@@ -435,7 +436,7 @@
outBuff.pos = 0;
{ size_t const r = ZSTD_decompressStream(zd, &outBuff, &inBuff);
if (!ZSTD_isError(r)) goto _output_error; /* must fail : frame requires > 100 bytes */
- DISPLAYLEVEL(4, "OK (%s)\n", ZSTD_getErrorName(r)); }
+ DISPLAYLEVEL(3, "OK (%s)\n", ZSTD_getErrorName(r)); }
_end:
@@ -464,6 +465,11 @@
for (u=0; u<max; u++) {
if (b1[u] != b2[u]) break;
}
+ DISPLAY("Error at position %u / %u \n", (U32)u, (U32)max);
+ DISPLAY(" %02X %02X %02X :%02X: %02X %02X %02X %02X %02X \n",
+ b1[u-3], b1[u-2], b1[u-1], b1[u-0], b1[u+1], b1[u+2], b1[u+3], b1[u+4], b1[u+5]);
+ DISPLAY(" %02X %02X %02X :%02X: %02X %02X %02X %02X %02X \n",
+ b2[u-3], b2[u-2], b2[u-1], b2[u-0], b2[u+1], b2[u+2], b2[u+3], b2[u+4], b2[u+5]);
return u;
}
@@ -712,6 +718,248 @@
}
+/* Multi-threading version of fuzzer Tests */
+static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double compressibility)
+{
+ static const U32 maxSrcLog = 24;
+ static const U32 maxSampleLog = 19;
+ size_t const srcBufferSize = (size_t)1<<maxSrcLog;
+ BYTE* cNoiseBuffer[5];
+ size_t const copyBufferSize= srcBufferSize + (1<<maxSampleLog);
+ BYTE* const copyBuffer = (BYTE*)malloc (copyBufferSize);
+ size_t const cBufferSize = ZSTD_compressBound(srcBufferSize);
+ BYTE* const cBuffer = (BYTE*)malloc (cBufferSize);
+ size_t const dstBufferSize = srcBufferSize;
+ BYTE* const dstBuffer = (BYTE*)malloc (dstBufferSize);
+ U32 result = 0;
+ U32 testNb = 0;
+ U32 coreSeed = seed;
+ ZSTDMT_CCtx* zc = ZSTDMT_createCCtx(2); /* will be reset sometimes */
+ ZSTD_DStream* zd = ZSTD_createDStream(); /* will be reset sometimes */
+ ZSTD_DStream* const zd_noise = ZSTD_createDStream();
+ clock_t const startClock = clock();
+ const BYTE* dict=NULL; /* can keep same dict on 2 consecutive tests */
+ size_t dictSize = 0;
+ U32 oldTestLog = 0;
+
+ /* allocations */
+ cNoiseBuffer[0] = (BYTE*)malloc (srcBufferSize);
+ cNoiseBuffer[1] = (BYTE*)malloc (srcBufferSize);
+ cNoiseBuffer[2] = (BYTE*)malloc (srcBufferSize);
+ cNoiseBuffer[3] = (BYTE*)malloc (srcBufferSize);
+ cNoiseBuffer[4] = (BYTE*)malloc (srcBufferSize);
+ CHECK (!cNoiseBuffer[0] || !cNoiseBuffer[1] || !cNoiseBuffer[2] || !cNoiseBuffer[3] || !cNoiseBuffer[4] ||
+ !copyBuffer || !dstBuffer || !cBuffer || !zc || !zd || !zd_noise ,
+ "Not enough memory, fuzzer tests cancelled");
+
+ /* Create initial samples */
+ RDG_genBuffer(cNoiseBuffer[0], srcBufferSize, 0.00, 0., coreSeed); /* pure noise */
+ RDG_genBuffer(cNoiseBuffer[1], srcBufferSize, 0.05, 0., coreSeed); /* barely compressible */
+ RDG_genBuffer(cNoiseBuffer[2], srcBufferSize, compressibility, 0., coreSeed);
+ RDG_genBuffer(cNoiseBuffer[3], srcBufferSize, 0.95, 0., coreSeed); /* highly compressible */
+ RDG_genBuffer(cNoiseBuffer[4], srcBufferSize, 1.00, 0., coreSeed); /* sparse content */
+ memset(copyBuffer, 0x65, copyBufferSize); /* make copyBuffer considered initialized */
+ ZSTD_initDStream_usingDict(zd, NULL, 0); /* ensure at least one init */
+
+ /* catch up testNb */
+ for (testNb=1; testNb < startTest; testNb++)
+ FUZ_rand(&coreSeed);
+
+ /* test loop */
+ for ( ; (testNb <= nbTests) || (FUZ_GetClockSpan(startClock) < g_clockTime) ; testNb++ ) {
+ U32 lseed;
+ const BYTE* srcBuffer;
+ size_t totalTestSize, totalGenSize, cSize;
+ XXH64_state_t xxhState;
+ U64 crcOrig;
+ U32 resetAllowed = 1;
+ size_t maxTestSize;
+
+ /* init */
+ if (nbTests >= testNb) { DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests); }
+ else { DISPLAYUPDATE(2, "\r%6u ", testNb); }
+ FUZ_rand(&coreSeed);
+ lseed = coreSeed ^ prime32;
+
+ /* states full reset (deliberately not synchronized) */
+ /* some issues can only happen when reusing states */
+ if ((FUZ_rand(&lseed) & 0xFF) == 131) {
+ U32 const nbThreads = (FUZ_rand(&lseed) % 6) + 1;
+ ZSTDMT_freeCCtx(zc);
+ zc = ZSTDMT_createCCtx(nbThreads);
+ resetAllowed=0;
+ }
+ if ((FUZ_rand(&lseed) & 0xFF) == 132) {
+ ZSTD_freeDStream(zd);
+ zd = ZSTD_createDStream();
+ ZSTD_initDStream_usingDict(zd, NULL, 0); /* ensure at least one init */
+ }
+
+ /* srcBuffer selection [0-4] */
+ { U32 buffNb = FUZ_rand(&lseed) & 0x7F;
+ if (buffNb & 7) buffNb=2; /* most common : compressible (P) */
+ else {
+ buffNb >>= 3;
+ if (buffNb & 7) {
+ const U32 tnb[2] = { 1, 3 }; /* barely/highly compressible */
+ buffNb = tnb[buffNb >> 3];
+ } else {
+ const U32 tnb[2] = { 0, 4 }; /* not compressible / sparse */
+ buffNb = tnb[buffNb >> 3];
+ } }
+ srcBuffer = cNoiseBuffer[buffNb];
+ }
+
+ /* compression init */
+ if ((FUZ_rand(&lseed)&1) /* at beginning, to keep same nb of rand */
+ && oldTestLog /* at least one test happened */ && resetAllowed) {
+ maxTestSize = FUZ_randomLength(&lseed, oldTestLog+2);
+ if (maxTestSize >= srcBufferSize) maxTestSize = srcBufferSize-1;
+ { int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1;
+ size_t const resetError = ZSTDMT_initCStream(zc, compressionLevel);
+ CHECK(ZSTD_isError(resetError), "ZSTDMT_initCStream error : %s", ZSTD_getErrorName(resetError));
+ }
+ } else {
+ U32 const testLog = FUZ_rand(&lseed) % maxSrcLog;
+ U32 const cLevel = (FUZ_rand(&lseed) % (ZSTD_maxCLevel() - (testLog/3))) + 1;
+ maxTestSize = FUZ_rLogLength(&lseed, testLog);
+ oldTestLog = testLog;
+ /* random dictionary selection */
+ dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_randomLength(&lseed, maxSampleLog) : 0;
+ { size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize);
+ dict = srcBuffer + dictStart;
+ }
+ { U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? 0 : maxTestSize;
+ ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize);
+ DISPLAYLEVEL(5, "Init with windowLog = %u \n", params.cParams.windowLog);
+ params.fParams.checksumFlag = FUZ_rand(&lseed) & 1;
+ params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1;
+ { size_t const initError = ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize);
+ CHECK (ZSTD_isError(initError),"ZSTDMT_initCStream_advanced error : %s", ZSTD_getErrorName(initError));
+ } } }
+
+ /* multi-segments compression test : random src chunks and random flushes, written into randomly sized dst windows of cBuffer */
+ XXH64_reset(&xxhState, 0);
+ { ZSTD_outBuffer outBuff = { cBuffer, cBufferSize, 0 } ;
+ U32 n;
+ for (n=0, cSize=0, totalTestSize=0 ; totalTestSize < maxTestSize ; n++) {
+ /* compress random chunks into randomly sized dst buffers */
+ { size_t const randomSrcSize = FUZ_randomLength(&lseed, maxSampleLog);
+ size_t const srcSize = MIN (maxTestSize-totalTestSize, randomSrcSize);
+ size_t const srcStart = FUZ_rand(&lseed) % (srcBufferSize - srcSize);
+ size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
+ size_t const dstBuffSize = MIN(cBufferSize - outBuff.pos, randomDstSize); /* cSize stays 0 until the frame is complete : clamp on the space really left in cBuffer */
+ ZSTD_inBuffer inBuff = { srcBuffer+srcStart, srcSize, 0 };
+ outBuff.size = outBuff.pos + dstBuffSize;
+
+ DISPLAYLEVEL(5, "Sending %u bytes to compress \n", (U32)srcSize);
+ { size_t const compressionError = ZSTDMT_compressStream(zc, &outBuff, &inBuff);
+ CHECK (ZSTD_isError(compressionError), "compression error : %s", ZSTD_getErrorName(compressionError)); }
+ DISPLAYLEVEL(5, "%u bytes read by ZSTDMT_compressStream \n", (U32)inBuff.pos);
+
+ /* only the bytes actually consumed (inBuff.pos) are hashed and mirrored into copyBuffer */
+ XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos);
+ memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos);
+ totalTestSize += inBuff.pos;
+ }
+
+ /* random flush operation, to mess around */
+ if ((FUZ_rand(&lseed) & 15) == 0) {
+ size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
+ size_t const adjustedDstSize = MIN(cBufferSize - outBuff.pos, randomDstSize); /* never open a window past the end of cBuffer */
+ outBuff.size = outBuff.pos + adjustedDstSize;
+ DISPLAYLEVEL(5, "Flushing into dst buffer of size %u \n", (U32)adjustedDstSize);
+ { size_t const flushError = ZSTDMT_flushStream(zc, &outBuff);
+ CHECK (ZSTD_isError(flushError), "flush error : %s", ZSTD_getErrorName(flushError));
+ } } }
+
+ /* final frame epilogue : call ZSTDMT_endStream() until it reports nothing left to flush */
+ { size_t remainingToFlush = (size_t)(-1);
+ while (remainingToFlush) {
+ size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
+ size_t const adjustedDstSize = MIN(cBufferSize - outBuff.pos, randomDstSize); /* never open a window past the end of cBuffer */
+ outBuff.size = outBuff.pos + adjustedDstSize;
+ DISPLAYLEVEL(5, "Ending into dst buffer of size %u \n", (U32)adjustedDstSize);
+ remainingToFlush = ZSTDMT_endStream(zc, &outBuff);
+ CHECK (ZSTD_isError(remainingToFlush), "end error : %s", ZSTD_getErrorName(remainingToFlush));
+ DISPLAYLEVEL(5, "endStream : remainingToFlush : %u \n", (U32)remainingToFlush);
+ } }
+ DISPLAYLEVEL(5, "Frame completed \n");
+ crcOrig = XXH64_digest(&xxhState);
+ cSize = outBuff.pos; /* compressed size of the whole frame, used by the decompression stages */
+ }
+
+ /* multi - fragments decompression test */
+ if (!dictSize /* don't reset if dictionary : could be different */ && (FUZ_rand(&lseed) & 1)) {
+ CHECK (ZSTD_isError(ZSTD_resetDStream(zd)), "ZSTD_resetDStream failed");
+ } else {
+ ZSTD_initDStream_usingDict(zd, dict, dictSize);
+ }
+ { size_t decompressionResult = 1; /* the loop below runs until ZSTD_decompressStream() returns 0 */
+ ZSTD_inBuffer inBuff = { cBuffer, cSize, 0 };
+ ZSTD_outBuffer outBuff= { dstBuffer, dstBufferSize, 0 };
+ for (totalGenSize = 0 ; decompressionResult ; ) {
+ size_t const readCSrcSize = FUZ_randomLength(&lseed, maxSampleLog);
+ size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
+ size_t const dstBuffSize = MIN(dstBufferSize - outBuff.pos, randomDstSize); /* totalGenSize is never advanced : clamp on the space really left in dstBuffer */
+ inBuff.size = inBuff.pos + readCSrcSize;
+ outBuff.size = outBuff.pos + dstBuffSize; /* dst window starts at the current output position, not the input one */
+ decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff);
+ CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult));
+ }
+ CHECK (outBuff.pos != totalTestSize, "decompressed data : wrong size (%u != %u)", (U32)outBuff.pos, (U32)totalTestSize);
+ CHECK (inBuff.pos != cSize, "compressed data should be fully read (%u != %u)", (U32)inBuff.pos, (U32)cSize);
+ { U64 const crcDest = XXH64(dstBuffer, totalTestSize, 0); /* compared with the hash of the bytes fed to the compressor */
+ if (crcDest!=crcOrig) findDiff(copyBuffer, dstBuffer, totalTestSize);
+ CHECK (crcDest!=crcOrig, "decompressed data corrupted");
+ } }
+
+ /*===== noisy/erroneous src decompression test =====*/
+
+ /* add some noise */
+ { U32 const nbNoiseChunks = (FUZ_rand(&lseed) & 7) + 2;
+ U32 nn; for (nn=0; nn<nbNoiseChunks; nn++) {
+ size_t const randomNoiseSize = FUZ_randomLength(&lseed, maxSampleLog);
+ size_t const noiseSize = MIN((cSize/3) , randomNoiseSize);
+ size_t const noiseStart = FUZ_rand(&lseed) % (srcBufferSize - noiseSize);
+ size_t const cStart = FUZ_rand(&lseed) % (cSize - noiseSize);
+ memcpy(cBuffer+cStart, srcBuffer+noiseStart, noiseSize);
+ } }
+
+ /* try decompression on noisy data */
+ ZSTD_initDStream(zd_noise); /* note : no dictionary */
+ { ZSTD_inBuffer inBuff = { cBuffer, cSize, 0 };
+ ZSTD_outBuffer outBuff= { dstBuffer, dstBufferSize, 0 };
+ while (outBuff.pos < dstBufferSize) { /* runs until dstBuffer is full or an error is reported */
+ size_t const randomCSrcSize = FUZ_randomLength(&lseed, maxSampleLog);
+ size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
+ size_t const adjustedDstSize = MIN(dstBufferSize - outBuff.pos, randomDstSize);
+ outBuff.size = outBuff.pos + adjustedDstSize; /* dst window never extends past dstBuffer */
+ inBuff.size = inBuff.pos + randomCSrcSize; /* NOTE(review): not clamped to cSize - confirm reading past the frame is intended */
+ { size_t const decompressError = ZSTD_decompressStream(zd, &outBuff, &inBuff); /* NOTE(review): zd_noise is initialized above, yet zd is the stream decoded here - confirm intent */
+ if (ZSTD_isError(decompressError)) break; /* error correctly detected */
+ } } } }
+ DISPLAY("\r%u fuzzer tests completed \n", testNb);
+
+_cleanup:
+ ZSTDMT_freeCCtx(zc);
+ ZSTD_freeDStream(zd);
+ ZSTD_freeDStream(zd_noise);
+ free(cNoiseBuffer[0]);
+ free(cNoiseBuffer[1]);
+ free(cNoiseBuffer[2]);
+ free(cNoiseBuffer[3]);
+ free(cNoiseBuffer[4]);
+ free(copyBuffer);
+ free(cBuffer);
+ free(dstBuffer);
+ return result;
+
+_output_error:
+ result = 1;
+ goto _cleanup;
+}
+
+
/*-*******************************************************
* Command line
*********************************************************/
@@ -741,10 +989,11 @@
int testNb = 0;
int proba = FUZ_COMPRESSIBILITY_DEFAULT;
int result=0;
- U32 mainPause = 0;
- const char* programName = argv[0];
- ZSTD_customMem customMem = { allocFunction, freeFunction, NULL };
- ZSTD_customMem customNULL = { NULL, NULL, NULL };
+ int mainPause = 0;
+ int mtOnly = 0;
+ const char* const programName = argv[0];
+ ZSTD_customMem const customMem = { allocFunction, freeFunction, NULL };
+ ZSTD_customMem const customNULL = { NULL, NULL, NULL };
/* Check command line */
for(argNb=1; argNb<argc; argNb++) {
@@ -753,27 +1002,32 @@
/* Parsing commands. Aggregated commands are allowed */
if (argument[0]=='-') {
- argument++;
+ if (!strcmp(argument, "--mt")) { mtOnly=1; continue; }
+
+ argument++;
while (*argument!=0) {
switch(*argument)
{
case 'h':
return FUZ_usage(programName);
+
case 'v':
argument++;
- g_displayLevel=4;
+ g_displayLevel++;
break;
+
case 'q':
argument++;
g_displayLevel--;
break;
+
case 'p': /* pause at the end */
argument++;
mainPause = 1;
break;
- case 'i':
+ case 'i': /* limit tests by nb of iterations (default) */
argument++;
nbTests=0; g_clockTime=0;
while ((*argument>='0') && (*argument<='9')) {
@@ -783,7 +1037,7 @@
}
break;
- case 'T':
+ case 'T': /* limit tests by time */
argument++;
nbTests=0; g_clockTime=0;
while ((*argument>='0') && (*argument<='9')) {
@@ -796,7 +1050,7 @@
g_clockTime *= CLOCKS_PER_SEC;
break;
- case 's':
+ case 's': /* manually select seed */
argument++;
seed=0;
seedset=1;
@@ -807,7 +1061,7 @@
}
break;
- case 't':
+ case 't': /* select starting test number */
argument++;
testNb=0;
while ((*argument>='0') && (*argument<='9')) {
@@ -851,12 +1105,12 @@
if (testNb==0) {
result = basicUnitTests(0, ((double)proba) / 100, customNULL); /* constant seed for predictability */
if (!result) {
- DISPLAYLEVEL(4, "Unit tests using customMem :\n")
+ DISPLAYLEVEL(3, "Unit tests using customMem :\n")
result = basicUnitTests(0, ((double)proba) / 100, customMem); /* use custom memory allocation functions */
} }
- if (!result)
- result = fuzzerTests(seed, nbTests, testNb, ((double)proba) / 100);
+ if (!result && !mtOnly) result = fuzzerTests(seed, nbTests, testNb, ((double)proba) / 100);
+ if (!result) result = fuzzerTests_MT(seed, nbTests, testNb, ((double)proba) / 100);
if (mainPause) {
int unused;
diff --git a/zlibWrapper/.gitignore b/zlibWrapper/.gitignore
index 23d2f3a..6167ca4 100644
--- a/zlibWrapper/.gitignore
+++ b/zlibWrapper/.gitignore
@@ -22,4 +22,4 @@
*.txt
# Directories
-minizip/
\ No newline at end of file
+minizip/