blob: fa198fd8e1c7c7267d9fd5410f0bfa2399875a3d [file] [log] [blame]
murgatroid999030c812016-09-16 13:25:08 -07001/*
2 *
3 * Copyright 2016, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/lib/iomgr/port.h"
35
36#ifdef GRPC_UV
37
38#include <limits.h>
39#include <string.h>
40
41#include <grpc/support/alloc.h>
42#include <grpc/support/log.h>
43#include <grpc/support/slice_buffer.h>
44#include <grpc/support/string_util.h>
45
46#include "src/core/lib/iomgr/error.h"
47#include "src/core/lib/iomgr/network_status_tracker.h"
48#include "src/core/lib/iomgr/tcp_uv.h"
49#include "src/core/lib/support/string.h"
50
/* When non-zero, the functions in this file emit GPR_DEBUG trace output:
   endpoint creation, read/write completion status, and hex/ASCII dumps of
   the slices being read and written. */
int grpc_tcp_trace = 0;
52
53typedef struct {
54 grpc_endpoint base;
55 gpr_refcount refcount;
56
57 uv_tcp_t *handle;
58
59 grpc_closure *read_cb;
60 grpc_closure *write_cb;
61
62 gpr_slice read_slice;
63 gpr_slice_buffer *read_slices;
64 gpr_slice_buffer *write_slices;
65 uv_buf_t *write_buffers;
66
67 int shutting_down;
68 char *peer_string;
69 grpc_pollset *pollset;
70} grpc_tcp;
71
72static void uv_close_callback(uv_handle_t *handle) {
73 gpr_log(GPR_DEBUG, "Freeing uv_tcp_t handle %p", handle);
74 gpr_free(handle);
75}
76
77static void tcp_free(grpc_tcp *tcp) {
78 gpr_free(tcp);
79}
80
81/*#define GRPC_TCP_REFCOUNT_DEBUG*/
82#ifdef GRPC_TCP_REFCOUNT_DEBUG
83#define TCP_UNREF(tcp, reason) \
84 tcp_unref((tcp), (reason), __FILE__, __LINE__)
85#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
86static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
87 int line) {
88 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
89 reason, tcp->refcount.count, tcp->refcount.count - 1);
90 if (gpr_unref(&tcp->refcount)) {
91 tcp_free(tcp);
92 }
93}
94
95static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
96 int line) {
97 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
98 reason, tcp->refcount.count, tcp->refcount.count + 1);
99 gpr_ref(&tcp->refcount);
100}
101#else
102#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
103#define TCP_REF(tcp, reason) tcp_ref((tcp))
104static void tcp_unref(grpc_tcp *tcp) {
105 if (gpr_unref(&tcp->refcount)) {
106 tcp_free(tcp);
107 }
108}
109
110static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
111#endif
112
113static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
114 grpc_tcp *tcp = handle->data;
115 (void)suggested_size;
116 tcp->read_slice = gpr_slice_malloc(GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
117 buf->base = (char *)GPR_SLICE_START_PTR(tcp->read_slice);
118 buf->len = GPR_SLICE_LENGTH(tcp->read_slice);
119}
120
121static void read_callback(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
122 gpr_slice sub;
123 grpc_error *error;
124 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
125 grpc_tcp *tcp = stream->data;
126 grpc_closure *cb = tcp->read_cb;
127 if (nread == 0) {
128 // Nothing happened. Wait for the next callback
129 return;
130 }
131 TCP_UNREF(tcp, "read");
132 tcp->read_cb = NULL;
133 // TODO(murgatroid99): figure out what the return value here means
134 uv_read_stop(stream);
135 if (nread == UV_EOF) {
136 error = GRPC_ERROR_CREATE("EOF");
137 } else if (nread > 0) {
138 // Successful read
139 sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, nread);
140 gpr_slice_buffer_add(tcp->read_slices, sub);
141 error = GRPC_ERROR_NONE;
142 if (grpc_tcp_trace) {
143 size_t i;
144 const char *str = grpc_error_string(error);
145 gpr_log(GPR_DEBUG, "read: error=%s", str);
146 grpc_error_free_string(str);
147 for (i = 0; i < tcp->read_slices->count; i++) {
148 char *dump = gpr_dump_slice(tcp->read_slices->slices[i],
149 GPR_DUMP_HEX | GPR_DUMP_ASCII);
150 gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
151 gpr_free(dump);
152 }
153 }
154 } else {
155 // nread < 0: Error
156 error = GRPC_ERROR_CREATE("TCP Read failed");
157 }
158 grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
159 grpc_exec_ctx_finish(&exec_ctx);
160}
161
162static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
163 gpr_slice_buffer *read_slices, grpc_closure *cb) {
164 grpc_tcp *tcp = (grpc_tcp *)ep;
165 int status;
166 grpc_error *error = GRPC_ERROR_NONE;
167 GPR_ASSERT(tcp->read_cb == NULL);
168 tcp->read_cb = cb;
169 tcp->read_slices = read_slices;
170 gpr_slice_buffer_reset_and_unref(read_slices);
171 TCP_REF(tcp, "read");
172 // TODO(murgatroid99): figure out what the return value here means
173 status = uv_read_start((uv_stream_t *)tcp->handle, alloc_uv_buf, read_callback);
174 if (status != 0) {
175 error = GRPC_ERROR_CREATE("TCP Read failed at start");
176 error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
177 uv_strerror(status));
178 grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
179 }
180 if (grpc_tcp_trace) {
181 const char *str = grpc_error_string(error);
182 gpr_log(GPR_DEBUG, "Initiating read on %p: error=%s", tcp, str);
183 }
184}
185
186static void write_callback(uv_write_t *req, int status) {
187 grpc_tcp *tcp = req->data;
188 grpc_error *error;
189 grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
190 grpc_closure *cb = tcp->write_cb;
191 tcp->write_cb = NULL;
192 TCP_UNREF(tcp, "write");
193 if (status == 0) {
194 error = GRPC_ERROR_NONE;
195 } else {
196 error = GRPC_ERROR_CREATE("TCP Write failed");
197 }
198 if (grpc_tcp_trace) {
199 const char *str = grpc_error_string(error);
200 gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str);
201 }
202 gpr_free(tcp->write_buffers);
203 gpr_free(req);
204 grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
205 grpc_exec_ctx_finish(&exec_ctx);
206}
207
208static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
209 gpr_slice_buffer *write_slices,
210 grpc_closure *cb) {
211 grpc_tcp *tcp = (grpc_tcp *)ep;
212 uv_buf_t *buffers;
213 unsigned int buffer_count;
214 unsigned int i;
215 gpr_slice *slice;
216 uv_write_t *write_req;
217
218 if (grpc_tcp_trace) {
219 size_t i;
220
221 for (i = 0; i < write_slices->count; i++) {
222 char *data =
223 gpr_dump_slice(write_slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
224 gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
225 gpr_free(data);
226 }
227 }
228
229 if (tcp->shutting_down) {
230 grpc_exec_ctx_sched(exec_ctx, cb,
231 GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL);
232 return;
233 }
234
235 GPR_ASSERT(tcp->write_cb == NULL);
236 tcp->write_slices = write_slices;
237 GPR_ASSERT(tcp->write_slices->count <= UINT_MAX);
238 if (tcp->write_slices->count == 0) {
239 // No slices means we don't have to do anything,
240 // and libuv doesn't like empty writes
241 grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL);
242 return;
243 }
244
245 tcp->write_cb = cb;
246 buffer_count = (unsigned int)tcp->write_slices->count;
247 buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count);
248 for (i = 0; i < buffer_count; i++) {
249 slice = &tcp->write_slices->slices[i];
250 buffers[i].base = (char *)GPR_SLICE_START_PTR(*slice);
251 buffers[i].len = GPR_SLICE_LENGTH(*slice);
252 }
253 write_req = gpr_malloc(sizeof(uv_write_t));
254 write_req->data = tcp;
255 TCP_REF(tcp, "write");
256 // TODO(murgatroid99): figure out what the return value here means
257 uv_write(write_req, (uv_stream_t *)tcp->handle, buffers, buffer_count,
258 write_callback);
259}
260
261static void uv_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
262 grpc_pollset *pollset) {
263 // No-op. We're ignoring pollsets currently
264 (void) exec_ctx;
265 (void) ep;
266 (void) pollset;
267 grpc_tcp *tcp = (grpc_tcp *) ep;
268 tcp->pollset = pollset;
269}
270
271static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
272 grpc_pollset_set *pollset) {
273 // No-op. We're ignoring pollsets currently
274 (void) exec_ctx;
275 (void) ep;
276 (void) pollset;
277}
278
279static void shutdown_callback(uv_shutdown_t *req, int status) {
280 gpr_free(req);
281}
282
283static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
284 grpc_tcp *tcp = (grpc_tcp *)ep;
285 uv_shutdown_t *req = gpr_malloc(sizeof(uv_shutdown_t));
286 uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback);
287}
288
289static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
290 grpc_network_status_unregister_endpoint(ep);
291 grpc_tcp *tcp = (grpc_tcp *)ep;
292 gpr_log(GPR_DEBUG, "Closing uv_tcp_t handle %p", tcp->handle);
293 uv_close((uv_handle_t *)tcp->handle, uv_close_callback);
294 TCP_UNREF(tcp, "destroy");
295}
296
297static char *uv_get_peer(grpc_endpoint *ep) {
298 grpc_tcp *tcp = (grpc_tcp *)ep;
299 return gpr_strdup(tcp->peer_string);
300}
301
302static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) {return NULL; }
303
304static grpc_endpoint_vtable vtable = {uv_endpoint_read,
305 uv_endpoint_write,
306 uv_get_workqueue,
307 uv_add_to_pollset,
308 uv_add_to_pollset_set,
309 uv_endpoint_shutdown,
310 uv_destroy,
311 uv_get_peer};
312
313grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) {
314 grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
315
316 if (grpc_tcp_trace) {
317 gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp);
318 }
319
320 memset(tcp, 0, sizeof(grpc_tcp));
321 tcp->base.vtable = &vtable;
322 tcp->handle = handle;
323 handle->data = tcp;
324 gpr_ref_init(&tcp->refcount, 1);
325 tcp->peer_string = gpr_strdup(peer_string);
326 /* Tell network status tracking code about the new endpoint */
327 grpc_network_status_register_endpoint(&tcp->base);
328
329#ifndef GRPC_UV_TCP_HOLD_LOOP
330 uv_unref((uv_handle_t *)handle);
331#endif
332
333 return &tcp->base;
334}
335
336#endif /* GRPC_UV */