blob: a43a6cfdedfc1b282165ba53eb1991006333e339 [file] [log] [blame]
Ben Murdochb8a8cc12014-11-26 15:28:44 +00001# Copyright 2012 the V8 project authors. All rights reserved.
2# Redistribution and use in source and binary forms, with or without
3# modification, are permitted provided that the following conditions are
4# met:
5#
6# * Redistributions of source code must retain the above copyright
7# notice, this list of conditions and the following disclaimer.
8# * Redistributions in binary form must reproduce the above
9# copyright notice, this list of conditions and the following
10# disclaimer in the documentation and/or other materials provided
11# with the distribution.
12# * Neither the name of Google Inc. nor the names of its
13# contributors may be used to endorse or promote products derived
14# from this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27
28
29import os
30import socket
31import subprocess
32import threading
33import time
34
35from . import distro
36from ..local import execution
37from ..local import perfdata
38from ..objects import peer
39from ..objects import workpacket
40from ..server import compression
41from ..server import constants
42from ..server import local_handler
43from ..server import signatures
44
45
def GetPeers():
  """Returns the peers currently known to the local server.

  Sends a REQUEST_PEERS query through local_handler.LocalQuery and unpacks
  every entry of the reply with peer.Peer.Unpack. An empty or missing reply
  yields an empty list.
  """
  packed_peers = local_handler.LocalQuery([constants.REQUEST_PEERS])
  known_peers = []
  if packed_peers:
    for packed in packed_peers:
      known_peers.append(peer.Peer.Unpack(packed))
  return known_peers
