# Copyright 2012 the V8 project authors. All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
#       copyright notice, this list of conditions and the following
#       disclaimer in the documentation and/or other materials provided
#       with the distribution.
#     * Neither the name of Google Inc. nor the names of its
#       contributors may be used to endorse or promote products derived
#       from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
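#
# Networked test execution: distributes a V8 test run across peer machines.
# Work packets (test binaries, the local patch, and the base revision) are
# sent to peers over compressed sockets; results stream back and are merged
# into the local progress indicator. Tests that never report a result fall
# back to local execution.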


import os
import socket
import subprocess
import threading
import time

from . import distro
from ..local import execution
from ..local import perfdata
from ..objects import peer
from ..objects import workpacket
from ..server import compression
from ..server import constants
from ..server import local_handler
from ..server import signatures


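# Ask the local distribution server for the set of currently known peers.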
def GetPeers():
  data = local_handler.LocalQuery([constants.REQUEST_PEERS])
  if not data: return []
  return [ peer.Peer.Unpack(p) for p in data ]


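# Runner that farms test jobs out to peers instead of executing everything
# locally. Any tests that did not receive results from a peer are handed to
# the superclass's local execution (_RunInternal).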
class NetworkedRunner(execution.Runner):
  def __init__(self, suites, progress_indicator, context, peers, workspace):
    self.suites = suites
    datapath = os.path.join("out", "testrunner_data")
    # TODO(machenbach): These fields should exist now in the superclass.
    # But there is no super constructor call. Check if this is a problem.
    self.perf_data_manager = perfdata.PerfDataManager(datapath)
    self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode)
    for s in suites:
      for t in s.tests:
        t.duration = self.perfdata.FetchPerfData(t) or 1.0
    self._CommonInit(suites, progress_indicator, context)
    self.tests = []  # Only used if we need to fall back to local execution.
    self.tests_lock = threading.Lock()
    self.peers = peers
    self.pubkey_fingerprint = None  # Fetched later.
    self.base_rev = subprocess.check_output(
        "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace,
        shell=True).strip()
    self.base_svn_rev = subprocess.check_output(
        "cd %s; git log -1 %s"          # Get commit description.
        " | grep -e '^\s*git-svn-id:'"  # Extract "git-svn-id" line.
        " | awk '{print $2}'"           # Extract "repository@revision" part.
        " | sed -e 's/.*@//'" %         # Strip away "repository@".
        (workspace, self.base_rev), shell=True).strip()
    self.patch = subprocess.check_output(
        "cd %s; git diff %s" % (workspace, self.base_rev), shell=True)
    self.binaries = {}
    self.initialization_lock = threading.Lock()
    self.initialization_lock.acquire()  # Released when init is done.
    self._OpenLocalConnection()
    self.local_receiver_thread = threading.Thread(
        target=self._ListenLocalConnection)
    self.local_receiver_thread.daemon = True
    self.local_receiver_thread.start()
    self.initialization_lock.acquire()
    self.initialization_lock.release()

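  # Connects to the local server on CLIENT_PORT and requests the public-key
  # fingerprint that is later embedded in outgoing work packets.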
  def _OpenLocalConnection(self):
    self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT))
    if code != 0:
      raise RuntimeError("Failed to connect to local server")
    compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket)

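  # Receiver loop for the local server connection. Releases the
  # initialization lock once the public-key fingerprint has arrived.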
  def _ListenLocalConnection(self):
    release_lock_countdown = 1  # Pubkey.
    self.local_receiver = compression.Receiver(self.local_socket)
    while not self.local_receiver.IsDone():
      data = self.local_receiver.Current()
      if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT:
        pubkey = data[1]
        if not pubkey: raise RuntimeError("Received empty public key")
        self.pubkey_fingerprint = pubkey
        release_lock_countdown -= 1
      if release_lock_countdown == 0:
        self.initialization_lock.release()
        release_lock_countdown -= 1  # Prevent repeated triggering.
      self.local_receiver.Advance()

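  # Main entry point: collects and signs the required binaries (including a
  # shared libv8.so, if present), assigns suites to peers, talks to each peer
  # on its own thread, and finally runs any leftover tests locally.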
  def Run(self, jobs):
    self.indicator.Starting()
    need_libv8 = False
    for s in self.suites:
      shell = s.shell()
      if shell not in self.binaries:
        path = os.path.join(self.context.shell_dir, shell)
        # Check if this is a shared library build.
        try:
          ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % (path),
                                        shell=True)
          ldd = ldd.strip().split(" ")
          assert ldd[0] == "libv8.so"
          assert ldd[1] == "=>"
          need_libv8 = True
          binary_needs_libv8 = True
          libv8 = signatures.ReadFileAndSignature(ldd[2])
        except:
          # Not linked against a shared libv8.so (or ldd failed), so only the
          # shell binary itself needs to be shipped.
          binary_needs_libv8 = False
        binary = signatures.ReadFileAndSignature(path)
        if binary[0] is None:
          print("Error: Failed to create signature.")
          assert binary[1] != 0
          return binary[1]
        binary.append(binary_needs_libv8)
        self.binaries[shell] = binary
    if need_libv8:
      self.binaries["libv8.so"] = libv8
    distro.Assign(self.suites, self.peers)
    # Spawn one thread for each peer.
    threads = []
    for p in self.peers:
      thread = threading.Thread(target=self._TalkToPeer, args=[p])
      threads.append(thread)
      thread.start()
    try:
      for thread in threads:
        # Use a timeout so that signals (Ctrl+C) will be processed.
        thread.join(timeout=10000000)
      self._AnalyzePeerRuntimes()
    except KeyboardInterrupt:
      self.terminate = True
      raise
    except Exception, _e:
      # If there's an exception we schedule an interruption for any
      # remaining threads...
      self.terminate = True
      # ...and then reraise the exception to bail out.
      raise
    compression.Send(constants.END_OF_STREAM, self.local_socket)
    self.local_socket.close()
    if self.tests:
      self._RunInternal(jobs)
    self.indicator.Done()
    return not self.failed

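  # Sends one peer its work packet and processes the streamed-back results.
  # On connection failure the peer is reported as unresponsive; tests without
  # results are re-queued for local execution.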
  def _TalkToPeer(self, peer):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(self.context.timeout + 10)
    # Initialized here so the fallback check at the end works even if the
    # connection or the packing step fails before test_map is assigned.
    test_map = {}
    code = sock.connect_ex((peer.address, constants.PEER_PORT))
    if code == 0:
      try:
        peer.runtime = None
        start_time = time.time()
        packet = workpacket.WorkPacket(peer=peer, context=self.context,
                                       base_revision=self.base_svn_rev,
                                       patch=self.patch,
                                       pubkey=self.pubkey_fingerprint)
        data, test_map = packet.Pack(self.binaries)
        compression.Send(data, sock)
        compression.Send(constants.END_OF_STREAM, sock)
        rec = compression.Receiver(sock)
        while not rec.IsDone() and not self.terminate:
          data_list = rec.Current()
          for data in data_list:
            test_id = data[0]
            if test_id < 0:
              # The peer is reporting an error.
              with self.lock:
                print("\nPeer %s reports error: %s" % (peer.address, data[1]))
              continue
            test = test_map.pop(test_id)
            test.MergeResult(data)
            try:
              self.perfdata.UpdatePerfData(test)
            except Exception, e:
              print("UpdatePerfData exception: %s" % e)
              pass  # Just keep working.
            with self.lock:
              perf_key = self.perfdata.GetKey(test)
              compression.Send(
                  [constants.INFORM_DURATION, perf_key, test.duration,
                   self.context.arch, self.context.mode],
                  self.local_socket)
              self.indicator.AboutToRun(test)
              has_unexpected_output = test.suite.HasUnexpectedOutput(test)
              if has_unexpected_output:
                self.failed.append(test)
                if test.output.HasCrashed():
                  self.crashed += 1
              else:
                self.succeeded += 1
              self.remaining -= 1
              self.indicator.HasRun(test, has_unexpected_output)
          rec.Advance()
        peer.runtime = time.time() - start_time
      except KeyboardInterrupt:
        sock.close()
        raise
      except Exception, e:
        print("Got exception: %s" % e)
        pass  # Fall back to local execution.
    else:
      compression.Send([constants.UNRESPONSIVE_PEER, peer.address],
                       self.local_socket)
    sock.close()
    if len(test_map) > 0:
      # Some tests have not received any results. Run them locally.
      print("\nNo results for %d tests, running them locally." % len(test_map))
      self._EnqueueLocally(test_map)

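  # Queues tests for the local fallback run (guarded by tests_lock).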
  def _EnqueueLocally(self, test_map):
    with self.tests_lock:
      for test in test_map:
        self.tests.append(test_map[test])

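  # Updates each peer's relative performance estimate: the new value is the
  # average of the old estimate and (share of assigned work) / (share of
  # total runtime), and is reported to the local server. Skipped if any peer
  # has no measured runtime.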
  def _AnalyzePeerRuntimes(self):
    total_runtime = 0.0
    total_work = 0.0
    for p in self.peers:
      if p.runtime is None:
        return
      total_runtime += p.runtime
      total_work += p.assigned_work
    for p in self.peers:
      p.assigned_work /= total_work
      p.runtime /= total_runtime
      perf_correction = p.assigned_work / p.runtime
      old_perf = p.relative_performance
      p.relative_performance = (old_perf + perf_correction) / 2.0
      compression.Send([constants.UPDATE_PERF, p.address,
                        p.relative_performance],
                       self.local_socket)