Merge pull request #434 from terrelln/dev

Pzstd Improvements
diff --git a/.gitignore b/.gitignore
index c939f12..220a1e0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,3 +32,4 @@
 *.swp
 .DS_Store
 googletest/
+*.d
diff --git a/.travis.yml b/.travis.yml
index 9e2f586..3be4575 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -22,7 +22,7 @@
           packages:
             - gcc-4.8
             - g++-4.8
-      env: PLATFORM="Ubuntu 12.04 container" CMD="make zlibwrapper && make clean && make -C tests test-zstd_nolegacy && make clean && make clean && make cmaketest && make clean && make -C contrib/pzstd pzstd && make -C contrib/pzstd googletest && make -C contrib/pzstd test && make -C contrib/pzstd clean"
+      env: PLATFORM="Ubuntu 12.04 container" CMD="make zlibwrapper && make clean && make -C tests test-zstd_nolegacy && make clean && make clean && make cmaketest && make clean && make -C contrib/pzstd googletest && make -C contrib/pzstd all && make -C contrib/pzstd check && make -C contrib/pzstd clean"
     - os: linux
       sudo: false
       env: PLATFORM="Ubuntu 12.04 container" CMD="make usan"
@@ -55,6 +55,20 @@
           packages:
             - libc6-dev-i386
             - gcc-multilib
+    - os: linux
+      sudo: required
+      install:
+        - export CXX="g++-6" CC="gcc-6"
+        - export LDFLAGS="-fuse-ld=gold"
+        - export TESTFLAGS='--gtest_filter=-*ExtremelyLarge*'
+      env: PLATFORM="Ubuntu 12.04" CMD='cd contrib/pzstd && make googletest && make tsan && make check && make clean && make asan && make check && make clean && cd ../..'
+      addons:
+        apt:
+          sources:
+            - ubuntu-toolchain-r-test
+          packages:
+            - gcc-6
+            - g++-6
     # Ubuntu 14.04 LTS Server Edition 64 bit
     - os: linux
       dist: trusty
@@ -69,7 +83,7 @@
       sudo: required
       install:
         - export CXX="g++-4.8" CC="gcc-4.8"
-      env: PLATFORM="Ubuntu 14.04" CMD="make gpptest && make clean && make gnu90test && make clean && make c99test && make clean && make gnu99test && make clean && make clangtest && make clean && make -C contrib/pzstd pzstd32 && make -C contrib/pzstd googletest32 && make -C contrib/pzstd test32 && make -C contrib/pzstd clean"
+      env: PLATFORM="Ubuntu 14.04" CMD="make gpptest && make clean && make gnu90test && make clean && make c99test && make clean && make gnu99test && make clean && make clangtest && make clean && make -C contrib/pzstd googletest32 && make -C contrib/pzstd all32 && make -C contrib/pzstd check && make -C contrib/pzstd clean"
       addons:
         apt:
           packages:
diff --git a/appveyor.yml b/appveyor.yml
index 6345c7b..fbdc30c 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -50,10 +50,9 @@
       ECHO *** &&
       ECHO *** Building pzstd for %PLATFORM% &&
       ECHO *** &&
-      ECHO make -C contrib\pzstd pzstd &&
-      make -C contrib\pzstd pzstd &&
       make -C contrib\pzstd googletest-mingw64 &&
