Merge pull request #434 from terrelln/dev
Pzstd Improvements
diff --git a/.gitignore b/.gitignore
index c939f12..220a1e0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,3 +32,4 @@
*.swp
.DS_Store
googletest/
+*.d
diff --git a/.travis.yml b/.travis.yml
index 9e2f586..3be4575 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -22,7 +22,7 @@
packages:
- gcc-4.8
- g++-4.8
- env: PLATFORM="Ubuntu 12.04 container" CMD="make zlibwrapper && make clean && make -C tests test-zstd_nolegacy && make clean && make clean && make cmaketest && make clean && make -C contrib/pzstd pzstd && make -C contrib/pzstd googletest && make -C contrib/pzstd test && make -C contrib/pzstd clean"
+ env: PLATFORM="Ubuntu 12.04 container" CMD="make zlibwrapper && make clean && make -C tests test-zstd_nolegacy && make clean && make clean && make cmaketest && make clean && make -C contrib/pzstd googletest && make -C contrib/pzstd all && make -C contrib/pzstd check && make -C contrib/pzstd clean"
- os: linux
sudo: false
env: PLATFORM="Ubuntu 12.04 container" CMD="make usan"
@@ -55,6 +55,20 @@
packages:
- libc6-dev-i386
- gcc-multilib
+ - os: linux
+ sudo: required
+ install:
+ - export CXX="g++-6" CC="gcc-6"
+ - export LDFLAGS="-fuse-ld=gold"
+ - export TESTFLAGS='--gtest_filter=-*ExtremelyLarge*'
+ env: PLATFORM="Ubuntu 12.04" CMD='cd contrib/pzstd && make googletest && make tsan && make check && make clean && make asan && make check && make clean && cd ../..'
+ addons:
+ apt:
+ sources:
+ - ubuntu-toolchain-r-test
+ packages:
+ - gcc-6
+ - g++-6
# Ubuntu 14.04 LTS Server Edition 64 bit
- os: linux
dist: trusty
@@ -69,7 +83,7 @@
sudo: required
install:
- export CXX="g++-4.8" CC="gcc-4.8"
- env: PLATFORM="Ubuntu 14.04" CMD="make gpptest && make clean && make gnu90test && make clean && make c99test && make clean && make gnu99test && make clean && make clangtest && make clean && make -C contrib/pzstd pzstd32 && make -C contrib/pzstd googletest32 && make -C contrib/pzstd test32 && make -C contrib/pzstd clean"
+ env: PLATFORM="Ubuntu 14.04" CMD="make gpptest && make clean && make gnu90test && make clean && make c99test && make clean && make gnu99test && make clean && make clangtest && make clean && make -C contrib/pzstd googletest32 && make -C contrib/pzstd all32 && make -C contrib/pzstd check && make -C contrib/pzstd clean"
addons:
apt:
packages:
diff --git a/appveyor.yml b/appveyor.yml
index 6345c7b..fbdc30c 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -50,10 +50,9 @@
ECHO *** &&
ECHO *** Building pzstd for %PLATFORM% &&
ECHO *** &&
- ECHO make -C contrib\pzstd pzstd &&
- make -C contrib\pzstd pzstd &&
make -C contrib\pzstd googletest-mingw64 &&
- make -C contrib\pzstd test &&
+ make -C contrib\pzstd all &&
+ make -C contrib\pzstd check &&
make -C contrib\pzstd clean
)
- if [%COMPILER%]==[gcc] (
diff --git a/contrib/pzstd/Logging.h b/contrib/pzstd/Logging.h
new file mode 100644
index 0000000..76c982a
--- /dev/null
+++ b/contrib/pzstd/Logging.h
@@ -0,0 +1,99 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+#pragma once
+
+#include <chrono>
+#include <cstdio>
+#include <mutex>
+
+namespace pzstd {
+
+// Verbosity levels, from least to most chatty. A message is emitted when its
+// level is less than or equal to the level the Logger was constructed with.
+// NOTE(review): some Windows SDK headers define an `ERROR` macro; confirm no
+// translation unit pulls those in before this header.
+constexpr int ERROR = 1;
+constexpr int INFO = 2;
+constexpr int DEBUG = 3;
+constexpr int VERBOSE = 4;
+
+/**
+ * A small printf-style logger that writes to a `FILE*` (stderr by default).
+ *
+ * Every write is performed while holding an internal mutex, so calls made
+ * through the same Logger do not interleave with each other.
+ */
+class Logger {
+  std::mutex mutex_;
+  FILE* out_;
+  const int level_;
+
+  // Monotonic clock: rate limiting must not stall if the wall clock is
+  // adjusted backwards while we are running.
+  using Clock = std::chrono::steady_clock;
+  Clock::time_point lastUpdate_;
+  std::chrono::milliseconds refreshRate_;
+
+ public:
+  /**
+   * @param level  Highest level that will be printed
+   * @param out    Stream that receives the output
+   */
+  explicit Logger(int level, FILE* out = stderr)
+      : out_(out), level_(level), lastUpdate_(Clock::now()),
+        refreshRate_(150) {}
+
+  /// @returns true iff a message at `level` would be printed.
+  bool logsAt(int level) const {
+    return level <= level_;
+  }
+
+  /**
+   * Prints a printf-style message if `level` is enabled.
+   * `fmt` and `args` are forwarded to `std::fprintf()` unchanged.
+   */
+  template <typename... Args>
+  void operator()(int level, const char *fmt, Args... args) {
+    if (!logsAt(level)) {
+      return;
+    }
+    std::lock_guard<std::mutex> lock(mutex_);
+    std::fprintf(out_, fmt, args...);
+  }
+
+  /**
+   * Prints a progress message on the current line (prefixed with '\r') if
+   * `level` is enabled and more than the refresh interval (150 ms) has passed
+   * since the last printed update (or construction); otherwise it is dropped.
+   */
+  template <typename... Args>
+  void update(int level, const char *fmt, Args... args) {
+    if (!logsAt(level)) {
+      return;
+    }
+    std::lock_guard<std::mutex> lock(mutex_);
+    auto now = Clock::now();
+    if (now - lastUpdate_ > refreshRate_) {
+      lastUpdate_ = now;
+      std::fprintf(out_, "\r");
+      std::fprintf(out_, fmt, args...);
+    }
+  }
+
+  /// Blanks the current line (79 columns) if `level` is enabled.
+  void clear(int level) {
+    if (!logsAt(level)) {
+      return;
+    }
+    std::lock_guard<std::mutex> lock(mutex_);
+    std::fprintf(out_, "\r%79s\r", "");
+  }
+};
+
+} // namespace pzstd
diff --git a/contrib/pzstd/Makefile b/contrib/pzstd/Makefile
index e30be0b..2de5041 100644
--- a/contrib/pzstd/Makefile
+++ b/contrib/pzstd/Makefile
@@ -7,20 +7,71 @@
# of patent rights can be found in the PATENTS file in the same directory.
# ##########################################################################
+# Standard variables for installation
+DESTDIR ?=
+PREFIX ?= /usr/local
+BINDIR := $(DESTDIR)$(PREFIX)/bin
+
ZSTDDIR = ../../lib
PROGDIR = ../../programs
-CPPFLAGS = -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I.
-CXXFLAGS ?= -O3
-CXXFLAGS += -std=c++11
-CXXFLAGS += $(MOREFLAGS)
-FLAGS = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS)
+# External program to use to run tests, e.g. qemu or valgrind
+TESTPROG ?=
+# Flags to pass to the tests
+TESTFLAGS ?=
+
+# gcc/clang write header dependencies next to each object: foo.o -> foo.d
+DEPFLAGS = -MMD -MP -MF $(basename $@).Td
+POSTCOMPILE = mv -f $(basename $@).Td $(basename $@).d
+
+# CFLAGS, CXXFLAGS, CPPFLAGS, and LDFLAGS are for the users to override
+CFLAGS ?= -O3 -Wall -Wextra
+CXXFLAGS ?= -O3 -Wall -Wextra -pedantic -std=c++11
+CPPFLAGS ?=
+LDFLAGS ?=
+
+# Include flags
+PZSTD_INC = -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I.
+GTEST_INC = -isystem googletest/googletest/include
+
+PZSTD_CPPFLAGS = $(PZSTD_INC) $(GTEST_INC)
+PZSTD_CCXXFLAGS =
+PZSTD_CFLAGS = $(PZSTD_CCXXFLAGS)
+PZSTD_CXXFLAGS = $(PZSTD_CCXXFLAGS)
+PZSTD_LDFLAGS =
+EXTRA_FLAGS =
+ALL_CFLAGS = $(EXTRA_FLAGS) $(CPPFLAGS) $(PZSTD_CPPFLAGS) $(CFLAGS) $(PZSTD_CFLAGS)
+ALL_CXXFLAGS = $(EXTRA_FLAGS) $(CPPFLAGS) $(PZSTD_CPPFLAGS) $(CXXFLAGS) $(PZSTD_CXXFLAGS)
+ALL_LDFLAGS = $(EXTRA_FLAGS) $(LDFLAGS) $(PZSTD_LDFLAGS)
-ZSTDCOMMON_FILES := $(ZSTDDIR)/common/*.c
-ZSTDCOMP_FILES := $(ZSTDDIR)/compress/zstd_compress.c $(ZSTDDIR)/compress/fse_compress.c $(ZSTDDIR)/compress/huf_compress.c
-ZSTDDECOMP_FILES := $(ZSTDDIR)/decompress/huf_decompress.c
-ZSTD_FILES := $(ZSTDDECOMP_FILES) $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES)
+# gtest libraries need to go before "-lpthread" because they depend on it.
+GTEST_LIB = -L googletest/build/googlemock/gtest
+LIBS = $(GTEST_LIB) -lpthread
+
+# Compilation commands
+LD_COMMAND = $(CXX) $^ $(ALL_LDFLAGS) $(LIBS) -o $@
+CC_COMMAND = $(CC) $(DEPFLAGS) $(ALL_CFLAGS) -c $< -o $@
+CXX_COMMAND = $(CXX) $(DEPFLAGS) $(ALL_CXXFLAGS) -c $< -o $@
+
+# Get a list of all zstd files so we rebuild the static library when we need to
+ZSTDCOMMON_FILES := $(wildcard $(ZSTDDIR)/common/*.c) \
+ $(wildcard $(ZSTDDIR)/common/*.h)
+ZSTDCOMP_FILES := $(wildcard $(ZSTDDIR)/compress/*.c) \
+ $(wildcard $(ZSTDDIR)/compress/*.h)
+ZSTDDECOMP_FILES := $(wildcard $(ZSTDDIR)/decompress/*.c) \
+ $(wildcard $(ZSTDDIR)/decompress/*.h)
+ZSTDPROG_FILES := $(wildcard $(PROGDIR)/*.c) \
+ $(wildcard $(PROGDIR)/*.h)
+ZSTD_FILES := $(wildcard $(ZSTDDIR)/*.h) \
+ $(ZSTDDECOMP_FILES) $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES) \
+ $(ZSTDPROG_FILES)
+
+# List all the pzstd source files so we can determine their dependencies
+PZSTD_SRCS := $(wildcard *.cpp)
+PZSTD_TESTS := $(wildcard test/*.cpp)
+UTILS_TESTS := $(wildcard utils/test/*.cpp)
+ALL_SRCS := $(PZSTD_SRCS) $(PZSTD_TESTS) $(UTILS_TESTS)
# Define *.exe as extension for Windows systems
@@ -30,89 +81,169 @@
EXT =
endif
-.PHONY: default all test clean test32 googletest googletest32
+# Standard targets
+.PHONY: default
+default: all
-default: pzstd
+.PHONY: check
+check:
+ $(TESTPROG) ./utils/test/BufferTest$(EXT) $(TESTFLAGS)
+ $(TESTPROG) ./utils/test/RangeTest$(EXT) $(TESTFLAGS)
+ $(TESTPROG) ./utils/test/ResourcePoolTest$(EXT) $(TESTFLAGS)
+ $(TESTPROG) ./utils/test/ScopeGuardTest$(EXT) $(TESTFLAGS)
+ $(TESTPROG) ./utils/test/ThreadPoolTest$(EXT) $(TESTFLAGS)
+ $(TESTPROG) ./utils/test/WorkQueueTest$(EXT) $(TESTFLAGS)
+ $(TESTPROG) ./test/OptionsTest$(EXT) $(TESTFLAGS)
+ $(TESTPROG) ./test/PzstdTest$(EXT) $(TESTFLAGS)
-all: pzstd
+.PHONY: install
+install: PZSTD_CPPFLAGS += -DNDEBUG
+install: pzstd$(EXT)
+ install -d -m 755 $(BINDIR)/
+ install -m 755 pzstd$(EXT) $(BINDIR)/pzstd$(EXT)
+
+.PHONY: uninstall
+uninstall:
+ $(RM) $(BINDIR)/pzstd$(EXT)
+
+# Targets for many different builds
+.PHONY: all
+all: PZSTD_CPPFLAGS += -DNDEBUG
+all: pzstd$(EXT) tests roundtrip
+
+.PHONY: debug
+debug: EXTRA_FLAGS += -g
+debug: pzstd$(EXT) tests roundtrip
+
+.PHONY: tsan
+tsan: PZSTD_CCXXFLAGS += -fsanitize=thread -fPIC
+tsan: PZSTD_LDFLAGS += -fsanitize=thread -pie
+tsan: debug
+
+.PHONY: asan
+asan: EXTRA_FLAGS += -fsanitize=address
+asan: debug
+
+.PHONY: ubsan
+ubsan: EXTRA_FLAGS += -fsanitize=undefined
+ubsan: debug
+
+.PHONY: all32
+all32: EXTRA_FLAGS += -m32
+all32: all
+
+.PHONY: debug32
+debug32: EXTRA_FLAGS += -m32
+debug32: debug
+
+.PHONY: asan32
+asan32: EXTRA_FLAGS += -m32
+asan32: asan
+
+.PHONY: tsan32
+tsan32: EXTRA_FLAGS += -m32
+tsan32: tsan
+
+.PHONY: ubsan32
+ubsan32: EXTRA_FLAGS += -m32
+ubsan32: ubsan
+
+# Run long round trip tests
+.PHONY: roundtripcheck
+roundtripcheck: roundtrip check
+ $(TESTPROG) ./test/RoundTripTest$(EXT) $(TESTFLAGS)
+
+# Build the main binary
+pzstd$(EXT): main.o Options.o Pzstd.o SkippableFrame.o $(ZSTDDIR)/libzstd.a
+ $(LD_COMMAND)
+
+# Target that depends on all the tests
+.PHONY: tests
+tests: EXTRA_FLAGS += -Wno-deprecated-declarations
+tests: $(patsubst %,%$(EXT),$(basename $(PZSTD_TESTS) $(UTILS_TESTS)))
+
+# Build the round trip tests
+.PHONY: roundtrip
+roundtrip: EXTRA_FLAGS += -Wno-deprecated-declarations
+roundtrip: test/RoundTripTest$(EXT)
+
+# Use the static library that zstd builds for simplicity and
+# so we get the compiler options correct
+$(ZSTDDIR)/libzstd.a: $(ZSTD_FILES)
+ $(MAKE) -C $(ZSTDDIR) libzstd CFLAGS="$(ALL_CFLAGS)" LDFLAGS="$(ALL_LDFLAGS)"
-libzstd.a: $(ZSTD_FILES)
- $(MAKE) -C $(ZSTDDIR) libzstd
- @cp $(ZSTDDIR)/libzstd.a .
+# Rules to build the tests
+test/RoundTripTest$(EXT): test/RoundTripTest.o $(PROGDIR)/datagen.o Options.o \
+ Pzstd.o SkippableFrame.o $(ZSTDDIR)/libzstd.a
+ $(LD_COMMAND)
-Pzstd.o: Pzstd.h Pzstd.cpp ErrorHolder.h utils/*.h
- $(CXX) $(FLAGS) -c Pzstd.cpp -o $@
+test/%Test$(EXT): GTEST_LIB += -lgtest -lgtest_main
+test/%Test$(EXT): test/%Test.o $(PROGDIR)/datagen.o Options.o Pzstd.o \
+ SkippableFrame.o $(ZSTDDIR)/libzstd.a
+ $(LD_COMMAND)
-SkippableFrame.o: SkippableFrame.h SkippableFrame.cpp utils/*.h
- $(CXX) $(FLAGS) -c SkippableFrame.cpp -o $@
+utils/test/%Test$(EXT): GTEST_LIB += -lgtest -lgtest_main
+utils/test/%Test$(EXT): utils/test/%Test.o
+ $(LD_COMMAND)
-Options.o: Options.h Options.cpp
- $(CXX) $(FLAGS) -c Options.cpp -o $@
-main.o: main.cpp *.h utils/*.h
- $(CXX) $(FLAGS) -c main.cpp -o $@
+GTEST_CMAKEFLAGS =
-pzstd: Pzstd.o SkippableFrame.o Options.o main.o libzstd.a
- $(CXX) $(FLAGS) $^ -o $@$(EXT) -lpthread
-
-libzstd32.a: $(ZSTD_FILES)
- $(MAKE) -C $(ZSTDDIR) libzstd MOREFLAGS="-m32"
- @cp $(ZSTDDIR)/libzstd.a libzstd32.a
-
-Pzstd32.o: Pzstd.h Pzstd.cpp ErrorHolder.h utils/*.h
- $(CXX) -m32 $(FLAGS) -c Pzstd.cpp -o $@
-
-SkippableFrame32.o: SkippableFrame.h SkippableFrame.cpp utils/*.h
- $(CXX) -m32 $(FLAGS) -c SkippableFrame.cpp -o $@
-
-Options32.o: Options.h Options.cpp
- $(CXX) -m32 $(FLAGS) -c Options.cpp -o $@
-
-main32.o: main.cpp *.h utils/*.h
- $(CXX) -m32 $(FLAGS) -c main.cpp -o $@
-
-pzstd32: Pzstd32.o SkippableFrame32.o Options32.o main32.o libzstd32.a
- $(CXX) -m32 $(FLAGS) $^ -o $@$(EXT) -lpthread
-
+# Install googletest
+.PHONY: googletest
+googletest: PZSTD_CCXXFLAGS += -fPIC
googletest:
@$(RM) -rf googletest
@git clone https://github.com/google/googletest
@mkdir -p googletest/build
- @cd googletest/build && cmake .. && make
+ @cd googletest/build && cmake $(GTEST_CMAKEFLAGS) -DCMAKE_CXX_FLAGS="$(ALL_CXXFLAGS)" .. && $(MAKE)
-googletest32:
- @$(RM) -rf googletest
- @git clone https://github.com/google/googletest
- @mkdir -p googletest/build
- @cd googletest/build && cmake .. -DCMAKE_CXX_FLAGS=-m32 && make
+.PHONY: googletest32
+googletest32: PZSTD_CCXXFLAGS += -m32
+googletest32: googletest
-googletest-mingw64:
- $(RM) -rf googletest
- git clone https://github.com/google/googletest
- mkdir -p googletest/build
- cd googletest/build && cmake -G "MSYS Makefiles" .. && $(MAKE)
+.PHONY: googletest-mingw64
+googletest-mingw64: GTEST_CMAKEFLAGS += -G "MSYS Makefiles"
+googletest-mingw64: googletest
-test:
- $(MAKE) libzstd.a
- $(MAKE) pzstd MOREFLAGS="-Wall -Wextra -pedantic -Werror"
- $(MAKE) -C utils/test clean
- $(MAKE) -C utils/test test MOREFLAGS="-Wall -Wextra -pedantic -Werror"
- $(MAKE) -C test clean
- $(MAKE) -C test test MOREFLAGS="-Wall -Wextra -pedantic -Werror"
-
-test32:
- $(MAKE) libzstd.a MOREFLAGS="-m32"
- $(MAKE) pzstd MOREFLAGS="-m32 -Wall -Wextra -pedantic -Werror"
- $(MAKE) -C utils/test clean
- $(MAKE) -C utils/test test MOREFLAGS="-m32 -Wall -Wextra -pedantic -Werror"
- $(MAKE) -C test clean
- $(MAKE) -C test test MOREFLAGS="-m32 -Wall -Wextra -pedantic -Werror"
-
-
+.PHONY: clean
clean:
+ $(RM) -f *.o pzstd$(EXT) *.Td *.d
+ $(RM) -f test/*.o test/*Test$(EXT) test/*.Td test/*.d
+ $(RM) -f utils/test/*.o utils/test/*Test$(EXT) utils/test/*.Td utils/test/*.d
+ $(RM) -f $(PROGDIR)/*.o $(PROGDIR)/*.Td $(PROGDIR)/*.d
$(MAKE) -C $(ZSTDDIR) clean
- $(MAKE) -C utils/test clean
- $(MAKE) -C test clean
- @$(RM) -rf libzstd.a *.o pzstd$(EXT) pzstd32$(EXT)
@echo Cleaning completed
+
+
+# Cancel implicit rules
+%.o: %.c
+%.o: %.cpp
+
+# Object file rules
+%.o: %.c
+ $(CC_COMMAND)
+ $(POSTCOMPILE)
+
+$(PROGDIR)/%.o: $(PROGDIR)/%.c
+ $(CC_COMMAND)
+ $(POSTCOMPILE)
+
+%.o: %.cpp
+ $(CXX_COMMAND)
+ $(POSTCOMPILE)
+
+test/%.o: test/%.cpp
+ $(CXX_COMMAND)
+ $(POSTCOMPILE)
+
+utils/test/%.o: utils/test/%.cpp
+ $(CXX_COMMAND)
+ $(POSTCOMPILE)
+
+# Dependency file stuff
+.PRECIOUS: %.d test/%.d utils/test/%.d
+
+# Include rules that specify header file dependencies
+-include $(patsubst %,%.d,$(basename $(ALL_SRCS)))
diff --git a/contrib/pzstd/Options.cpp b/contrib/pzstd/Options.cpp
index ece8c07..18c069e 100644
--- a/contrib/pzstd/Options.cpp
+++ b/contrib/pzstd/Options.cpp
@@ -303,6 +303,12 @@
} // while (*options != 0);
} // for (int i = 1; i < argc; ++i);
+ // Set options for test mode
+ if (test) {
+ outputFile = nullOutput;
+ keepSource = true;
+ }
+
// Input file defaults to standard input if not provided.
if (localInputFiles.empty()) {
localInputFiles.emplace_back(kStdIn);
@@ -399,11 +405,6 @@
verbosity = 1;
}
- // Set options for test mode
- if (test) {
- outputFile = nullOutput;
- keepSource = true;
- }
return Status::Success;
}
diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp
index e0826b9..c5b4ce4 100644
--- a/contrib/pzstd/Pzstd.cpp
+++ b/contrib/pzstd/Pzstd.cpp
@@ -15,6 +15,7 @@
#include "utils/WorkQueue.h"
#include <chrono>
+#include <cinttypes>
#include <cstddef>
#include <cstdio>
#include <memory>
@@ -58,26 +59,24 @@
FILE* inputFd,
const std::string &outputFile,
FILE* outputFd,
- ErrorHolder &errorHolder) {
+ SharedState& state) {
auto inputSize = fileSizeOrZero(inputFile);
// WorkQueue outlives ThreadPool so in the case of error we are certain
- // we don't accidently try to call push() on it after it is destroyed.
+  // we don't accidentally try to call push() on it after it is destroyed
WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
std::uint64_t bytesRead;
std::uint64_t bytesWritten;
{
- // Initialize the thread pool with numThreads + 1
- // We add one because the read thread spends most of its time waiting.
- // This also sets the minimum number of threads to 2, so the algorithm
- // doesn't deadlock.
- ThreadPool executor(options.numThreads + 1);
+ // Initialize the (de)compression thread pool with numThreads
+ ThreadPool executor(options.numThreads);
+ // Run the reader thread on an extra thread
+ ThreadPool readExecutor(1);
if (!options.decompress) {
// Add a job that reads the input and starts all the compression jobs
- executor.add(
- [&errorHolder, &outs, &executor, inputFd, inputSize, &options,
- &bytesRead] {
+ readExecutor.add(
+ [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] {
bytesRead = asyncCompressChunks(
- errorHolder,
+ state,
outs,
executor,
inputFd,
@@ -86,29 +85,28 @@
options.determineParameters());
});
// Start writing
- bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress,
- options.verbosity);
+ bytesWritten = writeFile(state, outs, outputFd, options.decompress);
} else {
// Add a job that reads the input and starts all the decompression jobs
- executor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] {
- bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd);
+ readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
+ bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
});
// Start writing
- bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress,
- options.verbosity);
+ bytesWritten = writeFile(state, outs, outputFd, options.decompress);
}
}
- if (options.verbosity > 1 && !errorHolder.hasError()) {
+ if (!state.errorHolder.hasError()) {
std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
if (!options.decompress) {
double ratio = static_cast<double>(bytesWritten) /
static_cast<double>(bytesRead + !bytesRead);
- std::fprintf(stderr, "%-20s :%6.2f%% (%6llu => %6llu bytes, %s)\n",
+ state.log(INFO, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64
+ " bytes, %s)\n",
inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
outputFileName.c_str());
} else {
- std::fprintf(stderr, "%-20s: %llu bytes \n",
+ state.log(INFO, "%-20s: %" PRIu64 " bytes \n",
inputFileName.c_str(),bytesWritten);
}
}
@@ -138,7 +136,7 @@
static FILE *openOutputFile(const Options &options,
const std::string &outputFile,
- ErrorHolder &errorHolder) {
+ SharedState& state) {
if (outputFile == "-") {
SET_BINARY_MODE(stdout);
return stdout;
@@ -148,82 +146,78 @@
auto outputFd = std::fopen(outputFile.c_str(), "rb");
if (outputFd != nullptr) {
std::fclose(outputFd);
- if (options.verbosity <= 1) {
- errorHolder.setError("Output file exists");
+ if (!state.log.logsAt(INFO)) {
+ state.errorHolder.setError("Output file exists");
return nullptr;
}
- std::fprintf(
- stderr,
+ state.log(
+ INFO,
"pzstd: %s already exists; do you wish to overwrite (y/n) ? ",
outputFile.c_str());
int c = getchar();
if (c != 'y' && c != 'Y') {
- errorHolder.setError("Not overwritten");
+ state.errorHolder.setError("Not overwritten");
return nullptr;
}
}
}
auto outputFd = std::fopen(outputFile.c_str(), "wb");
- if (!errorHolder.check(
+ if (!state.errorHolder.check(
outputFd != nullptr, "Failed to open output file")) {
- return 0;
+ return nullptr;
}
return outputFd;
}
int pzstdMain(const Options &options) {
int returnCode = 0;
+ SharedState state(options);
for (const auto& input : options.inputFiles) {
- // Setup the error holder
- ErrorHolder errorHolder;
+ // Setup the shared state
auto printErrorGuard = makeScopeGuard([&] {
- if (errorHolder.hasError()) {
+ if (state.errorHolder.hasError()) {
returnCode = 1;
- if (options.verbosity > 0) {
- std::fprintf(stderr, "pzstd: %s: %s.\n", input.c_str(),
- errorHolder.getError().c_str());
- }
- } else {
-
+ state.log(ERROR, "pzstd: %s: %s.\n", input.c_str(),
+ state.errorHolder.getError().c_str());
}
});
// Open the input file
- auto inputFd = openInputFile(input, errorHolder);
+ auto inputFd = openInputFile(input, state.errorHolder);
if (inputFd == nullptr) {
continue;
}
auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); });
// Open the output file
auto outputFile = options.getOutputFile(input);
- if (!errorHolder.check(outputFile != "",
+ if (!state.errorHolder.check(outputFile != "",
"Input file does not have extension .zst")) {
continue;
}
- auto outputFd = openOutputFile(options, outputFile, errorHolder);
+ auto outputFd = openOutputFile(options, outputFile, state);
if (outputFd == nullptr) {
continue;
}
auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
// (de)compress the file
- handleOneInput(options, input, inputFd, outputFile, outputFd, errorHolder);
- if (errorHolder.hasError()) {
+ handleOneInput(options, input, inputFd, outputFile, outputFd, state);
+ if (state.errorHolder.hasError()) {
continue;
}
// Delete the input file if necessary
if (!options.keepSource) {
// Be sure that we are done and have written everything before we delete
- if (!errorHolder.check(std::fclose(inputFd) == 0,
+ if (!state.errorHolder.check(std::fclose(inputFd) == 0,
"Failed to close input file")) {
continue;
}
closeInputGuard.dismiss();
- if (!errorHolder.check(std::fclose(outputFd) == 0,
+ if (!state.errorHolder.check(std::fclose(outputFd) == 0,
"Failed to close output file")) {
continue;
}
closeOutputGuard.dismiss();
if (std::remove(input.c_str()) != 0) {
- errorHolder.setError("Failed to remove input file");
+ state.errorHolder.setError("Failed to remove input file");
continue;
}
}
@@ -269,27 +263,25 @@
/**
* Stream chunks of input from `in`, compress it, and stream it out to `out`.
*
- * @param errorHolder Used to report errors and check if an error occured
+ * @param state The shared state
* @param in Queue that we `pop()` input buffers from
* @param out Queue that we `push()` compressed output buffers to
* @param maxInputSize An upper bound on the size of the input
- * @param parameters The zstd parameters to use for compression
*/
static void compress(
- ErrorHolder& errorHolder,
+ SharedState& state,
std::shared_ptr<BufferWorkQueue> in,
std::shared_ptr<BufferWorkQueue> out,
- size_t maxInputSize,
- ZSTD_parameters parameters) {
+ size_t maxInputSize) {
+ auto& errorHolder = state.errorHolder;
auto guard = makeScopeGuard([&] { out->finish(); });
// Initialize the CCtx
- std::unique_ptr<ZSTD_CStream, size_t (*)(ZSTD_CStream*)> ctx(
- ZSTD_createCStream(), ZSTD_freeCStream);
+ auto ctx = state.cStreamPool->get();
if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
return;
}
{
- auto err = ZSTD_initCStream_advanced(ctx.get(), nullptr, 0, parameters, 0);
+ auto err = ZSTD_resetCStream(ctx.get(), 0);
if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
return;
}
@@ -396,7 +388,7 @@
}
std::uint64_t asyncCompressChunks(
- ErrorHolder& errorHolder,
+ SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
ThreadPool& executor,
FILE* fd,
@@ -410,23 +402,23 @@
// independently.
size_t step = calculateStep(size, numThreads, params);
auto status = FileStatus::Continue;
- while (status == FileStatus::Continue && !errorHolder.hasError()) {
+ while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
// Make a new input queue that we will put the chunk's input data into.
auto in = std::make_shared<BufferWorkQueue>();
auto inGuard = makeScopeGuard([&] { in->finish(); });
// Make a new output queue that compress will put the compressed data into.
auto out = std::make_shared<BufferWorkQueue>();
// Start compression in the thread pool
- executor.add([&errorHolder, in, out, step, params] {
+ executor.add([&state, in, out, step] {
return compress(
- errorHolder, std::move(in), std::move(out), step, params);
+ state, std::move(in), std::move(out), step);
});
// Pass the output queue to the writer thread.
chunks.push(std::move(out));
// Fill the input queue for the compression job we just started
status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
}
- errorHolder.check(status != FileStatus::Error, "Error reading input");
+ state.errorHolder.check(status != FileStatus::Error, "Error reading input");
return bytesRead;
}
@@ -434,24 +426,24 @@
* Decompress a frame, whose data is streamed into `in`, and stream the output
* to `out`.
*
- * @param errorHolder Used to report errors and check if an error occured
+ * @param state The shared state
* @param in Queue that we `pop()` input buffers from. It contains
* exactly one compressed frame.
* @param out Queue that we `push()` decompressed output buffers to
*/
static void decompress(
- ErrorHolder& errorHolder,
+ SharedState& state,
std::shared_ptr<BufferWorkQueue> in,
std::shared_ptr<BufferWorkQueue> out) {
+ auto& errorHolder = state.errorHolder;
auto guard = makeScopeGuard([&] { out->finish(); });
// Initialize the DCtx
- std::unique_ptr<ZSTD_DStream, size_t (*)(ZSTD_DStream*)> ctx(
- ZSTD_createDStream(), ZSTD_freeDStream);
+ auto ctx = state.dStreamPool->get();
if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
return;
}
{
- auto err = ZSTD_initDStream(ctx.get());
+ auto err = ZSTD_resetDStream(ctx.get());
if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
return;
}
@@ -509,7 +501,7 @@
}
std::uint64_t asyncDecompressFrames(
- ErrorHolder& errorHolder,
+ SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
ThreadPool& executor,
FILE* fd) {
@@ -522,7 +514,7 @@
// Otherwise, we will decompress using only one decompression task.
const size_t chunkSize = ZSTD_DStreamInSize();
auto status = FileStatus::Continue;
- while (status == FileStatus::Continue && !errorHolder.hasError()) {
+ while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
// Make a new input queue that we will put the frames's bytes into.
auto in = std::make_shared<BufferWorkQueue>();
auto inGuard = makeScopeGuard([&] { in->finish(); });
@@ -551,15 +543,15 @@
out->setMaxSize(64);
}
// Start decompression in the thread pool
- executor.add([&errorHolder, in, out] {
- return decompress(errorHolder, std::move(in), std::move(out));
+ executor.add([&state, in, out] {
+ return decompress(state, std::move(in), std::move(out));
});
// Pass the output queue to the writer thread
frames.push(std::move(out));
if (frameSize == 0) {
// We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
// Pass the rest of the source to this decompression task
- while (status == FileStatus::Continue && !errorHolder.hasError()) {
+ while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
}
break;
@@ -567,7 +559,7 @@
// Fill the input queue for the decompression job we just started
status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
}
- errorHolder.check(status != FileStatus::Error, "Error reading input");
+ state.errorHolder.check(status != FileStatus::Error, "Error reading input");
return totalBytesRead;
}
@@ -582,32 +574,14 @@
return true;
}
-void updateWritten(int verbosity, std::uint64_t bytesWritten) {
- if (verbosity <= 1) {
- return;
- }
- using Clock = std::chrono::system_clock;
- static Clock::time_point then;
- constexpr std::chrono::milliseconds refreshRate{150};
-
- auto now = Clock::now();
- if (now - then > refreshRate) {
- then = now;
- std::fprintf(stderr, "\rWritten: %u MB ",
- static_cast<std::uint32_t>(bytesWritten >> 20));
- }
-}
-
std::uint64_t writeFile(
- ErrorHolder& errorHolder,
+ SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
FILE* outputFd,
- bool decompress,
- int verbosity) {
- auto lineClearGuard = makeScopeGuard([verbosity] {
- if (verbosity > 1) {
- std::fprintf(stderr, "\r%79s\r", "");
- }
+ bool decompress) {
+ auto& errorHolder = state.errorHolder;
+ auto lineClearGuard = makeScopeGuard([&state] {
+ state.log.clear(INFO);
});
std::uint64_t bytesWritten = 0;
std::shared_ptr<BufferWorkQueue> out;
@@ -633,7 +607,8 @@
return bytesWritten;
}
bytesWritten += buffer.size();
- updateWritten(verbosity, bytesWritten);
+ state.log.update(INFO, "Written: %u MB ",
+ static_cast<std::uint32_t>(bytesWritten >> 20));
}
}
return bytesWritten;
diff --git a/contrib/pzstd/Pzstd.h b/contrib/pzstd/Pzstd.h
index fe44ccf..9fb2c48 100644
--- a/contrib/pzstd/Pzstd.h
+++ b/contrib/pzstd/Pzstd.h
@@ -9,9 +9,11 @@
#pragma once
#include "ErrorHolder.h"
+#include "Logging.h"
#include "Options.h"
#include "utils/Buffer.h"
#include "utils/Range.h"
+#include "utils/ResourcePool.h"
#include "utils/ThreadPool.h"
#include "utils/WorkQueue.h"
#define ZSTD_STATIC_LINKING_ONLY
@@ -32,12 +34,76 @@
*/
int pzstdMain(const Options& options);
+/**
+ * Bundles the state that is handed to the reader, writer and worker
+ * functions: the logger, the error holder and the pools of zstd streams.
+ *
+ * Only the pool matching `options.decompress` is created; the other pointer
+ * is left null.
+ */
+class SharedState {
+ public:
+  /**
+   * @param options  Provides the log verbosity, the direction (compress or
+   *                 decompress) and, when compressing, the zstd parameters.
+   */
+  explicit SharedState(const Options& options) : log(options.verbosity) {
+    if (!options.decompress) {
+      auto parameters = options.determineParameters();
+      cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
+          // Creates a stream initialized with `parameters`, or yields nullptr
+          // if creating or initializing it failed.
+          [parameters]() -> ZSTD_CStream* {
+            auto zcs = ZSTD_createCStream();
+            if (zcs) {
+              auto err = ZSTD_initCStream_advanced(
+                  zcs, nullptr, 0, parameters, 0);
+              if (ZSTD_isError(err)) {
+                ZSTD_freeCStream(zcs);
+                return nullptr;
+              }
+            }
+            return zcs;
+          },
+          // Frees a compression stream.
+          [](ZSTD_CStream *zcs) {
+            ZSTD_freeCStream(zcs);
+          }});
+    } else {
+      dStreamPool.reset(new ResourcePool<ZSTD_DStream>{
+          // Creates an initialized stream, or yields nullptr on failure.
+          []() -> ZSTD_DStream* {
+            auto zds = ZSTD_createDStream();
+            if (zds) {
+              auto err = ZSTD_initDStream(zds);
+              if (ZSTD_isError(err)) {
+                ZSTD_freeDStream(zds);
+                return nullptr;
+              }
+            }
+            return zds;
+          },
+          // Frees a decompression stream.
+          [](ZSTD_DStream *zds) {
+            ZSTD_freeDStream(zds);
+          }});
+    }
+  }
+
+  Logger log;
+  ErrorHolder errorHolder;
+  // Set only when compressing.
+  std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool;
+  // Set only when decompressing.
+  std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool;
+};
+
/**
* Streams input from `fd`, breaks input up into chunks, and compresses each
* chunk independently. Output of each chunk gets streamed to a queue, and
* the output queues get put into `chunks` in order.
*
- * @param errorHolder Used to report errors and coordinate early shutdown
+ * @param state The shared state
* @param chunks Each compression jobs output queue gets `pushed()` here
* as soon as it is available
* @param executor The thread pool to run compression jobs in
@@ -48,7 +96,7 @@
* @returns The number of bytes read from the file
*/
std::uint64_t asyncCompressChunks(
- ErrorHolder& errorHolder,
+ SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
ThreadPool& executor,
FILE* fd,
@@ -62,7 +110,7 @@
* decompression job. Output of each frame gets streamed to a queue, and
* the output queues get put into `frames` in order.
*
- * @param errorHolder Used to report errors and coordinate early shutdown
+ * @param state The shared state
* @param frames Each decompression jobs output queue gets `pushed()` here
* as soon as it is available
* @param executor The thread pool to run compression jobs in
@@ -70,7 +118,7 @@
* @returns The number of bytes read from the file
*/
std::uint64_t asyncDecompressFrames(
- ErrorHolder& errorHolder,
+ SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
ThreadPool& executor,
FILE* fd);
@@ -79,18 +127,16 @@
* Streams input in from each queue in `outs` in order, and writes the data to
* `outputFd`.
*
- * @param errorHolder Used to report errors and coordinate early exit
+ * @param state The shared state
* @param outs A queue of output queues, one for each
* (de)compression job.
* @param outputFd The file descriptor to write to
* @param decompress Are we decompressing?
- * @param verbosity The verbosity level to log at
* @returns The number of bytes written
*/
std::uint64_t writeFile(
- ErrorHolder& errorHolder,
+ SharedState& state,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
FILE* outputFd,
- bool decompress,
- int verbosity);
+ bool decompress);
}
diff --git a/contrib/pzstd/README.md b/contrib/pzstd/README.md
index 05ceb55..3fe7b0b 100644
--- a/contrib/pzstd/README.md
+++ b/contrib/pzstd/README.md
@@ -10,7 +10,7 @@
## Usage
-PZstandard supports the same command line interface as Zstandard, but also provies the `-p` option to specify the number of threads.
+PZstandard supports the same command line interface as Zstandard, but also provides the `-p` option to specify the number of threads.
Dictionary mode is not currently supported.
Basic usage
diff --git a/contrib/pzstd/test/Makefile b/contrib/pzstd/test/Makefile
deleted file mode 100644
index 4f6ba99..0000000
--- a/contrib/pzstd/test/Makefile
+++ /dev/null
@@ -1,48 +0,0 @@
-# ##########################################################################
-# Copyright (c) 2016-present, Facebook, Inc.
-# All rights reserved.
-#
-# This source code is licensed under the BSD-style license found in the
-# LICENSE file in the root directory of this source tree. An additional grant
-# of patent rights can be found in the PATENTS file in the same directory.
-# ##########################################################################
-
-# Define *.exe as extension for Windows systems
-ifneq (,$(filter Windows%,$(OS)))
-EXT =.exe
-else
-EXT =
-endif
-
-PZSTDDIR = ..
-PROGDIR = ../../../programs
-ZSTDDIR = ../../../lib
-
-# Set GTEST_INC and GTEST_LIB to work with your install of gtest
-GTEST_INC ?= -isystem $(PZSTDDIR)/googletest/googletest/include
-GTEST_LIB ?= -L $(PZSTDDIR)/googletest/build/googlemock/gtest
-GTEST_FLAGS = $(GTEST_INC) $(GTEST_LIB)
-CPPFLAGS = -I$(PZSTDDIR) -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I.
-
-CXXFLAGS ?= -O3
-CXXFLAGS += -std=c++11 -Wno-deprecated-declarations
-CXXFLAGS += $(MOREFLAGS)
-FLAGS = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS)
-
-datagen.o: $(PROGDIR)/datagen.*
- $(CC) $(CPPFLAGS) -O3 $(MOREFLAGS) $(LDFLAGS) -Wno-long-long -Wno-variadic-macros $(PROGDIR)/datagen.c -c -o $@
-
-%: %.cpp *.h datagen.o
- $(CXX) $(FLAGS) $@.cpp datagen.o $(PZSTDDIR)/Pzstd.o $(PZSTDDIR)/SkippableFrame.o $(PZSTDDIR)/Options.o $(PZSTDDIR)/libzstd.a -o $@$(EXT) $(GTEST_FLAGS) -lgtest -lgtest_main -lpthread
-
-.PHONY: test clean
-
-test: OptionsTest PzstdTest
- @./OptionsTest$(EXT)
- @./PzstdTest$(EXT)
-
-roundtrip: RoundTripTest
- @./RoundTripTest$(EXT)
-
-clean:
- @rm -f datagen.o OptionsTest PzstdTest RoundTripTest
diff --git a/contrib/pzstd/utils/ResourcePool.h b/contrib/pzstd/utils/ResourcePool.h
new file mode 100644
index 0000000..ed01130
--- /dev/null
+++ b/contrib/pzstd/utils/ResourcePool.h
@@ -0,0 +1,96 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+#pragma once
+
+#include <cassert>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+namespace pzstd {
+
+/**
+ * An unbounded pool of resources.
+ * A `ResourcePool<T>` requires a factory function that allocates a `T*` and
+ * a free function that frees a `T*`.
+ * Calling `ResourcePool::get()` will give you a new `ResourcePool::UniquePtr`
+ * to a `T`, and when it goes out of scope the resource will be returned to the
+ * pool.
+ * The `ResourcePool<T>` *must* survive longer than any resources it hands out.
+ * Remember that `ResourcePool<T>` hands out mutable `T`s, so make sure to clean
+ * up the resource before or after every use.
+ */
+template <typename T>
+class ResourcePool {
+ public:
+  class Deleter;
+  using Factory = std::function<T*()>;
+  using Free = std::function<void(T*)>;
+  using UniquePtr = std::unique_ptr<T, Deleter>;
+
+ private:
+  std::mutex mutex_;           // Locked by `get()` and by `Deleter`.
+  Factory factory_;            // Called by `get()` when no idle resource exists.
+  Free free_;                  // Applied to every idle resource on destruction.
+  std::vector<T*> resources_;  // Idle resources; taken from the back.
+  unsigned inUse_;             // Number of non-null resources handed out.
+
+ public:
+  /**
+   * Creates a `ResourcePool`.
+   *
+   * @param factory  The function to use to create new resources.
+   * @param free     The function to use to free resources created by `factory`.
+   */
+  ResourcePool(Factory factory, Free free)
+      : factory_(std::move(factory)), free_(std::move(free)), inUse_(0) {}
+
+  /**
+   * @returns  A unique pointer to a resource, null iff the pool is empty and
+   *           `factory()` returns null (a null resource is not counted in use).
+   */
+  UniquePtr get() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    if (!resources_.empty()) {
+      UniquePtr resource{resources_.back(), Deleter{*this}};
+      resources_.pop_back();
+      ++inUse_;
+      return resource;
+    }
+    UniquePtr resource{factory_(), Deleter{*this}};
+    if (resource) { ++inUse_; }  // unique_ptr never runs the deleter for null
+    return resource;
+  }
+
+  ~ResourcePool() noexcept {
+    assert(inUse_ == 0);  // Every handed-out resource must have been returned.
+    for (const auto resource : resources_) {
+      free_(resource);
+    }
+  }
+
+  class Deleter {  // Puts the resource back into its pool instead of freeing it.
+    ResourcePool *pool_;
+   public:
+    explicit Deleter(ResourcePool &pool) : pool_(&pool) {}
+
+    void operator() (T *resource) {
+      // `std::unique_ptr` never calls its deleter with null; if called directly
+      // with null there is nothing to return and nothing was counted in use.
+      if (!resource) { return; }
+      std::lock_guard<std::mutex> lock(pool_->mutex_);
+      pool_->resources_.push_back(resource);
+      assert(pool_->inUse_ > 0);
+      --pool_->inUse_;
+    }
+  };
+};
+
+}
diff --git a/contrib/pzstd/utils/ThreadPool.h b/contrib/pzstd/utils/ThreadPool.h
index a1d1fc0..99b3ecf 100644
--- a/contrib/pzstd/utils/ThreadPool.h
+++ b/contrib/pzstd/utils/ThreadPool.h
@@ -27,7 +27,7 @@
explicit ThreadPool(std::size_t numThreads) {
threads_.reserve(numThreads);
for (std::size_t i = 0; i < numThreads; ++i) {
- threads_.emplace_back([&] {
+ threads_.emplace_back([this] {
std::function<void()> task;
while (tasks_.pop(task)) {
task();
diff --git a/contrib/pzstd/utils/WorkQueue.h b/contrib/pzstd/utils/WorkQueue.h
index 5382135..780e536 100644
--- a/contrib/pzstd/utils/WorkQueue.h
+++ b/contrib/pzstd/utils/WorkQueue.h
@@ -28,6 +28,7 @@
std::mutex mutex_;
std::condition_variable readerCv_;
std::condition_variable writerCv_;
+ std::condition_variable finishCv_;
std::queue<T> queue_;
bool done_;
@@ -53,12 +54,13 @@
/**
* Push an item onto the work queue. Notify a single thread that work is
* available. If `finish()` has been called, do nothing and return false.
+ * If `push()` returns false, then `item` has not been moved from.
*
* @param item Item to push onto the queue.
* @returns True upon success, false if `finish()` has been called. An
* item was pushed iff `push()` returns true.
*/
- bool push(T item) {
+ bool push(T&& item) {
{
std::unique_lock<std::mutex> lock(mutex_);
while (full() && !done_) {
@@ -124,19 +126,14 @@
}
readerCv_.notify_all();
writerCv_.notify_all();
+ finishCv_.notify_all();
}
/// Blocks until `finish()` has been called (but the queue may not be empty).
void waitUntilFinished() {
std::unique_lock<std::mutex> lock(mutex_);
while (!done_) {
- readerCv_.wait(lock);
- // If we were woken by a push, we need to wake a thread waiting on pop().
- if (!done_) {
- lock.unlock();
- readerCv_.notify_one();
- lock.lock();
- }
+ finishCv_.wait(lock);
}
}
};
diff --git a/contrib/pzstd/utils/test/Makefile b/contrib/pzstd/utils/test/Makefile
deleted file mode 100644
index b9ea73e..0000000
--- a/contrib/pzstd/utils/test/Makefile
+++ /dev/null
@@ -1,42 +0,0 @@
-# ##########################################################################
-# Copyright (c) 2016-present, Facebook, Inc.
-# All rights reserved.
-#
-# This source code is licensed under the BSD-style license found in the
-# LICENSE file in the root directory of this source tree. An additional grant
-# of patent rights can be found in the PATENTS file in the same directory.
-# ##########################################################################
-
-# Define *.exe as extension for Windows systems
-ifneq (,$(filter Windows%,$(OS)))
-EXT =.exe
-else
-EXT =
-endif
-
-PZSTDDIR = ../..
-
-# Set GTEST_INC and GTEST_LIB to work with your install of gtest
-GTEST_INC ?= -isystem $(PZSTDDIR)/googletest/googletest/include
-GTEST_LIB ?= -L $(PZSTDDIR)/googletest/build/googlemock/gtest
-
-CPPFLAGS = -I$(PZSTDDIR) $(GTEST_INC) $(GTEST_LIB)
-CXXFLAGS ?= -O3
-CXXFLAGS += -std=c++11
-CXXFLAGS += $(MOREFLAGS)
-FLAGS = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS)
-
-%: %.cpp
- $(CXX) $(FLAGS) $^ -o $@$(EXT) -lgtest -lgtest_main -lpthread
-
-.PHONY: test clean
-
-test: BufferTest RangeTest ScopeGuardTest ThreadPoolTest WorkQueueTest
- @./BufferTest$(EXT)
- @./RangeTest$(EXT)
- @./ScopeGuardTest$(EXT)
- @./ThreadPoolTest$(EXT)
- @./WorkQueueTest$(EXT)
-
-clean:
- @rm -f BufferTest RangeTest ScopeGuardTest ThreadPoolTest WorkQueueTest
diff --git a/contrib/pzstd/utils/test/ResourcePoolTest.cpp b/contrib/pzstd/utils/test/ResourcePoolTest.cpp
new file mode 100644
index 0000000..a6a86b3
--- /dev/null
+++ b/contrib/pzstd/utils/test/ResourcePoolTest.cpp
@@ -0,0 +1,72 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+#include "utils/ResourcePool.h"
+
+#include <gtest/gtest.h>
+#include <atomic>
+#include <thread>
+
+using namespace pzstd;
+
+TEST(ResourcePool, FullTest) { // Single-threaded check of reuse order and of create/free counts.
+ unsigned numCreated = 0;
+ unsigned numDeleted = 0;
+ {
+ ResourcePool<int> pool(
+ [&numCreated] { ++numCreated; return new int{5}; },
+ [&numDeleted](int *x) { ++numDeleted; delete x; });
+
+ {
+ auto i = pool.get(); // pool is empty, so the factory creates the first int (5)
+ EXPECT_EQ(5, *i);
+ *i = 6;
+ } // i goes back into the pool still holding 6
+ {
+ auto i = pool.get(); // reuses the int returned above
+ EXPECT_EQ(6, *i);
+ auto j = pool.get(); // pool is empty again, so a second int is created
+ EXPECT_EQ(5, *j);
+ *j = 7;
+ } // j is destroyed before i, so 7 is pushed first and 6 ends up at the back
+ {
+ auto i = pool.get(); // get() takes from the back of the pool
+ EXPECT_EQ(6, *i);
+ auto j = pool.get();
+ EXPECT_EQ(7, *j);
+ }
+ } // the pool's destructor frees both idle resources
+ EXPECT_EQ(2, numCreated);
+ EXPECT_EQ(numCreated, numDeleted);
+}
+
+TEST(ResourcePool, ThreadSafe) { // Two threads get and release resources from one pool concurrently.
+ std::atomic<unsigned> numCreated{0};
+ std::atomic<unsigned> numDeleted{0};
+ {
+ ResourcePool<int> pool(
+ [&numCreated] { ++numCreated; return new int{0}; },
+ [&numDeleted](int *x) { ++numDeleted; delete x; });
+ auto push = [&pool] { // run by each thread: 100 get / increment / release rounds
+ for (int i = 0; i < 100; ++i) {
+ auto x = pool.get();
+ ++*x;
+ }
+ };
+ std::thread t1{push};
+ std::thread t2{push};
+ t1.join();
+ t2.join();
+
+ auto x = pool.get(); // x and y are held together, so they are distinct resources
+ auto y = pool.get();
+ EXPECT_EQ(200, *x + *y); // all 200 increments must show up across the two ints
+ }
+ EXPECT_GE(2, numCreated); // i.e. numCreated <= 2
+ EXPECT_EQ(numCreated, numDeleted);
+}
diff --git a/contrib/pzstd/utils/test/WorkQueueTest.cpp b/contrib/pzstd/utils/test/WorkQueueTest.cpp
index ebf375a..7f58ccb 100644
--- a/contrib/pzstd/utils/test/WorkQueueTest.cpp
+++ b/contrib/pzstd/utils/test/WorkQueueTest.cpp
@@ -10,6 +10,7 @@
#include "utils/WorkQueue.h"
#include <gtest/gtest.h>
+#include <memory>
#include <mutex>
#include <thread>
#include <vector>
@@ -64,7 +65,7 @@
const int max = 100;
for (int i = 0; i < 10; ++i) {
- queue.push(i);
+ queue.push(int{i});
}
std::thread thread([ &queue, max ] {
@@ -80,7 +81,7 @@
std::this_thread::yield();
for (int i = 10; i < max; ++i) {
- queue.push(i);
+ queue.push(int{i});
}
queue.finish();
@@ -97,7 +98,7 @@
}
for (int i = 0; i < 50; ++i) {
- queue.push(i);
+ queue.push(int{i});
}
queue.finish();
@@ -126,7 +127,7 @@
pusherThreads.emplace_back(
[ &queue, min, max ] {
for (int i = min; i < max; ++i) {
- queue.push(i);
+ queue.push(int{i});
}
});
}
@@ -212,7 +213,7 @@
pusherThreads.emplace_back(
[ &queue, min, max ] {
for (int i = min; i < max; ++i) {
- queue.push(i);
+ queue.push(int{i});
}
});
}
@@ -231,6 +232,18 @@
}
}
+TEST(WorkQueue, FailedPush) { // push() must leave its argument intact when it returns false.
+ WorkQueue<std::unique_ptr<int>> queue;
+ std::unique_ptr<int> x(new int{5});
+ EXPECT_TRUE(queue.push(std::move(x)));
+ EXPECT_EQ(nullptr, x); // a successful push moved the pointer into the queue
+ queue.finish(); // once finish() has been called, push() returns false
+ x.reset(new int{6});
+ EXPECT_FALSE(queue.push(std::move(x)));
+ EXPECT_NE(nullptr, x); // the rejected item was not moved from
+ EXPECT_EQ(6, *x);
+}
+
TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
{
BufferWorkQueue queue;