blob: eceb0feadb1463ec6111aa40476b181f14190a53 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
Craig Tiller06059952015-02-18 08:34:56 -08003 * Copyright 2015, Google Inc.
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08004 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
Craig Tiller0c0b60c2015-01-21 15:49:28 -080034#include <grpc/support/port_platform.h>
35
36#ifdef GPR_POSIX_SOCKET
37
ctiller18b49ab2014-12-09 14:39:16 -080038#include "src/core/iomgr/tcp_posix.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080039
40#include <errno.h>
41#include <stdlib.h>
42#include <string.h>
43#include <sys/types.h>
44#include <sys/socket.h>
45#include <unistd.h>
46
Craig Tiller485d7762015-01-23 12:54:05 -080047#include "src/core/support/string.h"
Craig Tiller6e7c6222015-02-20 15:31:21 -080048#include "src/core/debug/trace.h"
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080049#include <grpc/support/alloc.h>
50#include <grpc/support/log.h>
51#include <grpc/support/slice.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080052#include <grpc/support/sync.h>
53#include <grpc/support/time.h>
54
/* Holds a slice array and associated state.
   Valid data lives in slices[first_slice .. last_slice]; first_slice == -1
   marks an empty state. */
typedef struct grpc_tcp_slice_state {
  gpr_slice *slices;       /* Array of slices */
  size_t nslices;          /* Size of slices array. */
  ssize_t first_slice;     /* First valid slice in array; -1 when empty */
  ssize_t last_slice;      /* Last valid slice in array */
  gpr_slice working_slice; /* pointer to original final slice */
  int working_slice_valid; /* True if there is a working slice */
  int memory_owned;        /* True if slices array is owned */
} grpc_tcp_slice_state;
65
66static void slice_state_init(grpc_tcp_slice_state *state, gpr_slice *slices,
67 size_t nslices, size_t valid_slices) {
68 state->slices = slices;
69 state->nslices = nslices;
70 if (valid_slices == 0) {
71 state->first_slice = -1;
72 } else {
73 state->first_slice = 0;
74 }
75 state->last_slice = valid_slices - 1;
76 state->working_slice_valid = 0;
77 state->memory_owned = 0;
78}
79
80/* Returns true if there is still available data */
81static int slice_state_has_available(grpc_tcp_slice_state *state) {
82 return state->first_slice != -1 && state->last_slice >= state->first_slice;
83}
84
85static ssize_t slice_state_slices_allocated(grpc_tcp_slice_state *state) {
86 if (state->first_slice == -1) {
87 return 0;
88 } else {
89 return state->last_slice - state->first_slice + 1;
90 }
91}
92
93static void slice_state_realloc(grpc_tcp_slice_state *state, size_t new_size) {
94 /* TODO(klempner): use realloc instead when first_slice is 0 */
95 /* TODO(klempner): Avoid a realloc in cases where it is unnecessary */
96 gpr_slice *slices = state->slices;
97 size_t original_size = slice_state_slices_allocated(state);
98 size_t i;
99 gpr_slice *new_slices = gpr_malloc(sizeof(gpr_slice) * new_size);
100
101 for (i = 0; i < original_size; ++i) {
102 new_slices[i] = slices[i + state->first_slice];
103 }
104
105 state->slices = new_slices;
106 state->last_slice = original_size - 1;
107 if (original_size > 0) {
108 state->first_slice = 0;
109 } else {
110 state->first_slice = -1;
111 }
112 state->nslices = new_size;
113
114 if (state->memory_owned) {
115 gpr_free(slices);
116 }
117 state->memory_owned = 1;
118}
119
/* Drops the first prefix_bytes bytes from state: slices wholly inside the
   prefix are unref'd and skipped; the slice in which the prefix ends is
   trimmed in place. */
