blob: 70fbf75c2396f89c0f8af1d14191ecdb9b91228f [file] [log] [blame]
yangsu@google.comc5aeccd2011-07-17 14:42:08 +00001#include <netdb.h>
2#include <unistd.h>
3#include <errno.h>
4#include <fcntl.h>
5#include "SkSockets.h"
6#include "SkData.h"
7
8SkSocket::SkSocket() {
9 fMaxfd = 0;
10 FD_ZERO(&fMasterSet);
11 fConnected = false;
12 fReady = false;
13 fReadSuspended = false;
14 fWriteSuspended = false;
15 fSockfd = this->createSocket();
16 fTimeout.tv_sec = 0;
17 fTimeout.tv_usec = 0;
18}
19
20SkSocket::~SkSocket() {
21 this->closeSocket(fSockfd);
22}
23
24int SkSocket::createSocket() {
25 int sockfd = socket(AF_INET, SOCK_STREAM, 0);
26 if (sockfd < 0) {
27 //SkDebugf("ERROR opening socket\n");
28 return -1;
29 }
30#ifdef NONBLOCKING_SOCKETS
31 this->setNonBlocking(sockfd);
32#endif
33 //SkDebugf("Opened fd:%d\n", sockfd);
34 fReady = true;
35 return sockfd;
36}
37
38void SkSocket::closeSocket(int sockfd) {
39 if (!fReady)
40 return;
41
42 //SkDebugf("Closed fd:%d\n", sockfd);
43 close(sockfd);
44
45 if (FD_ISSET(sockfd, &fMasterSet)) {
46 FD_CLR(sockfd, &fMasterSet);
47 if (sockfd >= fMaxfd) {
48 while (FD_ISSET(fMaxfd, &fMasterSet) == false && fMaxfd > 0)
49 fMaxfd -= 1;
50 }
51 }
52 if (0 == fMaxfd) {
53 fConnected = false;
54 //SkDebugf("all connections closed\n");
55 }
56}
57
58void SkSocket::onFailedConnection(int sockfd) {
59 this->closeSocket(sockfd);
60}
61
62void SkSocket::setNonBlocking(int sockfd) {
63 int flags = fcntl(sockfd, F_GETFL);
64 fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
65}
66
67void SkSocket::addToMasterSet(int sockfd) {
68 FD_SET(sockfd, &fMasterSet);
69 if (sockfd > fMaxfd)
70 fMaxfd = sockfd;
71}
72
73int SkSocket::readPacket(void (*onRead)(const void*, size_t, int, DataType,
74 void*), void* context) {
75 if (!fConnected || !fReady || NULL == onRead || fReadSuspended)
76 return -1;
77
78 int totalBytesRead = 0;
79
80 char packet[PACKET_SIZE];
81 for (int i = 0; i <= fMaxfd; ++i) {
82 if (!FD_ISSET (i, &fMasterSet))
83 continue;
84
85 memset(packet, 0, PACKET_SIZE);
86 SkDynamicMemoryWStream stream;
87 int attempts = 0;
88 bool failure = false;
89 int bytesReadInTransfer = 0;
90 int bytesReadInPacket = 0;
91 header h;
92 h.done = false;
93 h.bytes = 0;
94 while (!h.done && fConnected && !failure) {
95 int retval = read(i, packet + bytesReadInPacket,
96 PACKET_SIZE - bytesReadInPacket);
97
98 ++attempts;
99 if (retval < 0) {
100#ifdef NONBLOCKING_SOCKETS
101 if (errno == EWOULDBLOCK || errno == EAGAIN) {
102 if (bytesReadInPacket > 0 || bytesReadInTransfer > 0)
103 continue; //incomplete packet or frame, keep tring
104 else
105 break; //nothing to read
106 }
107#endif
108 //SkDebugf("Read() failed with error: %s\n", strerror(errno));
109 failure = true;
110 break;
111 }
112
113 if (retval == 0) {
114 //SkDebugf("Peer closed connection or connection failed\n");
115 failure = true;
116 break;
117 }
118
119 SkASSERT(retval > 0);
120 bytesReadInPacket += retval;
121 if (bytesReadInPacket < PACKET_SIZE) {
122 //SkDebugf("Read %d/%d\n", bytesReadInPacket, PACKET_SIZE);
123 continue; //incomplete packet, keep trying
124 }
125
126 SkASSERT((bytesReadInPacket == PACKET_SIZE) && !failure);
127 memcpy(&h.done, packet, sizeof(bool));
128 memcpy(&h.bytes, packet + sizeof(bool), sizeof(int));
129 memcpy(&h.type, packet + sizeof(bool) + sizeof(int), sizeof(DataType));
130 if (h.bytes > CONTENT_SIZE || h.bytes <= 0) {
131 //SkDebugf("bad packet\n");
132 failure = true;
133 break;
134 }
135 //SkDebugf("read packet(done:%d, bytes:%d) from fd:%d in %d attempts\n",
136 // h.done, h.bytes, fSockfd, attempts);
137 stream.write(packet + HEADER_SIZE, h.bytes);\
138 bytesReadInPacket = 0;
139 attempts = 0;
140 bytesReadInTransfer += h.bytes;
141 }
142
143 if (failure) {
144 onRead(NULL, 0, i, h.type, context);
145 this->onFailedConnection(i);
146 continue;
147 }
148
149 if (bytesReadInTransfer > 0) {
150 SkData* data = stream.copyToData();
151 SkASSERT(data->size() == bytesReadInTransfer);
152 onRead(data->data(), data->size(), i, h.type, context);
153 data->unref();
154
155 totalBytesRead += bytesReadInTransfer;
156 }
157 }
158 return totalBytesRead;
159}
160
161int SkSocket::writePacket(void* data, size_t size, DataType type) {
162 if (size < 0|| NULL == data || !fConnected || !fReady || fWriteSuspended)
163 return -1;
164
165 int totalBytesWritten = 0;
166 header h;
167 char packet[PACKET_SIZE];
168 for (int i = 0; i <= fMaxfd; ++i) {
169 if (!FD_ISSET (i, &fMasterSet))
170 continue;
171
172 //Don't signal broken pipe
173 setsockopt(i, SOL_SOCKET, SO_NOSIGPIPE, (void*)1, sizeof(int));
174 int bytesWrittenInTransfer = 0;
175 int bytesWrittenInPacket = 0;
176 int attempts = 0;
177 bool failure = false;
178 while (bytesWrittenInTransfer < size && fConnected && !failure) {
179 memset(packet, 0, PACKET_SIZE);
180 h.done = (size - bytesWrittenInTransfer <= CONTENT_SIZE);
181 h.bytes = (h.done) ? size - bytesWrittenInTransfer : CONTENT_SIZE;
182 h.type = type;
183 memcpy(packet, &h.done, sizeof(bool));
184 memcpy(packet + sizeof(bool), &h.bytes, sizeof(int));
185 memcpy(packet + sizeof(bool) + sizeof(int), &h.type, sizeof(DataType));
186 memcpy(packet + HEADER_SIZE, (char*)data + bytesWrittenInTransfer,
187 h.bytes);
188
189 int retval = write(i, packet + bytesWrittenInPacket,
190 PACKET_SIZE - bytesWrittenInPacket);
191 attempts++;
192
193 if (retval < 0) {
194 if (errno == EPIPE) {
195 //SkDebugf("broken pipe, client closed connection");
196 failure = true;
197 break;
198 }
199#ifdef NONBLOCKING_SOCKETS
200 else if (errno == EWOULDBLOCK || errno == EAGAIN) {
201 if (bytesWrittenInPacket > 0 || bytesWrittenInTransfer > 0)
202 continue; //incomplete packet or frame, keep tring
203 else
204 break; //client not available, skip current transfer
205 }
206#endif
207 else {
208 //SkDebugf("write(%d) failed with error:%s\n", i,
209 // strerror(errno));
210 failure = true;
211 break;
212 }
213 }
214
215 bytesWrittenInPacket += retval;
216 if (bytesWrittenInPacket < PACKET_SIZE) {
217 //SkDebugf("Wrote %d/%d\n", bytesWrittenInPacket, PACKET_SIZE);
218 continue; //incomplete packet, keep tring
219 }
220
221 SkASSERT(bytesWrittenInPacket == PACKET_SIZE);
222 //SkDebugf("wrote to packet(done:%d, bytes:%d) to fd:%d in %d tries\n",
223 // h.done, h.bytes, i, attempts);
224 bytesWrittenInTransfer += h.bytes;
225 bytesWrittenInPacket = 0;
226 attempts = 0;
227 }
228
229 if (failure) {
230 //SkDebugf("Failed to write to fd:%d, terminating connection\n", i);
231 this->onFailedConnection(i);
232 }
233
234 totalBytesWritten += bytesWrittenInTransfer;
235 }
236 return totalBytesWritten;
237}
238////////////////////////////////////////////////////////////////////////////////
239SkTCPServer::SkTCPServer(int port) {
240 sockaddr_in serverAddr;
241 serverAddr.sin_family = AF_INET;
242 serverAddr.sin_addr.s_addr = INADDR_ANY;
243 serverAddr.sin_port = htons(port);
244
245 if (bind(fSockfd, (sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) {
246 //SkDebugf("ERROR on binding\n");
247 fReady = false;
248 }
249}
250
251SkTCPServer::~SkTCPServer() {
252 this->disconnectAllConnections();
253}
254
255int SkTCPServer::acceptIncomingConnections() {
256 if (!fReady)
257 return -1;
258// if (fConnected)
259// return 0;
260
261 listen(fSockfd, MAX_CLIENTS);
262 ////SkDebugf("Accepting Incoming connections\n");
263 int newfd;
264
265 for (int i = 0; i < MAX_CLIENTS; ++i) {
266#ifdef NONBLOCKING_SOCKETS
267 fd_set workingSet;
268 FD_ZERO(&workingSet);
269 FD_SET(fSockfd, &workingSet);
270 int sel = select(fSockfd + 1, &workingSet, NULL, NULL, &fTimeout);
271 if (sel < 0) {
272 //SkDebugf("select() failed with error %s\n", strerror(errno));
273 continue;
274 }
275 if (sel == 0) //select() timed out
276 continue;
277#endif
278 sockaddr_in clientAddr;
279 socklen_t clientLen = sizeof(clientAddr);
280 newfd = accept(fSockfd, (struct sockaddr*)&clientAddr, &clientLen);
281 if (newfd< 0) {
282 //SkDebugf("accept() failed with error %s\n", strerror(errno));
283 continue;
284 }
285 //SkDebugf("New incoming connection - %d\n", newfd);
286 fConnected = true;
287#ifdef NONBLOCKING_SOCKETS
288 this->setNonBlocking(newfd);
289#endif
290 this->addToMasterSet(newfd);
291 }
292 return 0;
293}
294
295
296int SkTCPServer::disconnectAllConnections() {
297 ////SkDebugf("disconnecting server\n");
298 if (!fConnected || !fReady)
299 return -1;
300 for (int i = 0; i <= fMaxfd; ++i)
301 {
302 if (FD_ISSET(i, &fMasterSet))
303 this->closeSocket(i);
304 }
305 fConnected = false;
306 return 0;
307}
308
309////////////////////////////////////////////////////////////////////////////////
310SkTCPClient::SkTCPClient(const char* hostname, int port) {
311 //Add fSockfd since the client will be using it to read/write
312 this->addToMasterSet(fSockfd);
313
314 hostent* server = gethostbyname(hostname);
315 if (server) {
316 fServerAddr.sin_family = AF_INET;
317 memcpy((char*)&fServerAddr.sin_addr.s_addr, (char*)server->h_addr,
318 server->h_length);
319 fServerAddr.sin_port = htons(port);
320 }
321 else {
322 //SkDebugf("ERROR, no such host\n");
323 fReady = false;
324 }
325}
326
327void SkTCPClient::onFailedConnection(int sockfd) {
328 SkASSERT(sockfd == fSockfd);
329 this->closeSocket(fSockfd);
330 fSockfd = this->createSocket();
331 //Add fSockfd since the client will be using it to read/write
332 this->addToMasterSet(fSockfd);
333}
334
335int SkTCPClient::connectToServer() {
336 if (!fReady)
337 return -1;
338 if (fConnected)
339 return 0;
340
341 int conn = connect(fSockfd, (sockaddr*)&fServerAddr, sizeof(fServerAddr));
342 if (conn < 0) {
343#ifdef NONBLOCKING_SOCKETS
344 if (errno == EINPROGRESS || errno == EALREADY)
345 return conn;
346#endif
347 if (errno != EISCONN) {
348 //SkDebugf("error: %s\n", strerror(errno));
349 this->onFailedConnection(fSockfd);
350 return conn;
351 }
352 }
353 fConnected = true;
354 //SkDebugf("Succesfully reached server\n");
355 return 0;
356}