blob: d99c8058576a4d39b18c19b34a53ff06fb03da67 [file] [log] [blame]
/*-------------------------------------------------------------------------
 * drawElements Quality Program Execution Server
 * ---------------------------------------------
 *
 * Copyright 2014 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 *//*!
 * \file
 * \brief Test Execution Server.
 *//*--------------------------------------------------------------------*/
23
24#include "xsExecutionServer.hpp"
25#include "deClock.h"
26
27#include <cstdio>
28
29using std::vector;
30using std::string;
31
// Debug trace helper. Call sites pass a fully parenthesized printf argument
// list, e.g. DBG_PRINT(("value = %d\n", v)), which is pasted directly after
// `printf`. Change the condition to 0 to compile every trace out.
#if 1
#	define DBG_PRINT(X) printf X
#else
#	define DBG_PRINT(X)
#endif
37
38namespace xs
39{
40
41inline bool MessageBuilder::isComplete (void) const
42{
43 if (m_buffer.size() < MESSAGE_HEADER_SIZE)
44 return false;
45 else
Jarkko Pöyry745d7c62015-05-19 12:24:51 -070046 return m_buffer.size() == getMessageSize();
Jarkko Poyry3c827362014-09-02 11:48:52 +030047}
48
49const deUint8* MessageBuilder::getMessageData (void) const
50{
51 return m_buffer.size() > MESSAGE_HEADER_SIZE ? &m_buffer[MESSAGE_HEADER_SIZE] : DE_NULL;
52}
53
Jarkko Pöyry745d7c62015-05-19 12:24:51 -070054size_t MessageBuilder::getMessageDataSize (void) const
Jarkko Poyry3c827362014-09-02 11:48:52 +030055{
56 DE_ASSERT(isComplete());
Jarkko Pöyry745d7c62015-05-19 12:24:51 -070057 return m_buffer.size() - MESSAGE_HEADER_SIZE;
Jarkko Poyry3c827362014-09-02 11:48:52 +030058}
59
60void MessageBuilder::read (ByteBuffer& src)
61{
62 // Try to get header.
63 if (m_buffer.size() < MESSAGE_HEADER_SIZE)
64 {
65 while (m_buffer.size() < MESSAGE_HEADER_SIZE &&
66 src.getNumElements() > 0)
67 m_buffer.push_back(src.popBack());
68
69 DE_ASSERT(m_buffer.size() <= MESSAGE_HEADER_SIZE);
70
71 if (m_buffer.size() == MESSAGE_HEADER_SIZE)
72 {
73 // Got whole header, parse it.
74 Message::parseHeader(&m_buffer[0], (int)m_buffer.size(), m_messageType, m_messageSize);
75 }
76 }
77
78 if (m_buffer.size() >= MESSAGE_HEADER_SIZE)
79 {
80 // We have header.
Jarkko Pöyry745d7c62015-05-19 12:24:51 -070081 size_t msgSize = getMessageSize();
82 size_t numBytesLeft = msgSize - m_buffer.size();
83 size_t numToRead = (size_t)de::min(src.getNumElements(), (int)numBytesLeft);
Jarkko Poyry3c827362014-09-02 11:48:52 +030084
85 if (numToRead > 0)
86 {
87 int curBufPos = (int)m_buffer.size();
88 m_buffer.resize(curBufPos+numToRead);
Jarkko Pöyry745d7c62015-05-19 12:24:51 -070089 src.popBack(&m_buffer[curBufPos], (int)numToRead);
Jarkko Poyry3c827362014-09-02 11:48:52 +030090 }
91 }
92}
93
94void MessageBuilder::clear (void)
95{
96 m_buffer.clear();
97 m_messageType = MESSAGETYPE_NONE;
98 m_messageSize = 0;
99}
100
101ExecutionServer::ExecutionServer (xs::TestProcess* testProcess, deSocketFamily family, int port, RunMode runMode)
102 : TcpServer (family, port)
Jarkko Pöyry3fdee352015-02-13 19:31:58 -0800103 , m_testDriver (testProcess)
Jarkko Poyry3c827362014-09-02 11:48:52 +0300104 , m_runMode (runMode)
105{
106}
107
108ExecutionServer::~ExecutionServer (void)
109{
110}
111
112TestDriver* ExecutionServer::acquireTestDriver (void)
113{
114 if (!m_testDriverLock.tryLock())
115 throw Error("Failed to acquire test driver");
116
117 return &m_testDriver;
118}
119
120void ExecutionServer::releaseTestDriver (TestDriver* driver)
121{
122 DE_ASSERT(&m_testDriver == driver);
123 DE_UNREF(driver);
124 m_testDriverLock.unlock();
125}
126
127ConnectionHandler* ExecutionServer::createHandler (de::Socket* socket, const de::SocketAddress& clientAddress)
128{
129 printf("ExecutionServer: New connection from %s:%d\n", clientAddress.getHost(), clientAddress.getPort());
130 return new ExecutionRequestHandler(this, socket);
131}
132
133void ExecutionServer::connectionDone (ConnectionHandler* handler)
134{
135 if (m_runMode == RUNMODE_SINGLE_EXEC)
136 m_socket.close();
137
138 TcpServer::connectionDone(handler);
139}
140
141ExecutionRequestHandler::ExecutionRequestHandler (ExecutionServer* server, de::Socket* socket)
142 : ConnectionHandler (server, socket)
143 , m_execServer (server)
144 , m_testDriver (DE_NULL)
145 , m_bufferIn (RECV_BUFFER_SIZE)
146 , m_bufferOut (SEND_BUFFER_SIZE)
147 , m_run (false)
148 , m_sendRecvTmpBuf (SEND_RECV_TMP_BUFFER_SIZE)
149{
150 // Set flags.
151 m_socket->setFlags(DE_SOCKET_NONBLOCKING|DE_SOCKET_KEEPALIVE|DE_SOCKET_CLOSE_ON_EXEC);
152
153 // Init protocol keepalives.
154 initKeepAlives();
155}
156
157ExecutionRequestHandler::~ExecutionRequestHandler (void)
158{
159 if (m_testDriver)
160 m_execServer->releaseTestDriver(m_testDriver);
161}
162
163void ExecutionRequestHandler::handle (void)
164{
165 DBG_PRINT(("ExecutionRequestHandler::handle()\n"));
166
167 try
168 {
169 // Process execution session.
170 processSession();
171 }
172 catch (const std::exception& e)
173 {
174 printf("ExecutionRequestHandler::run(): %s\n", e.what());
175 }
176
177 DBG_PRINT(("ExecutionRequestHandler::handle(): Done!\n"));
178
179 // Release test driver.
180 if (m_testDriver)
181 {
182 try
183 {
184 m_testDriver->reset();
185 }
186 catch (...)
187 {
188 }
189 m_execServer->releaseTestDriver(m_testDriver);
190 m_testDriver = DE_NULL;
191 }
192
193 // Close connection.
194 if (m_socket->isConnected())
195 m_socket->shutdown();
196}
197
198void ExecutionRequestHandler::acquireTestDriver (void)
199{
200 DE_ASSERT(!m_testDriver);
201
202 // Try to acquire test driver - may fail.
203 m_testDriver = m_execServer->acquireTestDriver();
204 DE_ASSERT(m_testDriver);
205 m_testDriver->reset();
206
207}
208
209void ExecutionRequestHandler::processSession (void)
210{
211 m_run = true;
212
213 deUint64 lastIoTime = deGetMicroseconds();
214
215 while (m_run)
216 {
217 bool anyIO = false;
218
219 // Read from socket to buffer.
220 anyIO = receive() || anyIO;
221
222 // Send bytes in buffer.
223 anyIO = send() || anyIO;
224
225 // Process incoming data.
226 if (m_bufferIn.getNumElements() > 0)
227 {
228 DE_ASSERT(!m_msgBuilder.isComplete());
229 m_msgBuilder.read(m_bufferIn);
230 }
231
232 if (m_msgBuilder.isComplete())
233 {
234 // Process message.
235 processMessage(m_msgBuilder.getMessageType(), m_msgBuilder.getMessageData(), m_msgBuilder.getMessageDataSize());
236
237 m_msgBuilder.clear();
238 }
239
240 // Keepalives, anyone?
241 pollKeepAlives();
242
243 // Poll test driver for IO.
244 if (m_testDriver)
245 anyIO = getTestDriver()->poll(m_bufferOut) || anyIO;
246
247 // If no IO happens in a reasonable amount of time, go to sleep.
248 {
249 deUint64 curTime = deGetMicroseconds();
250 if (anyIO)
251 lastIoTime = curTime;
252 else if (curTime-lastIoTime > SERVER_IDLE_THRESHOLD*1000)
253 deSleep(SERVER_IDLE_SLEEP); // Too long since last IO, sleep for a while.
254 else
255 deYield(); // Just give other threads chance to run.
256 }
257 }
258}
259
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700260void ExecutionRequestHandler::processMessage (MessageType type, const deUint8* data, size_t dataSize)
Jarkko Poyry3c827362014-09-02 11:48:52 +0300261{
262 switch (type)
263 {
264 case MESSAGETYPE_HELLO:
265 {
266 HelloMessage msg(data, dataSize);
267 DBG_PRINT(("HelloMessage: version = %d\n", msg.version));
268 if (msg.version != PROTOCOL_VERSION)
269 throw ProtocolError("Unsupported protocol version");
270 break;
271 }
272
273 case MESSAGETYPE_TEST:
274 {
275 TestMessage msg(data, dataSize);
276 DBG_PRINT(("TestMessage: '%s'\n", msg.test.c_str()));
277 break;
278 }
279
280 case MESSAGETYPE_KEEPALIVE:
281 {
282 KeepAliveMessage msg(data, dataSize);
283 DBG_PRINT(("KeepAliveMessage\n"));
284 keepAliveReceived();
285 break;
286 }
287
288 case MESSAGETYPE_EXECUTE_BINARY:
289 {
290 ExecuteBinaryMessage msg(data, dataSize);
291 DBG_PRINT(("ExecuteBinaryMessage: '%s', '%s', '%s', '%s'\n", msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.substr(0, 10).c_str()));
292 getTestDriver()->startProcess(msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.c_str());
293 keepAliveReceived(); // \todo [2011-10-11 pyry] Remove this once Candy is fixed.
294 break;
295 }
296
297 case MESSAGETYPE_STOP_EXECUTION:
298 {
299 StopExecutionMessage msg(data, dataSize);
300 DBG_PRINT(("StopExecutionMessage\n"));
301 getTestDriver()->stopProcess();
302 break;
303 }
304
305 default:
306 throw ProtocolError("Unsupported message");
307 }
308}
309
310void ExecutionRequestHandler::initKeepAlives (void)
311{
312 deUint64 curTime = deGetMicroseconds();
313 m_lastKeepAliveSent = curTime;
314 m_lastKeepAliveReceived = curTime;
315}
316
317void ExecutionRequestHandler::keepAliveReceived (void)
318{
319 m_lastKeepAliveReceived = deGetMicroseconds();
320}
321
322void ExecutionRequestHandler::pollKeepAlives (void)
323{
324 deUint64 curTime = deGetMicroseconds();
325
326 // Check that we've got keepalives in timely fashion.
327 if (curTime - m_lastKeepAliveReceived > KEEPALIVE_TIMEOUT*1000)
328 throw ProtocolError("Keepalive timeout occurred");
329
330 // Send some?
331 if (curTime - m_lastKeepAliveSent > KEEPALIVE_SEND_INTERVAL*1000 &&
332 m_bufferOut.getNumFree() >= MESSAGE_HEADER_SIZE)
333 {
334 vector<deUint8> buf;
335 KeepAliveMessage().write(buf);
336 m_bufferOut.pushFront(&buf[0], (int)buf.size());
337
338 m_lastKeepAliveSent = deGetMicroseconds();
339 }
340}
341
342bool ExecutionRequestHandler::receive (void)
343{
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700344 size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferIn.getNumFree());
Jarkko Poyry3c827362014-09-02 11:48:52 +0300345
346 if (maxLen > 0)
347 {
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700348 size_t numRecv;
Jarkko Poyry3c827362014-09-02 11:48:52 +0300349 deSocketResult result = m_socket->receive(&m_sendRecvTmpBuf[0], maxLen, &numRecv);
350
351 if (result == DE_SOCKETRESULT_SUCCESS)
352 {
353 DE_ASSERT(numRecv > 0);
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700354 m_bufferIn.pushFront(&m_sendRecvTmpBuf[0], (int)numRecv);
Jarkko Poyry3c827362014-09-02 11:48:52 +0300355 return true;
356 }
357 else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
358 {
359 m_run = false;
360 return true;
361 }
362 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
363 return false;
364 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
365 throw ConnectionError("Connection terminated");
366 else
367 throw ConnectionError("receive() failed");
368 }
369 else
370 return false;
371}
372
373bool ExecutionRequestHandler::send (void)
374{
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700375 size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferOut.getNumElements());
Jarkko Poyry3c827362014-09-02 11:48:52 +0300376
377 if (maxLen > 0)
378 {
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700379 m_bufferOut.peekBack(&m_sendRecvTmpBuf[0], (int)maxLen);
Jarkko Poyry3c827362014-09-02 11:48:52 +0300380
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700381 size_t numSent;
Jarkko Poyry3c827362014-09-02 11:48:52 +0300382 deSocketResult result = m_socket->send(&m_sendRecvTmpBuf[0], maxLen, &numSent);
383
384 if (result == DE_SOCKETRESULT_SUCCESS)
385 {
386 DE_ASSERT(numSent > 0);
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700387 m_bufferOut.popBack((int)numSent);
Jarkko Poyry3c827362014-09-02 11:48:52 +0300388 return true;
389 }
390 else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
391 {
392 m_run = false;
393 return true;
394 }
395 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
396 return false;
397 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
398 throw ConnectionError("Connection terminated");
399 else
400 throw ConnectionError("send() failed");
401 }
402 else
403 return false;
404}
405
406} // xs