Merge github.com:grpc/grpc into tis-but-thy-name
diff --git a/include/grpc/byte_buffer.h b/include/grpc/byte_buffer.h
index a62054a..913e2a7 100644
--- a/include/grpc/byte_buffer.h
+++ b/include/grpc/byte_buffer.h
@@ -102,6 +102,10 @@
 int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
                                  gpr_slice *slice);
 
+/** Returns a RAW byte buffer instance from the output of \a reader. */
+grpc_byte_buffer *grpc_raw_byte_buffer_from_reader(
+    grpc_byte_buffer_reader *reader);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index e8c24c7..6ad377c 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -369,16 +369,17 @@
     watcher->fd = NULL;
     watcher->pollset = NULL;
     gpr_mu_unlock(&fd->watcher_mu);
+    GRPC_FD_UNREF(fd, "poll");
     return 0;
   }
   /* if there is nobody polling for read, but we need to, then start doing so */
-  if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
+  if (read_mask && !fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
     fd->read_watcher = watcher;
     mask |= read_mask;
   }
   /* if there is nobody polling for write, but we need to, then start doing so
    */
-  if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
+  if (write_mask && !fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
     fd->write_watcher = watcher;
     mask |= write_mask;
   }
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index dcf08d3..1900bbf 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -54,17 +54,25 @@
   pollset_hdr *h = pollset->data.ptr;
   struct epoll_event ev;
   int err;
+  grpc_fd_watcher watcher;
 
-  ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
-  ev.data.ptr = fd;
-  err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
-  if (err < 0) {
-    /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
-    if (errno != EEXIST) {
-      gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
-              strerror(errno));
+  /* We pretend to be polling whilst adding an fd to keep the fd from being
+     closed during the add. This may result in a spurious wakeup being assigned
+     to this pollset whilst adding, but that should be benign. */
+  GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0);
+  if (watcher.fd != NULL) {
+    ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+    ev.data.ptr = fd;
+    err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
+    if (err < 0) {
+      /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
+      if (errno != EEXIST) {
+        gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
+                strerror(errno));
+      }
     }
   }
+  grpc_fd_end_poll(&watcher, 0, 0);
 }
 
 static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c
index 4817e00..a930949 100644
--- a/src/core/surface/byte_buffer.c
+++ b/src/core/surface/byte_buffer.c
@@ -55,6 +55,20 @@
   return bb;
 }
 
+grpc_byte_buffer *grpc_raw_byte_buffer_from_reader(
+    grpc_byte_buffer_reader *reader) {
+  grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer));
+  gpr_slice slice;
+  bb->type = GRPC_BB_RAW;
+  bb->data.raw.compression = GRPC_COMPRESS_NONE;
+  gpr_slice_buffer_init(&bb->data.raw.slice_buffer);
+
+  while (grpc_byte_buffer_reader_next(reader, &slice)) {
+    gpr_slice_buffer_add(&bb->data.raw.slice_buffer, slice);
+  }
+  return bb;
+}
+
 grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
   switch (bb->type) {
     case GRPC_BB_RAW:
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index 8f682e9..4664a08 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -109,9 +109,6 @@
         transport_parsing->incoming_stream_id;
   }
 
-  /* TODO(ctiller): re-implement */
-  GPR_ASSERT(transport_parsing->initial_window_update == 0);
-
   /* copy parsing qbuf to global qbuf */
   gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf);
 
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 0a7b8f5..0aa28da 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -933,6 +933,7 @@
         if (t->parsing.initial_window_update != 0) {
           grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
                                           update_global_window, t);
+          t->parsing.initial_window_update = 0;
         }
         /* handle higher level things */
         grpc_chttp2_publish_reads(&t->global, &t->parsing);
diff --git a/src/ruby/ext/grpc/extconf.rb b/src/ruby/ext/grpc/extconf.rb
index 0ff8bb9..6dd0234 100644
--- a/src/ruby/ext/grpc/extconf.rb
+++ b/src/ruby/ext/grpc/extconf.rb
@@ -89,7 +89,7 @@
 $CFLAGS << ' -Wall '
 $CFLAGS << ' -pedantic '
 
-$LDFLAGS << ' -lgrpc -lgpr -ldl'
+$LDFLAGS << ' -lgrpc -lgpr -lz -ldl'
 
 crash('need grpc lib') unless have_library('grpc', 'grpc_channel_destroy')
 have_library('grpc', 'grpc_channel_destroy')
