blob: 0db7cd9f0e16652c7b5a61b974b54e114eb2bb9e [file] [log] [blame]
/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 * * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 * * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 * * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
33
Craig Tiller0c0b60c2015-01-21 15:49:28 -080034#include <grpc/support/port_platform.h>
35
36#ifdef GPR_POSIX_SOCKET
37
ctiller18b49ab2014-12-09 14:39:16 -080038#include "src/core/iomgr/tcp_posix.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080039
40#include <errno.h>
41#include <stdlib.h>
42#include <string.h>
43#include <sys/types.h>
44#include <sys/socket.h>
45#include <unistd.h>
46
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080047#include <grpc/support/alloc.h>
48#include <grpc/support/log.h>
49#include <grpc/support/slice.h>
Craig Tiller1b22b9d2015-07-20 13:42:22 -070050#include <grpc/support/string_util.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080051#include <grpc/support/sync.h>
52#include <grpc/support/time.h>
53
Craig Tiller1b22b9d2015-07-20 13:42:22 -070054#include "src/core/support/string.h"
55#include "src/core/debug/trace.h"
56#include "src/core/profiling/timers.h"
57
/* Flags handed to sendmsg() by tcp_flush: MSG_NOSIGNAL when the build
 * configuration defines GPR_HAVE_MSG_NOSIGNAL, no flags otherwise. */
#if defined(GPR_HAVE_MSG_NOSIGNAL)
#define SENDMSG_FLAGS MSG_NOSIGNAL
#else
#define SENDMSG_FLAGS 0
#endif

/* When non-zero, call_read_cb and tcp_write dump every slice they handle to
 * the debug log. Starts out disabled. */
int grpc_tcp_trace = 0;
65
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080066typedef struct {
67 grpc_endpoint base;
ctiller18b49ab2014-12-09 14:39:16 -080068 grpc_fd *em_fd;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080069 int fd;
Craig Tiller8674cb12015-06-05 07:09:25 -070070 int iov_size; /* Number of slices to allocate per read attempt */
Craig Tiller5c07cce2015-04-28 09:21:58 -070071 int finished_edge;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080072 size_t slice_size;
73 gpr_refcount refcount;
74
Craig Tillerb0298592015-08-27 07:38:01 -070075 gpr_slice_buffer *incoming_buffer;
76 gpr_slice_buffer *outgoing_buffer;
77 /** slice within outgoing_buffer to write next */
78 size_t outgoing_slice_idx;
79 /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
80 size_t outgoing_byte_idx;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080081
Craig Tillerb0298592015-08-27 07:38:01 -070082 grpc_iomgr_closure *read_cb;
83 grpc_iomgr_closure *write_cb;
Craig Tiller0fcd53c2015-02-18 15:10:53 -080084
85 grpc_iomgr_closure read_closure;
86 grpc_iomgr_closure write_closure;
David Garcia Quintas5f228f52015-05-26 19:58:50 -070087
Craig Tiller1b22b9d2015-07-20 13:42:22 -070088 char *peer_string;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080089} grpc_tcp;
90
/* Handlers bound to read_closure / write_closure by grpc_tcp_create.
 * In both, `arg` is the grpc_tcp the closure belongs to. */