-      make -C contrib\pzstd test &&
+      make -C contrib\pzstd all &&
+      make -C contrib\pzstd check &&
       make -C contrib\pzstd clean
     )
   - if [%COMPILER%]==[gcc] (
diff --git a/contrib/pzstd/Logging.h b/contrib/pzstd/Logging.h
new file mode 100644
index 0000000..76c982a
--- /dev/null
+++ b/contrib/pzstd/Logging.h
@@ -0,0 +1,109 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+#pragma once
+
+#include <chrono>
+#include <cstdio>
+#include <mutex>
+
+namespace pzstd {
+
+// Verbosity levels, from least to most verbose. A Logger emits a message
+// when the message's level is <= the level the Logger was constructed with.
+constexpr int ERROR = 1;
+constexpr int INFO = 2;
+constexpr int DEBUG = 3;
+constexpr int VERBOSE = 4;
+
+/**
+ * A printf-style logger that filters messages by verbosity level.
+ *
+ * Every write to the output stream is made while holding an internal mutex,
+ * so messages logged through the same Logger are not interleaved.
+ */
+class Logger {
+  std::mutex mutex_;
+  FILE* out_;
+  const int level_;
+
+  using Clock = std::chrono::system_clock;
+  // Time of the last message printed by update(), and the minimum interval
+  // that must elapse before update() prints again.
+  Clock::time_point lastUpdate_;
+  std::chrono::milliseconds refreshRate_;
+
+ public:
+  /**
+   * @param level  The most verbose level that will be logged
+   * @param out    The stream to log to, defaults to stderr
+   */
+  explicit Logger(int level, FILE* out = stderr)
+      : out_(out), level_(level), lastUpdate_(Clock::now()),
+        refreshRate_(150) {}
+
+  /// @returns true iff messages at `level` are logged by this Logger.
+  bool logsAt(int level) const {
+    return level <= level_;
+  }
+
+  /**
+   * Logs a message if `level` is enabled.
+   *
+   * @param level  The level of the message
+   * @param fmt    A printf-style format string
+   * @param args   The arguments for `fmt`, passed on to std::fprintf()
+   */
+  template <typename... Args>
+  void operator()(int level, const char *fmt, Args... args) {
+    if (!logsAt(level)) {
+      return;
+    }
+    std::lock_guard<std::mutex> lock(mutex_);
+    std::fprintf(out_, fmt, args...);
+  }
+
+  /**
+   * Logs a progress message that overwrites the current line.
+   * The message is dropped unless `level` is enabled and more than the
+   * refresh interval has passed since the last printed update (or since
+   * construction, for the first update).
+   *
+   * @param level  The level of the message
+   * @param fmt    A printf-style format string
+   * @param args   The arguments for `fmt`, passed on to std::fprintf()
+   */
+  template <typename... Args>
+  void update(int level, const char *fmt, Args... args) {
+    if (!logsAt(level)) {
+      return;
+    }
+    std::lock_guard<std::mutex> lock(mutex_);
+    auto now = Clock::now();
+    if (now - lastUpdate_ > refreshRate_) {
+      lastUpdate_ = now;
+      // Return to the start of the line so we overwrite the last update
+      std::fprintf(out_, "\r");
+      std::fprintf(out_, fmt, args...);
+    }
+  }
+
+  /**
+   * Erases the current line (e.g. one written by update()) by overwriting
+   * it with 79 spaces, if `level` is enabled.
+   */
+  void clear(int level) {
+    if (!logsAt(level)) {
+      return;
+    }
+    std::lock_guard<std::mutex> lock(mutex_);
+    std::fprintf(out_, "\r%79s\r", "");
+  }
+};
+
+}
diff --git a/contrib/pzstd/Makefile b/contrib/pzstd/Makefile
index e30be0b..2de5041 100644
--- a/contrib/pzstd/Makefile
+++ b/contrib/pzstd/Makefile
@@ -7,20 +7,71 @@
 # of patent rights can be found in the PATENTS file in the same directory.
 # ##########################################################################
 
+# Standard variables for installation
+DESTDIR ?=
+PREFIX  ?= /usr/local
+BINDIR  := $(DESTDIR)$(PREFIX)/bin
+
 ZSTDDIR = ../../lib
 PROGDIR = ../../programs
 
-CPPFLAGS = -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I.
-CXXFLAGS  ?= -O3
-CXXFLAGS  += -std=c++11
-CXXFLAGS  += $(MOREFLAGS)
-FLAGS    = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS)
+# External program to use to run tests, e.g. qemu or valgrind
+TESTPROG  ?=
+# Flags to pass to the tests
+TESTFLAGS ?=
+
+# We use gcc/clang to generate the header dependencies of files
+DEPFLAGS = -MMD -MP -MF $*.Td
+POSTCOMPILE = mv -f $*.Td $*.d
+
+# CFLAGS, CXXFLAGS, CPPFLAGS, and LDFLAGS are for the users to override
+CFLAGS   ?= -O3 -Wall -Wextra
+CXXFLAGS ?= -O3 -Wall -Wextra -pedantic -std=c++11
+CPPFLAGS ?=
+LDFLAGS  ?=
+
+# Include flags
+PZSTD_INC  = -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I.
+GTEST_INC  = -isystem googletest/googletest/include
+
+PZSTD_CPPFLAGS  = $(PZSTD_INC) $(GTEST_INC)
+PZSTD_CCXXFLAGS =
+PZSTD_CFLAGS    = $(PZSTD_CCXXFLAGS)
+PZSTD_CXXFLAGS  = $(PZSTD_CCXXFLAGS)
+PZSTD_LDFLAGS   =
+EXTRA_FLAGS     =
+ALL_CFLAGS      = $(EXTRA_FLAGS) $(CPPFLAGS) $(PZSTD_CPPFLAGS) $(CFLAGS)   $(PZSTD_CFLAGS)
+ALL_CXXFLAGS    = $(EXTRA_FLAGS) $(CPPFLAGS) $(PZSTD_CPPFLAGS) $(CXXFLAGS) $(PZSTD_CXXFLAGS)
+ALL_LDFLAGS     = $(EXTRA_FLAGS) $(LDFLAGS) $(PZSTD_LDFLAGS)
 
 
-ZSTDCOMMON_FILES := $(ZSTDDIR)/common/*.c
-ZSTDCOMP_FILES := $(ZSTDDIR)/compress/zstd_compress.c $(ZSTDDIR)/compress/fse_compress.c $(ZSTDDIR)/compress/huf_compress.c
-ZSTDDECOMP_FILES := $(ZSTDDIR)/decompress/huf_decompress.c
-ZSTD_FILES := $(ZSTDDECOMP_FILES) $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES)
+# gtest libraries need to go before "-lpthread" because they depend on it.
+GTEST_LIB  = -L googletest/build/googlemock/gtest
+LIBS       = $(GTEST_LIB) -lpthread
+
+# Compilation commands
+LD_COMMAND  = $(CXX) $^          $(ALL_LDFLAGS) $(LIBS) -o $@
+CC_COMMAND  = $(CC)  $(DEPFLAGS) $(ALL_CFLAGS)   -c $<  -o $@
+CXX_COMMAND = $(CXX) $(DEPFLAGS) $(ALL_CXXFLAGS) -c $<  -o $@
+
+# Get a list of all zstd files so we rebuild the static library when we need to
+ZSTDCOMMON_FILES := $(wildcard $(ZSTDDIR)/common/*.c) \
+                    $(wildcard $(ZSTDDIR)/common/*.h)
+ZSTDCOMP_FILES   := $(wildcard $(ZSTDDIR)/compress/*.c) \
+                    $(wildcard $(ZSTDDIR)/compress/*.h)
+ZSTDDECOMP_FILES := $(wildcard $(ZSTDDIR)/decompress/*.c) \
+                    $(wildcard $(ZSTDDIR)/decompress/*.h)
+ZSTDPROG_FILES   := $(wildcard $(PROGDIR)/*.c) \
+                    $(wildcard $(PROGDIR)/*.h)
+ZSTD_FILES       := $(wildcard $(ZSTDDIR)/*.h) \
+                    $(ZSTDDECOMP_FILES) $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES) \
+                    $(ZSTDPROG_FILES)
+
+# List all the pzstd source files so we can determine their dependencies
+PZSTD_SRCS  := $(wildcard *.cpp)
+PZSTD_TESTS := $(wildcard test/*.cpp)
+UTILS_TESTS := $(wildcard utils/test/*.cpp)
+ALL_SRCS    := $(PZSTD_SRCS) $(PZSTD_TESTS) $(UTILS_TESTS)
 
 
 # Define *.exe as extension for Windows systems
@@ -30,89 +81,169 @@
 EXT =
 endif
 
-.PHONY: default all test clean test32 googletest googletest32
+# Standard targets
+.PHONY: default
+default: all
 
-default: pzstd
+.PHONY: check
+check:
+	$(TESTPROG) ./utils/test/BufferTest$(EXT) $(TESTFLAGS)
+	$(TESTPROG) ./utils/test/RangeTest$(EXT) $(TESTFLAGS)
+	$(TESTPROG) ./utils/test/ResourcePoolTest$(EXT) $(TESTFLAGS)
+	$(TESTPROG) ./utils/test/ScopeGuardTest$(EXT) $(TESTFLAGS)
+	$(TESTPROG) ./utils/test/ThreadPoolTest$(EXT) $(TESTFLAGS)
+	$(TESTPROG) ./utils/test/WorkQueueTest$(EXT) $(TESTFLAGS)
+	$(TESTPROG) ./test/OptionsTest$(EXT) $(TESTFLAGS)
+	$(TESTPROG) ./test/PzstdTest$(EXT) $(TESTFLAGS)
 
-all: pzstd
+.PHONY: install
+install: PZSTD_CPPFLAGS += -DNDEBUG
+install: pzstd$(EXT)
+	install -d -m 755 $(BINDIR)/
+	install -m 755 pzstd$(EXT) $(BINDIR)/pzstd$(EXT)
+
+.PHONY: uninstall
+uninstall:
+	$(RM) $(BINDIR)/pzstd$(EXT)
+
+# Targets for many different builds
+.PHONY: all
+all: PZSTD_CPPFLAGS += -DNDEBUG
+all: pzstd$(EXT) tests roundtrip
+
+.PHONY: debug
+debug: EXTRA_FLAGS += -g
+debug: pzstd$(EXT) tests roundtrip
+
+.PHONY: tsan
+tsan: PZSTD_CCXXFLAGS += -fsanitize=thread -fPIC
+tsan: PZSTD_LDFLAGS   += -fsanitize=thread -pie
+tsan: debug
+
+.PHONY: asan
+asan: EXTRA_FLAGS += -fsanitize=address
+asan: debug
+
+.PHONY: ubsan
+ubsan: EXTRA_FLAGS += -fsanitize=undefined
+ubsan: debug
+
+.PHONY: all32
+all32: EXTRA_FLAGS += -m32
+all32: all
+
+.PHONY: debug32
+debug32: EXTRA_FLAGS += -m32
+debug32: debug
+
+.PHONY: asan32
+asan32: EXTRA_FLAGS += -m32
+asan32: asan
+
+.PHONY: tsan32
+tsan32: EXTRA_FLAGS += -m32
+tsan32: tsan
+
+.PHONY: ubsan32
+ubsan32: EXTRA_FLAGS += -m32
+ubsan32: ubsan
+
+# Run long round trip tests
+.PHONY: roundtripcheck
+roundtripcheck: roundtrip check
+	$(TESTPROG) ./test/RoundTripTest$(EXT) $(TESTFLAGS)
+
+# Build the main binary
+pzstd$(EXT): main.o Options.o Pzstd.o SkippableFrame.o $(ZSTDDIR)/libzstd.a
+	$(LD_COMMAND)
+
+# Target that depends on all the tests
+.PHONY: tests
+tests: EXTRA_FLAGS += -Wno-deprecated-declarations
+tests: $(patsubst %,%$(EXT),$(basename $(PZSTD_TESTS) $(UTILS_TESTS)))
+
+# Build the round trip tests
+.PHONY: roundtrip
+roundtrip: EXTRA_FLAGS += -Wno-deprecated-declarations
+roundtrip: test/RoundTripTest$(EXT)
+
+# Use the static library that zstd builds for simplicity and
+# so we get the compiler options correct
+$(ZSTDDIR)/libzstd.a: $(ZSTD_FILES)
+	$(MAKE) -C $(ZSTDDIR) libzstd CFLAGS="$(ALL_CFLAGS)" LDFLAGS="$(ALL_LDFLAGS)"
 
 
-libzstd.a: $(ZSTD_FILES)
-	$(MAKE) -C $(ZSTDDIR) libzstd
-	@cp $(ZSTDDIR)/libzstd.a .
+# Rules to build the tests
+test/RoundTripTest$(EXT): test/RoundTripTest.o $(PROGDIR)/datagen.o Options.o \
+                          Pzstd.o SkippableFrame.o $(ZSTDDIR)/libzstd.a
+	$(LD_COMMAND)
 
-Pzstd.o: Pzstd.h Pzstd.cpp ErrorHolder.h utils/*.h
-	$(CXX) $(FLAGS) -c Pzstd.cpp -o $@
+test/%Test$(EXT): GTEST_LIB += -lgtest -lgtest_main
+test/%Test$(EXT): test/%Test.o $(PROGDIR)/datagen.o Options.o Pzstd.o  \
+                  SkippableFrame.o $(ZSTDDIR)/libzstd.a
+	$(LD_COMMAND)
 
-SkippableFrame.o: SkippableFrame.h SkippableFrame.cpp utils/*.h
-	$(CXX) $(FLAGS) -c SkippableFrame.cpp -o $@
+utils/test/%Test$(EXT): GTEST_LIB += -lgtest -lgtest_main
+utils/test/%Test$(EXT): utils/test/%Test.o
+	$(LD_COMMAND)
 
-Options.o: Options.h Options.cpp
-	$(CXX) $(FLAGS) -c Options.cpp -o $@
 
-main.o: main.cpp *.h utils/*.h
-	$(CXX) $(FLAGS) -c main.cpp -o $@
+GTEST_CMAKEFLAGS =
 
-pzstd: Pzstd.o SkippableFrame.o Options.o main.o libzstd.a
-	$(CXX) $(FLAGS) $^ -o $@$(EXT) -lpthread
-
-libzstd32.a: $(ZSTD_FILES)
-	$(MAKE) -C $(ZSTDDIR) libzstd MOREFLAGS="-m32"
-	@cp $(ZSTDDIR)/libzstd.a libzstd32.a
-
-Pzstd32.o: Pzstd.h Pzstd.cpp ErrorHolder.h utils/*.h
-	$(CXX) -m32 $(FLAGS) -c Pzstd.cpp -o $@
-
-SkippableFrame32.o: SkippableFrame.h SkippableFrame.cpp utils/*.h
-	$(CXX) -m32 $(FLAGS) -c SkippableFrame.cpp -o $@
-
-Options32.o: Options.h Options.cpp
-	$(CXX) -m32 $(FLAGS) -c Options.cpp -o $@
-
-main32.o: main.cpp *.h utils/*.h
-	$(CXX) -m32 $(FLAGS) -c main.cpp -o $@
-
-pzstd32: Pzstd32.o SkippableFrame32.o Options32.o main32.o libzstd32.a
-	$(CXX) -m32 $(FLAGS) $^ -o $@$(EXT) -lpthread
-
+# Install googletest
+.PHONY: googletest
+googletest: PZSTD_CCXXFLAGS += -fPIC
 googletest:
 	@$(RM) -rf googletest
 	@git clone https://github.com/google/googletest
 	@mkdir -p googletest/build
-	@cd googletest/build && cmake .. && make
+	@cd googletest/build && cmake $(GTEST_CMAKEFLAGS) -DCMAKE_CXX_FLAGS="$(ALL_CXXFLAGS)" .. && $(MAKE)
 
-googletest32:
-	@$(RM) -rf googletest
-	@git clone https://github.com/google/googletest
-	@mkdir -p googletest/build
-	@cd googletest/build && cmake .. -DCMAKE_CXX_FLAGS=-m32 && make
+.PHONY: googletest32
+googletest32: PZSTD_CCXXFLAGS  += -m32
+googletest32: googletest
 
-googletest-mingw64:
-	$(RM) -rf googletest
-	git clone https://github.com/google/googletest
-	mkdir -p googletest/build
-	cd googletest/build && cmake -G "MSYS Makefiles" .. && $(MAKE)
+.PHONY: googletest-mingw64
+googletest-mingw64: GTEST_CMAKEFLAGS += -G "MSYS Makefiles"
+googletest-mingw64: googletest
 
-test:
-	$(MAKE) libzstd.a
-	$(MAKE) pzstd MOREFLAGS="-Wall -Wextra -pedantic -Werror"
-	$(MAKE) -C utils/test clean
-	$(MAKE) -C utils/test test MOREFLAGS="-Wall -Wextra -pedantic -Werror"
-	$(MAKE) -C test clean
-	$(MAKE) -C test test MOREFLAGS="-Wall -Wextra -pedantic -Werror"
-
-test32:
-	$(MAKE) libzstd.a MOREFLAGS="-m32"
-	$(MAKE) pzstd MOREFLAGS="-m32 -Wall -Wextra -pedantic -Werror"
-	$(MAKE) -C utils/test clean
-	$(MAKE) -C utils/test test MOREFLAGS="-m32 -Wall -Wextra -pedantic -Werror"
-	$(MAKE) -C test clean
-	$(MAKE) -C test test MOREFLAGS="-m32 -Wall -Wextra -pedantic -Werror"
-
-
+.PHONY: clean
 clean:
+	$(RM) -f *.o pzstd$(EXT) *.Td *.d
+	$(RM) -f test/*.o test/*Test$(EXT) test/*.Td test/*.d
+	$(RM) -f utils/test/*.o utils/test/*Test$(EXT) utils/test/*.Td utils/test/*.d
+	$(RM) -f $(PROGDIR)/*.o $(PROGDIR)/*.Td $(PROGDIR)/*.d
 	$(MAKE) -C $(ZSTDDIR) clean
-	$(MAKE) -C utils/test clean
-	$(MAKE) -C test clean
-	@$(RM) -rf libzstd.a *.o pzstd$(EXT) pzstd32$(EXT)
 	@echo Cleaning completed
+
+
+# Cancel implicit rules
+%.o: %.c
+%.o: %.cpp
+
+# Object file rules
+%.o: %.c
+	$(CC_COMMAND)
+	$(POSTCOMPILE)
+
+$(PROGDIR)/%.o: $(PROGDIR)/%.c
+	$(CC_COMMAND)
+	$(POSTCOMPILE)
+
+%.o: %.cpp
+	$(CXX_COMMAND)
+	$(POSTCOMPILE)
+
+test/%.o: test/%.cpp
+	$(CXX_COMMAND)
+	$(POSTCOMPILE)
+
+utils/test/%.o: utils/test/%.cpp
+	$(CXX_COMMAND)
+	$(POSTCOMPILE)
+
+# Dependency file stuff
+.PRECIOUS: %.d test/%.d utils/test/%.d
+
+# Include rules that specify header file dependencies
+-include $(patsubst %,%.d,$(basename $(ALL_SRCS)))
diff --git a/contrib/pzstd/Options.cpp b/contrib/pzstd/Options.cpp
index ece8c07..18c069e 100644
--- a/contrib/pzstd/Options.cpp
+++ b/contrib/pzstd/Options.cpp
@@ -303,6 +303,12 @@
     } // while (*options != 0);
   }   // for (int i = 1; i < argc; ++i);
 
+  // Set options for test mode
+  if (test) {
+    outputFile = nullOutput;
+    keepSource = true;
+  }
+
   // Input file defaults to standard input if not provided.
   if (localInputFiles.empty()) {
     localInputFiles.emplace_back(kStdIn);
@@ -399,11 +405,6 @@
     verbosity = 1;
   }
 
-  // Set options for test mode
-  if (test) {
-    outputFile = nullOutput;
-    keepSource = true;
-  }
   return Status::Success;
 }
 
diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp
index e0826b9..c5b4ce4 100644
--- a/contrib/pzstd/Pzstd.cpp
+++ b/contrib/pzstd/Pzstd.cpp
@@ -15,6 +15,7 @@
 #include "utils/WorkQueue.h"
 
 #include <chrono>
+#include <cinttypes>
 #include <cstddef>
 #include <cstdio>
 #include <memory>
@@ -58,26 +59,24 @@
                              FILE* inputFd,
                              const std::string &outputFile,
                              FILE* outputFd,
-                             ErrorHolder &errorHolder) {
+                             SharedState& state) {
   auto inputSize = fileSizeOrZero(inputFile);
   // WorkQueue outlives ThreadPool so in the case of error we are certain
-  // we don't accidently try to call push() on it after it is destroyed.
+  // we don't accidentally try to call push() on it after it is destroyed.
   WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
   std::uint64_t bytesRead;
   std::uint64_t bytesWritten;
   {
-    // Initialize the thread pool with numThreads + 1
-    // We add one because the read thread spends most of its time waiting.
-    // This also sets the minimum number of threads to 2, so the algorithm
-    // doesn't deadlock.
-    ThreadPool executor(options.numThreads + 1);
+    // Initialize the (de)compression thread pool with numThreads
+    ThreadPool executor(options.numThreads);
+    // Run the reader thread on an extra thread
+    ThreadPool readExecutor(1);
     if (!options.decompress) {
       // Add a job that reads the input and starts all the compression jobs
-      executor.add(
-          [&errorHolder, &outs, &executor, inputFd, inputSize, &options,
-                                                               &bytesRead] {
+      readExecutor.add(
+          [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] {
             bytesRead = asyncCompressChunks(
-                errorHolder,
+                state,
                 outs,
                 executor,
                 inputFd,
@@ -86,29 +85,28 @@
                 options.determineParameters());
           });
       // Start writing
-      bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress,
-                               options.verbosity);
+      bytesWritten = writeFile(state, outs, outputFd, options.decompress);
     } else {
       // Add a job that reads the input and starts all the decompression jobs
-      executor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] {
-        bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd);
+      readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
+        bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
       });
       // Start writing
-      bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress,
-                               options.verbosity);
+      bytesWritten = writeFile(state, outs, outputFd, options.decompress);
     }
   }
-  if (options.verbosity > 1 && !errorHolder.hasError()) {
+  if (!state.errorHolder.hasError()) {
     std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
     std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
     if (!options.decompress) {
       double ratio = static_cast<double>(bytesWritten) /
                      static_cast<double>(bytesRead + !bytesRead);
-      std::fprintf(stderr, "%-20s :%6.2f%%   (%6llu => %6llu bytes, %s)\n",
+      state.log(INFO, "%-20s :%6.2f%%   (%6" PRIu64 " => %6" PRIu64
+                   " bytes, %s)\n",
                    inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
                    outputFileName.c_str());
     } else {
-      std::fprintf(stderr, "%-20s: %llu bytes \n",
+      state.log(INFO, "%-20s: %" PRIu64 " bytes \n",
                    inputFileName.c_str(),bytesWritten);
     }
   }
@@ -138,7 +136,7 @@
 
 static FILE *openOutputFile(const Options &options,
                             const std::string &outputFile,
-                            ErrorHolder &errorHolder) {
+                            SharedState& state) {
   if (outputFile == "-") {
     SET_BINARY_MODE(stdout);
     return stdout;
@@ -148,82 +146,78 @@
     auto outputFd = std::fopen(outputFile.c_str(), "rb");
     if (outputFd != nullptr) {
       std::fclose(outputFd);
-      if (options.verbosity <= 1) {
-        errorHolder.setError("Output file exists");
+      if (!state.log.logsAt(INFO)) {
+        state.errorHolder.setError("Output file exists");
         return nullptr;
       }
-      std::fprintf(
-          stderr,
+      state.log(
+          INFO,
           "pzstd: %s already exists; do you wish to overwrite (y/n) ? ",
           outputFile.c_str());
       int c = getchar();
       if (c != 'y' && c != 'Y') {
-        errorHolder.setError("Not overwritten");
+        state.errorHolder.setError("Not overwritten");
         return nullptr;
       }
     }
   }
   auto outputFd = std::fopen(outputFile.c_str(), "wb");
-  if (!errorHolder.check(
+  if (!state.errorHolder.check(
           outputFd != nullptr, "Failed to open output file")) {
-    return 0;
+    return nullptr;
   }
   return outputFd;
 }
 
 int pzstdMain(const Options &options) {
   int returnCode = 0;
+  SharedState state(options);
   for (const auto& input : options.inputFiles) {
-    // Setup the error holder
-    ErrorHolder errorHolder;
+    // Set up the guard that reports this input's error, if there is one
     auto printErrorGuard = makeScopeGuard([&] {
-      if (errorHolder.hasError()) {
+      if (state.errorHolder.hasError()) {
         returnCode = 1;
-        if (options.verbosity > 0) {
-          std::fprintf(stderr, "pzstd: %s: %s.\n", input.c_str(),
-                       errorHolder.getError().c_str());
-        }
-      } else {
-
+        state.log(ERROR, "pzstd: %s: %s.\n", input.c_str(),
+                  state.errorHolder.getError().c_str());
       }
     });
     // Open the input file
-    auto inputFd = openInputFile(input, errorHolder);
+    auto inputFd = openInputFile(input, state.errorHolder);
     if (inputFd == nullptr) {
       continue;
     }
     auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); });
     // Open the output file
     auto outputFile = options.getOutputFile(input);
-    if (!errorHolder.check(outputFile != "",
+    if (!state.errorHolder.check(outputFile != "",
                            "Input file does not have extension .zst")) {
       continue;
     }
-    auto outputFd = openOutputFile(options, outputFile, errorHolder);
+    auto outputFd = openOutputFile(options, outputFile, state);
     if (outputFd == nullptr) {
       continue;
     }
     auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
     // (de)compress the file
-    handleOneInput(options, input, inputFd, outputFile, outputFd, errorHolder);
-    if (errorHolder.hasError()) {
+    handleOneInput(options, input, inputFd, outputFile, outputFd, state);
+    if (state.errorHolder.hasError()) {
       continue;
     }
     // Delete the input file if necessary
     if (!options.keepSource) {
       // Be sure that we are done and have written everything before we delete
-      if (!errorHolder.check(std::fclose(inputFd) == 0,
+      if (!state.errorHolder.check(std::fclose(inputFd) == 0,
                              "Failed to close input file")) {
         continue;
       }
       closeInputGuard.dismiss();
-      if (!errorHolder.check(std::fclose(outputFd) == 0,
+      if (!state.errorHolder.check(std::fclose(outputFd) == 0,
                              "Failed to close output file")) {
         continue;
       }
       closeOutputGuard.dismiss();
       if (std::remove(input.c_str()) != 0) {
-        errorHolder.setError("Failed to remove input file");
+        state.errorHolder.setError("Failed to remove input file");
         continue;
       }
     }
@@ -269,27 +263,25 @@
 /**
  * Stream chunks of input from `in`, compress it, and stream it out to `out`.
  *
- * @param errorHolder Used to report errors and check if an error occured
+ * @param state        The shared state
  * @param in           Queue that we `pop()` input buffers from
  * @param out          Queue that we `push()` compressed output buffers to
  * @param maxInputSize An upper bound on the size of the input
- * @param parameters   The zstd parameters to use for compression
  */
 static void compress(
-    ErrorHolder& errorHolder,
+    SharedState& state,
     std::shared_ptr<BufferWorkQueue> in,
     std::shared_ptr<BufferWorkQueue> out,
-    size_t maxInputSize,
-    ZSTD_parameters parameters) {
+    size_t maxInputSize) {
+  auto& errorHolder = state.errorHolder;
   auto guard = makeScopeGuard([&] { out->finish(); });
   // Initialize the CCtx
-  std::unique_ptr<ZSTD_CStream, size_t (*)(ZSTD_CStream*)> ctx(
-      ZSTD_createCStream(), ZSTD_freeCStream);
+  auto ctx = state.cStreamPool->get();
   if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
     return;
   }
   {
-    auto err = ZSTD_initCStream_advanced(ctx.get(), nullptr, 0, parameters, 0);
+    auto err = ZSTD_resetCStream(ctx.get(), 0);
     if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
       return;
     }
