Merge pull request #4928 from jtattermusch/fix_32bit_linux_tests

Fix 32bit linux tests
diff --git a/Makefile b/Makefile
index f753921..2270fc3 100644
--- a/Makefile
+++ b/Makefile
@@ -208,7 +208,7 @@
 CXXFLAGS_msan = -O0 -fsanitize=memory -fsanitize-memory-track-origins -fno-omit-frame-pointer -DGTEST_HAS_TR1_TUPLE=0 -DGTEST_USE_OWN_TR1_TUPLE=1 -Wno-unused-command-line-argument -fPIE -pie
 LDFLAGS_msan = -fsanitize=memory -DGTEST_HAS_TR1_TUPLE=0 -DGTEST_USE_OWN_TR1_TUPLE=1 -fPIE -pie $(if $(JENKINS_BUILD),-Wl$(comma)-Ttext-segment=0x7e0000000000,)
 DEFINES_msan = NDEBUG
-DEFINES_msan += GRPC_TEST_SLOWDOWN_BUILD_FACTOR=1.5
+DEFINES_msan += GRPC_TEST_SLOWDOWN_BUILD_FACTOR=2
 
 VALID_CONFIG_mutrace = 1
 CC_mutrace = $(DEFAULT_CC)
diff --git a/build.yaml b/build.yaml
index 6cb861d..3b02c38 100644
--- a/build.yaml
+++ b/build.yaml
@@ -2561,7 +2561,7 @@
       -fPIE -pie $(if $(JENKINS_BUILD),-Wl$(comma)-Ttext-segment=0x7e0000000000,)
     LDXX: clang++
     compile_the_world: true
-    timeout_multiplier: 1.5
+    timeout_multiplier: 2
   mutrace:
     CPPFLAGS: -O0
     DEFINES: _DEBUG DEBUG
diff --git a/setup.py b/setup.py
index 52d3d22..01e9180 100644
--- a/setup.py
+++ b/setup.py
@@ -119,6 +119,7 @@
 }
 
 INSTALL_REQUIRES = (
+    'six>=1.10',
     'enum34>=1.0.4',
     'futures>=2.2.0',
     # TODO(atash): eventually split the grpcio package into a metapackage
@@ -131,6 +132,7 @@
 ) + INSTALL_REQUIRES
 
 COMMAND_CLASS = {
+    'install': commands.Install,
     'doc': commands.SphinxDocumentation,
     'build_proto_modules': commands.BuildProtoModules,
     'build_project_metadata': commands.BuildProjectMetadata,
@@ -138,6 +140,7 @@
     'build_ext': commands.BuildExt,
     'gather': commands.Gather,
     'run_interop': commands.RunInterop,
+    'bdist_egg_grpc_custom': commands.BdistEggCustomName,
 }
 
 # Ensure that package data is copied over before any commands have been run:
@@ -187,7 +190,7 @@
 
 setuptools.setup(
     name='grpcio',
-    version='0.12.0b5',
+    version='0.12.0b6',
     license=LICENSE,
     ext_modules=CYTHON_EXTENSION_MODULES,
     packages=list(PACKAGES),
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 748eef9..fccc1dd 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -284,9 +284,13 @@
   c->connector = connector;
   grpc_connector_ref(c->connector);
   c->num_filters = args->filter_count;
-  c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
-  memcpy((void *)c->filters, args->filters,
-         sizeof(grpc_channel_filter *) * c->num_filters);
+  if (c->num_filters > 0) {
+    c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
+    memcpy((void *)c->filters, args->filters,
+           sizeof(grpc_channel_filter *) * c->num_filters);
+  } else {
+    c->filters = NULL;
+  }
   c->addr = gpr_malloc(args->addr_len);
   memcpy(c->addr, args->addr, args->addr_len);
   grpc_pollset_set_init(&c->pollset_set);
@@ -483,7 +487,9 @@
   /* build final filter list */
   num_filters = c->num_filters + c->connecting_result.num_filters + 1;
   filters = gpr_malloc(sizeof(*filters) * num_filters);
-  memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters);
+  if (c->num_filters > 0) {
+    memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters);
+  }
   memcpy((void *)(filters + c->num_filters), c->connecting_result.filters,
          sizeof(*filters) * c->connecting_result.num_filters);
   filters[num_filters - 1] = &grpc_connected_channel_filter;
diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py
index 29b506a..a6b8ad3 100644
--- a/src/python/grpcio/commands.py
+++ b/src/python/grpcio/commands.py
@@ -30,15 +30,22 @@
 """Provides distutils command classes for the GRPC Python setup process."""
 
 import distutils