static void tcp_handle_read(void *arg, int success);
static void tcp_handle_write(void *arg, int success);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080093
Craig Tillerb0298592015-08-27 07:38:01 -070094static void tcp_shutdown(grpc_endpoint *ep) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080095 grpc_tcp *tcp = (grpc_tcp *)ep;
ctiller18b49ab2014-12-09 14:39:16 -080096 grpc_fd_shutdown(tcp->em_fd);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080097}
98
Craig Tillerb0298592015-08-27 07:38:01 -070099static void tcp_free(grpc_tcp *tcp) {
100 grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
101 gpr_free(tcp->peer_string);
102 gpr_free(tcp);
103}
104
105/*#define GRPC_TCP_REFCOUNT_DEBUG*/
106#ifdef GRPC_TCP_REFCOUNT_DEBUG
107#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
108#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
109static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
110 int line) {
111 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
112 reason, tcp->refcount.count, tcp->refcount.count - 1);
113 if (gpr_unref(&tcp->refcount)) {
114 tcp_free(tcp);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800115 }
116}
117
Craig Tillerb0298592015-08-27 07:38:01 -0700118static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
119 int line) {
120 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
121 reason, tcp->refcount.count, tcp->refcount.count + 1);
122 gpr_ref(&tcp->refcount);
123}
124#else
125#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
126#define TCP_REF(tcp, reason) tcp_ref((tcp))
127static void tcp_unref(grpc_tcp *tcp) {
128 if (gpr_unref(&tcp->refcount)) {
129 tcp_free(tcp);
130 }
Craig Tiller592e7f22015-08-21 10:45:48 -0700131}
132
Craig Tillerb0298592015-08-27 07:38:01 -0700133static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
134#endif
135
136static void tcp_destroy(grpc_endpoint *ep) {
137 grpc_tcp *tcp = (grpc_tcp *)ep;
138 TCP_UNREF(tcp, "destroy");
139}
140
141static void call_read_cb(grpc_tcp *tcp, int success) {
142 grpc_iomgr_closure *cb = tcp->read_cb;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800143
Craig Tillerfaa84802015-03-01 21:56:38 -0800144 if (grpc_tcp_trace) {
Craig Tiller6e7c6222015-02-20 15:31:21 -0800145 size_t i;
Craig Tillerb0298592015-08-27 07:38:01 -0700146 gpr_log(GPR_DEBUG, "read: success=%d", success);
147 for (i = 0; i < tcp->incoming_buffer->count; i++) {
148 char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
149 GPR_DUMP_HEX | GPR_DUMP_ASCII);
Craig Tiller994c2622015-07-23 14:00:58 -0700150 gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
Craig Tiller6e7c6222015-02-20 15:31:21 -0800151 gpr_free(dump);
152 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800153 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800154
155 tcp->read_cb = NULL;
Craig Tillerb0298592015-08-27 07:38:01 -0700156 tcp->incoming_buffer = NULL;
157 cb->cb(cb->cb_arg, success);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800158}
159
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800160#define MAX_READ_IOVEC 4
Craig Tillerb0298592015-08-27 07:38:01 -0700161static void tcp_continue_read(grpc_tcp *tcp) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800162 struct msghdr msg;
163 struct iovec iov[MAX_READ_IOVEC];
164 ssize_t read_bytes;
Craig Tillerb0298592015-08-27 07:38:01 -0700165 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800166
Craig Tiller5c07cce2015-04-28 09:21:58 -0700167 GPR_ASSERT(!tcp->finished_edge);
Craig Tillerb0298592015-08-27 07:38:01 -0700168 GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC);
169 GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
David Garcia Quintasbbc0b772015-04-29 14:10:05 -0700170 GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800171
Craig Tillerb0298592015-08-27 07:38:01 -0700172 while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
173 gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
174 gpr_slice_malloc(tcp->slice_size));
175 }
176 for (i = 0; i < tcp->incoming_buffer->count; i++) {
177 iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
178 iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
179 }
Craig Tiller5c07cce2015-04-28 09:21:58 -0700180
181 msg.msg_name = NULL;
182 msg.msg_namelen = 0;
183 msg.msg_iov = iov;
184 msg.msg_iovlen = tcp->iov_size;
185 msg.msg_control = NULL;
186 msg.msg_controllen = 0;
187 msg.msg_flags = 0;
188
David Garcia Quintasf667f1b2015-05-04 13:15:46 -0700189 GRPC_TIMER_BEGIN(GRPC_PTAG_RECVMSG, 0);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700190 do {
191 read_bytes = recvmsg(tcp->fd, &msg, 0);
192 } while (read_bytes < 0 && errno == EINTR);
David Garcia Quintasf667f1b2015-05-04 13:15:46 -0700193 GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700194
Craig Tiller5c07cce2015-04-28 09:21:58 -0700195 if (read_bytes < 0) {
Craig Tillerb0298592015-08-27 07:38:01 -0700196 /* NB: After calling call_read_cb a parallel call of the read handler may
Craig Tiller5c07cce2015-04-28 09:21:58 -0700197 * be running. */
198 if (errno == EAGAIN) {
199 if (tcp->iov_size > 1) {
200 tcp->iov_size /= 2;
201 }
Craig Tillerb0298592015-08-27 07:38:01 -0700202 /* We've consumed the edge, request a new one */
203 grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700204 } else {
205 /* TODO(klempner): Log interesting errors */
Craig Tillerb0298592015-08-27 07:38:01 -0700206 gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
207 call_read_cb(tcp, 0);
208 TCP_UNREF(tcp, "read");
Craig Tiller5c07cce2015-04-28 09:21:58 -0700209 }
210 } else if (read_bytes == 0) {
211 /* 0 read size ==> end of stream */
Craig Tillerb0298592015-08-27 07:38:01 -0700212 gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
213 call_read_cb(tcp, 0);
214 TCP_UNREF(tcp, "read");
Craig Tiller5c07cce2015-04-28 09:21:58 -0700215 } else {
Craig Tillerb0298592015-08-27 07:38:01 -0700216 GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
217 if ((size_t)read_bytes < tcp->incoming_buffer->length) {
218 gpr_slice_buffer_trim_end(tcp->incoming_buffer,
219 tcp->incoming_buffer->length - read_bytes);
220 } else if (tcp->iov_size < MAX_READ_IOVEC) {
Craig Tiller5c07cce2015-04-28 09:21:58 -0700221 ++tcp->iov_size;
222 }
Craig Tillerb0298592015-08-27 07:38:01 -0700223 GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
224 call_read_cb(tcp, 1);
225 TCP_UNREF(tcp, "read");
Craig Tiller5c07cce2015-04-28 09:21:58 -0700226 }
227
Craig Tillerd6c16552015-05-01 13:21:57 -0700228 GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700229}
230
Craig Tillerb0298592015-08-27 07:38:01 -0700231static void tcp_handle_read(void *arg /* grpc_tcp */, int success) {
Craig Tiller5c07cce2015-04-28 09:21:58 -0700232 grpc_tcp *tcp = (grpc_tcp *)arg;
233 GPR_ASSERT(!tcp->finished_edge);
234
ctiller58393c22015-01-07 14:03:30 -0800235 if (!success) {
Craig Tillerb0298592015-08-27 07:38:01 -0700236 gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
237 call_read_cb(tcp, 0);
238 TCP_UNREF(tcp, "read");
Craig Tiller5c07cce2015-04-28 09:21:58 -0700239 } else {
Craig Tillerb0298592015-08-27 07:38:01 -0700240 tcp_continue_read(tcp);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800241 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800242}
243
Craig Tillerb0298592015-08-27 07:38:01 -0700244static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
245 gpr_slice_buffer *incoming_buffer,
246 grpc_iomgr_closure *cb) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800247 grpc_tcp *tcp = (grpc_tcp *)ep;
248 GPR_ASSERT(tcp->read_cb == NULL);
249 tcp->read_cb = cb;
Craig Tillerb0298592015-08-27 07:38:01 -0700250 tcp->incoming_buffer = incoming_buffer;
251 gpr_slice_buffer_reset_and_unref(incoming_buffer);
252 TCP_REF(tcp, "read");
Craig Tiller5c07cce2015-04-28 09:21:58 -0700253 if (tcp->finished_edge) {
254 tcp->finished_edge = 0;
255 grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
256 } else {
Craig Tillerb0298592015-08-27 07:38:01 -0700257 grpc_iomgr_add_delayed_callback(&tcp->read_closure, 1);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700258 }
Craig Tillerb0298592015-08-27 07:38:01 -0700259 /* TODO(ctiller): immediate return */
260 return GRPC_ENDPOINT_PENDING;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800261}
262
263#define MAX_WRITE_IOVEC 16
Craig Tillerb0298592015-08-27 07:38:01 -0700264static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800265 struct msghdr msg;
266 struct iovec iov[MAX_WRITE_IOVEC];
267 int iov_size;
268 ssize_t sent_length;
Craig Tillerb0298592015-08-27 07:38:01 -0700269 ssize_t sending_length;
270 ssize_t trailing;
271 ssize_t unwind_slice_idx;
272 ssize_t unwind_byte_idx;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800273
274 for (;;) {
Craig Tillerb0298592015-08-27 07:38:01 -0700275 sending_length = 0;
276 unwind_slice_idx = tcp->outgoing_slice_idx;
277 unwind_byte_idx = tcp->outgoing_byte_idx;
278 for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
279 iov_size != MAX_WRITE_IOVEC;
280 iov_size++) {
281 iov[iov_size].iov_base =
282 GPR_SLICE_START_PTR(
283 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
284 tcp->outgoing_byte_idx;
285 iov[iov_size].iov_len =
286 GPR_SLICE_LENGTH(
287 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
288 tcp->outgoing_byte_idx;
289 sending_length += iov[iov_size].iov_len;
290 tcp->outgoing_slice_idx++;
291 tcp->outgoing_byte_idx = 0;
292 }
293 GPR_ASSERT(iov_size > 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800294
295 msg.msg_name = NULL;
296 msg.msg_namelen = 0;
297 msg.msg_iov = iov;
298 msg.msg_iovlen = iov_size;
299 msg.msg_control = NULL;
300 msg.msg_controllen = 0;
301 msg.msg_flags = 0;
302
David Garcia Quintasbbc0b772015-04-29 14:10:05 -0700303 GRPC_TIMER_BEGIN(GRPC_PTAG_SENDMSG, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800304 do {
305 /* TODO(klempner): Cork if this is a partial write */
Craig Tiller2da02962015-05-06 16:14:25 -0700306 sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800307 } while (sent_length < 0 && errno == EINTR);
David Garcia Quintasbbc0b772015-04-29 14:10:05 -0700308 GRPC_TIMER_END(GRPC_PTAG_SENDMSG, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800309
310 if (sent_length < 0) {
311 if (errno == EAGAIN) {
Craig Tillerb0298592015-08-27 07:38:01 -0700312 tcp->outgoing_slice_idx = unwind_slice_idx;
313 tcp->outgoing_byte_idx = unwind_byte_idx;
314 return GRPC_ENDPOINT_PENDING;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800315 } else {
316 /* TODO(klempner): Log some of these */
Craig Tillerb0298592015-08-27 07:38:01 -0700317 return GRPC_ENDPOINT_ERROR;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800318 }
319 }
320
Craig Tillerb0298592015-08-27 07:38:01 -0700321 GPR_ASSERT(tcp->outgoing_byte_idx == 0);
322 trailing = sending_length - sent_length;
323 while (trailing > 0) {
324 ssize_t slice_length;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800325
Craig Tillerb0298592015-08-27 07:38:01 -0700326 tcp->outgoing_slice_idx--;
327 slice_length = GPR_SLICE_LENGTH(
328 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
329 if (slice_length > trailing) {
330 tcp->outgoing_byte_idx = slice_length - trailing;
331 break;
332 } else {
333 trailing -= slice_length;
334 }
335 }
336
337 if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
338 return GRPC_ENDPOINT_DONE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800339 }
340 };
341}
342
Craig Tillerb0298592015-08-27 07:38:01 -0700343static void tcp_handle_write(void *arg /* grpc_tcp */, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800344 grpc_tcp *tcp = (grpc_tcp *)arg;
Craig Tillerb0298592015-08-27 07:38:01 -0700345 grpc_endpoint_op_status status;
346 grpc_iomgr_closure *cb;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800347
ctiller58393c22015-01-07 14:03:30 -0800348 if (!success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800349 cb = tcp->write_cb;
350 tcp->write_cb = NULL;
Craig Tillerb0298592015-08-27 07:38:01 -0700351 cb->cb(cb->cb_arg, 0);
352 TCP_UNREF(tcp, "write");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800353 return;
354 }
355
David Garcia Quintasbbc0b772015-04-29 14:10:05 -0700356 GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0);
Craig Tillerb0298592015-08-27 07:38:01 -0700357 status = tcp_flush(tcp);
358 if (status == GRPC_ENDPOINT_PENDING) {
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800359 grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800360 } else {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800361 cb = tcp->write_cb;
362 tcp->write_cb = NULL;
Craig Tillerb0298592015-08-27 07:38:01 -0700363 cb->cb(cb->cb_arg, status == GRPC_ENDPOINT_DONE);
364 TCP_UNREF(tcp, "write");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800365 }
David Garcia Quintasbbc0b772015-04-29 14:10:05 -0700366 GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800367}
368
Craig Tillerb0298592015-08-27 07:38:01 -0700369static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep,
370 gpr_slice_buffer *buf,
371 grpc_iomgr_closure *cb) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800372 grpc_tcp *tcp = (grpc_tcp *)ep;
Craig Tillerb0298592015-08-27 07:38:01 -0700373 grpc_endpoint_op_status status;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800374
Craig Tillerfaa84802015-03-01 21:56:38 -0800375 if (grpc_tcp_trace) {
Craig Tiller6e7c6222015-02-20 15:31:21 -0800376 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800377
Craig Tillerb0298592015-08-27 07:38:01 -0700378 for (i = 0; i < buf->count; i++) {
379 char *data =
380 gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
Craig Tiller6e7c6222015-02-20 15:31:21 -0800381 gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
382 gpr_free(data);
383 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800384 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800385
David Garcia Quintasbbc0b772015-04-29 14:10:05 -0700386 GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800387 GPR_ASSERT(tcp->write_cb == NULL);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800388
Craig Tillerb0298592015-08-27 07:38:01 -0700389 if (buf->length == 0) {
390 GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
391 return GRPC_ENDPOINT_DONE;
392 }
393 tcp->outgoing_buffer = buf;
394 tcp->outgoing_slice_idx = 0;
395 tcp->outgoing_byte_idx = 0;
396
397 status = tcp_flush(tcp);
398 if (status == GRPC_ENDPOINT_PENDING) {
399 TCP_REF(tcp, "write");
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800400 tcp->write_cb = cb;
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800401 grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800402 }
403
David Garcia Quintasbbc0b772015-04-29 14:10:05 -0700404 GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800405 return status;
406}
407
Craig Tillerb0298592015-08-27 07:38:01 -0700408static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
ctiller58393c22015-01-07 14:03:30 -0800409 grpc_tcp *tcp = (grpc_tcp *)ep;
410 grpc_pollset_add_fd(pollset, tcp->em_fd);
ctillerd79b4862014-12-17 16:36:59 -0800411}
412
Craig Tillerb0298592015-08-27 07:38:01 -0700413static void tcp_add_to_pollset_set(grpc_endpoint *ep,
414 grpc_pollset_set *pollset_set) {
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700415 grpc_tcp *tcp = (grpc_tcp *)ep;
416 grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
417}
418
Craig Tillerb0298592015-08-27 07:38:01 -0700419static char *tcp_get_peer(grpc_endpoint *ep) {
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700420 grpc_tcp *tcp = (grpc_tcp *)ep;
421 return gpr_strdup(tcp->peer_string);
422}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800423
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700424static const grpc_endpoint_vtable vtable = {
Craig Tillerb0298592015-08-27 07:38:01 -0700425 tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
426 tcp_shutdown, tcp_destroy, tcp_get_peer};
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700427
428grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
429 const char *peer_string) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800430 grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
431 tcp->base.vtable = &vtable;
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700432 tcp->peer_string = gpr_strdup(peer_string);
ctiller58393c22015-01-07 14:03:30 -0800433 tcp->fd = em_fd->fd;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800434 tcp->read_cb = NULL;
435 tcp->write_cb = NULL;
Craig Tillerb0298592015-08-27 07:38:01 -0700436 tcp->incoming_buffer = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800437 tcp->slice_size = slice_size;
Craig Tiller2b1fd662015-04-28 07:33:37 -0700438 tcp->iov_size = 1;
Craig Tiller5c07cce2015-04-28 09:21:58 -0700439 tcp->finished_edge = 1;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800440 /* paired with unref in grpc_tcp_destroy */
441 gpr_ref_init(&tcp->refcount, 1);
nnoble0c475f02014-12-05 15:37:39 -0800442 tcp->em_fd = em_fd;
Craig Tillerb0298592015-08-27 07:38:01 -0700443 tcp->read_closure.cb = tcp_handle_read;
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800444 tcp->read_closure.cb_arg = tcp;
Craig Tillerb0298592015-08-27 07:38:01 -0700445 tcp->write_closure.cb = tcp_handle_write;
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800446 tcp->write_closure.cb_arg = tcp;
David Garcia Quintas5f228f52015-05-26 19:58:50 -0700447
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800448 return &tcp->base;
449}
Craig Tiller0c0b60c2015-01-21 15:49:28 -0800450
Craig Tiller190d3602015-02-18 09:23:38 -0800451#endif