blob: 14489b056fc1479ae8e1f21e851ba363416f4cdf [file] [log] [blame]
Di Qian38c02a72019-11-18 19:14:07 -08001/*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package com.android.tradefed.cluster;
17
Daniel Peykov0cb492a2019-12-13 10:49:35 -080018import com.android.ddmlib.testrunner.TestResult.TestStatus;
Di Qian38c02a72019-11-18 19:14:07 -080019import com.android.tradefed.build.BuildInfo;
Julien Desprez5c330472020-02-05 14:36:20 -080020import com.android.tradefed.cluster.ClusterHostEvent.HostEventType;
Di Qian38c02a72019-11-18 19:14:07 -080021import com.android.tradefed.command.CommandScheduler;
22import com.android.tradefed.command.ICommandScheduler;
23import com.android.tradefed.command.remote.DeviceDescriptor;
24import com.android.tradefed.config.ConfigurationException;
25import com.android.tradefed.config.IConfiguration;
26import com.android.tradefed.device.DeviceAllocationState;
27import com.android.tradefed.device.DeviceNotAvailableException;
28import com.android.tradefed.device.FreeDeviceState;
29import com.android.tradefed.device.IDeviceManager;
30import com.android.tradefed.device.ITestDevice;
31import com.android.tradefed.device.NoDeviceException;
32import com.android.tradefed.device.battery.BatteryController;
33import com.android.tradefed.device.battery.IBatteryInfo;
34import com.android.tradefed.device.battery.IBatteryInfo.BatteryState;
35import com.android.tradefed.invoker.IInvocationContext;
36import com.android.tradefed.invoker.InvocationContext;
Julien Desprez5c330472020-02-05 14:36:20 -080037import com.android.tradefed.invoker.logger.InvocationMetricLogger.InvocationMetricKey;
Di Qian38c02a72019-11-18 19:14:07 -080038import com.android.tradefed.log.LogUtil.CLog;
39import com.android.tradefed.result.CollectingTestListener;
40import com.android.tradefed.result.ITestSummaryListener;
41import com.android.tradefed.result.TestRunResult;
42import com.android.tradefed.result.TestSummary;
43import com.android.tradefed.util.FileUtil;
44import com.android.tradefed.util.MultiMap;
45import com.android.tradefed.util.QuotationAwareTokenizer;
Julien Desprez87770662020-02-25 11:28:46 -080046import com.android.tradefed.util.StreamUtil;
Di Qian38c02a72019-11-18 19:14:07 -080047
Di Qian38c02a72019-11-18 19:14:07 -080048import com.google.common.primitives.Ints;
49
50import org.json.JSONException;
51
52import java.io.File;
53import java.io.IOException;
54import java.util.Collection;
55import java.util.Collections;
56import java.util.HashMap;
57import java.util.HashSet;
58import java.util.LinkedList;
59import java.util.List;
60import java.util.Map;
Di Qian38c02a72019-11-18 19:14:07 -080061import java.util.Optional;
62import java.util.Set;
63import java.util.concurrent.RejectedExecutionHandler;
64import java.util.concurrent.ScheduledFuture;
65import java.util.concurrent.ScheduledThreadPoolExecutor;
66import java.util.concurrent.ThreadFactory;
67import java.util.concurrent.ThreadPoolExecutor;
68import java.util.concurrent.TimeUnit;
69
70/**
71 * A {@link ICommandScheduler} to support TFC (Tradefed Cluster). This scheduler runs commands from
72 * TFC command-queue and uploads invocation events to TFC command-event-queue.
73 */
74public class ClusterCommandScheduler extends CommandScheduler {
75
76 /** The {@link ScheduledThreadPoolExecutor} used to manage heartbeats. */
77 private ScheduledThreadPoolExecutor mHeartbeatThreadPool = null;
78
79 /** The {@link IClusterOptions} instance used to store cluster-related settings. */
80 private IClusterOptions mClusterOptions;
81
82 /** The {@link IClusterClient} instance used to interact with the TFC backend. */
83 private IClusterClient mClusterClient;
84
85 /**
86 * A {@link ThreadFactory} which returns threads in a dedicated heartbeat group.
87 *
88 * <p>This class is used as a factory by {@code mHeartbeatThreadPool} in order to segregate
89 * heartbeat threads from other "stray" threads to avoid tripping loose thread detection in
90 * {@link CommandScheduler}.
91 */
92 private static class HeartbeatThreadFactory implements ThreadFactory {
93 private static final ThreadGroup HB_GROUP;
94
95 static {
96 // fetch root thread group as this class may be initialized by an invocation thread
97 ThreadGroup tg = Thread.currentThread().getThreadGroup();
98 while (tg.getParent() != null) {
99 tg = tg.getParent();
100 }
101 HB_GROUP = new ThreadGroup(tg, "ClusterCommandScheduler.heartbeat");
102 }
103
104 @Override
105 public Thread newThread(Runnable r) {
106 Thread thread = new Thread(HB_GROUP, r);
107 // heartbeat should always get cancelled, but ensure it doesn't prevent JVM exit
108 thread.setDaemon(true);
109 return thread;
110 }
111 }
112
113 /** {@inheritDoc} */
114 @Override
115 public void start() {
116 UploadHostEventWithState(HostState.RUNNING);
117 super.start();
118 }
119
120 /** {@inheritDoc} */
121 @Override
122 public void shutdown() {
123 UploadHostEventWithState(HostState.QUITTING);
124 getHeartbeatThreadPool().shutdown();
125 super.shutdown();
126 }
127
128 @Override
129 public synchronized void shutdownHard() {
130 UploadHostEventWithState(HostState.KILLING);
131 getHeartbeatThreadPool().shutdown();
132 super.shutdownHard();
133 }
134
135 /**
136 * A {@link com.android.tradefed.command.ICommandScheduler.IScheduledInvocationListener} to
137 * upload events to TFC.
138 */
139 class InvocationEventHandler extends CollectingTestListener
140 implements IScheduledInvocationListener, ITestSummaryListener {
141
142 private ScheduledFuture<?> mHeartbeat;
143 private final ClusterCommand mCommandTask;
144 private Set<String> mDeviceSerials = new HashSet<>();
145 private String mSummary;
Yichun Li6e2a24a2020-02-21 14:30:35 -0800146 private Set<String> processedSummaries = new HashSet<>();
Di Qian38c02a72019-11-18 19:14:07 -0800147 private String mError;
148 private File mWorkDir;
149 private InvocationStatus mInvocationStatus;
150
/**
 * Builds an {@link InvocationEventHandler} that reports events for one {@link
 * ClusterCommand}.
 *
 * @param commandTask the {@link ClusterCommand} whose invocation is tracked.
 */
public InvocationEventHandler(ClusterCommand commandTask) {
    this.mCommandTask = commandTask;
}
159
160 /**
161 * Sets a work directory for an invocation.
162 *
163 * @param dir a work directory.
164 */
165 public void setWorkDir(File dir) {
166 mWorkDir = dir;
167 }
168
169 private ClusterCommandEvent.Builder createEventBuilder() {
170 final ClusterCommandEvent.Builder builder =
171 ClusterCommandEvent.createEventBuilder(mCommandTask)
172 .setHostName(ClusterHostUtil.getHostName());
173 if (!mDeviceSerials.isEmpty()) {
174 builder.setDeviceSerials(mDeviceSerials);
175 }
176 return builder;
177 }
178
179 private void updateInvocationStatus() {
180 if (!getClusterOptions().shouldUploadInvocationStatus()) {
181 return;
182 }
183 final InvocationStatus obj = new InvocationStatus();
184 final Collection<TestRunResult> testRunResults = this.getMergedTestRunResults();
185 for (final TestRunResult result : testRunResults) {
186 final TestGroupStatus testGroupStatus =
187 new TestGroupStatus(
188 result.getName(),
189 result.getNumTests(),
190 result.getNumCompleteTests(),
191 result.getNumAllFailedTests(),
Daniel Peykov0cb492a2019-12-13 10:49:35 -0800192 result.getNumTestsInState(TestStatus.PASSED),
Di Qian38c02a72019-11-18 19:14:07 -0800193 result.isRunComplete(),
Moon Kimd15525e2020-04-28 16:49:10 -0700194 result.getElapsedTime());
Di Qian38c02a72019-11-18 19:14:07 -0800195 obj.addTestGroupStatus(testGroupStatus);
196 }
197 mInvocationStatus = obj;
198 }
199
200 /** {@inheritDoc} */
201 @Override
202 public void invocationInitiated(IInvocationContext context) {
203 for (ITestDevice device : context.getDevices()) {
204 mDeviceSerials.add(device.getSerialNumber());
205 }
206 final ClusterCommandEvent event =
207 createEventBuilder()
208 .setType(ClusterCommandEvent.Type.InvocationInitiated)
209 .build();
210 getClusterClient().getCommandEventUploader().postEvent(event);
211 getClusterClient().getCommandEventUploader().flush();
212 mHeartbeat = startHeartbeat();
213 // Check that devices are in charging state before starting the invocation.
214 for (ITestDevice device : context.getDevices()) {
215 try {
216 BatteryState state = BatteryController.getDeviceChargingState(device);
217 if (BatteryState.NOT_CHARGING.equals(state)) {
218 IBatteryInfo info = BatteryController.getBatteryInfoForDevice(device);
219 if (info != null) {
220 info.enableCharging(device);
221 }
222 }
223 } catch (DeviceNotAvailableException e) {
224 CLog.e(e);
225 }
226 }
227 }
228
229 /** {@inheritDoc} */
230 @Override
231 public void invocationStarted(IInvocationContext context) {
232 super.invocationStarted(context);
233 final ClusterCommandEvent event =
234 createEventBuilder()
235 .setType(ClusterCommandEvent.Type.InvocationStarted)
236 .build();
237 getClusterClient().getCommandEventUploader().postEvent(event);
238 getClusterClient().getCommandEventUploader().flush();
239 }
240
241 @Override
242 public void testRunStarted(String name, int numTests) {
243 testRunStarted(name, numTests, 0);
244 }
245
246 @Override
247 public void testRunStarted(String name, int numTests, int attemptNumber) {
248 testRunStarted(name, numTests, attemptNumber, System.currentTimeMillis());
249 }
250
251 /** {@inheritDoc} */
252 @Override
253 public void testRunStarted(String name, int numTests, int attemptNumber, long startTime) {
254 super.testRunStarted(name, numTests, attemptNumber, startTime);
255 updateInvocationStatus();
256 }
257
258 /** {@inheritDoc} */
259 @Override
260 public void invocationFailed(Throwable cause) {
261 super.invocationFailed(cause);
262
Julien Desprez87770662020-02-25 11:28:46 -0800263 mError = StreamUtil.getStackTrace(cause);
Di Qian38c02a72019-11-18 19:14:07 -0800264 }
265
266 /** {@inheritDoc} */
267 @Override
268 public void invocationEnded(long elapsedTime) {
269 super.invocationEnded(elapsedTime);
270
271 ClusterCommandEvent event =
272 createEventBuilder()
273 .setType(ClusterCommandEvent.Type.InvocationEnded)
274 .setData(ClusterCommandEvent.DATA_KEY_ERROR, mError)
275 .build();
276 getClusterClient().getCommandEventUploader().postEvent(event);
277 getClusterClient().getCommandEventUploader().flush();
278 }
279
280 /** {@inheritDoc} */
281 @Override
282 public void invocationComplete(
283 IInvocationContext metadata, Map<ITestDevice, FreeDeviceState> devicesStates) {
284 if (mWorkDir != null) {
285 FileUtil.recursiveDelete(mWorkDir);
286 }
287
288 // TODO: handle multi-device where only one of the build could be missing.
289 if (getPrimaryBuildInfo() == null && mError == null) {
290 mError = "build not found";
291 }
292
Julien Desprez5c330472020-02-05 14:36:20 -0800293 String fetchBuildTimeMillis = "-1";
294 String setupTimeMillis = "-1";
295 if (metadata != null) {
296 fetchBuildTimeMillis =
297 metadata.getAttributes()
298 .getUniqueMap()
299 .get(InvocationMetricKey.FETCH_BUILD.toString());
300 setupTimeMillis =
301 metadata.getAttributes()
302 .getUniqueMap()
303 .get(InvocationMetricKey.SETUP.toString());
Di Qian38c02a72019-11-18 19:14:07 -0800304 }
305
306 // Stop heartbeat thread before sending InvocationCompleted event.
307 if (mHeartbeat != null) {
308 mHeartbeat.cancel(true);
309 }
310 updateInvocationStatus();
311 final ClusterCommandEvent event =
312 createEventBuilder()
313 .setType(ClusterCommandEvent.Type.InvocationCompleted)
314 .setInvocationStatus(mInvocationStatus)
315 .setData(ClusterCommandEvent.DATA_KEY_ERROR, mError)
316 .setData(ClusterCommandEvent.DATA_KEY_SUMMARY, mSummary)
317 .setData(
318 ClusterCommandEvent.DATA_KEY_FETCH_BUILD_TIME_MILLIS,
Julien Desprez5c330472020-02-05 14:36:20 -0800319 fetchBuildTimeMillis)
Di Qian38c02a72019-11-18 19:14:07 -0800320 .setData(
Julien Desprez5c330472020-02-05 14:36:20 -0800321 ClusterCommandEvent.DATA_KEY_SETUP_TIME_MILLIS, setupTimeMillis)
Di Qian38c02a72019-11-18 19:14:07 -0800322 .setData(
323 ClusterCommandEvent.DATA_KEY_TOTAL_TEST_COUNT,
324 Integer.toString(getNumTotalTests()))
325 .setData(
326 ClusterCommandEvent.DATA_KEY_FAILED_TEST_COUNT,
327 Integer.toString(getNumAllFailedTests()))
328 .setData(
Daniel Peykov0cb492a2019-12-13 10:49:35 -0800329 ClusterCommandEvent.DATA_KEY_PASSED_TEST_COUNT,
330 Integer.toString(getNumTestsInState(TestStatus.PASSED)))
331 .setData(
Di Qian38c02a72019-11-18 19:14:07 -0800332 ClusterCommandEvent.DATA_KEY_FAILED_TEST_RUN_COUNT,
333 Integer.toString(getNumAllFailedTestRuns()))
334 .build();
335 getClusterClient().getCommandEventUploader().postEvent(event);
336 getClusterClient().getCommandEventUploader().flush();
337 }
338
339 /** {@inheritDoc} */
340 @Override
Yichun Lib935e792020-02-10 18:31:48 -0800341 public void putEarlySummary(List<TestSummary> summaries) {
342 if (getClusterOptions().shouldCollectEarlyTestSummary()) {
343 putSummary(summaries);
344 }
345 }
346
347 /** {@inheritDoc} */
348 @Override
Di Qian38c02a72019-11-18 19:14:07 -0800349 public void putSummary(List<TestSummary> summaries) {
Yichun Lib935e792020-02-10 18:31:48 -0800350 StringBuilder sb = new StringBuilder();
Di Qian38c02a72019-11-18 19:14:07 -0800351 for (final TestSummary summary : summaries) {
Yichun Li6e2a24a2020-02-21 14:30:35 -0800352 String summaryString = summary.getSummary().toString();
353 if (!processedSummaries.contains(summaryString)) {
354 processedSummaries.add(summaryString);
355 sb.append(summaryString);
356 sb.append("\n");
357 }
Di Qian38c02a72019-11-18 19:14:07 -0800358 }
Yichun Lib935e792020-02-10 18:31:48 -0800359 mSummary = mSummary == null ? sb.toString() : mSummary + sb.toString();
Di Qian38c02a72019-11-18 19:14:07 -0800360 }
361
362 private ScheduledFuture<?> startHeartbeat() {
363 return getHeartbeatThreadPool()
364 .scheduleAtFixedRate(
365 new HeartbeatSender(),
366 0,
367 getClusterOptions().getInvocationHeartbeatInterval(),
368 TimeUnit.MILLISECONDS);
369 }
370
371 class HeartbeatSender implements Runnable {
372 @Override
373 public void run() {
374 try {
375 // check cluster command's status
376 if (getClusterOptions().checkCommandState()) {
377 ClusterCommand.State status =
378 getClusterClient()
379 .getCommandState(
380 mCommandTask.getRequestId(),
381 mCommandTask.getCommandId());
382 if (ClusterCommand.State.CANCELED.equals(status)) {
frankfeng9a79fe02020-03-13 10:48:30 -0700383 // TODO: retrieve cancel reason from TFC.
384 String cause =
385 String.format(
frankfeng34da1ef2020-04-09 17:53:40 -0700386 "The cluster client %s has marked command "
387 + "(requestId=%s, commandId=%s) canceled",
388 getClusterClient().getClass().getSimpleName(),
frankfeng9a79fe02020-03-13 10:48:30 -0700389 mCommandTask.getRequestId(),
390 mCommandTask.getCommandId());
391 CLog.w("Stop invocation due to: %s", cause);
Di Qian38c02a72019-11-18 19:14:07 -0800392 Optional.ofNullable(getInvocationContext())
393 .map(IInvocationContext::getInvocationId)
394 .map(Ints::tryParse)
frankfeng9a79fe02020-03-13 10:48:30 -0700395 .ifPresent(invocationId -> stopInvocation(invocationId, cause));
Di Qian38c02a72019-11-18 19:14:07 -0800396 }
397 }
398
399 final ClusterCommandEvent event =
400 createEventBuilder()
401 .setType(ClusterCommandEvent.Type.TestRunInProgress)
402 .setInvocationStatus(mInvocationStatus)
403 .build();
404 getClusterClient().getCommandEventUploader().postEvent(event);
405 } catch (Exception e) {
406 CLog.e("Error sending heartbeat to TFC:");
407 CLog.e(e);
408 }
409 }
410 }
411 }
412
413 synchronized ScheduledThreadPoolExecutor getHeartbeatThreadPool() {
414 if (mHeartbeatThreadPool == null) {
415 mHeartbeatThreadPool = new ScheduledThreadPoolExecutor(1, new HeartbeatThreadFactory());
416 // instead of throwing some exception on shutdown we simply log it.
417 mHeartbeatThreadPool.setRejectedExecutionHandler(
418 new RejectedExecutionHandler() {
419 @Override
420 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
421 CLog.w(
422 "Rejecting Task %s rejected from executor %s",
423 r.toString(), e.toString());
424 }
425 });
426 // continue existing heartbeats after shutdown (until invocation is complete)
427 mHeartbeatThreadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
428 }
429 return mHeartbeatThreadPool;
430 }
431
432 /** {@inheritDoc} */
433 @Override
434 protected void processReadyCommands(IDeviceManager manager) {
435 super.processReadyCommands(manager);
436
437 if (isShuttingDown()) {
438 return;
439 }
440
441 List<ClusterCommand> commands = null;
442 MultiMap<String, DeviceDescriptor> devices = getAvailableDevices(manager);
443 if (devices.isEmpty()) {
444 CLog.d("No devices are available for testing.");
445 return;
446 }
447 // Lease command tasks through the leasehosttasks API.
448 // Here we get all devices (available or not), so TFC will analyze the device tree to
449 // decide which group is allocated and which group is available.
450 devices = getDevices(manager, false);
451 commands = fetchHostCommands(devices);
452 if (commands.isEmpty()) {
453 CLog.d("No commands available for testing.");
454 return;
455 }
456 execCommands(commands);
457 }
458
459 /**
460 * Returns a map containing available devices grouped by their types.
461 *
462 * @param manager a {@link IDeviceManager}.
463 * @return a {@link MultiMap} of String to DeviceDescriptor containing available devices.
464 */
465 MultiMap<String, DeviceDescriptor> getAvailableDevices(IDeviceManager manager) {
466 return getDevices(manager, true);
467 }
468
469 /**
470 * Returns a map containing devices grouped by their types.
471 *
472 * @param manager a {@link IDeviceManager}.
473 * @param availableOnly only return available devices or all devices.
474 * @return a {@link MultiMap} of String to DeviceDescriptor containing available devices.
475 */
476 MultiMap<String, DeviceDescriptor> getDevices(IDeviceManager manager, boolean availableOnly) {
477 // Getting available device types
478 final MultiMap<String, DeviceDescriptor> devices = new MultiMap<>();
479 for (final DeviceDescriptor device : manager.listAllDevices()) {
480 if (availableOnly && device.getState() != DeviceAllocationState.Available) {
481 continue;
482 }
483 if (ClusterHostUtil.isIpPort(device.getSerial())) {
484 // Note(b/28802876): Skipping IP:PORT serials from cluster scheduling because they
485 // behave differently from physical devices and are not fully supported by TF.
486 continue;
487 }
488 String runTargetFormat = getClusterOptions().getRunTargetFormat();
489 String runTarget =
490 ClusterHostUtil.getRunTarget(
491 device, runTargetFormat, getClusterOptions().getDeviceTag());
492 CLog.d("%s is available", runTarget);
493 devices.put(runTarget, device);
494 }
495 return devices;
496 }
497
498 /**
499 * Get available flashing permits.
500 *
501 * @return the number of available flashing permits.
502 */
503 private int getAvailableFlashingPermits() {
504 // By default there is no limit on available flashing permits.
505 int availableFlashingPermits = Integer.MAX_VALUE;
506 final IClusterOptions options = getClusterOptions();
507
508 boolean checkFlashingPermitsLease = options.checkFlashingPermitsOnLease();
509 if (checkFlashingPermitsLease) {
510 availableFlashingPermits = getDeviceManager().getAvailableFlashingPermits();
511 CLog.i("available flasher permits %d", availableFlashingPermits);
512 }
513 return availableFlashingPermits;
514 }
515
516 /**
517 * Fetches commands for devices from the Tradefed Cluster's leasehosttasks API.
518 *
519 * @param devices a {@link MultiMap} of String to DeviceDescriptor containing devices.
520 * @return a list of {@link ClusterCommand}s.
521 */
522 List<ClusterCommand> fetchHostCommands(final MultiMap<String, DeviceDescriptor> devices) {
523 CLog.d("fetching cluster host commands from leasehosttasks...");
524 int availableFlashingPermits = getAvailableFlashingPermits();
525
526 // Don't try to lease if there are no flasher permits available
527 if (availableFlashingPermits == 0) {
528 CLog.i("There is no available flashing permits. Not lease any additional commands.");
529 return Collections.<ClusterCommand>emptyList();
530 }
531
532 final IClusterOptions options = getClusterOptions();
533 final MultiMap<String, String> deviceGroups = options.getDeviceGroup();
534 final Map<String, String> deviceToGroup = new HashMap<>();
535 for (String group : deviceGroups.keySet()) {
536 for (String deviceSerial : deviceGroups.get(group)) {
537 deviceToGroup.put(deviceSerial, group);
538 }
539 }
540 List<ClusterDeviceInfo> deviceInfos = new LinkedList<>();
541 for (String runTarget : devices.keySet()) {
542 for (DeviceDescriptor d : devices.get(runTarget)) {
543 String groupName = deviceToGroup.getOrDefault(d.getSerial(), null);
544 ClusterDeviceInfo deviceInfo =
545 new ClusterDeviceInfo.Builder()
546 .setDeviceDescriptor(d)
547 .setRunTarget(runTarget)
548 .setGroupName(groupName)
549 .build();
550 deviceInfos.add(deviceInfo);
551 }
552 }
553 try {
554 int count = Math.min(deviceInfos.size(), availableFlashingPermits);
555 List<ClusterCommand> commands =
556 getClusterClient()
557 .leaseHostCommands(
558 options.getClusterId(),
559 ClusterHostUtil.getHostName(),
560 deviceInfos,
561 options.getNextClusterIds(),
562 count);
563 return commands;
564 } catch (JSONException e) {
565 CLog.e(e);
566 return Collections.<ClusterCommand>emptyList();
567 }
568 }
569
570 /**
571 * Executes commands fetched from the cluster command queue.
572 *
573 * @param commands a list of {@link ClusterCommand}s fetched from the cluster command queue.
574 */
575 void execCommands(final List<ClusterCommand> commands) {
576 for (final ClusterCommand commandTask : commands) {
577 try {
578 final InvocationEventHandler handler = new InvocationEventHandler(commandTask);
579 switch (commandTask.getRequestType()) {
580 case UNMANAGED:
581 execClusterCommand(commandTask, handler);
582 break;
583 case MANAGED:
584 execManagedClusterCommand(commandTask, handler);
585 break;
586 default:
587 throw new UnsupportedOperationException();
588 }
589 } catch (NoDeviceException e) {
590 CLog.w(
591 "no device meets requirements for cluster command [%s]; returning...",
592 commandTask.getTaskId());
593 CLog.w(e);
594 IClusterEventUploader<ClusterCommandEvent> eventUploader =
595 getClusterClient().getCommandEventUploader();
596 eventUploader.postEvent(
597 ClusterCommandEvent.createEventBuilder(commandTask)
598 .setHostName(ClusterHostUtil.getHostName())
599 .setType(ClusterCommandEvent.Type.AllocationFailed)
600 .build());
601 eventUploader.flush();
602 } catch (ConfigurationException | IOException | JSONException e) {
603 CLog.w("failed to execute cluster command [%s]: %s", commandTask.getTaskId(), e);
604 CLog.w(e);
605 IClusterEventUploader<ClusterCommandEvent> eventUploader =
606 getClusterClient().getCommandEventUploader();
607 eventUploader.postEvent(
608 ClusterCommandEvent.createEventBuilder(commandTask)
609 .setHostName(ClusterHostUtil.getHostName())
610 .setType(ClusterCommandEvent.Type.ConfigurationError)
611 .setData(ClusterCommandEvent.DATA_KEY_ERROR, e.toString())
612 .build());
613 eventUploader.flush();
614 }
615 }
616 }
617
618 void execClusterCommand(ClusterCommand commandTask, InvocationEventHandler handler)
619 throws ConfigurationException, IllegalArgumentException, NoDeviceException {
620 String cmdLine = commandTask.getCommandLine();
621 String[] args = QuotationAwareTokenizer.tokenizeLine(cmdLine);
622 // If it is a dry run command skip execution.
623 if (dryRunCommand(handler, args)) {
624 return;
625 }
626 // Append device serials to command.
627 // By assigning all applicable serials, TF will try one by one until allocation
628 // succeeds (or fails for all). This mitigates the issue where a single bad
629 // device can starve tests.
630 if (commandTask.getTargetDeviceSerials() != null) {
631 for (String serial : commandTask.getTargetDeviceSerials()) {
632 cmdLine += " --serial ";
633 cmdLine += serial;
634 }
635 }
636 CLog.i("executing cluster command: [%s] %s", commandTask.getTaskId(), cmdLine);
637 execCommand(handler, QuotationAwareTokenizer.tokenizeLine(cmdLine));
638 }
639
640 void execManagedClusterCommand(ClusterCommand commandTask, InvocationEventHandler handler)
641 throws IOException, JSONException, ConfigurationException, NoDeviceException {
642 File workDir = null;
643 try {
644 workDir = new File(System.getProperty("java.io.tmpdir"), commandTask.getAttemptId());
645 workDir.mkdirs();
646 final String requestId = commandTask.getRequestId();
647 final String commandId = commandTask.getCommandId();
648 final IClusterClient client = getClusterClient();
649 final TestEnvironment testEnvironment = client.getTestEnvironment(requestId);
650 final List<TestResource> testResources = client.getTestResources(requestId);
651 final TestContext testContext = client.getTestContext(requestId, commandId);
652 testResources.addAll(testContext.getTestResources());
653 final File configFile =
654 new ClusterCommandConfigBuilder()
655 .setWorkDir(workDir)
656 .setClusterCommand(commandTask)
657 .setTestEnvironment(testEnvironment)
658 .setTestResources(testResources)
659 .setTestContext(testContext)
660 .build();
661 CLog.i("executing cluster command: [%s] %s", commandTask.getTaskId(), configFile);
662 CLog.d("configFile: %s", FileUtil.readStringFromFile(configFile));
663 // FIXME: Find a way to upload a config file after an invocation is completed for
664 // debugging.
665 handler.setWorkDir(workDir);
666 execCommand(handler, new String[] {configFile.getAbsolutePath()});
667 // Unset workDir to avoid being cleaned up
668 workDir = null;
669 } finally {
670 if (workDir != null) {
671 FileUtil.recursiveDelete(workDir);
672 }
673 }
674 }
675
676 /**
677 * Determines if a given command is a dry-run. If the command is a dry-run, validate it. If
678 * there are any configs issue, it will throw a ConfigurationException.
679 *
680 * @param handler {@link InvocationEventHandler} to report events for dry-run validation.
681 * @param args the command to validate.
682 * @return true if the command are a dry run, false otherwise.
683 * @throws ConfigurationException
684 */
685 protected boolean dryRunCommand(final InvocationEventHandler handler, String[] args)
686 throws ConfigurationException {
687 IConfiguration config =
688 getConfigFactory().createConfigurationFromArgs(args, null, getKeyStoreClient());
689 if (config.getCommandOptions().isDryRunMode()) {
690 IInvocationContext context = new InvocationContext();
691 context.addDeviceBuildInfo("stub", new BuildInfo());
692 handler.invocationStarted(context);
693 config.validateOptions();
694 handler.invocationEnded(0);
695 IInvocationContext nullMeta = null;
696 handler.invocationComplete(nullMeta, null);
697 return true;
698 }
699 return false;
700 }
701
702 /** Get the {@link IClusterOptions} instance used to store cluster-related settings. */
703 IClusterOptions getClusterOptions() {
704 if (mClusterOptions == null) {
705 mClusterOptions = ClusterHostUtil.getClusterOptions();
706 }
707 return mClusterOptions;
708 }
709
710 /** Get the {@link IClusterClient} instance used to interact with the TFC backend. */
711 IClusterClient getClusterClient() {
712 if (mClusterClient == null) {
713 mClusterClient = ClusterHostUtil.getClusterClient();
714 }
715 return mClusterClient;
716 }
717
718 /** Event triggered, to upload host states */
719 private void UploadHostEventWithState(HostState state) {
720 try {
721 IClusterEventUploader<ClusterHostEvent> Uploader =
722 getClusterClient().getHostEventUploader();
723 ClusterHostEvent.Builder builder =
724 new ClusterHostEvent.Builder()
725 .setClusterId(getClusterOptions().getClusterId())
726 .setHostEventType(HostEventType.HostStateChanged)
727 .setHostName(ClusterHostUtil.getHostName())
728 .setHostState(state);
729 CLog.d("event uploading with state %s", state.toString());
730 ClusterHostEvent event = builder.build();
731 Uploader.postEvent(event);
732 CLog.d("event %s uploaded with state %s", event.toString(), state.toString());
733 Uploader.flush();
734 } catch (RuntimeException e) {
735 CLog.e("failed to upload host state %s to TFC: %s", state.toString(), e);
736 }
737 }
738}