@@ -396,7 +388,7 @@
 }
 
 std::uint64_t asyncCompressChunks(
-    ErrorHolder& errorHolder,
+    SharedState& state,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
     ThreadPool& executor,
     FILE* fd,
@@ -410,23 +402,23 @@
   // independently.
   size_t step = calculateStep(size, numThreads, params);
   auto status = FileStatus::Continue;
-  while (status == FileStatus::Continue && !errorHolder.hasError()) {
+  while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
     // Make a new input queue that we will put the chunk's input data into.
     auto in = std::make_shared<BufferWorkQueue>();
     auto inGuard = makeScopeGuard([&] { in->finish(); });
     // Make a new output queue that compress will put the compressed data into.
     auto out = std::make_shared<BufferWorkQueue>();
     // Start compression in the thread pool
-    executor.add([&errorHolder, in, out, step, params] {
+    executor.add([&state, in, out, step] {
       return compress(
-          errorHolder, std::move(in), std::move(out), step, params);
+          state, std::move(in), std::move(out), step);
     });
     // Pass the output queue to the writer thread.
     chunks.push(std::move(out));
     // Fill the input queue for the compression job we just started
     status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
   }
-  errorHolder.check(status != FileStatus::Error, "Error reading input");
+  state.errorHolder.check(status != FileStatus::Error, "Error reading input");
   return bytesRead;
 }
 
@@ -434,24 +426,24 @@
  * Decompress a frame, whose data is streamed into `in`, and stream the output
  * to `out`.
  *
- * @param errorHolder Used to report errors and check if an error occured
+ * @param state        The shared state
  * @param in           Queue that we `pop()` input buffers from. It contains
  *                      exactly one compressed frame.
  * @param out          Queue that we `push()` decompressed output buffers to
  */
 static void decompress(
-    ErrorHolder& errorHolder,
+    SharedState& state,
     std::shared_ptr<BufferWorkQueue> in,
     std::shared_ptr<BufferWorkQueue> out) {
+  auto& errorHolder = state.errorHolder;
   auto guard = makeScopeGuard([&] { out->finish(); });
   // Initialize the DCtx
-  std::unique_ptr<ZSTD_DStream, size_t (*)(ZSTD_DStream*)> ctx(
-      ZSTD_createDStream(), ZSTD_freeDStream);
+  auto ctx = state.dStreamPool->get();
   if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
     return;
   }
   {
-    auto err = ZSTD_initDStream(ctx.get());
+    auto err = ZSTD_resetDStream(ctx.get());
     if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
       return;
     }
@@ -509,7 +501,7 @@
 }
 
 std::uint64_t asyncDecompressFrames(
-    ErrorHolder& errorHolder,
+    SharedState& state,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
     ThreadPool& executor,
     FILE* fd) {
@@ -522,7 +514,7 @@
   // Otherwise, we will decompress using only one decompression task.
   const size_t chunkSize = ZSTD_DStreamInSize();
   auto status = FileStatus::Continue;
-  while (status == FileStatus::Continue && !errorHolder.hasError()) {
+  while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
     // Make a new input queue that we will put the frames's bytes into.
     auto in = std::make_shared<BufferWorkQueue>();
     auto inGuard = makeScopeGuard([&] { in->finish(); });
@@ -551,15 +543,15 @@
       out->setMaxSize(64);
     }
     // Start decompression in the thread pool
-    executor.add([&errorHolder, in, out] {
-      return decompress(errorHolder, std::move(in), std::move(out));
+    executor.add([&state, in, out] {
+      return decompress(state, std::move(in), std::move(out));
     });
     // Pass the output queue to the writer thread
     frames.push(std::move(out));
     if (frameSize == 0) {
       // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
       // Pass the rest of the source to this decompression task
-      while (status == FileStatus::Continue && !errorHolder.hasError()) {
+      while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
         status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
       }
       break;
@@ -567,7 +559,7 @@
     // Fill the input queue for the decompression job we just started
     status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
   }
-  errorHolder.check(status != FileStatus::Error, "Error reading input");
+  state.errorHolder.check(status != FileStatus::Error, "Error reading input");
   return totalBytesRead;
 }
 
@@ -582,32 +574,14 @@
   return true;
 }
 
-void updateWritten(int verbosity, std::uint64_t bytesWritten) {
-  if (verbosity <= 1) {
-    return;
-  }
-  using Clock = std::chrono::system_clock;
-  static Clock::time_point then;
-  constexpr std::chrono::milliseconds refreshRate{150};
-
-  auto now = Clock::now();
-  if (now - then > refreshRate) {
-    then = now;
-    std::fprintf(stderr, "\rWritten: %u MB   ",
-                 static_cast<std::uint32_t>(bytesWritten >> 20));
-  }
-}
-
 std::uint64_t writeFile(
-    ErrorHolder& errorHolder,
+    SharedState& state,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
     FILE* outputFd,
-    bool decompress,
-    int verbosity) {
-  auto lineClearGuard = makeScopeGuard([verbosity] {
-    if (verbosity > 1) {
-      std::fprintf(stderr, "\r%79s\r", "");
-    }
+    bool decompress) {
+  auto& errorHolder = state.errorHolder;
+  auto lineClearGuard = makeScopeGuard([&state] {
+    state.log.clear(INFO);
   });
   std::uint64_t bytesWritten = 0;
   std::shared_ptr<BufferWorkQueue> out;
@@ -633,7 +607,8 @@
         return bytesWritten;
       }
       bytesWritten += buffer.size();
-      updateWritten(verbosity, bytesWritten);
+      state.log.update(INFO, "Written: %u MB   ",
+                static_cast<std::uint32_t>(bytesWritten >> 20));
     }
   }
   return bytesWritten;
diff --git a/contrib/pzstd/Pzstd.h b/contrib/pzstd/Pzstd.h
index fe44ccf..9fb2c48 100644
--- a/contrib/pzstd/Pzstd.h
+++ b/contrib/pzstd/Pzstd.h
@@ -9,9 +9,11 @@
 #pragma once
 
 #include "ErrorHolder.h"
+#include "Logging.h"
 #include "Options.h"
 #include "utils/Buffer.h"
 #include "utils/Range.h"
+#include "utils/ResourcePool.h"
 #include "utils/ThreadPool.h"
 #include "utils/WorkQueue.h"
 #define ZSTD_STATIC_LINKING_ONLY
@@ -32,12 +34,58 @@
  */
 int pzstdMain(const Options& options);
 
+class SharedState {  // Bundles the logger, error holder, and zstd stream pools
+ public:
+  SharedState(const Options& options) : log(options.verbosity) {
+    if (!options.decompress) {  // Compressing: only `cStreamPool` is created
+      auto parameters = options.determineParameters();
+      cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
+          [parameters]() -> ZSTD_CStream* {  // Factory: null on any failure
+            auto zcs = ZSTD_createCStream();
+            if (zcs) {
+              auto err = ZSTD_initCStream_advanced(
+                  zcs, nullptr, 0, parameters, 0);
+              if (ZSTD_isError(err)) {
+                ZSTD_freeCStream(zcs);  // Don't leak a stream that failed init
+                return nullptr;
+              }
+            }
+            return zcs;  // Still null here if `ZSTD_createCStream()` failed
+          },
+          [](ZSTD_CStream *zcs) {  // Free function used by the pool
+            ZSTD_freeCStream(zcs);
+          }});
+    } else {  // Decompressing: only `dStreamPool` is created
+      dStreamPool.reset(new ResourcePool<ZSTD_DStream>{
+          []() -> ZSTD_DStream* {  // Factory: null on any failure
+            auto zds = ZSTD_createDStream();
+            if (zds) {
+              auto err = ZSTD_initDStream(zds);
+              if (ZSTD_isError(err)) {
+                ZSTD_freeDStream(zds);  // Don't leak a stream that failed init
+                return nullptr;
+              }
+            }
+            return zds;  // Still null here if `ZSTD_createDStream()` failed
+          },
+          [](ZSTD_DStream *zds) {  // Free function used by the pool
+            ZSTD_freeDStream(zds);
+          }});
+    }
+  }
+
+  Logger log;  // Constructed from `options.verbosity`
+  ErrorHolder errorHolder;
+  std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool;  // Null when decompressing
+  std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool;  // Null when compressing
+};
+
 /**
  * Streams input from `fd`, breaks input up into chunks, and compresses each
  * chunk independently.  Output of each chunk gets streamed to a queue, and
  * the output queues get put into `chunks` in order.
  *
- * @param errorHolder  Used to report errors and coordinate early shutdown
+ * @param state        The shared state
  * @param chunks       Each compression jobs output queue gets `pushed()` here
  *                      as soon as it is available
  * @param executor     The thread pool to run compression jobs in
@@ -48,7 +96,7 @@
  * @returns            The number of bytes read from the file
  */
 std::uint64_t asyncCompressChunks(
-    ErrorHolder& errorHolder,
+    SharedState& state,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
     ThreadPool& executor,
     FILE* fd,
@@ -62,7 +110,7 @@
  * decompression job.  Output of each frame gets streamed to a queue, and
  * the output queues get put into `frames` in order.
  *
- * @param errorHolder  Used to report errors and coordinate early shutdown
+ * @param state        The shared state
  * @param frames       Each decompression jobs output queue gets `pushed()` here
  *                      as soon as it is available
  * @param executor     The thread pool to run compression jobs in
@@ -70,7 +118,7 @@
  * @returns            The number of bytes read from the file
  */
 std::uint64_t asyncDecompressFrames(
-    ErrorHolder& errorHolder,
+    SharedState& state,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
     ThreadPool& executor,
     FILE* fd);
@@ -79,18 +127,16 @@
  * Streams input in from each queue in `outs` in order, and writes the data to
  * `outputFd`.
  *
- * @param errorHolder  Used to report errors and coordinate early exit
+ * @param state        The shared state
  * @param outs         A queue of output queues, one for each
  *                      (de)compression job.
  * @param outputFd     The file descriptor to write to
  * @param decompress   Are we decompressing?
- * @param verbosity    The verbosity level to log at
  * @returns            The number of bytes written
  */
 std::uint64_t writeFile(
-    ErrorHolder& errorHolder,
+    SharedState& state,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
     FILE* outputFd,
-    bool decompress,
-    int verbosity);
+    bool decompress);
 }
