/*-------------------------------------------------------------------------
 * drawElements Quality Program Execution Server
 * ---------------------------------------------
 *
 * Copyright 2014 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 *//*!
 * \file
 * \brief Test Execution Server.
 *//*--------------------------------------------------------------------*/
| 23 | |
| 24 | #include "xsExecutionServer.hpp" |
| 25 | #include "deClock.h" |
| 26 | |
| 27 | #include <cstdio> |
| 28 | |
| 29 | using std::vector; |
| 30 | using std::string; |
| 31 | |
// Debug tracing helper. The argument is a complete, parenthesized printf
// argument list, e.g. DBG_PRINT(("value = %d\n", v)); the macro simply
// prepends the function name. Tracing is currently compiled in (#if 1);
// switch the condition to 0 to make every DBG_PRINT expand to nothing.
#if 1
#	define DBG_PRINT(X) std::printf X
#else
#	define DBG_PRINT(X)
#endif
| 37 | |
| 38 | namespace xs |
| 39 | { |
| 40 | |
| 41 | inline bool MessageBuilder::isComplete (void) const |
| 42 | { |
| 43 | if (m_buffer.size() < MESSAGE_HEADER_SIZE) |
| 44 | return false; |
| 45 | else |
| 46 | return (int)m_buffer.size() == getMessageSize(); |
| 47 | } |
| 48 | |
| 49 | const deUint8* MessageBuilder::getMessageData (void) const |
| 50 | { |
| 51 | return m_buffer.size() > MESSAGE_HEADER_SIZE ? &m_buffer[MESSAGE_HEADER_SIZE] : DE_NULL; |
| 52 | } |
| 53 | |
| 54 | int MessageBuilder::getMessageDataSize (void) const |
| 55 | { |
| 56 | DE_ASSERT(isComplete()); |
| 57 | return (int)m_buffer.size() - MESSAGE_HEADER_SIZE; |
| 58 | } |
| 59 | |
| 60 | void MessageBuilder::read (ByteBuffer& src) |
| 61 | { |
| 62 | // Try to get header. |
| 63 | if (m_buffer.size() < MESSAGE_HEADER_SIZE) |
| 64 | { |
| 65 | while (m_buffer.size() < MESSAGE_HEADER_SIZE && |
| 66 | src.getNumElements() > 0) |
| 67 | m_buffer.push_back(src.popBack()); |
| 68 | |
| 69 | DE_ASSERT(m_buffer.size() <= MESSAGE_HEADER_SIZE); |
| 70 | |
| 71 | if (m_buffer.size() == MESSAGE_HEADER_SIZE) |
| 72 | { |
| 73 | // Got whole header, parse it. |
| 74 | Message::parseHeader(&m_buffer[0], (int)m_buffer.size(), m_messageType, m_messageSize); |
| 75 | } |
| 76 | } |
| 77 | |
| 78 | if (m_buffer.size() >= MESSAGE_HEADER_SIZE) |
| 79 | { |
| 80 | // We have header. |
| 81 | int msgSize = getMessageSize(); |
| 82 | int numBytesLeft = msgSize - (int)m_buffer.size(); |
| 83 | int numToRead = de::min(src.getNumElements(), numBytesLeft); |
| 84 | |
| 85 | if (numToRead > 0) |
| 86 | { |
| 87 | int curBufPos = (int)m_buffer.size(); |
| 88 | m_buffer.resize(curBufPos+numToRead); |
| 89 | src.popBack(&m_buffer[curBufPos], numToRead); |
| 90 | } |
| 91 | } |
| 92 | } |
| 93 | |
| 94 | void MessageBuilder::clear (void) |
| 95 | { |
| 96 | m_buffer.clear(); |
| 97 | m_messageType = MESSAGETYPE_NONE; |
| 98 | m_messageSize = 0; |
| 99 | } |
| 100 | |
| 101 | ExecutionServer::ExecutionServer (xs::TestProcess* testProcess, deSocketFamily family, int port, RunMode runMode) |
| 102 | : TcpServer (family, port) |
Jarkko Pöyry | 3fdee35 | 2015-02-13 19:31:58 -0800 | [diff] [blame] | 103 | , m_testDriver (testProcess) |
Jarkko Poyry | 3c82736 | 2014-09-02 11:48:52 +0300 | [diff] [blame] | 104 | , m_runMode (runMode) |
| 105 | { |
| 106 | } |
| 107 | |
| 108 | ExecutionServer::~ExecutionServer (void) |
| 109 | { |
| 110 | } |
| 111 | |
| 112 | TestDriver* ExecutionServer::acquireTestDriver (void) |
| 113 | { |
| 114 | if (!m_testDriverLock.tryLock()) |
| 115 | throw Error("Failed to acquire test driver"); |
| 116 | |
| 117 | return &m_testDriver; |
| 118 | } |
| 119 | |
| 120 | void ExecutionServer::releaseTestDriver (TestDriver* driver) |
| 121 | { |
| 122 | DE_ASSERT(&m_testDriver == driver); |
| 123 | DE_UNREF(driver); |
| 124 | m_testDriverLock.unlock(); |
| 125 | } |
| 126 | |
| 127 | ConnectionHandler* ExecutionServer::createHandler (de::Socket* socket, const de::SocketAddress& clientAddress) |
| 128 | { |
| 129 | printf("ExecutionServer: New connection from %s:%d\n", clientAddress.getHost(), clientAddress.getPort()); |
| 130 | return new ExecutionRequestHandler(this, socket); |
| 131 | } |
| 132 | |
| 133 | void ExecutionServer::connectionDone (ConnectionHandler* handler) |
| 134 | { |
| 135 | if (m_runMode == RUNMODE_SINGLE_EXEC) |
| 136 | m_socket.close(); |
| 137 | |
| 138 | TcpServer::connectionDone(handler); |
| 139 | } |
| 140 | |
| 141 | ExecutionRequestHandler::ExecutionRequestHandler (ExecutionServer* server, de::Socket* socket) |
| 142 | : ConnectionHandler (server, socket) |
| 143 | , m_execServer (server) |
| 144 | , m_testDriver (DE_NULL) |
| 145 | , m_bufferIn (RECV_BUFFER_SIZE) |
| 146 | , m_bufferOut (SEND_BUFFER_SIZE) |
| 147 | , m_run (false) |
| 148 | , m_sendRecvTmpBuf (SEND_RECV_TMP_BUFFER_SIZE) |
| 149 | { |
| 150 | // Set flags. |
| 151 | m_socket->setFlags(DE_SOCKET_NONBLOCKING|DE_SOCKET_KEEPALIVE|DE_SOCKET_CLOSE_ON_EXEC); |
| 152 | |
| 153 | // Init protocol keepalives. |
| 154 | initKeepAlives(); |
| 155 | } |
| 156 | |
| 157 | ExecutionRequestHandler::~ExecutionRequestHandler (void) |
| 158 | { |
| 159 | if (m_testDriver) |
| 160 | m_execServer->releaseTestDriver(m_testDriver); |
| 161 | } |
| 162 | |
| 163 | void ExecutionRequestHandler::handle (void) |
| 164 | { |
| 165 | DBG_PRINT(("ExecutionRequestHandler::handle()\n")); |
| 166 | |
| 167 | try |
| 168 | { |
| 169 | // Process execution session. |
| 170 | processSession(); |
| 171 | } |
| 172 | catch (const std::exception& e) |
| 173 | { |
| 174 | printf("ExecutionRequestHandler::run(): %s\n", e.what()); |
| 175 | } |
| 176 | |
| 177 | DBG_PRINT(("ExecutionRequestHandler::handle(): Done!\n")); |
| 178 | |
| 179 | // Release test driver. |
| 180 | if (m_testDriver) |
| 181 | { |
| 182 | try |
| 183 | { |
| 184 | m_testDriver->reset(); |
| 185 | } |
| 186 | catch (...) |
| 187 | { |
| 188 | } |
| 189 | m_execServer->releaseTestDriver(m_testDriver); |
| 190 | m_testDriver = DE_NULL; |
| 191 | } |
| 192 | |
| 193 | // Close connection. |
| 194 | if (m_socket->isConnected()) |
| 195 | m_socket->shutdown(); |
| 196 | } |
| 197 | |
| 198 | void ExecutionRequestHandler::acquireTestDriver (void) |
| 199 | { |
| 200 | DE_ASSERT(!m_testDriver); |
| 201 | |
| 202 | // Try to acquire test driver - may fail. |
| 203 | m_testDriver = m_execServer->acquireTestDriver(); |
| 204 | DE_ASSERT(m_testDriver); |
| 205 | m_testDriver->reset(); |
| 206 | |
| 207 | } |
| 208 | |
| 209 | void ExecutionRequestHandler::processSession (void) |
| 210 | { |
| 211 | m_run = true; |
| 212 | |
| 213 | deUint64 lastIoTime = deGetMicroseconds(); |
| 214 | |
| 215 | while (m_run) |
| 216 | { |
| 217 | bool anyIO = false; |
| 218 | |
| 219 | // Read from socket to buffer. |
| 220 | anyIO = receive() || anyIO; |
| 221 | |
| 222 | // Send bytes in buffer. |
| 223 | anyIO = send() || anyIO; |
| 224 | |
| 225 | // Process incoming data. |
| 226 | if (m_bufferIn.getNumElements() > 0) |
| 227 | { |
| 228 | DE_ASSERT(!m_msgBuilder.isComplete()); |
| 229 | m_msgBuilder.read(m_bufferIn); |
| 230 | } |
| 231 | |
| 232 | if (m_msgBuilder.isComplete()) |
| 233 | { |
| 234 | // Process message. |
| 235 | processMessage(m_msgBuilder.getMessageType(), m_msgBuilder.getMessageData(), m_msgBuilder.getMessageDataSize()); |
| 236 | |
| 237 | m_msgBuilder.clear(); |
| 238 | } |
| 239 | |
| 240 | // Keepalives, anyone? |
| 241 | pollKeepAlives(); |
| 242 | |
| 243 | // Poll test driver for IO. |
| 244 | if (m_testDriver) |
| 245 | anyIO = getTestDriver()->poll(m_bufferOut) || anyIO; |
| 246 | |
| 247 | // If no IO happens in a reasonable amount of time, go to sleep. |
| 248 | { |
| 249 | deUint64 curTime = deGetMicroseconds(); |
| 250 | if (anyIO) |
| 251 | lastIoTime = curTime; |
| 252 | else if (curTime-lastIoTime > SERVER_IDLE_THRESHOLD*1000) |
| 253 | deSleep(SERVER_IDLE_SLEEP); // Too long since last IO, sleep for a while. |
| 254 | else |
| 255 | deYield(); // Just give other threads chance to run. |
| 256 | } |
| 257 | } |
| 258 | } |
| 259 | |
| 260 | void ExecutionRequestHandler::processMessage (MessageType type, const deUint8* data, int dataSize) |
| 261 | { |
| 262 | switch (type) |
| 263 | { |
| 264 | case MESSAGETYPE_HELLO: |
| 265 | { |
| 266 | HelloMessage msg(data, dataSize); |
| 267 | DBG_PRINT(("HelloMessage: version = %d\n", msg.version)); |
| 268 | if (msg.version != PROTOCOL_VERSION) |
| 269 | throw ProtocolError("Unsupported protocol version"); |
| 270 | break; |
| 271 | } |
| 272 | |
| 273 | case MESSAGETYPE_TEST: |
| 274 | { |
| 275 | TestMessage msg(data, dataSize); |
| 276 | DBG_PRINT(("TestMessage: '%s'\n", msg.test.c_str())); |
| 277 | break; |
| 278 | } |
| 279 | |
| 280 | case MESSAGETYPE_KEEPALIVE: |
| 281 | { |
| 282 | KeepAliveMessage msg(data, dataSize); |
| 283 | DBG_PRINT(("KeepAliveMessage\n")); |
| 284 | keepAliveReceived(); |
| 285 | break; |
| 286 | } |
| 287 | |
| 288 | case MESSAGETYPE_EXECUTE_BINARY: |
| 289 | { |
| 290 | ExecuteBinaryMessage msg(data, dataSize); |
| 291 | DBG_PRINT(("ExecuteBinaryMessage: '%s', '%s', '%s', '%s'\n", msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.substr(0, 10).c_str())); |
| 292 | getTestDriver()->startProcess(msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.c_str()); |
| 293 | keepAliveReceived(); // \todo [2011-10-11 pyry] Remove this once Candy is fixed. |
| 294 | break; |
| 295 | } |
| 296 | |
| 297 | case MESSAGETYPE_STOP_EXECUTION: |
| 298 | { |
| 299 | StopExecutionMessage msg(data, dataSize); |
| 300 | DBG_PRINT(("StopExecutionMessage\n")); |
| 301 | getTestDriver()->stopProcess(); |
| 302 | break; |
| 303 | } |
| 304 | |
| 305 | default: |
| 306 | throw ProtocolError("Unsupported message"); |
| 307 | } |
| 308 | } |
| 309 | |
| 310 | void ExecutionRequestHandler::initKeepAlives (void) |
| 311 | { |
| 312 | deUint64 curTime = deGetMicroseconds(); |
| 313 | m_lastKeepAliveSent = curTime; |
| 314 | m_lastKeepAliveReceived = curTime; |
| 315 | } |
| 316 | |
| 317 | void ExecutionRequestHandler::keepAliveReceived (void) |
| 318 | { |
| 319 | m_lastKeepAliveReceived = deGetMicroseconds(); |
| 320 | } |
| 321 | |
| 322 | void ExecutionRequestHandler::pollKeepAlives (void) |
| 323 | { |
| 324 | deUint64 curTime = deGetMicroseconds(); |
| 325 | |
| 326 | // Check that we've got keepalives in timely fashion. |
| 327 | if (curTime - m_lastKeepAliveReceived > KEEPALIVE_TIMEOUT*1000) |
| 328 | throw ProtocolError("Keepalive timeout occurred"); |
| 329 | |
| 330 | // Send some? |
| 331 | if (curTime - m_lastKeepAliveSent > KEEPALIVE_SEND_INTERVAL*1000 && |
| 332 | m_bufferOut.getNumFree() >= MESSAGE_HEADER_SIZE) |
| 333 | { |
| 334 | vector<deUint8> buf; |
| 335 | KeepAliveMessage().write(buf); |
| 336 | m_bufferOut.pushFront(&buf[0], (int)buf.size()); |
| 337 | |
| 338 | m_lastKeepAliveSent = deGetMicroseconds(); |
| 339 | } |
| 340 | } |
| 341 | |
| 342 | bool ExecutionRequestHandler::receive (void) |
| 343 | { |
| 344 | int maxLen = de::min<int>((int)m_sendRecvTmpBuf.size(), m_bufferIn.getNumFree()); |
| 345 | |
| 346 | if (maxLen > 0) |
| 347 | { |
| 348 | int numRecv; |
| 349 | deSocketResult result = m_socket->receive(&m_sendRecvTmpBuf[0], maxLen, &numRecv); |
| 350 | |
| 351 | if (result == DE_SOCKETRESULT_SUCCESS) |
| 352 | { |
| 353 | DE_ASSERT(numRecv > 0); |
| 354 | m_bufferIn.pushFront(&m_sendRecvTmpBuf[0], numRecv); |
| 355 | return true; |
| 356 | } |
| 357 | else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED) |
| 358 | { |
| 359 | m_run = false; |
| 360 | return true; |
| 361 | } |
| 362 | else if (result == DE_SOCKETRESULT_WOULD_BLOCK) |
| 363 | return false; |
| 364 | else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED) |
| 365 | throw ConnectionError("Connection terminated"); |
| 366 | else |
| 367 | throw ConnectionError("receive() failed"); |
| 368 | } |
| 369 | else |
| 370 | return false; |
| 371 | } |
| 372 | |
| 373 | bool ExecutionRequestHandler::send (void) |
| 374 | { |
| 375 | int maxLen = de::min<int>((int)m_sendRecvTmpBuf.size(), m_bufferOut.getNumElements()); |
| 376 | |
| 377 | if (maxLen > 0) |
| 378 | { |
| 379 | m_bufferOut.peekBack(&m_sendRecvTmpBuf[0], maxLen); |
| 380 | |
| 381 | int numSent; |
| 382 | deSocketResult result = m_socket->send(&m_sendRecvTmpBuf[0], maxLen, &numSent); |
| 383 | |
| 384 | if (result == DE_SOCKETRESULT_SUCCESS) |
| 385 | { |
| 386 | DE_ASSERT(numSent > 0); |
| 387 | m_bufferOut.popBack(numSent); |
| 388 | return true; |
| 389 | } |
| 390 | else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED) |
| 391 | { |
| 392 | m_run = false; |
| 393 | return true; |
| 394 | } |
| 395 | else if (result == DE_SOCKETRESULT_WOULD_BLOCK) |
| 396 | return false; |
| 397 | else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED) |
| 398 | throw ConnectionError("Connection terminated"); |
| 399 | else |
| 400 | throw ConnectionError("send() failed"); |
| 401 | } |
| 402 | else |
| 403 | return false; |
| 404 | } |
| 405 | |
| 406 | } // xs |