blob: 4d7cf3ff51eb440e862ac80c82b21b1d0ff0727a [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller6169d5f2016-03-31 07:46:18 -07003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
murgatroid9954070892016-08-08 17:01:18 -070034#include "src/core/lib/iomgr/port.h"
Craig Tiller0c0b60c2015-01-21 15:49:28 -080035
murgatroid99623dd4f2016-08-08 17:31:27 -070036#ifdef GRPC_POSIX_SOCKET
Craig Tiller0c0b60c2015-01-21 15:49:28 -080037
Makarand Dharmapurikar0579cfc2016-06-20 15:45:24 -070038#include "src/core/lib/iomgr/network_status_tracker.h"
Craig Tiller9533d042016-03-25 17:11:06 -070039#include "src/core/lib/iomgr/tcp_posix.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080040
41#include <errno.h>
Mark D. Roth808ac382016-05-13 11:14:21 -070042#include <stdbool.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080043#include <stdlib.h>
44#include <string.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080045#include <sys/socket.h>
Craig Tillera8be91b2016-02-19 16:22:44 -080046#include <sys/types.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080047#include <unistd.h>
48
Craig Tiller0f310802016-10-26 16:25:56 -070049#include <grpc/slice.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080050#include <grpc/support/alloc.h>
51#include <grpc/support/log.h>
Craig Tiller1b22b9d2015-07-20 13:42:22 -070052#include <grpc/support/string_util.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080053#include <grpc/support/sync.h>
54#include <grpc/support/time.h>
55
Craig Tiller9533d042016-03-25 17:11:06 -070056#include "src/core/lib/debug/trace.h"
Craig Tiller8a034482016-03-28 16:09:04 -070057#include "src/core/lib/iomgr/ev_posix.h"
Craig Tiller9533d042016-03-25 17:11:06 -070058#include "src/core/lib/profiling/timers.h"
Craig Tillera59c16c2016-10-31 07:25:01 -070059#include "src/core/lib/slice/slice_internal.h"
Craig Tiller0f310802016-10-26 16:25:56 -070060#include "src/core/lib/slice/slice_string_helpers.h"
Craig Tiller9533d042016-03-25 17:11:06 -070061#include "src/core/lib/support/string.h"
Craig Tiller1b22b9d2015-07-20 13:42:22 -070062
/* Flags passed to every sendmsg() in this file. Where the platform offers
   MSG_NOSIGNAL it is used so that writing to a connection the peer has closed
   yields EPIPE (handled in tcp_flush) instead of raising SIGPIPE. */
#if defined(GRPC_HAVE_MSG_NOSIGNAL)
#define SENDMSG_FLAGS MSG_NOSIGNAL
#else
#define SENDMSG_FLAGS 0
#endif

/* Integer type of struct msghdr's msg_iovlen member. Defaults to size_t; a
   platform header may override it through GRPC_MSG_IOVLEN_TYPE (presumably
   for systems where msg_iovlen is declared as int — confirm in port.h). */
#if defined(GRPC_MSG_IOVLEN_TYPE)
typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type;
#else
typedef size_t msg_iovlen_type;
#endif

/* Non-zero enables debug logging of every read/write (including hex/ascii
   dumps of the payload) done by TCP endpoints in this file. */
int grpc_tcp_trace = 0;
76
Craig Tillera82950e2015-09-22 12:33:20 -070077typedef struct {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080078 grpc_endpoint base;
ctiller18b49ab2014-12-09 14:39:16 -080079 grpc_fd *em_fd;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080080 int fd;
Mark D. Roth808ac382016-05-13 11:14:21 -070081 bool finished_edge;
Craig Tillera82950e2015-09-22 12:33:20 -070082 msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080083 size_t slice_size;
84 gpr_refcount refcount;
Craig Tillere9f385a2016-09-26 14:54:43 -070085 gpr_atm shutdown_count;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080086
Craig Tiller649deeb2015-09-24 23:19:40 -070087 /* garbage after the last read */
Craig Tillerd41a4a72016-10-26 16:16:06 -070088 grpc_slice_buffer last_read_buffer;
Craig Tiller649deeb2015-09-24 23:19:40 -070089
Craig Tillerd41a4a72016-10-26 16:16:06 -070090 grpc_slice_buffer *incoming_buffer;
91 grpc_slice_buffer *outgoing_buffer;
Craig Tillerb0298592015-08-27 07:38:01 -070092 /** slice within outgoing_buffer to write next */
93 size_t outgoing_slice_idx;
94 /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
95 size_t outgoing_byte_idx;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080096
Craig Tiller33825112015-09-18 07:44:19 -070097 grpc_closure *read_cb;
98 grpc_closure *write_cb;
yang-gdc215932015-11-30 14:25:01 -080099 grpc_closure *release_fd_cb;
yang-g5d850372015-12-01 10:32:28 -0800100 int *release_fd;
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800101
Craig Tiller33825112015-09-18 07:44:19 -0700102 grpc_closure read_closure;
103 grpc_closure write_closure;
David Garcia Quintas5f228f52015-05-26 19:58:50 -0700104
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700105 char *peer_string;
Craig Tillere34c2852016-09-23 09:43:32 -0700106
Craig Tillera947f1c2016-11-04 13:53:17 -0700107 grpc_resource_user *resource_user;
Craig Tiller20afa3d2016-10-17 14:52:14 -0700108 grpc_resource_user_slice_allocator slice_allocator;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800109} grpc_tcp;
110
Craig Tillerdb25f082016-12-02 12:45:24 -0800111static grpc_error *tcp_annotate_error(grpc_error *src_error, grpc_tcp *tcp) {
112 return grpc_error_set_str(
113 grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd),
ncteisen4b36a3d2017-03-13 19:08:06 -0700114 GRPC_ERROR_STR_TARGET_ADDRESS,
115 grpc_slice_from_copied_string(tcp->peer_string));
Craig Tillerdb25f082016-12-02 12:45:24 -0800116}
117
Craig Tillera82950e2015-09-22 12:33:20 -0700118static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
Craig Tillerc027e772016-05-03 16:27:00 -0700119 grpc_error *error);
Craig Tillera82950e2015-09-22 12:33:20 -0700120static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
Craig Tillerc027e772016-05-03 16:27:00 -0700121 grpc_error *error);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800122
Craig Tillercda759d2017-01-27 11:37:37 -0800123static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
124 grpc_error *why) {
Craig Tillera82950e2015-09-22 12:33:20 -0700125 grpc_tcp *tcp = (grpc_tcp *)ep;
Craig Tillercda759d2017-01-27 11:37:37 -0800126 grpc_fd_shutdown(exec_ctx, tcp->em_fd, why);
Craig Tillera947f1c2016-11-04 13:53:17 -0700127 grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800128}
129
Craig Tillere9f385a2016-09-26 14:54:43 -0700130static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
yang-g5d850372015-12-01 10:32:28 -0800131 grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
yang-gdc215932015-11-30 14:25:01 -0800132 "tcp_unref_orphan");
Craig Tillera59c16c2016-10-31 07:25:01 -0700133 grpc_slice_buffer_destroy_internal(exec_ctx, &tcp->last_read_buffer);
Craig Tillera947f1c2016-11-04 13:53:17 -0700134 grpc_resource_user_unref(exec_ctx, tcp->resource_user);
Craig Tillera82950e2015-09-22 12:33:20 -0700135 gpr_free(tcp->peer_string);
Craig Tillere9f385a2016-09-26 14:54:43 -0700136 gpr_free(tcp);
Craig Tillerb0298592015-08-27 07:38:01 -0700137}
138
139/*#define GRPC_TCP_REFCOUNT_DEBUG*/
140#ifdef GRPC_TCP_REFCOUNT_DEBUG
Craig Tiller8af4c332015-09-22 12:32:31 -0700141#define TCP_UNREF(cl, tcp, reason) \
142 tcp_unref((cl), (tcp), (reason), __FILE__, __LINE__)
Craig Tillerb0298592015-08-27 07:38:01 -0700143#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
Craig Tillera82950e2015-09-22 12:33:20 -0700144static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
145 const char *reason, const char *file, int line) {
146 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
147 reason, tcp->refcount.count, tcp->refcount.count - 1);
148 if (gpr_unref(&tcp->refcount)) {
Craig Tillere9f385a2016-09-26 14:54:43 -0700149 tcp_free(exec_ctx, tcp);
Craig Tillera82950e2015-09-22 12:33:20 -0700150 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800151}
152
Craig Tillera82950e2015-09-22 12:33:20 -0700153static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
154 int line) {
155 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
156 reason, tcp->refcount.count, tcp->refcount.count + 1);
157 gpr_ref(&tcp->refcount);
Craig Tillerb0298592015-08-27 07:38:01 -0700158}
159#else
Craig Tiller8af4c332015-09-22 12:32:31 -0700160#define TCP_UNREF(cl, tcp, reason) tcp_unref((cl), (tcp))
Craig Tillerb0298592015-08-27 07:38:01 -0700161#define TCP_REF(tcp, reason) tcp_ref((tcp))
Craig Tillera82950e2015-09-22 12:33:20 -0700162static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
163 if (gpr_unref(&tcp->refcount)) {
Craig Tillere9f385a2016-09-26 14:54:43 -0700164 tcp_free(exec_ctx, tcp);
Craig Tillera82950e2015-09-22 12:33:20 -0700165 }
Craig Tiller592e7f22015-08-21 10:45:48 -0700166}
167
Craig Tillera82950e2015-09-22 12:33:20 -0700168static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
Craig Tillerb0298592015-08-27 07:38:01 -0700169#endif
170
Craig Tillera82950e2015-09-22 12:33:20 -0700171static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
Makarand Dharmapurikar61494432016-06-22 13:34:59 -0700172 grpc_network_status_unregister_endpoint(ep);
Craig Tillera82950e2015-09-22 12:33:20 -0700173 grpc_tcp *tcp = (grpc_tcp *)ep;
Craig Tillera59c16c2016-10-31 07:25:01 -0700174 grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &tcp->last_read_buffer);
Craig Tillera82950e2015-09-22 12:33:20 -0700175 TCP_UNREF(exec_ctx, tcp, "destroy");
Craig Tillerb0298592015-08-27 07:38:01 -0700176}
177
Craig Tillerc027e772016-05-03 16:27:00 -0700178static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
179 grpc_error *error) {
Craig Tiller33825112015-09-18 07:44:19 -0700180 grpc_closure *cb = tcp->read_cb;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800181
Craig Tiller44f62452016-06-23 08:22:30 -0700182 if (grpc_tcp_trace) {
Craig Tillera82950e2015-09-22 12:33:20 -0700183 size_t i;
Craig Tillerc027e772016-05-03 16:27:00 -0700184 const char *str = grpc_error_string(error);
185 gpr_log(GPR_DEBUG, "read: error=%s", str);
Craig Tiller7c70b6c2017-01-23 07:48:42 -0800186
Craig Tillera82950e2015-09-22 12:33:20 -0700187 for (i = 0; i < tcp->incoming_buffer->count; i++) {
Craig Tiller0f310802016-10-26 16:25:56 -0700188 char *dump = grpc_dump_slice(tcp->incoming_buffer->slices[i],
189 GPR_DUMP_HEX | GPR_DUMP_ASCII);
Mark D. Rothbc846722016-05-04 10:53:50 -0700190 gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
Craig Tillera82950e2015-09-22 12:33:20 -0700191 gpr_free(dump);
Craig Tiller6e7c6222015-02-20 15:31:21 -0800192 }
Craig Tillera82950e2015-09-22 12:33:20 -0700193 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800194
195 tcp->read_cb = NULL;
Craig Tillerb0298592015-08-27 07:38:01 -0700196 tcp->incoming_buffer = NULL;
Craig Tillerde2c41c2016-09-01 15:08:08 -0700197 grpc_closure_run(exec_ctx, cb, error);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800198}
199
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800200#define MAX_READ_IOVEC 4
Craig Tiller1f8d1d52016-09-26 10:16:14 -0700201static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800202 struct msghdr msg;
203 struct iovec iov[MAX_READ_IOVEC];
204 ssize_t read_bytes;
Craig Tillerb0298592015-08-27 07:38:01 -0700205 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800206
Craig Tillera82950e2015-09-22 12:33:20 -0700207 GPR_ASSERT(!tcp->finished_edge);
208 GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC);
209 GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
Craig Tiller0ba432d2015-10-09 16:57:11 -0700210 GPR_TIMER_BEGIN("tcp_continue_read", 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800211
Craig Tillera82950e2015-09-22 12:33:20 -0700212 for (i = 0; i < tcp->incoming_buffer->count; i++) {
Craig Tiller618e67d2016-10-26 21:08:10 -0700213 iov[i].iov_base = GRPC_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
214 iov[i].iov_len = GRPC_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
Craig Tillera82950e2015-09-22 12:33:20 -0700215 }
Craig Tiller5c07cce2015-04-28 09:21:58 -0700216
217 msg.msg_name = NULL;
218 msg.msg_namelen = 0;
219 msg.msg_iov = iov;
220 msg.msg_iovlen = tcp->iov_size;
221 msg.msg_control = NULL;
222 msg.msg_controllen = 0;
223 msg.msg_flags = 0;
224
Craig Tillerca045622016-08-09 08:38:03 -0700225 GPR_TIMER_BEGIN("recvmsg", 0);
Craig Tillera82950e2015-09-22 12:33:20 -0700226 do {
227 read_bytes = recvmsg(tcp->fd, &msg, 0);
228 } while (read_bytes < 0 && errno == EINTR);
Craig Tillerca045622016-08-09 08:38:03 -0700229 GPR_TIMER_END("recvmsg", read_bytes >= 0);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700230
Craig Tillera82950e2015-09-22 12:33:20 -0700231 if (read_bytes < 0) {
232 /* NB: After calling call_read_cb a parallel call of the read handler may
233 * be running. */
234 if (errno == EAGAIN) {
235 if (tcp->iov_size > 1) {
236 tcp->iov_size /= 2;
237 }
238 /* We've consumed the edge, request a new one */
239 grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
240 } else {
Craig Tiller298d4812016-12-09 08:51:48 -0800241 grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
242 tcp->incoming_buffer);
Craig Tillerdb25f082016-12-02 12:45:24 -0800243 call_read_cb(exec_ctx, tcp,
244 tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp));
Craig Tillera82950e2015-09-22 12:33:20 -0700245 TCP_UNREF(exec_ctx, tcp, "read");
Craig Tiller45724b32015-09-22 10:42:19 -0700246 }
Craig Tillera82950e2015-09-22 12:33:20 -0700247 } else if (read_bytes == 0) {
248 /* 0 read size ==> end of stream */
Craig Tillera59c16c2016-10-31 07:25:01 -0700249 grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer);
ncteisen4b36a3d2017-03-13 19:08:06 -0700250 call_read_cb(
251 exec_ctx, tcp,
252 tcp_annotate_error(
253 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp));
Craig Tillera82950e2015-09-22 12:33:20 -0700254 TCP_UNREF(exec_ctx, tcp, "read");
255 } else {
256 GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
257 if ((size_t)read_bytes < tcp->incoming_buffer->length) {
Craig Tillerd41a4a72016-10-26 16:16:06 -0700258 grpc_slice_buffer_trim_end(
Craig Tillera82950e2015-09-22 12:33:20 -0700259 tcp->incoming_buffer,
Craig Tiller649deeb2015-09-24 23:19:40 -0700260 tcp->incoming_buffer->length - (size_t)read_bytes,
261 &tcp->last_read_buffer);
Craig Tillera82950e2015-09-22 12:33:20 -0700262 } else if (tcp->iov_size < MAX_READ_IOVEC) {
263 ++tcp->iov_size;
Craig Tiller45724b32015-09-22 10:42:19 -0700264 }
Craig Tillera82950e2015-09-22 12:33:20 -0700265 GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
Craig Tillerc027e772016-05-03 16:27:00 -0700266 call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -0700267 TCP_UNREF(exec_ctx, tcp, "read");
268 }
Craig Tiller45724b32015-09-22 10:42:19 -0700269
Craig Tiller0ba432d2015-10-09 16:57:11 -0700270 GPR_TIMER_END("tcp_continue_read", 0);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700271}
272
Craig Tiller091e4f02016-09-29 09:57:35 -0700273static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp,
Craig Tiller1f8d1d52016-09-26 10:16:14 -0700274 grpc_error *error) {
Craig Tiller091e4f02016-09-29 09:57:35 -0700275 grpc_tcp *tcp = tcpp;
276 if (error != GRPC_ERROR_NONE) {
Craig Tillera59c16c2016-10-31 07:25:01 -0700277 grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer);
278 grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
279 &tcp->last_read_buffer);
Craig Tiller091e4f02016-09-29 09:57:35 -0700280 call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
281 TCP_UNREF(exec_ctx, tcp, "read");
282 } else {
283 tcp_do_read(exec_ctx, tcp);
284 }
Craig Tiller1f8d1d52016-09-26 10:16:14 -0700285}
286
287static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
288 if (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
Craig Tiller20afa3d2016-10-17 14:52:14 -0700289 grpc_resource_user_alloc_slices(
Craig Tiller1f8d1d52016-09-26 10:16:14 -0700290 exec_ctx, &tcp->slice_allocator, tcp->slice_size,
291 (size_t)tcp->iov_size - tcp->incoming_buffer->count,
292 tcp->incoming_buffer);
293 } else {
294 tcp_do_read(exec_ctx, tcp);
295 }
296}
297
Craig Tillera82950e2015-09-22 12:33:20 -0700298static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
Craig Tillerc027e772016-05-03 16:27:00 -0700299 grpc_error *error) {
Craig Tillera82950e2015-09-22 12:33:20 -0700300 grpc_tcp *tcp = (grpc_tcp *)arg;
301 GPR_ASSERT(!tcp->finished_edge);
Craig Tiller5c07cce2015-04-28 09:21:58 -0700302
Craig Tillerc027e772016-05-03 16:27:00 -0700303 if (error != GRPC_ERROR_NONE) {
Craig Tillera59c16c2016-10-31 07:25:01 -0700304 grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer);
305 grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
306 &tcp->last_read_buffer);
Craig Tiller6a64bfd2016-05-09 13:05:03 -0700307 call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
Craig Tillera82950e2015-09-22 12:33:20 -0700308 TCP_UNREF(exec_ctx, tcp, "read");
309 } else {
310 tcp_continue_read(exec_ctx, tcp);
311 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800312}
313
Craig Tillera82950e2015-09-22 12:33:20 -0700314static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
Craig Tillerd41a4a72016-10-26 16:16:06 -0700315 grpc_slice_buffer *incoming_buffer, grpc_closure *cb) {
Craig Tillera82950e2015-09-22 12:33:20 -0700316 grpc_tcp *tcp = (grpc_tcp *)ep;
317 GPR_ASSERT(tcp->read_cb == NULL);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800318 tcp->read_cb = cb;
Craig Tillerb0298592015-08-27 07:38:01 -0700319 tcp->incoming_buffer = incoming_buffer;
Craig Tillera59c16c2016-10-31 07:25:01 -0700320 grpc_slice_buffer_reset_and_unref_internal(exec_ctx, incoming_buffer);
Craig Tillerd41a4a72016-10-26 16:16:06 -0700321 grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
Craig Tillera82950e2015-09-22 12:33:20 -0700322 TCP_REF(tcp, "read");
323 if (tcp->finished_edge) {
Mark D. Roth808ac382016-05-13 11:14:21 -0700324 tcp->finished_edge = false;
Craig Tillera82950e2015-09-22 12:33:20 -0700325 grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
326 } else {
Craig Tiller91031da2016-12-28 15:44:25 -0800327 grpc_closure_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -0700328 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800329}
330
Craig Tiller27f59af2016-04-28 14:19:48 -0700331/* returns true if done, false if pending; if returning true, *error is set */
Vijay Pai8f76df42016-07-29 08:31:20 -0700332#define MAX_WRITE_IOVEC 1000
Craig Tiller27f59af2016-04-28 14:19:48 -0700333static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800334 struct msghdr msg;
335 struct iovec iov[MAX_WRITE_IOVEC];
Craig Tillerebc7ef22015-09-10 22:19:25 -0700336 msg_iovlen_type iov_size;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800337 ssize_t sent_length;
Craig Tiller32ca48c2015-09-10 11:47:15 -0700338 size_t sending_length;
339 size_t trailing;
340 size_t unwind_slice_idx;
341 size_t unwind_byte_idx;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800342
Craig Tillera82950e2015-09-22 12:33:20 -0700343 for (;;) {
344 sending_length = 0;
345 unwind_slice_idx = tcp->outgoing_slice_idx;
346 unwind_byte_idx = tcp->outgoing_byte_idx;
347 for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
Craig Tillerf40df232016-03-25 13:38:14 -0700348 iov_size != MAX_WRITE_IOVEC;
Craig Tillera82950e2015-09-22 12:33:20 -0700349 iov_size++) {
350 iov[iov_size].iov_base =
Craig Tiller618e67d2016-10-26 21:08:10 -0700351 GRPC_SLICE_START_PTR(
Craig Tillera82950e2015-09-22 12:33:20 -0700352 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
353 tcp->outgoing_byte_idx;
354 iov[iov_size].iov_len =
Craig Tiller618e67d2016-10-26 21:08:10 -0700355 GRPC_SLICE_LENGTH(
Craig Tillera82950e2015-09-22 12:33:20 -0700356 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
357 tcp->outgoing_byte_idx;
358 sending_length += iov[iov_size].iov_len;
359 tcp->outgoing_slice_idx++;
360 tcp->outgoing_byte_idx = 0;
361 }
362 GPR_ASSERT(iov_size > 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800363
Craig Tillera82950e2015-09-22 12:33:20 -0700364 msg.msg_name = NULL;
365 msg.msg_namelen = 0;
366 msg.msg_iov = iov;
367 msg.msg_iovlen = iov_size;
368 msg.msg_control = NULL;
369 msg.msg_controllen = 0;
370 msg.msg_flags = 0;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800371
Craig Tiller0ba432d2015-10-09 16:57:11 -0700372 GPR_TIMER_BEGIN("sendmsg", 1);
Craig Tillera82950e2015-09-22 12:33:20 -0700373 do {
374 /* TODO(klempner): Cork if this is a partial write */
375 sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
376 } while (sent_length < 0 && errno == EINTR);
Craig Tiller0ba432d2015-10-09 16:57:11 -0700377 GPR_TIMER_END("sendmsg", 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800378
Craig Tillera82950e2015-09-22 12:33:20 -0700379 if (sent_length < 0) {
380 if (errno == EAGAIN) {
381 tcp->outgoing_slice_idx = unwind_slice_idx;
382 tcp->outgoing_byte_idx = unwind_byte_idx;
Craig Tiller27f59af2016-04-28 14:19:48 -0700383 return false;
Ken Payson1b1a8602016-11-14 10:10:47 -0800384 } else if (errno == EPIPE) {
385 *error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"),
386 GRPC_ERROR_INT_GRPC_STATUS,
387 GRPC_STATUS_UNAVAILABLE);
388 return true;
Craig Tillera82950e2015-09-22 12:33:20 -0700389 } else {
Craig Tillerdb25f082016-12-02 12:45:24 -0800390 *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
Craig Tiller27f59af2016-04-28 14:19:48 -0700391 return true;
Craig Tillera82950e2015-09-22 12:33:20 -0700392 }
393 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800394
Craig Tillera82950e2015-09-22 12:33:20 -0700395 GPR_ASSERT(tcp->outgoing_byte_idx == 0);
396 trailing = sending_length - (size_t)sent_length;
397 while (trailing > 0) {
398 size_t slice_length;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800399
Craig Tillera82950e2015-09-22 12:33:20 -0700400 tcp->outgoing_slice_idx--;
Craig Tiller618e67d2016-10-26 21:08:10 -0700401 slice_length = GRPC_SLICE_LENGTH(
Craig Tillera82950e2015-09-22 12:33:20 -0700402 tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
403 if (slice_length > trailing) {
404 tcp->outgoing_byte_idx = slice_length - trailing;
405 break;
406 } else {
407 trailing -= slice_length;
408 }
409 }
Craig Tillerb0298592015-08-27 07:38:01 -0700410
Craig Tillera82950e2015-09-22 12:33:20 -0700411 if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
Craig Tiller27f59af2016-04-28 14:19:48 -0700412 *error = GRPC_ERROR_NONE;
413 return true;
Craig Tillera82950e2015-09-22 12:33:20 -0700414 }
415 };
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800416}
417
Craig Tillera82950e2015-09-22 12:33:20 -0700418static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
Craig Tillerc027e772016-05-03 16:27:00 -0700419 grpc_error *error) {
Craig Tillera82950e2015-09-22 12:33:20 -0700420 grpc_tcp *tcp = (grpc_tcp *)arg;
Craig Tiller33825112015-09-18 07:44:19 -0700421 grpc_closure *cb;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800422
Craig Tillerc027e772016-05-03 16:27:00 -0700423 if (error != GRPC_ERROR_NONE) {
Craig Tillera82950e2015-09-22 12:33:20 -0700424 cb = tcp->write_cb;
425 tcp->write_cb = NULL;
Mark D. Rothb06c6402016-05-13 12:39:30 -0700426 cb->cb(exec_ctx, cb->cb_arg, error);
Craig Tillera82950e2015-09-22 12:33:20 -0700427 TCP_UNREF(exec_ctx, tcp, "write");
428 return;
429 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800430
Craig Tiller27f59af2016-04-28 14:19:48 -0700431 if (!tcp_flush(tcp, &error)) {
Craig Tillerc3df7b42016-07-18 15:51:26 -0700432 if (grpc_tcp_trace) {
433 gpr_log(GPR_DEBUG, "write: delayed");
434 }
Craig Tillera82950e2015-09-22 12:33:20 -0700435 grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
436 } else {
437 cb = tcp->write_cb;
438 tcp->write_cb = NULL;
Craig Tillerc3df7b42016-07-18 15:51:26 -0700439 if (grpc_tcp_trace) {
440 const char *str = grpc_error_string(error);
441 gpr_log(GPR_DEBUG, "write: %s", str);
Craig Tillerc3df7b42016-07-18 15:51:26 -0700442 }
Craig Tillere7603b82016-07-18 15:43:42 -0700443
Craig Tillerde2c41c2016-09-01 15:08:08 -0700444 grpc_closure_run(exec_ctx, cb, error);
Craig Tillera82950e2015-09-22 12:33:20 -0700445 TCP_UNREF(exec_ctx, tcp, "write");
446 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800447}
448
Craig Tillera82950e2015-09-22 12:33:20 -0700449static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
Craig Tillerd41a4a72016-10-26 16:16:06 -0700450 grpc_slice_buffer *buf, grpc_closure *cb) {
Craig Tillera82950e2015-09-22 12:33:20 -0700451 grpc_tcp *tcp = (grpc_tcp *)ep;
Craig Tiller27f59af2016-04-28 14:19:48 -0700452 grpc_error *error = GRPC_ERROR_NONE;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800453
Craig Tiller44f62452016-06-23 08:22:30 -0700454 if (grpc_tcp_trace) {
Craig Tillera82950e2015-09-22 12:33:20 -0700455 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800456
Craig Tillera82950e2015-09-22 12:33:20 -0700457 for (i = 0; i < buf->count; i++) {
458 char *data =
Craig Tiller0f310802016-10-26 16:25:56 -0700459 grpc_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
Mark D. Rothbc846722016-05-04 10:53:50 -0700460 gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
Craig Tillera82950e2015-09-22 12:33:20 -0700461 gpr_free(data);
Craig Tiller6e7c6222015-02-20 15:31:21 -0800462 }
Craig Tillera82950e2015-09-22 12:33:20 -0700463 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800464
Craig Tiller0ba432d2015-10-09 16:57:11 -0700465 GPR_TIMER_BEGIN("tcp_write", 0);
Craig Tillera82950e2015-09-22 12:33:20 -0700466 GPR_ASSERT(tcp->write_cb == NULL);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800467
Craig Tillera82950e2015-09-22 12:33:20 -0700468 if (buf->length == 0) {
Craig Tiller0ba432d2015-10-09 16:57:11 -0700469 GPR_TIMER_END("tcp_write", 0);
ncteisen4b36a3d2017-03-13 19:08:06 -0700470 grpc_closure_sched(
471 exec_ctx, cb,
472 grpc_fd_is_shutdown(tcp->em_fd)
473 ? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"),
474 tcp)
475 : GRPC_ERROR_NONE);
Craig Tillera82950e2015-09-22 12:33:20 -0700476 return;
477 }
Craig Tillerb0298592015-08-27 07:38:01 -0700478 tcp->outgoing_buffer = buf;
479 tcp->outgoing_slice_idx = 0;
480 tcp->outgoing_byte_idx = 0;
481
Craig Tiller27f59af2016-04-28 14:19:48 -0700482 if (!tcp_flush(tcp, &error)) {
Craig Tillera82950e2015-09-22 12:33:20 -0700483 TCP_REF(tcp, "write");
484 tcp->write_cb = cb;
Craig Tillerc3df7b42016-07-18 15:51:26 -0700485 if (grpc_tcp_trace) {
486 gpr_log(GPR_DEBUG, "write: delayed");
487 }
Craig Tillera82950e2015-09-22 12:33:20 -0700488 grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
489 } else {
Craig Tillerc3df7b42016-07-18 15:51:26 -0700490 if (grpc_tcp_trace) {
491 const char *str = grpc_error_string(error);
492 gpr_log(GPR_DEBUG, "write: %s", str);
Craig Tillerc3df7b42016-07-18 15:51:26 -0700493 }
Craig Tiller91031da2016-12-28 15:44:25 -0800494 grpc_closure_sched(exec_ctx, cb, error);
Craig Tillera82950e2015-09-22 12:33:20 -0700495 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800496
Craig Tiller0ba432d2015-10-09 16:57:11 -0700497 GPR_TIMER_END("tcp_write", 0);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800498}
499
Craig Tillera82950e2015-09-22 12:33:20 -0700500static void tcp_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
501 grpc_pollset *pollset) {
502 grpc_tcp *tcp = (grpc_tcp *)ep;
503 grpc_pollset_add_fd(exec_ctx, pollset, tcp->em_fd);
ctillerd79b4862014-12-17 16:36:59 -0800504}
505
Craig Tillera82950e2015-09-22 12:33:20 -0700506static void tcp_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
507 grpc_pollset_set *pollset_set) {
508 grpc_tcp *tcp = (grpc_tcp *)ep;
509 grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd);
Craig Tiller1ada6ad2015-07-16 16:19:14 -0700510}
511
Craig Tillera82950e2015-09-22 12:33:20 -0700512static char *tcp_get_peer(grpc_endpoint *ep) {
513 grpc_tcp *tcp = (grpc_tcp *)ep;
514 return gpr_strdup(tcp->peer_string);
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700515}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800516
Yuchen Zeng68413c22016-11-02 11:57:37 -0700517static int tcp_get_fd(grpc_endpoint *ep) {
Yuchen Zenge5ec9ac2016-10-24 14:43:12 -0700518 grpc_tcp *tcp = (grpc_tcp *)ep;
Yuchen Zeng68413c22016-11-02 11:57:37 -0700519 return tcp->fd;
Yuchen Zenge5ec9ac2016-10-24 14:43:12 -0700520}
521
Craig Tiller70bd4832016-06-30 14:20:46 -0700522static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) {
523 grpc_tcp *tcp = (grpc_tcp *)ep;
524 return grpc_fd_get_workqueue(tcp->em_fd);
525}
526
Craig Tiller20afa3d2016-10-17 14:52:14 -0700527static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) {
Craig Tillere34c2852016-09-23 09:43:32 -0700528 grpc_tcp *tcp = (grpc_tcp *)ep;
Craig Tillera947f1c2016-11-04 13:53:17 -0700529 return tcp->resource_user;
Craig Tillere34c2852016-09-23 09:43:32 -0700530}
531
Craig Tiller70bd4832016-06-30 14:20:46 -0700532static const grpc_endpoint_vtable vtable = {tcp_read,
533 tcp_write,
534 tcp_get_workqueue,
535 tcp_add_to_pollset,
536 tcp_add_to_pollset_set,
537 tcp_shutdown,
538 tcp_destroy,
Craig Tiller20afa3d2016-10-17 14:52:14 -0700539 tcp_get_resource_user,
Yuchen Zenge5ec9ac2016-10-24 14:43:12 -0700540 tcp_get_peer,
Yuchen Zeng68413c22016-11-02 11:57:37 -0700541 tcp_get_fd};
Craig Tiller1b22b9d2015-07-20 13:42:22 -0700542
Craig Tiller20afa3d2016-10-17 14:52:14 -0700543grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
544 grpc_resource_quota *resource_quota,
Craig Tillere34c2852016-09-23 09:43:32 -0700545 size_t slice_size, const char *peer_string) {
Craig Tillera82950e2015-09-22 12:33:20 -0700546 grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800547 tcp->base.vtable = &vtable;
Craig Tillera82950e2015-09-22 12:33:20 -0700548 tcp->peer_string = gpr_strdup(peer_string);
Craig Tiller0a8a0172016-02-25 07:36:27 -0800549 tcp->fd = grpc_fd_wrapped_fd(em_fd);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800550 tcp->read_cb = NULL;
551 tcp->write_cb = NULL;
yang-gdc215932015-11-30 14:25:01 -0800552 tcp->release_fd_cb = NULL;
yang-g5d850372015-12-01 10:32:28 -0800553 tcp->release_fd = NULL;
Craig Tillerb0298592015-08-27 07:38:01 -0700554 tcp->incoming_buffer = NULL;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800555 tcp->slice_size = slice_size;
Craig Tiller2b1fd662015-04-28 07:33:37 -0700556 tcp->iov_size = 1;
Mark D. Roth808ac382016-05-13 11:14:21 -0700557 tcp->finished_edge = true;
Craig Tillera947f1c2016-11-04 13:53:17 -0700558 /* paired with unref in grpc_tcp_destroy */
559 gpr_ref_init(&tcp->refcount, 1);
Craig Tillere9f385a2016-09-26 14:54:43 -0700560 gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
nnoble0c475f02014-12-05 15:37:39 -0800561 tcp->em_fd = em_fd;
Craig Tiller3cb34472016-12-28 16:11:38 -0800562 grpc_closure_init(&tcp->read_closure, tcp_handle_read, tcp,
563 grpc_schedule_on_exec_ctx);
564 grpc_closure_init(&tcp->write_closure, tcp_handle_write, tcp,
565 grpc_schedule_on_exec_ctx);
Craig Tillerd41a4a72016-10-26 16:16:06 -0700566 grpc_slice_buffer_init(&tcp->last_read_buffer);
Craig Tillera947f1c2016-11-04 13:53:17 -0700567 tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
568 grpc_resource_user_slice_allocator_init(
569 &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp);
Makarand Dharmapurikar0579cfc2016-06-20 15:45:24 -0700570 /* Tell network status tracker about new endpoint */
571 grpc_network_status_register_endpoint(&tcp->base);
David Garcia Quintas5f228f52015-05-26 19:58:50 -0700572
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800573 return &tcp->base;
574}
Craig Tiller0c0b60c2015-01-21 15:49:28 -0800575
Dan Born43a78032016-01-05 17:17:45 -0800576int grpc_tcp_fd(grpc_endpoint *ep) {
577 grpc_tcp *tcp = (grpc_tcp *)ep;
578 GPR_ASSERT(ep->vtable == &vtable);
579 return grpc_fd_wrapped_fd(tcp->em_fd);
580}
581
yang-gdc215932015-11-30 14:25:01 -0800582void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
yang-g5d850372015-12-01 10:32:28 -0800583 int *fd, grpc_closure *done) {
Craig Tillerce2ff3c2016-09-26 15:34:20 -0700584 grpc_network_status_unregister_endpoint(ep);
yang-gdc215932015-11-30 14:25:01 -0800585 grpc_tcp *tcp = (grpc_tcp *)ep;
yang-g5d850372015-12-01 10:32:28 -0800586 GPR_ASSERT(ep->vtable == &vtable);
587 tcp->release_fd = fd;
yang-gdc215932015-11-30 14:25:01 -0800588 tcp->release_fd_cb = done;
Craig Tillera59c16c2016-10-31 07:25:01 -0700589 grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &tcp->last_read_buffer);
yang-gdc215932015-11-30 14:25:01 -0800590 TCP_UNREF(exec_ctx, tcp, "destroy");
591}
592
Craig Tiller190d3602015-02-18 09:23:38 -0800593#endif