Merge github.com:grpc/grpc into lfe3
diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h
index 0f7e115..b763603 100644
--- a/include/grpc++/impl/codegen/proto_utils.h
+++ b/include/grpc++/impl/codegen/proto_utils.h
@@ -41,8 +41,11 @@
 
 class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
  public:
-  explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
-      : block_size_(block_size), byte_count_(0), have_backup_(false) {
+  GrpcBufferWriter(grpc_byte_buffer** bp, int block_size, int total_size)
+      : block_size_(block_size),
+        total_size_(total_size),
+        byte_count_(0),
+        have_backup_(false) {
     *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0);
     slice_buffer_ = &(*bp)->data.raw.slice_buffer;
   }
@@ -54,11 +57,20 @@
   }
 
   bool Next(void** data, int* size) override {
+    // Protobuf should not ask for more memory than total_size_.
+    GPR_CODEGEN_ASSERT(byte_count_ < total_size_);
     if (have_backup_) {
       slice_ = backup_slice_;
       have_backup_ = false;
     } else {
-      slice_ = g_core_codegen_interface->grpc_slice_malloc(block_size_);
+      // When less than a whole block is needed, only allocate that much.
+      // But make sure the allocated slice is not inlined.
+      size_t remain = total_size_ - byte_count_ > block_size_
+                          ? block_size_
+                          : total_size_ - byte_count_;
+      slice_ = g_core_codegen_interface->grpc_slice_malloc(
+          remain > GRPC_SLICE_INLINED_SIZE ? remain
+                                           : GRPC_SLICE_INLINED_SIZE + 1);
     }
     *data = GRPC_SLICE_START_PTR(slice_);
     // On win x64, int is only 32bit
@@ -70,7 +82,7 @@
 
   void BackUp(int count) override {
     g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_);
-    if (count == block_size_) {
+    if ((size_t)count == GRPC_SLICE_LENGTH(slice_)) {
       backup_slice_ = slice_;
     } else {
       backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail(
@@ -90,6 +102,7 @@
  protected:
   friend class GrpcBufferWriterPeer;
   const int block_size_;
+  const int total_size_;
   int64_t byte_count_;
   grpc_slice_buffer* slice_buffer_;
   bool have_backup_;
@@ -175,20 +188,20 @@
       "BufferWriter must be a subclass of io::ZeroCopyOutputStream");
   *own_buffer = true;
   int byte_size = msg.ByteSize();
-  if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
+  if ((size_t)byte_size <= GRPC_SLICE_INLINED_SIZE) {
     grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
     GPR_CODEGEN_ASSERT(
         GRPC_SLICE_END_PTR(slice) ==
         msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
     *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
     g_core_codegen_interface->grpc_slice_unref(slice);
+
     return g_core_codegen_interface->ok();
-  } else {
-    BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
-    return msg.SerializeToZeroCopyStream(&writer)
-               ? g_core_codegen_interface->ok()
-               : Status(StatusCode::INTERNAL, "Failed to serialize message");
   }
+  BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size);
+  return msg.SerializeToZeroCopyStream(&writer)
+             ? g_core_codegen_interface->ok()
+             : Status(StatusCode::INTERNAL, "Failed to serialize message");
 }
 
 // BufferReader must be a subclass of io::ZeroCopyInputStream.
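
Note on the small-remainder path above: grpc_slice_malloc() hands back an inlined slice (refcount == NULL, bytes stored inside the grpc_slice struct itself) whenever the requested length is at most GRPC_SLICE_INLINED_SIZE, which is why the writer rounds small remainders up to GRPC_SLICE_INLINED_SIZE + 1 and so always gets a heap-backed, refcounted slice. A minimal standalone sketch of the hazard, assuming only the public <grpc/slice.h> API; the names below are illustrative and not part of this patch:

    #include <stdint.h>
    #include <grpc/slice.h>

    void inlined_slice_hazard_sketch() {
      // A small request yields an inlined slice: its bytes live inside the
      // grpc_slice struct itself and refcount is NULL.
      grpc_slice small = grpc_slice_malloc(4);
      uint8_t* p = GRPC_SLICE_START_PTR(small);  // points into `small` itself
      grpc_slice copy = small;                   // copying the slice copies the bytes
      // `p` still refers to `small`, not to `copy`; once `small` is overwritten
      // or goes out of scope, `p` dangles. Handing such a pointer out of Next()
      // is exactly what the GRPC_SLICE_INLINED_SIZE + 1 allocation above avoids.
      (void)copy;
      grpc_slice_unref(small);
    }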
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index bf842ba..0888bef 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -202,10 +202,7 @@
 
   struct SyncServerSettings {
     SyncServerSettings()
-        : num_cqs(GPR_MAX(1, gpr_cpu_num_cores())),
-          min_pollers(1),
-          max_pollers(2),
-          cq_timeout_msec(10000) {}
+        : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {}
 
     /// Number of server completion queues to create to listen to incoming RPCs.
     int num_cqs;
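
With the default above dropping to a single sync completion queue, a server that wants the previous per-core behavior has to ask for it explicitly. A hedged sketch, assuming the SetSyncServerOption()/SyncServerOption::NUM_CQS knobs declared elsewhere in this header and the gpr_cpu_num_cores() helper the old default used:

    #include <grpc++/server_builder.h>
    #include <grpc/support/cpu.h>

    void UsePerCoreSyncCqs(grpc::ServerBuilder* builder) {
      // Opt back into one sync completion queue per core instead of the new
      // default of a single queue.
      builder->SetSyncServerOption(
          grpc::ServerBuilder::SyncServerOption::NUM_CQS,
          static_cast<int>(gpr_cpu_num_cores()));
    }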
diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc
index 7996777..a4bfbcb 100644
--- a/src/core/lib/iomgr/timer_generic.cc
+++ b/src/core/lib/iomgr/timer_generic.cc
@@ -25,6 +25,7 @@
 #include "src/core/lib/iomgr/timer.h"
 
 #include <grpc/support/alloc.h>
+#include <grpc/support/cpu.h>
 #include <grpc/support/log.h>
 #include <grpc/support/string_util.h>
 #include <grpc/support/sync.h>
@@ -37,8 +38,6 @@
 
 #define INVALID_HEAP_INDEX 0xffffffffu
 
-#define LOG2_NUM_SHARDS 5
-#define NUM_SHARDS (1 << LOG2_NUM_SHARDS)
 #define ADD_DEADLINE_SCALE 0.33
 #define MIN_QUEUE_WINDOW_DURATION 0.01
 #define MAX_QUEUE_WINDOW_DURATION 1
@@ -74,14 +73,16 @@
   grpc_timer list;
 } timer_shard;
 
+static size_t g_num_shards;
+
 /* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address
  * is hashed to select the timer shard to add the timer to */
-static timer_shard g_shards[NUM_SHARDS];
+static timer_shard* g_shards;
 
 /* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
  * the deadline of the next timer in each shard).
  * Access to this is protected by g_shared_mutables.mu */
-static timer_shard* g_shard_queue[NUM_SHARDS];
+static timer_shard** g_shard_queue;
 
 #ifndef NDEBUG
 
@@ -241,6 +242,11 @@
 void grpc_timer_list_init(grpc_exec_ctx* exec_ctx) {
   uint32_t i;
 
+  g_num_shards = GPR_MAX(1, 2 * gpr_cpu_num_cores());
+  g_shards = (timer_shard*)gpr_zalloc(g_num_shards * sizeof(*g_shards));
+  g_shard_queue =
+      (timer_shard**)gpr_zalloc(g_num_shards * sizeof(*g_shard_queue));
+
   g_shared_mutables.initialized = true;
   g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER;
   gpr_mu_init(&g_shared_mutables.mu);
@@ -250,7 +256,7 @@
   grpc_register_tracer(&grpc_timer_trace);
   grpc_register_tracer(&grpc_timer_check_trace);
 
-  for (i = 0; i < NUM_SHARDS; i++) {
+  for (i = 0; i < g_num_shards; i++) {
     timer_shard* shard = &g_shards[i];
     gpr_mu_init(&shard->mu);
     grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
@@ -267,17 +273,19 @@
 }
 
 void grpc_timer_list_shutdown(grpc_exec_ctx* exec_ctx) {
-  int i;
+  size_t i;
   run_some_expired_timers(
       exec_ctx, GPR_ATM_MAX, nullptr,
       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
-  for (i = 0; i < NUM_SHARDS; i++) {
+  for (i = 0; i < g_num_shards; i++) {
     timer_shard* shard = &g_shards[i];
     gpr_mu_destroy(&shard->mu);
     grpc_timer_heap_destroy(&shard->heap);
   }
   gpr_mu_destroy(&g_shared_mutables.mu);
   gpr_tls_destroy(&g_last_seen_min_timer);
+  gpr_free(g_shards);
+  gpr_free(g_shard_queue);
   g_shared_mutables.initialized = false;
 }
 
@@ -311,7 +319,7 @@
              g_shard_queue[shard->shard_queue_index - 1]->min_deadline) {
     swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
   }
-  while (shard->shard_queue_index < NUM_SHARDS - 1 &&
+  while (shard->shard_queue_index < g_num_shards - 1 &&
          shard->min_deadline >
              g_shard_queue[shard->shard_queue_index + 1]->min_deadline) {
     swap_adjacent_shards_in_queue(shard->shard_queue_index);
@@ -323,7 +331,7 @@
 void grpc_timer_init(grpc_exec_ctx* exec_ctx, grpc_timer* timer,
                      grpc_millis deadline, grpc_closure* closure) {
   int is_first_timer = 0;
-  timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+  timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
   timer->closure = closure;
   timer->deadline = deadline;
 
@@ -417,7 +425,7 @@
     return;
   }
 
-  timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)];
+  timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
   gpr_mu_lock(&shard->mu);
   if (GRPC_TRACER_ON(grpc_timer_trace)) {
     gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer,
diff --git a/test/cpp/codegen/proto_utils_test.cc b/test/cpp/codegen/proto_utils_test.cc
index 2b853b1..cc355bb 100644
--- a/test/cpp/codegen/proto_utils_test.cc
+++ b/test/cpp/codegen/proto_utils_test.cc
@@ -16,15 +16,16 @@
  *
  */
 
+#include <grpc++/impl/codegen/grpc_library.h>
 #include <grpc++/impl/codegen/proto_utils.h>
 #include <grpc++/impl/grpc_library.h>
+#include <grpc/impl/codegen/byte_buffer.h>
+#include <grpc/slice.h>
 #include <gtest/gtest.h>
 
 namespace grpc {
 namespace internal {
 
-static GrpcLibraryInitializer g_gli_initializer;
-
 // Provide access to GrpcBufferWriter internals.
 class GrpcBufferWriterPeer {
  public:
@@ -44,35 +45,120 @@
 // GrpcBufferWriter Next()/Backup() invocations could result in a dangling
 // pointer returned by Next() due to the interaction between grpc_slice inlining
 // and GRPC_SLICE_START_PTR.
-TEST_F(ProtoUtilsTest, BackupNext) {
-  // Ensure the GrpcBufferWriter internals are initialized.
-  g_gli_initializer.summon();
-
+TEST_F(ProtoUtilsTest, TinyBackupThenNext) {
   grpc_byte_buffer* bp;
-  GrpcBufferWriter writer(&bp, 8192);
+  const int block_size = 1024;
+  GrpcBufferWriter writer(&bp, block_size, 8192);
   GrpcBufferWriterPeer peer(&writer);
 
   void* data;
   int size;
   // Allocate a slice.
   ASSERT_TRUE(writer.Next(&data, &size));
-  EXPECT_EQ(8192, size);
-  // Return a single byte. Before the fix that this test acts as a regression
-  // for, this would have resulted in an inlined backup slice.
+  EXPECT_EQ(block_size, size);
+  // Return a single byte.
   writer.BackUp(1);
-  EXPECT_TRUE(!peer.have_backup());
-  // On the next allocation, the slice is non-inlined.
+  EXPECT_FALSE(peer.have_backup());
+  // On the next allocation, the returned slice is non-inlined.
   ASSERT_TRUE(writer.Next(&data, &size));
   EXPECT_TRUE(peer.slice().refcount != nullptr);
+  EXPECT_EQ(block_size, size);
 
   // Cleanup.
   g_core_codegen_interface->grpc_byte_buffer_destroy(bp);
 }
 
+namespace {
+
+// Set backup_size to 0 to indicate no backup is needed.
+void BufferWriterTest(int block_size, int total_size, int backup_size) {
+  grpc_byte_buffer* bp;
+  GrpcBufferWriter writer(&bp, block_size, total_size);
+
+  int written_size = 0;
+  void* data;
+  int size = 0;
+  bool backed_up_entire_slice = false;
+
+  while (written_size < total_size) {
+    EXPECT_TRUE(writer.Next(&data, &size));
+    EXPECT_GT(size, 0);
+    EXPECT_TRUE(data);
+    int write_size = size;
+    bool should_backup = false;
+    if (backup_size > 0 && size > backup_size) {
+      write_size = size - backup_size;
+      should_backup = true;
+    } else if (size == backup_size && !backed_up_entire_slice) {
+      // Only back up the entire slice once.
+      backed_up_entire_slice = true;
+      should_backup = true;
+      write_size = 0;
+    }
+    // A final backup may be needed so we do not write past total_size.
+    if (write_size + written_size > total_size) {
+      write_size = total_size - written_size;
+      should_backup = true;
+      backup_size = size - write_size;
+      ASSERT_GT(backup_size, 0);
+    }
+    for (int i = 0; i < write_size; i++) {
+      ((uint8_t*)data)[i] = written_size % 128;
+      written_size++;
+    }
+    if (should_backup) {
+      writer.BackUp(backup_size);
+    }
+  }
+  EXPECT_EQ(grpc_byte_buffer_length(bp), (size_t)total_size);
+
+  grpc_byte_buffer_reader reader;
+  grpc_byte_buffer_reader_init(&reader, bp);
+  int read_bytes = 0;
+  while (read_bytes < total_size) {
+    grpc_slice s;
+    EXPECT_TRUE(grpc_byte_buffer_reader_next(&reader, &s));
+    for (size_t i = 0; i < GRPC_SLICE_LENGTH(s); i++) {
+      EXPECT_EQ(GRPC_SLICE_START_PTR(s)[i], read_bytes % 128);
+      read_bytes++;
+    }
+    grpc_slice_unref(s);
+  }
+  EXPECT_EQ(read_bytes, total_size);
+  grpc_byte_buffer_reader_destroy(&reader);
+  grpc_byte_buffer_destroy(bp);
+}
+
+TEST(WriterTest, TinyBlockTinyBackup) {
+  for (int i = 2; i < (int)GRPC_SLICE_INLINED_SIZE; i++) {
+    BufferWriterTest(i, 256, 1);
+  }
+}
+
+TEST(WriterTest, SmallBlockTinyBackup) { BufferWriterTest(64, 256, 1); }
+
+TEST(WriterTest, SmallBlockNoBackup) { BufferWriterTest(64, 256, 0); }
+
+TEST(WriterTest, SmallBlockFullBackup) { BufferWriterTest(64, 256, 64); }
+
+TEST(WriterTest, LargeBlockTinyBackup) { BufferWriterTest(4096, 8192, 1); }
+
+TEST(WriterTest, LargeBlockNoBackup) { BufferWriterTest(4096, 8192, 0); }
+
+TEST(WriterTest, LargeBlockFullBackup) { BufferWriterTest(4096, 8192, 4096); }
+
+TEST(WriterTest, LargeBlockLargeBackup) { BufferWriterTest(4096, 8192, 4095); }
+
+}  // namespace
 }  // namespace internal
 }  // namespace grpc
 
 int main(int argc, char** argv) {
+  // Ensure the GrpcBufferWriter internals are initialized.
+  grpc::internal::GrpcLibraryInitializer init;
+  init.summon();
+  grpc::GrpcLibraryCodegen lib;
+
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();
 }