Add resource quota support to uv TCP code
diff --git a/src/core/lib/iomgr/endpoint_pair_uv.c b/src/core/lib/iomgr/endpoint_pair_uv.c
index 7941e20..ff24894 100644
--- a/src/core/lib/iomgr/endpoint_pair_uv.c
+++ b/src/core/lib/iomgr/endpoint_pair_uv.c
@@ -41,8 +41,9 @@
 
 #include "src/core/lib/iomgr/endpoint_pair.h"
 
-grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
-                                                   size_t read_slice_size) {
+grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(
+    const char *name, grpc_resource_quota *resource_quota,
+    size_t read_slice_size) {
   grpc_endpoint_pair endpoint_pair;
   // TODO(mlumish): implement this properly under libuv
   GPR_ASSERT(false &&
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index bfc9058..40c847a 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -712,3 +712,10 @@
   grpc_resource_user_alloc(exec_ctx, slice_allocator->resource_user,
                            count * length, &slice_allocator->on_allocated);
 }
+
+gpr_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx,
+                                          grpc_resource_user *resource_user,
+                                          size_t size) {
+  grpc_resource_user_alloc(exec_ctx, resource_user, size, NULL);
+  return ru_slice_create(resource_user, size);
+}
diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h
index 6dfac55..da68f21 100644
--- a/src/core/lib/iomgr/resource_quota.h
+++ b/src/core/lib/iomgr/resource_quota.h
@@ -221,4 +221,9 @@
     grpc_resource_user_slice_allocator *slice_allocator, size_t length,
     size_t count, gpr_slice_buffer *dest);
 
+/* Allocate one slice of length \a size synchronously. */
+gpr_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx,
+                                          grpc_resource_user *resource_user,
+                                          size_t size);
+
 #endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index 6274667..b07f9ce 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -54,9 +54,12 @@
   grpc_endpoint **endpoint;
   int refs;
   char *addr_name;
+  grpc_resource_quota *resource_quota;
 } grpc_uv_tcp_connect;
 
-static void uv_tcp_connect_cleanup(grpc_uv_tcp_connect *connect) {
+static void uv_tcp_connect_cleanup(grpc_exec_ctx *exec_ctx,
+                                   grpc_uv_tcp_connect *connect) {
+  grpc_resource_quota_internal_unref(exec_ctx, connect->resource_quota);
   gpr_free(connect);
 }
 
@@ -74,7 +77,7 @@
   }
   done = (--connect->refs == 0);
   if (done) {
-    uv_tcp_connect_cleanup(connect);
+    uv_tcp_connect_cleanup(exec_ctx, connect);
   }
 }
 
@@ -86,8 +89,8 @@
   grpc_closure *closure = connect->closure;
   grpc_timer_cancel(&exec_ctx, &connect->alarm);
   if (status == 0) {
-    *connect->endpoint =
-        grpc_tcp_create(connect->tcp_handle, connect->addr_name);
+    *connect->endpoint = grpc_tcp_create(
+        connect->tcp_handle, connect->resource_quota, connect->addr_name);
   } else {
     error = GRPC_ERROR_CREATE("Failed to connect to remote host");
     error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, -status);
@@ -105,7 +108,7 @@
   }
   done = (--connect->refs == 0);
   if (done) {
-    uv_tcp_connect_cleanup(connect);
+    uv_tcp_connect_cleanup(&exec_ctx, connect);
   }
   grpc_exec_ctx_sched(&exec_ctx, closure, error, NULL);
   grpc_exec_ctx_finish(&exec_ctx);
@@ -114,16 +117,31 @@
 static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
                                     grpc_closure *closure, grpc_endpoint **ep,
                                     grpc_pollset_set *interested_parties,
+                                    const grpc_channel_args *channel_args,
                                     const grpc_resolved_address *resolved_addr,
                                     gpr_timespec deadline) {
   grpc_uv_tcp_connect *connect;
+  grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL);
+  (void)channel_args;
   (void)interested_parties;
+
+  if (channel_args != NULL) {
+    for (size_t i = 0; i < channel_args->num_args; i++) {
+      if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
+        grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
+        resource_quota = grpc_resource_quota_internal_ref(
+            channel_args->args[i].value.pointer.p);
+      }
+    }
+  }
+
   connect = gpr_malloc(sizeof(grpc_uv_tcp_connect));
   memset(connect, 0, sizeof(grpc_uv_tcp_connect));
   connect->closure = closure;
   connect->endpoint = ep;
   connect->tcp_handle = gpr_malloc(sizeof(uv_tcp_t));
   connect->addr_name = grpc_sockaddr_to_uri(resolved_addr);
+  connect->resource_quota = resource_quota;
   uv_tcp_init(uv_default_loop(), connect->tcp_handle);
   connect->connect_req.data = connect;
   // TODO(murgatroid99): figure out what the return value here means
