blob: f3be41aa571ad44c54a5503bf70d12f82e540fa2 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Craig Tiller0c0b60c2015-01-21 15:49:28 -080034#include <grpc/support/port_platform.h>
35
36#ifdef GPR_POSIX_SOCKET
37
ctiller18b49ab2014-12-09 14:39:16 -080038#include "src/core/iomgr/tcp_posix.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080039
40#include <errno.h>
41#include <stdlib.h>
42#include <string.h>
43#include <sys/types.h>
44#include <sys/socket.h>
45#include <unistd.h>
46
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080047#include <grpc/support/alloc.h>
48#include <grpc/support/log.h>
49#include <grpc/support/slice.h>
Craig Tiller1b22b9d2015-07-20 13:42:22 -070050#include <grpc/support/string_util.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080051#include <grpc/support/sync.h>
52#include <grpc/support/time.h>
53
Craig Tiller1b22b9d2015-07-20 13:42:22 -070054#include "src/core/support/string.h"
55#include "src/core/debug/trace.h"
56#include "src/core/profiling/timers.h"
57
Craig Tiller2da02962015-05-06 16:14:25 -070058#ifdef GPR_HAVE_MSG_NOSIGNAL
59#define SENDMSG_FLAGS MSG_NOSIGNAL
60#else
61#define SENDMSG_FLAGS 0
62#endif
63
Craig Tillerebc7ef22015-09-10 22:19:25 -070064#ifdef GPR_MSG_IOVLEN_TYPE
65typedef GPR_MSG_IOVLEN_TYPE msg_iovlen_type;
66#else
67typedef size_t msg_iovlen_type;
68#endif
69
Craig Tillerfaa84802015-03-01 21:56:38 -080070int grpc_tcp_trace = 0;
71
Craig Tillera82950e2015-09-22 12:33:20 -070072typedef struct {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080073 grpc_endpoint base;
ctiller18b49ab2014-12-09 14:39:16 -080074 grpc_fd *em_fd;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080075 int fd;
Craig Tiller5c07cce2015-04-28 09:21:58 -070076 int finished_edge;
Craig Tillera82950e2015-09-22 12:33:20 -070077 msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080078 size_t slice_size;
79 gpr_refcount refcount;
80
Craig Tiller649deeb2015-09-24 23:19:40 -070081 /* garbage after the last read */
82 gpr_slice_buffer last_read_buffer;
83
Craig Tillerb0298592015-08-27 07:38:01 -070084 gpr_slice_buffer *incoming_buffer;
85 gpr_slice_buffer *outgoing_buffer;
86 /** slice within outgoing_buffer to write next */
87 size_t outgoing_slice_idx;
88 /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
89 size_t outgoing_byte_idx;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080090
Craig Tiller33825112015-09-18 07:44:19 -070091 grpc_closure *read_cb;
92 grpc_closure *write_cb;
yang-gdc215932015-11-30 14:25:01 -080093 grpc_closure *release_fd_cb;
yang-g5d850372015-12-01 10:32:28 -080094 int *release_fd;
Craig Tiller0fcd53c2015-02-18 15:10:53 -080095
Craig Tiller33825112015-09-18 07:44:19 -070096 grpc_closure read_closure;
97 grpc_closure write_closure;
David Garcia Quintas5f228f52015-05-26 19:58:50 -070098
Craig Tiller1b22b9d2015-07-20 13:42:22 -070099 char *peer_string;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800100} grpc_tcp;
101
Craig Tillera82950e2015-09-22 12:33:20 -0700102static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
103 int success);
104static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
105 int success);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800106
Craig Tillera82950e2015-09-22 12:33:20 -0700107static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
108 grpc_tcp *tcp = (grpc_tcp *)ep;
109 grpc_fd_shutdown(exec_ctx, tcp->em_fd);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800110}
111
Craig Tillera82950e2015-09-22 12:33:20 -0700112static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
yang-g5d850372015-12-01 10:32:28 -0800113 grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
yang-gdc215932015-11-30 14:25:01 -0800114 "tcp_unref_orphan");
Craig Tiller649deeb2015-09-24 23:19:40 -0700115 gpr_slice_buffer_destroy(&tcp->last_read_buffer);
Craig Tillera82950e2015-09-22 12:33:20 -0700116 gpr_free(tcp->peer_string);
117 gpr_free(tcp);
Craig Tillerb0298592015-08-27 07:38:01 -0700118}
119
120/*#define GRPC_TCP_REFCOUNT_DEBUG*/
121#ifdef GRPC_TCP_REFCOUNT_DEBUG
Craig Tiller8af4c332015-09-22 12:32:31 -0700122#define TCP_UNREF(cl, tcp, reason) \
123 tcp_unref((cl), (tcp), (reason), __FILE__, __LINE__)
Craig Tillerb0298592015-08-27 07:38:01 -0700124#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
Craig Tillera82950e2015-09-22 12:33:20 -0700125static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
126 const char *reason, const char *file, int line) {
127 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
128 reason, tcp->refcount.count, tcp->refcount.count - 1);
129 if (gpr_unref(&tcp->refcount)) {
130 tcp_free(exec_ctx, tcp);
131 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800132}
133
Craig Tillera82950e2015-09-22 12:33:20 -0700134static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
135 int line) {
136 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
137 reason, tcp->refcount.count, tcp->refcount.count + 1);
138 gpr_ref(&tcp->refcount);
Craig Tillerb0298592015-08-27 07:38:01 -0700139}
140#else
Craig Tiller8af4c332015-09-22 12:32:31 -0700141#define TCP_UNREF(cl, tcp, reason) tcp_unref((cl), (tcp))
Craig Tillerb0298592015-08-27 07:38:01 -0700142#define TCP_REF(tcp, reason) tcp_ref((tcp))
Craig Tillera82950e2015-09-22 12:33:20 -0700143static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
144 if (gpr_unref(&tcp->refcount)) {
145 tcp_free(exec_ctx, tcp);
146 }
Craig Tiller592e7f22015-08-21 10:45:48 -0700147}
148
Craig Tillera82950e2015-09-22 12:33:20 -0700149static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
Craig Tillerb0298592015-08-27 07:38:01 -0700150#endif
151
Craig Tillera82950e2015-09-22 12:33:20 -0700152static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
153 grpc_tcp *tcp = (grpc_tcp *)ep;
154 TCP_UNREF(exec_ctx, tcp, "destroy");
Craig Tillerb0298592015-08-27 07:38:01 -0700155}
156
Craig Tillera82950e2015-09-22 12:33:20 -0700157static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) {
Craig Tiller33825112015-09-18 07:44:19 -0700158 grpc_closure *cb = tcp->read_cb;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800159
Craig Tillera82950e2015-09-22 12:33:20 -0700160 if (grpc_tcp_trace) {
161 size_t i;
162 gpr_log(GPR_DEBUG, "read: success=%d", success);
163 for (i = 0; i < tcp->incoming_buffer->count; i++) {
164 char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
165 GPR_DUMP_HEX | GPR_DUMP_ASCII);
166 gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
167 gpr_free(dump);
Craig Tiller6e7c6222015-02-20 15:31:21 -0800168 }
Craig Tillera82950e2015-09-22 12:33:20 -0700169 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800170
171 tcp->read_cb = NULL;
Craig Tillerb0298592015-08-27 07:38:01 -0700172 tcp->incoming_buffer = NULL;
Craig Tillera82950e2015-09-22 12:33:20 -0700173 cb->cb(exec_ctx, cb->cb_arg, success);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800174}
175
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800176#define MAX_READ_IOVEC 4
Craig Tillera82950e2015-09-22 12:33:20 -0700177static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800178 struct msghdr msg;
179 struct iovec iov[MAX_READ_IOVEC];
180 ssize_t read_bytes;
Craig Tillerb0298592015-08-27 07:38:01 -0700181 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800182
Craig Tillera82950e2015-09-22 12:33:20 -0700183 GPR_ASSERT(!tcp->finished_edge);
184 GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC);
185 GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
Craig Tiller0ba432d2015-10-09 16:57:11 -0700186 GPR_TIMER_BEGIN("tcp_continue_read", 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800187
Craig Tillera82950e2015-09-22 12:33:20 -0700188 while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
189 gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
190 gpr_slice_malloc(tcp->slice_size));
191 }
192 for (i = 0; i < tcp->incoming_buffer->count; i++) {
193 iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
194 iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
195 }
Craig Tiller5c07cce2015-04-28 09:21:58 -0700196
197 msg.msg_name = NULL;
198 msg.msg_namelen = 0;
199 msg.msg_iov = iov;
200 msg.msg_iovlen = tcp->iov_size;
201 msg.msg_control = NULL;
202 msg.msg_controllen = 0;
203 msg.msg_flags = 0;
204
Craig Tiller0ba432d2015-10-09 16:57:11 -0700205 GPR_TIMER_BEGIN("recvmsg", 1);
Craig Tillera82950e2015-09-22 12:33:20 -0700206 do {
207 read_bytes = recvmsg(tcp->fd, &msg, 0);
208 } while (read_bytes < 0 && errno == EINTR);
Craig Tiller0ba432d2015-10-09 16:57:11 -0700209 GPR_TIMER_END("recvmsg", 0);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700210
Craig Tillera82950e2015-09-22 12:33:20 -0700211 if (read_bytes < 0) {
212 /* NB: After calling call_read_cb a parallel call of the read handler may
213 * be running. */
214 if (errno == EAGAIN) {
215 if (tcp->iov_size > 1) {
216 tcp->iov_size /= 2;
217 }
218 /* We've consumed the edge, request a new one */
219 grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
220 } else {
221 /* TODO(klempner): Log interesting errors */
222 gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
223 call_read_cb(exec_ctx, tcp, 0);
224 TCP_UNREF(exec_ctx, tcp, "read");
Craig Tiller45724b32015-09-22 10:42:19 -0700225 }
Craig Tillera82950e2015-09-22 12:33:20 -0700226 } else if (read_bytes == 0) {
227 /* 0 read size ==> end of stream */
228 gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
229 call_read_cb(exec_ctx, tcp, 0);
230 TCP_UNREF(exec_ctx, tcp, "read");
231 } else {
232 GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
233 if ((size_t)read_bytes < tcp->incoming_buffer->length) {
234 gpr_slice_buffer_trim_end(
235 tcp->incoming_buffer,
Craig Tiller649deeb2015-09-24 23:19:40 -0700236 tcp->incoming_buffer->length - (size_t)read_bytes,
237 &tcp->last_read_buffer);
Craig Tillera82950e2015-09-22 12:33:20 -0700238 } else if (tcp->iov_size < MAX_READ_IOVEC) {
239 ++tcp->iov_size;
Craig Tiller45724b32015-09-22 10:42:19 -0700240 }
Craig Tillera82950e2015-09-22 12:33:20 -0700241 GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
242 call_read_cb(exec_ctx, tcp, 1);
243 TCP_UNREF(exec_ctx, tcp, "read");
244 }
Craig Tiller45724b32015-09-22 10:42:19 -0700245
Craig Tiller0ba432d2015-10-09 16:57:11 -0700246 GPR_TIMER_END("tcp_continue_read", 0);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700247}
248
Craig Tillera82950e2015-09-22 12:33:20 -0700249static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
250 int success) {
251 grpc_tcp *tcp = (grpc_tcp *)arg;
252 GPR_ASSERT(!tcp->finished_edge);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700253
Craig Tillera82950e2015-09-22 12:33:20 -0700254 if (!success) {
255 gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
256 call_read_cb(exec_ctx, tcp, 0);
257 TCP_UNREF(exec_ctx, tcp, "read");
258 } else {
259 tcp_continue_read(exec_ctx, tcp);
260 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800261}
262
Craig Tillera82950e2015-09-22 12:33:20 -0700263static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
264 gpr_slice_buffer *incoming_buffer, grpc_closure *cb) {
265 grpc_tcp *tcp = (grpc_tcp *)ep;
266 GPR_ASSERT(tcp->read_cb == NULL);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800267 tcp->read_cb = cb;
Craig Tillerb0298592015-08-27 07:38:01 -0700268 tcp->incoming_buffer = incoming_buffer;
Craig Tillera82950e2015-09-22 12:33:20 -0700269 gpr_slice_buffer_reset_and_unref(incoming_buffer);
Craig Tiller649deeb2015-09-24 23:19:40 -0700270 gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
Craig Tillera82950e2015-09-22 12:33:20 -0700271 TCP_REF(tcp, "read");
272 if (tcp->finished_edge) {
273 tcp->finished_edge = 0;
274 grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
275 } else {
276 grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, 1);
277 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800278}
279
Craig Tillera82950e2015-09-22 12:33:20 -0700280typedef enum { FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result;
Craig Tillerd1bec032015-09-18 17:29:00 -0700281
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800282#define MAX_WRITE_IOVEC 16
Craig Tillera82950e2015-09-22 12:33:20 -0700283static flush_result tcp_flush(grpc_tcp *tcp) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800284 struct msghdr msg;
285 struct iovec iov[MAX_WRITE_IOVEC];
Craig Tillerebc7ef22015-09-10 22:19:25 -0700286 msg_iovlen_type iov_size;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800287 ssize_t sent_length;
Craig Tiller32ca48c2015-09-10 11:47:15 -0700288 size_t sending_length;
289 size_t trailing;
290 size_t unwind_slice_idx;
291 size_t unwind_byte_idx;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800292
Craig Tillera82950e2015-09-22 12:33:20 -0700293 for (;;) {
294 sending_length = 0;
295 unwind_slice_idx = tcp->outgoing_slice_idx;
296 unwind_byte_idx = tcp->outgoing_byte_idx;
297 for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
Craig Tiller71a0f9d2015-09-28 17:22:01 -0700298 iov_size != MAX_WRITE_IOVEC;
Craig Tillera82950e2015-09-22 12:33:20 -0700299 iov_size++) {
300 iov[iov_size].iov_base =
301 GPR_SLICE_START_PTR(
302 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
303 tcp->outgoing_byte_idx;
304 iov[iov_size].iov_len =
305 GPR_SLICE_LENGTH(
306 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
307 tcp->outgoing_byte_idx;
308 sending_length += iov[iov_size].iov_len;
309 tcp->outgoing_slice_idx++;
310 tcp->outgoing_byte_idx = 0;
311 }
312 GPR_ASSERT(iov_size > 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800313
Craig Tillera82950e2015-09-22 12:33:20 -0700314 msg.msg_name = NULL;
315 msg.msg_namelen = 0;
316 msg.msg_iov = iov;
317 msg.msg_iovlen = iov_size;
318 msg.msg_control = NULL;
319 msg.msg_controllen = 0;
320 msg.msg_flags = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800321
Craig Tiller0ba432d2015-10-09 16:57:11 -0700322 GPR_TIMER_BEGIN("sendmsg", 1);
Craig Tillera82950e2015-09-22 12:33:20 -0700323 do {
324 /* TODO(klempner): Cork if this is a partial write */
325 sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
326 } while (sent_length < 0 && errno == EINTR);
Craig Tiller0ba432d2015-10-09 16:57:11 -0700327 GPR_TIMER_END("sendmsg", 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800328
Craig Tillera82950e2015-09-22 12:33:20 -0700329 if (sent_length < 0) {
330 if (errno == EAGAIN) {
331 tcp->outgoing_slice_idx = unwind_slice_idx;
332 tcp->outgoing_byte_idx = unwind_byte_idx;
333 return FLUSH_PENDING;
334 } else {
335 /* TODO(klempner): Log some of these */
336 return FLUSH_ERROR;
337 }
338 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800339
Craig Tillera82950e2015-09-22 12:33:20 -0700340 GPR_ASSERT(tcp->outgoing_byte_idx == 0);
341 trailing = sending_length - (size_t)sent_length;
342 while (trailing > 0) {
343 size_t slice_length;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800344
Craig Tillera82950e2015-09-22 12:33:20 -0700345 tcp->outgoing_slice_idx--;
346 slice_length = GPR_SLICE_LENGTH(
347 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
348 if (slice_length > trailing) {
349 tcp->outgoing_byte_idx = slice_length - trailing;
350 break;
351 } else {
352 trailing -= slice_length;
353 }
354 }
Craig Tillerb0298592015-08-27 07:38:01 -0700355
Craig Tillera82950e2015-09-22 12:33:20 -0700356 if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
357 return FLUSH_DONE;
358 }
359 };
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800360}
361
Craig Tillera82950e2015-09-22 12:33:20 -0700362static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
363 int success) {
364 grpc_tcp *tcp = (grpc_tcp *)arg;
Craig Tillerd1bec032015-09-18 17:29:00 -0700365 flush_result status;
Craig Tiller33825112015-09-18 07:44:19 -0700366 grpc_closure *cb;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800367
Craig Tillera82950e2015-09-22 12:33:20 -0700368 if (!success) {
369 cb = tcp->write_cb;
370 tcp->write_cb = NULL;
371 cb->cb(exec_ctx, cb->cb_arg, 0);
372 TCP_UNREF(exec_ctx, tcp, "write");
373 return;
374 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800375
Craig Tillera82950e2015-09-22 12:33:20 -0700376 status = tcp_flush(tcp);
377 if (status == FLUSH_PENDING) {
378 grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
379 } else {
380 cb = tcp->write_cb;
381 tcp->write_cb = NULL;
Craig Tiller0ba432d2015-10-09 16:57:11 -0700382 GPR_TIMER_BEGIN("tcp_handle_write.cb", 0);
Craig Tillera82950e2015-09-22 12:33:20 -0700383 cb->cb(exec_ctx, cb->cb_arg, status == FLUSH_DONE);
Craig Tiller0ba432d2015-10-09 16:57:11 -0700384 GPR_TIMER_END("tcp_handle_write.cb", 0);
Craig Tillera82950e2015-09-22 12:33:20 -0700385 TCP_UNREF(exec_ctx, tcp, "write");
386 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800387}
388
Craig Tillera82950e2015-09-22 12:33:20 -0700389static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
390 gpr_slice_buffer *buf, grpc_closure *cb) {
391 grpc_tcp *tcp = (grpc_tcp *)ep;
Craig Tillerd1bec032015-09-18 17:29:00 -0700392 flush_result status;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800393
Craig Tillera82950e2015-09-22 12:33:20 -0700394 if (grpc_tcp_trace) {
395 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800396
Craig Tillera82950e2015-09-22 12:33:20 -0700397 for (i = 0; i < buf->count; i++) {
398 char *data =
399 gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
400 gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
401 gpr_free(data);
Craig Tiller6e7c6222015-02-20 15:31:21 -0800402 }
Craig Tillera82950e2015-09-22 12:33:20 -0700403 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800404
Craig Tiller0ba432d2015-10-09 16:57:11 -0700405 GPR_TIMER_BEGIN("tcp_write", 0);
Craig Tillera82950e2015-09-22 12:33:20 -0700406 GPR_ASSERT(tcp->write_cb == NULL);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800407
Craig Tillera82950e2015-09-22 12:33:20 -0700408 if (buf->length == 0) {
Craig Tiller0ba432d2015-10-09 16:57:11 -0700409 GPR_TIMER_END("tcp_write", 0);
Craig Tillera82950e2015-09-22 12:33:20 -0700410 grpc_exec_ctx_enqueue(exec_ctx, cb, 1);
411 return;
412 }
Craig Tillerb0298592015-08-27 07:38:01 -0700413 tcp->outgoing_buffer = buf;
414 tcp->outgoing_slice_idx = 0;
415 tcp->outgoing_byte_idx = 0;
416
Craig Tillera82950e2015-09-22 12:33:20 -0700417 status = tcp_flush(tcp);
418 if (status == FLUSH_PENDING) {
419 TCP_REF(tcp, "write");
420 tcp->write_cb = cb;
421 grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
422 } else {
423 grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE);
424 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800425
Craig Tiller0ba432d2015-10-09 16:57:11 -0700426 GPR_TIMER_END("tcp_write", 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800427}
428
Craig Tillera82950e2015-09-22 12:33:20 -0700429static void tcp_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
430 grpc_pollset *pollset) {
431 grpc_tcp *tcp = (grpc_tcp *)ep;
432 grpc_pollset_add_fd(exec_ctx, pollset, tcp->em_fd);
ctillerd79b4862014-12-17 16:36:59 -0800433}
434
Craig Tillera82950e2015-09-22 12:33:20 -0700435static void tcp_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
436 grpc_pollset_set *pollset_set) {
437 grpc_tcp *tcp = (grpc_tcp *)ep;
438 grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700439}
440
Craig Tillera82950e2015-09-22 12:33:20 -0700441static char *tcp_get_peer(grpc_endpoint *ep) {
442 grpc_tcp *tcp = (grpc_tcp *)ep;
443 return gpr_strdup(tcp->peer_string);
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700444}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800445
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700446static const grpc_endpoint_vtable vtable = {
Craig Tiller71a0f9d2015-09-28 17:22:01 -0700447 tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
Craig Tillera82950e2015-09-22 12:33:20 -0700448 tcp_shutdown, tcp_destroy, tcp_get_peer};
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700449
Craig Tillera82950e2015-09-22 12:33:20 -0700450grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
451 const char *peer_string) {
452 grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800453 tcp->base.vtable = &vtable;
Craig Tillera82950e2015-09-22 12:33:20 -0700454 tcp->peer_string = gpr_strdup(peer_string);
ctiller58393c22015-01-07 14:03:30 -0800455 tcp->fd = em_fd->fd;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800456 tcp->read_cb = NULL;
457 tcp->write_cb = NULL;
yang-gdc215932015-11-30 14:25:01 -0800458 tcp->release_fd_cb = NULL;
yang-g5d850372015-12-01 10:32:28 -0800459 tcp->release_fd = NULL;
Craig Tillerb0298592015-08-27 07:38:01 -0700460 tcp->incoming_buffer = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800461 tcp->slice_size = slice_size;
Craig Tiller2b1fd662015-04-28 07:33:37 -0700462 tcp->iov_size = 1;
Craig Tiller5c07cce2015-04-28 09:21:58 -0700463 tcp->finished_edge = 1;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800464 /* paired with unref in grpc_tcp_destroy */
Craig Tillera82950e2015-09-22 12:33:20 -0700465 gpr_ref_init(&tcp->refcount, 1);
nnoble0c475f02014-12-05 15:37:39 -0800466 tcp->em_fd = em_fd;
Craig Tillerb0298592015-08-27 07:38:01 -0700467 tcp->read_closure.cb = tcp_handle_read;
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800468 tcp->read_closure.cb_arg = tcp;
Craig Tillerb0298592015-08-27 07:38:01 -0700469 tcp->write_closure.cb = tcp_handle_write;
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800470 tcp->write_closure.cb_arg = tcp;
Craig Tiller649deeb2015-09-24 23:19:40 -0700471 gpr_slice_buffer_init(&tcp->last_read_buffer);
David Garcia Quintas5f228f52015-05-26 19:58:50 -0700472
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800473 return &tcp->base;
474}
Craig Tiller0c0b60c2015-01-21 15:49:28 -0800475
yang-gdc215932015-11-30 14:25:01 -0800476void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
yang-g5d850372015-12-01 10:32:28 -0800477 int *fd, grpc_closure *done) {
yang-gdc215932015-11-30 14:25:01 -0800478 grpc_tcp *tcp = (grpc_tcp *)ep;
yang-g5d850372015-12-01 10:32:28 -0800479 GPR_ASSERT(ep->vtable == &vtable);
480 tcp->release_fd = fd;
yang-gdc215932015-11-30 14:25:01 -0800481 tcp->release_fd_cb = done;
482 TCP_UNREF(exec_ctx, tcp, "destroy");
483}
484
Craig Tiller190d3602015-02-18 09:23:38 -0800485#endif