blob: fd8fcb05c3c8317e5a27b32f37eab2945f7edded [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller6169d5f2016-03-31 07:46:18 -07003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Craig Tiller0c0b60c2015-01-21 15:49:28 -080034#include <grpc/support/port_platform.h>
35
36#ifdef GPR_POSIX_SOCKET
37
Makarand Dharmapurikar0579cfc2016-06-20 15:45:24 -070038#include "src/core/lib/iomgr/network_status_tracker.h"
Craig Tiller9533d042016-03-25 17:11:06 -070039#include "src/core/lib/iomgr/tcp_posix.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080040
41#include <errno.h>
Mark D. Roth808ac382016-05-13 11:14:21 -070042#include <stdbool.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080043#include <stdlib.h>
44#include <string.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080045#include <sys/socket.h>
Craig Tillera8be91b2016-02-19 16:22:44 -080046#include <sys/types.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080047#include <unistd.h>
48
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080049#include <grpc/support/alloc.h>
50#include <grpc/support/log.h>
51#include <grpc/support/slice.h>
Craig Tiller1b22b9d2015-07-20 13:42:22 -070052#include <grpc/support/string_util.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080053#include <grpc/support/sync.h>
54#include <grpc/support/time.h>
55
Craig Tiller9533d042016-03-25 17:11:06 -070056#include "src/core/lib/debug/trace.h"
Craig Tiller8a034482016-03-28 16:09:04 -070057#include "src/core/lib/iomgr/ev_posix.h"
Craig Tiller9533d042016-03-25 17:11:06 -070058#include "src/core/lib/profiling/timers.h"
59#include "src/core/lib/support/string.h"
Craig Tiller1b22b9d2015-07-20 13:42:22 -070060
Craig Tiller2da02962015-05-06 16:14:25 -070061#ifdef GPR_HAVE_MSG_NOSIGNAL
62#define SENDMSG_FLAGS MSG_NOSIGNAL
63#else
64#define SENDMSG_FLAGS 0
65#endif
66
Craig Tillerebc7ef22015-09-10 22:19:25 -070067#ifdef GPR_MSG_IOVLEN_TYPE
68typedef GPR_MSG_IOVLEN_TYPE msg_iovlen_type;
69#else
70typedef size_t msg_iovlen_type;
71#endif
72
Craig Tillerfaa84802015-03-01 21:56:38 -080073int grpc_tcp_trace = 0;
74
Craig Tillera82950e2015-09-22 12:33:20 -070075typedef struct {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080076 grpc_endpoint base;
ctiller18b49ab2014-12-09 14:39:16 -080077 grpc_fd *em_fd;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080078 int fd;
Mark D. Roth808ac382016-05-13 11:14:21 -070079 bool finished_edge;
Craig Tillera82950e2015-09-22 12:33:20 -070080 msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080081 size_t slice_size;
82 gpr_refcount refcount;
83
Craig Tiller649deeb2015-09-24 23:19:40 -070084 /* garbage after the last read */
85 gpr_slice_buffer last_read_buffer;
86
Craig Tillerb0298592015-08-27 07:38:01 -070087 gpr_slice_buffer *incoming_buffer;
88 gpr_slice_buffer *outgoing_buffer;
89 /** slice within outgoing_buffer to write next */
90 size_t outgoing_slice_idx;
91 /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
92 size_t outgoing_byte_idx;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080093
Craig Tiller33825112015-09-18 07:44:19 -070094 grpc_closure *read_cb;
95 grpc_closure *write_cb;
yang-gdc215932015-11-30 14:25:01 -080096 grpc_closure *release_fd_cb;
yang-g5d850372015-12-01 10:32:28 -080097 int *release_fd;
Craig Tiller0fcd53c2015-02-18 15:10:53 -080098
Craig Tiller33825112015-09-18 07:44:19 -070099 grpc_closure read_closure;
100 grpc_closure write_closure;
David Garcia Quintas5f228f52015-05-26 19:58:50 -0700101
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700102 char *peer_string;
Craig Tillere34c2852016-09-23 09:43:32 -0700103
104 grpc_buffer_user buffer_user;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800105} grpc_tcp;
106
Craig Tillera82950e2015-09-22 12:33:20 -0700107static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
Craig Tillerc027e772016-05-03 16:27:00 -0700108 grpc_error *error);
Craig Tillera82950e2015-09-22 12:33:20 -0700109static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
Craig Tillerc027e772016-05-03 16:27:00 -0700110 grpc_error *error);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800111
Craig Tillera82950e2015-09-22 12:33:20 -0700112static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
113 grpc_tcp *tcp = (grpc_tcp *)ep;
114 grpc_fd_shutdown(exec_ctx, tcp->em_fd);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800115}
116
Craig Tillera82950e2015-09-22 12:33:20 -0700117static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
yang-g5d850372015-12-01 10:32:28 -0800118 grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
yang-gdc215932015-11-30 14:25:01 -0800119 "tcp_unref_orphan");
Craig Tiller649deeb2015-09-24 23:19:40 -0700120 gpr_slice_buffer_destroy(&tcp->last_read_buffer);
Craig Tillera82950e2015-09-22 12:33:20 -0700121 gpr_free(tcp->peer_string);
122 gpr_free(tcp);
Craig Tillerb0298592015-08-27 07:38:01 -0700123}
124
125/*#define GRPC_TCP_REFCOUNT_DEBUG*/
126#ifdef GRPC_TCP_REFCOUNT_DEBUG
Craig Tiller8af4c332015-09-22 12:32:31 -0700127#define TCP_UNREF(cl, tcp, reason) \
128 tcp_unref((cl), (tcp), (reason), __FILE__, __LINE__)
Craig Tillerb0298592015-08-27 07:38:01 -0700129#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
Craig Tillera82950e2015-09-22 12:33:20 -0700130static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
131 const char *reason, const char *file, int line) {
132 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
133 reason, tcp->refcount.count, tcp->refcount.count - 1);
134 if (gpr_unref(&tcp->refcount)) {
135 tcp_free(exec_ctx, tcp);
136 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800137}
138
Craig Tillera82950e2015-09-22 12:33:20 -0700139static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
140 int line) {
141 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
142 reason, tcp->refcount.count, tcp->refcount.count + 1);
143 gpr_ref(&tcp->refcount);
Craig Tillerb0298592015-08-27 07:38:01 -0700144}
145#else
Craig Tiller8af4c332015-09-22 12:32:31 -0700146#define TCP_UNREF(cl, tcp, reason) tcp_unref((cl), (tcp))
Craig Tillerb0298592015-08-27 07:38:01 -0700147#define TCP_REF(tcp, reason) tcp_ref((tcp))
Craig Tillera82950e2015-09-22 12:33:20 -0700148static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
149 if (gpr_unref(&tcp->refcount)) {
150 tcp_free(exec_ctx, tcp);
151 }
Craig Tiller592e7f22015-08-21 10:45:48 -0700152}
153
Craig Tillera82950e2015-09-22 12:33:20 -0700154static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
Craig Tillerb0298592015-08-27 07:38:01 -0700155#endif
156
Craig Tillera82950e2015-09-22 12:33:20 -0700157static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
Makarand Dharmapurikar61494432016-06-22 13:34:59 -0700158 grpc_network_status_unregister_endpoint(ep);
Craig Tillera82950e2015-09-22 12:33:20 -0700159 grpc_tcp *tcp = (grpc_tcp *)ep;
160 TCP_UNREF(exec_ctx, tcp, "destroy");
Craig Tillerb0298592015-08-27 07:38:01 -0700161}
162
Craig Tillerc027e772016-05-03 16:27:00 -0700163static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
164 grpc_error *error) {
Craig Tiller33825112015-09-18 07:44:19 -0700165 grpc_closure *cb = tcp->read_cb;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800166
Craig Tiller44f62452016-06-23 08:22:30 -0700167 if (grpc_tcp_trace) {
Craig Tillera82950e2015-09-22 12:33:20 -0700168 size_t i;
Craig Tillerc027e772016-05-03 16:27:00 -0700169 const char *str = grpc_error_string(error);
170 gpr_log(GPR_DEBUG, "read: error=%s", str);
171 grpc_error_free_string(str);
Craig Tillera82950e2015-09-22 12:33:20 -0700172 for (i = 0; i < tcp->incoming_buffer->count; i++) {
173 char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
174 GPR_DUMP_HEX | GPR_DUMP_ASCII);
Mark D. Rothbc846722016-05-04 10:53:50 -0700175 gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
Craig Tillera82950e2015-09-22 12:33:20 -0700176 gpr_free(dump);
Craig Tiller6e7c6222015-02-20 15:31:21 -0800177 }
Craig Tillera82950e2015-09-22 12:33:20 -0700178 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800179
180 tcp->read_cb = NULL;
Craig Tillerb0298592015-08-27 07:38:01 -0700181 tcp->incoming_buffer = NULL;
Craig Tillerde2c41c2016-09-01 15:08:08 -0700182 grpc_closure_run(exec_ctx, cb, error);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800183}
184
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800185#define MAX_READ_IOVEC 4
Craig Tillera82950e2015-09-22 12:33:20 -0700186static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800187 struct msghdr msg;
188 struct iovec iov[MAX_READ_IOVEC];
189 ssize_t read_bytes;
Craig Tillerb0298592015-08-27 07:38:01 -0700190 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800191
Craig Tillera82950e2015-09-22 12:33:20 -0700192 GPR_ASSERT(!tcp->finished_edge);
193 GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC);
194 GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
Craig Tiller0ba432d2015-10-09 16:57:11 -0700195 GPR_TIMER_BEGIN("tcp_continue_read", 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800196
Craig Tillera82950e2015-09-22 12:33:20 -0700197 while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
198 gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
199 gpr_slice_malloc(tcp->slice_size));
200 }
201 for (i = 0; i < tcp->incoming_buffer->count; i++) {
202 iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
203 iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
204 }
Craig Tiller5c07cce2015-04-28 09:21:58 -0700205
206 msg.msg_name = NULL;
207 msg.msg_namelen = 0;
208 msg.msg_iov = iov;
209 msg.msg_iovlen = tcp->iov_size;
210 msg.msg_control = NULL;
211 msg.msg_controllen = 0;
212 msg.msg_flags = 0;
213
Craig Tillerca045622016-08-09 08:38:03 -0700214 GPR_TIMER_BEGIN("recvmsg", 0);
Craig Tillera82950e2015-09-22 12:33:20 -0700215 do {
216 read_bytes = recvmsg(tcp->fd, &msg, 0);
217 } while (read_bytes < 0 && errno == EINTR);
Craig Tillerca045622016-08-09 08:38:03 -0700218 GPR_TIMER_END("recvmsg", read_bytes >= 0);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700219
Craig Tillera82950e2015-09-22 12:33:20 -0700220 if (read_bytes < 0) {
221 /* NB: After calling call_read_cb a parallel call of the read handler may
222 * be running. */
223 if (errno == EAGAIN) {
224 if (tcp->iov_size > 1) {
225 tcp->iov_size /= 2;
226 }
227 /* We've consumed the edge, request a new one */
228 grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
229 } else {
Craig Tillera82950e2015-09-22 12:33:20 -0700230 gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
Craig Tiller6a64bfd2016-05-09 13:05:03 -0700231 call_read_cb(exec_ctx, tcp, GRPC_OS_ERROR(errno, "recvmsg"));
Craig Tillera82950e2015-09-22 12:33:20 -0700232 TCP_UNREF(exec_ctx, tcp, "read");
Craig Tiller45724b32015-09-22 10:42:19 -0700233 }
Craig Tillera82950e2015-09-22 12:33:20 -0700234 } else if (read_bytes == 0) {
235 /* 0 read size ==> end of stream */
236 gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
Craig Tiller6a64bfd2016-05-09 13:05:03 -0700237 call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("EOF"));
Craig Tillera82950e2015-09-22 12:33:20 -0700238 TCP_UNREF(exec_ctx, tcp, "read");
239 } else {
240 GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
241 if ((size_t)read_bytes < tcp->incoming_buffer->length) {
242 gpr_slice_buffer_trim_end(
243 tcp->incoming_buffer,
Craig Tiller649deeb2015-09-24 23:19:40 -0700244 tcp->incoming_buffer->length - (size_t)read_bytes,
245 &tcp->last_read_buffer);
Craig Tillera82950e2015-09-22 12:33:20 -0700246 } else if (tcp->iov_size < MAX_READ_IOVEC) {
247 ++tcp->iov_size;
Craig Tiller45724b32015-09-22 10:42:19 -0700248 }
Craig Tillera82950e2015-09-22 12:33:20 -0700249 GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
Craig Tillerc027e772016-05-03 16:27:00 -0700250 call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -0700251 TCP_UNREF(exec_ctx, tcp, "read");
252 }
Craig Tiller45724b32015-09-22 10:42:19 -0700253
Craig Tiller0ba432d2015-10-09 16:57:11 -0700254 GPR_TIMER_END("tcp_continue_read", 0);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700255}
256
Craig Tillera82950e2015-09-22 12:33:20 -0700257static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
Craig Tillerc027e772016-05-03 16:27:00 -0700258 grpc_error *error) {
Craig Tillera82950e2015-09-22 12:33:20 -0700259 grpc_tcp *tcp = (grpc_tcp *)arg;
260 GPR_ASSERT(!tcp->finished_edge);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700261
Craig Tillerc027e772016-05-03 16:27:00 -0700262 if (error != GRPC_ERROR_NONE) {
Craig Tillera82950e2015-09-22 12:33:20 -0700263 gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
Craig Tiller6a64bfd2016-05-09 13:05:03 -0700264 call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
Craig Tillera82950e2015-09-22 12:33:20 -0700265 TCP_UNREF(exec_ctx, tcp, "read");
266 } else {
267 tcp_continue_read(exec_ctx, tcp);
268 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800269}
270
Craig Tillera82950e2015-09-22 12:33:20 -0700271static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
272 gpr_slice_buffer *incoming_buffer, grpc_closure *cb) {
273 grpc_tcp *tcp = (grpc_tcp *)ep;
274 GPR_ASSERT(tcp->read_cb == NULL);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800275 tcp->read_cb = cb;
Craig Tillerb0298592015-08-27 07:38:01 -0700276 tcp->incoming_buffer = incoming_buffer;
Craig Tillera82950e2015-09-22 12:33:20 -0700277 gpr_slice_buffer_reset_and_unref(incoming_buffer);
Craig Tiller649deeb2015-09-24 23:19:40 -0700278 gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
Craig Tillera82950e2015-09-22 12:33:20 -0700279 TCP_REF(tcp, "read");
280 if (tcp->finished_edge) {
Mark D. Roth808ac382016-05-13 11:14:21 -0700281 tcp->finished_edge = false;
Craig Tillera82950e2015-09-22 12:33:20 -0700282 grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
283 } else {
Craig Tiller86037cd02016-09-02 19:58:43 -0700284 grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL);
Craig Tillera82950e2015-09-22 12:33:20 -0700285 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800286}
287
Craig Tiller27f59af2016-04-28 14:19:48 -0700288/* returns true if done, false if pending; if returning true, *error is set */
Vijay Pai8f76df42016-07-29 08:31:20 -0700289#define MAX_WRITE_IOVEC 1000
Craig Tiller27f59af2016-04-28 14:19:48 -0700290static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800291 struct msghdr msg;
292 struct iovec iov[MAX_WRITE_IOVEC];
Craig Tillerebc7ef22015-09-10 22:19:25 -0700293 msg_iovlen_type iov_size;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800294 ssize_t sent_length;
Craig Tiller32ca48c2015-09-10 11:47:15 -0700295 size_t sending_length;
296 size_t trailing;
297 size_t unwind_slice_idx;
298 size_t unwind_byte_idx;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800299
Craig Tillera82950e2015-09-22 12:33:20 -0700300 for (;;) {
301 sending_length = 0;
302 unwind_slice_idx = tcp->outgoing_slice_idx;
303 unwind_byte_idx = tcp->outgoing_byte_idx;
304 for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
Craig Tillerf40df232016-03-25 13:38:14 -0700305 iov_size != MAX_WRITE_IOVEC;
Craig Tillera82950e2015-09-22 12:33:20 -0700306 iov_size++) {
307 iov[iov_size].iov_base =
308 GPR_SLICE_START_PTR(
309 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
310 tcp->outgoing_byte_idx;
311 iov[iov_size].iov_len =
312 GPR_SLICE_LENGTH(
313 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
314 tcp->outgoing_byte_idx;
315 sending_length += iov[iov_size].iov_len;
316 tcp->outgoing_slice_idx++;
317 tcp->outgoing_byte_idx = 0;
318 }
319 GPR_ASSERT(iov_size > 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800320
Craig Tillera82950e2015-09-22 12:33:20 -0700321 msg.msg_name = NULL;
322 msg.msg_namelen = 0;
323 msg.msg_iov = iov;
324 msg.msg_iovlen = iov_size;
325 msg.msg_control = NULL;
326 msg.msg_controllen = 0;
327 msg.msg_flags = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800328
Craig Tiller0ba432d2015-10-09 16:57:11 -0700329 GPR_TIMER_BEGIN("sendmsg", 1);
Craig Tillera82950e2015-09-22 12:33:20 -0700330 do {
331 /* TODO(klempner): Cork if this is a partial write */
332 sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
333 } while (sent_length < 0 && errno == EINTR);
Craig Tiller0ba432d2015-10-09 16:57:11 -0700334 GPR_TIMER_END("sendmsg", 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800335
Craig Tillera82950e2015-09-22 12:33:20 -0700336 if (sent_length < 0) {
337 if (errno == EAGAIN) {
338 tcp->outgoing_slice_idx = unwind_slice_idx;
339 tcp->outgoing_byte_idx = unwind_byte_idx;
Craig Tiller27f59af2016-04-28 14:19:48 -0700340 return false;
Craig Tillera82950e2015-09-22 12:33:20 -0700341 } else {
Craig Tillerc027e772016-05-03 16:27:00 -0700342 *error = GRPC_OS_ERROR(errno, "sendmsg");
Craig Tiller27f59af2016-04-28 14:19:48 -0700343 return true;
Craig Tillera82950e2015-09-22 12:33:20 -0700344 }
345 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800346
Craig Tillera82950e2015-09-22 12:33:20 -0700347 GPR_ASSERT(tcp->outgoing_byte_idx == 0);
348 trailing = sending_length - (size_t)sent_length;
349 while (trailing > 0) {
350 size_t slice_length;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800351
Craig Tillera82950e2015-09-22 12:33:20 -0700352 tcp->outgoing_slice_idx--;
353 slice_length = GPR_SLICE_LENGTH(
354 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
355 if (slice_length > trailing) {
356 tcp->outgoing_byte_idx = slice_length - trailing;
357 break;
358 } else {
359 trailing -= slice_length;
360 }
361 }
Craig Tillerb0298592015-08-27 07:38:01 -0700362
Craig Tillera82950e2015-09-22 12:33:20 -0700363 if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
Craig Tiller27f59af2016-04-28 14:19:48 -0700364 *error = GRPC_ERROR_NONE;
365 return true;
Craig Tillera82950e2015-09-22 12:33:20 -0700366 }
367 };
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800368}
369
Craig Tillera82950e2015-09-22 12:33:20 -0700370static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
Craig Tillerc027e772016-05-03 16:27:00 -0700371 grpc_error *error) {
Craig Tillera82950e2015-09-22 12:33:20 -0700372 grpc_tcp *tcp = (grpc_tcp *)arg;
Craig Tiller33825112015-09-18 07:44:19 -0700373 grpc_closure *cb;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800374
Craig Tillerc027e772016-05-03 16:27:00 -0700375 if (error != GRPC_ERROR_NONE) {
Craig Tillera82950e2015-09-22 12:33:20 -0700376 cb = tcp->write_cb;
377 tcp->write_cb = NULL;
Mark D. Rothb06c6402016-05-13 12:39:30 -0700378 cb->cb(exec_ctx, cb->cb_arg, error);
Craig Tillera82950e2015-09-22 12:33:20 -0700379 TCP_UNREF(exec_ctx, tcp, "write");
380 return;
381 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800382
Craig Tiller27f59af2016-04-28 14:19:48 -0700383 if (!tcp_flush(tcp, &error)) {
Craig Tillerc3df7b42016-07-18 15:51:26 -0700384 if (grpc_tcp_trace) {
385 gpr_log(GPR_DEBUG, "write: delayed");
386 }
Craig Tillera82950e2015-09-22 12:33:20 -0700387 grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
388 } else {
389 cb = tcp->write_cb;
390 tcp->write_cb = NULL;
Craig Tillerc3df7b42016-07-18 15:51:26 -0700391 if (grpc_tcp_trace) {
392 const char *str = grpc_error_string(error);
393 gpr_log(GPR_DEBUG, "write: %s", str);
394 grpc_error_free_string(str);
395 }
Craig Tillere7603b82016-07-18 15:43:42 -0700396
Craig Tillerde2c41c2016-09-01 15:08:08 -0700397 grpc_closure_run(exec_ctx, cb, error);
Craig Tillera82950e2015-09-22 12:33:20 -0700398 TCP_UNREF(exec_ctx, tcp, "write");
399 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800400}
401
Craig Tillera82950e2015-09-22 12:33:20 -0700402static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
403 gpr_slice_buffer *buf, grpc_closure *cb) {
404 grpc_tcp *tcp = (grpc_tcp *)ep;
Craig Tiller27f59af2016-04-28 14:19:48 -0700405 grpc_error *error = GRPC_ERROR_NONE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800406
Craig Tiller44f62452016-06-23 08:22:30 -0700407 if (grpc_tcp_trace) {
Craig Tillera82950e2015-09-22 12:33:20 -0700408 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800409
Craig Tillera82950e2015-09-22 12:33:20 -0700410 for (i = 0; i < buf->count; i++) {
411 char *data =
412 gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
Mark D. Rothbc846722016-05-04 10:53:50 -0700413 gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
Craig Tillera82950e2015-09-22 12:33:20 -0700414 gpr_free(data);
Craig Tiller6e7c6222015-02-20 15:31:21 -0800415 }
Craig Tillera82950e2015-09-22 12:33:20 -0700416 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800417
Craig Tiller0ba432d2015-10-09 16:57:11 -0700418 GPR_TIMER_BEGIN("tcp_write", 0);
Craig Tillera82950e2015-09-22 12:33:20 -0700419 GPR_ASSERT(tcp->write_cb == NULL);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800420
Craig Tillera82950e2015-09-22 12:33:20 -0700421 if (buf->length == 0) {
Craig Tiller0ba432d2015-10-09 16:57:11 -0700422 GPR_TIMER_END("tcp_write", 0);
Craig Tiller13c09402016-06-15 09:41:33 -0700423 grpc_exec_ctx_sched(exec_ctx, cb, grpc_fd_is_shutdown(tcp->em_fd)
424 ? GRPC_ERROR_CREATE("EOF")
425 : GRPC_ERROR_NONE,
426 NULL);
Craig Tillera82950e2015-09-22 12:33:20 -0700427 return;
428 }
Craig Tillerb0298592015-08-27 07:38:01 -0700429 tcp->outgoing_buffer = buf;
430 tcp->outgoing_slice_idx = 0;
431 tcp->outgoing_byte_idx = 0;
432
Craig Tiller27f59af2016-04-28 14:19:48 -0700433 if (!tcp_flush(tcp, &error)) {
Craig Tillera82950e2015-09-22 12:33:20 -0700434 TCP_REF(tcp, "write");
435 tcp->write_cb = cb;
Craig Tillerc3df7b42016-07-18 15:51:26 -0700436 if (grpc_tcp_trace) {
437 gpr_log(GPR_DEBUG, "write: delayed");
438 }
Craig Tillera82950e2015-09-22 12:33:20 -0700439 grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
440 } else {
Craig Tillerc3df7b42016-07-18 15:51:26 -0700441 if (grpc_tcp_trace) {
442 const char *str = grpc_error_string(error);
443 gpr_log(GPR_DEBUG, "write: %s", str);
444 grpc_error_free_string(str);
445 }
Craig Tiller332f1b32016-05-24 13:21:21 -0700446 grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
Craig Tillera82950e2015-09-22 12:33:20 -0700447 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800448
Craig Tiller0ba432d2015-10-09 16:57:11 -0700449 GPR_TIMER_END("tcp_write", 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800450}
451
Craig Tillera82950e2015-09-22 12:33:20 -0700452static void tcp_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
453 grpc_pollset *pollset) {
454 grpc_tcp *tcp = (grpc_tcp *)ep;
455 grpc_pollset_add_fd(exec_ctx, pollset, tcp->em_fd);
ctillerd79b4862014-12-17 16:36:59 -0800456}
457
Craig Tillera82950e2015-09-22 12:33:20 -0700458static void tcp_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
459 grpc_pollset_set *pollset_set) {
460 grpc_tcp *tcp = (grpc_tcp *)ep;
461 grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700462}
463
Craig Tillera82950e2015-09-22 12:33:20 -0700464static char *tcp_get_peer(grpc_endpoint *ep) {
465 grpc_tcp *tcp = (grpc_tcp *)ep;
466 return gpr_strdup(tcp->peer_string);
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700467}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800468
Craig Tiller70bd4832016-06-30 14:20:46 -0700469static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) {
470 grpc_tcp *tcp = (grpc_tcp *)ep;
471 return grpc_fd_get_workqueue(tcp->em_fd);
472}
473
Craig Tillere34c2852016-09-23 09:43:32 -0700474static grpc_buffer_user *tcp_get_buffer_user(grpc_endpoint *ep) {
475 grpc_tcp *tcp = (grpc_tcp *)ep;
476 return &tcp->buffer_user;
477}
478
Craig Tiller70bd4832016-06-30 14:20:46 -0700479static const grpc_endpoint_vtable vtable = {tcp_read,
480 tcp_write,
481 tcp_get_workqueue,
482 tcp_add_to_pollset,
483 tcp_add_to_pollset_set,
484 tcp_shutdown,
485 tcp_destroy,
Craig Tillere34c2852016-09-23 09:43:32 -0700486 tcp_get_buffer_user,
Craig Tiller70bd4832016-06-30 14:20:46 -0700487 tcp_get_peer};
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700488
Craig Tillere34c2852016-09-23 09:43:32 -0700489grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, grpc_buffer_pool *buffer_pool,
490 size_t slice_size, const char *peer_string) {
Craig Tillera82950e2015-09-22 12:33:20 -0700491 grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800492 tcp->base.vtable = &vtable;
Craig Tillera82950e2015-09-22 12:33:20 -0700493 tcp->peer_string = gpr_strdup(peer_string);
Craig Tiller0a8a0172016-02-25 07:36:27 -0800494 tcp->fd = grpc_fd_wrapped_fd(em_fd);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800495 tcp->read_cb = NULL;
496 tcp->write_cb = NULL;
yang-gdc215932015-11-30 14:25:01 -0800497 tcp->release_fd_cb = NULL;
yang-g5d850372015-12-01 10:32:28 -0800498 tcp->release_fd = NULL;
Craig Tillerb0298592015-08-27 07:38:01 -0700499 tcp->incoming_buffer = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800500 tcp->slice_size = slice_size;
Craig Tiller2b1fd662015-04-28 07:33:37 -0700501 tcp->iov_size = 1;
Mark D. Roth808ac382016-05-13 11:14:21 -0700502 tcp->finished_edge = true;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800503 /* paired with unref in grpc_tcp_destroy */
Craig Tillera82950e2015-09-22 12:33:20 -0700504 gpr_ref_init(&tcp->refcount, 1);
nnoble0c475f02014-12-05 15:37:39 -0800505 tcp->em_fd = em_fd;
Craig Tillerb0298592015-08-27 07:38:01 -0700506 tcp->read_closure.cb = tcp_handle_read;
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800507 tcp->read_closure.cb_arg = tcp;
Craig Tillerb0298592015-08-27 07:38:01 -0700508 tcp->write_closure.cb = tcp_handle_write;
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800509 tcp->write_closure.cb_arg = tcp;
Craig Tiller649deeb2015-09-24 23:19:40 -0700510 gpr_slice_buffer_init(&tcp->last_read_buffer);
Craig Tillere34c2852016-09-23 09:43:32 -0700511 grpc_buffer_user_init(&tcp->buffer_user, buffer_pool);
Makarand Dharmapurikar0579cfc2016-06-20 15:45:24 -0700512 /* Tell network status tracker about new endpoint */
513 grpc_network_status_register_endpoint(&tcp->base);
David Garcia Quintas5f228f52015-05-26 19:58:50 -0700514
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800515 return &tcp->base;
516}
Craig Tiller0c0b60c2015-01-21 15:49:28 -0800517
Dan Born43a78032016-01-05 17:17:45 -0800518int grpc_tcp_fd(grpc_endpoint *ep) {
519 grpc_tcp *tcp = (grpc_tcp *)ep;
520 GPR_ASSERT(ep->vtable == &vtable);
521 return grpc_fd_wrapped_fd(tcp->em_fd);
522}
523
yang-gdc215932015-11-30 14:25:01 -0800524void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
yang-g5d850372015-12-01 10:32:28 -0800525 int *fd, grpc_closure *done) {
yang-gdc215932015-11-30 14:25:01 -0800526 grpc_tcp *tcp = (grpc_tcp *)ep;
yang-g5d850372015-12-01 10:32:28 -0800527 GPR_ASSERT(ep->vtable == &vtable);
528 tcp->release_fd = fd;
yang-gdc215932015-11-30 14:25:01 -0800529 tcp->release_fd_cb = done;
530 TCP_UNREF(exec_ctx, tcp, "destroy");
531}
532
Craig Tiller190d3602015-02-18 09:23:38 -0800533#endif