blob: 56c1eef024c847a0cba3ce85967050f3b4d2ba90 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller6169d5f2016-03-31 07:46:18 -07003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Craig Tiller0c0b60c2015-01-21 15:49:28 -080034#include <grpc/support/port_platform.h>
35
36#ifdef GPR_POSIX_SOCKET
37
Makarand Dharmapurikar0579cfc2016-06-20 15:45:24 -070038#include "src/core/lib/iomgr/network_status_tracker.h"
Craig Tiller9533d042016-03-25 17:11:06 -070039#include "src/core/lib/iomgr/tcp_posix.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080040
41#include <errno.h>
Mark D. Roth808ac382016-05-13 11:14:21 -070042#include <stdbool.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080043#include <stdlib.h>
44#include <string.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080045#include <sys/socket.h>
Craig Tillera8be91b2016-02-19 16:22:44 -080046#include <sys/types.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080047#include <unistd.h>
48
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080049#include <grpc/support/alloc.h>
50#include <grpc/support/log.h>
51#include <grpc/support/slice.h>
Craig Tiller1b22b9d2015-07-20 13:42:22 -070052#include <grpc/support/string_util.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080053#include <grpc/support/sync.h>
54#include <grpc/support/time.h>
55
Craig Tiller9533d042016-03-25 17:11:06 -070056#include "src/core/lib/debug/trace.h"
Craig Tiller8a034482016-03-28 16:09:04 -070057#include "src/core/lib/iomgr/ev_posix.h"
Craig Tiller9533d042016-03-25 17:11:06 -070058#include "src/core/lib/profiling/timers.h"
59#include "src/core/lib/support/string.h"
Craig Tiller1b22b9d2015-07-20 13:42:22 -070060
Craig Tiller2da02962015-05-06 16:14:25 -070061#ifdef GPR_HAVE_MSG_NOSIGNAL
62#define SENDMSG_FLAGS MSG_NOSIGNAL
63#else
64#define SENDMSG_FLAGS 0
65#endif
66
Craig Tillerebc7ef22015-09-10 22:19:25 -070067#ifdef GPR_MSG_IOVLEN_TYPE
68typedef GPR_MSG_IOVLEN_TYPE msg_iovlen_type;
69#else
70typedef size_t msg_iovlen_type;
71#endif
72
Craig Tillerfaa84802015-03-01 21:56:38 -080073int grpc_tcp_trace = 0;
74
Craig Tillera82950e2015-09-22 12:33:20 -070075typedef struct {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080076 grpc_endpoint base;
ctiller18b49ab2014-12-09 14:39:16 -080077 grpc_fd *em_fd;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080078 int fd;
Mark D. Roth808ac382016-05-13 11:14:21 -070079 bool finished_edge;
Craig Tillera82950e2015-09-22 12:33:20 -070080 msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080081 size_t slice_size;
82 gpr_refcount refcount;
83
Craig Tiller649deeb2015-09-24 23:19:40 -070084 /* garbage after the last read */
85 gpr_slice_buffer last_read_buffer;
86
Craig Tillerb0298592015-08-27 07:38:01 -070087 gpr_slice_buffer *incoming_buffer;
88 gpr_slice_buffer *outgoing_buffer;
89 /** slice within outgoing_buffer to write next */
90 size_t outgoing_slice_idx;
91 /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
92 size_t outgoing_byte_idx;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080093
Craig Tiller33825112015-09-18 07:44:19 -070094 grpc_closure *read_cb;
95 grpc_closure *write_cb;
yang-gdc215932015-11-30 14:25:01 -080096 grpc_closure *release_fd_cb;
yang-g5d850372015-12-01 10:32:28 -080097 int *release_fd;
Craig Tiller0fcd53c2015-02-18 15:10:53 -080098
Craig Tiller33825112015-09-18 07:44:19 -070099 grpc_closure read_closure;
100 grpc_closure write_closure;
David Garcia Quintas5f228f52015-05-26 19:58:50 -0700101
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700102 char *peer_string;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800103} grpc_tcp;
104
Craig Tillera82950e2015-09-22 12:33:20 -0700105static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
Craig Tillerc027e772016-05-03 16:27:00 -0700106 grpc_error *error);
Craig Tillera82950e2015-09-22 12:33:20 -0700107static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
Craig Tillerc027e772016-05-03 16:27:00 -0700108 grpc_error *error);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800109
Craig Tillera82950e2015-09-22 12:33:20 -0700110static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
111 grpc_tcp *tcp = (grpc_tcp *)ep;
112 grpc_fd_shutdown(exec_ctx, tcp->em_fd);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800113}
114
Craig Tillera82950e2015-09-22 12:33:20 -0700115static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
yang-g5d850372015-12-01 10:32:28 -0800116 grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
yang-gdc215932015-11-30 14:25:01 -0800117 "tcp_unref_orphan");
Craig Tiller649deeb2015-09-24 23:19:40 -0700118 gpr_slice_buffer_destroy(&tcp->last_read_buffer);
Craig Tillera82950e2015-09-22 12:33:20 -0700119 gpr_free(tcp->peer_string);
120 gpr_free(tcp);
Craig Tillerb0298592015-08-27 07:38:01 -0700121}
122
123/*#define GRPC_TCP_REFCOUNT_DEBUG*/
124#ifdef GRPC_TCP_REFCOUNT_DEBUG
Craig Tiller8af4c332015-09-22 12:32:31 -0700125#define TCP_UNREF(cl, tcp, reason) \
126 tcp_unref((cl), (tcp), (reason), __FILE__, __LINE__)
Craig Tillerb0298592015-08-27 07:38:01 -0700127#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
Craig Tillera82950e2015-09-22 12:33:20 -0700128static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
129 const char *reason, const char *file, int line) {
130 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
131 reason, tcp->refcount.count, tcp->refcount.count - 1);
132 if (gpr_unref(&tcp->refcount)) {
133 tcp_free(exec_ctx, tcp);
134 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800135}
136
Craig Tillera82950e2015-09-22 12:33:20 -0700137static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
138 int line) {
139 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
140 reason, tcp->refcount.count, tcp->refcount.count + 1);
141 gpr_ref(&tcp->refcount);
Craig Tillerb0298592015-08-27 07:38:01 -0700142}
143#else
Craig Tiller8af4c332015-09-22 12:32:31 -0700144#define TCP_UNREF(cl, tcp, reason) tcp_unref((cl), (tcp))
Craig Tillerb0298592015-08-27 07:38:01 -0700145#define TCP_REF(tcp, reason) tcp_ref((tcp))
Craig Tillera82950e2015-09-22 12:33:20 -0700146static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
147 if (gpr_unref(&tcp->refcount)) {
148 tcp_free(exec_ctx, tcp);
149 }
Craig Tiller592e7f22015-08-21 10:45:48 -0700150}
151
Craig Tillera82950e2015-09-22 12:33:20 -0700152static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
Craig Tillerb0298592015-08-27 07:38:01 -0700153#endif
154
Craig Tillera82950e2015-09-22 12:33:20 -0700155static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
Makarand Dharmapurikar61494432016-06-22 13:34:59 -0700156 grpc_network_status_unregister_endpoint(ep);
Craig Tillera82950e2015-09-22 12:33:20 -0700157 grpc_tcp *tcp = (grpc_tcp *)ep;
158 TCP_UNREF(exec_ctx, tcp, "destroy");
Craig Tillerb0298592015-08-27 07:38:01 -0700159}
160
Craig Tillerc027e772016-05-03 16:27:00 -0700161static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
162 grpc_error *error) {
Craig Tiller33825112015-09-18 07:44:19 -0700163 grpc_closure *cb = tcp->read_cb;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800164
Craig Tiller449c64b2016-06-13 16:26:50 -0700165 if (false && grpc_tcp_trace) {
Craig Tillera82950e2015-09-22 12:33:20 -0700166 size_t i;
Craig Tillerc027e772016-05-03 16:27:00 -0700167 const char *str = grpc_error_string(error);
168 gpr_log(GPR_DEBUG, "read: error=%s", str);
169 grpc_error_free_string(str);
Craig Tillera82950e2015-09-22 12:33:20 -0700170 for (i = 0; i < tcp->incoming_buffer->count; i++) {
171 char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
172 GPR_DUMP_HEX | GPR_DUMP_ASCII);
Mark D. Rothbc846722016-05-04 10:53:50 -0700173 gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
Craig Tillera82950e2015-09-22 12:33:20 -0700174 gpr_free(dump);
Craig Tiller6e7c6222015-02-20 15:31:21 -0800175 }
Craig Tillera82950e2015-09-22 12:33:20 -0700176 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800177
178 tcp->read_cb = NULL;
Craig Tillerb0298592015-08-27 07:38:01 -0700179 tcp->incoming_buffer = NULL;
Craig Tiller332f1b32016-05-24 13:21:21 -0700180 grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800181}
182
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800183#define MAX_READ_IOVEC 4
Craig Tillera82950e2015-09-22 12:33:20 -0700184static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800185 struct msghdr msg;
186 struct iovec iov[MAX_READ_IOVEC];
187 ssize_t read_bytes;
Craig Tillerb0298592015-08-27 07:38:01 -0700188 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800189
Craig Tillera82950e2015-09-22 12:33:20 -0700190 GPR_ASSERT(!tcp->finished_edge);
191 GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC);
192 GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
Craig Tiller0ba432d2015-10-09 16:57:11 -0700193 GPR_TIMER_BEGIN("tcp_continue_read", 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800194
Craig Tillera82950e2015-09-22 12:33:20 -0700195 while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
196 gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
197 gpr_slice_malloc(tcp->slice_size));
198 }
199 for (i = 0; i < tcp->incoming_buffer->count; i++) {
200 iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
201 iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
202 }
Craig Tiller5c07cce2015-04-28 09:21:58 -0700203
204 msg.msg_name = NULL;
205 msg.msg_namelen = 0;
206 msg.msg_iov = iov;
207 msg.msg_iovlen = tcp->iov_size;
208 msg.msg_control = NULL;
209 msg.msg_controllen = 0;
210 msg.msg_flags = 0;
211
Craig Tiller0ba432d2015-10-09 16:57:11 -0700212 GPR_TIMER_BEGIN("recvmsg", 1);
Craig Tillera82950e2015-09-22 12:33:20 -0700213 do {
214 read_bytes = recvmsg(tcp->fd, &msg, 0);
215 } while (read_bytes < 0 && errno == EINTR);
Craig Tiller0ba432d2015-10-09 16:57:11 -0700216 GPR_TIMER_END("recvmsg", 0);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700217
Craig Tillera82950e2015-09-22 12:33:20 -0700218 if (read_bytes < 0) {
219 /* NB: After calling call_read_cb a parallel call of the read handler may
220 * be running. */
221 if (errno == EAGAIN) {
222 if (tcp->iov_size > 1) {
223 tcp->iov_size /= 2;
224 }
225 /* We've consumed the edge, request a new one */
226 grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
227 } else {
Craig Tillera82950e2015-09-22 12:33:20 -0700228 gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
Craig Tiller6a64bfd2016-05-09 13:05:03 -0700229 call_read_cb(exec_ctx, tcp, GRPC_OS_ERROR(errno, "recvmsg"));
Craig Tillera82950e2015-09-22 12:33:20 -0700230 TCP_UNREF(exec_ctx, tcp, "read");
Craig Tiller45724b32015-09-22 10:42:19 -0700231 }
Craig Tillera82950e2015-09-22 12:33:20 -0700232 } else if (read_bytes == 0) {
233 /* 0 read size ==> end of stream */
234 gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
Craig Tiller6a64bfd2016-05-09 13:05:03 -0700235 call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("EOF"));
Craig Tillera82950e2015-09-22 12:33:20 -0700236 TCP_UNREF(exec_ctx, tcp, "read");
237 } else {
238 GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
239 if ((size_t)read_bytes < tcp->incoming_buffer->length) {
240 gpr_slice_buffer_trim_end(
241 tcp->incoming_buffer,
Craig Tiller649deeb2015-09-24 23:19:40 -0700242 tcp->incoming_buffer->length - (size_t)read_bytes,
243 &tcp->last_read_buffer);
Craig Tillera82950e2015-09-22 12:33:20 -0700244 } else if (tcp->iov_size < MAX_READ_IOVEC) {
245 ++tcp->iov_size;
Craig Tiller45724b32015-09-22 10:42:19 -0700246 }
Craig Tillera82950e2015-09-22 12:33:20 -0700247 GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
Craig Tillerc027e772016-05-03 16:27:00 -0700248 call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -0700249 TCP_UNREF(exec_ctx, tcp, "read");
250 }
Craig Tiller45724b32015-09-22 10:42:19 -0700251
Craig Tiller0ba432d2015-10-09 16:57:11 -0700252 GPR_TIMER_END("tcp_continue_read", 0);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700253}
254
Craig Tillera82950e2015-09-22 12:33:20 -0700255static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
Craig Tillerc027e772016-05-03 16:27:00 -0700256 grpc_error *error) {
Craig Tillera82950e2015-09-22 12:33:20 -0700257 grpc_tcp *tcp = (grpc_tcp *)arg;
258 GPR_ASSERT(!tcp->finished_edge);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700259
Craig Tillerc027e772016-05-03 16:27:00 -0700260 if (error != GRPC_ERROR_NONE) {
Craig Tillera82950e2015-09-22 12:33:20 -0700261 gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
Craig Tiller6a64bfd2016-05-09 13:05:03 -0700262 call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
Craig Tillera82950e2015-09-22 12:33:20 -0700263 TCP_UNREF(exec_ctx, tcp, "read");
264 } else {
265 tcp_continue_read(exec_ctx, tcp);
266 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800267}
268
Craig Tillera82950e2015-09-22 12:33:20 -0700269static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
270 gpr_slice_buffer *incoming_buffer, grpc_closure *cb) {
271 grpc_tcp *tcp = (grpc_tcp *)ep;
272 GPR_ASSERT(tcp->read_cb == NULL);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800273 tcp->read_cb = cb;
Craig Tillerb0298592015-08-27 07:38:01 -0700274 tcp->incoming_buffer = incoming_buffer;
Craig Tillera82950e2015-09-22 12:33:20 -0700275 gpr_slice_buffer_reset_and_unref(incoming_buffer);
Craig Tiller649deeb2015-09-24 23:19:40 -0700276 gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
Craig Tillera82950e2015-09-22 12:33:20 -0700277 TCP_REF(tcp, "read");
278 if (tcp->finished_edge) {
Mark D. Roth808ac382016-05-13 11:14:21 -0700279 tcp->finished_edge = false;
Craig Tillera82950e2015-09-22 12:33:20 -0700280 grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
281 } else {
Craig Tiller332f1b32016-05-24 13:21:21 -0700282 grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL);
Craig Tillera82950e2015-09-22 12:33:20 -0700283 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800284}
285
Craig Tiller27f59af2016-04-28 14:19:48 -0700286/* returns true if done, false if pending; if returning true, *error is set */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800287#define MAX_WRITE_IOVEC 16
Craig Tiller27f59af2016-04-28 14:19:48 -0700288static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800289 struct msghdr msg;
290 struct iovec iov[MAX_WRITE_IOVEC];
Craig Tillerebc7ef22015-09-10 22:19:25 -0700291 msg_iovlen_type iov_size;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800292 ssize_t sent_length;
Craig Tiller32ca48c2015-09-10 11:47:15 -0700293 size_t sending_length;
294 size_t trailing;
295 size_t unwind_slice_idx;
296 size_t unwind_byte_idx;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800297
Craig Tillera82950e2015-09-22 12:33:20 -0700298 for (;;) {
299 sending_length = 0;
300 unwind_slice_idx = tcp->outgoing_slice_idx;
301 unwind_byte_idx = tcp->outgoing_byte_idx;
302 for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
Craig Tillerf40df232016-03-25 13:38:14 -0700303 iov_size != MAX_WRITE_IOVEC;
Craig Tillera82950e2015-09-22 12:33:20 -0700304 iov_size++) {
305 iov[iov_size].iov_base =
306 GPR_SLICE_START_PTR(
307 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
308 tcp->outgoing_byte_idx;
309 iov[iov_size].iov_len =
310 GPR_SLICE_LENGTH(
311 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
312 tcp->outgoing_byte_idx;
313 sending_length += iov[iov_size].iov_len;
314 tcp->outgoing_slice_idx++;
315 tcp->outgoing_byte_idx = 0;
316 }
317 GPR_ASSERT(iov_size > 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800318
Craig Tillera82950e2015-09-22 12:33:20 -0700319 msg.msg_name = NULL;
320 msg.msg_namelen = 0;
321 msg.msg_iov = iov;
322 msg.msg_iovlen = iov_size;
323 msg.msg_control = NULL;
324 msg.msg_controllen = 0;
325 msg.msg_flags = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800326
Craig Tiller0ba432d2015-10-09 16:57:11 -0700327 GPR_TIMER_BEGIN("sendmsg", 1);
Craig Tillera82950e2015-09-22 12:33:20 -0700328 do {
329 /* TODO(klempner): Cork if this is a partial write */
330 sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
331 } while (sent_length < 0 && errno == EINTR);
Craig Tiller0ba432d2015-10-09 16:57:11 -0700332 GPR_TIMER_END("sendmsg", 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800333
Craig Tillera82950e2015-09-22 12:33:20 -0700334 if (sent_length < 0) {
335 if (errno == EAGAIN) {
336 tcp->outgoing_slice_idx = unwind_slice_idx;
337 tcp->outgoing_byte_idx = unwind_byte_idx;
Craig Tiller27f59af2016-04-28 14:19:48 -0700338 return false;
Craig Tillera82950e2015-09-22 12:33:20 -0700339 } else {
Craig Tillerc027e772016-05-03 16:27:00 -0700340 *error = GRPC_OS_ERROR(errno, "sendmsg");
Craig Tiller27f59af2016-04-28 14:19:48 -0700341 return true;
Craig Tillera82950e2015-09-22 12:33:20 -0700342 }
343 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800344
Craig Tillera82950e2015-09-22 12:33:20 -0700345 GPR_ASSERT(tcp->outgoing_byte_idx == 0);
346 trailing = sending_length - (size_t)sent_length;
347 while (trailing > 0) {
348 size_t slice_length;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800349
Craig Tillera82950e2015-09-22 12:33:20 -0700350 tcp->outgoing_slice_idx--;
351 slice_length = GPR_SLICE_LENGTH(
352 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
353 if (slice_length > trailing) {
354 tcp->outgoing_byte_idx = slice_length - trailing;
355 break;
356 } else {
357 trailing -= slice_length;
358 }
359 }
Craig Tillerb0298592015-08-27 07:38:01 -0700360
Craig Tillera82950e2015-09-22 12:33:20 -0700361 if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
Craig Tiller27f59af2016-04-28 14:19:48 -0700362 *error = GRPC_ERROR_NONE;
363 return true;
Craig Tillera82950e2015-09-22 12:33:20 -0700364 }
365 };
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800366}
367
Craig Tillera82950e2015-09-22 12:33:20 -0700368static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
Craig Tillerc027e772016-05-03 16:27:00 -0700369 grpc_error *error) {
Craig Tillera82950e2015-09-22 12:33:20 -0700370 grpc_tcp *tcp = (grpc_tcp *)arg;
Craig Tiller33825112015-09-18 07:44:19 -0700371 grpc_closure *cb;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800372
Craig Tillerc027e772016-05-03 16:27:00 -0700373 if (error != GRPC_ERROR_NONE) {
Craig Tillera82950e2015-09-22 12:33:20 -0700374 cb = tcp->write_cb;
375 tcp->write_cb = NULL;
Mark D. Rothb06c6402016-05-13 12:39:30 -0700376 cb->cb(exec_ctx, cb->cb_arg, error);
Craig Tillera82950e2015-09-22 12:33:20 -0700377 TCP_UNREF(exec_ctx, tcp, "write");
378 return;
379 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800380
Craig Tiller27f59af2016-04-28 14:19:48 -0700381 if (!tcp_flush(tcp, &error)) {
Craig Tillera82950e2015-09-22 12:33:20 -0700382 grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
383 } else {
384 cb = tcp->write_cb;
385 tcp->write_cb = NULL;
Craig Tiller0ba432d2015-10-09 16:57:11 -0700386 GPR_TIMER_BEGIN("tcp_handle_write.cb", 0);
Mark D. Rothb06c6402016-05-13 12:39:30 -0700387 cb->cb(exec_ctx, cb->cb_arg, error);
Craig Tiller0ba432d2015-10-09 16:57:11 -0700388 GPR_TIMER_END("tcp_handle_write.cb", 0);
Craig Tillera82950e2015-09-22 12:33:20 -0700389 TCP_UNREF(exec_ctx, tcp, "write");
Craig Tillere0d6c572016-05-13 07:23:36 -0700390 GRPC_ERROR_UNREF(error);
Craig Tillera82950e2015-09-22 12:33:20 -0700391 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800392}
393
Craig Tillera82950e2015-09-22 12:33:20 -0700394static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
395 gpr_slice_buffer *buf, grpc_closure *cb) {
396 grpc_tcp *tcp = (grpc_tcp *)ep;
Craig Tiller27f59af2016-04-28 14:19:48 -0700397 grpc_error *error = GRPC_ERROR_NONE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800398
Craig Tiller449c64b2016-06-13 16:26:50 -0700399 if (false && grpc_tcp_trace) {
Craig Tillera82950e2015-09-22 12:33:20 -0700400 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800401
Craig Tillera82950e2015-09-22 12:33:20 -0700402 for (i = 0; i < buf->count; i++) {
403 char *data =
404 gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
Mark D. Rothbc846722016-05-04 10:53:50 -0700405 gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
Craig Tillera82950e2015-09-22 12:33:20 -0700406 gpr_free(data);
Craig Tiller6e7c6222015-02-20 15:31:21 -0800407 }
Craig Tillera82950e2015-09-22 12:33:20 -0700408 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800409
Craig Tiller0ba432d2015-10-09 16:57:11 -0700410 GPR_TIMER_BEGIN("tcp_write", 0);
Craig Tillera82950e2015-09-22 12:33:20 -0700411 GPR_ASSERT(tcp->write_cb == NULL);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800412
Craig Tillera82950e2015-09-22 12:33:20 -0700413 if (buf->length == 0) {
Craig Tiller0ba432d2015-10-09 16:57:11 -0700414 GPR_TIMER_END("tcp_write", 0);
Craig Tiller13c09402016-06-15 09:41:33 -0700415 grpc_exec_ctx_sched(exec_ctx, cb, grpc_fd_is_shutdown(tcp->em_fd)
416 ? GRPC_ERROR_CREATE("EOF")
417 : GRPC_ERROR_NONE,
418 NULL);
Craig Tillera82950e2015-09-22 12:33:20 -0700419 return;
420 }
Craig Tillerb0298592015-08-27 07:38:01 -0700421 tcp->outgoing_buffer = buf;
422 tcp->outgoing_slice_idx = 0;
423 tcp->outgoing_byte_idx = 0;
424
Craig Tiller27f59af2016-04-28 14:19:48 -0700425 if (!tcp_flush(tcp, &error)) {
Craig Tillera82950e2015-09-22 12:33:20 -0700426 TCP_REF(tcp, "write");
427 tcp->write_cb = cb;
428 grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
429 } else {
Craig Tiller332f1b32016-05-24 13:21:21 -0700430 grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
Craig Tillera82950e2015-09-22 12:33:20 -0700431 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800432
Craig Tiller0ba432d2015-10-09 16:57:11 -0700433 GPR_TIMER_END("tcp_write", 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800434}
435
Craig Tillera82950e2015-09-22 12:33:20 -0700436static void tcp_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
437 grpc_pollset *pollset) {
438 grpc_tcp *tcp = (grpc_tcp *)ep;
439 grpc_pollset_add_fd(exec_ctx, pollset, tcp->em_fd);
ctillerd79b4862014-12-17 16:36:59 -0800440}
441
Craig Tillera82950e2015-09-22 12:33:20 -0700442static void tcp_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
443 grpc_pollset_set *pollset_set) {
444 grpc_tcp *tcp = (grpc_tcp *)ep;
445 grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700446}
447
Craig Tillera82950e2015-09-22 12:33:20 -0700448static char *tcp_get_peer(grpc_endpoint *ep) {
449 grpc_tcp *tcp = (grpc_tcp *)ep;
450 return gpr_strdup(tcp->peer_string);
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700451}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800452
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700453static const grpc_endpoint_vtable vtable = {
Craig Tillerf40df232016-03-25 13:38:14 -0700454 tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
Craig Tillera82950e2015-09-22 12:33:20 -0700455 tcp_shutdown, tcp_destroy, tcp_get_peer};
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700456
Craig Tillera82950e2015-09-22 12:33:20 -0700457grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
458 const char *peer_string) {
459 grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800460 tcp->base.vtable = &vtable;
Craig Tillera82950e2015-09-22 12:33:20 -0700461 tcp->peer_string = gpr_strdup(peer_string);
Craig Tiller0a8a0172016-02-25 07:36:27 -0800462 tcp->fd = grpc_fd_wrapped_fd(em_fd);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800463 tcp->read_cb = NULL;
464 tcp->write_cb = NULL;
yang-gdc215932015-11-30 14:25:01 -0800465 tcp->release_fd_cb = NULL;
yang-g5d850372015-12-01 10:32:28 -0800466 tcp->release_fd = NULL;
Craig Tillerb0298592015-08-27 07:38:01 -0700467 tcp->incoming_buffer = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800468 tcp->slice_size = slice_size;
Craig Tiller2b1fd662015-04-28 07:33:37 -0700469 tcp->iov_size = 1;
Mark D. Roth808ac382016-05-13 11:14:21 -0700470 tcp->finished_edge = true;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800471 /* paired with unref in grpc_tcp_destroy */
Craig Tillera82950e2015-09-22 12:33:20 -0700472 gpr_ref_init(&tcp->refcount, 1);
nnoble0c475f02014-12-05 15:37:39 -0800473 tcp->em_fd = em_fd;
Craig Tillerb0298592015-08-27 07:38:01 -0700474 tcp->read_closure.cb = tcp_handle_read;
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800475 tcp->read_closure.cb_arg = tcp;
Craig Tillerb0298592015-08-27 07:38:01 -0700476 tcp->write_closure.cb = tcp_handle_write;
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800477 tcp->write_closure.cb_arg = tcp;
Craig Tiller649deeb2015-09-24 23:19:40 -0700478 gpr_slice_buffer_init(&tcp->last_read_buffer);
Makarand Dharmapurikar0579cfc2016-06-20 15:45:24 -0700479 /* Tell network status tracker about new endpoint */
480 grpc_network_status_register_endpoint(&tcp->base);
David Garcia Quintas5f228f52015-05-26 19:58:50 -0700481
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800482 return &tcp->base;
483}
Craig Tiller0c0b60c2015-01-21 15:49:28 -0800484
Dan Born43a78032016-01-05 17:17:45 -0800485int grpc_tcp_fd(grpc_endpoint *ep) {
486 grpc_tcp *tcp = (grpc_tcp *)ep;
487 GPR_ASSERT(ep->vtable == &vtable);
488 return grpc_fd_wrapped_fd(tcp->em_fd);
489}
490
yang-gdc215932015-11-30 14:25:01 -0800491void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
yang-g5d850372015-12-01 10:32:28 -0800492 int *fd, grpc_closure *done) {
yang-gdc215932015-11-30 14:25:01 -0800493 grpc_tcp *tcp = (grpc_tcp *)ep;
yang-g5d850372015-12-01 10:32:28 -0800494 GPR_ASSERT(ep->vtable == &vtable);
495 tcp->release_fd = fd;
yang-gdc215932015-11-30 14:25:01 -0800496 tcp->release_fd_cb = done;
497 TCP_UNREF(exec_ctx, tcp, "destroy");
498}
499
Craig Tiller190d3602015-02-18 09:23:38 -0800500#endif