| /* |
| * |
| * Copyright 2015, Google Inc. |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are |
| * met: |
| * |
| * * Redistributions of source code must retain the above copyright |
| * notice, this list of conditions and the following disclaimer. |
| * * Redistributions in binary form must reproduce the above |
| * copyright notice, this list of conditions and the following disclaimer |
| * in the documentation and/or other materials provided with the |
| * distribution. |
| * * Neither the name of Google Inc. nor the names of its |
| * contributors may be used to endorse or promote products derived from |
| * this software without specific prior written permission. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| * |
| */ |
| |
| #include "src/core/iomgr/fd_posix.h" |
| |
| #include <ctype.h> |
| #include <errno.h> |
| #include <fcntl.h> |
| #include <netinet/in.h> |
| #include <poll.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <sys/socket.h> |
| #include <sys/time.h> |
| #include <unistd.h> |
| |
| #include <grpc/support/alloc.h> |
| #include <grpc/support/log.h> |
| #include <grpc/support/sync.h> |
| #include <grpc/support/time.h> |
| #include "test/core/util/test_config.h" |
| |
| static grpc_pollset g_pollset; |
| |
| /* buffer size used to send and receive data. |
| 1024 is the minimal value to set TCP send and receive buffer. */ |
| #define BUF_SIZE 1024 |
| |
| /* Create a test socket with the right properties for testing. |
| port is the TCP port to listen or connect to. |
| Return a socket FD and sockaddr_in. */ |
| static void |
| create_test_socket (int port, int *socket_fd, struct sockaddr_in *sin) |
| { |
| int fd; |
| int one = 1; |
| int buf_size = BUF_SIZE; |
| int flags; |
| |
| fd = socket (AF_INET, SOCK_STREAM, 0); |
| setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof (one)); |
| /* Reset the size of socket send buffer to the minimal value to facilitate |
| buffer filling up and triggering notify_on_write */ |
| GPR_ASSERT (setsockopt (fd, SOL_SOCKET, SO_SNDBUF, &buf_size, sizeof (buf_size)) != -1); |
| GPR_ASSERT (setsockopt (fd, SOL_SOCKET, SO_RCVBUF, &buf_size, sizeof (buf_size)) != -1); |
| /* Make fd non-blocking */ |
| flags = fcntl (fd, F_GETFL, 0); |
| GPR_ASSERT (fcntl (fd, F_SETFL, flags | O_NONBLOCK) == 0); |
| *socket_fd = fd; |
| |
| /* Use local address for test */ |
| sin->sin_family = AF_INET; |
| sin->sin_addr.s_addr = htonl (0x7f000001); |
| GPR_ASSERT (port >= 0 && port < 65536); |
| sin->sin_port = htons ((gpr_uint16) port); |
| } |
| |
| /* Dummy gRPC callback */ |
| void |
| no_op_cb (void *arg, int success) |
| { |
| } |
| |
| /* =======An upload server to test notify_on_read=========== |
| The server simply reads and counts a stream of bytes. */ |
| |
| /* An upload server. */ |
| typedef struct |
| { |
| grpc_fd *em_fd; /* listening fd */ |
| ssize_t read_bytes_total; /* total number of received bytes */ |
| int done; /* set to 1 when a server finishes serving */ |
| grpc_closure listen_closure; |
| } server; |
| |
| static void |
| server_init (server * sv) |
| { |
| sv->read_bytes_total = 0; |
| sv->done = 0; |
| } |
| |
| /* An upload session. |
| Created when a new upload request arrives in the server. */ |
| typedef struct |
| { |
| server *sv; /* not owned by a single session */ |
| grpc_fd *em_fd; /* fd to read upload bytes */ |
| char read_buf[BUF_SIZE]; /* buffer to store upload bytes */ |
| grpc_closure session_read_closure; |
| } session; |
| |
| /* Called when an upload session can be safely shutdown. |
| Close session FD and start to shutdown listen FD. */ |
| static void |
| session_shutdown_cb (void *arg, /*session */ |
| int success, grpc_closure_list * closure_list) |
| { |
| session *se = arg; |
| server *sv = se->sv; |
| grpc_fd_orphan (exec_ctx, se->em_fd, NULL, "a"); |
| gpr_free (se); |
| /* Start to shutdown listen fd. */ |
| grpc_fd_shutdown (exec_ctx, sv->em_fd); |
| } |
| |
| /* Called when data become readable in a session. */ |
| static void |
| session_read_cb (void *arg, /*session */ |
| int success, grpc_closure_list * closure_list) |
| { |
| session *se = arg; |
| int fd = se->em_fd->fd; |
| |
| ssize_t read_once = 0; |
| ssize_t read_total = 0; |
| |
| if (!success) |
| { |
| session_shutdown_cb (exec_ctx, arg, 1); |
| return; |
| } |
| |
| do |
| { |
| read_once = read (fd, se->read_buf, BUF_SIZE); |
| if (read_once > 0) |
| read_total += read_once; |
| } |
| while (read_once > 0); |
| se->sv->read_bytes_total += read_total; |
| |
| /* read() returns 0 to indicate the TCP connection was closed by the client. |
| read(fd, read_buf, 0) also returns 0 which should never be called as such. |
| It is possible to read nothing due to spurious edge event or data has |
| been drained, In such a case, read() returns -1 and set errno to EAGAIN. */ |
| if (read_once == 0) |
| { |
| session_shutdown_cb (exec_ctx, arg, 1); |
| } |
| else if (read_once == -1) |
| { |
| if (errno == EAGAIN) |
| { |
| /* An edge triggered event is cached in the kernel until next poll. |
| In the current single thread implementation, session_read_cb is called |
| in the polling thread, such that polling only happens after this |
| callback, and will catch read edge event if data is available again |
| before notify_on_read. |
| TODO(chenw): in multi-threaded version, callback and polling can be |
| run in different threads. polling may catch a persist read edge event |
| before notify_on_read is called. */ |
| grpc_fd_notify_on_read (exec_ctx, se->em_fd, &se->session_read_closure); |
| } |
| else |
| { |
| gpr_log (GPR_ERROR, "Unhandled read error %s", strerror (errno)); |
| abort (); |
| } |
| } |
| } |
| |
| /* Called when the listen FD can be safely shutdown. |
| Close listen FD and signal that server can be shutdown. */ |
| static void |
| listen_shutdown_cb (void *arg /*server */ , int success, |
| grpc_closure_list * closure_list) |
| { |
| server *sv = arg; |
| |
| grpc_fd_orphan (exec_ctx, sv->em_fd, NULL, "b"); |
| |
| gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset)); |
| sv->done = 1; |
| grpc_pollset_kick (&g_pollset, NULL); |
| gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset)); |
| } |
| |
| /* Called when a new TCP connection request arrives in the listening port. */ |
| static void |
| listen_cb (void *arg, /*=sv_arg*/ |
| int success, grpc_closure_list * closure_list) |
| { |
| server *sv = arg; |
| int fd; |
| int flags; |
| session *se; |
| struct sockaddr_storage ss; |
| socklen_t slen = sizeof (ss); |
| grpc_fd *listen_em_fd = sv->em_fd; |
| |
| if (!success) |
| { |
| listen_shutdown_cb (exec_ctx, arg, 1); |
| return; |
| } |
| |
| fd = accept (listen_em_fd->fd, (struct sockaddr *) &ss, &slen); |
| GPR_ASSERT (fd >= 0); |
| GPR_ASSERT (fd < FD_SETSIZE); |
| flags = fcntl (fd, F_GETFL, 0); |
| fcntl (fd, F_SETFL, flags | O_NONBLOCK); |
| se = gpr_malloc (sizeof (*se)); |
| se->sv = sv; |
| se->em_fd = grpc_fd_create (fd, "listener"); |
| grpc_pollset_add_fd (exec_ctx, &g_pollset, se->em_fd); |
| se->session_read_closure.cb = session_read_cb; |
| se->session_read_closure.cb_arg = se; |
| grpc_fd_notify_on_read (exec_ctx, se->em_fd, &se->session_read_closure); |
| |
| grpc_fd_notify_on_read (exec_ctx, listen_em_fd, &sv->listen_closure); |
| } |
| |
| /* Max number of connections pending to be accepted by listen(). */ |
| #define MAX_NUM_FD 1024 |
| |
| /* Start a test server, return the TCP listening port bound to listen_fd. |
| listen_cb() is registered to be interested in reading from listen_fd. |
| When connection request arrives, listen_cb() is called to accept the |
| connection request. */ |
| static int |
| server_start (grpc_exec_ctx * exec_ctx, server * sv) |
| { |
| int port = 0; |
| int fd; |
| struct sockaddr_in sin; |
| socklen_t addr_len; |
| |
| create_test_socket (port, &fd, &sin); |
| addr_len = sizeof (sin); |
| GPR_ASSERT (bind (fd, (struct sockaddr *) &sin, addr_len) == 0); |
| GPR_ASSERT (getsockname (fd, (struct sockaddr *) &sin, &addr_len) == 0); |
| port = ntohs (sin.sin_port); |
| GPR_ASSERT (listen (fd, MAX_NUM_FD) == 0); |
| |
| sv->em_fd = grpc_fd_create (fd, "server"); |
| grpc_pollset_add_fd (exec_ctx, &g_pollset, sv->em_fd); |
| /* Register to be interested in reading from listen_fd. */ |
| sv->listen_closure.cb = listen_cb; |
| sv->listen_closure.cb_arg = sv; |
| grpc_fd_notify_on_read (exec_ctx, sv->em_fd, &sv->listen_closure); |
| |
| return port; |
| } |
| |
| /* Wait and shutdown a sever. */ |
| static void |
| server_wait_and_shutdown (server * sv) |
| { |
| gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset)); |
| while (!sv->done) |
| { |
| grpc_closure_list closure_list = GRPC_CLOSURE_LIST_INIT; |
| grpc_pollset_worker worker; |
| grpc_pollset_work (&g_pollset, &worker, gpr_now (GPR_CLOCK_MONOTONIC), gpr_inf_future (GPR_CLOCK_MONOTONIC), &closure_list); |
| gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset)); |
| grpc_closure_list_run (&closure_list); |
| gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset)); |
| } |
| gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset)); |
| } |
| |
| /* ===An upload client to test notify_on_write=== */ |
| |
| /* Client write buffer size */ |
| #define CLIENT_WRITE_BUF_SIZE 10 |
| /* Total number of times that the client fills up the write buffer */ |
| #define CLIENT_TOTAL_WRITE_CNT 3 |
| |
| /* An upload client. */ |
| typedef struct |
| { |
| grpc_fd *em_fd; |
| char write_buf[CLIENT_WRITE_BUF_SIZE]; |
| ssize_t write_bytes_total; |
| /* Number of times that the client fills up the write buffer and calls |
| notify_on_write to schedule another write. */ |
| int client_write_cnt; |
| |
| int done; /* set to 1 when a client finishes sending */ |
| grpc_closure write_closure; |
| } client; |
| |
| static void |
| client_init (client * cl) |
| { |
| memset (cl->write_buf, 0, sizeof (cl->write_buf)); |
| cl->write_bytes_total = 0; |
| cl->client_write_cnt = 0; |
| cl->done = 0; |
| } |
| |
| /* Called when a client upload session is ready to shutdown. */ |
| static void |
| client_session_shutdown_cb (void *arg /*client */ , int success, |
| grpc_closure_list * closure_list) |
| { |
| client *cl = arg; |
| grpc_fd_orphan (exec_ctx, cl->em_fd, NULL, "c"); |
| cl->done = 1; |
| grpc_pollset_kick (&g_pollset, NULL); |
| } |
| |
| /* Write as much as possible, then register notify_on_write. */ |
| static void |
| client_session_write (void *arg, /*client */ |
| int success, grpc_closure_list * closure_list) |
| { |
| client *cl = arg; |
| int fd = cl->em_fd->fd; |
| ssize_t write_once = 0; |
| |
| if (!success) |
| { |
| gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset)); |
| client_session_shutdown_cb (exec_ctx, arg, 1); |
| gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset)); |
| return; |
| } |
| |
| do |
| { |
| write_once = write (fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE); |
| if (write_once > 0) |
| cl->write_bytes_total += write_once; |
| } |
| while (write_once > 0); |
| |
| if (errno == EAGAIN) |
| { |
| gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset)); |
| if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) |
| { |
| cl->write_closure.cb = client_session_write; |
| cl->write_closure.cb_arg = cl; |
| grpc_fd_notify_on_write (exec_ctx, cl->em_fd, &cl->write_closure); |
| cl->client_write_cnt++; |
| } |
| else |
| { |
| client_session_shutdown_cb (exec_ctx, arg, 1); |
| } |
| gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset)); |
| } |
| else |
| { |
| gpr_log (GPR_ERROR, "unknown errno %s", strerror (errno)); |
| abort (); |
| } |
| } |
| |
| /* Start a client to send a stream of bytes. */ |
| static void |
| client_start (grpc_exec_ctx * exec_ctx, client * cl, int port) |
| { |
| int fd; |
| struct sockaddr_in sin; |
| create_test_socket (port, &fd, &sin); |
| if (connect (fd, (struct sockaddr *) &sin, sizeof (sin)) == -1) |
| { |
| if (errno == EINPROGRESS) |
| { |
| struct pollfd pfd; |
| pfd.fd = fd; |
| pfd.events = POLLOUT; |
| pfd.revents = 0; |
| if (poll (&pfd, 1, -1) == -1) |
| { |
| gpr_log (GPR_ERROR, "poll() failed during connect; errno=%d", errno); |
| abort (); |
| } |
| } |
| else |
| { |
| gpr_log (GPR_ERROR, "Failed to connect to the server (errno=%d)", errno); |
| abort (); |
| } |
| } |
| |
| cl->em_fd = grpc_fd_create (fd, "client"); |
| grpc_pollset_add_fd (exec_ctx, &g_pollset, cl->em_fd); |
| |
| client_session_write (exec_ctx, cl, 1); |
| } |
| |
| /* Wait for the signal to shutdown a client. */ |
| static void |
| client_wait_and_shutdown (client * cl) |
| { |
| gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset)); |
| while (!cl->done) |
| { |
| grpc_pollset_worker worker; |
| grpc_closure_list closure_list = GRPC_CLOSURE_LIST_INIT; |
| grpc_pollset_work (&g_pollset, &worker, gpr_now (GPR_CLOCK_MONOTONIC), gpr_inf_future (GPR_CLOCK_MONOTONIC), &closure_list); |
| gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset)); |
| grpc_closure_list_run (&closure_list); |
| gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset)); |
| } |
| gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset)); |
| } |
| |
| /* Test grpc_fd. Start an upload server and client, upload a stream of |
| bytes from the client to the server, and verify that the total number of |
| sent bytes is equal to the total number of received bytes. */ |
| static void |
| test_grpc_fd (void) |
| { |
| server sv; |
| client cl; |
| int port; |
| grpc_closure_list closure_list = GRPC_CLOSURE_LIST_INIT; |
| |
| server_init (&sv); |
| port = server_start (&sv, &closure_list); |
| client_init (&cl); |
| client_start (&cl, port, &closure_list); |
| grpc_closure_list_run (&closure_list); |
| client_wait_and_shutdown (&cl); |
| server_wait_and_shutdown (&sv); |
| GPR_ASSERT (sv.read_bytes_total == cl.write_bytes_total); |
| gpr_log (GPR_INFO, "Total read bytes %d", sv.read_bytes_total); |
| } |
| |
| typedef struct fd_change_data |
| { |
| void (*cb_that_ran) (grpc_exec_ctx * exec_ctx, void *, int success); |
| } fd_change_data; |
| |
| void |
| init_change_data (fd_change_data * fdc) |
| { |
| fdc->cb_that_ran = NULL; |
| } |
| |
| void |
| destroy_change_data (fd_change_data * fdc) |
| { |
| } |
| |
| static void |
| first_read_callback (void *arg /* fd_change_data */ , int success, |
| grpc_closure_list * closure_list) |
| { |
| fd_change_data *fdc = arg; |
| |
| gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset)); |
| fdc->cb_that_ran = first_read_callback; |
| grpc_pollset_kick (&g_pollset, NULL); |
| gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset)); |
| } |
| |
| static void |
| second_read_callback (void *arg /* fd_change_data */ , int success, |
| grpc_closure_list * closure_list) |
| { |
| fd_change_data *fdc = arg; |
| |
| gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset)); |
| fdc->cb_that_ran = second_read_callback; |
| grpc_pollset_kick (&g_pollset, NULL); |
| gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset)); |
| } |
| |
| /* Test that changing the callback we use for notify_on_read actually works. |
| Note that we have two different but almost identical callbacks above -- the |
| point is to have two different function pointers and two different data |
| pointers and make sure that changing both really works. */ |
| static void |
| test_grpc_fd_change (void) |
| { |
| grpc_fd *em_fd; |
| fd_change_data a, b; |
| int flags; |
| int sv[2]; |
| char data; |
| ssize_t result; |
| grpc_closure first_closure; |
| grpc_closure second_closure; |
| grpc_closure_list closure_list = GRPC_CLOSURE_LIST_INIT; |
| |
| first_closure.cb = first_read_callback; |
| first_closure.cb_arg = &a; |
| second_closure.cb = second_read_callback; |
| second_closure.cb_arg = &b; |
| |
| init_change_data (&a); |
| init_change_data (&b); |
| |
| GPR_ASSERT (socketpair (AF_UNIX, SOCK_STREAM, 0, sv) == 0); |
| flags = fcntl (sv[0], F_GETFL, 0); |
| GPR_ASSERT (fcntl (sv[0], F_SETFL, flags | O_NONBLOCK) == 0); |
| flags = fcntl (sv[1], F_GETFL, 0); |
| GPR_ASSERT (fcntl (sv[1], F_SETFL, flags | O_NONBLOCK) == 0); |
| |
| em_fd = grpc_fd_create (sv[0], "test_grpc_fd_change"); |
| grpc_pollset_add_fd (&g_pollset, em_fd, &closure_list); |
| |
| /* Register the first callback, then make its FD readable */ |
| grpc_fd_notify_on_read (em_fd, &first_closure, &closure_list); |
| data = 0; |
| result = write (sv[1], &data, 1); |
| GPR_ASSERT (result == 1); |
| |
| /* And now wait for it to run. */ |
| gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset)); |
| while (a.cb_that_ran == NULL) |
| { |
| grpc_pollset_worker worker; |
| grpc_pollset_work (&g_pollset, &worker, gpr_now (GPR_CLOCK_MONOTONIC), gpr_inf_future (GPR_CLOCK_MONOTONIC), &closure_list); |
| gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset)); |
| grpc_closure_list_run (&closure_list); |
| gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset)); |
| } |
| GPR_ASSERT (a.cb_that_ran == first_read_callback); |
| gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset)); |
| |
| /* And drain the socket so we can generate a new read edge */ |
| result = read (sv[0], &data, 1); |
| GPR_ASSERT (result == 1); |
| |
| /* Now register a second callback with distinct change data, and do the same |
| thing again. */ |
| grpc_fd_notify_on_read (em_fd, &second_closure, &closure_list); |
| data = 0; |
| result = write (sv[1], &data, 1); |
| GPR_ASSERT (result == 1); |
| |
| gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset)); |
| while (b.cb_that_ran == NULL) |
| { |
| grpc_pollset_worker worker; |
| grpc_pollset_work (&g_pollset, &worker, gpr_now (GPR_CLOCK_MONOTONIC), gpr_inf_future (GPR_CLOCK_MONOTONIC), &closure_list); |
| gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset)); |
| grpc_closure_list_run (&closure_list); |
| gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset)); |
| } |
| /* Except now we verify that second_read_callback ran instead */ |
| GPR_ASSERT (b.cb_that_ran == second_read_callback); |
| gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset)); |
| |
| grpc_fd_orphan (em_fd, NULL, "d", &closure_list); |
| grpc_closure_list_run (&closure_list); |
| destroy_change_data (&a); |
| destroy_change_data (&b); |
| close (sv[1]); |
| } |
| |
| static void |
| destroy_pollset (grpc_exec_ctx * exec_ctx, void *p, int success) |
| { |
| grpc_pollset_destroy (p); |
| } |
| |
| int |
| main (int argc, char **argv) |
| { |
| grpc_closure destroyed; |
| grpc_closure_list closure_list = GRPC_CLOSURE_LIST_INIT; |
| grpc_test_init (argc, argv); |
| grpc_iomgr_init (); |
| grpc_pollset_init (&g_pollset); |
| test_grpc_fd (); |
| test_grpc_fd_change (); |
| grpc_closure_init (&destroyed, destroy_pollset, &g_pollset); |
| grpc_pollset_shutdown (&g_pollset, &destroyed, &closure_list); |
| grpc_closure_list_run (&closure_list); |
| grpc_iomgr_shutdown (); |
| return 0; |
| } |