static void slice_state_remove_prefix(grpc_tcp_slice_state *state,
                                      size_t prefix_bytes) {
  gpr_slice *current_slice = &state->slices[state->first_slice];
  size_t current_slice_size;

  while (slice_state_has_available(state)) {
    current_slice_size = GPR_SLICE_LENGTH(*current_slice);
    if (current_slice_size > prefix_bytes) {
      /* The prefix ends inside this slice.  split_tail leaves the head in
         *current_slice and returns the tail; storing the tail back and then
         unref'ing once appears to cancel the extra reference the split takes
         (assumes head and tail share a refcount -- see the TODO below;
         confirm against gpr_slice_split_tail's contract). */
      /* TODO(klempner): Get rid of the extra refcount created here by adding a
         native "trim the first N bytes" operation to splice */
      /* TODO(klempner): This really shouldn't be modifying the current slice
         unless we own the slices array. */
      *current_slice = gpr_slice_split_tail(current_slice, prefix_bytes);
      gpr_slice_unref(*current_slice);
      return;
    } else {
      /* Whole slice consumed by the prefix: release it and advance. */
      gpr_slice_unref(*current_slice);
      ++state->first_slice;
      ++current_slice;
      prefix_bytes -= current_slice_size;
    }
  }
}
143
144static void slice_state_destroy(grpc_tcp_slice_state *state) {
145 while (slice_state_has_available(state)) {
146 gpr_slice_unref(state->slices[state->first_slice]);
147 ++state->first_slice;
148 }
149
150 if (state->memory_owned) {
151 gpr_free(state->slices);
152 state->memory_owned = 0;
153 }
154}
155
156void slice_state_transfer_ownership(grpc_tcp_slice_state *state,
157 gpr_slice **slices, size_t *nslices) {
158 *slices = state->slices + state->first_slice;
159 *nslices = state->last_slice - state->first_slice + 1;
160
161 state->first_slice = -1;
162 state->last_slice = -1;
163}
164
165/* Fills iov with the first min(iov_size, available) slices, returns number
166 filled */
167static size_t slice_state_to_iovec(grpc_tcp_slice_state *state,
168 struct iovec *iov, size_t iov_size) {
169 size_t nslices = state->last_slice - state->first_slice + 1;
170 gpr_slice *slices = state->slices + state->first_slice;
171 size_t i;
172 if (nslices < iov_size) {
173 iov_size = nslices;
174 }
175
176 for (i = 0; i < iov_size; ++i) {
177 iov[i].iov_base = GPR_SLICE_START_PTR(slices[i]);
178 iov[i].iov_len = GPR_SLICE_LENGTH(slices[i]);
179 }
180 return iov_size;
181}
182
/* Makes n blocks available at the end of state, writes them into iov, and
   returns the number of bytes allocated */
static size_t slice_state_append_blocks_into_iovec(grpc_tcp_slice_state *state,
                                                   struct iovec *iov, size_t n,
                                                   size_t slice_size) {
  size_t target_size;
  size_t i;
  size_t allocated_bytes;
  ssize_t allocated_slices = slice_state_slices_allocated(state);

  /* A valid working slice re-uses the final array slot, so one fewer new slot
     is needed.  Grow (by doubling) when the tail of the array cannot hold the
     remaining entries.  NOTE(review): this mixes size_t and ssize_t operands
     (last_slice can be -1) and leans on unsigned wraparound to come out
     right -- verify carefully before touching. */
  if (n - state->working_slice_valid >= state->nslices - state->last_slice) {
    /* Need to grow the slice array */
    target_size = state->nslices;
    do {
      target_size = target_size * 2;
    } while (target_size < allocated_slices + n - state->working_slice_valid);
    /* TODO(klempner): If this ever needs to support both prefix removal and
       append, we should be smarter about the growth logic here */
    slice_state_realloc(state, target_size);
  }

  i = 0;
  allocated_bytes = 0;

  if (state->working_slice_valid) {
    /* Resume filling the partially-used final slice: expose only the span
       between its current end and the full length recorded in working_slice,
       then restore the full-length slice into the array slot. */
    iov[0].iov_base = GPR_SLICE_END_PTR(state->slices[state->last_slice]);
    iov[0].iov_len = GPR_SLICE_LENGTH(state->working_slice) -
                     GPR_SLICE_LENGTH(state->slices[state->last_slice]);
    allocated_bytes += iov[0].iov_len;
    ++i;
    state->slices[state->last_slice] = state->working_slice;
    state->working_slice_valid = 0;
  }

  /* Allocate fresh slices for the remaining iovec entries. */
  for (; i < n; ++i) {
    ++state->last_slice;
    state->slices[state->last_slice] = gpr_slice_malloc(slice_size);
    iov[i].iov_base = GPR_SLICE_START_PTR(state->slices[state->last_slice]);
    iov[i].iov_len = slice_size;
    allocated_bytes += slice_size;
  }
  if (state->first_slice == -1) {
    state->first_slice = 0;
  }
  return allocated_bytes;
}
229
/* Remove the last n bytes from state */
/* TODO(klempner): Consider having this defer actual deletion until later */
static void slice_state_remove_last(grpc_tcp_slice_state *state, size_t bytes) {
  while (bytes > 0 && slice_state_has_available(state)) {
    if (GPR_SLICE_LENGTH(state->slices[state->last_slice]) > bytes) {
      /* Only part of the final slice is removed: remember the full-length
         slice as the working slice so a later append can re-fill the trimmed
         bytes, then shrink the array entry in place. */
      state->working_slice = state->slices[state->last_slice];
      state->working_slice_valid = 1;
      /* TODO(klempner): Combine these into a single operation that doesn't need
         to refcount */
      gpr_slice_unref(gpr_slice_split_tail(
          &state->slices[state->last_slice],
          GPR_SLICE_LENGTH(state->slices[state->last_slice]) - bytes));
      bytes = 0;
    } else {
      /* The whole final slice is removed: release it and shrink the range,
         flagging the state empty when last_slice passes below index 0. */
      bytes -= GPR_SLICE_LENGTH(state->slices[state->last_slice]);
      gpr_slice_unref(state->slices[state->last_slice]);
      --state->last_slice;
      if (state->last_slice == -1) {
        state->first_slice = -1;
      }
    }
  }
}
253
/* A POSIX TCP endpoint: a connected socket registered with the iomgr. */
typedef struct {
  grpc_endpoint base; /* must stay first: grpc_endpoint* is cast to grpc_tcp* */
  grpc_fd *em_fd;     /* iomgr fd wrapper used for readiness callbacks */
  int fd;             /* the underlying socket descriptor */
  size_t slice_size;  /* allocation size for read buffers */
  gpr_refcount refcount;

  grpc_endpoint_read_cb read_cb;   /* pending read callback, or NULL */
  void *read_user_data;
  grpc_endpoint_write_cb write_cb; /* pending write callback, or NULL */
  void *write_user_data;

  grpc_tcp_slice_state write_state; /* slices not yet flushed to the socket */

  grpc_iomgr_closure read_closure;
  grpc_iomgr_closure write_closure;
} grpc_tcp;
271
ctiller58393c22015-01-07 14:03:30 -0800272static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success);
273static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800274
275static void grpc_tcp_shutdown(grpc_endpoint *ep) {
276 grpc_tcp *tcp = (grpc_tcp *)ep;
ctiller18b49ab2014-12-09 14:39:16 -0800277 grpc_fd_shutdown(tcp->em_fd);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800278}
279
280static void grpc_tcp_unref(grpc_tcp *tcp) {
281 int refcount_zero = gpr_unref(&tcp->refcount);
282 if (refcount_zero) {
ctiller58393c22015-01-07 14:03:30 -0800283 grpc_fd_orphan(tcp->em_fd, NULL, NULL);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800284 gpr_free(tcp);
285 }
286}
287
288static void grpc_tcp_destroy(grpc_endpoint *ep) {
289 grpc_tcp *tcp = (grpc_tcp *)ep;
290 grpc_tcp_unref(tcp);
291}
292
293static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices,
294 grpc_endpoint_cb_status status) {
295 grpc_endpoint_read_cb cb = tcp->read_cb;
296
Craig Tiller6e7c6222015-02-20 15:31:21 -0800297 if (grpc_trace_bits & GRPC_TRACE_TCP) {
298 size_t i;
299 gpr_log(GPR_DEBUG, "read: status=%d", status);
300 for (i = 0; i < nslices; i++) {
301 char *dump =
302 gpr_hexdump((char *)GPR_SLICE_START_PTR(slices[i]),
303 GPR_SLICE_LENGTH(slices[i]), GPR_HEXDUMP_PLAINTEXT);
304 gpr_log(GPR_DEBUG, "READ: %s", dump);
305 gpr_free(dump);
306 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800307 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800308
309 tcp->read_cb = NULL;
310 cb(tcp->read_user_data, slices, nslices, status);
311}
312
#define INLINE_SLICE_BUFFER_SIZE 8
#define MAX_READ_IOVEC 4
/* Readable-event callback: reads from tcp->fd until EAGAIN, EOF, or error,
   then delivers what was read via call_read_cb.  Releases the reference taken
   by grpc_tcp_notify_on_read on every exit path except the spurious-wakeup
   one, which re-arms the read notification instead. */
static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
  grpc_tcp *tcp = (grpc_tcp *)arg;
  int iov_size = 1; /* grows toward MAX_READ_IOVEC as reads keep succeeding */
  gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE];
  struct msghdr msg;
  struct iovec iov[MAX_READ_IOVEC];
  ssize_t read_bytes;
  ssize_t allocated_bytes;
  struct grpc_tcp_slice_state read_state;
  gpr_slice *final_slices;
  size_t final_nslices;

  /* Accumulate into a stack-backed slice array; slice_state moves it to the
     heap only if the read outgrows INLINE_SLICE_BUFFER_SIZE slices. */
  slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
                   0);

  if (!success) {
    call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
    grpc_tcp_unref(tcp);
    return;
  }

  /* TODO(klempner): Limit the amount we read at once. */
  for (;;) {
    /* Reserve fresh buffer space (re-using any partially filled tail slice)
       and describe it in iov. */
    allocated_bytes = slice_state_append_blocks_into_iovec(
        &read_state, iov, iov_size, tcp->slice_size);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iov_size;
    msg.msg_control = NULL;
    msg.msg_controllen = 0;
    msg.msg_flags = 0;

    do {
      read_bytes = recvmsg(tcp->fd, &msg, 0);
    } while (read_bytes < 0 && errno == EINTR);

    if (read_bytes < allocated_bytes) {
      /* Give back the reserved buffer space that was not filled (all of it
         when the read failed). */
      /* TODO(klempner): Consider a second read first, in hopes of getting a
       * quick EAGAIN and saving a bunch of allocations. */
      slice_state_remove_last(&read_state, read_bytes < 0
                                               ? allocated_bytes
                                               : allocated_bytes - read_bytes);
    }

    if (read_bytes < 0) {
      /* NB: After calling the user_cb a parallel call of the read handler may
       * be running. */
      if (errno == EAGAIN) {
        if (slice_state_has_available(&read_state)) {
          /* Socket drained: hand everything read so far to the application. */
          /* TODO(klempner): We should probably do the call into the application
             without all this junk on the stack */
          /* FIXME(klempner): Refcount properly */
          slice_state_transfer_ownership(&read_state, &final_slices,
                                         &final_nslices);
          call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK);
          slice_state_destroy(&read_state);
          grpc_tcp_unref(tcp);
        } else {
          /* Spurious read event, consume it here */
          slice_state_destroy(&read_state);
          grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
        }
      } else {
        /* TODO(klempner): Log interesting errors */
        call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR);
        slice_state_destroy(&read_state);
        grpc_tcp_unref(tcp);
      }
      return;
    } else if (read_bytes == 0) {
      /* 0 read size ==> end of stream */
      if (slice_state_has_available(&read_state)) {
        /* there were bytes already read: pass them up to the application */
        slice_state_transfer_ownership(&read_state, &final_slices,
                                       &final_nslices);
        call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF);
      } else {
        call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF);
      }
      slice_state_destroy(&read_state);
      grpc_tcp_unref(tcp);
      return;
    } else if (iov_size < MAX_READ_IOVEC) {
      /* Data was read and the loop continues: read larger batches next. */
      ++iov_size;
    }
  }
}
404
405static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
ctiller58393c22015-01-07 14:03:30 -0800406 void *user_data) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800407 grpc_tcp *tcp = (grpc_tcp *)ep;
408 GPR_ASSERT(tcp->read_cb == NULL);
409 tcp->read_cb = cb;
410 tcp->read_user_data = user_data;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800411 gpr_ref(&tcp->refcount);
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800412 grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800413}
414
415#define MAX_WRITE_IOVEC 16
416static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) {
417 struct msghdr msg;
418 struct iovec iov[MAX_WRITE_IOVEC];
419 int iov_size;
420 ssize_t sent_length;
421 grpc_tcp_slice_state *state = &tcp->write_state;
422
423 for (;;) {
424 iov_size = slice_state_to_iovec(state, iov, MAX_WRITE_IOVEC);
425
426 msg.msg_name = NULL;
427 msg.msg_namelen = 0;
428 msg.msg_iov = iov;
429 msg.msg_iovlen = iov_size;
430 msg.msg_control = NULL;
431 msg.msg_controllen = 0;
432 msg.msg_flags = 0;
433
434 do {
435 /* TODO(klempner): Cork if this is a partial write */
436 sent_length = sendmsg(tcp->fd, &msg, 0);
437 } while (sent_length < 0 && errno == EINTR);
438
439 if (sent_length < 0) {
440 if (errno == EAGAIN) {
441 return GRPC_ENDPOINT_WRITE_PENDING;
442 } else {
443 /* TODO(klempner): Log some of these */
444 slice_state_destroy(state);
445 return GRPC_ENDPOINT_WRITE_ERROR;
446 }
447 }
448
449 /* TODO(klempner): Probably better to batch this after we finish flushing */
450 slice_state_remove_prefix(state, sent_length);
451
452 if (!slice_state_has_available(state)) {
453 return GRPC_ENDPOINT_WRITE_DONE;
454 }
455 };
456}
457
ctiller58393c22015-01-07 14:03:30 -0800458static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800459 grpc_tcp *tcp = (grpc_tcp *)arg;
460 grpc_endpoint_write_status write_status;
461 grpc_endpoint_cb_status cb_status;
462 grpc_endpoint_write_cb cb;
463
ctiller58393c22015-01-07 14:03:30 -0800464 if (!success) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800465 slice_state_destroy(&tcp->write_state);
466 cb = tcp->write_cb;
467 tcp->write_cb = NULL;
ctiller58393c22015-01-07 14:03:30 -0800468 cb(tcp->write_user_data, GRPC_ENDPOINT_CB_SHUTDOWN);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800469 grpc_tcp_unref(tcp);
470 return;
471 }
472
473 write_status = grpc_tcp_flush(tcp);
474 if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800475 grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800476 } else {
477 slice_state_destroy(&tcp->write_state);
478 if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
479 cb_status = GRPC_ENDPOINT_CB_OK;
480 } else {
481 cb_status = GRPC_ENDPOINT_CB_ERROR;
482 }
483 cb = tcp->write_cb;
484 tcp->write_cb = NULL;
485 cb(tcp->write_user_data, cb_status);
486 grpc_tcp_unref(tcp);
487 }
488}
489
ctiller58393c22015-01-07 14:03:30 -0800490static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
491 gpr_slice *slices,
492 size_t nslices,
493 grpc_endpoint_write_cb cb,
494 void *user_data) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800495 grpc_tcp *tcp = (grpc_tcp *)ep;
496 grpc_endpoint_write_status status;
497
Craig Tiller6e7c6222015-02-20 15:31:21 -0800498 if (grpc_trace_bits & GRPC_TRACE_TCP) {
499 size_t i;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800500
Craig Tiller6e7c6222015-02-20 15:31:21 -0800501 for (i = 0; i < nslices; i++) {
502 char *data =
503 gpr_hexdump((char *)GPR_SLICE_START_PTR(slices[i]),
504 GPR_SLICE_LENGTH(slices[i]), GPR_HEXDUMP_PLAINTEXT);
505 gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
506 gpr_free(data);
507 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800508 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800509
510 GPR_ASSERT(tcp->write_cb == NULL);
511 slice_state_init(&tcp->write_state, slices, nslices, nslices);
512
513 status = grpc_tcp_flush(tcp);
514 if (status == GRPC_ENDPOINT_WRITE_PENDING) {
515 /* TODO(klempner): Consider inlining rather than malloc for small nslices */
516 slice_state_realloc(&tcp->write_state, nslices);
517 gpr_ref(&tcp->refcount);
518 tcp->write_cb = cb;
519 tcp->write_user_data = user_data;
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800520 grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800521 }
522
523 return status;
524}
525
ctillerd79b4862014-12-17 16:36:59 -0800526static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
ctiller58393c22015-01-07 14:03:30 -0800527 grpc_tcp *tcp = (grpc_tcp *)ep;
528 grpc_pollset_add_fd(pollset, tcp->em_fd);
ctillerd79b4862014-12-17 16:36:59 -0800529}
530
/* grpc_endpoint implementation backed by a POSIX socket. */
static const grpc_endpoint_vtable vtable = {
    grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset,
    grpc_tcp_shutdown, grpc_tcp_destroy};
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800534
ctiller18b49ab2014-12-09 14:39:16 -0800535grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800536 grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
537 tcp->base.vtable = &vtable;
ctiller58393c22015-01-07 14:03:30 -0800538 tcp->fd = em_fd->fd;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800539 tcp->read_cb = NULL;
540 tcp->write_cb = NULL;
541 tcp->read_user_data = NULL;
542 tcp->write_user_data = NULL;
543 tcp->slice_size = slice_size;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800544 slice_state_init(&tcp->write_state, NULL, 0, 0);
545 /* paired with unref in grpc_tcp_destroy */
546 gpr_ref_init(&tcp->refcount, 1);
nnoble0c475f02014-12-05 15:37:39 -0800547 tcp->em_fd = em_fd;
Craig Tiller0fcd53c2015-02-18 15:10:53 -0800548 tcp->read_closure.cb = grpc_tcp_handle_read;
549 tcp->read_closure.cb_arg = tcp;
550 tcp->write_closure.cb = grpc_tcp_handle_write;
551 tcp->write_closure.cb_arg = tcp;
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800552 return &tcp->base;
553}
Craig Tiller0c0b60c2015-01-21 15:49:28 -0800554
Craig Tiller190d3602015-02-18 09:23:38 -0800555#endif