Add resource quota support to uv TCP code
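
This change embeds a grpc_resource_user in the uv grpc_tcp endpoint so
that its buffer memory is accounted against a grpc_resource_quota:

  - grpc_tcp_create() now takes a grpc_resource_quota and initializes
    the endpoint's resource user from it.
  - Read slices are allocated through grpc_resource_user_slice_malloc(),
    and the temporary uv_buf_t array built for each write is charged via
    grpc_resource_user_alloc() and released in the write callback with
    grpc_resource_user_free().
  - The resource user is shut down at most once (guarded by
    resource_user_shutting_down) when the endpoint is destroyed, and is
    destroyed in tcp_free() once the last endpoint ref is dropped.
  - The endpoint vtable gains uv_get_resource_user(), and the per-write
    uv_write_t / per-shutdown uv_shutdown_t are now members of grpc_tcp
    rather than heap allocations made for every call.

Callers construct the endpoint roughly as sketched below. This is
illustrative only: "handle" stands for an already-initialized uv_tcp_t,
"peer_string" for the peer address string, and in practice the quota
would come from channel args rather than being created inline:

    grpc_resource_quota *quota = grpc_resource_quota_create("tcp_uv_example");
    grpc_endpoint *ep = grpc_tcp_create(handle, quota, peer_string);
    grpc_resource_quota_unref(quota); /* assuming the endpoint's resource
                                         user holds its own reference */
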
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index 3860fe3..b90e0c8 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -1,35 +1,35 @@
 /*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- *     * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *     * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *     * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
+*
+* Copyright 2016, Google Inc.
+* All rights reserved.
+*
+* Redistribution and use in source and binary forms, with or without
+* modification, are permitted provided that the following conditions are
+* met:
+*
+*     * Redistributions of source code must retain the above copyright
+* notice, this list of conditions and the following disclaimer.
+*     * Redistributions in binary form must reproduce the above
+* copyright notice, this list of conditions and the following disclaimer
+* in the documentation and/or other materials provided with the
+* distribution.
+*     * Neither the name of Google Inc. nor the names of its
+* contributors may be used to endorse or promote products derived from
+* this software without specific prior written permission.
+*
+* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*
+*/
 
 #include "src/core/lib/iomgr/port.h"
 
@@ -54,6 +54,9 @@
   grpc_endpoint base;
   gpr_refcount refcount;
 
+  uv_write_t write_req;
+  uv_shutdown_t shutdown_req;
+
   uv_tcp_t *handle;
 
   grpc_closure *read_cb;
@@ -64,14 +67,23 @@
   gpr_slice_buffer *write_slices;
   uv_buf_t *write_buffers;
 
+  grpc_resource_user resource_user;
+
   bool shutting_down;
+  bool resource_user_shutting_down;
+
   char *peer_string;
   grpc_pollset *pollset;
 } grpc_tcp;
 
 static void uv_close_callback(uv_handle_t *handle) { gpr_free(handle); }
 
-static void tcp_free(grpc_tcp *tcp) { gpr_free(tcp); }
+static void tcp_free(grpc_tcp *tcp) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_resource_user_destroy(&exec_ctx, &tcp->resource_user);
+  gpr_free(tcp);
+  grpc_exec_ctx_finish(&exec_ctx);
+}
 
 /*#define GRPC_TCP_REFCOUNT_DEBUG*/
 #ifdef GRPC_TCP_REFCOUNT_DEBUG
@@ -106,11 +118,14 @@
 
 static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size,
                          uv_buf_t *buf) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_tcp *tcp = handle->data;
   (void)suggested_size;
-  tcp->read_slice = gpr_slice_malloc(GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+  tcp->read_slice = grpc_resource_user_slice_malloc(
+      &exec_ctx, &tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
   buf->base = (char *)GPR_SLICE_START_PTR(tcp->read_slice);
   buf->len = GPR_SLICE_LENGTH(tcp->read_slice);
+  grpc_exec_ctx_finish(&exec_ctx);
 }
 
 static void read_callback(uv_stream_t *stream, ssize_t nread,
@@ -198,7 +213,8 @@
     gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str);
   }
   gpr_free(tcp->write_buffers);
-  gpr_free(req);
+  grpc_resource_user_free(&exec_ctx, &tcp->resource_user,
+                          sizeof(uv_buf_t) * tcp->write_slices->count);
   grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
   grpc_exec_ctx_finish(&exec_ctx);
 }
@@ -243,12 +259,15 @@
   tcp->write_cb = cb;
   buffer_count = (unsigned int)tcp->write_slices->count;
   buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count);
+  grpc_resource_user_alloc(exec_ctx, &tcp->resource_user,
+                           sizeof(uv_buf_t) * buffer_count, NULL);
   for (i = 0; i < buffer_count; i++) {
     slice = &tcp->write_slices->slices[i];
     buffers[i].base = (char *)GPR_SLICE_START_PTR(*slice);
     buffers[i].len = GPR_SLICE_LENGTH(*slice);
   }
-  write_req = gpr_malloc(sizeof(uv_write_t));
+  tcp->write_buffers = buffers;
+  write_req = &tcp->write_req;
   write_req->data = tcp;
   TCP_REF(tcp, "write");
   // TODO(murgatroid99): figure out what the return value here means
@@ -274,13 +293,29 @@
   (void)pollset;
 }
 
-static void shutdown_callback(uv_shutdown_t *req, int status) { gpr_free(req); }
+static void shutdown_callback(uv_shutdown_t *req, int status) {}
+
+static void resource_user_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
+                                        grpc_error *error) {
+  TCP_UNREF(arg, "resource_user");
+}
+
+static void uv_resource_user_maybe_shutdown(grpc_exec_ctx *exec_ctx,
+                                            grpc_tcp *tcp) {
+  if (!tcp->resource_user_shutting_down) {
+    tcp->resource_user_shutting_down = true;
+    TCP_REF(tcp, "resource_user");
+    grpc_resource_user_shutdown(
+        exec_ctx, &tcp->resource_user,
+        grpc_closure_create(resource_user_shutdown_done, tcp));
+  }
+}
 
 static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
   grpc_tcp *tcp = (grpc_tcp *)ep;
   if (!tcp->shutting_down) {
     tcp->shutting_down = true;
-    uv_shutdown_t *req = gpr_malloc(sizeof(uv_shutdown_t));
+    uv_shutdown_t *req = &tcp->shutdown_req;
     uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback);
   }
 }
@@ -289,6 +324,7 @@
   grpc_network_status_unregister_endpoint(ep);
   grpc_tcp *tcp = (grpc_tcp *)ep;
   uv_close((uv_handle_t *)tcp->handle, uv_close_callback);
+  uv_resource_user_maybe_shutdown(exec_ctx, tcp);
   TCP_UNREF(tcp, "destroy");
 }
 
@@ -297,18 +333,21 @@
   return gpr_strdup(tcp->peer_string);
 }
 
+static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) {
+  grpc_tcp *tcp = (grpc_tcp *)ep;
+  return &tcp->resource_user;
+}
+
 static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; }
 
-static grpc_endpoint_vtable vtable = {uv_endpoint_read,
-                                      uv_endpoint_write,
-                                      uv_get_workqueue,
-                                      uv_add_to_pollset,
-                                      uv_add_to_pollset_set,
-                                      uv_endpoint_shutdown,
-                                      uv_destroy,
-                                      uv_get_peer};
+static grpc_endpoint_vtable vtable = {
+    uv_endpoint_read,  uv_endpoint_write,     uv_get_workqueue,
+    uv_add_to_pollset, uv_add_to_pollset_set, uv_endpoint_shutdown,
+    uv_destroy,        uv_get_resource_user,  uv_get_peer};
 
-grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) {
+grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
+                               grpc_resource_quota *resource_quota,
+                               char *peer_string) {
   grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
 
   if (grpc_tcp_trace) {
@@ -325,6 +364,8 @@
   gpr_ref_init(&tcp->refcount, 1);
   tcp->peer_string = gpr_strdup(peer_string);
   tcp->shutting_down = false;
+  tcp->resource_user_shutting_down = false;
+  grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string);
   /* Tell network status tracking code about the new endpoint */
   grpc_network_status_register_endpoint(&tcp->base);