blob: dd72e64b4dfafa0e4eb63547e06c84df84f522dc [file] [log] [blame]
epoger@google.comec3ed6a2011-07-28 14:26:00 +00001
2/*
3 * Copyright 2011 Google Inc.
4 *
5 * Use of this source code is governed by a BSD-style license that can be
6 * found in the LICENSE file.
7 */
yangsu@google.comc5aeccd2011-07-17 14:42:08 +00008#include <netdb.h>
9#include <unistd.h>
10#include <errno.h>
11#include <fcntl.h>
12#include "SkSockets.h"
13#include "SkData.h"
14
15SkSocket::SkSocket() {
16 fMaxfd = 0;
17 FD_ZERO(&fMasterSet);
18 fConnected = false;
19 fReady = false;
20 fReadSuspended = false;
21 fWriteSuspended = false;
22 fSockfd = this->createSocket();
23 fTimeout.tv_sec = 0;
24 fTimeout.tv_usec = 0;
25}
26
27SkSocket::~SkSocket() {
28 this->closeSocket(fSockfd);
29}
30
31int SkSocket::createSocket() {
32 int sockfd = socket(AF_INET, SOCK_STREAM, 0);
33 if (sockfd < 0) {
34 //SkDebugf("ERROR opening socket\n");
35 return -1;
36 }
37#ifdef NONBLOCKING_SOCKETS
38 this->setNonBlocking(sockfd);
39#endif
40 //SkDebugf("Opened fd:%d\n", sockfd);
41 fReady = true;
42 return sockfd;
43}
44
45void SkSocket::closeSocket(int sockfd) {
46 if (!fReady)
47 return;
48
49 //SkDebugf("Closed fd:%d\n", sockfd);
50 close(sockfd);
51
52 if (FD_ISSET(sockfd, &fMasterSet)) {
53 FD_CLR(sockfd, &fMasterSet);
54 if (sockfd >= fMaxfd) {
55 while (FD_ISSET(fMaxfd, &fMasterSet) == false && fMaxfd > 0)
56 fMaxfd -= 1;
57 }
58 }
59 if (0 == fMaxfd) {
60 fConnected = false;
61 //SkDebugf("all connections closed\n");
62 }
63}
64
65void SkSocket::onFailedConnection(int sockfd) {
66 this->closeSocket(sockfd);
67}
68
69void SkSocket::setNonBlocking(int sockfd) {
70 int flags = fcntl(sockfd, F_GETFL);
71 fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
72}
73
74void SkSocket::addToMasterSet(int sockfd) {
75 FD_SET(sockfd, &fMasterSet);
76 if (sockfd > fMaxfd)
77 fMaxfd = sockfd;
78}
79
80int SkSocket::readPacket(void (*onRead)(const void*, size_t, int, DataType,
81 void*), void* context) {
82 if (!fConnected || !fReady || NULL == onRead || fReadSuspended)
83 return -1;
84
85 int totalBytesRead = 0;
86
87 char packet[PACKET_SIZE];
88 for (int i = 0; i <= fMaxfd; ++i) {
89 if (!FD_ISSET (i, &fMasterSet))
90 continue;
91
92 memset(packet, 0, PACKET_SIZE);
93 SkDynamicMemoryWStream stream;
94 int attempts = 0;
95 bool failure = false;
96 int bytesReadInTransfer = 0;
97 int bytesReadInPacket = 0;
98 header h;
99 h.done = false;
100 h.bytes = 0;
101 while (!h.done && fConnected && !failure) {
102 int retval = read(i, packet + bytesReadInPacket,
103 PACKET_SIZE - bytesReadInPacket);
104
105 ++attempts;
106 if (retval < 0) {
107#ifdef NONBLOCKING_SOCKETS
108 if (errno == EWOULDBLOCK || errno == EAGAIN) {
109 if (bytesReadInPacket > 0 || bytesReadInTransfer > 0)
110 continue; //incomplete packet or frame, keep tring
111 else
112 break; //nothing to read
113 }
114#endif
115 //SkDebugf("Read() failed with error: %s\n", strerror(errno));
116 failure = true;
117 break;
118 }
119
120 if (retval == 0) {
121 //SkDebugf("Peer closed connection or connection failed\n");
122 failure = true;
123 break;
124 }
125
126 SkASSERT(retval > 0);
127 bytesReadInPacket += retval;
128 if (bytesReadInPacket < PACKET_SIZE) {
129 //SkDebugf("Read %d/%d\n", bytesReadInPacket, PACKET_SIZE);
130 continue; //incomplete packet, keep trying
131 }
132
133 SkASSERT((bytesReadInPacket == PACKET_SIZE) && !failure);
134 memcpy(&h.done, packet, sizeof(bool));
135 memcpy(&h.bytes, packet + sizeof(bool), sizeof(int));
136 memcpy(&h.type, packet + sizeof(bool) + sizeof(int), sizeof(DataType));
137 if (h.bytes > CONTENT_SIZE || h.bytes <= 0) {
138 //SkDebugf("bad packet\n");
139 failure = true;
140 break;
141 }
142 //SkDebugf("read packet(done:%d, bytes:%d) from fd:%d in %d attempts\n",
143 // h.done, h.bytes, fSockfd, attempts);
144 stream.write(packet + HEADER_SIZE, h.bytes);\
145 bytesReadInPacket = 0;
146 attempts = 0;
147 bytesReadInTransfer += h.bytes;
148 }
149
150 if (failure) {
151 onRead(NULL, 0, i, h.type, context);
152 this->onFailedConnection(i);
153 continue;
154 }
155
156 if (bytesReadInTransfer > 0) {
157 SkData* data = stream.copyToData();
158 SkASSERT(data->size() == bytesReadInTransfer);
159 onRead(data->data(), data->size(), i, h.type, context);
160 data->unref();
161
162 totalBytesRead += bytesReadInTransfer;
163 }
164 }
165 return totalBytesRead;
166}
167
168int SkSocket::writePacket(void* data, size_t size, DataType type) {
169 if (size < 0|| NULL == data || !fConnected || !fReady || fWriteSuspended)
170 return -1;
171
172 int totalBytesWritten = 0;
173 header h;
174 char packet[PACKET_SIZE];
175 for (int i = 0; i <= fMaxfd; ++i) {
176 if (!FD_ISSET (i, &fMasterSet))
177 continue;
178
179 //Don't signal broken pipe
180 setsockopt(i, SOL_SOCKET, SO_NOSIGPIPE, (void*)1, sizeof(int));
181 int bytesWrittenInTransfer = 0;
182 int bytesWrittenInPacket = 0;
183 int attempts = 0;
184 bool failure = false;
185 while (bytesWrittenInTransfer < size && fConnected && !failure) {
186 memset(packet, 0, PACKET_SIZE);
187 h.done = (size - bytesWrittenInTransfer <= CONTENT_SIZE);
188 h.bytes = (h.done) ? size - bytesWrittenInTransfer : CONTENT_SIZE;
189 h.type = type;
190 memcpy(packet, &h.done, sizeof(bool));
191 memcpy(packet + sizeof(bool), &h.bytes, sizeof(int));
192 memcpy(packet + sizeof(bool) + sizeof(int), &h.type, sizeof(DataType));
193 memcpy(packet + HEADER_SIZE, (char*)data + bytesWrittenInTransfer,
194 h.bytes);
195
196 int retval = write(i, packet + bytesWrittenInPacket,
197 PACKET_SIZE - bytesWrittenInPacket);
198 attempts++;
199
200 if (retval < 0) {
201 if (errno == EPIPE) {
202 //SkDebugf("broken pipe, client closed connection");
203 failure = true;
204 break;
205 }
206#ifdef NONBLOCKING_SOCKETS
207 else if (errno == EWOULDBLOCK || errno == EAGAIN) {
208 if (bytesWrittenInPacket > 0 || bytesWrittenInTransfer > 0)
209 continue; //incomplete packet or frame, keep tring
210 else
211 break; //client not available, skip current transfer
212 }
213#endif
214 else {
215 //SkDebugf("write(%d) failed with error:%s\n", i,
216 // strerror(errno));
217 failure = true;
218 break;
219 }
220 }
221
222 bytesWrittenInPacket += retval;
223 if (bytesWrittenInPacket < PACKET_SIZE) {
224 //SkDebugf("Wrote %d/%d\n", bytesWrittenInPacket, PACKET_SIZE);
225 continue; //incomplete packet, keep tring
226 }
227
228 SkASSERT(bytesWrittenInPacket == PACKET_SIZE);
229 //SkDebugf("wrote to packet(done:%d, bytes:%d) to fd:%d in %d tries\n",
230 // h.done, h.bytes, i, attempts);
231 bytesWrittenInTransfer += h.bytes;
232 bytesWrittenInPacket = 0;
233 attempts = 0;
234 }
235
236 if (failure) {
237 //SkDebugf("Failed to write to fd:%d, terminating connection\n", i);
238 this->onFailedConnection(i);
239 }
240
241 totalBytesWritten += bytesWrittenInTransfer;
242 }
243 return totalBytesWritten;
244}
245////////////////////////////////////////////////////////////////////////////////
246SkTCPServer::SkTCPServer(int port) {
247 sockaddr_in serverAddr;
248 serverAddr.sin_family = AF_INET;
249 serverAddr.sin_addr.s_addr = INADDR_ANY;
250 serverAddr.sin_port = htons(port);
251
252 if (bind(fSockfd, (sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) {
253 //SkDebugf("ERROR on binding\n");
254 fReady = false;
255 }
256}
257
258SkTCPServer::~SkTCPServer() {
259 this->disconnectAllConnections();
260}
261
262int SkTCPServer::acceptIncomingConnections() {
263 if (!fReady)
264 return -1;
265// if (fConnected)
266// return 0;
267
268 listen(fSockfd, MAX_CLIENTS);
269 ////SkDebugf("Accepting Incoming connections\n");
270 int newfd;
271
272 for (int i = 0; i < MAX_CLIENTS; ++i) {
273#ifdef NONBLOCKING_SOCKETS
274 fd_set workingSet;
275 FD_ZERO(&workingSet);
276 FD_SET(fSockfd, &workingSet);
277 int sel = select(fSockfd + 1, &workingSet, NULL, NULL, &fTimeout);
278 if (sel < 0) {
279 //SkDebugf("select() failed with error %s\n", strerror(errno));
280 continue;
281 }
282 if (sel == 0) //select() timed out
283 continue;
284#endif
285 sockaddr_in clientAddr;
286 socklen_t clientLen = sizeof(clientAddr);
287 newfd = accept(fSockfd, (struct sockaddr*)&clientAddr, &clientLen);
288 if (newfd< 0) {
289 //SkDebugf("accept() failed with error %s\n", strerror(errno));
290 continue;
291 }
292 //SkDebugf("New incoming connection - %d\n", newfd);
293 fConnected = true;
294#ifdef NONBLOCKING_SOCKETS
295 this->setNonBlocking(newfd);
296#endif
297 this->addToMasterSet(newfd);
298 }
299 return 0;
300}
301
302
303int SkTCPServer::disconnectAllConnections() {
304 ////SkDebugf("disconnecting server\n");
305 if (!fConnected || !fReady)
306 return -1;
307 for (int i = 0; i <= fMaxfd; ++i)
308 {
309 if (FD_ISSET(i, &fMasterSet))
310 this->closeSocket(i);
311 }
312 fConnected = false;
313 return 0;
314}
315
316////////////////////////////////////////////////////////////////////////////////
317SkTCPClient::SkTCPClient(const char* hostname, int port) {
318 //Add fSockfd since the client will be using it to read/write
319 this->addToMasterSet(fSockfd);
320
321 hostent* server = gethostbyname(hostname);
322 if (server) {
323 fServerAddr.sin_family = AF_INET;
324 memcpy((char*)&fServerAddr.sin_addr.s_addr, (char*)server->h_addr,
325 server->h_length);
326 fServerAddr.sin_port = htons(port);
327 }
328 else {
329 //SkDebugf("ERROR, no such host\n");
330 fReady = false;
331 }
332}
333
334void SkTCPClient::onFailedConnection(int sockfd) {
335 SkASSERT(sockfd == fSockfd);
336 this->closeSocket(fSockfd);
337 fSockfd = this->createSocket();
338 //Add fSockfd since the client will be using it to read/write
339 this->addToMasterSet(fSockfd);
340}
341
342int SkTCPClient::connectToServer() {
343 if (!fReady)
344 return -1;
345 if (fConnected)
346 return 0;
347
348 int conn = connect(fSockfd, (sockaddr*)&fServerAddr, sizeof(fServerAddr));
349 if (conn < 0) {
350#ifdef NONBLOCKING_SOCKETS
351 if (errno == EINPROGRESS || errno == EALREADY)
352 return conn;
353#endif
354 if (errno != EISCONN) {
355 //SkDebugf("error: %s\n", strerror(errno));
356 this->onFailedConnection(fSockfd);
357 return conn;
358 }
359 }
360 fConnected = true;
361 //SkDebugf("Succesfully reached server\n");
362 return 0;
363}