@@ -138,16 +156,18 @@
 // overridden by api_fuzzer.c
 void (*grpc_tcp_client_connect_impl)(
     grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep,
-    grpc_pollset_set *interested_parties, const grpc_resolved_address *addr,
+    grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args,
+    const grpc_resolved_address *addr,
     gpr_timespec deadline) = tcp_client_connect_impl;
 
 void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                              grpc_endpoint **ep,
                              grpc_pollset_set *interested_parties,
+                             const grpc_channel_args *channel_args,
                              const grpc_resolved_address *addr,
                              gpr_timespec deadline) {
-  grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr,
-                               deadline);
+  grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties,
+                               channel_args, addr, deadline);
 }
 
 #endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
index 73e4db3..b5b9b92 100644
--- a/src/core/lib/iomgr/tcp_server_uv.c
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -76,13 +76,30 @@
 
   /* shutdown callback */
   grpc_closure *shutdown_complete;
+
+  grpc_resource_quota *resource_quota;
 };
 
-grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete,
+grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
+                                   grpc_closure *shutdown_complete,
                                    const grpc_channel_args *args,
                                    grpc_tcp_server **server) {
   grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
-  (void)args;
+  s->resource_quota = grpc_resource_quota_create(NULL);
+  for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
+    if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
+      if (args->args[i].type == GRPC_ARG_POINTER) {
+        grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+        s->resource_quota =
+            grpc_resource_quota_internal_ref(args->args[i].value.pointer.p);
+      } else {
+        grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+        gpr_free(s);
+        return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA
+                                 " must be a pointer to a buffer pool");
+      }
+    }
+  }
   gpr_ref_init(&s->refs, 1);
   s->on_accept_cb = NULL;
   s->on_accept_cb_arg = NULL;
@@ -119,6 +136,7 @@
     gpr_free(sp->handle);
     gpr_free(sp);
   }
+  grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
   gpr_free(s);
 }
 
@@ -201,7 +219,7 @@
     } else {
       gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status));
     }
-    ep = grpc_tcp_create(client, peer_name_string);
+    ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string);
     sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
                              &acceptor);
     grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index 3860fe3..b90e0c8 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -1,35 +1,35 @@
 /*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- *     * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *     * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *     * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
+*
+* Copyright 2016, Google Inc.
+* All rights reserved.
+*
+* Redistribution and use in source and binary forms, with or without
+* modification, are permitted provided that the following conditions are
+* met:
+*
+*     * Redistributions of source code must retain the above copyright
+* notice, this list of conditions and the following disclaimer.
+*     * Redistributions in binary form must reproduce the above
+* copyright notice, this list of conditions and the following disclaimer
+* in the documentation and/or other materials provided with the
+* distribution.
+*     * Neither the name of Google Inc. nor the names of its
+* contributors may be used to endorse or promote products derived from
+* this software without specific prior written permission.
+*
+* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*
+*/
 
 #include "src/core/lib/iomgr/port.h"
 
@@ -54,6 +54,9 @@
   grpc_endpoint base;
   gpr_refcount refcount;
 
+  uv_write_t write_req;
+  uv_shutdown_t shutdown_req;
+
   uv_tcp_t *handle;
 
   grpc_closure *read_cb;
@@ -64,14 +67,23 @@
   gpr_slice_buffer *write_slices;
   uv_buf_t *write_buffers;
 
+  grpc_resource_user resource_user;
+
   bool shutting_down;
+  bool resource_user_shutting_down;
+
   char *peer_string;
   grpc_pollset *pollset;
 } grpc_tcp;
 
 static void uv_close_callback(uv_handle_t *handle) { gpr_free(handle); }
 
-static void tcp_free(grpc_tcp *tcp) { gpr_free(tcp); }
+static void tcp_free(grpc_tcp *tcp) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_resource_user_destroy(&exec_ctx, &tcp->resource_user);
+  gpr_free(tcp);
+  grpc_exec_ctx_finish(&exec_ctx);
+}
 
 /*#define GRPC_TCP_REFCOUNT_DEBUG*/
 #ifdef GRPC_TCP_REFCOUNT_DEBUG
@@ -106,11 +118,14 @@
 
 static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size,
                          uv_buf_t *buf) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_tcp *tcp = handle->data;
   (void)suggested_size;
-  tcp->read_slice = gpr_slice_malloc(GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+  tcp->read_slice = grpc_resource_user_slice_malloc(
+      &exec_ctx, &tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
   buf->base = (char *)GPR_SLICE_START_PTR(tcp->read_slice);
   buf->len = GPR_SLICE_LENGTH(tcp->read_slice);
+  grpc_exec_ctx_finish(&exec_ctx);
 }
 
 static void read_callback(uv_stream_t *stream, ssize_t nread,
@@ -198,7 +213,8 @@
     gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str);
   }
   gpr_free(tcp->write_buffers);