50
51
class NetworkedRunner(execution.Runner):
  """Runner that hands test suites to network peers.

  Work is assigned to peers with distro.Assign; one thread per peer sends a
  work packet and merges the results that come back. Tests for which a peer
  delivered no result are collected in self.tests and executed locally at the
  end of Run(). A second connection to the local server is used to fetch the
  public key fingerprint and to report durations, unresponsive peers and
  updated peer performance.

  NOTE(review): self.lock, self.terminate, self.indicator, self.failed,
  self.crashed, self.succeeded and self.remaining are not assigned in this
  class; presumably _CommonInit() of the superclass provides them -- confirm.
  """

  def __init__(self, suites, progress_indicator, context, peers, workspace):
    """Prepares perf data, git state and the local server connection.

    Args:
      suites: Test suites; each exposes .tests and .shell().
      progress_indicator: Forwarded to _CommonInit().
      context: Run context; arch, mode, timeout and shell_dir are read.
      peers: Peers to distribute work to.
      workspace: Path of the git checkout used to compute base revision and
          patch.

    Raises:
      RuntimeError: If the local server cannot be reached or does not deliver
          a public key fingerprint.
      subprocess.CalledProcessError: If one of the git commands fails.
    """
    self.suites = suites
    num_tests = 0
    datapath = os.path.join("out", "testrunner_data")
    # TODO(machenbach): These fields should exist now in the superclass.
    # But there is no super constructor call. Check if this is a problem.
    self.perf_data_manager = perfdata.PerfDataManager(datapath)
    self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode)
    for s in suites:
      for t in s.tests:
        # Tests without recorded perf data get a default duration of 1.0.
        t.duration = self.perfdata.FetchPerfData(t) or 1.0
      num_tests += len(s.tests)
    self._CommonInit(num_tests, progress_indicator, context)
    self.tests = []  # Only used if we need to fall back to local execution.
    self.tests_lock = threading.Lock()
    self.peers = peers
    self.pubkey_fingerprint = None  # Fetched later.
    self.base_rev = subprocess.check_output(
        "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace,
        shell=True).strip()
    self.base_svn_rev = subprocess.check_output(
        "cd %s; git log -1 %s"            # Get commit description.
        " | grep -e '^\\s*git-svn-id:'"   # Extract "git-svn-id" line.
        " | awk '{print $2}'"             # Extract "repository@revision" part.
        " | sed -e 's/.*@//'" %           # Strip away "repository@".
        (workspace, self.base_rev), shell=True).strip()
    self.patch = subprocess.check_output(
        "cd %s; git diff %s" % (workspace, self.base_rev), shell=True)
    self.binaries = {}
    # The lock is used as a one-shot latch: it is taken here, released by the
    # listener thread, and re-acquired below to wait for that release.
    self.initialization_lock = threading.Lock()
    self.initialization_lock.acquire()  # Released when init is done.
    self._OpenLocalConnection()
    self.local_receiver_thread = threading.Thread(
        target=self._ListenLocalConnection)
    self.local_receiver_thread.daemon = True
    self.local_receiver_thread.start()
    self.initialization_lock.acquire()
    self.initialization_lock.release()
    if self.pubkey_fingerprint is None:
      # The listener gave up (error or closed stream) before a fingerprint
      # arrived; fail loudly instead of sending packets without a key.
      raise RuntimeError("Failed to fetch public key fingerprint from "
                         "local server")

  def _OpenLocalConnection(self):
    """Connects to the local server and requests the pubkey fingerprint.

    Raises:
      RuntimeError: If the connection to localhost:CLIENT_PORT fails.
    """
    self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT))
    if code != 0:
      raise RuntimeError("Failed to connect to local server")
    compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket)

  def _ListenLocalConnection(self):
    """Receives messages from the local server (runs in a daemon thread).

    Stores the public key fingerprint once it arrives and then releases
    self.initialization_lock so that the constructor can continue. The lock
    is also released when this method exits without having received the
    fingerprint, so the constructor never blocks forever.

    Raises:
      RuntimeError: If the server answers with an empty public key.
    """
    release_lock_countdown = 1  # Pubkey.
    try:
      self.local_receiver = compression.Receiver(self.local_socket)
      while not self.local_receiver.IsDone():
        data = self.local_receiver.Current()
        if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT:
          pubkey = data[1]
          if not pubkey:
            raise RuntimeError("Received empty public key")
          self.pubkey_fingerprint = pubkey
          release_lock_countdown -= 1
        if release_lock_countdown == 0:
          self.initialization_lock.release()
          release_lock_countdown -= 1  # Prevent repeated triggering.
        self.local_receiver.Advance()
    finally:
      # The countdown only drops below zero after the lock was released.
      if release_lock_countdown >= 0:
        self.initialization_lock.release()

  def Run(self, jobs):
    """Distributes all tests to the peers and collects the results.

    Args:
      jobs: Passed to _RunInternal() when tests have to be run locally.

    Returns:
      True if no test failed, False otherwise. If a shell binary cannot be
      signed, the non-zero second element returned by
      signatures.ReadFileAndSignature is returned instead.
    """
    self.indicator.Starting()
    need_libv8 = False
    for s in self.suites:
      shell = s.shell()
      if shell not in self.binaries:
        path = os.path.join(self.context.shell_dir, shell)
        # Check if this is a shared library build.
        try:
          ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % (path),
                                        shell=True)
          ldd = ldd.strip().split(" ")
          # Explicit check instead of assert so that it survives "python -O".
          if ldd[0] != "libv8.so" or ldd[1] != "=>":
            raise ValueError("Unexpected ldd output")
          libv8 = signatures.ReadFileAndSignature(ldd[2])
          # Only flag the dependency once libv8 has actually been read.
          need_libv8 = True
          binary_needs_libv8 = True
        except Exception:
          # Any failure (grep found nothing, unexpected output, read error)
          # is treated as "not a shared library build".
          binary_needs_libv8 = False
        binary = signatures.ReadFileAndSignature(path)
        if binary[0] is None:
          print("Error: Failed to create signature.")
          assert binary[1] != 0
          return binary[1]
        binary.append(binary_needs_libv8)
        self.binaries[shell] = binary
    if need_libv8:
      self.binaries["libv8.so"] = libv8
    distro.Assign(self.suites, self.peers)
    # Spawn one thread for each peer.
    threads = []
    for p in self.peers:
      thread = threading.Thread(target=self._TalkToPeer, args=[p])
      threads.append(thread)
      thread.start()
    try:
      for thread in threads:
        # Use a timeout so that signals (Ctrl+C) will be processed.
        thread.join(timeout=10000000)
      self._AnalyzePeerRuntimes()
    except KeyboardInterrupt:
      self.terminate = True
      raise
    except Exception:
      # If there's an exception we schedule an interruption for any
      # remaining threads...
      self.terminate = True
      # ...and then reraise the exception to bail out.
      raise
    compression.Send(constants.END_OF_STREAM, self.local_socket)
    self.local_socket.close()
    if self.tests:
      self._RunInternal(jobs)
    self.indicator.Done()
    return not self.failed

  def _TalkToPeer(self, peer):
    """Sends one peer its work packet and merges the results it returns.

    Runs in a thread of its own. All tests of the packet that end up without
    a result -- because the peer is unreachable, reports errors, or the
    connection breaks -- are queued for local execution.

    Args:
      peer: The peer to talk to; address is read, runtime is written.
    """
    # Bound up front so that the fallback below works on every path.
    test_map = {}
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
      sock.settimeout(self.context.timeout + 10)
      start_time = time.time()
      # Pack before connecting so that the tests assigned to an unreachable
      # peer are known and can be run locally.
      # NOTE(review): assumes Pack() has no side effects -- confirm.
      packet = workpacket.WorkPacket(peer=peer, context=self.context,
                                     base_revision=self.base_svn_rev,
                                     patch=self.patch,
                                     pubkey=self.pubkey_fingerprint)
      data, test_map = packet.Pack(self.binaries)
      code = sock.connect_ex((peer.address, constants.PEER_PORT))
      if code == 0:
        peer.runtime = None
        compression.Send(data, sock)
        compression.Send(constants.END_OF_STREAM, sock)
        rec = compression.Receiver(sock)
        while not rec.IsDone() and not self.terminate:
          data_list = rec.Current()
          for data in data_list:
            test_id = data[0]
            if test_id < 0:
              # The peer is reporting an error.
              with self.lock:
                print("\nPeer %s reports error: %s" % (peer.address, data[1]))
              continue
            test = test_map.pop(test_id)
            test.MergeResult(data)
            try:
              self.perfdata.UpdatePerfData(test)
            except Exception as e:
              print("UpdatePerfData exception: %s" % e)
              # Just keep working.
            with self.lock:
              perf_key = self.perfdata.GetKey(test)
              compression.Send(
                  [constants.INFORM_DURATION, perf_key, test.duration,
                   self.context.arch, self.context.mode],
                  self.local_socket)
              self.indicator.AboutToRun(test)
              has_unexpected_output = test.suite.HasUnexpectedOutput(test)
              if has_unexpected_output:
                self.failed.append(test)
                if test.output.HasCrashed():
                  self.crashed += 1
              else:
                self.succeeded += 1
              self.remaining -= 1
              self.indicator.HasRun(test, has_unexpected_output)
          rec.Advance()
        peer.runtime = time.time() - start_time
      else:
        compression.Send([constants.UNRESPONSIVE_PEER, peer.address],
                         self.local_socket)
    except KeyboardInterrupt:
      raise
    except Exception as e:
      print("Got exception: %s" % e)
      # Fall back to local execution.
    finally:
      sock.close()
    if test_map:
      # Some tests have not received any results. Run them locally.
      print("\nNo results for %d tests, running them locally." % len(test_map))
      self._EnqueueLocally(test_map)

  def _EnqueueLocally(self, test_map):
    """Queues all tests in test_map (id -> test) for local execution."""
    with self.tests_lock:
      self.tests.extend(test_map.values())

  def _AnalyzePeerRuntimes(self):
    """Updates each peer's relative performance from the measured runtimes.

    Does nothing if any peer has no runtime recorded, or if the totals are
    not positive. Otherwise assigned_work and runtime of every peer are
    normalized in place to fractions of the totals, the new relative
    performance is the mean of the old value and work share / runtime share,
    and the result is reported to the local server.
    """
    total_runtime = 0.0
    total_work = 0.0
    for p in self.peers:
      if p.runtime is None:
        return
      total_runtime += p.runtime
      total_work += p.assigned_work
    if total_runtime <= 0 or total_work <= 0:
      # Nothing meaningful to normalize against; avoid dividing by zero.
      return
    for p in self.peers:
      p.assigned_work /= total_work
      p.runtime /= total_runtime
      if p.runtime <= 0:
        # No usable measurement for this peer; keep its old performance.
        continue
      perf_correction = p.assigned_work / p.runtime
      old_perf = p.relative_performance
      p.relative_performance = (old_perf + perf_correction) / 2.0
      compression.Send([constants.UPDATE_PERF, p.address,
                        p.relative_performance],
                       self.local_socket)