diff --git a/test/core/surface/byte_buffer_reader_test.c b/test/core/surface/byte_buffer_reader_test.c
index 7c2cb94..d9c60e4 100644
--- a/test/core/surface/byte_buffer_reader_test.c
+++ b/test/core/surface/byte_buffer_reader_test.c
@@ -160,6 +160,30 @@
   read_compressed_slice(GRPC_COMPRESS_DEFLATE, INPUT_SIZE);
 }
 
+static void test_byte_buffer_from_reader(void) {
+  gpr_slice slice;
+  grpc_byte_buffer *buffer, *buffer_from_reader;
+  grpc_byte_buffer_reader reader;
+
+  LOG_TEST("test_byte_buffer_from_reader");
+  slice = gpr_slice_malloc(4);
+  memcpy(GPR_SLICE_START_PTR(slice), "test", 4);
+  buffer = grpc_raw_byte_buffer_create(&slice, 1);
+  gpr_slice_unref(slice);
+  grpc_byte_buffer_reader_init(&reader, buffer);
+
+  buffer_from_reader = grpc_raw_byte_buffer_from_reader(&reader);
+  GPR_ASSERT(buffer->type == buffer_from_reader->type);
+  GPR_ASSERT(buffer_from_reader->data.raw.compression == GRPC_COMPRESS_NONE);
+  GPR_ASSERT(buffer_from_reader->data.raw.slice_buffer.count == 1);
+  GPR_ASSERT(memcmp(GPR_SLICE_START_PTR(
+                        buffer_from_reader->data.raw.slice_buffer.slices[0]),
+                    "test", 4) == 0);
+
+  grpc_byte_buffer_destroy(buffer);
+  grpc_byte_buffer_destroy(buffer_from_reader);
+}
+
 int main(int argc, char **argv) {
   grpc_test_init(argc, argv);
   test_read_one_slice();
@@ -167,6 +191,7 @@
   test_read_none_compressed_slice();
   test_read_gzip_compressed_slice();
   test_read_deflate_compressed_slice();
+  test_byte_buffer_from_reader();
 
   return 0;
 }
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 210aef4..f5251e9 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -64,7 +64,7 @@
 
 class AsyncQpsServerTest : public Server {
  public:
-  AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) {
+  AsyncQpsServerTest(const ServerConfig &config, int port) {
     char *server_address = NULL;
     gpr_join_host_port(&server_address, "::", port);
 
@@ -97,6 +97,9 @@
       }
     }
     for (int i = 0; i < config.threads(); i++) {
+      shutdown_state_.emplace_back(new PerThreadShutdownState());
+    }
+    for (int i = 0; i < config.threads(); i++) {
       threads_.push_back(std::thread([=]() {
         // Wait until work is available or we are shutting down
         bool ok;
@@ -105,11 +108,9 @@
           ServerRpcContext *ctx = detag(got_tag);
           // The tag is a pointer to an RPC context to invoke
           bool still_going = ctx->RunNextState(ok);
-          std::unique_lock<std::mutex> g(shutdown_mutex_);
-          if (!shutdown_) {
+          if (!shutdown_state_[i]->shutdown()) {
             // this RPC context is done, so refresh it
             if (!still_going) {
-              g.unlock();
               ctx->Reset();
             }
           } else {
@@ -122,9 +123,8 @@
   }
   ~AsyncQpsServerTest() {
     server_->Shutdown();
-    {
-      std::lock_guard<std::mutex> g(shutdown_mutex_);
-      shutdown_ = true;
+    for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
+      (*ss)->set_shutdown();
     }
     for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
       thr->join();
@@ -316,8 +316,25 @@
   TestService::AsyncService async_service_;
   std::forward_list<ServerRpcContext *> contexts_;
 
-  std::mutex shutdown_mutex_;
-  bool shutdown_;
+  class PerThreadShutdownState {
+   public:
+    PerThreadShutdownState() : shutdown_(false) {}
+
+    bool shutdown() const {
+      std::lock_guard<std::mutex> lock(mutex_);
+      return shutdown_;
+    }
+
+    void set_shutdown() {
+      std::lock_guard<std::mutex> lock(mutex_);
+      shutdown_ = true;
+    }
+
+   private:
+    mutable std::mutex mutex_;
+    bool shutdown_;
+  };
+  std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
 };
 
 std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,