blob: 1778a4fcb382b4fb615a796638e37e1166cdf65a [file] [log] [blame]
Jarkko Poyry3c827362014-09-02 11:48:52 +03001/*-------------------------------------------------------------------------
2 * drawElements Quality Program Test Executor
3 * ------------------------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Tcp/Ip communication link.
22 *//*--------------------------------------------------------------------*/
23
24#include "xeTcpIpLink.hpp"
25#include "xsProtocol.hpp"
26#include "deClock.h"
27#include "deInt32.h"
28
29namespace xe
30{
31
// Geometry of the outgoing byte queue owned by TcpIpSendThread.
enum
{
	//! First constructor argument of the send buffer; also the size of the staging array used in TcpIpSendThread::run().
	SEND_BUFFER_BLOCK_SIZE	= 1024,
	//! Second constructor argument of the send buffer.
	SEND_BUFFER_NUM_BLOCKS	= 64
};
37
38// Utilities for writing messages out.
39
40static void writeMessageHeader (de::BlockBuffer<deUint8>& dst, xs::MessageType type, int messageSize)
41{
42 deUint8 hdr[xs::MESSAGE_HEADER_SIZE];
43 xs::Message::writeHeader(type, messageSize, &hdr[0], xs::MESSAGE_HEADER_SIZE);
44 dst.write(xs::MESSAGE_HEADER_SIZE, &hdr[0]);
45}
46
47static void writeKeepalive (de::BlockBuffer<deUint8>& dst)
48{
49 writeMessageHeader(dst, xs::MESSAGETYPE_KEEPALIVE, xs::MESSAGE_HEADER_SIZE);
50 dst.flush();
51}
52
53static void writeExecuteBinary (de::BlockBuffer<deUint8>& dst, const char* name, const char* params, const char* workDir, const char* caseList)
54{
55 int nameSize = (int)strlen(name) + 1;
56 int paramsSize = (int)strlen(params) + 1;
57 int workDirSize = (int)strlen(workDir) + 1;
58 int caseListSize = (int)strlen(caseList) + 1;
59 int totalSize = xs::MESSAGE_HEADER_SIZE + nameSize + paramsSize + workDirSize + caseListSize;
60
61 writeMessageHeader(dst, xs::MESSAGETYPE_EXECUTE_BINARY, totalSize);
62 dst.write(nameSize, (const deUint8*)name);
63 dst.write(paramsSize, (const deUint8*)params);
64 dst.write(workDirSize, (const deUint8*)workDir);
65 dst.write(caseListSize, (const deUint8*)caseList);
66 dst.flush();
67}
68
69static void writeStopExecution (de::BlockBuffer<deUint8>& dst)
70{
71 writeMessageHeader(dst, xs::MESSAGETYPE_STOP_EXECUTION, xs::MESSAGE_HEADER_SIZE);
72 dst.flush();
73}
74
75// TcpIpLinkState
76
77TcpIpLinkState::TcpIpLinkState (CommLinkState initialState, const char* initialErr)
78 : m_state (initialState)
79 , m_error (initialErr)
80 , m_lastKeepaliveReceived (0)
81 , m_stateChangedCallback (DE_NULL)
82 , m_testLogDataCallback (DE_NULL)
83 , m_infoLogDataCallback (DE_NULL)
84 , m_userPtr (DE_NULL)
85{
86}
87
88TcpIpLinkState::~TcpIpLinkState (void)
89{
90}
91
92CommLinkState TcpIpLinkState::getState (void) const
93{
94 de::ScopedLock lock(m_lock);
95
96 return m_state;
97}
98
99CommLinkState TcpIpLinkState::getState (std::string& error) const
100{
101 de::ScopedLock lock(m_lock);
102
103 error = m_error;
104 return m_state;
105}
106
107void TcpIpLinkState::setCallbacks (CommLink::StateChangedFunc stateChangedCallback, CommLink::LogDataFunc testLogDataCallback, CommLink::LogDataFunc infoLogDataCallback, void* userPtr)
108{
109 de::ScopedLock lock(m_lock);
110
111 m_stateChangedCallback = stateChangedCallback;
112 m_testLogDataCallback = testLogDataCallback;
113 m_infoLogDataCallback = infoLogDataCallback;
114 m_userPtr = userPtr;
115}
116
117void TcpIpLinkState::setState (CommLinkState state, const char* error)
118{
119 CommLink::StateChangedFunc callback = DE_NULL;
120 void* userPtr = DE_NULL;
121
122 {
123 de::ScopedLock lock(m_lock);
124
125 m_state = state;
126 m_error = error;
127
128 callback = m_stateChangedCallback;
129 userPtr = m_userPtr;
130 }
131
132 if (callback)
133 callback(userPtr, state, error);
134}
135
136void TcpIpLinkState::onTestLogData (const deUint8* bytes, int numBytes) const
137{
138 CommLink::LogDataFunc callback = DE_NULL;
139 void* userPtr = DE_NULL;
140
141 m_lock.lock();
142 callback = m_testLogDataCallback;
143 userPtr = m_userPtr;
144 m_lock.unlock();
145
146 if (callback)
147 callback(userPtr, bytes, numBytes);
148}
149
150void TcpIpLinkState::onInfoLogData (const deUint8* bytes, int numBytes) const
151{
152 CommLink::LogDataFunc callback = DE_NULL;
153 void* userPtr = DE_NULL;
154
155 m_lock.lock();
156 callback = m_infoLogDataCallback;
157 userPtr = m_userPtr;
158 m_lock.unlock();
159
160 if (callback)
161 callback(userPtr, bytes, numBytes);
162}
163
164void TcpIpLinkState::onKeepaliveReceived (void)
165{
166 de::ScopedLock lock(m_lock);
167 m_lastKeepaliveReceived = deGetMicroseconds();
168}
169
170deUint64 TcpIpLinkState::getLastKeepaliveRecevied (void) const
171{
172 de::ScopedLock lock(m_lock);
173 return m_lastKeepaliveReceived;
174}
175
176// TcpIpSendThread
177
178TcpIpSendThread::TcpIpSendThread (de::Socket& socket, TcpIpLinkState& state)
179 : m_socket (socket)
180 , m_state (state)
181 , m_buffer (SEND_BUFFER_BLOCK_SIZE, SEND_BUFFER_NUM_BLOCKS)
182 , m_isRunning (false)
183{
184}
185
186TcpIpSendThread::~TcpIpSendThread (void)
187{
188}
189
190void TcpIpSendThread::start (void)
191{
192 DE_ASSERT(!m_isRunning);
193
194 // Reset state.
195 m_buffer.clear();
196 m_isRunning = true;
197
198 de::Thread::start();
199}
200
201void TcpIpSendThread::run (void)
202{
203 try
204 {
205 deUint8 buf[SEND_BUFFER_BLOCK_SIZE];
206
207 while (!m_buffer.isCanceled())
208 {
209 int numToSend = 0;
210 int numSent = 0;
211 deSocketResult result = DE_SOCKETRESULT_LAST;
212
213 try
214 {
215 // Wait for single byte and then try to read more.
216 m_buffer.read(1, &buf[0]);
217 numToSend = 1 + m_buffer.tryRead(DE_LENGTH_OF_ARRAY(buf)-1, &buf[1]);
218 }
219 catch (const de::BlockBuffer<deUint8>::CanceledException&)
220 {
221 // Handled in loop condition.
222 }
223
224 while (numSent < numToSend)
225 {
226 result = m_socket.send(&buf[numSent], numToSend-numSent, &numSent);
227
228 if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
229 XE_FAIL("Connection closed");
230 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
231 XE_FAIL("Connection terminated");
232 else if (result == DE_SOCKETRESULT_ERROR)
233 XE_FAIL("Socket error");
234 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
235 {
236 // \note Socket should not be in non-blocking mode.
237 DE_ASSERT(numSent <= 0);
238 deYield();
239 }
240 else
241 DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
242 }
243 }
244 }
245 catch (const std::exception& e)
246 {
247 m_state.setState(COMMLINKSTATE_ERROR, e.what());
248 }
249}
250
251void TcpIpSendThread::stop (void)
252{
253 if (m_isRunning)
254 {
255 m_buffer.cancel();
256 join();
257 m_isRunning = false;
258 }
259}
260
261// TcpIpRecvThread
262
263TcpIpRecvThread::TcpIpRecvThread (de::Socket& socket, TcpIpLinkState& state)
264 : m_socket (socket)
265 , m_state (state)
266 , m_curMsgPos (0)
267 , m_isRunning (false)
268{
269}
270
271TcpIpRecvThread::~TcpIpRecvThread (void)
272{
273}
274
275void TcpIpRecvThread::start (void)
276{
277 DE_ASSERT(!m_isRunning);
278
279 // Reset state.
280 m_curMsgPos = 0;
281 m_isRunning = true;
282
283 de::Thread::start();
284}
285
286void TcpIpRecvThread::run (void)
287{
288 try
289 {
290 for (;;)
291 {
292 bool hasHeader = m_curMsgPos >= xs::MESSAGE_HEADER_SIZE;
293 bool hasPayload = false;
294 int messageSize = 0;
295 xs::MessageType messageType = (xs::MessageType)0;
296
297 if (hasHeader)
298 {
299 xs::Message::parseHeader(&m_curMsgBuf[0], xs::MESSAGE_HEADER_SIZE, messageType, messageSize);
300 hasPayload = m_curMsgPos >= messageSize;
301 }
302
303 if (hasPayload)
304 {
305 // Process message.
306 handleMessage(messageType, m_curMsgPos > xs::MESSAGE_HEADER_SIZE ? &m_curMsgBuf[xs::MESSAGE_HEADER_SIZE] : DE_NULL, messageSize-xs::MESSAGE_HEADER_SIZE);
307 m_curMsgPos = 0;
308 }
309 else
310 {
311 // Try to receive missing bytes.
312 int curSize = hasHeader ? messageSize : xs::MESSAGE_HEADER_SIZE;
313 int bytesToRecv = curSize-m_curMsgPos;
314 int numRecv = 0;
315 deSocketResult result = DE_SOCKETRESULT_LAST;
316
317 if ((int)m_curMsgBuf.size() < curSize)
318 m_curMsgBuf.resize(curSize);
319
320 result = m_socket.receive(&m_curMsgBuf[m_curMsgPos], bytesToRecv, &numRecv);
321
322 if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
323 XE_FAIL("Connection closed");
324 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
325 XE_FAIL("Connection terminated");
326 else if (result == DE_SOCKETRESULT_ERROR)
327 XE_FAIL("Socket error");
328 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
329 {
330 // \note Socket should not be in non-blocking mode.
331 DE_ASSERT(numRecv <= 0);
332 deYield();
333 }
334 else
335 {
336 DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
337 DE_ASSERT(numRecv <= bytesToRecv);
338 m_curMsgPos += numRecv;
339 // Continue receiving bytes / handle message in next iter.
340 }
341 }
342 }
343 }
344 catch (const std::exception& e)
345 {
346 m_state.setState(COMMLINKSTATE_ERROR, e.what());
347 }
348}
349
350void TcpIpRecvThread::stop (void)
351{
352 if (m_isRunning)
353 {
354 // \note Socket must be closed before terminating receive thread.
355 XE_CHECK(!m_socket.isReceiveOpen());
356
357 join();
358 m_isRunning = false;
359 }
360}
361
362void TcpIpRecvThread::handleMessage (xs::MessageType messageType, const deUint8* data, int dataSize)
363{
364 switch (messageType)
365 {
366 case xs::MESSAGETYPE_KEEPALIVE:
367 m_state.onKeepaliveReceived();
368 break;
369
370 case xs::MESSAGETYPE_PROCESS_STARTED:
371 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_STARTED message");
372 m_state.setState(COMMLINKSTATE_TEST_PROCESS_RUNNING);
373 break;
374
375 case xs::MESSAGETYPE_PROCESS_LAUNCH_FAILED:
376 {
377 xs::ProcessLaunchFailedMessage msg(data, dataSize);
378 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_LAUNCH_FAILED message");
379 m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCH_FAILED, msg.reason.c_str());
380 break;
381 }
382
383 case xs::MESSAGETYPE_PROCESS_FINISHED:
384 {
385 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_FINISHED message");
386 xs::ProcessFinishedMessage msg(data, dataSize);
387 m_state.setState(COMMLINKSTATE_TEST_PROCESS_FINISHED);
388 DE_UNREF(msg); // \todo [2012-06-19 pyry] Report exit code.
389 break;
390 }
391
392 case xs::MESSAGETYPE_PROCESS_LOG_DATA:
393 case xs::MESSAGETYPE_INFO:
394 // Ignore leading \0 if such is present. \todo [2012-06-19 pyry] Improve protocol.
395 if (data[dataSize-1] == 0)
396 dataSize -= 1;
397
398 if (messageType == xs::MESSAGETYPE_PROCESS_LOG_DATA)
399 {
400 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_LOG_DATA message");
401 m_state.onTestLogData(&data[0], dataSize);
402 }
403 else
404 m_state.onInfoLogData(&data[0], dataSize);
405 break;
406
407 default:
408 XE_FAIL("Unknown message");
409 }
410}
411
412// TcpIpLink
413
414TcpIpLink::TcpIpLink (void)
415 : m_state (COMMLINKSTATE_ERROR, "Not connected")
416 , m_sendThread (m_socket, m_state)
417 , m_recvThread (m_socket, m_state)
418 , m_keepaliveTimer (DE_NULL)
419{
420 m_keepaliveTimer = deTimer_create(keepaliveTimerCallback, this);
421 XE_CHECK(m_keepaliveTimer);
422}
423
424TcpIpLink::~TcpIpLink (void)
425{
426 try
427 {
428 closeConnection();
429 }
430 catch (...)
431 {
432 // Can't do much except to ignore error.
433 }
434 deTimer_destroy(m_keepaliveTimer);
435}
436
437void TcpIpLink::closeConnection (void)
438{
439 {
440 deSocketState state = m_socket.getState();
441 if (state != DE_SOCKETSTATE_DISCONNECTED && state != DE_SOCKETSTATE_CLOSED)
442 m_socket.shutdown();
443 }
444
445 if (deTimer_isActive(m_keepaliveTimer))
446 deTimer_disable(m_keepaliveTimer);
447
448 if (m_sendThread.isRunning())
449 m_sendThread.stop();
450
451 if (m_recvThread.isRunning())
452 m_recvThread.stop();
453
454 if (m_socket.getState() != DE_SOCKETSTATE_CLOSED)
455 m_socket.close();
456}
457
458void TcpIpLink::connect (const de::SocketAddress& address)
459{
460 XE_CHECK(m_socket.getState() == DE_SOCKETSTATE_CLOSED);
461 XE_CHECK(m_state.getState() == COMMLINKSTATE_ERROR);
462 XE_CHECK(!m_sendThread.isRunning());
463 XE_CHECK(!m_recvThread.isRunning());
464
465 m_socket.connect(address);
466
467 try
468 {
469 // Clear error and set state to ready.
470 m_state.setState(COMMLINKSTATE_READY, "");
471 m_state.onKeepaliveReceived();
472
473 // Launch threads.
474 m_sendThread.start();
475 m_recvThread.start();
476
477 XE_CHECK(deTimer_scheduleInterval(m_keepaliveTimer, xs::KEEPALIVE_SEND_INTERVAL));
478 }
479 catch (const std::exception& e)
480 {
481 closeConnection();
482 m_state.setState(COMMLINKSTATE_ERROR, e.what());
483 }
484}
485
486void TcpIpLink::disconnect (void)
487{
488 try
489 {
490 closeConnection();
491 m_state.setState(COMMLINKSTATE_ERROR, "Not connected");
492 }
493 catch (const std::exception& e)
494 {
495 m_state.setState(COMMLINKSTATE_ERROR, e.what());
496 }
497}
498
499void TcpIpLink::reset (void)
500{
501 // \note Just clears error state if we are connected.
502 if (m_socket.getState() == DE_SOCKETSTATE_CONNECTED)
503 {
504 m_state.setState(COMMLINKSTATE_READY, "");
505
506 // \todo [2012-07-10 pyry] Do we need to reset send/receive buffers?
507 }
508 else
509 disconnect(); // Abnormal state/usage. Disconnect socket.
510}
511
512void TcpIpLink::keepaliveTimerCallback (void* ptr)
513{
514 TcpIpLink* link = static_cast<TcpIpLink*>(ptr);
515 deUint64 lastKeepalive = link->m_state.getLastKeepaliveRecevied();
516 deUint64 curTime = deGetMicroseconds();
517
518 // Check for timeout.
519 if ((deInt64)curTime-(deInt64)lastKeepalive > xs::KEEPALIVE_TIMEOUT*1000)
520 link->m_state.setState(COMMLINKSTATE_ERROR, "Keepalive timeout");
521
522 // Enqueue new keepalive.
523 try
524 {
525 writeKeepalive(link->m_sendThread.getBuffer());
526 }
527 catch (const de::BlockBuffer<deUint8>::CanceledException&)
528 {
529 // Ignore. Can happen in connection teardown.
530 }
531}
532
533CommLinkState TcpIpLink::getState (void) const
534{
535 return m_state.getState();
536}
537
538CommLinkState TcpIpLink::getState (std::string& message) const
539{
540 return m_state.getState(message);
541}
542
543void TcpIpLink::setCallbacks (StateChangedFunc stateChangedCallback, LogDataFunc testLogDataCallback, LogDataFunc infoLogDataCallback, void* userPtr)
544{
545 m_state.setCallbacks(stateChangedCallback, testLogDataCallback, infoLogDataCallback, userPtr);
546}
547
548void TcpIpLink::startTestProcess (const char* name, const char* params, const char* workingDir, const char* caseList)
549{
550 XE_CHECK(m_state.getState() == COMMLINKSTATE_READY);
551
552 m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCHING);
553 writeExecuteBinary(m_sendThread.getBuffer(), name, params, workingDir, caseList);
554}
555
556void TcpIpLink::stopTestProcess (void)
557{
558 XE_CHECK(m_state.getState() != COMMLINKSTATE_ERROR);
559 writeStopExecution(m_sendThread.getBuffer());
560}
561
562} // xe