Merge pull request #3615 from dgquintas/gcc_520

Type conversion fixes to make GCC 5.2.0 happy
diff --git a/examples/python/helloworld/README.md b/examples/python/helloworld/README.md
index 070b9e8..e889863 100644
--- a/examples/python/helloworld/README.md
+++ b/examples/python/helloworld/README.md
@@ -1,113 +1 @@
-# gRPC Python Hello World
-
-This is a quick introduction with a simple example and installation instructions: for a more complete tutorial see [gRPC Basics: Python](../route_guide).
-
-### Install gRPC
-Make sure you have built gRPC Python from source on your system. Follow the instructions here:
-[https://github.com/grpc/grpc/blob/master/src/python/README.md](https://github.com/grpc/grpc/blob/master/src/python/README.md).
-
-This gives you a Python virtual environment, with gRPC Python installed, in
-GRPC_ROOT/python2.7_virtual_environment, where GRPC_ROOT is the path to which
-you have cloned the [gRPC git repo](https://github.com/grpc/grpc).
-
-### Get the source code
-
-The example code for our Hello World and our other examples lives in the `examples`
-directory. Clone this repository to your local machine by running the
-following command:
-
-
-```sh
-$ git clone https://github.com/grpc/grpc.git
-```
-
-Change your current directory to `examples/python/helloworld`:
-
-```sh
-$ cd examples/python/helloworld/
-```
-
-### Defining a service
-
-The first step in creating our example is to define a *service*: an RPC
-service specifies the methods that can be called remotely with their parameters
-and return types. gRPC does this using [protocol
-buffers](https://developers.google.com/protocol-buffers/docs/overview). We
-use the protocol buffers interface definition language (IDL) to define our
-service methods, and define the parameters and return
-types as protocol buffer message types. Both the client and the
-server use interface code generated from the service definition.
-
-Here's our example service definition. The `Greeter` service has one method,
-`SayHello`, that lets the server receive a single `HelloRequest` message from
-the remote client containing the user's name, then send back a greeting in a
-single `HelloReply`. This is the simplest type of RPC you can specify in gRPC.
-
-```
-syntax = "proto3";
-
-option java_package = "io.grpc.examples";
-
-package helloworld;
-
-// The greeting service definition.
-service Greeter {
-  // Sends a greeting
-  rpc SayHello (HelloRequest) returns (HelloReply) {}
-}
-
-// The request message containing the user's name.
-message HelloRequest {
-  string name = 1;
-}
-
-// The response message containing the greetings
-message HelloReply {
-  string message = 1;
-}
-
-```
-
-<a name="generating"></a>
-### Generating gRPC code
-
-Once we've defined our service, we use the protocol buffer compiler
-`protoc` to generate the special client and server code we need to create
-our application. The generated code contains both stub code for clients to
-use and an abstract interface for servers to implement, both with the method
-defined in our `Greeter` service.
-
-To generate the client and server side interfaces:
-
-```sh
-$ ./run_codegen.sh
-```
-This script internally invokes the proto compiler as:
-
-```sh
-$ protoc -I ../../protos --python_out=. --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_python_plugin` ../../protos/helloworld.proto
-```
-
-### The client
-
-Client-side code can be found in [greeter_client.py](greeter_client.py).
-
-You can run the client using:
-
-```sh
-$ ./run_client.sh
-```
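-
-For reference, here's a minimal sketch of what a client like
-`greeter_client.py` might look like with the beta API used elsewhere in these
-examples. The module name `helloworld_pb2`, the `grpc.beta.implementations`
-import path, and the factory `beta_create_Greeter_stub` are assumptions that
-follow the generated-code naming convention described in the
-[route_guide tutorial](../route_guide); treat this as an illustration rather
-than a verbatim copy of the file:
-
-```python
-from grpc.beta import implementations
-
-import helloworld_pb2
-
-_TIMEOUT_SECONDS = 10
-
-
-def run():
-  channel = implementations.insecure_channel('localhost', 50051)
-  stub = helloworld_pb2.beta_create_Greeter_stub(channel)
-  # Blocks until the server responds or the timeout expires.
-  response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'),
-                           _TIMEOUT_SECONDS)
-  print('Greeter client received: ' + response.message)
-
-
-if __name__ == '__main__':
-  run()
-```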
-
-
-### The server
-
-Server-side code can be found in [greeter_server.py](greeter_server.py).
-
-You can run the server using:
-
-```sh
-$ ./run_server.sh
-```
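-
-And for reference, a minimal sketch of what a server like `greeter_server.py`
-might look like. As above, `helloworld_pb2`, `BetaGreeterServicer`, and
-`beta_create_Greeter_server` are assumed names following the generated-code
-naming convention; this is an illustration, not a verbatim copy of the file:
-
-```python
-import time
-
-import helloworld_pb2
-
-
-class Greeter(helloworld_pb2.BetaGreeterServicer):
-
-  def SayHello(self, request, context):
-    return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
-
-
-def serve():
-  server = helloworld_pb2.beta_create_Greeter_server(Greeter())
-  server.add_insecure_port('[::]:50051')
-  server.start()
-  # start() does not block, so sleep-loop to keep the process alive.
-  try:
-    while True:
-      time.sleep(60 * 60 * 24)
-  except KeyboardInterrupt:
-    pass
-
-
-if __name__ == '__main__':
-  serve()
-```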
+[This code's documentation lives on the grpc.io site.](http://www.grpc.io/docs)
diff --git a/examples/python/route_guide/README.md b/examples/python/route_guide/README.md
index cb1aa7d..17b8a8e 100644
--- a/examples/python/route_guide/README.md
+++ b/examples/python/route_guide/README.md
@@ -1,299 +1 @@
-# gRPC Basics: Python
-
-This tutorial provides a basic Python programmer's introduction to working with gRPC. By walking through this example you'll learn how to:
-
-- Define a service in a .proto file.
-- Generate server and client code using the protocol buffer compiler.
-- Use the Python gRPC API to write a simple client and server for your service.
-
-It assumes that you have read the [Getting started](https://github.com/grpc/grpc/tree/master/examples) guide and are familiar with [protocol buffers](https://developers.google.com/protocol-buffers/docs/overview). Note that the example in this tutorial uses the proto3 version of the protocol buffers language, which is currently in alpha release: you can find out more in the [proto3 language guide](https://developers.google.com/protocol-buffers/docs/proto3) and see the [release notes](https://github.com/google/protobuf/releases) for the new version in the protocol buffers GitHub repository.
-
-This isn't a comprehensive guide to using gRPC in Python: more reference documentation is coming soon.
-
-
-## Why use gRPC?
-
-This example is a simple route mapping application that lets clients get information about features on their route, create a summary of their route, and exchange route information such as traffic updates with the server and other clients.
-
-With gRPC you can define your service once in a .proto file and implement clients and servers in any of gRPC's supported languages, which in turn can be run in environments ranging from servers inside Google to your own tablet, with all the complexity of communication between different languages and environments handled for you by gRPC. You also get all the advantages of working with protocol buffers, including efficient serialization, a simple IDL, and easy interface updating.
-
-## Example code and setup
-
-The example code for this tutorial is in [examples/python/route_guide](.). To download the example, clone this repository by running the following command:
-```shell
-$ git clone https://github.com/grpc/grpc.git
-```
-
-Then change your current directory to `examples/python/route_guide`:
-```shell
-$ cd examples/python/route_guide
-```
-
-You should also have the relevant tools installed to generate the server and client interface code; if you don't, follow the setup instructions in [the Python quick start guide](../helloworld).
-
-## Defining the service
-
-Your first step (as you'll know from [Getting started](https://github.com/grpc/grpc/tree/master/examples)) is to define the gRPC *service* and the method *request* and *response* types using [protocol buffers](https://developers.google.com/protocol-buffers/docs/overview). You can see the complete .proto file in [`examples/protos/route_guide.proto`](../../protos/route_guide.proto).
-
-To define a service, you specify a named `service` in your .proto file:
-
-```protobuf
-service RouteGuide {
-   // (Method definitions not shown)
-}
-```
-
-Then you define `rpc` methods inside your service definition, specifying their request and response types. gRPC lets you define four kinds of service method, all of which are used in the `RouteGuide` service:
-
-- A *simple RPC* where the client sends a request to the server using the stub and waits for a response to come back, just like a normal function call.
-```protobuf
-   // Obtains the feature at a given position.
-   rpc GetFeature(Point) returns (Feature) {}
-```
-
-- A *response-streaming RPC* where the client sends a request to the server and gets a stream to read a sequence of messages back. The client reads from the returned stream until there are no more messages. As you can see in the example, you specify a response-streaming method by placing the `stream` keyword before the *response* type.
-```protobuf
-  // Obtains the Features available within the given Rectangle.  Results are
-  // streamed rather than returned at once (e.g. in a response message with a
-  // repeated field), as the rectangle may cover a large area and contain a
-  // huge number of features.
-  rpc ListFeatures(Rectangle) returns (stream Feature) {}
-```
-
-- A *request-streaming RPC* where the client writes a sequence of messages and sends them to the server, again using a provided stream. Once the client has finished writing the messages, it waits for the server to read them all and return its response. You specify a request-streaming method by placing the `stream` keyword before the *request* type.
-```protobuf
-  // Accepts a stream of Points on a route being traversed, returning a
-  // RouteSummary when traversal is completed.
-  rpc RecordRoute(stream Point) returns (RouteSummary) {}
-```
-
-- A *bidirectionally-streaming RPC* where both sides send a sequence of messages using a read-write stream. The two streams operate independently, so clients and servers can read and write in whatever order they like: for example, the server could wait to receive all the client messages before writing its responses, or it could alternately read a message then write a message, or some other combination of reads and writes. The order of messages in each stream is preserved. You specify this type of method by placing the `stream` keyword before both the request and the response.
-```protobuf
-  // Accepts a stream of RouteNotes sent while a route is being traversed,
-  // while receiving other RouteNotes (e.g. from other users).
-  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
-```
-
-Your .proto file also contains protocol buffer message type definitions for all the request and response types used in our service methods - for example, here's the `Point` message type:
-```protobuf
-// Points are represented as latitude-longitude pairs in the E7 representation
-// (degrees multiplied by 10**7 and rounded to the nearest integer).
-// Latitudes should be in the range +/- 90 degrees and longitude should be in
-// the range +/- 180 degrees (inclusive).
-message Point {
-  int32 latitude = 1;
-  int32 longitude = 2;
-}
-```
-
-## Generating client and server code
-
-Next you need to generate the gRPC client and server interfaces from your .proto service definition. You do this using the protocol buffer compiler `protoc` with a special gRPC Python plugin. Make sure you've installed `protoc` and followed the gRPC Python plugin [installation instructions](https://github.com/grpc/grpc/blob/master/INSTALL) first.
-
-With `protoc` and the gRPC Python plugin installed, use the following command to generate the Python code:
-
-```shell
-$ protoc -I ../../protos --python_out=. --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_python_plugin` ../../protos/route_guide.proto
-```
-
-Note that as we've already provided a version of the generated code in the example repository, running this command regenerates the appropriate file rather than creating a new one. The generated code file is called `route_guide_pb2.py` and contains:
-- classes for the messages defined in route_guide.proto
-- abstract classes for the service defined in route_guide.proto
-   - `BetaRouteGuideServicer`, which defines the interface for implementations of the RouteGuide service
-   - `BetaRouteGuideStub`, which can be used by clients to invoke RouteGuide RPCs
-- functions for application use
-   - `beta_create_RouteGuide_server`, which creates a gRPC server given a `BetaRouteGuideServicer` object
-   - `beta_create_RouteGuide_stub`, which can be used by clients to create a stub object
-
-<a name="server"></a>
-## Creating the server
-
-First let's look at how you create a `RouteGuide` server. If you're only interested in creating gRPC clients, you can skip this section and go straight to [Creating the client](#client) (though you might find it interesting anyway!).
-
-Creating and running a `RouteGuide` server breaks down into two work items:
-- Implementing the servicer interface generated from our service definition with functions that perform the actual "work" of the service.
-- Running a gRPC server to listen for requests from clients and transmit responses.
-
-You can find the example `RouteGuide` server in [route_guide_server.py](route_guide_server.py).
-
-### Implementing RouteGuide
-
-`route_guide_server.py` has a `RouteGuideServicer` class that implements the generated interface `route_guide_pb2.BetaRouteGuideServicer`:
-
-```python
-# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
-class RouteGuideServicer(route_guide_pb2.BetaRouteGuideServicer):
-```
-
-`RouteGuideServicer` implements all the `RouteGuide` service methods.
-
-#### Simple RPC
-
-Let's look at the simplest type first, `GetFeature`, which just gets a `Point` from the client and returns the corresponding feature information from its database in a `Feature`.
-
-```python
-  def GetFeature(self, request, context):
-    feature = get_feature(self.db, request)
-    if feature is None:
-      return route_guide_pb2.Feature(name="", location=request)
-    else:
-      return feature
-```
-
-The method is passed a `route_guide_pb2.Point` request for the RPC, and a `ServicerContext` object that provides RPC-specific information such as timeout limits. It returns a `route_guide_pb2.Feature` response.
-
-#### Response-streaming RPC
-
-Now let's look at the next method. `ListFeatures` is a response-streaming RPC that sends multiple `Feature`s to the client.
-
-```python
-  def ListFeatures(self, request, context):
-    left = min(request.lo.longitude, request.hi.longitude)
-    right = max(request.lo.longitude, request.hi.longitude)
-    top = max(request.lo.latitude, request.hi.latitude)
-    bottom = min(request.lo.latitude, request.hi.latitude)
-    for feature in self.db:
-      if (feature.location.longitude >= left and
-          feature.location.longitude <= right and
-          feature.location.latitude >= bottom and
-          feature.location.latitude <= top):
-        yield feature
-```
-
-Here the request message is a `route_guide_pb2.Rectangle` within which the client wants to find `Feature`s. Instead of returning a single response the method yields zero or more responses.
-
-#### Request-streaming RPC
-
-The request-streaming method `RecordRoute` uses an [iterator](https://docs.python.org/2/library/stdtypes.html#iterator-types) of request values and returns a single response value.
-
-```python
-  def RecordRoute(self, request_iterator, context):
-    point_count = 0
-    feature_count = 0
-    distance = 0.0
-    prev_point = None
-
-    start_time = time.time()
-    for point in request_iterator:
-      point_count += 1
-      if get_feature(self.db, point):
-        feature_count += 1
-      if prev_point:
-        distance += get_distance(prev_point, point)
-      prev_point = point
-
-    elapsed_time = time.time() - start_time
-    return route_guide_pb2.RouteSummary(point_count=point_count,
-                                        feature_count=feature_count,
-                                        distance=int(distance),
-                                        elapsed_time=int(elapsed_time))
-```
-
-#### Bidirectional streaming RPC
-
-Lastly let's look at the bidirectionally-streaming method `RouteChat`.
-
-```python
-  def RouteChat(self, request_iterator, context):
-    prev_notes = []
-    for new_note in request_iterator:
-      for prev_note in prev_notes:
-        if prev_note.location == new_note.location:
-          yield prev_note
-      prev_notes.append(new_note)
-```
-
-This method's semantics are a combination of those of the request-streaming method and the response-streaming method. It is passed an iterator of request values and is itself an iterator of response values.
-
-### Starting the server
-
-Once you have implemented all the `RouteGuide` methods, the next step is to start up a gRPC server so that clients can actually use your service:
-
-```python
-def serve():
-  server = route_guide_pb2.beta_create_RouteGuide_server(RouteGuideServicer())
-  server.add_insecure_port('[::]:50051')
-  server.start()
-```
-
-Because `start()` does not block, you may need to sleep-loop if there is nothing else for your code to do while serving.
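-
-For example, a minimal sketch of such a sleep-loop (the one-day interval is
-arbitrary):
-
-```python
-import time
-
-if __name__ == '__main__':
-  serve()
-  try:
-    while True:
-      time.sleep(60 * 60 * 24)  # nothing else to do while serving
-  except KeyboardInterrupt:
-    pass
-```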
-
-<a name="client"></a>
-## Creating the client
-
-You can see the complete example client code in [route_guide_client.py](route_guide_client.py).
-
-### Creating a stub
-
-To call service methods, we first need to create a *stub*.
-
-We use the `beta_create_RouteGuide_stub` function of the `route_guide_pb2` module, generated from our .proto.
-
-```python
-channel = implementations.insecure_channel('localhost', 50051)
-stub = route_guide_pb2.beta_create_RouteGuide_stub(channel)
-```
-
-The returned object implements all the methods defined by the `BetaRouteGuideStub` interface.
-
-### Calling service methods
-
-For RPC methods that return a single response ("response-unary" methods), gRPC Python supports both synchronous (blocking) and asynchronous (non-blocking) control flow semantics. For response-streaming RPC methods, calls immediately return an iterator of response values. Calls to the iterator's `next()` method block until the next response becomes available.
-
-#### Simple RPC
-
-A synchronous call to the simple RPC `GetFeature` is nearly as straightforward as calling a local method. The RPC call waits for the server to respond, and will either return a response or raise an exception:
-
-```python
-feature = stub.GetFeature(point, timeout_in_seconds)
-```
-
-An asynchronous call to `GetFeature` is similar, but it resembles invoking a local method asynchronously in a thread pool:
-
-```python
-feature_future = stub.GetFeature.future(point, timeout_in_seconds)
-feature = feature_future.result()
-```
-
-#### Response-streaming RPC
-
-Calling the response-streaming `ListFeatures` is similar to working with sequence types:
-
-```python
-for feature in stub.ListFeatures(rectangle, timeout_in_seconds):
-  print(feature)  # each iteration blocks until the next response arrives
-```
-
-#### Request-streaming RPC
-
-Calling the request-streaming `RecordRoute` is similar to passing a sequence to a local method. Like the simple RPC above that also returns a single response, it can be called synchronously or asynchronously:
-
-```python
-route_summary = stub.RecordRoute(point_sequence, timeout_in_seconds)
-```
-
-```python
-route_summary_future = stub.RecordRoute.future(point_sequence, timeout_in_seconds)
-route_summary = route_summary_future.result()
-```
-
-#### Bidirectional streaming RPC
-
-Calling the bidirectionally-streaming `RouteChat` combines (as on the service side) the request-streaming and response-streaming semantics:
-
-```python
-for received_route_note in stub.RouteChat(sent_routes, timeout_in_seconds):
-  print(received_route_note)  # responses are yielded as they arrive
-```
-
-## Try it out!
-
-Run the server, which will listen on port 50051:
-
-```shell
-$ python route_guide_server.py
-```
-
-Run the client (in a different terminal):
-
-```shell
-$ python route_guide_client.py
-```
+[This code's documentation lives on the grpc.io site.](http://www.grpc.io/docs/tutorials/basic/python.html)
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index b48b7f0..231bc98 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -45,10 +45,8 @@
 #include <grpc/support/log.h>
 #include <grpc/support/useful.h>
 
-enum descriptor_state {
-  NOT_READY = 0,
-  READY = 1
-}; /* or a pointer to a closure to call */
+#define CLOSURE_NOT_READY ((grpc_closure *)0)
+#define CLOSURE_READY ((grpc_closure *)1)
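+
+/* The grpc_fd read_closure/write_closure fields managed below are tri-state:
+   they hold CLOSURE_NOT_READY, CLOSURE_READY, or a pointer to a real
+   grpc_closure waiting to be scheduled (see notify_on_locked and
+   set_ready_locked). */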
 
 /* We need to keep a freelist not because of any concerns of malloc performance
  * but instead so that implementations with multiple threads in (for example)
@@ -88,14 +86,13 @@
   gpr_mu_unlock(&fd_freelist_mu);
   if (r == NULL) {
     r = gpr_malloc(sizeof(grpc_fd));
-    gpr_mu_init(&r->set_state_mu);
-    gpr_mu_init(&r->watcher_mu);
+    gpr_mu_init(&r->mu);
   }
 
   gpr_atm_rel_store(&r->refst, 1);
-  gpr_atm_rel_store(&r->readst, NOT_READY);
-  gpr_atm_rel_store(&r->writest, NOT_READY);
-  gpr_atm_rel_store(&r->shutdown, 0);
+  r->shutdown = 0;
+  r->read_closure = CLOSURE_NOT_READY;
+  r->write_closure = CLOSURE_NOT_READY;
   r->fd = fd;
   r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
       &r->inactive_watcher_root;
@@ -107,8 +104,7 @@
 }
 
 static void destroy(grpc_fd *fd) {
-  gpr_mu_destroy(&fd->set_state_mu);
-  gpr_mu_destroy(&fd->watcher_mu);
+  gpr_mu_destroy(&fd->mu);
   gpr_free(fd);
 }
 
@@ -173,39 +169,35 @@
   return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
 }
 
-static void pollset_kick_locked(grpc_pollset *pollset) {
-  gpr_mu_lock(GRPC_POLLSET_MU(pollset));
-  grpc_pollset_kick(pollset, NULL);
-  gpr_mu_unlock(GRPC_POLLSET_MU(pollset));
+static void pollset_kick_locked(grpc_fd_watcher *watcher) {
+  gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset));
+  GPR_ASSERT(watcher->worker);
+  grpc_pollset_kick_ext(watcher->pollset, watcher->worker,
+                        GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
+  gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset));
 }
 
 static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
   if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
-    pollset_kick_locked(fd->inactive_watcher_root.next->pollset);
+    pollset_kick_locked(fd->inactive_watcher_root.next);
   } else if (fd->read_watcher) {
-    pollset_kick_locked(fd->read_watcher->pollset);
+    pollset_kick_locked(fd->read_watcher);
   } else if (fd->write_watcher) {
-    pollset_kick_locked(fd->write_watcher->pollset);
+    pollset_kick_locked(fd->write_watcher);
   }
 }
 
-static void maybe_wake_one_watcher(grpc_fd *fd) {
-  gpr_mu_lock(&fd->watcher_mu);
-  maybe_wake_one_watcher_locked(fd);
-  gpr_mu_unlock(&fd->watcher_mu);
-}
-
 static void wake_all_watchers_locked(grpc_fd *fd) {
   grpc_fd_watcher *watcher;
   for (watcher = fd->inactive_watcher_root.next;
        watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
-    pollset_kick_locked(watcher->pollset);
+    pollset_kick_locked(watcher);
   }
   if (fd->read_watcher) {
-    pollset_kick_locked(fd->read_watcher->pollset);
+    pollset_kick_locked(fd->read_watcher);
   }
   if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
-    pollset_kick_locked(fd->write_watcher->pollset);
+    pollset_kick_locked(fd->write_watcher);
   }
 }
 
@@ -218,7 +210,7 @@
                     const char *reason) {
   fd->on_done_closure = on_done;
   shutdown(fd->fd, SHUT_RDWR);
-  gpr_mu_lock(&fd->watcher_mu);
+  gpr_mu_lock(&fd->mu);
   REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
   if (!has_watchers(fd)) {
     fd->closed = 1;
@@ -227,7 +219,7 @@
   } else {
     wake_all_watchers_locked(fd);
   }
-  gpr_mu_unlock(&fd->watcher_mu);
+  gpr_mu_unlock(&fd->mu);
   UNREF_BY(fd, 2, reason); /* drop the reference */
 }
 
@@ -247,136 +239,121 @@
 void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
 #endif
 
-static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *st,
-                      grpc_closure *closure) {
-  switch (gpr_atm_acq_load(st)) {
-    case NOT_READY:
-      /* There is no race if the descriptor is already ready, so we skip
-         the interlocked op in that case.  As long as the app doesn't
-         try to set the same upcall twice (which it shouldn't) then
-         oldval should never be anything other than READY or NOT_READY.  We
-         don't
-         check for user error on the fast path. */
-      if (gpr_atm_rel_cas(st, NOT_READY, (gpr_intptr)closure)) {
-        /* swap was successful -- the closure will run after the next
-           set_ready call.  NOTE: we don't have an ABA problem here,
-           since we should never have concurrent calls to the same
-           notify_on function. */
-        maybe_wake_one_watcher(fd);
-        return;
-      }
-    /* swap was unsuccessful due to an intervening set_ready call.
-       Fall through to the READY code below */
-    case READY:
-      GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY);
-      gpr_atm_rel_store(st, NOT_READY);
-      grpc_exec_ctx_enqueue(exec_ctx, closure,
-                            !gpr_atm_acq_load(&fd->shutdown));
-      return;
-    default: /* WAITING */
-      /* upcallptr was set to a different closure.  This is an error! */
-      gpr_log(GPR_ERROR,
-              "User called a notify_on function with a previous callback still "
-              "pending");
-      abort();
-  }
-  gpr_log(GPR_ERROR, "Corrupt memory in &st->state");
-  abort();
-}
-
-static void set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
-                             gpr_atm *st) {
-  gpr_intptr state = gpr_atm_acq_load(st);
-
-  switch (state) {
-    case READY:
-      /* duplicate ready, ignore */
-      return;
-    case NOT_READY:
-      if (gpr_atm_rel_cas(st, NOT_READY, READY)) {
-        /* swap was successful -- the closure will run after the next
-           notify_on call. */
-        return;
-      }
-      /* swap was unsuccessful due to an intervening set_ready call.
-         Fall through to the WAITING code below */
-      state = gpr_atm_acq_load(st);
-    default: /* waiting */
-      GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY &&
-                 gpr_atm_no_barrier_load(st) != NOT_READY);
-      grpc_exec_ctx_enqueue(exec_ctx, (grpc_closure *)state,
-                            !gpr_atm_acq_load(&fd->shutdown));
-      gpr_atm_rel_store(st, NOT_READY);
-      return;
+static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+                             grpc_closure **st, grpc_closure *closure) {
+  if (*st == CLOSURE_NOT_READY) {
+    /* not ready ==> switch to a waiting state by setting the closure */
+    *st = closure;
+  } else if (*st == CLOSURE_READY) {
+    /* already ready ==> queue the closure to run immediately */
+    *st = CLOSURE_NOT_READY;
+    grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown);
+    maybe_wake_one_watcher_locked(fd);
+  } else {
+    /* upcallptr was set to a different closure.  This is an error! */
+    gpr_log(GPR_ERROR,
+            "User called a notify_on function with a previous callback still "
+            "pending");
+    abort();
   }
 }
 
-static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *st) {
+/* returns 1 if state becomes not ready */
+static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+                            grpc_closure **st) {
+  if (*st == CLOSURE_READY) {
+    /* duplicate ready ==> ignore */
+    return 0;
+  } else if (*st == CLOSURE_NOT_READY) {
+    /* not ready, and not waiting ==> flag ready */
+    *st = CLOSURE_READY;
+    return 0;
+  } else {
+    /* waiting ==> queue closure */
+    grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown);
+    *st = CLOSURE_NOT_READY;
+    return 1;
+  }
+}
+
+static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) {
   /* only one set_ready can be active at once (but there may be a racing
      notify_on) */
-  gpr_mu_lock(&fd->set_state_mu);
+  gpr_mu_lock(&fd->mu);
   set_ready_locked(exec_ctx, fd, st);
-  gpr_mu_unlock(&fd->set_state_mu);
+  gpr_mu_unlock(&fd->mu);
 }
 
 void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
-  gpr_mu_lock(&fd->set_state_mu);
+  gpr_mu_lock(&fd->mu);
   GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown));
-  gpr_atm_rel_store(&fd->shutdown, 1);
-  set_ready_locked(exec_ctx, fd, &fd->readst);
-  set_ready_locked(exec_ctx, fd, &fd->writest);
-  gpr_mu_unlock(&fd->set_state_mu);
+  fd->shutdown = 1;
+  set_ready_locked(exec_ctx, fd, &fd->read_closure);
+  set_ready_locked(exec_ctx, fd, &fd->write_closure);
+  gpr_mu_unlock(&fd->mu);
 }
 
 void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                             grpc_closure *closure) {
-  notify_on(exec_ctx, fd, &fd->readst, closure);
+  gpr_mu_lock(&fd->mu);
+  notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
+  gpr_mu_unlock(&fd->mu);
 }
 
 void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
                              grpc_closure *closure) {
-  notify_on(exec_ctx, fd, &fd->writest, closure);
+  gpr_mu_lock(&fd->mu);
+  notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
+  gpr_mu_unlock(&fd->mu);
 }
 
 gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
-                              gpr_uint32 read_mask, gpr_uint32 write_mask,
-                              grpc_fd_watcher *watcher) {
+                              grpc_pollset_worker *worker, gpr_uint32 read_mask,
+                              gpr_uint32 write_mask, grpc_fd_watcher *watcher) {
   gpr_uint32 mask = 0;
+  grpc_closure *cur;
+  int requested;
   /* keep track of pollers that have requested our events, in case they change
    */
   GRPC_FD_REF(fd, "poll");
 
-  gpr_mu_lock(&fd->watcher_mu);
+  gpr_mu_lock(&fd->mu);
+
   /* if we are shutdown, then don't add to the watcher set */
   if (gpr_atm_no_barrier_load(&fd->shutdown)) {
     watcher->fd = NULL;
     watcher->pollset = NULL;
-    gpr_mu_unlock(&fd->watcher_mu);
+    watcher->worker = NULL;
+    gpr_mu_unlock(&fd->mu);
     GRPC_FD_UNREF(fd, "poll");
     return 0;
   }
+
   /* if there is nobody polling for read, but we need to, then start doing so */
-  if (read_mask && !fd->read_watcher &&
-      (gpr_uintptr)gpr_atm_acq_load(&fd->readst) > READY) {
+  cur = fd->read_closure;
+  requested = cur != CLOSURE_READY;
+  if (read_mask && fd->read_watcher == NULL && requested) {
     fd->read_watcher = watcher;
     mask |= read_mask;
   }
   /* if there is nobody polling for write, but we need to, then start doing so
    */
-  if (write_mask && !fd->write_watcher &&
-      (gpr_uintptr)gpr_atm_acq_load(&fd->writest) > READY) {
+  cur = fd->write_closure;
+  requested = cur != CLOSURE_READY;
+  if (write_mask && fd->write_watcher == NULL && requested) {
     fd->write_watcher = watcher;
     mask |= write_mask;
   }
   /* if not polling, remember this watcher in case we need someone to later */
-  if (mask == 0) {
+  if (mask == 0 && worker != NULL) {
     watcher->next = &fd->inactive_watcher_root;
     watcher->prev = watcher->next->prev;
     watcher->next->prev = watcher->prev->next = watcher;
   }
   watcher->pollset = pollset;
+  watcher->worker = worker;
   watcher->fd = fd;
-  gpr_mu_unlock(&fd->watcher_mu);
+  gpr_mu_unlock(&fd->mu);
 
   return mask;
 }
@@ -391,24 +368,39 @@
     return;
   }
 
-  gpr_mu_lock(&fd->watcher_mu);
+  gpr_mu_lock(&fd->mu);
+
   if (watcher == fd->read_watcher) {
     /* remove read watcher, kick if we still need a read */
     was_polling = 1;
-    kick = kick || !got_read;
+    if (!got_read) {
+      kick = 1;
+    }
     fd->read_watcher = NULL;
   }
   if (watcher == fd->write_watcher) {
     /* remove write watcher, kick if we still need a write */
     was_polling = 1;
-    kick = kick || !got_write;
+    if (!got_write) {
+      kick = 1;
+    }
     fd->write_watcher = NULL;
   }
-  if (!was_polling) {
+  if (!was_polling && watcher->worker != NULL) {
     /* remove from inactive list */
     watcher->next->prev = watcher->prev;
     watcher->prev->next = watcher->next;
   }
+  if (got_read) {
+    if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
+      kick = 1;
+    }
+  }
+  if (got_write) {
+    if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
+      kick = 1;
+    }
+  }
   if (kick) {
     maybe_wake_one_watcher_locked(fd);
   }
@@ -417,17 +409,17 @@
     close(fd->fd);
     grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1);
   }
-  gpr_mu_unlock(&fd->watcher_mu);
+  gpr_mu_unlock(&fd->mu);
 
   GRPC_FD_UNREF(fd, "poll");
 }
 
 void grpc_fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
-  set_ready(exec_ctx, fd, &fd->readst);
+  set_ready(exec_ctx, fd, &fd->read_closure);
 }
 
 void grpc_fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
-  set_ready(exec_ctx, fd, &fd->writest);
+  set_ready(exec_ctx, fd, &fd->write_closure);
 }
 
 #endif
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index 089aa4d..dc917eb 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -46,6 +46,7 @@
   struct grpc_fd_watcher *next;
   struct grpc_fd_watcher *prev;
   grpc_pollset *pollset;
+  grpc_pollset_worker *worker;
   grpc_fd *fd;
 } grpc_fd_watcher;
 
@@ -58,8 +59,8 @@
      and just unref by 1 when we're ready to flag the object as orphaned */
   gpr_atm refst;
 
-  gpr_mu set_state_mu;
-  gpr_atm shutdown;
+  gpr_mu mu;
+  int shutdown;
   int closed;
 
   /* The watcher list.
@@ -84,18 +85,16 @@
      If at a later time there becomes need of a poller to poll, one of
      the inactive pollers may be kicked out of their poll loops to take
      that responsibility. */
-  gpr_mu watcher_mu;
   grpc_fd_watcher inactive_watcher_root;
   grpc_fd_watcher *read_watcher;
   grpc_fd_watcher *write_watcher;
 
-  gpr_atm readst;
-  gpr_atm writest;
+  grpc_closure *read_closure;
+  grpc_closure *write_closure;
 
   struct grpc_fd *freelist_next;
 
   grpc_closure *on_done_closure;
-  grpc_closure *shutdown_closures[2];
 
   grpc_iomgr_object iomgr_object;
 };
@@ -126,10 +125,12 @@
    fd's current interest (such as epoll) do not need to call this function.
    MUST NOT be called with a pollset lock taken */
 gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
-                              gpr_uint32 read_mask, gpr_uint32 write_mask,
-                              grpc_fd_watcher *rec);
+                              grpc_pollset_worker *worker, gpr_uint32 read_mask,
+                              gpr_uint32 write_mask, grpc_fd_watcher *rec);
 /* Complete polling previously started with grpc_fd_begin_poll
-   MUST NOT be called with a pollset lock taken */
+   MUST NOT be called with a pollset lock taken
+   if got_read or got_write are 1, also does the become_{readable,writable} as
+   appropriate. */
 void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
                       int got_read, int got_write);
 
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index faf0a63..ba9ba73 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -72,7 +72,7 @@
   /* We pretend to be polling whilst adding an fd to keep the fd from being
      closed during the add. This may result in a spurious wakeup being assigned
      to this pollset whilst adding, but that should be benign. */
-  GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0);
+  GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0);
   if (watcher.fd != NULL) {
     ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
     ev.data.ptr = fd;
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 1356ebe..faa6c14 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -102,6 +102,9 @@
 static void multipoll_with_poll_pollset_maybe_work_and_unlock(
     grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
     gpr_timespec deadline, gpr_timespec now) {
+#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
+#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
+
   int timeout;
   int r;
   size_t i, j, fd_count;
@@ -147,8 +150,8 @@
   gpr_mu_unlock(&pollset->mu);
 
   for (i = 2; i < pfd_count; i++) {
-    pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, POLLIN,
-                                               POLLOUT, &watchers[i]);
+    pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, worker,
+                                               POLLIN, POLLOUT, &watchers[i]);
   }
 
   /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
@@ -157,34 +160,29 @@
   r = grpc_poll_function(pfds, pfd_count, timeout);
   GRPC_SCHEDULING_END_BLOCKING_REGION;
 
-  for (i = 2; i < pfd_count; i++) {
-    grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN,
-                     pfds[i].revents & POLLOUT);
-  }
-
   if (r < 0) {
-    if (errno != EINTR) {
-      gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+    gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+    for (i = 2; i < pfd_count; i++) {
+      grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0);
     }
   } else if (r == 0) {
-    /* do nothing */
+    for (i = 2; i < pfd_count; i++) {
+      grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+    }
   } else {
-    if (pfds[0].revents & POLLIN) {
+    if (pfds[0].revents & POLLIN_CHECK) {
       grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
     }
-    if (pfds[1].revents & POLLIN) {
+    if (pfds[1].revents & POLLIN_CHECK) {
       grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
     }
     for (i = 2; i < pfd_count; i++) {
       if (watchers[i].fd == NULL) {
+        grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0);
         continue;
       }
-      if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
-        grpc_fd_become_readable(exec_ctx, watchers[i].fd);
-      }
-      if (pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) {
-        grpc_fd_become_writable(exec_ctx, watchers[i].fd);
-      }
+      grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
+                       pfds[i].revents & POLLOUT_CHECK);
     }
   }
 
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index b663780..d056866 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -98,31 +98,63 @@
   worker->prev->next = worker->next->prev = worker;
 }
 
-void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+void grpc_pollset_kick_ext(grpc_pollset *p,
+                           grpc_pollset_worker *specific_worker,
+                           gpr_uint32 flags) {
   /* pollset->mu already held */
   if (specific_worker != NULL) {
     if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
+      GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
       for (specific_worker = p->root_worker.next;
            specific_worker != &p->root_worker;
            specific_worker = specific_worker->next) {
         grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
       }
       p->kicked_without_pollers = 1;
+      return;
     } else if (gpr_tls_get(&g_current_thread_worker) !=
                (gpr_intptr)specific_worker) {
+      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
+        specific_worker->reevaluate_polling_on_wakeup = 1;
+      }
       grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+      return;
+    } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
+      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
+        specific_worker->reevaluate_polling_on_wakeup = 1;
+      }
+      grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+      return;
     }
   } else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
+    GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
     specific_worker = pop_front_worker(p);
     if (specific_worker != NULL) {
+      if (gpr_tls_get(&g_current_thread_worker) ==
+          (gpr_intptr)specific_worker) {
+        push_back_worker(p, specific_worker);
+        specific_worker = pop_front_worker(p);
+        if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
+            gpr_tls_get(&g_current_thread_worker) ==
+                (gpr_intptr)specific_worker) {
+          push_back_worker(p, specific_worker);
+          return;
+        }
+      }
       push_back_worker(p, specific_worker);
       grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
+      return;
     } else {
       p->kicked_without_pollers = 1;
+      return;
     }
   }
 }
 
+void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
+  grpc_pollset_kick_ext(p, specific_worker, 0);
+}
+
 /* global state management */
 
 void grpc_pollset_global_init(void) {
@@ -195,52 +227,88 @@
   /* pollset->mu already held */
   int added_worker = 0;
   int locked = 1;
+  int queued_work = 0;
+  int keep_polling = 0;
   /* this must happen before we (potentially) drop pollset->mu */
   worker->next = worker->prev = NULL;
+  worker->reevaluate_polling_on_wakeup = 0;
   /* TODO(ctiller): pool these */
   grpc_wakeup_fd_init(&worker->wakeup_fd);
+  /* If there's work waiting for the pollset to be idle, and the
+     pollset is idle, then do that work */
   if (!grpc_pollset_has_workers(pollset) &&
       !grpc_closure_list_empty(pollset->idle_jobs)) {
     grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
     goto done;
   }
+  /* Check alarms - these are a global resource so we just ping
+     each time through on every pollset.
+     May update deadline to ensure timely wakeups.
+     TODO(ctiller): can this work be localized? */
   if (grpc_alarm_check(exec_ctx, now, &deadline)) {
     gpr_mu_unlock(&pollset->mu);
     locked = 0;
     goto done;
   }
+  /* If we're shutting down then we don't execute any extended work */
   if (pollset->shutting_down) {
     goto done;
   }
+  /* Give do_promote priority so we don't starve it out */
   if (pollset->in_flight_cbs) {
-    /* Give do_promote priority so we don't starve it out */
     gpr_mu_unlock(&pollset->mu);
     locked = 0;
     goto done;
   }
-  if (!pollset->kicked_without_pollers) {
-    push_front_worker(pollset, worker);
-    added_worker = 1;
-    gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
-    gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker);
-    pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker, deadline,
-                                           now);
-    locked = 0;
-    gpr_tls_set(&g_current_thread_poller, 0);
-    gpr_tls_set(&g_current_thread_worker, 0);
-  } else {
-    pollset->kicked_without_pollers = 0;
+  /* Start polling, and keep doing so while we're being asked to
+     re-evaluate our pollers (this allows poll() based pollers to
+     ensure they don't miss wakeups) */
+  keep_polling = 1;
+  while (keep_polling) {
+    keep_polling = 0;
+    if (!pollset->kicked_without_pollers) {
+      if (!added_worker) {
+        push_front_worker(pollset, worker);
+        added_worker = 1;
+      }
+      gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
+      gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker);
+      pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker,
+                                             deadline, now);
+      locked = 0;
+      gpr_tls_set(&g_current_thread_poller, 0);
+      gpr_tls_set(&g_current_thread_worker, 0);
+    } else {
+      pollset->kicked_without_pollers = 0;
+    }
+  /* Finished execution - start cleaning up.
+     Note that we may arrive here from outside the enclosing while() loop.
+     In that case we won't loop, though, as we haven't added this worker to
+     the worker list, which means nobody could ask us to re-evaluate polling. */
+  done:
+    if (!locked) {
+      queued_work |= grpc_exec_ctx_flush(exec_ctx);
+      gpr_mu_lock(&pollset->mu);
+      locked = 1;
+    }
+    /* If we're forced to re-evaluate polling (via grpc_pollset_kick with
+       GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
+       a loop */
+    if (worker->reevaluate_polling_on_wakeup) {
+      worker->reevaluate_polling_on_wakeup = 0;
+      pollset->kicked_without_pollers = 0;
+      if (queued_work) {
+        /* If there's queued work on the list, then set the deadline to be
+           immediate so we get back out of the polling loop quickly */
+        deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
+      }
+      keep_polling = 1;
+    }
   }
-done:
-  if (!locked) {
-    grpc_exec_ctx_flush(exec_ctx);
-    gpr_mu_lock(&pollset->mu);
-    locked = 1;
-  }
-  grpc_wakeup_fd_destroy(&worker->wakeup_fd);
   if (added_worker) {
     remove_worker(pollset, worker);
   }
+  grpc_wakeup_fd_destroy(&worker->wakeup_fd);
   if (pollset->shutting_down) {
     if (grpc_pollset_has_workers(pollset)) {
       grpc_pollset_kick(pollset, NULL);
@@ -454,6 +522,9 @@
                                                 grpc_pollset_worker *worker,
                                                 gpr_timespec deadline,
                                                 gpr_timespec now) {
+#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
+#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
+
   struct pollfd pfd[3];
   grpc_fd *fd;
   grpc_fd_watcher fd_watcher;
@@ -479,8 +550,8 @@
     pfd[2].revents = 0;
     GRPC_FD_REF(fd, "basicpoll_begin");
     gpr_mu_unlock(&pollset->mu);
-    pfd[2].events =
-        (short)grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher);
+    pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN,
+                                              POLLOUT, &fd_watcher);
     if (pfd[2].events != 0) {
       nfds++;
     }
@@ -497,31 +568,27 @@
   GRPC_SCHEDULING_END_BLOCKING_REGION;
   GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
 
-  if (fd) {
-    grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN,
-                     pfd[2].revents & POLLOUT);
-  }
-
   if (r < 0) {
-    if (errno != EINTR) {
-      gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+    gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+    if (fd) {
+      grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
     }
   } else if (r == 0) {
-    /* do nothing */
+    if (fd) {
+      grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
+    }
   } else {
-    if (pfd[0].revents & POLLIN) {
+    if (pfd[0].revents & POLLIN_CHECK) {
       grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
     }
-    if (pfd[1].revents & POLLIN) {
+    if (pfd[1].revents & POLLIN_CHECK) {
       grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
     }
     if (nfds > 2) {
-      if (pfd[2].revents & (POLLIN | POLLHUP | POLLERR)) {
-        grpc_fd_become_readable(exec_ctx, fd);
-      }
-      if (pfd[2].revents & (POLLOUT | POLLHUP | POLLERR)) {
-        grpc_fd_become_writable(exec_ctx, fd);
-      }
+      grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
+                       pfd[2].revents & POLLOUT_CHECK);
+    } else if (fd) {
+      grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
     }
   }
 
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 83c5258..34f76db 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -50,6 +50,7 @@
 
 typedef struct grpc_pollset_worker {
   grpc_wakeup_fd wakeup_fd;
+  int reevaluate_polling_on_wakeup;
   struct grpc_pollset_worker *next;
   struct grpc_pollset_worker *prev;
 } grpc_pollset_worker;
@@ -111,6 +112,16 @@
 int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
                                          gpr_timespec now);
 
+/* Allow kick to wakeup the currently polling worker */
+#define GRPC_POLLSET_CAN_KICK_SELF 1
+/* Force the wakee to repoll when awoken */
+#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
+/* As per grpc_pollset_kick, with an extended set of flags (defined above)
+   -- mostly for fd_posix's use. */
+void grpc_pollset_kick_ext(grpc_pollset *p,
+                           grpc_pollset_worker *specific_worker,
+                           gpr_uint32 flags);
+
 /* turn a pollset into a multipoller: platform specific */
 typedef void (*grpc_platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx,
                                                       grpc_pollset *pollset,
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index d917c7a..989fe5f 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -690,165 +690,187 @@
       });
     });
   });
-  describe('Call propagation', function() {
-    var proxy;
-    var proxy_impl;
-    beforeEach(function() {
-      proxy = new grpc.Server();
-      proxy_impl = {
-        unary: function(call) {},
-        clientStream: function(stream) {},
-        serverStream: function(stream) {},
-        bidiStream: function(stream) {}
+});
+describe('Call propagation', function() {
+  var proxy;
+  var proxy_impl;
+
+  var test_service;
+  var Client;
+  var client;
+  var server;
+  before(function() {
+    var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
+    test_service = test_proto.lookup('TestService');
+    server = new grpc.Server();
+    server.addProtoService(test_service, {
+      unary: function(call) {},
+      clientStream: function(stream) {},
+      serverStream: function(stream) {},
+      bidiStream: function(stream) {}
+    });
+    var port = server.bind('localhost:0', server_insecure_creds);
+    Client = surface_client.makeProtobufClientConstructor(test_service);
+    client = new Client('localhost:' + port, grpc.Credentials.createInsecure());
+    server.start();
+  });
+  after(function() {
+    server.forceShutdown();
+  });
+  beforeEach(function() {
+    proxy = new grpc.Server();
+    proxy_impl = {
+      unary: function(call) {},
+      clientStream: function(stream) {},
+      serverStream: function(stream) {},
+      bidiStream: function(stream) {}
+    };
+  });
+  afterEach(function() {
+    proxy.forceShutdown();
+  });
+  describe('Cancellation', function() {
+    it('With a unary call', function(done) {
+      done = multiDone(done, 2);
+      proxy_impl.unary = function(parent, callback) {
+        client.unary(parent.request, function(err, value) {
+          try {
+            assert(err);
+            assert.strictEqual(err.code, grpc.status.CANCELLED);
+          } finally {
+            callback(err, value);
+            done();
+          }
+        }, null, {parent: parent});
+        call.cancel();
       };
-    });
-    afterEach(function() {
-      console.log('Shutting down server');
-      proxy.forceShutdown();
-    });
-    describe('Cancellation', function() {
-      it('With a unary call', function(done) {
-        done = multiDone(done, 2);
-        proxy_impl.unary = function(parent, callback) {
-          client.unary(parent.request, function(err, value) {
-            try {
-              assert(err);
-              assert.strictEqual(err.code, grpc.status.CANCELLED);
-            } finally {
-              callback(err, value);
-              done();
-            }
-          }, null, {parent: parent});
-          call.cancel();
-        };
-        proxy.addProtoService(test_service, proxy_impl);
-        var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
-        proxy.start();
-        var proxy_client = new Client('localhost:' + proxy_port,
-                                      grpc.Credentials.createInsecure());
-        var call = proxy_client.unary({}, function(err, value) {
-          done();
-        });
+      proxy.addProtoService(test_service, proxy_impl);
+      var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
+      proxy.start();
+      var proxy_client = new Client('localhost:' + proxy_port,
+                                    grpc.Credentials.createInsecure());
+      var call = proxy_client.unary({}, function(err, value) {
+        done();
       });
-      it('With a client stream call', function(done) {
-        done = multiDone(done, 2);
-        proxy_impl.clientStream = function(parent, callback) {
-          client.clientStream(function(err, value) {
-            try {
-              assert(err);
-              assert.strictEqual(err.code, grpc.status.CANCELLED);
-            } finally {
-              callback(err, value);
-              done();
-            }
-          }, null, {parent: parent});
-          call.cancel();
-        };
-        proxy.addProtoService(test_service, proxy_impl);
-        var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
-        proxy.start();
-        var proxy_client = new Client('localhost:' + proxy_port,
-                                      grpc.Credentials.createInsecure());
-        var call = proxy_client.clientStream(function(err, value) {
-          done();
-        });
-      });
-      it('With a server stream call', function(done) {
-        done = multiDone(done, 2);
-        proxy_impl.serverStream = function(parent) {
-          var child = client.serverStream(parent.request, null,
-                                          {parent: parent});
-          child.on('error', function(err) {
+    });
+    it('With a client stream call', function(done) {
+      done = multiDone(done, 2);
+      proxy_impl.clientStream = function(parent, callback) {
+        client.clientStream(function(err, value) {
+          try {
             assert(err);
             assert.strictEqual(err.code, grpc.status.CANCELLED);
+          } finally {
+            callback(err, value);
             done();
-          });
-          call.cancel();
-        };
-        proxy.addProtoService(test_service, proxy_impl);
-        var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
-        proxy.start();
-        var proxy_client = new Client('localhost:' + proxy_port,
-                                      grpc.Credentials.createInsecure());
-        var call = proxy_client.serverStream({});
-        call.on('error', function(err) {
-          done();
-        });
-      });
-      it('With a bidi stream call', function(done) {
-        done = multiDone(done, 2);
-        proxy_impl.bidiStream = function(parent) {
-          var child = client.bidiStream(null, {parent: parent});
-          child.on('error', function(err) {
-            assert(err);
-            assert.strictEqual(err.code, grpc.status.CANCELLED);
-            done();
-          });
-          call.cancel();
-        };
-        proxy.addProtoService(test_service, proxy_impl);
-        var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
-        proxy.start();
-        var proxy_client = new Client('localhost:' + proxy_port,
-                                      grpc.Credentials.createInsecure());
-        var call = proxy_client.bidiStream();
-        call.on('error', function(err) {
-          done();
-        });
+          }
+        }, null, {parent: parent});
+        call.cancel();
+      };
+      proxy.addProtoService(test_service, proxy_impl);
+      var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
+      proxy.start();
+      var proxy_client = new Client('localhost:' + proxy_port,
+                                    grpc.Credentials.createInsecure());
+      var call = proxy_client.clientStream(function(err, value) {
+        done();
       });
     });
-    describe('Deadline', function() {
-      /* jshint bitwise:false */
-      var deadline_flags = (grpc.propagate.DEFAULTS &
-          ~grpc.propagate.CANCELLATION);
-      it('With a client stream call', function(done) {
-        done = multiDone(done, 2);
-        proxy_impl.clientStream = function(parent, callback) {
-          client.clientStream(function(err, value) {
-            try {
-              assert(err);
-              assert(err.code === grpc.status.DEADLINE_EXCEEDED ||
-                  err.code === grpc.status.INTERNAL);
-            } finally {
-              callback(err, value);
-              done();
-            }
-          }, null, {parent: parent, propagate_flags: deadline_flags});
-        };
-        proxy.addProtoService(test_service, proxy_impl);
-        var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
-        proxy.start();
-        var proxy_client = new Client('localhost:' + proxy_port,
-                                      grpc.Credentials.createInsecure());
-        var deadline = new Date();
-        deadline.setSeconds(deadline.getSeconds() + 1);
-        proxy_client.clientStream(function(err, value) {
+    it('With a server stream call', function(done) {
+      done = multiDone(done, 2);
+      proxy_impl.serverStream = function(parent) {
+        var child = client.serverStream(parent.request, null,
+                                        {parent: parent});
+        child.on('error', function(err) {
+          assert(err);
+          assert.strictEqual(err.code, grpc.status.CANCELLED);
           done();
-        }, null, {deadline: deadline});
+        });
+        call.cancel();
+      };
+      proxy.addProtoService(test_service, proxy_impl);
+      var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
+      proxy.start();
+      var proxy_client = new Client('localhost:' + proxy_port,
+                                    grpc.Credentials.createInsecure());
+      var call = proxy_client.serverStream({});
+      call.on('error', function(err) {
+        done();
       });
-      it('With a bidi stream call', function(done) {
-        done = multiDone(done, 2);
-        proxy_impl.bidiStream = function(parent) {
-          var child = client.bidiStream(
-              null, {parent: parent, propagate_flags: deadline_flags});
-          child.on('error', function(err) {
+    });
+    it('With a bidi stream call', function(done) {
+      done = multiDone(done, 2);
+      proxy_impl.bidiStream = function(parent) {
+        var child = client.bidiStream(null, {parent: parent});
+        child.on('error', function(err) {
+          assert(err);
+          assert.strictEqual(err.code, grpc.status.CANCELLED);
+          done();
+        });
+        call.cancel();
+      };
+      proxy.addProtoService(test_service, proxy_impl);
+      var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
+      proxy.start();
+      var proxy_client = new Client('localhost:' + proxy_port,
+                                    grpc.Credentials.createInsecure());
+      var call = proxy_client.bidiStream();
+      call.on('error', function(err) {
+        done();
+      });
+    });
+  });
+  describe('Deadline', function() {
+    /* jshint bitwise:false */
+    var deadline_flags = (grpc.propagate.DEFAULTS &
+        ~grpc.propagate.CANCELLATION);
+    it('With a client stream call', function(done) {
+      done = multiDone(done, 2);
+      proxy_impl.clientStream = function(parent, callback) {
+        client.clientStream(function(err, value) {
+          try {
             assert(err);
             assert(err.code === grpc.status.DEADLINE_EXCEEDED ||
                 err.code === grpc.status.INTERNAL);
+          } finally {
+            callback(err, value);
             done();
-          });
-        };
-        proxy.addProtoService(test_service, proxy_impl);
-        var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
-        proxy.start();
-        var proxy_client = new Client('localhost:' + proxy_port,
-                                      grpc.Credentials.createInsecure());
-        var deadline = new Date();
-        deadline.setSeconds(deadline.getSeconds() + 1);
-        var call = proxy_client.bidiStream(null, {deadline: deadline});
-        call.on('error', function(err) {
+          }
+        }, null, {parent: parent, propagate_flags: deadline_flags});
+      };
+      proxy.addProtoService(test_service, proxy_impl);
+      var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
+      proxy.start();
+      var proxy_client = new Client('localhost:' + proxy_port,
+                                    grpc.Credentials.createInsecure());
+      var deadline = new Date();
+      deadline.setSeconds(deadline.getSeconds() + 1);
+      proxy_client.clientStream(function(err, value) {
+        done();
+      }, null, {deadline: deadline});
+    });
+    it('With a bidi stream call', function(done) {
+      done = multiDone(done, 2);
+      proxy_impl.bidiStream = function(parent) {
+        var child = client.bidiStream(
+            null, {parent: parent, propagate_flags: deadline_flags});
+        child.on('error', function(err) {
+          assert(err);
+          assert(err.code === grpc.status.DEADLINE_EXCEEDED ||
+              err.code === grpc.status.INTERNAL);
           done();
         });
+      };
+      proxy.addProtoService(test_service, proxy_impl);
+      var proxy_port = proxy.bind('localhost:0', server_insecure_creds);
+      proxy.start();
+      var proxy_client = new Client('localhost:' + proxy_port,
+                                    grpc.Credentials.createInsecure());
+      var deadline = new Date();
+      deadline.setSeconds(deadline.getSeconds() + 1);
+      var call = proxy_client.bidiStream(null, {deadline: deadline});
+      call.on('error', function(err) {
+        done();
       });
     });
   });
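The surface tests above exercise gRPC call propagation: each proxy handler passes `{parent: parent}` so that cancelling the proxy's call cancels the child call, while the Deadline suite strips the cancellation bit from the propagation mask so only the deadline carries over. A minimal sketch of that bitmask arithmetic in Python, assuming the bit values from the C core's propagation_bits.h (an assumption; verify against the grpc version you build):

```python
# Bit values assumed from grpc/impl/codegen/propagation_bits.h; check
# the header for your grpc version.
PROPAGATE_DEADLINE = 0x1
PROPAGATE_CANCELLATION = 0x8
PROPAGATE_DEFAULTS = 0xFFFF

# The Deadline tests propagate everything except cancellation, so
# cancelling the proxy call must not cancel the child call; only the
# one-second deadline should trip.
deadline_flags = PROPAGATE_DEFAULTS & ~PROPAGATE_CANCELLATION

assert deadline_flags & PROPAGATE_DEADLINE          # deadline still flows
assert not deadline_flags & PROPAGATE_CANCELLATION  # cancellation masked
```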
diff --git a/tools/jenkins/build_interop_image.sh b/tools/jenkins/build_interop_image.sh
index b595858..3664eed 100755
--- a/tools/jenkins/build_interop_image.sh
+++ b/tools/jenkins/build_interop_image.sh
@@ -35,12 +35,12 @@
 
 cd `dirname $0`/../..
 GRPC_ROOT=`pwd`
-MOUNT_ARGS="-v $GRPC_ROOT:/var/local/jenkins/grpc"
+MOUNT_ARGS="-v $GRPC_ROOT:/var/local/jenkins/grpc:ro"
 
 GRPC_JAVA_ROOT=`cd ../grpc-java && pwd`
 if [ "$GRPC_JAVA_ROOT" != "" ]
 then
-  MOUNT_ARGS+=" -v $GRPC_JAVA_ROOT:/var/local/jenkins/grpc-java"
+  MOUNT_ARGS+=" -v $GRPC_JAVA_ROOT:/var/local/jenkins/grpc-java:ro"
 else
   echo "WARNING: grpc-java not found, it won't be mounted to the docker container."
 fi
@@ -48,7 +48,7 @@
 GRPC_GO_ROOT=`cd ../grpc-go && pwd`
 if [ "$GRPC_GO_ROOT" != "" ]
 then
-  MOUNT_ARGS+=" -v $GRPC_GO_ROOT:/var/local/jenkins/grpc-go"
+  MOUNT_ARGS+=" -v $GRPC_GO_ROOT:/var/local/jenkins/grpc-go:ro"
 else
   echo "WARNING: grpc-go not found, it won't be mounted to the docker container."
 fi
@@ -60,6 +60,14 @@
 #  BASE_NAME - base name used to locate the base Dockerfile and build script
 #  TTY_FLAG - optional -t flag to make docker allocate tty.
 
+# Mount service account dir if available.
+# If the service_account directory does not contain the service account
+# JSON file, some of the tests will fail.
+if [ -e "$HOME/service_account" ]
+then
+  MOUNT_ARGS+=" -v $HOME/service_account:/var/local/jenkins/service_account:ro"
+fi
+
 # Use image name based on Dockerfile checksum
 BASE_IMAGE=${BASE_NAME}_base:`sha1sum tools/jenkins/$BASE_NAME/Dockerfile | cut -f1 -d\ `
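The hunk above makes every source mount read-only and conditionally mounts the Jenkins host's service-account keys. A rough Python sketch of the same decision logic, for orientation (the actual `docker run` that consumes `$MOUNT_ARGS` happens later in the script, outside this hunk):

```python
import os

# Source trees are mounted read-only (:ro) so containers cannot modify them.
mount_args = ['-v', '%s:/var/local/jenkins/grpc:ro' % os.getcwd()]

# Mount ~/service_account only when it exists on the host; the
# cloud_to_prod_auth tests added below expect the JSON key inside it.
service_account = os.path.join(os.path.expanduser('~'), 'service_account')
if os.path.exists(service_account):
    mount_args += ['-v',
                   '%s:/var/local/jenkins/service_account:ro' % service_account]
```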
 
diff --git a/tools/jenkins/grpc_interop_csharp/build_interop.sh b/tools/jenkins/grpc_interop_csharp/build_interop.sh
index e91cbed..8fde687 100755
--- a/tools/jenkins/grpc_interop_csharp/build_interop.sh
+++ b/tools/jenkins/grpc_interop_csharp/build_interop.sh
@@ -34,6 +34,9 @@
 mkdir -p /var/local/git
 git clone --recursive /var/local/jenkins/grpc /var/local/git/grpc
 
+# copy service account keys if available
+cp -r /var/local/jenkins/service_account $HOME || true
+
 cd /var/local/git/grpc
 
 make install-certs
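The same two-line copy is added to every language's build_interop.sh below; `|| true` makes it best-effort, so a container built without the service-account mount still builds cleanly. The equivalent in Python, as a sketch:

```python
import os
import shutil

# Best-effort copy, like `cp -r ... || true`: if the directory was never
# mounted into the container, swallow the error and continue; only the
# auth test cases will later miss the key.
try:
    shutil.copytree('/var/local/jenkins/service_account',
                    os.path.join(os.path.expanduser('~'), 'service_account'))
except (OSError, shutil.Error):
    pass
```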
diff --git a/tools/jenkins/grpc_interop_cxx/build_interop.sh b/tools/jenkins/grpc_interop_cxx/build_interop.sh
index 4163e11..1c0828d 100755
--- a/tools/jenkins/grpc_interop_cxx/build_interop.sh
+++ b/tools/jenkins/grpc_interop_cxx/build_interop.sh
@@ -34,6 +34,9 @@
 mkdir -p /var/local/git
 git clone --recursive /var/local/jenkins/grpc /var/local/git/grpc
 
+# copy service account keys if available
+cp -r /var/local/jenkins/service_account $HOME || true
+
 cd /var/local/git/grpc
 
 make install-certs
diff --git a/tools/jenkins/grpc_interop_go/build_interop.sh b/tools/jenkins/grpc_interop_go/build_interop.sh
index 78dd4ea..05fc6df 100755
--- a/tools/jenkins/grpc_interop_go/build_interop.sh
+++ b/tools/jenkins/grpc_interop_go/build_interop.sh
@@ -36,6 +36,9 @@
 # to test instead of using "go get" to download from GitHub directly.
 git clone --recursive /var/local/jenkins/grpc-go src/google.golang.org/grpc
 
+# copy service account keys if available
+cp -r /var/local/jenkins/service_account $HOME || true
+
 # Get dependencies from GitHub
 # NOTE: once grpc-go dependencies change, this needs to be updated manually
 # but we don't expect this to happen any time soon.
diff --git a/tools/jenkins/grpc_interop_java/build_interop.sh b/tools/jenkins/grpc_interop_java/build_interop.sh
index 4ee2f44..9997c63 100755
--- a/tools/jenkins/grpc_interop_java/build_interop.sh
+++ b/tools/jenkins/grpc_interop_java/build_interop.sh
@@ -34,6 +34,9 @@
 mkdir -p /var/local/git
 git clone --recursive --depth 1 /var/local/jenkins/grpc-java /var/local/git/grpc-java
 
+# copy service account keys if available
+cp -r /var/local/jenkins/service_account $HOME || true
+
 cd /var/local/git/grpc-java
 
 ./gradlew :grpc-interop-testing:installDist -PskipCodegen=true
diff --git a/tools/jenkins/grpc_interop_node/build_interop.sh b/tools/jenkins/grpc_interop_node/build_interop.sh
index 55e2a40..84e25e3 100755
--- a/tools/jenkins/grpc_interop_node/build_interop.sh
+++ b/tools/jenkins/grpc_interop_node/build_interop.sh
@@ -34,6 +34,9 @@
 mkdir -p /var/local/git
 git clone --recursive /var/local/jenkins/grpc /var/local/git/grpc
 
+# copy service account keys if available
+cp -r /var/local/jenkins/service_account $HOME || true
+
 cd /var/local/git/grpc
 nvm use 0.12
 nvm alias default 0.12  # prevent the need to run 'nvm use' in every shell
diff --git a/tools/jenkins/grpc_interop_php/build_interop.sh b/tools/jenkins/grpc_interop_php/build_interop.sh
index 745dea8..cd9d678 100755
--- a/tools/jenkins/grpc_interop_php/build_interop.sh
+++ b/tools/jenkins/grpc_interop_php/build_interop.sh
@@ -34,6 +34,9 @@
 mkdir -p /var/local/git
 git clone --recursive /var/local/jenkins/grpc /var/local/git/grpc
 
+# copy service account keys if available
+cp -r /var/local/jenkins/service_account $HOME || true
+
 cd /var/local/git/grpc
 rvm --default use ruby-2.1
 
diff --git a/tools/jenkins/grpc_interop_ruby/build_interop.sh b/tools/jenkins/grpc_interop_ruby/build_interop.sh
index 7d407e7..c5023f5 100755
--- a/tools/jenkins/grpc_interop_ruby/build_interop.sh
+++ b/tools/jenkins/grpc_interop_ruby/build_interop.sh
@@ -34,6 +34,9 @@
 mkdir -p /var/local/git
 git clone --recursive /var/local/jenkins/grpc /var/local/git/grpc
 
+# copy service account keys if available
+cp -r /var/local/jenkins/service_account $HOME || true
+
 cd /var/local/git/grpc
 rvm --default use ruby-2.1
 
diff --git a/tools/run_tests/run_interop_tests.py b/tools/run_tests/run_interop_tests.py
index f0935fb..48c34f6 100755
--- a/tools/run_tests/run_interop_tests.py
+++ b/tools/run_tests/run_interop_tests.py
@@ -82,7 +82,7 @@
             ['--use_tls=true'])
 
   def cloud_to_prod_env(self):
-    return None
+    return {}
 
   def server_args(self):
     return ['bins/opt/interop_server', '--use_tls=true']
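The `return None` → `return {}` changes here and in the two hunks below are not cosmetic: `add_auth_options()`, introduced later in this patch, calls `.copy()` on whatever `cloud_to_prod_env()` returns, and `None` has no such method. A minimal demonstration:

```python
# Why cloud_to_prod_env() must return a dict, not None:
# add_auth_options() below does `env = env.copy()` on the result.
try:
    None.copy()
except AttributeError as e:
    print(e)  # 'NoneType' object has no attribute 'copy'

env = {}.copy()  # an empty dict copies cleanly and can be extended
env['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/key.json'  # illustrative
```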
@@ -132,7 +132,7 @@
             ['--use_tls=true', '--use_test_ca=true'])
 
   def cloud_to_prod_env(self):
-    return None
+    return {}
 
   def server_args(self):
     return ['./run-test-server.sh', '--use_tls=true']
@@ -158,7 +158,7 @@
             ['--use_tls=true'])
 
   def cloud_to_prod_env(self):
-    return None
+    return {}
 
   def server_args(self):
     return ['go', 'run', 'server.go', '--use_tls=true']
@@ -250,8 +250,7 @@
 }
 
 # languages supported as cloud_to_cloud servers
-# TODO(jtattermusch): enable other languages as servers as well
-_SERVERS = ['c++', 'node', 'csharp', 'java', 'go']
+_SERVERS = ['c++', 'node', 'csharp', 'java', 'go', 'ruby']
 
 # TODO(jtattermusch): add empty_stream once PHP starts supporting it.
 # TODO(jtattermusch): add timeout_on_sleeping_server once Java starts supporting it.
@@ -260,6 +259,9 @@
                'client_streaming', 'server_streaming',
                'cancel_after_begin', 'cancel_after_first_response']
 
+_AUTH_TEST_CASES = ['compute_engine_creds', 'jwt_token_creds',
+                    'oauth2_auth_token', 'per_rpc_creds']
+
 
 def docker_run_cmdline(cmdline, image, docker_args=[], cwd=None, environ=None):
   """Wraps given cmdline array to create 'docker run' cmdline from it."""
@@ -288,22 +290,54 @@
   return ['bash', '-l', '-c', ' '.join(cmdline)]
 
 
-def cloud_to_prod_jobspec(language, test_case, docker_image=None):
+def add_auth_options(language, test_case, cmdline, env):
+  """Returns (cmdline, env) tuple with cloud_to_prod_auth test options."""
+
+  language = str(language)
+  cmdline = list(cmdline)
+  env = env.copy()
+
+  # TODO(jtattermusch): this file path only works inside docker
+  key_filepath = '/root/service_account/stubbyCloudTestingTest-ee3fce360ac5.json'
+  oauth_scope_arg = '--oauth_scope=https://www.googleapis.com/auth/xapi.zoo'
+  key_file_arg = '--service_account_key_file=%s' % key_filepath
+  default_account_arg = '--default_service_account=830293263384-compute@developer.gserviceaccount.com'
+
+  if test_case in ['jwt_token_creds', 'per_rpc_creds', 'oauth2_auth_token']:
+    if language in ['csharp', 'node', 'php', 'ruby']:
+      env['GOOGLE_APPLICATION_CREDENTIALS'] = key_filepath
+    else:
+      cmdline += [key_file_arg]
+
+  if test_case in ['per_rpc_creds', 'oauth2_auth_token']:
+    cmdline += [oauth_scope_arg]
+
+  if test_case == 'compute_engine_creds':
+    cmdline += [oauth_scope_arg, default_account_arg]
+
+  return (cmdline, env)
+
+
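An illustrative call of the function above, assuming it is in scope; the base cmdline is invented for the example:

```python
cmdline = ['bins/opt/interop_client', '--use_tls=true',
           '--test_case=oauth2_auth_token']
cmdline, env = add_auth_options('c++', 'oauth2_auth_token', cmdline, {})
# 'c++' is not in the GOOGLE_APPLICATION_CREDENTIALS branch, so the key
# file and the oauth scope are appended as flags:
#   --service_account_key_file=/root/service_account/...
#   --oauth_scope=https://www.googleapis.com/auth/xapi.zoo
# env comes back as an (unmodified) copy of the dict passed in.
```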
+def cloud_to_prod_jobspec(language, test_case, docker_image=None, auth=False):
   """Creates jobspec for cloud-to-prod interop test"""
-  cmdline = bash_login_cmdline(language.cloud_to_prod_args() +
-                               ['--test_case=%s' % test_case])
+  cmdline = language.cloud_to_prod_args() + ['--test_case=%s' % test_case]
   cwd = language.client_cwd
   environ = language.cloud_to_prod_env()
+  if auth:
+    cmdline, environ = add_auth_options(language, test_case, cmdline, environ)
+  cmdline = bash_login_cmdline(cmdline)
+
   if docker_image:
     cmdline = docker_run_cmdline(cmdline, image=docker_image, cwd=cwd, environ=environ)
     cwd = None
     environ = None
 
+  suite_name = 'cloud_to_prod_auth' if auth else 'cloud_to_prod'
   test_job = jobset.JobSpec(
           cmdline=cmdline,
           cwd=cwd,
           environ=environ,
-          shortname="cloud_to_prod:%s:%s" % (language, test_case),
+          shortname="%s:%s:%s" % (suite_name, language, test_case),
           timeout_seconds=2*60,
           flake_retries=5 if args.allow_flakes else 0,
           timeout_retries=2 if args.allow_flakes else 0)
@@ -382,6 +416,11 @@
                   action='store_const',
                   const=True,
                   help='Run cloud_to_prod tests.')
+argp.add_argument('--cloud_to_prod_auth',
+                  default=False,
+                  action='store_const',
+                  const=True,
+                  help='Run cloud_to_prod_auth tests.')
 argp.add_argument('-s', '--server',
                   choices=['all'] + sorted(_SERVERS),
                   action='append',
@@ -476,6 +515,14 @@
                                          docker_image=docker_images.get(str(language)))
         jobs.append(test_job)
 
+  if args.cloud_to_prod_auth:
+    for language in languages:
+      for test_case in _AUTH_TEST_CASES:
+        test_job = cloud_to_prod_jobspec(language, test_case,
+                                         docker_image=docker_images.get(str(language)),
+                                         auth=True)
+        jobs.append(test_job)
+
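Taken together: with the new `--cloud_to_prod_auth` flag, every enabled language contributes one job per entry in `_AUTH_TEST_CASES`, named with the suite-aware shortname from `cloud_to_prod_jobspec`. A sketch of the resulting job names (the language pair is arbitrary):

```python
_AUTH_TEST_CASES = ['compute_engine_creds', 'jwt_token_creds',
                    'oauth2_auth_token', 'per_rpc_creds']

for language in ['c++', 'go']:  # any two enabled languages
    for test_case in _AUTH_TEST_CASES:
        print('cloud_to_prod_auth:%s:%s' % (language, test_case))
# cloud_to_prod_auth:c++:compute_engine_creds
# ...
# cloud_to_prod_auth:go:per_rpc_creds  (eight jobs in total)
```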
   for server in args.override_server:
     server_name = server[0]
     (server_host, server_port) = server[1].split(':')