diff --git a/contrib/pzstd/README.md b/contrib/pzstd/README.md
index 05ceb55..3fe7b0b 100644
--- a/contrib/pzstd/README.md
+++ b/contrib/pzstd/README.md
@@ -10,7 +10,7 @@
 
 ## Usage
 
-PZstandard supports the same command line interface as Zstandard, but also provies the `-p` option to specify the number of threads.
+PZstandard supports the same command line interface as Zstandard, but also provides the `-p` option to specify the number of threads.
 Dictionary mode is not currently supported.
 
 Basic usage
diff --git a/contrib/pzstd/test/Makefile b/contrib/pzstd/test/Makefile
deleted file mode 100644
index 4f6ba99..0000000
--- a/contrib/pzstd/test/Makefile
+++ /dev/null
@@ -1,48 +0,0 @@
-# ##########################################################################
-# Copyright (c) 2016-present, Facebook, Inc.
-# All rights reserved.
-#
-# This source code is licensed under the BSD-style license found in the
-# LICENSE file in the root directory of this source tree. An additional grant
-# of patent rights can be found in the PATENTS file in the same directory.
-# ##########################################################################
-
-# Define *.exe as extension for Windows systems
-ifneq (,$(filter Windows%,$(OS)))
-EXT =.exe
-else
-EXT =
-endif
-
-PZSTDDIR = ..
-PROGDIR = ../../../programs
-ZSTDDIR = ../../../lib
-
-# Set GTEST_INC and GTEST_LIB to work with your install of gtest
-GTEST_INC ?= -isystem $(PZSTDDIR)/googletest/googletest/include
-GTEST_LIB ?= -L $(PZSTDDIR)/googletest/build/googlemock/gtest
-GTEST_FLAGS = $(GTEST_INC) $(GTEST_LIB)
-CPPFLAGS = -I$(PZSTDDIR) -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I.
-
-CXXFLAGS  ?= -O3
-CXXFLAGS  += -std=c++11 -Wno-deprecated-declarations
-CXXFLAGS  += $(MOREFLAGS)
-FLAGS    = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS)
-
-datagen.o: $(PROGDIR)/datagen.*
-	$(CC) $(CPPFLAGS) -O3 $(MOREFLAGS) $(LDFLAGS) -Wno-long-long -Wno-variadic-macros $(PROGDIR)/datagen.c -c -o $@
-
-%: %.cpp *.h datagen.o
-	$(CXX) $(FLAGS) $@.cpp datagen.o $(PZSTDDIR)/Pzstd.o $(PZSTDDIR)/SkippableFrame.o $(PZSTDDIR)/Options.o $(PZSTDDIR)/libzstd.a -o $@$(EXT) $(GTEST_FLAGS) -lgtest -lgtest_main -lpthread
-
-.PHONY: test clean
-
-test: OptionsTest PzstdTest
-	@./OptionsTest$(EXT)
-	@./PzstdTest$(EXT)
-
-roundtrip: RoundTripTest
-	@./RoundTripTest$(EXT)
-
-clean:
-	@rm -f datagen.o OptionsTest PzstdTest RoundTripTest
diff --git a/contrib/pzstd/utils/ResourcePool.h b/contrib/pzstd/utils/ResourcePool.h
new file mode 100644
index 0000000..ed01130
--- /dev/null
+++ b/contrib/pzstd/utils/ResourcePool.h
@@ -0,0 +1,96 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+#pragma once
+
+#include <cassert>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+namespace pzstd {
+
+/**
+ * An unbounded pool of resources.
+ * A `ResourcePool<T>` requires a factory function that allocates a `T*` and
+ * a free function that frees a `T*`.
+ * Calling `ResourcePool::get()` will give you a new `ResourcePool::UniquePtr`
+ * to a `T`, and when it goes out of scope the resource will be returned to the
+ * pool.
+ * The `ResourcePool<T>` *must* survive longer than any resources it hands out.
+ * Remember that `ResourcePool<T>` hands out mutable `T`s, so make sure to clean
+ * up the resource before or after every use.
+ */
+template <typename T>
+class ResourcePool {
+ public:
+  class Deleter;
+  using Factory = std::function<T*()>;
+  using Free = std::function<void(T*)>;
+  using UniquePtr = std::unique_ptr<T, Deleter>;
+
+ private:
+  std::mutex mutex_;  // Guards `resources_` and `inUse_`
+  Factory factory_;
+  Free free_;
+  std::vector<T*> resources_;  // Idle resources, reused most-recent-first
+  unsigned inUse_;  // Number of non-null resources currently handed out
+
+ public:
+  /**
+   * Creates a `ResourcePool`.
+   *
+   * @param factory  The function to use to create new resources.
+   * @param free     The function to use to free resources created by `factory`.
+   */
+  ResourcePool(Factory factory, Free free)
+      : factory_(std::move(factory)), free_(std::move(free)), inUse_(0) {}
+
+  /**
+   * @returns  A unique pointer to a resource.  The resource is null iff
+   *           there are no available resources and `factory()` returns null.
+   */
+  UniquePtr get() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    if (!resources_.empty()) {
+      UniquePtr resource{resources_.back(), Deleter{*this}};
+      resources_.pop_back();
+      ++inUse_;
+      return resource;
+    }
+    UniquePtr resource{factory_(), Deleter{*this}};
+    if (resource) { ++inUse_; }  // A null `unique_ptr` never runs its deleter
+    return resource;
+  }
+
+  ~ResourcePool() noexcept {
+    assert(inUse_ == 0);
+    for (const auto resource : resources_) {
+      free_(resource);
+    }
+  }
+
+  class Deleter {
+    ResourcePool *pool_;
+  public:
+    explicit Deleter(ResourcePool &pool) : pool_(&pool) {}
+
+    void operator() (T *resource) {
+      // `get()` does not count null resources, so they must not touch the pool
+      if (!resource) { return; }
+      std::lock_guard<std::mutex> lock(pool_->mutex_);
+      // Return the resource to the pool for reuse instead of freeing it
+      pool_->resources_.push_back(resource);
+      assert(pool_->inUse_ > 0);
+      --pool_->inUse_;
+    }
+  };
+};
+
+}
diff --git a/contrib/pzstd/utils/ThreadPool.h b/contrib/pzstd/utils/ThreadPool.h
index a1d1fc0..99b3ecf 100644
--- a/contrib/pzstd/utils/ThreadPool.h
+++ b/contrib/pzstd/utils/ThreadPool.h
@@ -27,7 +27,7 @@
   explicit ThreadPool(std::size_t numThreads) {
     threads_.reserve(numThreads);
     for (std::size_t i = 0; i < numThreads; ++i) {
-      threads_.emplace_back([&] {
+      threads_.emplace_back([this] {
         std::function<void()> task;
         while (tasks_.pop(task)) {
           task();
diff --git a/contrib/pzstd/utils/WorkQueue.h b/contrib/pzstd/utils/WorkQueue.h
index 5382135..780e536 100644
--- a/contrib/pzstd/utils/WorkQueue.h
+++ b/contrib/pzstd/utils/WorkQueue.h
@@ -28,6 +28,7 @@
   std::mutex mutex_;
   std::condition_variable readerCv_;
   std::condition_variable writerCv_;
+  std::condition_variable finishCv_;
 
   std::queue<T> queue_;
   bool done_;
@@ -53,12 +54,13 @@
   /**
    * Push an item onto the work queue.  Notify a single thread that work is
    * available.  If `finish()` has been called, do nothing and return false.
+   * If `push()` returns false, then `item` has not been moved from.
    *
    * @param item  Item to push onto the queue.
    * @returns     True upon success, false if `finish()` has been called.  An
    *               item was pushed iff `push()` returns true.
    */
-  bool push(T item) {
+  bool push(T&& item) {
     {
       std::unique_lock<std::mutex> lock(mutex_);
       while (full() && !done_) {
@@ -124,19 +126,14 @@
     }
     readerCv_.notify_all();
     writerCv_.notify_all();
+    finishCv_.notify_all();
   }
 
   /// Blocks until `finish()` has been called (but the queue may not be empty).
   void waitUntilFinished() {
     std::unique_lock<std::mutex> lock(mutex_);
     while (!done_) {
-      readerCv_.wait(lock);
-      // If we were woken by a push, we need to wake a thread waiting on pop().
-      if (!done_) {
-        lock.unlock();
-        readerCv_.notify_one();
-        lock.lock();
-      }
+      finishCv_.wait(lock);
     }
   }
 };
diff --git a/contrib/pzstd/utils/test/Makefile b/contrib/pzstd/utils/test/Makefile
deleted file mode 100644
index b9ea73e..0000000
--- a/contrib/pzstd/utils/test/Makefile
+++ /dev/null
@@ -1,42 +0,0 @@
-# ##########################################################################
-# Copyright (c) 2016-present, Facebook, Inc.
-# All rights reserved.
-#
-# This source code is licensed under the BSD-style license found in the
-# LICENSE file in the root directory of this source tree. An additional grant
-# of patent rights can be found in the PATENTS file in the same directory.
-# ##########################################################################
-
-# Define *.exe as extension for Windows systems
-ifneq (,$(filter Windows%,$(OS)))
-EXT =.exe
-else
-EXT =
-endif
-
-PZSTDDIR = ../..
-
-# Set GTEST_INC and GTEST_LIB to work with your install of gtest
-GTEST_INC ?= -isystem $(PZSTDDIR)/googletest/googletest/include
-GTEST_LIB ?= -L $(PZSTDDIR)/googletest/build/googlemock/gtest
-
-CPPFLAGS = -I$(PZSTDDIR) $(GTEST_INC) $(GTEST_LIB)
-CXXFLAGS  ?= -O3
-CXXFLAGS  += -std=c++11
-CXXFLAGS  += $(MOREFLAGS)
-FLAGS    = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS)
-
-%: %.cpp
-	$(CXX) $(FLAGS) $^ -o $@$(EXT) -lgtest -lgtest_main -lpthread
-
-.PHONY: test clean
-
-test: BufferTest RangeTest ScopeGuardTest ThreadPoolTest WorkQueueTest
-	@./BufferTest$(EXT)
-	@./RangeTest$(EXT)
-	@./ScopeGuardTest$(EXT)
-	@./ThreadPoolTest$(EXT)
-	@./WorkQueueTest$(EXT)
-
-clean:
-	@rm -f BufferTest RangeTest ScopeGuardTest ThreadPoolTest WorkQueueTest
diff --git a/contrib/pzstd/utils/test/ResourcePoolTest.cpp b/contrib/pzstd/utils/test/ResourcePoolTest.cpp
new file mode 100644
index 0000000..a6a86b3
--- /dev/null
+++ b/contrib/pzstd/utils/test/ResourcePoolTest.cpp
@@ -0,0 +1,72 @@
+/**
+ * Copyright (c) 2016-present, Facebook, Inc.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ */
+#include "utils/ResourcePool.h"
+
+#include <gtest/gtest.h>
+#include <atomic>
+#include <thread>
+
+using namespace pzstd;
+
+TEST(ResourcePool, FullTest) {  // Single-threaded create / reuse / free cycle
+  unsigned numCreated = 0;
+  unsigned numDeleted = 0;
+  {
+    ResourcePool<int> pool(
+      [&numCreated] { ++numCreated; return new int{5}; },
+      [&numDeleted](int *x) { ++numDeleted; delete x; });
+
+    {
+      auto i = pool.get();  // Pool is empty, so this comes from the factory
+      EXPECT_EQ(5, *i);
+      *i = 6;  // Mark it so reuse can be detected below
+    }
+    {
+      auto i = pool.get();  // Reuses the resource returned above
+      EXPECT_EQ(6, *i);
+      auto j = pool.get();  // Pool is empty again: second factory call
+      EXPECT_EQ(5, *j);
+      *j = 7;
+    }  // `j` is returned first, then `i`, so `i`'s resource is on top
+    {
+      auto i = pool.get();
+      EXPECT_EQ(6, *i);
+      auto j = pool.get();
+      EXPECT_EQ(7, *j);
+    }
+  }  // Pool destroyed here: every pooled resource goes through the free function
+  EXPECT_EQ(2, numCreated);
+  EXPECT_EQ(numCreated, numDeleted);
+}
+
+TEST(ResourcePool, ThreadSafe) {  // Two threads share one pool concurrently
+  std::atomic<unsigned> numCreated{0};
+  std::atomic<unsigned> numDeleted{0};
+  {
+    ResourcePool<int> pool(
+      [&numCreated] { ++numCreated; return new int{0}; },
+      [&numDeleted](int *x) { ++numDeleted; delete x; });
+    auto push = [&pool] {
+      for (int i = 0; i < 100; ++i) {
+        auto x = pool.get();  // Held by this thread only until end of iteration
+        ++*x;  // Each resource accumulates the number of times it was used
+      }
+    };
+    std::thread t1{push};
+    std::thread t2{push};
+    t1.join();
+    t2.join();
+
+    auto x = pool.get();
+    auto y = pool.get();  // Freshly created (value 0) if only one was ever made
+    EXPECT_EQ(200, *x + *y);  // 2 threads * 100 increments in total
+  }
+  EXPECT_GE(2, numCreated);  // At most two resources exist at any one time
+  EXPECT_EQ(numCreated, numDeleted);
+}
diff --git a/contrib/pzstd/utils/test/WorkQueueTest.cpp b/contrib/pzstd/utils/test/WorkQueueTest.cpp
index ebf375a..7f58ccb 100644
--- a/contrib/pzstd/utils/test/WorkQueueTest.cpp
+++ b/contrib/pzstd/utils/test/WorkQueueTest.cpp
@@ -10,6 +10,7 @@
 #include "utils/WorkQueue.h"
 
 #include <gtest/gtest.h>
+#include <memory>
 #include <mutex>
 #include <thread>
 #include <vector>
@@ -64,7 +65,7 @@
   const int max = 100;
 
   for (int i = 0; i < 10; ++i) {
-    queue.push(i);
+    queue.push(int{i});
   }
 
   std::thread thread([ &queue, max ] {
@@ -80,7 +81,7 @@
 
   std::this_thread::yield();
   for (int i = 10; i < max; ++i) {
-    queue.push(i);
+    queue.push(int{i});
   }
   queue.finish();
 
@@ -97,7 +98,7 @@
   }
 
   for (int i = 0; i < 50; ++i) {
-    queue.push(i);
+    queue.push(int{i});
   }
   queue.finish();
 
@@ -126,7 +127,7 @@
     pusherThreads.emplace_back(
         [ &queue, min, max ] {
           for (int i = min; i < max; ++i) {
-            queue.push(i);
+            queue.push(int{i});
           }
         });
   }
@@ -212,7 +213,7 @@
     pusherThreads.emplace_back(
         [ &queue, min, max ] {
           for (int i = min; i < max; ++i) {
-            queue.push(i);
+            queue.push(int{i});
           }
         });
   }
@@ -231,6 +232,18 @@
   }
 }
 
+TEST(WorkQueue, FailedPush) {  // A rejected push must leave its argument intact
+  WorkQueue<std::unique_ptr<int>> queue;
+  std::unique_ptr<int> x(new int{5});
+  EXPECT_TRUE(queue.push(std::move(x)));
+  EXPECT_EQ(nullptr, x);  // Successful push moved the pointer into the queue
+  queue.finish();  // Pushes after this point are expected to fail
+  x.reset(new int{6});
+  EXPECT_FALSE(queue.push(std::move(x)));
+  EXPECT_NE(nullptr, x);  // Failed push did not move from `x`
+  EXPECT_EQ(6, *x);
+}
+
 TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
   {
     BufferWorkQueue queue;