+import glob
 import os
 import os.path
+import platform
 import re
+import shutil
 import subprocess
 import sys
+import traceback
 
 import setuptools
+from setuptools.command import bdist_egg
 from setuptools.command import build_ext
 from setuptools.command import build_py
+from setuptools.command import easy_install
+from setuptools.command import install
 from setuptools.command import test
 
 import support
@@ -58,6 +65,129 @@
   """Simple exception class for GRPC custom commands."""
 
 
+# TODO(atash): Remove this once PyPI has better Linux bdist support. See
+# https://bitbucket.org/pypa/pypi/issues/120/binary-wheels-for-linux-are-not-supported
+def _get_linux_bdist_egg(decorated_basename, target_egg_basename):
+  """Downloads a pre-compiled Linux egg and returns the local path to it.
+
+  Fetches `decorated_basename`.egg from the gRPC precompiled-binaries bucket
+  and writes it to `target_egg_basename`.egg. Raises CommandError on failure.
+  """
+  # Break import style to ensure that setup.py has had a chance to install the
+  # relevant package eggs.
+  from six.moves.urllib import request
+  decorated_path = decorated_basename + '.egg'
+  try:
+    url = (
+        'https://storage.googleapis.com/grpc-precompiled-binaries/'
+        'python/{target}'
+            .format(target=decorated_path))
+    egg_data = request.urlopen(url).read()
+  except IOError as error:
+    raise CommandError(
+        '{}\n\nCould not find the bdist egg {}: {}'
+            .format(traceback.format_exc(), decorated_path, error))
+  # Our chosen local egg path.
+  egg_path = target_egg_basename + '.egg'
+  try:
+    with open(egg_path, 'wb') as egg_file:  # egg payload is binary data
+      egg_file.write(egg_data)
+  except IOError as error:
+    raise CommandError(
+        '{}\n\nCould not write grpcio egg: {}'
+            .format(traceback.format_exc(), error))
+  return egg_path
+
+
+class EggNameMixin(object):
+  """Mixin giving a command access to the (optionally decorated) egg name."""
+
+  def egg_name(self, with_custom):
+    """Returns the bdist_egg output file's basename without its extension.
+
+    Args:
+      with_custom: If true, append '-ucs2' or '-ucs4' per sys.maxunicode.
+    """
+    output = self.get_finalized_command('bdist_egg').egg_output
+    stem, _ = os.path.splitext(os.path.basename(output))
+    if not with_custom:
+      return stem
+    flavor = 'ucs2' if sys.maxunicode == 65535 else 'ucs4'
+    return '{base}-{flavor}'.format(base=stem, flavor=flavor)
+
+
+class Install(install.install, EggNameMixin):
+  """Custom Install command for gRPC Python.
+
+  This is for bdist shims and whatever else we might need a custom install
+  command for.
+  """
+
+  user_options = install.install.user_options + [
+      # TODO(atash): remove this once manylinux gets on PyPI. See
+      # https://bitbucket.org/pypa/pypi/issues/120/binary-wheels-for-linux-are-not-supported
+      ('use-linux-bdist', None,
+       'Whether to retrieve a binary for Linux instead of building from '
+       'source.'),
+  ]
+
+  def initialize_options(self):
+    install.install.initialize_options(self)
+    self.use_linux_bdist = False
+
+  def finalize_options(self):
+    install.install.finalize_options(self)
+
+  def run(self):
+    """Installs from a prebuilt Linux egg when requested, else from source."""
+    if self.use_linux_bdist:
+      try:
+        egg_path = _get_linux_bdist_egg(self.egg_name(True),
+                                        self.egg_name(False))
+      except CommandError as error:
+        sys.stderr.write(
+            '\nWARNING: Failed to acquire grpcio prebuilt binary:\n'
+            '{}.\n\n'.format(error))
+        raise
+      try:
+        self._run_bdist_retrieval_install(egg_path)
+      except Exception:
+        # if anything else happens (and given how there's no way to really know
+        # what's happening in setuptools here, I mean *anything*), warn the user
+        # and fall back to building from source.
+        sys.stderr.write(
+            '{}\nWARNING: Failed to install grpcio prebuilt binary.\n\n'
+                .format(traceback.format_exc()))
+        install.install.run(self)
+    else:
+      install.install.run(self)
+
+  # TODO(atash): Remove this once PyPI has better Linux bdist support. See
+  # https://bitbucket.org/pypa/pypi/issues/120/binary-wheels-for-linux-are-not-supported
+  def _run_bdist_retrieval_install(self, bdist_egg):
+    """Installs the egg at path `bdist_egg` with the easy_install command."""
+    easy_install_class = self.distribution.get_command_class('easy_install')
+    command = easy_install_class(
+        self.distribution, args='x', root=self.root, record=self.record)
+    command.ensure_finalized()
+    command.always_copy_from = '.'
+    command.package_index.scan(glob.glob('*.egg'))
+    command.args = [bdist_egg]
+    if setuptools.bootstrap_install_from:
+      command.args.insert(0, setuptools.bootstrap_install_from)
+    command.run()
+    setuptools.bootstrap_install_from = None
+
+
+class BdistEggCustomName(bdist_egg.bdist_egg, EggNameMixin):
+  """bdist_egg command whose output egg is renamed to the decorated name."""
+
+  def run(self):
+    bdist_egg.bdist_egg.run(self)
+    decorated = '{}.egg'.format(self.egg_name(True))
+    shutil.move(self.get_outputs()[0], os.path.join(self.dist_dir, decorated))
+
+
 class SphinxDocumentation(setuptools.Command):
   """Command to generate documentation via sphinx."""
 
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index f270cd0..e423ee2 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -46,13 +46,14 @@
 #include <grpc++/client_context.h>
 #include <grpc++/generic/generic_stub.h>
 #include <grpc/grpc.h>