-  gpr_free(req);
+  grpc_resource_user_free(&exec_ctx, &tcp->resource_user,
+                          sizeof(uv_buf_t) * tcp->write_slices->count);
   grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
   grpc_exec_ctx_finish(&exec_ctx);
 }
@@ -243,12 +259,15 @@
   tcp->write_cb = cb;
   buffer_count = (unsigned int)tcp->write_slices->count;
   buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count);
+  grpc_resource_user_alloc(exec_ctx, &tcp->resource_user,
+                           sizeof(uv_buf_t) * buffer_count, NULL);
   for (i = 0; i < buffer_count; i++) {
     slice = &tcp->write_slices->slices[i];
     buffers[i].base = (char *)GPR_SLICE_START_PTR(*slice);
     buffers[i].len = GPR_SLICE_LENGTH(*slice);
   }
-  write_req = gpr_malloc(sizeof(uv_write_t));
+  tcp->write_buffers = buffers;
+  write_req = &tcp->write_req;
   write_req->data = tcp;
   TCP_REF(tcp, "write");
   // TODO(murgatroid99): figure out what the return value here means
@@ -274,13 +293,29 @@
   (void)pollset;
 }
 
-static void shutdown_callback(uv_shutdown_t *req, int status) { gpr_free(req); }
+static void shutdown_callback(uv_shutdown_t *req, int status) {}
+
+static void resource_user_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
+                                        grpc_error *error) {
+  TCP_UNREF(arg, "resource_user");
+}
+
+static void uv_resource_user_maybe_shutdown(grpc_exec_ctx *exec_ctx,
+                                            grpc_tcp *tcp) {
+  if (!tcp->resource_user_shutting_down) {
+    tcp->resource_user_shutting_down = true;
+    TCP_REF(tcp, "resource_user");
+    grpc_resource_user_shutdown(
+        exec_ctx, &tcp->resource_user,
+        grpc_closure_create(resource_user_shutdown_done, tcp));
+  }
+}
 
 static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   if (!tcp->shutting_down) {
     tcp->shutting_down = true;
-    uv_shutdown_t *req = gpr_malloc(sizeof(uv_shutdown_t));
+    uv_shutdown_t *req = &tcp->shutdown_req;
     uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback);
   }
 }
@@ -289,6 +324,7 @@
   grpc_network_status_unregister_endpoint(ep);
   grpc_tcp *tcp = (grpc_tcp *)ep;
   uv_close((uv_handle_t *)tcp->handle, uv_close_callback);
+  uv_resource_user_maybe_shutdown(exec_ctx, tcp);
   TCP_UNREF(tcp, "destroy");
 }
 
@@ -297,18 +333,21 @@
   return gpr_strdup(tcp->peer_string);
 }
 
+static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) {
+  grpc_tcp *tcp = (grpc_tcp *)ep;
+  return &tcp->resource_user;
+}
+
 static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; }
 
-static grpc_endpoint_vtable vtable = {uv_endpoint_read,
-                                      uv_endpoint_write,
-                                      uv_get_workqueue,
-                                      uv_add_to_pollset,
-                                      uv_add_to_pollset_set,
-                                      uv_endpoint_shutdown,
-                                      uv_destroy,
-                                      uv_get_peer};
+static grpc_endpoint_vtable vtable = {
+    uv_endpoint_read,  uv_endpoint_write,     uv_get_workqueue,
+    uv_add_to_pollset, uv_add_to_pollset_set, uv_endpoint_shutdown,
+    uv_destroy,        uv_get_resource_user,  uv_get_peer};
 
-grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) {
+grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
+                               grpc_resource_quota *resource_quota,
+                               char *peer_string) {
   grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
 
   if (grpc_tcp_trace) {
@@ -325,6 +364,8 @@
   gpr_ref_init(&tcp->refcount, 1);
   tcp->peer_string = gpr_strdup(peer_string);
   tcp->shutting_down = false;
+  tcp->resource_user_shutting_down = false;
+  grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string);
   /* Tell network status tracking code about the new endpoint */
   grpc_network_status_register_endpoint(&tcp->base);
 
diff --git a/src/core/lib/iomgr/tcp_uv.h b/src/core/lib/iomgr/tcp_uv.h
index eed4115..970fcaf 100644
--- a/src/core/lib/iomgr/tcp_uv.h
+++ b/src/core/lib/iomgr/tcp_uv.h
@@ -52,6 +52,8 @@
 
 #define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
 
-grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string);
+grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
+                               grpc_resource_quota *resource_quota,
+                               char *peer_string);
 
 #endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */