blob: d16e49e45950b4fb60def40fc43b51c3d8850e3a [file] [log] [blame]
David Klempner7f3ed1e2015-01-16 15:35:56 -08001/*
2 *
3 * Copyright 2015, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/core/iomgr/pollset_kick_posix.h"
35
36#include <errno.h>
37#include <string.h>
38#include <unistd.h>
39
40#include "src/core/iomgr/socket_utils_posix.h"
41#include <grpc/support/alloc.h>
42#include <grpc/support/log.h>
43
44/* This implementation is based on a freelist of pipes. */
45
46typedef struct grpc_kick_pipe_info {
47 int pipe_read_fd;
48 int pipe_write_fd;
49 struct grpc_kick_pipe_info *next;
50} grpc_kick_pipe_info;
51
52static grpc_kick_pipe_info *pipe_freelist = NULL;
53static gpr_mu pipe_freelist_mu;
54
55static grpc_kick_pipe_info *allocate_pipe() {
56 grpc_kick_pipe_info *info;
57 gpr_mu_lock(&pipe_freelist_mu);
58 if (pipe_freelist != NULL) {
59 info = pipe_freelist;
60 pipe_freelist = pipe_freelist->next;
61 } else {
62 int pipefd[2];
63 /* TODO(klempner): Make this nonfatal */
64 GPR_ASSERT(0 == pipe(pipefd));
65 GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1));
66 GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1));
67 info = gpr_malloc(sizeof(*info));
68 info->pipe_read_fd = pipefd[0];
69 info->pipe_write_fd = pipefd[1];
70 info->next = NULL;
71 }
72 gpr_mu_unlock(&pipe_freelist_mu);
73 return info;
74}
75
76static void free_pipe(grpc_kick_pipe_info *pipe_info) {
77 /* TODO(klempner): Start closing pipes if the free list gets too large */
78 gpr_mu_lock(&pipe_freelist_mu);
79 pipe_info->next = pipe_freelist;
80 pipe_freelist = pipe_info;
81 gpr_mu_unlock(&pipe_freelist_mu);
82}
83
84void grpc_pollset_kick_global_init() {
85 pipe_freelist = NULL;
86 gpr_mu_init(&pipe_freelist_mu);
87}
88
89void grpc_pollset_kick_global_destroy() {
90 while (pipe_freelist != NULL) {
91 grpc_kick_pipe_info *current = pipe_freelist;
92 pipe_freelist = pipe_freelist->next;
93 close(current->pipe_read_fd);
94 close(current->pipe_write_fd);
95 gpr_free(current);
96 }
97 gpr_mu_destroy(&pipe_freelist_mu);
98}
99
100void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state) {
101 gpr_mu_init(&kick_state->mu);
102 kick_state->kicked = 0;
103 kick_state->pipe_info = NULL;
104}
105
106void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state) {
107 gpr_mu_destroy(&kick_state->mu);
108 GPR_ASSERT(kick_state->pipe_info == NULL);
109}
110
111int grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state) {
112 gpr_mu_lock(&kick_state->mu);
113 if (kick_state->kicked) {
114 kick_state->kicked = 0;
115 gpr_mu_unlock(&kick_state->mu);
116 return -1;
117 }
118 kick_state->pipe_info = allocate_pipe();
119 gpr_mu_unlock(&kick_state->mu);
120 return kick_state->pipe_info->pipe_read_fd;
121}
122
123void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state) {
124 char buf[128];
125 int r;
126
127 for (;;) {
128 r = read(kick_state->pipe_info->pipe_read_fd, buf, sizeof(buf));
129 if (r > 0) continue;
130 if (r == 0) return;
131 switch (errno) {
132 case EAGAIN:
133 return;
134 case EINTR:
135 continue;
136 default:
137 gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno));
138 return;
139 }
140 }
141}
142
143void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state) {
144 gpr_mu_lock(&kick_state->mu);
145 free_pipe(kick_state->pipe_info);
146 kick_state->pipe_info = NULL;
147 gpr_mu_unlock(&kick_state->mu);
148}
149
150void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) {
151 gpr_mu_lock(&kick_state->mu);
152 if (kick_state->pipe_info != NULL) {
153 char c = 0;
154 while (write(kick_state->pipe_info->pipe_write_fd, &c, 1) != 1 &&
155 errno == EINTR)
156 ;
157 } else {
158 kick_state->kicked = 1;
159 }
160 gpr_mu_unlock(&kick_state->mu);
161}