+#include <grpc/support/cpu.h>
 #include <grpc/support/histogram.h>
 #include <grpc/support/log.h>
 
+#include "src/proto/grpc/testing/services.grpc.pb.h"
 #include "test/cpp/qps/client.h"
 #include "test/cpp/qps/timer.h"
 #include "test/cpp/util/create_test_channel.h"
-#include "src/proto/grpc/testing/services.grpc.pb.h"
 
 namespace grpc {
 namespace testing {
@@ -164,14 +165,15 @@
               std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
                   create_stub)
       : ClientImpl<StubType, RequestType>(config, create_stub),
+        num_async_threads_(NumThreads(config)),
         channel_lock_(new std::mutex[config.client_channels()]),
         contexts_(config.client_channels()),
         max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
         channel_count_(config.client_channels()),
-        pref_channel_inc_(config.async_client_threads()) {
-    SetupLoadTest(config, config.async_client_threads());
+        pref_channel_inc_(num_async_threads_) {
+    SetupLoadTest(config, num_async_threads_);
 
-    for (int i = 0; i < config.async_client_threads(); i++) {
+    for (int i = 0; i < num_async_threads_; i++) {
       cli_cqs_.emplace_back(new CompletionQueue);
       if (!closed_loop_) {
         rpc_deadlines_.emplace_back();
@@ -324,6 +326,9 @@
     return true;
   }
 
+ protected:
+  int num_async_threads_;
+
  private:
   class boolean {  // exists only to avoid data-race on vector<bool>
    public:
@@ -338,6 +343,15 @@
    private:
     bool val_;
   };
+  static int NumThreads(const ClientConfig& config) {
+    int num_threads = config.async_client_threads();
+    if (num_threads <= 0) {  // unset/non-positive: size to the core count
+      num_threads = gpr_cpu_num_cores();
+      gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
+    }
+    return num_threads;
+  }
+
   std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
 
   std::vector<deadline_list> rpc_deadlines_;  // per thread deadlines
@@ -363,7 +377,7 @@
  public:
   explicit AsyncUnaryClient(const ClientConfig& config)
       : AsyncClient(config, SetupCtx, BenchmarkStubCreator) {
-    StartThreads(config.async_client_threads());
+    StartThreads(num_async_threads_);
   }
   ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
 
@@ -461,7 +475,7 @@
     // async streaming currently only supports closed loop
     GPR_ASSERT(closed_loop_);
 
-    StartThreads(config.async_client_threads());
+    StartThreads(num_async_threads_);
   }
 
   ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
@@ -566,7 +580,7 @@
     // async streaming currently only supports closed loop
     GPR_ASSERT(closed_loop_);
 
-    StartThreads(config.async_client_threads());
+    StartThreads(num_async_threads_);
   }
 
   ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 66269ae..4914e19 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -31,24 +31,24 @@
  *
  */
 
+#include <deque>
 #include <list>
 #include <thread>
-#include <deque>
 #include <vector>
 
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/host_port.h>
 #include <grpc++/client_context.h>
 #include <grpc++/create_channel.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
 
 #include "src/core/support/env.h"
+#include "src/proto/grpc/testing/services.grpc.pb.h"
 #include "test/core/util/port.h"
 #include "test/core/util/test_config.h"
 #include "test/cpp/qps/driver.h"
 #include "test/cpp/qps/histogram.h"
 #include "test/cpp/qps/qps_worker.h"
-#include "src/proto/grpc/testing/services.grpc.pb.h"
 
 using std::list;
 using std::thread;
@@ -142,6 +142,12 @@
     }
   }
 
+  // if num_clients is set to <=0, do dynamic sizing: all workers
+  // except for servers are clients
+  if (num_clients <= 0) {
+    num_clients = workers.size() - num_servers;
+  }
+
   // TODO(ctiller): support running multiple configurations, and binpack
   // client/server pairs
   // to available workers
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index cd8d546..ffa6226 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -50,8 +50,8 @@
 #include <grpc/support/log.h>
 #include <gtest/gtest.h>
 
-#include "test/cpp/qps/server.h"
 #include "src/proto/grpc/testing/services.grpc.pb.h"
+#include "test/cpp/qps/server.h"
 
 namespace grpc {
 namespace testing {
@@ -85,7 +85,13 @@
 
     register_service(&builder, &async_service_);
 
-    for (int i = 0; i < config.async_server_threads(); i++) {
+    int num_threads = config.async_server_threads();
+    if (num_threads <= 0) {  // dynamic sizing
+      num_threads = cores();
+      gpr_log(GPR_INFO, "Sizing async server to %d threads", num_threads);
+    }
+
+    for (int i = 0; i < num_threads; i++) {
       srv_cqs_.emplace_back(builder.AddCompletionQueue());
     }
 
@@ -96,8 +102,8 @@
     auto process_rpc_bound =
         std::bind(process_rpc, config.payload_config(), _1, _2);
 
-    for (int i = 0; i < 10000 / config.async_server_threads(); i++) {
-      for (int j = 0; j < config.async_server_threads(); j++) {
+    for (int i = 0; i < 10000 / num_threads; i++) {
+      for (int j = 0; j < num_threads; j++) {
         if (request_unary_function) {
           auto request_unary =
               std::bind(request_unary_function, &async_service_, _1, _2, _3,
@@ -115,10 +121,10 @@
       }
     }
 
-    for (int i = 0; i < config.async_server_threads(); i++) {
+    for (int i = 0; i < num_threads; i++) {
       shutdown_state_.emplace_back(new PerThreadShutdownState());
     }
-    for (int i = 0; i < config.async_server_threads(); i++) {
+    for (int i = 0; i < num_threads; i++) {
       threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
     }
   }
diff --git a/tools/jenkins/run_jenkins.sh b/tools/jenkins/run_jenkins.sh
index 123b252..4cb31e6 100755
--- a/tools/jenkins/run_jenkins.sh
+++ b/tools/jenkins/run_jenkins.sh
@@ -49,7 +49,7 @@
 
 unset platform  # variable named 'platform' breaks the windows build
 
-python tools/run_tests/run_tests.py $USE_DOCKER_MAYBE -t -l $language -c $config -x report.xml -j 3 $@ || TESTS_FAILED="true"
+python tools/run_tests/run_tests.py $USE_DOCKER_MAYBE -t -l $language -c $config -x report.xml -j 2 $@ || TESTS_FAILED="true"
 
 if [ ! -e reports/index.html ]
 then
diff --git a/tools/run_tests/configs.json b/tools/run_tests/configs.json
index 769942d..d508c63 100644
--- a/tools/run_tests/configs.json
+++ b/tools/run_tests/configs.json
@@ -59,7 +59,7 @@
   }, 
   {
     "config": "msan", 
-    "timeout_multiplier": 1.5
+    "timeout_multiplier": 2
   }, 
   {
     "config": "mutrace"
diff --git a/tools/run_tests/jobset.py b/tools/run_tests/jobset.py
index beeb99c..adf178b 100755
--- a/tools/run_tests/jobset.py
+++ b/tools/run_tests/jobset.py
@@ -360,7 +360,7 @@
       if self.cancelled(): return False
       current_cpu_cost = self.cpu_cost()
       if current_cpu_cost == 0: break
-      if current_cpu_cost + spec.cpu_cost < self._maxjobs: break
+      if current_cpu_cost + spec.cpu_cost <= self._maxjobs: break
       self.reap()
     if self.cancelled(): return False
     if spec.hash_targets: